import paho.mqtt.client as mqtt import asyncio import secrets import rules state = {} mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) def on_connect(client, userdata, flags, reason_code, properties): print(f"Connected with result code {reason_code}") client.subscribe("main/#") async def publish_later(topic, msg, delay): print(f"publish_later: Processing message with topic {topic} after {delay} seconds delay") await asyncio.sleep(delay) print(f"publish_later: completed delay") publish(topic, msg) def publish(topic, msg): result = mqttc.publish(topic, msg) status = result[0] if not status == 0: print(f"Failed to send message to topic {topic}") def check_against_rules(msg): if msg.topic in rules: print(f"... ... Found {len(rules[msg.topic])} rules on this topic") for rule in rules[msg.topic]: print(f"... ... Rule is met when the count is above {rule['count']}. Current count is {state[msg.topic]['count']}") if state[msg.topic]['count'] > rule['count']: if(rule['reset']): state[msg.topic]['count'] = 0 if rule['delay']: asyncio.run_coroutine_threadsafe(publish_later(rule['doTopic'], rule['doMessage'], rule['delay']), loop) else: publish(rule['doTopic'], rule['doMessage']) else: print("... ... i have no rules for this topic") # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): print("on_message") if msg.retain: return False if msg.topic in state: state[msg.topic]["count"] += 1 else: state[msg.topic] = {} state[msg.topic]["count"] = 0 check_against_rules(msg) async def main(): global loop loop = asyncio.get_running_loop() # Store the main event loop mqttc.on_connect = on_connect mqttc.on_message = on_message mqttc.username_pw_set(secrets.mqtt_username, secrets.mqtt_password) print("connect to klank.school") mqttc.connect("mqtt.klank.school", 7000, 60) # Start the MQTT loop in a background thread mqttc.loop_start() # Keep the asyncio event loop running await asyncio.Event().wait() # Run the main function asyncio.run(main())