Created
February 13, 2025 21:43
-
-
Save thiagorb/0e53b2b2dca7360b6fba382b637b5cef to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import subprocess | |
import re | |
import heapq | |
import sys | |
from datetime import datetime, timezone | |
from typing import Generator, Mapping | |
def convert_timestamp_ns_to_date(timestamp_ns: int) -> datetime:
    """Convert a Unix timestamp in nanoseconds to a UTC-aware ``datetime``.

    The conversion uses integer arithmetic only. Dividing by ``1e9`` as a
    float cannot represent present-day nanosecond timestamps exactly, so the
    result could be rounded into the neighbouring microsecond (or even the
    next second).

    Args:
        timestamp_ns: Nanoseconds since the Unix epoch.

    Returns:
        A timezone-aware ``datetime`` in UTC. ``datetime`` has microsecond
        resolution, so the sub-microsecond remainder is truncated.
    """
    seconds, remainder_ns = divmod(timestamp_ns, 1_000_000_000)
    whole_second = datetime.fromtimestamp(seconds, tz=timezone.utc)
    # divmod() floors, so remainder_ns is always in [0, 1e9) and the
    # microsecond value below is always valid.
    return whole_second.replace(microsecond=remainder_ns // 1_000)
def run_lncli(*command: str) -> dict:
    """Run an ``lncli`` subcommand and return its parsed JSON output.

    Args:
        *command: The subcommand followed by its flags, one string per
            command-line argument (for example
            ``"listinvoices", "--index_offset=0"``).

    Returns:
        The value decoded from the process's standard output by
        ``json.loads``; expected to be a JSON object.

    Raises:
        RuntimeError: If ``lncli`` exits with a non-zero status. The message
            contains the command together with its stdout and stderr.
        json.JSONDecodeError: If standard output is not valid JSON.
    """
    # Arguments are passed as a list (no shell), so no quoting is needed.
    result = subprocess.run(["lncli", *command], capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"Error running lncli command '{' '.join(command)}':\n{result.stdout}{result.stderr}"
        )
    return json.loads(result.stdout)
def list_invoices() -> Generator[dict[str, datetime | str | int], None, None]:
    """Yield one ledger row for every settled invoice reported by ``lncli``.

    Pages through ``lncli listinvoices``, feeding each response's
    ``last_index_offset`` back in as the next ``--index_offset`` until the
    reported offset is 0.

    Yields:
        A dict with the keys ``timestamp`` (UTC ``datetime`` built from
        ``settle_date``), ``type`` (always ``"invoice"``), ``identifier``
        (the invoice's ``r_hash``), ``value`` (``value_msat`` as a positive
        int) and ``channel`` (``chan_id`` of the invoice's single HTLC).

    Raises:
        ValueError: If a settled invoice does not have exactly one HTLC.
    """
    index_offset = None
    while index_offset != 0:
        page = run_lncli(
            "listinvoices",
            f"--index_offset={index_offset or 0}",
            "--paginate-forwards",
        )
        # An offset of 0 in the response ends the loop after this page.
        index_offset = int(page["last_index_offset"])

        for invoice in page["invoices"]:
            if invoice["state"] != "SETTLED":
                continue

            # Not sure how to handle multiple htlcs, so anything other than
            # exactly one is rejected.
            htlcs = invoice["htlcs"]
            if len(htlcs) != 1:
                raise ValueError(
                    f"Expected exactly one htlc in invoice {invoice['r_hash']}, "
                    f"found {len(htlcs)}"
                )

            # settle_date is in whole seconds; scale it to nanoseconds.
            settled_at = convert_timestamp_ns_to_date(
                int(invoice["settle_date"]) * 1_000_000_000
            )
            # NOTE(review): this records the invoiced amount (value_msat); if
            # the amount actually paid can differ, a different field may be
            # more appropriate -- confirm against the lnd API.
            yield {
                "timestamp": settled_at,
                "type": "invoice",
                "identifier": invoice["r_hash"],
                "value": int(invoice["value_msat"]),
                "channel": htlcs[0]["chan_id"],
            }
def list_payments() -> Generator[dict[str, datetime | str | int], None, None]:
    """Yield ledger rows for every succeeded outgoing payment.

    Pages through ``lncli listpayments``, feeding each response's
    ``last_index_offset`` back in as the next ``--index_offset`` until the
    reported offset is 0.

    Yields:
        Two dicts per payment, both stamped with the HTLC's
        ``resolve_time_ns`` and attributed to the ``chan_id`` of the first
        hop of the HTLC's route:

        * ``type == "payment"`` with ``value`` = ``-value_msat``
        * ``type == "payment fee"`` with ``value`` = ``-fee_msat``

        ``identifier`` is the ``payment_hash`` in both rows.

    Raises:
        ValueError: If a succeeded payment does not have exactly one HTLC.
    """
    index_offset = None
    while index_offset != 0:
        page = run_lncli(
            "listpayments",
            f"--index_offset={index_offset or 0}",
            "--paginate_forwards",
        )
        # An offset of 0 in the response ends the loop after this page.
        index_offset = int(page["last_index_offset"])

        for payment in page["payments"]:
            if payment["status"] != "SUCCEEDED":
                continue

            # Not sure how to handle multiple htlcs, so anything other than
            # exactly one is rejected.
            # NOTE(review): if the htlcs list can also contain failed
            # attempts, a payment that succeeded on a retry would be rejected
            # here -- confirm against the lnd API.
            htlcs = payment["htlcs"]
            if len(htlcs) != 1:
                raise ValueError(
                    f"Expected exactly one htlc in payment {payment['payment_hash']}, "
                    f"found {len(htlcs)}"
                )

            htlc = htlcs[0]
            # The payment leaves through the channel of the route's first hop.
            channel_id = htlc["route"]["hops"][0]["chan_id"]
            resolved_at = convert_timestamp_ns_to_date(int(htlc["resolve_time_ns"]))

            yield {
                "timestamp": resolved_at,
                "type": "payment",
                "identifier": payment["payment_hash"],
                "value": -int(payment["value_msat"]),
                "channel": channel_id,
            }
            yield {
                "timestamp": resolved_at,
                "type": "payment fee",
                "identifier": payment["payment_hash"],
                "value": -int(payment["fee_msat"]),
                "channel": channel_id,
            }
def list_chain_transactions() -> list[dict[str, datetime | str | int | None]]:
    """Return ledger rows for on-chain transactions, sorted by timestamp.

    Every transaction from ``lncli listchaintxns`` produces a ``"chain"`` row
    with no channel. If the transaction's label matches
    ``0:<openchannel|closechannel>:shortchanid-<digits>``, two further rows
    may follow:

    * a row typed ``openchannel`` / ``closechannel`` carrying the negated
      amount, attributed to the channel id taken from the label;
    * a ``"<type> fee"`` row carrying the negated fee, only when the fee is
      non-zero.

    ``amount`` and ``total_fees`` are multiplied by 1000 before being stored
    (presumably satoshi to millisatoshi, matching the ``*_msat`` values used
    elsewhere in this file -- confirm).

    Returns:
        A list of row dicts sorted by their ``timestamp`` key. The sort is
        stable, so rows from the same transaction keep their relative order.
    """
    label_pattern = re.compile(
        r"^0:(?P<type>openchannel|closechannel):shortchanid\-(?P<channel_id>[0-9]+)$"
    )
    rows = []
    for tx in run_lncli("listchaintxns")["transactions"]:
        # time_stamp is in whole seconds; scale it to nanoseconds.
        timestamp = convert_timestamp_ns_to_date(int(tx["time_stamp"]) * 1_000_000_000)
        amount = int(tx["amount"]) * 1_000
        fee = int(tx["total_fees"]) * 1_000

        rows.append(
            {
                "timestamp": timestamp,
                "type": "chain",
                "identifier": tx["tx_hash"],
                "value": amount,
                "channel": None,
            }
        )

        label = label_pattern.match(tx["label"])
        if label is None:
            # Not a channel open/close: only the plain chain row applies.
            continue

        kind = label["type"]
        channel_id = label["channel_id"]
        # Mirror the chain amount onto the channel with the opposite sign.
        rows.append(
            {
                "timestamp": timestamp,
                "type": kind,
                "identifier": tx["tx_hash"],
                "value": -amount,
                "channel": channel_id,
            }
        )
        if fee != 0:
            rows.append(
                {
                    "timestamp": timestamp,
                    "type": f"{kind} fee",
                    "identifier": tx["tx_hash"],
                    "value": -fee,
                    "channel": channel_id,
                }
            )

    return sorted(rows, key=lambda row: row["timestamp"])
def list_forwarding_events() -> Generator[dict[str, datetime | str | int], None, None]:
    """Yield ledger rows for every forwarding event reported by ``lncli``.

    Pages through ``lncli fwdinghistory`` 100 events at a time, starting from
    ``--start_time=0``. Paging stops after the first response whose
    ``last_offset_index`` equals the offset that was just requested (no
    progress) or is 0.

    Yields:
        Three dicts per event, all stamped with the event's ``timestamp_ns``
        and using that same ``timestamp_ns`` value as ``identifier``:

        * ``"forward in"``: ``amt_in_msat - fee_msat`` on ``chan_id_in``
        * ``"forward fee"``: ``fee_msat`` on ``chan_id_in``
        * ``"forward out"``: ``-amt_out_msat`` on ``chan_id_out``
    """
    index_offset = None
    while index_offset != 0:
        page = run_lncli(
            "fwdinghistory",
            "--start_time=0",
            "--max_events=100",
            "--skip_peer_alias_lookup",
            f"--index_offset={index_offset or 0}",
        )
        reported_offset = page["last_offset_index"]
        # If the offset did not advance there is nothing further to fetch;
        # setting it to 0 ends the loop once this page has been processed.
        index_offset = 0 if reported_offset == index_offset else reported_offset

        for event in page["forwarding_events"]:
            timestamp = convert_timestamp_ns_to_date(int(event["timestamp_ns"]))
            fee_msat = int(event["fee_msat"])

            # The incoming amount is split into the forwarded part and the
            # fee, both credited to the incoming channel.
            yield {
                "timestamp": timestamp,
                "type": "forward in",
                "identifier": event["timestamp_ns"],
                "value": int(event["amt_in_msat"]) - fee_msat,
                "channel": event["chan_id_in"],
            }
            yield {
                "timestamp": timestamp,
                "type": "forward fee",
                "identifier": event["timestamp_ns"],
                "value": fee_msat,
                "channel": event["chan_id_in"],
            }
            yield {
                "timestamp": timestamp,
                "type": "forward out",
                "identifier": event["timestamp_ns"],
                "value": -int(event["amt_out_msat"]),
                "channel": event["chan_id_out"],
            }
def build_closed_channels_map() -> Mapping[str, int]:
    """Return a map from each closed channel's ``chan_id`` to a zero balance.

    The channel list is read from ``lncli closedchannels``.
    """
    channels = run_lncli("closedchannels")["channels"]
    chan_ids = (entry["chan_id"] for entry in channels)
    return dict.fromkeys(chan_ids, 0)
def generate_ledger() -> Generator[dict[str, datetime | str | int | None], None, None]:
    """Yield every ledger row in chronological order.

    Rows from invoices, payments, on-chain transactions and forwarding events
    are collected and sorted by ``timestamp``. A full stable sort is used
    instead of ``heapq.merge`` because ``heapq.merge`` requires each input to
    already be ordered by the key, and the invoice and payment sources are
    produced in index order while being keyed by settle/resolve time. With a
    stable sort, rows with equal timestamps keep their source order
    (invoices, payments, chain, forwards).

    While iterating, a running balance is kept for each closed channel. When
    a ``closechannel`` row is reached, the negated balance accumulated for
    that channel is emitted as an extra ``"closechannel fee"`` row and the
    channel is dropped from the map.

    Yields:
        Row dicts with the keys ``timestamp``, ``type``, ``identifier``,
        ``value`` and ``channel``.

    Raises:
        KeyError: If a ``closechannel`` row names a channel that is not (or
            no longer) in the closed-channels map.
    """
    closed_channels = build_closed_channels_map()
    transactions = sorted(
        [
            *list_invoices(),
            *list_payments(),
            *list_chain_transactions(),
            *list_forwarding_events(),
        ],
        key=lambda row: row["timestamp"],
    )

    for tx in transactions:
        channel = tx["channel"]
        if channel is not None and channel in closed_channels:
            # Trace of the running balance, written to stderr so it does not
            # mix with the ledger on stdout.
            print(closed_channels[channel], tx["value"], file=sys.stderr)
            closed_channels[channel] += tx["value"]

        yield tx

        if tx["type"] == "closechannel":
            # Whatever balance is left on the channel at close time is
            # booked as a closing fee, bringing the channel to zero.
            close_channel_fee = -closed_channels.pop(channel)
            yield {
                "timestamp": tx["timestamp"],
                "type": "closechannel fee",
                "identifier": tx["identifier"],
                "value": close_channel_fee,
                "channel": channel,
            }
def _json_default(value):
    """Serialize values ``json.dumps`` cannot handle natively.

    Ledger rows carry their timestamp as a ``datetime``, which ``json.dumps``
    rejects with ``TypeError`` unless a ``default`` hook is supplied.
    Datetimes are emitted in ISO 8601 form; any other unsupported type still
    raises ``TypeError`` so unexpected values are not silently stringified.
    """
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


# Output format is chosen by the first command-line argument: "json" prints
# one JSON object per line; anything else (or nothing) prints CSV with a
# header row.
if len(sys.argv) > 1 and sys.argv[1] == "json":
    for row in generate_ledger():
        print(json.dumps(row, default=_json_default))
else:
    print("timestamp,type,identifier,value,channel")
    for row in generate_ledger():
        # Rows are dicts built with keys in the same order as the header.
        print(",".join(map(str, row.values())))
# TODO: handle channels opened by others
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment