Skip to content

Instantly share code, notes, and snippets.

@qpwo
Created April 23, 2025 00:44
Show Gist options
  • Save qpwo/ec267f1442fb49945435b07a84472d57 to your computer and use it in GitHub Desktop.
Save qpwo/ec267f1442fb49945435b07a84472d57 to your computer and use it in GitHub Desktop.
Remove duplicate directories with cksum
#!/usr/bin/env python3
# dups2
from functools import lru_cache
import os
import subprocess
import csv
from concurrent.futures import ProcessPoolExecutor
from collections import Counter, defaultdict
from typing import Generator
import shutil
# Name of the script's output file; one_iter() skips any file with this name
# while scanning the tree.
# NOTE(review): nothing visible in this file writes to it -- confirm it is still needed.
output_file = "name-sum.csv"
@lru_cache
def single_sum(filepath: str) -> str | None:
result = subprocess.run(['cksum', filepath], capture_output=True, text=True)
if result.returncode == 0:
parts = result.stdout.strip().split()
checksum = parts[0]
if checksum:
return checksum
return None
# Function to calculate checksums for a batch of files
def calculate_checksums(filepaths: list[str]) -> list[tuple[str, str]]:
    """Checksum every regular file in *filepaths* that is not inside a trash directory.

    Each successful checksum is also printed as ``<checksum> "<path>"``.

    Args:
        filepaths: Paths to examine. Entries that are not regular files, or
            that have a path component named exactly ``trash``, are skipped.

    Returns:
        ``(checksum, filepath)`` pairs, in input order, for the files that
        could be checksummed. Files whose processing raises are reported on
        stdout and left out of the result.
    """
    results = []
    for filepath in filepaths:
        # Compare whole path components: a plain substring test would also
        # drop unrelated files such as "trashcan.txt" or "docs/trash_talk.md".
        if 'trash' in os.path.normpath(filepath).split(os.sep):
            continue
        if not os.path.isfile(filepath):
            continue
        try:
            # The print stays inside the try on purpose: a file name that
            # cannot be encoded for stdout must not abort the whole batch.
            result = single_sum(filepath)
            if result:
                print(f"{result} \"{filepath}\"")
                results.append((result, filepath))
        except Exception as e:
            print(f"Error processing {filepath}: {e}")
    return results
def fileancestors(filepath: str) -> Generator[str, None, None]:
    """Yield every ancestor directory of *filepath*, outermost first.

    A single leading ``./`` is dropped, the path is split on ``/`` and the
    final component (the file itself) is never yielded. For example
    ``./a/b/c.txt`` produces ``a`` and then ``a/b``.

    Args:
        filepath: A ``/``-separated path.

    Yields:
        Each proper prefix of the path, joined with ``/``.
    """
    *directories, _leaf = filepath.removeprefix('./').split('/')
    trail = []
    for component in directories:
        trail.append(component)
        yield '/'.join(trail)
def trash(directory):
    """Move the contents of *directory* into ``<cwd>/trash/<directory>`` and remove it.

    The trash root and the mirrored target directory are created if needed.
    Every entry of *directory* is then moved across with ``shutil.move`` and
    the emptied source directory is deleted with ``os.rmdir``.

    Args:
        directory: Path of the directory to trash; it is joined onto the
            trash root to form the destination.
    """
    trash_root = os.path.join(os.getcwd(), "trash")
    destination = os.path.join(trash_root, directory)
    for needed in (trash_root, destination):
        os.makedirs(needed, exist_ok=True)

    for entry in os.listdir(directory):
        shutil.move(os.path.join(directory, entry), os.path.join(destination, entry))

    # Only succeeds once everything has been moved out.
    os.rmdir(directory)
def one_iter():
    """Scan the tree under ``.`` once and offer to trash duplicate directories.

    Every file below the current directory is checksummed in worker
    processes, except files named ``output_file`` and anything inside a
    directory named ``trash``. Each directory is then described by the set of
    checksums of all files beneath it; two directories with equal sets are
    reported as duplicates, unless one contains the other. The user is
    prompted for each duplicate, and the function returns right after the
    first directory is trashed so the caller can rescan the changed tree.

    Returns:
        None.
    """

    def nested(first: str, second: str) -> bool:
        """Return True when one of the two directories contains the other."""
        return first.startswith(second + '/') or second.startswith(first + '/')

    all_files = []
    for root, dirs, files in os.walk('.'):
        # Prune in place so os.walk never descends into a trash directory.
        dirs[:] = [name for name in dirs if name != 'trash']
        all_files.extend(
            os.path.join(root, filename)
            for filename in files
            if filename != output_file  # Skip the output file
        )

    # Split files into batches
    batch_size = 10  # Adjust batch size as needed
    batches = [all_files[i:i + batch_size] for i in range(0, len(all_files), batch_size)]

    # directory -> checksums of every file anywhere beneath it
    sumsin = defaultdict(set)
    with ProcessPoolExecutor(max_workers=60) as executor:
        for results in executor.map(calculate_checksums, batches):
            for checksum, filepath in results:
                # removeprefix, not strip('./'): strip() removes *every*
                # leading/trailing '.' and '/', which turns './.git/x' into
                # 'git/x' and would name the wrong directory.
                relative = filepath.removeprefix('./')
                # Files directly in '.' have no ancestors and add nothing.
                for ancestor in fileancestors(relative):
                    sumsin[ancestor].add(checksum)

    # Most sums first; the sort is stable, so ties keep discovery order
    # (parents are discovered before their children).
    ordered = sorted(sumsin, key=lambda name: len(sumsin[name]), reverse=True)

    # A frozenset key cannot collide the way concatenated digits can
    # ({'12', '3'} and {'1', '23'} both join to '123').
    seen = defaultdict(list)
    for d in ordered:
        signature = frozenset(sumsin[d])
        # A directory whose only content is one subdirectory shares that
        # subdirectory's checksum set; they are the same files, not copies,
        # so ancestor/descendant pairs must never be offered for trashing.
        d2 = next((other for other in seen[signature] if not nested(other, d)), None)
        seen[signature].append(d)
        if d2 is None:
            continue
        print(f"{d} is a duplicate of {d2}")
        try:
            answer = input(f"Do you want to trash {d} (y/n)? ")
            if answer.lower() == 'y':
                print(f"Trashing {d}")
                trash(d)
                # The tree changed; let the caller rescan from scratch.
                return
        except Exception as exc:
            # Best effort: report the problem and move on to the next candidate.
            print(f"{exc=}")
# Entry point: keep scanning until the user interrupts with Ctrl-C.
# The __main__ guard matters because one_iter() uses ProcessPoolExecutor:
# with the "spawn" start method the worker processes re-import this module,
# and without the guard each worker would try to run this loop itself.
if __name__ == "__main__":
    while True:
        try:
            one_iter()
        except KeyboardInterrupt:
            print("Exiting...")
            break
        except Exception as exc:
            # Report the failure and try another scan rather than crashing.
            print(f"Error: {exc}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment