Created
May 15, 2020 01:33
-
-
Save uadrive1/4c1e832d6ac8d06e59af4c73611446a7 to your computer and use it in GitHub Desktop.
Mock python hardware calls - relevant files
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| ''' | |
| * Copyright (C) 2015-2023 | |
| * | |
| * This file is part of Inertia.cc project. | |
| command line usage | |
| cd /usr/local/bin/Workspace/plugins/ && sudo python SimplemotorTest.py forward | |
| function usage | |
| forward() | |
| forwardadvanced(True, True) | |
| UDP commands | |
| forward | |
| reverse | |
| turnRight | |
| turnRight | |
| ''' | |
| import threading | |
| import io | |
| import sys | |
| sys.path.append('/usr/local/bin/Workspace') | |
| import os | |
| import shutil | |
| import redis | |
| import logging | |
| import re | |
| from Adafruit_PWM_Servo_Driver import PWM | |
| import time | |
| import json | |
| import RPi.GPIO as gpio | |
| from threading import Timer | |
| from datetime import datetime | |
| REDIS_HOST = 'localhost' | |
| REDIS_PORT = '6379'; | |
| gpio.setmode(gpio.BCM) | |
| gpio.setup(22, gpio.OUT) | |
| gpio.setup(27, gpio.OUT) | |
| gpio.setup(4, gpio.OUT) | |
| gpio.setup(17, gpio.OUT) | |
| if not os.geteuid() == 0: | |
| sys.exit("\nGPIO access requires super user access. Only root can run this script.\n") | |
| #DC motor params | |
| minRPM = 150.0 | |
| maxRPM = 313.0 | |
| frequency = 500.0*20 | |
| resolution = 4096.0 | |
| # Initialize the PWM device using the default address | |
| pwm = PWM(0x40, True) | |
| pwm.setPWMFreq(frequency) | |
| RIGHTWHEEL_ID ='DCMotor1' | |
| LEFTWHEEL_ID ='DCMotor2' | |
| ''' | |
| LEFTWHEEL_DCMOTOR_GPIOPIN1=22 | |
| LEFTWHEEL_DCMOTOR_GPIOPIN2=27 | |
| LEFTWHEEL_DCMOTOR_PWMCHANNEL=14 | |
| RIGHTWHEEL_DCMOTOR_GPIOPIN1=4 | |
| RIGHTWHEEL_DCMOTOR_GPIOPIN2=17 | |
| RIGHTWHEEL_DCMOTOR_PWMCHANNEL=15 | |
| ''' | |
| LEFTWHEEL_DCMOTOR_GPIOPIN1=4 | |
| LEFTWHEEL_DCMOTOR_GPIOPIN2=17 | |
| LEFTWHEEL_DCMOTOR_PWMCHANNEL=15 | |
| RIGHTWHEEL_DCMOTOR_GPIOPIN1=22 | |
| RIGHTWHEEL_DCMOTOR_GPIOPIN2=27 | |
| RIGHTWHEEL_DCMOTOR_PWMCHANNEL=14 | |
| TRANSLATION_SPEED = 200 | |
| ROTATION_SPEED = 200 | |
| BRAKETIME_ON_INACTIVITY_TRANSLATE = .7 #in seconds | |
| BRAKETIME_ON_INACTIVITY_ROTATE = .2 #in seconds | |
| ''' | |
| CAMERA Servo motor constants | |
| ''' | |
| CAMSERVO_STARTANGLE=-90.0 | |
| CAMSERVO_ENDANGLE=90.0 | |
| CAMSERVO_STARTANGLE_PULSEWIDTH_IN_MICROSEC = 1000.0 | |
| CAMSERVO_ENDANGLE_PULSEWIDTH_IN_MICROSEC = 2000.0 | |
| CAMSERVO_STARTANGLE_PULSEWIDTH_IN_SEC = float(CAMSERVO_STARTANGLE_PULSEWIDTH_IN_MICROSEC) / 1000000 | |
| CAMSERVO_ENDANGLE_PULSEWIDTH_IN_SEC = float(CAMSERVO_ENDANGLE_PULSEWIDTH_IN_MICROSEC) / 1000000 | |
| CAMSERVO_FREQUENCY = 60.0 # in Hz | |
| CAMSERVO_RESOLUTION = 4096.0 | |
| CAMSERVO_VERTICAL_PWMCHANNEL=12 | |
| CAMSERVO_HORIZONTAL_PWMCHANNEL=13 | |
| r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT ) | |
| def now(): | |
| """Get the current Unix epoch time as an integer.""" | |
| return int(time.time()*1000*1000) | |
| def brakeoninactivity(brakeAfter): | |
| print "brakeoninactivity called" | |
| time.sleep(BRAKETIME_ON_INACTIVITY_TRANSLATE) | |
| timestamp_a, timestamp_b = r.get('inMotion'), r.get('braked') | |
| if timestamp_a is None : | |
| timestamp_a = 0 | |
| else: | |
| timestamp_a = int(timestamp_a) | |
| if timestamp_b is None : | |
| timestamp_b = 0 | |
| else: | |
| timestamp_b = int(timestamp_b) | |
| #print(now()) | |
| #print(timestamp_a) | |
| #print(timestamp_b) | |
| # If the last 'inMotion' call was 'brakeAfter' secs before and the last 'braked' call was even before that, then brake. | |
| print (str(now() - int(timestamp_a) >= brakeAfter*1000*1000 and int(timestamp_b)<= int(timestamp_a))) | |
| if now() - int(timestamp_a) >= brakeAfter*1000*1000 and int(timestamp_b)<= int(timestamp_a) : | |
| ''' | |
| print('Braking time '+ str(datetime.now())) | |
| logging.info('Braking Time '+ str(datetime.now())) | |
| #Slowing down from 100 to 0 | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput = 70 | |
| print (RPMInput) | |
| for n in range(int(RPMInput),-1,-1): | |
| outputPulse = ((n - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| off = on + int(outputPulse / timeperTick) | |
| print(n) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| #Brake at the end | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| ''' | |
| #brake() | |
| brake() | |
| #gpio.cleanup() | |
| #os._exit(0) | |
| #************************************************************* | |
| # | |
| #Methods with no RPM params | |
| #************************************************************* | |
| def forwardadvanced(BRAKE_ON_INACTIVITY, SUCCESSIVE_ACCELERATION_HANDLING): | |
| RPMInput = TRANSLATION_SPEED | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| start_time = datetime.now() | |
| print('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| #if (BRAKE_ON_INACTIVITY==True and SUCCESSIVE_ACCELERATION_HANDLING==True): | |
| # """Make translate motion; apply brakes if no further commands were recieved after "brakeAfter" secs.""" | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| #Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time | |
| #IMportant: Dont brake in the main thread - use brakeoninactivity() | |
| if (BRAKE_ON_INACTIVITY==True): | |
| if (SUCCESSIVE_ACCELERATION_HANDLING == True) : | |
| threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_TRANSLATE] ).start() | |
| else: | |
| print('Braking time '+ str(datetime.now())) | |
| logging.info('Braking Time '+ str(datetime.now())) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| #gpio.cleanup() | |
| #sys.exit() | |
| else: | |
| print('No brake will be applied ') | |
| #gpio.cleanup() | |
| #sys.exit() | |
| def forward(): | |
| forwardadvanced(True, True) | |
| def reverseadvanced(BRAKE_ON_INACTIVITY, SUCCESSIVE_ACCELERATION_HANDLING): | |
| RPMInput = TRANSLATION_SPEED | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| start_time = datetime.now() | |
| print('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| #if (BRAKE_ON_INACTIVITY==True and SUCCESSIVE_ACCELERATION_HANDLING==True): | |
| # """Make translate motion; apply brakes if no further commands were recieved after "brakeAfter" secs.""" | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| #Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time | |
| #IMportant: Dont brake in the main thread - use brakeoninactivity() | |
| if (BRAKE_ON_INACTIVITY==True): | |
| if (SUCCESSIVE_ACCELERATION_HANDLING == True) : | |
| threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_TRANSLATE] ).start() | |
| else: | |
| print('Braking time '+ str(datetime.now())) | |
| logging.info('Braking Time '+ str(datetime.now())) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| #gpio.cleanup() | |
| #sys.exit() | |
| else: | |
| print('No brake will be applied ') | |
| #gpio.cleanup() | |
| #sys.exit() | |
| def reverse(): | |
| reverseadvanced(True, True) | |
| def turnLeftadvanced(BRAKE_ON_INACTIVITY): | |
| RPMInput = ROTATION_SPEED | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| start_time = datetime.now() | |
| print('turnleft Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnLeft Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| global r | |
| r.set('inMotion', now()) | |
| #pwm.setPWM(14, on, off) | |
| #pwm.setPWM(15, on, off) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| #Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time | |
| #IMportant: Dont brake in the main thread - use brakeoninactivity() | |
| if (BRAKE_ON_INACTIVITY == True) : | |
| #time.sleep(BRAKETIME_ON_INACTIVITY_ROTATE) | |
| threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_ROTATE] ).start() | |
| ''' | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| ''' | |
| #gpio.cleanup() | |
| #sys.exit() | |
| else: | |
| print('No Brake will be applied') | |
| def turnLeft(): | |
| turnLeftadvanced(True) | |
| def turnRightadvanced(BRAKE_ON_INACTIVITY): | |
| RPMInput = ROTATION_SPEED | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| start_time = datetime.now() | |
| print('turnRight Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnRight Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| #pwm.setPWM(14, on, off) | |
| #pwm.setPWM(15, on, off) | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| #Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time | |
| #IMportant: Dont brake in the main thread - use brakeoninactivity() | |
| if (BRAKE_ON_INACTIVITY==True) : | |
| #time.sleep(BRAKETIME_ON_INACTIVITY_ROTATE) | |
| threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_ROTATE] ).start() | |
| ''' | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| ''' | |
| #gpio.cleanup() | |
| #sys.exit() | |
| else: | |
| print('No Brake will be applied') | |
| def turnRight(): | |
| turnRightadvanced(True) | |
| def forwardnostop(): | |
| forwardadvanced(False,False) | |
| def reversenostop(): | |
| reverseadvanced(False,False) | |
| def turnLeftnostop(): | |
| turnLeftadvanced(False) | |
| def turnRightnostop(): | |
| turnRightadvanced(False) | |
| def turnLeftWheelOnlyNoStopForward(): | |
| RPMInput = ROTATION_SPEED | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| #gpio.output(22, True) | |
| #gpio.output(27, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| start_time = datetime.now() | |
| print('turnLeftWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnLeftWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| global r | |
| r.set('inMotion', now()) | |
| #pwm.setPWM(14, on, off) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_GPIOPIN1, on, off) | |
| def turnLeftWheelOnlyNoStopReverse(): | |
| RPMInput = ROTATION_SPEED | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| #gpio.output(22, False) | |
| #gpio.output(27, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| start_time = datetime.now() | |
| print('turnLeftWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnLeftWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| global r | |
| r.set('inMotion', now()) | |
| #pwm.setPWM(14, on, off) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_GPIOPIN1, on, off) | |
| def turnRightWheelOnlyNoStopForward(): | |
| RPMInput = ROTATION_SPEED | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| start_time = datetime.now() | |
| print('turnRightWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnRightWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| def turnRightWheelOnlyNoStopReverse(): | |
| RPMInput = ROTATION_SPEED | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| start_time = datetime.now() | |
| print('turnRightWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnRightWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| #************************************************************* | |
| # | |
| #Methods with RPM params | |
| #************************************************************* | |
| def forwardadvancedatspeed(BRAKE_ON_INACTIVITY, SUCCESSIVE_ACCELERATION_HANDLING, INPUTRPM): | |
| RPMInput = float(INPUTRPM) | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| start_time = datetime.now() | |
| print('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| #if (BRAKE_ON_INACTIVITY==True and SUCCESSIVE_ACCELERATION_HANDLING==True): | |
| # """Make translate motion; apply brakes if no further commands were recieved after "brakeAfter" secs.""" | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| #Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time | |
| #IMportant: Dont brake in the main thread - use brakeoninactivity() | |
| if (BRAKE_ON_INACTIVITY==True): | |
| if (SUCCESSIVE_ACCELERATION_HANDLING == True) : | |
| threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_TRANSLATE] ).start() | |
| else: | |
| print('Braking time '+ str(datetime.now())) | |
| logging.info('Braking Time '+ str(datetime.now())) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| #gpio.cleanup() | |
| #sys.exit() | |
| else: | |
| print('No brake will be applied ') | |
| #gpio.cleanup() | |
| #sys.exit() | |
| def forwardatspeed(INPUTRPM): | |
| forwardadvancedatspeed(True, True, INPUTRPM) | |
| def reverseadvancedatspeed(BRAKE_ON_INACTIVITY, SUCCESSIVE_ACCELERATION_HANDLING, INPUTRPM): | |
| RPMInput = float(INPUTRPM) | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| start_time = datetime.now() | |
| print('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('Forward Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| #if (BRAKE_ON_INACTIVITY==True and SUCCESSIVE_ACCELERATION_HANDLING==True): | |
| # """Make translate motion; apply brakes if no further commands were recieved after "brakeAfter" secs.""" | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| #Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time | |
| #IMportant: Dont brake in the main thread - use brakeoninactivity() | |
| if (BRAKE_ON_INACTIVITY==True): | |
| if (SUCCESSIVE_ACCELERATION_HANDLING == True) : | |
| threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_TRANSLATE] ).start() | |
| else: | |
| print('Braking time '+ str(datetime.now())) | |
| logging.info('Braking Time '+ str(datetime.now())) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| #gpio.cleanup() | |
| #sys.exit() | |
| else: | |
| print('No brake will be applied ') | |
| #gpio.cleanup() | |
| #sys.exit() | |
| def reverseatspeed(INPUTRPM): | |
| reverseadvancedatspeed(True, True, INPUTRPM) | |
| def turnLeftadvancedatspeed(BRAKE_ON_INACTIVITY, INPUTRPM): | |
| RPMInput = float(INPUTRPM) | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| start_time = datetime.now() | |
| print('turnleft Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnLeft Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| #pwm.setPWM(14, on, off) | |
| #pwm.setPWM(15, on, off) | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| #Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time | |
| #IMportant: Dont brake in the main thread - use brakeoninactivity() | |
| if (BRAKE_ON_INACTIVITY == True) : | |
| #time.sleep(BRAKETIME_ON_INACTIVITY_ROTATE) | |
| threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_ROTATE] ).start() | |
| ''' | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| ''' | |
| #gpio.cleanup() | |
| #sys.exit() | |
| else: | |
| print('No Brake will be applied') | |
| def turnLeftatspeed(INPUTRPM): | |
| turnLeftadvancedatspeed(True, INPUTRPM) | |
| def turnRightadvancedatspeed(BRAKE_ON_INACTIVITY,INPUTRPM): | |
| RPMInput = float(INPUTRPM) | |
| print('RPMInput ') | |
| print(RPMInput) | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| start_time = datetime.now() | |
| print('turnRight Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnRight Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| #pwm.setPWM(14, on, off) | |
| #pwm.setPWM(15, on, off) | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| #Important: Always brake in an async thread, otherwise the time.sleep call blocks the main thread and subsequent motion calls (due to consecutive keypresses) will be forced to remain in the job queue and the wheels will in motion for a strangely longer period of time | |
| #IMportant: Dont brake in the main thread - use brakeoninactivity() | |
| if (BRAKE_ON_INACTIVITY==True) : | |
| #time.sleep(BRAKETIME_ON_INACTIVITY_ROTATE) # Dont do this , read above | |
| threading.Thread(target=brakeoninactivity, args=[BRAKETIME_ON_INACTIVITY_ROTATE] ).start() | |
| ''' | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| ''' | |
| #gpio.cleanup() | |
| #sys.exit() | |
| else: | |
| print('No Brake will be applied') | |
| def turnRightatspeed(INPUTRPM): | |
| turnRightadvancedatspeed(True,INPUTRPM) | |
| def forwardnostopatspeed(INPUTRPM): | |
| forwardadvancedatspeed(False,False,INPUTRPM) | |
| def reversenostopatspeed(INPUTRPM): | |
| reverseadvancedatspeed(False,False,INPUTRPM) | |
| def turnLeftnostopatspeed(INPUTRPM): | |
| turnLeftadvancedatspeed(False,INPUTRPM) | |
| def turnRightnostopatspeed(INPUTRPM): | |
| turnRightadvancedatspeed(False,INPUTRPM) | |
| def turnLeftWheelOnlyNoStopForwardatspeed(INPUTRPM): | |
| RPMInput = float(INPUTRPM) | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| #gpio.output(22, True) | |
| #gpio.output(27, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| start_time = datetime.now() | |
| print('turnLeftWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnLeftWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| global r | |
| r.set('inMotion', now()) | |
| #pwm.setPWM(14, on, off) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| def turnLeftWheelOnlyNoStopReverseatspeed(INPUTRPM): | |
| RPMInput = float(INPUTRPM) | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| #gpio.output(22, False) | |
| #gpio.output(27, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| start_time = datetime.now() | |
| print('turnLeftWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnLeftWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| global r | |
| r.set('inMotion', now()) | |
| #pwm.setPWM(14, on, off) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| def turnRightWheelOnlyNoStopForwardatspeed(INPUTRPM): | |
| RPMInput = float(INPUTRPM) | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, False) | |
| start_time = datetime.now() | |
| print('turnRightWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnRightWheelOnlyNoStopForward Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| def turnRightWheelOnlyNoStopReverseatspeed(INPUTRPM): | |
| RPMInput = float(INPUTRPM) | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput=maxRPM if (RPMInput>maxRPM) else RPMInput | |
| RPMInput=minRPM if (RPMInput<minRPM) else RPMInput | |
| outputPulse = ((RPMInput - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| #print (name+' PulseWidth for given RPM = '+str(RPMInput)+' is '+ str(outputPulse)+' sec') | |
| off = on + int(outputPulse / timeperTick) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, False) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| start_time = datetime.now() | |
| print('turnRightWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput) ) | |
| logging.info('turnRightWheelOnlyNoStopReverse Start Time '+ str(start_time)+' at RPM '+str(RPMInput)) | |
| global r | |
| r.set('inMotion', now()) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| def turnCamHorizontal( positionAngle): | |
| positionAngle=float(positionAngle) | |
| if positionAngle>=float(CAMSERVO_STARTANGLE) and positionAngle<=float(CAMSERVO_ENDANGLE) : | |
| pwm.setPWMFreq(CAMSERVO_FREQUENCY) | |
| on = 0 | |
| outputPulse = CAMSERVO_STARTANGLE_PULSEWIDTH_IN_SEC + ((positionAngle - CAMSERVO_STARTANGLE)*(CAMSERVO_ENDANGLE_PULSEWIDTH_IN_SEC - CAMSERVO_STARTANGLE_PULSEWIDTH_IN_SEC)/(CAMSERVO_ENDANGLE - CAMSERVO_STARTANGLE)) | |
| FullPulse = 1/CAMSERVO_FREQUENCY | |
| timeperTick = FullPulse / CAMSERVO_RESOLUTION | |
| print(timeperTick) | |
| off = on + int(outputPulse / timeperTick) | |
| print('on '+str(on)) | |
| print('off '+str(off)) | |
| pwm.setPWM(int(CAMSERVO_HORIZONTAL_PWMCHANNEL), on, off) | |
| def turnCamVertical( positionAngle): | |
| positionAngle=float(positionAngle) | |
| if positionAngle>=float(CAMSERVO_STARTANGLE) and positionAngle<=float(CAMSERVO_ENDANGLE) : | |
| pwm.setPWMFreq(CAMSERVO_FREQUENCY) | |
| on = 0 | |
| outputPulse = CAMSERVO_STARTANGLE_PULSEWIDTH_IN_SEC + ((positionAngle - CAMSERVO_STARTANGLE)*(CAMSERVO_ENDANGLE_PULSEWIDTH_IN_SEC - CAMSERVO_STARTANGLE_PULSEWIDTH_IN_SEC)/(CAMSERVO_ENDANGLE - CAMSERVO_STARTANGLE)) | |
| FullPulse = 1/CAMSERVO_FREQUENCY | |
| timeperTick = FullPulse / CAMSERVO_RESOLUTION | |
| off = on + int(outputPulse / timeperTick) | |
| print('on '+str(on)) | |
| print('off '+str(off)) | |
| pwm.setPWM(int(CAMSERVO_VERTICAL_PWMCHANNEL), on, off) | |
| def turnCamCenter(): | |
| turnCamHorizontal(0.0) | |
| turnCamVertical(0.0) | |
| def brake(): | |
| start_time = datetime.now() | |
| print('Brake Start Time '+ str(start_time) ) | |
| logging.info('Brake Start Time '+ str(start_time) ) | |
| global r | |
| r.set('braked', now()) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| def slowbrake(): | |
| start_time = datetime.now() | |
| print('Brake Start Time '+ str(start_time) ) | |
| logging.info('Brake Start Time '+ str(start_time) ) | |
| global r | |
| r.set('braked', now()) | |
| on = 0 | |
| FullPulse = 1/frequency | |
| timeperTick = FullPulse / resolution | |
| RPMInput = 160 | |
| print (RPMInput) | |
| for n in range(int(RPMInput),100,-20): | |
| outputPulse = ((n - minRPM)/(maxRPM - minRPM))*(FullPulse) | |
| off = on + int(outputPulse / timeperTick) | |
| print(n) | |
| pwm.setPWM(LEFTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| pwm.setPWM(RIGHTWHEEL_DCMOTOR_PWMCHANNEL, on, off) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| def emergencybrake(): | |
| start_time = datetime.now() | |
| print('Brake Start Time '+ str(start_time) ) | |
| logging.info('Brake Start Time '+ str(start_time) ) | |
| global r | |
| r.set('braked', now()) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(LEFTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN1, True) | |
| gpio.output(RIGHTWHEEL_DCMOTOR_GPIOPIN2, True) | |
| #gpio.cleanup() | |
| #sys.exit() | |
| if __name__ == '__main__': | |
| CommandLineArgCount = len(sys.argv) -1 | |
| print('Press Ctrl+C to stop') | |
| try: | |
| if (CommandLineArgCount == 0) : | |
| print(__doc__) | |
| if (sys.argv[1] == 'forward') : | |
| forward() | |
| if (sys.argv[1] == 'forwardnostop'): | |
| forwardadvanced(False,False) | |
| if (sys.argv[1] == 'reverse') : | |
| reverse() | |
| if (sys.argv[1] == 'reversenostop'): | |
| reverseadvanced(False,False) | |
| if (sys.argv[1] == 'turnLeft') : | |
| turnLeft() | |
| if (sys.argv[1] == 'turnLeftnostop') : | |
| turnLeftadvanced(False) | |
| if (sys.argv[1] == 'turnRight') : | |
| turnRight() | |
| if (sys.argv[1] == 'turnRightnostop') : | |
| turnRightadvanced(False) | |
| if (sys.argv[1] == 'brake') : | |
| brake() | |
| if (sys.argv[1] == 'emergencybrake') : | |
| emeregncybrake() | |
| if (sys.argv[1] == 'turnLeftWheelOnlyNoStopForward') : | |
| turnLeftWheelOnlyNoStopForward() | |
| if (sys.argv[1] == 'turnLeftWheelOnlyNoStopReverse') : | |
| turnLeftWheelOnlyNoStopReverse() | |
| if (sys.argv[1] == 'turnRightWheelOnlyNoStopForward') : | |
| turnRightWheelOnlyNoStopForward() | |
| if (sys.argv[1] == 'turnRightWheelOnlyNoStopReverse') : | |
| turnRightWheelOnlyNoStopReverse() | |
| if (sys.argv[1] == 'turnCamCenter') : | |
| turnCamCenter() | |
| if (sys.argv[1] == 'turnCamHorizontal' and CommandLineArgCount == 2) : | |
| turnCamHorizontal(float(sys.argv[2])) | |
| if (sys.argv[1] == 'turnCamVertical' and CommandLineArgCount == 2) : | |
| turnCamVertical(float(sys.argv[2])) | |
| if (sys.argv[1] == 'forwardatspeed' and CommandLineArgCount == 2) : | |
| forwardatspeed(float(sys.argv[2])) | |
| if (sys.argv[1] == 'forwardnostopatspeed' and CommandLineArgCount == 2): | |
| forwardadvancedatspeed(False,False,float(sys.argv[2])) | |
| if (sys.argv[1] == 'reverseatspeed' and CommandLineArgCount == 2) : | |
| reverseatspeed(float(sys.argv[2])) | |
| if (sys.argv[1] == 'reversenostopatspeed' and CommandLineArgCount == 2): | |
| reverseadvancedatspeed(False,False,float(sys.argv[2])) | |
| if (sys.argv[1] == 'turnLeftatspeed' and CommandLineArgCount == 2) : | |
| turnLeftatspeed(float(sys.argv[2])) | |
| if (sys.argv[1] == 'turnLeftnostopatspeed' and CommandLineArgCount == 2) : | |
| turnLeftadvancedatspeed(False,float(sys.argv[2])) | |
| if (sys.argv[1] == 'turnRightatspeed' and CommandLineArgCount == 2) : | |
| turnRightatspeed(float(sys.argv[2])) | |
| if (sys.argv[1] == 'turnRightnostopatspeed' and CommandLineArgCount == 2) : | |
| turnRightadvancedatspeed(False,float(sys.argv[2])) | |
| if (sys.argv[1] == 'turnLeftWheelOnlyNoStopForwardatspeed' and CommandLineArgCount == 2) : | |
| turnLeftWheelOnlyNoStopForwardatspeed(float(sys.argv[2])) | |
| if (sys.argv[1] == 'turnLeftWheelOnlyNoStopReverseatspeed' and CommandLineArgCount == 2) : | |
| turnLeftWheelOnlyNoStopReverseatspeed(float(sys.argv[2])) | |
| if (sys.argv[1] == 'turnRightWheelOnlyNoStopForwardatspeed' and CommandLineArgCount == 2) : | |
| turnRightWheelOnlyNoStopForwardatspeed(float(sys.argv[2])) | |
| if (sys.argv[1] == 'turnRightWheelOnlyNoStopReverseatspeed' and CommandLineArgCount == 2) : | |
| turnRightWheelOnlyNoStopReverseatspeed(float(sys.argv[2])) | |
| while True: | |
| pass | |
| except ValueError as err: | |
| print(err.args) | |
| except KeyboardInterrupt: | |
| #brakebothWheels('DCMotor2','DCMotor1') | |
| print 'Exiting Progam' | |
| finally : | |
| #gpio.cleanup() must be done before exiting the program | |
| gpio.cleanup() | |
| print('') | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| """ | |
| UDPListener.py | |
| """ | |
| from __future__ import print_function | |
| from plugins import find_callable | |
| from plugins import load_modules | |
| from select import select | |
| from socket import AF_INET | |
| from socket import SOCK_DGRAM | |
| from socket import inet_ntoa | |
| from socket import socket | |
| from threading import Lock | |
| import collections | |
| import logging | |
| import re | |
| import traceback | |
| def parse_command(command): | |
| return command.split() | |
| class UDPListener(object): | |
| def __init__(self, plugin_modules=[]): | |
| self.__listening = False | |
| self.__listening_lock = Lock() | |
| self._dgram_size = 1024 | |
| self._frequency = 0.1 | |
| self.plugin_modules = plugin_modules | |
| def __check_response(self, command, *return_values): | |
| # TODO: I wasn't sure what the exact return signature of | |
| # DriverMain was, but you may wish to assess its return | |
| # value(s), do any additional actions, and return a | |
| # success/failure assertion. | |
| return True | |
| def __service_command(self, argv): | |
| command = argv[0] | |
| args = argv[1:] if len(argv) > 1 else [] | |
| # See if there is a callable 'command' in the plugin | |
| # pool. | |
| c = find_callable(command, self.plugin_modules) | |
| if c != None: | |
| logging.debug('calling {}'.format(command)) | |
| # All StandardError-derived errors are caught | |
| # so that the service does not crash except in | |
| # extraordinary circumstances. | |
| try: | |
| result = c(*args) | |
| except StandardError: | |
| logging.error(traceback.format_exc()) | |
| else: | |
| logging.debug('{} finished'.format(command)) | |
| if not isinstance(result, | |
| collections.Sequence): | |
| result = [result] | |
| return self.__check_response( | |
| command, *result) | |
| else: | |
| logging.warning('unknown command "{}"'.format( | |
| command)) | |
| return False | |
| # Helper properties for thread-safe access to the listener state. | |
| @property | |
| def _listening(self): | |
| self.__listening_lock.acquire() | |
| listening = self.__listening | |
| self.__listening_lock.release() | |
| return listening | |
| @_listening.setter | |
| def _listening(self, listening): | |
| self.__listening_lock.acquire() | |
| self.__listening = listening | |
| self.__listening_lock.release() | |
| def _service(self, data, source_address): | |
| """Service a datagram.""" | |
| logging.info('################################################') | |
| logging.info('servicing request from {0}'.format(source_address[0])) | |
| logging.info('servicing request from {0}'.format(data)) | |
| try: | |
| argv = parse_command(data) | |
| except ValueError: | |
| pass | |
| else: | |
| if len(argv) > 0: | |
| return self.__service_command(argv) | |
| return False | |
| def listen(self, port, hostname='127.0.0.1'): | |
| """Listen on UDP hostname:port if not running, otherwise raise | |
| a RuntimeError. | |
| """ | |
| logging.info('listening on {0}:{1}'.format(hostname, port)) | |
| # Ensure that the listener is not already listening. If it | |
| # isn't then prepare the listen socket. | |
| service_requests = False | |
| self.__listening_lock.acquire() | |
| if not self.__listening: | |
| self.__listening = True | |
| sock = socket(AF_INET, SOCK_DGRAM) | |
| sock.bind((hostname, port)) | |
| service_requests = True | |
| self.__listening_lock.release() | |
| # If the listener was already listening then raise an error. | |
| if not service_requests: | |
| raise RuntimeError('the listener is already listening') | |
| # Poll the listen socket at `_frequency` interval and if there | |
| # is any pending data then service the request. | |
| while self._listening: | |
| r, _, _ = select([sock], [], [], self._frequency) | |
| if sock in r: | |
| # Nothing is currently done with the service method's | |
| # return value, but you could e.g. log failure. | |
| self._service(*sock.recvfrom(self._dgram_size)) | |
| # Close the listen socket and exit the frame. | |
| sock.close() | |
| def listening(self): | |
| """Returns the boolean listening state of the listener.""" | |
| return self._listening | |
| def stop(self): | |
| """If the listener is listening then stop it, otherwise raise | |
| a RuntimeError. | |
| """ | |
| self.__listening_lock.acquire() | |
| if self.__listening: | |
| raise RuntimeError('the listener is not listening') | |
| self.__listening = False | |
| self.__listening_lock.release() | |
| if __name__ == '__main__': | |
| import Driver | |
| import fcntl | |
| import os.path | |
| import struct | |
| import sys | |
| IFACE = 'wlan0' | |
| PORT = 9090 | |
| # XXX: You can replace the plugin directory with a directory | |
| # more suited to your application. If it's an appliance | |
| # you might want to consider a directory in /etc, | |
| # /usr/local/share, or /var/lib. | |
| PLUGIN_DIRECTORY = os.path.abspath('/usr/local/bin/Workspace/plugins') | |
| def get_ip_address(ifname): | |
| # This code was copied from: | |
| # http://code.activestate.com/recipes/439094-get-the-ip-address-associated-with-a-network-inter/ | |
| s = socket(AF_INET, SOCK_DGRAM) | |
| address = inet_ntoa(fcntl.ioctl( | |
| s.fileno(), | |
| 0x8915, # SIOCGIFADDR | |
| struct.pack('256s', ifname[:15]) | |
| )[20:24]) | |
| s.close() | |
| return address | |
| # Add the plugin directory to the Python module search path. | |
| # It is added first to prevent plugins from being occluded | |
| # by library modules. | |
| sys.path.insert(0, PLUGIN_DIRECTORY) | |
| hostname, port = get_ip_address(IFACE), PORT | |
| logging.basicConfig(format='%(asctime)s %(message)s', | |
| level=logging.DEBUG) | |
| # UDPListener is instantiated with any plugin modules in | |
| # addition to the Driver module, which appears first in the | |
| # plugin list. | |
| #listener = UDPListener([Driver] + load_modules(PLUGIN_DIRECTORY)) | |
| try: | |
| listener = UDPListener([Driver] + load_modules(PLUGIN_DIRECTORY)) | |
| listener.listen(port, hostname) | |
| except KeyboardInterrupt: | |
| print('Exiting...') | |
| except Exception as err: | |
| logging.error(traceback.format_exc()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment