Compare commits
1 Commits
6ead6853df
...
6c14800a02
Author | SHA1 | Date |
---|---|---|
0xee | 6c14800a02 |
|
@ -0,0 +1,67 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import yaml
|
||||||
|
|
||||||
|
import paho.mqtt.client
|
||||||
|
import sqlite3
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
|
||||||
|
def init_db(path, topics):
|
||||||
|
db = sqlite3.connect(path)
|
||||||
|
c = db.cursor()
|
||||||
|
for topic in topics:
|
||||||
|
table = topic.replace('/', '_')
|
||||||
|
c.execute(
|
||||||
|
f'CREATE TABLE IF NOT EXISTS {table} (timestamp timestamp, value real)'
|
||||||
|
)
|
||||||
|
|
||||||
|
db.commit()
|
||||||
|
return db
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
options = argparse.ArgumentParser()
|
||||||
|
options.add_argument('--broker', '-b', default='localhost')
|
||||||
|
options.add_argument('--port', '-p', default='1883', type=int)
|
||||||
|
options.add_argument('topicfile')
|
||||||
|
options.add_argument('db')
|
||||||
|
args = options.parse_args()
|
||||||
|
|
||||||
|
with open(args.topicfile, 'r') as f:
|
||||||
|
config_dict = yaml.load(f, Loader=yaml.Loader)
|
||||||
|
|
||||||
|
print(config_dict['topics'])
|
||||||
|
db = init_db(args.db, config_dict['topics'])
|
||||||
|
client = paho.mqtt.client.Client()
|
||||||
|
|
||||||
|
def on_disconnect(client, userdata, rc):
|
||||||
|
client.reconnect()
|
||||||
|
|
||||||
|
def on_message(client, userdata, msg):
|
||||||
|
try:
|
||||||
|
c = db.cursor()
|
||||||
|
table = msg.topic.replace("/", "_")
|
||||||
|
ts = datetime.datetime.now()
|
||||||
|
value = float(msg.payload)
|
||||||
|
c.execute(f'INSERT INTO {table} VALUES (?, ?)', (ts, value))
|
||||||
|
c.execute(f'SELECT * from {table}')
|
||||||
|
db.commit()
|
||||||
|
except Exception as e:
|
||||||
|
print(f'Error writing to database: {e}')
|
||||||
|
|
||||||
|
client.on_disconnect = on_disconnect
|
||||||
|
client.on_message = on_message
|
||||||
|
|
||||||
|
client.connect(args.broker, args.port)
|
||||||
|
client.subscribe([(topic, 2) for topic in config_dict['topics']])
|
||||||
|
try:
|
||||||
|
client.loop_forever()
|
||||||
|
except Exception:
|
||||||
|
db.commit()
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
Loading…
Reference in New Issue