import asyncio
from random import randrange

import paho.mqtt.client as mqtt

import data
# NOTE(review): only ``mqtt_username`` / ``mqtt_password`` are read from this
# module, which the standard-library ``secrets`` does not provide -- presumably
# a project-local ``secrets.py`` that shadows it. Confirm.
import secrets

# Message counters: state[topic]['count'][payload] -> number of messages seen.
state = {}

# Indexed by topic; each entry is iterated as a sequence of rule mappings.
# The keys read from a rule are 'count', 'state', 'reset', 'delay', 'doTopic'
# and 'doMessage'; 'doingDelay' is added/removed at runtime as an in-flight flag.
rules = data.rules

mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)


def _report_failure(future):
    """Print the exception of a finished scheduled coroutine, if it had one."""
    if future.cancelled():
        return
    error = future.exception()
    if error is not None:
        print(f"Scheduled task failed: {error!r}")


def _schedule(coro):
    """Submit *coro* to the event loop stored in the global ``loop``.

    Uses ``asyncio.run_coroutine_threadsafe`` so it can be called from a
    thread other than the one running the loop. A done-callback is attached
    so an exception raised inside the coroutine is printed instead of being
    silently dropped with the unused future.

    Returns:
        The ``concurrent.futures.Future`` for the submitted coroutine.
    """
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    future.add_done_callback(_report_failure)
    return future


async def on_gesture_message():
    """Print an alternating "do right" / "do left" sequence, then "done".

    The number of steps is ``randrange(4, 8)``. The pause before the first
    step is 3 seconds and every following pause is 0.7 times the previous one.
    """
    go_left = False
    delay = 3
    for _ in range(randrange(4, 8)):
        await asyncio.sleep(delay)
        delay = delay * 0.7
        if go_left:
            print("do left")
        else:
            print("do right")
        go_left = not go_left
    print("done")


def _on_gesture_callback(client, userdata, msg):
    """Synchronous adapter that starts ``on_gesture_message`` for a message.

    A message callback is invoked as a plain function with
    ``(client, userdata, message)``. Registering the zero-argument coroutine
    function directly would fail on that call, and a coroutine would never be
    awaited anyway, so the coroutine is handed to the event loop instead.
    """
    _schedule(on_gesture_message())


def on_connect(client, userdata, flags, reason_code, properties):
    """Subscribe to ``main/#`` and ``gesture/#`` once the client is connected.

    Messages on ``gesture/#`` are routed to a dedicated callback; the
    registration is repeated on every (re)connect together with the
    subscriptions.
    """
    print(f"Connected with result code {reason_code}")
    client.subscribe("main/#")
    client.message_callback_add("gesture/#", _on_gesture_callback)
    client.subscribe("gesture/#")


async def publish_later(rule):
    """Publish ``rule['doMessage']`` to ``rule['doTopic']`` after ``rule['delay']`` seconds.

    The ``'doingDelay'`` flag on *rule* is removed once the wait is over --
    also when the wait is cancelled -- so the rule can be scheduled again.
    """
    print(f"publish_later: Processing message with doTopic {rule['doTopic']} after {rule['delay']} seconds delay")
    try:
        await asyncio.sleep(rule['delay'])
    finally:
        # pop() rather than del: tolerate a caller that never set the flag.
        rule.pop('doingDelay', None)
    publish(rule['doTopic'], rule['doMessage'])


def publish(topic, msg):
    """Publish *msg* on *topic* through ``mqttc`` and report a failed send."""
    print(f"publish {topic} {msg}")
    result = mqttc.publish(topic, msg)
    if result.rc != mqtt.MQTT_ERR_SUCCESS:
        print(f"Failed to send message to topic {topic}")


def check_against_rules(msg):
    """Run every rule registered for ``msg.topic`` against the current count.

    A rule fires when ``msg.payload`` equals ``rule['state']`` and the number
    of times this payload was counted on this topic has reached
    ``rule['count']`` (a falsy ``rule['count']`` means "always"). A firing
    rule either publishes immediately or, when ``rule['delay']`` is truthy,
    schedules ``publish_later`` unless a delayed publish for that rule is
    already pending. If any firing rule has ``rule['reset']`` set, the counter
    for this payload is reset to 0 after all rules were evaluated.

    Args:
        msg: object with ``topic`` and ``payload`` attributes; ``payload`` is
            compared with ``rule['state']`` and used as the counter key.
    """
    if msg.topic not in rules:
        print("... ... i have no rules for this topic")
        return

    topic_rules = rules[msg.topic]
    print(f"Found {len(topic_rules)} rules on this topic")

    # The counter is only modified after the loop, so read it once.
    counts = state.setdefault(msg.topic, {'count': {}})['count']
    current = counts.get(msg.payload, 0)

    will_reset = False
    for rule in topic_rules:
        threshold = rule['count']
        print(f"Rule: when the count is > {threshold}. Current = {current} on payload {msg.payload}")

        if msg.payload != rule['state']:
            continue
        if threshold and current < threshold:
            continue

        if rule['reset']:
            will_reset = True

        if not rule['delay']:
            publish(rule['doTopic'], rule['doMessage'])
        elif 'doingDelay' not in rule:
            # Flag the rule first so further messages arriving during the
            # delay do not queue a second publish.
            rule['doingDelay'] = True
            _schedule(publish_later(rule))
        else:
            print('not scheduling')

    if will_reset:
        counts[msg.payload] = 0


# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    """Count a received message per topic and payload, then evaluate the rules.

    Retained messages, payloads that are not valid UTF-8 and payloads longer
    than 140 characters are ignored. For every other message ``msg.payload``
    is replaced by its decoded text, the counter
    ``state[msg.topic]['count'][payload]`` is incremented and
    ``check_against_rules`` is called.

    Returns:
        False when the message was ignored, otherwise None.
    """
    print("on message")
    if msg.retain:
        return False

    try:
        text = msg.payload.decode("utf-8")
    except UnicodeDecodeError:
        # An arbitrary binary payload must not raise out of the callback.
        print("payload is not valid utf-8, not gonna store that")
        return False

    msg.payload = text
    if len(text) > 140:
        print("way to long of a message, not gonna store that")
        return False

    counts = state.setdefault(msg.topic, {'count': {}})['count']
    counts[text] = counts.get(text, 0) + 1
    check_against_rules(msg)


async def main():
    """Connect the MQTT client, start its network loop and wait forever.

    The running event loop is stored in the global ``loop`` so that code
    running outside the loop's thread can submit coroutines to it. When the
    wait ends (for example because the task is cancelled on shutdown) the
    client is disconnected and its background loop is stopped.
    """
    global loop
    loop = asyncio.get_running_loop()  # Store the main event loop

    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    mqttc.username_pw_set(secrets.mqtt_username, secrets.mqtt_password)

    print("connect to klank.school")
    mqttc.connect("mqtt.klank.school", 7000, 60)

    # Start the MQTT loop in a background thread
    mqttc.loop_start()
    try:
        # Keep the asyncio event loop running
        await asyncio.Event().wait()
    finally:
        mqttc.disconnect()
        mqttc.loop_stop()


# Run the main function (only when executed as a script, not on import)
if __name__ == "__main__":
    asyncio.run(main())