import paho.mqtt.client as mqtt import asyncio import secrets import data from random import randrange state = {} rules = data.rules mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) async def do_a_gesture(gesture, topic): state = False delay = gesture["delay"] for i in range(randrange(gesture["range"][0],gesture["range"][1])): await asyncio.sleep(delay) delay = delay * gesture["multiplier"] if state: publish("gesture" + topic, gesture["on"]) state = False else: publish("gesture" + topic, gesture["off"]) state = True publish("main/gesture" + topic, 0) def on_connect(client, userdata, flags, reason_code, properties): print(f"Connected with result code {reason_code}") client.subscribe("main/#") async def publish_later(rule, transformTopic): print(f"publish_later: Processing message with doTopic {rule['doTopic']} after {rule['delay']} seconds delay") await asyncio.sleep(rule['delay']) del rule['doingDelay'] publish(rule['doTopic'], rule['doMessage']) publish(transformTopic, 1) def publish(topic, msg): print(f"publish {topic} {msg}") result = mqttc.publish(topic, msg) status = result[0] if not status == 0: print(f"Failed to send message to topic {topic}") def check_against_rules(msg): if msg.topic in rules: print(f"Found {len(rules[msg.topic])} rules on this topic"), willReset = False for rule in rules[msg.topic]: print(f"Rule: when the count is > {rule['count']}. Current = {state[msg.topic]['count'][msg.payload]} on payload {msg.payload}"), if (state[msg.topic]['count'][msg.payload] >= rule['count'] or rule['count'] == False) and msg.payload == rule['state']: print(f"transform topic is {transformTopic}") if(rule['reset']): willReset = True if rule['delay']: if not 'doingDelay' in rule: rule['doingDelay'] = True asyncio.run_coroutine_threadsafe(publish_later(rule, transformTopic), loop) else: print('not scheduling') else: publish(transformTopic, 1) publish(rule['doTopic'], rule['doMessage']) if willReset: state[msg.topic]['count'][msg.payload] = 0 else: print("... ... i have no rules for this topic") # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): print("on message") if msg.retain: return False else: msg.payload = str(msg.payload.decode("utf-8")) if len(msg.payload) > 140: print("way to long of a message, not gonna store that") return False if msg.topic not in state: state[msg.topic] = {'count': {}} state[msg.topic]['count'].setdefault(msg.payload, 0) state[msg.topic]['count'][msg.payload] += 1 # check if the message has a gesture if "gesture" in msg.topic: if msg.payload.strip() in data.gestures: asyncio.run_coroutine_threadsafe(do_a_gesture(data.gestures[msg.payload], msg.topic.removeprefix('main/gesture')), loop) else: check_against_rules(msg) async def main(): global loop loop = asyncio.get_running_loop() # Store the main event loop mqttc.on_connect = on_connect mqttc.on_message = on_message mqttc.username_pw_set(secrets.mqtt_username, secrets.mqtt_password) print("connect to klank.school") mqttc.connect("mqtt.klank.school", 7000, 60) # Start the MQTT loop in a background thread mqttc.loop_start() # Keep the asyncio event loop running await asyncio.Event().wait() # Run the main function asyncio.run(main())