Created
September 18, 2012 19:44
-
-
Save ralphbean/3745381 to your computer and use it in GitHub Desktop.
pub/sub zeromq testing utils
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""Simple example of publish/subscribe illustrating topics.

Publisher and subscriber can be started in any order, though if publisher
starts first, any messages sent before subscriber starts are lost. More than
one subscriber can listen, and they can listen to different topics.

Topic filtering is done simply on the start of the string, e.g. listening to
's' will catch 'sports...' and 'stocks' while listening to 'w' is enough to
catch 'weather'.
"""

#-----------------------------------------------------------------------------
# Copyright (c) 2010 Brian Granger
#
# Distributed under the terms of the New BSD License. The full license is in
# the file COPYING.BSD, distributed as part of this software.
#-----------------------------------------------------------------------------
import itertools | |
import sys | |
import time | |
import zmq | |
def main():
    """Publish an endless stream of numbered messages, cycling over topics.

    Command line: ``publisher <bind-to>``.  Exactly one argument (the
    endpoint the PUB socket binds to) is required; otherwise a usage line is
    printed and the process exits with status 1.

    Every message is sent as a two-frame multipart message
    ``[topic, counter]``, with a 0.1 s pause between messages, until the
    user presses Ctrl-C.  The socket and context are always closed on the
    way out, whether the loop ends normally or with an error.
    """
    if len(sys.argv) != 2:
        print('usage: publisher <bind-to>')
        sys.exit(1)

    bind_to = sys.argv[1]

    all_topics = ['sports.general', 'sports.football', 'sports.basketball',
                  'stocks.general', 'stocks.GOOG', 'stocks.AAPL',
                  'weather']

    # WAT: bind once, tear the socket and context down, then bind the very
    # same endpoint again below.
    # NOTE(review): presumably a deliberate check that an endpoint can be
    # re-bound right after close()/term() -- confirm before removing.
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUB)
    s.bind(bind_to)
    s.close()
    ctx.term()

    ctx = zmq.Context()
    s = ctx.socket(zmq.PUB)
    try:
        s.bind(bind_to)

        print("Starting broadcast on topics:")
        print(" %s" % all_topics)
        print("Hit Ctrl-C to stop broadcasting.")
        print("Waiting so subscriber sockets can connect...")
        time.sleep(1.0)

        msg_counter = itertools.count()
        try:
            for topic in itertools.cycle(all_topics):
                # next() works on both Python 2.6+ and Python 3, unlike the
                # Python-2-only ``.next()`` method.
                msg_body = str(next(msg_counter))
                print(' Topic: %s, msg:%s' % (topic, msg_body))
                # Frames must be bytes; on Python 3 text strings are
                # rejected, so encode explicitly (a no-op for these ASCII
                # values on Python 2).
                s.send_multipart([topic.encode('ascii'),
                                  msg_body.encode('ascii')])
                # short wait so we don't hog the cpu
                time.sleep(0.1)
        except KeyboardInterrupt:
            pass

        print("Waiting for message queues to flush...")
        time.sleep(0.5)
    finally:
        # The flush wait above has already happened (or we are bailing out
        # on an error), so drop anything still queued instead of letting
        # term() block on undelivered messages.
        s.setsockopt(zmq.LINGER, 0)
        s.close()
        ctx.term()
    print("Done.")


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""Simple example of publish/subscribe illustrating topics.

Publisher and subscriber can be started in any order, though if publisher
starts first, any messages sent before subscriber starts are lost. More than
one subscriber can listen, and they can listen to different topics.

Topic filtering is done simply on the start of the string, e.g. listening to
's' will catch 'sports...' and 'stocks' while listening to 'w' is enough to
catch 'weather'.
"""

#-----------------------------------------------------------------------------
# Copyright (c) 2010 Brian Granger, Fernando Perez
#
# Distributed under the terms of the New BSD License. The full license is in
# the file COPYING.BSD, distributed as part of this software.
#-----------------------------------------------------------------------------
import sys | |
import time | |
import zmq | |
import numpy | |
def _to_bytes(text):
    """Return *text* as bytes, encoding it as UTF-8 if it is not bytes yet."""
    if isinstance(text, bytes):
        return text
    return text.encode('utf-8')


def _to_text(frame):
    """Return a received *frame* as a native ``str`` suitable for printing."""
    if isinstance(frame, str):
        return frame
    return frame.decode('utf-8', 'replace')


def main():
    """Subscribe to a publisher and print every message received.

    Command line: ``subscriber <connect_to> [topic topic ...]``.  The first
    argument is the endpoint to connect the SUB socket to; any further
    arguments are topic prefixes to subscribe to.  With no topics given the
    socket subscribes to the empty prefix, i.e. all messages.  If the
    endpoint is missing, a usage line is printed and the process exits with
    status 1.

    Each message is expected to be a two-frame multipart message
    ``[topic, body]``.  Messages are printed until the user presses Ctrl-C;
    the socket and context are always closed on the way out.
    """
    if len(sys.argv) < 2:
        print('usage: subscriber <connect_to> [topic topic ...]')
        sys.exit(1)

    connect_to = sys.argv[1]
    topics = sys.argv[2:]

    ctx = zmq.Context()
    s = ctx.socket(zmq.SUB)
    try:
        s.connect(connect_to)

        # manage subscriptions
        if not topics:
            print("Receiving messages on ALL topics...")
            s.setsockopt(zmq.SUBSCRIBE, b'')
        else:
            print("Receiving messages on topics: %s ..." % topics)
            for t in topics:
                # Subscription prefixes must be bytes; command-line
                # arguments are text on Python 3.
                s.setsockopt(zmq.SUBSCRIBE, _to_bytes(t))

        try:
            while True:
                topic, msg = s.recv_multipart()
                # Frames arrive as bytes; convert so Python 3 does not print
                # them as b'...'.
                print(' Topic: %s, msg:%s' % (_to_text(topic), _to_text(msg)))
        except KeyboardInterrupt:
            pass
    finally:
        s.close()
        ctx.term()
    print("Done.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment