Created
August 19, 2023 13:06
-
-
Save nhammad/1950d68434d754413a798e5a8d1c9f80 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Copyright 2013-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"). You may not | |
# use this file except in compliance with the License. A copy of the License | |
# is located at | |
# | |
# http://aws.amazon.com/apache2.0/ | |
# | |
# or in the "LICENSE.txt" file accompanying this file. This file is | |
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
# KIND, either express or implied. See the License for the specific language | |
# governing permissions and limitations under the License. | |
from __future__ import print_function | |
import base64 | |
import boto3 | |
import botocore.exceptions | |
import argparse | |
import json | |
import threading | |
import time | |
import datetime | |
import uuid | |
from argparse import RawTextHelpFormatter | |
from random import choice | |
# To keep AWS keys out of this code, you may temporarily add your AWS
# credentials to the file ~/.boto as follows:
#   [Credentials]
#   aws_access_key_id = <your access key>
#   aws_secret_access_key = <your secret key>
# Contexts payload that make_string() serializes to compact JSON and places
# into every generated event line. It wraps two entries, identified by their
# schema URIs: a ua_parser context and a yauaa context. Key order is kept as
# written because it determines the serialized JSON text.
contexts = {
    "schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1",
    "data": [
        {
            "schema": "iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0",
            "data": {
                "useragentFamily": "Other",
                "useragentMajor": None,
                "useragentMinor": None,
                "useragentPatch": None,
                "useragentVersion": "Other",
                "osFamily": "Other",
                "osMajor": None,
                "osMinor": None,
                "osPatch": None,
                "osPatchMinor": None,
                "osVersion": "Other",
                "deviceFamily": "Other",
            },
        },
        {
            "schema": "iglu:nl.basjes/yauaa_context/jsonschema/1-0-2",
            "data": {
                "deviceBrand": "Unknown",
                "deviceName": "Unknown",
                "operatingSystemVersionMajor": "??",
                "layoutEngineNameVersion": "Unknown ??",
                "operatingSystemNameVersion": "Unknown ??",
                "layoutEngineNameVersionMajor": "Unknown ??",
                "operatingSystemName": "Unknown",
                "agentVersionMajor": "3",
                "layoutEngineVersionMajor": "??",
                "deviceClass": "Unknown",
                "agentNameVersionMajor": "Snowplow-Nodejs-Tracker 3",
                "operatingSystemNameVersionMajor": "Unknown ??",
                "operatingSystemClass": "Unknown",
                "layoutEngineName": "Unknown",
                "agentName": "Snowplow-Nodejs-Tracker",
                "agentVersion": "3.1.6",
                "layoutEngineClass": "Unknown",
                "agentNameVersion": "Snowplow-Nodejs-Tracker 3.1.6",
                "operatingSystemVersion": "??",
                "agentClass": "Special",
                "layoutEngineVersion": "??",
            },
        },
    ],
}
# Positional field values for one tab-separated event line (131 fields).
# Most positions are empty strings; runs of empties are written as
# *([""] * n) with the covered index range noted alongside. The "# N: name"
# markers are the index annotations carried over from the original list.
# Entries in braces are str.format placeholders filled in by make_string().
event_with_placeholders = [
    # 0-9
    "{app_id}",
    "srv",
    "{timestamp}",
    "{timestamp}",
    "{timestamp}",
    "struct",
    "{event_id}",
    "",
    "python",
    "1.0",
    # 10: v_collector
    "ssc-2.3.2-rc1-kinesis",
    "streamCommon-2.0.2-common-2.0.2",
    "",
    "46.114.141.x",
    *([""] * 3),  # 14-16
    "{network_userid}",
    "DE",
    "NI",
    # 20: geo_city
    "Garbsen",
    "30823",
    "52.4164",
    "9.5963",
    "Lower Saxony",
    *([""] * 5),  # 25-29
    # 30: page_title
    *([""] * 10),
    # 40: refr_urlport
    *([""] * 10),
    # 50: mkt_content
    *([""] * 10),
    # 60: tr_affiliation
    *([""] * 10),
    # 70: ti_category
    *([""] * 7),  # 70-76
    "snowplow-nodejs-tracker/3.1.6",  # 77
    *([""] * 2),  # 78-79
    # 80: br_version
    *([""] * 10),
    # 90: br_features_windowsmedia
    *([""] * 10),
    # 100: os_timezone
    *([""] * 10),
    # 110: tr_tax_base
    *([""] * 5),  # 110-114
    "Europe/Berlin",  # 115
    *([""] * 3),  # 116-118
    "{timestamp}",  # 119
    # 120: refr_domain_userid
    "",
    "",
    "{contexts}",
    "",
    "{timestamp}",
    "{event_vendor}",
    "{event_name}",
    "jsonschema",
    "1-0-1",
    "",
    "{timestamp}",
]

# The single-line template: all fields joined by tabs.
event = "\t".join(event_with_placeholders)
def make_string(x):
    """Render one tab-separated event line from the module-level template.

    Each call fills the ``event`` template with two fresh random UUIDs (event
    id and network user id), the current local time formatted with a space
    separator and millisecond precision, and the compact JSON form of
    ``contexts``.

    x -- not used by the function; callers pass an arbitrary value.

    Returns the formatted event line as a string.
    """
    now_text = datetime.datetime.now().isoformat(sep=" ", timespec="milliseconds")
    substitutions = {
        "app_id": "load-test",
        "contexts": json.dumps(contexts, separators=(',', ':')),
        "event_id": uuid.uuid4(),
        "event_vendor": "com.axelspringer.ott",
        "event_name": "hls_manifest_requested",
        "network_userid": uuid.uuid4(),
        "timestamp": now_text,
    }
    return event.format(**substitutions)
def get_or_create_stream(stream_name, shard_count):
    """Return the description of the named stream, creating it if missing.

    Uses the module-level ``kinesis`` boto3 client. If the stream already
    exists its current description is returned immediately, whatever its
    status. If it does not exist it is created, and the function polls once
    per second until the stream reports ACTIVE.

    stream_name -- name of the Kinesis stream to look up or create
    shard_count -- number of shards to request when the stream is created

    Returns the ``describe_stream`` response for the stream.
    """
    try:
        return kinesis.describe_stream(StreamName=stream_name)
    except kinesis.exceptions.ResourceNotFoundException:
        pass

    print('Could not find ACTIVE stream:{0} trying to create.'.format(
        stream_name))
    # boto3 client operations only accept keyword arguments; positional
    # arguments raise TypeError before any request is sent.
    kinesis.create_stream(StreamName=stream_name, ShardCount=shard_count)

    while True:
        time.sleep(1)
        stream = kinesis.describe_stream(StreamName=stream_name)
        status = stream['StreamDescription']['StreamStatus']
        if status == 'ACTIVE':
            return stream
        print("Stream status: %s" % status)
def sum_posts(kinesis_actors):
    """Return the combined ``total_records`` of the given KinesisPosters.

    kinesis_actors -- iterable of objects exposing a ``total_records`` count

    Returns 0 when the iterable is empty.
    """
    return sum(poster.total_records for poster in kinesis_actors)
class KinesisPoster(threading.Thread):
    """The Poster thread that repeatedly posts records to a given Kinesis
    stream.

    For ``poster_time`` seconds the thread either re-sends the loaded file
    contents as single records, or generates batches of 50 event lines with
    ``make_string`` and sends each batch with one ``put_records`` call. All
    requests go through the module-level ``kinesis`` client.
    """

    def __init__(self, stream_name, partition_key, poster_time=30, quiet=False,
                 name=None, group=None, filename=None, args=(), kwargs=None):
        """Set up the poster and optionally load the record file.

        stream_name -- Kinesis stream that records are put into
        partition_key -- partition key attached to every record
        poster_time -- how many seconds run() keeps posting
        quiet -- when True, per-put console output is suppressed
        name, group, args, kwargs -- forwarded to threading.Thread
        filename -- optional file whose first 40000 characters are sent as
            the record payload instead of generated events
        """
        # kwargs defaults to None instead of a shared mutable {};
        # threading.Thread treats None as "no keyword arguments".
        super(KinesisPoster, self).__init__(name=name, group=group,
                                            args=args, kwargs=kwargs)
        self._pending_records = []
        self.stream_name = stream_name
        self.partition_key = partition_key
        self.quiet = quiet
        # make_string ignores its argument, so these are nine sample event
        # lines. run() does not use them; kept as a public attribute.
        self.default_records = [
            make_string(100), make_string(1000), make_string(500),
            make_string(5000), make_string(10), make_string(750),
            make_string(10), make_string(2000), make_string(500)
        ]
        self.poster_time = poster_time
        self.total_records = 0
        self.file_contents = None
        if filename is not None:
            print('~> opening file:{0}'.format(filename))
            with open(filename, 'r') as content_file:
                self.file_contents = content_file.read(40000)

    def add_records(self, records):
        """Add given records to the Poster's pending records list.

        records -- a sequence of record strings, or a one-element sequence
            wrapping a list/tuple of record strings (which is unwrapped)
        """
        # Only unwrap a lone *list or tuple*. A lone string is one record and
        # must not be extended character by character.
        if len(records) == 1 and isinstance(records[0], (list, tuple)):
            records = records[0]
        self._pending_records.extend(records)

    def put_all_records(self):
        """Put all pending records in the Kinesis stream.

        Returns the number of records the service accepted; that number is
        also added to ``total_records``. Returns 0 without calling the
        service when nothing is pending.
        """
        precs, self._pending_records = self._pending_records, []
        if not precs:
            # PutRecords requires at least one record per request.
            return 0
        accepted = self.put_records(precs)
        self.total_records += accepted
        return accepted

    def put_file_contents(self):
        """Put the loaded file contents in the stream as a single record.

        Does nothing when no file contents were loaded.
        """
        if self.file_contents:
            response = kinesis.put_record(
                StreamName=self.stream_name,
                Data=self.file_contents, PartitionKey=self.partition_key)
            self.total_records += 1
            if self.quiet is False:
                print ("-= put seqNum:", response['SequenceNumber'])

    def put_records(self, records):
        """Put the given records in the Kinesis stream.

        records -- sequence of strings; each is UTF-8 encoded and sent with
            this poster's partition key

        Returns the number of records the service reported as accepted.
        """
        kinesis_records = [
            {'Data': bytes(line, 'utf-8'), 'PartitionKey': self.partition_key}
            for line in records
        ]
        response = kinesis.put_records(
            StreamName=self.stream_name,
            Records=kinesis_records)
        # The response lists one entry per submitted record, including
        # rejected ones, so failures have to be subtracted explicitly.
        accepted = len(response['Records']) - response.get('FailedRecordCount', 0)
        if self.quiet is False:
            print ("-= put successfulRecords:", accepted)
        return accepted

    def run(self):
        """Post records until ``poster_time`` seconds have elapsed."""
        finish = datetime.datetime.now() + datetime.timedelta(
            seconds=self.poster_time)
        while finish > datetime.datetime.now():
            if self.file_contents:
                self.put_file_contents()
            else:
                self.add_records([make_string(x) for x in range(50)])
                self.put_all_records()
            if self.quiet is False:
                print(' Total Records Put:', self.total_records)
if __name__ == '__main__':
    # Command-line entry point: delete, describe, or post to a Kinesis stream.
    parser = argparse.ArgumentParser(
        description='''Create or attach to a Kinesis stream and put records in the stream''',
        formatter_class=RawTextHelpFormatter)
    parser.add_argument('stream_name',
        help='''the name of the Kinesis stream to either connect to or create''')
    parser.add_argument('--region', type=str, default='us-east-1',
        help='''the name of the Kinesis region to connect with [default: us-east-1]''')
    parser.add_argument('--shard_count', type=int, default=1,
        help='''the number of shards to create in the stream, if creating [default: 1]''')
    parser.add_argument('--partition_key', default='PyKinesisExample',
        help=("the partition key to use when communicating records to the\n"
              "stream [default: 'PyKinesisExample-##']"))
    parser.add_argument('--poster_count', type=int, default=2,
        help='''the number of poster threads [default: 2]''')
    parser.add_argument('--poster_time', type=int, default=30,
        help=("how many seconds the poster threads should put records into\n"
              "the stream [default: 30]"))
    parser.add_argument('--record_file', type=str, default=None,
        help='''the file whose contents to use as a record''')
    parser.add_argument('--quiet', action='store_true', default=False,
        help='''reduce console output to just initialization info''')
    parser.add_argument('--delete_stream', action='store_true', default=False,
        help='''delete the Kinesis stream matching the given stream_name''')
    parser.add_argument('--describe_only', action='store_true', default=False,
        help='''only describe the Kinesis stream matching the given stream_name''')

    threads = []
    args = parser.parse_args()
    # Assigned at module level on purpose: get_or_create_stream and
    # KinesisPoster look this client up as the global name ``kinesis``.
    kinesis = boto3.client('kinesis', region_name=args.region)
    if args.delete_stream:
        # delete the given Kinesis stream name
        # (the boto3 parameter is StreamName; stream_name is rejected)
        kinesis.delete_stream(StreamName=args.stream_name)
    else:
        start_time = datetime.datetime.now()
        if args.describe_only is True:
            # describe the given Kinesis stream name
            stream = kinesis.describe_stream(StreamName=args.stream_name)
            # The response contains datetime values, which json cannot
            # serialize natively; default=str renders them for display.
            print(json.dumps(stream, sort_keys=True, indent=2,
                             separators=(',', ': '), default=str))
        else:
            stream = get_or_create_stream(args.stream_name, args.shard_count)
            # Create a KinesisPoster thread up to the poster_count value
            for pid in range(args.poster_count):
                # create poster name per poster thread
                poster_name = 'shard_poster:%s' % pid
                # create partition key per poster thread
                part_key = args.partition_key + '-' + str(pid)
                poster = KinesisPoster(
                    stream_name=args.stream_name,
                    partition_key=part_key,  # poster's partition key
                    poster_time=args.poster_time,
                    name=poster_name,  # thread name
                    filename=args.record_file,
                    quiet=args.quiet)
                poster.daemon = True
                threads.append(poster)
                print ('starting: ', poster_name)
                poster.start()
            # Wait for all threads to complete
            for t in threads:
                t.join()
        finish_time = datetime.datetime.now()
        duration = (finish_time - start_time).total_seconds()
        total_records = sum_posts(threads)
        # Guard the rate against a zero-length measured interval.
        records_per_sec = total_records / duration if duration > 0 else 0.0
        print ("-=> Exiting Poster Main <=-")
        print (" Total Records:", total_records)
        print (" Total Time:", duration)
        print (" Records / sec:", records_per_sec)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment