Skip to content

Instantly share code, notes, and snippets.

@aleeraser
Created January 28, 2019 18:54
Show Gist options
  • Select an option

  • Save aleeraser/41ae90eaaca5ce0b74cfc3dc317d497a to your computer and use it in GitHub Desktop.

Select an option

Save aleeraser/41ae90eaaca5ce0b74cfc3dc317d497a to your computer and use it in GitHub Desktop.
Example for controlling PyPlug's ESP32 with a Telegram bot.
import urequests
SSID = '[ENTER_SSID]'
PSW = '[ENTER_PASSWORD]'
CONNECTION_TIMEOUT = 10000 # milliseconds
READ_TIMEOUT = 1000 # milliseconds
TOKEN = '345342414:AAEwdKYs87fAJLPovD4f5k31yXnflnYFs54'
userChatId = None
notificationPowerMargin = 0.0
notificationCurrentMargin = 0.0
def buildTelegramAPIRequest(token, method, params):
request = 'https://api.telegram.org/bot{}/{}?'.format(token, method)
for key in params.keys():
request += '{}={}&'.format(key, params[key])
return request[:len(request) - 1]
def makeRequest(token, method, lastUpdate, timeout=60):
params = {'timeout': timeout, 'offset': lastUpdate + 1}
url = buildTelegramAPIRequest(token, method, params)
res = urequests.get(url)
if res is not None and res.json()['ok']:
jsonval = res.json()
res.close()
return jsonval['result']
elif res is not None:
res.close()
return None
else:
return None
def sendMessage(token, chatid, msg):
params = {'chat_id': chatid, 'text': msg}
url = buildTelegramAPIRequest(token, 'sendMessage', params)
res = urequests.post(url)
if res is not None:
res.close()
def setAP(disableStation=False):
'''Set ESP in AccesPoint mode. The network name is something like ESP_XXXXXX.'''
import network
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
if disableStation:
sta_if = network.WLAN(network.STA_IF)
sta_if.active(False)
ap_if.active(True)
ap_if.ifconfig()
print('AP config: {}'.format(ap_if.ifconfig()))
def parseCommand(update):
res = {}
if 'entities' not in update['message']:
return None
for entity in update['message']['entities']:
if entity['type'] == 'bot_command':
start = entity['offset']
end = start + entity['length']
res['command'] = update['message']['text'][start:end]
res['args'] = [x for x in update['message']
['text'][end:].split(' ') if len(x) > 0]
return res
def setStation(ssid, passwd):
'''Set ESP in Station mode. Parameters are network SSID and password.'''
import network
import time
sta_if = network.WLAN(network.STA_IF)
sta_if.active(False)
if not sta_if.isconnected():
print('Connecting to \'{}\'...'.format(ssid))
sta_if.active(True)
sta_if.connect(ssid, passwd)
startTime = time.ticks_ms() # timeout for the loop below
while not sta_if.isconnected():
if time.ticks_diff(time.ticks_ms(), startTime) > CONNECTION_TIMEOUT:
print('Timeout while connecting to network \'{}\'.'.format(ssid))
sta_if.active(False)
print('Enabling AP.')
setAP()
return False
print('Connected to \'{}\''.format(ssid))
print('STA config: {}'.format(sta_if.ifconfig()))
print('Disabling AP.')
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
return True
def clearUart():
import machine
uart = machine.UART(1, baudrate=9600, rx=16, tx=17, timeout=10)
uart.write('\n')
def getFromUart(command):
'''Write a command to the UART bus and return the result value. Accepted commands are:
- `ATON`: turn relay on
- `ATOFF`: turn relay off
- `ATPRINT`: print status informations (every second)
- `ATZERO`: reset energy consumption counter
- `ATRESET`: reset any counter
- `ATPOWER`: get actual power consumption
- `ATREAD`: get actual current consumption
- `ATSTATE`: get relay status (0/1)
Since `uart.read()` is non-blocking, '\\n' is expected as terminating character.'''
import machine
import time
uart = machine.UART(1, baudrate=9600, rx=16, tx=17, timeout=10)
uart.write(command + '\n')
res = bytes()
startTime = time.ticks_ms() # timeout for the loop below
while b'\n' not in res:
toAppend = uart.read()
if toAppend:
if res != b'':
res += toAppend
else:
res = toAppend
if time.ticks_diff(time.ticks_ms(), startTime) > READ_TIMEOUT:
print('ERROR: read timeout')
raise ValueError()
# return b'ERROR: read timeout'
res = res.decode('utf-8').replace('\n', '').encode()
return res
def readState():
state = getFromUart(b'ATSTATE\n') # 0
current = getFromUart(b'ATREAD\n') # 1
power = getFromUart(b'ATPOWER\n') # 2
return 'The switch is {}, discharging {} Amperes and has consumed {} Watts/hour'.format(
'on' if int(state) > 0 else 'off',
current.decode(), power.decode()
)
def respondToCommand(command):
global inLoop, notificationPowerMargin, notificationCurrentMargin
cmd = command['command'].upper()
try:
if cmd == '/ON':
getFromUart('ATON')
res = 'The switch is on'
elif cmd == '/OFF':
getFromUart('ATOFF')
res = 'The switch is off'
elif cmd == '/STATE':
res = readState()
elif cmd == '/RESET':
getFromUart('ATZERO')
res = readState()
elif cmd == '/REPL':
inLoop = False
res = 'Activating webrepl...'
elif cmd == '/NOTIFYPOWER':
if len(command['args']) < 1:
res = 'Missing arguments'
else:
try:
margin = float(command['args'][0])
res = 'You will be notified when the power consumption exceeds {} Watts/hour'.format(
margin)
notificationPowerMargin = margin
except BaseException:
res = 'Wrong argument (expected number)'
elif cmd == '/NOTIFYCURRENT':
if len(command['args']) < 1:
res = 'Missing arguments'
else:
try:
margin = float(command['args'][0])
res = 'You will be notified when the current consumption exceeds {} Amperes'.format(
margin)
notificationCurrentMargin = margin
except BaseException:
res = 'Wrong argument (expected number)'
else:
res = 'Unknown command'
return res
except ValueError:
return 'The current sensor is not responding!'
def checkForCurrentMargin():
global notificationCurrentMargin
current = float(getFromUart('ATREAD'))
if current > notificationCurrentMargin:
sendMessage(TOKEN, userChatId,
'The current absorption reached {} Amperes'.format(current))
notificationCurrentMargin = 0.0
def checkForPowerMargin():
global notificationPowerMargin
power = float(getFromUart('ATPOWER'))
if power > notificationPowerMargin:
sendMessage(TOKEN, userChatId,
'The power consumption reached {} Watts/hour!'.format(power))
notificationPowerMargin = 0.0
def main():
global inLoop, userChatId, notificationPowerMargin, notificationCurrentMargin
if setStation(SSID, PSW):
clearUart()
inLoop = True
lastUpdate = 0
while inLoop:
timeout = 60
if notificationPowerMargin > 0:
checkForPowerMargin()
timeout = 20
if notificationCurrentMargin > 0:
checkForCurrentMargin()
timeout = 5
res = makeRequest(TOKEN, 'getUpdates', lastUpdate, timeout=timeout)
if not res:
continue
for update in res:
if update['update_id'] > lastUpdate:
lastUpdate = update['update_id']
command = parseCommand(update)
if command:
userChatId = update['message']['chat']['id']
msg = respondToCommand(command)
sendMessage(TOKEN, update['message']['chat']
['id'], msg)
print(update)
res = makeRequest(TOKEN, 'getUpdates', lastUpdate, timeout=0)
print('Entering webrepl...')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment