Skip to content

Instantly share code, notes, and snippets.

@thiagorb
Created February 13, 2025 21:43
Show Gist options
  • Save thiagorb/0e53b2b2dca7360b6fba382b637b5cef to your computer and use it in GitHub Desktop.
Save thiagorb/0e53b2b2dca7360b6fba382b637b5cef to your computer and use it in GitHub Desktop.
import json
import subprocess
import re
import heapq
import sys
from datetime import datetime, timezone
from typing import Generator, Mapping
def convert_timestamp_ns_to_date(timestamp_ns: int) -> datetime:
    """Convert a Unix timestamp in nanoseconds to a timezone-aware UTC datetime.

    Only integer arithmetic is used. Dividing by ``1e9`` would first turn the
    value into a float, which cannot represent present-day nanosecond
    timestamps exactly and may round the result into the next microsecond
    (or even the next second).

    Args:
        timestamp_ns: Nanoseconds elapsed since the Unix epoch.

    Returns:
        The matching ``datetime`` in UTC. Digits below one microsecond are
        truncated because ``datetime`` has microsecond resolution.
    """
    seconds, nanoseconds = divmod(int(timestamp_ns), 1_000_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=nanoseconds // 1_000
    )
def run_lncli(*command: str):
    """Run an ``lncli`` command and return its parsed JSON output.

    Args:
        *command: The lncli sub-command followed by its arguments, one string
            per argument (for example ``"listinvoices", "--index_offset=0"``).

    Returns:
        The command's standard output decoded with ``json.loads``.

    Raises:
        RuntimeError: If lncli exits with a non-zero status. The message
            contains the captured stdout and stderr.
        json.JSONDecodeError: If the captured stdout is not valid JSON.
    """
    # The arguments are handed over as a list, so no shell is involved and
    # nothing needs quoting.
    result = subprocess.run(
        ["lncli", *command], capture_output=True, text=True, check=False
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"Error running lncli command '{' '.join(command)}':\n{result.stdout}{result.stderr}"
        )
    return json.loads(result.stdout)
def list_invoices() -> Generator[dict[str, datetime | str | int | None], None, None]:
    """Yield one ledger entry for every settled invoice listed by lncli.

    Pages are requested from ``lncli listinvoices --paginate-forwards``,
    starting at index offset 0. Paging stops as soon as a response reports a
    ``last_index_offset`` of 0.

    Yields:
        Dicts with the keys ``timestamp`` (UTC datetime built from
        ``settle_date``), ``type`` (always ``"invoice"``), ``identifier``
        (the invoice ``r_hash``), ``value`` (``value_msat`` as an int) and
        ``channel`` (``chan_id`` of the HTLC that paid the invoice).

    Raises:
        ValueError: If a settled invoice cannot be attributed to exactly one
            HTLC (e.g. it was paid in several parts).
    """
    offset = None
    while offset != 0:
        page = run_lncli(
            "listinvoices",
            f"--index_offset={offset or 0}",
            "--paginate-forwards",
        )
        offset = int(page["last_index_offset"])
        for invoice in page["invoices"]:
            if invoice["state"] != "SETTLED":
                continue

            htlcs = invoice["htlcs"]
            if len(htlcs) != 1:
                # An invoice can also list HTLCs that never settled (see the
                # lnd InvoiceHTLC ``state`` field); those do not move funds,
                # so only the settled ones are considered.
                htlcs = [htlc for htlc in htlcs if htlc.get("state") == "SETTLED"]
            if len(htlcs) != 1:
                # Splitting one invoice across several channels is not
                # supported yet.
                raise ValueError(
                    f"Expected exactly one settled htlc in invoice "
                    f"{invoice['r_hash']}, found {len(htlcs)}"
                )

            # settle_date is scaled up to nanoseconds for the shared
            # converter (presumably it is reported in seconds).
            timestamp = convert_timestamp_ns_to_date(
                int(invoice["settle_date"]) * 1_000_000_000
            )
            # NOTE(review): value_msat is the amount the invoice asked for;
            # confirm whether amt_paid_msat should be used for zero-amount
            # or overpaid invoices.
            yield {
                "timestamp": timestamp,
                "type": "invoice",
                "identifier": invoice["r_hash"],
                "value": int(invoice["value_msat"]),
                "channel": htlcs[0]["chan_id"],
            }
def list_payments() -> Generator[dict[str, datetime | str | int | None], None, None]:
    """Yield ledger entries for every succeeded payment listed by lncli.

    Pages are requested from ``lncli listpayments --paginate_forwards``,
    starting at index offset 0. Paging stops as soon as a response reports a
    ``last_index_offset`` of 0.

    Yields:
        Two dicts per payment, both stamped with the HTLC's
        ``resolve_time_ns`` and booked on the channel of the route's first
        hop: a ``"payment"`` entry with the negated ``value_msat`` and a
        ``"payment fee"`` entry with the negated ``fee_msat``.

    Raises:
        ValueError: If a succeeded payment cannot be attributed to exactly
            one HTLC (e.g. a multi-part payment).
    """
    offset = None
    while offset != 0:
        page = run_lncli(
            "listpayments",
            f"--index_offset={offset or 0}",
            "--paginate_forwards",
        )
        offset = int(page["last_index_offset"])
        for payment in page["payments"]:
            if payment["status"] != "SUCCEEDED":
                continue

            htlcs = payment["htlcs"]
            if len(htlcs) != 1:
                # A payment lists every attempt, including failed ones (see
                # the lnd HTLCAttempt ``status`` field). Failed attempts move
                # no funds, so only the succeeded ones are considered.
                htlcs = [htlc for htlc in htlcs if htlc.get("status") == "SUCCEEDED"]
            if len(htlcs) != 1:
                # Payments split over several succeeded HTLCs are not
                # supported yet.
                raise ValueError(
                    f"Expected exactly one succeeded htlc in payment "
                    f"{payment['payment_hash']}, found {len(htlcs)}"
                )

            htlc = htlcs[0]
            # The first hop of the route is the local channel that was debited.
            channel_id = htlc["route"]["hops"][0]["chan_id"]
            settle_date = convert_timestamp_ns_to_date(int(htlc["resolve_time_ns"]))
            yield {
                "timestamp": settle_date,
                "type": "payment",
                "identifier": payment["payment_hash"],
                "value": -int(payment["value_msat"]),
                "channel": channel_id,
            }
            yield {
                "timestamp": settle_date,
                "type": "payment fee",
                "identifier": payment["payment_hash"],
                "value": -int(payment["fee_msat"]),
                "channel": channel_id,
            }
def list_chain_transactions() -> list[dict[str, datetime | str | int | None]]:
    """Return ledger entries for all on-chain transactions, oldest first.

    Every transaction from ``lncli listchaintxns`` produces a ``"chain"``
    entry carrying its amount. When the transaction label identifies a
    channel open or close (``0:openchannel:shortchanid-<id>`` or
    ``0:closechannel:shortchanid-<id>``), two more entries are booked on that
    channel: one with the negated amount and, if the fee is non-zero, a
    ``"<type> fee"`` entry with the negated fee.

    Returns:
        A list of ledger dicts sorted by ``timestamp``. The sort is stable,
        so entries derived from one transaction keep their relative order.
    """
    label_pattern = re.compile(
        r"^0:(?P<type>openchannel|closechannel):shortchanid\-(?P<channel_id>[0-9]+)$"
    )
    entries = []
    for tx in run_lncli("listchaintxns")["transactions"]:
        # time_stamp is scaled up to nanoseconds for the shared converter
        # (presumably it is reported in seconds).
        timestamp = convert_timestamp_ns_to_date(int(tx["time_stamp"]) * 1_000_000_000)
        # Scaled by 1000 so the values line up with the *_msat figures used
        # by the other ledger sources (presumably reported in satoshis).
        amount = int(tx["amount"]) * 1_000
        fee = int(tx["total_fees"]) * 1_000
        entries.append(
            {
                "timestamp": timestamp,
                "type": "chain",
                "identifier": tx["tx_hash"],
                "value": amount,
                "channel": None,
            }
        )

        match = label_pattern.match(tx["label"])
        if match is None:
            # Not a channel open/close: only the plain chain entry applies.
            continue

        kind = match["type"]
        channel_id = match["channel_id"]
        entries.append(
            {
                "timestamp": timestamp,
                "type": kind,
                "identifier": tx["tx_hash"],
                "value": -amount,
                "channel": channel_id,
            }
        )
        if fee != 0:
            entries.append(
                {
                    "timestamp": timestamp,
                    "type": f"{kind} fee",
                    "identifier": tx["tx_hash"],
                    "value": -fee,
                    "channel": channel_id,
                }
            )
    return sorted(entries, key=lambda entry: entry["timestamp"])
def list_forwarding_events() -> Generator[dict[str, datetime | str | int | None], None, None]:
    """Yield ledger entries for every forwarding event listed by lncli.

    Events are fetched from ``lncli fwdinghistory`` in pages of at most 100,
    starting at index offset 0. Paging stops when a response reports the same
    ``last_offset_index`` that was just requested, or an offset of 0.

    Yields:
        Three dicts per event, all stamped with ``timestamp_ns`` and using it
        as identifier: ``"forward in"`` (incoming amount minus fee, booked on
        ``chan_id_in``), ``"forward fee"`` (the fee, booked on
        ``chan_id_in``) and ``"forward out"`` (negated outgoing amount,
        booked on ``chan_id_out``).
    """
    offset = None
    while offset != 0:
        page = run_lncli(
            "fwdinghistory",
            "--start_time=0",
            "--max_events=100",
            "--skip_peer_alias_lookup",
            f"--index_offset={offset or 0}",
        )
        reported_offset = int(page["last_offset_index"])
        # An unchanged offset means the previous page was the last one;
        # 0 is the sentinel that ends the loop.
        offset = 0 if reported_offset == offset else reported_offset

        for event in page["forwarding_events"]:
            timestamp = convert_timestamp_ns_to_date(int(event["timestamp_ns"]))
            fee_msat = int(event["fee_msat"])
            yield {
                "timestamp": timestamp,
                "type": "forward in",
                "identifier": event["timestamp_ns"],
                "value": int(event["amt_in_msat"]) - fee_msat,
                "channel": event["chan_id_in"],
            }
            yield {
                "timestamp": timestamp,
                "type": "forward fee",
                "identifier": event["timestamp_ns"],
                "value": fee_msat,
                "channel": event["chan_id_in"],
            }
            yield {
                "timestamp": timestamp,
                "type": "forward out",
                "identifier": event["timestamp_ns"],
                "value": -int(event["amt_out_msat"]),
                "channel": event["chan_id_out"],
            }
def build_closed_channels_map() -> Mapping[str, int]:
    """Return a map from each closed channel's ``chan_id`` to a balance of 0.

    The channel list comes from ``lncli closedchannels``.
    """
    response = run_lncli("closedchannels")
    channel_ids = (entry["chan_id"] for entry in response["channels"])
    return dict.fromkeys(channel_ids, 0)
def generate_ledger() -> Generator[dict[str, datetime | str | int | None], None, None]:
    """Yield every ledger entry in chronological order.

    Invoices, payments, chain transactions and forwarding events are
    collected and sorted by ``timestamp``. While iterating, a running balance
    is kept for each closed channel. Directly after a ``"closechannel"``
    entry, an extra ``"closechannel fee"`` entry is yielded whose value is
    the negated balance accumulated for that channel.

    Yields:
        Ledger dicts with the keys ``timestamp``, ``type``, ``identifier``,
        ``value`` and ``channel``.
    """
    # The sources are fully sorted rather than lazily merged: invoices and
    # payments arrive in index order while their timestamps are
    # settle/resolve times, so the individual streams are not guaranteed to
    # be sorted. The sort is stable, so ties keep the source order below.
    transactions = sorted(
        [
            *list_invoices(),
            *list_payments(),
            *list_chain_transactions(),
            *list_forwarding_events(),
        ],
        key=lambda entry: entry["timestamp"],
    )
    closed_channels = build_closed_channels_map()
    for tx in transactions:
        channel = tx["channel"]
        if channel is not None and channel in closed_channels:
            # Trace of the running balance and the amount being applied.
            print(closed_channels[channel], tx["value"], file=sys.stderr)
            closed_channels[channel] += tx["value"]
        yield tx

        if tx["type"] != "closechannel":
            continue

        # Whatever balance is left at closing time is booked as a fee, and
        # the channel is dropped so later entries no longer touch it.
        remaining = closed_channels.pop(channel, None)
        if remaining is None:
            # The close transaction refers to a channel that is not in the
            # closed-channel list, so no balance was tracked for it.
            print(
                f"warning: no closed channel {channel} found for close "
                f"transaction {tx['identifier']}; skipping closechannel fee",
                file=sys.stderr,
            )
            continue
        yield {
            "timestamp": tx["timestamp"],
            "type": "closechannel fee",
            "identifier": tx["identifier"],
            "value": -remaining,
            "channel": channel,
        }
# Script entry: "json" as first argument prints one JSON object per line;
# otherwise the ledger is printed as CSV with a header row.
if len(sys.argv) > 1 and sys.argv[1] == "json":
    for row in generate_ledger():
        # datetime objects are not JSON serializable, so the timestamp is
        # emitted as an ISO 8601 string.
        print(json.dumps({**row, "timestamp": row["timestamp"].isoformat()}))
else:
    print("timestamp,type,identifier,value,channel")
    for row in generate_ledger():
        print(",".join(map(str, row.values())))
# TODO: handle channels opened by others
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment