Initial version of app loader
This commit is contained in:
commit
a03a46eddb
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
*.mpy
|
235
src/app_loader.py
Normal file
235
src/app_loader.py
Normal file
@ -0,0 +1,235 @@
|
||||
import uasyncio
|
||||
import settings
|
||||
from umqtt.robust import MQTTClient
|
||||
import network
|
||||
import machine
|
||||
import gc
|
||||
import os
|
||||
import utime
|
||||
|
||||
def String(x):
|
||||
if isinstance(x, bytes):
|
||||
return x.decode()
|
||||
return str(x)
|
||||
|
||||
|
||||
def raw(x):
|
||||
return x
|
||||
|
||||
|
||||
def replace_file(path):
|
||||
def do_replace(content):
|
||||
with open(path, 'wb') as f:
|
||||
length = f.write(content)
|
||||
return length
|
||||
return do_replace
|
||||
|
||||
|
||||
class MqttHandler(object):
|
||||
|
||||
class Subscription(object):
|
||||
def __init__(self, handler, topic, cls, cb=None):
|
||||
self.topic = topic
|
||||
self.value = None
|
||||
self.cls = cls
|
||||
self.cb = cb
|
||||
self.handler = handler
|
||||
|
||||
async def set_value(self, msg):
|
||||
val = self.cls(msg)
|
||||
if self.cb is None:
|
||||
self.value = val
|
||||
else:
|
||||
self.cb(val)
|
||||
|
||||
async def wait_for_value(self, interval_ms=10):
|
||||
while self.value is None:
|
||||
await uasyncio.sleep_ms(interval_ms)
|
||||
return self.value
|
||||
|
||||
def unsubscribe(self):
|
||||
self.handler.subscriptions.pop(self.topic)
|
||||
|
||||
def __init__(self, loop, host):
|
||||
ip = network.WLAN(network.STA_IF).ifconfig()[0]
|
||||
self.client = MQTTClient(ip, host)
|
||||
self.client.connect()
|
||||
self.client.set_callback(self.cb)
|
||||
self.subscriptions = {}
|
||||
self.ev_loop = loop
|
||||
self.ev_loop.create_task(self.loop())
|
||||
self.publish = self.client.publish
|
||||
|
||||
def cb(self, topic, msg):
|
||||
if topic in self.subscriptions:
|
||||
self.ev_loop.create_task(self.subscriptions[topic].set_value(msg))
|
||||
|
||||
def subscribe(self, topic, cls=String, cb=None, local=False):
|
||||
if local:
|
||||
topic = self.local_topic(topic)
|
||||
|
||||
if not isinstance(topic, bytes):
|
||||
topic = topic.encode()
|
||||
|
||||
if topic in self.subscriptions:
|
||||
raise Exception('topic {} already subscribed'.format(topic))
|
||||
|
||||
sub = MqttHandler.Subscription(self, topic, cls=cls, cb=cb)
|
||||
self.subscriptions[topic] = sub
|
||||
self.client.subscribe(topic)
|
||||
return sub
|
||||
|
||||
def local_topic(self, topic):
|
||||
return '{}/{}'.format(settings.device_topic, String(topic))
|
||||
|
||||
def register_command(self, cmd, cb, cls=String, local=True):
|
||||
self.subscribe(cmd, cls=cls, cb=cb, local=local)
|
||||
|
||||
async def loop(self, interval_ms=10):
|
||||
while True:
|
||||
for i in range(1000/interval_ms):
|
||||
self.client.check_msg()
|
||||
await uasyncio.sleep_ms(interval_ms)
|
||||
gc.collect()
|
||||
|
||||
def log(self, *args):
|
||||
if settings.LOG_TOPIC is not None:
|
||||
log_str = ' '.join([String(a) for a in args])
|
||||
self.client.publish(self.local_topic(settings.LOG_TOPIC), log_str)
|
||||
|
||||
|
||||
async def update_app(mqtt, name):
|
||||
current_version = None
|
||||
current_app = None
|
||||
app_file = 'app.mpy'
|
||||
try:
|
||||
gc.collect()
|
||||
import app
|
||||
current_app = app.__app__
|
||||
current_version = app.__version__
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
app_ns = '{}/apps/{}/{{}}'.format(settings.MQTT_PREFIX, name)
|
||||
version_sub = mqtt.subscribe(app_ns.format('version'),
|
||||
cls=int)
|
||||
latest = await version_sub.wait_for_value()
|
||||
version_sub.unsubscribe()
|
||||
|
||||
if current_app != name or current_version != latest:
|
||||
mqtt.log('installed: {} v{}, want {} v{}'.format(
|
||||
current_app, current_version, name, latest))
|
||||
try:
|
||||
# if the wrong app is installed, we remove it and reboot to
|
||||
# avoid running out of memory during the update
|
||||
os.remove('app.mpy')
|
||||
return True
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
gc.collect()
|
||||
sub = mqtt.subscribe(app_ns.format('content'),
|
||||
cls=replace_file(app_file))
|
||||
app_size = await sub.wait_for_value()
|
||||
sub.unsubscribe()
|
||||
mqtt.log('Installed {} v{}, {}b'.format(name, latest, app_size))
|
||||
return True
|
||||
except Exception as e:
|
||||
mqtt.log('Update error:', e)
|
||||
pass
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def deepsleep(duration_ms):
|
||||
utime.sleep_ms(200)
|
||||
# configure RTC.ALARM0 to be able to wake the device
|
||||
rtc = machine.RTC()
|
||||
rtc.irq(trigger=rtc.ALARM0, wake=machine.DEEPSLEEP)
|
||||
# set RTC.ALARM0 to fire after the specified time
|
||||
rtc.alarm(rtc.ALARM0, duration_ms)
|
||||
machine.deepsleep()
|
||||
|
||||
|
||||
async def wait_for_connection(timeout=5):
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
|
||||
while not sta.isconnected():
|
||||
await uasyncio.sleep_ms(100)
|
||||
|
||||
|
||||
def get_device_info(mqtt):
|
||||
sta = network.WLAN(network.STA_IF)
|
||||
mac = ':'.join(['{:02x}'.format(x) for x in sta.config('mac')])
|
||||
ip = sta.ifconfig()[0]
|
||||
settings.device_topic = '{}/{}'.format(settings.MQTT_PREFIX,
|
||||
settings.DEVICE_TOPIC_FORMAT.format(ip=ip, mac=mac))
|
||||
topic_fmt = '{}/{{}}'.format(settings.device_topic.strip('/'))
|
||||
|
||||
dev_name = mqtt.subscribe(topic_fmt.format('config/name'))
|
||||
dev_app = mqtt.subscribe(topic_fmt.format('config/app'))
|
||||
|
||||
return dev_name, dev_app
|
||||
|
||||
|
||||
async def main(loop):
|
||||
try:
|
||||
await uasyncio.wait_for(wait_for_connection(),
|
||||
settings.NETWORK_TIMEOUT)
|
||||
except Exception:
|
||||
raise Exception('Network error')
|
||||
|
||||
mqtt = MqttHandler(loop, settings.MQTT_BROKER)
|
||||
|
||||
dev_name_sub, app_name_sub = get_device_info(mqtt)
|
||||
|
||||
dev_name = await dev_name_sub.wait_for_value()
|
||||
app_name = await app_name_sub.wait_for_value()
|
||||
dev_name_sub.unsubscribe()
|
||||
app_name_sub.unsubscribe()
|
||||
|
||||
|
||||
mqtt.log('Dev. info:', dev_name, app_name)
|
||||
|
||||
# commands must be registered after get_device_info() since the device's
|
||||
# base topic is not known before.
|
||||
mqtt.register_command('reboot', lambda x: deepsleep(1000))
|
||||
|
||||
try:
|
||||
reboot = await uasyncio.wait_for(update_app(mqtt, app_name), 5)
|
||||
if reboot:
|
||||
deepsleep(100)
|
||||
except uasyncio.TimeoutError:
|
||||
mqtt.log('Error updating app: timeout')
|
||||
except Exception as e:
|
||||
mqtt.log('Error updating app:', e)
|
||||
|
||||
# if no app is found, wake up every ten seconds to retry the update:
|
||||
sleep_duration_ms = 10000
|
||||
|
||||
# run app
|
||||
try:
|
||||
gc.collect()
|
||||
import app
|
||||
gc.collect()
|
||||
mqtt.log('Heap after import:', gc.mem_free())
|
||||
mqtt.publish(mqtt.local_topic('status/app'), app.__app__)
|
||||
mqtt.publish(mqtt.local_topic('status/version'), str(app.__version__))
|
||||
|
||||
sleep_duration_ms = await app.run(mqtt, name=dev_name)
|
||||
except Exception as e:
|
||||
mqtt.log('Error running app:', e)
|
||||
|
||||
mqtt.log('Done. Sleeping for {}ms'.format(sleep_duration_ms))
|
||||
|
||||
deepsleep(sleep_duration_ms)
|
||||
|
||||
|
||||
def run():
|
||||
try:
|
||||
loop = uasyncio.get_event_loop()
|
||||
loop.run_until_complete(main(loop))
|
||||
except Exception as e:
|
||||
print('Error:', e)
|
||||
deepsleep(1000)
|
4
src/main.py
Normal file
4
src/main.py
Normal file
@ -0,0 +1,4 @@
|
||||
#import app_loader
|
||||
|
||||
import app_loader
|
||||
app_loader.run()
|
16
src/settings.py
Normal file
16
src/settings.py
Normal file
@ -0,0 +1,16 @@
|
||||
|
||||
|
||||
# timeout for connecting to the WiFi
|
||||
NETWORK_TIMEOUT = 5
|
||||
|
||||
# address of MQTT broker
|
||||
MQTT_BROKER = '192.168.1.1'
|
||||
|
||||
# basic prefix used for all topics
|
||||
MQTT_PREFIX = 'home'
|
||||
|
||||
# format for device topics
|
||||
DEVICE_TOPIC_FORMAT = 'devices/{ip}'
|
||||
|
||||
# log topic (None to disable logging via MQTT). relative to device topic
|
||||
LOG_TOPIC = 'log'
|
Loading…
Reference in New Issue
Block a user