Last active
April 5, 2020 14:03
-
-
Save sonthonaxrk/254f61239e4683bc1dd8c12310f36dcb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import pampy | |
S3_MATCHING_RULES = { | |
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?restore"): { | |
"restore_object": { | |
"header": {}, | |
"method": "POST", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?select&select-type=2"): { | |
"select_object_content": { | |
"header": {}, | |
"method": "POST", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?acl"): { | |
"get_bucket_acl": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_acl": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?legal-hold"): { | |
"get_object_legal_hold": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"put_object_legal_hold": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?cors"): { | |
"delete_bucket_cors": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_cors": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_cors": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?uploads"): { | |
"list_multipart_uploads": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?logging"): { | |
"get_bucket_logging": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_logging": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?website"): { | |
"delete_bucket_website": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_website": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_website": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?tagging"): { | |
"delete_bucket_tagging": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_tagging": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_tagging": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?metrics"): { | |
"delete_bucket_metrics_configuration": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {"id": str}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_metrics_configuration": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {"id": str}, | |
"uri": {"Bucket": str}, | |
}, | |
"list_bucket_metrics_configurations": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_metrics_configuration": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {"id": str}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))"): { | |
"abort_multipart_upload": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {"uploadId": str}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"complete_multipart_upload": { | |
"header": {}, | |
"method": "POST", | |
"querystring": {"uploadId": str}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"copy_object": { | |
"header": {"x-amz-copy-source": str}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"delete_object": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"get_object": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"head_object": { | |
"header": {}, | |
"method": "HEAD", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"list_parts": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {"uploadId": str}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"put_object": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"upload_part": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {"partNumber": str, "uploadId": str}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"upload_part_copy": { | |
"header": {"x-amz-copy-source": str}, | |
"method": "PUT", | |
"querystring": {"partNumber": str, "uploadId": str}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?requestPayment"): { | |
"get_bucket_request_payment": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_request_payment": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?publicAccessBlock"): { | |
"delete_public_access_block": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_public_access_block": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_public_access_block": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?retention"): { | |
"get_object_retention": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"put_object_retention": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?location"): { | |
"get_bucket_location": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?versions"): { | |
"list_object_versions": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?torrent"): { | |
"get_object_torrent": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?tagging"): { | |
"delete_object_tagging": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"get_object_tagging": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"put_object_tagging": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?uploads"): { | |
"create_multipart_upload": { | |
"header": {}, | |
"method": "POST", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?policyStatus"): { | |
"get_bucket_policy_status": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?notification"): { | |
"get_bucket_notification": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_notification_configuration": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_notification": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_notification_configuration": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))"): { | |
"create_bucket": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"delete_bucket": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"head_bucket": { | |
"header": {}, | |
"method": "HEAD", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"list_objects": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/"): { | |
"list_buckets": {"header": {}, "method": "GET", "querystring": {}, "uri": {}} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?lifecycle"): { | |
"delete_bucket_lifecycle": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_lifecycle": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_lifecycle_configuration": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_lifecycle": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_lifecycle_configuration": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?inventory"): { | |
"delete_bucket_inventory_configuration": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {"id": str}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_inventory_configuration": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {"id": str}, | |
"uri": {"Bucket": str}, | |
}, | |
"list_bucket_inventory_configurations": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_inventory_configuration": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {"id": str}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?analytics"): { | |
"delete_bucket_analytics_configuration": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {"id": str}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_analytics_configuration": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {"id": str}, | |
"uri": {"Bucket": str}, | |
}, | |
"list_bucket_analytics_configurations": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_analytics_configuration": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {"id": str}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?list-type=2"): { | |
"list_objects_v2": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?object-lock"): { | |
"get_object_lock_configuration": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_object_lock_configuration": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))/(?P<Key>(.*))\\?acl"): { | |
"get_object_acl": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
"put_object_acl": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str, "Key": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?replication"): { | |
"delete_bucket_replication": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_replication": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_replication": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?versioning"): { | |
"get_bucket_versioning": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_versioning": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?accelerate"): { | |
"get_bucket_accelerate_configuration": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_accelerate_configuration": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?delete"): { | |
"delete_objects": { | |
"header": {}, | |
"method": "POST", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
} | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?policy"): { | |
"delete_bucket_policy": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_policy": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_policy": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
re.compile("/(?P<Bucket>([^\\/]+))\\?encryption"): { | |
"delete_bucket_encryption": { | |
"header": {}, | |
"method": "DELETE", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"get_bucket_encryption": { | |
"header": {}, | |
"method": "GET", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
"put_bucket_encryption": { | |
"header": {}, | |
"method": "PUT", | |
"querystring": {}, | |
"uri": {"Bucket": str}, | |
}, | |
}, | |
} | |
class BaseS3Server: | |
def run_flask_request(self, request): | |
for regex_path, actions in S3_MATCHING_RULES.items(): | |
m = regex_path.match(request.path) | |
if m: | |
request_dict = { | |
'uri': m.groupdict(), | |
'header': dict(request.headers), | |
'querystring': dict(request.args), | |
'method': request.method.upper() | |
} | |
for method_name, action in actions.items(): | |
is_match, _ = pampy.match_dict(action, request_dict) | |
if is_match: | |
method = getattr(self, method_name, None) | |
if method: | |
return method(request, **request_dict) | |
else: | |
return self.not_implemented(request, **request_dict) | |
return self.invalid_s3_request() | |
def invalid_s3_request(self, *args, **kwargs): | |
pass | |
def not_implemented(self, *args, **kwargs): | |
pass | |
def list_buckets(self, request, uri, header, querystring, method) -> Response: | |
raise NotImplemented('return a xml reponse') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment