166 lines
4.9 KiB
Python
166 lines
4.9 KiB
Python
import paho.mqtt.client as mqtt
|
|
import asyncio
|
|
import secrets
|
|
import data
|
|
import random
|
|
|
|
|
|
state = {}
|
|
rules = data.rules
|
|
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
|
|
|
|
|
def publish_transform(fromTopic, toTopic):
|
|
if fromTopic and toTopic:
|
|
ft = fromTopic.removeprefix("main")
|
|
tt = toTopic.removeprefix("main")
|
|
path = "transform" + ft + tt
|
|
publish(path, 1)
|
|
|
|
|
|
async def set_config(topic, payload):
|
|
print(f"Set the config for {topic} to {payload}")
|
|
data.config[topic] = int(float(payload))
|
|
|
|
|
|
async def do_a_gesture(topic):
|
|
print(f"called to do a gesture for {topic}")
|
|
tl = data.topics.copy()
|
|
tl.remove('main' + topic)
|
|
random.shuffle(tl)
|
|
tl.insert(0, topic)
|
|
tl[0:random.randint(0, len(tl))]
|
|
wIndex = random.randint(0, len(tl))
|
|
isW = True
|
|
|
|
while isW:
|
|
print("walking")
|
|
wIndex += 1
|
|
index = wIndex % len(tl)
|
|
publish(tl[index], 1)
|
|
publish_transform(tl[index - 1], tl[index])
|
|
|
|
await asyncio.sleep(data.config['delay'])
|
|
|
|
if index > 0:
|
|
publish(tl[index - 1], 0)
|
|
print(f"Go off: {tl[index - 1]}")
|
|
await asyncio.sleep(data.config['delay'] * 0.8)
|
|
|
|
if (random.random() > data.config['probability']):
|
|
print("finished the walk")
|
|
publish("main/gesture" + topic, 0)
|
|
isW = False
|
|
asyncio.run_coroutine_threadsafe(publish_later("main/gesture" + topic, 1), loop)
|
|
|
|
|
|
def on_connect(client, userdata, flags, reason_code, properties):
|
|
print(f"Connected with result code {reason_code}")
|
|
client.subscribe("main/#")
|
|
publish("main/start", 1)
|
|
|
|
|
|
async def publish_later_by(topic, payload, delay):
|
|
await asyncio.sleep(delay)
|
|
publish(topic, payload)
|
|
|
|
|
|
async def publish_later(rule, msg):
|
|
print(
|
|
f"publish_later: Processing message with doTopic {rule['doTopic']} after {rule['delay']} seconds delay")
|
|
await asyncio.sleep(rule['delay'])
|
|
del rule['doingDelay']
|
|
publish(rule['doTopic'], rule['doMessage'])
|
|
publish_transform(msg.topic, rule['doTopic'])
|
|
|
|
|
|
def publish(topic, msg):
|
|
result = mqttc.publish(topic, msg)
|
|
status = result[0]
|
|
if not status == 0:
|
|
print(f"Failed to send message to topic {topic}")
|
|
|
|
|
|
def check_against_rules(msg):
|
|
if msg.topic in rules:
|
|
print(f"Found {len(rules[msg.topic])} rules on {msg.topic}"),
|
|
willReset = False
|
|
|
|
for rule in rules[msg.topic]:
|
|
if (state[msg.topic]['count'][msg.payload] >= rule['count']
|
|
or rule['count'] is None) and int(float(msg.payload)) == rule['state']:
|
|
|
|
if (rule['reset']):
|
|
willReset = True
|
|
if rule['delay']:
|
|
if 'doingDelay' not in rule:
|
|
rule['doingDelay'] = True
|
|
asyncio.run_coroutine_threadsafe(publish_later(rule, msg), loop)
|
|
else:
|
|
print('not scheduling')
|
|
else:
|
|
publish_transform(msg.topic, rule['doTopic'])
|
|
publish(rule['doTopic'], rule['doMessage'])
|
|
|
|
if willReset:
|
|
state[msg.topic]['count'][msg.payload] = 0
|
|
|
|
else:
|
|
print("... ... i have no rules for this topic")
|
|
|
|
|
|
# The callback for when a PUBLISH message is received from the server.
|
|
def on_message(client, userdata, msg):
|
|
if msg.retain:
|
|
return False
|
|
else:
|
|
msg.payload = str(msg.payload.decode("utf-8"))
|
|
print(f'msg: {msg.payload} topic: {msg.topic}')
|
|
|
|
if len(msg.payload) > 140:
|
|
print("way to long of a message, not gonna store that")
|
|
return False
|
|
|
|
if msg.topic not in state:
|
|
state[msg.topic] = {'count': {}}
|
|
|
|
state[msg.topic]['count'].setdefault(msg.payload, 0)
|
|
state[msg.topic]['count'][msg.payload] += 1
|
|
|
|
# check if the message has a gesture
|
|
if "gesture" in msg.topic:
|
|
if msg.payload.strip() != 0:
|
|
asyncio.run_coroutine_threadsafe(do_a_gesture(
|
|
msg.topic.removeprefix('main/gesture')), loop)
|
|
|
|
if "config" in msg.topic:
|
|
if msg.payload.strip() != 0:
|
|
asyncio.run_coroutine_threadsafe(set_config(
|
|
msg.topic.removeprefix('main/config'), msg.payload), loop)
|
|
|
|
else:
|
|
check_against_rules(msg)
|
|
|
|
|
|
async def main():
|
|
global loop
|
|
loop = asyncio.get_running_loop() # Store the main event loop
|
|
|
|
mqttc.on_connect = on_connect
|
|
mqttc.on_message = on_message
|
|
|
|
mqttc.username_pw_set(secrets.mqtt_username, secrets.mqtt_password)
|
|
print("connect to klank.school")
|
|
mqttc.connect("mqtt.klank.school", 7000, 60)
|
|
|
|
# Start the MQTT loop in a background thread
|
|
mqttc.loop_start()
|
|
|
|
# asyncio.run_coroutine_threadsafe(do_a_gesture(data.topics[0].removeprefix('main')), loop)
|
|
|
|
# # Keep the asyncio event loop running
|
|
await asyncio.Event().wait()
|
|
|
|
# Run the main function
|
|
asyncio.run(main())
|