Compare commits

...

1 Commits

Author SHA1 Message Date
0xee 6c14800a02 Add mqtt-collect script 2020-03-30 20:23:14 +02:00
2 changed files with 68 additions and 1 deletions

67
mqtt-collect Executable file
View File

@ -0,0 +1,67 @@
#!/usr/bin/env python
import argparse
import yaml
import paho.mqtt.client
import sqlite3
import datetime
def init_db(path, topics):
db = sqlite3.connect(path)
c = db.cursor()
for topic in topics:
table = topic.replace('/', '_')
c.execute(
f'CREATE TABLE IF NOT EXISTS {table} (timestamp timestamp, value real)'
)
db.commit()
return db
def main():
options = argparse.ArgumentParser()
options.add_argument('--broker', '-b', default='localhost')
options.add_argument('--port', '-p', default='1883', type=int)
options.add_argument('topicfile')
options.add_argument('db')
args = options.parse_args()
with open(args.topicfile, 'r') as f:
config_dict = yaml.load(f, Loader=yaml.Loader)
print(config_dict['topics'])
db = init_db(args.db, config_dict['topics'])
client = paho.mqtt.client.Client()
def on_disconnect(client, userdata, rc):
client.reconnect()
def on_message(client, userdata, msg):
try:
c = db.cursor()
table = msg.topic.replace("/", "_")
ts = datetime.datetime.now()
value = float(msg.payload)
c.execute(f'INSERT INTO {table} VALUES (?, ?)', (ts, value))
c.execute(f'SELECT * from {table}')
db.commit()
except Exception as e:
print(f'Error writing to database: {e}')
client.on_disconnect = on_disconnect
client.on_message = on_message
client.connect(args.broker, args.port)
client.subscribe([(topic, 2) for topic in config_dict['topics']])
try:
client.loop_forever()
except Exception:
db.commit()
db.close()
if __name__ == '__main__':
main()

View File

@ -8,4 +8,4 @@ setup(name='mqtt-tools',
author='0xee', author='0xee',
author_email='mqtt-tools@0xee.eu', author_email='mqtt-tools@0xee.eu',
packages=[], packages=[],
scripts=['mqtt-config']) scripts=['mqtt-config', 'mqtt-collect'])