# SPDX-FileCopyrightText: 2020 Brent Rubell, written for Adafruit Industries # # SPDX-License-Identifier: Unlicense # # File very modified by @PaulskPt. Latest updates on: 2022-08-11, 2024-03-23 # (Github @PaulskPt) # Last CircuitPython update on: 2024-03-22. Updated CPY to V9.0.0 # This script has functionality to "hotplug-ing" the sensor. # 2024-03-24 15h00 in function send_to_pushingbox() added variable stat_result. If stat == 200, stat_result # will show the text "OK" in the top right of the MAGTAG page "latest sensor data". If stat != 200, stat_result # will show the text "NO" (meaning: fail). # 2024-03-25 13h20 added global variables use_sound and use_leds # Button A will toggle sound # Button B will toggle LEDs # Button C will toggle the global variable wait_for_zero_sec. Then tries to write this setting to file: settings.toml. # Button D will toggle the global variable my_debug. Then tries to write this setting to file: settings.toml. # Note that file settings.toml can only be updated from within this script when the MAGTAG is not connected to USB. # In function send_to_pushingbox() the response is 200 (OK), the LEDs will blink green if use_leds is True. # the leds will blink red if the response is other than 200. # For module cptoml see: https://github.com/beryllium-org/cptoml # When reading the sensor the LEDs of the MAGTAG will blink one time in color Yellow. # Depending on the global variable use_sound, a tone will be produced or not. # When the datetime stamp and sensor data successfully are sent to Pushingbox.com, the LEDs of the MAGTAG will blink # Depending on the global variable use_sound, a tone will be produced or not. # one time Green. # Added function is_usb() that ckecks if the board is connected to USB or not. """ The Adafruit MAGTAG contains an ESP32-S2 WROVER chip. Note: On 2022-08-10 at 17h12 utc+1 the Google Apps Script system reported that the project 'DataToGoogle_MAGTAG', deployment version 6, executed successfully. This execution came from Pushingbox.com that was triggered by this CircuitPython script (see function 'send_data_to_google()') Note also that the Google Apps Script deployment was set that it was available to 'anyone' (with the link). On 2024-03-24, in Google Apps Scripts I had to create a new deployment (version 7). It executed successfully. """ import sys import os import supervisor import board import time import wifi import rtc import ipaddress from adafruit_magtag.magtag import MagTag import adafruit_ahtx0 # Global flags (see setup()): use_sound = None use_leds = None wait_for_zero_sec = None my_debug = None on_usb = None SOUND_IDX = 0 LEDS_IDX = 1 ZEROSEC_IDX = 2 DEBUG_IDX = 3 sensor = None stat_result = None interval = None ip = 0 i2c = board.STEMMA_I2C() try: sensor = adafruit_ahtx0.AHTx0(i2c) except ValueError as e: print(f"Global. Error: {e}. Check wiring!") pass # Create an instance of the MagTag class magtag = MagTag() height = magtag.graphics.display.height -1 width = magtag.graphics.display.width -1 # Less brillance than originally (mod by @paulsk) b1=25 # originally 255 b2=15 # idem 150 b3=18 # idem 180 red_tpl = (b2, 0, 0) ylw_tpl = (b1, b2, 0) grn_tpl = (0, b2, 0) blu_tpl = (0, 0, b2) blk_tpl = (0,0,0) RED = 0 YLW = 1 GRN = 2 BLU = 3 BLK = 4 clr_dict = { RED : "RED", YLW : "YLW", GRN : "GRN", BLU : "BLU", BLK : "BLK" } button_colors = (red_tpl, ylw_tpl, grn_tpl, blu_tpl, blk_tpl) # last tuple added for color black button_tones = (1047, 1318, 1568, 2093, 440) msg_sent = 0 tmp_old = 0.00 hum_old = 0.00 upd_dt_old = "" dts = "" # Set by get_time_fm_aio() and used in get_th_direct() curr_tm = None # struct_time rtc_set = False rtc_bi = rtc.RTC() # create an instance of the built-in RTC weekdays = {0:"Monday", 1:"Tuesday",2:"Wednesday",3:"Thursday",4:"Friday",5:"Saturday",6:"Sunday"} tmp_cl = None hum_cl = None class sensor_tmp: def __init__(self, tmp, dt): self._tmp = tmp self._dt = dt @property def tmp(self): return self._tmp @tmp.setter def tmp(self, tmp): if isinstance(tmp, float): self._tmp = tmp elif isinstance(tmp, int): self._tmp = float(tmp) @property def last_upd(self): return self._dt @last_upd.setter def last_upd(self, dt): if isinstance(dt, str): self._dt = dt class sensor_hum: def __init__(self, hum, dt): self._hum = hum self._dt = dt @property def hum(self): return self._hum @hum.setter def hum(self, hum): if isinstance(hum, float): self._hum = hum elif isinstance(hum, int): self._hum = float(hum) @property def last_upd(self): return self._dt @last_upd.setter def last_upd(self, dt): if isinstance(dt, str): self._dt = dt # pylint: disable=no-name-in-module,wrong-import-order def setup(): global tmp_cl, hum_cl, on_usb, use_sound, use_leds, wait_for_zero_sec, my_debug, interval TAG = 'setup(): ' on_usb = is_usb() use_sound = True if int(os.getenv("USE_SOUND")) else False use_leds = True if int(os.getenv("USE_LEDS")) else False wait_for_zero_sec = True if int(os.getenv("WAIT_FOR_ZERO_SEC")) else False my_debug = True if int(os.getenv("MY_DEBUG")) else False interval = int(os.getenv("INTERVAL_SECONDS")) if my_debug: print(TAG+f"use_sound= {use_sound}") print(TAG+f"use_leds= {use_leds}") print(TAG+f"wait_for_zero_sec= {wait_for_zero_sec}") print(TAG+f"Interval set for {interval} seconds") print(TAG+"Display height= {}, width= {}".format(height, width)) # result: height 127, width 295 # h0 = height // 6 # 127 // 6 = 21 20 + 21 = 41 , 20 + (2 x 21) = 62, 20 + (3*21)= 83, 20 + (4x21)= 104 # hlist = [h0, h0*3, h0*5] hlist = [20, 41, 62, 83, 104] for _ in range (len(hlist)): magtag.add_text( #text_font="/fonts/Arial-12.pcf", text_position=( 10, hlist[_], ), text_scale=2, ) # Create instances of the sensor_tmp and sensor_hum classes tmp_cl = sensor_tmp(0.00, "") # create class instance with default values hum_cl = sensor_hum(0.00, "") # same text_items = ["Sensor data to Google", "using", "Pushingbox.com", "(c) 2024 @PaulskPt", f"USB connected: {"Yes" if on_usb else "No "}"] for i in range(len(text_items)): magtag.set_text(text_items[i], i, auto_refresh = False) # Display the dt, tm and tz time.sleep(1) # to prevent RuntimeError Refresh too soon try: magtag.display.refresh() # refresh the display except RuntimeError: print(TAG+"RuntimeError") pass # reason: Refresh too soon # +-----------------------------------------------------------+ # | CONNECT TO WiFi | # +-----------------------------------------------------------+ wifi.AuthMode.WPA2 # set only once ssid = os.getenv("CIRCUITPY_WIFI_SSID") print(TAG+f"Connecting to AP {os.getenv("CIRCUITPY_WIFI_SSID")}") wifi_cnt = 0 while not wifi_is_connected(): if not do_connect(): print(TAG+f"WiFi failed to connect to {ssid}") wifi_cnt += 1 if wifi_cnt >= 5: print(TAG+f"tried {wifi_cnt} times to connect WiFi but failed. Exiting...") time.sleep(3) break if wifi_is_connected(): time.sleep(2) ipv4 = ipaddress.ip_address("8.8.8.8") ping_time = 0.0 ping_cnt = 0 while ping_time is 0.00: ping_time = wifi.radio.ping(ipv4) if ping_time is None: # prevent crash ping_time = 0.0 if ping_time < 0.001: ping_cnt += 1 if ping_cnt >= 10: print(TAG+f"Ping to Google failed. Tried {ping_cnt} times") break if ping_time > 0.00: print(TAG+f"Ping google.com: {ping_time} ms") def blink_and_button_test(clr=None): TAG= "blink_test(): " if my_debug: print(TAG+f"param clr= {clr} = {clr_dict[clr]}") cnt = 0 if clr is None: # Do the test while True: for i, b in enumerate(magtag.peripherals.buttons): if not b.value: print("Button %c pressed" % chr((ord("A") + i))) magtag.peripherals.neopixel_disable = False magtag.peripherals.neopixels.fill(button_colors[i]) magtag.peripherals.play_tone(button_tones[i], 0.25) break else: magtag.peripherals.neopixels.fill(button_colors[BLK]) # switch neopixels to black (off) magtag.peripherals.neopixel_disable = True time.sleep(0.01) cnt += 1 if cnt >= len(button_colors): print(f"Exiting function: {TAG[:-4]}") break def blink_leds(clr=BLK): if not use_leds: return TAG= "blink_leds(): " if isinstance(clr, int): le = len(button_colors) if clr < 0: clr = 0 if clr >= le: clr = le -1 if my_debug: print(TAG+f"param clr= {clr} = {clr_dict[clr]}") magtag.peripherals.neopixel_disable = False magtag.peripherals.neopixels.fill(button_colors[clr]) if use_sound and clr != BLK: # Only sound tone if color is not black magtag.peripherals.play_tone(button_tones[clr], 0.25) time.sleep(0.01) magtag.peripherals.neopixels.fill(button_colors[BLK]) # switch neopixels to black (off) magtag.peripherals.neopixel_disable = True def dt_fm_rtc(): dt = time.localtime() dts = "{:d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}Z".format(dt.tm_year, dt.tm_mon, dt.tm_mday, dt.tm_hour, dt.tm_min, dt.tm_sec) return dts def get_time_fm_aio(): global rtc_set, curr_tm, dts TAG = "get_time_fm_aio(): " network = magtag.network TIME_URL = "https://io.adafruit.com/api/v2/{:s}/integrations/".format(os.getenv("ADAFRUIT_IO_USERNAME")) TIME_URL += "time/strftime?x-aio-key={:s}".format(os.getenv("ADAFRUIT_IO_KEY")) TIME_URL += "&fmt=%25Y-%25m-%25d+%25H%3A%25M%3A%25S.%25L+%25j+%25u+%25z+%25Z" print(TAG+"Fetching date and time from: \"{}...\"".format(TIME_URL[:31])) # don't print aio_username neither aio_key try: response = network.fetch(TIME_URL) except (ConnectionError, ValueError, RuntimeError) as e: print(TAG+f"Error: {e}. Restarting in 3 seconds...") # Exit program and restart in 3 seconds. magtag.exit_and_deep_sleep(3) if my_debug and response is not None: print() print("-" * 40) print(TAG+f"response.text= {response.text}") print("-" * 40) if response: if not rtc_set: curr_date = response.text[:10] curr_time = response.text[11:19] print(f"curr_date: {curr_date}, curr_time: {curr_time}") # tm = time.struct_time tz = response.text[30:40] if my_debug: print(TAG+"tz= \"{}\"".format(tz)) resp_lst = response.text.split(" ") # response split = ['2022-03-13', '17:02:51.303', '072', '7', '+0000', 'WET'] if resp_lst[5] == 'WEST': dst = 1 elif resp_lst[5] == 'WET': dst = 0 else: dst = -1 if my_debug: print(TAG+"dst=", dst) print(TAG+"response.text.split=", resp_lst) dt = response.text[:10] # tm = response.text[11:16] # 23] hh = int(resp_lst[1][:2]) mm = int(resp_lst[1][3:5]) # +mm_corr # add the correction ss = int(resp_lst[1][6:8]) yd = int(resp_lst[2]) # day of the year wd = int(resp_lst[3])-1 # day of the week -- strftime %u (weekday base Monday = 1), so correct because CPY datetime uses base 0 # sDt = "Day of the year: {}, {} {} {} {} {}".format(yd, weekdays[wd], resp_lst[0], resp_lst[1][:5], resp_lst[4], resp_lst[5]) # Set the internal RTC yy = int(dt[:4]) mo = int(dt[5:7]) dd = int(dt[8:10]) tm2 = (yy, mo, dd, hh, mm, ss, wd, yd, dst) tm3 = time.struct_time(tm2) if my_debug: print(TAG+"dt=",dt) print(TAG+"yy ={}, mo={}, dd={}".format(yy, mm, dd)) print(TAG+"tm2=",tm2) print(TAG+"tm3=",tm3) rtc_bi.datetime = tm3 # set the built-in RTC if wait_for_zero_sec: # see flag at start of this func print(TAG+"Waiting for time.localtime seconds reaching 0 ...") while True: t = time.localtime() if t[5] == 0: # Wait until seconds is almost 60 (0) # print("t[tm_sec]=", t[5]) break print(TAG+f"finished waiting for zero second") curr_tm = time.time() # get the time in seconds since epoch (set curr_tm only at startup) rtc_set = True # Prepare datetime for funct get_th_direct() # format: 2022-08-10T16:40:16Z dts = dt[:4] + "-" + dt[5:7] + "-" + dt[8:10] + "T" + resp_lst[1][:2] + ":" + resp_lst[1][3:5] + ":" + resp_lst[1][6:8] + "Z" response.close() # Free resources (like socket) - to be used elsewhere def get_th_direct(): global t_dict, h_dict, curr_tm, tmp_old, hum_old, tmp_cl, hum_cl, sensor # datetime e.g.: 2022-08-10T16:40:16Z TAG = "get_th_direct(): " dts = dt_fm_rtc() # get datetime from builtin rtc # -------------------------------------------------------------- if not sensor: try: sensor = adafruit_ahtx0.AHTx0(i2c) except ValueError as e: print(TAG+f"Error: {e}. Check wiring!") if sensor: try: tmp = sensor.temperature hum = sensor.relative_humidity if my_debug: blink_leds(YLW) tmp_cl.tmp = tmp tmp_cl.last_upd = dts if tmp != tmp_old: tmp_old = tmp # ------------------ hum_cl.hum = hum hum_cl.last_upd = dts if hum != hum_old: hum_old = hum # ----------------- if my_debug: print(TAG+f"updated_at: {tmp_cl.last_upd}") print(TAG+f"Temperature: {tmp_cl.tmp}") print(TAG+f"Humidity: {hum_cl.hum}") except ValueError as e: print(TAG+f"Error: {e}. Check wiring!") pass def pr_th_msg(): global stat_result TAG = "pr_th_msg(): " if stat_result is None: stat_result = "" ret = True upd_dt = tmp_cl.last_upd # [:10] # e.g.: 2022-08-10T16:40:16Z # ----------------- tmp = tmp_cl.tmp tmp_s = str(tmp) # ----------------- hum = hum_cl.hum hum_s = str(hum) # ---------------- text_items = ["latest sensor data "+stat_result, "-" * 23, upd_dt, "Temperature: "+tmp_s+" C", "Humidity: "+hum_s+" %"] magtag.remove_all_text for i in range(len(text_items)): magtag.set_text(text_items[i], i, auto_refresh = False) # Display the dt, tm and tz cnt = 0 while True: try: magtag.display.refresh() # refresh the display cnt += 1 if cnt >= 10: ret = False break except RuntimeError: # print(TAG+"RuntimeError") pass # reason: Refresh too soon return ret def send_to_pushingbox(): global msg_sent, upd_dt_old, tmp_old, hum_old, stat_result TAG = "send_to_pushingbox(): " ret = True tmp_s = None hum_s = None network = magtag.network stat_result = None devid = os.getenv("PUSHING_BOX_DEVID") # device ID on Pushingbox for our Scenario upd_dt = tmp_cl.last_upd # [:10] # e.g.: 2022-08-10T16:40:16Z # -------------------------------------------------------------- tmp = tmp_cl.tmp tmp_s = str(tmp) if my_debug: print(TAG+f"tmp_s= \'{tmp_s}\', tmp_old= {tmp_old}") # -------------------------------------------------------------- hum = hum_cl.hum hum_s = str(hum) if my_debug: print(TAG+f"hum_s= \'{hum_s}\', hum_old= {hum_old}") # -------------------------------------------------------------- # le_old = len(upd_dt_old) # if (le_old > 0) and (tmp == tmp_old) and (hum == hum_old): # and upd_dt == upd_dt_old if (upd_dt_old == upd_dt) and (tmp == tmp_old) and (hum == hum_old): # and upd_dt == upd_dt_old print(TAG+"datetime stamp unchanged. Waiting for new sensor data...") return ret # We don't want to send duplicates else: upd_dt_old = upd_dt # #upd_tm = t_dict["updated_at"][11:19] msg_sent += 1 # increase the messages sent count # dteData = upd_dt # Added 2024-03-22 because this variable was missing. # ?date=$date$&time=$time$&temp=$temp$&hum=$hum$ s = "http://api.pushingbox.com/pushingbox?devid=" s += devid s += "&date=\"" + upd_dt + "\"" s += "&temp=" + tmp_s s += "&hum=" + hum_s if my_debug: print(TAG+upd_dt, end='') print(". Sending Data message nr: ", end='') print(msg_sent, end='') print(" to middle-man server...", end='\n') print(TAG+"Going to send:", end='') print("\n\""+s, end='') print("\"", end='\n') # To complete the message to the Serial (REPL) window print(TAG+"Data Sent") try: response = network.fetch(s) # Get the spreadsheet partly except OSError as e: print(TAG+f"Error: {e}") if e.args[0] == -2: # gaierror # See: https://docs.circuitpython.org/_/downloads/en/6.3.x/pdf/, page 216 return # do nothing if response: le = len(response.text) if le > 0: n = response.text.find("404") print(TAG,end='') if n >=0: print("error 404: file not found on this server.") else: print(f"response.text[:100]= {response.text[:100]}", end='') print(" [...] ", end='') print(f"{response.text[-100:]}", end='\n') status = response.status_code stat_result = "OK" if status == 200 else "NO" print() s1 = "response.status_code=" s2 = "{} (= {})".format(status, stat_result) print(TAG+f"{s1} {s2}") if status == 200: blink_leds(GRN) else: blink_leds(RED) if my_debug: pr_msg(["send result:", s2]) response = None if my_debug: print(TAG+f"upd_dt= {upd_dt}, tmp_s= {tmp_s}, hum_s= {hum_s}") pr_th_msg() return ret def yes_no(nr): ret = None y = "Yes" n = "No " if isinstance(nr, int): if nr >= 0 and nr <= 3: if nr == SOUND_IDX: ret = y if use_sound else n if nr == LEDS_IDX: ret = y if use_leds else n if nr == ZEROSEC_IDX: ret = y if wait_for_zero_sec else n if nr == DEBUG_IDX: ret = y if my_debug else n return ret def pr_buttons(): text_items = [ "Btn: Function:", "A: sound", "B: LEDs", "C: wait for zero sec", "D: debug texts"] pr_msg(text_items) def pr_flags(): TAG= "pr_flags(): " itms_lst = ["sound", "LEDs", "start at zero sec", "print debug", ] le = len(itms_lst) print("\nStatus of global flags: (set in file settings.toml):") for _ in range(le): print(TAG+"{:17s}: {:s}".format(itms_lst[_], yes_no(_))) print() text_items = ["global flags state:"] for _ in range(le): s = "{:17s}: {:s}".format(itms_lst[_], yes_no(_)) text_items.append(s) for i in range(len(text_items)): magtag.set_text(text_items[i], i, auto_refresh = False) # Display the dt, tm and tz time.sleep(1) # to prevent RuntimeError Refresh too soon cnt = 0 while True: try: magtag.display.refresh() # refresh the display cnt += 1 if cnt >= 10: break except RuntimeError: # print(TAG+"RuntimeError") pass # reason: Refresh too soon def is_usb(): # Test if we're connected to USB # If an error occurs we are on USB ret = False try: from storage import remount remount("/", False) except (RuntimeError) as e: ret = True print(f"is_usb(): Are we connected to USB? {"Yes" if ret else "No"}") return ret def do_connect(): global ip TAG = "do_connect(): " ret = False # Get env variables from file settings.toml ssid = os.getenv("CIRCUITPY_WIFI_SSID") pw = os.getenv("CIRCUITPY_WIFI_PASSWORD") s__ip = "" try: wifi.radio.connect(ssid=ssid, password=pw) except ConnectionError as e: print(TAG+f"WiFi connection Error: \'{e}\'") except Exception as e: print(TAG+f"Error: {dir(e)}") ip = wifi.radio.ipv4_address if ip: s__ip = str(ip) ret = True if my_debug: print(TAG+f"connected to \'{ssid}\'. IP: {s__ip}") return ret def wifi_is_connected(): global ip s__ip = str(ip) return True if s__ip is not None and len(s__ip) > 0 and s__ip != '0.0.0.0' else False def pr_msg(msg): text_items = [] le_max = 5 if isinstance(msg, list): le_msg = len(msg) if le_msg > 0 and le_msg <= le_max: for _ in range(len(msg)): text_items.append(msg[_]) le = len(text_items) if le_max - le > 0: # fill up with blank lines for _ in range(le_max - le): text_items.append(" ") for i in range(len(text_items)): magtag.set_text(text_items[i], i, auto_refresh = False) # Display the dt, tm and tz time.sleep(1) # to prevent RuntimeError Refresh too soon cnt = 0 while True: try: magtag.display.refresh() # refresh the display cnt += 1 if cnt >= 10: break except RuntimeError: # print(TAG+"RuntimeError") pass # reason: Refresh too soon def wr_to_toml(itm): TAG= "wr_to_toml(): " toml_f = "settings.toml" if on_usb: s1 = "Writing to file" s2 = "works only when" s3 = "not connected to USB" print(TAG+f"{s1}{toml_f} {s2} {s3}") pr_msg([s1, toml_f, s2, s3]) pr_th_msg() # rewrite the temperature & humidity page return False ret = True delay = 3 itms = [("WAIT_FOR_ZERO_SEC", wait_for_zero_sec), ("MY_DEBUG", my_debug)] print(TAG+f"param rcvd: itm= {itm}") itm_s = itms[itm][0] val = itms[itm][1] val_s = "1" if val == True else "0" print(TAG+f"Trying to write to file: {toml_f}, item: \"{itm_s}\" with value: {val_s}") # Write try: from cptoml import put from storage import remount remount("/", False) #put("CIRCUITPY_PYSTACK_SIZE", 7000) # To set an item in root table put(itm_s, val_s) #, "subtable1", comment="This is useless") # To set item1 in subtable1 with comment remount("/", True) pr_msg([itm_s, "value: " + val_s, "successfully", "written to:", toml_f]) print("Software reset in %d seconds" % delay) time.sleep(delay) supervisor.reload() except (RuntimeError, ImportError, OSError) as e: print(TAG+f"Error: {e}") ret = False return ret def main(): global curr_tm, use_sound, use_leds, wait_for_zero_sec, my_debug, interval TAG = 'main(): ' setup() curr_tm = time.time() # set curr_tm for the first time start = True msg_sent = False delay = 3 # blink_and_button_test() # Test the Magtag LEDs pr_buttons() pr_flags() toml_zero_sec = 0 toml_debug = 1 wifi_cnt = 0 doit = False while True: try: while not wifi_is_connected(): if do_connect(): break else: wifi_cnt += 1 if wifi_cnt >= 5: print(TAG+f"WiFi connection failed. Tried {wifi_cnt} times") raise RuntimeError if magtag.peripherals.button_a_pressed: use_sound = not use_sound # toggle sound s1 = "button A pressed." s2 = "sound {:s}".format(yes_no(SOUND_IDX)) print(TAG+f"{s1} {s2}") pr_msg([s1, s2]) pr_th_msg() # rewrite the temperature & humidity page if magtag.peripherals.button_b_pressed: use_leds = not use_leds # toggle leds s1 = "button B pressed." s2 = "LEDs {:s}".format(yes_no(LEDS_IDX)) print(TAG+f"{s1} {s2}") pr_msg([s1, s2]) pr_th_msg() # rewrite the temperature & humidity page if magtag.peripherals.button_c_pressed: wait_for_zero_sec = not wait_for_zero_sec print(TAG+"button C pressed. Start time wait for zero seconds {:s}".format(yes_no(ZEROSEC_IDX))) wr_to_toml(toml_zero_sec) if magtag.peripherals.button_d_pressed: my_debug = not my_debug print(TAG+"button d pressed. Debug print statements {:s}".format(yes_no(DEBUG_IDX))) wr_to_toml(toml_debug) if not rtc_set: get_time_fm_aio() if rtc_set: ct = time.time() # rtc_bi.datetime c_diff = ct - curr_tm dt1 = interval // 6 # default: 3600 seconds. 3600 // 6 = 600 seconds = 10 minutes. See setup(). dt2 = interval # default: 3600 seconds = 1 hour c1 = c_diff % dt1 c2 = c_diff % dt2 if c1 % 10 == 0: if msg_sent: msg_sent = False # print(TAG+f"ct= {ct}, curr_tm= {curr_tm}. Difference= {ct - curr_tm} secs.") # print(TAG+f"c_diff % {dt1}= {c1}. c_diff % {dt2}= {c2}. Looping...") print(TAG+"c_diff % {:3d}= {:4d}, c_diff % {:4d}= {:4d}. Looping...".format(dt1, c1, dt2, c2)) # doit = True if (c2 == dt2) else False if c2 == 0: doit = True else: doit = False if start or (not sensor) or (doit) and (not msg_sent): start = False blink_leds() # switch off LEDs get_th_direct() # sensor directly connected to MAGTAG board via I2C/Stemma_QT if not send_to_pushingbox(): raise KeyboardInterrupt else: msg_sent = True #if c2 == 30: # sys.exit() # Temporary forced end of execution if c2 >= (dt2 - 10) and c2 <= dt2: # 1 hour interval (give some space) get_time_fm_aio() curr_tm = ct time.sleep(1) except ImportError: # e.g.: no module named 'platform' (in file :/lib/Adafruit_IO/Client.py, line 24) pass except KeyboardInterrupt: raise SystemExit if __name__ == '__main__': main()