#!/usr/bin/env python # -*- coding: utf-8 -*- # Based on http://www.marsohod.org/index.php/projects/plata1/205-carctrlandr import os import BaseHTTPServer from threading import Thread import urlparse import time try: import android droid = android.Android() except ImportError: def mediaPlay(path): print "PLAY", path import mock droid = mock.Mock() droid.mediaPlay = mediaPlay HOST_NAME = '' PORT_NUMBER = 9090 COMMAND_TIMEOUT = 1.0 # seconds PAGE_TEMPLATE = ''' DroidBot Remote Control ''' PAGE_TEMPLATE_A = '''

DroidBot Remote Control

" ''' PAGE_TEMPLATE_B = ''' DroidBot Remote Control
Joystick area. Press and drag (touch or mouse). Or use keyboard arrows
''' class CarControllerThread(Thread): path_to_audio = '/mnt/sdcard/droid_car/' command_to_audio = { 'up': 'up.wav', 'down': 'down.wav', 'left': 'left.wav', 'right': 'right.wav', } def __init__(self, timeout=1): super(CarControllerThread, self).__init__() self.timeout = timeout self.pool_timeout = timeout / 3.0 self.current_state = "" self.current_command = "" self.last_cmd_timestamp = time.time() def run(self): while True: elapsed = time.time() - self.last_cmd_timestamp if self.current_state == "begin" and elapsed > self.timeout: self.update_state("end", "") time.sleep(self.pool_timeout) def update_state(self, state, command): self.last_cmd_timestamp = time.time() if state == self.current_state and command == self.current_command: return self.current_state = state self.current_command = command if state == "end": print "PAUSE" droid.mediaPlaySetLooping(False) droid.mediaPlayPause() elif state == "begin": print "PLAY", command audio = self.command_to_audio.get(command) if audio: droid.mediaPlay(os.path.join(self.path_to_audio, audio)) droid.mediaPlaySetLooping(True) else: print "wrong command:", command class DroidHandler(BaseHTTPServer.BaseHTTPRequestHandler): def do_HEAD(self): self.send_response(200) self.send_header("Content-type", "text/html; charset=utf-8") self.end_headers() def do_GET(self): self.send_response(200) self.send_header("Content-type", "text/html; charset=utf-8") self.end_headers() url = urlparse.urlsplit(self.path) print url.path if url.path == '/frame_a.html': ip, port = self.headers.get('Host').split(":", 2) self.wfile.write(PAGE_TEMPLATE_A % ip) elif url.path == '/frame_b.html': self.wfile.write(PAGE_TEMPLATE_B) else: self.wfile.write(PAGE_TEMPLATE) def do_POST(self): self.send_response(200) self.send_header("Content-type", "text/plain; charset=utf-8") self.end_headers() if self.path.startswith("/command/"): try: #/command/state:up state, cmd = self.path.split("/")[-1].split(":") except ValueError: print "ERROR", self.path self.wfile.write("fail") return controller.update_state(state, cmd) self.wfile.write("ok") elif self.path.startswith("/ping"): self.wfile.write("pong") controller = CarControllerThread(COMMAND_TIMEOUT) controller.start() try: droid.wakeLockAcquireBright() droid.webcamStart(0, 10, 9091) droid.webcamAdjustQuality(0, 10) except Exception as e: print "Webcam error", e my_srv = BaseHTTPServer.HTTPServer((HOST_NAME, PORT_NUMBER), DroidHandler) print 'web server running on port %s' % PORT_NUMBER my_srv.serve_forever()