#!/usr/bin/env python """ UDPListener.py """ from __future__ import print_function from plugins import find_callable from plugins import load_modules from select import select from socket import AF_INET from socket import SOCK_DGRAM from socket import inet_ntoa from socket import socket from threading import Lock import collections import logging import re import traceback def parse_command(command): return command.split() class UDPListener(object): def __init__(self, plugin_modules=[]): self.__listening = False self.__listening_lock = Lock() self._dgram_size = 1024 self._frequency = 0.1 self.plugin_modules = plugin_modules def __check_response(self, command, *return_values): # TODO: I wasn't sure what the exact return signature of # DriverMain was, but you may wish to assess its return # value(s), do any additional actions, and return a # success/failure assertion. return True def __service_command(self, argv): command = argv[0] args = argv[1:] if len(argv) > 1 else [] # See if there is a callable 'command' in the plugin # pool. c = find_callable(command, self.plugin_modules) if c != None: logging.debug('calling {}'.format(command)) # All StandardError-derived errors are caught # so that the service does not crash except in # extraordinary circumstances. try: result = c(*args) except StandardError: logging.error(traceback.format_exc()) else: logging.debug('{} finished'.format(command)) if not isinstance(result, collections.Sequence): result = [result] return self.__check_response( command, *result) else: logging.warning('unknown command "{}"'.format( command)) return False # Helper properties for thread-safe access to the listener state. @property def _listening(self): self.__listening_lock.acquire() listening = self.__listening self.__listening_lock.release() return listening @_listening.setter def _listening(self, listening): self.__listening_lock.acquire() self.__listening = listening self.__listening_lock.release() def _service(self, data, source_address): """Service a datagram.""" logging.info('################################################') logging.info('servicing request from {0}'.format(source_address[0])) logging.info('servicing request from {0}'.format(data)) try: argv = parse_command(data) except ValueError: pass else: if len(argv) > 0: return self.__service_command(argv) return False def listen(self, port, hostname='127.0.0.1'): """Listen on UDP hostname:port if not running, otherwise raise a RuntimeError. """ logging.info('listening on {0}:{1}'.format(hostname, port)) # Ensure that the listener is not already listening. If it # isn't then prepare the listen socket. service_requests = False self.__listening_lock.acquire() if not self.__listening: self.__listening = True sock = socket(AF_INET, SOCK_DGRAM) sock.bind((hostname, port)) service_requests = True self.__listening_lock.release() # If the listener was already listening then raise an error. if not service_requests: raise RuntimeError('the listener is already listening') # Poll the listen socket at `_frequency` interval and if there # is any pending data then service the request. while self._listening: r, _, _ = select([sock], [], [], self._frequency) if sock in r: # Nothing is currently done with the service method's # return value, but you could e.g. log failure. self._service(*sock.recvfrom(self._dgram_size)) # Close the listen socket and exit the frame. sock.close() def listening(self): """Returns the boolean listening state of the listener.""" return self._listening def stop(self): """If the listener is listening then stop it, otherwise raise a RuntimeError. """ self.__listening_lock.acquire() if self.__listening: raise RuntimeError('the listener is not listening') self.__listening = False self.__listening_lock.release() if __name__ == '__main__': import Driver import fcntl import os.path import struct import sys IFACE = 'wlan0' PORT = 9090 # XXX: You can replace the plugin directory with a directory # more suited to your application. If it's an appliance # you might want to consider a directory in /etc, # /usr/local/share, or /var/lib. PLUGIN_DIRECTORY = os.path.abspath('/usr/local/bin/Workspace/plugins') def get_ip_address(ifname): # This code was copied from: # http://code.activestate.com/recipes/439094-get-the-ip-address-associated-with-a-network-inter/ s = socket(AF_INET, SOCK_DGRAM) address = inet_ntoa(fcntl.ioctl( s.fileno(), 0x8915, # SIOCGIFADDR struct.pack('256s', ifname[:15]) )[20:24]) s.close() return address # Add the plugin directory to the Python module search path. # It is added first to prevent plugins from being occluded # by library modules. sys.path.insert(0, PLUGIN_DIRECTORY) hostname, port = get_ip_address(IFACE), PORT logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG) # UDPListener is instantiated with any plugin modules in # addition to the Driver module, which appears first in the # plugin list. #listener = UDPListener([Driver] + load_modules(PLUGIN_DIRECTORY)) try: listener = UDPListener([Driver] + load_modules(PLUGIN_DIRECTORY)) listener.listen(port, hostname) except KeyboardInterrupt: print('Exiting...') except Exception as err: logging.error(traceback.format_exc())