Transfer objects from AWS S3 to Wasabi S3 using boto3. Run `list_keys.py` first, then `move_keys.py`.
list_keys.py
import boto3
from tqdm import tqdm

# Estimate from the S3 console; only used to size the progress bar
NUM_OBJECTS = 17292115


def list_objects_parallel(bucket, prefix):
    objects = []
    pbar = tqdm(total=NUM_OBJECTS)
    s3_client = boto3.client("s3")
    paginator = s3_client.get_paginator("list_objects_v2")
    pagination_config = {
        # Max page size the API provides anyway
        "PageSize": 1000,
    }
    # Loop-based: page through the listing sequentially
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, PaginationConfig=pagination_config):
        # page.get(...) guards against pages with no "Contents" (e.g. an empty prefix)
        results = [f"{obj['Key']}\n" for obj in page.get("Contents", [])]
        objects.extend(results)
        pbar.update(len(results))
    return objects


if __name__ == "__main__":
    bucket_name = "my-bucket-name"
    key_prefix = "key-prefix/"
    output_filename = "all_keys.txt"
    all_keys = list_objects_parallel(bucket_name, key_prefix)
    with open(output_filename, "w") as fd:
        fd.writelines(all_keys)
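
Before kicking off the long-running copy, it can be worth sanity-checking the listing. A minimal sketch (not part of the original gist), assuming only the `all_keys.txt` file written above:

with open("all_keys.txt") as fd:
    keys = [line.strip() for line in fd]

print(f"Listed {len(keys)} keys")  # compare against the S3 console estimate
if keys:
    print("First key:", keys[0])
    print("Last key:", keys[-1])
assert len(keys) == len(set(keys)), "duplicate keys in listing"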
move_keys.py
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache, partial
from itertools import chain
import multiprocessing

import boto3
from botocore.exceptions import ClientError
from tqdm import tqdm
@lru_cache()
def aws_s3_client():
    # Source client (AWS). Update the keys accordingly.
    return boto3.client(
        "s3",
        region_name="us-west-2",
        aws_access_key_id="***",
        aws_secret_access_key="***",
    )


@lru_cache()
def wasabi_s3_client():
    # Destination client (Wasabi). Update the keys accordingly.
    return boto3.client(
        "s3",
        endpoint_url="https://s3.us-west-1.wasabisys.com",
        region_name="us-west-1",
        aws_access_key_id="***",
        aws_secret_access_key="***",
    )
def chunks(l, n):
    """Yield n striped chunks from l."""
    for i in range(n):
        yield l[i::n]
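# For illustration (not part of the transfer logic): striping interleaves
# items rather than slicing contiguously, e.g.
#   list(chunks(list(range(10)), 3)) -> [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]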
def move_file(key, bucket, src_client, dest_client):
    try:
        # Skip keys that already exist in the destination; head_object raises
        # ClientError (typically 404 Not Found) when the key is absent.
        # To skip this check, drop the try/except and copy unconditionally.
        dest_client.head_object(Bucket=bucket, Key=key)
    except ClientError:
        original_object = src_client.get_object(
            Bucket=bucket,
            Key=key,
        )
        dest_client.put_object(
            Bucket=bucket, Key=key, Body=original_object["Body"].read()
        )
def process_chunk(chunk, bucket, max_workers=32):
    aws_s3 = aws_s3_client()
    wasabi_s3 = wasabi_s3_client()
    _move_file = partial(move_file, bucket=bucket, src_client=aws_s3, dest_client=wasabi_s3)
    failed_moves = []
    keys, position = chunk
    with tqdm(total=len(keys), position=position) as pbar:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(_move_file, key): key
                for key in keys
            }
            for future in as_completed(futures):
                if future.exception():
                    failed_key = futures[future]
                    failed_moves.append(failed_key)
                    pbar.set_postfix_str(f"Failed: {failed_key}")
                pbar.update(1)
    return failed_moves
if __name__ == "__main__":
    bucket_name = "my-bucket-name"
    # Adjust based on the output of `nproc` and your CPU's threads per core (`lscpu`)
    num_proc = 126
    max_workers = 32
    filename = "all_keys.txt"
    with open(filename, "r") as fd:
        lines = fd.readlines()
    # Remove newline (\n) characters and whitespace
    lines = [line.strip() for line in lines]
    line_chunks = list(chunks(lines, num_proc))
    # Add an index to help with positioning the progress bars
    idx = range(len(line_chunks))
    iterables = list(zip(line_chunks, idx))
    _process_chunk = partial(process_chunk, bucket=bucket_name, max_workers=max_workers)
    with multiprocessing.Pool(num_proc) as p:
        result = list(p.imap(_process_chunk, iterables))
    result = list(chain(*result))
    if len(result) > 0:
        failed_moves = [f"{item}\n" for item in result]
        with open("failed_keys.txt", "w") as fd:
            fd.writelines(failed_moves)
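
If any keys fail to copy (network blips, throttling), the script writes them to `failed_keys.txt`. A minimal retry sketch (not part of the original gist; it assumes `move_keys.py` sits in the same directory so its `process_chunk` is importable):

from move_keys import process_chunk

bucket_name = "my-bucket-name"

with open("failed_keys.txt") as fd:
    failed_keys = [line.strip() for line in fd]

# A single chunk at progress-bar position 0 is enough for a small retry set.
still_failing = process_chunk((failed_keys, 0), bucket=bucket_name, max_workers=32)
print(f"{len(still_failing)} keys still failing")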