Created
September 12, 2012 14:46
-
-
Save ronnix/3707121 to your computer and use it in GitHub Desktop.
Tornado + Redis listener thread example code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
from threading import Thread | |
import redis | |
import tornado.web | |
class SomeRequestHandler(tornado.web.RequestHandler): | |
def initialize(self): | |
self.connection_pool = redis.ConnectionPool(host='127.0.0.1', port=6379) | |
self.redis_client = redis.Redis(connection_pool=self.connection_pool) | |
def get(self): | |
# Tell the listener to listen to some additional channels | |
client_id = '...something...' | |
channels = ['foo', 'bar'] | |
self.redis_client.publish('commands', 'subscribe %s %s' % (client_id, ' '.join(channels))) | |
# Do other stuff | |
# ... | |
class RedisSubscriber(Thread): | |
""" | |
This thread subscribes to Redis channels on behalf of multiple clients | |
""" | |
def __init__(self, *args, **kwargs): | |
Thread.__init__(self, *args, **kwargs) | |
# Setup the Redis connection | |
self.connection_pool = redis.ConnectionPool(host='127.0.0.1', port=6379) | |
self.client = redis.Redis(connection_pool=self.connection_pool) | |
self.pubsub = self.client.pubsub() | |
# Who subscribes to what, and vice-versa | |
self.channel_subscribers = defaultdict(set) | |
self.client_channels = defaultdict(set) | |
def run(self): | |
""" | |
Listen to events on the Redis channels | |
""" | |
# The 'commands' channel is used for internal communication | |
self.pubsub.subscribe('commands') | |
for event in self.pubsub.listen(): | |
if event['type'] == 'message': | |
channel = event['channel'] | |
data = event['data'] | |
# Internal commands | |
if channel == 'commands': | |
quit = self.process_command(data) | |
if quit: | |
break | |
# Other messages | |
else: | |
self.do_something(channel, message) | |
def process_command(self, command): | |
""" | |
Process a command | |
""" | |
if command == 'quit': | |
return True | |
command, params = command.split(' ', 1) | |
if command == 'subscribe': | |
client, channels = params.split(' ', 1) | |
channels = channels.split() | |
self.subscribe(client, channels) | |
elif command == 'unsubscribe': | |
client = params | |
self.unsubscribe(client) | |
return False | |
def subscribe(self, client, channels): | |
""" | |
A client wants to subscribe to some channels | |
""" | |
for channel in channels: | |
# Add the client ID to the subscribers of the channel | |
self.channel_subscribers[channel].add(client) | |
# Add the channel to the client subscriptions | |
self.client_channels[client].add(channel) | |
# Subscribe to the Redis channel | |
self.pubsub.subscribe(channel) | |
def unsubscribe(self, client): | |
""" | |
A client wants to unsubscribe from all channels | |
""" | |
# Remove the client ID from the subscribers of each channel | |
for channel in self.client_channels[client]: | |
self.channel_subscribers[channel].remove(client) | |
# No more subscribers? | |
if len(self.channel_subscribers[channel]) == 0: | |
self.pubsub.unsubscribe(channel) | |
del self.channel_subscribers[channel] | |
# Remove client subscriptions | |
del self.client_channels[client] | |
def do_something(self, channel, message): | |
# ... | |
if __name__ == "__main__": | |
# Start the Redis subscriber background thread | |
listener = RedisSubscriber() | |
listener.start() | |
# Start the Tornado I/O loop in the main thread | |
application = tornado.web.Application([ | |
# ... | |
]) | |
application.listen(8080) | |
tornado.ioloop.IOLoop.instance().start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment