基于文件前 256KB 内容的 MD5 值检测重复文件,并生成表格记录文件
导读
初衷:因为电脑上挂的硬盘都飘红了,如果完整读取每个文件来计算 MD5 值,估计又得几天几夜,再者也会伤硬盘,所以用 AI 折腾了一个下午写出了这个工具
import os
import hashlib
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import defaultdict
import configparser
import logging
from datetime import datetime, timedelta
import platform
import subprocess
import time
# Configure logging: timestamp, level and message on every progress line.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
# Only the first 256 KB of each file is hashed — a fast, disk-friendly
# duplicate heuristic (see file header: full-file MD5 would take days).
CHUNK_SIZE = 256 * 1024
# Maps partial-MD5 hex digest -> list of file paths sharing that digest.
md5_dict = defaultdict(list)
# 1-based running counter of processed files, shown in progress logs.
# NOTE(review): incremented from worker threads without a lock — displayed
# counts may interleave under heavy concurrency.
task_counter = 1
def create_default_config(config_file):
    """Write an empty [ScanSettings] template to *config_file* and announce it.

    The template contains commented placeholders for every setting that
    read_config() expects the user to fill in.
    """
    template_lines = (
        "[ScanSettings]",
        "# 扫描目录或磁盘,多个目录用逗号分隔",
        "directories =",
        "# 文件大小区间(最小值,最大值),单位是MB",
        "size_range =",
        "# 限制文件格式列表,以逗号分隔,如果不限制格式,请用0",
        "file_formats =",
        "# 线程数",
        "max_workers =",
        "",  # trailing empty entry yields the final newline
    )
    with open(config_file, 'w') as template_file:
        template_file.write("\n".join(template_lines))
    print(f"生成默认配置文件:{config_file}")
def open_file(file_path):
    """Open *file_path* with the platform's default application.

    Uses os.startfile on Windows, `open` on macOS (Darwin) and `xdg-open`
    on other desktops (Linux/BSD). Failures are printed, never raised.
    """
    try:
        system = platform.system()
        if system == "Windows":
            os.startfile(file_path)
        elif system == "Darwin":
            subprocess.call(["open", file_path])
        else:
            # BUG FIX: the freedesktop launcher is "xdg-open" (hyphenated);
            # "xdgopen" does not exist, so this branch always failed on Linux.
            subprocess.call(["xdg-open", file_path])
    except Exception as e:
        print(f"打开文件时出错: {e}")
def read_config():
    """Load and validate scan settings from Duplicate_config.ini.

    Returns an error message (str) when the config file is missing or
    invalid, otherwise a 4-tuple:
        (directories, (min_size_bytes, max_size_bytes), file_formats_list, max_workers)
    Sizes in the file are given in MB and converted to bytes here.
    """
    config = configparser.ConfigParser()
    config_file = 'Duplicate_config.ini'
    if not os.path.exists(config_file):
        create_default_config(config_file)
        return "配置文件未找到,已生成默认配置文件。"
    config.read(config_file)
    directories = config['ScanSettings'].get('directories', '').split(',')
    directories = [d.strip() for d in directories if d.strip()]
    size_range_str = config['ScanSettings'].get('size_range', '').strip()
    file_formats = config['ScanSettings'].get('file_formats', '0').strip()
    max_workers = config['ScanSettings'].get('max_workers', '').strip()
    # Collect every missing-field problem at once so the user can fix them together.
    errors = []
    if not directories:
        errors.append("directories 为空")
    if not size_range_str:
        errors.append("size_range 为空")
    if not max_workers:
        errors.append("max_workers 为空")
    if errors:
        error_message = ",".join(errors)
        print(f"参数错误:{error_message},请填写后重新运行。")
        return "配置文件有错误,请修改后重新运行。"
    try:
        min_size, max_size = [float(x) * (1024 ** 2) for x in size_range_str.split(',')]
    except ValueError:
        print("无效的 size_range 配置,请确保是两个数字(最小值和最大值),用逗号分隔。")
        return "配置文件有错误,请修改后重新运行。"
    # FIX: normalize each extension (strip whitespace, lowercase) so entries like
    # ".MP4, .avi" match the lowercased extension computed in should_scan();
    # previously the un-stripped, case-preserved entries never matched.
    if file_formats != '0':
        file_formats_list = [fmt.strip().lower() for fmt in file_formats.split(',') if fmt.strip()]
    else:
        file_formats_list = []
    # FIX: a non-numeric or non-positive max_workers used to crash with an
    # unhandled ValueError instead of taking the friendly error path.
    try:
        max_workers = int(max_workers)
        if max_workers <= 0:
            raise ValueError
    except ValueError:
        print("无效的 max_workers 配置,请确保是一个正整数。")
        return "配置文件有错误,请修改后重新运行。"
    # Echo the effective configuration back to the user.
    print(
        f"读取的配置参数: 扫描目录: {config['ScanSettings'].get('directories', '')} 文件大小范围: {size_range_str} MB 文件格式: {config['ScanSettings'].get('file_formats', '')} 线程数: {max_workers}")
    return directories, (min_size, max_size), file_formats_list, max_workers
def get_md5(file_path, chunk_size=256 * 1024):
    """Return the MD5 hex digest of the first *chunk_size* bytes of *file_path*.

    Generalized: the hard-coded module constant is now a parameter (default
    256 KB, matching CHUNK_SIZE, so existing callers behave identically).
    Hashing only a leading chunk keeps the scan fast and disk-friendly.
    Returns None when the file cannot be read.
    """
    try:
        with open(file_path, 'rb') as f:
            return hashlib.md5(f.read(chunk_size)).hexdigest()
    except OSError as e:  # IOError is an alias of OSError; also covers PermissionError
        print(f"读取文件时出错: {file_path}, 错误: {e}")
        return None
def format_file_size(file_size):
    """Render a byte count as a human-readable string (字节 / KB / MB / GB)."""
    kib = 1024
    if file_size < kib:
        return f"{file_size} 字节"
    if file_size < kib ** 2:
        return f"{file_size / kib:.2f} KB"
    if file_size < kib ** 3:
        return f"{file_size / kib ** 2:.2f} MB"
    return f"{file_size / kib ** 3:.2f} GB"
def should_scan(file_path, size_range, file_formats):
    """Decide whether *file_path* matches the size window and extension filter.

    size_range is (min_bytes, max_bytes); a max of 0 means "no upper bound".
    An empty file_formats list disables the extension filter.
    """
    actual_size = os.path.getsize(file_path)
    lower, upper = size_range
    if actual_size < lower:
        return False
    if 0 < upper < actual_size:
        return False
    if file_formats and os.path.splitext(file_path)[1].lower() not in file_formats:
        return False
    return True
def process_file(file_path, size_range, file_formats):
    """Hash one candidate file and record it in the global md5_dict.

    Files rejected by should_scan() or unreadable (get_md5 returned None)
    are skipped silently; accepted files are logged with a running counter.
    """
    global task_counter
    if not should_scan(file_path, size_range, file_formats):
        return
    digest = get_md5(file_path)
    if not digest:
        return
    md5_dict[digest].append(file_path)
    readable_size = format_file_size(os.path.getsize(file_path))
    logging.info(f"正在处理第 {task_counter} 个文件: {file_path} | 文件大小: {readable_size} | MD5值: {digest}")
    task_counter += 1
def find_duplicates(directories, size_range, file_formats, max_workers=10):
    """Walk *directories*, hash every matching file concurrently, then report.

    Each file is submitted to a thread pool running process_file(); results
    accumulate in the global md5_dict. Ctrl-C abandons the scan early.
    Elapsed time is logged and print_duplicates() is invoked at the end.
    """
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_file = {
            executor.submit(process_file, os.path.join(dirpath, filename), size_range, file_formats): filename
            for root_dir in directories
            for dirpath, _, filenames in os.walk(root_dir)
            for filename in filenames
        }
        try:
            for future in as_completed(future_to_file):
                try:
                    future.result()
                except Exception as e:
                    logging.error(f"文件处理时出错: {e}")
        except KeyboardInterrupt:
            executor.shutdown(wait=False)
            logging.warning(" 扫描被中断。正在清理...")
    # BUG FIX: was `end_time start_time` (missing the minus operator) — a
    # SyntaxError that prevented the whole script from running.
    elapsed_seconds = time.time() - start_time
    days, remainder = divmod(elapsed_seconds, 86400)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)
    logging.info(f"扫描结束,耗时 {int(days)} 天 {int(hours)} 小时 {int(minutes)} 分 {int(seconds)} 秒。")
    print_duplicates()
def print_duplicates():
    """Gather every file whose partial-MD5 digest is shared by more than one
    path; hand the rows to generate_report(), or log that no duplicates exist."""
    rows = []
    for digest, paths in md5_dict.items():
        if len(paths) <= 1:
            continue  # unique digest — not a duplicate group
        for path in paths:
            rows.append({
                "文件名": os.path.basename(path),
                "文件大小": format_file_size(os.path.getsize(path)),
                "MD5": digest,
                "路径": path,
            })
    if rows:
        generate_report(rows)
    else:
        logging.info(" 未找到重复文件。")
def generate_report(data):
    """Write the duplicate-file rows to a timestamped Excel workbook and
    offer to open it interactively."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_file = f"重复文件报告_{timestamp}.xlsx"
    pd.DataFrame(data).to_excel(report_file, index=False)
    print(f"重复文件报告已生成:{report_file}")
    prompt_open_report(report_file)
def prompt_open_report(report_file):
    """Ask the user whether to open the generated report; any answer other
    than '1' ends the program with a farewell message."""
    if input("是否需要打开报告文件?1:打开,0:退出 请输入:") == '1':
        open_file(report_file)
    else:
        print("程序结束。")
if __name__ == "__main__":
    # Interactive driver: loop until a scan completes or the user quits.
    while True:
        settings = read_config()
        if isinstance(settings, str):
            # read_config returned an error message instead of a settings tuple.
            print(settings)
            choice = input("是否需要打开配置文件?1:打开,0:退出程序 请输入:")
            if choice != '1':
                break
            open_file('Duplicate_config.ini')
        else:
            directories, size_range, file_formats, max_workers = settings
            print("扫描开始,请稍候...")
            find_duplicates(directories, size_range, file_formats, max_workers=max_workers)
            break
        if input("1: 重新运行程序, 0: 退出 请输入:") == '0':
            break
复制代码
链接:通过百度云资源分享的文件:DuplicateFileSearch.exe
链接:https://pan.baidu.com/s/1iSN1PClYyXhSLMoxgJwFqw?pwd=fuli
提取码:fuli
来自百度云资源超级会员V9的分享
如有 bug,请指出,谢谢。
