"""Mock Bluetooth RFCOMM server that answers JSON requests.

Modelled on the PyBluez RFCOMM server example:
https://github.com/pybluez/pybluez/blob/master/examples/simple/rfcomm-server.py
"""
import errno
import json
import logging
import sys
from typing import *

import bluetooth as bt
import click
import coloredlogs

logger = logging.getLogger(__name__)
coloredlogs.install(logger=logger, level=logging.DEBUG)

# Number of pending connections the listening socket may queue.
_LISTEN_BACKLOG = 1
# Maximum number of bytes read from the client per request.
_RECV_BUFFER_SIZE = 1024
# Exceptions that indicate a malformed request rather than a server fault.
_BAD_REQUEST_ERRORS = (ValueError, KeyError, IndexError, TypeError)


class BLEServer:
    """RFCOMM server holding a small in-memory channel state.

    The constructor immediately opens the listening socket and advertises
    the service; call :meth:`run` to serve clients and :meth:`close` to
    release the listening socket.
    """

    def __init__(
        self,
        uuid: str,
        server_name: str,
        port: int = 1,
    ):
        """Store the configuration and start advertising.

        Args:
            uuid: Service UUID used for the advertisement.
            server_name: Human-readable name of the advertised service.
            port: Recorded and logged only; the socket itself is bound to
                any free RFCOMM channel (``bt.PORT_ANY``).
        """
        self.uuid = uuid
        self.server_name = server_name
        self.port = port
        # mock state: one entry per channel
        self.state = [0, 0, 0, 0]
        # Advertise the server
        self.server_sock = self._advertise()

    def _advertise(self) -> bt.BluetoothSocket:
        """Create, bind and advertise the listening RFCOMM socket.

        Returns:
            The listening socket.

        Raises:
            bt.BluetoothError: if binding or advertising fails; the socket
                is closed before the error propagates.
        """
        logger.debug(
            f"Advertising with: server name: {self.server_name} "
            f"| port: {self.port} "
            f"| uuid: {self.uuid}"
        )
        server_sock = bt.BluetoothSocket(bt.RFCOMM)
        try:
            # NOTE(review): the original bound to PORT_ANY and ignored
            # `port`; kept as-is so the default behaviour does not change.
            server_sock.bind(("", bt.PORT_ANY))
            # listen() takes a backlog size, not a port number.
            server_sock.listen(_LISTEN_BACKLOG)
            bt.advertise_service(
                server_sock,
                self.server_name,
                service_id=self.uuid,
                service_classes=[self.uuid, bt.SERIAL_PORT_CLASS],
                profiles=[bt.SERIAL_PORT_PROFILE],
            )
        except Exception:
            # Do not leak the socket when setup fails half-way.
            server_sock.close()
            raise
        return server_sock

    def _accept_connection(self) -> bt.BluetoothSocket:
        """Block until a client connects and return its socket."""
        client_sock, client_info = self.server_sock.accept()
        logger.info(f"Accepted connection from {client_info}")
        return client_sock

    def process_request(self, data: bytes):
        """Decode one JSON request and apply it to the mock state.

        A request whose ``"cmd"`` is ``"/read"`` returns the state list.
        Any other request sets the entry for the first channel listed in
        ``"channels"`` (1-based) to ``"mode"`` and returns a JSON-encoded
        acknowledgement string.

        Args:
            data: Raw JSON payload received from the client.

        Returns:
            The state list for reads, or a JSON string for writes.

        Raises:
            ValueError: if the payload is not valid JSON or not an object.
            KeyError: if a write request lacks ``"channels"`` or ``"mode"``.
            IndexError: if the channel list is empty or the channel number
                is outside ``1..len(state)``.
        """
        request = json.loads(data)
        if not isinstance(request, dict):
            raise ValueError("request must be a JSON object")
        logger.info(f"Should process: '{request}'")

        if request.get("cmd") == "/read":
            return self.state

        channel = request["channels"][0]
        # Reject 0 and negatives explicitly: `channel - 1` would otherwise
        # wrap around and silently update the wrong entry.
        if not 1 <= channel <= len(self.state):
            raise IndexError(
                f"channel {channel} out of range 1..{len(self.state)}"
            )
        self.state[channel - 1] = request["mode"]
        return json.dumps({"ok": True, "state": self.state, "scheduled": []})

    def _reply(self, client_sock, data: bytes) -> None:
        """Process one request and send exactly one JSON document back."""
        try:
            ret = self.process_request(data)
        except _BAD_REQUEST_ERRORS as err:
            # A bad request must not take the whole server down.
            logger.warning(f"Ignoring malformed request {data!r}: {err}")
            ret = {"ok": False, "error": f"{type(err).__name__}: {err}"}
        logger.debug(f"request process returned: {ret}")
        # process_request already returns a JSON string for writes;
        # encoding it again would double-encode the reply.
        payload = ret if isinstance(ret, str) else json.dumps(ret)
        client_sock.send(payload)

    def run(self) -> None:
        """Serve clients one at a time until interrupted or on a fatal error.

        When the peer resets or closes the connection, the client socket is
        closed and the server waits for the next connection. Any other
        Bluetooth error, a keyboard interrupt, or an unexpected exception
        is logged and ends the loop. The current client socket is always
        closed on exit.
        """
        # getsockname() returns (address, channel); index 1 is the channel.
        channel = self.server_sock.getsockname()[1]
        logger.debug(f"Waiting for a connection on RFCOMM channel {channel}")

        client_sock = self._accept_connection()
        try:
            while True:
                peer_gone = False
                try:
                    data = client_sock.recv(_RECV_BUFFER_SIZE)
                    if data:
                        self._reply(client_sock, data)
                    else:
                        # An empty read means the peer closed the link.
                        peer_gone = True
                except bt.BluetoothError as be:
                    if be.errno != errno.ECONNRESET:
                        logger.debug(be.errno)
                        logger.error(f"Something wrong with bluetooth: {be}")
                        return
                    peer_gone = True
                except KeyboardInterrupt:
                    logger.warning("\nDisconnected")
                    return
                except Exception as e:
                    logger.error(f"BT Server Unknown error: {e}")
                    return

                if peer_gone:
                    logger.warning("Connection reset by peer...")
                    client_sock.close()
                    # Cleared so the finally-clause does not close it twice
                    # if accepting the next connection fails.
                    client_sock = None
                    # Accept a new connection
                    client_sock = self._accept_connection()
        finally:
            if client_sock is not None:
                client_sock.close()

    def close(self) -> None:
        """Close the listening socket."""
        self.server_sock.close()


@click.command()
@click.option("--uuid", default="616d3aa1-689e-4e71-8fed-09f3c7c4ad91")
@click.option("--server-name", default="RPI-BT-Server")
@click.option("--port", default=1)
def main(uuid: str, server_name: str, port: int):
    """Start the mock server and serve until it stops or fails."""
    ble = None
    try:
        ble = BLEServer(uuid, server_name, port)
        ble.run()
    except bt.btcommon.BluetoothError as be:
        logger.error(f"Bluetooth Error: {be}")
        logger.error(
            "Try running like: "
            "'sudo hciconfig hci0 piscan && sudo .venv/bin/python ble_server.py'"
        )
    except Exception as e:
        logger.error(f"Unknown error: {e}")
    finally:
        # ble stays None when the constructor itself failed.
        if ble is not None:
            ble.close()


if __name__ == "__main__":
    main()