Last active
April 11, 2020 09:18
-
-
Save shankara-n/1271ea8dbcf699ab3be946ee86beda70 to your computer and use it in GitHub Desktop.
A simple script to send unicast messages and receive data through an XBee module. Usage: python3 xbeeTester.py [debug] (passing any extra argument enables debug output)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from digi.xbee.models.address import XBee64BitAddress, XBee16BitAddress, XBeeIMEIAddress | |
| from digi.xbee.devices import RemoteXBeeDevice, ZigBeeDevice | |
| from digi.xbee.exception import InvalidOperatingModeException, XBeeException, TimeoutException | |
| import time | |
| import RPi.GPIO as GPIO | |
| import traceback | |
| from digi.xbee.util import utils | |
| import sys | |
| from digi.xbee.models.status import TransmitStatus | |
| import digi.xbee.packets.common | |
# XBee communication parameters
PORT = "/dev/serial0"  # serial device handed to ZigBeeDevice
BAUD_RATE = 57600  # serial baud rate handed to ZigBeeDevice
# Pin driving the XBee reset line; interpreted in the numbering mode selected
# by GPIO.setmode(GPIO.BOARD) below.
RESET_GPIO_NUM = 16
# NOTE(review): not referenced anywhere in the visible script - presumably the
# intended per-packet payload limit for use with chunkArray(); confirm.
MAX_BYTES_PER_PACKET = 255

# XBee reset pin initialisation: configure the reset pin as an output
GPIO.setmode(GPIO.BOARD)
GPIO.setup(RESET_GPIO_NUM, GPIO.OUT)
| ''' | |
| -------------------- Rover Communication Methods -------------------- | |
| ''' | |
def chunkArray(array, chunkSize):
    """Split a sliceable sequence into consecutive pieces of at most ``chunkSize`` items.

    Args:
        array: Any object supporting ``len()`` and slicing (list, str, bytes, ...).
        chunkSize: Maximum number of items per piece; must be at least 1.

    Returns:
        A list of slices of ``array`` in their original order. The final piece
        may be shorter than ``chunkSize``; an empty ``array`` yields ``[]``.

    Raises:
        ValueError: If ``chunkSize`` is smaller than 1.
    """
    if chunkSize < 1:
        # A negative step makes range() empty, which would silently discard
        # every item; fail loudly instead.
        raise ValueError("chunkSize must be a positive integer, got " + repr(chunkSize))
    return [array[start:start + chunkSize] for start in range(0, len(array), chunkSize)]
def sendXbeeUnicast(macID, message):
    """Send ``message`` to the node addressed by ``macID`` via ``send_data_async``.

    ``macID`` is parsed with ``XBee64BitAddress.from_hex_string`` and the
    transfer goes through the module-level ``zcXBeeDevice``. Any exception is
    printed together with its traceback and is not re-raised; the function
    returns ``None`` either way.
    """
    try:
        # Unicast Request
        local_device = zcXBeeDevice
        destination = XBee64BitAddress.from_hex_string(macID)
        target = RemoteXBeeDevice(local_device, destination)
        local_device.send_data_async(target, message)
        print("[*] Sent unicast!")
        # Broadcast alternative, kept for reference:
        #   zcXBeeDevice.send_data_broadcast(json.dumps(request))
    except Exception as err:
        print("[!] Error sending xbee unicast!")
        print("[!] type error: " + str(err))
        print(traceback.format_exc())
def my_data_received_callback(xbee_message):
    """Print the payload and 64-bit sender address of a received data message.

    Args:
        xbee_message: Object exposing ``data`` (a bytes-like payload) and
            ``remote_device.get_64bit_addr()``.

    Any exception is printed together with its traceback and is not re-raised.
    """
    try:
        address = xbee_message.remote_device.get_64bit_addr()
        # "backslashreplace" leaves pure-ASCII payloads unchanged but renders
        # other bytes as \xNN escapes, so a binary payload is still shown
        # instead of being lost to a UnicodeDecodeError.
        data = xbee_message.data.decode("ascii", errors="backslashreplace")
        print("[*] Received :" + data + ". From :" + str(address))
    except Exception as e:
        print("type error: " + str(e))
        print(traceback.format_exc())
def packet_received_callback(packet):
    """Record and report the outcome carried by transmit-status packets.

    For a ``TransmitStatusPacket`` this sets the module globals
    ``transmitStatusReceived`` (to ``True``) and ``transmitSuccess`` (whether
    the status equals ``TransmitStatus.SUCCESS``) and prints the result.
    Other packet types are ignored, apart from the optional debug dump.

    Args:
        packet: The received packet object.
    """
    global transmitStatusReceived, transmitSuccess
    # debugMode is only assigned when the file runs as a script; default to
    # off so the callback does not raise NameError in any other context.
    if globals().get("debugMode", False):
        print("[*] Received Packet :" + str(type(packet)))
        print(packet.__dict__)
    if isinstance(packet, digi.xbee.packets.common.TransmitStatusPacket):
        # Use the public property rather than the name-mangled private field.
        status = packet.transmit_status
        transmitSuccess = status == TransmitStatus.SUCCESS
        transmitStatusReceived = True
        if transmitSuccess:
            print("[*] Transmit Status OK!")
        else:
            # Previously "OK" was printed even when delivery had failed.
            print("[!] Transmit failed with status: " + str(status))
| ''' | |
| -------------------- XBEE Setup -------------------- | |
| ''' | |
# Hardware-reset the XBee by pulsing its reset pin low, then releasing it.
# Failures are reported but do not abort the script (best effort).
try:
    try:
        # Perform Zigbee Device Reset
        GPIO.output(RESET_GPIO_NUM, False)
        time.sleep(2)
        GPIO.output(RESET_GPIO_NUM, True)
        time.sleep(3)
    finally:
        # Release the GPIO channel even when the reset pulse itself failed.
        GPIO.cleanup()
except Exception as e:
    print("[!] Error resetting GPIO pin")
    print("[!] type error: " + str(e))
    print(traceback.format_exc())

# Open the serial connection and register both callbacks, retrying up to five
# times with a two-second pause between attempts.
retriesLeft = 5
while retriesLeft > 0:
    zcXBeeDevice = None
    try:
        # Connect and initialise ZigBee Device instance
        zcXBeeDevice = ZigBeeDevice(PORT, BAUD_RATE)
        zcXBeeDevice.open()
        zcXBeeDevice.add_data_received_callback(my_data_received_callback)
        zcXBeeDevice.add_packet_received_callback(packet_received_callback)
        print("[*] Succesfully initialised zigbee device.")
        break
    except Exception as e:
        print("[!] Error initialising zigbee device. Retries Left:" + str(retriesLeft))
        print("[!] type error: " + str(e))
        print(traceback.format_exc())
        # Do not leave a half-initialised connection holding the serial port
        # while the next attempt tries to open it again.
        try:
            if zcXBeeDevice is not None and zcXBeeDevice.is_open():
                zcXBeeDevice.close()
        except Exception:
            print(traceback.format_exc())
        retriesLeft = retriesLeft - 1
        if retriesLeft > 0:
            time.sleep(2)

if retriesLeft == 0:
    print("[!] Failed to initialise zigbee device. Quitting now...")
    # Non-zero exit status so callers and shell scripts can detect the failure.
    sys.exit(1)
| ''' | |
| -------------------- Main -------------------- | |
| ''' | |
def parseUserInput(userInput):
    """Split one console line into ``(macID, message)``.

    The first whitespace-separated token is the destination address; the rest
    of the line, including any inner spaces, is the message.

    Args:
        userInput: Raw line as typed by the user.

    Returns:
        A ``(macID, message)`` tuple of strings.

    Raises:
        ValueError: If the line does not contain both an address and a message.
    """
    parts = userInput.strip().split(None, 1)
    if len(parts) != 2:
        raise ValueError("expected 'DESTINATION_MAC_ADDR MESSAGE', got " + repr(userInput))
    return parts[0], parts[1]


if __name__ == "__main__":
    # Any extra command-line argument switches on debug output; the flag is
    # read as a module global by packet_received_callback.
    debugMode = len(sys.argv) > 1
    if debugMode:
        print("[*] Debug MODE!!!")
    print("[*] USAGE: DESTINATION_MAC_ADDR MESSAGE ")
    try:
        while True:
            try:
                userInput = input()
            except (EOFError, KeyboardInterrupt):
                # stdin closed or Ctrl-C: leave the loop instead of reporting
                # the same read error forever.
                break
            try:
                macID, message = parseUserInput(userInput)
                sendXbeeUnicast(macID, message)
            except Exception as e:
                print("[!] Error occurred while reading/parsing user input")
                print("[!] type error: " + str(e))
                print(traceback.format_exc())
    finally:
        # Release the serial port on the way out.
        if zcXBeeDevice.is_open():
            zcXBeeDevice.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment