import asyncio
import json
import time

import aioboto3
import aiohttp
import boto3
import xmltodict

# Destination for harvested pages.
S3_BUCKET = 'amy-test-bucket'
S3_ACL = 'bucket-owner-full-control'

# Lambda re-invoked to continue a harvest that ran out of time.
LAMBDA_FUNCTION_NAME = "async-fetch-test"
LAMBDA_REGION = "us-west-2"

# Seconds to harvest before handing the remaining work to a new invocation.
HARVEST_TIMEOUT_SECONDS = 60


async def build_fetchRequest(oai, resumptionToken):
    """Build the URL of the next OAI-PMH ListRecords request.

    Args:
        oai: dict with the feed 'url' and optional 'metadataPrefix' and
            'oai_set' entries.
        resumptionToken: one-element list holding the harvest state.
            ``[True]`` means "first request"; afterwards the element is
            whatever ``parse_httpResp`` returned for the resumptionToken
            element (a dict with a '#text' entry, a plain string, or a
            falsy value once the feed is exhausted).

    Returns:
        The request URL, or None when there is no feed URL or nothing left
        to fetch.
    """
    # dict.get never raises KeyError, so a missing URL has to be checked
    # explicitly; otherwise the request would go to "None?verb=ListRecords".
    feed_url = oai.get('url')
    if not feed_url:
        print('no oai feed url')
        return None
    base = f"{feed_url}?verb=ListRecords"

    token = resumptionToken[0]

    if isinstance(token, bool):
        if not token:
            # False is the "no resumptionToken in the response" marker.
            return None
        # first request - url includes metadataPrefix and set
        url = base
        if oai.get('metadataPrefix'):
            url = f"{url}&metadataPrefix={oai.get('metadataPrefix')}"
        if oai.get('oai_set'):
            url = f"{url}&set={oai.get('oai_set')}"
        return url

    # next requests - url includes only resumptionToken
    if isinstance(token, dict):
        token_text = token.get('#text')
    elif isinstance(token, str):
        token_text = token
    else:
        token_text = None

    if not token_text:
        # An empty resumptionToken marks the final page of the list.
        return None
    return f"{base}&resumptionToken={token_text}"


async def parse_httpResp(httpResp):
    """Parse one ListRecords response.

    Args:
        httpResp: the response body as XML text.

    Returns:
        A ``(jsonl, resumptionToken)`` tuple. ``jsonl`` holds one JSON
        document per record, newline separated, and is empty when the
        response carries no records. ``resumptionToken`` is the parsed
        resumptionToken element, or False when the response has none.
    """
    data = xmltodict.parse(httpResp)

    # Each level may be missing or an empty element (parsed as None).
    list_records = (data.get('OAI-PMH') or {}).get('ListRecords') or {}
    if not isinstance(list_records, dict):
        list_records = {}

    records = list_records.get('record') or []
    if not isinstance(records, list):
        # A lone <record> is parsed as a single mapping, not a list;
        # iterating it would serialise its keys instead of the record.
        records = [records]

    jsonl = "\n".join(json.dumps(record) for record in records)
    resumptionToken = list_records.get('resumptionToken', False)
    return jsonl, resumptionToken


async def oai_read_write(collection_id, page, resumptionToken, oai):
    """Fetch pages from the OAI feed and write each one to S3 as JSON lines.

    Connects to S3 and to the feed over HTTP, then loops until
    ``build_fetchRequest`` returns no further URL.

    Two side effects: ``page[0]`` and ``resumptionToken[0]`` are updated in
    place after every page, so the caller can pass the current position on
    to the next lambda if this coroutine is cancelled.

    Args:
        collection_id: used as the first segment of the S3 key.
        page: one-element list holding the next page number to write.
        resumptionToken: one-element list holding the harvest state.
        oai: feed description, see ``build_fetchRequest``.
    """
    # https://pypi.org/project/aioboto3/
    # NOTE(review): newer aioboto3 releases expose clients through
    # aioboto3.Session().client(...) instead of this module-level helper;
    # confirm against the version deployed with this function.
    async with aioboto3.client("s3") as s3_client:
        # https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request
        # Note: Don't create a session per request
        async with aiohttp.ClientSession() as http_client:
            fetchRequest = await build_fetchRequest(oai, resumptionToken)
            while fetchRequest:
                async with http_client.get(fetchRequest) as response:
                    httpResp = await response.text()
                records, nextResumptionToken = await parse_httpResp(httpResp)

                if records:
                    # upload records to s3
                    key = (
                        f"{collection_id}/{time.strftime('%Y-%m-%d')}/"
                        f"{page[0]}.jsonl"
                    )
                    try:
                        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html#S3.Client.put_object
                        await s3_client.put_object(
                            ACL=S3_ACL,
                            Body=records,
                            Bucket=S3_BUCKET,
                            Key=key,
                        )
                    except Exception as e:
                        # Best effort: report the failure and keep harvesting.
                        print(e)
                    page[0] += 1

                # side effect: remember where the next request starts
                resumptionToken[0] = nextResumptionToken

                # next step for this while loop
                fetchRequest = await build_fetchRequest(oai, resumptionToken)


async def timer(payload):
    """Run the harvest under a time limit and hand off to a new lambda.

    Args:
        payload: dict with 'collection_id', 'page' (one-element list),
            'resumptionToken' (one-element list) and 'oai' entries.

    Returns:
        None. When a required entry is missing or empty, a message is
        printed and nothing is harvested.
    """
    collection_id = payload.get('collection_id')
    page = payload.get('page')
    resumptionToken = payload.get('resumptionToken')
    oai = payload.get('oai')

    if not (collection_id and page and resumptionToken and oai):
        print('required parameters not sent')
        return None

    # start timer
    startTime = time.strftime('%X')
    try:
        # https://docs.python.org/3/library/asyncio-task.html#timeouts
        await asyncio.wait_for(
            oai_read_write(collection_id, page, resumptionToken, oai),
            HARVEST_TIMEOUT_SECONDS,
        )
    except asyncio.TimeoutError:
        # time to spawn a new lambda; page and resumptionToken were updated
        # in place by oai_read_write, so they describe where to resume.
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html
        lambda_client = boto3.client('lambda', region_name=LAMBDA_REGION)
        lambda_client.invoke(
            FunctionName=LAMBDA_FUNCTION_NAME,
            InvocationType="Event",  # invoke asynchronously
            Payload=json.dumps({
                'collection_id': collection_id,
                'page': page,
                'resumptionToken': resumptionToken,
                'oai': oai,
            }).encode('utf-8'),
        )
        print(f"NEW LAMBDA: resumptionToken: {resumptionToken}, page: {page}")
        # For local testing without Lambda, the invoke above can be replaced
        # by awaiting timer() again with the same payload.

    endTime = time.strftime('%X')
    print(f"started at {startTime}")
    print(f"finished at {endTime}")


def lambda_handler(payload, context):
    """AWS Lambda entry point: run one time-boxed harvest for *payload*."""
    asyncio.run(timer(payload))
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }


# Example local invocation:
#
# lambda_handler({
#     'collection_id': 27435,
#     'page': [0],
#     'resumptionToken': [True],
#     'oai': {
#         'url': "https://digicoll.lib.berkeley.edu/oai2d",
#         'metadataPrefix': "marcxml",
#         'oai_set': "sugoroku"
#     }
# }, {})