mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-02-03 02:25:32 +08:00
109 lines
3.8 KiB
Python
109 lines
3.8 KiB
Python
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from sqlalchemy import Column, Integer, String, JSON
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.db import db_query, db_update, get_id_column, async_db_query, Base
|
|
|
|
|
|
class MediaServerItem(Base):
|
|
"""
|
|
媒体服务器媒体条目表
|
|
"""
|
|
id = get_id_column()
|
|
# 服务器类型
|
|
server = Column(String)
|
|
# 媒体库ID
|
|
library = Column(String)
|
|
# ID
|
|
item_id = Column(String, index=True)
|
|
# 类型
|
|
item_type = Column(String)
|
|
# 标题
|
|
title = Column(String, index=True)
|
|
# 原标题
|
|
original_title = Column(String)
|
|
# 年份
|
|
year = Column(String)
|
|
# TMDBID
|
|
tmdbid = Column(Integer, index=True)
|
|
# IMDBID
|
|
imdbid = Column(String, index=True)
|
|
# TVDBID
|
|
tvdbid = Column(String, index=True)
|
|
# 路径
|
|
path = Column(String)
|
|
# 季集
|
|
seasoninfo = Column(JSON, default=dict)
|
|
# 备注
|
|
note = Column(JSON)
|
|
# 同步时间
|
|
lst_mod_date = Column(String, default=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
|
|
|
|
@classmethod
|
|
@db_query
|
|
def get_by_itemid(cls, db: Session, item_id: str):
|
|
return db.query(cls).filter(cls.item_id == item_id).first()
|
|
|
|
@classmethod
|
|
@db_update
|
|
def empty(cls, db: Session, server: Optional[str] = None):
|
|
if server is None:
|
|
db.query(cls).delete()
|
|
else:
|
|
db.query(cls).filter(cls.server == server).delete()
|
|
|
|
@classmethod
|
|
@db_query
|
|
def exist_by_tmdbid(cls, db: Session, tmdbid: int, mtype: str):
|
|
return db.query(cls).filter(cls.tmdbid == tmdbid,
|
|
cls.item_type == mtype).first()
|
|
|
|
@classmethod
|
|
@db_query
|
|
def exists_by_title(cls, db: Session, title: str, mtype: str, year: str):
|
|
if not mtype and not year:
|
|
return db.query(cls).filter(cls.title == title).first()
|
|
elif not year:
|
|
return db.query(cls).filter(cls.title == title,
|
|
cls.item_type == mtype).first()
|
|
elif not mtype:
|
|
return db.query(cls).filter(cls.title == title,
|
|
cls.year == str(year)).first()
|
|
return db.query(cls).filter(cls.title == title,
|
|
cls.item_type == mtype,
|
|
cls.year == str(year)).first()
|
|
|
|
@classmethod
|
|
@async_db_query
|
|
async def async_get_by_itemid(cls, db: AsyncSession, item_id: str):
|
|
result = await db.execute(select(cls).filter(cls.item_id == item_id))
|
|
return result.scalars().first()
|
|
|
|
@classmethod
|
|
@async_db_query
|
|
async def async_exist_by_tmdbid(cls, db: AsyncSession, tmdbid: int, mtype: str):
|
|
result = await db.execute(select(cls).filter(cls.tmdbid == tmdbid,
|
|
cls.item_type == mtype))
|
|
return result.scalars().first()
|
|
|
|
@classmethod
|
|
@async_db_query
|
|
async def async_exists_by_title(cls, db: AsyncSession, title: str, mtype: str, year: str):
|
|
if not mtype and not year:
|
|
result = await db.execute(select(cls).filter(cls.title == title))
|
|
elif not year:
|
|
result = await db.execute(select(cls).filter(cls.title == title,
|
|
cls.item_type == mtype))
|
|
elif not mtype:
|
|
result = await db.execute(select(cls).filter(cls.title == title,
|
|
cls.year == str(year)))
|
|
else:
|
|
result = await db.execute(select(cls).filter(cls.title == title,
|
|
cls.item_type == mtype,
|
|
cls.year == str(year)))
|
|
return result.scalars().first()
|