Files
MoviePilot/app/utils/crypto.py
2025-03-23 19:09:48 +08:00

196 lines
7.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import hashlib
from hashlib import md5
from typing import Union, Optional, Tuple
from Crypto import Random
from Crypto.Cipher import AES
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding as asym_padding, rsa
class RSAUtils:
@staticmethod
def generate_rsa_key_pair(key_size: int = 2048) -> Tuple[str, str]:
"""
生成RSA密钥对
:return: 私钥和公钥Base64 编码,无标识符)
"""
# 生成RSA密钥对
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=key_size,
)
public_key = private_key.public_key()
# 导出私钥为DER格式
private_key_der = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
# 导出公钥为DER格式
public_key_der = public_key.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# 将DER格式的密钥编码为Base64
private_key_b64 = base64.b64encode(private_key_der).decode("utf-8")
public_key_b64 = base64.b64encode(public_key_der).decode("utf-8")
return private_key_b64, public_key_b64
@staticmethod
def verify_rsa_keys(private_key: Optional[str], public_key: Optional[str]) -> bool:
"""
使用 RSA 验证私钥和公钥是否匹配
:param private_key: 私钥字符串 (Base64 编码,无标识符)
:param public_key: 公钥字符串 (Base64 编码,无标识符)
:return: 如果匹配则返回 True否则返回 False
"""
if not private_key or not public_key:
return False
try:
# 解码 Base64 编码的公钥和私钥
public_key_bytes = base64.b64decode(public_key)
private_key_bytes = base64.b64decode(private_key)
# 加载公钥
public_key = serialization.load_der_public_key(public_key_bytes, backend=default_backend())
# 加载私钥
private_key = serialization.load_der_private_key(private_key_bytes, password=None,
backend=default_backend())
# 测试加解密
message = b'test'
encrypted_message = public_key.encrypt(
message,
asym_padding.OAEP(
mgf=asym_padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
decrypted_message = private_key.decrypt(
encrypted_message,
asym_padding.OAEP(
mgf=asym_padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None
)
)
return message == decrypted_message
except Exception as e:
print(f"RSA 密钥验证失败: {e}")
return False
class HashUtils:
@staticmethod
def md5(data: Union[str, bytes], encoding: str = "utf-8") -> str:
"""
生成数据的MD5哈希值并以字符串形式返回
:param data: 输入的数据,类型为字符串
:param encoding: 字符串编码类型默认使用UTF-8
:return: 生成的MD5哈希字符串
"""
if isinstance(data, str):
data = data.encode(encoding)
return hashlib.md5(data).hexdigest()
@staticmethod
def md5_bytes(data: Union[str, bytes], encoding: str = "utf-8") -> bytes:
"""
生成数据的MD5哈希值并以字节形式返回
:param data: 输入的数据,类型为字符串
:param encoding: 字符串编码类型默认使用UTF-8
:return: 生成的MD5哈希二进制数据
"""
if isinstance(data, str):
data = data.encode(encoding)
return hashlib.md5(data).digest()
class CryptoJsUtils:
@staticmethod
def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes:
"""
生成加密/解密所需的密钥和初始化向量 (IV)
"""
# extended from https://gist.github.com/gsakkis/4546068
assert len(salt) == 8, len(salt)
data += salt
key = md5(data).digest()
final_key = key
while len(final_key) < output:
key = md5(key + data).digest()
final_key += key
return final_key[:output]
@staticmethod
def encrypt(message: bytes, passphrase: bytes) -> bytes:
"""
使用 CryptoJS 兼容的加密策略对消息进行加密
"""
# This is a modified copy of https://stackoverflow.com/questions/36762098/how-to-decrypt-password-from-javascript-cryptojs-aes-encryptpassword-passphras
# 生成8字节的随机盐值
salt = Random.new().read(8)
# 通过密码短语和盐值生成密钥和IV
key_iv = CryptoJsUtils.bytes_to_key(passphrase, salt, 32 + 16)
key = key_iv[:32]
iv = key_iv[32:]
# 创建AES加密器CBC模式
aes = AES.new(key, AES.MODE_CBC, iv)
# 应用PKCS#7填充
padding_length = 16 - (len(message) % 16)
padding = bytes([padding_length] * padding_length)
padded_message = message + padding
# 加密消息
encrypted = aes.encrypt(padded_message)
# 构建加密数据格式b"Salted__" + salt + encrypted_message
salted_encrypted = b"Salted__" + salt + encrypted
# 返回Base64编码的加密数据
return base64.b64encode(salted_encrypted)
@staticmethod
def decrypt(encrypted: Union[str, bytes], passphrase: bytes) -> bytes:
"""
使用 CryptoJS 兼容的解密策略对加密消息进行解密
"""
# 确保输入是字节类型
if isinstance(encrypted, str):
encrypted = encrypted.encode("utf-8")
# Base64 解码
encrypted = base64.b64decode(encrypted)
# 检查前8字节是否为 "Salted__"
assert encrypted.startswith(b"Salted__"), "Invalid encrypted data format"
# 提取盐值
salt = encrypted[8:16]
# 通过密码短语和盐值生成密钥和IV
key_iv = CryptoJsUtils.bytes_to_key(passphrase, salt, 32 + 16)
key = key_iv[:32]
iv = key_iv[32:]
# 创建AES解密器CBC模式
aes = AES.new(key, AES.MODE_CBC, iv)
# 解密加密部分
decrypted_padded = aes.decrypt(encrypted[16:])
# 移除PKCS#7填充
padding_length = decrypted_padded[-1]
if isinstance(padding_length, str):
padding_length = ord(padding_length)
decrypted = decrypted_padded[:-padding_length]
return decrypted