mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-02 18:22:39 +08:00
196 lines
7.0 KiB
Python
196 lines
7.0 KiB
Python
import base64
|
||
import hashlib
|
||
from hashlib import md5
|
||
from typing import Union, Optional, Tuple
|
||
|
||
from Crypto import Random
|
||
from Crypto.Cipher import AES
|
||
from cryptography.hazmat.backends import default_backend
|
||
from cryptography.hazmat.primitives import hashes, serialization
|
||
from cryptography.hazmat.primitives.asymmetric import padding as asym_padding, rsa
|
||
|
||
|
||
class RSAUtils:
|
||
|
||
@staticmethod
|
||
def generate_rsa_key_pair(key_size: int = 2048) -> Tuple[str, str]:
|
||
"""
|
||
生成RSA密钥对
|
||
:return: 私钥和公钥(Base64 编码,无标识符)
|
||
"""
|
||
# 生成RSA密钥对
|
||
private_key = rsa.generate_private_key(
|
||
public_exponent=65537,
|
||
key_size=key_size,
|
||
)
|
||
|
||
public_key = private_key.public_key()
|
||
|
||
# 导出私钥为DER格式
|
||
private_key_der = private_key.private_bytes(
|
||
encoding=serialization.Encoding.DER,
|
||
format=serialization.PrivateFormat.PKCS8,
|
||
encryption_algorithm=serialization.NoEncryption()
|
||
)
|
||
|
||
# 导出公钥为DER格式
|
||
public_key_der = public_key.public_bytes(
|
||
encoding=serialization.Encoding.DER,
|
||
format=serialization.PublicFormat.SubjectPublicKeyInfo
|
||
)
|
||
|
||
# 将DER格式的密钥编码为Base64
|
||
private_key_b64 = base64.b64encode(private_key_der).decode("utf-8")
|
||
public_key_b64 = base64.b64encode(public_key_der).decode("utf-8")
|
||
|
||
return private_key_b64, public_key_b64
|
||
|
||
@staticmethod
|
||
def verify_rsa_keys(private_key: Optional[str], public_key: Optional[str]) -> bool:
|
||
"""
|
||
使用 RSA 验证私钥和公钥是否匹配
|
||
|
||
:param private_key: 私钥字符串 (Base64 编码,无标识符)
|
||
:param public_key: 公钥字符串 (Base64 编码,无标识符)
|
||
:return: 如果匹配则返回 True,否则返回 False
|
||
"""
|
||
if not private_key or not public_key:
|
||
return False
|
||
|
||
try:
|
||
# 解码 Base64 编码的公钥和私钥
|
||
public_key_bytes = base64.b64decode(public_key)
|
||
private_key_bytes = base64.b64decode(private_key)
|
||
|
||
# 加载公钥
|
||
public_key = serialization.load_der_public_key(public_key_bytes, backend=default_backend())
|
||
|
||
# 加载私钥
|
||
private_key = serialization.load_der_private_key(private_key_bytes, password=None,
|
||
backend=default_backend())
|
||
|
||
# 测试加解密
|
||
message = b'test'
|
||
encrypted_message = public_key.encrypt(
|
||
message,
|
||
asym_padding.OAEP(
|
||
mgf=asym_padding.MGF1(algorithm=hashes.SHA256()),
|
||
algorithm=hashes.SHA256(),
|
||
label=None
|
||
)
|
||
)
|
||
|
||
decrypted_message = private_key.decrypt(
|
||
encrypted_message,
|
||
asym_padding.OAEP(
|
||
mgf=asym_padding.MGF1(algorithm=hashes.SHA256()),
|
||
algorithm=hashes.SHA256(),
|
||
label=None
|
||
)
|
||
)
|
||
|
||
return message == decrypted_message
|
||
except Exception as e:
|
||
print(f"RSA 密钥验证失败: {e}")
|
||
return False
|
||
|
||
|
||
class HashUtils:
|
||
@staticmethod
|
||
def md5(data: Union[str, bytes], encoding: str = "utf-8") -> str:
|
||
"""
|
||
生成数据的MD5哈希值,并以字符串形式返回
|
||
|
||
:param data: 输入的数据,类型为字符串
|
||
:param encoding: 字符串编码类型,默认使用UTF-8
|
||
:return: 生成的MD5哈希字符串
|
||
"""
|
||
if isinstance(data, str):
|
||
data = data.encode(encoding)
|
||
return hashlib.md5(data).hexdigest()
|
||
|
||
@staticmethod
|
||
def md5_bytes(data: Union[str, bytes], encoding: str = "utf-8") -> bytes:
|
||
"""
|
||
生成数据的MD5哈希值,并以字节形式返回
|
||
|
||
:param data: 输入的数据,类型为字符串
|
||
:param encoding: 字符串编码类型,默认使用UTF-8
|
||
:return: 生成的MD5哈希二进制数据
|
||
"""
|
||
if isinstance(data, str):
|
||
data = data.encode(encoding)
|
||
return hashlib.md5(data).digest()
|
||
|
||
|
||
class CryptoJsUtils:
|
||
|
||
@staticmethod
|
||
def bytes_to_key(data: bytes, salt: bytes, output=48) -> bytes:
|
||
"""
|
||
生成加密/解密所需的密钥和初始化向量 (IV)
|
||
"""
|
||
# extended from https://gist.github.com/gsakkis/4546068
|
||
assert len(salt) == 8, len(salt)
|
||
data += salt
|
||
key = md5(data).digest()
|
||
final_key = key
|
||
while len(final_key) < output:
|
||
key = md5(key + data).digest()
|
||
final_key += key
|
||
return final_key[:output]
|
||
|
||
@staticmethod
|
||
def encrypt(message: bytes, passphrase: bytes) -> bytes:
|
||
"""
|
||
使用 CryptoJS 兼容的加密策略对消息进行加密
|
||
"""
|
||
# This is a modified copy of https://stackoverflow.com/questions/36762098/how-to-decrypt-password-from-javascript-cryptojs-aes-encryptpassword-passphras
|
||
# 生成8字节的随机盐值
|
||
salt = Random.new().read(8)
|
||
# 通过密码短语和盐值生成密钥和IV
|
||
key_iv = CryptoJsUtils.bytes_to_key(passphrase, salt, 32 + 16)
|
||
key = key_iv[:32]
|
||
iv = key_iv[32:]
|
||
# 创建AES加密器(CBC模式)
|
||
aes = AES.new(key, AES.MODE_CBC, iv)
|
||
# 应用PKCS#7填充
|
||
padding_length = 16 - (len(message) % 16)
|
||
padding = bytes([padding_length] * padding_length)
|
||
padded_message = message + padding
|
||
# 加密消息
|
||
encrypted = aes.encrypt(padded_message)
|
||
# 构建加密数据格式:b"Salted__" + salt + encrypted_message
|
||
salted_encrypted = b"Salted__" + salt + encrypted
|
||
# 返回Base64编码的加密数据
|
||
return base64.b64encode(salted_encrypted)
|
||
|
||
@staticmethod
|
||
def decrypt(encrypted: Union[str, bytes], passphrase: bytes) -> bytes:
|
||
"""
|
||
使用 CryptoJS 兼容的解密策略对加密消息进行解密
|
||
"""
|
||
# 确保输入是字节类型
|
||
if isinstance(encrypted, str):
|
||
encrypted = encrypted.encode("utf-8")
|
||
# Base64 解码
|
||
encrypted = base64.b64decode(encrypted)
|
||
# 检查前8字节是否为 "Salted__"
|
||
assert encrypted.startswith(b"Salted__"), "Invalid encrypted data format"
|
||
# 提取盐值
|
||
salt = encrypted[8:16]
|
||
# 通过密码短语和盐值生成密钥和IV
|
||
key_iv = CryptoJsUtils.bytes_to_key(passphrase, salt, 32 + 16)
|
||
key = key_iv[:32]
|
||
iv = key_iv[32:]
|
||
# 创建AES解密器(CBC模式)
|
||
aes = AES.new(key, AES.MODE_CBC, iv)
|
||
# 解密加密部分
|
||
decrypted_padded = aes.decrypt(encrypted[16:])
|
||
# 移除PKCS#7填充
|
||
padding_length = decrypted_padded[-1]
|
||
if isinstance(padding_length, str):
|
||
padding_length = ord(padding_length)
|
||
decrypted = decrypted_padded[:-padding_length]
|
||
return decrypted
|