Created
September 5, 2024 23:31
-
-
Save password123456/555cc11f63aaf90f40bd940c678ea9c8 to your computer and use it in GitHub Desktop.
encrypt, decrypt the strings to use access-key. validate its expiration and IP address in a key
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# requirements | |
# pip install cryptography | |
import os | |
import sys | |
import json | |
from datetime import datetime, timedelta | |
from base64 import b64encode, b64decode | |
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
from cryptography.hazmat.backends import default_backend | |
# AES-GCM 알고리즘으로 암호화하는 함수 | |
# Function to encrypt using the AES-GCM algorithm | |
def encrypt(original_access_key_string, passphrase_key, nonce): | |
encryptor = Cipher( | |
algorithms.AES(passphrase_key), # AES 알고리즘 사용 | |
modes.GCM(nonce), # GCM 모드와 nonce 설정 | |
backend=default_backend() # 암호화 백엔드 | |
).encryptor() | |
# 문자열 암호화 | |
# encrypt sring | |
encrypted_data = encryptor.update(original_access_key_string.encode()) + encryptor.finalize() | |
# 최종 엑세스 키: nonce + GCM 태그 + 암호화된 데이터 | |
# Final access key: nonce + GCM tag + encrypted data | |
encrypted_access_key = b64encode(nonce + encryptor.tag + encrypted_data).decode('utf-8') | |
return encrypted_access_key | |
# AES-GCM 알고리즘으로 복호화하는 함수 | |
# Function to decrypt using the AES-GCM algorithm | |
def decrypt(keydb, encrypted_access_key): | |
original_access_key_string = None | |
# key.db에서 app_id의 `passphrase_key` 가져오기 | |
# Retrieve the passphrase_key for the app_id from key.db | |
passphrase_key = get_key_data(keydb, encrypted_access_key) | |
try: | |
if passphrase_key: | |
passphrase_key = b64decode(passphrase_key) | |
# base64로 인코딩된 `encrypted_access_key`를 디코딩 | |
encrypted_access_key_bytes = b64decode(encrypted_access_key) | |
# nonce, tag, 암호화된 데이터 분리 | |
nonce = encrypted_access_key_bytes[:12] # nonce는 처음 12 바이트 | |
tag = encrypted_access_key_bytes[12:28] # GCM 태그는 12바이트 뒤의 16바이트 | |
ciphertext = encrypted_access_key_bytes[28:] # 28바이트 이후는 암호화된 문자열 | |
# 복호화 수행 | |
decryptor = Cipher( | |
algorithms.AES(passphrase_key), | |
modes.GCM(nonce, tag), | |
backend=default_backend() | |
).decryptor() | |
# 복호화된 데이터를 원래 문자열로 변환 | |
decrypted_data = decryptor.update(ciphertext) + decryptor.finalize() | |
original_access_key_string = decrypted_data.decode('utf-8') | |
except Exception as e: | |
print(str(e)) | |
return original_access_key_string | |
# 암호화된 데이터를 key.db에 저장하는 함수 | |
def write_to_db(keydb, app_id, passphrase_key, encrypted_access_key): | |
try: | |
if os.path.exists(keydb): | |
mode = 'a' # 기존 파일이 있으면 추가 모드 | |
else: | |
mode = 'w' # 없으면 새로 쓰기 모드 | |
with open(keydb, mode, encoding='utf-8') as f: | |
f.write(f'{datetime.now()}|{app_id}|{b64encode(passphrase_key).decode("utf-8")}|{encrypted_access_key}\n') | |
except Exception as e: | |
print(str(e)) | |
# key.db 에서 encrypted_access_key에 해당하는 키 데이터를 로드하는 함수 | |
# Function to load key data corresponding to the encrypted access key from key.db | |
def get_key_data(keydb, encrypted_access_key): | |
with (open(keydb, 'r', encoding='utf-8') as f): | |
for line in f: | |
if line.startswith('#') or len(line.strip()) == 0: | |
continue | |
# 주어진 암호화된 access_key와 일치하는 키를 찾음 | |
# Find the key that matches the given encrypted access key | |
if str(line.split('|')[3].strip()) == str(encrypted_access_key): | |
passphrase_key = line.split('|')[2].strip() | |
break | |
return passphrase_key | |
# access_key 유효성 검사를 수행하는 함수 (만료 시간과 허용된 IP 확인) | |
def validate_access_key(access_key, remote_addr): | |
# 현재 시간을 유닉스 타임스탬프로 변환 | |
# Convert the current time to Unix timestamp | |
current_timestamp = int(datetime.now().timestamp()) | |
try: | |
access_key = json.loads(access_key) # JSON Format | |
# `엑세스 키` 에서 `exp` 키 가 있는지 확인 | |
# Check if the 'exp' key exists in the access key | |
if 'exp' not in access_key: | |
raise KeyError('exp key is missing') # exp 키가 없으면 예외 발생 | |
# 'exp' 키의 시간과 현재 시간을 비교 엑세스 키가 만료 되었는지) | |
# Compare the 'exp' time with the current time (to check if the access key has expired) | |
if current_timestamp > int(access_key["exp"]): | |
print(' - Key is expired') | |
return False | |
else: | |
print(' - Key is valid') | |
# 2. `allow_ips` 리스트에서 remote_addr 확인 | |
# Verify the remote_addr in the `allow_ips` list | |
if 'allow_ips' not in access_key: | |
raise KeyError('allow_ips key is missing') # allow_ips 키가 없으면 예외 발생 Raise an exception if the `allow_ips` key is missing | |
allowed_ips = access_key['allow_ips'] | |
print(f'Access Key Allowed: {allowed_ips}') | |
# IP 주소가 허용된 리스트에 있는지 확인 | |
# Check if the IP address is in the allowed list | |
if remote_addr in allowed_ips: | |
print(f' - Remote address {remote_addr} is allowed') | |
return True | |
else: | |
print(f' - Remote address {remote_addr} is not allowed') | |
return False | |
except KeyError as e: | |
print(f'Error: {str(e)}. Key is invalid or expired.') | |
return False | |
except ValueError as e: | |
print(f'Invalid data format: {str(e)}') | |
return False | |
def main(): | |
# 현재 파일의 경로 가져오기 | |
# Get the path of the current file | |
home_path = os.path.dirname(os.path.realpath(__file__)) | |
# key.db 파일 경로 설정 | |
# Set the path for the key.db file | |
keydb = os.path.join(home_path, 'key.db') | |
# 암호화 키 (32바이트 랜덤 생성) | |
# Generate an encryption key (32-byte random) | |
passphrase_key_size = 32 | |
passphrase_key = os.urandom(passphrase_key_size) | |
# nonce 생성 (12바이트 랜덤 생성) | |
# Generate a nonce (12-byte random) | |
nonce_size = 12 | |
nonce = os.urandom(nonce_size) | |
# issuer | |
issuer = 'key-generator' | |
# Client ID | |
app_id = 'myapps' | |
# access_key 발급 시간 및 만료 시간 설정 | |
# Set the access_key issue time and expiration time | |
iat_time = datetime.now() # issue time | |
exp_time = iat_time + timedelta(days=90) # expiration time (+90d) | |
# 허용된 IP 주소 리스트 설정 | |
# Setting the list of allowed IP addresses | |
allow_ips = ['192.168.1.1', '192.168.1.2'] | |
# 원본 access_key 문자열 구성 (딕셔너리 형태) | |
# Constructing the original access key string (in dictionary format) | |
original_access_key_string_dict = { | |
'iss': issuer, | |
'app_id': app_id, | |
'iat': int(iat_time.timestamp()), # 발급 시간 Issuance time | |
'exp': int(exp_time.timestamp()), # 만료 시간 Expiration time | |
'allow_ips': allow_ips # 엑세스 키 사용이 허용된 IP The list of IPs allowed to use the access key is set | |
} | |
# 딕셔너리를 JSON 문자열로 변환 | |
original_access_key_string = json.dumps(original_access_key_string_dict) | |
# 암호화 수행 | |
encrypted_access_key = encrypt(original_access_key_string, passphrase_key, nonce) | |
print(f'Encrypted Access Key: {encrypted_access_key}') | |
# 암호화 결과 저장 | |
# Store the encryption result | |
if encrypted_access_key: | |
write_to_db(keydb, app_id, passphrase_key, encrypted_access_key) | |
# 복호화 수행 | |
# Perform decryption | |
decrypted_access_key = decrypt(keydb, encrypted_access_key) | |
print(f'Decrypted Access Key: {decrypted_access_key}') | |
if decrypted_access_key: | |
remote_addr = '192.168.10.1' # 엑세스키를 사용하고 있는 원격 IP 주소 (The remote IP address using the access key.) | |
validate_access_key(decrypted_access_key, remote_addr) | |
if __name__ == '__main__': | |
try: | |
main() | |
except KeyboardInterrupt: | |
sys.exit(0) | |
except Exception as error: | |
print(str(error)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment