mirror of
https://github.com/EstrellaXD/Auto_Bangumi.git
synced 2026-02-03 18:24:03 +08:00
- Configure VitePress locales with zh-CN as root and en-US as /en/ - Translate all documentation to Chinese (31 files) - Create English documentation under /en/ directory - Add Chinese UI labels for navigation and pagination - Language switcher now available in site header Generated with [Claude Code](https://claude.ai/code) via [Happy](https://happy.engineering) Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: Happy <yesreply@happy.engineering>
11 KiB
11 KiB
数据库开发指南
本指南介绍 AutoBangumi 中的数据库架构、模型和操作。
概述
AutoBangumi 使用 SQLite 作为数据库,使用 SQLModel(Pydantic + SQLAlchemy 混合)作为 ORM。数据库文件位于 data/data.db。
架构
module/database/
├── engine.py # SQLAlchemy 引擎配置
├── combine.py # Database 类、迁移、会话管理
├── bangumi.py # 番剧(动画订阅)操作
├── rss.py # RSS 订阅源操作
├── torrent.py # 种子跟踪操作
└── user.py # 用户认证操作
核心组件
Database 类
combine.py 中的 Database 类是主入口点。它继承自 SQLModel 的 Session,并提供对所有子数据库的访问:
from module.database import Database
with Database() as db:
# 访问子数据库
bangumis = db.bangumi.search_all()
rss_items = db.rss.search_active()
torrents = db.torrent.search_all()
子数据库类
| 类 | 模型 | 用途 |
|---|---|---|
BangumiDatabase |
Bangumi |
动画订阅规则 |
RSSDatabase |
RSSItem |
RSS 订阅源 |
TorrentDatabase |
Torrent |
已下载种子跟踪 |
UserDatabase |
User |
认证 |
模型
Bangumi 模型
动画订阅的核心模型:
class Bangumi(SQLModel, table=True):
id: int # 主键
official_title: str # 显示名称(如"无职转生")
title_raw: str # 用于种子匹配的原始标题(有索引)
season: int = 1 # 季度编号
episode_offset: int = 0 # 集数编号调整
season_offset: int = 0 # 季度编号调整
rss_link: str # 逗号分隔的 RSS 订阅源 URL
filter: str # 排除过滤器(如 "720,\\d+-\\d+")
poster_link: str # TMDB 海报 URL
save_path: str # 下载目标路径
rule_name: str # qBittorrent RSS 规则名称
added: bool = False # 规则是否已添加到下载器
deleted: bool = False # 软删除标志(有索引)
archived: bool = False # 用于已完结系列(有索引)
needs_review: bool = False # 检测到偏移不匹配
needs_review_reason: str # 需要审核的原因
suggested_season_offset: int # 建议的季度偏移
suggested_episode_offset: int # 建议的集数偏移
air_weekday: int # 放送日(0=周日,6=周六)
RSSItem 模型
RSS 订阅源:
class RSSItem(SQLModel, table=True):
id: int # 主键
name: str # 显示名称
url: str # 订阅源 URL(唯一,有索引)
aggregate: bool = True # 是否解析种子
parser: str = "mikan" # 解析器类型:mikan、dmhy、nyaa
enabled: bool = True # 启用标志
connection_status: str # "healthy" 或 "error"
last_checked_at: str # ISO 时间戳
last_error: str # 最后一次错误消息
Torrent 模型
跟踪已下载的种子:
class Torrent(SQLModel, table=True):
id: int # 主键
name: str # 种子名称(有索引)
url: str # 种子/磁力链接 URL(唯一,有索引)
rss_id: int # 来源 RSS 订阅源 ID
bangumi_id: int # 关联的番剧 ID(可为空)
qb_hash: str # qBittorrent 信息哈希(有索引)
downloaded: bool = False # 下载完成
常用操作
BangumiDatabase
with Database() as db:
# 创建
db.bangumi.add(bangumi) # 单条插入
db.bangumi.add_all(bangumi_list) # 批量插入(去重)
# 读取
db.bangumi.search_all() # 所有记录(缓存,5分钟 TTL)
db.bangumi.search_id(123) # 按 ID 查询
db.bangumi.match_torrent("torrent name") # 按 title_raw 匹配查找
db.bangumi.not_complete() # 未完结系列
db.bangumi.get_needs_review() # 标记需要审核的
# 更新
db.bangumi.update(bangumi) # 更新单条记录
db.bangumi.update_all(bangumi_list) # 批量更新
# 删除
db.bangumi.delete_one(123) # 硬删除
db.bangumi.disable_rule(123) # 软删除(deleted=True)
RSSDatabase
with Database() as db:
# 创建
db.rss.add(rss_item) # 单条插入
db.rss.add_all(rss_items) # 批量插入(去重)
# 读取
db.rss.search_all() # 所有订阅源
db.rss.search_active() # 仅启用的订阅源
db.rss.search_aggregate() # 启用且 aggregate=True
# 更新
db.rss.update(id, rss_update) # 部分更新
db.rss.enable(id) # 启用订阅源
db.rss.disable(id) # 禁用订阅源
db.rss.enable_batch([1, 2, 3]) # 批量启用
db.rss.disable_batch([1, 2, 3]) # 批量禁用
TorrentDatabase
with Database() as db:
# 创建
db.torrent.add(torrent) # 单条插入
db.torrent.add_all(torrents) # 批量插入
# 读取
db.torrent.search_all() # 所有种子
db.torrent.search_by_qb_hash(hash) # 按 qBittorrent 哈希查询
db.torrent.search_by_url(url) # 按 URL 查询
db.torrent.check_new(torrents) # 过滤掉已存在的
# 更新
db.torrent.update_qb_hash(id, hash) # 设置 qb_hash
缓存
番剧缓存
search_all() 的结果在模块级别缓存,TTL 为 5 分钟:
# bangumi.py 中的模块级缓存
_bangumi_cache: list[Bangumi] | None = None
_bangumi_cache_time: float = 0
_BANGUMI_CACHE_TTL: float = 300.0 # 5 分钟
# 缓存失效
def _invalidate_bangumi_cache():
global _bangumi_cache, _bangumi_cache_time
_bangumi_cache = None
_bangumi_cache_time = 0
重要: 缓存在以下操作时自动失效:
add()、add_all()update()、update_all()delete_one()、delete_all()archive_one()、unarchive_one()- 任何 RSS 链接更新操作
会话分离
缓存的对象会从会话中分离,以防止 DetachedInstanceError:
for b in bangumis:
self.session.expunge(b) # 从会话中分离
迁移系统
Schema 版本控制
迁移通过 schema_version 表跟踪:
CURRENT_SCHEMA_VERSION = 7
# 每个迁移:(版本号, 描述, [SQL 语句])
MIGRATIONS = [
(1, "add air_weekday column", [...]),
(2, "add connection status columns", [...]),
(3, "create passkey table", [...]),
(4, "add archived column", [...]),
(5, "rename offset to episode_offset", [...]),
(6, "add qb_hash column", [...]),
(7, "add suggested offset columns", [...]),
]
添加新迁移
- 在
combine.py中增加CURRENT_SCHEMA_VERSION - 在
MIGRATIONS列表中添加迁移元组:
MIGRATIONS = [
# ... 现有迁移 ...
(
8,
"add my_new_column to bangumi",
[
"ALTER TABLE bangumi ADD COLUMN my_new_column TEXT DEFAULT NULL",
],
),
]
- 在
run_migrations()中添加幂等性检查:
if "bangumi" in tables and version == 8:
columns = [col["name"] for col in inspector.get_columns("bangumi")]
if "my_new_column" in columns:
needs_run = False
- 更新
module/models/中对应的 Pydantic 模型
默认值回填
迁移后,_fill_null_with_defaults() 会根据模型默认值自动填充 NULL 值:
# 如果模型定义为:
class Bangumi(SQLModel, table=True):
my_field: bool = False
# 那么现有记录中的 NULL 值将被更新为 False
性能模式
批量查询
add_all() 使用单个查询检查重复项,而不是 N 个查询:
# 高效:单个 SELECT
keys_to_check = [(d.title_raw, d.group_name) for d in datas]
conditions = [
and_(Bangumi.title_raw == tr, Bangumi.group_name == gn)
for tr, gn in keys_to_check
]
statement = select(Bangumi.title_raw, Bangumi.group_name).where(or_(*conditions))
正则表达式匹配
match_list() 为所有标题匹配编译单个正则表达式模式:
# 编译一次,匹配多次
sorted_titles = sorted(title_index.keys(), key=len, reverse=True)
pattern = "|".join(re.escape(title) for title in sorted_titles)
title_regex = re.compile(pattern)
# 每个种子 O(1) 查找而不是 O(n)
for torrent in torrent_list:
match = title_regex.search(torrent.name)
索引列
以下列具有索引以实现快速查找:
| 表 | 列 | 索引类型 |
|---|---|---|
bangumi |
title_raw |
普通 |
bangumi |
deleted |
普通 |
bangumi |
archived |
普通 |
rssitem |
url |
唯一 |
torrent |
name |
普通 |
torrent |
url |
唯一 |
torrent |
qb_hash |
普通 |
测试
测试数据库设置
测试使用内存中的 SQLite 数据库:
# conftest.py
@pytest.fixture
def db_engine():
engine = create_engine("sqlite:///:memory:")
SQLModel.metadata.create_all(engine)
yield engine
engine.dispose()
@pytest.fixture
def db_session(db_engine):
with Session(db_engine) as session:
yield session
工厂函数
使用工厂函数创建测试数据:
from test.factories import make_bangumi, make_torrent, make_rss_item
def test_bangumi_search():
bangumi = make_bangumi(title_raw="Test Title", season=2)
# ... 测试逻辑
设计说明
无外键
SQLite 外键强制默认是禁用的。关系(如 Torrent.bangumi_id)在应用程序逻辑中管理,而不是通过数据库约束。
软删除
Bangumi.deleted 标志启用软删除。面向用户的数据查询应按 deleted=False 过滤:
statement = select(Bangumi).where(Bangumi.deleted == false())
种子标记
种子在 qBittorrent 中使用 ab:{bangumi_id} 标记,用于重命名操作时的偏移查找。这使得无需数据库查询即可快速识别番剧。
常见问题
DetachedInstanceError
如果您从不同的会话访问缓存的对象:
# 错误:在新会话中访问缓存的对象
bangumis = db.bangumi.search_all() # 已缓存
with Database() as new_db:
new_db.session.add(bangumis[0]) # 错误!
# 正确:对象已分离,可独立工作
bangumis = db.bangumi.search_all()
bangumis[0].title_raw = "New Title" # 可以,但不会持久化
缓存过期
如果手动 SQL 更新绕过了 ORM,请使缓存失效:
from module.database.bangumi import _invalidate_bangumi_cache
with engine.connect() as conn:
conn.execute(text("UPDATE bangumi SET ..."))
conn.commit()
_invalidate_bangumi_cache() # 重要!