154 lines
4.7 KiB
Python
154 lines
4.7 KiB
Python
import paho.mqtt.client as mqtt
|
|
import asyncio
|
|
import secrets
|
|
import data
|
|
import random
|
|
import pprint
|
|
|
|
|
|
|
|
# Per-topic bookkeeping filled in by on_message:
# state[topic]['count'][payload] is the number of times that payload was
# received on that topic (check_against_rules may reset it to 0).
state = {}
# Rule table taken from the local `data` module; check_against_rules looks
# rules up by topic name.
rules = data.rules
# The single MQTT client shared by every function in this file, created
# with paho-mqtt's callback API version 2.
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
|
|
|
def publish_transform(fromTopic, toTopic):
    """Publish ``1`` on the transform topic that links two topics.

    The target topic is ``"transform"`` followed by ``fromTopic`` and then
    ``toTopic``, each with a leading ``"main"`` removed. Nothing is
    published when either argument is falsy (``None`` or empty).
    """
    if not (fromTopic and toTopic):
        return
    suffixes = [name.removeprefix("main") for name in (fromTopic, toTopic)]
    publish("transform" + "".join(suffixes), 1)
|
|
|
|
|
|
async def do_a_gesture(topic):
    """Walk over a random run of ``data.topics``, switching topics on and off.

    A contiguous, non-empty slice of ``data.topics`` is picked at random.
    Every step publishes ``1`` on the current topic and on the matching
    transform topic, waits ``data.walkDelay`` seconds and, unless the walk
    just wrapped around to the first topic of the slice, publishes ``0`` on
    the previous topic. After each step the walk ends with a probability of
    roughly 20%; ``0`` is then published on ``"main/gesture" + topic``.

    Args:
        topic: Gesture suffix; appended to ``"main/gesture"`` to build the
            topic that receives the finishing message.
    """
    print(f"called to do a gesture for {topic}")
    # TODO: shuffle data.topics and make sure the topic that was sent in
    # always comes first.
    tl = len(data.topics)
    if tl == 0:
        # Nothing to walk over; the modulo below would divide by zero.
        print("no topics to walk over")
        return

    # random.randint() needs integer bounds: `tl / 2` is a float and is
    # rejected by recent Python versions, so use floor division.
    half = tl // 2
    start = random.randint(0, half)
    stop = random.randint(half, tl)
    # Both draws may land on `half`, which would produce an empty slice.
    # Keep at least one topic (start + 1 <= tl always holds here).
    stop = max(stop, start + 1)
    theseTopics = data.topics[start:stop]
    print(theseTopics)

    wIndex = random.randint(1, tl)
    while True:
        wIndex += 1
        index = wIndex % len(theseTopics)
        current = theseTopics[index]
        # For index 0 this wraps around to the last topic of the slice.
        previous = theseTopics[index - 1]

        publish(current, 1)
        publish_transform(previous, current)
        await asyncio.sleep(data.walkDelay)

        if index > 0:
            publish(previous, 0)
            print(f"Go off: {previous}")
            # NOTE(review): the original indentation was lost; this pause is
            # assumed to belong to the "switch previous topic off" branch.
            await asyncio.sleep(data.walkDelay * 0.8)

        if random.random() > 0.8:
            # Tell listeners that the gesture is finished.
            print("finished the walk")
            publish("main/gesture" + topic, 0)
            break
|
|
|
|
|
|
|
|
def on_connect(client, userdata, flags, reason_code, properties):
    """Connect callback using the paho-mqtt callback API v2 signature.

    Logs the reason code and subscribes ``client`` to ``main/#``.
    ``userdata``, ``flags`` and ``properties`` are accepted but not used.
    """
    print(f"Connected with result code {reason_code}")
    everything_under_main = "main/#"
    client.subscribe(everything_under_main)
|
|
|
|
|
|
async def publish_later(rule, transformTopic):
    """Publish a rule's action after the rule's delay has elapsed.

    Waits ``rule['delay']`` seconds, removes the ``'doingDelay'`` marker
    from ``rule``, then publishes ``rule['doMessage']`` on
    ``rule['doTopic']`` and ``1`` on ``transformTopic``.

    Args:
        rule: Rule dict; reads ``'doTopic'``, ``'doMessage'`` and
            ``'delay'`` and clears ``'doingDelay'``.
        transformTopic: Topic that receives ``1`` once the action is sent.
    """
    try:
        print(
            f"publish_later: Processing message with doTopic {rule['doTopic']} after {rule['delay']} seconds delay")
        await asyncio.sleep(rule['delay'])
    finally:
        # check_against_rules refuses to schedule a rule while this marker
        # is present, so clear it even when the wait is cancelled or fails;
        # otherwise the rule would stay blocked for good.
        rule.pop('doingDelay', None)
    publish(rule['doTopic'], rule['doMessage'])
    publish(transformTopic, 1)
|
|
|
|
|
|
def publish(topic, msg):
    """Send ``msg`` to ``topic`` through the shared ``mqttc`` client.

    The first element of the value returned by ``mqttc.publish`` is treated
    as a status code; anything other than ``0`` is reported on stdout.
    Nothing is returned and no exception is raised for a failed publish.
    """
    outcome = mqttc.publish(topic, msg)
    if outcome[0] != 0:
        print(f"Failed to send message to topic {topic}")
|
|
|
|
|
|
def check_against_rules(msg):
    """Run every rule registered for ``msg.topic`` against a message.

    A rule is met when the payload, read as a number and truncated to an
    int, equals ``rule['state']`` and the payload has been seen at least
    ``rule['count']`` times on this topic (a ``None`` count means no
    threshold). For a met rule, ``rule['doMessage']`` is published on
    ``rule['doTopic']`` together with ``1`` on the matching transform topic,
    either right away or, when ``rule['delay']`` is truthy, through
    ``publish_later`` on the main event loop. If any met rule has a truthy
    ``'reset'``, the counter for this payload is set back to 0 afterwards.

    Args:
        msg: Message whose ``payload`` has already been decoded to ``str``
            and counted in ``state`` (done by ``on_message``).
    """
    if msg.topic not in rules:
        print("... ... i have no rules for this topic")
        return

    topic_rules = rules[msg.topic]
    print(f"Found {len(topic_rules)} rules on {msg.topic}")

    try:
        payload_state = int(float(msg.payload))
    except (TypeError, ValueError, OverflowError):
        # Rule states are compared with numbers; a payload that is not a
        # finite number cannot meet any rule and must not crash the callback.
        print("payload is not a number, no rule can match")
        return

    counts = state[msg.topic]['count']
    seen = counts[msg.payload]
    will_reset = False

    for rule in topic_rules:
        required = rule['count']
        # Test for None first: comparing an int with None raises TypeError.
        if required is not None and seen < required:
            continue
        if payload_state != rule['state']:
            continue

        transform_topic = ("transform"
                           + msg.topic.removeprefix("main")
                           + rule['doTopic'].removeprefix("main"))
        print("---- rule is met----")
        print(rule)

        if rule['reset']:
            will_reset = True

        if rule['delay']:
            if 'doingDelay' not in rule:
                # Marker that keeps the same rule from being scheduled
                # twice; publish_later removes it again.
                rule['doingDelay'] = True
                # This runs outside the event loop's thread, so hand the
                # coroutine over in a thread-safe way.
                asyncio.run_coroutine_threadsafe(
                    publish_later(rule, transform_topic), loop)
            else:
                print('not scheduling')
        else:
            publish(transform_topic, 1)
            publish(rule['doTopic'], rule['doMessage'])

    # NOTE(review): the original indentation was lost; the reset is assumed
    # to happen once, after all rules for the topic were evaluated.
    if will_reset:
        counts[msg.payload] = 0
|
|
|
|
|
|
# The callback for when a PUBLISH message is received from the server.
|
|
def on_message(client, userdata, msg):
    """Message callback: count the payload, then start a gesture or run rules.

    Retained messages, payloads that are not valid UTF-8 and payloads longer
    than 140 characters are ignored (``False`` is returned). Otherwise
    ``msg.payload`` is replaced by its decoded text and the counter
    ``state[msg.topic]['count'][payload]`` is incremented. Topics containing
    ``"gesture"`` start ``do_a_gesture`` on the main event loop unless the
    payload is ``"0"``; every other topic is passed to
    ``check_against_rules``.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data set on the client (unused).
        msg: Received message; ``retain``, ``topic`` and ``payload`` are
            read and ``payload`` is overwritten with the decoded string.

    Returns:
        ``False`` when the message is ignored, otherwise ``None``.
    """
    if msg.retain:
        return False

    try:
        # check_against_rules expects the decoded text on msg.payload.
        msg.payload = msg.payload.decode("utf-8")
    except UnicodeDecodeError:
        # Binary data on a subscribed topic must not raise out of the
        # callback.
        print("payload is not valid utf-8, not gonna store that")
        return False
    print(f'msg: {msg.payload} topic: {msg.topic}')

    if len(msg.payload) > 140:
        print("way to long of a message, not gonna store that")
        return False

    counts = state.setdefault(msg.topic, {'count': {}})['count']
    counts[msg.payload] = counts.get(msg.payload, 0) + 1

    # check if the message has a gesture
    if "gesture" in msg.topic:
        # The payload is text, so compare with "0" (comparing with the int 0
        # is always unequal). do_a_gesture publishes 0 on the gesture topic
        # when it finishes; that message must not start another gesture.
        if msg.payload.strip() != "0":
            asyncio.run_coroutine_threadsafe(
                do_a_gesture(msg.topic.removeprefix('main/gesture')), loop)
    else:
        check_against_rules(msg)
|
|
|
|
|
|
async def main():
    """Connect the shared MQTT client and keep the program alive.

    Stores the running event loop in the module-level ``loop`` so that the
    MQTT callbacks can schedule coroutines on it, registers the callbacks,
    connects to the broker and starts the client's network loop in the
    background. The coroutine then waits on an event that is never set, so
    it only ends when it is cancelled; the client is disconnected and its
    network loop stopped on the way out.
    """
    global loop
    loop = asyncio.get_running_loop()  # Store the main event loop

    mqttc.on_connect = on_connect
    mqttc.on_message = on_message

    mqttc.username_pw_set(secrets.mqtt_username, secrets.mqtt_password)
    print("connect to klank.school")
    mqttc.connect("mqtt.klank.school", 7000, 60)

    # Start the MQTT loop in a background thread
    mqttc.loop_start()

    # Debug hook, kept disabled:
    # asyncio.run_coroutine_threadsafe(do_a_gesture(data.topics[0].removeprefix('main')), loop)

    try:
        # Keep the asyncio event loop running
        await asyncio.Event().wait()
    finally:
        # Close the session and stop the background network loop instead of
        # leaving both to interpreter teardown.
        mqttc.disconnect()
        mqttc.loop_stop()
|
|
|
|
# Script entry point: create an asyncio event loop and run main() on it.
# main() waits on an event that is never set, so this call does not return
# until the program is interrupted.
asyncio.run(main())
|