mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-20 03:57:30 +08:00
96 lines
3.3 KiB
Python
96 lines
3.3 KiB
Python
import base64
|
|
import hashlib
|
|
import json
|
|
import unittest
|
|
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import padding, rsa
|
|
|
|
from app.utils.ugreen_crypto import UgreenCrypto
|
|
|
|
|
|
def _generate_rsa_keys() -> tuple[str, rsa.RSAPrivateKey]:
|
|
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
|
|
public_pem = private_key.public_key().public_bytes(
|
|
encoding=serialization.Encoding.PEM,
|
|
format=serialization.PublicFormat.PKCS1,
|
|
).decode("utf-8")
|
|
return public_pem, private_key
|
|
|
|
|
|
def _rsa_decrypt_long(private_key: rsa.RSAPrivateKey, payload_b64: str) -> str:
|
|
encrypted = base64.b64decode(payload_b64)
|
|
chunk_size = private_key.key_size // 8
|
|
plain_chunks = []
|
|
for start in range(0, len(encrypted), chunk_size):
|
|
chunk = encrypted[start : start + chunk_size]
|
|
plain_chunks.append(private_key.decrypt(chunk, padding.PKCS1v15()))
|
|
return b"".join(plain_chunks).decode("utf-8")
|
|
|
|
|
|
class UgreenCryptoTest(unittest.TestCase):
|
|
def setUp(self):
|
|
self.public_key, self.private_key = _generate_rsa_keys()
|
|
self.token = "demo-token-for-test"
|
|
self.crypto = UgreenCrypto(
|
|
public_key=self.public_key,
|
|
token=self.token,
|
|
client_id="test-client-id",
|
|
)
|
|
|
|
def test_rsa_encrypt_long(self):
|
|
plain = "A" * 400
|
|
encrypted = self.crypto.rsa_encrypt_long(plain)
|
|
self.assertEqual(plain, _rsa_decrypt_long(self.private_key, encrypted))
|
|
|
|
def test_build_encrypted_request_and_decrypt_response(self):
|
|
req = self.crypto.build_encrypted_request(
|
|
url="http://127.0.0.1:9999/ugreen/v1/video/homepage/media_list",
|
|
params={"page": 1, "page_size": 50},
|
|
data={"foo": "bar", "count": 2},
|
|
)
|
|
|
|
self.assertEqual(
|
|
req.plain_query,
|
|
"page=1&page_size=50",
|
|
)
|
|
self.assertEqual(
|
|
req.plain_query,
|
|
self.crypto.aes_gcm_decrypt(req.params["encrypt_query"], req.aes_key),
|
|
)
|
|
|
|
self.assertEqual(
|
|
req.headers["X-Ugreen-Security-Key"],
|
|
hashlib.md5(self.token.encode("utf-8")).hexdigest(),
|
|
)
|
|
self.assertEqual(
|
|
req.aes_key,
|
|
_rsa_decrypt_long(self.private_key, req.headers["X-Ugreen-Security-Code"]),
|
|
)
|
|
self.assertEqual(
|
|
self.token,
|
|
_rsa_decrypt_long(self.private_key, req.headers["X-Ugreen-Token"]),
|
|
)
|
|
|
|
encrypted_body = req.json["encrypt_req_body"]
|
|
body_plain = self.crypto.aes_gcm_decrypt(encrypted_body, req.aes_key)
|
|
self.assertEqual(json.loads(body_plain), {"foo": "bar", "count": 2})
|
|
self.assertEqual(
|
|
req.json["req_body_sha256"],
|
|
hashlib.sha256(body_plain.encode("utf-8")).hexdigest(),
|
|
)
|
|
|
|
server_payload = {"code": 0, "msg": "ok", "data": {"items": [1, 2, 3]}}
|
|
resp = {
|
|
"encrypt_resp_body": self.crypto.aes_gcm_encrypt(
|
|
json.dumps(server_payload, ensure_ascii=False, separators=(",", ":")),
|
|
req.aes_key,
|
|
)
|
|
}
|
|
decoded = self.crypto.decrypt_response(resp, req.aes_key)
|
|
self.assertEqual(decoded, server_payload)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|