2 changed files with 62 additions and 1 deletions
@ -0,0 +1,61 @@ |
|||
#!/usr/bin/env python |
|||
|
|||
import argparse |
|||
import yaml |
|||
|
|||
import paho.mqtt.client |
|||
import sqlite3 |
|||
import datetime |
|||
|
|||
|
|||
def init_db(path, topics): |
|||
db = sqlite3.connect(path) |
|||
return db |
|||
|
|||
|
|||
def main(): |
|||
options = argparse.ArgumentParser() |
|||
options.add_argument('--broker', '-b', default='localhost') |
|||
options.add_argument('--port', '-p', default='1883', type=int) |
|||
options.add_argument('topicfile') |
|||
options.add_argument('db') |
|||
args = options.parse_args() |
|||
|
|||
with open(args.topicfile, 'r') as f: |
|||
config_dict = yaml.load(f, Loader=yaml.Loader) |
|||
|
|||
print(config_dict['topics']) |
|||
db = init_db(args.db, config_dict['topics']) |
|||
client = paho.mqtt.client.Client() |
|||
|
|||
def on_disconnect(client, userdata, rc): |
|||
client.reconnect() |
|||
|
|||
def on_message(client, userdata, msg): |
|||
try: |
|||
c = db.cursor() |
|||
table = msg.topic.replace("/", "_") |
|||
ts = datetime.datetime.now() |
|||
value = float(msg.payload) |
|||
c.execute(f'CREATE TABLE IF NOT EXISTS {table} ' |
|||
'(timestamp timestamp, value real)') |
|||
c.execute(f'INSERT INTO {table} VALUES (?, ?)', (ts, value)) |
|||
c.execute(f'SELECT * from {table}') |
|||
db.commit() |
|||
except Exception as e: |
|||
print(f'Error writing to database: {e}') |
|||
|
|||
client.on_disconnect = on_disconnect |
|||
client.on_message = on_message |
|||
|
|||
client.connect(args.broker, args.port) |
|||
client.subscribe([(topic, 2) for topic in config_dict['topics']]) |
|||
try: |
|||
client.loop_forever() |
|||
except Exception: |
|||
db.commit() |
|||
db.close() |
|||
|
|||
|
|||
if __name__ == '__main__': |
|||
main() |
Loading…
Reference in new issue