Skip to content

Instantly share code, notes, and snippets.

@naviat
Last active August 29, 2025 01:14
Show Gist options
  • Save naviat/20db761cd6bd064223ea81177f0cf134 to your computer and use it in GitHub Desktop.
Save naviat/20db761cd6bd064223ea81177f0cf134 to your computer and use it in GitHub Desktop.
Debug degradation connection, no tx received when replace account in the same filter name.
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use std::path::Path;
use tokio::time::sleep;
use futures::SinkExt;
use yellowstone_grpc_client::{GeyserGrpcClient, Interceptor};
use yellowstone_grpc_proto::prelude::*;
use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize)]
struct Config {
endpoint: String,
x_token: String,
}
#[derive(Clone)]
struct TestConfig {
endpoint: String,
x_token: Option<String>,
all_accounts: Vec<String>,
}
#[derive(Debug)]
struct TransactionStats {
total_transactions: u32,
new_account_transactions: u32,
original_account_transactions: u32,
unique_slots: std::collections::HashSet<u64>,
first_transaction_time: Option<SystemTime>,
last_transaction_time: Option<SystemTime>,
test_duration: Duration,
replacements_performed: u32,
filter_approach: String,
accounts_replaced: u32,
}
fn load_config() -> Result<Config, Box<dyn std::error::Error>> {
// Try to load from .env file
if Path::new(".env").exists() {
println!("πŸ“‚ Loading configuration from .env file");
dotenv::dotenv().ok();
let endpoint = std::env::var("ENDPOINT")
.or_else(|_| std::env::var("GRPC_ENDPOINT"))?;
let x_token = std::env::var("X_TOKEN")
.or_else(|_| std::env::var("AUTH_TOKEN"))?;
return Ok(Config { endpoint, x_token });
}
// Try environment variables without .env file
if let (Ok(endpoint), Ok(x_token)) = (
std::env::var("ENDPOINT").or_else(|_| std::env::var("GRPC_ENDPOINT")),
std::env::var("X_TOKEN").or_else(|_| std::env::var("AUTH_TOKEN"))
) {
println!("πŸ“‚ Loading configuration from environment variables");
return Ok(Config { endpoint, x_token });
}
Err("Configuration not found. Please provide either .env file, config.yml, or environment variables (ENDPOINT and X_TOKEN)".into())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Parse command line arguments
let args: Vec<String> = std::env::args().collect();
let test_mode = if args.len() > 1 {
match args[1].as_str() {
"2" | "test2" | "working" => "test2",
_ => "test1" // Default to broken test for any other input
}
} else {
"test1" // Default to broken test when no args
};
// Load configuration from .env, config.yml, or environment variables
let loaded_config = load_config()?;
let config = TestConfig {
endpoint: loaded_config.endpoint,
x_token: Some(loaded_config.x_token),
// 50 high-activity Solana accounts for realistic testing
all_accounts: vec![
// Popular DEX and DeFi accounts with high transaction volume
"JUP4Fb2cqiRUcaTHdrPC8h2gNsA2ETXiPDD33WcGuJB".to_string(), // Jupiter
"PhoeNiXZ8ByJGLkxNfZRnkUfjvmuYqLR89jjFHGqdXY".to_string(), // Phoenix
"CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK".to_string(), // Raydium CAMM
"675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8".to_string(), // Raydium AMM
"9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM".to_string(), // Orca
"whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc".to_string(), // Orca Whirlpool
"SSwpkEEcbUqx4vtoEByFjSkhKdCT862DNVb52nZg1UZ".to_string(), // Step Finance
"7dHbWXmci3dT8UFYWYZweBLXgycu7Y3iL6trKn1Y7ARj".to_string(), // Lido
"MarBmsSgKXdrN1egZf5sqe1TMai9K1rChYNDJgjq7aD".to_string(), // Marinade
"mSoLzYCxHdYgdzU16g5QSh3i5K3z3KZK7ytfqcJm7So".to_string(), // Marinade mSOL
// Popular token accounts
"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_string(), // USDC
"Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB".to_string(), // USDT
"So11111111111111111111111111111111111111112".to_string(), // Wrapped SOL
"4k3Dyjzvzp8eMZWUXbBCjEvwSkkk59S5iCNLY3QrkX6R".to_string(), // Raydium RAY
"SRMuApVNdxXokk5GT7XD5cUUgXMBCoAz2LHeuAoKWRt".to_string(), // Serum SRM
// More popular token accounts and wallets
"7vfCXTUXx5WJV5JADk17DUJ4ksgau7utNKj4b963voxs".to_string(), // Ethereum Wormhole
"2FPyTwcZLUg1MDrwsyoP4D6s1tM7hAkHYRjkNb5w6Pxk".to_string(), // Solana Foundation
"GDDMwNyyx8uB6zrqwBFHjLLG3TBYk2F8Az4yrQC5RzMp".to_string(), // Large wallet
"5Q544fKrFoe6tsEbD7S8EmxGTJYAKtTVhAW5Q5pge4j1".to_string(), // Active trader
"BJ3jrUzddfuSrZHXSCxMbkKyRxJDN5n7m1DQSQQ1fyZZ".to_string(), // Popular DEX user
"7xKXtg2CW87d97TXJSDpbD5jBkheTqA83TZRuJosgAsU".to_string(), // DeFi protocol
"9n4nbM75f5Ui33ZbPYXn59EwSgE8CGsHtAeTH5YFeJ9E".to_string(), // Bridge program
"Saber2gLauYim4Mvftnrasomsv6NvAuncvMEZwcLpD1".to_string(), // Saber
"CURVGoZn8zycx6FXwwevgBTB2gVvdbGTEpvMJDbgs2t4".to_string(), // Curve
"port7gfFQRPdASFEwCMcjqWMJ9AMLKthtJWZkXWEe5L".to_string(), // Port Finance
"SLNDpmoWTVADgEdndyvWzroNL7zSi1dF9PC3xHGtPwp".to_string(), // Solend
"MERLuDFBMmsHnsBPZw2sDQZHvXFMwp8EdjudcU2HKky".to_string(), // Mercurial
"FarmuwXPWXvefWUeqFAa5w6rifLkq5X6E8bimYvrhCVx".to_string(), // Tulip
"6LtLpnUFNByNXLyCoK9wA2MykKAmQNZKBdY8s47dehDc".to_string(), // Solrise
"B7bX8HqWGgEh4kR9UQqZnkFzrw85YFRyv1REAaYBUjZe".to_string(), // Quarry
"QMNeHCGYnLVDn1icRAfQZpjPLBNkfGbSKRB83G5d8KB".to_string(), // Quarry Mine
"BrTJjJCTL2aFKhtCNLrz9NvdWtcV7YdQJuGZLjSAYREr".to_string(), // Bonfida
"namesLPneVptA9Z5rqUDD9tMTWEJwofgaYwp8cawRkX".to_string(), // Bonfida Names
"E2VmbootbVCBkMNNxKQgCLMS1X3NoGMaYAsufaAsf7M".to_string(), // MonkeDAO
"BonkDkNiDPxG6FeWsXi2QKL8NWzJbvKWdKnKvUpkEGi2".to_string(), // BONK
"DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263".to_string(), // Bonk DEX
"HUBsveNpjo5pWqNkH57QzxjQASdTVXcSK7bVKTSZtcSX".to_string(), // Hubble
"DjVE6JNiYqPL2QXyCUUh8rNjHrbz9hXHNYt99MQ59qw1".to_string(), // Popular NFT marketplace
"M2mx93ekt1fmXSVkTrUL9xVFHkmME8HTUi5Cyc5aF7K".to_string(), // Magic Eden
"CJsLwbP1iu5DuUikHEJnLfANgKy6stB2uFgvBBHoyxwz".to_string(), // Solanart
"A7p8451ktDCHq5yYaHczeLMYsjRsAkzc3hCXcSrwYHU7".to_string(), // OpenSea Solana
"BUX7s2ef2htTGb2KKoPHWkmzxPj4nTWMWRgs5CSbQxf9".to_string(), // Solana Beach
"rFqFJ9g7TGBD8Ed7TPDnvGKZ5pWLPDyxLcvcH2eRCtt".to_string(), // StepN
"nosXBVoaCTtYdLvKY6Csb4AC8JCdQKKAaWYtx2ZMoo7".to_string(), // Nosana
"kinXdEcpDQeHPEuQnqmUgtYykqKGVFq6CeVX5iAHJq6".to_string(), // KIN
"SHDWyBxihqiCj6YekG2GUr7wqKLeLAMK1gHZck9pL6y".to_string(), // Shadow Drive
"AVNP8n7vWb3ZwH8YMhp9jdSkC8V7uu44YP1PGU3jUc8".to_string(), // Sollamas
"ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL".to_string(), // Associated Token
"11111111111111111111111111111112".to_string(), // System Program
"ComputeBudget111111111111111111111111111111".to_string(), // Compute Budget
],
};
println!("πŸ§ͺ Solana gRPC Stream Degradation Test");
println!("======================================");
println!("πŸ“Š SCENARIO: 7min stable period, then single replacement");
println!("πŸ•°οΈ Running for 40 minutes total - degradation expected 25-30min AFTER replacement");
println!("πŸ“ˆ Monitoring for data loss when accounts are replaced");
match test_mode {
"test1" => {
println!("πŸ”¬ Running BROKEN approach (same filter name) - reproducing bug\n");
}
"test2" => {
println!("πŸ”¬ Running WORKING approach (unique filter names) - demonstrating fix\n");
}
_ => {}
}
// Run the selected test
match test_mode {
"test1" => {
println!("πŸ”΄ BROKEN APPROACH (Same Filter Name)");
println!("{}", "=".repeat(50));
match run_stream_degradation_test(&config, false).await {
Ok(stats) => {
println!("βœ… Test completed");
print_final_stats(&stats);
}
Err(e) => {
println!("❌ Test failed with error: {}", e);
}
}
}
"test2" => {
println!("🟒 WORKING SOLUTION (Unique Filter Names)");
println!("{}", "=".repeat(50));
match run_stream_degradation_test(&config, true).await {
Ok(stats) => {
println!("βœ… Test completed");
print_final_stats(&stats);
}
Err(e) => {
println!("❌ Test failed with error: {}", e);
}
}
}
_ => {}
}
Ok(())
}
async fn run_stream_degradation_test(config: &TestConfig, use_unique_names: bool) -> Result<TransactionStats, Box<dyn std::error::Error>> {
// Stream 1: First 12 accounts
let stream1_accounts = &config.all_accounts[0..12];
// Stream 2: Next 13 accounts
let stream2_accounts = &config.all_accounts[12..25];
// Replacement pool: Separate accounts not in initial streams
let replacement_accounts = &config.all_accounts[25..50]; // 25 replacement accounts (all remaining)
println!("πŸ“ˆ Setting up 2-stream configuration");
println!(" Stream 1 accounts: {}", stream1_accounts.len());
println!(" Stream 2 accounts: {}", stream2_accounts.len());
println!(" Replacement pool: {}", replacement_accounts.len());
println!(" Approach: {}", if use_unique_names { "Unique filter names (WORKING)" } else { "Same filter names (BROKEN)" });
let mut client = create_client(config).await?;
let (mut sink, mut stream) = client.subscribe().await?;
// Real pattern: Multiple separate streams per gRPC connection (2-3 streams)
// Each stream has up to 25 accounts, they reuse THE SAME stream names but replace some accounts (this causes the bug)
let mut transactions = HashMap::new();
// Stream 1: Up to 25 accounts
transactions.insert(
"stream_1".to_string(), // Stream name that will be reused
SubscribeRequestFilterTransactions {
account_include: stream1_accounts.to_vec(), // 12 accounts in stream 1
account_exclude: vec![],
account_required: vec![],
vote: Some(false),
failed: Some(false), // Include failed transactions for testing
signature: None,
},
);
// Stream 2: Up to 25 accounts
transactions.insert(
"stream_2".to_string(), // Stream name that will be reused
SubscribeRequestFilterTransactions {
account_include: stream2_accounts.to_vec(), // 13 accounts in stream 2
account_exclude: vec![],
account_required: vec![],
vote: Some(false),
failed: Some(false), // Include failed transactions for testing
signature: None,
},
);
let initial_request = SubscribeRequest {
accounts: HashMap::new(),
slots: HashMap::new(),
transactions,
transactions_status: HashMap::new(),
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
entry: HashMap::new(),
accounts_data_slice: vec![],
commitment: Some(CommitmentLevel::Confirmed as i32),
ping: None, // PING will be sent separately every 30s
from_slot: None,
};
println!("πŸ”§ DEBUG: Sending initial request with 2 streams:");
println!(" πŸ“‹ Stream 'stream_1': {} accounts", stream1_accounts.len());
println!(" πŸ“‹ Stream 'stream_2': {} accounts", stream2_accounts.len());
println!(" πŸ“ PING keep-alive will be sent every 30 seconds separately");
// Debug: Show first few accounts being monitored per stream
println!(" πŸ” Stream 1 first 3 accounts:");
for (i, account) in stream1_accounts.iter().take(3).enumerate() {
println!(" [{}] {}", i, account);
}
println!(" πŸ” Stream 2 first 3 accounts:");
for (i, account) in stream2_accounts.iter().take(3).enumerate() {
println!(" [{}] {}", i, account);
}
sink.send(initial_request).await?;
println!("βœ… Initial setup: 2 streams with {} total accounts", stream1_accounts.len() + stream2_accounts.len());
println!(" πŸ“Š Real pattern: Same stream names reused with updated account lists");
// Monitor with 2-stream account replacement
let stats = monitor_with_2stream_replacement(
&mut sink,
&mut stream,
stream1_accounts,
stream2_accounts,
replacement_accounts,
40 * 60, // 40 minutes total - wait longer for degradation
7 * 60, // Single replacement at 7 minutes
use_unique_names,
).await?;
Ok(stats)
}
async fn create_client(config: &TestConfig) -> Result<GeyserGrpcClient<impl Interceptor>, Box<dyn std::error::Error>> {
println!("πŸ”Œ Connecting to: {}", config.endpoint);
let mut client_builder = GeyserGrpcClient::build_from_shared(config.endpoint.clone())?;
if let Some(token) = &config.x_token {
println!("πŸ”‘ Using authentication token");
client_builder = client_builder.x_token(Some(token.clone()))?;
}
// Enable TLS with native root certificates
let tls_config = tonic::transport::ClientTlsConfig::new()
.with_native_roots();
client_builder = client_builder.tls_config(tls_config)?;
let client = client_builder
.connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(30))
.connect()
.await
.map_err(|e| {
eprintln!("❌ Connection failed: {:?}", e);
e
})?;
println!("βœ… Connected successfully");
Ok(client)
}
async fn monitor_with_2stream_replacement(
sink: &mut (impl futures::Sink<SubscribeRequest, Error = futures::channel::mpsc::SendError> + Unpin),
stream: &mut (impl futures::Stream<Item = Result<SubscribeUpdate, tonic::Status>> + Unpin),
stream1_accounts: &[String],
stream2_accounts: &[String],
replacement_accounts: &[String],
total_duration_secs: u64,
replacement_time: u64,
use_unique_names: bool,
) -> Result<TransactionStats, Box<dyn std::error::Error>> {
use futures::{StreamExt, SinkExt};
let mut stats = TransactionStats {
total_transactions: 0,
new_account_transactions: 0,
original_account_transactions: 0,
unique_slots: std::collections::HashSet::new(),
first_transaction_time: None,
last_transaction_time: None,
test_duration: Duration::from_secs(total_duration_secs),
replacements_performed: 0,
filter_approach: if use_unique_names { "Unique Names".to_string() } else { "Same Name".to_string() },
accounts_replaced: 0,
};
let start_time = SystemTime::now();
let mut current_stream1_accounts = stream1_accounts.to_vec();
let mut current_stream2_accounts = stream2_accounts.to_vec();
let mut replacement_done = false;
let mut last_tx_for_replaced_accounts = start_time;
let mut replaced_account_addresses = std::collections::HashSet::new(); // Track actual replaced account addresses
let mut original_account_addresses = std::collections::HashSet::new(); // Track original account addresses
let mut replaced_account_tx_count = HashMap::new(); // Track transactions per replaced account
let mut replaced_account_last_tx = HashMap::new(); // Track last transaction time per replaced account
let mut degraded_accounts = std::collections::HashSet::new(); // Track accounts confirmed as degraded
// Store original account addresses for categorization from both streams
for account in stream1_accounts {
original_account_addresses.insert(account.clone());
}
for account in stream2_accounts {
original_account_addresses.insert(account.clone());
}
// Create combined account list for easier transaction matching
let mut all_current_accounts = current_stream1_accounts.clone();
all_current_accounts.extend(current_stream2_accounts.clone());
// Create separate timer channels for replacements and keep-alive
let (timer_tx, mut timer_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
let (ping_tx, mut ping_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
// Spawn background timer task for test events
let timer_start = start_time;
tokio::spawn(async move {
let mut last_report = timer_start;
loop {
sleep(Duration::from_secs(5)).await; // Check every 5 seconds
let elapsed = timer_start.elapsed().unwrap().as_secs();
// Send periodic updates
if last_report.elapsed().unwrap().as_secs() >= 10 {
let _ = timer_tx.send(format!("report:{}", elapsed));
last_report = SystemTime::now();
}
// Check for replacement time
if elapsed >= replacement_time {
let _ = timer_tx.send("replace1".to_string());
}
if elapsed >= total_duration_secs {
let _ = timer_tx.send("done".to_string());
break;
}
}
});
// Spawn background PING keep-alive task
tokio::spawn(async move {
let mut _ping_counter = 1;
loop {
sleep(Duration::from_secs(30)).await; // Send PING every 30 seconds
let _ = ping_tx.send(());
_ping_counter += 1;
}
});
println!("πŸ‘€ Monitoring transactions on {} total accounts across 2 streams...", all_current_accounts.len());
println!("πŸ“Š Pattern: Single replacement at {}min, then monitor for degradation",
replacement_time / 60);
println!("πŸ”¬ SCENARIO: Degradation should occur 25-30 minutes AFTER replacement!");
println!("πŸ”§ Debug: replacement_time={}s, total_duration={}s",
replacement_time, total_duration_secs);
// Debug: Show first few monitored accounts per stream
println!("πŸ” Stream 1 first 3 accounts:");
for (i, account) in current_stream1_accounts.iter().take(3).enumerate() {
println!(" [{}] {}", i, &account[0..8]);
}
println!("πŸ” Stream 2 first 3 accounts:");
for (i, account) in current_stream2_accounts.iter().take(3).enumerate() {
println!(" [{}] {}", i, &account[0..8]);
}
println!();
loop {
tokio::select! {
Some(message) = stream.next() => {
match message {
Ok(update) => {
// Debug: Show all update types we receive
match &update.update_oneof {
Some(subscribe_update::UpdateOneof::Pong(pong_response)) => {
println!(" πŸ“ Received PONG response: id={}", pong_response.id);
continue;
}
Some(subscribe_update::UpdateOneof::Transaction(_)) => {
// Continue with transaction processing below
}
Some(subscribe_update::UpdateOneof::Slot(slot_update)) => {
if stats.total_transactions == 0 {
println!(" πŸ“Š Received slot update: {}", slot_update.slot);
}
continue;
}
Some(other_update) => {
if stats.total_transactions == 0 {
println!(" πŸ“¨ Received other update type: {:?}", std::mem::discriminant(other_update));
}
continue;
}
None => {
if stats.total_transactions == 0 {
println!(" ⚠️ Received update with no content");
}
continue;
}
}
if let Some(subscribe_update::UpdateOneof::Transaction(tx_update)) = &update.update_oneof {
stats.total_transactions += 1;
let now = SystemTime::now();
if stats.first_transaction_time.is_none() {
stats.first_transaction_time = Some(now);
}
stats.last_transaction_time = Some(now);
stats.unique_slots.insert(tx_update.slot);
if let Some(transaction_info) = &tx_update.transaction {
// Parse the actual transaction to see which accounts are involved
let mut involves_original_accounts = false;
let mut involves_replaced_accounts = false;
// Access the transaction data within SubscribeUpdateTransactionInfo
if let Some(transaction) = &transaction_info.transaction {
if let Some(message) = &transaction.message {
// Debug: Show first few transactions in detail
if stats.total_transactions <= 3 {
println!(" πŸ” Debug tx #{}: {} account keys",
stats.total_transactions, message.account_keys.len());
}
// Check each account in the transaction
for account_key in &message.account_keys {
let account_str = bs58::encode(account_key).into_string();
// Debug: Show first few account comparisons
if stats.total_transactions <= 3 {
println!(" Checking: {}", &account_str[0..8]);
}
// Check if this account is in our monitored set
if all_current_accounts.contains(&account_str) {
if stats.total_transactions <= 10 {
println!(" βœ… MATCH found: {}", &account_str[0..8]);
}
// Check if this is a replaced account or original account
if replaced_account_addresses.contains(&account_str) {
involves_replaced_accounts = true;
// Track which replaced account is active
let count = replaced_account_tx_count.entry(account_str.clone()).or_insert(0);
*count += 1;
// Update last transaction time for this replaced account
replaced_account_last_tx.insert(account_str.clone(), now);
// Debug: Show when we find replaced account transactions
if stats.new_account_transactions < 10 || stats.new_account_transactions % 50 == 0 {
println!(" 🎯 REPLACED ACCOUNT TX #{}: {} (count: {})",
stats.new_account_transactions + 1, &account_str[0..8], count);
}
} else if original_account_addresses.contains(&account_str) {
involves_original_accounts = true;
}
break;
}
}
}
}
// Categorize the transaction based on which accounts are involved
if involves_replaced_accounts {
stats.new_account_transactions += 1;
last_tx_for_replaced_accounts = now;
let minutes_since_replacement = if replacement_done {
(now.duration_since(start_time).unwrap().as_secs() - replacement_time) / 60
} else {
0
};
// Debug: Show we found a transaction from replaced accounts with timing
if stats.new_account_transactions % 50 == 1 {
println!(" 🎯 REPLACED ACCOUNT TX #{}: {}min after replacement (slot: {})",
stats.new_account_transactions, minutes_since_replacement, tx_update.slot);
}
// Special debug for the critical period when degradation should occur
if replacement_done && minutes_since_replacement >= 25 && minutes_since_replacement <= 30 {
if stats.new_account_transactions % 10 == 1 {
println!(" 🚨 CRITICAL PERIOD: Replaced account still receiving tx #{} at {}min post-replacement!",
stats.new_account_transactions, minutes_since_replacement);
}
}
} else if involves_original_accounts {
stats.original_account_transactions += 1;
} else {
// Transaction doesn't involve any of our monitored accounts
// This shouldn't happen if the filter is working correctly
if stats.total_transactions % 10000 == 1 {
println!(" ⚠️ Warning: Transaction doesn't involve monitored accounts (slot: {}) - {} total so far",
tx_update.slot, stats.total_transactions);
}
}
}
}
}
Err(e) => {
eprintln!("❌ Stream error: {}", e);
}
}
}
Some(_) = ping_rx.recv() => {
// Send PING keep-alive every 30 seconds
let ping_request = SubscribeRequest {
accounts: HashMap::new(),
slots: HashMap::new(),
transactions: HashMap::new(),
transactions_status: HashMap::new(),
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
entry: HashMap::new(),
accounts_data_slice: vec![],
commitment: None,
ping: Some(SubscribeRequestPing { id: 1 }),
from_slot: None,
};
match sink.send(ping_request).await {
Ok(_) => {
println!(" πŸ“ Sent PING keep-alive");
}
Err(e) => {
eprintln!(" ❌ Failed to send PING: {}", e);
}
}
}
Some(timer_msg) = timer_rx.recv() => {
let elapsed_secs = start_time.elapsed().unwrap().as_secs();
match timer_msg.as_str() {
"done" => {
println!("⏰ Timer: Test duration completed, exiting...");
break;
}
msg if msg.starts_with("report:") => {
let elapsed_mins = elapsed_secs / 60;
if elapsed_mins < 7 {
println!("🟒 {}min: BASELINE PHASE - {} total tx", elapsed_mins, stats.total_transactions);
} else if elapsed_mins < 17 {
println!("🟑 {}min: POST-UPDATE MONITORING - {} total tx (original: {}, replaced: {})",
elapsed_mins, stats.total_transactions,
stats.original_account_transactions, stats.new_account_transactions);
} else {
let mins_since_replacement = if replacement_done {
elapsed_mins - (replacement_time / 60)
} else {
0
};
// Check for 10-minute silence detection (start checking at 17min = 10min post-replacement)
let mut newly_degraded = Vec::new();
let mut active_replaced = 0;
let mut silent_replaced = 0;
for (account, last_tx_time) in &replaced_account_last_tx {
let silence_duration = last_tx_time.elapsed().unwrap().as_secs();
if silence_duration >= 600 { // 10 minutes = 600 seconds
if !degraded_accounts.contains(account) {
newly_degraded.push((account.clone(), silence_duration));
degraded_accounts.insert(account.clone());
}
silent_replaced += 1;
} else {
active_replaced += 1;
}
}
// Count replaced accounts that haven't had any transactions yet
let never_seen = replaced_account_addresses.len() - replaced_account_last_tx.len();
if never_seen > 0 {
silent_replaced += never_seen;
}
println!("πŸ”΄ {}min: DEGRADATION WATCH ({}min since replacement) - {} total tx",
elapsed_mins, mins_since_replacement, stats.total_transactions);
println!(" πŸ“Š Replaced accounts: {} active, {} silent", active_replaced, silent_replaced);
// Report newly degraded accounts
for (account, silence_secs) in &newly_degraded {
println!(" 🚨 DEGRADATION DETECTED: {} silent for {}min {}s",
&account[0..8], silence_secs / 60, silence_secs % 60);
}
// Overall degradation status
if !degraded_accounts.is_empty() {
println!(" 🎯 BUG CONFIRMED: {}/{} replaced accounts showing 10min+ silence!",
degraded_accounts.len(), replaced_account_addresses.len());
println!(" πŸ”§ Same filter name reuse causes account data loss!");
}
}
}
"replace1" if !replacement_done => {
replacement_done = true;
stats.replacements_performed += 1;
println!("\nπŸ”„ 2-STREAM UPDATE at {}min ({}s)", elapsed_secs / 60, elapsed_secs);
println!(" πŸ“‹ Multi-stream pattern: Replace accounts in both streams simultaneously");
println!(" πŸ”„ Updating both streams with new account lists (approach: {})",
if use_unique_names { "UNIQUE filter names" } else { "SAME filter names" });
println!(" ⚠️ After this update, degradation expected 25-30min later (at 32-37min mark)");
// Replace 4 accounts in Stream 1 (indices 2-5) with replacement accounts 0-3
println!(" πŸ”§ Stream 1: Replacing accounts at indices 2-5:");
for i in 2..6 {
if i < current_stream1_accounts.len() && (i-2) < replacement_accounts.len() {
let old_account = current_stream1_accounts[i].clone();
let new_account = replacement_accounts[i-2].clone();
current_stream1_accounts[i] = new_account.clone();
// Track the actual replacement account addresses
replaced_account_addresses.insert(new_account.clone());
stats.accounts_replaced += 1;
println!(" Stream1[{}]: {} -> {}", i, &old_account[0..8], &new_account[0..8]);
}
}
// Replace 5 accounts in Stream 2 (indices 3-7) with replacement accounts 4-8
println!(" πŸ”§ Stream 2: Replacing accounts at indices 3-7:");
for i in 3..8 {
if i < current_stream2_accounts.len() && (i-3+4) < replacement_accounts.len() {
let old_account = current_stream2_accounts[i].clone();
let new_account = replacement_accounts[i-3+4].clone();
current_stream2_accounts[i] = new_account.clone();
// Track the actual replacement account addresses
replaced_account_addresses.insert(new_account.clone());
stats.accounts_replaced += 1;
println!(" Stream2[{}]: {} -> {}", i, &old_account[0..8], &new_account[0..8]);
}
}
println!(" βœ… Replaced {} accounts total across 2 streams", stats.accounts_replaced);
// Update the combined account list
all_current_accounts = current_stream1_accounts.clone();
all_current_accounts.extend(current_stream2_accounts.clone());
println!(" πŸ”§ DEBUG: Updated account lists:");
println!(" πŸ“‹ Stream 1: {}",
current_stream1_accounts.iter().map(|a| &a[0..8]).collect::<Vec<_>>().join(", "));
println!(" πŸ“‹ Stream 2: {}",
current_stream2_accounts.iter().map(|a| &a[0..8]).collect::<Vec<_>>().join(", "));
// THE CRITICAL BUG: Reuse same stream names vs unique names
let (stream1_filter_name, stream2_filter_name) = if use_unique_names {
let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
(format!("stream_1_{}", timestamp), format!("stream_2_{}", timestamp)) // UNIQUE names (WORKING)
} else {
("stream_1".to_string(), "stream_2".to_string()) // SAME names (BROKEN - this is the bug!)
};
println!(" πŸ”§ DEBUG: Stream naming strategy:");
println!(" 🏷️ Stream 1 name: '{}'", stream1_filter_name);
println!(" 🏷️ Stream 2 name: '{}'", stream2_filter_name);
if !use_unique_names {
println!(" ⚠️ CRITICAL: Reusing SAME stream names - this should trigger the bug!");
println!(" πŸ”§ Real pattern: Same stream names with updated account lists");
} else {
println!(" βœ… Using unique stream names - this should prevent the bug");
}
// Real pattern: Update same stream names with modified account lists
let mut transactions = HashMap::new();
// Stream 1 update
transactions.insert(
stream1_filter_name,
SubscribeRequestFilterTransactions {
account_include: current_stream1_accounts.clone(), // Stream 1 with some replaced
account_exclude: vec![],
account_required: vec![],
vote: Some(false),
failed: Some(false),
signature: None,
},
);
// Stream 2 update
transactions.insert(
stream2_filter_name,
SubscribeRequestFilterTransactions {
account_include: current_stream2_accounts.clone(), // Stream 2 with some replaced
account_exclude: vec![],
account_required: vec![],
vote: Some(false),
failed: Some(false),
signature: None,
},
);
let update_request = SubscribeRequest {
accounts: HashMap::new(),
slots: HashMap::new(),
transactions,
transactions_status: HashMap::new(),
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
entry: HashMap::new(),
accounts_data_slice: vec![],
commitment: Some(CommitmentLevel::Confirmed as i32),
ping: None, // PING will be sent separately every 30s
from_slot: None,
};
println!(" πŸ“‘ DEBUG: Sending filter update to server...");
match sink.send(update_request).await {
Err(e) => {
eprintln!(" ❌ Failed to send first replacement: {}", e);
}
Ok(_) => {
println!(" βœ… 2-Stream filter update sent successfully!");
println!(" πŸ”§ Server received updates for both stream filters");
println!(" πŸ”§ Filter approach: {} filter names", if use_unique_names { "UNIQUE" } else { "SAME" });
println!(" ⏰ TIMELINE: If using SAME names, degradation expected at 32-37min total");
println!(" πŸ“Š Now monitoring for transactions from replaced accounts in both streams...\n");
}
}
}
"replace2" => {
// Skip second replacement - we only do one replacement now
println!(" ℹ️ No second replacement - single replacement strategy");
}
_ => {} // Ignore other messages
}
}
}
}
println!("🏁 Single replacement scenario test completed after {} minutes", total_duration_secs / 60);
println!("πŸ“Š FINAL TIMELINE: {}min stable + {}min post-replacement monitoring = {}min total",
replacement_time / 60,
(total_duration_secs - replacement_time) / 60,
total_duration_secs / 60);
if replacement_done {
let _final_time_since_replaced_tx = last_tx_for_replaced_accounts.elapsed().unwrap().as_secs();
let total_minutes_since_replacement = (start_time.elapsed().unwrap().as_secs() - replacement_time) / 60;
println!("\nπŸ“Š FINAL DEGRADATION ANALYSIS:");
println!(" πŸ•°οΈ Total time since replacement: {} minutes", total_minutes_since_replacement);
println!(" πŸ“Š Total replaced account transactions received: {}", stats.new_account_transactions);
// Analyze degradation using 10-minute silence detection
let mut final_degraded = std::collections::HashSet::new();
let mut final_active = 0;
let mut final_silent = 0;
for (account, last_tx_time) in &replaced_account_last_tx {
let silence_duration = last_tx_time.elapsed().unwrap().as_secs();
if silence_duration >= 600 { // 10 minutes
final_degraded.insert(account.clone());
final_silent += 1;
} else {
final_active += 1;
}
}
// Count accounts that never received any transactions
let never_active = replaced_account_addresses.len() - replaced_account_last_tx.len();
if never_active > 0 {
final_silent += never_active;
println!(" ⚠️ {} replaced accounts never received any transactions", never_active);
}
println!(" πŸ“ˆ Final account status: {} active, {} silent (10min+ no tx)", final_active, final_silent);
if final_silent > 0 {
println!(" 🚨 BUG REPRODUCTION SUCCESSFUL:");
println!(" β€’ {}/{} replaced accounts failed", final_silent, replaced_account_addresses.len());
if never_active > 0 {
println!(" β€’ {} accounts never worked (immediate failure)", never_active);
}
if final_degraded.len() > 0 {
println!(" β€’ {} accounts worked initially then went silent", final_degraded.len());
}
println!(" β€’ High-activity accounts NEVER stop for 10+ minutes normally");
println!(" β€’ 🎯 This definitively confirms the gRPC filter reuse bug!");
if final_degraded.len() > 0 {
println!(" πŸ’€ Accounts that worked then degraded:");
for account in final_degraded.iter().take(5) {
if let Some(last_tx) = replaced_account_last_tx.get(account) {
let silence_mins = last_tx.elapsed().unwrap().as_secs() / 60;
let tx_count = replaced_account_tx_count.get(account).unwrap_or(&0);
println!(" β€’ {}: {} transactions, then silent {}+ minutes", &account[0..8], tx_count, silence_mins);
}
}
}
} else {
println!(" βœ… NO DEGRADATION DETECTED:");
println!(" β€’ All replaced accounts remained active throughout test");
println!(" β€’ Filter approach is working correctly");
println!(" β€’ Bug was not reproduced in this run");
}
// Debug: Show activity breakdown by replaced account
if !replaced_account_tx_count.is_empty() {
println!(" πŸ”§ DEBUG: Transaction breakdown by replaced account:");
let mut sorted_accounts: Vec<_> = replaced_account_tx_count.iter().collect();
sorted_accounts.sort_by(|a, b| b.1.cmp(a.1)); // Sort by transaction count, descending
for (account, count) in sorted_accounts.iter().take(10) {
println!(" β€’ {}: {} transactions", &account[0..8], count);
}
if sorted_accounts.len() > 10 {
println!(" β€’ ... and {} more accounts", sorted_accounts.len() - 10);
}
let active_accounts = sorted_accounts.len();
let total_replaced = stats.accounts_replaced;
println!(" πŸ“Š Activity summary: {}/{} replaced accounts generated transactions",
active_accounts, total_replaced);
}
}
Ok(stats)
}
fn print_final_stats(stats: &TransactionStats) {
println!("\nπŸ“Š DETAILED TEST RESULTS");
println!("{}", "=".repeat(60));
println!("πŸ”¬ Filter Approach: {}", stats.filter_approach);
println!("⏱️ Test Duration: {} minutes", stats.test_duration.as_secs() / 60);
println!("πŸ”„ Replacements Performed: {}", stats.replacements_performed);
println!("πŸ“ Accounts Replaced: {}", stats.accounts_replaced);
println!();
println!("πŸ“ˆ TRANSACTION STATISTICS:");
println!(" Total transactions received: {}", stats.total_transactions);
println!(" Original account transactions: {}", stats.original_account_transactions);
println!(" Replaced account transactions: {}", stats.new_account_transactions);
println!(" Unique slots processed: {}", stats.unique_slots.len());
if let (Some(first), Some(last)) = (&stats.first_transaction_time, &stats.last_transaction_time) {
let duration = last.duration_since(*first).unwrap_or_default();
let rate = stats.total_transactions as f64 / duration.as_secs_f64();
println!(" Average transaction rate: {:.2} tx/sec", rate);
}
println!();
println!("🎯 ANALYSIS:");
if stats.accounts_replaced > 0 {
let replacement_success_rate = (stats.new_account_transactions as f64 / stats.accounts_replaced as f64) * 100.0;
if stats.new_account_transactions == 0 {
println!(" ❌ COMPLETE FAILURE: 0 transactions from {} replaced accounts", stats.accounts_replaced);
println!(" ⚠️ This confirms the gRPC filter reuse bug!");
} else if replacement_success_rate < 10.0 {
println!(" ⚠️ SEVERE DEGRADATION: Only {:.1}% success rate for replaced accounts", replacement_success_rate);
println!(" πŸ“‰ {} transactions from {} replaced accounts", stats.new_account_transactions, stats.accounts_replaced);
} else {
// Check if there was a long gap at the end (indicates degradation)
let time_since_last = if stats.last_transaction_time.is_some() {
stats.last_transaction_time.unwrap().elapsed().unwrap().as_secs()
} else {
0
};
if time_since_last >= 300 { // 5+ minute gap indicates degradation
println!(" ❌ DEGRADATION: {} transactions initially, then {} second gap at end",
stats.new_account_transactions, time_since_last);
println!(" πŸ“Š This confirms filter reuse causes data loss over time");
} else {
println!(" βœ… SUCCESS: {:.1}% of replaced accounts receiving data", replacement_success_rate);
println!(" πŸ“ˆ {} transactions from {} replaced accounts", stats.new_account_transactions, stats.accounts_replaced);
println!(" πŸ’š Filter approach is working correctly!");
}
}
} else {
println!(" ℹ️ No account replacements were performed in this test");
}
println!("{}", "=".repeat(60));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment