Skip to content

Instantly share code, notes, and snippets.

@felipemarques
Last active May 13, 2026 18:35
Show Gist options
  • Select an option

  • Save felipemarques/ea96876f403fb430f93b3b46a907dc4a to your computer and use it in GitHub Desktop.

Select an option

Save felipemarques/ea96876f403fb430f93b3b46a907dc4a to your computer and use it in GitHub Desktop.
Python client to use barrier protocol

How to use:

python barrier_client.py --host 192.168.0.50 --name mac1 --invert-scroll --reconnect-interval 15 --scroll-scale 3

Configure Barrier Host

image On the Barrier host (Linux, Windows, or macOS) you must add a new computer to the screen layout.
#!/usr/bin/env python3
"""
Plain-TCP secondary client compatible with Barrier **1.6** — mouse, keyboard,
clipboard, and automatic reconnection.
**What this script does today**
- **Network**: connects to ``host:port`` (default 24800), Barrier 1.6 ``Barrier``
handshake, handles ``QINF``/``CALV``/``CNOP``/``DSOP``, and sends ``CNOP`` after each
command (expected by the server).
- **Mouse**: absolute ``DMMV``, relative ``DMRM``, left/middle/right ``DMDN``/``DMUP``,
wheel ``DMWM`` (vertical and horizontal). Barrier extra buttons (4–5) are not mapped in
macOS pynput.
- **Scroll**: converts wire delta (typically 120 units per “line”) into ``pynput`` steps.
Tune intensity with ``--scroll-scale`` (default **3**); higher values scroll more per
gesture, values below 1 reduce it. ``--invert-scroll`` flips X and Y (e.g. natural
scrolling on macOS).
- **Keyboard**: ``DKDN``/``DKUP``/``DKRP`` with X11 keysyms, ``mod`` mask
(Shift/Ctrl/Alt/Meta/Super/AltGr), key repeat, ASCII and Unicode BMP. KeyIDs ``0xEFxx``
from a macOS primary are normalized to keysyms ``0xFFxx``.
- **Clipboard**: CCLP/DCLP streaming; on **macOS** uses ``pbpaste`` (to server) and
``pbcopy`` (from server). ``--no-clipboard`` disables. On other OSes remote text is
still accepted in memory without OS integration.
- **Leaving client on server** (``COUT``): pushes local clipboard to the server before
releasing focus (flush), per Barrier flow.
- **Reconnection**: after ``CBYE``, socket drop, or recoverable network errors, waits
``--reconnect-interval`` seconds (default 10) and retries; ``0`` exits on first failure.
``Ctrl+C`` exits at any time.
- **Debugging**: ``--debug`` (keys, handshake, clipboard); ``--debug-packets`` logs every
post-handshake packet in hex.
- **Screen**: ``DINF`` with primary monitor geometry (tkinter, 1920×1080 fallback) and
resend on ``QINF`` (layout update).
**Server requirements**
- **Encryption off** (this client does not use SSL).
- ``--name`` must match the Barrier layout client name.
- Use on a trusted LAN (plaintext traffic).
Protocol reference:
https://www.qemu.org/docs/master/interop/barrier.html
Source: debauchee/barrier (``protocol_types.cpp``, ``ProtocolUtil.cpp``,
``PacketStreamFilter.cpp``, ``client/ServerProxy.cpp``).
"""
from __future__ import annotations
import argparse
import errno
import logging
import socket
import struct
import subprocess
import sys
import threading
import time
from typing import Callable
from pynput.keyboard import Controller as KeyController
from pynput.keyboard import Key, KeyCode
from pynput.mouse import Button, Controller as MouseController
# Protocol version negotiated by official Barrier 2.x client (protocol_types.h).
PROTOCOL_MAJOR = 1
PROTOCOL_MINOR = 6
# Largest length-prefixed packet accepted from the wire; bigger sizes are rejected.
MAX_PACKET = 4 * 1024 * 1024
# Barrier key_types.h — KeyModifierMask (key bits; Caps/Num/Scroll are locks)
MOD_SHIFT = 0x0001
MOD_CONTROL = 0x0002
MOD_ALT = 0x0004
MOD_META = 0x0008
MOD_SUPER = 0x0010
MOD_ALTGR = 0x0020
# Only these bits are ever synthesized as pynput modifier presses.
MOD_SYNTH_MASK = MOD_SHIFT | MOD_CONTROL | MOD_ALT | MOD_META | MOD_SUPER | MOD_ALTGR
# Press/release order (releases in reverse order during sync).
# Each bit must own a distinct pynput key: the sync code tracks "held" per bit,
# so two bits sharing one key would let the release of one bit physically
# release a key the other bit still counts as pressed. AltGr therefore maps to
# ``Key.alt_gr`` instead of re-using ``Key.alt``.
_MOD_BIT_TO_KEY: list[tuple[int, Key]] = [
    (MOD_SHIFT, Key.shift),
    (MOD_CONTROL, Key.ctrl),
    (MOD_ALT, Key.alt),
    (MOD_META, Key.cmd),
    (MOD_SUPER, Key.cmd_r),
    (MOD_ALTGR, Key.alt_gr),
]
# Normalized X11 keysyms for modifier keys -> Barrier bit
# Left and right variants of a modifier share one bit, which is why the key
# handlers keep a per-bit reference count instead of a simple boolean.
# NOTE(review): no Super_L/Super_R keysym is listed, so MOD_SUPER can only be
# driven through the ``mod`` mask of other keys — confirm this is intentional.
KEYSYM_TO_MOD_BIT: dict[int, int] = {
0xFFE1: MOD_SHIFT,
0xFFE2: MOD_SHIFT,
0xFFE3: MOD_CONTROL,
0xFFE4: MOD_CONTROL,
0xFFE9: MOD_ALT,
0xFFEA: MOD_ALT,
0xFFE7: MOD_META,
0xFFE8: MOD_META,
}
# Common X11 keysyms -> pynput keys.
# Note: wire KeyID is unsigned 16-bit; reading as int16 breaks keysyms ≥0x8000
# (arrows, home, F-keys, etc.).
# Lookups use the keysym *after* ``normalize_barrier_keysym`` (0xEFxx -> 0xFFxx).
KEYSYM_TO_KEY: dict[int, Key | KeyCode] = {
# Editing / control keys
0xFF08: Key.backspace,
0xFF09: Key.tab,
0xFF0D: Key.enter,
0xFF1B: Key.esc,
0xFFFF: Key.delete,
# Navigation
0xFF50: Key.home,
0xFF57: Key.end,
0xFF55: Key.page_up,
0xFF56: Key.page_down,
0xFF51: Key.left,
0xFF52: Key.up,
0xFF53: Key.right,
0xFF54: Key.down,
# Keypad (XK_KP_*), same effect as main keys.
0xFF95: Key.home,
0xFF96: Key.left,
0xFF97: Key.up,
0xFF98: Key.right,
0xFF99: Key.down,
0xFF9A: Key.page_up,
0xFF9B: Key.page_down,
0xFF9C: Key.end,
0xFF9F: Key.delete,
# Function keys F1..F12
0xFFBE: Key.f1,
0xFFBF: Key.f2,
0xFFC0: Key.f3,
0xFFC1: Key.f4,
0xFFC2: Key.f5,
0xFFC3: Key.f6,
0xFFC4: Key.f7,
0xFFC5: Key.f8,
0xFFC6: Key.f9,
0xFFC7: Key.f10,
0xFFC8: Key.f11,
0xFFC9: Key.f12,
# Modifier keys (the same keysyms are also listed in KEYSYM_TO_MOD_BIT)
0xFFE1: Key.shift,
0xFFE2: Key.shift_r,
0xFFE3: Key.ctrl,
0xFFE4: Key.ctrl_r,
0xFFE9: Key.alt,
0xFFEA: Key.alt_r,
0xFFE7: Key.cmd,
0xFFE8: Key.cmd_r,
# Keypad (XK_KP_*)
0xFF8D: Key.enter,
0xFFAE: KeyCode.from_char("."),
}
# Keypad digits (0xFFB0..0xFFB9) are mapped to the plain characters "0".."9".
for _kp_digit in range(10):
    KEYSYM_TO_KEY.update({0xFFB0 + _kp_digit: KeyCode.from_char("%d" % _kp_digit)})
# KeyIDs sometimes arrive as ASCII control codes (e.g. Enter = 0x0D)
# instead of the X11 keysym (e.g. XK_Return = 0xFF0D), depending on the server OS.
# ``keysym_to_keycode`` consults this table only after KEYSYM_TO_KEY misses.
ASCII_CONTROL_TO_KEY: dict[int, Key] = {
0x08: Key.backspace,
0x09: Key.tab,
0x0A: Key.enter,
0x0D: Key.enter,
0x1B: Key.esc,
0x7F: Key.delete,
}
# Human-readable names for logs only (subset of keysyms above + controls).
# Used by ``keysym_label``; it has no effect on which key is injected.
KEYSYM_LOG_NAMES: dict[int, str] = {
0xFF08: "BackSpace",
0xFF09: "Tab",
0xFF0D: "Return",
0xFF1B: "Escape",
0xFFFF: "Delete",
0xFF50: "Home",
0xFF57: "End",
0xFF55: "Page_Up",
0xFF56: "Page_Down",
0xFF51: "Left",
0xFF52: "Up",
0xFF53: "Right",
0xFF54: "Down",
# Raw ASCII control codes (the same codes ASCII_CONTROL_TO_KEY accepts).
0x08: "BS(ascii)",
0x09: "Tab(ascii)",
0x0A: "LF(ascii)",
0x0D: "CR/Enter(ascii)",
0x1B: "Esc(ascii)",
0x7F: "DEL(ascii)",
}
# Module-wide logger; ``main`` configures its level and stderr handler.
LOG = logging.getLogger(__name__)
# Barrier ButtonID: 1=left, 2=middle, 3=right, 4–5=extra (macOS pynput only exposes
# left/middle/right; extras are ignored).
# DMDN/DMUP handlers look the wire button up here and silently skip unknown IDs.
BARRIER_BUTTON_TO_PYNPUT: dict[int, Button] = {
1: Button.left,
2: Button.middle,
3: Button.right,
}
def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly ``n`` bytes from ``sock``, looping over short reads.

    Returns ``b""`` when ``n`` is not positive.

    Raises:
        EOFError: the peer closed the connection before ``n`` bytes arrived.
    """
    received = bytearray()
    while len(received) < n:
        piece = sock.recv(n - len(received))
        if not piece:
            # An empty read means the peer performed an orderly shutdown.
            raise EOFError("connection closed by server")
        received += piece
    return bytes(received)
def read_i16_be(buf: bytes, off: int) -> tuple[int, int]:
    """Decode a signed big-endian 16-bit integer at ``off``.

    Returns ``(value, next_offset)`` where ``next_offset`` is ``off + 2``.
    """
    (value,) = struct.unpack_from(">h", buf, off)
    return value, off + 2
def read_u16_be(buf: bytes, off: int) -> tuple[int, int]:
    """Decode an unsigned big-endian 16-bit integer at ``off``.

    This is the reader to use for KeyID / X11 keysym fields, whose values
    are frequently ≥ 0x8000. Returns ``(value, off + 2)``.
    """
    (value,) = struct.unpack_from(">H", buf, off)
    return value, off + 2
def read_i32_be(buf: bytes, off: int) -> tuple[int, int]:
    """Decode a signed big-endian 32-bit integer at ``off``; returns ``(value, off + 4)``."""
    (value,) = struct.unpack_from(">i", buf, off)
    return value, off + 4
def read_u32_be(buf: bytes, off: int) -> tuple[int, int]:
    """Decode an unsigned big-endian 32-bit integer at ``off``; returns ``(value, off + 4)``."""
    (value,) = struct.unpack_from(">I", buf, off)
    return value, off + 4
def pack_i16_be(v: int) -> bytes:
    """Encode ``v`` as a signed big-endian 16-bit integer.

    Raises:
        struct.error: ``v`` does not fit in the int16 range.
    """
    encoded = struct.pack(">h", v)
    return encoded
def pack_u32_be(v: int) -> bytes:
    """Encode the low 32 bits of ``v`` as an unsigned big-endian integer."""
    low32 = v & 0xFFFFFFFF  # wrap instead of raising on out-of-range values
    return struct.pack(">I", low32)
def send_packet(sock: socket.socket, payload: bytes) -> None:
    """Send ``payload`` framed with its uint32 big-endian length prefix."""
    frame = pack_u32_be(len(payload)) + payload
    # One sendall call so header and body go out as a single write.
    sock.sendall(frame)
def recv_packet(sock: socket.socket) -> bytes:
    """Read one length-prefixed packet and return its payload (prefix stripped).

    Raises:
        ValueError: the announced size exceeds ``MAX_PACKET``.
        EOFError: the connection closed mid-packet (from ``recv_exact``).
    """
    (announced,) = struct.unpack(">I", recv_exact(sock, 4))
    if announced <= MAX_PACKET:
        return recv_exact(sock, announced)
    raise ValueError(f"packet too large ({announced} bytes)")
def normalize_barrier_keysym(keysym: int) -> int:
    """Translate a wire KeyID into the X11 keysym used by the lookup tables.

    A macOS primary often sends special keys as ``keysym_x11 - 0x1000``
    (range 0xEF00–0xEFFF); those are shifted back up, for example::

        0xEF0D -> 0xFF0D (Return)
        0xEF51 -> 0xFF51 (Left)
        0xEFFF -> 0xFFFF (Delete)

    Everything else (e.g. plain ASCII 0x0061) is returned unchanged.
    """
    high_byte = (keysym >> 8) & 0xFF
    if high_byte != 0xEF:
        return keysym
    return (keysym + 0x1000) & 0xFFFF
def keysym_to_keycode(keysym: int) -> Key | KeyCode | None:
    """Resolve a wire KeyID to the pynput key to inject, or ``None`` if unmapped.

    Lookup order: named keysym table, raw ASCII control codes, printable
    ASCII, then any printable code point in 0x80–0xFEFF.
    """
    code = normalize_barrier_keysym(keysym)
    for table in (KEYSYM_TO_KEY, ASCII_CONTROL_TO_KEY):
        if code in table:
            return table[code]
    # Printable ASCII
    if 0x20 <= code <= 0x7E:
        return KeyCode.from_char(chr(code))
    # Unicode BMP (e.g. ç = U+00E7) — UTF-32 KeyID truncated to 16 bits on the wire
    if not (0x80 <= code < 0xFF00):
        return None
    try:
        char = chr(code)
    except (ValueError, OverflowError):
        return None
    return KeyCode.from_char(char) if char.isprintable() else None
def keysym_label(keysym: int) -> str:
    """Return a short human-readable label for ``keysym`` (log output only).

    Named keys use ``KEYSYM_LOG_NAMES``; printable characters are shown via
    ``repr``; anything else falls back to ``U+XXXX`` of the normalized value.
    """
    code = normalize_barrier_keysym(keysym)
    named = KEYSYM_LOG_NAMES.get(code)
    if named is not None:
        return named
    if 0x20 <= code <= 0x7E:
        return repr(chr(code))
    if 0x80 <= code < 0xFF00:
        try:
            char = chr(code)
        except (ValueError, OverflowError):
            char = None
        if char is not None and char.isprintable():
            return repr(char)
    return f"U+{code:04X}"
def payload_hex_preview(data: bytes, max_len: int = 48) -> str:
    """Hex-encode ``data`` for logs, truncating to ``max_len`` bytes plus ``...``."""
    truncated = len(data) > max_len
    shown = data[:max_len] if truncated else data
    return shown.hex() + ("..." if truncated else "")
# --- Clipboard (ClipboardChunk.cpp, IClipboard.cpp, clipboard_types.h) ---
# Clipboard id written into every CCLP/DCLP packet this client sends.
CLIPBOARD_ID_PRIMARY = 0
# Maximum number of marshalled bytes carried by one outgoing DCLP chunk.
CLIPBOARD_CHUNK = 32 * 1024
# Size cap on a marshalled clipboard, enforced on both send and receive.
CLIPBOARD_MARSHALL_MAX = 4 * 1024 * 1024
# DCLP ``mark`` byte: START carries the total size as ASCII digits, CHUNK
# carries data, END closes the transfer.
CB_MARK_START = 1
CB_MARK_CHUNK = 2
CB_MARK_END = 3
# Format id of the text entry inside a marshalled clipboard.
FORMAT_TEXT = 0
def pack_i8(v: int) -> bytes:
    """Encode the low 8 bits of ``v`` as a single byte."""
    return (v & 0xFF).to_bytes(1, "big")
def pack_barrier_string(payload: bytes) -> bytes:
    """Encode a Barrier wire string: uint32 big-endian length followed by the bytes."""
    length_prefix = pack_u32_be(len(payload))
    return b"".join((length_prefix, payload))
def marshall_clipboard_text(text: str) -> bytes:
    """Build a marshalled clipboard holding a single text entry.

    Line endings are normalized to LF and the text is encoded as UTF-8.
    Layout: format count (1), format id, byte length, then the bytes.
    """
    unix_text = text.replace("\r\n", "\n").replace("\r", "\n")
    encoded = unix_text.encode("utf-8")
    header = pack_u32_be(1) + pack_u32_be(FORMAT_TEXT) + pack_u32_be(len(encoded))
    return header + encoded
def unmarshall_clipboard_text(blob: bytes) -> str | None:
    """Return the first text entry of a marshalled clipboard, or ``None``.

    ``None`` is returned when the blob is truncated, declares zero or more
    than 64 formats, or contains no ``FORMAT_TEXT`` entry. Invalid UTF-8 is
    decoded with replacement characters.
    """
    total = len(blob)
    if total < 4:
        return None
    format_count, pos = read_u32_be(blob, 0)
    if not 1 <= format_count <= 64:
        return None
    remaining = format_count
    while remaining:
        remaining -= 1
        # Each entry starts with an 8-byte header: format id + byte length.
        if pos + 8 > total:
            return None
        fmt, pos = read_u32_be(blob, pos)
        size, pos = read_u32_be(blob, pos)
        end = pos + size
        if size > total or end > total:
            return None
        if fmt == FORMAT_TEXT:
            return blob[pos:end].decode("utf-8", errors="replace")
        pos = end
    return None
def parse_dclp_payload(payload: bytes) -> tuple[int, int, int, bytes] | None:
    """Split a full DCLP packet (including the ``DCLP`` prefix) into its fields.

    Returns ``(clipboard_id, sequence, mark, data)``, or ``None`` when the
    packet is shorter than its fixed header or than its declared data length.
    """
    header_len = 4 + 1 + 4 + 1 + 4  # "DCLP", id, seq, mark, string length
    if len(payload) < header_len:
        return None
    cb_id = payload[4]
    seq, pos = read_u32_be(payload, 5)
    mark = payload[pos]
    data_len, start = read_u32_be(payload, pos + 1)
    stop = start + data_len
    if stop > len(payload):
        return None
    return cb_id, seq, mark, payload[start:stop]
class BarrierClient:
    """Barrier secondary-screen client over plain TCP.

    Injects mouse and keyboard events through pynput and (on macOS) syncs the
    text clipboard via ``pbpaste``/``pbcopy``.
    """

    def __init__(
        self,
        host: str,
        port: int,
        name: str,
        *,
        invert_scroll: bool = False,
        scroll_scale: float = 3.0,
        debug: bool = False,
        debug_packets: bool = False,
        clipboard: bool = True,
    ) -> None:
        """Store configuration and create the pynput controllers; no I/O yet.

        Args:
            host: Barrier server hostname or IP.
            port: Barrier server TCP port.
            name: Screen name sent in the hello packet (UTF-8 encoded here).
            invert_scroll: Negate both wheel axes before scrolling.
            scroll_scale: Multiplier applied to wheel deltas.
            debug: Enable verbose key/handshake/clipboard logging.
            debug_packets: Log every post-handshake packet in hex.
            clipboard: Enable clipboard synchronisation.
        """
        # --- connection settings -------------------------------------------
        self._host, self._port = host, port
        self._name = name.encode("utf-8")
        # --- behaviour switches --------------------------------------------
        self._invert_scroll, self._scroll_scale = invert_scroll, scroll_scale
        self._debug, self._debug_packets = debug, debug_packets
        self._clipboard_enabled = clipboard
        # --- socket state; sends are serialized by the lock -----------------
        self._sock: socket.socket | None = None
        self._send_lock = threading.Lock()
        # --- input injection ------------------------------------------------
        self._mouse = MouseController()
        self._keyboard = KeyController()
        # --- session state --------------------------------------------------
        self._handshake_done = False
        self._ignore_mouse = False
        self._seq = 0
        self._screen_shape: Callable[[], tuple[int, int, int, int]] | None = None
        self._cursor_pos: Callable[[], tuple[int, int]] | None = None
        # Modifiers: refcount per bit (DKDN/DKUP for Shift/Ctrl/…) + mask applied in pynput
        self._mod_bit_ref: dict[int, int] = dict.fromkeys(
            (bit for bit, _ in _MOD_BIT_TO_KEY), 0
        )
        self._applied_mod_mask: int = 0
        # --- clipboard polling thread and bookkeeping ----------------------
        self._clipboard_stop = threading.Event()
        self._clipboard_thread: threading.Thread | None = None
        self._clipboard_last_seen = ""
        self._clipboard_last_pushed = ""
        self._clipboard_remote_ts = 0.0
        # Reassembly buffer and announced size for an incoming DCLP stream.
        self._cb_buf = bytearray()
        self._cb_expect: int | None = None
        self._clipboard_last_sent_to_server: str | None = None
        self._clipboard_last_remote_text: str = ""
def _scroll_steps_from_barrier_delta(self, delta: int) -> int:
"""Convert DMWM delta (typically multiples of 120 per line) to pynput scroll steps."""
if delta == 0:
return 0
# Scale amplifies |delta|/120 (e.g. trackpads with few ticks per event).
lines = abs(delta) / 120.0
n = int(round(lines * self._scroll_scale))
n = max(1, n)
return min(n, 1000)
def _guard_send(self, body: bytes) -> None:
    """Send one framed packet while holding the send lock.

    The lock keeps packets from the clipboard thread and the main loop from
    interleaving on the socket.
    """
    assert self._sock is not None
    self._send_lock.acquire()
    try:
        send_packet(self._sock, body)
    finally:
        self._send_lock.release()
def _darwin_clipboard_ok(self) -> bool:
return sys.platform == "darwin"
def _forced_modifier_bits(self) -> int:
m = 0
for bit, n in self._mod_bit_ref.items():
if n > 0:
m |= bit
return m
def _sync_modifier_mask_to(self, target: int) -> None:
    """Press/release pynput modifier keys until the applied mask equals ``target``.

    Only ``MOD_*`` bits in ``MOD_SYNTH_MASK`` are considered. Modifiers that
    must go are released first (reverse table order), then missing ones are
    pressed (table order). pynput failures are logged in debug mode only and
    the bit is still recorded as changed.
    """
    wanted = target & MOD_SYNTH_MASK
    held = self._applied_mod_mask
    if held == wanted:
        return
    for bit, pkey in reversed(_MOD_BIT_TO_KEY):
        if not (held & bit) or (wanted & bit):
            continue
        try:
            self._keyboard.release(pkey)
        except Exception:
            if self._debug:
                LOG.exception("modifier release bit=0x%x key=%r", bit, pkey)
        held &= ~bit
    for bit, pkey in _MOD_BIT_TO_KEY:
        if not (wanted & bit) or (held & bit):
            continue
        try:
            self._keyboard.press(pkey)
        except Exception:
            if self._debug:
                LOG.exception("modifier press bit=0x%x key=%r", bit, pkey)
        held |= bit
    self._applied_mod_mask = held
def _modifier_keysym_bit(self, keysym_norm: int) -> int | None:
    """Return the ``MOD_*`` bit for a normalized modifier keysym, else ``None``."""
    if keysym_norm in KEYSYM_TO_MOD_BIT:
        return KEYSYM_TO_MOD_BIT[keysym_norm]
    return None
def connect(self) -> None:
    """Open a fresh TCP session to the server and perform the hello exchange.

    Any previous socket is closed first. The 30 s connect timeout stays set
    on the socket afterwards.
    """
    # New TCP session (or reconnect): handshake and partial clipboard state must not
    # leak from the previous session.
    self._handshake_done = self._ignore_mouse = False
    self._cb_buf.clear()
    self._cb_expect = None
    previous = self._sock
    if previous is not None:
        try:
            previous.close()
        except OSError:
            pass
        self._sock = None
    fresh = socket.create_connection((self._host, self._port), timeout=30)
    # Disable Nagle so small input packets are not delayed.
    fresh.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    self._sock = fresh
    self._do_hello()
def close(self) -> None:
    """Stop the clipboard thread (waiting up to 2 s) and close the socket.

    Safe to call repeatedly; socket close errors are ignored.
    """
    self._clipboard_stop.set()
    worker = self._clipboard_thread
    if worker is not None:
        worker.join(timeout=2.0)
        self._clipboard_thread = None
    if not self._sock:
        return
    try:
        self._sock.close()
    except OSError:
        pass
    self._sock = None
def _do_hello(self) -> None:
    """Validate the server greeting and reply with this client's hello.

    Raises:
        RuntimeError: the greeting is not a ``Barrier`` hello or the server
            protocol is older than ``PROTOCOL_MAJOR.PROTOCOL_MINOR``.
    """
    assert self._sock is not None
    greeting = recv_packet(self._sock)
    well_formed = greeting.startswith(b"Barrier") and len(greeting) >= 7 + 4
    if not well_formed:
        raise RuntimeError("invalid server greeting (expected 'Barrier')")
    # Two int16 version fields follow the 7-byte magic.
    major, pos = read_i16_be(greeting, 7)
    minor, pos = read_i16_be(greeting, pos)
    if (major, minor) < (PROTOCOL_MAJOR, PROTOCOL_MINOR):
        raise RuntimeError(
            f"incompatible server protocol ({major}.{minor}); "
            f"this client requires >= {PROTOCOL_MAJOR}.{PROTOCOL_MINOR}"
        )
    if self._debug:
        LOG.debug(
            "server hello protocol major=%s minor=%s (required >= %s.%s)",
            major,
            minor,
            PROTOCOL_MAJOR,
            PROTOCOL_MINOR,
        )
    reply = b"".join(
        (
            b"Barrier",
            pack_i16_be(PROTOCOL_MAJOR),
            pack_i16_be(PROTOCOL_MINOR),
            pack_u32_be(len(self._name)),
            self._name,
        )
    )
    self._guard_send(reply)
    if self._debug:
        LOG.debug(
            "client hello sent version %s.%s name=%r",
            PROTOCOL_MAJOR,
            PROTOCOL_MINOR,
            self._name.decode("utf-8", errors="replace"),
        )
def _send_dinf(
    self,
    x: int,
    y: int,
    w: int,
    h: int,
    mx: int,
    my: int,
) -> None:
    """Send ``DINF`` with screen origin/size and cursor position.

    Seven int16 fields are written: x, y, w, h, a constant 0, mx, my.
    """
    assert self._sock is not None
    fields = (x, y, w, h, 0, mx, my)
    self._guard_send(b"DINF" + b"".join(pack_i16_be(value) for value in fields))
def _send_calv(self) -> None:
    """Send a ``CALV`` packet (used to answer the server's ``CALV``)."""
    assert self._sock is not None
    keepalive = b"CALV"
    self._guard_send(keepalive)
def _send_cnop(self) -> None:
    """Send a ``CNOP`` packet; the main loop emits one after each handled command."""
    assert self._sock is not None
    noop = b"CNOP"
    self._guard_send(noop)
def _pack_dclp(self, cb_id: int, seq: int, mark: int, chunk: bytes) -> bytes:
    """Build a ``DCLP`` packet body: id (1), seq (4), mark (1), then ``chunk`` as a wire string."""
    parts = (
        b"DCLP",
        pack_i8(cb_id),
        pack_u32_be(seq & 0xFFFFFFFF),
        pack_i8(mark),
        pack_barrier_string(chunk),
    )
    return b"".join(parts)
def _pbpaste_text(self) -> str:
    """Read the local clipboard via ``pbpaste``.

    If the command cannot run, times out, or exits non-zero, the last text
    this client saw is returned instead so callers detect "no change".
    """
    try:
        proc = subprocess.run(
            ["pbpaste"],
            capture_output=True,
            timeout=5,
            check=False,
        )
    except (OSError, subprocess.SubprocessError) as e:
        if self._debug:
            LOG.debug("pbpaste failed: %s", e)
        return self._clipboard_last_seen
    if proc.returncode == 0:
        return proc.stdout.decode("utf-8", errors="replace")
    return self._clipboard_last_seen
def _pbcopy_text(self, text: str) -> None:
    """Write ``text`` to the local clipboard via ``pbcopy``; failures are logged, not raised."""
    encoded = text.encode("utf-8")
    try:
        subprocess.run(
            ["pbcopy"],
            input=encoded,
            capture_output=True,
            timeout=5,
            check=False,
        )
    except (OSError, subprocess.SubprocessError) as e:
        LOG.warning("pbcopy failed: %s", e)
def _start_clipboard_thread(self) -> None:
    """Start the daemon thread that polls the local clipboard every 0.35 s.

    Does nothing when clipboard sync is disabled, the platform is not macOS
    (a warning is logged), or a thread is already registered.
    """
    if not self._clipboard_enabled:
        return
    if not self._darwin_clipboard_ok():
        LOG.warning(
            "Barrier clipboard: pbpaste/pbcopy integration is macOS-only; "
            "clipboard disabled on this platform"
        )
        return
    if self._clipboard_thread is not None:
        return
    self._clipboard_stop.clear()

    def poll_local_clipboard() -> None:
        # Event.wait doubles as the sleep and the stop check: it returns
        # True as soon as the stop event is set.
        while not self._clipboard_stop.wait(0.35):
            if not self._handshake_done or self._sock is None:
                continue
            try:
                self._maybe_push_local_clipboard()
            except Exception:
                LOG.exception("clipboard: local poll")

    worker = threading.Thread(
        target=poll_local_clipboard,
        name="barrier-clipboard",
        daemon=True,
    )
    self._clipboard_thread = worker
    worker.start()
    if self._debug:
        LOG.debug("clipboard thread started")
def _maybe_push_local_clipboard(self) -> None:
    """Send the local clipboard to the server if it changed since the last poll.

    Nothing is sent when the text is unchanged, is the echo of text just
    received from the server (within 0.35 s), equals the text last sent, or
    marshals to more than ``CLIPBOARD_MARSHALL_MAX`` bytes.
    """
    if not (self._clipboard_enabled and self._darwin_clipboard_ok()):
        return
    current = self._pbpaste_text()
    if current == self._clipboard_last_seen:
        return
    self._clipboard_last_seen = current
    # Our own pbcopy of remote text shows up as a local change; don't bounce it.
    is_remote_echo = (
        time.time() - self._clipboard_remote_ts < 0.35
        and current == self._clipboard_last_remote_text
    )
    if is_remote_echo or current == self._clipboard_last_sent_to_server:
        return
    blob = marshall_clipboard_text(current)
    blob_size = len(blob)
    if blob_size > CLIPBOARD_MARSHALL_MAX:
        LOG.warning(
            "local clipboard too large (%s bytes marshalled); not sent",
            blob_size,
        )
        return
    self._send_clipboard_to_server(blob)
    self._clipboard_last_sent_to_server = self._clipboard_last_pushed = current
    if self._debug:
        LOG.debug(
            "clipboard sent to server (%s chars, marshalled=%s bytes)",
            len(current),
            blob_size,
        )
def _flush_clipboard_on_leave(self) -> None:
    """Push the local clipboard to the server when the cursor leaves this screen.

    Skipped when clipboard sync is unavailable, the text equals what was
    last sent, or the marshalled form exceeds ``CLIPBOARD_MARSHALL_MAX``.
    """
    if not (self._clipboard_enabled and self._darwin_clipboard_ok()):
        return
    current = self._pbpaste_text()
    if current == self._clipboard_last_sent_to_server:
        return
    blob = marshall_clipboard_text(current)
    if len(blob) > CLIPBOARD_MARSHALL_MAX:
        return
    self._send_clipboard_to_server(blob)
    # Record the text everywhere so the poller does not resend it.
    self._clipboard_last_sent_to_server = current
    self._clipboard_last_seen = current
    self._clipboard_last_pushed = current
def _send_clipboard_to_server(self, marshalled: bytes) -> None:
    """Stream a marshalled clipboard to the server.

    Sends ``CCLP`` (grab) followed by ``DCLP`` START (total size as ASCII
    digits), one CHUNK per ``CLIPBOARD_CHUNK`` bytes, and END. Each packet
    is sent separately, tagged with the current sequence number. Does
    nothing when there is no socket.
    """
    if self._sock is None:
        return
    seq = self._seq & 0xFFFFFFFF

    def send_dclp(mark: int, data: bytes) -> None:
        self._guard_send(self._pack_dclp(CLIPBOARD_ID_PRIMARY, seq, mark, data))

    self._guard_send(b"CCLP" + pack_i8(CLIPBOARD_ID_PRIMARY) + pack_u32_be(seq))
    send_dclp(CB_MARK_START, str(len(marshalled)).encode("ascii"))
    blob = bytes(marshalled)
    for start in range(0, len(blob), CLIPBOARD_CHUNK):
        send_dclp(CB_MARK_CHUNK, blob[start : start + CLIPBOARD_CHUNK])
    send_dclp(CB_MARK_END, b"")
def _handle_dclp(self, payload: bytes) -> None:
    """Reassemble an incoming ``DCLP`` stream and apply the text on END.

    START resets the buffer and records the announced size; CHUNK appends
    data (aborting the transfer if it overruns); END checks the size,
    extracts the text entry and, when clipboard sync is enabled, stores it
    (running ``pbcopy`` on macOS). Malformed input is dropped, with a log
    line in debug mode only.
    """
    verbose = self._debug
    parsed = parse_dclp_payload(payload)
    if parsed is None:
        if verbose:
            LOG.warning("DCLP parse failed len=%s", len(payload))
        return
    _cb_id, _seq, mark, data = parsed

    def abort_transfer() -> None:
        self._cb_buf.clear()
        self._cb_expect = None

    if mark == CB_MARK_START:
        abort_transfer()
        try:
            announced = int(data.decode("ascii"))
        except ValueError:
            return
        if not 0 <= announced <= CLIPBOARD_MARSHALL_MAX:
            if verbose:
                LOG.warning("DCLP START invalid size: %s", announced)
            return
        self._cb_expect = announced
        return

    if mark == CB_MARK_CHUNK:
        if self._cb_expect is None:
            if verbose:
                LOG.debug("DCLP CHUNK without START; ignored")
            return
        self._cb_buf.extend(data)
        if len(self._cb_buf) > self._cb_expect:
            if verbose:
                LOG.warning(
                    "DCLP data exceeds announced size (%s > %s)",
                    len(self._cb_buf),
                    self._cb_expect,
                )
            abort_transfer()
        return

    if mark != CB_MARK_END:
        if verbose:
            LOG.warning("DCLP unknown mark: %s", mark)
        return

    if self._cb_expect is None:
        if verbose:
            LOG.debug("DCLP END without START; ignored")
        return
    if len(self._cb_buf) != self._cb_expect:
        if verbose:
            LOG.warning(
                "DCLP END invalid assembly expect=%s actual=%s",
                self._cb_expect,
                len(self._cb_buf),
            )
        abort_transfer()
        return
    blob = bytes(self._cb_buf)
    abort_transfer()
    text = unmarshall_clipboard_text(blob)
    if text is None or not self._clipboard_enabled:
        return
    on_macos = self._darwin_clipboard_ok()
    if on_macos:
        self._pbcopy_text(text)
    # Remember what came from the server so the poller does not echo it back.
    self._clipboard_last_remote_text = text
    self._clipboard_remote_ts = time.time()
    self._clipboard_last_seen = text
    if not verbose:
        return
    if on_macos:
        LOG.debug(
            "remote clipboard applied (%s chars)",
            len(text),
        )
    else:
        LOG.debug(
            "remote clipboard received (%s chars); pbcopy unavailable on this OS",
            len(text),
        )
def run_forever(
    self,
    screen_shape: Callable[[], tuple[int, int, int, int]],
    cursor_pos: Callable[[], tuple[int, int]],
) -> None:
    """Receive and dispatch packets until an exception ends the session.

    Packets go to ``_handle_handshake`` until the handshake completes;
    afterwards each goes to ``_handle_command`` and is followed by a
    ``CNOP``. This method only exits by raising.

    Args:
        screen_shape: Returns ``(x, y, width, height)`` of the screen.
        cursor_pos: Returns the current cursor ``(x, y)``.

    Raises:
        RuntimeError: a packet shorter than its 4-byte command code arrived.
    """
    assert self._sock is not None
    self._screen_shape, self._cursor_pos = screen_shape, cursor_pos
    conn = self._sock
    while True:
        packet = recv_packet(conn)
        if len(packet) < 4:
            raise RuntimeError("packet too short")
        cmd = packet[:4]
        if self._handshake_done:
            if self._debug_packets:
                LOG.debug(
                    "pkt cmd=%r len=%s hex=%s",
                    cmd.decode("ascii", errors="replace"),
                    len(packet),
                    payload_hex_preview(packet),
                )
            self._handle_command(cmd, packet)
            self._send_cnop()
        else:
            self._handle_handshake(cmd, packet, screen_shape, cursor_pos)
def _handle_handshake(
self,
cmd: bytes,
payload: bytes,
screen_shape: Callable[[], tuple[int, int, int, int]],
cursor_pos: Callable[[], tuple[int, int]],
) -> None:
assert self._sock is not None
if cmd == b"QINF":
sx, sy, sw, sh = screen_shape()
mx, my = cursor_pos()
if self._debug:
LOG.debug(
"handshake QINF -> DINF shape=(%s,%s) %sx%s cursor=(%s,%s)",
sx,
sy,
sw,
sh,
mx,
my,
)
self._send_dinf(sx, sy, sw, sh, mx, my)
elif cmd == b"CIAK":
self._ignore_mouse = False
elif cmd == b"CROP":
pass
elif cmd == b"CALV":
self._send_calv()
elif cmd == b"CNOP":
pass
elif cmd == b"DSOP":
if len(payload) < 8:
raise RuntimeError("DSOP truncated")
n_u32, _ = read_u32_be(payload, 4)
if n_u32 > 262144:
raise ValueError("DSOP option list too large")
need = 8 + n_u32 * 4
if len(payload) < need:
raise RuntimeError(
f"DSOP truncated: {len(payload)} bytes (need at least {need})"
)
if self._debug:
LOG.debug(
"handshake DSOP options=%s bytes payload hex=%s",
n_u32,
payload_hex_preview(payload, 96),
)
self._handshake_done = True
self._start_clipboard_thread()
elif cmd == b"CBYE":
raise EOFError("server closed connection (CBYE)")
elif cmd == b"EUNK":
raise RuntimeError(
f'the server rejected the client name "{self._name.decode(errors="replace")}". '
"Check the Barrier layout client name."
)
elif cmd == b"EBSY":
raise RuntimeError("client name already in use on server (EBSY)")
elif cmd == b"EICV":
raise RuntimeError("incompatible protocol version (EICV)")
elif cmd == b"EBAD":
raise RuntimeError("invalid protocol according to server (EBAD)")
else:
raise RuntimeError(f"unexpected handshake command: {cmd!r}")
def _handle_command(
self,
cmd: bytes,
payload: bytes,
) -> None:
assert self._sock is not None
if cmd == b"CALV":
self._send_calv()
elif cmd == b"CNOP":
pass
elif cmd == b"CBYE":
raise EOFError("server closed connection (CBYE)")
elif cmd == b"QINF":
# Layout update — resend DINF.
self._ignore_mouse = True
self._send_dinf_from_screen()
elif cmd == b"CIAK":
self._ignore_mouse = False
elif cmd == b"CROP":
pass
elif cmd == b"DSOP":
pass # options ignored in this minimal client
elif cmd == b"CINN" and len(payload) >= 4 + 2 + 2 + 4 + 2:
off = 4
_x, off = read_i16_be(payload, off)
_y, off = read_i16_be(payload, off)
self._seq, off = read_u32_be(payload, off)
_mod, off = read_i16_be(payload, off)
elif cmd == b"COUT":
self._flush_clipboard_on_leave()
elif cmd == b"DCLP":
self._handle_dclp(payload)
elif cmd == b"CCLP" and len(payload) >= 4 + 1 + 4:
off = 4
cb_id = payload[off]
off += 1
srv_seq, _ = read_u32_be(payload, off)
if self._debug:
LOG.debug("CCLP received id=%s seq=%s", cb_id, srv_seq)
elif cmd in (b"CSEC", b"DFTR", b"DDRG"):
pass
elif cmd == b"DMMV" and len(payload) >= 4 + 2 + 2:
if not self._ignore_mouse:
off = 4
x, off = read_i16_be(payload, off)
y, off = read_i16_be(payload, off)
self._mouse.position = (int(x), int(y))
elif cmd == b"DMRM" and len(payload) >= 4 + 2 + 2:
off = 4
dx, off = read_i16_be(payload, off)
dy, off = read_i16_be(payload, off)
cx, cy = self._mouse.position
self._mouse.position = (int(cx + dx), int(cy + dy))
elif cmd == b"DMDN" and len(payload) >= 4 + 1:
btn = payload[4]
b = BARRIER_BUTTON_TO_PYNPUT.get(btn)
if b is not None:
self._mouse.press(b)
elif cmd == b"DMUP" and len(payload) >= 4 + 1:
btn = payload[4]
b = BARRIER_BUTTON_TO_PYNPUT.get(btn)
if b is not None:
self._mouse.release(b)
elif cmd == b"DMWM":
off = 4
if len(payload) == 4 + 2:
y, off = read_i16_be(payload, off)
x = 0
elif len(payload) >= 4 + 2 + 2:
x, off = read_i16_be(payload, off)
y, off = read_i16_be(payload, off)
else:
return
if self._invert_scroll:
x = -x
y = -y
if y:
clicks = self._scroll_steps_from_barrier_delta(y)
if y > 0:
self._mouse.scroll(0, -clicks)
else:
self._mouse.scroll(0, clicks)
if x:
clicks_x = self._scroll_steps_from_barrier_delta(x)
if x > 0:
self._mouse.scroll(clicks_x, 0)
else:
self._mouse.scroll(-clicks_x, 0)
elif cmd == b"DKDN":
self._key_down(payload)
elif cmd == b"DKUP":
self._key_up(payload)
elif cmd == b"DKRP":
self._key_repeat(payload)
elif cmd == b"EBAD":
raise RuntimeError("server reported protocol error (EBAD)")
else:
pass
def _send_dinf_from_screen(self) -> None:
if self._screen_shape is None:
sx, sy, sw, sh = default_screen_shape()
else:
sx, sy, sw, sh = self._screen_shape()
if self._cursor_pos is not None:
mx, my = self._cursor_pos()
else:
mx, my = self._mouse.position
self._send_dinf(sx, sy, sw, sh, int(mx), int(my))
def _key_down(self, payload: bytes) -> None:
off = 4
if len(payload) < 8:
if self._debug:
LOG.warning(
"DKDN short payload: len=%s hex=%s",
len(payload),
payload_hex_preview(payload),
)
return
keysym, off = read_u16_be(payload, off)
mod, off = read_u16_be(payload, off)
keybutton: int | None = None
if len(payload) - off >= 2:
keybutton, off = read_u16_be(payload, off)
key = keysym_to_keycode(keysym)
if self._debug:
i16_wrong = struct.unpack_from(">h", payload, 4)[0]
norm = normalize_barrier_keysym(keysym)
norm_hex = f" norm=0x{norm:04X}" if norm != keysym else ""
LOG.debug(
"DKDN len=%s hex=%s wire=0x%04X%s (%s) mod=0x%04X keybutton=%s "
"-> pynput=%r (int16_err=%s)",
len(payload),
payload_hex_preview(payload),
keysym,
norm_hex,
keysym_label(keysym),
mod,
keybutton,
key,
i16_wrong,
)
if key is None:
if self._debug:
LOG.debug(
"DKDN no mapping: wire=0x%04X norm=0x%04X (%s)",
keysym,
normalize_barrier_keysym(keysym),
keysym_label(keysym),
)
return
assert key is not None
k_norm = normalize_barrier_keysym(keysym)
mbit = self._modifier_keysym_bit(k_norm)
if mbit is not None:
self._mod_bit_ref[mbit] = self._mod_bit_ref.get(mbit, 0) + 1
if self._mod_bit_ref[mbit] == 1:
self._applied_mod_mask |= mbit
try:
self._keyboard.press(key)
except Exception:
LOG.exception("DKDN modifier press keysym=0x%04X key=%r", keysym, key)
return
target = (mod & MOD_SYNTH_MASK) | self._forced_modifier_bits()
self._sync_modifier_mask_to(target)
try:
self._keyboard.press(key)
except Exception:
LOG.exception("DKDN press failed keysym=0x%04X key=%r", keysym, key)
def _key_up(self, payload: bytes) -> None:
off = 4
if len(payload) < 8:
if self._debug:
LOG.warning(
"DKUP short payload: len=%s hex=%s",
len(payload),
payload_hex_preview(payload),
)
return
keysym, off = read_u16_be(payload, off)
mod, off = read_u16_be(payload, off)
keybutton: int | None = None
if len(payload) - off >= 2:
keybutton, off = read_u16_be(payload, off)
key = keysym_to_keycode(keysym)
if self._debug:
norm = normalize_barrier_keysym(keysym)
norm_hex = f" norm=0x{norm:04X}" if norm != keysym else ""
LOG.debug(
"DKUP len=%s hex=%s wire=0x%04X%s (%s) mod=0x%04X keybutton=%s "
"-> pynput=%r",
len(payload),
payload_hex_preview(payload),
keysym,
norm_hex,
keysym_label(keysym),
mod,
keybutton,
key,
)
if key is None:
if self._debug:
LOG.debug(
"DKUP unmapped keysym: wire=0x%04X norm=0x%04X (%s)",
keysym,
normalize_barrier_keysym(keysym),
keysym_label(keysym),
)
return
assert key is not None
k_norm = normalize_barrier_keysym(keysym)
mbit = self._modifier_keysym_bit(k_norm)
if mbit is not None:
n = self._mod_bit_ref.get(mbit, 0)
if n <= 0:
if self._debug:
LOG.warning(
"DKUP modifier refcount underflow (keysym=0x%04X bit=0x%x)",
keysym,
mbit,
)
return
self._mod_bit_ref[mbit] = n - 1
if self._mod_bit_ref[mbit] == 0:
self._applied_mod_mask &= ~mbit
try:
self._keyboard.release(key)
except Exception:
LOG.exception("DKUP modifier release keysym=0x%04X key=%r", keysym, key)
return
try:
self._keyboard.release(key)
except Exception:
LOG.exception("DKUP release failed keysym=0x%04X key=%r", keysym, key)
target = (mod & MOD_SYNTH_MASK) | self._forced_modifier_bits()
self._sync_modifier_mask_to(target)
def _key_repeat(self, payload: bytes) -> None:
off = 4
if len(payload) < 10:
if self._debug:
LOG.warning(
"DKRP short payload: len=%s hex=%s",
len(payload),
payload_hex_preview(payload),
)
return
keysym, off = read_u16_be(payload, off)
mod, off = read_u16_be(payload, off)
repeat, off = read_u16_be(payload, off)
keybutton: int | None = None
if len(payload) - off >= 2:
keybutton, off = read_u16_be(payload, off)
key = keysym_to_keycode(keysym)
if self._debug:
norm = normalize_barrier_keysym(keysym)
norm_hex = f" norm=0x{norm:04X}" if norm != keysym else ""
LOG.debug(
"DKRP len=%s hex=%s wire=0x%04X%s (%s) mod=0x%04X repeat=%s "
"keybutton=%s -> pynput=%r",
len(payload),
payload_hex_preview(payload),
keysym,
norm_hex,
keysym_label(keysym),
mod,
repeat,
keybutton,
key,
)
if key is None:
if self._debug:
LOG.debug(
"DKRP ignored (no mapping) wire=0x%04X norm=0x%04X",
keysym,
normalize_barrier_keysym(keysym),
)
return
k_norm = normalize_barrier_keysym(keysym)
mbit = self._modifier_keysym_bit(k_norm)
if mbit is not None:
for _ in range(max(1, repeat)):
try:
self._keyboard.press(key)
self._keyboard.release(key)
except Exception:
LOG.exception("DKRP modifier keysym=0x%04X key=%r", keysym, key)
return
target = (mod & MOD_SYNTH_MASK) | self._forced_modifier_bits()
self._sync_modifier_mask_to(target)
for _ in range(max(1, repeat)):
try:
self._keyboard.press(key)
self._keyboard.release(key)
except Exception:
LOG.exception("DKRP repeat cycle failed keysym=0x%04X key=%r", keysym, key)
self._sync_modifier_mask_to((mod & MOD_SYNTH_MASK) | self._forced_modifier_bits())
def is_recoverable_network_error(exc: BaseException) -> bool:
    """Return True for failures that a later reconnect attempt can plausibly fix.

    Recoverable:
      * ``EOFError`` — raised by this client for ``CBYE`` and closed sockets;
      * timeouts — ``TimeoutError`` and ``socket.timeout`` (a separate
        ``OSError`` subclass with no errno before Python 3.10);
      * any ``ConnectionError`` subclass (reset, refused, aborted, broken
        pipe), even when raised without an errno;
      * other ``OSError`` whose errno indicates a transient network
        condition (unreachable or down host/network, reset, timeout).

    Everything else, including ``OSError`` without an errno, is not.
    """
    if isinstance(exc, (EOFError, TimeoutError, socket.timeout, ConnectionError)):
        return True
    if not isinstance(exc, OSError):
        return False
    # Resolve by name: not every platform defines every constant.
    transient_names = (
        "ECONNRESET",
        "EPIPE",
        "ECONNABORTED",
        "ETIMEDOUT",
        "ECONNREFUSED",
        "EHOSTUNREACH",
        "ENETUNREACH",
        "EHOSTDOWN",
        "ENETDOWN",
        "ENETRESET",
    )
    transient = {
        getattr(errno, name) for name in transient_names if hasattr(errno, name)
    }
    return exc.errno in transient
def default_screen_shape() -> tuple[int, int, int, int]:
    """Return ``(x, y, width, height)`` of the primary monitor.

    The size is queried through a hidden, short-lived tkinter root window.
    If tkinter is unavailable or fails, ``(0, 0, 1920, 1080)`` is returned.
    """
    try:
        import tkinter

        window = tkinter.Tk()
        window.withdraw()
        size = (window.winfo_screenwidth(), window.winfo_screenheight())
        window.destroy()
    except Exception:
        return 0, 0, 1920, 1080
    return (0, 0) + size
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the client."""
    parser = argparse.ArgumentParser(
        description=(
            "Barrier plain-TCP client (protocol 1.6): mouse, keyboard, clipboard "
            "(macOS), reconnection, and scroll options — see module docstring."
        )
    )
    add = parser.add_argument
    add(
        "--host",
        required=True,
        help="hostname or IP of the machine running the Barrier server",
    )
    add(
        "--port",
        type=int,
        default=24800,
        help="Barrier server TCP port (default: 24800)",
    )
    add(
        "--name",
        required=True,
        help='client name exactly as in the server layout (e.g. "MacBook")',
    )
    add(
        "--invert-scroll",
        action="store_true",
        help=(
            "invert vertical and horizontal scroll (useful on macOS with "
            "natural scrolling when direction is opposite to the server)"
        ),
    )
    add(
        "--scroll-scale",
        type=float,
        default=3.0,
        metavar="N",
        help=(
            "multiplies DMWM wheel intensity (default: 3). Increase if still too weak; "
            "use values between 0 and 1 to reduce. Must be >0."
        ),
    )
    add(
        "--debug",
        action="store_true",
        help=(
            "verbose stderr logs: DKDN/DKUP/DKRP (keysym, mod, hex), "
            "pynput errors, and handshake steps"
        ),
    )
    add(
        "--debug-packets",
        action="store_true",
        help=(
            "enable DEBUG and log every post-handshake packet (cmd, length, "
            "hex). Combine with --debug for detailed key parsing"
        ),
    )
    add(
        "--no-clipboard",
        action="store_true",
        help="disable clipboard sync (no CCLP/DCLP, no pbpaste/pbcopy)",
    )
    add(
        "--reconnect-interval",
        type=float,
        default=10.0,
        metavar="SEC",
        help=(
            "seconds between retries after disconnect or network failure (default: 10). "
            "Use 0 to exit on first failure (legacy behavior)."
        ),
    )
    return parser


def main() -> None:
    """Parse arguments, then connect and serve until interrupted or a fatal error.

    Recoverable failures trigger a reconnect after ``--reconnect-interval``
    seconds; any other failure (or an interval of 0 or less) exits with
    status 1. Ctrl+C exits cleanly, including while waiting to retry.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()
    if args.scroll_scale <= 0:
        parser.error("--scroll-scale must be > 0")
    verbose = args.debug or args.debug_packets
    logging.basicConfig(
        level=logging.DEBUG if verbose else logging.WARNING,
        format="%(levelname)s [barrier] %(message)s",
        stream=sys.stderr,
        force=True,
    )
    client = BarrierClient(
        args.host,
        args.port,
        args.name,
        invert_scroll=args.invert_scroll,
        scroll_scale=args.scroll_scale,
        debug=args.debug,
        debug_packets=args.debug_packets,
        clipboard=not args.no_clipboard,
    )
    retry_delay = args.reconnect_interval

    def shape() -> tuple[int, int, int, int]:
        return default_screen_shape()

    def cursor() -> tuple[int, int]:
        return (int(client._mouse.position[0]), int(client._mouse.position[1]))

    try:
        while True:
            try:
                print(f"Connecting to {args.host}:{args.port} as {args.name!r}...")
                client.connect()
                print(
                    "Barrier handshake complete; registering screen with server "
                    "(Ctrl+C to exit)."
                )
                client.run_forever(shape, cursor)
            except KeyboardInterrupt:
                print("\nExiting (user interrupt).")
                break
            except Exception as e:
                client.close()
                prefix = "Disconnected" if isinstance(e, EOFError) else "Error"
                msg = f"{prefix}: {e}"
                if not (retry_delay > 0 and is_recoverable_network_error(e)):
                    print(msg, file=sys.stderr)
                    sys.exit(1)
                print(
                    f"{msg} — retrying in {retry_delay:.0f}s "
                    "(Ctrl+C while waiting to exit).",
                    file=sys.stderr,
                )
                try:
                    time.sleep(retry_delay)
                except KeyboardInterrupt:
                    print("\nExiting (user interrupt).")
                    break
    finally:
        # Always stop the clipboard thread and close the socket on the way out.
        client.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment