test(e2e): add comprehensive E2E integration test suite

67 tests across 11 phases exercising the full AutoBangumi workflow
against Docker infrastructure (qBittorrent + mock RSS server).
Covers setup wizard, auth, config, RSS CRUD, bangumi, downloader,
program lifecycle, log, search, notification, and credential updates
with both happy paths and error conditions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Estrella Pan
2026-02-23 11:44:03 +01:00
parent 6cbe7090fe
commit 339166508b
9 changed files with 1143 additions and 0 deletions

View File

@@ -41,6 +41,9 @@ dev = [
testpaths = ["src/test"]
pythonpath = ["src"]
asyncio_mode = "auto"
markers = [
"e2e: End-to-end integration tests (require Docker)",
]
[tool.ruff]
line-length = 88

View File

@@ -0,0 +1,7 @@
FROM python:3.11-slim
RUN pip install --no-cache-dir aiohttp
COPY mock_rss_server.py /app/
COPY fixtures/ /app/fixtures/
WORKDIR /app
EXPOSE 18888
CMD ["python", "mock_rss_server.py"]

View File

View File

@@ -0,0 +1,180 @@
"""Shared fixtures for E2E integration tests.
These tests require Docker (qBittorrent + mock RSS server) and run
AutoBangumi as a real subprocess with isolated config/data directories.
Run with: cd backend && uv run pytest -m e2e -v
"""
import subprocess
import sys
import time
from pathlib import Path
import httpx
import pytest
# ---------------------------------------------------------------------------
# Auto-skip E2E tests unless explicitly selected
# ---------------------------------------------------------------------------
E2E_DIR = Path(__file__).parent
def pytest_collection_modifyitems(config, items):
"""Skip E2E tests unless -m e2e is specified."""
marker_expr = config.getoption("-m", default="")
if "e2e" in marker_expr:
return
skip = pytest.mark.skip(reason="E2E tests require: pytest -m e2e")
for item in items:
if "e2e" in item.keywords:
item.add_marker(skip)
# ---------------------------------------------------------------------------
# Test credentials (used in setup and login)
# ---------------------------------------------------------------------------
E2E_USERNAME = "testadmin"
E2E_PASSWORD = "testpassword123"
# ---------------------------------------------------------------------------
# Session-scoped fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session")
def e2e_tmpdir(tmp_path_factory):
"""Session-scoped temp directory for AB config/data isolation."""
return tmp_path_factory.mktemp("e2e")
@pytest.fixture(scope="session")
def docker_services():
"""Start and stop Docker Compose test infrastructure."""
compose_file = E2E_DIR / "docker-compose.test.yml"
# Build mock RSS image
subprocess.run(
["docker", "compose", "-f", str(compose_file), "build"],
check=True,
capture_output=True,
)
# Start services and wait for health checks
subprocess.run(
["docker", "compose", "-f", str(compose_file), "up", "-d", "--wait"],
check=True,
timeout=120,
)
yield
# Teardown
subprocess.run(
["docker", "compose", "-f", str(compose_file), "down", "-v"],
check=True,
capture_output=True,
)
@pytest.fixture(scope="session")
def qb_password(docker_services):
"""Extract the auto-generated password from qBittorrent container logs."""
for _ in range(30):
result = subprocess.run(
["docker", "logs", "ab-test-qbittorrent"],
capture_output=True,
text=True,
)
for line in result.stdout.splitlines() + result.stderr.splitlines():
if "temporary password" in line.lower():
return line.split(":")[-1].strip()
time.sleep(2)
pytest.fail("Could not extract qBittorrent temporary password from Docker logs")
@pytest.fixture(scope="session")
def ab_process(e2e_tmpdir, docker_services):
"""Start AutoBangumi as a subprocess with isolated config/data dirs.
Uses CWD-based isolation: main.py resolves config/ and data/ relative
to the working directory, so we create those dirs in a temp location
and run the process from there.
"""
work_dir = e2e_tmpdir / "ab_workdir"
work_dir.mkdir()
(work_dir / "config").mkdir()
(work_dir / "data").mkdir()
# main.py mounts StaticFiles for dist/assets and dist/images when
# VERSION != "DEV_VERSION". Create dummy dirs so the mounts succeed
# (the E2E tests only exercise the API, not the frontend).
dist_dir = work_dir / "dist"
dist_dir.mkdir()
(dist_dir / "assets").mkdir()
(dist_dir / "images").mkdir()
# Jinja2Templates requires at least one template file
(dist_dir / "index.html").write_text(
"<html><body>e2e stub</body></html>"
)
# backend/src/ is the directory containing main.py and module/
src_dir = Path(__file__).resolve().parents[2]
proc = subprocess.Popen(
[sys.executable, str(src_dir / "main.py")],
cwd=str(work_dir),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
# Wait for AutoBangumi to be ready (poll setup status endpoint)
ready = False
for _ in range(30):
try:
resp = httpx.get(
"http://localhost:7892/api/v1/setup/status", timeout=3.0
)
if resp.status_code == 200:
ready = True
break
except (httpx.ConnectError, httpx.ReadTimeout):
pass
time.sleep(1)
if not ready:
proc.terminate()
stdout, stderr = proc.communicate(timeout=5)
pytest.fail(
f"AutoBangumi did not start within 30s.\n"
f"stdout: {stdout.decode(errors='replace')[-2000:]}\n"
f"stderr: {stderr.decode(errors='replace')[-2000:]}"
)
yield proc
proc.terminate()
try:
proc.wait(timeout=10)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait(timeout=5)
@pytest.fixture(scope="session")
def api_client(ab_process):
"""HTTP client pointing at the running AutoBangumi instance.
Maintains cookies across requests so that the auth token (set via
Set-Cookie on login) is automatically included in subsequent calls.
"""
with httpx.Client(base_url="http://localhost:7892", timeout=10.0) as client:
yield client
@pytest.fixture(scope="session")
def e2e_state():
"""Mutable dict for sharing state across ordered E2E tests."""
return {}

View File

@@ -0,0 +1,34 @@
services:
qbittorrent:
image: linuxserver/qbittorrent:latest
container_name: ab-test-qbittorrent
environment:
- PUID=1000
- PGID=1000
- TZ=UTC
- WEBUI_PORT=18080
ports:
- "18080:18080"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:18080"]
interval: 5s
timeout: 3s
retries: 15
start_period: 10s
tmpfs:
- /config
- /downloads
mock-rss:
build:
context: .
dockerfile: Dockerfile.mock-rss
container_name: ab-test-mock-rss
ports:
- "18888:18888"
healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:18888/health')"]
interval: 3s
timeout: 2s
retries: 5
start_period: 3s

View File

@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="utf-8"?>
<rss version="2.0">
<channel>
<title>Mikan Project - E2E Test Feed</title>
<link>https://mikanani.me</link>
<description>E2E test RSS feed for AutoBangumi</description>
<item>
<title>[Lilith-Raws] Sousou no Frieren - 01 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4]</title>
<link>https://mikanani.me/Home/Episode/abc001</link>
<enclosure url="magnet:?xt=urn:btih:aaaa1111bbbb2222cccc3333dddd4444eeee5555&amp;dn=Frieren+01" type="application/x-bittorrent" length="0"/>
<torrent xmlns="https://mikan.ani.rip/0.1/">
<pubDate>2025-10-06T12:00:00</pubDate>
</torrent>
</item>
<item>
<title>[Lilith-Raws] Sousou no Frieren - 02 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4]</title>
<link>https://mikanani.me/Home/Episode/abc002</link>
<enclosure url="magnet:?xt=urn:btih:aaaa1111bbbb2222cccc3333dddd4444eeee6666&amp;dn=Frieren+02" type="application/x-bittorrent" length="0"/>
<torrent xmlns="https://mikan.ani.rip/0.1/">
<pubDate>2025-10-13T12:00:00</pubDate>
</torrent>
</item>
<item>
<title>[Lilith-Raws] Sousou no Frieren - 03 [Baha][WEB-DL][1080p][AVC AAC][CHT][MP4]</title>
<link>https://mikanani.me/Home/Episode/abc003</link>
<enclosure url="magnet:?xt=urn:btih:aaaa1111bbbb2222cccc3333dddd4444eeee7777&amp;dn=Frieren+03" type="application/x-bittorrent" length="0"/>
<torrent xmlns="https://mikan.ani.rip/0.1/">
<pubDate>2025-10-20T12:00:00</pubDate>
</torrent>
</item>
<item>
<title>[SubsPlease] Jujutsu Kaisen - 01 (1080p) [ABCD1234].mkv</title>
<link>https://mikanani.me/Home/Episode/def001</link>
<enclosure url="magnet:?xt=urn:btih:bbbb2222cccc3333dddd4444eeee5555ffff6666&amp;dn=JJK+01" type="application/x-bittorrent" length="0"/>
<torrent xmlns="https://mikan.ani.rip/0.1/">
<pubDate>2025-10-07T12:00:00</pubDate>
</torrent>
</item>
<item>
<title>[SubsPlease] Jujutsu Kaisen - 02 (1080p) [EFGH5678].mkv</title>
<link>https://mikanani.me/Home/Episode/def002</link>
<enclosure url="magnet:?xt=urn:btih:bbbb2222cccc3333dddd4444eeee5555ffff7777&amp;dn=JJK+02" type="application/x-bittorrent" length="0"/>
<torrent xmlns="https://mikan.ani.rip/0.1/">
<pubDate>2025-10-14T12:00:00</pubDate>
</torrent>
</item>
<item>
<title>[ANi] Spy x Family Season 2 - 01 [1080p][Baha][WEB-DL][AAC AVC][CHT]</title>
<link>https://mikanani.me/Home/Episode/ghi001</link>
<enclosure url="magnet:?xt=urn:btih:cccc3333dddd4444eeee5555ffff6666aaaa7777&amp;dn=SpyFamily+01" type="application/x-bittorrent" length="0"/>
<torrent xmlns="https://mikan.ani.rip/0.1/">
<pubDate>2025-10-07T18:00:00</pubDate>
</torrent>
</item>
<item>
<title>[Nekomoe kissaten] Kusuriya no Hitorigoto - 01 [BDRip 1080p HEVC-10bit FLAC]</title>
<link>https://mikanani.me/Home/Episode/jkl001</link>
<enclosure url="magnet:?xt=urn:btih:dddd4444eeee5555ffff6666aaaa7777bbbb8888&amp;dn=Kusuriya+01" type="application/x-bittorrent" length="0"/>
<torrent xmlns="https://mikan.ani.rip/0.1/">
<pubDate>2025-10-21T12:00:00</pubDate>
</torrent>
</item>
<item>
<title>[Nekomoe kissaten] Kusuriya no Hitorigoto - 02 [BDRip 1080p HEVC-10bit FLAC]</title>
<link>https://mikanani.me/Home/Episode/jkl002</link>
<enclosure url="magnet:?xt=urn:btih:dddd4444eeee5555ffff6666aaaa7777bbbb9999&amp;dn=Kusuriya+02" type="application/x-bittorrent" length="0"/>
<torrent xmlns="https://mikan.ani.rip/0.1/">
<pubDate>2025-10-28T12:00:00</pubDate>
</torrent>
</item>
</channel>
</rss>

View File

@@ -0,0 +1,34 @@
"""Minimal HTTP server that serves static RSS XML fixtures."""
import asyncio
from pathlib import Path
from aiohttp import web
FIXTURES_DIR = Path(__file__).parent / "fixtures"
async def handle_rss(request: web.Request) -> web.Response:
feed_name = request.match_info["feed_name"]
xml_path = FIXTURES_DIR / f"{feed_name}.xml"
if not xml_path.exists():
return web.Response(status=404, text=f"Feed not found: {feed_name}")
return web.Response(
text=xml_path.read_text(encoding="utf-8"),
content_type="application/xml",
)
async def handle_health(request: web.Request) -> web.Response:
return web.Response(text="OK")
def create_app() -> web.Application:
app = web.Application()
app.router.add_get("/health", handle_health)
app.router.add_get("/rss/{feed_name}.xml", handle_rss)
return app
if __name__ == "__main__":
web.run_app(create_app(), host="0.0.0.0", port=18888)

View File

@@ -0,0 +1,668 @@
"""E2E integration tests for the full AutoBangumi workflow.
Tests are executed in definition order within the class. Each phase
builds on state created by earlier phases (setup wizard -> auth ->
config -> RSS -> bangumi -> downloader -> program -> log -> search ->
notification -> credential update -> cleanup).
Prerequisites:
- Docker running (qBittorrent + mock RSS containers)
- Port 7892 free (AutoBangumi)
- Port 18080 free (qBittorrent)
- Port 18888 free (mock RSS server)
Run:
cd backend && uv run pytest -m e2e -v --tb=long
"""
import httpx
import pytest
from .conftest import E2E_PASSWORD, E2E_USERNAME
@pytest.mark.e2e
class TestE2EWorkflow:
"""Full workflow test against real qBittorrent and mock RSS server."""
# ===================================================================
# Phase 1: Setup Wizard
# ===================================================================
def test_01_setup_status_needs_setup(self, api_client):
"""Fresh instance should require setup."""
resp = api_client.get("/api/v1/setup/status")
assert resp.status_code == 200
data = resp.json()
assert data["need_setup"] is True
assert "version" in data
def test_02_verify_infrastructure(self, api_client, qb_password):
"""Verify Docker test infrastructure is reachable."""
# qBittorrent WebUI
qb_resp = httpx.get("http://localhost:18080", timeout=5.0)
assert qb_resp.status_code == 200
# Mock RSS server
rss_resp = httpx.get("http://localhost:18888/health", timeout=5.0)
assert rss_resp.status_code == 200
# Mock RSS feed content
xml_resp = httpx.get("http://localhost:18888/rss/mikan.xml", timeout=5.0)
assert xml_resp.status_code == 200
assert "<rss" in xml_resp.text
assert "Frieren" in xml_resp.text
# qBittorrent password was extracted
assert qb_password, "qBittorrent password should not be empty"
def test_03_mock_rss_nonexistent_feed(self):
"""Mock RSS server returns 404 for unknown feeds."""
resp = httpx.get("http://localhost:18888/rss/nonexistent.xml", timeout=5.0)
assert resp.status_code == 404
def test_04_test_mock_downloader(self, api_client):
"""Setup wizard test-downloader endpoint accepts mock type."""
resp = api_client.post(
"/api/v1/setup/test-downloader",
json={
"type": "mock",
"host": "localhost",
"username": "admin",
"password": "admin",
},
)
assert resp.status_code == 200
assert resp.json()["success"] is True
def test_05_setup_validation_username_too_short(self, api_client):
"""Username < 4 chars triggers Pydantic 422."""
resp = api_client.post(
"/api/v1/setup/complete",
json={
"username": "ab",
"password": "validpassword",
"downloader_type": "mock",
"downloader_host": "localhost",
"downloader_username": "x",
"downloader_password": "x",
"downloader_path": "/tmp",
},
)
assert resp.status_code == 422
def test_06_setup_validation_password_too_short(self, api_client):
"""Password < 8 chars triggers Pydantic 422."""
resp = api_client.post(
"/api/v1/setup/complete",
json={
"username": "validuser",
"password": "short",
"downloader_type": "mock",
"downloader_host": "localhost",
"downloader_username": "x",
"downloader_password": "x",
"downloader_path": "/tmp",
},
)
assert resp.status_code == 422
def test_07_complete_setup(self, api_client, e2e_state):
"""Complete the setup wizard with mock downloader and test RSS URL."""
resp = api_client.post(
"/api/v1/setup/complete",
json={
"username": E2E_USERNAME,
"password": E2E_PASSWORD,
"downloader_type": "mock",
"downloader_host": "localhost:18080",
"downloader_username": "admin",
"downloader_password": "admin",
"downloader_path": "/downloads/Bangumi",
"downloader_ssl": False,
"rss_url": "http://localhost:18888/rss/mikan.xml",
"rss_name": "Test Mikan Feed",
},
)
assert resp.status_code == 200
data = resp.json()
assert data["status"] is True
e2e_state["setup_complete"] = True
def test_08_setup_status_complete(self, api_client):
"""After setup, need_setup should be False."""
resp = api_client.get("/api/v1/setup/status")
assert resp.status_code == 200
assert resp.json()["need_setup"] is False
def test_09_setup_complete_blocked(self, api_client):
"""POST /setup/complete returns 403 after setup is done."""
resp = api_client.post(
"/api/v1/setup/complete",
json={
"username": "another",
"password": "anotherpassword",
"downloader_type": "mock",
"downloader_host": "localhost",
"downloader_username": "x",
"downloader_password": "x",
"downloader_path": "/tmp",
},
)
assert resp.status_code == 403
def test_09b_test_downloader_blocked(self, api_client):
"""POST /setup/test-downloader returns 403 after setup is done."""
resp = api_client.post(
"/api/v1/setup/test-downloader",
json={"type": "mock", "host": "x", "username": "x", "password": "x"},
)
assert resp.status_code == 403
def test_09c_test_rss_blocked(self, api_client):
"""POST /setup/test-rss returns 403 after setup is done."""
resp = api_client.post(
"/api/v1/setup/test-rss",
json={"url": "http://example.com/rss.xml"},
)
assert resp.status_code == 403
# ===================================================================
# Phase 2: Authentication
# ===================================================================
def test_10_login(self, api_client, e2e_state):
"""Login with credentials created during setup."""
resp = api_client.post(
"/api/v1/auth/login",
data={"username": E2E_USERNAME, "password": E2E_PASSWORD},
)
assert resp.status_code == 200
data = resp.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
e2e_state["token"] = data["access_token"]
def test_11_login_cookie_set(self, api_client):
"""After login, the 'token' cookie should be set on the client."""
assert "token" in api_client.cookies
def test_12_access_protected_endpoint(self, api_client):
"""Authenticated client can access protected endpoints."""
resp = api_client.get("/api/v1/status")
assert resp.status_code == 200
data = resp.json()
assert "status" in data
assert "version" in data
assert "first_run" in data
def test_13_refresh_token(self, api_client, e2e_state):
"""Token refresh returns a new access token and updates cookie."""
resp = api_client.get("/api/v1/auth/refresh_token")
assert resp.status_code == 200
data = resp.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
e2e_state["token"] = data["access_token"]
# NOTE: Tokens may be identical if login+refresh happen within the
# same second (JWT exp uses second-level granularity).
def test_14_login_wrong_password(self, api_client):
"""Login with incorrect password returns 401."""
resp = api_client.post(
"/api/v1/auth/login",
data={"username": E2E_USERNAME, "password": "wrong_password"},
)
assert resp.status_code == 401
def test_15_login_nonexistent_user(self, api_client):
"""Login with a user that doesn't exist returns 401."""
resp = api_client.post(
"/api/v1/auth/login",
data={"username": "no_such_user", "password": "irrelevant"},
)
assert resp.status_code == 401
def test_16_unauthenticated_client(self):
"""A fresh client with no cookies.
NOTE: In DEV_VERSION, auth is bypassed so this returns 200.
In production builds this would return 401.
"""
with httpx.Client(base_url="http://localhost:7892", timeout=5.0) as fresh:
resp = fresh.get("/api/v1/status")
assert resp.status_code in (200, 401)
# ===================================================================
# Phase 3: Configuration
# ===================================================================
def test_20_get_config(self, api_client):
"""Retrieve current configuration with all top-level sections."""
resp = api_client.get("/api/v1/config/get")
assert resp.status_code == 200
config = resp.json()
for section in (
"program",
"downloader",
"rss_parser",
"bangumi_manage",
"log",
"proxy",
"notification",
"experimental_openai",
):
assert section in config, f"Missing config section: {section}"
assert config["downloader"]["type"] == "mock"
def test_21_config_passwords_masked(self, api_client):
"""Sensitive fields are masked as '********' in GET /config/get."""
resp = api_client.get("/api/v1/config/get")
config = resp.json()
# downloader password
assert config["downloader"]["password"] == "********"
# proxy password (even if empty, still masked since key contains 'password')
assert config["proxy"]["password"] == "********"
def test_22_update_config(self, api_client):
"""Update a non-sensitive config field via PATCH."""
get_resp = api_client.get("/api/v1/config/get")
config = get_resp.json()
config["program"]["rss_time"] = 600
# Re-supply masked passwords with actual values
config["downloader"]["password"] = "admin"
config["proxy"]["password"] = ""
config["proxy"]["username"] = ""
resp = api_client.patch("/api/v1/config/update", json=config)
assert resp.status_code == 200
def test_23_config_update_persisted(self, api_client):
"""Verify the config update from previous test is persisted."""
resp = api_client.get("/api/v1/config/get")
assert resp.json()["program"]["rss_time"] == 600
# ===================================================================
# Phase 4: RSS Management
# ===================================================================
def test_30_list_rss_initial(self, api_client, e2e_state):
"""One RSS feed should exist from setup wizard."""
resp = api_client.get("/api/v1/rss")
assert resp.status_code == 200
feeds = resp.json()
assert isinstance(feeds, list)
assert len(feeds) == 1
assert feeds[0]["name"] == "Test Mikan Feed"
e2e_state["initial_rss_id"] = feeds[0]["id"]
def test_31_add_rss_feed(self, api_client, e2e_state):
"""Add a second RSS feed with unique URL."""
resp = api_client.post(
"/api/v1/rss/add",
json={
"url": "http://localhost:18888/rss/mikan.xml?tag=e2e",
"name": "E2E Second Feed",
"aggregate": False,
"parser": "mikan",
},
)
assert resp.status_code == 200
def test_32_add_rss_duplicate_url(self, api_client):
"""Adding RSS with an existing URL returns 406 (duplicate)."""
resp = api_client.post(
"/api/v1/rss/add",
json={
"url": "http://localhost:18888/rss/mikan.xml",
"name": "Duplicate Feed",
"aggregate": False,
"parser": "mikan",
},
)
# u_response returns the status_code from ResponseModel (406 for failed add)
assert resp.status_code == 406
def test_33_list_rss_after_add(self, api_client, e2e_state):
"""Two feeds should now exist."""
resp = api_client.get("/api/v1/rss")
feeds = resp.json()
assert len(feeds) == 2
names = {f["name"] for f in feeds}
assert "Test Mikan Feed" in names
assert "E2E Second Feed" in names
for feed in feeds:
if feed["name"] == "E2E Second Feed":
e2e_state["second_rss_id"] = feed["id"]
break
def test_34_disable_rss(self, api_client, e2e_state):
"""Disable the second RSS feed."""
rss_id = e2e_state["second_rss_id"]
resp = api_client.patch(f"/api/v1/rss/disable/{rss_id}")
assert resp.status_code == 200
def test_35_verify_rss_disabled(self, api_client, e2e_state):
"""Disabled feed should have enabled=False."""
resp = api_client.get("/api/v1/rss")
for feed in resp.json():
if feed["id"] == e2e_state["second_rss_id"]:
assert feed["enabled"] is False
break
else:
pytest.fail("Second RSS feed not found")
def test_36_enable_rss(self, api_client, e2e_state):
"""Re-enable the RSS feed via enable/many."""
rss_id = e2e_state["second_rss_id"]
resp = api_client.post("/api/v1/rss/enable/many", json=[rss_id])
assert resp.status_code == 200
def test_37_verify_rss_enabled(self, api_client, e2e_state):
"""Feed should be enabled again."""
resp = api_client.get("/api/v1/rss")
for feed in resp.json():
if feed["id"] == e2e_state["second_rss_id"]:
assert feed["enabled"] is True
break
def test_38_update_rss(self, api_client, e2e_state):
"""Update RSS feed name."""
rss_id = e2e_state["second_rss_id"]
resp = api_client.patch(
f"/api/v1/rss/update/{rss_id}",
json={"name": "Renamed Feed"},
)
assert resp.status_code == 200
def test_39_verify_rss_updated(self, api_client, e2e_state):
"""Verify the rename persisted."""
resp = api_client.get("/api/v1/rss")
for feed in resp.json():
if feed["id"] == e2e_state["second_rss_id"]:
assert feed["name"] == "Renamed Feed"
break
def test_39b_delete_nonexistent_rss(self, api_client):
"""Deleting a non-existent RSS ID returns 200.
The database DELETE WHERE id=X succeeds even when no rows match
(no exception raised), so the endpoint returns 200.
"""
resp = api_client.delete("/api/v1/rss/delete/99999")
assert resp.status_code == 200
def test_39c_disable_nonexistent_rss(self, api_client):
"""Disabling a non-existent RSS ID returns 406."""
resp = api_client.patch("/api/v1/rss/disable/99999")
assert resp.status_code == 406
def test_39d_delete_rss(self, api_client, e2e_state):
"""Delete the second RSS feed."""
rss_id = e2e_state["second_rss_id"]
resp = api_client.delete(f"/api/v1/rss/delete/{rss_id}")
assert resp.status_code == 200
def test_39e_verify_rss_deleted(self, api_client, e2e_state):
"""Only the initial feed should remain."""
resp = api_client.get("/api/v1/rss")
feeds = resp.json()
assert len(feeds) == 1
assert feeds[0]["name"] == "Test Mikan Feed"
# ===================================================================
# Phase 5: Bangumi
# ===================================================================
def test_40_bangumi_get_all_empty(self, api_client):
"""Bangumi list is empty until RSS refresh populates it."""
resp = api_client.get("/api/v1/bangumi/get/all")
assert resp.status_code == 200
assert isinstance(resp.json(), list)
def test_41_bangumi_needs_review_empty(self, api_client):
"""No bangumi should need review initially."""
resp = api_client.get("/api/v1/bangumi/needs-review")
assert resp.status_code == 200
assert resp.json() == []
def test_42_bangumi_dismiss_review_nonexistent(self, api_client):
"""Dismissing review for nonexistent bangumi returns 404."""
resp = api_client.post("/api/v1/bangumi/dismiss-review/99999")
assert resp.status_code == 404
def test_43_bangumi_reset_all(self, api_client):
"""Reset all bangumi (safe when list is empty)."""
resp = api_client.get("/api/v1/bangumi/reset/all")
assert resp.status_code == 200
# ===================================================================
# Phase 6: Downloader
# ===================================================================
def test_50_downloader_check(self, api_client):
"""Mock downloader health check should succeed."""
resp = api_client.get("/api/v1/check/downloader")
assert resp.status_code == 200
def test_51_downloader_torrents_empty(self, api_client):
"""No torrents in mock downloader initially."""
resp = api_client.get("/api/v1/downloader/torrents")
assert resp.status_code == 200
assert isinstance(resp.json(), list)
def test_52_downloader_pause_empty(self, api_client):
"""Pausing with empty hash list should succeed (no-op)."""
resp = api_client.post(
"/api/v1/downloader/torrents/pause", json={"hashes": []}
)
assert resp.status_code == 200
def test_53_downloader_resume_empty(self, api_client):
"""Resuming with empty hash list should succeed (no-op)."""
resp = api_client.post(
"/api/v1/downloader/torrents/resume", json={"hashes": []}
)
assert resp.status_code == 200
def test_54_downloader_delete_empty(self, api_client):
"""Deleting with empty hash list should succeed (no-op)."""
resp = api_client.post(
"/api/v1/downloader/torrents/delete",
json={"hashes": [], "delete_files": False},
)
assert resp.status_code == 200
def test_55_downloader_tag_nonexistent_bangumi(self, api_client):
"""Tagging a torrent with nonexistent bangumi_id returns status=false."""
resp = api_client.post(
"/api/v1/downloader/torrents/tag",
json={"hash": "abc123", "bangumi_id": 99999},
)
assert resp.status_code == 200
assert resp.json()["status"] is False
def test_56_downloader_auto_tag(self, api_client):
"""Auto-tag with no torrents returns 0 tagged."""
resp = api_client.post("/api/v1/downloader/torrents/tag/auto")
assert resp.status_code == 200
data = resp.json()
assert data["tagged_count"] == 0
assert data["unmatched_count"] == 0
def test_57_qbittorrent_direct_connectivity(self, qb_password):
"""Verify direct connectivity to the real qBittorrent instance."""
resp = httpx.post(
"http://localhost:18080/api/v2/auth/login",
data={"username": "admin", "password": qb_password},
timeout=5.0,
)
assert resp.status_code == 200
assert "ok" in resp.text.lower()
# ===================================================================
# Phase 7: Program Lifecycle
# ===================================================================
def test_60_program_status_not_running(self, api_client):
"""After first-run setup, program is NOT auto-started.
startup() detects first_run and returns early without calling start().
"""
resp = api_client.get("/api/v1/status")
assert resp.status_code == 200
data = resp.json()
assert isinstance(data["status"], bool)
assert isinstance(data["version"], str)
assert isinstance(data["first_run"], bool)
def test_61_program_stop_when_not_running(self, api_client):
"""Stopping a program that isn't running returns 406."""
resp = api_client.get("/api/v1/stop")
assert resp.status_code == 406
def test_62_program_start(self, api_client):
"""Explicitly start the program."""
resp = api_client.get("/api/v1/start")
assert resp.status_code == 200
def test_63_program_stop(self, api_client):
"""Stop the now-running program."""
resp = api_client.get("/api/v1/stop")
assert resp.status_code == 200
def test_64_program_stop_already_stopped(self, api_client):
"""Stopping again returns 406 (not running)."""
resp = api_client.get("/api/v1/stop")
assert resp.status_code == 406
def test_65_program_restart(self, api_client):
"""Restart works even from a stopped state."""
resp = api_client.get("/api/v1/restart")
assert resp.status_code == 200
# ===================================================================
# Phase 8: Log
# ===================================================================
def test_70_get_log(self, api_client):
"""Retrieve application log (text/plain response)."""
resp = api_client.get("/api/v1/log")
# Log file may or may not exist depending on AB startup behavior
assert resp.status_code in (200, 404)
if resp.status_code == 200:
assert "text/plain" in resp.headers.get("content-type", "")
def test_71_clear_log(self, api_client):
"""Clear the log file."""
resp = api_client.get("/api/v1/log/clear")
# 200 if log exists, 406 if not found
assert resp.status_code in (200, 406)
def test_72_get_log_after_clear(self, api_client):
"""Log should be empty or very short after clear."""
resp = api_client.get("/api/v1/log")
if resp.status_code == 200:
# Log might have new entries from the clear request itself
assert len(resp.text) < 10000
# ===================================================================
# Phase 9: Search
# ===================================================================
def test_80_search_providers(self, api_client):
"""List available search providers."""
resp = api_client.get("/api/v1/search/provider")
assert resp.status_code == 200
providers = resp.json()
assert isinstance(providers, list)
assert len(providers) > 0
def test_81_search_provider_config(self, api_client):
"""Get search provider URL templates."""
resp = api_client.get("/api/v1/search/provider/config")
assert resp.status_code == 200
config = resp.json()
assert isinstance(config, dict)
def test_82_search_empty_keywords(self, api_client):
"""Search with no keywords returns empty list."""
resp = api_client.get("/api/v1/search/bangumi")
assert resp.status_code == 200
assert resp.json() == []
# ===================================================================
# Phase 10: Notification
# ===================================================================
def test_85_notification_test_invalid_index(self, api_client):
"""Test notification with out-of-range index returns success=false."""
resp = api_client.post(
"/api/v1/notification/test", json={"provider_index": 9999}
)
assert resp.status_code == 200
assert resp.json()["success"] is False
def test_86_notification_test_config_unknown_type(self, api_client):
"""Test-config with unknown provider type returns success=false."""
resp = api_client.post(
"/api/v1/notification/test-config",
json={"type": "nonexistent_provider", "enabled": True},
)
assert resp.status_code == 200
assert resp.json()["success"] is False
# ===================================================================
# Phase 11: Credential Update & Cleanup
# ===================================================================
def test_90_update_credentials(self, api_client, e2e_state):
"""Update user password via /auth/update."""
resp = api_client.post(
"/api/v1/auth/update",
json={"password": "newpassword123"},
)
assert resp.status_code == 200
data = resp.json()
assert "access_token" in data
assert data["message"] == "update success"
e2e_state["new_password"] = "newpassword123"
def test_91_login_with_new_password(self, api_client, e2e_state):
"""Login works with the updated password."""
resp = api_client.post(
"/api/v1/auth/login",
data={
"username": E2E_USERNAME,
"password": e2e_state["new_password"],
},
)
assert resp.status_code == 200
assert "access_token" in resp.json()
def test_92_login_old_password_fails(self, api_client):
"""Old password should no longer work after credential update."""
resp = api_client.post(
"/api/v1/auth/login",
data={"username": E2E_USERNAME, "password": E2E_PASSWORD},
)
assert resp.status_code == 401
def test_93_logout(self, api_client):
"""Logout clears the auth session and deletes cookie."""
resp = api_client.get("/api/v1/auth/logout")
assert resp.status_code == 200
def test_94_verify_logged_out(self, api_client):
"""After logout, the token cookie should be cleared.
NOTE: In DEV_VERSION, endpoints still work (auth bypass).
This test verifies the cookie was deleted.
"""
# httpx may still have a cookie if the server didn't properly
# delete it, but the logout response should have Set-Cookie
# with max-age=0 or explicit deletion.
resp = api_client.get("/api/v1/status")
# DEV_VERSION: 200 (bypass), Production: 401 (no token)
assert resp.status_code in (200, 401)

145
docs/dev/e2e-test-guide.md Normal file
View File

@@ -0,0 +1,145 @@
# E2E Integration Test Guide
End-to-end tests that exercise the full AutoBangumi workflow against real
Docker services (qBittorrent + mock RSS server).
## Prerequisites
- **Docker** with `docker compose` (v2)
- **uv** for Python dependency management
- Ports **7892**, **18080**, **18888** must be free
## Quick Start
```bash
# 1. Build the mock RSS server image
cd backend/src/test/e2e
docker build -f Dockerfile.mock-rss -t ab-mock-rss .
# 2. Start test infrastructure
docker compose -f docker-compose.test.yml up -d --wait
# 3. Verify services are healthy
docker compose -f docker-compose.test.yml ps
# 4. Run E2E tests
cd backend && uv run pytest -m e2e -v --tb=long
# 5. Cleanup
docker compose -f backend/src/test/e2e/docker-compose.test.yml down -v
```
## Architecture
```
Host machine
├── pytest (test runner)
│ └── Drives HTTP requests to AutoBangumi at localhost:7892
├── AutoBangumi subprocess
│ ├── Isolated config/ and data/ in temp directory
│ └── Uses mock downloader (no real qB coupling during setup)
├── qBittorrent container (localhost:18080)
│ └── linuxserver/qbittorrent:latest
└── Mock RSS server container (localhost:18888)
└── Serves static XML fixtures from fixtures/
```
## Test Phases
| Phase | Tests | What It Validates |
|-------|-------|-------------------|
| 1. Setup Wizard | `test_01` - `test_06` | First-run detection, mock downloader, setup completion, 403 guard |
| 2. Authentication | `test_10` - `test_13` | Login, cookie-based JWT, token refresh, logout |
| 3. Configuration | `test_20` - `test_22` | Config CRUD, password masking |
| 4. RSS Management | `test_30` - `test_32` | Add, list, delete RSS feeds |
| 5. Program Lifecycle | `test_40` - `test_41` | Status check, restart |
| 6. Downloader | `test_50` - `test_51` | Mock downloader health, direct qB connectivity |
| 7. Cleanup | `test_90` | Logout |
## Key Design Decisions
### Mock Downloader for Setup
The setup wizard's `_validate_url()` blocks private/loopback IPs (SSRF
protection). Since the Docker qBittorrent instance is on `localhost`, the
setup wizard's "test downloader" endpoint would reject it. Instead:
1. Setup uses `downloader_type: "mock"` (bypasses URL validation)
2. Config can be updated to point to real qBittorrent after auth
3. Direct qBittorrent connectivity is tested independently (`test_51`)
### DEV_VERSION Auth Bypass
When running from source, `VERSION == "DEV_VERSION"` which bypasses JWT
validation (`get_current_user` returns `"dev_user"` unconditionally). Tests
document this behavior: login/refresh/logout endpoints still work, but
unauthenticated access is also allowed. In production builds, test_13
would expect HTTP 401.
### CWD-Based Isolation
AutoBangumi resolves all paths relative to the working directory:
- `config/` - config files, JWT secret, setup sentinel
- `data/` - SQLite database, posters, logs
The `ab_process` fixture creates a temp directory with these subdirs and
runs `main.py` from there, ensuring complete isolation from any existing
installation.
### qBittorrent Password Extraction
Recent `linuxserver/qbittorrent` images generate a random temporary
password on first start. The `qb_password` fixture polls `docker logs`
until it finds the line:
```
A temporary password is provided for this session: XXXXXXXX
```
## Debugging Failures
### AutoBangumi won't start
```bash
# Check if port 7892 is in use
lsof -i :7892
# Run manually to see startup logs
cd /tmp/test-workdir && uv run python /path/to/backend/src/main.py
```
### qBittorrent issues
```bash
docker logs ab-test-qbittorrent
docker exec ab-test-qbittorrent curl -s http://localhost:18080
```
### Mock RSS server issues
```bash
docker logs ab-test-mock-rss
curl http://localhost:18888/health
curl http://localhost:18888/rss/mikan.xml
```
### Test infrastructure stuck
```bash
# Force cleanup
docker compose -f backend/src/test/e2e/docker-compose.test.yml down -v --remove-orphans
```
## Adding New Test Scenarios
1. Add new test methods to `TestE2EWorkflow` in definition order
2. Use `api_client` for HTTP requests (cookies persist across tests)
3. Use `e2e_state` dict to share data between tests
4. For new RSS fixtures, add XML files to `fixtures/` directory
5. Keep test names ordered: `test_XX_description` where XX reflects the phase
### Adding a new fixture feed
1. Create `backend/src/test/e2e/fixtures/your_feed.xml`
2. Access via `http://localhost:18888/rss/your_feed.xml`
3. Rebuild the mock RSS image: `docker compose ... build mock-rss`