Skip to content

Instantly share code, notes, and snippets.

@a-r-g-v
Last active April 2, 2025 15:18
Show Gist options
  • Save a-r-g-v/a7899ec9d5a25739b34835c273949748 to your computer and use it in GitHub Desktop.
Save a-r-g-v/a7899ec9d5a25739b34835c273949748 to your computer and use it in GitHub Desktop.
icloud
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import getpass
import json
import time
import logging
import requests
from shutil import copyfileobj
from pyicloud import PyiCloudService
from pyicloud.exceptions import PyiCloudAPIResponseException
# デバッグログを有効化
def setup_logger():
logger = logging.getLogger('pyicloud')
logger.setLevel(logging.DEBUG)
# コンソールハンドラー
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
# フォーマッタ
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# requestsライブラリも詳細ログを出力
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
return logger
def login_icloud():
"""Apple ID とパスワードをプロンプト入力し、pyicloud でログインして認証済みの PyiCloudService を返す。"""
logger = setup_logger()
apple_id = input("Apple ID(メールアドレス)を入力してください: ")
password = getpass.getpass("Apple IDのパスワードを入力してください(非表示): ")
# デバッグ用ログ出力
logger.debug("PyiCloudServiceを初期化します...")
# 標準のPyiCloudServiceを使用
api = PyiCloudService(apple_id, password)
# 2FA が有効な場合の処理
if api.requires_2fa:
print("二要素認証(2FA)コードの入力が必要です。")
# 信頼済みデバイスのリストを表示して選択させる
if api.trusted_devices:
print("\n信頼済みデバイスの一覧:")
for i, device in enumerate(api.trusted_devices):
# デバイス情報を表示(電話番号の一部が含まれる場合はSMSが可能なデバイス)
phone_number = device.get('phoneNumber', '')
device_name = device.get('deviceName', 'Unknown Device')
# デバイスの詳細情報をデバッグ表示
logger.debug(f"デバイス {i} の詳細: {json.dumps(device, indent=2)}")
if phone_number:
print(f" [{i}]: {device_name} ({phone_number})")
else:
print(f" [{i}]: {device_name}")
device_index = int(input("\n認証コードを受け取るデバイスの番号を選択してください: "))
device = api.trusted_devices[device_index]
# 選択したデバイスに検証コードを送信
print(f"\n選択したデバイス {device.get('deviceName', 'Unknown Device')} に認証コードを送信します...")
# デバイス情報をログに出力
logger.debug(f"選択したデバイス情報: {json.dumps(device, indent=2)}")
# 検証コードを送信
success = api.send_verification_code(device)
logger.debug(f"検証コード送信結果: {success}")
if success:
# SMS受信の場合は電話番号で確認
if device.get('phoneNumber'):
print(f"SMS経由で {device.get('phoneNumber')} に認証コードを送信しました。")
else:
print("デバイスに認証コードを送信しました。")
else:
print("検証コードの送信に失敗しました。")
return None
else:
print("信頼済みデバイスが見つかりません。")
return None
# コードの入力とリトライループ
max_attempts = 3
attempt = 0
verification_success = False
while attempt < max_attempts and not verification_success:
attempt += 1
if attempt > 1:
print(f"\n認証コードの検証に失敗しました。残り試行回数: {max_attempts - attempt + 1}")
retry = input("新しいコードを要求しますか? (y/n): ")
if retry.lower() == 'y':
print("新しい認証コードを要求します...")
success = api.send_verification_code(device)
if success:
print("新しい認証コードを送信しました。")
else:
print("新しい認証コードの送信に失敗しました。")
# コードの入力
code = input("受け取った6桁の認証コードを入力してください: ")
# 入力されたコードをログに記録
logger.debug(f"入力されたコード: {code}")
# コードの前後の空白と改行を削除
code = code.strip()
logger.debug(f"整形後のコード: {code}")
try:
# validate_verification_code メソッドを使用(2FA用のvalidate_2fa_codeではなく)
logger.debug("validate_verification_code メソッドを呼び出します...")
# validate_verification_code は device オブジェクトと code を必要とする
result = api.validate_verification_code(device, code)
logger.debug(f"検証結果: {result}")
if result:
verification_success = True
print("認証コードの検証に成功しました。")
else:
# APIからの応答が成功の場合は、resultがFalseでも認証は成功と判断
if isinstance(result, dict) and result.get('success') is True:
verification_success = True
print("認証コードの検証に成功しました。")
else:
print("認証コードの検証に失敗しました。")
except PyiCloudAPIResponseException as ex:
status_code = getattr(ex, 'status', None)
reason = getattr(ex, 'reason', None)
response_text = getattr(ex, 'response_text', None)
print(f"APIエラー: {ex}")
print(f"ステータスコード: {status_code}, 理由: {reason}")
if response_text:
print(f"レスポンステキスト: {response_text}")
# エラーの詳細をログに出力
logger.error(f"API応答エラー: {ex}")
logger.error(f"ステータスコード: {status_code}, 理由: {reason}")
if response_text:
try:
json_response = json.loads(response_text)
logger.error(f"JSONレスポンス: {json.dumps(json_response, indent=2)}")
except:
logger.error(f"テキストレスポンス: {response_text}")
print("コードが無効か有効期限が切れている可能性があります。")
except Exception as e:
logger.exception(f"予期しないエラー: {e}")
print(f"予期しないエラー: {e}")
if not verification_success:
raise Exception("認証コードの検証に失敗しました。最大試行回数を超えました。")
if not api.is_trusted_session:
# 今回のセッションを信頼済みに登録
try:
api.trust_session()
print("このセッションを信頼済みに登録しました。(約2か月有効)")
except Exception as ex:
print(f"セッションの信頼登録に失敗しました: {ex}")
print("このエラーは無視して続行します。")
logger.warning(f"セッション信頼失敗: {ex}")
# 認証自体は成功しているので続行する
pass
elif api.requires_2sa:
# 古い 2ステップ認証(2SA)の場合(必要なら実装)
print("2SA 認証が必要です。処理を実装してください。")
sys.exit(1)
# 認証状態の最終確認
if not api.user:
logger.error("iCloudユーザー情報を取得できません。認証に失敗しました。")
raise Exception("iCloud ログインに失敗しました。")
else:
logger.debug(f"認証成功: ユーザー {api.user.get('dsInfo', {}).get('fullName', 'Unknown')} としてログイン")
print("iCloud にログインしました。")
return api
def ensure_dir(path):
"""指定ディレクトリが存在しなければ作成する。"""
if not os.path.isdir(path):
os.makedirs(path)
def find_large_files_in_drive(folder, threshold):
"""iCloud Drive のフォルダを再帰的に探索し、閾値(バイト)以上のファイルをリストアップ。"""
items = []
for name in folder.dir():
item = folder[name]
if item.type == 'file':
# sizeがNoneの場合は比較をスキップ
if item.size is not None and item.size >= threshold:
items.append(item)
elif item.type == 'folder':
# 再帰的に探索
items.extend(find_large_files_in_drive(item, threshold))
return items
def download_icloud_drive(api, size_threshold, download_dir):
"""iCloud Drive 内のファイルから size_threshold(バイト)以上のものをダウンロード。"""
print(f"\n=== iCloud Drive から {size_threshold}バイト以上のファイルを探します ===")
large_files = find_large_files_in_drive(api.drive, size_threshold)
print(f"該当ファイル数: {len(large_files)} 件")
ensure_dir(download_dir)
for file_item in large_files:
file_name = file_item.name
file_path = os.path.join(download_dir, file_name)
size = file_item.size
print(f" ダウンロード: {file_name} (サイズ: {size} バイト) -> {file_path}")
with file_item.open(stream=True) as resp:
with open(file_path, 'wb') as out_f:
copyfileobj(resp.raw, out_f)
print("iCloud Drive のダウンロード完了。\n")
def download_icloud_photos(api, size_threshold, download_dir):
"""iCloud 写真ライブラリ(すべての写真)から size_threshold(バイト)以上の写真・動画をダウンロード。"""
print(f"=== iCloud 写真ライブラリから {size_threshold}バイト以上のファイルを探します ===")
all_photos = api.photos.all
large_assets = []
# 写真ライブラリは多数ある場合があるので、時間がかかるかもしれません
for asset in all_photos:
if asset.size and asset.size >= size_threshold:
large_assets.append(asset)
print(f"該当写真/動画: {len(large_assets)} 件")
ensure_dir(download_dir)
for photo in large_assets:
filename = photo.filename or f"{photo.id}.jpg" # ファイル名がなければID使う
file_path = os.path.join(download_dir, filename)
print(f" ダウンロード: {filename} (サイズ: {photo.size} バイト) -> {file_path}")
resp = photo.download() # レスポンスを取得
with open(file_path, 'wb') as out_f:
copyfileobj(resp.raw, out_f)
print("iCloud 写真ライブラリのダウンロード完了。\n")
def main():
# 引数チェック: 第1引数に「閾値(MB)」を取る。指定がなければデフォルト10MB。
if len(sys.argv) > 1:
try:
mb_value = float(sys.argv[1])
except ValueError:
print("サイズ指定(MB)の引数が不正です。例: 10")
sys.exit(1)
else:
mb_value = 10.0 # デフォルト 10MB
size_threshold = int(mb_value * 1024 * 1024)
print(f"コマンドライン引数から閾値を取得しました: {mb_value} MB ({size_threshold} バイト以上を対象)")
# iCloud ログイン
api = login_icloud()
# ダウンロード先ディレクトリ (カレント下に downloads/{drive,photos} を作成)
drive_download_dir = os.path.join("downloads", "drive")
photos_download_dir = os.path.join("downloads", "photos")
# iCloud Drive の大きいファイルをダウンロード
download_icloud_drive(api, size_threshold, drive_download_dir)
# iCloud 写真ライブラリの大きいファイルをダウンロード
download_icloud_photos(api, size_threshold, photos_download_dir)
print("すべての処理が完了しました。")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment