Created
January 18, 2019 05:58
-
-
Save banjin/afd585485ad5b8eb10de4513c438deab to your computer and use it in GitHub Desktop.
记录几种zmq的实现模式
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Request-Reply模式: | |
server端: | |
#!/usr/bin/env python | |
# -*- coding=utf-8 -*- | |
import zmq | |
import random | |
import time | |
context = zmq.Context() | |
socket = context.socket(zmq.REP) | |
socket.bind("tcp://*:5555") | |
while True: | |
message = socket.recv() | |
print("Received: %s" % message) | |
#time.sleep(10) | |
up = random.randint(1000,5000) | |
down = random.randint(800,1000) | |
socket.send("{},{},0,0,0,0,0,0,0".format(up,down)) | |
client 端 | |
#!/usr/bin/env python | |
# -*- coding=utf-8 -*- | |
import zmq | |
import sys | |
def get_flow(): | |
context = zmq.Context() | |
socket = context.socket(zmq.REQ) | |
socket.connect("tcp://localhost:5555") | |
socket.send('Are you OK?') | |
response = socket.recv(); | |
print("response: %s" % response) | |
if __name__=="__main__": | |
get_flow() | |
Publish-Subscribe模式: | |
广播所有client,没有队列缓存,断开连接数据将永远丢失。client可以进行数据过滤 | |
server 端 | |
# -*- coding=utf-8 -*- | |
import zmq | |
import time | |
context = zmq.Context() | |
socket = context.socket(zmq.PUB) | |
socket.bind("tcp://*:5555") | |
while True: | |
print('发送消息') | |
socket.send("消息群发") | |
time.sleep(1) | |
client端1 | |
# -*- coding=utf-8 -*- | |
import zmq | |
import sys | |
context = zmq.Context() | |
socket = context.socket(zmq.SUB) | |
socket.connect("tcp://localhost:5555") | |
socket.setsockopt(zmq.SUBSCRIBE,'') # 消息过滤 | |
while True: | |
response = socket.recv(); | |
print("response: %s" % response) | |
client 端2 | |
#!/usr/bin/env python | |
# -*- coding=utf-8 -*- | |
import zmq | |
import sys | |
context = zmq.Context() | |
socket = context.socket(zmq.SUB) | |
socket.connect("tcp://localhost:5555") | |
socket.setsockopt(zmq.SUBSCRIBE,'') | |
while True: | |
response = socket.recv(); | |
print("response: %s" % response) | |
Parallel Pipeline模式: | |
由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。 | |
当连接被断开,数据不会丢失,重连后数据继续发送到对端。 | |
Python实现 | |
server端: | |
# -*- coding=utf-8 -*- | |
import zmq | |
import time | |
context = zmq.Context() | |
socket = context.socket(zmq.PUSH) | |
socket.bind("tcp://*:5557") | |
while True: | |
socket.send("测试消息") | |
print "已发送" | |
time.sleep(1) | |
work端: | |
# -*- coding=utf-8 -*- | |
import zmq | |
context = zmq.Context() | |
recive = context.socket(zmq.PULL) | |
recive.connect('tcp://127.0.0.1:5557') | |
sender = context.socket(zmq.PUSH) | |
sender.connect('tcp://127.0.0.1:5558') | |
while True: | |
data = recive.recv() | |
print "正在转发..." | |
sender.send(data) | |
client端: | |
# -*- coding=utf-8 -*- | |
import zmq | |
import sys | |
context = zmq.Context() | |
socket = context.socket(zmq.PULL) | |
socket.bind("tcp://*:5558") | |
while True: | |
response = socket.recv(); | |
print("response: %s" % response) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment