dupecheck.py

Wed, 13 Feb 2019 14:10:55 +0100

author
mdd
date
Wed, 13 Feb 2019 14:10:55 +0100
changeset 36
a1ad6f4728be
parent 35
14c966c10648
child 37
5be334b71b08
permissions
-rw-r--r--

added support for remote ssh dupe checking against local basedir

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Toolkit / executable to scan for duplicate filenames in movie database

2017-2019 by mdd
"""

#pylint: disable=line-too-long
#pylint: disable=invalid-name

from __future__ import print_function
import os, sys, re
import time

# Matches a "(...)" or "[...]" group including its contents (non-greedy).
# Raw string: "\(" and friends are invalid escapes in a normal string literal.
RE_PARENTHESES = re.compile(r"[\(\[].*?[\)\]]")

def similarity(a, b):
    """
    Return the similarity ratio (0.0 .. 1.0) of the strings a and b.

    The backend is selected by the module globals DIFFLIB / difflib /
    Levenshtein, which the command line entry point sets up. When the
    module is imported as a toolkit that setup never ran, so the backend
    is chosen here on first use: Levenshtein if installed, else difflib.
    """
    global DIFFLIB, difflib, Levenshtein #pylint: disable=global-statement,global-variable-undefined
    try:
        use_difflib = DIFFLIB
    except NameError:
        # no backend selected yet (module was imported, not executed)
        try:
            import Levenshtein
            DIFFLIB = False
        except ImportError:
            import difflib
            DIFFLIB = True
        use_difflib = DIFFLIB

    if use_difflib:
        return difflib.SequenceMatcher(a=a, b=b).ratio()
    return Levenshtein.ratio(a, b)

# unit suffixes in ascending order, each step is a factor of 1024
suffixes = ['b', 'K', 'M', 'G', 'T', 'P']
def humansize(nbytes):
    """
    Format a byte count as a short human readable string, e.g. "1.5 K".

    The value is divided by 1024 until it drops below 1024 or the largest
    suffix is reached; at most two decimals are shown and trailing zeros
    (and a then dangling decimal point) are removed.
    """
    last = len(suffixes) - 1
    unit = 0
    while unit < last and nbytes >= 1024:
        nbytes = nbytes / 1024.
        unit += 1
    number = '%.2f' % nbytes
    number = number.rstrip('0')
    number = number.rstrip('.')
    return '%s %s' % (number, suffixes[unit])

def replace_all(text, dic):
    """
    Return text with every occurrence of each key of dic replaced by
    the corresponding value.

    Replacements are applied one after another in the iteration order of
    dic, so a later replacement also sees the result of earlier ones.
    """
    # items() exists on Python 2 and 3, iteritems() is Python 2 only
    for old, new in dic.items():
        text = text.replace(old, new)
    return text

class dupechecker(object):
    """
    Simple class to scan multiple directories recursively and
    build a list of movie filenames, then analyze that list
    for duplicates and dump them.

    Every entry of filelist is a list [title, filename, root, ext]:
    the normalized title used for comparison, the original filename,
    the directory it was found in (a "host: basedir" label for remote
    entries) and the lower-cased file extension.
    """
    # extensions that are always treated as movie files (besides ".ts")
    MOVIE_EXTENSIONS = ('.mkv', '.avi', '.mpg', '.mpeg', '.mp4')

    def __init__(self):
        self.basedir = ""
        self.filelist = []
        self.duplicates = {}
        # two titles count as duplicates when their similarity exceeds this
        self.ratio = 0.85
        # titles starting with one of these prefixes are dropped by analyze()
        self.ignore_fileprefix = []
        # paramiko client and the host dict currently in use (remote scan only)
        self.ssh = None
        self.ssh_data = None

    def reset(self):
        """
        Forget all scanned files and all duplicates found so far
        """
        self.filelist = []
        self.duplicates = {}

    def __scandir_files(self, root, files, extra=None):
        """
        Add every movie file out of files to the file list.

        root is stored with each entry as its location, extra is an
        optional list of additional (lower case, dot-prefixed) extensions
        to accept. Files with any other extension are ignored.
        """
        extra = extra or ()
        for filename in files:
            # strip the real extension instead of a fixed number of
            # characters, otherwise ".mpeg" leaves a trailing dot in the title
            stem, ext = os.path.splitext(filename)
            ext = ext.lower()
            if ext == ".ts":
                # names with three or more " - " separated parts keep only
                # the text after the second separator as title
                parts = stem.split(" - ")
                if len(parts) > 2:
                    title = " - ".join(parts[2:])
                else:
                    # fewer parts: use the whole name, an empty title
                    # would match every other empty title
                    title = stem
            elif ext in self.MOVIE_EXTENSIONS or ext in extra:
                title = stem
            else:
                continue

            # remove parentheses with contents in title
            title = RE_PARENTHESES.sub("", title.lower())
            self.filelist.append([title, filename, root, ext])

    def scandir(self, basedir, extra=None):
        """
        Scan a base directory for movie files and add them to
        the list for analyze
        """
        self.basedir = basedir
        print("Scanning directory: %s" % basedir)
        for root, _subdirs, files in os.walk(basedir):
            self.__scandir_files(root, files, extra)

    def scandir_remote(self, extra=None):
        """
        connect to remote ssh servers and get file lists for duplicate check

        The hosts are read from REMOTE_HOSTS in config.py; each host is a
        dict with the keys 'host', 'port', 'user', 'pass', 'key' and
        'basedir'. Only "*.ts" files directly inside basedir are listed,
        so extra is accepted for symmetry with scandir() but not used.
        Exits the program if config.py or paramiko is missing.
        """
        print("getting filelist from remote hosts...")
        try:
            from config import REMOTE_HOSTS
        except ImportError:
            print("Please configure REMOTE_HOSTS in config.py!")
            sys.exit(1)
        try:
            import paramiko
        except ImportError:
            print("Please install Paramiko!")
            sys.exit(1)

        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # asking; switch to WarningPolicy/RejectPolicy for untrusted networks
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        for host in REMOTE_HOSTS:
            self.ssh_data = host
            try:
                lst = self.__ssh_exec('cd %s; ls -1 *.ts' % self.ssh_data['basedir'])[0]
                cleanlist = []
                for item in lst:
                    name = item.strip().encode('ascii', 'ignore')
                    if not isinstance(name, str):
                        # Python 3: encode() returned bytes, but the scanner
                        # compares against str extensions
                        name = name.decode('ascii')
                    cleanlist.append(name)
                self.__scandir_files("%s: %s" % (
                    self.ssh_data['host'], self.ssh_data['basedir']), cleanlist)
            finally:
                # always drop the connection, the next host needs a new one
                self.__ssh_disconnect()

    def __ssh_exec(self, command):
        """
        establish ssh connection and execute command
        the connection remains open for following commands until ssh_disconnect is called

        Returns a tuple (stdout lines, stderr lines) or None if no ssh
        client has been set up.
        """
        if self.ssh is None:
            return None

        transport = self.ssh.get_transport()
        if not transport or not transport.is_active():
            print("SSH: connecting to %s" % self.ssh_data['host'])
            self.ssh.connect(self.ssh_data['host'], self.ssh_data['port'], self.ssh_data['user'], self.ssh_data['pass'], self.ssh_data['key'])

        # Send the command (non-blocking)
        _stdin, stdout, stderr = self.ssh.exec_command(command)

        # Wait until the command terminated or first output is available
        while not stdout.channel.exit_status_ready() and not stdout.channel.recv_ready():
            time.sleep(1)

        stdoutstring = stdout.readlines()
        stderrstring = stderr.readlines()
        return stdoutstring, stderrstring

    def __ssh_disconnect(self):
        """
        check if ssh is connected and disconnect
        """
        if self.ssh is None:
            return
        transport = self.ssh.get_transport()
        # close only a live connection; leaving it open would make the next
        # host reuse the transport of the previous one
        if transport and transport.is_active():
            print("SSH: disconnecting")
            self.ssh.close()

    def fixnames(self):
        """
        Search for defect filenames and remove illegal characters

        Only '.mkv' and '.txt' entries are touched. 'ß' becomes 'ss', every
        other character outside the allowed set (which deliberately keeps
        German umlauts) becomes '-'. A file is not renamed when the cleaned
        name already exists, since the rename could replace that file.
        """
        for item in self.filelist:
            if item[3] not in ('.mkv', '.txt'):
                continue
            # any non-alphanumeric characters in filename?
            cleanfn = replace_all(item[1], {
                'ß':'ss',
                })
            cleanfn = re.sub(r'[^A-Za-z0-9\.\_\-\(\)\&öäüÖÄÜ\' ]', '-', cleanfn)
            if item[1] == cleanfn:
                continue
            print (item[1])
            source = os.path.join(item[2], item[1])
            target = os.path.join(item[2], cleanfn)
            if os.path.exists(target):
                print("Skipping rename, target already exists: %s" % target)
                continue
            os.rename(source, target)
            # keep the list entry pointing at the file that now exists
            item[1] = cleanfn

    def statistics(self):
        """
        Summarize disk usage and print stats about found filetypes

        Entries that cannot be stat'ed locally (e.g. remote entries) are
        counted but contribute no size.
        """
        stats = {}
        unreadable = 0
        for item in self.filelist:
            entry = stats.setdefault(item[3], [0, 0.0])
            entry[0] += 1
            try:
                entry[1] += os.stat(
                    os.path.join(
                        item[2], item[1])).st_size
            except OSError:
                unreadable += 1
        print ("%5s %6s %10s" % (
            "File:",
            "Count:",
            "Size:"))
        sum_count = 0
        sum_size = 0.0
        for ext in stats:
            sum_count += stats[ext][0]
            sum_size += stats[ext][1]
            print ("%5s %6i %10s" % (
                ext, stats[ext][0],
                humansize(stats[ext][1])))
        print ("%5s %6i %10s" % (
            "TOTAL", sum_count,
            humansize(sum_size)))
        if unreadable:
            print("%i file(s) could not be accessed and are counted with size 0" % unreadable)

    def analyze(self):
        """
        Analyze the scanlist for duplicates

        Entries whose title starts with one of ignore_fileprefix are removed
        first. Every remaining title is compared with all later titles; a
        later entry whose similarity exceeds ratio is recorded in duplicates
        under the path of the earlier file and replaced by None in filelist.
        """
        listlen = len(self.filelist)
        print("%i files to analyze, running duplicate testing loop..." % (
            listlen))

        # remove potentially unwanted entries from the list
        # (empty prefixes are skipped, they would match everything)
        prefixes = tuple(tst for tst in self.ignore_fileprefix if tst != '')
        if prefixes:
            self.filelist[:] = [
                entry for entry in self.filelist
                if not (entry and entry[0].startswith(prefixes))]
            listlen = len(self.filelist)

        for idx, entry in enumerate(self.filelist):
            # entries already reported as duplicate have been set to None
            if not entry:
                continue
            print("\r%d %s\033[K" % (
                idx, entry[0]), end='')
            sys.stdout.flush()
            for idx2 in range(idx + 1, listlen):
                other = self.filelist[idx2]
                if not other:
                    continue
                if similarity(entry[0], other[0]) > self.ratio:
                    key = os.path.join(entry[2], entry[1])
                    self.duplicates.setdefault(key, []).append(
                        os.path.join(other[2], other[1]))
                    # unset the found duplicate, so that this will not be scanned again
                    self.filelist[idx2] = None
        print("\n\n")

    def output(self):
        """
        Dump found duplicates to console
        """
        for idx, base in enumerate(self.duplicates, 1):
            print("Duplicate file set #%i" % idx)
            print(base)
            for dup in self.duplicates[base]:
                print(dup)
            print()


if __name__ == "__main__":
    # Command line entry point: parse the options, pick the similarity
    # backend and then either fix filenames, print statistics or run the
    # duplicate check over all given base directories.
    import argparse

    parser = argparse.ArgumentParser(
        description='Movie database filename duplicate checker')
    parser.add_argument('--ratio', type=float, default=0.85,
        help='filename duplicate threshold 0.1 < ratio 1.0 (default 0.85)')
    parser.add_argument('--difflib', action='store_true', default=False,
        help='force the use of difflib instead Levenshtein')
    parser.add_argument('--stats', action='store_true', default=False,
        help='generate stats summary instead of check for duplicates')
    parser.add_argument('--remote', action='store_true', default=False,
        help='Connect to ssh remotes, eg. dupecheck for dreambox local storage')
    parser.add_argument('--fixnames', action='store_true', default=False,
        help='scan for mkv and txt, fix broken filenames for windows')
    parser.add_argument('basedir', metavar='basedir', nargs='+',
        help='one or more base directories')

    args = parser.parse_args()
    dupe = dupechecker()
    dupe.ratio = args.ratio

    # DIFFLIB and the imported backend module are module globals on purpose,
    # similarity() reads them
    if args.difflib:
        DIFFLIB = True
        import difflib
    else:
        try:
            import Levenshtein
            DIFFLIB = False
        except ImportError:
            import difflib
            DIFFLIB = True
            print("Consider 'pip install python-Levenshtein' for faster analyze")

    if os.path.isfile("dupecheck-ignore.txt"):
        # read the entire file line by line into buffer
        print("Loading ignore filename prefixes file for dupe checking...")
        # text mode: the prefixes are compared with str titles, binary mode
        # would yield bytes on Python 3
        with open("dupecheck-ignore.txt", "r") as ignorefile:
            dupe.ignore_fileprefix = [line.rstrip('\n').rstrip('\r') for line in ignorefile]

    if args.fixnames:
        for srcstr in args.basedir:
            dupe.scandir(srcstr, ['.txt'])
        if len(dupe.filelist) > 0:
            print ("Checking %i file names..." % len(dupe.filelist))
            dupe.fixnames()
            dupe.filelist = []
        sys.exit(0)

    if args.remote:
        dupe.scandir_remote()

    for srcstr in args.basedir:
        dupe.scandir(srcstr)

    # --fixnames never gets here, it exits above
    if args.stats:
        dupe.statistics()
    else:
        dupe.analyze()
        dupe.output()

mercurial