Skip to content

Instantly share code, notes, and snippets.

@uadrive1
Created May 15, 2020 01:33
Show Gist options
  • Select an option

  • Save uadrive1/4c1e832d6ac8d06e59af4c73611446a7 to your computer and use it in GitHub Desktop.

Select an option

Save uadrive1/4c1e832d6ac8d06e59af4c73611446a7 to your computer and use it in GitHub Desktop.
Mock python hardware calls - relevant files
'''
* Copyright (C) 2015-2023
*
* This file is part of Inertia.cc project.
command line usage
cd /usr/local/bin/Workspace/plugins/ && sudo python SimplemotorTest.py forward
function usage
forward()
forwardadvanced(True, True)
UDP commands
forward
reverse
turnRight
turnRight
'''
import threading
import io
import sys
sys.path.append('/usr/local/bin/Workspace')
import os
import shutil
import redis
import logging
import re
from Adafruit_PWM_Servo_Driver import PWM
import time
import json
import RPi.GPIO as gpio
from threading import Timer
from datetime import datetime
REDIS_HOST = 'localhost'
REDIS_PORT = '6379';
gpio.setmode(gpio.BCM)
gpio.setup(22, gpio.OUT)
gpio.setup(27, gpio.OUT)
gpio.setup(4, gpio.OUT)
gpio.setup(17, gpio.OUT)
if not os.geteuid() == 0:
sys.exit("\nGPIO access requires super user access. Only root can run this script.\n")
#DC motor params
minRPM = 150.0
maxRPM = 313.0
frequency = 500.0*20
resolution = 4096.0
# Initialize the PWM device using the default address
pwm = PWM(0x40, True)
pwm.setPWMFreq(frequency)
RIGHTWHEEL_ID ='DCMotor1'
LEFTWHEEL_ID ='DCMotor2'
'''
LEFTWHEEL_DCMOTOR_GPIOPIN1=22
LEFTWHEEL_DCMOTOR_GPIOPIN2=27
LEFTWHEEL_DCMOTOR_PWMCHANNEL=14
RIGHTWHEEL_DCMOTOR_GPIOPIN1=4
RIGHTWHEEL_DCMOTOR_GPIOPIN2=17
RIGHTWHEEL_DCMOTOR_PWMCHANNEL=15
'''
LEFTWHEEL_DCMOTOR_GPIOPIN1=4
LEFTWHEEL_DCMOTOR_GPIOPIN2=17
LEFTWHEEL_DCMOTOR_PWMCHANNEL=15
RIGHTWHEEL_DCMOTOR_GPIOPIN1=22
RIGHTWHEEL_DCMOTOR_GPIOPIN2=27
RIGHTWHEEL_DCMOTOR_PWMCHANNEL=14
TRANSLATION_SPEED = 200
ROTATION_SPEED = 200
BRAKETIME_ON_INACTIVITY_TRANSLATE = .7 #in seconds
BRAKETIME_ON_INACTIVITY_ROTATE = .2 #in seconds
'''
CAMERA Servo motor constants
'''
CAMSERVO_STARTANGLE=-90.0
CAMSERVO_ENDANGLE=90.0
CAMSERVO_STARTANGLE_PULSEWIDTH_IN_MICROSEC = 1000.0
CAMSERVO_ENDANGLE_PULSEWIDTH_IN_MICROSEC = 2000.0
CAMSERVO_STARTANGLE_PULSEWIDTH_IN_SEC = float(CAMSERVO_STARTANGLE_PULSEWIDTH_IN_MICROSEC) / 1000000
CAMSERVO_ENDANGLE_PULSEWIDTH_IN_SEC = float(CAMSERVO_ENDANGLE_PULSEWIDTH_IN_MICROSEC) / 1000000
CAMSERVO_FREQUENCY = 60.0 # in Hz
CAMSERVO_RESOLUTION = 4096.0
CAMSERVO_VERTICAL_PWMCHANNEL=12
CAMSERVO_HORIZONTAL_PWMCHANNEL=13
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT )
def now():
"""Get the current Unix epoch time as an integer."""
return int(time.time()*1000*1000)
def brakeoninactivity(brakeAfter):
print "brakeoninactivity called"
time.sleep(BRAKETIME_ON_INACTIVITY_TRANSLATE)
timestamp_a, timestamp_b = r.get('inMotion'), r.get('braked')
if timestamp_a is None :
timestamp_a = 0
else:
timestamp_a = int(timestamp_a)
if timestamp_b is None :
timestamp_b = 0
else:
timestamp_b = int(timestamp_b)
#print(now())
#print(timestamp_a)
#print(timestamp_b)
# If the last 'inMotion' call was 'brakeAfter' secs before and the last 'braked' call was even before that, then brake.
print (str(now() - int(timestamp_a) >= brakeAfter*1000*1000 and int(timestamp_b)<= int(timestamp_a)))
if now() - int(timestamp_a) >= brakeAfter*1000*1000 and int(timestamp_b)<= int(timestamp_a) :
'''
print('Braking time '+ str(datetime.now()))
logging.info('Braking Time '+ str(datetime.now()))
#Slowing down from 100 to 0
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput = 70
print (RPMInput)
for n in range(int(RPMInput),-1,-1):
outputPulse = ((n - minRPM)/(maxRPM - minRPM))*(FullPulse)
off = on + int(outputPulse / timeperTick)
print(n)
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
#Brake at the end
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
'''
#brake()
brake()
#gpio.cleanup()
#os._exit(0)
#*************************************************************
#
#Methods with no RPM params
#*************************************************************
def forwardadvanced(BRAKE_ON_INACTIVITY, SUCCESSIVE_ACCELERATION_HANDLING):
RPMInput = TRANSLATION_SPEED
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
start_time = datetime.now()
print('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
#if (BRAKE_ON_INACTIVITY==True and SUCCESSIVE_ACCELERATION_HANDLING==True):
# """Make translate motion; apply brakes if no further commands were recieved after "brakeAfter" secs."""
global r
r.set('inMotion', now())
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
#Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time
#IMportant: Dont brake in the main thread - use brakeoninactivity()
if (BRAKE_ON_INACTIVITY==True):
if (SUCCESSIVE_ACCELERATION_HANDLING == True) :
threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_TRANSLATE] ).start()
else:
print('Braking time '+ str(datetime.now()))
logging.info('Braking Time '+ str(datetime.now()))
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
#gpio.cleanup()
#sys.exit()
else:
print('No brake will be applied ')
#gpio.cleanup()
#sys.exit()
def forward():
forwardadvanced(True, True)
def reverseadvanced(BRAKE_ON_INACTIVITY, SUCCESSIVE_ACCELERATION_HANDLING):
RPMInput = TRANSLATION_SPEED
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False)
start_time = datetime.now()
print('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
#if (BRAKE_ON_INACTIVITY==True and SUCCESSIVE_ACCELERATION_HANDLING==True):
# """Make translate motion; apply brakes if no further commands were recieved after "brakeAfter" secs."""
global r
r.set('inMotion', now())
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
#Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time
#IMportant: Dont brake in the main thread - use brakeoninactivity()
if (BRAKE_ON_INACTIVITY==True):
if (SUCCESSIVE_ACCELERATION_HANDLING == True) :
threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_TRANSLATE] ).start()
else:
print('Braking time '+ str(datetime.now()))
logging.info('Braking Time '+ str(datetime.now()))
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
#gpio.cleanup()
#sys.exit()
else:
print('No brake will be applied ')
#gpio.cleanup()
#sys.exit()
def reverse():
reverseadvanced(True, True)
def turnLeftadvanced(BRAKE_ON_INACTIVITY):
RPMInput = ROTATION_SPEED
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False)
start_time = datetime.now()
print('turnleft Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnLeft Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
global r
r.set('inMotion', now())
#pwm.setPWM(14, on, off)
#pwm.setPWM(15, on, off)
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
#Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time
#IMportant: Dont brake in the main thread - use brakeoninactivity()
if (BRAKE_ON_INACTIVITY == True) :
#time.sleep(BRAKETIME_ON_INACTIVITY_ROTATE)
threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_ROTATE] ).start()
'''
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
'''
#gpio.cleanup()
#sys.exit()
else:
print('No Brake will be applied')
def turnLeft():
turnLeftadvanced(True)
def turnRightadvanced(BRAKE_ON_INACTIVITY):
RPMInput = ROTATION_SPEED
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
start_time = datetime.now()
print('turnRight Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnRight Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
#pwm.setPWM(14, on, off)
#pwm.setPWM(15, on, off)
global r
r.set('inMotion', now())
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
#Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time
#IMportant: Dont brake in the main thread - use brakeoninactivity()
if (BRAKE_ON_INACTIVITY==True) :
#time.sleep(BRAKETIME_ON_INACTIVITY_ROTATE)
threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_ROTATE] ).start()
'''
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
'''
#gpio.cleanup()
#sys.exit()
else:
print('No Brake will be applied')
def turnRight():
turnRightadvanced(True)
def forwardnostop():
forwardadvanced(False,False)
def reversenostop():
reverseadvanced(False,False)
def turnLeftnostop():
turnLeftadvanced(False)
def turnRightnostop():
turnRightadvanced(False)
def turnLeftWheelOnlyNoStopForward():
RPMInput = ROTATION_SPEED
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
#gpio.output(22, True)
#gpio.output(27, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False)
start_time = datetime.now()
print('turnLeftWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnLeftWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
global r
r.set('inMotion', now())
#pwm.setPWM(14, on, off)
pwm.setPWM(LEFTWHEEL_DCMOTOR_GPIOPIN1, on, off)
def turnLeftWheelOnlyNoStopReverse():
RPMInput = ROTATION_SPEED
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
#gpio.output(22, False)
#gpio.output(27, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
start_time = datetime.now()
print('turnLeftWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnLeftWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
global r
r.set('inMotion', now())
#pwm.setPWM(14, on, off)
pwm.setPWM(LEFTWHEEL_DCMOTOR_GPIOPIN1, on, off)
def turnRightWheelOnlyNoStopForward():
RPMInput = ROTATION_SPEED
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False)
start_time = datetime.now()
print('turnRightWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnRightWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
global r
r.set('inMotion', now())
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
def turnRightWheelOnlyNoStopReverse():
RPMInput = ROTATION_SPEED
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
start_time = datetime.now()
print('turnRightWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnRightWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
global r
r.set('inMotion', now())
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
#*************************************************************
#
#Methods with RPM params
#*************************************************************
def forwardadvancedatspeed(BRAKE_ON_INACTIVITY, SUCCESSIVE_ACCELERATION_HANDLING, INPUTRPM):
RPMInput = float(INPUTRPM)
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
start_time = datetime.now()
print('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
#if (BRAKE_ON_INACTIVITY==True and SUCCESSIVE_ACCELERATION_HANDLING==True):
# """Make translate motion; apply brakes if no further commands were recieved after "brakeAfter" secs."""
global r
r.set('inMotion', now())
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
#Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time
#IMportant: Dont brake in the main thread - use brakeoninactivity()
if (BRAKE_ON_INACTIVITY==True):
if (SUCCESSIVE_ACCELERATION_HANDLING == True) :
threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_TRANSLATE] ).start()
else:
print('Braking time '+ str(datetime.now()))
logging.info('Braking Time '+ str(datetime.now()))
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
#gpio.cleanup()
#sys.exit()
else:
print('No brake will be applied ')
#gpio.cleanup()
#sys.exit()
def forwardatspeed(INPUTRPM):
forwardadvancedatspeed(True, True, INPUTRPM)
def reverseadvancedatspeed(BRAKE_ON_INACTIVITY, SUCCESSIVE_ACCELERATION_HANDLING, INPUTRPM):
RPMInput = float(INPUTRPM)
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False)
start_time = datetime.now()
print('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
#if (BRAKE_ON_INACTIVITY==True and SUCCESSIVE_ACCELERATION_HANDLING==True):
# """Make translate motion; apply brakes if no further commands were recieved after "brakeAfter" secs."""
global r
r.set('inMotion', now())
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
#Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time
#IMportant: Dont brake in the main thread - use brakeoninactivity()
if (BRAKE_ON_INACTIVITY==True):
if (SUCCESSIVE_ACCELERATION_HANDLING == True) :
threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_TRANSLATE] ).start()
else:
print('Braking time '+ str(datetime.now()))
logging.info('Braking Time '+ str(datetime.now()))
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
#gpio.cleanup()
#sys.exit()
else:
print('No brake will be applied ')
#gpio.cleanup()
#sys.exit()
def reverseatspeed(INPUTRPM):
reverseadvancedatspeed(True, True, INPUTRPM)
def turnLeftadvancedatspeed(BRAKE_ON_INACTIVITY, INPUTRPM):
RPMInput = float(INPUTRPM)
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False)
start_time = datetime.now()
print('turnleft Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnLeft Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
#pwm.setPWM(14, on, off)
#pwm.setPWM(15, on, off)
global r
r.set('inMotion', now())
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
#Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time
#IMportant: Dont brake in the main thread - use brakeoninactivity()
if (BRAKE_ON_INACTIVITY == True) :
#time.sleep(BRAKETIME_ON_INACTIVITY_ROTATE)
threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_ROTATE] ).start()
'''
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
'''
#gpio.cleanup()
#sys.exit()
else:
print('No Brake will be applied')
def turnLeftatspeed(INPUTRPM):
turnLeftadvancedatspeed(True, INPUTRPM)
def turnRightadvancedatspeed(BRAKE_ON_INACTIVITY,INPUTRPM):
RPMInput = float(INPUTRPM)
print('RPMInput ')
print(RPMInput)
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
start_time = datetime.now()
print('turnRight Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnRight Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
#pwm.setPWM(14, on, off)
#pwm.setPWM(15, on, off)
global r
r.set('inMotion', now())
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
#Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time
#IMportant: Dont brake in the main thread - use brakeoninactivity()
if (BRAKE_ON_INACTIVITY==True) :
#time.sleep(BRAKETIME_ON_INACTIVITY_ROTATE) # Dont do this , read above
threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_ROTATE] ).start()
'''
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
'''
#gpio.cleanup()
#sys.exit()
else:
print('No Brake will be applied')
def turnRightatspeed(INPUTRPM):
turnRightadvancedatspeed(True,INPUTRPM)
def forwardnostopatspeed(INPUTRPM):
forwardadvancedatspeed(False,False,INPUTRPM)
def reversenostopatspeed(INPUTRPM):
reverseadvancedatspeed(False,False,INPUTRPM)
def turnLeftnostopatspeed(INPUTRPM):
turnLeftadvancedatspeed(False,INPUTRPM)
def turnRightnostopatspeed(INPUTRPM):
turnRightadvancedatspeed(False,INPUTRPM)
def turnLeftWheelOnlyNoStopForwardatspeed(INPUTRPM):
RPMInput = float(INPUTRPM)
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
#gpio.output(22, True)
#gpio.output(27, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False)
start_time = datetime.now()
print('turnLeftWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnLeftWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
global r
r.set('inMotion', now())
#pwm.setPWM(14, on, off)
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
def turnLeftWheelOnlyNoStopReverseatspeed(INPUTRPM):
RPMInput = float(INPUTRPM)
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
#gpio.output(22, False)
#gpio.output(27, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False)
start_time = datetime.now()
print('turnLeftWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnLeftWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
global r
r.set('inMotion', now())
#pwm.setPWM(14, on, off)
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
def turnRightWheelOnlyNoStopForwardatspeed(INPUTRPM):
RPMInput = float(INPUTRPM)
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False)
start_time = datetime.now()
print('turnRightWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnRightWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
global r
r.set('inMotion', now())
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
def turnRightWheelOnlyNoStopReverseatspeed(INPUTRPM):
RPMInput = float(INPUTRPM)
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput
RPMInput=minRPM if (RPMInput<minRPM) else RPMInput
outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse)
#print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec')
off = on + int(outputPulse / timeperTick)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
start_time = datetime.now()
print('turnRightWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput) )
logging.info('turnRightWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput))
global r
r.set('inMotion', now())
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
def turnCamHorizontal( positionAngle):
positionAngle=float(positionAngle)
if positionAngle>=float(CAMSERVO_STARTANGLE) and positionAngle<=float(CAMSERVO_ENDANGLE) :
pwm.setPWMFreq(CAMSERVO_FREQUENCY)
on = 0
outputPulse = CAMSERVO_STARTANGLE_PULSEWIDTH_IN_SEC + ((positionAngle - CAMSERVO_STARTANGLE)*(CAMSERVO_ENDANGLE_PULSEWIDTH_IN_SEC - CAMSERVO_STARTANGLE_PULSEWIDTH_IN_SEC)/(CAMSERVO_ENDANGLE - CAMSERVO_STARTANGLE))
FullPulse = 1/CAMSERVO_FREQUENCY
timeperTick = FullPulse / CAMSERVO_RESOLUTION
print(timeperTick)
off = on + int(outputPulse / timeperTick)
print('on '+str(on))
print('off '+str(off))
pwm.setPWM(int(CAMSERVO_HORIZONTAL_PWMCHANNEL), on, off)
def turnCamVertical( positionAngle):
positionAngle=float(positionAngle)
if positionAngle>=float(CAMSERVO_STARTANGLE) and positionAngle<=float(CAMSERVO_ENDANGLE) :
pwm.setPWMFreq(CAMSERVO_FREQUENCY)
on = 0
outputPulse = CAMSERVO_STARTANGLE_PULSEWIDTH_IN_SEC + ((positionAngle - CAMSERVO_STARTANGLE)*(CAMSERVO_ENDANGLE_PULSEWIDTH_IN_SEC - CAMSERVO_STARTANGLE_PULSEWIDTH_IN_SEC)/(CAMSERVO_ENDANGLE - CAMSERVO_STARTANGLE))
FullPulse = 1/CAMSERVO_FREQUENCY
timeperTick = FullPulse / CAMSERVO_RESOLUTION
off = on + int(outputPulse / timeperTick)
print('on '+str(on))
print('off '+str(off))
pwm.setPWM(int(CAMSERVO_VERTICAL_PWMCHANNEL), on, off)
def turnCamCenter():
turnCamHorizontal(0.0)
turnCamVertical(0.0)
def brake():
start_time = datetime.now()
print('Brake Start Time '+ str(start_time) )
logging.info('Brake Start Time '+ str(start_time) )
global r
r.set('braked', now())
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
def slowbrake():
start_time = datetime.now()
print('Brake Start Time '+ str(start_time) )
logging.info('Brake Start Time '+ str(start_time) )
global r
r.set('braked', now())
on = 0
FullPulse = 1/frequency
timeperTick = FullPulse / resolution
RPMInput = 160
print (RPMInput)
for n in range(int(RPMInput),100,-20):
outputPulse = ((n - minRPM)/(maxRPM - minRPM))*(FullPulse)
off = on + int(outputPulse / timeperTick)
print(n)
pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
def emergencybrake():
start_time = datetime.now()
print('Brake Start Time '+ str(start_time) )
logging.info('Brake Start Time '+ str(start_time) )
global r
r.set('braked', now())
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True)
gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True)
#gpio.cleanup()
#sys.exit()
if __name__ == '__main__':
CommandLineArgCount = len(sys.argv) -1
print('Press Ctrl+C to stop')
try:
if (CommandLineArgCount == 0) :
print(__doc__)
if (sys.argv[1] == 'forward') :
forward()
if (sys.argv[1] == 'forwardnostop'):
forwardadvanced(False,False)
if (sys.argv[1] == 'reverse') :
reverse()
if (sys.argv[1] == 'reversenostop'):
reverseadvanced(False,False)
if (sys.argv[1] == 'turnLeft') :
turnLeft()
if (sys.argv[1] == 'turnLeftnostop') :
turnLeftadvanced(False)
if (sys.argv[1] == 'turnRight') :
turnRight()
if (sys.argv[1] == 'turnRightnostop') :
turnRightadvanced(False)
if (sys.argv[1] == 'brake') :
brake()
if (sys.argv[1] == 'emergencybrake') :
emeregncybrake()
if (sys.argv[1] == 'turnLeftWheelOnlyNoStopForward') :
turnLeftWheelOnlyNoStopForward()
if (sys.argv[1] == 'turnLeftWheelOnlyNoStopReverse') :
turnLeftWheelOnlyNoStopReverse()
if (sys.argv[1] == 'turnRightWheelOnlyNoStopForward') :
turnRightWheelOnlyNoStopForward()
if (sys.argv[1] == 'turnRightWheelOnlyNoStopReverse') :
turnRightWheelOnlyNoStopReverse()
if (sys.argv[1] == 'turnCamCenter') :
turnCamCenter()
if (sys.argv[1] == 'turnCamHorizontal' and CommandLineArgCount == 2) :
turnCamHorizontal(float(sys.argv[2]))
if (sys.argv[1] == 'turnCamVertical' and CommandLineArgCount == 2) :
turnCamVertical(float(sys.argv[2]))
if (sys.argv[1] == 'forwardatspeed' and CommandLineArgCount == 2) :
forwardatspeed(float(sys.argv[2]))
if (sys.argv[1] == 'forwardnostopatspeed' and CommandLineArgCount == 2):
forwardadvancedatspeed(False,False,float(sys.argv[2]))
if (sys.argv[1] == 'reverseatspeed' and CommandLineArgCount == 2) :
reverseatspeed(float(sys.argv[2]))
if (sys.argv[1] == 'reversenostopatspeed' and CommandLineArgCount == 2):
reverseadvancedatspeed(False,False,float(sys.argv[2]))
if (sys.argv[1] == 'turnLeftatspeed' and CommandLineArgCount == 2) :
turnLeftatspeed(float(sys.argv[2]))
if (sys.argv[1] == 'turnLeftnostopatspeed' and CommandLineArgCount == 2) :
turnLeftadvancedatspeed(False,float(sys.argv[2]))
if (sys.argv[1] == 'turnRightatspeed' and CommandLineArgCount == 2) :
turnRightatspeed(float(sys.argv[2]))
if (sys.argv[1] == 'turnRightnostopatspeed' and CommandLineArgCount == 2) :
turnRightadvancedatspeed(False,float(sys.argv[2]))
if (sys.argv[1] == 'turnLeftWheelOnlyNoStopForwardatspeed' and CommandLineArgCount == 2) :
turnLeftWheelOnlyNoStopForwardatspeed(float(sys.argv[2]))
if (sys.argv[1] == 'turnLeftWheelOnlyNoStopReverseatspeed' and CommandLineArgCount == 2) :
turnLeftWheelOnlyNoStopReverseatspeed(float(sys.argv[2]))
if (sys.argv[1] == 'turnRightWheelOnlyNoStopForwardatspeed' and CommandLineArgCount == 2) :
turnRightWheelOnlyNoStopForwardatspeed(float(sys.argv[2]))
if (sys.argv[1] == 'turnRightWheelOnlyNoStopReverseatspeed' and CommandLineArgCount == 2) :
turnRightWheelOnlyNoStopReverseatspeed(float(sys.argv[2]))
while True:
pass
except ValueError as err:
print(err.args)
except KeyboardInterrupt:
#brakebothWheels('DCMotor2','DCMotor1')
print 'Exiting Progam'
finally :
#gpio.cleanup() must be done before exiting the program
gpio.cleanup()
print('')
#!/usr/bin/env python
"""
UDPListener.py
"""
from __future__ import print_function
from plugins import find_callable
from plugins import load_modules
from select import select
from socket import AF_INET
from socket import SOCK_DGRAM
from socket import inet_ntoa
from socket import socket
from threading import Lock
import collections
import logging
import re
import traceback
def parse_command(command):
return command.split()
class UDPListener(object):
def __init__(self, plugin_modules=[]):
self.__listening = False
self.__listening_lock = Lock()
self._dgram_size = 1024
self._frequency = 0.1
self.plugin_modules = plugin_modules
def __check_response(self, command, *return_values):
# TODO: I wasn't sure what the exact return signature of
# DriverMain was, but you may wish to assess its return
# value(s), do any additional actions, and return a
# success/failure assertion.
return True
def __service_command(self, argv):
command = argv[0]
args = argv[1:] if len(argv) > 1 else []
# See if there is a callable 'command' in the plugin
# pool.
c = find_callable(command, self.plugin_modules)
if c != None:
logging.debug('calling {}'.format(command))
# All StandardError-derived errors are caught
# so that the service does not crash except in
# extraordinary circumstances.
try:
result = c(*args)
except StandardError:
logging.error(traceback.format_exc())
else:
logging.debug('{} finished'.format(command))
if not isinstance(result,
collections.Sequence):
result = [result]
return self.__check_response(
command, *result)
else:
logging.warning('unknown command "{}"'.format(
command))
return False
# Helper properties for thread-safe access to the listener state.
@property
def _listening(self):
self.__listening_lock.acquire()
listening = self.__listening
self.__listening_lock.release()
return listening
@_listening.setter
def _listening(self, listening):
self.__listening_lock.acquire()
self.__listening = listening
self.__listening_lock.release()
def _service(self, data, source_address):
"""Service a datagram."""
logging.info('################################################')
logging.info('servicing request from {0}'.format(source_address[0]))
logging.info('servicing request from {0}'.format(data))
try:
argv = parse_command(data)
except ValueError:
pass
else:
if len(argv) > 0:
return self.__service_command(argv)
return False
def listen(self, port, hostname='127.0.0.1'):
"""Listen on UDP hostname:port if not running, otherwise raise
a RuntimeError.
"""
logging.info('listening on {0}:{1}'.format(hostname, port))
# Ensure that the listener is not already listening. If it
# isn't then prepare the listen socket.
service_requests = False
self.__listening_lock.acquire()
if not self.__listening:
self.__listening = True
sock = socket(AF_INET, SOCK_DGRAM)
sock.bind((hostname, port))
service_requests = True
self.__listening_lock.release()
# If the listener was already listening then raise an error.
if not service_requests:
raise RuntimeError('the listener is already listening')
# Poll the listen socket at `_frequency` interval and if there
# is any pending data then service the request.
while self._listening:
r, _, _ = select([sock], [], [], self._frequency)
if sock in r:
# Nothing is currently done with the service method's
# return value, but you could e.g. log failure.
self._service(*sock.recvfrom(self._dgram_size))
# Close the listen socket and exit the frame.
sock.close()
def listening(self):
"""Returns the boolean listening state of the listener."""
return self._listening
def stop(self):
"""If the listener is listening then stop it, otherwise raise
a RuntimeError.
"""
self.__listening_lock.acquire()
if self.__listening:
raise RuntimeError('the listener is not listening')
self.__listening = False
self.__listening_lock.release()
if __name__ == '__main__':
import Driver
import fcntl
import os.path
import struct
import sys
IFACE = 'wlan0'
PORT = 9090
# XXX: You can replace the plugin directory with a directory
# more suited to your application. If it's an appliance
# you might want to consider a directory in /etc,
# /usr/local/share, or /var/lib.
PLUGIN_DIRECTORY = os.path.abspath('/usr/local/bin/Workspace/plugins')
def get_ip_address(ifname):
# This code was copied from:
# http://code.activestate.com/recipes/439094-get-the-ip-address-associated-with-a-network-inter/
s = socket(AF_INET, SOCK_DGRAM)
address = inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', ifname[:15])
)[20:24])
s.close()
return address
# Add the plugin directory to the Python module search path.
# It is added first to prevent plugins from being occluded
# by library modules.
sys.path.insert(0, PLUGIN_DIRECTORY)
hostname, port = get_ip_address(IFACE), PORT
logging.basicConfig(format='%(asctime)s %(message)s',
level=logging.DEBUG)
# UDPListener is instantiated with any plugin modules in
# addition to the Driver module, which appears first in the
# plugin list.
#listener = UDPListener([Driver] + load_modules(PLUGIN_DIRECTORY))
try:
listener = UDPListener([Driver] + load_modules(PLUGIN_DIRECTORY))
listener.listen(port, hostname)
except KeyboardInterrupt:
print('Exiting...')
except Exception as err:
logging.error(traceback.format_exc())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment