Skip to content

Instantly share code, notes, and snippets.

@shankara-n
Last active April 11, 2020 09:18
Show Gist options
  • Save shankara-n/1271ea8dbcf699ab3be946ee86beda70 to your computer and use it in GitHub Desktop.
Save shankara-n/1271ea8dbcf699ab3be946ee86beda70 to your computer and use it in GitHub Desktop.
A simple script to send unicasts and receive data through xbee. Usage: python3 xbeeTester.py <optional:debug>
from digi.xbee.models.address import XBee64BitAddress, XBee16BitAddress, XBeeIMEIAddress
from digi.xbee.devices import RemoteXBeeDevice, ZigBeeDevice
from digi.xbee.exception import InvalidOperatingModeException, XBeeException, TimeoutException
import time
import RPi.GPIO as GPIO
import traceback
from digi.xbee.util import utils
import sys
from digi.xbee.models.status import TransmitStatus
import digi.xbee.packets.common
# XBee Communicaton Parameters
PORT = "/dev/serial0"
BAUD_RATE = 57600
RESET_GPIO_NUM = 16
MAX_BYTES_PER_PACKET = 255
# XBee reset pin initialisation
GPIO.setmode(GPIO.BOARD)
GPIO.setup(RESET_GPIO_NUM,GPIO.OUT)
'''
-------------------- Rover Communication Methods --------------------
'''
def chunkArray(array, chunkSize):
return [array[i:chunkSize+i] for i in range(0, len(array), chunkSize)]
def sendXbeeUnicast(macID,message):
try:
# Unicast Request
remote_device = RemoteXBeeDevice(zcXBeeDevice, XBee64BitAddress.from_hex_string(macID))
zcXBeeDevice.send_data_async(remote_device, message)
print("[*] Sent unicast!")
'''
broadcast
zcXBeeDevice.send_data_broadcast(json.dumps(request))
'''
except Exception as e:
print("[!] Error sending xbee unicast!")
print("[!] type error: " + str(e))
print(traceback.format_exc())
def my_data_received_callback(xbee_message):
try:
address = xbee_message.remote_device.get_64bit_addr()
data = xbee_message.data.decode("ascii")
print("[*] Received :"+str(data)+". From :"+str(address))
except Exception as e:
print("type error: " + str(e))
print(traceback.format_exc())
def packet_received_callback(packet):
global transmitStatusReceived, transmitSuccess
if debugMode:
print("[*] Received Packet :"+str(type(packet)))
print(packet.__dict__)
if type(packet) == digi.xbee.packets.common.TransmitStatusPacket :
transmitSuccess = packet.__dict__["_TransmitStatusPacket__transmit_status"] == TransmitStatus.SUCCESS
transmitStatusReceived = True
print("[*] Transmit Status OK!")
'''
-------------------- XBEE Setup --------------------
'''
try:
# Perform Zigbee Device Reset
GPIO.output(RESET_GPIO_NUM, False)
time.sleep(2)
GPIO.output(RESET_GPIO_NUM, True)
time.sleep(3)
GPIO.cleanup()
except Exception as e:
print("[!] Error resetting GPIO pin")
print("[!] type error: " + str(e))
print(traceback.format_exc())
retriesLeft = 5
while retriesLeft > 0:
try:
# Connect and initialise ZigBee Device instance
zcXBeeDevice = ZigBeeDevice(PORT, BAUD_RATE)
zcXBeeDevice.open()
zcXBeeDevice.add_data_received_callback(my_data_received_callback)
zcXBeeDevice.add_packet_received_callback(packet_received_callback)
print("[*] Succesfully initialised zigbee device.")
break
except Exception as e:
print("[!] Error initialising zigbee device. Retries Left:"+str(retriesLeft))
print("[!] type error: " + str(e))
print(traceback.format_exc())
retriesLeft = retriesLeft -1
time.sleep(2)
if retriesLeft == 0:
print("[!] Failed to initialise zigbee device. Quitting now...")
sys.exit()
'''
-------------------- Main --------------------
'''
if __name__ == "__main__":
if len(sys.argv)>1:
print("[*] Debug MODE!!!")
global debugMode
debugMode = True
else:
debugMode = False
print("[*] USAGE: DESTINATION_MAC_ADDR MESSAGE ")
while True:
try:
userInput = input()
macID,message = userInput.split(" ")
sendXbeeUnicast(macID,message)
except Exception as e:
print("[!] Error occurred while reading/parsing user input")
print("[!] type error: " + str(e))
print(traceback.format_exc())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment