#!/usr/bin/python3 # SPDX-License-Identifier: LGPL-2.1-or-later # original source: https://github.com/bluez/bluez/blob/master/test/simple-obex-agent import os import sys import dbus import dbus.service import dbus.mainloop.glib from gi.repository import GLib BUS_NAME = 'org.bluez.obex' PATH = '/org/bluez/obex' AGENT_MANAGER_INTERFACE = 'org.bluez.obex.AgentManager1' AGENT_INTERFACE = 'org.bluez.obex.Agent1' TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1' bus = None transfers = {} class Transfer: def __init__(self, watch, path, file, size=0): self.watch = watch self.path = path self.file = file self.size = size def progress(self,amount): percent = (amount / self.size) * 100 print("Progress: "+str(round(percent))+"%\r",end="") def finish(self): print("Cleaning up for "+self.file) self.watch.remove() # FIXME: obexd path is hardcoded, could be extracted from Session interface os.rename(os.path.expanduser("~/.cache/obexd/")+self.file, GLib.get_user_special_dir(GLib.UserDirectory.DIRECTORY_DOWNLOAD)+"/"+self.file) transfers.pop(self.path) def signal_handler(name,properties,ignore,path=""): if "Size" in properties: transfers[path].size = properties["Size"] if "Status" in properties: if properties["Status"] == "complete": transfers[path].finish() if "Transferred" in properties: transfers[path].progress(properties["Transferred"]) class Agent(dbus.service.Object): def __init__(self, conn=None, obj_path=None): dbus.service.Object.__init__(self, conn, obj_path) @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s") def AuthorizePush(self, path): transfer = dbus.Interface(bus.get_object(BUS_NAME, path), 'org.freedesktop.DBus.Properties') properties = transfer.GetAll(TRANSFER_INTERFACE) filename = properties["Name"] print("Incoming transfer: "+filename) watch = bus.add_signal_receiver(signal_handler,path=path,path_keyword="path") transfers[path] = Transfer(watch,path,filename) return filename @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Cancel(self): print("Authorization Canceled") if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SessionBus() manager = dbus.Interface(bus.get_object(BUS_NAME, PATH), AGENT_MANAGER_INTERFACE) path = "/test/agent" agent = Agent(bus, path) mainloop = GLib.MainLoop() manager.RegisterAgent(path) print("Agent registered") mainloop.run()