Created
April 30, 2012 08:19
-
-
Save dropwhile/2556514 to your computer and use it in GitHub Desktop.
socketpool memcache example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
#
import logging | |
import random | |
import memcache | |
import gevent.socket | |
# monkey patch to patch sockets | |
memcache.socket = gevent.socket | |
from socketpool.pool import ConnectionPool, MaxTriesError | |
from socketpool.conn import Connector | |
class MemcacheClient(memcache.Client):
    """``memcache.Client`` subclass that accepts a ready-made server list."""

    def set_servers(self, host_backends):
        """Install *host_backends* as this client's servers.

        The value is stored on ``self.servers`` exactly as passed in (no
        per-entry conversion is done here), after which
        ``self._init_buckets()`` is called.
        """
        self.servers = host_backends
        self._init_buckets()
class MemcacheFactory(Connector):
    """Pool connector that owns a memcache client and its host objects.

    Each instance builds one ``memcache._Host`` per configured server and a
    ``MemcacheClient`` on top of them.  Attributes that are not found on the
    connector itself are looked up on that client (see ``__getattr__``), so
    calls such as ``conn.set(...)`` / ``conn.get(...)`` reach the client.
    """

    def __init__(self, servers, backend_mod, pool=None):
        """Create the host objects and the client for *servers*.

        :param servers: iterable of server specs, each passed to
            ``memcache._Host``.
        :param backend_mod: accepted for the factory call signature; not
            used by this class.
        :param pool: pool to hand this connector back to on ``release()``;
            when ``None`` the connector is torn down on release instead.
        """
        # ``time`` is imported locally because, at module level, it is only
        # imported under the ``__main__`` guard; without this, building a
        # connector from an importing module raises NameError.
        import time

        self.servers = servers
        self._pool = pool
        self._connected = False
        self.mclient = None
        self.cache_servers = None
        # use a jiggle of random 0,10 seconds, to avoid
        # reaping all conns at once should they be created
        # all at once from a burst of traffic or something
        self._life = time.time() - random.randint(0, 10)
        self.cache_servers = [memcache._Host(s, 0) for s in self.servers]
        self.mclient = MemcacheClient(self.cache_servers)
        self._connected = True

    def __del__(self):
        """Release the connector on garbage collection, logging any error."""
        try:
            self.release()
        except Exception:
            # Never let an exception escape a finalizer; record it instead.
            logging.warning('Error in cache __del__', exc_info=True)

    def matches(self, **match_options):
        """Report whether this connector satisfies *match_options*."""
        # all backend conns are the same, so match is always true
        return True

    def is_connected(self):
        """Return the connected flag (cleared by ``invalidate()``)."""
        return self._connected

    def get_lifetime(self):
        """Return the jittered creation timestamp, or -1 once invalidated."""
        return self._life

    def invalidate(self):
        """Drop the client, detach from the pool and close the host sockets.

        Safe to call more than once: every step checks for ``None`` first.
        """
        if self.mclient is not None:
            self.mclient.buckets = None
            self.mclient.servers = None
            self.mclient = None
        self._pool = None
        ## test for none-ness in case invalidate gets called
        ## more than once
        if self.cache_servers is not None:
            for host in self.cache_servers:
                host.close_socket()
            self.cache_servers = None
        self._connected = False
        self._life = -1

    def release(self):
        """Hand the connector back to its pool, or tear it down.

        It goes back to the pool only when a pool is attached and the
        connector is still marked connected; otherwise it is invalidated.
        """
        if self._pool is not None and self._connected:
            self._pool.release_connection(self)
        else:
            # otherwise cleanup and let it go
            self.invalidate()

    # close is an alias for release
    close = release

    def handle_exception(self, exception):
        """Log *exception* and re-raise it."""
        logging.exception("Memcache pool exception")
        raise exception

    def __getattr__(self, name):
        """Look up attributes missing on the connector on the client.

        :raises AttributeError: when there is no client (not yet built, or
            already invalidated) or the client lacks *name*.
        """
        # Read ``mclient`` from the instance dict rather than via
        # ``self.mclient``: if it was never assigned (e.g. instance created
        # without __init__), normal lookup would re-enter __getattr__ and
        # recurse until the interpreter's recursion limit is hit.
        client = self.__dict__.get('mclient')
        if client is None:
            raise AttributeError(name)
        return getattr(client, name)
if __name__ == '__main__':
    # Demo: two greenlets each take a pooled connection, store a value and
    # read it back.  Needs a memcached server listening on 127.0.0.1:11211.
    import time

    # Explicit import: ``gevent`` is otherwise only bound as a side effect
    # of the module-level ``import gevent.socket``.
    import gevent

    servers = ['127.0.0.1:11211']
    pool = ConnectionPool(
        factory=MemcacheFactory, backend="gevent",
        max_size=20, max_lifetime=600, retry_max=4, retry_delay=.5)

    def runpool(data):
        """Set *data* under a cache key, read it back and check it matches."""
        # Single-argument print(...) calls behave the same on Python 2 and
        # are valid syntax on Python 3, unlike the print statement.
        print('ok')
        i = 0
        with pool.connection(servers=servers) as conn:
            i += 1
            cache_key = 'cache-key-%d' % i
            print('set')
            conn.set(cache_key, data)
            print('get')
            val = conn.get(cache_key)
            print("got %s" % val)
            assert data == val
            # NOTE(review): the original nesting was lost; the sleep is kept
            # inside the ``with`` so the connection is held while sleeping.
            # Confirm against the upstream example.
            gevent.sleep(1)

    start = time.time()
    jobs = [gevent.spawn(runpool, "blahblah") for _ in range(2)]
    gevent.joinall(jobs)
    delay = time.time() - start
    print('runtime: %s' % delay)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment