Skip to content

Instantly share code, notes, and snippets.

@btc100k
Last active December 18, 2023 20:06
Show Gist options
  • Save btc100k/e05a1717497cef5fdbdad684174c2867 to your computer and use it in GitHub Desktop.
Save btc100k/e05a1717497cef5fdbdad684174c2867 to your computer and use it in GitHub Desktop.
Starting from a given UTXO, walk backward through the BTC blockchain to find which mining pools mined the block that contains it and the blocks that contain its ancestor transactions.
import time
import datetime
import requests
import copy
# Host (and port) of the mempool instance; every request URL in this file is
# built as f"{MEMPOOL_PROTOCOL}://{MEMPOOL_HOST}/api/...".
MEMPOOL_HOST = "umbrel.local:3006"
MEMPOOL_PROTOCOL = "http"
# When True, the find_* / start_research functions print extra per-transaction detail.
DEBUGGING = False
# Seconds to sleep in the upward-research loop of the driver script (0 disables).
SLEEP_BETWEEN_LEVELS = 0
# Seconds start_research sleeps before requesting the next page of address history.
SLEEP_BETWEEN_BUCKETS = 1
# Seconds to sleep after an HTTP 429 response before retrying.
SLEEP_BETWEEN_TOO_MANY_REQUESTS = 120
# The script exits once TOO_MANY_REQUESTS_COUNT reaches this value.
EXIT_AFTER_TOO_MANY_REQUESTS_COUNT = 5
# Running count of HTTP 429 responses seen so far; it is only ever incremented.
TOO_MANY_REQUESTS_COUNT = 0
# When True (and the seed UTXO's creating transaction is flagged overpaid), the
# driver script also runs the find_spends pass.
RESEARCH_SPENDS = False
class Attribution:
    """Tally of which block heights are associated with which mining pool."""

    def __init__(self):
        # pool name -> list of distinct block heights, in insertion order
        self.attributions = {}

    def associate_pool(self, pool: str, block: int):
        """Record ``block`` under ``pool``; a block is stored at most once per pool."""
        known_blocks = self.attributions.setdefault(pool, [])
        if block not in known_blocks:
            known_blocks.append(block)

    def is_empty(self):
        """Return True when no pool has been recorded yet."""
        return not self.attributions

    def __str__(self):
        """Render one line per pool, pools with the most blocks first."""
        ranked = sorted(
            self.attributions.items(),
            key=lambda entry: len(entry[1]),
            reverse=True,
        )
        return "".join(
            f"- {name:20}: {len(heights):4} blocks\n" for name, heights in ranked
        )
# Module-level tallies shared by the research functions below.
# Filled by find_creation for transactions whose `overpaid` flag is set.
overpaid_attribution = Attribution()
# Filled by find_spends.
spending_attribution = Attribution()
# Filled by find_creation for inputs flagged as coinbase.
coinbase_attribution = Attribution()
class Research:
    """A single UTXO under investigation, plus what has been learned about it.

    Args:
        address: value stored as ``scriptpubkey_address``.
        utxo_size: value stored as ``value`` (printed as "sats" by ``__str__``).
    """

    def __init__(self, address: str, utxo_size: int):
        # What we are looking for.
        self.scriptpubkey_address = address
        self.value = utxo_size

        # Outpoint and block/pool details, filled in once they are known.
        self.vout_txid = None
        self.vout_idx = 0
        self.vout_blockheight = 0
        self.vout_pool_name = None
        self.is_coinbase = False
        self.overpaid = False

        # Candidate transactions and the neighbouring Research objects.
        self.transactions = []
        self.research_above = []
        self.research_below = []

    def vout_key(self):
        """Return the ``"<txid>:<index>"`` key used to de-duplicate UTXOs."""
        return "{}:{}".format(self.vout_txid, self.vout_idx)

    def __str__(self):
        """Describe the UTXO; block and pool are shown only once a pool is known."""
        fields = [
            f"Address: {self.scriptpubkey_address}",
            f"sats: {self.value}",
        ]
        if self.vout_pool_name is not None:
            fields.append(f"block: {self.vout_blockheight}")
            fields.append(f"pool: {self.vout_pool_name}")
        return "Research(" + ", ".join(fields) + ")"
class Transaction:
    """Parsed view of one transaction JSON object.

    Args:
        tx_data: mapping with at least ``"txid"`` and ``"status"`` (which must
            contain ``"confirmed"`` and, when confirmed, ``"block_height"``).
            ``"vin"``, ``"vout"`` and ``"fee"`` are optional.

    Raises:
        KeyError, TypeError: re-raised after printing the offending ``tx_data``
            when a required field is missing or has the wrong shape.
    """

    def __init__(self, tx_data):
        try:
            self.txid = tx_data["txid"]
            self.confirmed = tx_data["status"]["confirmed"]
            # Unconfirmed transactions get block height 0.
            self.block_height = (
                int(tx_data["status"]["block_height"]) if self.confirmed else 0
            )

            self.vin = (
                [Input(raw_in) for raw_in in tx_data["vin"]]
                if "vin" in tx_data
                else []
            )
            self.vout = (
                [
                    Output(raw_out, position, self.txid)
                    for position, raw_out in enumerate(tx_data["vout"])
                ]
                if "vout" in tx_data
                else []
            )

            if "fee" not in tx_data:
                self.fee = 0
                self.overpaid = False
            else:
                self.fee = tx_data["fee"]
                # "Overpaid" means the fee is larger than everything the
                # transaction paid out.
                total_out = self.get_total_vout_value()
                self.overpaid = total_out < self.fee
        except TypeError as ee:
            print("A TypeError occurred:", ee)
            print("tx_data:", tx_data)
            raise
        except KeyError as e:
            print("A KeyError occurred:", e)
            print("tx_data:", tx_data)
            raise

    def get_filtered_vin(self, address: str):
        """Return the inputs spending from ``address``; always empty if unconfirmed."""
        if not self.confirmed:
            return []
        return [one_in for one_in in self.vin if one_in.scriptpubkey_address == address]

    def get_filtered_vout(self, address):
        """Return the outputs paying ``address``; always empty if unconfirmed."""
        if not self.confirmed:
            return []
        return [one_out for one_out in self.vout if one_out.scriptpubkey_address == address]

    def get_total_vout_value(self):
        """Return the sum of ``value`` over all outputs."""
        return sum(one_out.value for one_out in self.vout)
class Input:
    """One transaction input (a ``vin`` entry) and the previous output it spends.

    Args:
        vin_data: mapping with ``"is_coinbase"`` and, when a non-empty
            ``"prevout"`` is present, also ``"txid"`` and ``"vout"``.

    Attributes:
        is_coinbase: copied from ``vin_data["is_coinbase"]``.
        scriptpubkey_address: address of the spent output, or ``""`` when there
            is no prevout or the prevout carries no address.
        value: value of the spent output (0 without a prevout).
        txid, vout_idx: outpoint being spent (``""`` / 0 without a prevout).

    Raises:
        KeyError, TypeError: re-raised after printing ``vin_data`` when a
            required field is missing or has the wrong shape.
    """

    def __init__(self, vin_data):
        self.is_coinbase = vin_data["is_coinbase"]
        prevout = vin_data.get("prevout")
        if not prevout:
            # No previous output to describe (presumably the coinbase case).
            self.scriptpubkey_address = ""
            self.value = 0
            self.txid = ""
            self.vout_idx = 0
            return
        try:
            # Read the mandatory field first so a malformed prevout still
            # surfaces as the TypeError/KeyError handled below.
            self.value = prevout["value"]
            # Not every script has an address; treat a missing one as "" (the
            # same placeholder used when there is no prevout) instead of
            # aborting the whole transaction parse.
            self.scriptpubkey_address = prevout.get("scriptpubkey_address", "")
            self.txid = vin_data["txid"]
            self.vout_idx = vin_data["vout"]
        except TypeError as ee:
            print("A TypeError occurred:", ee)
            print("vin_data:", vin_data)
            raise
        except KeyError as e:
            print("A KeyError occurred:", e)
            print("vin_data:", vin_data)
            raise

    def __str__(self):
        return (f"Input(Address: {self.scriptpubkey_address},"
                f" sats: {self.value},"
                f" CB: {self.is_coinbase},"
                f" txid: {self.txid},"
                f" vout_idx: {self.vout_idx})")
class Output:
    """One transaction output (a ``vout`` entry).

    Args:
        vout_data: mapping with ``"value"`` and, optionally,
            ``"scriptpubkey_address"``.
        vout_idx: position of this output within its transaction.
        src_txid: id of the transaction that contains this output.

    Attributes:
        scriptpubkey_address: receiving address, or ``""`` when the output has
            none (presumably OP_RETURN / non-standard scripts -- confirm
            against the API in use).
        value: output value.
        vout_idx: as passed in.
        vin_txid: ``src_txid`` as passed in.

    Raises:
        KeyError, TypeError: re-raised after printing ``vout_data`` when
            ``"value"`` is missing or ``vout_data`` has the wrong shape.
    """

    def __init__(self, vout_data, vout_idx, src_txid):
        try:
            # Read the mandatory field first so a malformed vout_data still
            # surfaces as the TypeError/KeyError handled below.
            self.value = vout_data["value"]
            # An address-less output must not abort parsing of the whole
            # transaction; "" never equals a real address in the filters.
            self.scriptpubkey_address = vout_data.get("scriptpubkey_address", "")
            self.vout_idx = vout_idx
            self.vin_txid = src_txid
        except TypeError as ee:
            print("A TypeError occurred:", ee)
            print("vout_data:", vout_data)
            raise
        except KeyError as e:
            print("A KeyError occurred:", e)
            print("vout_data:", vout_data)
            raise

    def __str__(self):
        return (f"Output(Address: {self.scriptpubkey_address},"
                f" sats: {self.value})")
def timestamp_string():
    """Return the current local wall-clock time formatted as ``HH:MM:SS``."""
    # Sub-second precision is irrelevant to an HH:MM:SS rendering, so format
    # the current time directly.
    return datetime.datetime.now().strftime("%H:%M:%S")
def fetch_transaction_json(my_url, timeout=120):
    """GET ``my_url`` and return the decoded JSON body, retrying on HTTP 429.

    Args:
        my_url: full URL to request.
        timeout: seconds passed to ``requests.get`` so a stalled server cannot
            hang the script indefinitely.

    Returns:
        The decoded JSON on HTTP 200; ``None`` on any other non-429 status or
        when the request itself fails (connection error, timeout, ...).

    Raises:
        SystemExit: once the module-wide 429 counter reaches
            ``EXIT_AFTER_TOO_MANY_REQUESTS_COUNT``.
    """
    global TOO_MANY_REQUESTS_COUNT
    while True:
        try:
            response = requests.get(my_url, timeout=timeout)
        except requests.RequestException as err:
            # Treat transport failures like any other failed fetch; callers
            # already handle a None result.
            print(f"Failed to fetch data: {err}")
            return None

        if response.status_code == 200:
            return response.json()

        if response.status_code != 429:
            print(f"Failed to fetch data: {response.status_code}")
            return None

        # 429 Too Many Requests: count it, give up after too many, otherwise
        # back off and retry the same URL.
        TOO_MANY_REQUESTS_COUNT += 1
        if TOO_MANY_REQUESTS_COUNT >= EXIT_AFTER_TOO_MANY_REQUESTS_COUNT:
            print(f"{timestamp_string()} -- Repeated 429 errors")
            # exit() is an interactive helper injected by the site module;
            # raising SystemExit is the equivalent that always exists.
            raise SystemExit(1)
        if SLEEP_BETWEEN_TOO_MANY_REQUESTS:
            print(f"{timestamp_string()} -- 429 -- sleeping 2 minutes")
            time.sleep(SLEEP_BETWEEN_TOO_MANY_REQUESTS)
def update_research(research_tx: Research):
    """Load the transaction ``research_tx.vout_txid`` and copy its details in.

    On success ``research_tx`` gets ``vout_blockheight``, ``vout_pool_name``,
    ``transactions`` (a one-element list holding the parsed transaction) and
    ``overpaid`` filled in.

    Args:
        research_tx: Research object whose ``vout_txid`` names the transaction
            to load.

    Returns:
        True when the transaction was fetched and is confirmed; None when
        there is no txid to look up, the fetch failed, or the transaction is
        unconfirmed / not shaped as expected.
    """
    if not research_tx.vout_txid:
        # Nothing to look up (inputs without a prevout carry an empty txid);
        # avoid a request that cannot succeed.
        return None

    my_url = f"{MEMPOOL_PROTOCOL}://{MEMPOOL_HOST}/api/tx/{research_tx.vout_txid}"
    # The shared fetch helper already implements the 429 retry/exit policy
    # and reports other HTTP failures.
    txjs = fetch_transaction_json(my_url)
    if txjs is None:
        return None

    if "status" in txjs and txjs["status"]["confirmed"]:
        research_tx.vout_blockheight = txjs["status"]["block_height"]
        research_tx.vout_pool_name = get_pool_name(research_tx.vout_blockheight)
        one_tx = Transaction(txjs)
        research_tx.transactions = [one_tx]
        research_tx.overpaid = one_tx.overpaid
        return True

    print("-- unconfirmed TX or unexpected JSON", research_tx.vout_txid)
    return None
def fetch_block_hash(block_height, timeout=120):
    """Return the hash of the block at ``block_height``, or None on failure.

    Args:
        block_height: height to look up via ``/api/block-height/<height>``.
        timeout: seconds passed to ``requests.get`` so a stalled server cannot
            hang the script indefinitely.

    Returns:
        The response text with surrounding whitespace stripped on HTTP 200;
        None on any other status or when the request itself fails.
    """
    url = f"{MEMPOOL_PROTOCOL}://{MEMPOOL_HOST}/api/block-height/{block_height}"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as err:
        # Same best-effort contract as a non-200 answer: report and return None.
        print(f"Error fetching block hash: {err}")
        return None
    if response.status_code != 200:
        print(f"Error fetching block hash: {response.status_code}")
        return None
    return response.text.strip()
def fetch_block_details(block_hash, timeout=120):
    """Return the decoded JSON for block ``block_hash``, or None on failure.

    Args:
        block_hash: hash to look up via ``/api/block/<hash>``.
        timeout: seconds passed to ``requests.get`` so a stalled server cannot
            hang the script indefinitely.

    Returns:
        The decoded JSON body on HTTP 200; None on any other status or when
        the request itself fails.
    """
    url = f"{MEMPOOL_PROTOCOL}://{MEMPOOL_HOST}/api/block/{block_hash}"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as err:
        # Same best-effort contract as a non-200 answer: report and return None.
        print(f"Error fetching block details: {err}")
        return None
    if response.status_code != 200:
        print(f"Error fetching block details: {response.status_code}")
        return None
    return response.json()
def get_pool_name(block_height):
    """Return the name of the pool credited with the block at ``block_height``.

    Returns:
        ``extras.pool.name`` from the block details when present;
        ``"Unknown"`` when the details could not be fetched or carry no pool
        name; ``"UNK"`` when the block hash itself could not be resolved.
    """
    block_hash = fetch_block_hash(block_height)
    if not block_hash:
        return "UNK"

    block_details = fetch_block_details(block_hash)
    # Walk extras -> pool -> name defensively: any level may be missing or null.
    extras = block_details.get("extras") if isinstance(block_details, dict) else None
    pool = extras.get("pool") if isinstance(extras, dict) else None
    if isinstance(pool, dict) and "name" in pool:
        return pool["name"]
    return "Unknown"
def find_creation(research_tx: Research):
    """Find the transaction that created ``research_tx`` and queue its inputs.

    Scans ``research_tx.transactions`` for a confirmed transaction with an
    output that pays ``research_tx.scriptpubkey_address`` exactly
    ``research_tx.value``. For the first such transaction:

    * if ``research_tx`` has no pool name yet (the seed UTXO), its outpoint,
      block height, pool name and ``overpaid`` flag are filled in, and an
      overpaid transaction is recorded in ``overpaid_attribution``;
    * every coinbase input is recorded in ``coinbase_attribution`` under the
      block and pool of this transaction;
    * every other input becomes a new ``Research`` object which, when
      ``update_research`` succeeds for it, is appended to
      ``research_tx.research_above`` (and recorded in ``overpaid_attribution``
      if its own creating transaction is overpaid).

    Returns:
        True when a creating transaction was found, otherwise False.
    """
    for tx in research_tx.transactions:
        # get_filtered_vout only yields outputs paying this address (and
        # nothing at all for unconfirmed transactions).
        # NOTE(review): the match is by address + value only; the output index
        # is not compared.
        for one_out in tx.get_filtered_vout(research_tx.scriptpubkey_address):
            if one_out.value != research_tx.value:
                continue

            if not research_tx.vout_pool_name:
                # Only the seed UTXO arrives without these details; deeper
                # levels had them populated by update_research beforehand.
                research_tx.vout_idx = one_out.vout_idx
                research_tx.vout_txid = one_out.vin_txid
                research_tx.vout_pool_name = get_pool_name(tx.block_height)
                research_tx.vout_blockheight = tx.block_height
                research_tx.overpaid = tx.overpaid
                if research_tx.overpaid:
                    overpaid_attribution.associate_pool(research_tx.vout_pool_name, tx.block_height)

            for one_in in tx.vin:
                if one_in.is_coinbase:
                    # A coinbase input has no previous transaction to fetch
                    # (its txid is empty), so looking it up can never succeed.
                    # The coins were minted by this very transaction:
                    # attribute them to its block and pool, and stop here.
                    coinbase_attribution.associate_pool(
                        research_tx.vout_pool_name, research_tx.vout_blockheight)
                    if DEBUGGING:
                        print(f"\t *** Coinbase Found: {tx.txid}")
                        print(coinbase_attribution)
                        print("\t *** *** *** ***")
                    continue

                # Assemble what the next level up needs to be researched.
                next_rx = Research(one_in.scriptpubkey_address, one_in.value)
                next_rx.is_coinbase = one_in.is_coinbase
                next_rx.vout_txid = one_in.txid
                next_rx.vout_idx = one_in.vout_idx
                if update_research(next_rx):
                    if next_rx.overpaid:
                        overpaid_attribution.associate_pool(
                            next_rx.vout_pool_name, next_rx.vout_blockheight)
                    research_tx.research_above.append(next_rx)

            if DEBUGGING:
                print(f"TXID: {tx.txid}")
                print("\t Block Number:", tx.block_height)
                print("\t\t Pool:", research_tx.vout_pool_name)
                print("\t\t", one_out)
            return True
    return False
def find_spends(research_tx: Research):
    """Find the transaction that spends ``research_tx`` and attribute its pool.

    Scans ``research_tx.transactions`` for a confirmed transaction with an
    input whose address, value and outpoint (``txid`` / ``vout_idx``) match
    ``research_tx``. For the first match a ``Research`` object describing the
    spending transaction is built; when ``update_research`` succeeds for it,
    its pool and block are recorded in ``spending_attribution`` and it is
    appended to ``research_tx.research_below``.

    Inputs from the same address that spend a different UTXO are printed for
    information. A failure message is printed when no spend is found.
    """
    for tx in research_tx.transactions:
        for one_in in tx.get_filtered_vin(research_tx.scriptpubkey_address):
            spends_target = (one_in.value == research_tx.value
                             and one_in.vout_idx == research_tx.vout_idx
                             and one_in.txid == research_tx.vout_txid)
            if not spends_target:
                # Sats leaving this address, but not the UTXO being researched.
                print("\t VIN Info?", one_in)
                continue

            next_rx = Research(one_in.scriptpubkey_address, one_in.value)
            # one_in.txid is the transaction that *created* the UTXO (it equals
            # research_tx.vout_txid by the match above). The spend happened in
            # `tx`, so that is the transaction whose block and pool must be
            # looked up for the spending attribution.
            next_rx.vout_txid = tx.txid
            next_rx.vout_idx = one_in.vout_idx
            if update_research(next_rx):
                spending_attribution.associate_pool(next_rx.vout_pool_name, next_rx.vout_blockheight)
                research_tx.research_below.append(next_rx)
            if DEBUGGING:
                print(f"TXID: {tx.txid}")
                print("\t Block Number:", tx.block_height)
                print("\t VIN Info:", one_in)
            return
    print(f"Failed to find spend for {research_tx}")
def start_research(research_tx: Research):
    """Page through an address's history until the seed UTXO's creation is found.

    Fetches ``/api/address/<address>/txs`` page by page (continuing with
    ``?after_txid=<last txid of the previous page>``), appends each page to
    ``research_tx.transactions`` and calls ``find_creation`` after every page.
    Stops as soon as ``find_creation`` succeeds, or prints a message and
    returns when a page cannot be fetched or comes back empty.
    """
    url = f"{MEMPOOL_PROTOCOL}://{MEMPOOL_HOST}/api/address/{research_tx.scriptpubkey_address}/txs"  # /chain
    if DEBUGGING:
        print(f"-- Researching: {research_tx}")

    bucket_number = 0
    next_bucket_url = url
    research_tx.transactions = []
    while True:
        tx_json = fetch_transaction_json(next_bucket_url)
        # None means the fetch failed; an empty page means the history is
        # exhausted. Either way there is nothing left to scan, and indexing
        # the last element of an empty page below would raise IndexError.
        if not tx_json:
            if bucket_number == 0:
                print(f"No transactions found for {research_tx}")
            else:
                print(f"Failed to find creation for {research_tx}")
            return

        new_transactions = [Transaction(tx) for tx in tx_json]
        research_tx.transactions += new_transactions
        if find_creation(research_tx):
            break

        if SLEEP_BETWEEN_BUCKETS:
            time.sleep(SLEEP_BETWEEN_BUCKETS)
        # The next page starts after the last transaction of this one.
        last_tx = new_transactions[-1]
        next_bucket_url = url + f"?after_txid={last_tx.txid}"
        bucket_number += 1
# Driver script: these statements run at module level (there is no __main__ guard).
# NOTE(review): indentation was lost in this copy of the file, so the extent of
# each if/while/for body below has to be restored before this can run.
# Counter shown in the "Level #N" progress lines.
level_index = 0
# Despite the name, this is an address: it is passed as Research's `address` argument.
seed_transaction = "bc1p27rngum66yxampv8ph8sk47kpr30anzu9fggn0pl5gtsm3lm35vsnj39j3"
# Value of the UTXO to trace (Research prints this field as "sats").
seed_utxo_size = 58452
start_rx = Research(seed_transaction, seed_utxo_size)
# Pages through the address history until find_creation locates the creating
# transaction; find_creation stores that transaction's inputs in start_rx.research_above.
start_research(start_rx)
# Optional downward pass: only when RESEARCH_SPENDS is enabled and the seed's
# creating transaction was flagged overpaid.
if RESEARCH_SPENDS and start_rx.overpaid:
# Work on a deep copy with research_above cleared; start_rx itself is reused
# unchanged for the upward pass further down.
spend_rx = copy.deepcopy(start_rx)
spend_rx.research_above = []
next_level = [spend_rx]
# Repeat while the previous pass produced UTXOs to follow.
while len(next_level):
below_level = []
# level_index is not incremented anywhere in this pass.
print(f"{timestamp_string()} -- Spending Level #{level_index}: {len(next_level)} addresses")
for one_rx in next_level:
find_spends(one_rx)
below_level += one_rx.research_below
# De-duplicate by "txid:vout" key.
unique_vout = {}
for one_utxo in below_level:
# only overpaid entries are carried to the next pass
if one_utxo.overpaid:
key = one_utxo.vout_key()
unique_vout[key] = one_utxo
next_level = list(unique_vout.values())
# NOTE(review): the heading says "Overpaid" but the object printed is spending_attribution.
print("Overpaid Attributions:")
print(spending_attribution)
print(f"***************************************")
# Upward pass: start from the inputs of the seed's creating transaction.
next_level = start_rx.research_above
print(f"***************************************")
# Each pass expands every UTXO of the current level into the inputs of its
# creating transaction, until no inputs remain.
while len(next_level) > 0:
level_index += 1
print(f"{timestamp_string()} -- Level #{level_index}: {len(next_level)} addresses")
level_above = []
for one_item in next_level:
# optional pause of SLEEP_BETWEEN_LEVELS seconds (0 disables)
# NOTE(review): this follows the `for` header, so it presumably runs once
# per item rather than once per level -- confirm against the original layout.
if SLEEP_BETWEEN_LEVELS:
time.sleep(SLEEP_BETWEEN_LEVELS)
# update_research stored the item's creating transaction in item.transactions
# when the item was queued by the level below
find_creation(one_item)
level_above += one_item.research_above
# de-duplicate the gathered inputs by "txid:vout" key before the next pass
unique_vout = {}
for one_utxo in level_above:
key = one_utxo.vout_key()
unique_vout[key] = one_utxo
next_level = list(unique_vout.values())
# NOTE(review): with indentation lost it is unclear whether this summary is
# printed after every level or only once after the loop finishes.
print("Overpaid Attributions:")
print(overpaid_attribution)
if not coinbase_attribution.is_empty():
print("Coinbase Attributions:")
print(coinbase_attribution)
print(f"***************************************")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment