Last active
July 22, 2021 17:41
-
-
Save lvxejay/a0dcb03453d73e8f9afc1d4c914e32ba to your computer and use it in GitHub Desktop.
ZMQ Benchmarking
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# --------------------------------------------------------------------------- HEADER --# | |
""" | |
:author: | |
Jared Webber | |
:description: | |
Somewhat crude ZMQ Performance Benchmark and Tests | |
:license: | |
Copyright (C) 2021 ForgeXYZ, LLC | |
""" | |
# -------------------------------------------------------------------------- IMPORTS --# | |
import sys | |
import zmq | |
from multiprocessing import Process | |
import numpy as np | |
import time | |
# ------------------------------------------------------------------------ FUNCTIONS --# | |
def array_worker(*args):
    """Receive a single message and view it as a 3840x2160x4 float64 array.

    Connects a PULL socket to tcp://127.0.0.1:5557, blocks until one message
    arrives, and reinterprets its bytes as float64 values in a
    (3840, 2160, 4) array. The socket and context are closed before
    returning.

    :param args: ignored; accepted so the function can be started with a
        positional argument tuple like the other workers in this file.
    :raises ValueError: if the payload size does not match a
        3840 * 2160 * 4 float64 buffer (raised by ``reshape``).
    """
    # Context managers close the socket first and then terminate the context,
    # so nothing is leaked when the worker returns or raises.
    with zmq.Context() as context, context.socket(zmq.PULL) as work_receiver:
        work_receiver.connect("tcp://127.0.0.1:5557")
        mbits = work_receiver.recv()
        # Deserialize to exercise the full receive path; the resulting view
        # is intentionally discarded.
        np.frombuffer(mbits, dtype=np.float64).reshape(3840, 2160, 4)
def float_worker(rounds):
    """Receive ``rounds`` messages and discard them.

    Connects a PULL socket to tcp://127.0.0.1:5557 and performs ``rounds``
    blocking receives without inspecting the payloads. The socket and
    context are closed before returning.

    :param rounds: number of messages to receive before returning.
    """
    # Context managers close the socket first and then terminate the context,
    # so nothing is leaked when the worker returns or raises.
    with zmq.Context() as context, context.socket(zmq.PULL) as work_receiver:
        work_receiver.connect("tcp://127.0.0.1:5557")
        for _ in range(rounds):
            work_receiver.recv()
def mtx_worker(rounds):
    """Receive ``rounds`` messages and view each one as a 4x4 float64 matrix.

    Connects a PULL socket to tcp://127.0.0.1:5557 and, for every message,
    reinterprets its bytes as float64 values reshaped to (4, 4). The socket
    and context are closed before returning.

    :param rounds: number of messages to receive before returning.
    :raises ValueError: if a payload does not hold exactly 16 float64 values
        (raised by ``reshape``).
    """
    # Context managers close the socket first and then terminate the context,
    # so nothing is leaked when the worker returns or raises.
    with zmq.Context() as context, context.socket(zmq.PULL) as work_receiver:
        work_receiver.connect("tcp://127.0.0.1:5557")
        for _ in range(rounds):
            mbits = work_receiver.recv()
            # Deserialize to exercise the full receive path; the resulting
            # view is intentionally discarded.
            np.frombuffer(mbits, dtype=np.float64).reshape(4, 4)
def zmq_vent(worker_func, msg=10.0, rounds=1):
    """Push ``msg`` to a freshly started worker process ``rounds`` times.

    A child process running ``worker_func(rounds)`` is started, a PUSH socket
    is bound on tcp://127.0.0.1:5557, and ``msg`` is sent over it once per
    round. The socket and context are closed and the worker is joined before
    returning, so the fixed port is free again for the next call.

    :param worker_func: callable executed in the child process; it is called
        with ``rounds`` as its only positional argument.
    :param msg: payload handed unchanged to ``socket.send``.
        NOTE(review): the default ``10.0`` is a float, which pyzmq is
        expected to reject because it is not bytes-like -- callers should
        pass ``bytes`` or another buffer object; confirm before relying on
        the default.
    :param rounds: number of messages to send.
    :return: the number of messages actually sent.
    """
    worker = Process(target=worker_func, args=(rounds,))
    worker.start()
    sent = 0
    try:
        # Exiting the ``with`` closes the socket and then terminates the
        # context, which releases the port.
        with zmq.Context() as context, \
                context.socket(zmq.PUSH) as ventilator_send:
            ventilator_send.bind("tcp://127.0.0.1:5557")
            # Send exactly ``rounds`` messages so the count matches what the
            # worker was told to receive (the old ``<=`` loop sent one extra).
            for _ in range(rounds):
                ventilator_send.send(msg)
                sent += 1
    except BaseException:
        # The worker would otherwise block forever waiting for messages that
        # will never arrive, which would make the join below hang.
        worker.terminate()
        raise
    finally:
        # Wait for the worker so it cannot linger and reconnect to the socket
        # bound by a subsequent call on the same port.
        worker.join()
    return sent
def _timed_vent(worker_func, msg, rounds):
    """Run one ``zmq_vent`` pass and return ``(messages_sent, seconds)``."""
    # perf_counter is monotonic and high resolution, unlike wall-clock time.
    start_time = time.perf_counter()
    sent = zmq_vent(worker_func, msg=msg, rounds=rounds)
    return sent, time.perf_counter() - start_time


def test():
    """Run the three ZMQ benchmarks, print the results, and exit.

    1. Send 5 million serialized 4x4 float64 identity matrices.
    2. Send one 3840x2160x4 float64 array (a "4k image").
    3. Send 5 million copies of the 4-byte payload ``b"10.0"``.

    :raises SystemExit: always, with status 0, once all benchmarks finish.
    """
    print("Starting tests...")

    # Test 1: Vent 5 million matrices
    mtx = np.identity(4, dtype=np.float64)
    mtx_bits = mtx.tobytes()
    it, duration = _timed_vent(mtx_worker, mtx_bits, 5000000)
    msg_per_sec = it / duration
    print("\n -- Matrix Test --")
    print("Duration: %s" % duration)
    print("Messages Per Second: %s" % msg_per_sec)

    # Test 2: Send a 4k image across the network
    arr = np.random.ranf((3840, 2160, 4))
    # The array is sent directly through the buffer protocol (no extra copy).
    it, duration = _timed_vent(array_worker, arr, 1)
    # A pixel is one (row, column) position; the trailing axis holds its four
    # channel values and must not be counted as additional pixels.
    pixel_count = arr.shape[0] * arr.shape[1]
    msg_per_sec = pixel_count / duration
    print("\n -- Image Test --")
    print("Duration: %s" % duration)
    print("Pixels per Second: %s " % msg_per_sec)

    # Test 3: Send floats across the network
    it, duration = _timed_vent(float_worker, b"10.0", 5000000)
    msg_per_sec = it / duration
    print("\n -- Float Test --")
    print("Duration: %s" % duration)
    print("Messages Per Second: %s" % msg_per_sec)

    print("\n Finished tests, exiting...")
    # Exit status 0: the benchmarks completed successfully.
    sys.exit(0)
# ----------------------------------------------------------------------------- MAIN --#
if __name__ == '__main__':
    # Run the benchmarks only when executed as a script, never on import.
    test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment