Starting with a given UTXO, research backward through the BTC blockchain to find what pools solved the block it was in and its ancestors.
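The script talks to a mempool.space-compatible REST API and only needs four endpoints: address transactions, transaction lookup, block-height-to-hash, and block details (for the pool name). By default it points at a local Umbrel node; a minimal sketch of the change needed to run against the public mempool.space instance instead, assuming you can live with its rate limiting (the HTTP 429 back-off in the script exists for exactly that case):

MEMPOOL_PROTOCOL = "https"
MEMPOOL_HOST = "mempool.space"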
import time
import datetime
import requests
import copy

# Mempool API endpoint (a local Umbrel node by default) and request throttling.
MEMPOOL_HOST = "umbrel.local:3006"
MEMPOOL_PROTOCOL = "http"
DEBUGGING = False
SLEEP_BETWEEN_LEVELS = 0                 # seconds to pause between ancestor lookups (0 = no pause)
SLEEP_BETWEEN_BUCKETS = 1                # seconds to pause between paged address-transaction requests
SLEEP_BETWEEN_TOO_MANY_REQUESTS = 120    # seconds to back off after an HTTP 429 response
EXIT_AFTER_TOO_MANY_REQUESTS_COUNT = 5   # give up after this many 429 responses
TOO_MANY_REQUESTS_COUNT = 0
RESEARCH_SPENDS = False                  # also walk forward through spends of an overpaid seed UTXO


class Attribution:
    def __init__(self):
        # Initialize the dictionary to store pool-block associations
        self.attributions = {}

    def associate_pool(self, pool: str, block: int):
        # If the pool is already in the dictionary, append the block to its list
        if pool in self.attributions:
            if block not in self.attributions[pool]:
                self.attributions[pool].append(block)
        # Otherwise, create a new entry for this pool with the block in a new list
        else:
            self.attributions[pool] = [block]

    def is_empty(self):
        return len(self.attributions) == 0

    def __str__(self):
        sorted_attributions = sorted(self.attributions.items(), key=lambda item: len(item[1]), reverse=True)
        association_info = ""
        for pool, blocks in sorted_attributions:
            association_info += f"- {pool:20}: {len(blocks):4} blocks\n"
        return association_info
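
# Rendered via Attribution.__str__ above, an attribution table looks roughly like
# this (pool names and block counts illustrative):
#   - Foundry USA         :   12 blocks
#   - AntPool             :    5 blocks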

overpaid_attribution = Attribution()
spending_attribution = Attribution()
coinbase_attribution = Attribution()


class Research:
    def __init__(self, address: str, utxo_size: int):
        self.scriptpubkey_address = address
        self.value = utxo_size
        self.is_coinbase = False
        self.vout_txid = None
        self.vout_idx = 0
        self.vout_pool_name = None
        self.vout_blockheight = 0
        self.transactions = []
        self.research_above = []
        self.research_below = []
        self.overpaid = False

    def vout_key(self):
        return f"{self.vout_txid}:{self.vout_idx}"

    def __str__(self):
        if self.vout_pool_name is not None:
            return (f"Research(Address: {self.scriptpubkey_address},"
                    f" sats: {self.value},"
                    f" block: {self.vout_blockheight},"
                    f" pool: {self.vout_pool_name}"
                    ")"
                    )
        else:
            return (f"Research(Address: {self.scriptpubkey_address},"
                    f" sats: {self.value}"
                    ")"
                    )


class Transaction:
    def __init__(self, tx_data):
        try:
            self.txid = tx_data["txid"]
            self.confirmed = tx_data["status"]["confirmed"]
            if self.confirmed:
                self.block_height = int(tx_data["status"]["block_height"])
            else:
                self.block_height = 0
            if "vin" in tx_data:
                self.vin = [Input(v) for v in tx_data["vin"]]
            else:
                self.vin = []
            if "vout" in tx_data:
                self.vout = [Output(v, idx, self.txid) for idx, v in enumerate(tx_data["vout"])]
            else:
                self.vout = []
            if "fee" in tx_data:
                self.fee = tx_data["fee"]
                vout_value = self.get_total_vout_value()
                # "overpaid" means the fee exceeds the combined value of all outputs
                self.overpaid = vout_value < self.fee
            else:
                self.fee = 0
                self.overpaid = False
        except TypeError as ee:
            print("A TypeError occurred:", ee)
            print("tx_data:", tx_data)
            raise
        except KeyError as e:
            print("A KeyError occurred:", e)
            print("tx_data:", tx_data)
            # Handle the exception or re-raise it
            raise

    def get_filtered_vin(self, address: str):
        if self.confirmed:
            return [v for v in self.vin if v.scriptpubkey_address == address]
        else:
            return []

    def get_filtered_vout(self, address):
        if self.confirmed:
            return [v for v in self.vout if v.scriptpubkey_address == address]
        else:
            return []

    def get_total_vout_value(self):
        return sum(v.value for v in self.vout)


class Input:
    def __init__(self, vin_data):
        self.is_coinbase = vin_data["is_coinbase"]
        if "prevout" in vin_data and vin_data["prevout"]:
            try:
                self.scriptpubkey_address = vin_data["prevout"]["scriptpubkey_address"]
                self.value = vin_data["prevout"]["value"]
                self.txid = vin_data["txid"]
                self.vout_idx = vin_data["vout"]
            except TypeError as ee:
                print("A TypeError occurred:", ee)
                print("vin_data:", vin_data)
                raise
            except KeyError as e:
                print("A KeyError occurred:", e)
                print("vin_data:", vin_data)
                # Handle the exception or re-raise it
                raise
        else:
            self.scriptpubkey_address = ""
            self.value = 0
            self.txid = ""
            self.vout_idx = 0

    def __str__(self):
        return (f"Input(Address: {self.scriptpubkey_address},"
                f" sats: {self.value},"
                f" CB: {self.is_coinbase},"
                f" txid: {self.txid},"
                f" vout_idx: {self.vout_idx})"
                )


class Output:
    def __init__(self, vout_data, vout_idx, src_txid):
        try:
            self.scriptpubkey_address = vout_data["scriptpubkey_address"]
            self.value = vout_data["value"]
            self.vout_idx = vout_idx
            self.vin_txid = src_txid
        except TypeError as ee:
            print("A TypeError occurred:", ee)
            print("vout_data:", vout_data)
            raise
        except KeyError as e:
            print("A KeyError occurred:", e)
            print("vout_data:", vout_data)
            # Handle the exception or re-raise it
            raise

    def __str__(self):
        return (f"Output(Address: {self.scriptpubkey_address},"
                f" sats: {self.value})"
                )


def timestamp_string():
    right_now = int(datetime.datetime.now().timestamp())
    datetimeobj = datetime.datetime.fromtimestamp(right_now)
    return datetimeobj.strftime('%H:%M:%S')


def fetch_transaction_json(my_url):
    while True:
        response = requests.get(my_url)
        if response.status_code == 200:
            # Success, return the JSON data
            return response.json()
        elif response.status_code == 429:
            global TOO_MANY_REQUESTS_COUNT
            TOO_MANY_REQUESTS_COUNT += 1
            if TOO_MANY_REQUESTS_COUNT >= EXIT_AFTER_TOO_MANY_REQUESTS_COUNT:
                print(f"{timestamp_string()} -- Repeated 429 errors")
                exit(1)
            # 429 Too Many Requests - wait for 120 seconds before retrying
            if SLEEP_BETWEEN_TOO_MANY_REQUESTS:
                print(f"{timestamp_string()} -- 429 -- sleeping 2 minutes")
                time.sleep(SLEEP_BETWEEN_TOO_MANY_REQUESTS)
        else:
            # Other errors - print the error and return None
            print(f"Failed to fetch data: {response.status_code}")
            return None


def update_research(research_tx: Research):
    my_url = f"{MEMPOOL_PROTOCOL}://{MEMPOOL_HOST}/api/tx/{research_tx.vout_txid}"
    while True:
        response = requests.get(my_url)
        if response.status_code == 200:
            # Success, return the JSON data
            txjs = response.json()
            if "status" in txjs:
                status = txjs["status"]
                if status["confirmed"]:
                    research_tx.vout_blockheight = status["block_height"]
                    research_tx.vout_pool_name = get_pool_name(research_tx.vout_blockheight)
                    one_tx = Transaction(txjs)
                    research_tx.transactions = [one_tx]
                    research_tx.overpaid = one_tx.overpaid
                    return True
            # unconfirmed TX or unexpected JSON
            print("-- unconfirmed TX or unexpected JSON", research_tx.vout_txid)
            return None
        elif response.status_code == 429:
            global TOO_MANY_REQUESTS_COUNT
            TOO_MANY_REQUESTS_COUNT += 1
            if TOO_MANY_REQUESTS_COUNT >= EXIT_AFTER_TOO_MANY_REQUESTS_COUNT:
                print(f"{timestamp_string()} -- Repeated 429 errors")
                exit(1)
            if SLEEP_BETWEEN_TOO_MANY_REQUESTS:
                # 429 Too Many Requests - wait for 120 seconds before retrying
                print(f"{timestamp_string()} -- 429 -- sleeping 2 minutes")
                time.sleep(SLEEP_BETWEEN_TOO_MANY_REQUESTS)
        else:
            # Other errors - print the error and return None
            print(f"Failed to fetch data: {response.status_code}")
            return None


def fetch_block_hash(block_height):
    url = f"{MEMPOOL_PROTOCOL}://{MEMPOOL_HOST}/api/block-height/{block_height}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.text.strip()
    else:
        print(f"Error fetching block hash: {response.status_code}")
        return None


def fetch_block_details(block_hash):
    url = f"{MEMPOOL_PROTOCOL}://{MEMPOOL_HOST}/api/block/{block_hash}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        print(f"Error fetching block details: {response.status_code}")
        return None


def get_pool_name(block_height):
    block_hash = fetch_block_hash(block_height)
    if block_hash:
        block_details = fetch_block_details(block_hash)
        if block_details and "extras" in block_details and "pool" in block_details["extras"] and "name" in \
                block_details["extras"]["pool"]:
            return block_details["extras"]["pool"]["name"]
        else:
            return "Unknown"
    else:
        return "UNK"


def find_creation(research_tx: Research):
    for tx in research_tx.transactions:
        outputs = tx.get_filtered_vout(research_tx.scriptpubkey_address)
        for one_out in outputs:
            if (one_out.value == research_tx.value
                    and one_out.scriptpubkey_address == research_tx.scriptpubkey_address):
                # this tx is the creation of this UTXO
                if not research_tx.vout_pool_name:
                    # we get in here if this is the very first UTXO
                    # levels below have this data populated by update_research on the loop before
                    research_tx.vout_idx = one_out.vout_idx
                    research_tx.vout_txid = one_out.vin_txid
                    research_tx.vout_pool_name = get_pool_name(tx.block_height)
                    research_tx.vout_blockheight = tx.block_height
                    research_tx.overpaid = tx.overpaid
                    if research_tx.overpaid:
                        overpaid_attribution.associate_pool(research_tx.vout_pool_name, tx.block_height)
                for one_in in tx.vin:
                    # assemble all the information we need for the next level evaluation
                    next_rx = Research(one_in.scriptpubkey_address, one_in.value)
                    next_rx.is_coinbase = one_in.is_coinbase
                    next_rx.vout_txid = one_in.txid
                    next_rx.vout_idx = one_in.vout_idx
                    if update_research(next_rx):
                        if next_rx.overpaid:
                            overpaid_attribution.associate_pool(next_rx.vout_pool_name, next_rx.vout_blockheight)
                        if next_rx.is_coinbase:
                            coinbase_attribution.associate_pool(next_rx.vout_pool_name, next_rx.vout_blockheight)
                            if DEBUGGING:
                                print(f"\t *** Coinbase Found: {one_in.txid}")
                                print(coinbase_attribution)
                                print("\t *** *** *** ***")
                        research_tx.research_above.append(next_rx)
                if DEBUGGING:
                    print(f"TXID: {tx.txid}")
                    print("\t Block Number:", tx.block_height)
                    print("\t\t Pool:", research_tx.vout_pool_name)
                    print("\t\t", one_out)
                    # print("\t Research Next Level:", research_tx.research_above)
                return True
    # print(f"Failed to find creation for {research_tx}")
    return False


def find_spends(research_tx: Research):
    for tx in research_tx.transactions:
        inputs = tx.get_filtered_vin(research_tx.scriptpubkey_address)
        for one_in in inputs:
            if (one_in.value == research_tx.value
                    and one_in.vout_idx == research_tx.vout_idx
                    and one_in.txid == research_tx.vout_txid):
                # this is them spending the UTXO we're researching
                next_rx = Research(one_in.scriptpubkey_address, one_in.value)
                # next_rx.is_coinbase = one_in.is_coinbase  # cannot be coinbase
                next_rx.vout_txid = one_in.txid
                next_rx.vout_idx = one_in.vout_idx
                if update_research(next_rx):
                    spending_attribution.associate_pool(next_rx.vout_pool_name, next_rx.vout_blockheight)
                    research_tx.research_below.append(next_rx)
                if DEBUGGING:
                    print(f"TXID: {tx.txid}")
                    print("\t Block Number:", tx.block_height)
                    print("\t VIN Info:", one_in)
                return
            else:
                # this is the spending of sats on this address, but not the UTXO we're researching
                print("\t VIN Info?", one_in)
    print(f"Failed to find spend for {research_tx}")


def start_research(research_tx: Research):
    url = f"{MEMPOOL_PROTOCOL}://{MEMPOOL_HOST}/api/address/{research_tx.scriptpubkey_address}/txs"  # /chain
    if DEBUGGING:
        print(f"-- Researching: {research_tx}")
    bucket_number = 0
    next_bucket_url = url
    research_tx.transactions = []
    while True:
        tx_json = fetch_transaction_json(next_bucket_url)
        if tx_json is None:
            if bucket_number == 0:
                print(f"No transactions found for {research_tx}")
            else:
                print(f"Failed to find creation for {research_tx}")
            return
        new_transactions = [Transaction(tx) for tx in tx_json]
        research_tx.transactions += new_transactions
        found = find_creation(research_tx)
        if found:
            break
        if SLEEP_BETWEEN_BUCKETS:
            time.sleep(SLEEP_BETWEEN_BUCKETS)
        last_tx = new_transactions[-1]
        next_bucket_url = url + f"?after_txid={last_tx.txid}"
        bucket_number += 1
        # print(f"-- Researching: {research_tx} bucket # {bucket_number}")


level_index = 0

# Seed UTXO to trace backward: the address holding it and its exact value in sats.
seed_address = "bc1p27rngum66yxampv8ph8sk47kpr30anzu9fggn0pl5gtsm3lm35vsnj39j3"
seed_utxo_size = 58452
start_rx = Research(seed_address, seed_utxo_size)
start_research(start_rx)

if RESEARCH_SPENDS and start_rx.overpaid:
    spend_rx = copy.deepcopy(start_rx)
    spend_rx.research_above = []
    next_level = [spend_rx]
    while len(next_level):
        below_level = []
        print(f"{timestamp_string()} -- Spending Level #{level_index}: {len(next_level)} addresses")
        for one_rx in next_level:
            find_spends(one_rx)
            below_level += one_rx.research_below
        unique_vout = {}
        for one_utxo in below_level:
            # no reason to include non-overpaid tx
            if one_utxo.overpaid:
                key = one_utxo.vout_key()
                unique_vout[key] = one_utxo
        next_level = list(unique_vout.values())
    print("Overpaid Attributions:")
    print(spending_attribution)
    print("***************************************")

next_level = start_rx.research_above
print("***************************************")
while len(next_level) > 0:
    level_index += 1
    print(f"{timestamp_string()} -- Level #{level_index}: {len(next_level)} addresses")
    level_above = []
    for one_item in next_level:
        # optional throttle between items (SLEEP_BETWEEN_LEVELS seconds)
        if SLEEP_BETWEEN_LEVELS:
            time.sleep(SLEEP_BETWEEN_LEVELS)
        # presumably the item already has its creation tx from processing the level above
        find_creation(one_item)
        level_above += one_item.research_above
    # unique all the inputs
    unique_vout = {}
    for one_utxo in level_above:
        key = one_utxo.vout_key()
        unique_vout[key] = one_utxo
    next_level = list(unique_vout.values())
print("Overpaid Attributions:")
print(overpaid_attribution)
if not coinbase_attribution.is_empty():
    print("Coinbase Attributions:")
    print(coinbase_attribution)
print("***************************************")