83 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			83 lines
		
	
	
		
			2.2 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python
 | 
						|
 | 
						|
import argparse
 | 
						|
import yaml
 | 
						|
 | 
						|
import paho.mqtt.client
 | 
						|
import sqlite3
 | 
						|
import datetime
 | 
						|
 | 
						|
# Upper bound on the number of entries kept in the per-table "last seen value"
# cache used by the message handler; once exceeded, an entry is evicted.
MAX_VALUE_CACHE_SIZE = 1024
 | 
						|
 | 
						|
def init_db(path, topics):
    """Open the SQLite database at *path* and return the connection.

    *topics* is accepted for the caller's convenience but is not used here;
    no tables are created by this function.
    """
    return sqlite3.connect(path)
 | 
						|
 | 
						|
 | 
						|
# Payloads (matched case-insensitively) that are stored as 1.0 / 0.0 instead
# of being parsed as a number.
_TEXTUAL_VALUES = {
    b"on": 1.,
    b"off": 0.,
    b"yes": 1.,
    b"no": 0.,
}


def _parse_payload(payload):
    """Convert a raw MQTT payload (bytes) into a float.

    Raises ValueError when the payload is neither one of the known keywords
    nor something ``float()`` can parse.
    """
    try:
        return _TEXTUAL_VALUES[payload.lower()]
    except KeyError:
        return float(payload)


def main():
    """Log MQTT messages for the configured topics into a SQLite database.

    Command line: ``[--broker HOST] [--port PORT] topicfile db``.

    ``topicfile`` is a YAML document whose ``topics`` entry lists the topics
    to subscribe to (QoS 2); ``db`` is the SQLite database path.  Each topic
    is written to its own table (``/`` in the topic replaced by ``_``) as
    ``(timestamp, value)`` rows.  A row is only written when the value
    differs from the last value stored for that table.
    """
    options = argparse.ArgumentParser()
    options.add_argument('--broker', '-b', default='localhost')
    options.add_argument('--port', '-p', default=1883, type=int)
    options.add_argument('topicfile')
    options.add_argument('db')
    args = options.parse_args()

    with open(args.topicfile, 'r') as f:
        # NOTE(review): yaml.Loader can construct arbitrary Python objects.
        # Acceptable for an operator-supplied config file; switch to
        # yaml.safe_load if the topic file can come from an untrusted source.
        config_dict = yaml.load(f, Loader=yaml.Loader)

    topics = config_dict['topics']
    print(topics)
    db = init_db(args.db, topics)
    client = paho.mqtt.client.Client()

    def on_connect(client, userdata, flags, rc):
        # Subscribing here (rather than once after connect()) renews the
        # subscriptions after every automatic reconnect.
        if rc == 0:
            client.subscribe([(topic, 2) for topic in topics])
        else:
            print(f'Connection refused by broker (rc={rc})')

    def on_disconnect(client, userdata, rc):
        # loop_forever() reconnects on its own.  Calling client.reconnect()
        # from this callback is redundant and can raise out of the callback
        # while the broker is unreachable, so only report the event.
        if rc != 0:
            print(f'Unexpected disconnect (rc={rc}), waiting for reconnect')

    # Last value successfully stored per table, in insertion order.
    current_values = {}

    def on_message(client, userdata, msg):
        table = msg.topic.replace("/", "_")
        try:
            value = _parse_payload(msg.payload)
        except ValueError:
            print(f'Ignoring non-numeric payload on {msg.topic}: '
                  f'{msg.payload!r}')
            return
        print(f"{table}: {value} ")

        if current_values.get(table) == value:
            return  # unchanged since the last stored row

        # The table name comes from the topic, so escape embedded double
        # quotes before using it as a quoted SQL identifier.
        quoted = '"' + table.replace('"', '""') + '"'
        # Same text format sqlite3's implicit datetime adapter produced,
        # without relying on that adapter (deprecated in Python 3.12).
        ts = datetime.datetime.now().isoformat(' ')
        try:
            c = db.cursor()
            c.execute(f'CREATE TABLE IF NOT EXISTS {quoted} '
                      '(timestamp timestamp, value real)')
            c.execute(f'INSERT INTO {quoted} VALUES (?, ?)', (ts, value))
            db.commit()
        except sqlite3.Error as e:
            print(f'Error writing to database: {e}')
            return

        # Remember the value only after it was stored, so a failed write is
        # retried when the same value arrives again.
        current_values[table] = value
        if len(current_values) > MAX_VALUE_CACHE_SIZE:
            # Evict the oldest inserted entry (dicts keep insertion order).
            del current_values[next(iter(current_values))]

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message

    try:
        client.connect(args.broker, args.port)
        try:
            client.loop_forever()
        except Exception as e:
            # As before, a failure of the network loop ends the program
            # without a traceback, but it is now reported.
            print(f'MQTT loop terminated: {e}')
    finally:
        # Also runs on Ctrl-C and on a failed initial connect.
        db.commit()
        db.close()
 | 
						|
 | 
						|
 | 
						|
# Script entry point: start the logger only when this file is run directly,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
 |