Created
January 28, 2019 18:54
-
-
Save aleeraser/41ae90eaaca5ce0b74cfc3dc317d497a to your computer and use it in GitHub Desktop.
Revisions
-
aleeraser created this gist
Jan 28, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,264 @@ import urequests SSID = '[ENTER_SSID]' PSW = '[ENTER_PASSWORD]' CONNECTION_TIMEOUT = 10000 # milliseconds READ_TIMEOUT = 1000 # milliseconds TOKEN = '345342414:AAEwdKYs87fAJLPovD4f5k31yXnflnYFs54' userChatId = None notificationPowerMargin = 0.0 notificationCurrentMargin = 0.0 def buildTelegramAPIRequest(token, method, params): request = 'https://api.telegram.org/bot{}/{}?'.format(token, method) for key in params.keys(): request += '{}={}&'.format(key, params[key]) return request[:len(request) - 1] def makeRequest(token, method, lastUpdate, timeout=60): params = {'timeout': timeout, 'offset': lastUpdate + 1} url = buildTelegramAPIRequest(token, method, params) res = urequests.get(url) if res is not None and res.json()['ok']: jsonval = res.json() res.close() return jsonval['result'] elif res is not None: res.close() return None else: return None def sendMessage(token, chatid, msg): params = {'chat_id': chatid, 'text': msg} url = buildTelegramAPIRequest(token, 'sendMessage', params) res = urequests.post(url) if res is not None: res.close() def setAP(disableStation=False): '''Set ESP in AccesPoint mode. The network name is something like ESP_XXXXXX.''' import network ap_if = network.WLAN(network.AP_IF) ap_if.active(False) if disableStation: sta_if = network.WLAN(network.STA_IF) sta_if.active(False) ap_if.active(True) ap_if.ifconfig() print('AP config: {}'.format(ap_if.ifconfig())) def parseCommand(update): res = {} if 'entities' not in update['message']: return None for entity in update['message']['entities']: if entity['type'] == 'bot_command': start = entity['offset'] end = start + entity['length'] res['command'] = update['message']['text'][start:end] res['args'] = [x for x in update['message'] ['text'][end:].split(' ') if len(x) > 0] return res def setStation(ssid, passwd): '''Set ESP in Station mode. Parameters are network SSID and password.''' import network import time sta_if = network.WLAN(network.STA_IF) sta_if.active(False) if not sta_if.isconnected(): print('Connecting to \'{}\'...'.format(ssid)) sta_if.active(True) sta_if.connect(ssid, passwd) startTime = time.ticks_ms() # timeout for the loop below while not sta_if.isconnected(): if time.ticks_diff(time.ticks_ms(), startTime) > CONNECTION_TIMEOUT: print('Timeout while connecting to network \'{}\'.'.format(ssid)) sta_if.active(False) print('Enabling AP.') setAP() return False print('Connected to \'{}\''.format(ssid)) print('STA config: {}'.format(sta_if.ifconfig())) print('Disabling AP.') ap_if = network.WLAN(network.AP_IF) ap_if.active(False) return True def clearUart(): import machine uart = machine.UART(1, baudrate=9600, rx=16, tx=17, timeout=10) uart.write('\n') def getFromUart(command): '''Write a command to the UART bus and return the result value. Accepted commands are: - `ATON`: turn relay on - `ATOFF`: turn relay off - `ATPRINT`: print status informations (every second) - `ATZERO`: reset energy consumption counter - `ATRESET`: reset any counter - `ATPOWER`: get actual power consumption - `ATREAD`: get actual current consumption - `ATSTATE`: get relay status (0/1) Since `uart.read()` is non-blocking, '\\n' is expected as terminating character.''' import machine import time uart = machine.UART(1, baudrate=9600, rx=16, tx=17, timeout=10) uart.write(command + '\n') res = bytes() startTime = time.ticks_ms() # timeout for the loop below while b'\n' not in res: toAppend = uart.read() if toAppend: if res != b'': res += toAppend else: res = toAppend if time.ticks_diff(time.ticks_ms(), startTime) > READ_TIMEOUT: print('ERROR: read timeout') raise ValueError() # return b'ERROR: read timeout' res = res.decode('utf-8').replace('\n', '').encode() return res def readState(): state = getFromUart(b'ATSTATE\n') # 0 current = getFromUart(b'ATREAD\n') # 1 power = getFromUart(b'ATPOWER\n') # 2 return 'The switch is {}, discharging {} Amperes and has consumed {} Watts/hour'.format( 'on' if int(state) > 0 else 'off', current.decode(), power.decode() ) def respondToCommand(command): global inLoop, notificationPowerMargin, notificationCurrentMargin cmd = command['command'].upper() try: if cmd == '/ON': getFromUart('ATON') res = 'The switch is on' elif cmd == '/OFF': getFromUart('ATOFF') res = 'The switch is off' elif cmd == '/STATE': res = readState() elif cmd == '/RESET': getFromUart('ATZERO') res = readState() elif cmd == '/REPL': inLoop = False res = 'Activating webrepl...' elif cmd == '/NOTIFYPOWER': if len(command['args']) < 1: res = 'Missing arguments' else: try: margin = float(command['args'][0]) res = 'You will be notified when the power consumption exceeds {} Watts/hour'.format( margin) notificationPowerMargin = margin except BaseException: res = 'Wrong argument (expected number)' elif cmd == '/NOTIFYCURRENT': if len(command['args']) < 1: res = 'Missing arguments' else: try: margin = float(command['args'][0]) res = 'You will be notified when the current consumption exceeds {} Amperes'.format( margin) notificationCurrentMargin = margin except BaseException: res = 'Wrong argument (expected number)' else: res = 'Unknown command' return res except ValueError: return 'The current sensor is not responding!' def checkForCurrentMargin(): global notificationCurrentMargin current = float(getFromUart('ATREAD')) if current > notificationCurrentMargin: sendMessage(TOKEN, userChatId, 'The current absorption reached {} Amperes'.format(current)) notificationCurrentMargin = 0.0 def checkForPowerMargin(): global notificationPowerMargin power = float(getFromUart('ATPOWER')) if power > notificationPowerMargin: sendMessage(TOKEN, userChatId, 'The power consumption reached {} Watts/hour!'.format(power)) notificationPowerMargin = 0.0 def main(): global inLoop, userChatId, notificationPowerMargin, notificationCurrentMargin if setStation(SSID, PSW): clearUart() inLoop = True lastUpdate = 0 while inLoop: timeout = 60 if notificationPowerMargin > 0: checkForPowerMargin() timeout = 20 if notificationCurrentMargin > 0: checkForCurrentMargin() timeout = 5 res = makeRequest(TOKEN, 'getUpdates', lastUpdate, timeout=timeout) if not res: continue for update in res: if update['update_id'] > lastUpdate: lastUpdate = update['update_id'] command = parseCommand(update) if command: userChatId = update['message']['chat']['id'] msg = respondToCommand(command) sendMessage(TOKEN, update['message']['chat'] ['id'], msg) print(update) res = makeRequest(TOKEN, 'getUpdates', lastUpdate, timeout=0) print('Entering webrepl...')