import paho.mqtt.client as mqtt import asyncio import secrets import data import random state = {} rules = data.rules mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) def publish_transform(fromTopic, toTopic): if fromTopic and toTopic: ft = fromTopic.removeprefix("main") tt = toTopic.removeprefix("main") path = "transform" + ft + tt publish(path, 1) async def set_config(topic, payload): print(f"Set the config for {topic} to {payload}") data.config[topic] = int(float(payload)) async def do_a_gesture(topic): print(f"called to do a gesture for {topic}") tl = data.topics.copy() tl.remove('main' + topic) random.shuffle(tl) tl.insert(0, topic) tl[0:random.randint(0, len(tl))] wIndex = random.randint(0, len(tl)) isW = True while isW: print("walking") wIndex += 1 index = wIndex % len(tl) publish(tl[index], 1) publish_transform(tl[index - 1], tl[index]) await asyncio.sleep(data.config['delay']) if index > 0: publish(tl[index - 1], 0) print(f"Go off: {tl[index - 1]}") await asyncio.sleep(data.config['delay'] * 0.8) if (random.random() > data.config['probability']): print("finished the walk") publish("main/gesture" + topic, 0) isW = False asyncio.run_coroutine_threadsafe(publish_later("main/gesture" + topic, 1), loop) def on_connect(client, userdata, flags, reason_code, properties): print(f"Connected with result code {reason_code}") client.subscribe("main/#") publish("main/start", 1) async def publish_later_by(topic, payload, delay): await asyncio.sleep(delay) publish(topic, payload) async def publish_later(rule, msg): print( f"publish_later: Processing message with doTopic {rule['doTopic']} after {rule['delay']} seconds delay") await asyncio.sleep(rule['delay']) del rule['doingDelay'] publish(rule['doTopic'], rule['doMessage']) publish_transform(msg.topic, rule['doTopic']) def publish(topic, msg): result = mqttc.publish(topic, msg) status = result[0] if not status == 0: print(f"Failed to send message to topic {topic}") def check_against_rules(msg): if msg.topic in rules: print(f"Found {len(rules[msg.topic])} rules on {msg.topic}"), willReset = False for rule in rules[msg.topic]: if (state[msg.topic]['count'][msg.payload] >= rule['count'] or rule['count'] is None) and int(float(msg.payload)) == rule['state']: if (rule['reset']): willReset = True if rule['delay']: if 'doingDelay' not in rule: rule['doingDelay'] = True asyncio.run_coroutine_threadsafe(publish_later(rule, msg), loop) else: print('not scheduling') else: publish_transform(msg.topic, rule['doTopic']) publish(rule['doTopic'], rule['doMessage']) if willReset: state[msg.topic]['count'][msg.payload] = 0 else: print("... ... i have no rules for this topic") # The callback for when a PUBLISH message is received from the server. def on_message(client, userdata, msg): if msg.retain: return False else: msg.payload = str(msg.payload.decode("utf-8")) print(f'msg: {msg.payload} topic: {msg.topic}') if len(msg.payload) > 140: print("way to long of a message, not gonna store that") return False if msg.topic not in state: state[msg.topic] = {'count': {}} state[msg.topic]['count'].setdefault(msg.payload, 0) state[msg.topic]['count'][msg.payload] += 1 # check if the message has a gesture if "gesture" in msg.topic: if msg.payload.strip() != 0: asyncio.run_coroutine_threadsafe(do_a_gesture( msg.topic.removeprefix('main/gesture')), loop) if "config" in msg.topic: if msg.payload.strip() != 0: asyncio.run_coroutine_threadsafe(set_config( msg.topic.removeprefix('main/config'), msg.payload), loop) else: check_against_rules(msg) async def main(): global loop loop = asyncio.get_running_loop() # Store the main event loop mqttc.on_connect = on_connect mqttc.on_message = on_message mqttc.username_pw_set(secrets.mqtt_username, secrets.mqtt_password) print("connect to klank.school") mqttc.connect("mqtt.klank.school", 7000, 60) # Start the MQTT loop in a background thread mqttc.loop_start() # asyncio.run_coroutine_threadsafe(do_a_gesture(data.topics[0].removeprefix('main')), loop) # # Keep the asyncio event loop running await asyncio.Event().wait() # Run the main function asyncio.run(main())