Created
January 25, 2021 17:50
-
-
Save grubles/c6819f11ede753802a2fc6d4e1912b58 to your computer and use it in GitHub Desktop.
single hop throughput test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Standard library
import itertools
import logging
import os
import random
import sys
from concurrent import futures
from contextlib import contextmanager
from time import sleep, time

# Third-party
import pytest
from pytest_benchmark.stats import Metadata
from tqdm import tqdm

# Project-local
from fixtures import *  # noqa: F401,F403
from pyln.testing.utils import sync_blockheight, wait_for

# Number of worker threads used to fire payments in parallel, and the
# total number of payments each benchmark run performs.
num_workers = 75
num_payments = 10000

# To ensure optimal performance we need to run without the developer
# options.
assert os.environ.get("DEVELOPER", "0") == "0"
@contextmanager
def benchmark_throughput(benchmark, num_events: int):
    """Context manager to benchmark the throughput of parallel calls.

    Repeatedly timing individual function calls measures latency;
    throughput with many parallel calls is better measured by taking the
    total time to completion and dividing it by the number of parallel
    calls.  This results in a synthetic benchmark with no variance, but
    repeating the run can amend that.

    Args:
        benchmark: the pytest-benchmark fixture to record the result into.
        num_events: number of events completed inside the ``with`` body;
            recorded as the iteration count of the single timed round.
    """
    m = Metadata(
        fixture=benchmark,
        iterations=num_events,
        options={
            "disable_gc": False,
            # Reuse the fixture's configured timer and time bounds so the
            # synthetic round is reported consistently with normal runs.
            "timer": benchmark._timer,
            "min_rounds": 1,
            "max_time": benchmark._max_time,
            "min_time": benchmark._min_time,
            "warmup": False,
        },
    )
    benchmark._add_stats(m)
    benchmark._mode = "with benchmark_throughput(...)"
    start_time = time()
    yield
    # One synthetic "round": total wall-clock time for all num_events.
    m.update(time() - start_time)
# Common node options to optimize throughput: silence logging entirely
# and raise the per-channel concurrent-HTLC limit.
opts = {"log-level": "broken", "max-concurrent-htlcs": 483}
@pytest.fixture
def executor():
    """Thread pool used to fire many RPC calls concurrently."""
    ex = futures.ThreadPoolExecutor(max_workers=num_workers)
    yield ex
    # Don't block teardown on stragglers; the test has already
    # collected every result it cares about.
    ex.shutdown(wait=False)
def line_graph(node_factory, num_nodes, opts):
    """Custom line_graph that doesn't rely on logs to proceed.

    Starts ``num_nodes`` nodes, funds each one that has to open a
    channel, opens a channel between each consecutive pair, and polls
    until every node sees all channels in gossip.

    Args:
        node_factory: pyln-testing node factory fixture.
        num_nodes: number of nodes in the line.
        opts: option dict passed to every node.

    Returns:
        The list of started nodes, in line order.
    """
    bitcoind = node_factory.bitcoind
    nodes = []
    logging.debug(f"Starting {num_nodes} nodes")
    for _ in range(num_nodes):
        n = node_factory.get_node(options=opts, start=False)
        # Don't wait for initialization via the logs: tailing logs is
        # expensive, the single sleep below is much cheaper.
        n.daemon.start(wait_for_initialized=False)
        nodes.append(n)
        logging.debug(f"Node {n.daemon.prefix} started")
    sleep(1)

    # Annotate nodes with their info
    for n in nodes:
        n.info = n.rpc.getinfo()
        n.port = n.info["binding"][0]["port"]

    # Give each channel-opening node (all but the last) some funds.
    connections = list(zip(nodes[:-1], nodes[1:]))
    for src, _ in connections:
        addr = src.rpc.newaddr()["bech32"]
        bitcoind.rpc.sendtoaddress(addr, 5)
    bitcoind.generate_block(1, wait_for_mempool=num_nodes - 1)
    sync_blockheight(bitcoind, nodes)

    # Fully connect the graph to speed up the gossip:
    for src, dst in itertools.combinations(nodes, 2):
        src.rpc.connect(dst.rpc.getinfo()["id"], "localhost:%d" % dst.port)

    # Now fund the channels:
    for src, dst in connections:
        src.rpc.fundchannel(dst.info["id"], "all")
    bitcoind.generate_block(6, wait_for_mempool=num_nodes - 1)
    sync_blockheight(bitcoind, nodes)

    print("Waiting for channels to be synced")
    # Each channel shows up as two directed entries in listchannels.
    expected = 2 * len(connections)
    for n in nodes:
        while True:
            num_chans = len(n.rpc.listchannels()['channels'])
            if num_chans == expected:
                break
            print(f"Node {n.daemon.prefix} has {num_chans} channels vs {expected} expected")
            sleep(1)
    return nodes
def test_throughput_single_hop(node_factory, bitcoind, executor, benchmark):
    """Benchmark payment throughput between two directly connected peers.

    Quite a bit of trickery is required to push the limits from the
    testing framework:

    - Disable printing to stdout (syscalls are expensive)
    - Can't use our `wait_for_log` helpers, hence the sleeps
    """
    l1, l2 = line_graph(node_factory, 2, opts=opts)
    # Single static route l1 -> l2, reused for every payment.
    route = l1.rpc.getroute(l2.rpc.getinfo()["id"], 1000, 1)["route"]

    print("Collecting invoices")
    # Pre-create all invoices so only the payments themselves are timed.
    invoices = [
        l2.rpc.invoice(1000, "invoice-%d" % (i), "desc")["payment_hash"]
        for i in tqdm(range(num_payments))
    ]

    print("Sending payments")
    start_time = time()

    def do_pay(payment_hash):
        # sendpay returns immediately; waitsendpay blocks until the
        # payment succeeds (or raises on failure).
        p = l1.rpc.sendpay(route, payment_hash)
        return l1.rpc.waitsendpay(p["payment_hash"])

    fs = [executor.submit(do_pay, h) for h in invoices]

    with benchmark_throughput(benchmark, num_payments):
        for f in tqdm(futures.as_completed(fs), total=len(fs)):
            f.result()

    diff = time() - start_time
    sys.stderr.write(
        "Done. {} payments performed in {} seconds ({} payments per second)\n".format(
            num_payments, diff, num_payments / diff
        )
    )
def test_start(node_factory, benchmark):
    """Benchmark the time it takes to start a single node."""
    benchmark(node_factory.get_node)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment