commit a03a46eddbe4341a79f576be6e47d64af3934823 Author: Lukas Schuller Date: Fri Feb 22 15:07:31 2019 +0100 Initial version of app loader diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4815d20 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.mpy diff --git a/src/app_loader.py b/src/app_loader.py new file mode 100644 index 0000000..dc5da6f --- /dev/null +++ b/src/app_loader.py @@ -0,0 +1,235 @@ +import uasyncio +import settings +from umqtt.robust import MQTTClient +import network +import machine +import gc +import os +import utime + +def String(x): + if isinstance(x, bytes): + return x.decode() + return str(x) + + +def raw(x): + return x + + +def replace_file(path): + def do_replace(content): + with open(path, 'wb') as f: + length = f.write(content) + return length + return do_replace + + +class MqttHandler(object): + + class Subscription(object): + def __init__(self, handler, topic, cls, cb=None): + self.topic = topic + self.value = None + self.cls = cls + self.cb = cb + self.handler = handler + + async def set_value(self, msg): + val = self.cls(msg) + if self.cb is None: + self.value = val + else: + self.cb(val) + + async def wait_for_value(self, interval_ms=10): + while self.value is None: + await uasyncio.sleep_ms(interval_ms) + return self.value + + def unsubscribe(self): + self.handler.subscriptions.pop(self.topic) + + def __init__(self, loop, host): + ip = network.WLAN(network.STA_IF).ifconfig()[0] + self.client = MQTTClient(ip, host) + self.client.connect() + self.client.set_callback(self.cb) + self.subscriptions = {} + self.ev_loop = loop + self.ev_loop.create_task(self.loop()) + self.publish = self.client.publish + + def cb(self, topic, msg): + if topic in self.subscriptions: + self.ev_loop.create_task(self.subscriptions[topic].set_value(msg)) + + def subscribe(self, topic, cls=String, cb=None, local=False): + if local: + topic = self.local_topic(topic) + + if not isinstance(topic, bytes): + topic = topic.encode() + + if topic in self.subscriptions: + raise Exception('topic {} already subscribed'.format(topic)) + + sub = MqttHandler.Subscription(self, topic, cls=cls, cb=cb) + self.subscriptions[topic] = sub + self.client.subscribe(topic) + return sub + + def local_topic(self, topic): + return '{}/{}'.format(settings.device_topic, String(topic)) + + def register_command(self, cmd, cb, cls=String, local=True): + self.subscribe(cmd, cls=cls, cb=cb, local=local) + + async def loop(self, interval_ms=10): + while True: + for i in range(1000/interval_ms): + self.client.check_msg() + await uasyncio.sleep_ms(interval_ms) + gc.collect() + + def log(self, *args): + if settings.LOG_TOPIC is not None: + log_str = ' '.join([String(a) for a in args]) + self.client.publish(self.local_topic(settings.LOG_TOPIC), log_str) + + +async def update_app(mqtt, name): + current_version = None + current_app = None + app_file = 'app.mpy' + try: + gc.collect() + import app + current_app = app.__app__ + current_version = app.__version__ + except: + pass + + try: + app_ns = '{}/apps/{}/{{}}'.format(settings.MQTT_PREFIX, name) + version_sub = mqtt.subscribe(app_ns.format('version'), + cls=int) + latest = await version_sub.wait_for_value() + version_sub.unsubscribe() + + if current_app != name or current_version != latest: + mqtt.log('installed: {} v{}, want {} v{}'.format( + current_app, current_version, name, latest)) + try: + # if the wrong app is installed, we remove it and reboot to + # avoid running out of memory during the update + os.remove('app.mpy') + return True + except Exception: + pass + + gc.collect() + sub = mqtt.subscribe(app_ns.format('content'), + cls=replace_file(app_file)) + app_size = await sub.wait_for_value() + sub.unsubscribe() + mqtt.log('Installed {} v{}, {}b'.format(name, latest, app_size)) + return True + except Exception as e: + mqtt.log('Update error:', e) + pass + + return False + + +def deepsleep(duration_ms): + utime.sleep_ms(200) + # configure RTC.ALARM0 to be able to wake the device + rtc = machine.RTC() + rtc.irq(trigger=rtc.ALARM0, wake=machine.DEEPSLEEP) + # set RTC.ALARM0 to fire after the specified time + rtc.alarm(rtc.ALARM0, duration_ms) + machine.deepsleep() + + +async def wait_for_connection(timeout=5): + sta = network.WLAN(network.STA_IF) + + while not sta.isconnected(): + await uasyncio.sleep_ms(100) + + +def get_device_info(mqtt): + sta = network.WLAN(network.STA_IF) + mac = ':'.join(['{:02x}'.format(x) for x in sta.config('mac')]) + ip = sta.ifconfig()[0] + settings.device_topic = '{}/{}'.format(settings.MQTT_PREFIX, + settings.DEVICE_TOPIC_FORMAT.format(ip=ip, mac=mac)) + topic_fmt = '{}/{{}}'.format(settings.device_topic.strip('/')) + + dev_name = mqtt.subscribe(topic_fmt.format('config/name')) + dev_app = mqtt.subscribe(topic_fmt.format('config/app')) + + return dev_name, dev_app + + +async def main(loop): + try: + await uasyncio.wait_for(wait_for_connection(), + settings.NETWORK_TIMEOUT) + except Exception: + raise Exception('Network error') + + mqtt = MqttHandler(loop, settings.MQTT_BROKER) + + dev_name_sub, app_name_sub = get_device_info(mqtt) + + dev_name = await dev_name_sub.wait_for_value() + app_name = await app_name_sub.wait_for_value() + dev_name_sub.unsubscribe() + app_name_sub.unsubscribe() + + + mqtt.log('Dev. info:', dev_name, app_name) + + # commands must be registered after get_device_info() since the device's + # base topic is not known before. + mqtt.register_command('reboot', lambda x: deepsleep(1000)) + + try: + reboot = await uasyncio.wait_for(update_app(mqtt, app_name), 5) + if reboot: + deepsleep(100) + except uasyncio.TimeoutError: + mqtt.log('Error updating app: timeout') + except Exception as e: + mqtt.log('Error updating app:', e) + + # if no app is found, wake up every ten seconds to retry the update: + sleep_duration_ms = 10000 + + # run app + try: + gc.collect() + import app + gc.collect() + mqtt.log('Heap after import:', gc.mem_free()) + mqtt.publish(mqtt.local_topic('status/app'), app.__app__) + mqtt.publish(mqtt.local_topic('status/version'), str(app.__version__)) + + sleep_duration_ms = await app.run(mqtt, name=dev_name) + except Exception as e: + mqtt.log('Error running app:', e) + + mqtt.log('Done. Sleeping for {}ms'.format(sleep_duration_ms)) + + deepsleep(sleep_duration_ms) + + +def run(): + try: + loop = uasyncio.get_event_loop() + loop.run_until_complete(main(loop)) + except Exception as e: + print('Error:', e) + deepsleep(1000) diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..66ebf3c --- /dev/null +++ b/src/main.py @@ -0,0 +1,4 @@ +#import app_loader + +import app_loader +app_loader.run() diff --git a/src/settings.py b/src/settings.py new file mode 100644 index 0000000..310173d --- /dev/null +++ b/src/settings.py @@ -0,0 +1,16 @@ + + +# timeout for connecting to the WiFi +NETWORK_TIMEOUT = 5 + +# address of MQTT broker +MQTT_BROKER = '192.168.1.1' + +# basic prefix used for all topics +MQTT_PREFIX = 'home' + +# format for device topics +DEVICE_TOPIC_FORMAT = 'devices/{ip}' + +# log topic (None to disable logging via MQTT). relative to device topic +LOG_TOPIC = 'log'