Last active
November 14, 2023 16:48
-
-
Save cra/74bb7a9df8e29ac230f4001be31fb455 to your computer and use it in GitHub Desktop.
pin bw version
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from datetime import timedelta | |
import boto3 | |
import logging | |
# pip install bytewax==0.17.2 | |
from bytewax.connectors.stdio import StdOutput | |
from bytewax.connectors.periodic import SimplePollingInput | |
from bytewax.dataflow import Dataflow | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - %(message)s', | |
) | |
class S3Input(SimplePollingInput):
    """Polling input that emits S3 object keys not seen on the previous poll.

    On every poll the keys under ``subfolder`` in ``bucket`` are listed and
    compared with the listing from the previous poll; the keys that are new
    are returned together as one list.

    Args:
        interval: Time between polls, forwarded to ``SimplePollingInput``.
        bucket: Name of the S3 bucket to list.
        subfolder: Key prefix that restricts the listing. ``None`` or an
            empty string lists the whole bucket.
    """

    def __init__(self, interval, bucket, subfolder):
        # skip align_to argument
        super().__init__(interval, None)
        # a custom S3 endpoint requires a session object
        session = boto3.session.Session()
        self.s3_client = session.client(
            service_name='s3',
            endpoint_url=os.getenv('S3_ENDPOINT_URL'),
            aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
        )
        self.bucket_name = bucket
        self.subfolder = subfolder
        # you might want to pass some initial value for this one
        self.known_filenames = set()

    def _fetch_filenames(self):
        """Return every object key currently under the configured prefix.

        ``list_objects_v2`` returns a limited page of keys per call, so the
        listing is followed through its continuation tokens until S3 reports
        that it is no longer truncated.
        """
        request = {'Bucket': self.bucket_name}
        # Passing Prefix=None is rejected by boto3's parameter validation,
        # so only send a prefix when one is actually configured.
        if self.subfolder:
            request['Prefix'] = self.subfolder

        filenames = []
        while True:
            response = self.s3_client.list_objects_v2(**request)
            filenames.extend(
                item['Key']
                for item in response.get('Contents', [])
            )
            if not response.get('IsTruncated'):
                return filenames
            request['ContinuationToken'] = response['NextContinuationToken']

    def next_item(self):
        """Return the keys that appeared since the previous poll.

        Returns:
            A sorted list of new keys; empty when nothing new was found.
        """
        current = set(self._fetch_filenames())
        # Sorted so the emit order is deterministic rather than set order.
        new_filenames = sorted(current - self.known_filenames)
        if new_filenames:
            logging.info('new filenames %s', new_filenames)
        else:
            logging.info('no new filenames *yawns*')
        # sorry, no transactions here: the snapshot is replaced wholesale,
        # so a key that disappears and later reappears is emitted again
        self.known_filenames = current
        return new_filenames
def do_the_thing(filename):
    """Placeholder processing step for a single filename.

    Returns a ``(lowercased filename, filename length)`` tuple.

    One cheap way to do pseudo-transactions is renaming files when
    you're done with them.
    """
    lowered = filename.lower()
    length = len(filename)
    return lowered, length
# Dataflow wiring: poll S3 for new keys, process each key, print the result.
flow = Dataflow()

# Source: list the bucket/prefix taken from the environment every 10 seconds.
_s3_source = S3Input(
    interval=timedelta(seconds=10),
    bucket=os.getenv('S3_BUCKET'),
    subfolder=os.getenv('S3_SUBFOLDER'),
)
flow.input('inp', _s3_source)

# Each poll yields a list of filenames; flatten it into individual items.
flow.flat_map(lambda filenames: filenames)
# NOTE(review): presumably spreads items across workers before the map step
# -- confirm against the bytewax `redistribute` documentation.
flow.redistribute()
flow.map(do_the_thing)
flow.output('out', StdOutput())
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Environment expected by the S3 polling dataflow.
export S3_ENDPOINT_URL=https://storage....
export AWS_ACCESS_KEY_ID=...
export AWS_SECRET_ACCESS_KEY=...
export S3_BUCKET=some-data
# The dataflow reads the key prefix from S3_SUBFOLDER (not S3_FOLDER).
export S3_SUBFOLDER=year=2023/month=05/day=02
Author
cra
commented
Nov 14, 2023
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment