import paho.mqtt.client as mqtt import asyncio import secrets import data import random import pprint state = {} rules = data.rules mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) def publish_transform(fromTopic, toTopic): if fromTopic and toTopic: ft = fromTopic.removeprefix("main") tt = toTopic.removeprefix("main") path = "transform" + ft + tt publish(path, 1) async def do_a_gesture(topic): print(f"called to do a gesture for {topic}") # TODO: make sure the data.topics array is shuffled ánd the send in topic is always first tl = len(data.topics) theseTopics = data.topics[random.randint(0, tl / 2):random.randint(tl/2, tl)] wIndex = random.randint(1, tl) isW = True print(theseTopics) while isW: wIndex+=1 index = wIndex%len(theseTopics) publish(theseTopics[index], 1) publish_transform(theseTopics[index - 1], theseTopics[index]) await asyncio.sleep(data.walkDelay) if index > 0: publish(theseTopics[index - 1], 0) print(f"Go off: {theseTopics[index - 1]}") await asyncio.sleep(data.walkDelay * 0.8) if(random.random() > 0.8): # TODO send message that gesture is finished print("finished the walk") publish("main/gesture" + topic, 0) isW = False def on_connect(client, userdata, flags, reason_code, properties): print(f"Connected with result code {reason_code}") client.subscribe("main/#") async def publish_later(rule, transformTopic): print( f"publish_later: Processing message with doTopic {rule['doTopic']} after {rule['delay']} seconds delay") await asyncio.sleep(rule['delay']) del rule['doingDelay'] publish(rule['doTopic'], rule['doMessage']) publish(transformTopic, 1) def publish(topic, msg): result = mqttc.publish(topic, msg) status = result[0] if not status == 0: print(f"Failed to send message to topic {topic}") def check_against_rules(msg): if msg.topic in rules: print(f"Found {len(rules[msg.topic])} rules on {msg.topic}"), willReset = False for rule in rules[msg.topic]: if (state[msg.topic]['count'][msg.payload] >= rule['count'] or rule['count'] is None) and int(float(msg.payload)) == rule['state']: transformTopic = f"transform" + \ msg.topic.removeprefix("main") + rule['doTopic'].removeprefix("main") print(f"---- rule is met----") print(rule) if (rule['reset']): willReset = True if rule['delay']: if 'doingDelay' not in rule: rule['doingDelay'] = True asyncio.run_coroutine_threadsafe(publish_later(rule, transformTopic), loop) else: print('not scheduling') else: publish(transformTopic, 1) publish(rule['doTopic'], rule['doMessage']) if willReset: state[msg.topic]['count'][msg.payload] = 0 else: print("... ... i have no rules for this topic") # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): if msg.retain: return False else: msg.payload = str(msg.payload.decode("utf-8")) print(f'msg: {msg.payload} topic: {msg.topic}') if len(msg.payload) > 140: print("way to long of a message, not gonna store that") return False if msg.topic not in state: state[msg.topic] = {'count': {}} state[msg.topic]['count'].setdefault(msg.payload, 0) state[msg.topic]['count'][msg.payload] += 1 # check if the message has a gesture if "gesture" in msg.topic: if msg.payload.strip() != 0: asyncio.run_coroutine_threadsafe(do_a_gesture(msg.topic.removeprefix('main/gesture')), loop) else: check_against_rules(msg) async def main(): global loop loop = asyncio.get_running_loop() # Store the main event loop mqttc.on_connect = on_connect mqttc.on_message = on_message mqttc.username_pw_set(secrets.mqtt_username, secrets.mqtt_password) print("connect to klank.school") mqttc.connect("mqtt.klank.school", 7000, 60) # Start the MQTT loop in a background thread mqttc.loop_start() # asyncio.run_coroutine_threadsafe(do_a_gesture(data.topics[0].removeprefix('main')), loop) # # Keep the asyncio event loop running await asyncio.Event().wait() # Run the main function asyncio.run(main())