Skip to content

Instantly share code, notes, and snippets.

@elmeunick9
Last active May 8, 2024 09:13
Show Gist options
  • Save elmeunick9/9081f37aa28076d7e0fd84c35f4c6a7a to your computer and use it in GitHub Desktop.
Save elmeunick9/9081f37aa28076d7e0fd84c35f4c6a7a to your computer and use it in GitHub Desktop.
Duplicate Images Removal Tool
import os
import cv2
import numpy as np
from sklearn.cluster import KMeans
import shutil
from PIL import Image
def descriptor(image, levels):
    """Build a multi-scale mean-colour feature vector for an image.

    The mean of the first three channels over the whole image is recorded,
    then the image is split into four quadrants and each quadrant is
    described the same way, recursively, until ``levels`` levels exist.

    Args:
        image: Array of shape (height, width, channels) with at least three
            channels, or a 2-D single-channel array. ``None`` is reported on
            stdout and passed through.
        levels: Number of levels to describe; a value of 1 or less describes
            only the whole image.

    Returns:
        A flat list of channel means: the whole image first, then the
        top-left, top-right, bottom-left and bottom-right quadrants, each
        followed by its own sub-quadrants. The list has ``4 ** levels - 1``
        entries when ``levels >= 1``. Returns ``None`` when ``image`` is
        ``None``.
    """
    if image is None:
        print("Error: Image is None.")
        return None

    pixels = np.asarray(image)
    if pixels.ndim == 2:
        # Single-channel input: replicate it so the three colour slots exist.
        pixels = np.stack([pixels] * 3, axis=-1)
    return _describe_region(pixels, levels, [0.0, 0.0, 0.0])


def _describe_region(region, levels, fallback):
    """Return the mean colour of ``region`` followed by its quadrants' features."""
    if region.size:
        means = np.mean(region, axis=(0, 1))
        color = [means[0], means[1], means[2]]
    else:
        # An empty quadrant (image smaller than the subdivision grid) would
        # average to NaN and make every feature vector containing it unusable,
        # so it inherits the colour of the enclosing region instead.
        color = list(fallback)

    features = list(color)
    if levels > 1:
        mid_row = region.shape[0] // 2
        mid_col = region.shape[1] // 2
        # Order: top-left, top-right, bottom-left, bottom-right.
        for rows in (slice(None, mid_row), slice(mid_row, None)):
            for cols in (slice(None, mid_col), slice(mid_col, None)):
                features += _describe_region(region[rows, cols], levels - 1, color)
    return features
def open_image(image_path):
    """Load an image file as an RGB numpy array.

    WebP files are decoded with Pillow; every other file is read with OpenCV.
    Both paths return the channels in RGB order so that features computed
    from the same picture stored in different formats are comparable.

    Args:
        image_path: Path of the file to load, as a string.

    Returns:
        The decoded image as a numpy array in RGB channel order, or ``None``
        (after printing an error) when the file is an ``.mp4`` or cannot be
        read.
    """
    # Extensions are matched case-insensitively so "PHOTO.WEBP" and
    # "CLIP.MP4" take the same branch as their lowercase spellings.
    lowered = image_path.lower()

    if lowered.endswith('.webp'):
        try:
            # The context manager releases the file handle once decoded.
            with Image.open(image_path) as img:
                return np.array(img.convert("RGB"))
        except Exception as e:
            # Deliberately broad: a bad file must not abort the whole run.
            print(f"Error: Unable to open or process WebP image file: {str(e)}")
            return None

    if lowered.endswith('.mp4'):
        print(f"Error: Unable to open image file. Path: {image_path}")
        return None

    image = cv2.imread(image_path)
    if image is None:
        print(f"Error: Unable to open image file. Path: {image_path}")
        return None
    # cv2.imread returns BGR; convert so this path agrees with the WebP path.
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
def classify(paths, features, num_clusters):
    """Group image paths by clustering their feature vectors with KMeans.

    Args:
        paths: Image paths, one per feature vector.
        features: Feature vectors, in the same order as ``paths``.
        num_clusters: Requested number of clusters. It is clamped to the
            range ``1..len(paths)`` because KMeans rejects a cluster count
            below one or above the number of samples.

    Returns:
        A list with one entry per cluster, each a list of the paths assigned
        to that cluster. Returns an empty list when ``paths`` is empty.
    """
    if len(paths) == 0:
        return []

    cluster_count = max(1, min(num_clusters, len(paths)))
    model = KMeans(n_clusters=cluster_count, random_state=0)
    labels = model.fit_predict(features)

    cluster_results = [[] for _ in range(cluster_count)]
    for path, label in zip(paths, labels):
        cluster_results[label].append(path)
    return cluster_results
def compare(image_paths, window_title="Comparison"):
    """Show images side by side and ask the user whether they are duplicates.

    The images are pasted left to right onto a black canvas, shown in an
    OpenCV window, and the function blocks until a key is pressed.

    Args:
        image_paths: Paths of the images to display together.
        window_title: Title of the OpenCV window.

    Returns:
        ``"same"`` if ``s``/``S`` was pressed, ``"different"`` if ``d``/``D``
        was pressed, and ``"skip"`` for any other key or when none of the
        images could be loaded.
    """
    # Load before creating the window so that having nothing to show never
    # leaves an empty window behind; unreadable files are dropped.
    loaded = [open_image(image_path) for image_path in image_paths]
    images = [image for image in loaded if image is not None]
    if not images:
        return "skip"

    # The canvas is as tall as the tallest image and as wide as all of them.
    max_height = max(image.shape[0] for image in images)
    total_width = sum(image.shape[1] for image in images)
    canvas = np.zeros((max_height, total_width, 3), dtype=np.uint8)

    current_width = 0
    for image in images:
        height, width, _ = image.shape
        canvas[:height, current_width:current_width + width] = image
        current_width += width

    cv2.namedWindow(window_title, cv2.WINDOW_NORMAL)
    try:
        cv2.resizeWindow(window_title, 800, 600)
        # NOTE(review): imshow interprets arrays as BGR, so this R/B swap is
        # only correct if open_image returned RGB data; confirm channel order.
        cv2.imshow(window_title, cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB))
        # Keep only the low byte: some backends are reported to set higher
        # bits in the key code, which would defeat the comparisons below.
        key = cv2.waitKey(0) & 0xFF
    finally:
        # Close the window even if displaying or waiting raised.
        cv2.destroyAllWindows()

    if key in (ord('s'), ord('S')):
        return "same"
    if key in (ord('d'), ord('D')):
        return "different"
    return "skip"
def copy(cluster_results, output_directory, failed_paths=None):
    """Sort clustered images into the output directory, asking about duplicates.

    The output directory is recreated from scratch. Clusters with at most one
    image go straight to ``diff``. For larger clusters the user is asked via
    ``compare``:

    * ``"same"``: only the first image of the cluster is copied to ``diff``.
    * ``"different"``: every image of the cluster is copied to ``diff``.
    * ``"skip"``: every image is copied to its own ``cluster_<index>`` folder.

    Args:
        cluster_results: List of clusters, each a list of image paths.
        output_directory: Destination directory; any existing content is
            deleted first.
        failed_paths: Optional paths of files that could not be loaded; when
            non-empty they are copied into a ``failed`` sub-directory.
    """
    # Always start from an empty output directory.
    if os.path.exists(output_directory):
        shutil.rmtree(output_directory)
    os.makedirs(output_directory)

    diff_directory = os.path.join(output_directory, "diff")
    os.makedirs(diff_directory)

    for cluster_index, paths in enumerate(cluster_results):
        if len(paths) <= 1:
            # Nothing to compare against: the image is unique by definition.
            _copy_into(paths, diff_directory)
            continue

        result = compare(paths, f"Cluster {cluster_index}")
        print(f"Cluster {cluster_index}: {result}")

        if result == "skip":
            cluster_directory = os.path.join(output_directory, f"cluster_{cluster_index}")
            os.makedirs(cluster_directory)
            _copy_into(paths, cluster_directory)
        elif result == "same":
            # Duplicates: keep a single representative.
            _copy_into(paths[:1], diff_directory)
        elif result == "different":
            _copy_into(paths, diff_directory)

    if failed_paths:
        failed_directory = os.path.join(output_directory, "failed")
        os.makedirs(failed_directory)
        _copy_into(failed_paths, failed_directory)


def _copy_into(paths, directory):
    """Copy each file in ``paths`` into ``directory`` under its base name."""
    for path in paths:
        shutil.copyfile(path, os.path.join(directory, os.path.basename(path)))
def main(input_directory="in", output_directory="out", levels=3):
    """Run the duplicate-removal workflow on a directory of images.

    Every regular file in ``input_directory`` is loaded and described, the
    descriptors are clustered, and the clusters are handed to ``copy``, which
    asks the user about each multi-image cluster and fills
    ``output_directory``.

    Args:
        input_directory: Directory containing the images to examine.
        output_directory: Directory that receives the sorted copies; it is
            recreated by ``copy``.
        levels: Number of subdivision levels passed to ``descriptor``.
    """
    print("Initializing...")
    # Sub-directories are skipped because they can be neither read as images
    # nor copied as files. Sorting makes cluster numbering reproducible
    # regardless of the order the file system reports.
    files = sorted(
        name
        for name in os.listdir(input_directory)
        if os.path.isfile(os.path.join(input_directory, name))
    )
    print(f"Found {len(files)} files.")

    print("Computing features...")
    image_paths = []
    failed_paths = []
    feature_set = []
    for count, file_name in enumerate(files, start=1):
        file_path = os.path.join(input_directory, file_name)
        image = open_image(file_path)
        if image is None:
            failed_paths.append(file_path)
        else:
            feature_set.append(descriptor(image, levels))
            image_paths.append(file_path)
        if count % 100 == 0:
            print(f"Processed {count} images.")

    print("Classifying...")
    if image_paths:
        # The cluster count is derived from the images that actually loaded,
        # not from every file found, so it never exceeds the sample count,
        # and it is at least one.
        num_clusters = max(1, int(len(image_paths) // 1.1))
        clusters = classify(image_paths, feature_set, num_clusters)
    else:
        clusters = []

    print("Copying images...")
    copy(clusters, output_directory, failed_paths)
# Run the workflow only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment