Debugging stream degradation: no transactions are received when accounts are replaced in a subscription filter that reuses the same filter name.
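//! Reproduction for a Yellowstone gRPC stream degradation issue: when accounts in a
//! transaction filter are replaced while reusing the same filter name, the replaced
//! accounts may stop producing updates some time after the update.
//!
//! Usage (the first argument selects the approach; anything other than "2"/"test2"/"working"
//! runs the broken same-name path):
//!     cargo run --release          # broken approach: reuse "stream_1"/"stream_2"
//!     cargo run --release -- 2     # working approach: unique filter names
//!
//! Configuration comes from a `.env` file or from the environment:
//! `ENDPOINT` (or `GRPC_ENDPOINT`) and `X_TOKEN` (or `AUTH_TOKEN`).
//!
//! Assumed (not pinned here) Cargo dependencies: tokio, futures, tonic,
//! yellowstone-grpc-client, yellowstone-grpc-proto, serde, dotenv, bs58.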
use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use std::path::Path;
use tokio::time::sleep;
use futures::SinkExt;
use yellowstone_grpc_client::{GeyserGrpcClient, Interceptor};
use yellowstone_grpc_proto::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Serialize, Deserialize)]
struct Config {
    endpoint: String,
    x_token: String,
}

#[derive(Clone)]
struct TestConfig {
    endpoint: String,
    x_token: Option<String>,
    all_accounts: Vec<String>,
}

#[derive(Debug)]
struct TransactionStats {
    total_transactions: u32,
    new_account_transactions: u32,
    original_account_transactions: u32,
    unique_slots: std::collections::HashSet<u64>,
    first_transaction_time: Option<SystemTime>,
    last_transaction_time: Option<SystemTime>,
    test_duration: Duration,
    replacements_performed: u32,
    filter_approach: String,
    accounts_replaced: u32,
}
fn load_config() -> Result<Config, Box<dyn std::error::Error>> {
    // Try to load from .env file
    if Path::new(".env").exists() {
        println!("Loading configuration from .env file");
        dotenv::dotenv().ok();
        let endpoint = std::env::var("ENDPOINT")
            .or_else(|_| std::env::var("GRPC_ENDPOINT"))?;
        let x_token = std::env::var("X_TOKEN")
            .or_else(|_| std::env::var("AUTH_TOKEN"))?;
        return Ok(Config { endpoint, x_token });
    }

    // Try environment variables without .env file
    if let (Ok(endpoint), Ok(x_token)) = (
        std::env::var("ENDPOINT").or_else(|_| std::env::var("GRPC_ENDPOINT")),
        std::env::var("X_TOKEN").or_else(|_| std::env::var("AUTH_TOKEN")),
    ) {
        println!("Loading configuration from environment variables");
        return Ok(Config { endpoint, x_token });
    }

    Err("Configuration not found. Please provide either a .env file or environment variables (ENDPOINT and X_TOKEN)".into())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Parse command line arguments
    let args: Vec<String> = std::env::args().collect();
    let test_mode = if args.len() > 1 {
        match args[1].as_str() {
            "2" | "test2" | "working" => "test2",
            _ => "test1", // Default to broken test for any other input
        }
    } else {
        "test1" // Default to broken test when no args
    };

    // Load configuration from .env file or environment variables
    let loaded_config = load_config()?;
    let config = TestConfig {
        endpoint: loaded_config.endpoint,
        x_token: Some(loaded_config.x_token),
        // 50 high-activity Solana accounts for realistic testing
        all_accounts: vec![
            // Popular DEX and DeFi accounts with high transaction volume
            "JUP4Fb2cqiRUcaTHdrPC8h2gNsA2ETXiPDD33WcGuJB".to_string(),  // Jupiter
            "PhoeNiXZ8ByJGLkxNfZRnkUfjvmuYqLR89jjFHGqdXY".to_string(),  // Phoenix
            "CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK".to_string(), // Raydium CAMM
            "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8".to_string(), // Raydium AMM
            "9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM".to_string(), // Orca
            "whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc".to_string(),  // Orca Whirlpool
            "SSwpkEEcbUqx4vtoEByFjSkhKdCT862DNVb52nZg1UZ".to_string(),  // Step Finance
            "7dHbWXmci3dT8UFYWYZweBLXgycu7Y3iL6trKn1Y7ARj".to_string(), // Lido
            "MarBmsSgKXdrN1egZf5sqe1TMai9K1rChYNDJgjq7aD".to_string(),  // Marinade
            "mSoLzYCxHdYgdzU16g5QSh3i5K3z3KZK7ytfqcJm7So".to_string(),  // Marinade mSOL
            // Popular token accounts
            "EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v".to_string(), // USDC
            "Es9vMFrzaCERmJfrF4H2FYD4KCoNkY11McCe8BenwNYB".to_string(), // USDT
            "So11111111111111111111111111111111111111112".to_string(), // Wrapped SOL
            "4k3Dyjzvzp8eMZWUXbBCjEvwSkkk59S5iCNLY3QrkX6R".to_string(), // Raydium RAY
            "SRMuApVNdxXokk5GT7XD5cUUgXMBCoAz2LHeuAoKWRt".to_string(),  // Serum SRM
            // More popular token accounts and wallets
            "7vfCXTUXx5WJV5JADk17DUJ4ksgau7utNKj4b963voxs".to_string(), // Ethereum Wormhole
            "2FPyTwcZLUg1MDrwsyoP4D6s1tM7hAkHYRjkNb5w6Pxk".to_string(), // Solana Foundation
            "GDDMwNyyx8uB6zrqwBFHjLLG3TBYk2F8Az4yrQC5RzMp".to_string(), // Large wallet
            "5Q544fKrFoe6tsEbD7S8EmxGTJYAKtTVhAW5Q5pge4j1".to_string(), // Active trader
            "BJ3jrUzddfuSrZHXSCxMbkKyRxJDN5n7m1DQSQQ1fyZZ".to_string(), // Popular DEX user
            "7xKXtg2CW87d97TXJSDpbD5jBkheTqA83TZRuJosgAsU".to_string(), // DeFi protocol
            "9n4nbM75f5Ui33ZbPYXn59EwSgE8CGsHtAeTH5YFeJ9E".to_string(), // Bridge program
            "Saber2gLauYim4Mvftnrasomsv6NvAuncvMEZwcLpD1".to_string(),  // Saber
            "CURVGoZn8zycx6FXwwevgBTB2gVvdbGTEpvMJDbgs2t4".to_string(), // Curve
            "port7gfFQRPdASFEwCMcjqWMJ9AMLKthtJWZkXWEe5L".to_string(),  // Port Finance
            "SLNDpmoWTVADgEdndyvWzroNL7zSi1dF9PC3xHGtPwp".to_string(),  // Solend
            "MERLuDFBMmsHnsBPZw2sDQZHvXFMwp8EdjudcU2HKky".to_string(),  // Mercurial
            "FarmuwXPWXvefWUeqFAa5w6rifLkq5X6E8bimYvrhCVx".to_string(), // Tulip
            "6LtLpnUFNByNXLyCoK9wA2MykKAmQNZKBdY8s47dehDc".to_string(), // Solrise
            "B7bX8HqWGgEh4kR9UQqZnkFzrw85YFRyv1REAaYBUjZe".to_string(), // Quarry
            "QMNeHCGYnLVDn1icRAfQZpjPLBNkfGbSKRB83G5d8KB".to_string(),  // Quarry Mine
            "BrTJjJCTL2aFKhtCNLrz9NvdWtcV7YdQJuGZLjSAYREr".to_string(), // Bonfida
            "namesLPneVptA9Z5rqUDD9tMTWEJwofgaYwp8cawRkX".to_string(),  // Bonfida Names
            "E2VmbootbVCBkMNNxKQgCLMS1X3NoGMaYAsufaAsf7M".to_string(),  // MonkeDAO
            "BonkDkNiDPxG6FeWsXi2QKL8NWzJbvKWdKnKvUpkEGi2".to_string(), // BONK
            "DezXAZ8z7PnrnRJjz3wXBoRgixCa6xjnB7YaB1pPB263".to_string(), // Bonk DEX
            "HUBsveNpjo5pWqNkH57QzxjQASdTVXcSK7bVKTSZtcSX".to_string(), // Hubble
            "DjVE6JNiYqPL2QXyCUUh8rNjHrbz9hXHNYt99MQ59qw1".to_string(), // Popular NFT marketplace
            "M2mx93ekt1fmXSVkTrUL9xVFHkmME8HTUi5Cyc5aF7K".to_string(),  // Magic Eden
            "CJsLwbP1iu5DuUikHEJnLfANgKy6stB2uFgvBBHoyxwz".to_string(), // Solanart
            "A7p8451ktDCHq5yYaHczeLMYsjRsAkzc3hCXcSrwYHU7".to_string(), // OpenSea Solana
            "BUX7s2ef2htTGb2KKoPHWkmzxPj4nTWMWRgs5CSbQxf9".to_string(), // Solana Beach
            "rFqFJ9g7TGBD8Ed7TPDnvGKZ5pWLPDyxLcvcH2eRCtt".to_string(),  // StepN
            "nosXBVoaCTtYdLvKY6Csb4AC8JCdQKKAaWYtx2ZMoo7".to_string(),  // Nosana
            "kinXdEcpDQeHPEuQnqmUgtYykqKGVFq6CeVX5iAHJq6".to_string(),  // KIN
            "SHDWyBxihqiCj6YekG2GUr7wqKLeLAMK1gHZck9pL6y".to_string(),  // Shadow Drive
            "AVNP8n7vWb3ZwH8YMhp9jdSkC8V7uu44YP1PGU3jUc8".to_string(),  // Sollamas
            "ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL".to_string(), // Associated Token
            "11111111111111111111111111111112".to_string(),             // System Program
            "ComputeBudget111111111111111111111111111111".to_string(),  // Compute Budget
        ],
    };
println!("π§ͺ Solana gRPC Stream Degradation Test"); | |
println!("======================================"); | |
println!("π SCENARIO: 7min stable period, then single replacement"); | |
println!("π°οΈ Running for 40 minutes total - degradation expected 25-30min AFTER replacement"); | |
println!("π Monitoring for data loss when accounts are replaced"); | |
match test_mode { | |
"test1" => { | |
println!("π¬ Running BROKEN approach (same filter name) - reproducing bug\n"); | |
} | |
"test2" => { | |
println!("π¬ Running WORKING approach (unique filter names) - demonstrating fix\n"); | |
} | |
_ => {} | |
} | |
// Run the selected test | |
match test_mode { | |
"test1" => { | |
println!("π΄ BROKEN APPROACH (Same Filter Name)"); | |
println!("{}", "=".repeat(50)); | |
match run_stream_degradation_test(&config, false).await { | |
Ok(stats) => { | |
println!("β Test completed"); | |
print_final_stats(&stats); | |
} | |
Err(e) => { | |
println!("β Test failed with error: {}", e); | |
} | |
} | |
} | |
"test2" => { | |
println!("π’ WORKING SOLUTION (Unique Filter Names)"); | |
println!("{}", "=".repeat(50)); | |
match run_stream_degradation_test(&config, true).await { | |
Ok(stats) => { | |
println!("β Test completed"); | |
print_final_stats(&stats); | |
} | |
Err(e) => { | |
println!("β Test failed with error: {}", e); | |
} | |
} | |
} | |
_ => {} | |
} | |
Ok(()) | |
} | |
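
/// Sets up the two-stream subscription ("stream_1" and "stream_2" transaction filters
/// on one gRPC subscription) and hands off to the monitoring loop, which performs a
/// single account replacement at the 7-minute mark and watches for degradation.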
async fn run_stream_degradation_test(config: &TestConfig, use_unique_names: bool) -> Result<TransactionStats, Box<dyn std::error::Error>> {
    // Stream 1: First 12 accounts
    let stream1_accounts = &config.all_accounts[0..12];
    // Stream 2: Next 13 accounts
    let stream2_accounts = &config.all_accounts[12..25];
    // Replacement pool: Separate accounts not in initial streams
    let replacement_accounts = &config.all_accounts[25..50]; // 25 replacement accounts (all remaining)

    println!("Setting up 2-stream configuration");
    println!(" Stream 1 accounts: {}", stream1_accounts.len());
    println!(" Stream 2 accounts: {}", stream2_accounts.len());
    println!(" Replacement pool: {}", replacement_accounts.len());
    println!(" Approach: {}", if use_unique_names { "Unique filter names (WORKING)" } else { "Same filter names (BROKEN)" });

    let mut client = create_client(config).await?;
    let (mut sink, mut stream) = client.subscribe().await?;

    // Real pattern: Multiple separate streams per gRPC connection (2-3 streams).
    // Each stream has up to 25 accounts; they reuse THE SAME stream names but replace some accounts (this causes the bug).
    let mut transactions = HashMap::new();

    // Stream 1: Up to 25 accounts
    transactions.insert(
        "stream_1".to_string(), // Stream name that will be reused
        SubscribeRequestFilterTransactions {
            account_include: stream1_accounts.to_vec(), // 12 accounts in stream 1
            account_exclude: vec![],
            account_required: vec![],
            vote: Some(false),
            failed: Some(false), // Exclude failed transactions
            signature: None,
        },
    );

    // Stream 2: Up to 25 accounts
    transactions.insert(
        "stream_2".to_string(), // Stream name that will be reused
        SubscribeRequestFilterTransactions {
            account_include: stream2_accounts.to_vec(), // 13 accounts in stream 2
            account_exclude: vec![],
            account_required: vec![],
            vote: Some(false),
            failed: Some(false), // Exclude failed transactions
            signature: None,
        },
    );

    let initial_request = SubscribeRequest {
        accounts: HashMap::new(),
        slots: HashMap::new(),
        transactions,
        transactions_status: HashMap::new(),
        blocks: HashMap::new(),
        blocks_meta: HashMap::new(),
        entry: HashMap::new(),
        accounts_data_slice: vec![],
        commitment: Some(CommitmentLevel::Confirmed as i32),
        ping: None, // PING will be sent separately every 30s
        from_slot: None,
    };

    println!("DEBUG: Sending initial request with 2 streams:");
    println!(" Stream 'stream_1': {} accounts", stream1_accounts.len());
    println!(" Stream 'stream_2': {} accounts", stream2_accounts.len());
    println!(" PING keep-alive will be sent every 30 seconds separately");

    // Debug: Show first few accounts being monitored per stream
    println!(" Stream 1 first 3 accounts:");
    for (i, account) in stream1_accounts.iter().take(3).enumerate() {
        println!(" [{}] {}", i, account);
    }
    println!(" Stream 2 first 3 accounts:");
    for (i, account) in stream2_accounts.iter().take(3).enumerate() {
        println!(" [{}] {}", i, account);
    }

    sink.send(initial_request).await?;
    println!("Initial setup: 2 streams with {} total accounts", stream1_accounts.len() + stream2_accounts.len());
    println!(" Real pattern: Same stream names reused with updated account lists");

    // Monitor with 2-stream account replacement
    let stats = monitor_with_2stream_replacement(
        &mut sink,
        &mut stream,
        stream1_accounts,
        stream2_accounts,
        replacement_accounts,
        40 * 60, // 40 minutes total - wait longer for degradation
        7 * 60,  // Single replacement at 7 minutes
        use_unique_names,
    ).await?;

    Ok(stats)
}
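
/// Builds the Geyser gRPC client: optional x-token authentication, TLS with native
/// root certificates, and 30-second connect/request timeouts.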
async fn create_client(config: &TestConfig) -> Result<GeyserGrpcClient<impl Interceptor>, Box<dyn std::error::Error>> {
    println!("Connecting to: {}", config.endpoint);

    let mut client_builder = GeyserGrpcClient::build_from_shared(config.endpoint.clone())?;
    if let Some(token) = &config.x_token {
        println!("Using authentication token");
        client_builder = client_builder.x_token(Some(token.clone()))?;
    }

    // Enable TLS with native root certificates
    let tls_config = tonic::transport::ClientTlsConfig::new()
        .with_native_roots();
    client_builder = client_builder.tls_config(tls_config)?;

    let client = client_builder
        .connect_timeout(Duration::from_secs(30))
        .timeout(Duration::from_secs(30))
        .connect()
        .await
        .map_err(|e| {
            eprintln!("Connection failed: {:?}", e);
            e
        })?;

    println!("Connected successfully");
    Ok(client)
}
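
/// Core monitoring loop: counts transactions for original vs. replaced accounts,
/// sends the single filter update at `replacement_time`, keeps the stream alive with
/// a PING every 30 seconds, and flags replaced accounts that stay silent for 10+
/// minutes as degraded.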
async fn monitor_with_2stream_replacement(
    sink: &mut (impl futures::Sink<SubscribeRequest, Error = futures::channel::mpsc::SendError> + Unpin),
    stream: &mut (impl futures::Stream<Item = Result<SubscribeUpdate, tonic::Status>> + Unpin),
    stream1_accounts: &[String],
    stream2_accounts: &[String],
    replacement_accounts: &[String],
    total_duration_secs: u64,
    replacement_time: u64,
    use_unique_names: bool,
) -> Result<TransactionStats, Box<dyn std::error::Error>> {
    use futures::{StreamExt, SinkExt};

    let mut stats = TransactionStats {
        total_transactions: 0,
        new_account_transactions: 0,
        original_account_transactions: 0,
        unique_slots: std::collections::HashSet::new(),
        first_transaction_time: None,
        last_transaction_time: None,
        test_duration: Duration::from_secs(total_duration_secs),
        replacements_performed: 0,
        filter_approach: if use_unique_names { "Unique Names".to_string() } else { "Same Name".to_string() },
        accounts_replaced: 0,
    };

    let start_time = SystemTime::now();
    let mut current_stream1_accounts = stream1_accounts.to_vec();
    let mut current_stream2_accounts = stream2_accounts.to_vec();
    let mut replacement_done = false;
    let mut last_tx_for_replaced_accounts = start_time;
    let mut replaced_account_addresses = std::collections::HashSet::new(); // Track actual replaced account addresses
    let mut original_account_addresses = std::collections::HashSet::new(); // Track original account addresses
    let mut replaced_account_tx_count = HashMap::new(); // Track transactions per replaced account
    let mut replaced_account_last_tx = HashMap::new(); // Track last transaction time per replaced account
    let mut degraded_accounts = std::collections::HashSet::new(); // Track accounts confirmed as degraded

    // Store original account addresses for categorization from both streams
    for account in stream1_accounts {
        original_account_addresses.insert(account.clone());
    }
    for account in stream2_accounts {
        original_account_addresses.insert(account.clone());
    }

    // Create combined account list for easier transaction matching
    let mut all_current_accounts = current_stream1_accounts.clone();
    all_current_accounts.extend(current_stream2_accounts.clone());

    // Create separate timer channels for replacements and keep-alive
    let (timer_tx, mut timer_rx) = tokio::sync::mpsc::unbounded_channel::<String>();
    let (ping_tx, mut ping_rx) = tokio::sync::mpsc::unbounded_channel::<()>();

    // Spawn background timer task for test events
    let timer_start = start_time;
    tokio::spawn(async move {
        let mut last_report = timer_start;
        loop {
            sleep(Duration::from_secs(5)).await; // Check every 5 seconds
            let elapsed = timer_start.elapsed().unwrap().as_secs();
            // Send periodic updates
            if last_report.elapsed().unwrap().as_secs() >= 10 {
                let _ = timer_tx.send(format!("report:{}", elapsed));
                last_report = SystemTime::now();
            }
            // Check for replacement time
            if elapsed >= replacement_time {
                let _ = timer_tx.send("replace1".to_string());
            }
            if elapsed >= total_duration_secs {
                let _ = timer_tx.send("done".to_string());
                break;
            }
        }
    });

    // Spawn background PING keep-alive task
    tokio::spawn(async move {
        let mut _ping_counter = 1;
        loop {
            sleep(Duration::from_secs(30)).await; // Send PING every 30 seconds
            let _ = ping_tx.send(());
            _ping_counter += 1;
        }
    });

    println!("Monitoring transactions on {} total accounts across 2 streams...", all_current_accounts.len());
    println!("Pattern: Single replacement at {}min, then monitor for degradation",
        replacement_time / 60);
    println!("SCENARIO: Degradation should occur 25-30 minutes AFTER replacement!");
    println!("Debug: replacement_time={}s, total_duration={}s",
        replacement_time, total_duration_secs);

    // Debug: Show first few monitored accounts per stream
    println!("Stream 1 first 3 accounts:");
    for (i, account) in current_stream1_accounts.iter().take(3).enumerate() {
        println!(" [{}] {}", i, &account[0..8]);
    }
    println!("Stream 2 first 3 accounts:");
    for (i, account) in current_stream2_accounts.iter().take(3).enumerate() {
        println!(" [{}] {}", i, &account[0..8]);
    }
    println!();
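
    // Main event loop: three select! arms race against each other.
    //   1) stream.next()   - incoming gRPC updates (transactions, slots, pong)
    //   2) ping_rx.recv()  - 30-second ticker that triggers a keep-alive PING request
    //   3) timer_rx.recv() - test events: periodic reports, "replace1", "done"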
    loop {
        tokio::select! {
            Some(message) = stream.next() => {
                match message {
                    Ok(update) => {
                        // Debug: Show all update types we receive
                        match &update.update_oneof {
                            Some(subscribe_update::UpdateOneof::Pong(pong_response)) => {
                                println!(" Received PONG response: id={}", pong_response.id);
                                continue;
                            }
                            Some(subscribe_update::UpdateOneof::Transaction(_)) => {
                                // Continue with transaction processing below
                            }
                            Some(subscribe_update::UpdateOneof::Slot(slot_update)) => {
                                if stats.total_transactions == 0 {
                                    println!(" Received slot update: {}", slot_update.slot);
                                }
                                continue;
                            }
                            Some(other_update) => {
                                if stats.total_transactions == 0 {
                                    println!(" Received other update type: {:?}", std::mem::discriminant(other_update));
                                }
                                continue;
                            }
                            None => {
                                if stats.total_transactions == 0 {
                                    println!(" Received update with no content");
                                }
                                continue;
                            }
                        }

                        if let Some(subscribe_update::UpdateOneof::Transaction(tx_update)) = &update.update_oneof {
                            stats.total_transactions += 1;
                            let now = SystemTime::now();
                            if stats.first_transaction_time.is_none() {
                                stats.first_transaction_time = Some(now);
                            }
                            stats.last_transaction_time = Some(now);
                            stats.unique_slots.insert(tx_update.slot);

                            if let Some(transaction_info) = &tx_update.transaction {
                                // Parse the actual transaction to see which accounts are involved
                                let mut involves_original_accounts = false;
                                let mut involves_replaced_accounts = false;

                                // Access the transaction data within SubscribeUpdateTransactionInfo
                                if let Some(transaction) = &transaction_info.transaction {
                                    if let Some(message) = &transaction.message {
                                        // Debug: Show first few transactions in detail
                                        if stats.total_transactions <= 3 {
                                            println!(" Debug tx #{}: {} account keys",
                                                stats.total_transactions, message.account_keys.len());
                                        }
                                        // Check each account in the transaction
                                        for account_key in &message.account_keys {
                                            let account_str = bs58::encode(account_key).into_string();
                                            // Debug: Show first few account comparisons
                                            if stats.total_transactions <= 3 {
                                                println!(" Checking: {}", &account_str[0..8]);
                                            }
                                            // Check if this account is in our monitored set
                                            if all_current_accounts.contains(&account_str) {
                                                if stats.total_transactions <= 10 {
                                                    println!(" MATCH found: {}", &account_str[0..8]);
                                                }
                                                // Check if this is a replaced account or original account
                                                if replaced_account_addresses.contains(&account_str) {
                                                    involves_replaced_accounts = true;
                                                    // Track which replaced account is active
                                                    let count = replaced_account_tx_count.entry(account_str.clone()).or_insert(0);
                                                    *count += 1;
                                                    // Update last transaction time for this replaced account
                                                    replaced_account_last_tx.insert(account_str.clone(), now);
                                                    // Debug: Show when we find replaced account transactions
                                                    if stats.new_account_transactions < 10 || stats.new_account_transactions % 50 == 0 {
                                                        println!(" REPLACED ACCOUNT TX #{}: {} (count: {})",
                                                            stats.new_account_transactions + 1, &account_str[0..8], count);
                                                    }
                                                } else if original_account_addresses.contains(&account_str) {
                                                    involves_original_accounts = true;
                                                }
                                                break;
                                            }
                                        }
                                    }
                                }

                                // Categorize the transaction based on which accounts are involved
                                if involves_replaced_accounts {
                                    stats.new_account_transactions += 1;
                                    last_tx_for_replaced_accounts = now;
                                    let minutes_since_replacement = if replacement_done {
                                        (now.duration_since(start_time).unwrap().as_secs() - replacement_time) / 60
                                    } else {
                                        0
                                    };
                                    // Debug: Show we found a transaction from replaced accounts with timing
                                    if stats.new_account_transactions % 50 == 1 {
                                        println!(" REPLACED ACCOUNT TX #{}: {}min after replacement (slot: {})",
                                            stats.new_account_transactions, minutes_since_replacement, tx_update.slot);
                                    }
                                    // Special debug for the critical period when degradation should occur
                                    if replacement_done && minutes_since_replacement >= 25 && minutes_since_replacement <= 30 {
                                        if stats.new_account_transactions % 10 == 1 {
                                            println!(" CRITICAL PERIOD: Replaced account still receiving tx #{} at {}min post-replacement!",
                                                stats.new_account_transactions, minutes_since_replacement);
                                        }
                                    }
                                } else if involves_original_accounts {
                                    stats.original_account_transactions += 1;
                                } else {
                                    // Transaction doesn't involve any of our monitored accounts.
                                    // This shouldn't happen if the filter is working correctly.
                                    if stats.total_transactions % 10000 == 1 {
                                        println!(" Warning: Transaction doesn't involve monitored accounts (slot: {}) - {} total so far",
                                            tx_update.slot, stats.total_transactions);
                                    }
                                }
                            }
                        }
                    }
                    Err(e) => {
                        eprintln!("Stream error: {}", e);
                    }
                }
            }
            Some(_) = ping_rx.recv() => {
                // Send PING keep-alive every 30 seconds
                let ping_request = SubscribeRequest {
                    accounts: HashMap::new(),
                    slots: HashMap::new(),
                    transactions: HashMap::new(),
                    transactions_status: HashMap::new(),
                    blocks: HashMap::new(),
                    blocks_meta: HashMap::new(),
                    entry: HashMap::new(),
                    accounts_data_slice: vec![],
                    commitment: None,
                    ping: Some(SubscribeRequestPing { id: 1 }),
                    from_slot: None,
                };
                match sink.send(ping_request).await {
                    Ok(_) => {
                        println!(" Sent PING keep-alive");
                    }
                    Err(e) => {
                        eprintln!(" Failed to send PING: {}", e);
                    }
                }
            }
            Some(timer_msg) = timer_rx.recv() => {
                let elapsed_secs = start_time.elapsed().unwrap().as_secs();
                match timer_msg.as_str() {
                    "done" => {
                        println!("Timer: Test duration completed, exiting...");
                        break;
                    }
                    msg if msg.starts_with("report:") => {
                        let elapsed_mins = elapsed_secs / 60;
                        if elapsed_mins < 7 {
                            println!("{}min: BASELINE PHASE - {} total tx", elapsed_mins, stats.total_transactions);
                        } else if elapsed_mins < 17 {
                            println!("{}min: POST-UPDATE MONITORING - {} total tx (original: {}, replaced: {})",
                                elapsed_mins, stats.total_transactions,
                                stats.original_account_transactions, stats.new_account_transactions);
                        } else {
                            let mins_since_replacement = if replacement_done {
                                elapsed_mins - (replacement_time / 60)
                            } else {
                                0
                            };
                            // Check for 10-minute silence detection (start checking at 17min = 10min post-replacement)
                            let mut newly_degraded = Vec::new();
                            let mut active_replaced = 0;
                            let mut silent_replaced = 0;
                            for (account, last_tx_time) in &replaced_account_last_tx {
                                let silence_duration = last_tx_time.elapsed().unwrap().as_secs();
                                if silence_duration >= 600 { // 10 minutes = 600 seconds
                                    if !degraded_accounts.contains(account) {
                                        newly_degraded.push((account.clone(), silence_duration));
                                        degraded_accounts.insert(account.clone());
                                    }
                                    silent_replaced += 1;
                                } else {
                                    active_replaced += 1;
                                }
                            }
                            // Count replaced accounts that haven't had any transactions yet
                            let never_seen = replaced_account_addresses.len() - replaced_account_last_tx.len();
                            if never_seen > 0 {
                                silent_replaced += never_seen;
                            }
                            println!("{}min: DEGRADATION WATCH ({}min since replacement) - {} total tx",
                                elapsed_mins, mins_since_replacement, stats.total_transactions);
                            println!(" Replaced accounts: {} active, {} silent", active_replaced, silent_replaced);
                            // Report newly degraded accounts
                            for (account, silence_secs) in &newly_degraded {
                                println!(" DEGRADATION DETECTED: {} silent for {}min {}s",
                                    &account[0..8], silence_secs / 60, silence_secs % 60);
                            }
                            // Overall degradation status
                            if !degraded_accounts.is_empty() {
                                println!(" BUG CONFIRMED: {}/{} replaced accounts showing 10min+ silence!",
                                    degraded_accounts.len(), replaced_account_addresses.len());
                                println!(" Same filter name reuse causes account data loss!");
                            }
                        }
                    }
                    "replace1" if !replacement_done => {
                        replacement_done = true;
                        stats.replacements_performed += 1;
                        println!("\n2-STREAM UPDATE at {}min ({}s)", elapsed_secs / 60, elapsed_secs);
                        println!(" Multi-stream pattern: Replace accounts in both streams simultaneously");
                        println!(" Updating both streams with new account lists (approach: {})",
                            if use_unique_names { "UNIQUE filter names" } else { "SAME filter names" });
                        println!(" After this update, degradation expected 25-30min later (at 32-37min mark)");

                        // Replace 4 accounts in Stream 1 (indices 2-5) with replacement accounts 0-3
                        println!(" Stream 1: Replacing accounts at indices 2-5:");
                        for i in 2..6 {
                            if i < current_stream1_accounts.len() && (i - 2) < replacement_accounts.len() {
                                let old_account = current_stream1_accounts[i].clone();
                                let new_account = replacement_accounts[i - 2].clone();
                                current_stream1_accounts[i] = new_account.clone();
                                // Track the actual replacement account addresses
                                replaced_account_addresses.insert(new_account.clone());
                                stats.accounts_replaced += 1;
                                println!(" Stream1[{}]: {} -> {}", i, &old_account[0..8], &new_account[0..8]);
                            }
                        }

                        // Replace 5 accounts in Stream 2 (indices 3-7) with replacement accounts 4-8
                        println!(" Stream 2: Replacing accounts at indices 3-7:");
                        for i in 3..8 {
                            if i < current_stream2_accounts.len() && (i - 3 + 4) < replacement_accounts.len() {
                                let old_account = current_stream2_accounts[i].clone();
                                let new_account = replacement_accounts[i - 3 + 4].clone();
                                current_stream2_accounts[i] = new_account.clone();
                                // Track the actual replacement account addresses
                                replaced_account_addresses.insert(new_account.clone());
                                stats.accounts_replaced += 1;
                                println!(" Stream2[{}]: {} -> {}", i, &old_account[0..8], &new_account[0..8]);
                            }
                        }
                        println!(" Replaced {} accounts total across 2 streams", stats.accounts_replaced);

                        // Update the combined account list
                        all_current_accounts = current_stream1_accounts.clone();
                        all_current_accounts.extend(current_stream2_accounts.clone());

                        println!(" DEBUG: Updated account lists:");
                        println!(" Stream 1: {}",
                            current_stream1_accounts.iter().map(|a| &a[0..8]).collect::<Vec<_>>().join(", "));
                        println!(" Stream 2: {}",
                            current_stream2_accounts.iter().map(|a| &a[0..8]).collect::<Vec<_>>().join(", "));

                        // THE CRITICAL BUG: Reuse same stream names vs unique names
                        let (stream1_filter_name, stream2_filter_name) = if use_unique_names {
                            let timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
                            (format!("stream_1_{}", timestamp), format!("stream_2_{}", timestamp)) // UNIQUE names (WORKING)
                        } else {
                            ("stream_1".to_string(), "stream_2".to_string()) // SAME names (BROKEN - this is the bug!)
                        };

                        println!(" DEBUG: Stream naming strategy:");
                        println!(" Stream 1 name: '{}'", stream1_filter_name);
                        println!(" Stream 2 name: '{}'", stream2_filter_name);
                        if !use_unique_names {
                            println!(" CRITICAL: Reusing SAME stream names - this should trigger the bug!");
                            println!(" Real pattern: Same stream names with updated account lists");
                        } else {
                            println!(" Using unique stream names - this should prevent the bug");
                        }

                        // Real pattern: Update same stream names with modified account lists
                        let mut transactions = HashMap::new();
                        // Stream 1 update
                        transactions.insert(
                            stream1_filter_name,
                            SubscribeRequestFilterTransactions {
                                account_include: current_stream1_accounts.clone(), // Stream 1 with some replaced
                                account_exclude: vec![],
                                account_required: vec![],
                                vote: Some(false),
                                failed: Some(false),
                                signature: None,
                            },
                        );
                        // Stream 2 update
                        transactions.insert(
                            stream2_filter_name,
                            SubscribeRequestFilterTransactions {
                                account_include: current_stream2_accounts.clone(), // Stream 2 with some replaced
                                account_exclude: vec![],
                                account_required: vec![],
                                vote: Some(false),
                                failed: Some(false),
                                signature: None,
                            },
                        );

                        let update_request = SubscribeRequest {
                            accounts: HashMap::new(),
                            slots: HashMap::new(),
                            transactions,
                            transactions_status: HashMap::new(),
                            blocks: HashMap::new(),
                            blocks_meta: HashMap::new(),
                            entry: HashMap::new(),
                            accounts_data_slice: vec![],
                            commitment: Some(CommitmentLevel::Confirmed as i32),
                            ping: None, // PING will be sent separately every 30s
                            from_slot: None,
                        };

                        println!(" DEBUG: Sending filter update to server...");
                        match sink.send(update_request).await {
                            Err(e) => {
                                eprintln!(" Failed to send filter update: {}", e);
                            }
                            Ok(_) => {
                                println!(" 2-Stream filter update sent successfully!");
                                println!(" Server received updates for both stream filters");
                                println!(" Filter approach: {} filter names", if use_unique_names { "UNIQUE" } else { "SAME" });
                                println!(" TIMELINE: If using SAME names, degradation expected at 32-37min total");
                                println!(" Now monitoring for transactions from replaced accounts in both streams...\n");
                            }
                        }
                    }
                    "replace2" => {
                        // Skip second replacement - we only do one replacement now
                        println!(" No second replacement - single replacement strategy");
                    }
                    _ => {} // Ignore other messages
                }
            }
        }
    }
println!("π Single replacement scenario test completed after {} minutes", total_duration_secs / 60); | |
println!("π FINAL TIMELINE: {}min stable + {}min post-replacement monitoring = {}min total", | |
replacement_time / 60, | |
(total_duration_secs - replacement_time) / 60, | |
total_duration_secs / 60); | |
if replacement_done { | |
let _final_time_since_replaced_tx = last_tx_for_replaced_accounts.elapsed().unwrap().as_secs(); | |
let total_minutes_since_replacement = (start_time.elapsed().unwrap().as_secs() - replacement_time) / 60; | |
println!("\nπ FINAL DEGRADATION ANALYSIS:"); | |
println!(" π°οΈ Total time since replacement: {} minutes", total_minutes_since_replacement); | |
println!(" π Total replaced account transactions received: {}", stats.new_account_transactions); | |
// Analyze degradation using 10-minute silence detection | |
let mut final_degraded = std::collections::HashSet::new(); | |
let mut final_active = 0; | |
let mut final_silent = 0; | |
for (account, last_tx_time) in &replaced_account_last_tx { | |
let silence_duration = last_tx_time.elapsed().unwrap().as_secs(); | |
if silence_duration >= 600 { // 10 minutes | |
final_degraded.insert(account.clone()); | |
final_silent += 1; | |
} else { | |
final_active += 1; | |
} | |
} | |
// Count accounts that never received any transactions | |
let never_active = replaced_account_addresses.len() - replaced_account_last_tx.len(); | |
if never_active > 0 { | |
final_silent += never_active; | |
println!(" β οΈ {} replaced accounts never received any transactions", never_active); | |
} | |
println!(" π Final account status: {} active, {} silent (10min+ no tx)", final_active, final_silent); | |
if final_silent > 0 { | |
println!(" π¨ BUG REPRODUCTION SUCCESSFUL:"); | |
println!(" β’ {}/{} replaced accounts failed", final_silent, replaced_account_addresses.len()); | |
if never_active > 0 { | |
println!(" β’ {} accounts never worked (immediate failure)", never_active); | |
} | |
if final_degraded.len() > 0 { | |
println!(" β’ {} accounts worked initially then went silent", final_degraded.len()); | |
} | |
println!(" β’ High-activity accounts NEVER stop for 10+ minutes normally"); | |
println!(" β’ π― This definitively confirms the gRPC filter reuse bug!"); | |
if final_degraded.len() > 0 { | |
println!(" π Accounts that worked then degraded:"); | |
for account in final_degraded.iter().take(5) { | |
if let Some(last_tx) = replaced_account_last_tx.get(account) { | |
let silence_mins = last_tx.elapsed().unwrap().as_secs() / 60; | |
let tx_count = replaced_account_tx_count.get(account).unwrap_or(&0); | |
println!(" β’ {}: {} transactions, then silent {}+ minutes", &account[0..8], tx_count, silence_mins); | |
} | |
} | |
} | |
} else { | |
println!(" β NO DEGRADATION DETECTED:"); | |
println!(" β’ All replaced accounts remained active throughout test"); | |
println!(" β’ Filter approach is working correctly"); | |
println!(" β’ Bug was not reproduced in this run"); | |
} | |
// Debug: Show activity breakdown by replaced account | |
if !replaced_account_tx_count.is_empty() { | |
println!(" π§ DEBUG: Transaction breakdown by replaced account:"); | |
let mut sorted_accounts: Vec<_> = replaced_account_tx_count.iter().collect(); | |
sorted_accounts.sort_by(|a, b| b.1.cmp(a.1)); // Sort by transaction count, descending | |
for (account, count) in sorted_accounts.iter().take(10) { | |
println!(" β’ {}: {} transactions", &account[0..8], count); | |
} | |
if sorted_accounts.len() > 10 { | |
println!(" β’ ... and {} more accounts", sorted_accounts.len() - 10); | |
} | |
let active_accounts = sorted_accounts.len(); | |
let total_replaced = stats.accounts_replaced; | |
println!(" π Activity summary: {}/{} replaced accounts generated transactions", | |
active_accounts, total_replaced); | |
} | |
} | |
Ok(stats) | |
} | |
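
/// Prints the aggregate statistics and a verdict on whether the filter-reuse bug
/// was reproduced in this run.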
fn print_final_stats(stats: &TransactionStats) {
    println!("\nDETAILED TEST RESULTS");
    println!("{}", "=".repeat(60));
    println!("Filter Approach: {}", stats.filter_approach);
    println!("Test Duration: {} minutes", stats.test_duration.as_secs() / 60);
    println!("Replacements Performed: {}", stats.replacements_performed);
    println!("Accounts Replaced: {}", stats.accounts_replaced);
    println!();

    println!("TRANSACTION STATISTICS:");
    println!(" Total transactions received: {}", stats.total_transactions);
    println!(" Original account transactions: {}", stats.original_account_transactions);
    println!(" Replaced account transactions: {}", stats.new_account_transactions);
    println!(" Unique slots processed: {}", stats.unique_slots.len());
    if let (Some(first), Some(last)) = (&stats.first_transaction_time, &stats.last_transaction_time) {
        let duration = last.duration_since(*first).unwrap_or_default();
        // Guard against a zero-length window (e.g. only one transaction received)
        if duration.as_secs_f64() > 0.0 {
            let rate = stats.total_transactions as f64 / duration.as_secs_f64();
            println!(" Average transaction rate: {:.2} tx/sec", rate);
        }
    }
    println!();

    println!("ANALYSIS:");
    if stats.accounts_replaced > 0 {
        let replacement_success_rate = (stats.new_account_transactions as f64 / stats.accounts_replaced as f64) * 100.0;
        if stats.new_account_transactions == 0 {
            println!(" COMPLETE FAILURE: 0 transactions from {} replaced accounts", stats.accounts_replaced);
            println!(" This confirms the gRPC filter reuse bug!");
        } else if replacement_success_rate < 10.0 {
            println!(" SEVERE DEGRADATION: Only {:.1}% success rate for replaced accounts", replacement_success_rate);
            println!(" {} transactions from {} replaced accounts", stats.new_account_transactions, stats.accounts_replaced);
        } else {
            // Check if there was a long gap at the end (indicates degradation)
            let time_since_last = if stats.last_transaction_time.is_some() {
                stats.last_transaction_time.unwrap().elapsed().unwrap().as_secs()
            } else {
                0
            };
            if time_since_last >= 300 { // 5+ minute gap indicates degradation
                println!(" DEGRADATION: {} transactions initially, then {} second gap at end",
                    stats.new_account_transactions, time_since_last);
                println!(" This confirms filter reuse causes data loss over time");
            } else {
                println!(" SUCCESS: {:.1}% of replaced accounts receiving data", replacement_success_rate);
                println!(" {} transactions from {} replaced accounts", stats.new_account_transactions, stats.accounts_replaced);
                println!(" Filter approach is working correctly!");
            }
        }
    } else {
        println!(" No account replacements were performed in this test");
    }
    println!("{}", "=".repeat(60));
}