"""Interactive unicast sender for a ZigBee XBee radio on a Raspberry Pi.

Loading this module pulses the radio's reset pin, opens the radio on
``PORT`` and registers the receive callbacks.  When run as a script it then
reads ``DESTINATION_MAC_ADDR MESSAGE`` lines from standard input and sends
each message as a unicast.  Passing any command-line argument enables debug
output for every received packet.
"""

import sys
import time
import traceback

import RPi.GPIO as GPIO

import digi.xbee.packets.common
from digi.xbee.devices import RemoteXBeeDevice, ZigBeeDevice
from digi.xbee.exception import (
    InvalidOperatingModeException,
    TimeoutException,
    XBeeException,
)
from digi.xbee.models.address import (
    XBee16BitAddress,
    XBee64BitAddress,
    XBeeIMEIAddress,
)
from digi.xbee.models.status import TransmitStatus
from digi.xbee.util import utils

# XBee communication parameters
PORT = "/dev/serial0"
BAUD_RATE = 57600
RESET_GPIO_NUM = 16
MAX_BYTES_PER_PACKET = 255

# Shared state.  These are defined up front because the packet callback is
# registered (and may fire) before the ``__main__`` section below runs.
debugMode = False
transmitStatusReceived = False
transmitSuccess = False

# XBee reset pin initialisation
GPIO.setmode(GPIO.BOARD)
GPIO.setup(RESET_GPIO_NUM, GPIO.OUT)


# --------------------
# Rover Communication Methods
# --------------------

def chunkArray(array, chunkSize):
    """Split ``array`` into consecutive slices of at most ``chunkSize`` items.

    The last slice is shorter when ``len(array)`` is not a multiple of
    ``chunkSize``.  An empty ``array`` yields an empty list.

    Raises:
        ValueError: if ``chunkSize`` is 0 (raised by ``range``).
    """
    return [array[i:i + chunkSize] for i in range(0, len(array), chunkSize)]


def sendXbeeUnicast(macID, message):
    """Send ``message`` asynchronously to the radio with 64-bit address ``macID``.

    Args:
        macID: destination 64-bit address as a hex string.
        message: payload handed to ``send_data_async``.

    Any failure is reported on stdout and swallowed so that the interactive
    loop keeps running.
    """
    try:
        # Unicast request.  (A broadcast would go through
        # zcXBeeDevice.send_data_broadcast instead.)
        remote_device = RemoteXBeeDevice(
            zcXBeeDevice, XBee64BitAddress.from_hex_string(macID)
        )
        zcXBeeDevice.send_data_async(remote_device, message)
        print("[*] Sent unicast!")
    except Exception as e:
        print("[!] Error sending xbee unicast!")
        print("[!] type error: " + str(e))
        print(traceback.format_exc())


def my_data_received_callback(xbee_message):
    """Print the payload and the sender address of a received data message."""
    try:
        address = xbee_message.remote_device.get_64bit_addr()
        # UTF-8 is a superset of ASCII; undecodable bytes are replaced so a
        # binary payload is still reported instead of raising.
        data = xbee_message.data.decode("utf-8", errors="replace")
        print("[*] Received :" + str(data) + ". \nFrom :" + str(address))
    except Exception as e:
        print("type error: " + str(e))
        print(traceback.format_exc())


def packet_received_callback(packet):
    """Record the outcome of a transmit-status packet.

    Sets ``transmitStatusReceived`` and ``transmitSuccess`` when ``packet``
    is a ``TransmitStatusPacket``.  With ``debugMode`` enabled, every packet
    is dumped to stdout first.
    """
    global transmitStatusReceived, transmitSuccess
    if debugMode:
        print("[*] Received Packet :" + str(type(packet)))
        print(packet.__dict__)
    if isinstance(packet, digi.xbee.packets.common.TransmitStatusPacket):
        # Use the public property rather than the name-mangled attribute.
        status = packet.transmit_status
        transmitSuccess = status == TransmitStatus.SUCCESS
        transmitStatusReceived = True
        if transmitSuccess:
            print("[*] Transmit Status OK!")
        else:
            print("[!] Transmit failed: " + str(status))


def _closeXbeeDevice(device):
    """Best-effort close of ``device``; failures are printed, never raised."""
    if device is None:
        return
    try:
        device.close()
    except Exception as e:
        print("[!] Error closing zigbee device")
        print("[!] type error: " + str(e))


# --------------------
# XBEE Setup
# --------------------

try:
    # Perform Zigbee device reset: hold the pin low, then release it and
    # give the radio time to start up.
    GPIO.output(RESET_GPIO_NUM, False)
    time.sleep(2)
    GPIO.output(RESET_GPIO_NUM, True)
    time.sleep(3)
except Exception as e:
    print("[!] Error resetting GPIO pin")
    print("[!] type error: " + str(e))
    print(traceback.format_exc())
finally:
    # Release the pin even when the reset pulse failed part-way.
    try:
        GPIO.cleanup()
    except Exception as e:
        print("[!] Error resetting GPIO pin")
        print("[!] type error: " + str(e))

zcXBeeDevice = None
retriesLeft = 5
while retriesLeft > 0:
    zcXBeeDevice = None
    try:
        # Connect and initialise ZigBee device instance
        zcXBeeDevice = ZigBeeDevice(PORT, BAUD_RATE)
        zcXBeeDevice.open()
        zcXBeeDevice.add_data_received_callback(my_data_received_callback)
        zcXBeeDevice.add_packet_received_callback(packet_received_callback)
        print("[*] Succesfully initialised zigbee device.")
        break
    except Exception as e:
        print("[!] Error initialising zigbee device. Retries Left:" + str(retriesLeft))
        print("[!] type error: " + str(e))
        print(traceback.format_exc())
        # NOTE(review): a failed open() may leave the serial port held;
        # release it before the next attempt.
        _closeXbeeDevice(zcXBeeDevice)
        retriesLeft = retriesLeft - 1
        time.sleep(2)

if retriesLeft == 0:
    print("[!] Failed to initialise zigbee device. Quitting now...")
    # Non-zero status so that callers can detect the failure.
    sys.exit(1)


# --------------------
# Main
# --------------------

if __name__ == "__main__":
    if len(sys.argv) > 1:
        print("[*] Debug MODE!!!")
        debugMode = True
    else:
        debugMode = False

    print("[*] USAGE: DESTINATION_MAC_ADDR MESSAGE ")
    try:
        while True:
            try:
                userInput = input()
            except (EOFError, KeyboardInterrupt):
                # stdin closed or Ctrl-C: stop instead of spinning on errors.
                break
            try:
                # Split only on the first space so the message may itself
                # contain spaces.
                macID, message = userInput.split(" ", 1)
                sendXbeeUnicast(macID, message)
            except Exception as e:
                print("[!] Error occurred while reading/parsing user input")
                print("[!] type error: " + str(e))
                print(traceback.format_exc())
    finally:
        _closeXbeeDevice(zcXBeeDevice)