Files
Auto_Bangumi/docs/dev/database.md
Estrella Pan 03bf265651 docs: add i18n support with Chinese as default language
- Configure VitePress locales with zh-CN as root and en-US as /en/
- Translate all documentation to Chinese (31 files)
- Create English documentation under /en/ directory
- Add Chinese UI labels for navigation and pagination
- Language switcher now available in site header

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-01-27 07:57:13 +01:00

395 lines
11 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 数据库开发指南
本指南介绍 AutoBangumi 中的数据库架构、模型和操作。
## 概述
AutoBangumi 使用 **SQLite** 作为数据库,使用 **SQLModel**Pydantic + SQLAlchemy 混合)作为 ORM。数据库文件位于 `data/data.db`
### 架构
```
module/database/
├── engine.py # SQLAlchemy 引擎配置
├── combine.py # Database 类、迁移、会话管理
├── bangumi.py # 番剧(动画订阅)操作
├── rss.py # RSS 订阅源操作
├── torrent.py # 种子跟踪操作
└── user.py # 用户认证操作
```
## 核心组件
### Database 类
`combine.py` 中的 `Database` 类是主入口点。它继承自 SQLModel 的 `Session`,并提供对所有子数据库的访问:
```python
from module.database import Database
with Database() as db:
# 访问子数据库
bangumis = db.bangumi.search_all()
rss_items = db.rss.search_active()
torrents = db.torrent.search_all()
```
### 子数据库类
| 类 | 模型 | 用途 |
|-------|-------|---------|
| `BangumiDatabase` | `Bangumi` | 动画订阅规则 |
| `RSSDatabase` | `RSSItem` | RSS 订阅源 |
| `TorrentDatabase` | `Torrent` | 已下载种子跟踪 |
| `UserDatabase` | `User` | 认证 |
## 模型
### Bangumi 模型
动画订阅的核心模型:
```python
class Bangumi(SQLModel, table=True):
id: int # 主键
official_title: str # 显示名称(如"无职转生"
title_raw: str # 用于种子匹配的原始标题(有索引)
season: int = 1 # 季度编号
episode_offset: int = 0 # 集数编号调整
season_offset: int = 0 # 季度编号调整
rss_link: str # 逗号分隔的 RSS 订阅源 URL
filter: str # 排除过滤器(如 "720,\\d+-\\d+"
poster_link: str # TMDB 海报 URL
save_path: str # 下载目标路径
rule_name: str # qBittorrent RSS 规则名称
added: bool = False # 规则是否已添加到下载器
deleted: bool = False # 软删除标志(有索引)
archived: bool = False # 用于已完结系列(有索引)
needs_review: bool = False # 检测到偏移不匹配
needs_review_reason: str # 需要审核的原因
suggested_season_offset: int # 建议的季度偏移
suggested_episode_offset: int # 建议的集数偏移
air_weekday: int # 放送日0=周日6=周六)
```
### RSSItem 模型
RSS 订阅源:
```python
class RSSItem(SQLModel, table=True):
id: int # 主键
name: str # 显示名称
url: str # 订阅源 URL唯一有索引
aggregate: bool = True # 是否解析种子
parser: str = "mikan" # 解析器类型mikan、dmhy、nyaa
enabled: bool = True # 启用标志
connection_status: str # "healthy" 或 "error"
last_checked_at: str # ISO 时间戳
last_error: str # 最后一次错误消息
```
### Torrent 模型
跟踪已下载的种子:
```python
class Torrent(SQLModel, table=True):
id: int # 主键
name: str # 种子名称(有索引)
url: str # 种子/磁力链接 URL唯一有索引
rss_id: int # 来源 RSS 订阅源 ID
bangumi_id: int # 关联的番剧 ID可为空
qb_hash: str # qBittorrent 信息哈希(有索引)
downloaded: bool = False # 下载完成
```
## 常用操作
### BangumiDatabase
```python
with Database() as db:
# 创建
db.bangumi.add(bangumi) # 单条插入
db.bangumi.add_all(bangumi_list) # 批量插入(去重)
# 读取
db.bangumi.search_all() # 所有记录缓存5分钟 TTL
db.bangumi.search_id(123) # 按 ID 查询
db.bangumi.match_torrent("torrent name") # 按 title_raw 匹配查找
db.bangumi.not_complete() # 未完结系列
db.bangumi.get_needs_review() # 标记需要审核的
# 更新
db.bangumi.update(bangumi) # 更新单条记录
db.bangumi.update_all(bangumi_list) # 批量更新
# 删除
db.bangumi.delete_one(123) # 硬删除
db.bangumi.disable_rule(123) # 软删除deleted=True
```
### RSSDatabase
```python
with Database() as db:
# 创建
db.rss.add(rss_item) # 单条插入
db.rss.add_all(rss_items) # 批量插入(去重)
# 读取
db.rss.search_all() # 所有订阅源
db.rss.search_active() # 仅启用的订阅源
db.rss.search_aggregate() # 启用且 aggregate=True
# 更新
db.rss.update(id, rss_update) # 部分更新
db.rss.enable(id) # 启用订阅源
db.rss.disable(id) # 禁用订阅源
db.rss.enable_batch([1, 2, 3]) # 批量启用
db.rss.disable_batch([1, 2, 3]) # 批量禁用
```
### TorrentDatabase
```python
with Database() as db:
# 创建
db.torrent.add(torrent) # 单条插入
db.torrent.add_all(torrents) # 批量插入
# 读取
db.torrent.search_all() # 所有种子
db.torrent.search_by_qb_hash(hash) # 按 qBittorrent 哈希查询
db.torrent.search_by_url(url) # 按 URL 查询
db.torrent.check_new(torrents) # 过滤掉已存在的
# 更新
db.torrent.update_qb_hash(id, hash) # 设置 qb_hash
```
## 缓存
### 番剧缓存
`search_all()` 的结果在模块级别缓存TTL 为 5 分钟:
```python
# bangumi.py 中的模块级缓存
_bangumi_cache: list[Bangumi] | None = None
_bangumi_cache_time: float = 0
_BANGUMI_CACHE_TTL: float = 300.0 # 5 分钟
# 缓存失效
def _invalidate_bangumi_cache():
global _bangumi_cache, _bangumi_cache_time
_bangumi_cache = None
_bangumi_cache_time = 0
```
**重要:** 缓存在以下操作时自动失效:
- `add()``add_all()`
- `update()``update_all()`
- `delete_one()``delete_all()`
- `archive_one()``unarchive_one()`
- 任何 RSS 链接更新操作
### 会话分离
缓存的对象会从会话中**分离**,以防止 `DetachedInstanceError`
```python
for b in bangumis:
self.session.expunge(b) # 从会话中分离
```
## 迁移系统
### Schema 版本控制
迁移通过 `schema_version` 表跟踪:
```python
CURRENT_SCHEMA_VERSION = 7
# 每个迁移:(版本号, 描述, [SQL 语句])
MIGRATIONS = [
(1, "add air_weekday column", [...]),
(2, "add connection status columns", [...]),
(3, "create passkey table", [...]),
(4, "add archived column", [...]),
(5, "rename offset to episode_offset", [...]),
(6, "add qb_hash column", [...]),
(7, "add suggested offset columns", [...]),
]
```
### 添加新迁移
1.`combine.py` 中增加 `CURRENT_SCHEMA_VERSION`
2.`MIGRATIONS` 列表中添加迁移元组:
```python
MIGRATIONS = [
# ... 现有迁移 ...
(
8,
"add my_new_column to bangumi",
[
"ALTER TABLE bangumi ADD COLUMN my_new_column TEXT DEFAULT NULL",
],
),
]
```
3.`run_migrations()` 中添加幂等性检查:
```python
if "bangumi" in tables and version == 8:
columns = [col["name"] for col in inspector.get_columns("bangumi")]
if "my_new_column" in columns:
needs_run = False
```
4. 更新 `module/models/` 中对应的 Pydantic 模型
### 默认值回填
迁移后,`_fill_null_with_defaults()` 会根据模型默认值自动填充 NULL 值:
```python
# 如果模型定义为:
class Bangumi(SQLModel, table=True):
my_field: bool = False
# 那么现有记录中的 NULL 值将被更新为 False
```
## 性能模式
### 批量查询
`add_all()` 使用单个查询检查重复项,而不是 N 个查询:
```python
# 高效:单个 SELECT
keys_to_check = [(d.title_raw, d.group_name) for d in datas]
conditions = [
and_(Bangumi.title_raw == tr, Bangumi.group_name == gn)
for tr, gn in keys_to_check
]
statement = select(Bangumi.title_raw, Bangumi.group_name).where(or_(*conditions))
```
### 正则表达式匹配
`match_list()` 为所有标题匹配编译单个正则表达式模式:
```python
# 编译一次,匹配多次
sorted_titles = sorted(title_index.keys(), key=len, reverse=True)
pattern = "|".join(re.escape(title) for title in sorted_titles)
title_regex = re.compile(pattern)
# 每个种子 O(1) 查找而不是 O(n)
for torrent in torrent_list:
match = title_regex.search(torrent.name)
```
### 索引列
以下列具有索引以实现快速查找:
| 表 | 列 | 索引类型 |
|-------|--------|------------|
| `bangumi` | `title_raw` | 普通 |
| `bangumi` | `deleted` | 普通 |
| `bangumi` | `archived` | 普通 |
| `rssitem` | `url` | 唯一 |
| `torrent` | `name` | 普通 |
| `torrent` | `url` | 唯一 |
| `torrent` | `qb_hash` | 普通 |
## 测试
### 测试数据库设置
测试使用内存中的 SQLite 数据库:
```python
# conftest.py
@pytest.fixture
def db_engine():
engine = create_engine("sqlite:///:memory:")
SQLModel.metadata.create_all(engine)
yield engine
engine.dispose()
@pytest.fixture
def db_session(db_engine):
with Session(db_engine) as session:
yield session
```
### 工厂函数
使用工厂函数创建测试数据:
```python
from test.factories import make_bangumi, make_torrent, make_rss_item
def test_bangumi_search():
bangumi = make_bangumi(title_raw="Test Title", season=2)
# ... 测试逻辑
```
## 设计说明
### 无外键
SQLite 外键强制默认是禁用的。关系(如 `Torrent.bangumi_id`)在应用程序逻辑中管理,而不是通过数据库约束。
### 软删除
`Bangumi.deleted` 标志启用软删除。面向用户的数据查询应按 `deleted=False` 过滤:
```python
statement = select(Bangumi).where(Bangumi.deleted == false())
```
### 种子标记
种子在 qBittorrent 中使用 `ab:{bangumi_id}` 标记,用于重命名操作时的偏移查找。这使得无需数据库查询即可快速识别番剧。
## 常见问题
### DetachedInstanceError
如果您从不同的会话访问缓存的对象:
```python
# 错误:在新会话中访问缓存的对象
bangumis = db.bangumi.search_all() # 已缓存
with Database() as new_db:
new_db.session.add(bangumis[0]) # 错误!
# 正确:对象已分离,可独立工作
bangumis = db.bangumi.search_all()
bangumis[0].title_raw = "New Title" # 可以,但不会持久化
```
### 缓存过期
如果手动 SQL 更新绕过了 ORM请使缓存失效
```python
from module.database.bangumi import _invalidate_bangumi_cache
with engine.connect() as conn:
conn.execute(text("UPDATE bangumi SET ..."))
conn.commit()
_invalidate_bangumi_cache() # 重要!
```