import os
import sys
import hashlib
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
def hash_file(file_path, hash_algorithm='sha256'):
    # Hash the file contents with the requested algorithm (SHA-256 by default).
    with open(file_path, 'rb') as f:
        return hashlib.new(hash_algorithm, f.read()).hexdigest(), file_path
def compare_files(file1, file2):
    """
    Perform a byte-for-byte comparison of two files.
    """
    with open(file1, 'rb') as f1, open(file2, 'rb') as f2:
        while True:
            chunk1 = f1.read(8192)
            chunk2 = f2.read(8192)
            if chunk1 != chunk2:
                return False, file1, file2
            if not chunk1:  # End of file reached
                break
    return True, file1, file2
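# Illustrative only (the file names below are hypothetical): for two byte-identical
# files the function returns (True, path1, path2), otherwise (False, path1, path2):
#   compare_files('tex_a.tga', 'tex_a_copy.tga')  # -> (True, 'tex_a.tga', 'tex_a_copy.tga')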
# Function to convert bytes to a human-readable format
def human_readable_size(size_in_bytes):
    for unit in ['bytes', 'KB', 'MB', 'GB', 'TB']:
        if size_in_bytes < 1024:
            return f"{size_in_bytes:.2f} {unit}"
        size_in_bytes /= 1024
    return f"{size_in_bytes:.2f} PB"  # fall-through for sizes beyond 1024 TB
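# Illustrative values: human_readable_size(1536) -> "1.50 KB",
# human_readable_size(3 * 1024**2) -> "3.00 MB".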
def find_identical_files(folder_path, extension='.tga', max_threads=8):
    """
    Recursively finds all files with the given extension in a folder,
    first culls by file size to avoid unnecessary hashing, and then hashes the remaining candidates using threads.
    """
    file_paths = []
    # Collect all file paths with the given extension
    for dirpath, _, filenames in os.walk(folder_path):
        for filename in filenames:
            if filename.lower().endswith(extension):
                file_path = os.path.join(dirpath, filename)
                file_paths.append(file_path)
    print("Total files found:", len(file_paths))
    # Initial culling: group files by size
    size_groups = defaultdict(list)
    for file_path in file_paths:
        try:
            size = os.path.getsize(file_path)
        except OSError:
            continue
        size_groups[size].append(file_path)
    # Only keep files from size groups with more than one file (possible duplicates)
    culled_file_paths = []
    for size, paths in size_groups.items():
        if len(paths) > 1:
            culled_file_paths.extend(paths)
    # print("Files after culling by size (potential duplicates):", len(culled_file_paths))
    # Dictionary to store the hash -> list of file paths
    file_hashes = defaultdict(list)
    # Create a ThreadPoolExecutor to hash files concurrently
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = []
        # Submit the hashing tasks for all culled files
        for file_path in culled_file_paths:
            futures.append(executor.submit(hash_file, file_path))
        # Process the results as they finish
        for future in futures:
            file_hash, file_path = future.result()
            file_hashes[file_hash].append(file_path)
    # Now, perform byte-for-byte comparisons to filter out non-identical files
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        futures = []
        for hash_val, files in file_hashes.items():
            if len(files) > 1:  # Only compare files if there are multiple files with the same hash
                for i in range(len(files)):
                    for j in range(i + 1, len(files)):
                        futures.append(executor.submit(compare_files, files[i], files[j]))
        # Filter out non-identical files by comparing them byte-by-byte
        for future in futures:
            are_identical, file1, file2 = future.result()
            if not are_identical:
                # Remove non-identical files from the file_hashes dictionary
                for hash_val in list(file_hashes.keys()):
                    if file1 in file_hashes[hash_val]:
                        file_hashes[hash_val].remove(file1)
                    if file2 in file_hashes[hash_val]:
                        file_hashes[hash_val].remove(file2)
    # Clean up empty hash entries
    for hash_val in list(file_hashes.keys()):
        if not file_hashes[hash_val]:
            del file_hashes[hash_val]
    # Sort the hash entries by the number of duplicates in descending order
    sorted_file_hashes = sorted(file_hashes.items(), key=lambda x: len(x[1]), reverse=True)
    # Output the final hash table with only identical files
    for hash_val, files in sorted_file_hashes:
        if len(files) > 1:  # Only print hash entries with more than one file
            print(f"\nIdentical files ({len(files)}):")
            for file in files:
                print(f"  {file}")
    # Total size currently used by all files that are part of a duplicate group
    current_size_duplicates = sum(os.path.getsize(file) for files in file_hashes.values() if len(files) > 1 for file in files)
    # Size those groups would use if only one copy of each file were kept
    new_size_duplicates = sum(os.path.getsize(files[0]) for files in file_hashes.values() if len(files) > 1)
    print("")
    print(f"Current total size of files with duplicates: {human_readable_size(current_size_duplicates)}")
    print(f"Size if only one copy of each duplicate were kept: {human_readable_size(new_size_duplicates)}")
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print(f"Usage: {sys.argv[0]} <folder_path>")
    else:
        find_identical_files(sys.argv[1], max_threads=os.cpu_count())
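# Example invocation (script name and folder path are placeholders):
#   python <script.py> C:/project/textures
# This walks the folder recursively, groups .tga files by size, hashes the remaining
# candidates, byte-compares files with matching hashes, and prints each group of
# identical files along with the space that removing duplicates would free.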