检测重复文件并生成表格报告

基于文件的前256KB的MD5,检测重复文件,生成表格记录文件

网络资讯 2024-11-21 10:44:53 2

导读

初衷……




初衷:电脑上挂载的几块硬盘空间都已经飘红了。如果完整读取每个文件来计算MD5值,估计又得跑上几天几夜,而且会伤硬盘,所以花了一个下午用AI写了这个只读取文件前256KB的脚本。

import configparser
import hashlib
import logging
import os
import platform
import subprocess
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta

import pandas as pd

# Configure logging: INFO level, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
)

# Number of leading bytes of each file that get hashed (256 KiB).
CHUNK_SIZE = 1024 * 256

# Maps an MD5 hex digest to the list of file paths that produced it.
md5_dict = defaultdict(list)

# 1-based sequence number of the next file to be reported in the scan log.
task_counter = 1

def create_default_config(config_file):
    """Write a blank ``[ScanSettings]`` template to *config_file*.

    The template contains four keys with empty values -- ``directories``,
    ``size_range``, ``file_formats`` and ``max_workers`` -- each preceded by
    a comment explaining what to fill in. An existing file at the same path
    is overwritten. A confirmation line is printed afterwards.

    Args:
        config_file: Path of the INI file to create.

    Raises:
        OSError: If the file cannot be opened for writing.
    """
    # The template is user-facing text and is written out verbatim. It is
    # encoded with the platform default encoding because the file is opened
    # without an explicit ``encoding``.
    config_content = """[ScanSettings]

# 扫描目录或磁盘,多个目录用逗号分隔

directories =

# 文件大小区间(最小值,最大值),单位是MB

size_range =

# 限制文件格式列表,以逗号分隔,如果不限制格式,请用0

file_formats =

# 线程数

max_workers =

"""
    with open(config_file, 'w') as configfile:
        configfile.write(config_content)
    print(f"生成默认配置文件:{config_file}")

def open_file(file_path):
    """Open *file_path* with the operating system's default application.

    Uses ``os.startfile`` on Windows, the ``open`` command on macOS and
    ``xdg-open`` on every other platform. This is a best-effort convenience:
    any failure is reported on stdout instead of being raised.

    Args:
        file_path: Path of the file to open.
    """
    try:
        system = platform.system()
        if system == "Windows":
            os.startfile(file_path)
        elif system == "Darwin":
            subprocess.call(["open", file_path])
        else:
            # The freedesktop launcher is spelled with a hyphen; the
            # un-hyphenated name does not exist, so launching always failed.
            subprocess.call(["xdg-open", file_path])
    except Exception as e:
        # Deliberately broad: failing to launch a viewer must never abort
        # the program, whatever the underlying reason.
        print(f"打开文件时出错: {e}")

def read_config():
    """Load and validate the scan settings from ``Duplicate_config.ini``.

    The file is looked up in the current working directory. If it does not
    exist, a default template is generated via ``create_default_config``.

    Returns:
        On success, a tuple ``(directories, (min_size, max_size),
        file_formats, max_workers)`` where

        * ``directories`` is a list of non-empty, stripped path strings,
        * ``min_size`` / ``max_size`` are floats in bytes (the configured
          values are megabytes),
        * ``file_formats`` is a list of lower-case extensions, each with a
          leading dot; an empty list means "no restriction",
        * ``max_workers`` is a positive int.

        On any problem (file missing, unreadable, or invalid values), a
        human-readable message string is returned instead; callers
        distinguish the two cases with ``isinstance(result, str)``.
    """
    config_file = 'Duplicate_config.ini'
    invalid_message = "配置文件有错误,请修改后重新运行。"

    if not os.path.exists(config_file):
        create_default_config(config_file)
        return "配置文件未找到,已生成默认配置文件。"

    # Interpolation is disabled so that a literal '%' inside a directory
    # path is read verbatim instead of raising an interpolation error.
    config = configparser.ConfigParser(interpolation=None)
    try:
        config.read(config_file)
    except configparser.Error as e:
        print(f"配置文件解析失败: {e}")
        return invalid_message

    if not config.has_section('ScanSettings'):
        print("参数错误:缺少 [ScanSettings] 配置节,请填写后重新运行。")
        return invalid_message
    settings = config['ScanSettings']

    raw_directories = settings.get('directories', '')
    directories = [d.strip() for d in raw_directories.split(',') if d.strip()]
    size_range_str = settings.get('size_range', '').strip()
    file_formats = settings.get('file_formats', '0').strip()
    max_workers_str = settings.get('max_workers', '').strip()

    errors = []
    if not directories:
        errors.append("directories 为空")
    if not size_range_str:
        errors.append("size_range 为空")
    if not max_workers_str:
        errors.append("max_workers 为空")
    if errors:
        error_message = ",".join(errors)
        print(f"参数错误:{error_message},请填写后重新运行。")
        return invalid_message

    try:
        # Unpacking also raises ValueError when there are not exactly two
        # comma-separated values, so one handler covers both failure modes.
        min_size, max_size = [float(x) * (1024 ** 2) for x in size_range_str.split(',')]
    except ValueError:
        print("无效的 size_range 配置,请确保是两个数字(最小值和最大值),用逗号分隔。")
        return invalid_message

    try:
        max_workers = int(max_workers_str)
    except ValueError:
        max_workers = 0
    if max_workers < 1:
        # ThreadPoolExecutor rejects non-positive worker counts, so fail
        # here with a readable message rather than crash later.
        print("无效的 max_workers 配置,请确保是大于 0 的整数。")
        return invalid_message

    # Normalise the extensions to the form produced by
    # os.path.splitext(...)[1].lower(): lower-case with a leading dot.
    # Blank entries and the "0" sentinel mean "no restriction" and are
    # dropped, so an empty `file_formats =` line no longer yields [''].
    file_formats_list = []
    for entry in file_formats.split(','):
        entry = entry.strip().lower()
        if not entry or entry == '0':
            continue
        file_formats_list.append(entry if entry.startswith('.') else '.' + entry)

    # Echo the configuration as written in the file.
    print(
        f"读取的配置参数: 扫描目录: {raw_directories} 文件大小范围: {size_range_str} MB 文件格式: {settings.get('file_formats', '')} 线程数: {max_workers}")

    return directories, (min_size, max_size), file_formats_list, max_workers

def get_md5(file_path, chunk_size=None):
    """Return the MD5 hex digest of the leading bytes of *file_path*.

    Only the first ``chunk_size`` bytes are read and hashed, so two files
    that share that prefix yield the same digest even if they differ later.

    Args:
        file_path: Path of the file to fingerprint.
        chunk_size: Number of leading bytes to hash. Defaults to the
            module-level ``CHUNK_SIZE`` when omitted.

    Returns:
        The hexadecimal digest string, or ``None`` if the file could not be
        opened or read (the error is printed).
    """
    if chunk_size is None:
        chunk_size = CHUNK_SIZE
    try:
        with open(file_path, 'rb') as f:
            head = f.read(chunk_size)
    except OSError as e:
        print(f"读取文件时出错: {file_path}, 错误: {e}")
        return None
    return hashlib.md5(head).hexdigest()

def format_file_size(file_size):
    """Render a byte count as a human-readable string.

    Values below 1024 are shown as a plain byte count; larger values are
    shown in KB, MB or GB (1024-based) with two decimal places. GB is the
    largest unit used.

    Args:
        file_size: Size in bytes.

    Returns:
        The formatted size string.
    """
    kib, mib, gib = 1024, 1024 ** 2, 1024 ** 3
    if file_size < kib:
        return f"{file_size} 字节"
    if file_size < mib:
        return f"{file_size / kib:.2f} KB"
    if file_size < gib:
        return f"{file_size / mib:.2f} MB"
    return f"{file_size / gib:.2f} GB"

def should_scan(file_path, size_range, file_formats):
    """Decide whether *file_path* passes the size and extension filters.

    Args:
        file_path: Path of the candidate file.
        size_range: ``(min_size, max_size)`` in bytes. A ``max_size`` of 0
            or less disables the upper bound.
        file_formats: Iterable of allowed extensions. Entries are matched
            case-insensitively, with or without a leading dot, and ignoring
            surrounding whitespace. An empty collection, or one holding only
            blank entries, means every extension is accepted.

    Returns:
        ``True`` if the file should be scanned, ``False`` otherwise.

    Raises:
        OSError: If the file size cannot be determined.
    """
    file_size = os.path.getsize(file_path)
    min_size, max_size = size_range
    if file_size < min_size:
        return False
    if max_size > 0 and file_size > max_size:
        return False

    if file_formats:
        # Compare dot-less, lower-case forms so "mp4", ".MP4" and " .mp4 "
        # in the configuration all match a file named "clip.Mp4".
        allowed = {fmt.strip().lower().lstrip('.') for fmt in file_formats}
        # A blank entry (e.g. from an empty config value) must not restrict
        # the scan to extension-less files.
        allowed.discard('')
        if allowed:
            file_ext = os.path.splitext(file_path)[1].lower().lstrip('.')
            if file_ext not in allowed:
                return False
    return True

# Serialises updates to the shared scan state (md5_dict and task_counter),
# because process_file is executed concurrently by pool worker threads.
_scan_state_lock = threading.Lock()


def process_file(file_path, size_range, file_formats):
    """Fingerprint one file and record it in the shared duplicate index.

    Files rejected by ``should_scan`` and files whose digest could not be
    computed are skipped silently. For every accepted file the path is
    appended to ``md5_dict`` under its digest, a progress line is logged and
    the global ``task_counter`` is advanced.

    Args:
        file_path: Path of the file to process.
        size_range: ``(min_size, max_size)`` in bytes, passed to
            ``should_scan``.
        file_formats: Allowed extensions, passed to ``should_scan``.

    Raises:
        OSError: If the file size cannot be read.
    """
    global task_counter

    if not should_scan(file_path, size_range, file_formats):
        return

    md5 = get_md5(file_path)
    if not md5:
        return

    with _scan_state_lock:
        md5_dict[md5].append(file_path)

    formatted_size = format_file_size(os.path.getsize(file_path))

    # "task_counter += 1" is a read-modify-write; without the lock two
    # threads can log the same sequence number or lose an increment.
    with _scan_state_lock:
        sequence = task_counter
        task_counter += 1

    logging.info(f"正在处理第 {sequence} 个文件: {file_path} | 文件大小: {formatted_size} | MD5值: {md5}")

def find_duplicates(directories, size_range, file_formats, max_workers=10):
    """Scan *directories* recursively and report files with equal digests.

    Every file found by ``os.walk`` is submitted to a thread pool running
    ``process_file``. When all tasks have finished (or the scan has been
    interrupted with Ctrl+C) the elapsed time is logged and
    ``print_duplicates`` is called to produce the report from whatever has
    been collected so far.

    Args:
        directories: Iterable of root directories to walk.
        size_range: ``(min_size, max_size)`` in bytes, forwarded to
            ``process_file``.
        file_formats: Allowed extensions, forwarded to ``process_file``.
        max_workers: Number of worker threads in the pool.
    """
    start_time = time.monotonic()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Maps each future to the full path it processes so that failures
        # can be attributed to a file in the log.
        future_to_file = {}
        try:
            for root_dir in directories:
                for dirpath, _, filenames in os.walk(root_dir):
                    for filename in filenames:
                        file_path = os.path.join(dirpath, filename)
                        future = executor.submit(process_file, file_path, size_range, file_formats)
                        future_to_file[future] = file_path

            for future in as_completed(future_to_file):
                try:
                    future.result()
                except Exception as e:
                    # One unreadable file must not stop the whole scan.
                    logging.error(f"文件处理时出错: {future_to_file[future]}, 错误: {e}")
        except KeyboardInterrupt:
            # Drop everything still queued; tasks that are already running
            # are awaited when the ``with`` block exits.
            for future in future_to_file:
                future.cancel()
            logging.warning(" 扫描被中断。正在清理...")

    elapsed_seconds = int(time.monotonic() - start_time)
    days, remainder = divmod(elapsed_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    logging.info(f"扫描结束,耗时 {days} 天 {hours} 小时 {minutes} 分 {seconds} 秒。")

    print_duplicates()

def print_duplicates():
    """Collect every duplicate group from ``md5_dict`` and report it.

    A digest with more than one recorded path is treated as a duplicate
    group. One row (file name, formatted size, digest, full path) is built
    per file and the rows are handed to ``generate_report``. If there are no
    duplicate groups, a message is logged instead.
    """
    data = []
    for md5, files in md5_dict.items():
        if len(files) < 2:
            continue
        for file in files:
            try:
                formatted_size = format_file_size(os.path.getsize(file))
            except OSError as e:
                # The file may have been moved or deleted since it was
                # scanned; keep the row rather than lose the whole report.
                logging.warning(f"无法获取文件大小: {file}, 错误: {e}")
                formatted_size = "未知"
            data.append({"文件名": os.path.basename(file), "文件大小": formatted_size, "MD5": md5, "路径": file})

    if not data:
        logging.info(" 未找到重复文件。")
    else:
        generate_report(data)

def generate_report(data):
    """Write the duplicate rows to a timestamped report file.

    The report is saved in the current working directory as
    ``重复文件报告_<YYYYmmdd_HHMMSS>.xlsx``. If the Excel export fails, the
    same rows are written to a ``.csv`` file with the same stem so that the
    scan results are not lost. Afterwards the user is asked whether the
    report should be opened.

    Args:
        data: List of row dicts, one per duplicate file.
    """
    df = pd.DataFrame(data)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_file = f"重复文件报告_{timestamp}.xlsx"
    try:
        df.to_excel(report_file, index=False)
    except (ImportError, ValueError) as e:
        # NOTE(review): pandas is expected to raise ImportError when no
        # Excel writer engine is installed and ValueError when the data
        # exceeds the worksheet limits -- confirm against the pandas docs.
        logging.warning(f"无法生成 Excel 报告,改为生成 CSV 报告: {e}")
        report_file = f"重复文件报告_{timestamp}.csv"
        # The BOM lets spreadsheet applications detect UTF-8 so the Chinese
        # column headers display correctly.
        df.to_csv(report_file, index=False, encoding="utf-8-sig")
    print(f"重复文件报告已生成:{report_file}")
    prompt_open_report(report_file)

def prompt_open_report(report_file):
    """Ask on the console whether the generated report should be opened.

    Entering ``1`` opens the report via ``open_file``; any other answer
    (including end-of-input) prints a closing message.

    Args:
        report_file: Path of the report to open on request.
    """
    try:
        user_input = input("是否需要打开报告文件?1:打开,0:退出 请输入:").strip()
    except EOFError:
        # No interactive stdin available: treat it as "do not open".
        user_input = ''

    if user_input == '1':
        open_file(report_file)
    else:
        print("程序结束。")

if __name__ == "__main__":
    # Keep re-reading the configuration until it is valid (then scan once
    # and exit) or the user chooses to quit.
    while True:
        config_values = read_config()

        if not isinstance(config_values, str):
            # A tuple means the configuration is valid: run the scan once.
            directories, size_range, file_formats, max_workers = config_values
            print("扫描开始,请稍候...")
            find_duplicates(directories, size_range, file_formats, max_workers=max_workers)
            break

        # A string is an error/notice message from read_config.
        print(config_values)
        user_input = input("是否需要打开配置文件?1:打开,0:退出程序 请输入:")
        if user_input != '1':
            break
        open_file('Duplicate_config.ini')

        # Give the user time to edit the file before the next attempt.
        user_input = input("1: 重新运行程序, 0: 退出 请输入:")
        if user_input == '0':
            break

复制代码

链接:通过百度云资源分享的文件:DuplicateFileSearch.exe

链接:https://pan.baidu.com/s/1iSN1PClYyXhSLMoxgJwFqw?pwd=fuli

提取码:fuli

来自百度云资源超级会员V9的分享

如有bug,请指出,谢谢。