Skip to content

Instantly share code, notes, and snippets.

@0x2a94b5
Created September 10, 2021 05:18
Show Gist options
  • Save 0x2a94b5/03b5897f4a762ee79e3786bf7b12153a to your computer and use it in GitHub Desktop.
Save 0x2a94b5/03b5897f4a762ee79e3786bf7b12153a to your computer and use it in GitHub Desktop.
MinIO Python Client base operations
"""
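Basic MinIO Python client operations: create buckets, set an anonymous
read-only bucket policy, upload files, and generate presigned GET URLs.

Reference: https://docs.min.io/docs/python-client-api-reference.html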
"""
from json import dumps
from mimetypes import guess_type
from os import environ, listdir

from minio import Minio
# Shared MinIO client used by every helper in this module.  The endpoint and
# credentials are read from the ENDPOINT, ACCESS_KEY and SECRET_KEY
# environment variables; a missing variable is passed through as None.
CLIENT = Minio(
    environ.get('ENDPOINT'),
    access_key=environ.get('ACCESS_KEY'),
    secret_key=environ.get('SECRET_KEY'),
)
# Logical bucket role -> actual bucket name used on the server.
BUCKETS = {
    'IMG': 'images',
    'QR': 'qrcodes',
}
def create_bucket(name):
    """Create the bucket ``name`` unless it already exists.

    Args:
        name: Name of the bucket to create.

    Returns:
        ``True`` if the bucket was created by this call, ``False`` if
        ``CLIENT.bucket_exists`` reported that it was already there.
    """
    already_there = CLIENT.bucket_exists(name)
    if not already_there:
        CLIENT.make_bucket(name)
    return not already_there
def set_policy(bucket):
    """Apply an anonymous read-only policy to ``bucket``.

    The policy allows any principal to locate and list the bucket and to
    fetch the objects stored in it.

    Args:
        bucket: Name of the bucket the policy is applied to.

    Returns:
        Whatever ``CLIENT.set_bucket_policy`` returns.
    """
    # Bucket-level permissions: locate the bucket and list its contents.
    bucket_statement = {
        "Effect": "Allow",
        "Principal": {"AWS": "*"},
        "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
        "Resource": f"arn:aws:s3:::{bucket}",
    }
    # Object-level permission: download any object inside the bucket.
    object_statement = {
        "Effect": "Allow",
        "Principal": {"AWS": "*"},
        "Action": "s3:GetObject",
        "Resource": f"arn:aws:s3:::{bucket}/*",
    }
    document = dumps(
        {
            "Version": "2012-10-17",
            "Statement": [bucket_statement, object_statement],
        }
    )
    return CLIENT.set_bucket_policy(bucket, document)
def upload_object(bucket, object, file_path, content_type=None):
    """Upload a local file to ``bucket`` through the shared ``CLIENT``.

    Args:
        bucket: Name of the destination bucket.
        object: Name to store the file under.  The parameter name shadows
            the builtin but is kept so existing keyword callers still work.
        file_path: Path of the local file to upload.
        content_type: MIME type recorded for the object.  When omitted it is
            guessed from the extension of ``file_path``; ``'image/jpeg'``
            (the former fixed value) is used if the extension is not
            recognised.

    Returns:
        Whatever ``CLIENT.fput_object`` returns.
    """
    if content_type is None:
        # Every upload used to be labelled image/jpeg, which mislabelled
        # non-JPEG files such as qrcode.png.
        guessed, _encoding = guess_type(file_path)
        content_type = guessed or 'image/jpeg'
    return CLIENT.fput_object(bucket, object, file_path, content_type=content_type)
def get_url(bucket, object):
    """Return a presigned URL for an HTTP GET of ``object`` in ``bucket``.

    Args:
        bucket: Name of the bucket holding the object.
        object: Name of the object to generate the URL for.

    Returns:
        The value produced by ``CLIENT.get_presigned_url``.
    """
    presigned = CLIENT.get_presigned_url('GET', bucket, object)
    return presigned
if __name__ == '__main__':
    # images bucket: anonymous read-only bucket.
    # qrcodes bucket: private bucket.

    # Make sure every configured bucket exists.
    for value in BUCKETS.values():
        print(
            f'Successful {value} bucket creation.'
            if create_bucket(value)
            else f'{value} bucket already exists and does not need to be created.'
        )

    # Apply the read-only policy to the images bucket.
    policy_result = set_policy(BUCKETS['IMG'])
    print(
        'Successfully set up policy!'
        if policy_result is None
        else 'Failure to set policy!'
    )

    # Upload every entry of the local "images" directory and record the URL
    # it is reachable under.
    server = 'https://oss.example.com'
    bucket = BUCKETS['IMG']
    access_url = {
        file: f'{server}/{bucket}/{file}'
        for file in listdir('images')
        if upload_object(bucket, file, f'images/{file}')
    }
    print(access_url)

    # Upload the QR code to the qrcodes bucket and print a presigned URL.
    qr_bucket = BUCKETS['QR']
    qr_object = 'qrcode.png'
    qr_path = 'qrcodes/qrcode.png'
    if upload_object(qr_bucket, qr_object, qr_path):
        print(get_url(qr_bucket, qr_object))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment