|
|
|
# -*- coding: utf-8 -*- |
|
""" |
|
Created on Tue Feb 14 17:07:36 2017 |
|
@author: richard |
|
""" |
|
|
|
import nose |
|
from nose.tools import assert_equals, assert_is_instance |
|
|
|
import logging |
|
|
|
# from cassandra.cluster import Cluster |
|
from cassandra.cqlengine.models import Model, columns |
|
from cassandra.cqlengine.connection import get_connection |
|
from cassandra.io.asyncorereactor import AsyncoreConnection |
|
from cassandra import InvalidRequest, OperationTimedOut |
|
|
|
from concurrent import ConcurrentWriter |
|
from flask_cassandra import get_db, init_db, drop_db |
|
|
|
from app_config import get_app |
|
|
|
from time import time, sleep |
|
from collections import Counter |
|
|
|
|
|
class TestModel(Model): |
|
a = columns.Integer(primary_key=True) |
|
b = columns.Integer(partition_key=True) |
|
c = columns.Integer() |
|
|
|
log = logging.getLogger(__name__) |
|
log.setLevel(logging.INFO) |
|
|
|
# create console handler and set level to debug |
|
ch = logging.StreamHandler() |
|
ch.setLevel(logging.INFO) |
|
log.addHandler(ch) |
|
|
|
have_twisted = False |
|
reactor = AsyncoreConnection |
|
try: |
|
from cassandra.io.twistedreactor import TwistedConnection |
|
have_twisted = True |
|
reactor = TwistedConnection |
|
except ImportError as exc: |
|
log.exception("Error importing twisted") |
|
pass |
|
|
|
app = get_app(__name__, environment='test') |
|
kwargs = app.config['CASSANDRA_SETUP_KWARGS'] |
|
kwargs.update({ # 'metrics_enabled': True, |
|
'connection_class': reactor}) |
|
app.config['CASSANDRA_SETUP_KWARGS'] = kwargs |
|
app.config['CASSANDRA_SETUP_KWARGS'] = kwargs |
|
db = get_db(app) |
|
|
|
data = [(x % 10, x, x) for x in range(4000)] |
|
|
|
query = """INSERT INTO {}.test_model |
|
(a,b,c) |
|
VALUES |
|
(?,?,?)""".format(db._keyspace_) |
|
|
|
|
|
def setup_module(): |
|
init_db(db, autosync=False) |
|
db.sync_table(TestModel) |
|
|
|
|
|
def get_session(): |
|
conn = get_connection() |
|
return conn.session |
|
|
|
|
|
def get_writer(session=None): |
|
if session is None: |
|
session = get_session() |
|
return ConcurrentWriter(session, logger=log.debug) |
|
|
|
|
|
def teardown_module(): |
|
drop_db(db) |
|
|
|
|
|
def test_write(): |
|
session = get_session() |
|
concurrent = get_writer(session) |
|
insert = session.prepare(query) |
|
|
|
ts = time() |
|
for d in data: |
|
concurrent.execute_async(insert, d) |
|
te = time() |
|
dt = te - ts |
|
|
|
log.info("1kmsg in {} s, rate: {} msg/s".format(dt, len(data)/dt)) |
|
sleep(5) |
|
assert_equals(len(data), TestModel.all().count()) |
|
|
|
|
|
def test_fail(): |
|
bad_insert = """INSERT INTO {}.test_models |
|
(a,b,c) |
|
VALUES |
|
({},{},{})""".format(db._keyspace_, 1, 1, 1) |
|
session = get_session() |
|
concurrent = get_writer(session) |
|
future = concurrent.execute_async(bad_insert) |
|
|
|
try: |
|
future.result() |
|
except Exception as exp: |
|
assert_is_instance(exp, InvalidRequest) |
|
|
|
log.debug("exec count: {}".format(concurrent._exec_count)) |
|
assert_equals(0, concurrent._exec_count) |
|
|
|
|
|
def test_fail_retry(): |
|
retried = Counter() |
|
|
|
def retry_execute(exception, session, query): |
|
session.execute_async(query, timeout=10.) |
|
retried['retry'] += 1 |
|
|
|
insert = """INSERT INTO {}.test_model |
|
(a,b,c) |
|
VALUES |
|
({},{},{})""".format(db._keyspace_, 1, 1, 1) |
|
session = get_session() |
|
concurrent = get_writer(session) |
|
future = concurrent.execute_async(insert, timeout=0.00001) |
|
future.add_errback(retry_execute, session, future.query) |
|
# future.result() |
|
try: |
|
future.result() |
|
except Exception as exp: |
|
assert_is_instance(exp, OperationTimedOut) |
|
sleep(5) |
|
assert_equals(1, retried['retry']) |
|
|
|
log.debug("exec count: {}".format(concurrent._exec_count)) |
|
|
|
|
|
if __name__ == '__main__': |
|
nose.main() |