Compare commits
4 Commits
6c14800a02
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 98f7cd6d17 | |||
| b88fcb2f04 | |||
| adf4e564d9 | |||
| 6ead6853df |
@@ -1,10 +1,12 @@
|
||||
{ python3 }:
|
||||
with python3.pkgs;
|
||||
buildPythonPackage rec {
|
||||
name = "mqtt-config";
|
||||
name = "mqtt-tools";
|
||||
src = ./.;
|
||||
propagatedBuildInputs = [ pyyaml paho-mqtt ];
|
||||
buildInputs = [];
|
||||
buildInputs = [ ];
|
||||
doCheck = false;
|
||||
shellHook = "";
|
||||
pyproject = true;
|
||||
build-system = [ setuptools ];
|
||||
}
|
||||
|
||||
82
mqtt-collect
Executable file
82
mqtt-collect
Executable file
@@ -0,0 +1,82 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import argparse
|
||||
import yaml
|
||||
|
||||
import paho.mqtt.client
|
||||
import sqlite3
|
||||
import datetime
|
||||
|
||||
MAX_VALUE_CACHE_SIZE = 1024
|
||||
|
||||
def init_db(path, topics):
|
||||
db = sqlite3.connect(path)
|
||||
return db
|
||||
|
||||
|
||||
def main():
|
||||
options = argparse.ArgumentParser()
|
||||
options.add_argument('--broker', '-b', default='localhost')
|
||||
options.add_argument('--port', '-p', default='1883', type=int)
|
||||
options.add_argument('topicfile')
|
||||
options.add_argument('db')
|
||||
args = options.parse_args()
|
||||
|
||||
with open(args.topicfile, 'r') as f:
|
||||
config_dict = yaml.load(f, Loader=yaml.Loader)
|
||||
|
||||
print(config_dict['topics'])
|
||||
db = init_db(args.db, config_dict['topics'])
|
||||
client = paho.mqtt.client.Client()
|
||||
|
||||
def on_disconnect(client, userdata, rc):
|
||||
client.reconnect()
|
||||
|
||||
current_values = {}
|
||||
|
||||
def on_message(client, userdata, msg):
|
||||
try:
|
||||
c = db.cursor()
|
||||
table = msg.topic.replace("/", "_")
|
||||
ts = datetime.datetime.now()
|
||||
textual = {
|
||||
b"on": 1.,
|
||||
b"off": 0.,
|
||||
b"yes": 1.,
|
||||
b"no": 0.
|
||||
}
|
||||
|
||||
if msg.payload.lower() in textual:
|
||||
value = textual[msg.payload.lower()]
|
||||
else:
|
||||
value = float(msg.payload)
|
||||
print(f"{table}: {value} ")
|
||||
|
||||
if current_values.get(table, None) != value:
|
||||
current_values[table] = value
|
||||
if len(current_values) > MAX_VALUE_CACHE_SIZE:
|
||||
first_entry = current_values.keys().next()
|
||||
del current_values[first_entry]
|
||||
|
||||
c.execute(f'CREATE TABLE IF NOT EXISTS "{table}" '
|
||||
'(timestamp timestamp, value real)')
|
||||
c.execute(f'INSERT INTO "{table}" VALUES (?, ?)', (ts, value))
|
||||
db.commit()
|
||||
|
||||
except Exception as e:
|
||||
print(f'Error writing to database: {e}')
|
||||
|
||||
client.on_disconnect = on_disconnect
|
||||
client.on_message = on_message
|
||||
|
||||
client.connect(args.broker, args.port)
|
||||
client.subscribe([(topic, 2) for topic in config_dict['topics']])
|
||||
try:
|
||||
client.loop_forever()
|
||||
except Exception:
|
||||
db.commit()
|
||||
db.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user