import os
import sys
import hashlib
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

def hash_file(file_path, hash_algorithm='sha256'):
    # Hash the whole file with the requested algorithm (sha256 by default).
    with open(file_path, 'rb') as f:
        return hashlib.new(hash_algorithm, f.read()).hexdigest(), file_path
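
# Optional: for very large files, a chunked variant keeps memory usage flat
# instead of reading each file into memory at once. This is a sketch and is
# not wired into find_identical_files() below.
def hash_file_chunked(file_path, hash_algorithm='sha256', chunk_size=8192):
    hasher = hashlib.new(hash_algorithm)
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b''):
            hasher.update(chunk)
    return hasher.hexdigest(), file_path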

def compare_files(file1, file2):
    """
    Perform a byte-for-byte comparison of two files.
    """
    with open(file1, 'rb') as f1, open(file2, 'rb') as f2:
        while True:
            chunk1 = f1.read(8192)
            chunk2 = f2.read(8192)
            if chunk1 != chunk2:
                return False, file1, file2
            if not chunk1:  # End of file reached
                break
    return True, file1, file2
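
# Note: the standard library's filecmp.cmp(file1, file2, shallow=False) offers
# an equivalent byte-for-byte comparison; the explicit loop above is kept so
# the chunk size and the (identical, file1, file2) return shape stay explicit.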

# Convert a byte count to a human-readable string
def human_readable_size(size_in_bytes):
    for unit in ['bytes', 'KB', 'MB', 'GB', 'TB']:
        if size_in_bytes < 1024:
            return f"{size_in_bytes:.2f} {unit}"
        size_in_bytes /= 1024
    return f"{size_in_bytes:.2f} PB"  # fall-through for anything above the TB range

def find_identical_files(folder_path, extension='.tga', max_threads=8):
    """
    Recursively finds all files with the given extension in a folder,
    first culls by file size to avoid unnecessary hashing, then hashes the
    remaining candidates on a thread pool and confirms duplicates with a
    byte-for-byte comparison.
    """
    file_paths = []

    # Collect all file paths with the given extension
    for dirpath, _, filenames in os.walk(folder_path):
        for filename in filenames:
            if filename.lower().endswith(extension):
                file_paths.append(os.path.join(dirpath, filename))

    print("Total files found:", len(file_paths))

    # Initial culling: group files by size
    size_groups = defaultdict(list)
    for file_path in file_paths:
        try:
            size = os.path.getsize(file_path)
        except OSError:
            continue
        size_groups[size].append(file_path)

    # Only keep files from size groups with more than one file (possible duplicates)
    culled_file_paths = []
    for size, paths in size_groups.items():
        if len(paths) > 1:
            culled_file_paths.extend(paths)
    # print("Files after culling by size (potential duplicates):", len(culled_file_paths))

    # Dictionary to store hash -> list of file paths
    file_hashes = defaultdict(list)

    # Hash the culled files concurrently
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = [executor.submit(hash_file, file_path) for file_path in culled_file_paths]
        # Process the results as they finish
        for future in futures:
            file_hash, file_path = future.result()
            file_hashes[file_hash].append(file_path)

    # Byte-for-byte comparison to guard against hash collisions
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = []
        for hash_val, files in file_hashes.items():
            if len(files) > 1:  # Only compare when several files share the same hash
                for i in range(len(files)):
                    for j in range(i + 1, len(files)):
                        futures.append(executor.submit(compare_files, files[i], files[j]))

        # Drop any pair that hashed identically but differs byte-for-byte
        for future in futures:
            are_identical, file1, file2 = future.result()
            if not are_identical:
                for hash_val in list(file_hashes.keys()):
                    if file1 in file_hashes[hash_val]:
                        file_hashes[hash_val].remove(file1)
                    if file2 in file_hashes[hash_val]:
                        file_hashes[hash_val].remove(file2)

    # Clean up empty hash entries
    for hash_val in list(file_hashes.keys()):
        if not file_hashes[hash_val]:
            del file_hashes[hash_val]

    # Sort the hash entries by the number of duplicates in descending order
    sorted_file_hashes = sorted(file_hashes.items(), key=lambda x: len(x[1]), reverse=True)

    # Output the final hash table with only identical files
    for hash_val, files in sorted_file_hashes:
        if len(files) > 1:  # Only print hash entries with more than one file
            print(f"\nIdentical files ({len(files)}):")
            for file in files:
                print(f"  {file}")

    # Current total size of all files that have duplicates
    current_size_duplicates = sum(
        os.path.getsize(file)
        for files in file_hashes.values() if len(files) > 1
        for file in files
    )
    # Size if only one copy of each duplicate group were kept
    new_size_duplicates = sum(
        os.path.getsize(files[0]) for files in file_hashes.values() if len(files) > 1
    )

    print("")
    print(f"Current total size of files with duplicates: {human_readable_size(current_size_duplicates)}")
    print(f"New size if only unique files existed (duplicates removed): {human_readable_size(new_size_duplicates)}")

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print(f"usage: {sys.argv[0]} <folder>")
    else:
        find_identical_files(sys.argv[1], max_threads=os.cpu_count())
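
# Example invocation (path and script name are illustrative):
#   python find_duplicate_files.py D:\assets\textures
# The extension filter defaults to '.tga'; call find_identical_files() with,
# e.g., extension='.png' to scan a different file type.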