Created
January 28, 2019 18:54
-
-
Save aleeraser/41ae90eaaca5ce0b74cfc3dc317d497a to your computer and use it in GitHub Desktop.
Example for controlling PyPlug's ESP32 with a Telegram bot.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import urequests | |
# Wi-Fi credentials passed to setStation() by main().
SSID = '[ENTER_SSID]'
PSW = '[ENTER_PASSWORD]'
# Longest time setStation() waits for the Wi-Fi connection before falling
# back to access-point mode.
CONNECTION_TIMEOUT = 10000  # milliseconds
# Longest time getFromUart() waits for a newline-terminated reply.
READ_TIMEOUT = 1000  # milliseconds
# Telegram bot token used for every API call.
# NOTE(review): this looks like a credential committed to source; confirm it
# is a placeholder or revoke it and load the real token from device config.
TOKEN = '345342414:AAEwdKYs87fAJLPovD4f5k31yXnflnYFs54'
# Chat id of the most recent user that sent a command (set in main());
# notifications from the checkFor*Margin() functions are sent to it.
userChatId = None
# Notification thresholds. main() only polls the sensor while a threshold is
# greater than zero, and each threshold is reset to 0.0 once its
# notification has been sent.
notificationPowerMargin = 0.0
notificationCurrentMargin = 0.0
# Characters that may appear unescaped in a URL query component (RFC 3986
# "unreserved" set). Everything else is percent-encoded.
_URL_SAFE_CHARS = ('ABCDEFGHIJKLMNOPQRSTUVWXYZ'
                   'abcdefghijklmnopqrstuvwxyz'
                   '0123456789-._~')


def _percentEncode(value):
    '''Return `value` converted to text with unsafe characters percent-encoded.

    Hand-rolled because `urllib.parse` is not part of the MicroPython core.'''
    encoded = []
    for char in str(value):
        if char in _URL_SAFE_CHARS:
            encoded.append(char)
        else:
            # Encode the character as UTF-8 and escape each resulting byte.
            encoded.extend('%{:02X}'.format(byte)
                           for byte in char.encode('utf-8'))
    return ''.join(encoded)


def buildTelegramAPIRequest(token, method, params):
    '''Build the URL for a Telegram Bot API call.

    - `token`: bot token, inserted verbatim after `/bot`
    - `method`: API method name (e.g. `getUpdates`, `sendMessage`)
    - `params`: dict of query parameters; keys and values are converted to
      text and percent-encoded, so message text containing spaces, `&`, `=`
      or non-ASCII characters no longer corrupts the query string

    Returns the full URL. When `params` is empty the URL has no query part.'''
    base = 'https://api.telegram.org/bot{}/{}'.format(token, method)
    query = '&'.join(
        '{}={}'.format(_percentEncode(key), _percentEncode(params[key]))
        for key in params
    )
    if not query:
        return base
    return base + '?' + query
def makeRequest(token, method, lastUpdate, timeout=60):
    '''Perform a GET call to the Telegram Bot API and return its result.

    - `token`: bot token
    - `method`: API method name (the caller uses `getUpdates`)
    - `lastUpdate`: id of the last processed update; `lastUpdate + 1` is sent
      as the `offset` parameter
    - `timeout`: value sent as the `timeout` parameter

    Returns the `result` member of the JSON reply when its `ok` member is
    truthy, otherwise None. Errors raised by `urequests` or by JSON parsing
    propagate to the caller; the response is closed in every case.'''
    params = {'timeout': timeout, 'offset': lastUpdate + 1}
    url = buildTelegramAPIRequest(token, method, params)
    res = urequests.get(url)
    if res is None:
        return None
    try:
        # Parse the body once; the original parsed it twice and leaked the
        # socket when parsing failed.
        payload = res.json()
    finally:
        res.close()
    if payload['ok']:
        return payload['result']
    return None
def sendMessage(token, chatid, msg):
    '''Send the text `msg` to the chat `chatid` through the bot `token`.

    Issues a POST to the `sendMessage` API method with the parameters in the
    URL query string; the reply is not inspected, only closed.'''
    endpoint = buildTelegramAPIRequest(
        token, 'sendMessage', {'chat_id': chatid, 'text': msg})
    response = urequests.post(endpoint)
    if response is None:
        return
    response.close()
def setAP(disableStation=False):
    '''Set ESP in Access Point mode. The network name is something like ESP_XXXXXX.

    - `disableStation`: when true, the station interface is deactivated too

    Prints the resulting access-point interface configuration.'''
    import network

    ap_if = network.WLAN(network.AP_IF)
    # Deactivate first so the interface is re-activated from a clean state.
    ap_if.active(False)
    if disableStation:
        sta_if = network.WLAN(network.STA_IF)
        sta_if.active(False)
    ap_if.active(True)
    # The original also called ifconfig() once and discarded the result;
    # that dead call is dropped.
    print('AP config: {}'.format(ap_if.ifconfig()))
def parseCommand(update):
    '''Extract a bot command and its arguments from a Telegram update.

    - `update`: one decoded update dict as returned by `getUpdates`

    Returns:
    - None when the update carries no `message` (e.g. an edited message or a
      channel post) or the message has no `entities`
    - an empty dict when the message has entities but none is a bot command
    - `{'command': str, 'args': [str, ...]}` otherwise, where `args` are the
      space-separated words following the command. If several bot commands
      are present, the last one wins.'''
    message = update.get('message')
    if not message or 'entities' not in message:
        return None

    res = {}
    for entity in message['entities']:
        if entity['type'] != 'bot_command':
            continue
        start = entity['offset']
        end = start + entity['length']
        text = message['text']
        res['command'] = text[start:end]
        # Split on single spaces and drop the empty pieces produced by
        # leading or repeated spaces.
        res['args'] = [word for word in text[end:].split(' ') if word]
    return res
def setStation(ssid, passwd):
    '''Put the ESP in Station mode and join the network `ssid` using `passwd`.

    Returns True once connected (the access-point interface is then
    deactivated). If the connection is not established within
    CONNECTION_TIMEOUT milliseconds, the station interface is deactivated,
    access-point mode is enabled through setAP() and False is returned.'''
    import network
    import time

    station = network.WLAN(network.STA_IF)
    station.active(False)

    if not station.isconnected():
        print('Connecting to \'{}\'...'.format(ssid))
        station.active(True)
        station.connect(ssid, passwd)

        began = time.ticks_ms()  # reference point for the timeout below
        while not station.isconnected():
            elapsed = time.ticks_diff(time.ticks_ms(), began)
            if elapsed <= CONNECTION_TIMEOUT:
                continue
            # Gave up: fall back to access-point mode.
            print('Timeout while connecting to network \'{}\'.'.format(ssid))
            station.active(False)
            print('Enabling AP.')
            setAP()
            return False

    print('Connected to \'{}\''.format(ssid))
    print('STA config: {}'.format(station.ifconfig()))
    print('Disabling AP.')
    access_point = network.WLAN(network.AP_IF)
    access_point.active(False)
    return True
def clearUart():
    '''Write a single newline to UART 1 (9600 baud, rx=16, tx=17).

    Presumably this terminates any partially sent command on the other side
    before the first real command is issued; confirm against the firmware.'''
    from machine import UART

    port = UART(1, baudrate=9600, rx=16, tx=17, timeout=10)
    port.write('\n')
def getFromUart(command):
    '''Write a command to the UART bus and return the result value. Accepted commands are:
    - `ATON`: turn relay on
    - `ATOFF`: turn relay off
    - `ATPRINT`: print status informations (every second)
    - `ATZERO`: reset energy consumption counter
    - `ATRESET`: reset any counter
    - `ATPOWER`: get actual power consumption
    - `ATREAD`: get actual current consumption
    - `ATSTATE`: get relay status (0/1)

    `command` may be given as str or bytes; a '\\n' terminator is appended.
    The reply is read until it contains a '\\n'.

    Returns the reply as bytes with every '\\n' removed.
    Raises ValueError if no complete reply arrives within READ_TIMEOUT
    milliseconds (or if the reply is not valid UTF-8).'''
    import machine
    import time

    # Normalise to bytes so str and bytes callers behave the same instead of
    # relying on implicit bytes/str concatenation.
    if isinstance(command, str):
        command = command.encode()

    uart = machine.UART(1, baudrate=9600, rx=16, tx=17, timeout=10)
    uart.write(command + b'\n')

    res = b''
    startTime = time.ticks_ms()  # reference point for the timeout below
    while True:
        chunk = uart.read()
        if chunk:
            res += chunk
        # Check for a complete line before the deadline, so a reply that
        # arrives on the final read is not thrown away.
        if b'\n' in res:
            break
        if time.ticks_diff(time.ticks_ms(), startTime) > READ_TIMEOUT:
            print('ERROR: read timeout')
            raise ValueError('UART read timeout')

    return res.decode('utf-8').replace('\n', '').encode()
def readState():
    '''Query the relay state, current and power over UART and describe them.

    Returns a human-readable status sentence.
    Raises ValueError (from getFromUart) when the sensor does not answer in
    time, or when the relay state is not an integer.'''
    # Commands are passed as plain text without a terminator, like every
    # other caller: getFromUart() appends the '\n' itself. The original
    # passed bytes that already ended in '\n', which produced a doubled
    # terminator (or a bytes/str concatenation error).
    state = getFromUart('ATSTATE')
    current = getFromUart('ATREAD')
    power = getFromUart('ATPOWER')
    return 'The switch is {}, discharging {} Amperes and has consumed {} Watts/hour'.format(
        'on' if int(state) > 0 else 'off',
        current.decode(), power.decode()
    )
def _parseMarginArg(args):
    '''Return `(margin, error)` parsed from the first command argument.

    Exactly one of the two is None: `margin` is the float value on success,
    `error` is the reply text to send back on failure.'''
    if len(args) < 1:
        return None, 'Missing arguments'
    try:
        return float(args[0]), None
    except (ValueError, TypeError):
        # Only conversion failures are reported as a bad argument; the
        # original caught BaseException, which also swallowed interrupts.
        return None, 'Wrong argument (expected number)'


def respondToCommand(command):
    '''Execute a parsed bot command and return the reply text.

    - `command`: dict with `command` (matched case-insensitively) and `args`,
      as produced by parseCommand()

    Supported commands: `/on`, `/off`, `/state`, `/reset`, `/repl`,
    `/notifypower <number>` and `/notifycurrent <number>`. `/repl` clears the
    global `inLoop` flag; the notify commands store the threshold in the
    matching global margin. Any other command yields 'Unknown command'.

    A ValueError raised while talking to the sensor is turned into the reply
    'The current sensor is not responding!'.'''
    global inLoop, notificationPowerMargin, notificationCurrentMargin
    cmd = command['command'].upper()
    try:
        if cmd == '/ON':
            getFromUart('ATON')
            res = 'The switch is on'
        elif cmd == '/OFF':
            getFromUart('ATOFF')
            res = 'The switch is off'
        elif cmd == '/STATE':
            res = readState()
        elif cmd == '/RESET':
            getFromUart('ATZERO')
            res = readState()
        elif cmd == '/REPL':
            inLoop = False
            res = 'Activating webrepl...'
        elif cmd == '/NOTIFYPOWER':
            margin, error = _parseMarginArg(command['args'])
            if error is not None:
                res = error
            else:
                notificationPowerMargin = margin
                res = 'You will be notified when the power consumption exceeds {} Watts/hour'.format(
                    margin)
        elif cmd == '/NOTIFYCURRENT':
            margin, error = _parseMarginArg(command['args'])
            if error is not None:
                res = error
            else:
                notificationCurrentMargin = margin
                res = 'You will be notified when the current consumption exceeds {} Amperes'.format(
                    margin)
        else:
            res = 'Unknown command'
        return res
    except ValueError:
        return 'The current sensor is not responding!'
def checkForCurrentMargin():
    '''Notify the user once when the measured current exceeds the threshold.

    Reads the current with the `ATREAD` command. If it is above
    `notificationCurrentMargin`, a message is sent to `userChatId` and the
    threshold is reset to 0.0, which disarms the notification. A failed or
    unparsable reading is logged and skipped, so the polling loop keeps
    running and the check is retried on the next pass.'''
    global notificationCurrentMargin
    try:
        current = float(getFromUart('ATREAD'))
    except ValueError:
        # Raised by getFromUart() on a read timeout and by float() on a
        # malformed reply; neither should stop the bot.
        print('ERROR: could not read current from sensor')
        return
    if current > notificationCurrentMargin:
        sendMessage(TOKEN, userChatId,
                    'The current absorption reached {} Amperes'.format(current))
        notificationCurrentMargin = 0.0
def checkForPowerMargin():
    '''Notify the user once when the measured power exceeds the threshold.

    Reads the power with the `ATPOWER` command. If it is above
    `notificationPowerMargin`, a message is sent to `userChatId` and the
    threshold is reset to 0.0, which disarms the notification. A failed or
    unparsable reading is logged and skipped, so the polling loop keeps
    running and the check is retried on the next pass.'''
    global notificationPowerMargin
    try:
        power = float(getFromUart('ATPOWER'))
    except ValueError:
        # Raised by getFromUart() on a read timeout and by float() on a
        # malformed reply; neither should stop the bot.
        print('ERROR: could not read power from sensor')
        return
    if power > notificationPowerMargin:
        sendMessage(TOKEN, userChatId,
                    'The power consumption reached {} Watts/hour!'.format(power))
        notificationPowerMargin = 0.0
def main():
    '''Connect to Wi-Fi and serve Telegram bot commands until `/repl` is received.

    The loop long-polls `getUpdates`, answers every bot command through
    respondToCommand() and, while a notification threshold is armed, checks
    the sensor before each poll.'''
    # NOTE(review): the nesting below was reconstructed from a paste that had
    # lost its indentation; confirm against the original file.
    global inLoop, userChatId, notificationPowerMargin, notificationCurrentMargin
    if setStation(SSID, PSW):
        clearUart()
        inLoop = True  # cleared by the /REPL command in respondToCommand()
        lastUpdate = 0
        while inLoop:
            # Poll with a shorter timeout while a threshold is armed, so the
            # sensor is checked more often.
            timeout = 60
            if notificationPowerMargin > 0:
                checkForPowerMargin()
                timeout = 20
            if notificationCurrentMargin > 0:
                checkForCurrentMargin()
                timeout = 5
            res = makeRequest(TOKEN, 'getUpdates', lastUpdate, timeout=timeout)
            if not res:
                continue
            for update in res:
                if update['update_id'] > lastUpdate:
                    lastUpdate = update['update_id']
                    command = parseCommand(update)
                    if command:
                        # Remember who to notify when a threshold is exceeded.
                        userChatId = update['message']['chat']['id']
                        msg = respondToCommand(command)
                        sendMessage(TOKEN, update['message']['chat']
                                    ['id'], msg)
                    print(update)
        # One more request with offset lastUpdate + 1 and no wait; presumably
        # this acknowledges the last processed update so it is not delivered
        # again on the next start. The result is unused.
        res = makeRequest(TOKEN, 'getUpdates', lastUpdate, timeout=0)
        print('Entering webrepl...')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment