Python script to scan a directory for duplicate files, keeping only the latest copy of each
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (C) 2018 Conrad Sachweh
"""NAME
    %(prog)s - scan a directory for duplicate files and keep only the latest copy

SYNOPSIS
    %(prog)s [--help]

DESCRIPTION
    none

FILES
    none

SEE ALSO
    nothing

DIAGNOSTICS
    none

BUGS
    none

AUTHOR
    Conrad Sachweh, [email protected]
"""

#--------- Classes, Functions, etc ---------------------------------------------
def checksum(filename, algo="sha256", block_size=65536):
    """
    read the file in chunks and hash it; 65536 bytes = 64 KiB per chunk
    """
    import hashlib
    hashFunction = getattr(hashlib, algo)()
    with open(filename, 'rb', buffering=0) as f:
        for block in iter(lambda: f.read(block_size), b''):
            hashFunction.update(block)
    return filename, hashFunction.hexdigest()
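
# Note: checksum() returns the filename alongside the digest so that results
# coming back from pool.imap_unordered() can be matched to their input files.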

# functions from https://stackoverflow.com/a/8558403
def walk_files(topdir, recursive=True):
    """yield up full pathname for each file in tree under topdir"""
    import os
    for dirpath, dirnames, filenames in os.walk(topdir):
        for fname in filenames:
            pathname = os.path.join(dirpath, fname)
            yield pathname
        if not recursive:
            # os.walk() yields the top-level directory first, so stop here
            break

def files_to_process(topdir, size_limit=10000000, recursive=True):
    """
    yield up full pathname for only files we want to process
    size_limit in bytes
    """
    import os
    from stat import S_ISREG
    for fname in walk_files(topdir, recursive=recursive):
        try:
            sr = os.stat(fname)
        except OSError:
            pass
        else:
            # if it is a regular file and small enough, we want to process it
            if S_ISREG(sr.st_mode) and sr.st_size <= size_limit:
                yield fname

def get_file_info(files):
    import os
    info = {}
    for item in files:
        st = os.stat(item)
        info[item] = {"size": st.st_size, "timestamp": st.st_mtime}
    return info
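
# get_file_info() returns e.g. {"/tmp/a.txt": {"size": 123, "timestamp": 1519200000.0}}
# (hypothetical path and values, for illustration only)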

def get_filesize(files):
    finfo = get_file_info(files)
    size = 0
    for fname, info in finfo.items():
        size += info.get("size")
    return size

def get_latest(finfo):
    """return (pathname, mtime) of the most recently modified file in finfo"""
    mtime = 0
    latestfile = None
    for fname, info in finfo.items():
        if info.get("timestamp") > mtime:
            mtime = info.get("timestamp")
            latestfile = fname
    return latestfile, mtime

def sizeof_fmt(num, suffix='B'):
    """
    format a byte count as a human-readable string
    https://stackoverflow.com/a/1094933
    """
    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    return "%.1f%s%s" % (num, 'Yi', suffix)

#-------------------------------------------------------------------------------
# Main
#-------------------------------------------------------------------------------
if __name__ == "__main__":
    import os
    from multiprocessing import Pool, freeze_support
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-r', '--recursive', action='store_true',
                        help="scan the directory recursively")
    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help='show more verbose output')
    parser.add_argument('--cores', action='store', default=1, type=int,
                        help='hashing is usually I/O bound, but feel free to increase this '
                             'for high-performance storage devices or many small files')
    parser.add_argument('--dry-run', action='store_true',
                        help="only index the directory, don't actually delete files")
    parser.add_argument('directory', nargs=1, help='search directory')
    args = parser.parse_args()

    if args.verbose:
        print("[INFO]", args)

    # normalize the directory argument
    mdir = args.directory[0]
    if not mdir.endswith("/"):
        mdir = mdir + "/"

    freeze_support()
    pool = Pool(processes=args.cores)

    from collections import defaultdict
    allFiles = defaultdict(list)

    # collect the files to look at; materialize the generator once so the
    # directory tree is only walked a single time and we know the total count
    sizeLimit = 2000000000  # 2 GB (~1.9 GiB)
    files = list(files_to_process(mdir, sizeLimit, recursive=args.recursive))
    print("Scanning directory {}".format(mdir))

    # initialize the progress bar
    from tqdm import tqdm
    pbar = tqdm(total=len(files))
    # calculate checksums in parallel
    for fname, hexdigest in pool.imap_unordered(checksum, files):
        pbar.update(1)
        allFiles[hexdigest].append(fname)
    pbar.close()
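
    # allFiles now maps each digest to every path with identical content;
    # any list holding more than one path is a set of duplicates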
print("Evaluating for duplicates") | |
deleteFiles = [] | |
for hexdigest, files in allFiles.items(): | |
if len(files) > 1: | |
finfo = get_file_info(files) | |
latestfile = get_latest(finfo) | |
for item in files: | |
if not latestfile[0] == item: | |
deleteFiles.append(item) | |
print("Going to delete {} files. You will gain {}.".format(len(deleteFiles), sizeof_fmt(get_filesize(deleteFiles)))) | |
if deleteFiles: | |
print(deleteFiles) | |
decision = input('Do you really want to delete those files? [y/N] ') | |
if decision.startswith("y") and not args.dry_run: | |
for item in deleteFiles: | |
os.remove(item) |
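
A minimal usage sketch, assuming the script is saved locally as remove_duplicates.py (the gist itself does not fix a filename):

python3 remove_duplicates.py -r --cores 4 --dry-run ~/Downloads

This hashes every regular file up to the 2 GB size limit, reports the duplicate sets and the space you would reclaim, and (without --dry-run) deletes everything except the newest copy of each set after a confirmation prompt.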