Skip to content

Instantly share code, notes, and snippets.

@aleeraser
Created January 28, 2019 18:54
Show Gist options
  • Select an option

  • Save aleeraser/41ae90eaaca5ce0b74cfc3dc317d497a to your computer and use it in GitHub Desktop.

Select an option

Save aleeraser/41ae90eaaca5ce0b74cfc3dc317d497a to your computer and use it in GitHub Desktop.

Revisions

  1. aleeraser created this gist Jan 28, 2019.
    264 changes: 264 additions & 0 deletions PyPlugTelegramBot.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,264 @@
    import urequests

    SSID = '[ENTER_SSID]'
    PSW = '[ENTER_PASSWORD]'

    CONNECTION_TIMEOUT = 10000 # milliseconds
    READ_TIMEOUT = 1000 # milliseconds
    TOKEN = '345342414:AAEwdKYs87fAJLPovD4f5k31yXnflnYFs54'

    userChatId = None
    notificationPowerMargin = 0.0
    notificationCurrentMargin = 0.0


    def buildTelegramAPIRequest(token, method, params):
    request = 'https://api.telegram.org/bot{}/{}?'.format(token, method)

    for key in params.keys():
    request += '{}={}&'.format(key, params[key])

    return request[:len(request) - 1]


    def makeRequest(token, method, lastUpdate, timeout=60):
    params = {'timeout': timeout, 'offset': lastUpdate + 1}
    url = buildTelegramAPIRequest(token, method, params)
    res = urequests.get(url)
    if res is not None and res.json()['ok']:
    jsonval = res.json()
    res.close()
    return jsonval['result']
    elif res is not None:
    res.close()
    return None
    else:
    return None


    def sendMessage(token, chatid, msg):
    params = {'chat_id': chatid, 'text': msg}
    url = buildTelegramAPIRequest(token, 'sendMessage', params)
    res = urequests.post(url)
    if res is not None:
    res.close()


    def setAP(disableStation=False):
    '''Set ESP in AccesPoint mode. The network name is something like ESP_XXXXXX.'''
    import network

    ap_if = network.WLAN(network.AP_IF)
    ap_if.active(False)
    if disableStation:
    sta_if = network.WLAN(network.STA_IF)
    sta_if.active(False)
    ap_if.active(True)
    ap_if.ifconfig()
    print('AP config: {}'.format(ap_if.ifconfig()))


    def parseCommand(update):
    res = {}
    if 'entities' not in update['message']:
    return None

    for entity in update['message']['entities']:
    if entity['type'] == 'bot_command':
    start = entity['offset']
    end = start + entity['length']
    res['command'] = update['message']['text'][start:end]
    res['args'] = [x for x in update['message']
    ['text'][end:].split(' ') if len(x) > 0]
    return res


    def setStation(ssid, passwd):
    '''Set ESP in Station mode. Parameters are network SSID and password.'''
    import network
    import time

    sta_if = network.WLAN(network.STA_IF)
    sta_if.active(False)

    if not sta_if.isconnected():
    print('Connecting to \'{}\'...'.format(ssid))
    sta_if.active(True)
    sta_if.connect(ssid, passwd)

    startTime = time.ticks_ms() # timeout for the loop below
    while not sta_if.isconnected():
    if time.ticks_diff(time.ticks_ms(), startTime) > CONNECTION_TIMEOUT:
    print('Timeout while connecting to network \'{}\'.'.format(ssid))
    sta_if.active(False)
    print('Enabling AP.')
    setAP()
    return False

    print('Connected to \'{}\''.format(ssid))
    print('STA config: {}'.format(sta_if.ifconfig()))

    print('Disabling AP.')
    ap_if = network.WLAN(network.AP_IF)
    ap_if.active(False)

    return True


    def clearUart():
    import machine
    uart = machine.UART(1, baudrate=9600, rx=16, tx=17, timeout=10)
    uart.write('\n')


    def getFromUart(command):
    '''Write a command to the UART bus and return the result value. Accepted commands are:
    - `ATON`: turn relay on
    - `ATOFF`: turn relay off
    - `ATPRINT`: print status informations (every second)
    - `ATZERO`: reset energy consumption counter
    - `ATRESET`: reset any counter
    - `ATPOWER`: get actual power consumption
    - `ATREAD`: get actual current consumption
    - `ATSTATE`: get relay status (0/1)
    Since `uart.read()` is non-blocking, '\\n' is expected as terminating character.'''
    import machine
    import time

    uart = machine.UART(1, baudrate=9600, rx=16, tx=17, timeout=10)
    uart.write(command + '\n')

    res = bytes()
    startTime = time.ticks_ms() # timeout for the loop below
    while b'\n' not in res:
    toAppend = uart.read()

    if toAppend:
    if res != b'':
    res += toAppend
    else:
    res = toAppend

    if time.ticks_diff(time.ticks_ms(), startTime) > READ_TIMEOUT:
    print('ERROR: read timeout')
    raise ValueError()
    # return b'ERROR: read timeout'

    res = res.decode('utf-8').replace('\n', '').encode()

    return res


    def readState():
    state = getFromUart(b'ATSTATE\n') # 0
    current = getFromUart(b'ATREAD\n') # 1
    power = getFromUart(b'ATPOWER\n') # 2

    return 'The switch is {}, discharging {} Amperes and has consumed {} Watts/hour'.format(
    'on' if int(state) > 0 else 'off',
    current.decode(), power.decode()
    )


    def respondToCommand(command):
    global inLoop, notificationPowerMargin, notificationCurrentMargin
    cmd = command['command'].upper()
    try:
    if cmd == '/ON':
    getFromUart('ATON')
    res = 'The switch is on'
    elif cmd == '/OFF':
    getFromUart('ATOFF')
    res = 'The switch is off'
    elif cmd == '/STATE':
    res = readState()
    elif cmd == '/RESET':
    getFromUart('ATZERO')
    res = readState()
    elif cmd == '/REPL':
    inLoop = False
    res = 'Activating webrepl...'
    elif cmd == '/NOTIFYPOWER':
    if len(command['args']) < 1:
    res = 'Missing arguments'
    else:
    try:
    margin = float(command['args'][0])
    res = 'You will be notified when the power consumption exceeds {} Watts/hour'.format(
    margin)
    notificationPowerMargin = margin
    except BaseException:
    res = 'Wrong argument (expected number)'
    elif cmd == '/NOTIFYCURRENT':
    if len(command['args']) < 1:
    res = 'Missing arguments'
    else:
    try:
    margin = float(command['args'][0])
    res = 'You will be notified when the current consumption exceeds {} Amperes'.format(
    margin)
    notificationCurrentMargin = margin
    except BaseException:
    res = 'Wrong argument (expected number)'
    else:
    res = 'Unknown command'

    return res
    except ValueError:
    return 'The current sensor is not responding!'


    def checkForCurrentMargin():
    global notificationCurrentMargin
    current = float(getFromUart('ATREAD'))
    if current > notificationCurrentMargin:
    sendMessage(TOKEN, userChatId,
    'The current absorption reached {} Amperes'.format(current))
    notificationCurrentMargin = 0.0


    def checkForPowerMargin():
    global notificationPowerMargin
    power = float(getFromUart('ATPOWER'))
    if power > notificationPowerMargin:
    sendMessage(TOKEN, userChatId,
    'The power consumption reached {} Watts/hour!'.format(power))
    notificationPowerMargin = 0.0


    def main():
    global inLoop, userChatId, notificationPowerMargin, notificationCurrentMargin
    if setStation(SSID, PSW):
    clearUart()
    inLoop = True
    lastUpdate = 0
    while inLoop:
    timeout = 60
    if notificationPowerMargin > 0:
    checkForPowerMargin()
    timeout = 20
    if notificationCurrentMargin > 0:
    checkForCurrentMargin()
    timeout = 5

    res = makeRequest(TOKEN, 'getUpdates', lastUpdate, timeout=timeout)
    if not res:
    continue

    for update in res:
    if update['update_id'] > lastUpdate:
    lastUpdate = update['update_id']

    command = parseCommand(update)

    if command:
    userChatId = update['message']['chat']['id']
    msg = respondToCommand(command)
    sendMessage(TOKEN, update['message']['chat']
    ['id'], msg)
    print(update)

    res = makeRequest(TOKEN, 'getUpdates', lastUpdate, timeout=0)

    print('Entering webrepl...')