Programación de Index con Scylla (Optimización)
1. Configuración Óptima del Driver
Configuración de Conexiones
// Configurar para alta carga de trabajo
cass_cluster_set_protocol_version (cluster, CASS_PROTOCOL_VERSION_V4);
cass_cluster_set_num_threads_io (cluster, 4 );
cass_cluster_set_queue_size_io (cluster, 8192 );
cass_cluster_set_core_connections_per_host (cluster, 2 );
cass_cluster_set_max_connections_per_host (cluster, 4 );
cass_cluster_set_max_requests_per_flush (cluster, 128 );
Configuración de Timeouts
cass_cluster_set_connect_timeout (cluster, 5000 ); // 5 segundos
cass_cluster_set_request_timeout (cluster, 12000 ); // 12 segundos
2. Optimización de Prepared Statements
¿Por qué usar Prepared Statements?
Rendimiento : Parsing se hace una sola vez
Seguridad : Previene inyección CQL
Routing : El driver puede enrutar directamente al nodo correcto
Eficiencia de red : Menos datos enviados por la red
// Preparar una vez al inicio
const char * query = " INSERT INTO table (id, data) VALUES (?, ?)" ;
CassFuture* prepare_future = cass_session_prepare(session, query);
const CassPrepared* prepared = cass_future_get_prepared(prepare_future);
// Reutilizar múltiples veces
CassStatement* statement = cass_prepared_bind(prepared);
cass_statement_bind_string (statement, 0 , id.c_str());
cass_statement_bind_string (statement, 1 , data.c_str());
3. Estrategias de Batch Processing
Tipos de Batch y Cuándo Usarlos
Uso : Máximo rendimiento sin atomicidad
Ideal para : Indexación masiva, logging, métricas
Rendimiento : Mejor throughput
Uso : Cuando se requiere atomicidad
Ideal para : Transacciones críticas
Rendimiento : Mayor latencia debido a garantías ACID
Uso : Específico para operaciones de contador
Ideal para : Contadores, estadísticas
ScyllaDB : Hasta 1024 KB por defecto
Recomendado : 100-1000 statements por batch
Monitoreo : Verificar batch_size_fail_threshold_in_kb
4. Arquitectura de Thread Pool
class ThreadPool {
private:
std::vector<std::thread> workers;
std::queue<Task> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
std::atomic<bool > stop;
};
Regla general : 1-2 threads por CPU core
Para I/O intensivo : Hasta 4x el número de cores
Monitoreo : CPU utilization, queue depth
5. Manejo de Errores y Retry Logic
bool execute_with_retry (CassStatement* stmt, int max_retries = 3 ) {
for (int attempt = 0 ; attempt < max_retries; ++attempt) {
CassFuture* future = cass_session_execute (session, stmt);
CassError rc = cass_future_error_code (future);
if (rc == CASS_OK) {
cass_future_free (future);
return true ;
}
// Solo retry en errores transitorios
if (rc == CASS_ERROR_SERVER_OVERLOADED ||
rc == CASS_ERROR_LIB_REQUEST_TIMED_OUT) {
std::this_thread::sleep_for (
std::chrono::milliseconds (1000 * (attempt + 1 ))
);
cass_future_free (future);
continue ;
}
cass_future_free (future);
return false ;
}
return false ;
}
6. Modelado de Datos para Indexación
CREATE TABLE documents (
partition_key text , -- Para distribución uniforme
id text , -- Identificador único
content text ,
metadata map< text , text > ,
timestamp bigint ,
PRIMARY KEY (partition_key, id)
) WITH clustering_order_by = (id ASC );
-- Índices secundarios globales
CREATE INDEX ON documents (timestamp );
CREATE INDEX ON documents (KEYS(metadata));
Estrategias de Particionado
Time-based : Particionar por timestamp (día/hora)
Hash-based : Usar hash del ID para distribución uniforme
Bucketing : Combinar múltiples criterios
struct IndexerMetrics {
std::atomic<size_t > docs_processed{0 };
std::atomic<size_t > docs_failed{0 };
std::atomic<size_t > batches_executed{0 };
std::atomic<double > avg_latency_ms{0.0 };
std::atomic<size_t > queue_depth{0 };
};
void log_performance_stats () {
auto stats = get_stats ();
std::cout << " {"
<< " \" processed\" : " << stats.processed_documents << " , "
<< " \" errors\" : " << stats.error_count << " , "
<< " \" success_rate\" : " << stats.success_rate << " , "
<< " \" queue_depth\" : " << stats.pending_tasks
<< " }" << std::endl;
}
8. Consideraciones de Memoria
RAII : Usar smart pointers para gestión automática
Object pooling : Reutilizar objetos CassStatement
Batch size : Limitar para evitar OOM
class StatementPool {
private:
std::queue<CassStatement*> available_statements;
std::mutex pool_mutex;
const CassPrepared* prepared;
public:
CassStatement* acquire () {
std::lock_guard<std::mutex> lock (pool_mutex);
if (!available_statements.empty ()) {
auto stmt = available_statements.front ();
available_statements.pop ();
return stmt;
}
return cass_prepared_bind (prepared);
}
void release (CassStatement* stmt) {
cass_statement_reset (stmt);
std::lock_guard<std::mutex> lock (pool_mutex);
available_statements.push (stmt);
}
};
9. Patrones de Deployment
Usar datacenter local : Minimizar latencia
Token-aware routing : Habilitar en el driver
Load balancing : DCAwareRoundRobinPolicy
Múltiples instancias : Distribuir carga por partición
Coordinated shutdown : Graceful termination
Health checks : Monitoring de estado
10. Testing y Benchmarking
class IndexerBenchmark {
public:
void benchmark_throughput (size_t num_docs, size_t batch_size) {
auto start = std::chrono::high_resolution_clock::now ();
// Ejecutar indexación
index_documents (generate_test_docs (num_docs), batch_size);
auto end = std::chrono::high_resolution_clock::now ();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
double throughput = (double )num_docs / (duration.count () / 1000.0 );
std::cout << " Throughput: " << throughput << " docs/sec" << std::endl;
}
};
Métricas de Rendimiento Objetivo
Throughput : > 10,000 docs/segundo
Latencia P99 : < 100ms
Error rate : < 0.1%
CPU utilization : 70-80%
11. Troubleshooting Común
Causa : Batch excede batch_size_fail_threshold_in_kb
Solución : Reducir tamaño de batch o aumentar threshold
Causa : Red lenta o cluster sobrecargado
Solución : Aumentar timeouts, revisar configuración de red
Causa : No liberar objetos Cassandra
Solución : Verificar cass_*_free() calls
// Habilitar logging del driver
cass_log_set_level (CASS_LOG_DEBUG);
cass_log_set_callback (custom_log_callback, nullptr );
12. Checklist de Optimización