Created
September 1, 2025 14:37
-
-
Save drohhyn/7a241c7867265f4123ff11d3a5dbd133 to your computer and use it in GitHub Desktop.
elegoo centauri carbon to mqtt python script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import websockets | |
| import paho.mqtt.client as mqtt | |
| import json | |
| WEBSOCKET_URL = 'ws://192.168.12.34:3030/websocket' | |
| MQTT_BROKER = 'mqtt.local' | |
| MQTT_USER = 'mqtt_login' | |
| MQTT_USER_PW = 'mqtt_password' | |
| MQTT_PORT = 1883 | |
| MQTT_BASE_TOPIC = "ECC" | |
| PING_INTERVAL = 58 | |
| async def websocket_handler(uri): | |
| async with websockets.connect(uri) as websocket: | |
| while True: | |
| try: | |
| response = await asyncio.wait_for(websocket.recv(), timeout=PING_INTERVAL) | |
| json_data = json.loads(response) | |
| mqtt_publish(json_data) | |
| # Reset the ping timer after receiving a message | |
| start_time = time.time() | |
| except asyncio.TimeoutError: | |
| # Send ping if no message received within PING_INTERVAL seconds | |
| await websocket.ping() | |
| def mqtt_publish(data, topic=''): | |
| client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "ecc_mqtt_client_HEX") | |
| client.username_pw_set(MQTT_USER, MQTT_USER_PW) | |
| client.connect(MQTT_BROKER, MQTT_PORT, 60) | |
| publish_mqtt_messages(client, data, topic) | |
| client.disconnect() | |
| def publish_mqtt_messages(client, data, base_topic): | |
| if isinstance(data, dict): | |
| for key, value in data.items(): | |
| new_topic = f"{base_topic}/{key}" if base_topic else key | |
| publish_mqtt_messages(client, value, new_topic) | |
| elif isinstance(data, list): | |
| for index, item in enumerate(data): | |
| new_topic = f"{base_topic}/item{index}" | |
| publish_mqtt_messages(client, item, new_topic) | |
| else: | |
| client.publish(f"{MQTT_BASE_TOPIC}/{base_topic}", json.dumps(data)) | |
| async def main(): | |
| while True: | |
| try: | |
| await websocket_handler(WEBSOCKET_URL) | |
| except websockets.ConnectionClosedError: | |
| print("Connection closed, reopening...") | |
| await asyncio.sleep(5) | |
| if __name__ == "__main__": | |
| import time | |
| asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment