Skip to content

Instantly share code, notes, and snippets.

@spmp
Created September 16, 2021 02:53

Revisions

  1. spmp created this gist Sep 16, 2021.
    254 changes: 254 additions & 0 deletions a2dp-agent.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,254 @@
    #!/usr/bin/python3
    # SPDX-License-Identifier: LGPL-2.1-or-later

    from __future__ import absolute_import, print_function, unicode_literals

    from optparse import OptionParser
    import sys
    import signal
    import dbus
    import dbus.service
    import dbus.mainloop.glib
    try:
    from gi.repository import GLib
    except ImportError:
    import gobject as GLib
    from functools import partial

    BUS_NAME = 'org.bluez'
    DEVICE_PATH = '/org/bluez/hci0'
    AGENT_INTERFACE = 'org.bluez.Agent1'
    AGENT_PATH = "/test/agent"

    bus = None
    device_obj = None
    dev_path = None

    '''bluezutils'''

    SERVICE_NAME = "org.bluez"
    ADAPTER_INTERFACE = SERVICE_NAME + ".Adapter1"
    DEVICE_INTERFACE = SERVICE_NAME + ".Device1"

    def get_managed_objects():
    bus = dbus.SystemBus()
    manager = dbus.Interface(bus.get_object("org.bluez", "/"),
    "org.freedesktop.DBus.ObjectManager")
    return manager.GetManagedObjects()

    def find_adapter(pattern=None):
    return find_adapter_in_objects(get_managed_objects(), pattern)

    def find_adapter_in_objects(objects, pattern=None):
    bus = dbus.SystemBus()
    for path, ifaces in objects.iteritems():
    adapter = ifaces.get(ADAPTER_INTERFACE)
    if adapter is None:
    continue
    if not pattern or pattern == adapter["Address"] or \
    path.endswith(pattern):
    obj = bus.get_object(SERVICE_NAME, path)
    return dbus.Interface(obj, ADAPTER_INTERFACE)
    raise Exception("Bluetooth adapter not found")

    def find_device(device_address, adapter_pattern=None):
    return find_device_in_objects(get_managed_objects(), device_address,
    adapter_pattern)

    def find_device_in_objects(objects, device_address, adapter_pattern=None):
    bus = dbus.SystemBus()
    path_prefix = ""
    if adapter_pattern:
    adapter = find_adapter_in_objects(objects, adapter_pattern)
    path_prefix = adapter.object_path
    for path, ifaces in objects.iteritems():
    device = ifaces.get(DEVICE_INTERFACE)
    if device is None:
    continue
    if (device["Address"] == device_address and
    path.startswith(path_prefix)):
    obj = bus.get_object(SERVICE_NAME, path)
    return dbus.Interface(obj, DEVICE_INTERFACE)

    raise Exception("Bluetooth device not found")


    '''simple-agent'''

    def ask(prompt):
    try:
    return raw_input(prompt)
    except:
    return input(prompt)

    def set_trusted(path):
    props = dbus.Interface(bus.get_object("org.bluez", path),
    "org.freedesktop.DBus.Properties")
    props.Set("org.bluez.Device1", "Trusted", True)

    def dev_connect(path):
    dev = dbus.Interface(bus.get_object("org.bluez", path),
    "org.bluez.Device1")
    dev.Connect()

    class Rejected(dbus.DBusException):
    _dbus_error_name = "org.bluez.Error.Rejected"

    class Agent(dbus.service.Object):
    exit_on_release = True

    def set_exit_on_release(self, exit_on_release):
    self.exit_on_release = exit_on_release

    @dbus.service.method(AGENT_INTERFACE,
    in_signature="", out_signature="")
    def Release(self):
    print("Release")
    if self.exit_on_release:
    mainloop.quit()

    @dbus.service.method(AGENT_INTERFACE,
    in_signature="os", out_signature="")
    def AuthorizeService(self, device, uuid):
    print("AuthorizeService (%s, %s)" % (device, uuid))
    if uuid == "0000110d-0000-1000-8000-00805f9b34fb":
    print("Authorized A2DP Service")
    return
    print("Rejecting non-A2DP Service")
    raise Rejected("Connection rejected")

    @dbus.service.method(AGENT_INTERFACE,
    in_signature="o", out_signature="s")
    def RequestPinCode(self, device):
    print("RequestPinCode (%s)" % (device))
    set_trusted(device)
    return ask("Enter PIN Code: ")

    @dbus.service.method(AGENT_INTERFACE,
    in_signature="o", out_signature="u")
    def RequestPasskey(self, device):
    print("RequestPasskey (%s)" % (device))
    set_trusted(device)
    passkey = ask("Enter passkey: ")
    return dbus.UInt32(passkey)

    @dbus.service.method(AGENT_INTERFACE,
    in_signature="ouq", out_signature="")
    def DisplayPasskey(self, device, passkey, entered):
    print("DisplayPasskey (%s, %06u entered %u)" %
    (device, passkey, entered))

    @dbus.service.method(AGENT_INTERFACE,
    in_signature="os", out_signature="")
    def DisplayPinCode(self, device, pincode):
    print("DisplayPinCode (%s, %s)" % (device, pincode))

    @dbus.service.method(AGENT_INTERFACE,
    in_signature="ou", out_signature="")
    def RequestConfirmation(self, device, passkey):
    print("RequestConfirmation (%s, %06d)" % (device, passkey))
    print("Auto confirming...")
    return

    @dbus.service.method(AGENT_INTERFACE,
    in_signature="o", out_signature="")
    def RequestAuthorization(self, device):
    print("RequestAuthorization (%s)" % (device))
    auth = ask("Authorize? (yes/no): ")
    if (auth == "yes"):
    return
    raise Rejected("Pairing rejected")

    @dbus.service.method(AGENT_INTERFACE,
    in_signature="", out_signature="")
    def Cancel(self):
    print("Cancel")

    def pair_reply():
    print("Device paired")
    set_trusted(dev_path)
    dev_connect(dev_path)
    mainloop.quit()

    def pair_error(error):
    err_name = error.get_dbus_name()
    if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj:
    print("Timed out. Cancelling pairing")
    device_obj.CancelPairing()
    else:
    print("Creating device failed: %s" % (error))

    mainloop.quit()

    def quit(manager, mloop):
    manager.UnregisterAgent(path)
    print("Agent unregistered")

    mloop.quit()

    if __name__ == '__main__':
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    capability = "DisplayYesNo"

    parser = OptionParser()
    parser.add_option("-i", "--adapter", action="store",
    type="string",
    dest="adapter_pattern",
    default=None)
    parser.add_option("-c", "--capability", action="store",
    type="string", dest="capability")
    parser.add_option("-t", "--timeout", action="store",
    type="int", dest="timeout",
    default=60000)
    (options, args) = parser.parse_args()
    if options.capability:
    capability = options.capability

    path = "/test/agent"
    agent = Agent(bus, path)

    mainloop = GLib.MainLoop()

    obj = bus.get_object(BUS_NAME, "/org/bluez");


    # Set Discoverable and Pairable to always on
    print("Setting device to 'discoverable' and 'pairable'...")
    prop = dbus.Interface(bus.get_object("org.bluez", DEVICE_PATH), "org.freedesktop.DBus.Properties")
    prop.Set("org.bluez.Adapter1", "DiscoverableTimeout", dbus.UInt32(0))
    prop.Set("org.bluez.Adapter1", "PairableTimeout", dbus.UInt32(0))
    prop.Set("org.bluez.Adapter1", "Discoverable", dbus.Boolean(True))
    prop.Set("org.bluez.Adapter1", "Pairable", dbus.Boolean(True))

    # Create the agent manager
    manager = dbus.Interface(obj, "org.bluez.AgentManager1")
    manager.RegisterAgent(path, capability)

    print("Agent registered")

    # Fix-up old style invocation (BlueZ 4)
    if len(args) > 0 and args[0].startswith("hci"):
    options.adapter_pattern = args[0]
    del args[:1]

    if len(args) > 0:
    device = find_device(args[0],
    options.adapter_pattern)
    dev_path = device.object_path
    agent.set_exit_on_release(False)
    device.Pair(reply_handler=pair_reply, error_handler=pair_error,
    timeout=60000)
    device_obj = device
    else:
    manager.RequestDefaultAgent(path)

    # Ensure that ctrl+c is cuaght properly
    ## Assign the 'quit' function to a variable
    mquit = partial(quit, manager=manager, mloop=mainloop)
    GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, mquit)

    mainloop.run()

    11 changes: 11 additions & 0 deletions bt-agent-a2dp.service
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,11 @@
    [Unit]
    Description=A2DP Bluetooth Agent
    After=bluetooth.service
    Wants=bluetooth.service

    [Service]
    ExecStart=/usr/bin/python3 -u /usr/local/bin/a2dp-agent
    SyslogIdentifier=A2DP-Agent

    [Install]
    WantedBy=bluetooth.service