Skip to content

Instantly share code, notes, and snippets.

@Richard-Mathie
Last active March 29, 2017 10:43
Show Gist options
  • Save Richard-Mathie/fa831f18d4a4396551e523ffd01b105f to your computer and use it in GitHub Desktop.
Save Richard-Mathie/fa831f18d4a4396551e523ffd01b105f to your computer and use it in GitHub Desktop.
Cassandra Concurrent writer

Cassandra Concurrent Writer

A class which constructs a concurrent writer object so that you can write asynchronously to Cassandra while limiting the number of concurrent writes to some number, blocking when that limit is reached. This can result in improved write throughput without having to construct your queries with generators, as the default Cassandra concurrent method requires.

I would suggest adding callbacks to handle write timeouts, as you may want to retry the write.

Usage

conn = get_connection()
session = conn.session
# `logger` is called with a single formatted message string when a write
# fails, so pass a callable such as `logging.getLogger(__name__).error`.
writer = ConcurrentWriter(session, concurrency=50, logger=error_logger)
query = session.prepare("INSERT INTO  {}.model (cola, colb, colc) Values (?,?,?)".format(keyspace))
# ....
# then
for i in xrange(10000):
    values = (i, 2, 3)
    # Blocks here whenever 50 writes are already in flight.
    writer.execute_async(query, values)
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 27 17:14:19 2017
@author: richard
"""
from threading import Condition
class ConcurrentWriter(object):
    """Throttle asynchronous session writes to a fixed number in flight.

    Wraps a session object exposing ``execute_async`` and keeps a count of
    requests that have been submitted but whose futures have not completed
    yet. Once ``concurrency`` requests are outstanding, ``execute_async``
    blocks the calling thread until one of them finishes.

    Args:
        session: object providing ``execute_async(*args, **kwargs)`` which
            returns a future supporting ``add_callbacks``.
        concurrency: maximum number of requests allowed in flight at once.
        logger: optional callable taking one message string; invoked when a
            request fails. ``None`` disables failure logging.
    """

    def __init__(self, session, concurrency=100, logger=None):
        self.concurrency = concurrency
        self.session = session
        # Guards _exec_count and wakes submitters waiting for a free slot.
        self._condition = Condition()
        self._exec_count = 0
        self.log = logger

    def _on_success(self, result, future):
        """Future callback: free the slot held by a completed request."""
        self._release()

    def _on_error(self, result, future):
        """Future errback: report the failure, then free the slot.

        ``result`` is whatever the future passes to its errbacks (the
        exception, for the Cassandra driver).
        """
        if self.log:
            self.log("exception in future {}, {}".format(result, future))
        self._release()

    def _release(self):
        """Give back one slot and wake a single blocked submitter."""
        with self._condition:
            self._exec_count -= 1
            self._condition.notify()

    def execute_async(self, *args, **kwargs):
        """Submit a request, blocking while the concurrency limit is reached.

        All arguments are forwarded to ``session.execute_async``. When the
        caller does not supply ``timeout`` it defaults to ``None``, as the
        original implementation always passed; an explicit ``timeout=...``
        from the caller is now honoured instead of raising ``TypeError``
        for a duplicated keyword argument.

        Returns:
            The future returned by ``session.execute_async``.

        Raises:
            Whatever ``session.execute_async`` raises; the reserved slot is
            released before the exception propagates.
        """
        kwargs.setdefault("timeout", None)

        with self._condition:
            while self._exec_count >= self.concurrency:
                self._condition.wait()
            self._exec_count += 1

        try:
            future = self.session.execute_async(*args, **kwargs)
        except Exception:
            # No future exists, so no callback will ever free this slot.
            self._release()
            raise

        callback_args = (future,)
        future.add_callbacks(
            callback=self._on_success, callback_args=callback_args,
            errback=self._on_error, errback_args=callback_args)
        return future
# -*- coding: utf-8 -*-
"""
Created on Tue Feb 14 17:07:36 2017
@author: richard
"""
import nose
from nose.tools import assert_equals, assert_is_instance
import logging
# from cassandra.cluster import Cluster
from cassandra.cqlengine.models import Model, columns
from cassandra.cqlengine.connection import get_connection
from cassandra.io.asyncorereactor import AsyncoreConnection
from cassandra import InvalidRequest, OperationTimedOut
from concurrent import ConcurrentWriter
from flask_cassandra import get_db, init_db, drop_db
from app_config import get_app
from time import time, sleep
from collections import Counter
class TestModel(Model):
    """cqlengine model with three integer columns used by the writer tests.

    ``b`` is declared as the partition key and ``a`` as a further primary
    key column; ``c`` is a plain value column.
    """

    a = columns.Integer(primary_key=True)
    b = columns.Integer(partition_key=True)
    c = columns.Integer()
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
# Console handler at INFO so the throughput line logged by test_write shows.
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)
log.addHandler(ch)

# Use the Twisted connection class when it can be imported, otherwise fall
# back to the asyncore one.
have_twisted = False
reactor = AsyncoreConnection
try:
    from cassandra.io.twistedreactor import TwistedConnection
except ImportError:
    log.exception("Error importing twisted")
else:
    have_twisted = True
    reactor = TwistedConnection

app = get_app(__name__, environment='test')
kwargs = app.config['CASSANDRA_SETUP_KWARGS']
kwargs.update({  # 'metrics_enabled': True,
    'connection_class': reactor})
app.config['CASSANDRA_SETUP_KWARGS'] = kwargs
db = get_db(app)

# 4000 rows; (a, b) is unique per row because b is unique.
data = [(x % 10, x, x) for x in range(4000)]
# Parameterised insert, prepared by the tests before use.
query = """INSERT INTO {}.test_model
(a,b,c)
VALUES
(?,?,?)""".format(db._keyspace_)
def setup_module():
    """Module-level fixture: initialise the database and sync the test table."""
    init_db(db, autosync=False)
    db.sync_table(TestModel)
def get_session():
    """Return the ``session`` of the connection given by ``get_connection()``."""
    return get_connection().session
def get_writer(session=None):
    """Build a ConcurrentWriter that reports failures through ``log.debug``.

    Args:
        session: session to wrap; when ``None`` one is obtained from
            ``get_session()``.
    """
    target = get_session() if session is None else session
    return ConcurrentWriter(target, logger=log.debug)
def teardown_module():
    """Module-level fixture: drop the database created for these tests."""
    drop_db(db)
def test_write():
    """Write every row in ``data`` through the writer and check the row count.

    The submit loop is timed on its own to report throughput. The test then
    waits on each returned future rather than sleeping for a fixed period,
    so the count is only taken after every write has been answered and any
    failed write surfaces as an exception.
    """
    session = get_session()
    concurrent = get_writer(session)
    insert = session.prepare(query)

    ts = time()
    futures = [concurrent.execute_async(insert, d) for d in data]
    te = time()

    dt = te - ts
    log.info("{} msg in {} s, rate: {} msg/s".format(
        len(data), dt, len(data)/dt))

    # Block until each write has completed; raises if any of them failed.
    for future in futures:
        future.result()

    assert_equals(len(data), TestModel.all().count())
def test_fail():
    """A failing statement must raise InvalidRequest and free its slot.

    The statement targets ``test_models``, which is not the table the rest
    of this module inserts into (``test_model``), so the request is expected
    to be rejected.
    """
    bad_insert = """INSERT INTO {}.test_models
(a,b,c)
VALUES
({},{},{})""".format(db._keyspace_, 1, 1, 1)
    session = get_session()
    concurrent = get_writer(session)
    future = concurrent.execute_async(bad_insert)
    try:
        future.result()
    except Exception as exp:
        assert_is_instance(exp, InvalidRequest)
    else:
        # Without this the test would pass even if nothing was raised.
        raise AssertionError("expected InvalidRequest, but the write succeeded")
    log.debug("exec count: {}".format(concurrent._exec_count))
    assert_equals(0, concurrent._exec_count)
def test_fail_retry():
    """A timed-out write must trigger the caller's errback exactly once.

    The write is submitted through the writer with a very small per-call
    timeout. An errback registered on the returned future re-issues the
    query directly on the session and counts how often it ran.
    """
    retried = Counter()

    def retry_execute(exception, session, query):
        # Errback: re-submit the query with a generous timeout and count it.
        session.execute_async(query, timeout=10.)
        retried['retry'] += 1

    insert = """INSERT INTO {}.test_model
(a,b,c)
VALUES
({},{},{})""".format(db._keyspace_, 1, 1, 1)
    session = get_session()
    concurrent = get_writer(session)
    future = concurrent.execute_async(insert, timeout=0.00001)
    future.add_errback(retry_execute, session, future.query)
    try:
        future.result()
    except Exception as exp:
        assert_is_instance(exp, OperationTimedOut)
    else:
        # Without this the test would continue as if a timeout had occurred.
        raise AssertionError("expected OperationTimedOut, but the write succeeded")
    # Give the errback (and the retried write) time to run.
    sleep(5)
    assert_equals(1, retried['retry'])
    log.debug("exec count: {}".format(concurrent._exec_count))
# Run the tests through nose when this module is executed as a script.
if __name__ == '__main__':
    nose.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment