Last active
April 29, 2023 08:04
-
-
Save hkulekci/db613ba510d0494cd1e414e922fd50e0 to your computer and use it in GitHub Desktop.
Kinesis Queue Check
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
def _iter_kinesis_shard_ids(kinesis_client, stream_name):
    """Yield the id of every shard in the stream, following DescribeStream pages."""
    exclusive_start = None
    while True:
        request = {'StreamName': stream_name}
        if exclusive_start is not None:
            request['ExclusiveStartShardId'] = exclusive_start
        description = kinesis_client.describe_stream(**request)['StreamDescription']
        shards = description['Shards']
        for shard in shards:
            yield shard['ShardId']
        # Stop when the service reports no further pages (or an empty page,
        # which would otherwise make us loop forever on the same start id).
        if not shards or not description.get('HasMoreShards'):
            return
        exclusive_start = shards[-1]['ShardId']


def check_kinesis_stream_data(stream_name, kinesis_client=None, max_polls_per_shard=5):
    """Return True if any shard of the Kinesis stream yields at least one record.

    Every shard is read from ``TRIM_HORIZON``. A single ``get_records`` call
    may come back empty even though records exist further along the shard, so
    each shard is polled by following ``NextShardIterator`` until a record is
    seen, the shard reports it is caught up (``MillisBehindLatest == 0``), the
    shard has no next iterator, or ``max_polls_per_shard`` calls were made.

    Args:
        stream_name: Name of the Kinesis stream to inspect.
        kinesis_client: Optional pre-configured Kinesis client. When omitted a
            default ``boto3.client('kinesis')`` is created.
        max_polls_per_shard: Upper bound on ``get_records`` calls per shard
            (values below 1 are treated as 1).
            NOTE(review): the default of 5 is meant to stay within the
            per-shard GetRecords rate limit - confirm against current quotas.

    Returns:
        bool: True as soon as any shard returns a record, otherwise False.
    """
    if kinesis_client is None:
        kinesis_client = boto3.client('kinesis')

    polls_allowed = max(1, max_polls_per_shard)

    for shard_id in _iter_kinesis_shard_ids(kinesis_client, stream_name):
        shard_iterator = kinesis_client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType='TRIM_HORIZON',
        )['ShardIterator']

        for _ in range(polls_allowed):
            response = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=1)
            if response['Records']:
                return True

            shard_iterator = response.get('NextShardIterator')
            # No next iterator: nothing more can be read from this shard.
            # MillisBehindLatest == 0: we are at the tip, so the shard is empty.
            if not shard_iterator or response.get('MillisBehindLatest', 0) == 0:
                break

    return False
# Name of the Kinesis stream to inspect; replace the placeholder
# 'your-stream-name' with a real stream name before running.
stream_name = 'your-stream-name'

# Report on stdout whether the check found any record in the stream.
if not check_kinesis_stream_data(stream_name):
    print(f"The {stream_name} is empty.")
else:
    print(f"The {stream_name} has data.")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
def list_files_by_creation_date(bucket, prefix, s3_client=None):
    """Return all objects under ``prefix``, newest first.

    Objects are ordered by their ``LastModified`` value in descending order.
    The listing is paginated so buckets with more keys than a single
    ``list_objects_v2`` response can hold are fully covered.

    Args:
        bucket: Name of the S3 bucket.
        prefix: Key prefix to list under.
        s3_client: Optional pre-configured S3 client. When omitted a default
            ``boto3.client('s3')`` is created.

    Returns:
        list[dict]: Object entries as returned in the ``Contents`` of the
        listing, sorted newest first; an empty list when nothing matches.
    """
    if s3_client is None:
        s3_client = boto3.client('s3')

    paginator = s3_client.get_paginator('list_objects_v2')
    found = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent from a page that holds no objects.
        found.extend(page.get('Contents', []))

    found.sort(key=lambda obj: obj['LastModified'], reverse=True)
    return found
def read_most_recent_file(bucket, prefix):
    """Return the UTF-8 decoded content of the newest object under ``prefix``.

    The newest object is the first entry returned by
    ``list_files_by_creation_date(bucket, prefix)``.

    Args:
        bucket: Name of the S3 bucket.
        prefix: Key prefix to search under.

    Returns:
        str: The object's body decoded as UTF-8.

    Raises:
        IndexError: If the listing is empty.
        UnicodeDecodeError: If the object's bytes are not valid UTF-8.
    """
    sorted_objects = list_files_by_creation_date(bucket, prefix)
    if not sorted_objects:
        # Same exception type a bare ``[0]`` lookup would raise, but with a
        # message that says what was searched.
        raise IndexError(f"No objects found in bucket {bucket!r} under prefix {prefix!r}")

    most_recent_key = sorted_objects[0]['Key']

    s3_client = boto3.client('s3')
    body = s3_client.get_object(Bucket=bucket, Key=most_recent_key)['Body']
    try:
        return body.read().decode('utf-8')
    finally:
        # Release the underlying HTTP response even if reading/decoding fails.
        body.close()
# Placeholders: set 'your-s3-bucket' to the bucket name and 'your-s3-prefix'
# to the prefix of the folder you want to read from.
s3_bucket, s3_prefix = 'your-s3-bucket', 'your-s3-prefix'

# Fetch the newest object's content and print it.
most_recent_file_content = read_most_recent_file(bucket=s3_bucket, prefix=s3_prefix)
print(f"The content of the most recent file is:\n{most_recent_file_content}")
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import fnmatch | |
def list_files_with_dynamic_prefix(bucket, pattern, s3_client=None):
    """Return the objects in ``bucket`` whose key matches a shell-style pattern.

    Matching uses ``fnmatch.fnmatchcase`` so keys are compared
    case-sensitively and without OS-specific path normalisation. Note that
    ``*`` also matches ``/``.

    Args:
        bucket: Name of the S3 bucket.
        pattern: ``fnmatch``-style pattern, e.g. ``'my-data-folder-*/*'``.
        s3_client: Optional pre-configured S3 client. When omitted a default
            ``boto3.client('s3')`` is created.

    Returns:
        list[dict]: Matching object entries in listing order; an empty list
        when nothing matches.
    """
    if s3_client is None:
        s3_client = boto3.client('s3')

    # Everything before the first wildcard character is literal, so any
    # matching key must start with it; let S3 narrow the listing server-side.
    first_wildcard = next(
        (index for index, char in enumerate(pattern) if char in '*?['),
        len(pattern),
    )
    literal_prefix = pattern[:first_wildcard]

    paginator = s3_client.get_paginator('list_objects_v2')
    matches = []
    for page in paginator.paginate(Bucket=bucket, Prefix=literal_prefix):
        # 'Contents' is absent from a page that holds no objects.
        for obj in page.get('Contents', []):
            if fnmatch.fnmatchcase(obj['Key'], pattern):
                matches.append(obj)
    return matches
# Placeholders: set 'your-s3-bucket' to the name of your S3 bucket and
# 'your-pattern' to the pattern to match, e.g., 'my-data-folder-*/*'.
s3_bucket, pattern = 'your-s3-bucket', 'your-pattern'

# Collect the matching objects and print them.
filtered_objects = list_files_with_dynamic_prefix(bucket=s3_bucket, pattern=pattern)
print(f"Filtered objects:\n{filtered_objects}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment