Created
May 10, 2025 13:17
-
-
Save gojo22k/23d549b840eda9a7472c9a956063ad04 to your computer and use it in GitHub Desktop.
ja be le abhi pura copy paste mar
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 🗂️ Google Drive Archive Extractor & Uploader | |
# Enhanced with improved authentication, visibility, and Google Drive link support | |
# ✅ STEP 1: Install required dependencies | |
!pip install -q PyDrive2 tqdm requests ipywidgets colorama google-api-python-client oauth2client | |
!apt -qq install p7zip-full unrar > /dev/null | |
from IPython.display import clear_output | |
clear_output() | |
# ✅ STEP 2: Import all necessary libraries | |
import os, re, sys, shutil, subprocess, time, requests, threading, urllib.parse, json, uuid, random, string, hashlib | |
import ipywidgets as widgets | |
from IPython.display import display, HTML, clear_output, Javascript | |
from tqdm.notebook import tqdm | |
from google.colab import auth, drive | |
from pydrive2.auth import GoogleAuth | |
from pydrive2.drive import GoogleDrive | |
from oauth2client.client import GoogleCredentials | |
# ✅ STEP 3: Create utility functions | |
def create_header():
    """Render the animated banner shown at the top of the notebook output.

    The banner is one self-contained HTML fragment: a stylesheet that adapts
    to the light/dark colour scheme, the title markup, and a small script
    behind the "Cancel Process" button, which asks for confirmation and then
    reloads the page.
    """
    banner = HTML("""
    <style>
    @import url('https://fonts.googleapis.com/css2?family=Orbitron:wght@600&family=Roboto&display=swap');
    :root {
        --bg-light: linear-gradient(270deg, #e0f7fa, #b2ebf2, #b3e5fc, #e1f5fe);
        --bg-dark: linear-gradient(270deg, #0f2027, #203a43, #2c5364, #1a1a2e);
        --text-light: #0f172a;
        --text-dark: #e0e0e0;
        --subtext-light: #374151;
        --subtext-dark: #b0bec5;
    }
    @media (prefers-color-scheme: dark) {
        .adaptive-header {
            --bg: var(--bg-dark);
            --text-color: var(--text-dark);
            --subtext-color: var(--subtext-dark);
        }
    }
    @media (prefers-color-scheme: light) {
        .adaptive-header {
            --bg: var(--bg-light);
            --text-color: var(--text-light);
            --subtext-color: var(--subtext-light);
        }
    }
    .adaptive-header {
        background: var(--bg);
        background-size: 600% 600%;
        animation: gradientShift 16s ease infinite;
        padding: 40px;
        border-radius: 20px;
        margin-bottom: 30px;
        text-align: center;
        box-shadow: 0 8px 20px rgba(0, 0, 0, 0.15);
        transition: all 0.3s ease;
        position: relative;
    }
    .adaptive-header:hover {
        transform: scale(1.01);
    }
    .adaptive-header h1 {
        color: var(--text-color);
        margin: 0;
        font-family: 'Orbitron', sans-serif;
        font-size: 32px;
        text-shadow: 0 0 4px rgba(255, 255, 255, 0.2);
    }
    .adaptive-header p {
        color: var(--subtext-color);
        margin-top: 15px;
        font-size: 18px;
        font-family: 'Roboto', sans-serif;
        opacity: 0.9;
    }
    .signature {
        margin-top: 25px;
        font-size: 14px;
        font-family: 'Roboto', sans-serif;
        color: var(--subtext-color);
        opacity: 0.7;
    }
    .cancel-button {
        position: absolute;
        bottom: 15px;
        right: 20px;
        padding: 8px 16px;
        background: linear-gradient(45deg, #ff4444, #ff6666);
        color: white;
        border: none;
        border-radius: 20px;
        font-weight: bold;
        cursor: pointer;
        transition: all 0.3s ease;
        display: flex;
        align-items: center;
        gap: 8px;
        box-shadow: 0 2px 8px rgba(255, 68, 68, 0.3);
    }
    .cancel-button:hover {
        transform: scale(1.05);
        box-shadow: 0 4px 12px rgba(255, 68, 68, 0.4);
    }
    .cancel-button:active {
        transform: scale(0.95);
    }
    @keyframes gradientShift {
        0% { background-position: 0% 50%; }
        50% { background-position: 100% 50%; }
        100% { background-position: 0% 50%; }
    }
    </style>
    <div class="adaptive-header">
        <h1>🗂️ Archive Extractor & Drive Uploader</h1>
        <p>Download → Extract → Upload to Multiple Google Drives</p>
        <div class="signature">Made with ❤️ by Ankit</div>
        <button id="cancel-process" class="cancel-button" onclick="cancelProcess()">
            <span>❌</span> Cancel Process
        </button>
    </div>
    <script>
    function cancelProcess() {
        // Show confirmation dialog
        if (confirm('Are you sure you want to cancel the current process?')) {
            // Reload the page to cancel everything
            window.location.reload();
        }
    }
    </script>
    """)
    display(banner)
def fancy_print(message, icon="🔔", color="#00ffee", bold=False, large=False):
    """Display ``message`` as a styled HTML notification card.

    Args:
        message: Text (interpolated as-is into the HTML) to show.
        icon: Either one of the shorthand names below ("info", "success",
            "error", ...) or a literal glyph, which is used unchanged.
        color: CSS colour applied to the text and the left border.
        bold: Render the message in bold when true.
        large: Use the 18px font size instead of 16px when true.
    """
    shorthand_glyphs = {
        "info": "ℹ️", "success": "✅", "warning": "⚠️", "error": "❌",
        "download": "📥", "extract": "📂", "upload": "📤", "auth": "🔐",
        "done": "🎉", "clean": "🧹", "retry": "🔄", "progress": "⏳"
    }
    # Unknown names fall through untouched so callers can pass a raw emoji.
    icon = shorthand_glyphs.get(icon, icon)

    if bold:
        font_weight = "bold"
    else:
        font_weight = "normal"

    if large:
        font_size = "18px"
    else:
        font_size = "16px"

    card = f"""
    <div style="margin:15px 0; padding:15px; background-color:rgba(0,0,0,0.05);
    border-left:6px solid {color}; border-radius:8px; box-shadow:0 2px 5px rgba(0,0,0,0.05);">
    <span style="font-size:24px; margin-right:12px; vertical-align:middle;">{icon}</span>
    <span style="color:{color}; font-weight:{font_weight}; font-size:{font_size}; vertical-align:middle;">{message}</span>
    </div>
    """
    display(HTML(card))
def is_google_drive_link(url):
    """Return True when ``url`` looks like a Google Drive file link.

    Recognised shapes:
      * ``drive.google.com/open?...id=<ID>``
      * ``drive.google.com/file/d/<ID>``
      * ``drive.google.com/uc?...id=<ID>`` and ``docs.google.com/uc?...id=<ID>``
      * ``drive.usercontent.google.com/download?...id=<ID>``

    The ``id`` parameter may appear anywhere in the query string, so
    ``uc?export=download&id=<ID>`` is accepted as well as ``uc?id=<ID>``.

    Args:
        url: The URL string to inspect; ``None`` or an empty string yields
            False instead of raising.

    Returns:
        bool: Whether any known Drive link pattern matches.
    """
    if not url:
        return False

    # "(?:[^#\s]*&)?" allows other query parameters before "id=" while still
    # requiring "id" to be a whole parameter name (so "ouid=" does not count).
    patterns = (
        r'drive\.google\.com/open\?(?:[^#\s]*&)?id=',
        r'drive\.google\.com/file/d/',
        r'(?:drive|docs)\.google\.com/uc\?(?:[^#\s]*&)?id=',
        r'drive\.usercontent\.google\.com/download\?(?:[^#\s]*&)?id=',
    )
    return any(re.search(pattern, url) for pattern in patterns)
def get_file_id_from_url(url):
    """Extract the Google Drive file ID from ``url``.

    Lookup order:
      1. An ``id`` query parameter (``?id=<ID>`` or ``&id=<ID>``).
      2. The path segment following ``/file/d/``.
      3. As a last resort, the first run of 25+ ID-like characters.

    The ``id`` match requires a parameter boundary, so parameters that merely
    end in "id" (for example ``ouid=`` on shared links) are not mistaken for
    the file ID.

    Args:
        url: The Drive URL; ``None`` or an empty string returns ``None``.

    Returns:
        The file ID string, or ``None`` when nothing ID-like is found.
    """
    if not url:
        return None

    # Whole-parameter match only: start of string, '?' or '&' must precede it.
    query_match = re.search(r'(?:^|[?&])id=([^&#]+)', url)
    if query_match:
        return query_match.group(1)

    # Stop at '/', '?' or '#' so "/file/d/<ID>?usp=sharing" yields just <ID>.
    path_match = re.search(r'/file/d/([^/?#]+)', url)
    if path_match:
        return path_match.group(1)

    fallback_match = re.search(r'[a-zA-Z0-9_-]{25,}', url)
    return fallback_match.group(0) if fallback_match else None
def _format_size_label(size_bytes):
    """Format a byte count as "x.xx MB"/"x.xx GB", or "Unknown size" for zero."""
    size_mb = size_bytes / (1024 * 1024)
    if size_mb <= 0:
        return "Unknown size"
    if size_mb < 1024:
        return f"{size_mb:.2f} MB"
    return f"{size_mb / 1024:.2f} GB"


def _format_transfer_rate(bytes_per_second):
    """Format a byte rate as B/s, KB/s or MB/s with one decimal place."""
    if bytes_per_second < 1024:
        return f"{bytes_per_second:.1f} B/s"
    if bytes_per_second < 1024 * 1024:
        return f"{bytes_per_second / 1024:.1f} KB/s"
    return f"{bytes_per_second / (1024 * 1024):.1f} MB/s"


def _download_panel_markup(file_name, size_label):
    """Build the static HTML/CSS for the download progress panel."""
    return f"""
    <style>
    :root {{
        --light-bg: #ffffff;
        --light-text: #202124;
        --light-sub-bg: #e0e0e0;
        --light-accent: #2196f3;
        --light-progress: #4CAF50;
        --dark-bg: #1e1e1e;
        --dark-text: #f1f1f1;
        --dark-sub-bg: #333333;
        --dark-accent: #64b5f6;
        --dark-progress: #81C784;
    }}
    @media (prefers-color-scheme: dark) {{
        .sexy-download {{
            background-color: var(--dark-bg);
            color: var(--dark-text);
            box-shadow: 0 4px 20px rgba(100,181,246,0.2);
        }}
        .sexy-download-bar {{
            background-color: var(--dark-sub-bg);
        }}
        .sexy-download-fill {{
            background: linear-gradient(90deg, var(--dark-accent), #bbdefb);
            box-shadow: 0 0 10px var(--dark-accent);
        }}
        .sexy-download-title {{
            color: var(--dark-accent);
        }}
        .progress-border {{
            background: linear-gradient(90deg, var(--dark-progress), #8BC34A, var(--dark-progress));
        }}
    }}
    @media (prefers-color-scheme: light) {{
        .sexy-download {{
            background-color: var(--light-bg);
            color: var(--light-text);
            box-shadow: 0 4px 20px rgba(33,150,243,0.15);
        }}
        .sexy-download-bar {{
            background-color: var(--light-sub-bg);
        }}
        .sexy-download-fill {{
            background: linear-gradient(90deg, var(--light-accent), #e3f2fd);
            box-shadow: 0 0 10px var(--light-accent);
        }}
        .sexy-download-title {{
            color: var(--light-accent);
        }}
        .progress-border {{
            background: linear-gradient(90deg, var(--light-progress), #8BC34A, var(--light-progress));
        }}
    }}
    .sexy-download {{
        margin: 20px 0;
        padding: 20px;
        border-radius: 16px;
        font-family: "Segoe UI", sans-serif;
        transition: all 0.3s ease;
        position: relative;
        overflow: hidden;
    }}
    .progress-border {{
        position: absolute;
        top: 0;
        left: 0;
        right: 0;
        height: 3px;
        background-size: 200% 100%;
        animation: progressGradient 2s linear infinite;
    }}
    .sexy-download-title {{
        font-size: 18px;
        font-weight: 600;
        margin-bottom: 15px;
        display: flex;
        align-items: center;
        gap: 10px;
    }}
    .sexy-download-bar {{
        height: 20px;
        border-radius: 10px;
        overflow: hidden;
        position: relative;
        margin: 15px 0;
        box-shadow: inset 0 2px 4px rgba(0,0,0,0.1);
    }}
    .sexy-download-fill {{
        height: 100%;
        width: 0%;
        transition: width 0.5s ease;
        position: relative;
    }}
    .sexy-download-fill::after {{
        content: '';
        position: absolute;
        top: 0;
        left: 0;
        right: 0;
        bottom: 0;
        background: linear-gradient(90deg, rgba(255,255,255,0.1), rgba(255,255,255,0.2), rgba(255,255,255,0.1));
        background-size: 200% 100%;
        animation: shine 2s linear infinite;
    }}
    .sexy-download-meta {{
        display: flex;
        justify-content: space-between;
        font-size: 14px;
        margin-top: 10px;
    }}
    .sexy-download-details {{
        display: grid;
        grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
        gap: 10px;
        margin-top: 15px;
        padding: 10px;
        background: rgba(0,0,0,0.05);
        border-radius: 10px;
    }}
    .sexy-download-detail {{
        display: flex;
        flex-direction: column;
        gap: 5px;
    }}
    .sexy-download-label {{
        font-size: 12px;
        opacity: 0.7;
    }}
    .sexy-download-value {{
        font-weight: 600;
        font-size: 14px;
    }}
    .restart-button {{
        margin-top: 20px;
        padding: 10px 20px;
        background: linear-gradient(45deg, #2196f3, #64b5f6);
        color: white;
        border: none;
        border-radius: 25px;
        font-weight: bold;
        cursor: pointer;
        transition: all 0.3s ease;
        display: none;
    }}
    .restart-button:hover {{
        transform: scale(1.05);
        box-shadow: 0 4px 15px rgba(33,150,243,0.3);
    }}
    @keyframes progressGradient {{
        0% {{ background-position: 0% 0%; }}
        100% {{ background-position: 200% 0%; }}
    }}
    @keyframes shine {{
        0% {{ background-position: 200% 0; }}
        100% {{ background-position: -200% 0; }}
    }}
    .pulse {{
        animation: pulse 2s infinite;
    }}
    @keyframes pulse {{
        0% {{ transform: scale(1); }}
        50% {{ transform: scale(1.05); }}
        100% {{ transform: scale(1); }}
    }}
    </style>
    <div class="sexy-download">
        <div class="progress-border"></div>
        <div class="sexy-download-title">
            🚀 Downloading: <b>{file_name}</b> <small>({size_label})</small>
        </div>
        <div class="sexy-download-bar">
            <div id="download-progress-bar" class="sexy-download-fill"></div>
        </div>
        <div class="sexy-download-meta">
            <span id="download-progress-text">--%</span>
            <span id="download-speed">-- KB/s</span>
            <span id="download-size">--MB / {size_label}</span>
        </div>
        <div class="sexy-download-details">
            <div class="sexy-download-detail">
                <span class="sexy-download-label">Time Elapsed</span>
                <span class="sexy-download-value" id="time-elapsed">00:00:00</span>
            </div>
            <div class="sexy-download-detail">
                <span class="sexy-download-label">Estimated Time Remaining</span>
                <span class="sexy-download-value" id="eta">--:--:--</span>
            </div>
            <div class="sexy-download-detail">
                <span class="sexy-download-label">Download Speed</span>
                <span class="sexy-download-value" id="current-speed">-- KB/s</span>
            </div>
            <div class="sexy-download-detail">
                <span class="sexy-download-label">Data Downloaded</span>
                <span class="sexy-download-value" id="data-downloaded">0 MB</span>
            </div>
        </div>
        <button id="restart-button" class="restart-button pulse">🔄 Restart Process</button>
    </div>
    """


def _download_progress_script(percentage, speed_text, downloaded_mb, size_label, elapsed_str, eta_str):
    """Build the <script> snippet that refreshes the download panel's fields."""
    return f"""
    <script>
    document.getElementById('download-progress-bar').style.width = '{percentage}%';
    document.getElementById('download-progress-text').textContent = '{percentage}%';
    document.getElementById('download-speed').textContent = '{speed_text}';
    document.getElementById('download-size').textContent = '{downloaded_mb:.2f} MB / {size_label}';
    document.getElementById('time-elapsed').textContent = '{elapsed_str}';
    document.getElementById('eta').textContent = '{eta_str}';
    document.getElementById('current-speed').textContent = '{speed_text}';
    document.getElementById('data-downloaded').textContent = '{downloaded_mb:.2f} MB';
    </script>
    """


def download_with_progress(url, output_path):
    """Download ``url`` to ``output_path`` while rendering a live progress panel.

    Google Drive links are rewritten to a direct-download URL first, and the
    large-file confirmation token is appended when Drive asks for one.

    Args:
        url: Source URL (plain HTTP(S) or a Google Drive share link).
        output_path: Destination file path; it is overwritten if present.

    Returns:
        bool: True when the file was fully written and passed the sanity
        checks, False on any failure (the error is reported via
        ``fancy_print`` rather than raised).
    """
    try:
        # Check if it's a Google Drive link
        is_gdrive = is_google_drive_link(url)
        if is_gdrive:
            file_id = get_file_id_from_url(url)
            if not file_id:
                fancy_print("Could not extract file ID from Google Drive link", "error", "#ff6347", True)
                return False
            url = create_direct_download_url(file_id)
            fancy_print(f"Detected Google Drive link. Using file ID: {file_id}", "info", "#4285f4")

        # HEAD first: cheap way to learn the size and spot Drive's confirm page.
        with requests.head(url, allow_redirects=True) as head_r:
            total_size = int(head_r.headers.get('content-length', 0))
            needs_confirmation = is_gdrive and (
                'download_warning' in head_r.url or 'confirm=' in head_r.url
            )

        # One session for the probe and the download so cookies carry over;
        # the context manager guarantees its connections are released.
        with requests.Session() as session:
            if needs_confirmation:
                fancy_print("Google Drive file requires confirmation. Handling confirmation...", "info", "#4285f4")
                # Only the final URL of the probe matters; close it right away
                # instead of leaving an unread streaming response open.
                with session.get(url, stream=True) as probe:
                    probe_url = probe.url
                if 'confirm=' in probe_url:
                    confirm_token = probe_url.split('confirm=')[1].split('&')[0]
                    url = f"{url}&confirm={confirm_token}"
                    fancy_print("Confirmation token acquired. Proceeding with download.", "success", "#90ee90")

            file_name = os.path.basename(output_path)
            fancy_print(f"Downloading {file_name}", "download", "#4285f4", True, True)
            display(HTML(_download_panel_markup(file_name, _format_size_label(total_size))))

            with session.get(url, stream=True) as r:
                r.raise_for_status()
                # Update total_size - it might have changed after confirmation
                total_size = int(r.headers.get('content-length', 0)) or total_size
                size_label = _format_size_label(total_size)

                block_size = 8192  # 8 KB
                wrote = 0
                start_time = time.time()
                last_update_time = start_time
                last_wrote = 0
                speed_history = []  # last 10 samples, smooths the ETA

                with open(output_path, 'wb') as f:
                    for data in r.iter_content(block_size):
                        wrote += len(data)
                        f.write(data)

                        # Refresh the panel roughly every 0.3 seconds.
                        current_time = time.time()
                        time_diff = current_time - last_update_time
                        if time_diff <= 0.3:
                            continue

                        speed_history.append((wrote - last_wrote) / time_diff)
                        if len(speed_history) > 10:
                            speed_history.pop(0)
                        avg_speed = sum(speed_history) / len(speed_history)

                        if total_size > 0:
                            percentage = min(100, int((wrote / total_size) * 100))
                        else:
                            percentage = 0

                        # An ETA only makes sense with a known size and a
                        # non-zero rate; never feed a negative value to gmtime.
                        if total_size > 0 and avg_speed > 0:
                            remaining_bytes = max(total_size - wrote, 0)
                            eta_str = time.strftime('%H:%M:%S', time.gmtime(remaining_bytes / avg_speed))
                        else:
                            eta_str = "--:--:--"

                        elapsed_str = time.strftime('%H:%M:%S', time.gmtime(current_time - start_time))
                        display(HTML(_download_progress_script(
                            percentage,
                            _format_transfer_rate(avg_speed),
                            wrote / (1024 * 1024),
                            size_label,
                            elapsed_str,
                            eta_str,
                        )))
                        last_update_time = current_time
                        last_wrote = wrote

                # Final refresh so the panel reflects the finished transfer
                # instead of whatever the last 0.3 s tick happened to show.
                total_elapsed = time.time() - start_time
                overall_speed = wrote / total_elapsed if total_elapsed > 0 else 0
                if total_size > 0:
                    final_percentage = min(100, int((wrote / total_size) * 100))
                else:
                    final_percentage = 100
                display(HTML(_download_progress_script(
                    final_percentage,
                    _format_transfer_rate(overall_speed),
                    wrote / (1024 * 1024),
                    size_label,
                    time.strftime('%H:%M:%S', time.gmtime(total_elapsed)),
                    "00:00:00",
                )))

                if total_size != 0 and wrote != total_size:
                    fancy_print("Download incomplete, file might be corrupted", "error", "#ff6347", True)
                    return False

        # Additional check for Google Drive downloads
        if is_gdrive and os.path.getsize(output_path) < 10000:  # Small file might be an error page
            with open(output_path, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            if "Google Drive - Quota exceeded" in content or "Error 404" in content:
                fancy_print("Google Drive download error: Quota exceeded or file not found", "error", "#ff6347", True)
                return False

        fancy_print(f"Download complete! File saved as: {os.path.basename(output_path)}", "success", "#90ee90", True)
        # Show restart button
        display(HTML("""
        <script>
        document.getElementById('restart-button').style.display = 'block';
        </script>
        """))
        return True
    except Exception as e:
        # Top-level boundary: report and signal failure to the caller.
        fancy_print(f"Download failed: {str(e)}", "error", "#ff6347", True)
        return False
def extract_archive(filename, extract_dir):
    """Extract the archive ``filename`` into ``extract_dir`` with live status.

    The archive type is taken from the file name suffix, falling back to the
    description printed by ``file -b``. ZIP files are tried with Python's
    ``zipfile`` first and then with 7z/unzip; RAR uses ``unrar`` then 7z;
    tar-family archives use ``tar`` then 7z; everything else goes to 7z.

    Args:
        filename: Path of the downloaded archive.
        extract_dir: Directory to extract into (created if missing).

    Returns:
        bool: True on success, False otherwise (errors are reported through
        ``fancy_print`` rather than raised).
    """
    try:
        os.makedirs(extract_dir, exist_ok=True)
        fancy_print(f"Extracting {filename}...", "extract", "#ff9900", True, True)

        # Theme-adaptive extraction status panel
        extraction_display = HTML(f"""
        <style>
        :root {{
            --light-bg: #fffde7;
            --light-text: #ff9900;
            --light-border: #ff9900;
            --light-inner-bg: #fff8e1;
            --dark-bg: #2d2a17;
            --dark-text: #ffcc66;
            --dark-border: #cc9900;
            --dark-inner-bg: #3a341c;
        }}
        @media (prefers-color-scheme: dark) {{
            .extract-box {{
                background-color: var(--dark-bg);
                border-left: 6px solid var(--dark-border);
                color: var(--dark-text);
            }}
            .extract-inner {{
                background-color: var(--dark-inner-bg);
                color: var(--dark-text);
            }}
        }}
        @media (prefers-color-scheme: light) {{
            .extract-box {{
                background-color: var(--light-bg);
                border-left: 6px solid var(--light-border);
                color: var(--light-text);
            }}
            .extract-inner {{
                background-color: var(--light-inner-bg);
                color: var(--light-text);
            }}
        }}
        .extract-box {{
            margin: 15px 0;
            padding: 15px;
            border-radius: 10px;
            box-shadow: 0 2px 5px rgba(0,0,0,0.1);
        }}
        .extract-inner {{
            max-height: 200px;
            overflow-y: auto;
            padding: 10px;
            border-radius: 5px;
            font-family: monospace;
            font-size: 12px;
            white-space: pre-wrap;
            word-wrap: break-word;
        }}
        .progress-bar {{
            width: 100%;
            height: 4px; /* Thin progress bar */
            background-color: #f0f0f0;
            border-radius: 2px;
            overflow: hidden;
            margin: 10px 0;
        }}
        .progress-fill {{
            height: 100%;
            background-color: #ff9900;
            width: 0%;
            transition: width 0.3s ease;
        }}
        .progress-text {{
            text-align: center;
            margin-top: 5px;
            font-size: 14px;
            color: var(--light-text);
        }}
        </style>
        <div class="extract-box">
            <div style="font-weight:bold; margin-bottom:10px;">
                📂 Extraction in progress: {os.path.basename(filename)}
            </div>
            <div class="progress-bar">
                <div id="extract-progress" class="progress-fill"></div>
            </div>
            <div id="progress-text" class="progress-text">0%</div>
            <div id="extraction-status" class="extract-inner">
                Starting extraction process...
            </div>
        </div>
        """)
        display(extraction_display)

        def update_progress(current, total, message):
            """Move the bar to current/total and append ``message`` to the log."""
            percentage = int((current / total) * 100) if total > 0 else 0
            percentage = max(0, min(100, percentage))
            # Archive member names are untrusted: pass them as a JSON string
            # literal (with '<' escaped so "</script>" cannot end the block)
            # and append as a text node, never as HTML or a template literal.
            js_message = json.dumps("\n" + message).replace("<", "\\u003c")
            display(HTML(f"""
            <script>
            (function() {{
                var statusBox = document.getElementById('extraction-status');
                document.getElementById('extract-progress').style.width = '{percentage}%';
                document.getElementById('progress-text').textContent = '{percentage}%';
                statusBox.appendChild(document.createTextNode({js_message}));
                statusBox.scrollTop = statusBox.scrollHeight;
            }})();
            </script>
            """))

        # Matches a standalone "NN%" token in tool output.
        percent_token = re.compile(r'(?<![\w.])(\d{1,3})%')

        def run_extraction_command(cmd, archive_type):
            """Run ``cmd`` once, streaming its output into the status panel.

            Returns the process exit code, or -1 when a ZIP "Headers Error"
            is seen (the process is terminated in that case).
            """
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                errors="replace",  # odd file-name bytes must not abort the run
                bufsize=1
            )
            percent = 0
            pending = []
            last_flush = time.time()
            header_error = False
            try:
                for line in process.stdout:
                    # Drop the backspaces some tools use to redraw progress.
                    clean_line = line.replace('\b', '').strip()
                    if not clean_line:
                        continue

                    # Best effort: use a percentage if the tool printed one.
                    # tar is skipped because it prints bare file names, which
                    # may themselves contain something like "50%".
                    if archive_type != 'tar':
                        found = percent_token.findall(clean_line)
                        if found:
                            percent = min(100, int(found[-1]))

                    if archive_type in ('zip', '7z'):
                        clean_line = clean_line.replace('Scanning the drive for archives:', '')
                        clean_line = clean_line.replace('Extracting archive:', '')
                        # Check for header errors
                        if archive_type == 'zip' and 'Headers Error' in clean_line:
                            header_error = True
                            break
                    elif archive_type == 'rar':
                        clean_line = clean_line.replace('Extracting from', '')
                    elif archive_type == 'tar':
                        # Only strip a leading "x " marker, not every "x ".
                        if clean_line.startswith('x '):
                            clean_line = clean_line[2:]

                    pending.append(clean_line)
                    # Batch log lines so large archives are not slowed down by
                    # one display call (or a sleep) per extracted file.
                    now = time.time()
                    if now - last_flush >= 0.1:
                        update_progress(percent, 100, "\n".join(pending))
                        pending = []
                        last_flush = now
            finally:
                if header_error:
                    process.kill()  # do not leave the extractor running
                process.stdout.close()
                process.wait()

            if header_error:
                fancy_print("Detected header errors in ZIP file. Attempting recovery...", "warning", "#FFA500", True)
                return -1  # Special return code for header errors

            if process.returncode == 0:
                percent = 100
            update_progress(percent, 100, "\n".join(pending))
            return process.returncode

        # First, check if the file is actually an archive
        try:
            # Human-readable description (shown to the user, used for sniffing)
            file_type = subprocess.check_output(
                ['file', '-b', filename]
            ).decode('utf-8', errors='replace').strip().lower()
            # MIME type for the text/HTML test: the description of a gzip file
            # embeds the original file name, which may contain "text"/"html".
            mime_type = subprocess.check_output(
                ['file', '-b', '--mime-type', filename]
            ).decode('utf-8', errors='replace').strip().lower()
        except Exception as e:
            fancy_print(f"Error detecting file type: {str(e)}", "error", "#ff6347", True)
            return False
        fancy_print(f"Detected file type: {file_type}", "info", "#4285f4")

        # Text/HTML payloads are usually Google Drive error pages
        if mime_type.startswith('text/') or 'html' in mime_type:
            with open(filename, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read()
            if "Google Drive - Quota exceeded" in content or "Error 404" in content:
                fancy_print("Google Drive download error: Quota exceeded or file not found", "error", "#ff6347", True)
            else:
                fancy_print("File appears to be a text/HTML file, not an archive", "error", "#ff6347", True)
            return False

        # Suffix table; compound suffixes come first so ".tar.gz" is seen
        # before ".gz".
        archive_suffixes = (
            ('.tar.gz', 'tar'),
            ('.tar.bz2', 'tar'),
            ('.tar.xz', 'tar'),
            ('.tgz', 'tar'),
            ('.tbz2', 'tar'),
            ('.txz', 'tar'),
            ('.tar', 'tar'),
            ('.gz', 'tar'),
            ('.zip', 'zip'),
            ('.rar', 'rar'),
            ('.7z', '7z'),
        )
        lowered_name = filename.lower()
        archive_type = next(
            (kind for suffix, kind in archive_suffixes if lowered_name.endswith(suffix)),
            None,
        )

        if archive_type is not None:
            fancy_print(f"Detected archive type: {archive_type.upper()}", "info", "#4285f4")
        else:
            # Suffix not recognised: sniff the description. "zip" is tested
            # last because it is a substring of "gzip" and "bzip2".
            tar_markers = ('tar archive', 'gzip compressed', 'bzip2 compressed', 'xz compressed')
            if 'rar archive' in file_type:
                archive_type = 'rar'
            elif '7-zip' in file_type:
                archive_type = '7z'
            elif any(marker in file_type for marker in tar_markers):
                archive_type = 'tar'
            elif 'zip' in file_type:
                archive_type = 'zip'
            else:
                fancy_print("Could not determine archive type. Trying 7z as fallback...", "warning", "#FFA500")
                archive_type = '7z'

        seven_zip_cmd = ["7z", "x", filename, f"-o{extract_dir}", "-y", "-bsp1"]
        success = False

        # Extract based on archive type
        if archive_type == 'zip':
            # First try Python's zipfile module
            try:
                import zipfile
                with zipfile.ZipFile(filename, 'r') as zip_ref:
                    # Uncompressed byte total drives the progress bar
                    total_size = sum(member.file_size for member in zip_ref.filelist)
                    extracted_size = 0
                    failed_members = 0
                    for member in zip_ref.filelist:
                        try:
                            update_progress(extracted_size, total_size, f"Extracting: {member.filename}")
                            zip_ref.extract(member, extract_dir)
                            extracted_size += member.file_size
                            update_progress(extracted_size, total_size, f"Extracted: {member.filename}")
                        except Exception as e:
                            # Best effort: report and carry on with the rest.
                            failed_members += 1
                            fancy_print(f"Error extracting {member.filename}: {str(e)}", "warning", "#FFA500")
                            continue
                    # Final progress update
                    update_progress(total_size, total_size, "Extraction complete!")
                if failed_members:
                    fancy_print(f"{failed_members} file(s) could not be extracted", "warning", "#FFA500")
                fancy_print("Successfully extracted using Python zipfile module", "success", "#00ffcc")
                return True
            except Exception as e:
                fancy_print(f"Python zipfile extraction failed: {str(e)}", "warning", "#FFA500")

            # Fallback to 7z
            result = run_extraction_command(seven_zip_cmd, 'zip')
            if result == -1:  # Header errors detected
                fancy_print("Attempting to repair ZIP file...", "warning", "#FFA500", True)
                # Try with unzip
                cmd = ["unzip", "-o", filename, "-d", extract_dir]
                result = run_extraction_command(cmd, 'zip')
                if result != 0:
                    fancy_print("Trying with 7z in recovery mode...", "warning", "#FFA500", True)
                    # Try 7z with recovery options
                    cmd = ["7z", "x", filename, f"-o{extract_dir}", "-y", "-bsp1", "-p", "-t*"]
                    result = run_extraction_command(cmd, 'zip')
            if result != 0:
                fancy_print("7z extraction failed for ZIP file", "error", "#ff6347")
                return False
            return True
        elif archive_type == 'rar':
            cmd = ["unrar", "x", "-y", filename, f"{extract_dir}/"]
            success = run_extraction_command(cmd, 'rar') == 0
            if not success:
                fancy_print("unrar extraction failed, trying 7z...", "warning", "#FFA500")
                success = run_extraction_command(seven_zip_cmd, '7z') == 0
        elif archive_type == '7z':
            success = run_extraction_command(seven_zip_cmd, '7z') == 0
        elif archive_type == 'tar':
            # tar unpacks compressed tarballs in one step; "7z x" on a
            # .tar.gz only removes the gzip layer and leaves a .tar behind.
            cmd = ["tar", "-xvf", filename, "-C", extract_dir]
            success = run_extraction_command(cmd, 'tar') == 0
            if not success:
                # Also covers a plain .gz that is not a tarball.
                fancy_print("tar extraction failed, trying 7z...", "warning", "#FFA500")
                success = run_extraction_command(seven_zip_cmd, '7z') == 0

        if not success:
            fancy_print("Extraction failed with all methods", "error", "#ff6347", True)
            return False

        file_count = sum(len(files) for _, _, files in os.walk(extract_dir))
        fancy_print(f"Extraction complete! {file_count} files extracted", "success", "#00ffcc", True)
        return True
    except Exception as e:
        # Top-level boundary: report and signal failure to the caller.
        fancy_print(f"Extraction failed: {str(e)}", "error", "#ff6347", True)
        return False
def authenticate_drive(account_num=1, previous_accounts=None): | |
fancy_print(f"Starting authentication for Google Account #{account_num}...", "auth", "#ff9900", True) | |
if previous_accounts is None: | |
previous_accounts = [] | |
try: | |
# First unmount any existing drives and clear the session | |
try: | |
drive.flush_and_unmount() | |
time.sleep(1) | |
# Clear any stored credentials to force new authentication | |
from google.colab import auth | |
auth.unload_ipython_extension(get_ipython()) | |
time.sleep(1) | |
except: | |
pass | |
# Show explicit instructions for authentication | |
auth_instructions = HTML(""" | |
<style> | |
:root { | |
--light-bg-auth: #e8f0fe; | |
--light-text-auth: #202124; | |
--light-border-auth: #4285f4; | |
--dark-bg-auth: #1f2a3a; | |
--dark-text-auth: #e0e0e0; | |
--dark-border-auth: #8ab4f8; | |
} | |
@media (prefers-color-scheme: dark) { | |
.auth-box { | |
background-color: var(--dark-bg-auth); | |
color: var(--dark-text-auth); | |
border-left: 6px solid var(--dark-border-auth); | |
} | |
.auth-box h3 { | |
color: var(--dark-border-auth); | |
} | |
} | |
@media (prefers-color-scheme: light) { | |
.auth-box { | |
background-color: var(--light-bg-auth); | |
color: var(--light-text-auth); | |
border-left: 6px solid var(--light-border-auth); | |
} | |
.auth-box h3 { | |
color: var(--light-border-auth); | |
} | |
} | |
.auth-box h3 { | |
margin-top: 0; | |
} | |
.auth-box ol { | |
padding-left: 20px; | |
} | |
.auth-box li { | |
margin-bottom: 6px; | |
} | |
</style> | |
<div class="auth-box" style="margin:15px 0; padding:15px; border-radius:10px;"> | |
<h3>Authentication Instructions</h3> | |
<p><strong>IMPORTANT:</strong> To connect a <u>different</u> Google account:</p> | |
<ol> | |
<li>Click the "Sign in with Google" button that appears</li> | |
<li>If your browser opens Google with a pre-selected account, click <strong>"Use another account"</strong></li> | |
<li>Sign in with a different Google account than previously used</li> | |
</ol> | |
<p>If you keep seeing the same account, try using incognito mode or clearing your browser cookies.</p> | |
</div> | |
""") | |
display(auth_instructions) | |
# Now authenticate | |
auth.authenticate_user() | |
# Try to initialize the PyDrive client | |
gauth = GoogleAuth() | |
gauth.credentials = GoogleCredentials.get_application_default() | |
drive_client = GoogleDrive(gauth) | |
# Test if it works by trying to list files | |
file_list = drive_client.ListFile({'q': "'root' in parents", 'maxResults': 1}).GetList() | |
# Check drive space and get account fingerprint | |
space_info = check_drive_space() | |
if not space_info: | |
raise Exception("Failed to get drive space information") | |
# Check if this account was already authenticated | |
if space_info.get('account_fingerprint') in previous_accounts: | |
fancy_print(f"⚠️ This appears to be the same account as previously authenticated. Please use a different Google account.", | |
"error", "#ff6347", True) | |
# Show an explicit error message with instructions | |
same_account_error = HTML(""" | |
<div style="margin:15px 0; padding:15px; background-color:#fef2f2; border-radius:10px; border-left:6px solid #dc3545;"> | |
<h3 style="color:#dc3545; margin-top:0;">Same Account Detected!</h3> | |
<p>You're trying to authenticate with an account that's already connected.</p> | |
<p><strong>To connect a different account:</strong></p> | |
<ol> | |
<li>Click the retry button below</li> | |
<li>When Google sign-in appears, click "Use another account"</li> | |
<li>Sign in with a completely different Google account</li> | |
</ol> | |
<p>If you keep seeing this error, try opening this Colab notebook in an incognito/private window.</p> | |
</div> | |
""") | |
display(same_account_error) | |
# Add retry button | |
retry_button = widgets.Button( | |
description=f'Try Different Account', | |
button_style='danger', | |
icon='refresh', | |
layout=widgets.Layout(width='200px', height='40px') | |
) | |
display(retry_button) | |
def on_retry_click(b): | |
retry_button.close() | |
authenticate_drive(account_num, previous_accounts) | |
retry_button.on_click(on_retry_click) | |
return None | |
# Add this account's fingerprint to the list | |
if space_info.get('account_fingerprint'): | |
previous_accounts.append(space_info.get('account_fingerprint')) | |
# Display space information with enhanced visualization | |
if space_info: | |
# Calculate the percentage as a number | |
try: | |
percent_num = int(space_info['used_percent'].strip('%')) | |
except: | |
percent_num = 0 | |
# Determine color based on usage | |
color = "#4CAF50" # Green for low usage | |
if percent_num > 70: | |
color = "#FF9800" # Orange for medium usage | |
if percent_num > 90: | |
color = "#F44336" # Red for high usage | |
# Create a nice HTML visualization (adaptive to light/dark themes) | |
space_display = HTML(f""" | |
<style> | |
:root {{ | |
--light-bg: #f5f5f5; | |
--light-text: #333; | |
--light-sub-bg: #e0e0e0; | |
--dark-bg: #2c2c2c; | |
--dark-text: #f1f1f1; | |
--dark-sub-bg: #444; | |
}} | |
@media (prefers-color-scheme: dark) {{ | |
.space-box {{ | |
background-color: var(--dark-bg); | |
color: var(--dark-text); | |
box-shadow: 0 2px 5px rgba(0,0,0,0.4); | |
}} | |
.space-bar-bg {{ | |
background-color: var(--dark-sub-bg); | |
}} | |
}} | |
@media (prefers-color-scheme: light) {{ | |
.space-box {{ | |
background-color: var(--light-bg); | |
color: var(--light-text); | |
box-shadow: 0 2px 5px rgba(0,0,0,0.1); | |
}} | |
.space-bar-bg {{ | |
background-color: var(--light-sub-bg); | |
}} | |
}} | |
</style> | |
<div class="space-box" style="margin:15px 0; padding:15px; border-radius:10px;"> | |
<h3 style="margin-top:0;">Google Account #{account_num} Successfully Connected!</h3> | |
<div style="display:flex; align-items:center; margin-bottom:10px;"> | |
<div style="width:20%; font-weight:bold;">Storage:</div> | |
<div style="width:80%;"> | |
<div class="space-bar-bg" style="height:20px; border-radius:10px; overflow:hidden;"> | |
<div style="background-color:{color}; height:100%; width:{space_info['used_percent']}; transition:width 1s;"></div> | |
</div> | |
<div style="display:flex; justify-content:space-between; margin-top:5px;"> | |
<span>0</span> | |
<span>{space_info['used_percent']} Used ({space_info['used']} of {space_info['total']})</span> | |
<span>{space_info['total']}</span> | |
</div> | |
</div> | |
</div> | |
<div style="display:flex; align-items:center;"> | |
<div style="width:20%; font-weight:bold;">Available:</div> | |
<div style="width:80%; color:{color}; font-weight:bold; font-size:18px;">{space_info['available']}</div> | |
</div> | |
</div> | |
""") | |
display(space_display) | |
# Return both the drive client and space info | |
return { | |
'drive_client': drive_client, | |
'space_info': space_info | |
} | |
except Exception as e: | |
error_msg = str(e) | |
fancy_print(f"Authentication failed for account #{account_num}: {error_msg}", "error", "#ff6347", True) | |
# Add retry button with better styling | |
retry_button = widgets.Button( | |
description=f'Retry Authentication', | |
button_style='warning', | |
icon='refresh', | |
layout=widgets.Layout(width='200px', height='40px') | |
) | |
display(retry_button) | |
def on_retry_click(b): | |
retry_button.close() | |
authenticate_drive(account_num, previous_accounts) | |
retry_button.on_click(on_retry_click) | |
return None | |
def create_drive_folder(drive_client, folder_name, parent_id=None):
    """Return the id of a Drive folder called *folder_name*, creating it if needed.

    Args:
        drive_client: Drive client exposing ``ListFile`` and ``CreateFile``
            (presumably a PyDrive2 ``GoogleDrive`` -- confirm against caller).
        folder_name: Title of the folder to find or create.
        parent_id: Optional id of the parent folder. When omitted the search is
            not restricted to any parent and a new folder gets no explicit parent.

    Returns:
        The folder id, or ``None`` if the lookup or creation raised (the error
        is reported through ``fancy_print``).
    """

    def quote(value):
        # Drive query string literals are single-quoted; a backslash or quote
        # inside the value must be backslash-escaped or the query is malformed.
        # Escape the backslash first so the escapes added for quotes survive.
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    try:
        query = (
            f"title = {quote(folder_name)} "
            "and mimeType = 'application/vnd.google-apps.folder' "
            "and trashed = false"
        )
        if parent_id:
            query += f" and {quote(parent_id)} in parents"

        matches = drive_client.ListFile({'q': query}).GetList()
        if matches:
            # Reuse the first existing folder rather than creating a duplicate.
            return matches[0]['id']

        metadata = {'title': folder_name, 'mimeType': 'application/vnd.google-apps.folder'}
        if parent_id:
            metadata['parents'] = [{'id': parent_id}]
        folder = drive_client.CreateFile(metadata)
        folder.Upload()
        return folder['id']
    except Exception as e:
        fancy_print(f"Failed to create or find folder: {str(e)}", "error", "#ff6347")
        return None
def upload_files(drive_client, files, folder_id=None, account_num=1):
    """Upload local files to Google Drive while showing a live progress panel.

    Files are uploaded one at a time. A failure on one file is reported and the
    loop moves on, except for errors whose text contains ``quotaExceeded``:
    those stop the upload, show a "Try Another Drive" button, and return early.
    Clicking that button authenticates the next account and uploads everything
    that has not been uploaded yet to it.

    Args:
        drive_client: Drive client exposing ``CreateFile``.
        files: List of local file paths to upload.
        folder_id: Optional id of the destination Drive folder.
        account_num: Number used to label this drive in messages.

    Returns:
        The number of files uploaded successfully; ``0`` if an unexpected error
        escapes the per-file handling.
    """
    # Function-scope import: only this block needs it.
    from html import escape

    current_file_style = "white-space:nowrap; overflow:hidden; text-overflow:ellipsis; max-width:100%;"

    try:
        total_files = len(files)
        fancy_print(f"Uploading {total_files} files to Drive #{account_num}...", "upload", "#fff200", True)

        # Create upload progress display
        header = widgets.HTML(value=f"""
        <div style="margin-bottom:10px; font-weight:bold; color:#fff200; font-size:16px;">
        Drive #{account_num} Upload Progress
        </div>
        """)
        progress_bar = widgets.FloatProgress(
            value=0,
            min=0,
            max=total_files,
            description='Overall:',
            bar_style='warning',
            style={'bar_color': '#fff200', 'description_width': '60px'},
            layout=widgets.Layout(width='100%', height='20px')
        )
        progress_text = widgets.HTML(value='<div id="upload-progress-text">Starting upload...</div>')
        current_file_text = widgets.HTML(value=f'<div id="current-file-text" style="{current_file_style}"></div>')
        upload_container = widgets.VBox([header, progress_bar, progress_text, current_file_text])
        display(upload_container)

        # The labels are refreshed by assigning the widgets' values. File names
        # come from an extracted archive, so they are HTML-escaped rather than
        # spliced into markup or script source.
        def set_progress_text(message):
            progress_text.value = f'<div id="upload-progress-text">{escape(message)}</div>'

        def set_current_file_text(message):
            current_file_text.value = (
                f'<div id="current-file-text" style="{current_file_style}">{escape(message)}</div>'
            )

        success_count = 0
        uploaded_bytes = 0   # bytes of successfully uploaded files only
        failed_files = []    # files skipped because of a non-quota error
        start_time = time.time()

        for index, file_path in enumerate(files):
            file_name = os.path.basename(file_path)
            position = f"Uploading {index + 1} of {total_files} ({(index + 1) / total_files * 100:.1f}%)"
            try:
                file_size = os.path.getsize(file_path)
                progress_bar.value = index
                set_progress_text(position)
                set_current_file_text(f"Current file: {file_name} ({format_size(file_size)})")

                drive_file = drive_client.CreateFile({'title': file_name})
                if folder_id:
                    drive_file['parents'] = [{'id': folder_id}]
                drive_file.SetContentFile(file_path)
                drive_file.Upload()

                success_count += 1
                uploaded_bytes += file_size
                elapsed_time = time.time() - start_time
                avg_speed = uploaded_bytes / elapsed_time if elapsed_time > 0 else 0
                set_progress_text(f"{position} - Average speed: {format_speed(avg_speed)}")
            except Exception as e:
                error_msg = str(e)
                if "quotaExceeded" not in error_msg:
                    failed_files.append(file_path)
                    fancy_print(f"Upload failed for {file_name}: {error_msg}", "error", "#ff6347", True)
                    continue

                fancy_print(f"⚠️ Drive #{account_num} quota exceeded. Please add another drive or free up space.", "warning", "#FFA500", True)

                # Everything not uploaded yet: earlier failures plus the current
                # file and all files after it.
                remaining_files = failed_files + list(files[index:])

                # Unmount current drive (best effort; it may not be mounted)
                try:
                    drive.flush_and_unmount()
                except Exception:
                    pass

                # Show retry dialog
                retry_dialog = HTML(f"""
                <div style="margin:20px 0; padding:20px; background-color:#fff3cd; border-radius:10px; border-left:6px solid #ffc107;">
                <h3 style="color:#856404; margin-top:0;">Drive #{account_num} Full</h3>
                <p>Please choose one of the following options:</p>
                <ol>
                <li>Add a different Google Drive account</li>
                <li>Free up space in the current drive and try again</li>
                </ol>
                <p>Remaining files: {len(remaining_files)}</p>
                </div>
                """)
                display(retry_dialog)

                # Add retry button
                retry_button = widgets.Button(
                    description='Try Another Drive',
                    button_style='warning',
                    icon='refresh',
                    layout=widgets.Layout(width='200px', height='40px')
                )
                display(retry_button)

                def on_retry_click(b):
                    retry_button.close()
                    # Authenticate new drive
                    new_auth_result = authenticate_drive(account_num + 1, [])
                    if new_auth_result:
                        new_drive_client = new_auth_result['drive_client']
                        # Create folder in new drive if needed
                        new_folder_id = None
                        if folder_id:
                            # Item lookup fetches the metadata of a file known only by id.
                            folder_name = drive_client.CreateFile({'id': folder_id})['title']
                            new_folder_id = create_drive_folder(new_drive_client, folder_name)
                        # Continue upload with remaining files
                        upload_files(new_drive_client, remaining_files, new_folder_id, account_num + 1)
                    else:
                        fancy_print("Failed to authenticate new drive. Please try again.", "error", "#ff6347", True)

                retry_button.on_click(on_retry_click)
                return success_count

        progress_bar.value = total_files
        set_progress_text('Upload complete!')
        if success_count == total_files:
            set_current_file_text('All files successfully uploaded')
        else:
            # Do not claim full success when some files were skipped.
            set_current_file_text(
                f'{success_count} of {total_files} files uploaded; {total_files - success_count} failed'
            )
        fancy_print(f"Successfully uploaded {success_count} of {total_files} files to Drive #{account_num}", "done", "#90ee90", True)
        return success_count
    except Exception as e:
        fancy_print(f"Upload failed: {str(e)}", "error", "#ff6347", True)
        return 0
def format_size(size_bytes):
    """Render a byte count as a human-readable string.

    Values below 1024 are shown as-is with a ``B`` suffix; larger values are
    scaled by powers of 1024 and shown with two decimals in KB, MB or GB
    (GB is the largest unit used).
    """
    base = 1024
    if size_bytes < base:
        return f"{size_bytes} B"
    for exponent, unit in ((1, "KB"), (2, "MB")):
        if size_bytes < base ** (exponent + 1):
            return f"{size_bytes / base ** exponent:.2f} {unit}"
    return f"{size_bytes / base ** 3:.2f} GB"
def format_speed(speed_bytes):
    """Render a transfer rate in bytes per second as a human-readable string.

    The value is scaled by powers of 1024 and shown with two decimals in
    B/s, KB/s, MB/s or GB/s (GB/s is the largest unit used).
    """
    units = ("B/s", "KB/s", "MB/s", "GB/s")
    largest = len(units) - 1
    for exponent, unit in enumerate(units):
        # The last unit absorbs every value that did not fit a smaller one.
        if exponent == largest or speed_bytes < 1024 ** (exponent + 1):
            return f"{speed_bytes / 1024 ** exponent:.2f} {unit}"
def list_files_recursive(directory):
    """Return the path of every file found anywhere beneath *directory*.

    Paths are built by joining each walked directory with its file names, in
    the order ``os.walk`` yields them.
    """
    collected = []
    for current_dir, _subdirs, filenames in os.walk(directory):
        for filename in filenames:
            collected.append(os.path.join(current_dir, filename))
    return collected
def clean_up(temp_dir):
    """Delete the temporary working directory *temp_dir* and report the outcome.

    Removal errors are ignored by ``shutil.rmtree`` itself; any other exception
    raised along the way is reported as a warning instead of propagating.
    """
    try:
        fancy_print("Cleaning up temporary files...", "clean", "#cccccc")
        shutil.rmtree(temp_dir, ignore_errors=True)
        fancy_print("Cleanup complete!", "success", "#90ee90")
    except Exception as cleanup_error:
        failure_message = f"Cleanup failed: {str(cleanup_error)}"
        fancy_print(failure_message, "warning", "#FFA500")
def _df_size_to_bytes(text):
    """Convert one ``df -h`` size field (e.g. ``"15G"``, ``"512M"``, ``"0"``) to bytes.

    Returns ``None`` when the field is not a number with an optional
    K/M/G/T/P suffix.
    """
    multipliers = {
        'K': 1024,
        'M': 1024 ** 2,
        'G': 1024 ** 3,
        'T': 1024 ** 4,
        'P': 1024 ** 5,
    }
    field = text.strip()
    if not field:
        return None
    factor = multipliers.get(field[-1].upper())
    number = field[:-1] if factor else field
    try:
        return float(number) * (factor or 1)
    except ValueError:
        return None


def check_drive_space():
    """Mount Google Drive and report its storage usage as shown by ``df -h``.

    Returns:
        A dict with the human-readable ``total``, ``used``, ``available`` and
        ``used_percent`` fields taken from ``df``, ``available_bytes`` (float,
        or ``None`` if the field could not be parsed) and
        ``account_fingerprint`` from ``get_unique_account_identifier()``.
        ``None`` if the ``df`` output has an unexpected shape or any step
        raises (the error is reported through ``fancy_print``).
    """
    try:
        # This will mount the drive first
        drive.mount('/content/drive')

        # Get drive usage information
        usage_output = subprocess.check_output(['df', '-h', '/content/drive/My Drive']).decode('utf-8')
        usage_lines = usage_output.strip().split('\n')
        if len(usage_lines) < 2:
            return None

        # Line 0 is the header; line 1 is the row for the Drive mount.
        parts = usage_lines[1].split()
        if len(parts) < 5:
            return None
        total, used, available, used_percent = parts[1:5]

        return {
            'total': total,
            'used': used,
            'available': available,
            'used_percent': used_percent,
            # Numeric form of ``available`` for precise comparison
            'available_bytes': _df_size_to_bytes(available),
            # Get a unique identifier for this account
            'account_fingerprint': get_unique_account_identifier()
        }
    except Exception as e:
        fancy_print(f"Failed to check drive space: {str(e)}", "error", "#ff6347")
        return None
def confirm_another_drive():
    """Ask whether to add another Google Drive account and return the answer.

    Displays a Yes/No dialog and polls until one of the buttons has set the
    answer. "Yes" unmounts the current drive, attempts to clear credentials,
    shows sign-in instructions and runs ``auth.authenticate_user()``.

    Returns:
        ``True`` if "Yes" was clicked and authentication completed without
        raising, ``False`` if "No" was clicked or authentication raised.
    """
    choice = None  # stays None until one of the handlers records an answer

    yes_button = widgets.Button(
        description='Yes',
        button_style='success',
        icon='check',
        layout=widgets.Layout(width='100px', height='40px')
    )
    no_button = widgets.Button(
        description='No',
        button_style='danger',
        icon='times',
        layout=widgets.Layout(width='100px', height='40px')
    )
    prompt = widgets.HTML("""
    <div style="margin:15px 0; padding:15px; background-color:#f8f9fa; border-radius:10px; border-left:6px solid #ffc107;">
    <h3 style="margin-top:0; color:#ffc107;">⚠️ Drive Quota Exceeded</h3>
    <p>Would you like to add another Google Drive account to continue uploading?</p>
    </div>
    """)
    dialog = widgets.VBox([prompt, widgets.HBox([yes_button, no_button])])
    display(dialog)

    def handle_yes(button):
        nonlocal choice
        try:
            # Disable the button to prevent multiple clicks
            button.disabled = True
            button.description = 'Processing...'

            # Remove the dialog before starting the sign-in flow
            dialog.close()

            # Unmount the current drive
            try:
                drive.flush_and_unmount()
                time.sleep(1)
            except Exception as unmount_error:
                fancy_print(f"Warning: Could not unmount drive: {str(unmount_error)}", "warning", "#FFA500")

            # Clear any stored credentials
            # NOTE(review): confirm that google.colab.auth actually provides
            # unload_ipython_extension; if it does not, this always ends in the
            # warning below.
            try:
                from google.colab import auth
                auth.unload_ipython_extension(get_ipython())
                time.sleep(1)
            except Exception as credentials_error:
                fancy_print(f"Warning: Could not clear credentials: {str(credentials_error)}", "warning", "#FFA500")

            # Show authentication instructions
            display(HTML("""
            <div style="margin:15px 0; padding:15px; background-color:#e8f0fe; border-radius:10px; border-left:6px solid #4285f4;">
            <h3 style="margin-top:0; color:#4285f4;">Authentication Instructions</h3>
            <p><strong>IMPORTANT:</strong> To connect a different Google account:</p>
            <ol>
            <li>Click the "Sign in with Google" button that appears</li>
            <li>If your browser opens Google with a pre-selected account, click <strong>"Use another account"</strong></li>
            <li>Sign in with a different Google account than previously used</li>
            </ol>
            <p>If you keep seeing the same account, try using incognito mode or clearing your browser cookies.</p>
            </div>
            """))

            # Start authentication
            auth.authenticate_user()
            choice = True
        except Exception as auth_error:
            fancy_print(f"Error during authentication: {str(auth_error)}", "error", "#ff6347")
            choice = False

    def handle_no(button):
        nonlocal choice
        choice = False
        dialog.close()

    # Connect button click handlers
    yes_button.on_click(handle_yes)
    no_button.on_click(handle_no)

    # Wait for user input
    # NOTE(review): this loop blocks the calling thread; verify that the click
    # handlers can still be dispatched while it is polling, otherwise it never
    # ends.
    while choice is None:
        time.sleep(0.1)
    return choice
def get_unique_account_identifier(drive_root='/content/drive/My Drive'):
    """Return a fingerprint that is stable for the Drive mounted at *drive_root*.

    A random token is stored once in ``.account_verification/email.txt`` inside
    the Drive and the fingerprint is the MD5 hex digest of that token. Because
    the token lives in the Drive itself, the same Drive yields the same
    fingerprint on every call, which is what lets callers recognise an account
    they have already seen. (Hashing per-call random data, as was done before,
    produced a different value every time.)

    Args:
        drive_root: Directory where the Drive is mounted. Defaults to the
            location used by the rest of this file.

    Returns:
        A 32-character hex fingerprint, or a random UUID string if the marker
        file cannot be read or written (such a value never matches another).
    """
    marker_dir = os.path.join(drive_root, '.account_verification')
    marker_path = os.path.join(marker_dir, 'email.txt')
    try:
        os.makedirs(marker_dir, exist_ok=True)

        try:
            with open(marker_path, 'r') as f:
                token = f.read().strip()
        except FileNotFoundError:
            token = ''

        if not token:
            # First visit to this Drive (or an empty marker): create the token.
            # An empty token must not be hashed -- every Drive with an empty
            # marker would otherwise share one fingerprint.
            token = f"account_verification_{uuid.uuid4().hex}"
            with open(marker_path, 'w') as f:
                f.write(token)

        # MD5 is used only as a short non-cryptographic fingerprint.
        return hashlib.md5(token.encode()).hexdigest()
    except Exception:
        # Last resort fallback: best effort, never fail the caller.
        return str(uuid.uuid4())
# ✅ STEP 4: Main function | |
def main():
    """Build the notebook UI and wire the "Start Process" button to the pipeline.

    The UI collects a download URL, an optional file name and the destination
    folder options. Clicking the button downloads the archive, extracts it and
    uploads the extracted files to one or more Google Drive accounts, asking
    for another account whenever the current one cannot take the rest.
    """
    # Create a nice header
    create_header()

    # Create a temporary directory for downloads and extraction
    temp_dir = '/content/temp_archives'
    os.makedirs(temp_dir, exist_ok=True)

    # Step 1: Get download URL from the user with improved UI
    url_input = widgets.Text(
        value='',
        placeholder='Enter direct download URL or Google Drive link',
        description='URL:',
        layout=widgets.Layout(width='80%'),
        continuous_update=False  # Add this to prevent continuous updates
    )
    # Add file naming option
    filename_input = widgets.Text(
        value='',
        placeholder='Leave blank for auto-naming',
        description='Custom filename (optional):',
        layout=widgets.Layout(width='80%')
    )
    # Add folder creation option
    folder_checkbox = widgets.Checkbox(
        value=True,
        description='Create folder in Google Drive',
        layout=widgets.Layout(width='80%')
    )
    folder_name_input = widgets.Text(
        value='Zippy',
        placeholder='Enter folder name',
        description='Folder name:',
        layout=widgets.Layout(width='80%')
    )
    # Submit button with enhanced styling
    submit_button = widgets.Button(
        description='Start Process',
        button_style='success',
        tooltip='Click to start downloading and uploading',
        icon='play',
        layout=widgets.Layout(width='auto', height='auto')
    )

    # Add a disclaimer and instructions for better user guidance
    instructions = HTML(_instructions_markup())
    display(instructions)

    # Step 1
    display(widgets.HTML("""
    <div class='step-box'>
    <h3>💾 Step 1: Enter Download Information</h3>
    <p>Provide the download URL and an optional custom filename.</p>
    </div>
    """))
    url_input.add_class("custom-input")
    filename_input.add_class("custom-input")
    display(url_input)
    display(filename_input)

    # Step 2
    display(widgets.HTML("""
    <div class='step-box'>
    <h3>🔄 Step 2: Configure Upload Options</h3>
    <p>Select whether to create a folder in Google Drive.</p>
    </div>
    """))
    folder_checkbox.add_class("custom-checkbox")
    folder_name_input.add_class("custom-input")
    display(folder_checkbox)
    display(folder_name_input)

    # Step 3
    display(widgets.HTML("""
    <div class='step-box'>
    <h3>▶️ Step 3: Start the Process</h3>
    <p>Click the button below to begin download, extraction, and upload.</p>
    </div>
    """))
    submit_button.add_class("start-process-button")
    display(submit_button)

    def unmount_current_drive():
        """Unmount the current drive so another account can be mounted (best effort)."""
        try:
            drive.flush_and_unmount()
            time.sleep(1)
        except Exception as e:
            # Not fatal: report it the same way confirm_another_drive does.
            fancy_print(f"Warning: Could not unmount drive: {str(e)}", "warning", "#FFA500")

    # Function to handle the submit button click
    def on_submit_button_clicked(b):
        # Disable the button to prevent multiple clicks
        submit_button.disabled = True
        submit_button.description = 'Processing...'
        submit_button.icon = 'spinner'
        try:
            url = url_input.value.strip()
            if not url:
                # Nothing to do; the ``finally`` clause re-enables the button.
                return

            # Clear any previous output
            clear_output(wait=True)
            create_header()
            display(instructions)
            display(url_input)
            display(filename_input)
            display(folder_checkbox)
            display(folder_name_input)
            display(submit_button)

            # clean_up() at the end of a run deletes temp_dir, so make sure it
            # exists again when the button is used a second time.
            os.makedirs(temp_dir, exist_ok=True)

            # Step 2: Download the file
            download_path = _resolve_download_path(url, filename_input.value, temp_dir)
            download_success = download_with_progress(url, download_path)
            if not download_success:
                fancy_print("Download failed. Please check the URL and try again.", "error", "#ff6347", True)
                return

            # Step 3: Extract the archive
            extract_dir = os.path.join(temp_dir, 'extracted')
            # Drop leftovers of an earlier failed run so they are not uploaded
            # together with this archive's files.
            shutil.rmtree(extract_dir, ignore_errors=True)
            extraction_success = extract_archive(download_path, extract_dir)
            if not extraction_success:
                fancy_print("Extraction failed. The file might not be a valid archive.", "error", "#ff6347", True)
                return

            # Step 4: Get a list of all extracted files
            all_files = list_files_recursive(extract_dir)
            fancy_print(f"Found {len(all_files)} files to upload", "info", "#00ffcc")
            if not all_files:
                fancy_print("No files found after extraction. The archive might be empty.", "warning", "#FFA500", True)
                return

            # Step 5: Calculate total size of files to upload
            total_size = sum(os.path.getsize(f) for f in all_files)
            fancy_print(f"Total size to upload: {format_size(total_size)}", "info", "#00ffcc")

            # Keep track of authenticated accounts to avoid duplicates
            previous_accounts = []
            remaining_files = list(all_files)
            account_num = 1
            while remaining_files:
                # Authenticate Google Drive
                auth_result = authenticate_drive(account_num, previous_accounts)
                if not auth_result:
                    fancy_print(f"Authentication failed for account #{account_num}. Skipping.", "error", "#ff6347", True)
                    break
                drive_client = auth_result['drive_client']
                space_info = auth_result['space_info']

                # Check if there's enough space for what is still left to
                # upload (not for the whole archive, part of which may already
                # be on an earlier drive). ``None`` means the free space is
                # unknown, so the check is skipped.
                remaining_size = sum(os.path.getsize(f) for f in remaining_files)
                available_bytes = space_info.get('available_bytes')
                if available_bytes is not None and available_bytes < remaining_size:
                    fancy_print(f"⚠️ Drive #{account_num} doesn't have enough space. Available: {space_info['available']}",
                                "warning", "#FFA500", True)
                    # Show remaining files count
                    fancy_print(f"Remaining files to upload: {len(remaining_files)}", "info", "#00ffcc")
                    # Ask user if they want to try another drive
                    if not confirm_another_drive():
                        break
                    # Unmount current drive and continue
                    unmount_current_drive()
                    continue

                # Create folder if enabled
                folder_id = None
                if folder_checkbox.value:
                    folder_name = folder_name_input.value.strip() or "Zippy"
                    folder_id = create_drive_folder(drive_client, folder_name)
                    if folder_id:
                        fancy_print(f"Created folder '{folder_name}' in Drive #{account_num}", "success", "#90ee90")
                    else:
                        fancy_print(f"Failed to create folder in Drive #{account_num}", "warning", "#FFA500")

                # Upload files
                upload_success = upload_files(drive_client, remaining_files, folder_id, account_num)
                if upload_success == len(remaining_files):
                    fancy_print(f"All files successfully uploaded to Drive #{account_num}", "done", "#90ee90", True)
                    remaining_files = []
                    break

                # Update remaining files. upload_files reports only a count, so
                # this assumes the uploaded files are the leading ones.
                # NOTE(review): a failure in the middle of the list would make
                # this slice drop the wrong files -- confirm against upload_files.
                remaining_files = remaining_files[upload_success:]
                fancy_print(f"Uploaded {upload_success} files to Drive #{account_num}. {len(remaining_files)} files remaining.",
                            "warning", "#FFA500", True)
                # Ask user if they want to try another drive
                if not confirm_another_drive():
                    break
                # Unmount current drive and continue
                unmount_current_drive()
                account_num += 1

            # Clean up temporary files
            clean_up(temp_dir)

            # Show final message; only claim success when nothing is left over.
            if remaining_files:
                fancy_print(f"Process stopped with {len(remaining_files)} files not uploaded.", "warning", "#FFA500", True)
            else:
                fancy_print("Process complete! Your files are now available in your Google Drive(s).", "done", "#00ffcc", True, True)
        except Exception as e:
            fancy_print(f"An error occurred: {str(e)}", "error", "#ff6347", True)
        finally:
            # Re-enable the button
            submit_button.disabled = False
            submit_button.description = 'Start Process'
            submit_button.icon = 'play'

    # Connect the button click to the handler
    submit_button.on_click(on_submit_button_clicked)


def _resolve_download_path(url, custom_filename, temp_dir):
    """Choose the path inside *temp_dir* that the archive at *url* is saved to."""
    url_filename = url.split('/')[-1].split('?')[0]
    # Keep only the final path component so the download always lands directly
    # in temp_dir, even if a directory part was typed into the filename box.
    custom_filename = os.path.basename(custom_filename.strip())
    if custom_filename:
        if '.' not in custom_filename and '.' in url_filename:
            # No extension given: borrow the one from the URL's file name.
            custom_filename = f"{custom_filename}.{url_filename.split('.')[-1]}"
        return os.path.join(temp_dir, custom_filename)
    if url_filename and '.' in url_filename:
        return os.path.join(temp_dir, url_filename)
    return os.path.join(temp_dir, f"download_{int(time.time())}.zip")


def _instructions_markup():
    """Return the stylesheet and instructions panel shown at the top of the UI."""
    return """
    <style>
    :root {
        --light-bg: #f8f9fa;
        --light-text: #343a40;
        --light-border: #6c757d;
        --light-accent: #dc3545;
        --dark-bg: #2a2a2a;
        --dark-text: #e0e0e0;
        --dark-border: #888888;
        --dark-accent: #ff4d4f;
    }
    @media (prefers-color-scheme: dark) {
        .instructions-box {
            background-color: var(--dark-bg);
            color: var(--dark-text);
            border-left: 7px solid #FF3D49;
            transition: border-color 0.4s ease, border-left 0.4s ease;
        }
        .custom-input input, .custom-slider input {
            background: #1e1e1e;
            color: #fff;
            border: none;
            border-radius: 20px;
            padding: 10px 15px;
            font-size: 14px;
            outline: none;
            box-shadow: 0 2px 4px rgba(255, 255, 255, 0.05);
        }
        .custom-button button {
            background: linear-gradient(135deg, #ff4d4f, #ff6666);
            color: white;
            border: none;
        }
    }
    @media (prefers-color-scheme: light) {
        .instructions-box {
            background-color: var(--light-bg);
            color: var(--light-text);
            border-left: 7px solid #FF3D49;
            transition: border-color 0.4s ease, border-left 0.4s ease;
        }
        .custom-input input, .custom-slider input {
            background: #fff;
            color: #000;
            border: none;
            border-radius: 20px;
            padding: 10px 15px;
            font-size: 14px;
            outline: none;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
        }
        .custom-button button {
            background: linear-gradient(135deg, #dc3545, #e74c3c);
            color: white;
            border: none;
        }
    }
    .instructions-box h3 {
        margin-top: 0;
    }
    .instructions-box li {
        margin-bottom: 6px;
    }
    /* UI Container Styles */
    .ui-container {
        height: auto;
        display: flex;
        flex-direction: column;
    }
    .step-box {
        padding: 20px;
        margin: 10px 0;
        border-radius: 10px;
        background: var(--light-bg);
        color: var(--light-text);
        transition: all 0.3s ease;
    }
    @media (prefers-color-scheme: dark) {
        .step-box {
            background: var(--dark-bg);
            color: var(--dark-text);
        }
    }
    .step-box:hover {
        transform: translateY(-2px);
        box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
    }
    /* Custom Checkbox Styles */
    .custom-checkbox label {
        display: flex;
        align-items: center;
        gap: 10px;
        font-size: 12px;
        font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
        color: #eee;
    }
    .custom-checkbox input[type="checkbox"] {
        appearance: none;
        width: 30px;
        height: 16px;
        background: linear-gradient(145deg, #ff4d4d, #FF1F1F);
        border-radius: 16px;
        position: relative;
        cursor: pointer;
        outline: none;
        box-shadow: inset 2px 2px 4px rgba(0,0,0,0.6), inset -2px -2px 4px rgba(255,255,255,0.05);
        transition: background-color 0.3s ease, box-shadow 0.3s ease;
    }
    .custom-checkbox input[type="checkbox"]::before {
        content: '';
        position: absolute;
        top: 0.3px;
        left: 1px;
        width: 14px;
        height: 14px;
        background: radial-gradient(circle at 30% 30%, #fff, #ddd);
        border-radius: 50%;
        box-shadow: 0 1px 2px rgba(0,0,0,0.4);
        transition: transform 0.3s cubic-bezier(0.34, 1.56, 0.64, 1), box-shadow 0.3s ease;
    }
    .custom-checkbox input[type="checkbox"]:checked {
        background: var(--accent-green, #00c853);
        box-shadow: 0 0 4px var(--accent-green, #00c853);
    }
    .custom-checkbox input[type="checkbox"]:checked::before {
        transform: translateX(13.6px);
        box-shadow: 0 1px 3px rgba(0,0,0,0.4);
    }
    /* Custom Slider Styles */
    .custom-slider .widget-label {
        color: var(--text-color);
        font-size: 14px;
        margin-bottom: 5px;
    }
    .custom-slider input[type="range"] {
        -webkit-appearance: none;
        width: 100%;
        height: 8px;
        background: linear-gradient(90deg, #ff4d4d, #ff1f1f);
        border-radius: 4px;
        outline: none;
        box-shadow: inset 0 1px 3px rgba(0,0,0,0.3);
    }
    .custom-slider input[type="range"]::-webkit-slider-thumb {
        -webkit-appearance: none;
        appearance: none;
        width: 20px;
        height: 20px;
        background: radial-gradient(circle at 30% 30%, #fff, #ddd);
        border-radius: 50%;
        cursor: pointer;
        box-shadow: 0 2px 4px rgba(0,0,0,0.4);
        transition: transform 0.2s ease;
    }
    .custom-slider input[type="range"]::-webkit-slider-thumb:hover {
        transform: scale(1.1);
    }
    .custom-slider input[type="range"]::-moz-range-thumb {
        width: 20px;
        height: 20px;
        background: radial-gradient(circle at 30% 30%, #fff, #ddd);
        border-radius: 50%;
        cursor: pointer;
        box-shadow: 0 2px 4px rgba(0,0,0,0.4);
        transition: transform 0.2s ease;
    }
    .custom-slider input[type="range"]::-moz-range-thumb:hover {
        transform: scale(1.1);
    }
    /* Start Process Button Styles */
    .start-process-button {
        background: linear-gradient(45deg, #00c853, #76ff03, #00e676, #00c853);
        background-size: 400% 400%;
        animation: gradientAnimation 3s ease infinite;
        border-radius: 30px;
        padding: 15px 30px;
        color: white;
        border: none;
        box-shadow: 0 4px 12px rgba(0, 0, 0, 0.2);
        cursor: pointer;
        transition: all 0.4s ease-in-out;
        font-size: 16px;
        font-weight: bold;
        text-shadow: 0 2px 4px rgba(0,0,0,0.6);
        width: calc(100% - 60px) !important;
        margin: 20px 30px !important;
        display: block;
        height: auto;
        min-height: 50px;
    }
    .start-process-button:hover {
        transform: scale(1.05);
        box-shadow: 0 6px 14px rgba(0, 0, 0, 0.3), 0 0 15px #76ff03;
        background: linear-gradient(45deg, #00e676, #00c853, #76ff03, #00e676);
        background-size: 400% 400%;
    }
    .start-process-button:active {
        transform: scale(0.98);
        box-shadow: 0 2px 6px rgba(0, 0, 0, 0.3);
    }
    @keyframes gradientAnimation {
        0% { background-position: 0% 50%; }
        50% { background-position: 100% 50%; }
        100% { background-position: 0% 50%; }
    }
    </style>
    <div class="ui-container">
    <div class="instructions-box" style="padding:15px; border-radius:20px;">
    <h3>📝 Instructions & Tips</h3>
    <ul>
    <li><strong>URL:</strong> Enter a direct download link or Google Drive link for the archive file</li>
    <li><strong>Multiple Google Drives:</strong> The system will automatically use multiple drives if needed</li>
    <li><strong>Supported Archive Types:</strong> .zip, .rar, .7z, .tar.gz, and most other common archive formats</li>
    <li><strong>Folder Creation:</strong> Creates a folder in your Google Drive(s) to keep uploaded files organized</li>
    </ul>
    <p><strong>⚠️ Note:</strong> Please make sure to delete old files in drive and collab before proceed for smooth process</p>
    <p><strong>⚠️ Note:</strong> The only limit is the limit imposed by the gdrive that is 15gb max if you have premium plans then good to go.</p>
    <p><strong>⚠️ Note:</strong> Large files may take some time to download, extract, and upload depending on your internet speed.</p>
    </div>
    </div>
    """
# ✅ STEP 5: Launch the UI when this cell/script is executed directly
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment