Created
September 16, 2021 02:53
-
-
Save spmp/4be4b6c9754367cdb2da6cf60aad3da9 to your computer and use it in GitHub Desktop.
Headless A2DP - Note: Not working in Ubuntu 21.04
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# SPDX-License-Identifier: LGPL-2.1-or-later | |
from __future__ import absolute_import, print_function, unicode_literals | |
from optparse import OptionParser | |
import sys | |
import signal | |
import dbus | |
import dbus.service | |
import dbus.mainloop.glib | |
try: | |
from gi.repository import GLib | |
except ImportError: | |
import gobject as GLib | |
from functools import partial | |
BUS_NAME = 'org.bluez' | |
DEVICE_PATH = '/org/bluez/hci0' | |
AGENT_INTERFACE = 'org.bluez.Agent1' | |
AGENT_PATH = "/test/agent" | |
bus = None | |
device_obj = None | |
dev_path = None | |
'''bluezutils''' | |
SERVICE_NAME = "org.bluez" | |
ADAPTER_INTERFACE = SERVICE_NAME + ".Adapter1" | |
DEVICE_INTERFACE = SERVICE_NAME + ".Device1" | |
def get_managed_objects(): | |
bus = dbus.SystemBus() | |
manager = dbus.Interface(bus.get_object("org.bluez", "/"), | |
"org.freedesktop.DBus.ObjectManager") | |
return manager.GetManagedObjects() | |
def find_adapter(pattern=None): | |
return find_adapter_in_objects(get_managed_objects(), pattern) | |
def find_adapter_in_objects(objects, pattern=None): | |
bus = dbus.SystemBus() | |
for path, ifaces in objects.iteritems(): | |
adapter = ifaces.get(ADAPTER_INTERFACE) | |
if adapter is None: | |
continue | |
if not pattern or pattern == adapter["Address"] or \ | |
path.endswith(pattern): | |
obj = bus.get_object(SERVICE_NAME, path) | |
return dbus.Interface(obj, ADAPTER_INTERFACE) | |
raise Exception("Bluetooth adapter not found") | |
def find_device(device_address, adapter_pattern=None): | |
return find_device_in_objects(get_managed_objects(), device_address, | |
adapter_pattern) | |
def find_device_in_objects(objects, device_address, adapter_pattern=None): | |
bus = dbus.SystemBus() | |
path_prefix = "" | |
if adapter_pattern: | |
adapter = find_adapter_in_objects(objects, adapter_pattern) | |
path_prefix = adapter.object_path | |
for path, ifaces in objects.iteritems(): | |
device = ifaces.get(DEVICE_INTERFACE) | |
if device is None: | |
continue | |
if (device["Address"] == device_address and | |
path.startswith(path_prefix)): | |
obj = bus.get_object(SERVICE_NAME, path) | |
return dbus.Interface(obj, DEVICE_INTERFACE) | |
raise Exception("Bluetooth device not found") | |
'''simple-agent''' | |
def ask(prompt): | |
try: | |
return raw_input(prompt) | |
except: | |
return input(prompt) | |
def set_trusted(path): | |
props = dbus.Interface(bus.get_object("org.bluez", path), | |
"org.freedesktop.DBus.Properties") | |
props.Set("org.bluez.Device1", "Trusted", True) | |
def dev_connect(path): | |
dev = dbus.Interface(bus.get_object("org.bluez", path), | |
"org.bluez.Device1") | |
dev.Connect() | |
class Rejected(dbus.DBusException): | |
_dbus_error_name = "org.bluez.Error.Rejected" | |
class Agent(dbus.service.Object): | |
exit_on_release = True | |
def set_exit_on_release(self, exit_on_release): | |
self.exit_on_release = exit_on_release | |
@dbus.service.method(AGENT_INTERFACE, | |
in_signature="", out_signature="") | |
def Release(self): | |
print("Release") | |
if self.exit_on_release: | |
mainloop.quit() | |
@dbus.service.method(AGENT_INTERFACE, | |
in_signature="os", out_signature="") | |
def AuthorizeService(self, device, uuid): | |
print("AuthorizeService (%s, %s)" % (device, uuid)) | |
if uuid == "0000110d-0000-1000-8000-00805f9b34fb": | |
print("Authorized A2DP Service") | |
return | |
print("Rejecting non-A2DP Service") | |
raise Rejected("Connection rejected") | |
@dbus.service.method(AGENT_INTERFACE, | |
in_signature="o", out_signature="s") | |
def RequestPinCode(self, device): | |
print("RequestPinCode (%s)" % (device)) | |
set_trusted(device) | |
return ask("Enter PIN Code: ") | |
@dbus.service.method(AGENT_INTERFACE, | |
in_signature="o", out_signature="u") | |
def RequestPasskey(self, device): | |
print("RequestPasskey (%s)" % (device)) | |
set_trusted(device) | |
passkey = ask("Enter passkey: ") | |
return dbus.UInt32(passkey) | |
@dbus.service.method(AGENT_INTERFACE, | |
in_signature="ouq", out_signature="") | |
def DisplayPasskey(self, device, passkey, entered): | |
print("DisplayPasskey (%s, %06u entered %u)" % | |
(device, passkey, entered)) | |
@dbus.service.method(AGENT_INTERFACE, | |
in_signature="os", out_signature="") | |
def DisplayPinCode(self, device, pincode): | |
print("DisplayPinCode (%s, %s)" % (device, pincode)) | |
@dbus.service.method(AGENT_INTERFACE, | |
in_signature="ou", out_signature="") | |
def RequestConfirmation(self, device, passkey): | |
print("RequestConfirmation (%s, %06d)" % (device, passkey)) | |
print("Auto confirming...") | |
return | |
@dbus.service.method(AGENT_INTERFACE, | |
in_signature="o", out_signature="") | |
def RequestAuthorization(self, device): | |
print("RequestAuthorization (%s)" % (device)) | |
auth = ask("Authorize? (yes/no): ") | |
if (auth == "yes"): | |
return | |
raise Rejected("Pairing rejected") | |
@dbus.service.method(AGENT_INTERFACE, | |
in_signature="", out_signature="") | |
def Cancel(self): | |
print("Cancel") | |
def pair_reply(): | |
print("Device paired") | |
set_trusted(dev_path) | |
dev_connect(dev_path) | |
mainloop.quit() | |
def pair_error(error): | |
err_name = error.get_dbus_name() | |
if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj: | |
print("Timed out. Cancelling pairing") | |
device_obj.CancelPairing() | |
else: | |
print("Creating device failed: %s" % (error)) | |
mainloop.quit() | |
def quit(manager, mloop): | |
manager.UnregisterAgent(path) | |
print("Agent unregistered") | |
mloop.quit() | |
if __name__ == '__main__': | |
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) | |
bus = dbus.SystemBus() | |
capability = "DisplayYesNo" | |
parser = OptionParser() | |
parser.add_option("-i", "--adapter", action="store", | |
type="string", | |
dest="adapter_pattern", | |
default=None) | |
parser.add_option("-c", "--capability", action="store", | |
type="string", dest="capability") | |
parser.add_option("-t", "--timeout", action="store", | |
type="int", dest="timeout", | |
default=60000) | |
(options, args) = parser.parse_args() | |
if options.capability: | |
capability = options.capability | |
path = "/test/agent" | |
agent = Agent(bus, path) | |
mainloop = GLib.MainLoop() | |
obj = bus.get_object(BUS_NAME, "/org/bluez"); | |
# Set Discoverable and Pairable to always on | |
print("Setting device to 'discoverable' and 'pairable'...") | |
prop = dbus.Interface(bus.get_object("org.bluez", DEVICE_PATH), "org.freedesktop.DBus.Properties") | |
prop.Set("org.bluez.Adapter1", "DiscoverableTimeout", dbus.UInt32(0)) | |
prop.Set("org.bluez.Adapter1", "PairableTimeout", dbus.UInt32(0)) | |
prop.Set("org.bluez.Adapter1", "Discoverable", dbus.Boolean(True)) | |
prop.Set("org.bluez.Adapter1", "Pairable", dbus.Boolean(True)) | |
# Create the agent manager | |
manager = dbus.Interface(obj, "org.bluez.AgentManager1") | |
manager.RegisterAgent(path, capability) | |
print("Agent registered") | |
# Fix-up old style invocation (BlueZ 4) | |
if len(args) > 0 and args[0].startswith("hci"): | |
options.adapter_pattern = args[0] | |
del args[:1] | |
if len(args) > 0: | |
device = find_device(args[0], | |
options.adapter_pattern) | |
dev_path = device.object_path | |
agent.set_exit_on_release(False) | |
device.Pair(reply_handler=pair_reply, error_handler=pair_error, | |
timeout=60000) | |
device_obj = device | |
else: | |
manager.RequestDefaultAgent(path) | |
# Ensure that ctrl+c is cuaght properly | |
## Assign the 'quit' function to a variable | |
mquit = partial(quit, manager=manager, mloop=mainloop) | |
GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signal.SIGINT, mquit) | |
mainloop.run() | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[Unit] | |
Description=A2DP Bluetooth Agent | |
After=bluetooth.service | |
Wants=bluetooth.service | |
[Service] | |
ExecStart=/usr/bin/python3 -u /usr/local/bin/a2dp-agent | |
SyslogIdentifier=A2DP-Agent | |
[Install] | |
WantedBy=bluetooth.service |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment