try:
    import urequests
except ImportError:
    # Newer micropython-lib releases ship this HTTP client as `requests`;
    # fall back to that name when the legacy one is missing.
    try:
        import requests as urequests
    except ImportError:
        # No HTTP client installed (e.g. the module is imported off-device
        # to exercise the pure helpers). makeRequest() and sendMessage()
        # cannot work in this state.
        urequests = None

SSID = '[ENTER_SSID]'
PSW = '[ENTER_PASSWORD]'
CONNECTION_TIMEOUT = 10000  # milliseconds
READ_TIMEOUT = 1000  # milliseconds
# NOTE(review): a bot token is a credential -- consider loading it from a
# file that is kept out of version control.
TOKEN = '345342414:AAEwdKYs87fAJLPovD4f5k31yXnflnYFs54'

userChatId = None
notificationPowerMargin = 0.0
notificationCurrentMargin = 0.0

# Characters that may appear unescaped in a URL query component.
_URL_SAFE = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
             'abcdefghijklmnopqrstuvwxyz'
             '0123456789-_.~')


def _urlQuote(value):
    '''Return `str(value)` percent-encoded (UTF-8) for use in a query string.'''
    quoted = []
    for byte in str(value).encode('utf-8'):
        char = chr(byte)
        # Bytes >= 0x80 map to characters that are never in _URL_SAFE, so
        # every non-ASCII byte is escaped.
        quoted.append(char if char in _URL_SAFE else '%{:02X}'.format(byte))
    return ''.join(quoted)


def buildTelegramAPIRequest(token, method, params):
    '''Build the URL of a Telegram Bot API call.

    `params` maps query parameter names to values; both are percent-encoded
    so that free text (spaces, '&', '#', newlines, non-ASCII characters)
    cannot corrupt the query string. With no parameters the bare method URL
    is returned.'''
    request = 'https://api.telegram.org/bot{}/{}'.format(token, method)
    query = '&'.join('{}={}'.format(_urlQuote(key), _urlQuote(params[key]))
                     for key in params)
    if not query:
        return request
    return request + '?' + query


def makeRequest(token, method, lastUpdate, timeout=60):
    '''Call a Telegram Bot API `method` with GET and return its result.

    `lastUpdate` is the id of the last processed update (`lastUpdate + 1`
    is sent as `offset`), `timeout` is sent as the `timeout` parameter.
    Returns the `result` field of the response, or None when there is no
    response or it is not flagged `ok`.'''
    params = {'timeout': timeout, 'offset': lastUpdate + 1}
    url = buildTelegramAPIRequest(token, method, params)
    res = urequests.get(url)
    if res is None:
        return None
    try:
        # Parse the body exactly once and release the response even when
        # parsing fails.
        payload = res.json()
    finally:
        res.close()
    if payload.get('ok'):
        return payload['result']
    return None


def sendMessage(token, chatid, msg):
    '''Send the text `msg` to the chat `chatid` through the Bot API.'''
    params = {'chat_id': chatid, 'text': msg}
    url = buildTelegramAPIRequest(token, 'sendMessage', params)
    res = urequests.post(url)
    if res is not None:
        res.close()


def setAP(disableStation=False):
    '''Set ESP in AccessPoint mode.

    The network name is something like ESP_XXXXXX. When `disableStation`
    is true the station interface is switched off as well.'''
    import network
    ap_if = network.WLAN(network.AP_IF)
    ap_if.active(False)
    if disableStation:
        sta_if = network.WLAN(network.STA_IF)
        sta_if.active(False)
    ap_if.active(True)
    print('AP config: {}'.format(ap_if.ifconfig()))


def parseCommand(update):
    '''Extract a bot command and its arguments from a Telegram update.

    Returns a dict with the keys `command` (the command text, e.g. '/on')
    and `args` (the non-empty space-separated words following it). If the
    message holds several commands the last one wins. Returns None when the
    update has no message or the message has no entities, and an empty dict
    when none of the entities is a bot command.'''
    # Not every update carries a 'message' (e.g. edited messages do not).
    message = update.get('message')
    if not message or 'entities' not in message:
        return None
    res = {}
    text = message.get('text', '')
    for entity in message['entities']:
        if entity['type'] == 'bot_command':
            start = entity['offset']
            end = start + entity['length']
            res['command'] = text[start:end]
            res['args'] = [x for x in text[end:].split(' ') if x]
    return res


def setStation(ssid, passwd):
    '''Set ESP in Station mode.

    Parameters are network SSID and password. Returns True once connected
    (the access point is then disabled). If the connection is not
    established within CONNECTION_TIMEOUT milliseconds the station is
    switched off, the access point is enabled and False is returned.'''
    import network
    import time
    sta_if = network.WLAN(network.STA_IF)
    sta_if.active(False)
    if not sta_if.isconnected():
        print('Connecting to \'{}\'...'.format(ssid))
        sta_if.active(True)
        sta_if.connect(ssid, passwd)
        startTime = time.ticks_ms()  # timeout for the loop below
        while not sta_if.isconnected():
            if time.ticks_diff(time.ticks_ms(), startTime) > CONNECTION_TIMEOUT:
                print('Timeout while connecting to network \'{}\'.'.format(ssid))
                sta_if.active(False)
                print('Enabling AP.')
                setAP()
                return False
    print('Connected to \'{}\''.format(ssid))
    print('STA config: {}'.format(sta_if.ifconfig()))
    print('Disabling AP.')
    ap_if = network.WLAN(network.AP_IF)
    ap_if.active(False)
    return True


def clearUart():
    '''Write a lone newline to the UART.'''
    # presumably terminates any half-typed command on the other side
    import machine
    uart = machine.UART(1, baudrate=9600, rx=16, tx=17, timeout=10)
    uart.write(b'\n')


def getFromUart(command):
    '''Write a command to the UART bus and return the result value.

    Accepted commands are:
    - `ATON`: turn relay on
    - `ATOFF`: turn relay off
    - `ATPRINT`: print status informations (every second)
    - `ATZERO`: reset energy consumption counter
    - `ATRESET`: reset any counter
    - `ATPOWER`: get actual power consumption
    - `ATREAD`: get actual current consumption
    - `ATSTATE`: get relay status (0/1)

    `command` may be str or bytes, with or without a trailing newline;
    exactly one '\\n' is sent after it. Since `uart.read()` is
    non-blocking, '\\n' is expected as terminating character of the reply.
    The reply is returned as bytes with every '\\n' removed.

    Raises ValueError when no complete reply arrives within READ_TIMEOUT
    milliseconds.'''
    import machine
    import time
    # Callers pass both str and bytes; mixing them with a str newline used
    # to raise TypeError, so normalise to bytes first.
    if isinstance(command, str):
        command = command.encode()
    uart = machine.UART(1, baudrate=9600, rx=16, tx=17, timeout=10)
    uart.write(command.rstrip(b'\r\n') + b'\n')
    res = b''
    startTime = time.ticks_ms()  # timeout for the loop below
    while b'\n' not in res:
        chunk = uart.read()
        if chunk:
            res += chunk
        if time.ticks_diff(time.ticks_ms(), startTime) > READ_TIMEOUT:
            print('ERROR: read timeout')
            raise ValueError()
    return res.replace(b'\n', b'')


def readState():
    '''Query the relay state, current and power and describe them in a
    sentence. Raises ValueError when the sensor does not answer.'''
    state = getFromUart('ATSTATE')
    current = getFromUart('ATREAD')
    power = getFromUart('ATPOWER')
    return 'The switch is {}, discharging {} Amperes and has consumed {} Watts/hour'.format(
        'on' if int(state) > 0 else 'off',
        current.decode(),
        power.decode()
    )


def respondToCommand(command):
    '''Execute a parsed bot command and return the reply text.

    `command` is a dict with the keys `command` and `args` as produced by
    parseCommand(). The command name is case-insensitive and an optional
    '@BotName' suffix is ignored. `/REPL` clears the global `inLoop` flag;
    `/NOTIFYPOWER` and `/NOTIFYCURRENT` store their numeric argument in the
    corresponding global margin. A sensor failure (ValueError) is turned
    into an explanatory reply instead of being raised.'''
    global inLoop, notificationPowerMargin, notificationCurrentMargin
    # In group chats Telegram delivers commands as '/cmd@BotName'.
    cmd = command['command'].split('@')[0].upper()
    try:
        if cmd == '/ON':
            getFromUart('ATON')
            return 'The switch is on'
        if cmd == '/OFF':
            getFromUart('ATOFF')
            return 'The switch is off'
        if cmd == '/STATE':
            return readState()
        if cmd == '/RESET':
            getFromUart('ATZERO')
            return readState()
        if cmd == '/REPL':
            inLoop = False
            return 'Activating webrepl...'
        if cmd in ('/NOTIFYPOWER', '/NOTIFYCURRENT'):
            if len(command['args']) < 1:
                return 'Missing arguments'
            try:
                margin = float(command['args'][0])
            except ValueError:
                return 'Wrong argument (expected number)'
            if cmd == '/NOTIFYPOWER':
                notificationPowerMargin = margin
                return 'You will be notified when the power consumption exceeds {} Watts/hour'.format(
                    margin)
            notificationCurrentMargin = margin
            return 'You will be notified when the current consumption exceeds {} Amperes'.format(
                margin)
        return 'Unknown command'
    except ValueError:
        # getFromUart() signals a read timeout with ValueError.
        return 'The current sensor is not responding!'
def checkForCurrentMargin():
    '''Read the current from the sensor and notify the user once it exceeds
    `notificationCurrentMargin`; the margin is then reset to 0.0.

    Raises ValueError when the sensor does not answer.'''
    global notificationCurrentMargin
    current = float(getFromUart('ATREAD'))
    if current > notificationCurrentMargin:
        sendMessage(TOKEN, userChatId,
                    'The current absorption reached {} Amperes'.format(current))
        notificationCurrentMargin = 0.0


def checkForPowerMargin():
    '''Read the power from the sensor and notify the user once it exceeds
    `notificationPowerMargin`; the margin is then reset to 0.0.

    Raises ValueError when the sensor does not answer.'''
    global notificationPowerMargin
    power = float(getFromUart('ATPOWER'))
    if power > notificationPowerMargin:
        sendMessage(TOKEN, userChatId,
                    'The power consumption reached {} Watts/hour!'.format(power))
        notificationPowerMargin = 0.0


def _runCheck(check):
    '''Run a notification check, reporting (not raising) sensor failures.'''
    try:
        check()
    except ValueError:
        # A sensor read timeout must not take the polling loop down.
        print('The current sensor is not responding!')


def main():
    '''Connect to the network and serve Telegram commands until `/REPL`.

    While a notification margin is set the corresponding check runs before
    every poll and the long-poll timeout is shortened (20 s for power, 5 s
    for current, 60 s otherwise). After the loop ends the processed updates
    are acknowledged with one last `getUpdates` call.'''
    global inLoop, userChatId, notificationPowerMargin, notificationCurrentMargin
    if setStation(SSID, PSW):
        clearUart()
        inLoop = True
        lastUpdate = 0
        while inLoop:
            timeout = 60
            if notificationPowerMargin > 0:
                timeout = 20
                _runCheck(checkForPowerMargin)
            if notificationCurrentMargin > 0:
                timeout = 5
                _runCheck(checkForCurrentMargin)
            res = makeRequest(TOKEN, 'getUpdates', lastUpdate, timeout=timeout)
            if not res:
                continue
            for update in res:
                if update['update_id'] > lastUpdate:
                    lastUpdate = update['update_id']
                # Updates without a 'message' (e.g. edited messages) carry
                # no command for this bot.
                command = parseCommand(update) if 'message' in update else None
                if command:
                    chatId = update['message']['chat']['id']
                    userChatId = chatId
                    msg = respondToCommand(command)
                    sendMessage(TOKEN, chatId, msg)
                print(update)
        # Confirm the last processed update so it is not delivered again.
        res = makeRequest(TOKEN, 'getUpdates', lastUpdate, timeout=0)
    # NOTE(review): the original indentation was lost; assumed this message
    # is printed whether or not the station connection succeeded -- confirm.
    print('Entering webrepl...')