Created
May 6, 2022 06:14
-
-
Save alivelimb/c7239272e58b8b1de6c7fd2df95da4f4 to your computer and use it in GitHub Desktop.
S3Connector
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from contextlib import contextmanager | |
from tempfile import NamedTemporaryFile | |
from typing import IO, Dict, Generator, Literal | |
import boto3 | |
from botocore.exceptions import ClientError | |
S3ConnectorMode = Literal["r", "w", "a"] | |
class S3Connector: | |
def __init__(self, bucket: str, **kwargs: Dict) -> None: | |
s3 = boto3.resource("s3", **kwargs) | |
try: | |
s3.meta.client.head_bucket(Bucket=bucket) | |
except ClientError as err: | |
raise err | |
self._bucket = s3.Bucket(bucket) | |
@contextmanager | |
def open(self, s3key: str, mode: S3ConnectorMode = "r") -> Generator[IO, None, None]: | |
if mode == "r": | |
yield from self._read(s3key) | |
elif mode == "w": | |
yield from self._write(s3key) | |
elif mode == "a": | |
yield from self._add(s3key) | |
else: | |
raise Exception("invalid mode") | |
def _read(self, s3key: str) -> Generator[IO, None, None]: | |
# 存在確認 | |
try: | |
self._bucket.meta.client.head_object(Bucket=self._bucket.name, Key=s3key) | |
except ClientError as err: | |
raise err | |
# S3のオブジェクトをローカルに一時ファイルとして作成 | |
with NamedTemporaryFile(delete=False, mode="w") as f_tmp: | |
tmpfile_name = f_tmp.name | |
self._bucket.download_file(s3key, tmpfile_name) | |
# withブロックに読み込み専用ファイルIOを渡す | |
# withブロック終了後に一時ファイルを閉じて削除する | |
f_yield = open(tmpfile_name) | |
try: | |
yield f_yield | |
finally: | |
f_yield.close() | |
os.unlink(tmpfile_name) | |
def _write(self, s3key: str) -> Generator[IO, None, None]: | |
# 書き込み用の一時ファイルを作成 | |
f_yield = NamedTemporaryFile(delete=False, mode="w") | |
tmpfile_name = f_yield.name | |
# withブロックに書き込み専用ファイルIOを渡す | |
# withブロック内で例外が発生した場合は一時ファイルを閉じて削除する | |
try: | |
yield f_yield | |
except Exception as err: | |
f_yield.close() | |
os.unlink(tmpfile_name) | |
raise err | |
# withブロックが正常終了した場合 | |
# S3にアップロードしてから一時ファイルを閉じて削除する | |
try: | |
f_yield.flush() | |
self._bucket.upload_file(tmpfile_name, s3key) | |
except ClientError as err: | |
raise err | |
finally: | |
f_yield.close() | |
os.unlink(tmpfile_name) | |
def _add(self, s3key: str) -> Generator[IO, None, None]: | |
# 存在確認 | |
try: | |
self._bucket.meta.client.head_object(Bucket=self._bucket.name, Key=s3key) | |
except ClientError as err: | |
raise err | |
# S3のオブジェクトをローカルに一時ファイルとして作成 | |
with NamedTemporaryFile(delete=False, mode="w") as f_tmp: | |
tmpfile_name = f_tmp.name | |
self._bucket.download_file(s3key, tmpfile_name) | |
# withブロックに追記専用ファイルIOを渡す | |
# withブロック終了後に一時ファイルを閉じて削除する | |
f_yield = open(tmpfile_name, "a") | |
try: | |
yield f_yield | |
except Exception as err: | |
f_yield.close() | |
os.unlink(tmpfile_name) | |
raise err | |
# withブロックが正常終了した場合 | |
# S3にアップロードしてから一時ファイルを閉じて削除する | |
try: | |
f_yield.flush() | |
self._bucket.upload_file(tmpfile_name, s3key) | |
except ClientError as err: | |
raise err | |
finally: | |
f_yield.close() | |
os.unlink(tmpfile_name) | |
if __name__ == "__main__": | |
bucket = "s3-bucket-name" # WRITE ME | |
key = "path/to/sample.txt" # WRITE ME | |
s3_conn = S3Connector(bucket) | |
with s3_conn.open(key) as f: | |
print(f.read()) | |
with s3_conn.open(key, "w") as f: | |
f.write("Hello World\n") | |
with s3_conn.open(key, "a") as f: | |
f.write("Goodbye\n") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment