Last active
July 10, 2023 21:36
-
-
Save mhihasan/4dd0917758d0a3e9a3a58248ed201951 to your computer and use it in GitHub Desktop.
DynamoDB Parallel Scanner
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import time | |
import csv | |
from concurrent.futures import ThreadPoolExecutor, wait | |
import boto3 | |
# Low-level DynamoDB client used by every segment scan in this module.
# NOTE(review): it is obtained via the resource's ``meta.client`` rather than
# ``boto3.client("dynamodb")``; presumably so that scan responses come back as
# native Python values instead of typed attribute maps — confirm before
# replacing it with a plain client.
dynamo_client = boto3.resource("dynamodb").meta.client
def _scan_segment(segment_no, total_segments, **kwargs):
    """Scan one segment of a parallel scan and return every item it yields.

    The segment is paged through by feeding each response's
    ``LastEvaluatedKey`` back as ``ExclusiveStartKey`` until a response no
    longer carries one.

    Args:
        segment_no: Value passed to the scan call as ``Segment``.
        total_segments: Value passed to the scan call as ``TotalSegments``.
        **kwargs: Remaining keyword arguments, forwarded to
            ``dynamo_client.scan`` on every page request.

    Returns:
        A list holding the ``Items`` of every page, in the order received.
    """
    print(f"Scanning segment no: {segment_no}")
    items = []
    while True:
        result = dynamo_client.scan(Segment=segment_no, TotalSegments=total_segments, **kwargs)
        # Read ``Items`` defensively: a response without that key (e.g. a
        # count-only scan) must not abort the whole segment with a KeyError.
        items.extend(result.get("Items") or [])
        last_evaluated_key = result.get("LastEvaluatedKey")
        if last_evaluated_key is None:
            # No continuation key means this segment is exhausted.
            break
        # ``kwargs`` is this call's own dict, so updating it in place only
        # affects the next page request of this segment.
        kwargs["ExclusiveStartKey"] = last_evaluated_key
    print(f"total items in seg: {segment_no}", len(items))
    return items
def scan(**kwargs):
    """Scan in parallel, running one worker thread per segment.

    Args:
        **kwargs: Keyword arguments for the underlying scan call. An optional
            ``TotalSegments`` entry (default 10) is removed from them and used
            both as the number of segments and as the thread-pool size; all
            remaining arguments are forwarded to every segment scan.

    Returns:
        One list with the items of all segments, ordered by segment number.

    Raises:
        Exception: Any exception raised inside a segment scan is re-raised
            here when that segment's result is collected.
    """
    total_segments = kwargs.pop("TotalSegments", 10)
    with ThreadPoolExecutor(max_workers=total_segments) as executor:
        # A list (not a set) keeps the futures in segment order, so the
        # combined result is deterministic from run to run.
        futures = [
            executor.submit(_scan_segment, segment_no=i, total_segments=total_segments, **kwargs)
            for i in range(total_segments)
        ]
        # Let every segment finish before collecting, so a failure in one
        # segment is only surfaced once all workers are done.
        wait(futures)

    results = []
    for future in futures:
        results.extend(future.result())
    return results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment