Skip to content

Instantly share code, notes, and snippets.

@rafavg77
Created August 17, 2024 19:18
Show Gist options
  • Save rafavg77/d21d32c54e7e284492dd0fb431458d72 to your computer and use it in GitHub Desktop.
Save rafavg77/d21d32c54e7e284492dd0fb431458d72 to your computer and use it in GitHub Desktop.
import random
import time
import board
import adafruit_dht
from paho.mqtt import client as mqtt_client
import json
# Configuración del sensor DHT11
sensor = adafruit_dht.DHT11(board.D4)
# Configuración MQTT
broker = '127.0.0.1'
port = 1883
username = 'mqtt'
password = 'pass'
topic_config_prefix = "homeassistant/sensor/dht_monitor/oficina"
topic_data_prefix = "dht_monitor/oficina"
client_id = f'publish-{random.randint(0, 1000)}'
# Conectar al broker MQTT
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Conectado al broker MQTT")
publish_discovery_messages(client)
else:
print(f"Fallo al conectar, código de retorno: {rc}")
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
# Publicar mensajes de descubrimiento para Home Assistant
def publish_discovery_messages(client):
device_info = {
"identifiers": ["rpi_dht"],
"name": "Kyubi_dht",
"manufacturer": "Tota sensors Ltd.",
"model": "Sensor DHT11",
"sw_version": "2024.08.17.1"
}
# Configuración de la entidad de temperatura
temperature_config = {
"device_class": "temperature",
"state_topic": f"{topic_data_prefix}/temperature/state",
"unit_of_measurement": "°C",
"value_template": "{{ value_json.temperature }}",
"uniq_id": "dht_oficina_temperature",
"icon": "hass:thermometer",
"device": device_info
}
client.publish(f"{topic_config_prefix}_temperature/config", json.dumps(temperature_config), retain=True)
# Configuración de la entidad de humedad
humidity_config = {
"device_class": "humidity",
"state_topic": f"{topic_data_prefix}/humidity/state",
"unit_of_measurement": "%",
"value_template": "{{ value_json.humidity }}",
"uniq_id": "dht_oficina_humidity",
"device": device_info
}
client.publish(f"{topic_config_prefix}_humidity/config", json.dumps(humidity_config), retain=True)
# Publicar datos de temperatura y humedad
def publish(client):
while True:
try:
temperature_c = sensor.temperature
humidity = sensor.humidity
if temperature_c is not None:
state_temperature = {"temperature": temperature_c}
client.publish(f"{topic_data_prefix}/temperature/state", json.dumps(state_temperature), retain=True)
if humidity is not None:
state_humidity = {"humidity": humidity}
client.publish(f"{topic_data_prefix}/humidity/state", json.dumps(state_humidity), retain=True)
print(f"Temperature: {temperature_c}°C, Humidity: {humidity}%")
except RuntimeError as error:
print(f"RuntimeError: {error.args[0]}")
time.sleep(2.0)
except Exception as error:
sensor.exit()
raise error
time.sleep(60)
# Ejecutar el cliente MQTT
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
client.loop_stop()
if __name__ == '__main__':
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment