Skip to content

Instantly share code, notes, and snippets.

@st1vms
Last active September 12, 2025 14:44
Show Gist options
  • Save st1vms/f99a364bafe37c686578b2a79962a616 to your computer and use it in GitHub Desktop.
Save st1vms/f99a364bafe37c686578b2a79962a616 to your computer and use it in GitHub Desktop.
Simple Binance Scanner
Simple Binance Scanner
import traceback
from dataclasses import dataclass
from multiprocessing import Process, Lock, Manager
from binance.client import Client
from pandas import DataFrame
_BINANCE_API_HISTORICAL_KLINE_COLUMNS = [
"time",
"open",
"high",
"low",
"close",
"volume",
"close_time",
"qav",
"trades",
"tb_base",
"tb_quote",
"ignore",
]
@dataclass
class ScanResult:
    """Figures collected for one symbol during a scan."""

    # Trading pair symbol the figures belong to.
    symbol: str
    # Last traded price reported by the ticker.
    price: float
    # Price change percentage reported by the ticker.
    change_perc: float
    # Quote-asset volume reported by the ticker.
    current_volume: float
    # Mean daily quote-asset volume over the configured look-back window.
    avg_volume: float
@dataclass
class ScanConfiguration:
    """Thresholds that decide which symbols a scan reports.

    Every attribute is an annotated dataclass field with a default, so a
    configuration can be customised at construction time, e.g.
    ``ScanConfiguration(QUOTE_ASSET="USDT", MAX_PRICE=5)``, while
    ``ScanConfiguration()`` keeps the defaults.
    """

    # Only symbols quoted in this asset are scanned.
    QUOTE_ASSET: str = "USDC"
    # Minimum price change percentage required for a signal.
    MIN_PRICE_CHANGE_PERCENTAGE: float = 10
    # Inclusive price bounds a symbol must fall within.
    MIN_PRICE: float = 0.0001
    MAX_PRICE: float = 2
    # Number of past days of daily klines used for the average volume.
    AVG_VOLUME_DAY_RANGE: int = 60
    # Current volume must be at least this multiple of the average volume.
    CURRENT_TO_AVG_VOLUME_RATIO: float = 2
class BinanceScanner:
    """Scan Binance symbols of one quote asset for price/volume signals.

    For every symbol the scanner reads the ticker and the daily klines of the
    configured look-back window, then keeps the symbols that satisfy
    `check_scan_results`.
    """

    def __init__(
        self,
        binance_api_key: str,
        binance_api_secret: str,
        config: ScanConfiguration | None = None,
    ):
        """Store the API credentials and the scan configuration.

        Args:
            binance_api_key: API key handed to every `Client` that is created.
            binance_api_secret: API secret handed to every `Client`.
            config: Thresholds to scan with. When omitted, a fresh
                `ScanConfiguration()` is created for this instance.
        """
        self.api_key = binance_api_key
        self.api_secret = binance_api_secret
        # Build the default here rather than in the signature: a default
        # evaluated at definition time would be one object shared (and
        # mutable) across every scanner.
        self.config = ScanConfiguration() if config is None else config

    def get_symbols(self, client: Client, quote_asset: str):
        """Retrieve all symbols pairing with `quote_asset`."""
        exchange_info = client.get_exchange_info()
        return [
            s["symbol"]
            for s in exchange_info["symbols"]
            if s["quoteAsset"] == quote_asset
        ]

    def check_scan_results(self, scan_results: ScanResult) -> bool:
        """Formula used to generate a signal.

        A result qualifies when its price change reaches the configured
        minimum, its price lies within the configured inclusive bounds, and
        its current volume is at least the configured multiple of its average
        volume.
        """
        config = self.config
        return (
            scan_results.change_perc >= config.MIN_PRICE_CHANGE_PERCENTAGE
            and config.MIN_PRICE <= scan_results.price <= config.MAX_PRICE
            and scan_results.current_volume
            >= config.CURRENT_TO_AVG_VOLUME_RATIO * scan_results.avg_volume
        )

    def scan_symbol(self, client: Client, symbol: str) -> ScanResult:
        """Scan a symbol to retrieve ScanResult data.

        Args:
            client: Client used for the ticker and kline requests.
            symbol: Symbol to query.

        Returns:
            The ticker price, change percentage and quote volume, together
            with the mean daily quote-asset volume of the look-back window.
        """
        ticker = client.get_ticker(symbol=symbol)
        price = float(ticker["lastPrice"])
        change = float(ticker["priceChangePercent"])
        volume_now = float(ticker["quoteVolume"])

        # Calculate the average volume based on the past days
        klines = client.get_historical_klines(
            symbol,
            Client.KLINE_INTERVAL_1DAY,
            f"{self.config.AVG_VOLUME_DAY_RANGE} day ago UTC",
        )
        df = DataFrame(
            klines,
            columns=_BINANCE_API_HISTORICAL_KLINE_COLUMNS,
        )

        # Use quote asset volume to calculate average volume
        df["qav"] = df["qav"].astype(float)
        avg_volume = df["qav"].mean()
        return ScanResult(symbol, price, change, volume_now, avg_volume)

    def _task_input_batcher(self, input_list: list, batch_size: int) -> list[list]:
        """Split `input_list` into `batch_size` contiguous, near-equal slices.

        The first ``len(input_list) % batch_size`` slices receive one extra
        item. When `batch_size` exceeds the list length the trailing slices
        are empty. `batch_size` must be non-zero (it is used as a divisor).
        """
        n = len(input_list)
        block_size, remainder = divmod(n, batch_size)
        blocks = []
        start = 0
        for i in range(batch_size):
            end = start + block_size + (1 if i < remainder else 0)
            blocks.append(input_list[start:end])
            start = end
        return blocks

    def _scan_pool_task(
        self, symbols: list[str], shared_results: list[ScanResult], process_lock
    ) -> None:
        """Worker entry point: scan `symbols` and publish the matches.

        Creates its own `Client`, scans the batch, then appends the matching
        results to `shared_results` while holding `process_lock`.
        """
        client = Client(self.api_key, self.api_secret)
        results = self._scan_task(client, symbols)
        with process_lock:
            # Extend managed list
            shared_results.extend(results)

    def _scan_task(self, client: Client, symbols: list[str]) -> list[ScanResult]:
        """Scan `symbols` one by one and return those that produce a signal.

        A symbol whose scan raises is reported with a traceback and skipped,
        so one failing symbol does not abort the rest of the batch.
        """
        results = []
        for symbol in symbols:
            try:
                result = self.scan_symbol(client, symbol)
            except Exception:
                traceback.print_exc()
                continue
            if self.check_scan_results(result):
                results.append(result)
        return results

    def _scan_parallel(self, symbols: list[str], n_procs: int) -> list[ScanResult]:
        """Scan `symbols` across up to `n_procs` worker processes."""
        # The context manager shuts the manager process down on exit, so the
        # shared list is copied into a plain list before leaving the block.
        with Manager() as manager:
            shared_results = manager.list()
            lock = Lock()

            # Batch input for each process; skip empty batches so no worker
            # is started without any symbol to scan.
            batches = [
                batch for batch in self._task_input_batcher(symbols, n_procs) if batch
            ]
            procs = [
                Process(
                    target=self._scan_pool_task,
                    args=(symbol_batch, shared_results, lock),
                )
                for symbol_batch in batches
            ]
            for p in procs:
                p.start()
            for p in procs:
                p.join()
            return list(shared_results)

    def scan(
        self, n_procs: int = 1, output_csv_path: str | None = None
    ) -> list[ScanResult]:
        """Scan every symbol of the configured quote asset.

        Args:
            n_procs: Number of worker processes. Values of 1 or less scan in
                the calling process.
            output_csv_path: When given and at least one result was found,
                the results are written to this CSV file (no index column).

        Returns:
            The results that satisfied `check_scan_results`, as a plain list.
        """
        # Retrieve all symbols pairing with configured quote asset
        client = Client(self.api_key, self.api_secret)
        symbols = self.get_symbols(client, self.config.QUOTE_ASSET)

        if n_procs <= 1:
            # Use only one process
            results = self._scan_task(client, symbols)
        else:
            # Use a process pool
            results = self._scan_parallel(symbols, n_procs)

        # Written on both paths, so a single-process scan honours
        # `output_csv_path` as well.
        if output_csv_path is not None and results:
            # Save into csv
            DataFrame(results).to_csv(output_csv_path, index=False)
        return results
if __name__ == "__main__":
    # Test run
    # Create an auth.py file with these two string variables
    from auth import API_KEY, API_SECRET

    # Single source of truth for the CSV path: passed to the scan and echoed
    # in the final message.
    OUTPUT_FILE = "scan.csv"

    # Default scan configuration
    CONFIG = ScanConfiguration()

    try:
        N_PROCS = int(input("\nHow many processes to use for scan?\n(Default 1)>>"))
    except (ValueError, EOFError):
        # Non-numeric answer, or no stdin available: fall back to the default.
        N_PROCS = 1
    # Zero or negative answers also fall back to a single process.
    N_PROCS = max(N_PROCS, 1)

    print("\nRunning scan...")
    RESULTS = BinanceScanner(API_KEY, API_SECRET, config=CONFIG).scan(
        n_procs=N_PROCS, output_csv_path=OUTPUT_FILE
    )

    N_RESULTS = len(RESULTS)
    if N_RESULTS:
        print(f"\nScan finished, saved {N_RESULTS} results into {OUTPUT_FILE}")
    else:
        # The scan writes no file when nothing matched, so do not claim a save.
        print("\nScan finished, no results matched the scan configuration")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment