#!/usr/bin/env python
"""Log numeric MQTT messages into per-topic SQLite tables.

The script subscribes to the topics listed under the ``topics`` key of a
YAML file and, whenever the value seen on a topic changes, appends a
``(timestamp, value)`` row to a table named after that topic (with ``/``
replaced by ``_``).
"""

import argparse
import datetime
import sqlite3

import paho.mqtt.client
import yaml

# Upper bound on the number of topics whose last value is remembered for
# change detection.
MAX_VALUE_CACHE_SIZE = 1024

# Textual payloads (matched case-insensitively) and their numeric meaning.
TEXTUAL_VALUES = {b"on": 1.0, b"off": 0.0, b"yes": 1.0, b"no": 0.0}


def init_db(path, topics):
    """Open the SQLite database at *path* and return the connection.

    *topics* is accepted for interface compatibility but is not used:
    tables are created lazily when the first message for a topic arrives.
    """
    db = sqlite3.connect(path)
    return db


def _parse_payload(payload):
    """Convert a raw MQTT payload (bytes) into a float.

    Payloads listed in ``TEXTUAL_VALUES`` map to 1.0 / 0.0; anything else
    is handed to ``float()``, which raises ``ValueError`` if it is not
    numeric.
    """
    keyword = payload.lower()
    if keyword in TEXTUAL_VALUES:
        return TEXTUAL_VALUES[keyword]
    return float(payload)


def _quote_identifier(name):
    """Return *name* as a double-quoted SQL identifier.

    Embedded double quotes are doubled so that a topic name received from
    the broker cannot terminate the identifier and inject SQL.
    """
    return '"' + name.replace('"', '""') + '"'


def main():
    """Parse the command line, connect to the broker and log until stopped."""
    options = argparse.ArgumentParser()
    options.add_argument('--broker', '-b', default='localhost')
    options.add_argument('--port', '-p', default=1883, type=int)
    options.add_argument('topicfile')
    options.add_argument('db')
    args = options.parse_args()

    with open(args.topicfile, 'r') as f:
        # NOTE(security): yaml.Loader can construct arbitrary Python objects.
        # Acceptable only while the topic file is trusted; consider
        # yaml.safe_load if it may come from an untrusted source.
        config_dict = yaml.load(f, Loader=yaml.Loader)
    print(config_dict['topics'])

    db = init_db(args.db, config_dict['topics'])
    client = paho.mqtt.client.Client()

    # Last value written per table; used to skip rows that repeat the
    # previous value.
    current_values = {}

    def on_connect(client, userdata, flags, rc):
        # Subscribing here rather than once after connect() renews the
        # subscriptions every time the connection is (re-)established.
        client.subscribe([(topic, 2) for topic in config_dict['topics']])

    def on_disconnect(client, userdata, rc):
        try:
            client.reconnect()
        except OSError as e:
            # Broker not reachable right now. Do not let the exception
            # escape the callback and end the network loop.
            print(f'Reconnect failed: {e}')

    def on_message(client, userdata, msg):
        table = msg.topic.replace("/", "_")
        ts = datetime.datetime.now()
        try:
            value = _parse_payload(msg.payload)
        except ValueError as e:
            print(f'Ignoring non-numeric payload on {msg.topic}: {e}')
            return
        print(f"{table}: {value} ")

        if current_values.get(table, None) == value:
            return  # unchanged since the last stored row
        current_values[table] = value
        if len(current_values) > MAX_VALUE_CACHE_SIZE:
            # dicts preserve insertion order, so this drops the oldest entry.
            first_entry = next(iter(current_values))
            del current_values[first_entry]

        quoted = _quote_identifier(table)
        try:
            # The connection context manager commits on success and rolls
            # back if a statement raises.
            with db:
                db.execute(f'CREATE TABLE IF NOT EXISTS {quoted} '
                           '(timestamp timestamp, value real)')
                # Store the timestamp as "YYYY-MM-DD HH:MM:SS.ffffff" text
                # explicitly instead of relying on sqlite3's implicit
                # datetime adapter.
                db.execute(f'INSERT INTO {quoted} VALUES (?, ?)',
                           (ts.isoformat(" "), value))
        except Exception as e:
            print(f'Error writing to database: {e}')

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message
    client.connect(args.broker, args.port)

    try:
        client.loop_forever()
    except Exception as e:
        print(f'MQTT loop terminated: {e}')
    finally:
        # Runs on normal return, on errors and on Ctrl-C alike.
        db.commit()
        db.close()


if __name__ == '__main__':
    main()