diff --git a/mqtt-collect b/mqtt-collect new file mode 100755 index 0000000..da88495 --- /dev/null +++ b/mqtt-collect @@ -0,0 +1,61 @@ +#!/usr/bin/env python + +import argparse +import yaml + +import paho.mqtt.client +import sqlite3 +import datetime + + +def init_db(path, topics): + db = sqlite3.connect(path) + return db + + +def main(): + options = argparse.ArgumentParser() + options.add_argument('--broker', '-b', default='localhost') + options.add_argument('--port', '-p', default='1883', type=int) + options.add_argument('topicfile') + options.add_argument('db') + args = options.parse_args() + + with open(args.topicfile, 'r') as f: + config_dict = yaml.load(f, Loader=yaml.Loader) + + print(config_dict['topics']) + db = init_db(args.db, config_dict['topics']) + client = paho.mqtt.client.Client() + + def on_disconnect(client, userdata, rc): + client.reconnect() + + def on_message(client, userdata, msg): + try: + c = db.cursor() + table = msg.topic.replace("/", "_") + ts = datetime.datetime.now() + value = float(msg.payload) + c.execute(f'CREATE TABLE IF NOT EXISTS {table} ' + '(timestamp timestamp, value real)') + c.execute(f'INSERT INTO {table} VALUES (?, ?)', (ts, value)) + c.execute(f'SELECT * from {table}') + db.commit() + except Exception as e: + print(f'Error writing to database: {e}') + + client.on_disconnect = on_disconnect + client.on_message = on_message + + client.connect(args.broker, args.port) + client.subscribe([(topic, 2) for topic in config_dict['topics']]) + try: + client.loop_forever() + except Exception: + db.commit() + db.close() + + +if __name__ == '__main__': + main() diff --git a/setup.py b/setup.py index bf362e3..d6e429d 100644 --- a/setup.py +++ b/setup.py @@ -8,4 +8,4 @@ setup(name='mqtt-tools', author='0xee', author_email='mqtt-tools@0xee.eu', packages=[], - scripts=['mqtt-config']) + scripts=['mqtt-config', 'mqtt-collect'])