Created
May 24, 2022 20:01
-
-
Save wallyqs/544c6f4fd87c3e880c83b86acb081fe8 to your computer and use it in GitHub Desktop.
js pub bench
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import asyncio | |
import sys | |
import time | |
from random import randint | |
import nats | |
# uvloop is an optional accelerator: install its event loop policy when the
# package is available, otherwise keep the default asyncio event loop.
try:
    import uvloop
except ImportError:
    # Only a missing package is expected here; anything else should surface.
    pass
else:
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Timeout value handed to the final flush() call once all messages are sent.
DEFAULT_FLUSH_TIMEOUT = 30

# Command-line defaults: message count, payload size in bytes, batch size.
DEFAULT_NUM_MSGS = 100_000
DEFAULT_MSG_SIZE = 16
DEFAULT_BATCH_SIZE = 100

# A '#' progress mark is printed each time the remaining message count is a
# multiple of this value.
HASH_MODULO = 1000
def show_usage():
    """Print the command-line usage summary for the benchmark to stdout."""
    # The defaults below mirror the argparse defaults declared in main().
    message = """
Usage: pub_perf [options]

options:
  -n COUNT         Messages to send (default: 100000)
  -s SIZE          Message size (default: 16)
  -S SUBJECT       Send subject (default: test)
  -b BATCH         Batch size (default: 100)
  --servers URL    Server URL, may be repeated (default: nats://127.0.0.1:4222)
"""
    print(message)
def show_usage_and_die():
    """Print the usage summary, then terminate the process with status 1."""
    show_usage()
    # Equivalent to sys.exit(1): exit with a non-zero status.
    raise SystemExit(1)
async def main():
    """Publish messages to a JetStream stream and report the throughput.

    Parses the command-line options, connects to the given NATS servers
    (``nats://127.0.0.1:4222`` when none are passed), creates a stream named
    after the subject, publishes ``--count`` messages of ``--size`` random
    hex bytes in batches of ``--batch``, and prints the messages/sec and
    MB/sec rates.

    Raises:
        SystemExit: on invalid options (via argparse) or when the initial
            connection fails (via ``show_usage_and_die``).
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-n', '--count', default=DEFAULT_NUM_MSGS, type=int)
    parser.add_argument('-s', '--size', default=DEFAULT_MSG_SIZE, type=int)
    parser.add_argument('-S', '--subject', default='test')
    parser.add_argument('-b', '--batch', default=DEFAULT_BATCH_SIZE, type=int)
    parser.add_argument('--servers', default=[], action='append')
    args = parser.parse_args()

    # A batch size below 1 would never decrement the remaining count, so the
    # publish loop would spin forever; reject it up front.
    if args.batch < 1:
        parser.error("batch size must be at least 1")
    if args.count < 0:
        parser.error("message count must not be negative")
    if args.size < 0:
        parser.error("message size must not be negative")

    # Payload of `size` random lowercase hex digits, one byte per digit.
    payload = "".join(
        f"{randint(0, 15):x}" for _ in range(args.size)
    ).encode()

    servers = args.servers or ["nats://127.0.0.1:4222"]

    # Make sure we're connected to a server first.
    try:
        nc = await nats.connect(servers, pending_size=1024 * 1024)
    except Exception as e:
        sys.stderr.write(f"ERROR: {e}\n")
        show_usage_and_die()

    # Everything after a successful connect runs under try/finally so the
    # connection is closed even when stream creation or publishing fails.
    try:
        js = nc.jetstream()
        # The stream is named after the subject.
        # NOTE(review): no subjects are passed explicitly; presumably the
        # server binds the stream to a subject of the same name -- confirm.
        await js.add_stream(name=args.subject)

        print("Sending {} messages of size {} bytes on [{}]".format(
            args.count, args.size, args.subject))

        # Start the benchmark; perf_counter() is monotonic, so the measured
        # interval is not affected by wall-clock adjustments.
        start = time.perf_counter()
        remaining = args.count
        while remaining > 0:
            # The last batch may be shorter than args.batch.
            for _ in range(min(args.batch, remaining)):
                remaining -= 1
                await js.publish(args.subject, payload)
                if remaining % HASH_MODULO == 0:
                    sys.stdout.write("#")
                    sys.stdout.flush()

            # Minimal pause in between batches sent to server.
            await asyncio.sleep(0.00001)

        # Additional roundtrip with the server to try to ensure everything
        # has been sent already.
        try:
            await nc.flush(DEFAULT_FLUSH_TIMEOUT)
        except nats.errors.TimeoutError:
            # NOTE(review): assumes nats-py 2.x, where flush timeouts derive
            # from nats.errors.TimeoutError rather than the deprecated
            # nats.aio.errors.ErrTimeout -- confirm against the installed
            # version.
            print(f"Server flush timeout after {DEFAULT_FLUSH_TIMEOUT}")

        elapsed = time.perf_counter() - start
        mbytes = "%.1f" % (((args.size * args.count) / elapsed) / (1024 * 1024))
        print("\nTest completed : {} msgs/sec ({}) MB/sec".format(
            args.count / elapsed,
            mbytes))
    finally:
        await nc.close()
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment