Created
April 23, 2025 00:44
-
-
Save qpwo/ec267f1442fb49945435b07a84472d57 to your computer and use it in GitHub Desktop.
Remove duplicate directories with cksum
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# dups2 | |
from functools import lru_cache | |
import os | |
import subprocess | |
import csv | |
from concurrent.futures import ProcessPoolExecutor | |
from collections import Counter, defaultdict | |
from typing import Generator | |
import shutil | |
# Name of the checksum output file. one_iter() skips any file with this name
# while walking the tree, so the tool's own output is never checksummed.
output_file = "name-sum.csv"
@lru_cache | |
def single_sum(filepath: str) -> str | None: | |
result = subprocess.run(['cksum', filepath], capture_output=True, text=True) | |
if result.returncode == 0: | |
parts = result.stdout.strip().split() | |
checksum = parts[0] | |
if checksum: | |
return checksum | |
return None | |
# Function to calculate checksums for a batch of files
def calculate_checksums(filepaths: list[str]) -> list[tuple[str, str]]:
    """Checksum a batch of files.

    Args:
        filepaths: Paths to checksum, using ``/`` as the separator.

    Returns:
        ``(checksum, filepath)`` pairs for every path that is a regular
        file, has no path component named ``trash`` and produced a checksum.
        Errors on individual files are printed and that file is skipped.
    """
    results = []
    for filepath in filepaths:
        # Match whole path components only: a substring test would also
        # drop unrelated files such as "trashcan.txt".
        if 'trash' in filepath.split('/'):
            continue
        if not os.path.isfile(filepath):
            continue
        try:
            result = single_sum(filepath)
            if result:
                print(f"{result} \"{filepath}\"")
                results.append((result, filepath))
        except Exception as e:
            # Best effort: one unreadable file must not abort the batch.
            print(f"Error processing {filepath}: {e}")
    return results
def fileancestors(filepath: str) -> Generator[str, None, None]:
    """Yield each ancestor directory of *filepath*, outermost first.

    A single leading ``./`` is dropped, then every proper prefix of the
    ``/``-separated path is produced: ``./a/b/c.txt`` gives ``a`` then
    ``a/b``. A path without any ``/`` yields nothing.
    """
    components = filepath.removeprefix('./').split('/')
    # The final component is the file itself, so it never ends a prefix.
    for depth, _ in enumerate(components[:-1], start=1):
        yield '/'.join(components[:depth])
def trash(directory):
    """Move the contents of *directory* into ``trash/<directory>`` and remove it.

    The ``trash`` folder is created in the current working directory if
    needed. Each entry of *directory* is moved individually, after which the
    emptied *directory* itself is deleted.
    """
    trash_root = os.path.join(os.getcwd(), "trash")
    os.makedirs(trash_root, exist_ok=True)

    # Mirror the original relative path underneath the trash folder.
    destination = os.path.join(trash_root, directory)
    os.makedirs(destination, exist_ok=True)

    for entry in os.listdir(directory):
        shutil.move(
            os.path.join(directory, entry),
            os.path.join(destination, entry),
        )

    # Everything has been moved out, so the directory is empty now.
    os.rmdir(directory)
def _is_nested(first: str, second: str) -> bool:
    """Return True when one of the two directory paths contains the other."""
    return first.startswith(second + '/') or second.startswith(first + '/')


def one_iter():
    """Scan the tree under ``.`` once and offer to trash duplicate directories.

    Every file below the current directory, except the output file and
    anything inside a ``trash`` directory, is checksummed in parallel. Each
    directory is then described by the set of checksums of all files beneath
    it. Directories with identical sets are reported as duplicates and the
    user is asked whether to trash the later one.

    Returns as soon as one directory has been trashed, so the caller can
    rescan the changed tree. Also returns once every candidate has been
    offered and declined.
    """
    all_files = []
    for root, dirs, files in os.walk('.'):
        # Prune in place so os.walk never descends into a trash directory.
        dirs[:] = [name for name in dirs if name != 'trash']
        for filename in files:
            # Skip the output file
            if filename == output_file:
                continue
            all_files.append(os.path.join(root, filename))

    # Split files into batches
    batch_size = 10  # Adjust batch size as needed
    batches = [all_files[i:i + batch_size] for i in range(0, len(all_files), batch_size)]

    sumsin = defaultdict(set)
    with ProcessPoolExecutor(max_workers=60) as executor:
        for results in executor.map(calculate_checksums, batches):
            for checksum, filepath in results:
                # fileancestors() removes the leading "./" itself and yields
                # nothing for files directly in ".". str.strip('./') must not
                # be used here: it strips characters, turning "./.git/x"
                # into "git/x".
                for ancestor in fileancestors(filepath):
                    sumsin[ancestor].add(checksum)

    sums_dirs: list[tuple[str, str]] = []
    for directory, checksums in sumsin.items():
        # Join with a separator so different sets such as {"12", "3"} and
        # {"1", "23"} cannot collapse to the same key.
        sums_dirs.append((','.join(sorted(checksums)), directory))
    # most sums first:
    sums_dirs.sort(key=lambda pair: len(sumsin[pair[1]]), reverse=True)

    seen: dict[str, list[str]] = defaultdict(list)
    for sums, d in sums_dirs:
        # A directory whose only content is a single subdirectory has the
        # same checksum set as that subdirectory. Those are the same files,
        # not a duplicate, so only unrelated directories count as a match.
        d2 = next((prev for prev in seen[sums] if not _is_nested(prev, d)), None)
        seen[sums].append(d)
        if d2 is None:
            continue
        print(f"{d} is a duplicate of {d2}")
        try:
            answer = input(f"Do you want to trash {d} (y/n)? ")
            if answer.lower() == 'y':
                print(f"Trashing {d}")
                trash(d)
                # The tree changed; stop so the caller can rescan.
                return
        except Exception as exc:
            print(f"{exc=}")
# Script entry point. The guard is required because one_iter() uses a
# ProcessPoolExecutor: under the "spawn" and "forkserver" start methods each
# worker re-imports this module, and an unguarded loop would run in every
# worker as well.
if __name__ == "__main__":
    # Rescan until the user presses Ctrl-C; one_iter() returns after trashing
    # a single directory so that the next pass sees the updated tree.
    while True:
        try:
            one_iter()
        except KeyboardInterrupt:
            print("Exiting...")
            break
        except Exception as exc:
            # Report the failure and try again on the next pass.
            print(f"Error: {exc}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment