Merge pull request #5071 from wikrin/optimize-file-size

This commit is contained in:
jxxghp
2025-10-23 22:54:43 +08:00
committed by GitHub
4 changed files with 88 additions and 49 deletions

View File

@@ -7,7 +7,6 @@ import shutil
import subprocess
import sys
import uuid
from glob import glob
from pathlib import Path
from typing import List, Optional, Tuple, Union
@@ -225,23 +224,31 @@ class SystemUtils:
if directory.is_file():
return [directory]
if not min_filesize:
min_filesize = 0
files = []
# 预编译正则表达式
if extensions:
pattern = r".*(" + "|".join(extensions) + ")$"
pattern = re.compile(r".*(" + "|".join(extensions) + r")$", re.IGNORECASE)
else:
pattern = r".*"
pattern = re.compile(r".*")
# 遍历目录及子目录
for matched_glob in glob('**', root_dir=directory, recursive=recursive, include_hidden=True):
path = directory.joinpath(matched_glob)
if path.is_file() \
and re.match(pattern, path.name, re.IGNORECASE) \
and path.stat().st_size >= min_filesize * 1024 * 1024:
files.append(path)
def _scan_directory(dir_path: Path, is_recursive: bool):
try:
with os.scandir(dir_path) as entries:
for entry in entries:
try:
if entry.is_file(follow_symlinks=False):
entry_path = Path(entry.path)
if (pattern.match(entry.name) and
(min_filesize <= 0 or entry.stat().st_size >= min_filesize * 1024 * 1024)):
files.append(entry_path)
elif entry.is_dir() and is_recursive:
_scan_directory(Path(entry.path), is_recursive)
except (OSError, PermissionError):
continue
except (OSError, PermissionError):
pass
_scan_directory(directory, recursive)
return files
@staticmethod
@@ -256,29 +263,44 @@ class SystemUtils:
:return: True存在 False不存在
"""
if not min_filesize:
min_filesize = 0
if not directory.exists():
return False
# 预编译正则表达式
if extensions:
pattern = re.compile(r".*(" + "|".join(extensions) + r")$", re.IGNORECASE)
else:
pattern = re.compile(r".*")
if directory.is_file():
# 检查单个文件是否符合条件
if extensions and not pattern.match(directory.name):
return False
if min_filesize > 0 and directory.stat().st_size < min_filesize * 1024 * 1024:
return False
return True
if not min_filesize:
min_filesize = 0
def _search_files(dir_path: Path, is_recursive: bool) -> bool:
try:
with os.scandir(dir_path) as entries:
for entry in entries:
try:
if entry.is_file(follow_symlinks=False):
# 检查文件是否符合条件
if (pattern.match(entry.name) and
(min_filesize <= 0 or entry.stat().st_size >= min_filesize * 1024 * 1024)):
return True
elif entry.is_dir() and is_recursive:
# 递归搜索子目录
if _search_files(Path(entry.path), is_recursive):
return True
except (OSError, PermissionError):
continue
except (OSError, PermissionError):
pass
return False
pattern = r".*(" + "|".join(extensions) + ")$"
# 遍历目录及子目录
for matched_glob in glob('**', root_dir=directory, recursive=recursive, include_hidden=True):
path = directory.joinpath(matched_glob)
if path.is_file() \
and re.match(pattern, path.name, re.IGNORECASE) \
and path.stat().st_size >= min_filesize * 1024 * 1024:
return True
return False
return _search_files(directory, recursive)
@staticmethod
def list_sub_files(directory: Path, extensions: list) -> List[Path]:
@@ -292,12 +314,20 @@ class SystemUtils:
return [directory]
files = []
pattern = r".*(" + "|".join(extensions) + ")$"
# 遍历目录
for path in directory.iterdir():
if path.is_file() and re.match(pattern, path.name, re.IGNORECASE):
files.append(path)
# 预编译正则表达式
if extensions:
pattern = re.compile(r".*(" + "|".join(extensions) + r")$", re.IGNORECASE)
else:
pattern = re.compile(r".*")
try:
with os.scandir(directory) as entries:
for entry in entries:
if entry.is_file() and pattern.match(entry.name):
files.append(Path(entry.path))
except OSError:
pass
return files
@@ -346,7 +376,7 @@ class SystemUtils:
return items
@staticmethod
def get_directory_size(path: Path) -> float:
def get_directory_size(path: Path) -> int:
"""
计算目录的大小
@@ -358,14 +388,21 @@ class SystemUtils:
"""
if not path or not path.exists():
return 0
if path.is_file():
return path.stat().st_size
total_size = 0
for path in path.glob('**/*'):
if path.is_file():
total_size += path.stat().st_size
return total_size
def _calc_dir_size(dir_path):
total = 0
try:
with os.scandir(dir_path) as entries:
for entry in entries:
if entry.is_file():
total += entry.stat().st_size
elif entry.is_dir():
total += _calc_dir_size(entry.path)
except OSError:
pass
return total
return _calc_dir_size(path) if path.is_dir() else path.stat().st_size
@staticmethod
def space_usage(dir_list: Union[Path, List[Path]]) -> Tuple[float, float]: