Files
REPO_TEST/common/file_utils.py
2025-07-23 20:41:13 +08:00

156 lines
4.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding:utf-8 -*-
# 文件处理工具模块
import os
import bz2
import gzip
import lzma
import shutil
import logging
from typing import Optional, List, Tuple, Union, IO
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("file_utils")
class FileUtils:
"""文件处理工具类,处理文件操作和压缩文件"""
@staticmethod
def ensure_dir_exists(directory: str) -> bool:
"""
确保目录存在,如果不存在则创建
Args:
directory: 目录路径
Returns:
是否成功创建或已存在
"""
try:
if not os.path.exists(directory):
os.makedirs(directory)
return True
except Exception as e:
logger.error(f"创建目录失败: {directory}, 错误: {str(e)}")
return False
@staticmethod
def extract_bz2(source_path: str, target_path: str) -> bool:
"""
解压BZ2文件
Args:
source_path: BZ2文件路径
target_path: 解压后文件路径
Returns:
是否解压成功
"""
try:
logger.info(f"解压BZ2文件: {source_path} -> {target_path}")
with bz2.BZ2File(source_path) as bz2_file:
data = bz2_file.read()
with open(target_path, 'wb') as f:
f.write(data)
return True
except Exception as e:
logger.error(f"解压BZ2文件失败: {source_path}, 错误: {str(e)}")
return False
@staticmethod
def extract_xz(source_path: str, target_path: str) -> bool:
"""
解压XZ文件
Args:
source_path: XZ文件路径
target_path: 解压后文件路径
Returns:
是否解压成功
"""
try:
logger.info(f"解压XZ文件: {source_path} -> {target_path}")
with lzma.open(source_path, 'rb') as input_file:
with open(target_path, 'wb') as output_file:
shutil.copyfileobj(input_file, output_file)
return True
except Exception as e:
logger.error(f"解压XZ文件失败: {source_path}, 错误: {str(e)}")
return False
@staticmethod
def extract_gz(source_path: str, target_path: str) -> bool:
"""
解压GZ文件
Args:
source_path: GZ文件路径
target_path: 解压后文件路径
Returns:
是否解压成功
"""
try:
logger.info(f"解压GZ文件: {source_path} -> {target_path}")
with gzip.open(source_path, 'rb') as input_file:
with open(target_path, 'wb') as output_file:
shutil.copyfileobj(input_file, output_file)
return True
except Exception as e:
logger.error(f"解压GZ文件失败: {source_path}, 错误: {str(e)}")
return False
@staticmethod
def write_to_file(file_path: str, content: str, mode: str = 'w', encoding: str = 'utf-8') -> bool:
"""
写入内容到文件
Args:
file_path: 文件路径
content: 文件内容
mode: 文件打开模式
encoding: 编码方式
Returns:
是否写入成功
"""
try:
# 确保目录存在
directory = os.path.dirname(file_path)
if directory and not os.path.exists(directory):
os.makedirs(directory)
with open(file_path, mode, encoding=encoding) as f:
f.write(content)
return True
except Exception as e:
logger.error(f"写入文件失败: {file_path}, 错误: {str(e)}")
return False
@staticmethod
def read_file(file_path: str, mode: str = 'r', encoding: Optional[str] = 'utf-8') -> Optional[Union[str, bytes]]:
"""
读取文件内容
Args:
file_path: 文件路径
mode: 文件打开模式
encoding: 编码方式二进制模式下为None
Returns:
文件内容失败返回None
"""
try:
with open(file_path, mode, encoding=encoding) as f:
return f.read()
except Exception as e:
logger.error(f"读取文件失败: {file_path}, 错误: {str(e)}")
return None