mirror of
https://github.com/jxxghp/MoviePilot.git
synced 2026-03-20 12:08:09 +08:00
615 lines
25 KiB
Python
615 lines
25 KiB
Python
import sys
|
|
import unittest
|
|
from pathlib import Path
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
# Register stand-in modules before any `app.*` import below runs, so that
# importing the code under test picks up these mocks instead of the real
# `app.helper.sites` and `app.db.systemconfig_oper` modules.
sys.modules.update({
    'app.helper.sites': MagicMock(),
    'app.db.systemconfig_oper': MagicMock(),
})
# Every `SystemConfigOper().get(...)` lookup on the stub answers None.
sys.modules['app.db.systemconfig_oper'].SystemConfigOper.return_value.get.return_value = None
|
|
|
|
from app import schemas
|
|
from app.chain.media import MediaChain, ScrapingOption
|
|
from app.core.context import MediaInfo
|
|
from app.core.event import Event
|
|
from app.core.metainfo import MetaInfo
|
|
from app.schemas.types import EventType, MediaType, ScrapingTarget, ScrapingMetadata, ScrapingPolicy
|
|
|
|
|
|
class TestMediaScrapingPaths(unittest.TestCase):
    """Checks the (target item, target path) pair returned by
    ``MediaChain._get_target_fileitem_and_path`` for NFO metadata."""

    def setUp(self):
        """Create a ``MediaChain`` whose ``storagechain`` is replaced by a mock."""
        self.media_chain = MediaChain()
        self.media_chain.storagechain = MagicMock()

    @staticmethod
    def _local_item(path, name, kind):
        """Build a ``schemas.FileItem`` on the "local" storage."""
        return schemas.FileItem(path=path, name=name, type=kind, storage="local")

    def _resolve_nfo_target(self, source_item, target_kind):
        """Invoke the method under test for NFO metadata and return its result."""
        return self.media_chain._get_target_fileitem_and_path(
            current_fileitem=source_item,
            item_type=target_kind,
            metadata_type=ScrapingMetadata.NFO,
        )

    def test_movie_file_nfo_path(self):
        """A movie file maps to its parent item and a same-stem ``.nfo`` path."""
        movie_file = self._local_item("/movies/avatar.mkv", "avatar.mkv", "file")
        movies_dir = self._local_item("/movies", "movies", "dir")
        self.media_chain.storagechain.get_parent_item.return_value = movies_dir

        resolved_item, resolved_path = self._resolve_nfo_target(movie_file, ScrapingTarget.MOVIE)

        self.assertEqual(resolved_item, movies_dir)
        self.assertEqual(resolved_path, Path("/movies/avatar.nfo"))

    def test_movie_dir_nfo_path(self):
        """A movie directory maps to itself and ``<dir name>.nfo`` inside it."""
        movie_dir = self._local_item("/movies/Avatar (2009)", "Avatar (2009)", "dir")

        resolved_item, resolved_path = self._resolve_nfo_target(movie_dir, ScrapingTarget.MOVIE)

        self.assertEqual(resolved_item, movie_dir)
        self.assertEqual(resolved_path, Path("/movies/Avatar (2009)/Avatar (2009).nfo"))

    def test_tv_dir_nfo_path(self):
        """A TV show directory maps to itself and ``tvshow.nfo`` inside it."""
        show_dir = self._local_item("/tv/Show", "Show", "dir")

        resolved_item, resolved_path = self._resolve_nfo_target(show_dir, ScrapingTarget.TV)

        self.assertEqual(resolved_item, show_dir)
        self.assertEqual(resolved_path, Path("/tv/Show/tvshow.nfo"))

    def test_season_dir_nfo_path(self):
        """A season directory maps to itself and ``season.nfo`` inside it."""
        season_dir = self._local_item("/tv/Show/Season 1", "Season 1", "dir")

        resolved_item, resolved_path = self._resolve_nfo_target(season_dir, ScrapingTarget.SEASON)

        self.assertEqual(resolved_item, season_dir)
        self.assertEqual(resolved_path, Path("/tv/Show/Season 1/season.nfo"))

    def test_episode_file_nfo_path(self):
        """An episode file maps to its parent item and a same-stem ``.nfo`` path."""
        episode_file = self._local_item("/tv/Show/Season 1/S01E01.mp4", "S01E01.mp4", "file")
        season_dir = self._local_item("/tv/Show/Season 1", "Season 1", "dir")
        self.media_chain.storagechain.get_parent_item.return_value = season_dir

        resolved_item, resolved_path = self._resolve_nfo_target(episode_file, ScrapingTarget.EPISODE)

        self.assertEqual(resolved_item, season_dir)
        self.assertEqual(resolved_path, Path("/tv/Show/Season 1/S01E01.nfo"))
|
|
|
|
|
|
class TestMediaScrapingNFO(unittest.TestCase):
    """Checks when ``MediaChain._scrape_nfo_generic`` generates and saves an
    NFO, depending on the scraping policy and whether the NFO already exists."""

    def setUp(self):
        """Create a ``MediaChain`` with storage, NFO generation, saving and
        policy lookup all mocked, plus a shared movie-directory fixture."""
        chain = MediaChain()
        chain.storagechain = MagicMock()
        chain.metadata_nfo = MagicMock(return_value="<nfo></nfo>")
        chain._save_file = MagicMock()
        chain.scraping_policies = MagicMock()
        self.media_chain = chain

        self.fileitem = schemas.FileItem(
            path="/movies/Avatar (2009)",
            name="Avatar (2009)",
            type="dir",
            storage="local",
        )
        self.meta = MetaInfo("Avatar (2009)")
        self.mediainfo = MediaInfo()

    def _use_policy(self, policy):
        """Make the mocked policy lookup return a movie/nfo option with *policy*."""
        self.media_chain.scraping_policies.option.return_value = ScrapingOption("movie", "nfo", policy)

    def _pretend_nfo_exists(self):
        """Make the mocked storage report an existing NFO file item."""
        self.media_chain.storagechain.get_file_item.return_value = schemas.FileItem(
            path="/movies/Avatar (2009)/Avatar (2009).nfo",
            name="Avatar (2009).nfo",
            type="file",
            storage="local",
        )

    def _scrape_movie_nfo(self):
        """Run the method under test against the shared movie fixture."""
        self.media_chain._scrape_nfo_generic(self.fileitem, self.meta, self.mediainfo, ScrapingTarget.MOVIE)

    def _assert_nothing_written(self):
        """Assert neither NFO generation nor saving was invoked."""
        self.media_chain.metadata_nfo.assert_not_called()
        self.media_chain._save_file.assert_not_called()

    def _assert_written_once(self):
        """Assert NFO generation and saving were each invoked exactly once."""
        self.media_chain.metadata_nfo.assert_called_once()
        self.media_chain._save_file.assert_called_once()

    def test_scrape_nfo_off(self):
        """With the SKIP policy nothing is generated or saved."""
        self._use_policy(ScrapingPolicy.SKIP)
        self._scrape_movie_nfo()
        self._assert_nothing_written()

    def test_scrape_nfo_on_exists_skip(self):
        """With MISSINGONLY and an existing NFO nothing is generated or saved."""
        self._use_policy(ScrapingPolicy.MISSINGONLY)
        self._pretend_nfo_exists()

        self._scrape_movie_nfo()
        self._assert_nothing_written()

    def test_scrape_nfo_on_not_exists_scrape(self):
        """With MISSINGONLY and no existing NFO one NFO is generated and saved."""
        self._use_policy(ScrapingPolicy.MISSINGONLY)
        # Storage lookup finds no file.
        self.media_chain.storagechain.get_file_item.return_value = None

        self._scrape_movie_nfo()
        self._assert_written_once()

    def test_scrape_nfo_overwrite_exists_scrape(self):
        """With OVERWRITE an existing NFO is still regenerated and saved."""
        self._use_policy(ScrapingPolicy.OVERWRITE)
        self._pretend_nfo_exists()

        self._scrape_movie_nfo()
        self._assert_written_once()
|
|
|
|
|
|
class TestMediaScrapingImages(unittest.TestCase):
    """Checks image scraping: which URLs ``_scrape_images_generic`` hands to
    the downloader, and what ``_download_and_save_image`` does with a URL."""

    def setUp(self):
        """Create a ``MediaChain`` with storage, image metadata, the image
        downloader and policy lookup mocked.

        The real downloader is captured first so one test can exercise it and
        ``tearDown`` can put it back.
        """
        self.media_chain = MediaChain()
        self.original_download = self.media_chain._download_and_save_image
        self.media_chain.storagechain = MagicMock()
        self.media_chain.metadata_img = MagicMock()
        self.media_chain._download_and_save_image = MagicMock()
        self.media_chain.scraping_policies = MagicMock()

    def tearDown(self):
        """Reinstate the downloader captured in ``setUp``."""
        self.media_chain._download_and_save_image = self.original_download

    def _downloaded_urls(self):
        """Return the ``url`` keyword of every recorded downloader call."""
        recorded = self.media_chain._download_and_save_image.call_args_list
        return [one_call.kwargs["url"] for one_call in recorded]

    def test_scrape_images_mapping(self):
        """Each image reported by ``metadata_img`` triggers one download."""
        movie_dir = schemas.FileItem(path="/movies/Avatar", name="Avatar", type="dir", storage="local")
        info = MediaInfo()
        chain = self.media_chain
        chain.metadata_img.return_value = {
            "poster.jpg": "http://poster",
            "fanart.jpg": "http://fanart",
            "logo.png": "http://logo",
        }
        chain.scraping_policies.option.return_value = ScrapingOption("movie", "poster", ScrapingPolicy.OVERWRITE)
        chain.storagechain.get_file_item.return_value = None

        chain._scrape_images_generic(movie_dir, info, ScrapingTarget.MOVIE)

        # One download per mapped image, each with its own URL.
        requested = self._downloaded_urls()
        self.assertEqual(len(requested), 3)
        for wanted in ("http://poster", "http://fanart", "http://logo"):
            self.assertIn(wanted, requested)

    def test_scrape_images_season_filter(self):
        """With ``season_number=1`` only the season-01 image is downloaded."""
        season_dir = schemas.FileItem(path="/tv/Show/Season 1", name="Season 1", type="dir", storage="local")
        info = MediaInfo()
        chain = self.media_chain
        chain.metadata_img.return_value = {
            "season01-poster.jpg": "http://season01",
            "season02-poster.jpg": "http://season02",
        }
        chain.scraping_policies.option.return_value = ScrapingOption("season", "poster", ScrapingPolicy.OVERWRITE)
        chain.storagechain.get_file_item.return_value = None

        chain._scrape_images_generic(season_dir, info, ScrapingTarget.SEASON, season_number=1)

        recorded = chain._download_and_save_image.call_args_list
        self.assertEqual(len(recorded), 1)
        self.assertEqual(recorded[0].kwargs["url"], "http://season01")

    @patch("app.chain.media.RequestUtils")
    @patch("app.chain.media.NamedTemporaryFile")
    @patch("app.chain.media.Path.chmod")
    @patch("app.chain.media.settings")
    def test_download_and_save_image(self, settings_mock, chmod_mock, tempfile_mock, request_utils_mock):
        """The downloader streams the URL into a temp file, chmods it and
        uploads it under the target file name."""
        # Exercise the real downloader: swap the mock from setUp back out.
        self.media_chain = MediaChain()
        self.media_chain._download_and_save_image = self.original_download
        self.media_chain.storagechain = MagicMock()

        dest_dir = schemas.FileItem(path="/movies/Avatar", name="Avatar", type="dir", storage="local")
        dest_path = Path("/movies/Avatar/poster.jpg")
        image_url = "http://poster"

        # Temp file handed out by the patched NamedTemporaryFile context.
        temp_handle = MagicMock()
        temp_handle.name = "/tmp/mockfile"
        tempfile_mock.return_value.__enter__.return_value = temp_handle

        # Streamed HTTP response yielding two chunks.
        response = MagicMock()
        response.status_code = 200
        response.iter_content.return_value = [b"data1", b"data2"]

        requester = request_utils_mock.return_value
        requester.get_stream.return_value.__enter__.return_value = response

        self.media_chain.storagechain.upload_file.return_value = dest_dir

        self.media_chain._download_and_save_image(dest_dir, dest_path, image_url)

        request_utils_mock.assert_called_with(proxies=settings_mock.PROXY, ua=settings_mock.NORMAL_USER_AGENT)
        requester.get_stream.assert_called_with(url=image_url)
        for chunk in (b"data1", b"data2"):
            temp_handle.write.assert_any_call(chunk)
        chmod_mock.assert_called()

        uploader = self.media_chain.storagechain.upload_file
        uploader.assert_called_once()
        upload_kwargs = uploader.call_args.kwargs
        self.assertEqual(upload_kwargs["fileitem"], dest_dir)
        self.assertEqual(upload_kwargs["new_name"], "poster.jpg")
|
|
|
|
|
|
class TestMediaScrapingTVDirectory(unittest.TestCase):
    """Checks the season-level calls ``MediaChain._handle_tv_scraping`` makes
    when initialising a season or specials directory."""

    def setUp(self):
        """Create a ``MediaChain`` with storage and both generic scrapers mocked."""
        chain = MediaChain()
        chain.storagechain = MagicMock()
        chain._scrape_nfo_generic = MagicMock()
        chain._scrape_images_generic = MagicMock()
        self.media_chain = chain

    def _initialise_empty_dir(self, directory, meta, mediainfo):
        """Run ``_handle_tv_scraping`` on *directory* with no files listed in it."""
        self.media_chain.storagechain.list_files.return_value = []
        self.media_chain._handle_tv_scraping(
            directory, meta, mediainfo,
            init_folder=True, parent=None, overwrite=False, recursive=True,
        )

    @patch("app.chain.media.settings")
    def test_initialize_tv_directory_specials(self, settings_mock):
        """A directory named in ``RENAME_FORMAT_S0_NAMES`` is scraped as season 0."""
        # Names that identify a specials directory, and the media extensions.
        settings_mock.RENAME_FORMAT_S0_NAMES = ["Specials", "SPs"]
        settings_mock.RMT_MEDIAEXT = [".mp4", ".mkv"]

        specials_dir = schemas.FileItem(path="/tv/Show/Specials", name="Specials", type="dir", storage="local")
        show_meta = MetaInfo("Show")
        show_info = MediaInfo(type=MediaType.TV)

        self._initialise_empty_dir(specials_dir, show_meta, show_info)

        expected_nfo = dict(
            current_fileitem=specials_dir,
            meta=show_meta,
            mediainfo=show_info,
            item_type=ScrapingTarget.SEASON,
            overwrite=False,
            season_number=0,
        )
        expected_images = dict(
            current_fileitem=specials_dir,
            mediainfo=show_info,
            item_type=ScrapingTarget.SEASON,
            parent_fileitem=None,
            overwrite=False,
            season_number=0,
        )
        self.media_chain._scrape_nfo_generic.assert_called_with(**expected_nfo)
        self.media_chain._scrape_images_generic.assert_called_with(**expected_images)

    def test_initialize_tv_directory_season(self):
        """A "Season 1" directory is scraped as season 1."""
        season_dir = schemas.FileItem(path="/tv/Show/Season 1", name="Season 1", type="dir", storage="local")
        show_meta = MetaInfo("Show")
        show_info = MediaInfo(type=MediaType.TV)

        self._initialise_empty_dir(season_dir, show_meta, show_info)

        expected_nfo = dict(
            current_fileitem=season_dir,
            meta=show_meta,
            mediainfo=show_info,
            item_type=ScrapingTarget.SEASON,
            overwrite=False,
            season_number=1,
        )
        self.media_chain._scrape_nfo_generic.assert_called_with(**expected_nfo)
|
|
|
|
|
|
class TestMediaScrapeEvents(unittest.TestCase):
    """Checks how ``MediaChain.scrape_metadata_event`` fans out into
    ``scrape_metadata`` calls, and how ``scrape_metadata`` dispatches to the
    movie/TV handlers."""

    def setUp(self):
        """Create the ``MediaChain`` under test."""
        self.media_chain = MediaChain()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _local_item(path, name, kind):
        """Build a ``schemas.FileItem`` on the "local" storage."""
        return schemas.FileItem(path=path, name=name, type=kind, storage="local")

    @staticmethod
    def _fake_get_file_item(storage, path):
        """Stand-in for ``StorageChain.get_file_item``: a path containing a dot
        is reported as a file, anything else as a directory."""
        path_str = str(path)
        kind = "file" if "." in path_str else "dir"
        return schemas.FileItem(path=path_str, name=Path(path_str).name, type=kind, storage="local")

    @staticmethod
    def _scrape_event(**payload):
        """Build a ``MetadataScrape`` event carrying *payload* as its data."""
        return Event(event_type=EventType.MetadataScrape, event_data=payload)

    @staticmethod
    def _scraped_paths(scrape_mock):
        """Return the ``fileitem`` path of every recorded ``scrape_metadata`` call."""
        return [one_call.kwargs['fileitem'].path for one_call in scrape_mock.call_args_list]

    # -------------------------------------------------- scrape_metadata_event

    @patch("app.chain.media.MediaChain.scrape_metadata")
    @patch("app.chain.media.StorageChain.get_item")
    @patch("app.chain.media.StorageChain.get_parent_item")
    def test_scrape_metadata_event_file(
            self, get_parent_mock, get_item_mock, scrape_mock
    ):
        """A file event scrapes that file once, passing its parent item."""
        movie_file = self._local_item("/movies/movie.mkv", "movie.mkv", "file")
        movies_dir = self._local_item("/movies", "movies", "dir")

        get_item_mock.return_value = movie_file
        get_parent_mock.return_value = movies_dir

        info = MediaInfo()
        event = self._scrape_event(fileitem=movie_file, mediainfo=info, overwrite=True)

        self.media_chain.scrape_metadata_event(event)

        scrape_mock.assert_called_once_with(
            fileitem=movie_file,
            mediainfo=info,
            init_folder=False,
            parent=movies_dir,
            overwrite=True,
        )

    @patch("app.chain.media.MediaChain.scrape_metadata")
    @patch("app.chain.media.StorageChain.get_item")
    @patch("app.chain.media.StorageChain.is_bluray_folder")
    def test_scrape_metadata_event_dir_bluray(
            self, is_bluray_mock, get_item_mock, scrape_mock
    ):
        """A Blu-ray directory is scraped once, non-recursively."""
        bluray_dir = self._local_item("/movies/bluray_movie", "bluray_movie", "dir")

        get_item_mock.return_value = bluray_dir
        is_bluray_mock.return_value = True

        info = MediaInfo()
        event = self._scrape_event(
            fileitem=bluray_dir,
            file_list=["/movies/bluray_movie/BDMV/index.bdmv"],
            mediainfo=info,
            overwrite=False,
        )

        self.media_chain.scrape_metadata_event(event)

        scrape_mock.assert_called_once_with(
            fileitem=bluray_dir,
            mediainfo=info,
            init_folder=True,
            recursive=False,
            overwrite=False,
        )

    @patch("app.chain.media.MediaChain.scrape_metadata")
    @patch("app.chain.media.StorageChain.get_item")
    @patch("app.chain.media.StorageChain.is_bluray_folder")
    @patch("app.chain.media.StorageChain.get_file_item")
    def test_scrape_metadata_event_dir_with_filelist(
            self, get_file_item_mock, is_bluray_mock, get_item_mock, scrape_mock
    ):
        """A directory event with one nested file scrapes the root, the
        intermediate directory and the file."""
        show_dir = self._local_item("/tv/show", "show", "dir")

        get_item_mock.return_value = show_dir
        is_bluray_mock.return_value = False
        get_file_item_mock.side_effect = self._fake_get_file_item

        info = MediaInfo()
        event = self._scrape_event(
            fileitem=show_dir,
            file_list=["/tv/show/Season 1/S01E01.mp4"],
            mediainfo=info,
            overwrite=True,
        )

        self.media_chain.scrape_metadata_event(event)

        self.assertEqual(len(scrape_mock.call_args_list), 3)

        visited = self._scraped_paths(scrape_mock)
        for wanted in (
                "/tv/show",
                "/tv/show/Season 1",
                "/tv/show/Season 1/S01E01.mp4",
        ):
            self.assertIn(wanted, visited)

    @patch("app.chain.media.MediaChain.scrape_metadata")
    @patch("app.chain.media.StorageChain.get_item")
    def test_scrape_metadata_event_dir_full(
            self, get_item_mock, scrape_mock
    ):
        """A directory event without a file list scrapes the directory once,
        forwarding the supplied meta."""
        movie_dir = self._local_item("/movies/movie", "movie", "dir")

        get_item_mock.return_value = movie_dir

        info = MediaInfo()
        movie_meta = MetaInfo("movie")
        event = self._scrape_event(
            fileitem=movie_dir,
            meta=movie_meta,
            mediainfo=info,
            overwrite=True,
        )

        self.media_chain.scrape_metadata_event(event)

        scrape_mock.assert_called_once_with(
            fileitem=movie_dir,
            meta=movie_meta,
            mediainfo=info,
            init_folder=True,
            overwrite=True,
        )

    @patch("app.chain.media.MediaChain.scrape_metadata")
    @patch("app.chain.media.StorageChain.get_item")
    @patch("app.chain.media.StorageChain.is_bluray_folder")
    @patch("app.chain.media.StorageChain.get_file_item")
    def test_scrape_metadata_event_dir_with_multiple_files(
            self, get_file_item_mock, is_bluray_mock, get_item_mock, scrape_mock
    ):
        """A directory event listing three files scrapes the directory and
        each listed file."""
        collection_dir = self._local_item("/movies/collection", "collection", "dir")

        get_item_mock.return_value = collection_dir
        is_bluray_mock.return_value = False
        get_file_item_mock.side_effect = self._fake_get_file_item

        info = MediaInfo()
        event = self._scrape_event(
            fileitem=collection_dir,
            file_list=[
                "/movies/collection/movie1.mp4",
                "/movies/collection/movie2.mkv",
                "/movies/collection/movie3.avi",
            ],
            mediainfo=info,
            overwrite=True,
        )

        self.media_chain.scrape_metadata_event(event)

        # The directory itself plus one call per listed file.
        self.assertEqual(len(scrape_mock.call_args_list), 4)

        visited = self._scraped_paths(scrape_mock)
        for wanted in (
                "/movies/collection",
                "/movies/collection/movie1.mp4",
                "/movies/collection/movie2.mkv",
                "/movies/collection/movie3.avi",
        ):
            self.assertIn(wanted, visited)

    @patch("app.chain.media.MediaChain.scrape_metadata")
    @patch("app.chain.media.StorageChain.get_item")
    @patch("app.chain.media.StorageChain.is_bluray_folder")
    @patch("app.chain.media.StorageChain.get_file_item")
    def test_scrape_metadata_event_dir_with_tv_multi_seasons_episodes(
            self, get_file_item_mock, is_bluray_mock, get_item_mock, scrape_mock
    ):
        """A show event listing episodes across three season directories
        scrapes the show, every season directory and every episode."""
        show_dir = self._local_item("/tv/MultiSeasonShow", "MultiSeasonShow", "dir")

        get_item_mock.return_value = show_dir
        is_bluray_mock.return_value = False
        get_file_item_mock.side_effect = self._fake_get_file_item

        info = MediaInfo()
        event = self._scrape_event(
            fileitem=show_dir,
            file_list=[
                "/tv/MultiSeasonShow/Season 1/S01E01.mp4",
                "/tv/MultiSeasonShow/Season 1/S01E02.mp4",
                "/tv/MultiSeasonShow/Season 2/S02E01.mkv",
                "/tv/MultiSeasonShow/Season 2/S02E02.mkv",
                "/tv/MultiSeasonShow/Specials/S00E01.mp4",
            ],
            mediainfo=info,
            overwrite=False,
        )

        self.media_chain.scrape_metadata_event(event)

        # 1 show directory + 3 season directories + 5 episode files.
        self.assertEqual(len(scrape_mock.call_args_list), 9)

        visited = self._scraped_paths(scrape_mock)
        for wanted in (
                "/tv/MultiSeasonShow",
                "/tv/MultiSeasonShow/Season 1",
                "/tv/MultiSeasonShow/Season 2",
                "/tv/MultiSeasonShow/Specials",
                "/tv/MultiSeasonShow/Season 1/S01E01.mp4",
                "/tv/MultiSeasonShow/Season 1/S01E02.mp4",
                "/tv/MultiSeasonShow/Season 2/S02E01.mkv",
                "/tv/MultiSeasonShow/Season 2/S02E02.mkv",
                "/tv/MultiSeasonShow/Specials/S00E01.mp4",
        ):
            self.assertIn(wanted, visited)

    # --------------------------------------------------------- scrape_metadata

    @patch("app.chain.media.MediaChain._handle_movie_scraping")
    @patch("app.chain.media.MediaChain.recognize_by_meta")
    def test_scrape_metadata_movie(
            self, recognize_mock, movie_handler_mock
    ):
        """With movie ``mediainfo`` supplied, no recognition happens and the
        movie handler receives the arguments unchanged."""
        movie_file = self._local_item("/movies/movie.mkv", "movie.mkv", "file")
        movie_meta = MetaInfo("Movie")
        movie_info = MediaInfo(type=MediaType.MOVIE)

        self.media_chain.scrape_metadata(
            fileitem=movie_file,
            meta=movie_meta,
            mediainfo=movie_info,
            init_folder=True,
            overwrite=False,
            recursive=True,
        )

        recognize_mock.assert_not_called()
        movie_handler_mock.assert_called_once_with(
            fileitem=movie_file,
            meta=movie_meta,
            mediainfo=movie_info,
            init_folder=True,
            parent=None,
            overwrite=False,
            recursive=True,
        )

    @patch("app.chain.media.MediaChain._handle_tv_scraping")
    @patch("app.chain.media.MediaChain.recognize_by_meta")
    def test_scrape_metadata_tv(
            self, recognize_mock, tv_handler_mock
    ):
        """With TV ``mediainfo`` supplied, the TV handler receives the
        arguments unchanged."""
        show_dir = self._local_item("/tv/show", "show", "dir")
        show_meta = MetaInfo("Show")
        show_info = MediaInfo(type=MediaType.TV)

        self.media_chain.scrape_metadata(
            fileitem=show_dir,
            meta=show_meta,
            mediainfo=show_info,
            init_folder=True,
            overwrite=False,
            recursive=True,
        )

        tv_handler_mock.assert_called_once_with(
            fileitem=show_dir,
            meta=show_meta,
            mediainfo=show_info,
            init_folder=True,
            parent=None,
            overwrite=False,
            recursive=True,
        )

    @patch("app.chain.media.MediaChain._handle_movie_scraping")
    @patch("app.chain.media.MediaChain.recognize_by_meta")
    def test_scrape_metadata_recognize_fallback(
            self, recognize_mock, movie_handler_mock
    ):
        """Without ``meta``/``mediainfo`` the chain recognises the media itself
        and passes the recognised info to the movie handler."""
        movie_file = self._local_item("/movies/movie.mkv", "movie.mkv", "file")
        recognised = MediaInfo(type=MediaType.MOVIE)
        recognize_mock.return_value = recognised

        self.media_chain.scrape_metadata(
            fileitem=movie_file,
            init_folder=True,
            overwrite=False,
            recursive=True,
        )

        recognize_mock.assert_called_once()
        movie_handler_mock.assert_called_once()
        handler_kwargs = movie_handler_mock.call_args.kwargs
        self.assertEqual(handler_kwargs['mediainfo'], recognised)
        self.assertEqual(handler_kwargs['meta'].name, "Movie")

    @patch("app.chain.media.MediaChain._handle_movie_scraping")
    @patch("app.chain.media.MediaChain._handle_tv_scraping")
    def test_scrape_metadata_invalid_extension(
            self, tv_handler_mock, movie_handler_mock
    ):
        """A ``.txt`` file reaches neither the movie nor the TV handler."""
        text_file = self._local_item("/movies/movie.txt", "movie.txt", "file")

        self.media_chain.scrape_metadata(
            fileitem=text_file
        )

        movie_handler_mock.assert_not_called()
        tv_handler_mock.assert_not_called()

    @patch("app.chain.media.MediaChain.recognize_by_meta")
    def test_scrape_metadata_recognize_fail(
            self, recognize_mock
    ):
        """When recognition returns ``None`` a warning naming the file is logged."""
        fileitem = self._local_item("/movies/movie.mkv", "movie.mkv", "file")
        recognize_mock.return_value = None

        with patch('app.chain.media.logger.warn') as warn_mock:
            self.media_chain.scrape_metadata(
                fileitem=fileitem
            )
            warn_mock.assert_called_with(f"{Path(fileitem.path)} 无法识别文件媒体信息!")
|
|
|
|
# Script entry point: when this file is executed directly (rather than
# imported by a test runner), hand control to unittest's command-line runner.
if __name__ == "__main__":
    unittest.main()
|