Skip to content

Instantly share code, notes, and snippets.

@mosquito
Created October 23, 2025 08:17
Show Gist options
  • Save mosquito/ba494f43f8d03248ea74e2984dae04de to your computer and use it in GitHub Desktop.
Save mosquito/ba494f43f8d03248ea74e2984dae04de to your computer and use it in GitHub Desktop.
Collect mac/ip/ip6 counters for each router client using nftables
#!/usr/bin/env python3
import argparse, json, sys, shutil
import logging
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from subprocess import check_output, CalledProcessError
def sh(*args, timeout: float = 1):
    """Run an external command and return its standard output as text.

    Args:
        *args: Program name followed by its arguments. They are handed to
            ``check_output`` as an argument vector, so no shell is involved.
        timeout: Seconds to wait for the command to finish. Fractional
            values are accepted (the ``--timeout`` option is a float).

    Returns:
        The command's standard output, decoded to ``str``.

    Raises:
        CalledProcessError: The command exited with a non-zero status.
        subprocess.TimeoutExpired: The command did not finish within
            *timeout* seconds.
    """
    return check_output(args, text=True, timeout=timeout)
def _as_pair(candidate):
    """Return *candidate* as an ``(ifname, addr)`` tuple, or ``None``.

    A two-item list is used as is; a string is split on whitespace and must
    yield exactly two parts. Every other value is rejected.
    """
    if isinstance(candidate, list):
        parts = candidate
    elif isinstance(candidate, str):
        parts = candidate.split()
    else:
        return None
    return tuple(parts) if len(parts) == 2 else None


def _split_key(raw):
    """Extract the composite ``(ifname, addr)`` key from one element payload.

    Accepted shapes: a two-item list, a whitespace-separated string, or a
    dict carrying the key under ``"val"`` or ``"concat"`` (where the value
    may itself be ``{"concat": [ifname, addr]}``). Returns ``None`` when the
    payload matches none of them.
    """
    if not isinstance(raw, dict):
        return _as_pair(raw)
    val = raw.get("val", raw.get("concat", raw))
    if isinstance(val, dict):
        # A nested dict is only understood when it wraps a "concat" list.
        inner = val.get("concat")
        return _as_pair(inner) if isinstance(inner, list) else None
    return _as_pair(val)


def _find_counter(entry):
    """Return the ``counter`` dict attached to *entry*, or ``{}`` if absent.

    The counter is looked up on the inner ``"elem"`` dict when there is one,
    otherwise on the entry itself.
    """
    if not isinstance(entry, dict):
        return {}
    inner = entry.get("elem")
    holder = inner if isinstance(inner, dict) else entry
    return holder.get("counter", {})


def read_set(family, table, setname, timeout):
    """Yield the per-element counters of one nftables set.

    Runs ``nft -j list set <family> <table> <setname>`` and walks the
    ``"elem"`` list of every ``"set"`` object in the JSON reply. Elements
    whose key cannot be interpreted as an ``(ifname, addr)`` pair are
    skipped silently.

    This is a generator: the ``nft`` command is only executed once iteration
    starts.

    Args:
        family: nftables family the table belongs to.
        table: Name of the table holding the set.
        setname: Name of the set to list.
        timeout: Seconds to wait for the ``nft`` command.

    Yields:
        ``(ifname, addr, packets, bytes)`` with the first two as ``str`` and
        the counters as ``int`` (0 when the element has no counter).

    Raises:
        RuntimeError: ``nft`` exited with an error or timed out; the message
            is prefixed with *setname*.
    """
    # Imported here because the module header only imports check_output and
    # CalledProcessError from subprocess.
    from subprocess import TimeoutExpired

    try:
        out = sh("nft", "-j", "list", "set", family, table, setname, timeout=timeout)
    except (CalledProcessError, TimeoutExpired) as exc:
        # A timeout is reported the same way as a failing exit status so the
        # caller always learns which set could not be read.
        raise RuntimeError(f"{setname}: {exc}") from exc
    js = json.loads(out)
    logging.debug("nft output for set %s: %s", setname, js)
    for item in js.get("nftables", []):
        st = item.get("set")
        if not st:
            continue
        for entry in st.get("elem", []):
            wrapped = isinstance(entry, dict) and "elem" in entry
            key = _split_key(entry.get("elem") if wrapped else entry)
            if key is None:
                continue
            ifname, addr = key
            counter = _find_counter(entry)
            pkts = int(counter.get("packets", 0))
            bytes_ = int(counter.get("bytes", 0))
            yield str(ifname), str(addr), pkts, bytes_
# Replacement text for each character that needs a backslash escape.
_LABEL_ESCAPES = {"\\": "\\\\", "\n": "\\n", '"': '\\"'}


def esc(s: str) -> str:
    """Escape *s* for use inside a double-quoted label value.

    Backslashes, newlines and double quotes are replaced by ``\\\\``,
    ``\\n`` and ``\\"`` respectively; all other characters pass through
    unchanged.
    """
    return "".join(_LABEL_ESCAPES.get(ch, ch) for ch in s)
def render_metrics(
    mac_src, mac_dst,
    ip4_src, ip4_dst,
    ip6_src, ip6_dst
):
    """Render the collected counters as a text-format metrics page.

    Each argument is an iterable of ``(ifname, addr, packets, bytes)`` rows.
    Every row produces one ``nft_packets_total`` and one ``nft_bytes_total``
    sample labelled with ``ifname``, ``dir`` (``ingress`` for the ``*_src``
    arguments, ``egress`` for ``*_dst``), ``family`` (``mac``/``ip4``/``ip6``)
    and the address itself under ``mac`` or ``ip``.

    All samples of one metric are emitted as a single group directly after
    that metric's ``# TYPE`` line, rather than interleaving the two metrics.

    Returns:
        The page as UTF-8 encoded ``bytes``, terminated by a newline.
    """
    # (rows, dir label, family label, name of the label holding the address)
    sources = (
        (mac_src, "ingress", "mac", "mac"),
        (mac_dst, "egress", "mac", "mac"),
        (ip4_src, "ingress", "ip4", "ip"),
        (ip4_dst, "egress", "ip4", "ip"),
        (ip6_src, "ingress", "ip6", "ip"),
        (ip6_dst, "egress", "ip6", "ip"),
    )
    packet_lines = ['# TYPE nft_packets_total counter']
    byte_lines = ['# TYPE nft_bytes_total counter']
    for rows, direction, family, addr_label in sources:
        for ifname, addr, pkts, bytes_ in rows:
            labels = ",".join((
                f'ifname="{esc(ifname)}"',
                f'dir="{direction}"',
                f'family="{family}"',
                f'{addr_label}="{esc(addr)}"',
            ))
            packet_lines.append(f"nft_packets_total{{{labels}}} {pkts}")
            byte_lines.append(f"nft_bytes_total{{{labels}}} {bytes_}")
    return ("\n".join(packet_lines + byte_lines) + "\n").encode()
@dataclass
class ServerConfig:
    """Settings the request handler needs to query nftables.

    main() builds one instance from the command-line options and stores it
    on ``Handler.CONFIG``.
    """

    # nftables family and table that contain the sets below.
    family: str
    table: str
    # Names of the sets holding per-(ifname, address) counters; "src" sets
    # are exported with dir="ingress", "dst" sets with dir="egress".
    set_mac_src: str
    set_mac_dst: str
    set_ip4_src: str
    set_ip4_dst: str
    set_ip6_src: str
    set_ip6_dst: str
    # Seconds allowed for each nft invocation.
    timeout: float
class Handler(BaseHTTPRequestHandler):
    """HTTP handler that serves the nftables counters on ``GET /metrics``.

    Any other path is answered with 404. When reading the sets or rendering
    the page fails, the request is answered with a 500 whose body is a
    ``# error: ...`` line, and the error is logged.
    """

    # Assigned on the class by main() before the server starts. Quoted so
    # the class body does not need ServerConfig at definition time.
    CONFIG: "ServerConfig"

    def log_message(self, *a, **kw):
        """Suppress the base class's per-request log output."""

    def _reply(self, status, content_type, body):
        """Send a complete response: status, headers with length, then *body*."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        """Handle a GET request (see the class docstring for the contract)."""
        # Compare the path component only so "/metrics?x=1" is served too.
        if self.path.split("?", 1)[0] != "/metrics":
            self.send_response(404)
            self.end_headers()
            return
        try:
            c = self.CONFIG
            set_names = (
                c.set_mac_src, c.set_mac_dst,
                c.set_ip4_src, c.set_ip4_dst,
                c.set_ip6_src, c.set_ip6_dst,
            )
            # Sets are read one after another, in render_metrics() argument order.
            body = render_metrics(
                *(list(read_set(c.family, c.table, name, c.timeout)) for name in set_names)
            )
        except Exception as e:
            logging.error("failed to collect metrics: %s", e)
            self._reply(500, "text/plain; charset=utf-8", f"# error: {e}\n".encode())
            return
        # Sent outside the try block: if writing the 200 response fails, no
        # second (500) response is attempted on the same connection.
        self._reply(200, "text/plain; version=0.0.4; charset=utf-8", body)
def main(argv):
    """Parse the command line and run the exporter until interrupted.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        1 if the ``nft`` binary is not found in PATH, otherwise 0 once the
        server has been stopped with Ctrl-C.
    """
    p = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    p.add_argument("--family", default="netdev", help="nftables family")
    p.add_argument("--table", default="macacct", help="nftables table")
    p.add_argument("--set-mac-src", default="mac_src", help="set for source MACs (ifname . ether_addr)")
    p.add_argument("--set-mac-dst", default="mac_dst", help="set for dest MACs (ifname . ether_addr)")
    p.add_argument("--set-ip4-src", default="ip4_src", help="set for source IPv4 (ifname . ipv4_addr)")
    p.add_argument("--set-ip4-dst", default="ip4_dst", help="set for dest IPv4 (ifname . ipv4_addr)")
    p.add_argument("--set-ip6-src", default="ip6_src", help="set for source IPv6 (ifname . ipv6_addr)")
    p.add_argument("--set-ip6-dst", default="ip6_dst", help="set for dest IPv6 (ifname . ipv6_addr)")
    p.add_argument("--address", default="127.0.0.1", help="address to bind to")
    p.add_argument("--port", type=int, default=9104, help="port to bind to")
    p.add_argument("--timeout", type=float, default=2.0, help="timeout for nft commands (s)")
    p.add_argument("--log-level", choices=["debug", "info", "warning", "error", "critical"], default="info")
    # Parse before probing for nft so --help and usage errors work on hosts
    # where nft is not installed.
    a = p.parse_args(argv)

    if not shutil.which("nft"):
        print("Error: 'nft' binary not found in PATH", file=sys.stderr)
        return 1

    logging.basicConfig(level=getattr(logging, a.log_level.upper(), logging.INFO),
                        format="[%(levelname)s] %(message)s")
    Handler.CONFIG = ServerConfig(
        family=a.family,
        table=a.table,
        set_mac_src=a.set_mac_src,
        set_mac_dst=a.set_mac_dst,
        set_ip4_src=a.set_ip4_src,
        set_ip4_dst=a.set_ip4_dst,
        set_ip6_src=a.set_ip6_src,
        set_ip6_dst=a.set_ip6_dst,
        timeout=a.timeout,
    )
    # The with-block closes the listening socket on the way out, including
    # after Ctrl-C.
    with ThreadingHTTPServer((a.address, a.port), Handler) as httpd:
        logging.info("Starting nftables exporter on %s:%d", a.address, a.port)
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            pass
    return 0
# Script entry point: pass the command-line arguments (without the program
# name) to main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
# Accounting table read by the exporter script. The family, table and set
# names used here are the exporter's defaults (--family netdev,
# --table macacct, --set-mac-src mac_src, ...).
table netdev macacct {
# --- sets ---
# Each set is keyed by an interface name concatenated with an address
# (MAC, IPv4 or IPv6). Elements are added dynamically by the rules below,
# carry a packet/byte counter, and have a 48h timeout.
set mac_src { type ifname . ether_addr; flags dynamic, timeout; timeout 48h; counter; }
set mac_dst { type ifname . ether_addr; flags dynamic, timeout; timeout 48h; counter; }
set ip4_src { type ifname . ipv4_addr; flags dynamic, timeout; timeout 48h; counter; }
set ip4_dst { type ifname . ipv4_addr; flags dynamic, timeout; timeout 48h; counter; }
set ip6_src { type ifname . ipv6_addr; flags dynamic, timeout; timeout 48h; counter; }
set ip6_dst { type ifname . ipv6_addr; flags dynamic, timeout; timeout 48h; counter; }
# --- lan ---
# Ingress hook on device "lan": accounts traffic by source address.
# Every key uses the literal interface name "lan" as its first component.
# NOTE(review): each "update" is followed by a lookup of the same key in the
# same set; presumably the lookup is what advances the element counter -
# confirm against the nft documentation.
# NOTE(review): "ether type ip" / "ether type ip6" stand on lines of their
# own here; presumably they are meant to guard the statements that follow -
# confirm the line breaks match the original ruleset.
chain ingress_lan {
type filter hook ingress device "lan" priority 0;
# MAC
update @mac_src { "lan" . ether saddr }
"lan" . ether saddr @mac_src
# IPv4
ether type ip
update @ip4_src { "lan" . ip saddr }
"lan" . ip saddr @ip4_src
# IPv6
ether type ip6
update @ip6_src { "lan" . ip6 saddr }
"lan" . ip6 saddr @ip6_src
}
# Egress hook on device "lan": same layout as ingress_lan, but keyed by
# destination address and writing to the *_dst sets.
chain egress_lan {
type filter hook egress device "lan" priority 0;
# MAC
update @mac_dst { "lan" . ether daddr }
"lan" . ether daddr @mac_dst
# IPv4
ether type ip
update @ip4_dst { "lan" . ip daddr }
"lan" . ip daddr @ip4_dst
# IPv6
ether type ip6
update @ip6_dst { "lan" . ip6 daddr }
"lan" . ip6 daddr @ip6_dst
}
# To collect counters for other interfaces, copy the ingress_/egress_ chains
# and replace "lan" with the other interface's name.
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment