Last active
May 8, 2024 09:13
-
-
Save elmeunick9/9081f37aa28076d7e0fd84c35f4c6a7a to your computer and use it in GitHub Desktop.
Duplicate Images Removal Tool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import cv2 | |
import numpy as np | |
from sklearn.cluster import KMeans | |
import shutil | |
from PIL import Image | |
def descriptor(image, levels):
    """Build a multi-resolution mean-colour descriptor of an image.

    The descriptor is the mean colour of the whole image followed, when
    ``levels > 1``, by the descriptors of its four quadrants (top-left,
    top-right, bottom-left, bottom-right) computed with ``levels - 1``.
    The result therefore always holds ``3 * (4**levels - 1) / 3`` values
    for ``levels >= 1`` (3 values for ``levels <= 1``).

    Args:
        image: Array of shape (height, width, channels) with at least three
            channels, or a 2-D grayscale array. ``None`` is reported and
            rejected.
        levels: Number of subdivision levels.

    Returns:
        A flat list of floats, or ``None`` when ``image`` is ``None``.

    Raises:
        ValueError: If the array is neither 2-D nor 3-D with >= 3 channels.
    """
    if image is None:
        print("Error: Image is None.")
        return None

    pixels = np.asarray(image)
    if pixels.ndim == 2:
        # A grayscale image has identical values in every colour channel.
        pixels = np.stack([pixels] * 3, axis=-1)
    if pixels.ndim != 3 or pixels.shape[2] < 3:
        raise ValueError(
            f"Unsupported image shape for descriptor: {pixels.shape}"
        )
    return _region_features(pixels, levels, None)


def _region_features(region, levels, fallback):
    """Return the descriptor of one region, recursing into its quadrants.

    ``fallback`` is the colour of the enclosing region. It is used when
    ``region`` is empty (which happens once a side shrinks below 2 pixels)
    so the feature vector keeps a fixed length and never contains NaN.
    """
    if region.size:
        # Only the first three channels are described; extra ones are ignored.
        color = [float(value) for value in np.mean(region, axis=(0, 1))[:3]]
    elif fallback is not None:
        color = list(fallback)
    else:
        # An empty top-level image has no colour at all; use black.
        color = [0.0, 0.0, 0.0]

    features = list(color)
    if levels > 1:
        half_height = region.shape[0] // 2
        half_width = region.shape[1] // 2
        quadrants = (
            region[:half_height, :half_width],   # top-left
            region[:half_height, half_width:],   # top-right
            region[half_height:, :half_width],   # bottom-left
            region[half_height:, half_width:],   # bottom-right
        )
        for quadrant in quadrants:
            features += _region_features(quadrant, levels - 1, color)
    return features
def open_image(image_path):
    """Load an image file as a BGR numpy array.

    WebP files are decoded with Pillow and every other file with
    ``cv2.imread``. Pillow yields RGB data while ``cv2.imread`` yields BGR,
    so the WebP result is channel-reversed to BGR; all images then share one
    channel order and their descriptors are comparable across formats.

    Args:
        image_path: Path of the file to load. The extension check is
            case-insensitive.

    Returns:
        The image as a numpy array, or ``None`` (after printing an error)
        when the file is an ``.mp4`` video or cannot be decoded.
    """
    lowered_path = image_path.lower()

    if lowered_path.endswith('.mp4'):
        # Videos are not images; report them without trying to decode.
        print(f"Error: Unable to open image file. Path: {image_path}")
        return None

    if lowered_path.endswith('.webp'):
        try:
            # The context manager closes the underlying file handle.
            with Image.open(image_path) as img:
                rgb = np.array(img.convert("RGB"))
        except Exception as e:
            # Best-effort loader: any decoding failure marks the file as failed.
            print(f"Error: Unable to open or process WebP image file: {str(e)}")
            return None
        # Reverse the channel axis (RGB -> BGR) to match cv2.imread output.
        return np.ascontiguousarray(rgb[:, :, ::-1])

    image = cv2.imread(image_path)
    if image is None:
        print(f"Error: Unable to open image file. Path: {image_path}")
    return image
def classify(paths, features, num_clusters):
    """Group image paths by running K-Means on their feature vectors.

    Args:
        paths: Image paths, one per feature vector.
        features: Feature vectors, in the same order as ``paths``.
        num_clusters: Requested number of clusters. It is clamped to the
            range ``1..len(paths)`` because K-Means cannot fit more clusters
            than samples or fewer than one.

    Returns:
        A list of clusters, each a list of paths. Clusters may be empty.
        An empty list is returned when there is nothing to classify.

    Raises:
        ValueError: If ``paths`` and ``features`` differ in length.
    """
    if len(paths) != len(features):
        raise ValueError(
            f"Got {len(paths)} paths but {len(features)} feature vectors."
        )
    if len(paths) == 0:
        return []

    cluster_count = max(1, min(int(num_clusters), len(paths)))
    model = KMeans(n_clusters=cluster_count, random_state=0)
    labels = model.fit_predict(features)

    cluster_results = [[] for _ in range(cluster_count)]
    for path, label in zip(paths, labels):
        cluster_results[label].append(path)
    return cluster_results
def compare(image_paths, window_title="Comparison"):
    """Show images side by side and ask the user whether they match.

    The images are pasted left to right onto a black canvas and displayed
    in an OpenCV window until a key is pressed.

    Args:
        image_paths: Paths of the images to display.
        window_title: Title of the OpenCV window.

    Returns:
        ``"same"`` if 's'/'S' was pressed, ``"different"`` if 'd'/'D' was
        pressed, and ``"skip"`` for any other key or when none of the
        images could be loaded.
    """
    loaded = (open_image(image_path) for image_path in image_paths)
    # Unreadable files are left out instead of crashing on a None image.
    images = [image for image in loaded if image is not None]
    if not images:
        return "skip"

    max_height = max(image.shape[0] for image in images)
    total_width = sum(image.shape[1] for image in images)
    canvas = np.zeros((max_height, total_width, 3), dtype=np.uint8)

    current_width = 0
    for image in images:
        height, width = image.shape[:2]
        canvas[:height, current_width:current_width + width] = image
        current_width += width

    answers = {
        ord('s'): "same",
        ord('S'): "same",
        ord('d'): "different",
        ord('D'): "different",
    }

    cv2.namedWindow(window_title, cv2.WINDOW_NORMAL)
    cv2.resizeWindow(window_title, 800, 600)
    try:
        # cv2.imshow expects BGR data, which is what cv2.imread produces,
        # so the canvas is shown without any channel conversion.
        cv2.imshow(window_title, canvas)
        # Keep only the low byte; some platforms set extra high bits.
        key = cv2.waitKey(0) & 0xFF
    finally:
        cv2.destroyAllWindows()
    return answers.get(key, "skip")
def copy(cluster_results, output_directory, failed_paths=None):
    """Sort clustered images into the output directory.

    The output directory is recreated from scratch. Clusters with at most
    one image go straight to ``diff``. Larger clusters are shown to the
    user through ``compare``:

    * ``"skip"``: every image is copied to ``cluster_<index>``.
    * ``"same"``: only the first image is copied to ``diff``.
    * ``"different"``: every image is copied to ``diff``.

    Args:
        cluster_results: Iterable of clusters, each an iterable of paths.
        output_directory: Directory to (re)create and fill.
        failed_paths: Optional paths of unreadable files; they are copied
            to ``failed`` when there is at least one.

    Raises:
        ValueError: If a source file lies inside ``output_directory``,
            because wiping that directory would delete the file.
    """
    failed_paths = [] if failed_paths is None else list(failed_paths)
    clusters = [list(paths) for paths in cluster_results]

    # Check before deleting anything: rmtree below is irreversible.
    for path in [p for paths in clusters for p in paths] + failed_paths:
        if _is_within(path, output_directory):
            raise ValueError(
                f"Source file {path} is inside the output directory "
                f"{output_directory}, which would be deleted."
            )

    if os.path.exists(output_directory):
        shutil.rmtree(output_directory)
    os.makedirs(output_directory)
    diff_directory = os.path.join(output_directory, "diff")
    os.makedirs(diff_directory)

    for cluster_index, paths in enumerate(clusters):
        if len(paths) <= 1:
            # Nothing to compare against: the image is unique.
            _copy_files(paths, diff_directory)
            continue
        result = compare(paths, f"Cluster {cluster_index}")
        print(f"Cluster {cluster_index}: {result}")
        if result == "skip":
            cluster_directory = os.path.join(
                output_directory, f"cluster_{cluster_index}"
            )
            os.makedirs(cluster_directory)
            _copy_files(paths, cluster_directory)
        elif result == "same":
            # Duplicates: keep a single representative.
            _copy_files(paths[:1], diff_directory)
        elif result == "different":
            _copy_files(paths, diff_directory)

    if failed_paths:
        failed_directory = os.path.join(output_directory, "failed")
        os.makedirs(failed_directory)
        _copy_files(failed_paths, failed_directory)


def _copy_files(paths, directory):
    """Copy each file in ``paths`` into ``directory`` under its base name."""
    for path in paths:
        shutil.copyfile(path, os.path.join(directory, os.path.basename(path)))


def _is_within(path, directory):
    """Return True when ``path`` is ``directory`` itself or lies inside it."""
    directory = os.path.abspath(directory)
    try:
        return os.path.commonpath([os.path.abspath(path), directory]) == directory
    except ValueError:
        # Raised e.g. for paths on different drives: they cannot be nested.
        return False
def main(input_directory="in", output_directory="out", levels=3):
    """Cluster the images of a directory and sort them interactively.

    Every regular file in ``input_directory`` is loaded and described.
    The descriptors are clustered, and the clusters are handed to ``copy``,
    which fills ``output_directory``.

    Args:
        input_directory: Directory holding the images to process.
        output_directory: Directory that receives the sorted copies.
        levels: Subdivision levels passed to ``descriptor``.
    """
    print("Initializing...")
    # Sub-directories are skipped; sorting makes runs reproducible.
    files = sorted(
        name for name in os.listdir(input_directory)
        if os.path.isfile(os.path.join(input_directory, name))
    )
    print(f"Found {len(files)} files.")

    print("Computing features...")
    image_paths = []
    failed_paths = []
    feature_set = []
    for processed, file_name in enumerate(files, start=1):
        file_path = os.path.join(input_directory, file_name)
        image = open_image(file_path)
        if image is None:
            failed_paths.append(file_path)
        else:
            feature_set.append(descriptor(image, levels))
            image_paths.append(file_path)
        if processed % 100 == 0:
            print(f"Processed {processed} images.")

    if not image_paths and not failed_paths:
        # Leave the output directory alone when there is nothing to sort.
        print("Nothing to do.")
        return

    clusters = []
    if image_paths:
        print("Classifying...")
        # Base the count on the images that loaded, and never go below one,
        # so the clustering always has at least as many samples as clusters.
        num_clusters = max(1, int(len(image_paths) // 1.1))
        clusters = classify(image_paths, feature_set, num_clusters)

    print("Copying images...")
    copy(clusters, output_directory, failed_paths)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment