Created
September 16, 2021 02:53
Revisions
-
spmp created this gist
Sep 16, 2021 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,254 @@ #!/usr/bin/python3 # SPDX-License-Identifier: LGPL-2.1-or-later from __future__ import absolute_import, print_function, unicode_literals from optparse import OptionParser import sys import signal import dbus import dbus.service import dbus.mainloop.glib try: from gi.repository import GLib except ImportError: import gobject as GLib from functools import partial BUS_NAME = 'org.bluez' DEVICE_PATH = '/org/bluez/hci0' AGENT_INTERFACE = 'org.bluez.Agent1' AGENT_PATH = "/test/agent" bus = None device_obj = None dev_path = None '''bluezutils''' SERVICE_NAME = "org.bluez" ADAPTER_INTERFACE = SERVICE_NAME + ".Adapter1" DEVICE_INTERFACE = SERVICE_NAME + ".Device1" def get_managed_objects(): bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") return manager.GetManagedObjects() def find_adapter(pattern=None): return find_adapter_in_objects(get_managed_objects(), pattern) def find_adapter_in_objects(objects, pattern=None): bus = dbus.SystemBus() for path, ifaces in objects.iteritems(): adapter = ifaces.get(ADAPTER_INTERFACE) if adapter is None: continue if not pattern or pattern == adapter["Address"] or \ path.endswith(pattern): obj = bus.get_object(SERVICE_NAME, path) return dbus.Interface(obj, ADAPTER_INTERFACE) raise Exception("Bluetooth adapter not found") def find_device(device_address, adapter_pattern=None): return find_device_in_objects(get_managed_objects(), device_address, adapter_pattern) def find_device_in_objects(objects, device_address, adapter_pattern=None): bus = dbus.SystemBus() path_prefix = "" if adapter_pattern: adapter = find_adapter_in_objects(objects, adapter_pattern) path_prefix = adapter.object_path for path, ifaces in objects.iteritems(): device = ifaces.get(DEVICE_INTERFACE) if device is None: continue if (device["Address"] == device_address and path.startswith(path_prefix)): obj = bus.get_object(SERVICE_NAME, path) return dbus.Interface(obj, DEVICE_INTERFACE) raise Exception("Bluetooth device not found") '''simple-agent''' def ask(prompt): try: return raw_input(prompt) except: return input(prompt) def set_trusted(path): props = dbus.Interface(bus.get_object("org.bluez", path), "org.freedesktop.DBus.Properties") props.Set("org.bluez.Device1", "Trusted", True) def dev_connect(path): dev = dbus.Interface(bus.get_object("org.bluez", path), "org.bluez.Device1") dev.Connect() class Rejected(dbus.DBusException): _dbus_error_name = "org.bluez.Error.Rejected" class Agent(dbus.service.Object): exit_on_release = True def set_exit_on_release(self, exit_on_release): self.exit_on_release = exit_on_release @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Release(self): print("Release") if self.exit_on_release: mainloop.quit() @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="") def AuthorizeService(self, device, uuid): print("AuthorizeService (%s, %s)" % (device, uuid)) if uuid == "0000110d-0000-1000-8000-00805f9b34fb": print("Authorized A2DP Service") return print("Rejecting non-A2DP Service") raise Rejected("Connection rejected") @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s") def RequestPinCode(self, device): print("RequestPinCode (%s)" % (device)) set_trusted(device) return ask("Enter PIN Code: ") @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="u") def RequestPasskey(self, device): print("RequestPasskey (%s)" % (device)) set_trusted(device) passkey = ask("Enter passkey: ") return dbus.UInt32(passkey) @dbus.service.method(AGENT_INTERFACE, in_signature="ouq", out_signature="") def DisplayPasskey(self, device, passkey, entered): print("DisplayPasskey (%s, %06u entered %u)" % (device, passkey, entered)) @dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="") def DisplayPinCode(self, device, pincode): print("DisplayPinCode (%s, %s)" % (device, pincode)) @dbus.service.method(AGENT_INTERFACE, in_signature="ou", out_signature="") def RequestConfirmation(self, device, passkey): print("RequestConfirmation (%s, %06d)" % (device, passkey)) print("Auto confirming...") return @dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="") def RequestAuthorization(self, device): print("RequestAuthorization (%s)" % (device)) auth = ask("Authorize? (yes/no): ") if (auth == "yes"): return raise Rejected("Pairing rejected") @dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="") def Cancel(self): print("Cancel") def pair_reply(): print("Device paired") set_trusted(dev_path) dev_connect(dev_path) mainloop.quit() def pair_error(error): err_name = error.get_dbus_name() if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj: print("Timed out. Cancelling pairing") device_obj.CancelPairing() else: print("Creating device failed: %s" % (error)) mainloop.quit() def quit(manager, mloop): manager.UnregisterAgent(path) print("Agent unregistered") mloop.quit() if __name__ == '__main__': dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() capability = "DisplayYesNo" parser = OptionParser() parser.add_option("-i", "--adapter", action="store", type="string", dest="adapter_pattern", default=None) parser.add_option("-c", "--capability", action="store", type="string", dest="capability") parser.add_option("-t", "--timeout", action="store", type="int", dest="timeout", default=60000) (options, args) = parser.parse_args() if options.capability: capability = options.capability path = "/test/agent" agent = Agent(bus, path) mainloop = GLib.MainLoop() obj = bus.get_object(BUS_NAME, "/org/bluez"); # Set Discoverable and Pairable to always on print("Setting device to 'discoverable' and 'pairable'...") prop = dbus.Interface(bus.get_object("org.bluez", DEVICE_PATH), "org.freedesktop.DBus.Properties") prop.Set("org.bluez.Adapter1", "DiscoverableTimeout", dbus.UInt32(0)) prop.Set("org.bluez.Adapter1", "PairableTimeout", dbus.UInt32(0)) prop.Set("org.bluez.Adapter1", "Discoverable", dbus.Boolean(True)) prop.Set("org.bluez.Adapter1", "Pairable", dbus.Boolean(True)) # Create the agent manager manager = dbus.Interface(obj, "org.bluez.AgentManager1") manager.RegisterAgent(path, capability) print("Agent registered") # Fix-up old style invocation (BlueZ 4) if len(args) > 0 and args[0].startswith("hci"): options.adapter_pattern = args[0] del args[:1] if len(args) > 0: device = find_device(args[0], options.adapter_pattern) dev_path = device.object_path agent.set_exit_on_release(False) device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000) device_obj = device else: manager.RequestDefaultAgent(path) # Ensure that ctrl+c is cuaght properly ## Assign the 'quit' function to a variable mquit = partial(quit, manager=manager, mloop=mainloop) GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, mquit) mainloop.run() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,11 @@ [Unit] Description=A2DP Bluetooth Agent After=bluetooth.service Wants=bluetooth.service [Service] ExecStart=/usr/bin/python3 -u /usr/local/bin/a2dp-agent SyslogIdentifier=A2DP-Agent [Install] WantedBy=bluetooth.service