612 lines
23 KiB
Python
612 lines
23 KiB
Python
import os
|
||
import webbrowser
|
||
import threading
|
||
import tkinter as tk
|
||
from tkinter import ttk, filedialog, messagebox
|
||
from queue import Queue
|
||
from send2trash import send2trash
|
||
import subprocess
|
||
import logging
|
||
import requests
|
||
|
||
logging.basicConfig( # 配置日志记录器
|
||
level=logging.INFO,
|
||
format="%(asctime)s - %(levelname)s - %(message)s",
|
||
handlers=[
|
||
logging.FileHandler("scanner.log", encoding="utf-8"),
|
||
logging.StreamHandler(),
|
||
],
|
||
)
|
||
|
||
proxy = {"http": None, "https": None} # Requests代理设置
|
||
|
||
|
||
def format_size(size): # 格式化文件/目录大小
|
||
units = ["B", "KB", "MB", "GB", "TB"]
|
||
index = 0
|
||
while size >= 1024 and index < len(units) - 1:
|
||
size /= 1024.0
|
||
index += 1
|
||
return f"{size:.2f} {units[index]}"
|
||
|
||
|
||
class FolderScanner(threading.Thread):
|
||
def __init__(self, start_path, queue, progress_queue):
|
||
threading.Thread.__init__(self)
|
||
self.start_path = start_path # 扫描路径
|
||
self.queue = queue # 用于存储扫描结果
|
||
self.progress_queue = progress_queue # 用于存储扫描进度
|
||
self.total_items = 0 # 总共扫描的文件/目录数量
|
||
|
||
def run(self): # 扫描文件/目录
|
||
logging.info(f"开始扫描目录: {self.start_path}")
|
||
self.total_items = sum(
|
||
[len(files) + len(dirs) for r, dirs, files in os.walk(self.start_path)]
|
||
) # 计算总共扫描的文件/目录数量
|
||
self.scan_folder(self.start_path) # 扫描文件/目录
|
||
self.queue.put(None) # 扫描结束
|
||
logging.info(f"扫描完成: {self.start_path}")
|
||
|
||
def scan_folder(self, folder):
|
||
scanned_items = 0
|
||
for dirpath, dirnames, filenames in os.walk(folder):
|
||
for dirname in dirnames:
|
||
folder_path = os.path.join(dirpath, dirname)
|
||
size = self.get_size(folder_path)
|
||
self.queue.put((folder_path, size, dirpath, "folder"))
|
||
scanned_items += 1
|
||
self.update_progress(scanned_items, folder_path)
|
||
logging.info(f"扫描目录: {folder_path} 大小: {format_size(size)} ")
|
||
|
||
for filename in filenames:
|
||
file_path = os.path.join(dirpath, filename)
|
||
size = os.path.getsize(file_path)
|
||
self.queue.put((file_path, size, dirpath, "file"))
|
||
scanned_items += 1
|
||
self.update_progress(scanned_items, file_path)
|
||
logging.info(f"扫描文件: {file_path} 大小: {format_size(size)} ")
|
||
|
||
def get_size(self, path): # 获取文件/目录大小
|
||
total_size = 0
|
||
for dirpath, dirnames, filenames in os.walk(path):
|
||
for f in filenames:
|
||
fp = os.path.join(dirpath, f)
|
||
total_size += os.path.getsize(fp)
|
||
return total_size
|
||
|
||
def update_progress(self, scanned_items, current_path): # 更新扫描进度
|
||
progress = (scanned_items / self.total_items) * 100
|
||
self.progress_queue.put(
|
||
(progress, scanned_items, self.total_items, current_path)
|
||
)
|
||
|
||
|
||
class App(tk.Tk):
|
||
def __init__(self, start_path): # 初始化
|
||
super().__init__()
|
||
self.withdraw()
|
||
self.title("目录扫描器 V1.4.2")
|
||
self.start_path = start_path
|
||
self.queue = Queue()
|
||
self.progress_queue = Queue()
|
||
self.progress_window = ProgressWindow(self)
|
||
|
||
self.scanner = FolderScanner(start_path, self.queue, self.progress_queue)
|
||
self.scanner.start()
|
||
|
||
self.style = ttk.Style(self)
|
||
self.style.configure("Treeview", font=("Helvetica", 10), rowheight=25)
|
||
self.style.configure("Treeview.Heading", font=("Helvetica", 12, "bold"))
|
||
|
||
self.tree = ttk.Treeview(self, style="Treeview")
|
||
self.tree.heading("#0", text="目录/文件名(目录/文件大小)", anchor="w")
|
||
self.tree.tag_configure("folder", foreground="orange")
|
||
self.tree.tag_configure("file", foreground="black")
|
||
self.tree.pack(fill=tk.BOTH, expand=True)
|
||
|
||
self.folder_first_var = tk.BooleanVar()
|
||
self.folder_first_checkbutton = tk.Checkbutton(
|
||
self,
|
||
text="目录优先显示",
|
||
variable=self.folder_first_var,
|
||
command=self.update_folder_first_var,
|
||
)
|
||
|
||
self.tree.bind("<Double-1>", self.on_double_click)
|
||
self.tree.bind("<Return>", self.on_enter_press)
|
||
self.tree.bind("<Button-3>", self.show_context_menu)
|
||
|
||
self.size_sort_order = True
|
||
self.name_sort_order = True
|
||
self.is_sorting = False
|
||
|
||
self.populate_root()
|
||
self.update_tree()
|
||
self.update_progress()
|
||
|
||
self.protocol("WM_DELETE_WINDOW", self.on_close)
|
||
self.set_window_size()
|
||
self.create_menu()
|
||
|
||
def create_menu(self):
|
||
menu_bar = tk.Menu(self)
|
||
self.config(menu=menu_bar)
|
||
|
||
file_menu = tk.Menu(menu_bar, tearoff=0)
|
||
menu_bar.add_cascade(label="文件", menu=file_menu)
|
||
file_menu.add_command(label="选择目录", command=self.reselect_directory)
|
||
file_menu.add_separator()
|
||
file_menu.add_command(label="退出", command=self.on_close)
|
||
|
||
view_menu = tk.Menu(menu_bar, tearoff=0)
|
||
menu_bar.add_cascade(label="视图", menu=view_menu)
|
||
view_menu.add_checkbutton(
|
||
label="目录优先显示",
|
||
variable=self.folder_first_var,
|
||
command=self.update_folder_first_var,
|
||
)
|
||
view_menu.add_command(
|
||
label="根据目录/文件大小排序",
|
||
command=self.sort_by_size,
|
||
)
|
||
view_menu.add_command(
|
||
label="根据目录/文件名排序",
|
||
command=self.sort_by_size,
|
||
)
|
||
view_menu.add_command(
|
||
label="刷新列表",
|
||
command=self.refresh_tree,
|
||
)
|
||
|
||
help_menu = tk.Menu(menu_bar, tearoff=0)
|
||
menu_bar.add_cascade(label="帮助", menu=help_menu)
|
||
help_menu.add_command(label="帮助", command=self.show_help)
|
||
help_menu.add_command(label="更新日志", command=self.show_update_log)
|
||
help_menu.add_command(label="扫描日志", command=self.show_log)
|
||
help_menu.add_command(label="检查更新", command=self.check_updates)
|
||
help_menu.add_command(label="关于", command=self.show_about)
|
||
|
||
def show_help(self):
|
||
messagebox.showinfo(
|
||
"帮助",
|
||
"1. 双击文件夹可以展开/折叠\n2. 双击文件可以直接打开\n3. 右键单击文件/文件夹可以打开、删除文件/文件夹、进入所在目录\n4. 点击左上角文件菜单可以重新选择目录进行扫描\n5. 点击左上角视图菜单可以根据目录/文件大小、目录/文件名排序、刷新列表\n",
|
||
)
|
||
|
||
def show_update_log(self):
|
||
repo_owner = "ahdoawhfo"
|
||
repo_name = "SpaceSniffer"
|
||
base_url = "https://git.a6.wiki/api/v1/repos"
|
||
|
||
try:
|
||
# 发送 GET 请求获取最新的 Release
|
||
response = requests.get(
|
||
f"{base_url}/{repo_owner}/{repo_name}/releases/latest", proxies=proxy
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
release_info = response.json()
|
||
latest_version = release_info["tag_name"]
|
||
release_notes = release_info["body"]
|
||
messagebox.showinfo(
|
||
"更新日志", f"版本: {latest_version}\n\n{release_notes}"
|
||
)
|
||
else:
|
||
messagebox.showinfo(
|
||
"更新日志",
|
||
f"获取更新日志失败:{response.status_code}",
|
||
)
|
||
except requests.RequestException as e:
|
||
messagebox.showinfo("更新日志", f"获取更新日志时发生错误:{e}")
|
||
|
||
def show_log(self):
|
||
log_content = self.read_log_file() # 从日志文件中读取日志内容
|
||
log_window = tk.Toplevel(self)
|
||
log_window.title("扫描日志")
|
||
log_window.geometry("1000x400")
|
||
|
||
text_widget = tk.Text(log_window, wrap=tk.WORD)
|
||
text_widget.pack(expand=True, fill=tk.BOTH)
|
||
|
||
scrollbar = tk.Scrollbar(text_widget)
|
||
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
|
||
text_widget.config(yscrollcommand=scrollbar.set)
|
||
scrollbar.config(command=text_widget.yview)
|
||
|
||
text_widget.insert(tk.END, log_content)
|
||
text_widget.config(state=tk.DISABLED)
|
||
|
||
def read_log_file(self):
|
||
log_file_path = "scanner.log" # 日志文件路径
|
||
if os.path.exists(log_file_path):
|
||
try:
|
||
with open(log_file_path, "r", encoding="utf-8") as log_file:
|
||
return log_file.read()
|
||
except UnicodeDecodeError:
|
||
return "日志文件编码错误,无法解析。"
|
||
else:
|
||
return "日志文件不存在。"
|
||
|
||
def check_updates(self):
|
||
# 创建一个新的 UpdateManager 实例,不销毁程序
|
||
update_manager = UpdateManager(self, is_startup=False)
|
||
update_manager.check_latest_release()
|
||
|
||
def show_about(self):
|
||
messagebox.showinfo(
|
||
"关于",
|
||
"一款开源的文件/目录扫描工具\n可以直观地展示目录的文件结构以及文件大小\n\n软件版本 V1.4.2\n作者 ahdoawhfo\n",
|
||
)
|
||
|
||
def on_double_click(self, event): # V1.2 Update:当双击的是文件时,打开文件
|
||
item = self.tree.identify_row(event.y)
|
||
if item:
|
||
if os.path.isfile(item):
|
||
self.open_item(item)
|
||
|
||
def on_enter_press(self, event): # V1.2 Update:当回车的是文件时,打开文件
|
||
item = self.tree.selection()[0]
|
||
if os.path.isfile(item):
|
||
self.open_item(item)
|
||
|
||
def reselect_directory(self): # V1.2 Update:重新选择目录
|
||
new_path = filedialog.askdirectory()
|
||
if new_path:
|
||
self.start_path = new_path
|
||
self.refresh_tree()
|
||
|
||
def populate_root(self): # 初始化根目录
|
||
size = self.scanner.get_size(self.start_path)
|
||
formatted_size = format_size(size)
|
||
self.tree.insert(
|
||
"",
|
||
"end",
|
||
iid=self.start_path,
|
||
text=f"{os.path.basename(self.start_path)} ({formatted_size})",
|
||
tags=("folder",),
|
||
open=True,
|
||
)
|
||
|
||
def update_tree(self): # 更新树状图
|
||
if not self.is_sorting:
|
||
while not self.queue.empty():
|
||
item = self.queue.get()
|
||
if item is None: # Scanning is done
|
||
self.progress_window.destroy()
|
||
self.deiconify()
|
||
return
|
||
|
||
path, size, parent, tag = item
|
||
formatted_size = format_size(size)
|
||
parent_iid = self.get_iid(parent)
|
||
if parent == self.start_path:
|
||
parent_iid = self.start_path
|
||
if not self.tree.exists(self.get_iid(path)):
|
||
self.tree.insert(
|
||
parent_iid,
|
||
"end",
|
||
iid=self.get_iid(path),
|
||
text=f"{os.path.basename(path)} ({formatted_size})",
|
||
tags=(tag,),
|
||
open=False,
|
||
)
|
||
self.after(100, self.update_tree)
|
||
|
||
def update_progress(self): # 更新扫描进度
|
||
if not self.progress_window.winfo_exists(): # Check if the window still exists
|
||
return
|
||
while not self.progress_queue.empty():
|
||
progress, scanned_items, total_items, current_path = (
|
||
self.progress_queue.get()
|
||
)
|
||
self.progress_window.progress_var.set(progress)
|
||
self.progress_window.progress_bar["value"] = progress
|
||
self.progress_window.progress_label.config(
|
||
text=f"扫描进度: {scanned_items} / {total_items}"
|
||
)
|
||
self.progress_window.path_label.config(text=f"正在扫描: {current_path}")
|
||
|
||
self.after(100, self.update_progress)
|
||
|
||
def get_iid(self, path): # 获取目录/文件的 iid
|
||
return path
|
||
|
||
def sort_by_size(self): # 根据目录/文件大小排序
|
||
self.sort_items(
|
||
lambda x: (
|
||
0 if x[1] == "folder" and self.folder_first_var.get() else 1,
|
||
(
|
||
-float(x[0].split()[-2].replace("(", ""))
|
||
if self.size_sort_order
|
||
else float(x[0].split()[-2].replace("(", ""))
|
||
),
|
||
),
|
||
False,
|
||
)
|
||
self.size_sort_order = not self.size_sort_order
|
||
|
||
def sort_by_name(self): # 根据目录/文件名排序
|
||
self.sort_items(
|
||
lambda x: (
|
||
0 if x[1] == "folder" and self.folder_first_var.get() else 1,
|
||
x[0].lower(),
|
||
),
|
||
self.name_sort_order,
|
||
)
|
||
self.name_sort_order = not self.name_sort_order
|
||
|
||
def sort_items(self, key, reverse): # 排序
|
||
if self.folder_first_var.get():
|
||
selected = self.tree.selection()
|
||
if not selected:
|
||
messagebox.showinfo("提示", "请选择你要排序的目录")
|
||
return
|
||
|
||
self.scanner.join()
|
||
self.is_sorting = True
|
||
|
||
for parent in selected:
|
||
children = self.tree.get_children(parent)
|
||
folder_items = []
|
||
file_items = []
|
||
|
||
for child in children:
|
||
item = (
|
||
self.tree.item(child)["text"],
|
||
self.tree.item(child)["tags"][0],
|
||
child,
|
||
)
|
||
if item[1] == "folder":
|
||
folder_items.append(item)
|
||
else:
|
||
file_items.append(item)
|
||
|
||
sorted_items = sorted(folder_items, key=key, reverse=reverse) + sorted(
|
||
file_items, key=key, reverse=reverse
|
||
)
|
||
|
||
for item in sorted_items:
|
||
self.tree.detach(item[2])
|
||
for index, item in enumerate(sorted_items):
|
||
self.tree.move(item[2], parent, index)
|
||
else:
|
||
selected = self.tree.selection()
|
||
if not selected:
|
||
messagebox.showinfo("提示", "请选择你要排序的目录")
|
||
return
|
||
|
||
self.scanner.join()
|
||
self.is_sorting = True
|
||
|
||
for parent in selected:
|
||
children = self.tree.get_children(parent)
|
||
items = [
|
||
(
|
||
self.tree.item(child)["text"],
|
||
self.tree.item(child)["tags"][0],
|
||
child,
|
||
)
|
||
for child in children
|
||
]
|
||
sorted_items = sorted(items, key=key, reverse=reverse)
|
||
|
||
for item in sorted_items:
|
||
self.tree.detach(item[2])
|
||
for index, item in enumerate(sorted_items):
|
||
self.tree.move(item[2], parent, index)
|
||
|
||
def show_context_menu(self, event): # 显示右键菜单
|
||
item = self.tree.identify_row(event.y)
|
||
if item:
|
||
self.tree.selection_set(item)
|
||
menu = tk.Menu(self, tearoff=0)
|
||
menu.add_command(
|
||
label="打开文件", command=lambda: self.open_item(item, "Open File")
|
||
)
|
||
menu.add_command(
|
||
label="打开所在目录",
|
||
command=lambda: self.open_directory(item, "Open Containing Folder"),
|
||
)
|
||
menu.add_command(
|
||
label="删除文件/目录(放入回收站)",
|
||
command=lambda: self.delete_item(item),
|
||
)
|
||
menu.post(event.x_root, event.y_root)
|
||
|
||
def open_item(self, item, context_menu_action=None): # 打开文件/目录
|
||
name = self.tree.item(item, "text").split(" ")[0]
|
||
try:
|
||
if os.path.isdir(item):
|
||
if context_menu_action == "Open File":
|
||
subprocess.Popen(f'explorer "{os.path.realpath(item)}"')
|
||
elif context_menu_action == "Open Containing Folder":
|
||
parent_folder = os.path.dirname(item)
|
||
subprocess.Popen(f'explorer "{os.path.realpath(parent_folder)}"')
|
||
else:
|
||
os.startfile(item)
|
||
except Exception as e:
|
||
messagebox.showerror("错误", f"无法打开 {name}\n{e}")
|
||
|
||
def open_directory(self, item, context_menu_action=None): # 打开所在目录
|
||
name = self.tree.item(item, "text").split(" ")[0]
|
||
if item == self.start_path:
|
||
parent_path = item
|
||
else:
|
||
parent_path = os.path.dirname(item)
|
||
try:
|
||
subprocess.Popen(f'explorer "{os.path.realpath(parent_path)}"')
|
||
except Exception as e:
|
||
messagebox.showerror("错误", f"无法打开 {name}\n{e}")
|
||
|
||
def delete_item(self, item): # 删除文件/目录
|
||
path = self.tree.item(item, "text").split(" ")[0]
|
||
full_path = os.path.normpath(os.path.join(self.start_path, path))
|
||
confirm = messagebox.askyesno("提示", f"你确定要删除 {full_path} 吗?")
|
||
if confirm:
|
||
try:
|
||
send2trash(full_path)
|
||
self.tree.delete(item)
|
||
except Exception as e:
|
||
messagebox.showerror("错误", f"无法删除 {full_path}\n{e}")
|
||
self.refresh_tree()
|
||
|
||
def refresh_tree(self): # 刷新树状图
|
||
self.is_sorting = False
|
||
self.queue.queue.clear()
|
||
self.progress_queue.queue.clear()
|
||
self.tree.delete(*self.tree.get_children())
|
||
self.populate_root()
|
||
self.scanner = FolderScanner(self.start_path, self.queue, self.progress_queue)
|
||
self.progress_window = ProgressWindow(self)
|
||
self.withdraw()
|
||
self.scanner.start()
|
||
self.update_tree()
|
||
self.update_progress()
|
||
|
||
def on_close(self): # 关闭程序
|
||
if self.scanner.is_alive():
|
||
self.scanner.join()
|
||
root.destroy()
|
||
exit()
|
||
|
||
def set_window_size(self): # 设置窗口大小
|
||
screen_width = self.winfo_screenwidth()
|
||
screen_height = self.winfo_screenheight()
|
||
window_width = int(screen_width * 0.6)
|
||
window_height = int(screen_height * 0.6)
|
||
self.geometry(
|
||
f"{window_width}x{window_height}+{screen_width//2 - window_width//2}+{screen_height//2 - window_height//2}"
|
||
)
|
||
self.update_font_size()
|
||
|
||
def update_font_size(self): # 更新字体大小
|
||
current_font = self.style.lookup("Treeview", "font")
|
||
font_size = int(current_font.split()[1])
|
||
screen_width = self.winfo_screenwidth()
|
||
new_font_size = int(screen_width / 100)
|
||
if new_font_size != font_size:
|
||
new_font = (current_font.split()[0], new_font_size)
|
||
self.style.configure("Treeview", font=new_font)
|
||
self.style.configure(
|
||
"Treeview.Heading",
|
||
font=(current_font.split()[0], new_font_size + 2, "bold"),
|
||
)
|
||
|
||
def update_folder_first_var(self): # 更新目录优先显示变量
|
||
self.folder_first_var.set(not self.folder_first_var.get())
|
||
|
||
|
||
class ProgressWindow(tk.Toplevel): # 进度窗口
|
||
def __init__(self, parent): # 初始化
|
||
super().__init__(parent)
|
||
self.title("扫描进度")
|
||
self.geometry("500x200")
|
||
self.label = tk.Label(
|
||
self, text="扫描中,请稍等", fg="deepskyblue", font=("Helvetica", 16)
|
||
)
|
||
self.label.pack(pady=20)
|
||
self.progress_var = tk.DoubleVar()
|
||
self.progress_bar = ttk.Progressbar(
|
||
self, variable=self.progress_var, maximum=100
|
||
)
|
||
self.progress_bar.pack(fill=tk.X, padx=20, pady=10)
|
||
self.progress_label = tk.Label(self, text="扫描进度: 0 / 0")
|
||
self.progress_label.pack(pady=5)
|
||
self.path_label = tk.Label(self, text="正在扫描: ")
|
||
self.path_label.pack(pady=5)
|
||
self.protocol("WM_DELETE_WINDOW", self.on_close)
|
||
|
||
def on_close(self):
|
||
pass # 禁止关闭进度窗口
|
||
|
||
|
||
class UpdateManager:
|
||
def __init__(self, root=None, is_startup=True):
|
||
self.root = root
|
||
self.is_startup = is_startup
|
||
|
||
def check_latest_release(self):
|
||
# Gitea 仓库信息和当前软件版本
|
||
repo_owner = "ahdoawhfo"
|
||
repo_name = "SpaceSniffer"
|
||
base_url = "https://git.a6.wiki/api/v1/repos"
|
||
current_version = "1.4.2" # 替换成你的当前软件版本
|
||
|
||
try:
|
||
# 发送 GET 请求获取最新的 Release
|
||
response = requests.get(
|
||
f"{base_url}/{repo_owner}/{repo_name}/releases/latest", proxies=proxy
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
release_info = response.json()
|
||
latest_version = release_info["tag_name"]
|
||
release_notes = release_info["body"]
|
||
|
||
# 比较最新版本和当前软件版本
|
||
if latest_version > current_version:
|
||
# 提示用户更新
|
||
choice = self.prompt_update(latest_version, release_notes)
|
||
if choice:
|
||
# 打开浏览器到 release 页面
|
||
release_url = release_info["html_url"]
|
||
webbrowser.open_new(release_url)
|
||
if self.is_startup:
|
||
return False
|
||
else:
|
||
return True
|
||
else:
|
||
if not self.is_startup: # 仅在用户点击检查更新时显示
|
||
self.show_message("检查更新", "当前版本已经是最新版本")
|
||
return True
|
||
else:
|
||
if not self.is_startup: # 仅在用户点击检查更新时显示
|
||
self.show_message(
|
||
"获取 Release 失败",
|
||
f"获取最新 Release 失败:{response.status_code}",
|
||
)
|
||
return True
|
||
except requests.RequestException as e:
|
||
if not self.is_startup: # 仅在用户点击检查更新时显示
|
||
self.show_message("请求错误", f"请求最新 Release 时发生错误:{e}")
|
||
return True
|
||
|
||
def prompt_update(self, latest_version, release_notes):
|
||
prompt_message = f"检查更新: {latest_version}\n\n更新说明:\n{release_notes}\n\n是否立即更新?"
|
||
choice = messagebox.askyesno("更新提示", prompt_message)
|
||
return choice
|
||
|
||
def show_message(self, title, message):
|
||
if self.root:
|
||
messagebox.showinfo(title, message)
|
||
else:
|
||
print(f"{title}: {message}")
|
||
|
||
|
||
def clear_log_file():
|
||
log_file = "scanner.log"
|
||
try:
|
||
with open(log_file, "w", encoding="utf-8") as f:
|
||
f.truncate(0) # 清空文件内容
|
||
print(f"已清空日志文件 {log_file} 的内容")
|
||
except Exception as e:
|
||
print(f"清空日志文件内容时发生错误: {e}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
clear_log_file()
|
||
root = tk.Tk()
|
||
root.withdraw()
|
||
update_manager = UpdateManager(root, is_startup=True)
|
||
proceed = update_manager.check_latest_release()
|
||
if not proceed:
|
||
root.destroy()
|
||
exit()
|
||
start_path = filedialog.askdirectory()
|
||
if start_path:
|
||
app = App(start_path)
|
||
app.mainloop()
|
||
else:
|
||
messagebox.showinfo("提示", "没有选中任何目录,程序即将退出")
|
||
root.destroy()
|
||
exit()
|