Last active
April 2, 2025 15:18
-
-
Save a-r-g-v/a7899ec9d5a25739b34835c273949748 to your computer and use it in GitHub Desktop.
icloud
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
import os | |
import sys | |
import getpass | |
import json | |
import time | |
import logging | |
import requests | |
from shutil import copyfileobj | |
from pyicloud import PyiCloudService | |
from pyicloud.exceptions import PyiCloudAPIResponseException | |
# デバッグログを有効化 | |
def setup_logger(): | |
logger = logging.getLogger('pyicloud') | |
logger.setLevel(logging.DEBUG) | |
# コンソールハンドラー | |
console_handler = logging.StreamHandler() | |
console_handler.setLevel(logging.DEBUG) | |
# フォーマッタ | |
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
console_handler.setFormatter(formatter) | |
logger.addHandler(console_handler) | |
# requestsライブラリも詳細ログを出力 | |
requests_log = logging.getLogger("requests.packages.urllib3") | |
requests_log.setLevel(logging.DEBUG) | |
requests_log.propagate = True | |
return logger | |
def login_icloud(): | |
"""Apple ID とパスワードをプロンプト入力し、pyicloud でログインして認証済みの PyiCloudService を返す。""" | |
logger = setup_logger() | |
apple_id = input("Apple ID(メールアドレス)を入力してください: ") | |
password = getpass.getpass("Apple IDのパスワードを入力してください(非表示): ") | |
# デバッグ用ログ出力 | |
logger.debug("PyiCloudServiceを初期化します...") | |
# 標準のPyiCloudServiceを使用 | |
api = PyiCloudService(apple_id, password) | |
# 2FA が有効な場合の処理 | |
if api.requires_2fa: | |
print("二要素認証(2FA)コードの入力が必要です。") | |
# 信頼済みデバイスのリストを表示して選択させる | |
if api.trusted_devices: | |
print("\n信頼済みデバイスの一覧:") | |
for i, device in enumerate(api.trusted_devices): | |
# デバイス情報を表示(電話番号の一部が含まれる場合はSMSが可能なデバイス) | |
phone_number = device.get('phoneNumber', '') | |
device_name = device.get('deviceName', 'Unknown Device') | |
# デバイスの詳細情報をデバッグ表示 | |
logger.debug(f"デバイス {i} の詳細: {json.dumps(device, indent=2)}") | |
if phone_number: | |
print(f" [{i}]: {device_name} ({phone_number})") | |
else: | |
print(f" [{i}]: {device_name}") | |
device_index = int(input("\n認証コードを受け取るデバイスの番号を選択してください: ")) | |
device = api.trusted_devices[device_index] | |
# 選択したデバイスに検証コードを送信 | |
print(f"\n選択したデバイス {device.get('deviceName', 'Unknown Device')} に認証コードを送信します...") | |
# デバイス情報をログに出力 | |
logger.debug(f"選択したデバイス情報: {json.dumps(device, indent=2)}") | |
# 検証コードを送信 | |
success = api.send_verification_code(device) | |
logger.debug(f"検証コード送信結果: {success}") | |
if success: | |
# SMS受信の場合は電話番号で確認 | |
if device.get('phoneNumber'): | |
print(f"SMS経由で {device.get('phoneNumber')} に認証コードを送信しました。") | |
else: | |
print("デバイスに認証コードを送信しました。") | |
else: | |
print("検証コードの送信に失敗しました。") | |
return None | |
else: | |
print("信頼済みデバイスが見つかりません。") | |
return None | |
# コードの入力とリトライループ | |
max_attempts = 3 | |
attempt = 0 | |
verification_success = False | |
while attempt < max_attempts and not verification_success: | |
attempt += 1 | |
if attempt > 1: | |
print(f"\n認証コードの検証に失敗しました。残り試行回数: {max_attempts - attempt + 1}") | |
retry = input("新しいコードを要求しますか? (y/n): ") | |
if retry.lower() == 'y': | |
print("新しい認証コードを要求します...") | |
success = api.send_verification_code(device) | |
if success: | |
print("新しい認証コードを送信しました。") | |
else: | |
print("新しい認証コードの送信に失敗しました。") | |
# コードの入力 | |
code = input("受け取った6桁の認証コードを入力してください: ") | |
# 入力されたコードをログに記録 | |
logger.debug(f"入力されたコード: {code}") | |
# コードの前後の空白と改行を削除 | |
code = code.strip() | |
logger.debug(f"整形後のコード: {code}") | |
try: | |
# validate_verification_code メソッドを使用(2FA用のvalidate_2fa_codeではなく) | |
logger.debug("validate_verification_code メソッドを呼び出します...") | |
# validate_verification_code は device オブジェクトと code を必要とする | |
result = api.validate_verification_code(device, code) | |
logger.debug(f"検証結果: {result}") | |
if result: | |
verification_success = True | |
print("認証コードの検証に成功しました。") | |
else: | |
# APIからの応答が成功の場合は、resultがFalseでも認証は成功と判断 | |
if isinstance(result, dict) and result.get('success') is True: | |
verification_success = True | |
print("認証コードの検証に成功しました。") | |
else: | |
print("認証コードの検証に失敗しました。") | |
except PyiCloudAPIResponseException as ex: | |
status_code = getattr(ex, 'status', None) | |
reason = getattr(ex, 'reason', None) | |
response_text = getattr(ex, 'response_text', None) | |
print(f"APIエラー: {ex}") | |
print(f"ステータスコード: {status_code}, 理由: {reason}") | |
if response_text: | |
print(f"レスポンステキスト: {response_text}") | |
# エラーの詳細をログに出力 | |
logger.error(f"API応答エラー: {ex}") | |
logger.error(f"ステータスコード: {status_code}, 理由: {reason}") | |
if response_text: | |
try: | |
json_response = json.loads(response_text) | |
logger.error(f"JSONレスポンス: {json.dumps(json_response, indent=2)}") | |
except: | |
logger.error(f"テキストレスポンス: {response_text}") | |
print("コードが無効か有効期限が切れている可能性があります。") | |
except Exception as e: | |
logger.exception(f"予期しないエラー: {e}") | |
print(f"予期しないエラー: {e}") | |
if not verification_success: | |
raise Exception("認証コードの検証に失敗しました。最大試行回数を超えました。") | |
if not api.is_trusted_session: | |
# 今回のセッションを信頼済みに登録 | |
try: | |
api.trust_session() | |
print("このセッションを信頼済みに登録しました。(約2か月有効)") | |
except Exception as ex: | |
print(f"セッションの信頼登録に失敗しました: {ex}") | |
print("このエラーは無視して続行します。") | |
logger.warning(f"セッション信頼失敗: {ex}") | |
# 認証自体は成功しているので続行する | |
pass | |
elif api.requires_2sa: | |
# 古い 2ステップ認証(2SA)の場合(必要なら実装) | |
print("2SA 認証が必要です。処理を実装してください。") | |
sys.exit(1) | |
# 認証状態の最終確認 | |
if not api.user: | |
logger.error("iCloudユーザー情報を取得できません。認証に失敗しました。") | |
raise Exception("iCloud ログインに失敗しました。") | |
else: | |
logger.debug(f"認証成功: ユーザー {api.user.get('dsInfo', {}).get('fullName', 'Unknown')} としてログイン") | |
print("iCloud にログインしました。") | |
return api | |
def ensure_dir(path): | |
"""指定ディレクトリが存在しなければ作成する。""" | |
if not os.path.isdir(path): | |
os.makedirs(path) | |
def find_large_files_in_drive(folder, threshold): | |
"""iCloud Drive のフォルダを再帰的に探索し、閾値(バイト)以上のファイルをリストアップ。""" | |
items = [] | |
for name in folder.dir(): | |
item = folder[name] | |
if item.type == 'file': | |
# sizeがNoneの場合は比較をスキップ | |
if item.size is not None and item.size >= threshold: | |
items.append(item) | |
elif item.type == 'folder': | |
# 再帰的に探索 | |
items.extend(find_large_files_in_drive(item, threshold)) | |
return items | |
def download_icloud_drive(api, size_threshold, download_dir): | |
"""iCloud Drive 内のファイルから size_threshold(バイト)以上のものをダウンロード。""" | |
print(f"\n=== iCloud Drive から {size_threshold}バイト以上のファイルを探します ===") | |
large_files = find_large_files_in_drive(api.drive, size_threshold) | |
print(f"該当ファイル数: {len(large_files)} 件") | |
ensure_dir(download_dir) | |
for file_item in large_files: | |
file_name = file_item.name | |
file_path = os.path.join(download_dir, file_name) | |
size = file_item.size | |
print(f" ダウンロード: {file_name} (サイズ: {size} バイト) -> {file_path}") | |
with file_item.open(stream=True) as resp: | |
with open(file_path, 'wb') as out_f: | |
copyfileobj(resp.raw, out_f) | |
print("iCloud Drive のダウンロード完了。\n") | |
def download_icloud_photos(api, size_threshold, download_dir): | |
"""iCloud 写真ライブラリ(すべての写真)から size_threshold(バイト)以上の写真・動画をダウンロード。""" | |
print(f"=== iCloud 写真ライブラリから {size_threshold}バイト以上のファイルを探します ===") | |
all_photos = api.photos.all | |
large_assets = [] | |
# 写真ライブラリは多数ある場合があるので、時間がかかるかもしれません | |
for asset in all_photos: | |
if asset.size and asset.size >= size_threshold: | |
large_assets.append(asset) | |
print(f"該当写真/動画: {len(large_assets)} 件") | |
ensure_dir(download_dir) | |
for photo in large_assets: | |
filename = photo.filename or f"{photo.id}.jpg" # ファイル名がなければID使う | |
file_path = os.path.join(download_dir, filename) | |
print(f" ダウンロード: {filename} (サイズ: {photo.size} バイト) -> {file_path}") | |
resp = photo.download() # レスポンスを取得 | |
with open(file_path, 'wb') as out_f: | |
copyfileobj(resp.raw, out_f) | |
print("iCloud 写真ライブラリのダウンロード完了。\n") | |
def main(): | |
# 引数チェック: 第1引数に「閾値(MB)」を取る。指定がなければデフォルト10MB。 | |
if len(sys.argv) > 1: | |
try: | |
mb_value = float(sys.argv[1]) | |
except ValueError: | |
print("サイズ指定(MB)の引数が不正です。例: 10") | |
sys.exit(1) | |
else: | |
mb_value = 10.0 # デフォルト 10MB | |
size_threshold = int(mb_value * 1024 * 1024) | |
print(f"コマンドライン引数から閾値を取得しました: {mb_value} MB ({size_threshold} バイト以上を対象)") | |
# iCloud ログイン | |
api = login_icloud() | |
# ダウンロード先ディレクトリ (カレント下に downloads/{drive,photos} を作成) | |
drive_download_dir = os.path.join("downloads", "drive") | |
photos_download_dir = os.path.join("downloads", "photos") | |
# iCloud Drive の大きいファイルをダウンロード | |
download_icloud_drive(api, size_threshold, drive_download_dir) | |
# iCloud 写真ライブラリの大きいファイルをダウンロード | |
download_icloud_photos(api, size_threshold, photos_download_dir) | |
print("すべての処理が完了しました。") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment