import uasyncio
import settings
from umqtt.robust import MQTTClient
import network
import machine
import gc
import os
import utime


def String(x):
    """Convert an MQTT payload (or any object) to ``str``.

    ``bytes`` are decoded with the default codec; everything else goes
    through ``str()``.
    """
    if isinstance(x, bytes):
        return x.decode()
    return str(x)


def raw(x):
    """Identity converter: hand the payload to the subscriber unchanged."""
    return x


def replace_file(path):
    """Build a payload converter that overwrites ``path`` with the payload.

    The returned callable writes its argument to ``path`` in binary mode and
    returns whatever ``f.write`` returned (the number of bytes written), so it
    can be used as the ``cls`` argument of ``MqttHandler.subscribe``.
    """
    def do_replace(content):
        with open(path, 'wb') as f:
            length = f.write(content)
        return length
    return do_replace


class MqttHandler(object):
    """Thin asynchronous wrapper around an ``MQTTClient``.

    Keeps a ``topic -> Subscription`` table, polls the broker from a
    background task and dispatches incoming messages to the matching
    subscription.
    """

    class Subscription(object):
        """State of a single subscribed topic.

        ``cls`` converts the raw payload. If ``cb`` is given it is called
        with the converted value, otherwise the value is stored in
        ``self.value`` for ``wait_for_value`` to pick up.
        """

        def __init__(self, handler, topic, cls, cb=None):
            self.topic = topic
            self.value = None
            self.cls = cls
            self.cb = cb
            self.handler = handler

        async def set_value(self, msg):
            """Convert ``msg`` and either store it or pass it to the callback."""
            val = self.cls(msg)
            if self.cb is None:
                self.value = val
            else:
                self.cb(val)

        async def wait_for_value(self, interval_ms=10):
            """Poll every ``interval_ms`` until a value is stored, then return it.

            Never returns for subscriptions that have a callback, because
            those do not store values.
            """
            while self.value is None:
                await uasyncio.sleep_ms(interval_ms)
            return self.value

        def unsubscribe(self):
            """Remove this subscription from the handler's dispatch table.

            Only the local table is changed; no request is sent to the
            broker. Calling it more than once is harmless.
            """
            self.handler.subscriptions.pop(self.topic, None)

    def __init__(self, loop, host):
        """Connect to the broker at ``host`` and start polling on ``loop``.

        The station interface's IP address is used as the MQTT client id.
        """
        ip = network.WLAN(network.STA_IF).ifconfig()[0]
        self.client = MQTTClient(ip, host)
        self.client.connect()
        self.client.set_callback(self.cb)
        self.subscriptions = {}
        self.ev_loop = loop
        self.ev_loop.create_task(self.loop())
        self.publish = self.client.publish

    def cb(self, topic, msg):
        """Client callback: schedule delivery of ``msg`` to its subscription.

        Messages on topics without a registered subscription are dropped.
        """
        if topic in self.subscriptions:
            self.ev_loop.create_task(self.subscriptions[topic].set_value(msg))

    def subscribe(self, topic, cls=String, cb=None, local=False):
        """Subscribe to ``topic`` and return the new ``Subscription``.

        ``cls`` converts incoming payloads, ``cb`` (optional) receives the
        converted value. With ``local=True`` the topic is prefixed with the
        device topic. Raises ``Exception`` if the topic is already subscribed.
        """
        if local:
            topic = self.local_topic(topic)
        # The dispatch table is keyed by bytes, matching what cb() looks up.
        if not isinstance(topic, bytes):
            topic = topic.encode()
        if topic in self.subscriptions:
            raise Exception('topic {} already subscribed'.format(topic))
        sub = MqttHandler.Subscription(self, topic, cls=cls, cb=cb)
        self.subscriptions[topic] = sub
        self.client.subscribe(topic)
        return sub

    def local_topic(self, topic):
        """Return ``topic`` prefixed with ``settings.device_topic``."""
        return '{}/{}'.format(settings.device_topic, String(topic))

    def register_command(self, cmd, cb, cls=String, local=True):
        """Subscribe ``cb`` to the command topic ``cmd`` (device-local by default)."""
        self.subscribe(cmd, cls=cls, cb=cb, local=local)

    async def loop(self, interval_ms=10):
        """Poll the client for messages forever, collecting garbage periodically.

        ``check_msg`` is called every ``interval_ms``; after roughly one
        second worth of polls ``gc.collect()`` runs.
        """
        # Integer division: range() does not accept the float that `/`
        # produces. Clamp to one poll so an interval above 1000 ms cannot
        # turn this into a loop that never awaits.
        polls_per_gc = max(1, 1000 // interval_ms)
        while True:
            for _ in range(polls_per_gc):
                self.client.check_msg()
                await uasyncio.sleep_ms(interval_ms)
            gc.collect()

    def log(self, *args):
        """Publish ``args`` space-joined to the device's log topic.

        Does nothing when ``settings.LOG_TOPIC`` is ``None``.
        """
        if settings.LOG_TOPIC is not None:
            log_str = ' '.join([String(a) for a in args])
            self.client.publish(self.local_topic(settings.LOG_TOPIC), log_str)


async def update_app(mqtt, name):
    """Make sure the installed app is ``name`` at the latest published version.

    Reads ``<prefix>/apps/<name>/version`` and compares it with the installed
    ``app`` module. On a mismatch the installed file is removed (returning
    right away so the caller can reboot) or, when there is nothing to remove,
    the new content is downloaded and its dependencies are installed.

    Returns ``True`` when the caller should reboot, ``False`` otherwise.
    Errors are logged over MQTT and yield ``False``.
    """
    current_version = None
    current_app = None
    app_file = 'app.mpy'
    try:
        gc.collect()
        import app
        current_app = app.__app__
        current_version = app.__version__
    except Exception:
        # No usable app installed (missing, broken or without metadata):
        # the None defaults force an update below.
        pass
    try:
        app_ns = '{}/apps/{}/{{}}'.format(settings.MQTT_PREFIX, name)
        version_sub = mqtt.subscribe(app_ns.format('version'), cls=int)
        latest = await version_sub.wait_for_value()
        version_sub.unsubscribe()
        if current_app != name or current_version != latest:
            mqtt.log('Current: {} v{}, want {} v{}'.format(
                current_app, current_version, name, latest))
            try:
                # if the wrong app is installed, we remove it and reboot to
                # avoid running out of memory during the update
                os.remove(app_file)
                return True
            except Exception:
                # Nothing to remove: carry on and download the app now.
                pass
            gc.collect()
            sub = mqtt.subscribe(app_ns.format('content'),
                                 cls=replace_file(app_file))
            app_size = await sub.wait_for_value()
            sub.unsubscribe()
            mqtt.log('Installed {} v{}, {}b'.format(name, latest, app_size))
            install_dependencies(mqtt)
            return True
    except Exception as e:
        mqtt.log('Update error:', e)
    return False


def install_dependencies(mqtt):
    """Install everything listed in the installed app's ``DEPENDENCIES``.

    Entries of length 1 are passed to ``upip.install``; entries of length 2
    are ``(path, url)`` pairs whose URL is downloaded to ``path``. Failures
    while installing are logged over MQTT, not raised; a failing
    ``import app`` propagates to the caller.
    """
    import app
    try:
        deps = app.DEPENDENCIES
        mqtt.log('Installing {} dependencies'.format(len(deps)))
        for d in deps:
            gc.collect()
            if len(d) == 1:
                # upip package
                # NOTE(review): the entry itself, not d[0], is handed to
                # upip.install -- confirm the expected shape of DEPENDENCIES.
                mqtt.log(' - upip package {}'.format(d))
                import upip
                upip.install(d)
            elif len(d) == 2:
                # file path, source URL
                path, url = d
                mqtt.log(' - dependency {} from {}'.format(path, url))
                gc.collect()
                import upip
                f = upip.url_open(url)
                try:
                    # Small fixed buffer plus a memoryview keeps the copy
                    # loop allocation-free on a memory-constrained board.
                    buf = bytearray(128)
                    view = memoryview(buf)
                    with open(path, 'wb') as outfile:
                        while True:
                            n_read = f.readinto(buf)
                            # 0 means end of stream; also stop on None.
                            if not n_read:
                                break
                            outfile.write(view[:n_read])
                finally:
                    # Release the connection even if the download fails.
                    f.close()
                gc.collect()
    except Exception as e:
        mqtt.log('Error installing dependencies: ', e)


def deepsleep(duration_ms):
    """Put the board into deep sleep and wake it after ``duration_ms``."""
    # Short pause before going down.
    # NOTE(review): presumably lets pending output/MQTT traffic drain -- confirm.
    utime.sleep_ms(200)
    # configure RTC.ALARM0 to be able to wake the device
    rtc = machine.RTC()
    rtc.irq(trigger=rtc.ALARM0, wake=machine.DEEPSLEEP)
    # set RTC.ALARM0 to fire after the specified time
    rtc.alarm(rtc.ALARM0, duration_ms)
    machine.deepsleep()


async def wait_for_connection(timeout=5):
    """Poll every 100 ms until the station interface reports a connection.

    ``timeout`` is accepted for compatibility but is not used here; callers
    bound the wait externally (``main`` wraps this in ``uasyncio.wait_for``).
    """
    sta = network.WLAN(network.STA_IF)
    while not sta.isconnected():
        await uasyncio.sleep_ms(100)


def get_device_info(mqtt):
    """Derive the device topic and subscribe to the device's config topics.

    Sets ``settings.device_topic`` from the MQTT prefix and the formatted
    IP/MAC, then returns the ``(name, app)`` subscriptions for
    ``config/name`` and ``config/app`` below that topic.
    """
    sta = network.WLAN(network.STA_IF)
    mac = ':'.join(['{:02x}'.format(x) for x in sta.config('mac')])
    ip = sta.ifconfig()[0]
    settings.device_topic = '{}/{}'.format(
        settings.MQTT_PREFIX,
        settings.DEVICE_TOPIC_FORMAT.format(ip=ip, mac=mac))
    topic_fmt = '{}/{{}}'.format(settings.device_topic.strip('/'))
    dev_name = mqtt.subscribe(topic_fmt.format('config/name'))
    dev_app = mqtt.subscribe(topic_fmt.format('config/app'))
    return dev_name, dev_app


async def main(loop):
    """Boot sequence: connect, fetch config, update the app, run it, sleep.

    Raises ``Exception('Network error')`` when the network does not come up
    within ``settings.NETWORK_TIMEOUT``.
    """
    try:
        await uasyncio.wait_for(wait_for_connection(),
                                settings.NETWORK_TIMEOUT)
    except Exception:
        raise Exception('Network error')

    mqtt = MqttHandler(loop, settings.MQTT_BROKER)
    dev_name_sub, app_name_sub = get_device_info(mqtt)
    dev_name = await dev_name_sub.wait_for_value()
    app_name = await app_name_sub.wait_for_value()
    dev_name_sub.unsubscribe()
    app_name_sub.unsubscribe()
    mqtt.log('Dev. info:', dev_name, app_name)

    # commands must be registered after get_device_info() since the device's
    # base topic is not known before.
    mqtt.register_command('reboot', lambda x: deepsleep(1000))

    try:
        reboot = await uasyncio.wait_for(update_app(mqtt, app_name), 5)
        if reboot:
            deepsleep(100)
    except uasyncio.TimeoutError:
        mqtt.log('Error updating app: timeout')
    except Exception as e:
        mqtt.log('Error updating app:', e)

    # if no app is found, wake up every ten seconds to retry the update:
    sleep_duration_ms = 10000

    # run app
    try:
        gc.collect()
        import app
        gc.collect()
        mqtt.log('Heap after import:', gc.mem_free())
        mqtt.publish(mqtt.local_topic('status/app'), app.__app__)
        mqtt.publish(mqtt.local_topic('status/version'),
                     str(app.__version__))
        # The app decides how long the device sleeps afterwards.
        sleep_duration_ms = await app.run(mqtt, name=dev_name)
    except Exception as e:
        mqtt.log('Error running app:', e)

    mqtt.log('Done. Sleeping for {}ms'.format(sleep_duration_ms))
    deepsleep(sleep_duration_ms)


def run():
    """Entry point: run ``main`` on the event loop, sleeping 1 s on any error."""
    try:
        loop = uasyncio.get_event_loop()
        loop.run_until_complete(main(loop))
    except Exception as e:
        print('Error:', e)
        deepsleep(1000)