mirror of
https://github.com/xhongc/music-tag-web.git
synced 2026-02-03 02:03:35 +08:00
feature:新增批量自动修改功能
This commit is contained in:
26
applications/task/migrations/0002_taskrecord.py
Normal file
26
applications/task/migrations/0002_taskrecord.py
Normal file
@@ -0,0 +1,26 @@
|
||||
# Generated by Django 2.2.6 on 2023-07-11 09:35
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('task', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='TaskRecord',
|
||||
fields=[
|
||||
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('song_name', models.CharField(default='', max_length=255)),
|
||||
('artist_name', models.CharField(default='', max_length=255)),
|
||||
('full_path', models.CharField(default='', max_length=255)),
|
||||
('tag_source', models.CharField(default='', max_length=255)),
|
||||
('icon', models.CharField(default='icon-folder', max_length=255)),
|
||||
('state', models.CharField(default='wait', max_length=255)),
|
||||
('extra', models.TextField(default='')),
|
||||
],
|
||||
),
|
||||
]
|
||||
18
applications/task/migrations/0003_taskrecord_created_at.py
Normal file
18
applications/task/migrations/0003_taskrecord_created_at.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# Generated by Django 2.2.6 on 2023-07-11 09:46
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('task', '0002_taskrecord'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='taskrecord',
|
||||
name='created_at',
|
||||
field=models.DateTimeField(auto_now_add=True, null=True),
|
||||
),
|
||||
]
|
||||
18
applications/task/migrations/0004_taskrecord_batch.py
Normal file
18
applications/task/migrations/0004_taskrecord_batch.py
Normal file
@@ -0,0 +1,18 @@
|
||||
# Generated by Django 2.2.6 on 2023-07-11 10:43
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('task', '0003_taskrecord_created_at'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='taskrecord',
|
||||
name='batch',
|
||||
field=models.CharField(default='', max_length=255),
|
||||
),
|
||||
]
|
||||
33
applications/task/migrations/0005_auto_20230711_1403.py
Normal file
33
applications/task/migrations/0005_auto_20230711_1403.py
Normal file
@@ -0,0 +1,33 @@
|
||||
# Generated by Django 2.2.6 on 2023-07-11 14:03
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('task', '0004_taskrecord_batch'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RenameField(
|
||||
model_name='task',
|
||||
old_name='name',
|
||||
new_name='full_path',
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='task',
|
||||
name='filename',
|
||||
field=models.CharField(default='', max_length=255),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='task',
|
||||
name='parent_path',
|
||||
field=models.CharField(default='', max_length=255),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='task',
|
||||
name='state',
|
||||
field=models.CharField(default='wait', max_length=255),
|
||||
),
|
||||
]
|
||||
@@ -2,4 +2,19 @@ from django.db import models
|
||||
|
||||
|
||||
class Task(models.Model):
|
||||
name = models.CharField(max_length=255)
|
||||
full_path = models.CharField(max_length=255)
|
||||
state = models.CharField(max_length=255, default="wait")
|
||||
parent_path = models.CharField(max_length=255, default="")
|
||||
filename = models.CharField(max_length=255, default="")
|
||||
|
||||
|
||||
class TaskRecord(models.Model):
|
||||
song_name = models.CharField(max_length=255, default="")
|
||||
artist_name = models.CharField(max_length=255, default="")
|
||||
full_path = models.CharField(max_length=255, default="")
|
||||
tag_source = models.CharField(max_length=255, default="")
|
||||
icon = models.CharField(max_length=255, default="icon-folder")
|
||||
state = models.CharField(max_length=255, default="wait")
|
||||
extra = models.TextField(default="")
|
||||
created_at = models.DateTimeField(null=True, auto_now_add=True)
|
||||
batch = models.CharField(max_length=255, default="")
|
||||
|
||||
@@ -217,7 +217,6 @@ class QQMusicApi:
|
||||
def getQQMusicMatchSong(self, name):
|
||||
song_list = self.getQQMusicSearch(name)
|
||||
songs = self.formatList(song_list["data"])
|
||||
print(f"搜索到{len(songs)}条。")
|
||||
if len(songs) == 0:
|
||||
return []
|
||||
return songs
|
||||
|
||||
@@ -2,6 +2,7 @@ import os
|
||||
|
||||
import music_tag
|
||||
|
||||
from applications.task.models import Task
|
||||
from applications.utils.constant_template import ConstantTemplate
|
||||
from applications.utils.send import send
|
||||
|
||||
@@ -11,59 +12,73 @@ COPYRIGHT = "感谢您的聆听,music-tag-web打上标签。POW~"
|
||||
def update_music_info(music_id3_info, is_raw_thumbnail=False):
|
||||
for each in music_id3_info:
|
||||
f = music_tag.load_file(each["file_full_path"])
|
||||
base_filename = ".".join(each["filename"].split(".")[:-1])
|
||||
var_dict = {
|
||||
"title": f["title"].value,
|
||||
"artist": f["artist"].value,
|
||||
"album": f["album"].value,
|
||||
"filename": base_filename
|
||||
}
|
||||
if each.get("title", None):
|
||||
if "${" in each["title"]:
|
||||
f["title"] = ConstantTemplate(each["title"]).resolve_data(var_dict)
|
||||
else:
|
||||
f["title"] = each["title"]
|
||||
if each.get("artist", None):
|
||||
if "${" in each["artist"]:
|
||||
f["artist"] = ConstantTemplate(each["artist"]).resolve_data(var_dict)
|
||||
else:
|
||||
f["artist"] = each["artist"]
|
||||
if each.get("album", None):
|
||||
if "${" in each["album"]:
|
||||
f["album"] = ConstantTemplate(each["album"]).resolve_data(var_dict)
|
||||
else:
|
||||
f["album"] = each["album"]
|
||||
if each.get("genre", None):
|
||||
f["genre"] = each["genre"]
|
||||
if each.get("year", None):
|
||||
f["year"] = each["year"]
|
||||
if each.get("lyrics", None):
|
||||
if each["lyrics"]:
|
||||
if not each["lyrics"].endswith(COPYRIGHT):
|
||||
each["lyrics"] = each["lyrics"] + "\n" + COPYRIGHT
|
||||
save_music(f, each, is_raw_thumbnail)
|
||||
parent_path = os.path.dirname(each["file_full_path"])
|
||||
filename = os.path.basename(each["file_full_path"])
|
||||
Task.objects.update_or_create(full_path=each["file_full_path"], defaults={
|
||||
"state": "success",
|
||||
"parent_path": parent_path,
|
||||
"filename": filename
|
||||
})
|
||||
|
||||
f["lyrics"] = each["lyrics"]
|
||||
if each.get("is_save_lyrics_file", False):
|
||||
lyrics_file_path = f"{os.path.dirname(each['file_full_path'])}/{base_filename}.lrc"
|
||||
with open(lyrics_file_path, "w", encoding="utf-8") as f_lyc:
|
||||
f_lyc.write(each["lyrics"])
|
||||
|
||||
def save_music(f, each, is_raw_thumbnail):
|
||||
base_filename = ".".join(os.path.basename(f.filename).split(".")[:-1])
|
||||
file_ext = os.path.basename(f.filename).split(".")[-1]
|
||||
var_dict = {
|
||||
"title": f["title"].value,
|
||||
"artist": f["artist"].value,
|
||||
"album": f["album"].value,
|
||||
"filename": base_filename
|
||||
}
|
||||
if each.get("title", None):
|
||||
if "${" in each["title"]:
|
||||
f["title"] = ConstantTemplate(each["title"]).resolve_data(var_dict)
|
||||
else:
|
||||
if each.get("is_save_lyrics_file", False):
|
||||
lyrics_file_path = f"{os.path.dirname(each['file_full_path'])}/{base_filename}.lrc"
|
||||
if not os.path.exists(lyrics_file_path):
|
||||
with open(lyrics_file_path, "w", encoding="utf-8") as f_lyc2:
|
||||
f_lyc2.write(f["lyrics"].value)
|
||||
if each.get("comment", None):
|
||||
f["comment"] = each["comment"]
|
||||
if each.get("album_img", None):
|
||||
f["title"] = each["title"]
|
||||
if each.get("artist", None):
|
||||
if "${" in each["artist"]:
|
||||
f["artist"] = ConstantTemplate(each["artist"]).resolve_data(var_dict)
|
||||
else:
|
||||
f["artist"] = each["artist"]
|
||||
if each.get("album", None):
|
||||
if "${" in each["album"]:
|
||||
f["album"] = ConstantTemplate(each["album"]).resolve_data(var_dict)
|
||||
else:
|
||||
f["album"] = each["album"]
|
||||
if each.get("genre", None):
|
||||
f["genre"] = each["genre"]
|
||||
if each.get("year", None):
|
||||
f["year"] = each["year"]
|
||||
if each.get("lyrics", None):
|
||||
f["lyrics"] = each["lyrics"]
|
||||
if each.get("is_save_lyrics_file", False):
|
||||
lyrics_file_path = f"{os.path.dirname(each['file_full_path'])}/{base_filename}.lrc"
|
||||
with open(lyrics_file_path, "w", encoding="utf-8") as f_lyc:
|
||||
f_lyc.write(each["lyrics"])
|
||||
else:
|
||||
if each.get("is_save_lyrics_file", False):
|
||||
lyrics_file_path = f"{os.path.dirname(each['file_full_path'])}/{base_filename}.lrc"
|
||||
if not os.path.exists(lyrics_file_path):
|
||||
with open(lyrics_file_path, "w", encoding="utf-8") as f_lyc2:
|
||||
f_lyc2.write(f["lyrics"].value)
|
||||
if each.get("comment", None):
|
||||
f["comment"] = each["comment"]
|
||||
if each.get("album_img", None):
|
||||
try:
|
||||
img_data = send().GET(each["album_img"])
|
||||
if img_data.status_code == 200:
|
||||
f['artwork'] = img_data.content
|
||||
if is_raw_thumbnail:
|
||||
f['artwork'] = f['artwork'].first.raw_thumbnail([128, 128])
|
||||
f.save()
|
||||
if each.get("filename", None):
|
||||
if "${" in each["filename"]:
|
||||
each["filename"] = ConstantTemplate(each["filename"]).resolve_data(var_dict)
|
||||
parent_path = os.path.dirname(each["file_full_path"])
|
||||
os.rename(each["file_full_path"], f"{parent_path}/{each['filename']}")
|
||||
except Exception:
|
||||
pass
|
||||
f.save()
|
||||
# 重命名文件名称
|
||||
if each.get("filename", None):
|
||||
if "${" in each["filename"]:
|
||||
each["filename"] = ConstantTemplate(each["filename"]).resolve_data(var_dict)
|
||||
if not each["filename"].endswith(file_ext):
|
||||
each["filename"] = f"{each['filename']}.{file_ext}"
|
||||
parent_path = os.path.dirname(each["file_full_path"])
|
||||
os.rename(each["file_full_path"], f"{parent_path}/{each['filename']}")
|
||||
|
||||
@@ -8,8 +8,10 @@ from django.db import transaction
|
||||
|
||||
from applications.music.models import Folder, Track, Album, Genre, Artist, Attachment
|
||||
from applications.subsonic.constants import AUDIO_EXTENSIONS_AND_MIMETYPE, COVER_TYPE
|
||||
from applications.task.models import TaskRecord, Task
|
||||
from applications.task.services.music_resource import MusicResource
|
||||
from applications.task.services.scan_utils import ScanMusic
|
||||
from applications.task.utils import folder_update_time, exists_dir
|
||||
from applications.task.utils import folder_update_time, exists_dir, match_song
|
||||
from django_vue_cli.celery_app import app
|
||||
|
||||
|
||||
@@ -234,3 +236,54 @@ def clear_music():
|
||||
Genre.objects.all().delete()
|
||||
Artist.objects.all().delete()
|
||||
Attachment.objects.all().delete()
|
||||
|
||||
|
||||
def batch_auto_tag_task(batch, source_list, select_mode):
|
||||
"""
|
||||
source_list: ["migu", "qmusic", "netease"]
|
||||
"""
|
||||
folder_list = TaskRecord.objects.filter(batch=batch, icon="icon-folder").all()
|
||||
for folder in folder_list:
|
||||
data = os.scandir(folder.full_path)
|
||||
allow_type = ["flac", "mp3", "ape", "wav", "aiff", "wv", "tta", "mp4", "m4a", "ogg", "mpc",
|
||||
"opus", "wma", "dsf", "dff"]
|
||||
bulk_set = []
|
||||
for entry in data:
|
||||
each = entry.name
|
||||
file_type = each.split(".")[-1]
|
||||
file_name = ".".join(each.split(".")[:-1])
|
||||
if file_type not in allow_type:
|
||||
continue
|
||||
bulk_set.append(TaskRecord(**{
|
||||
"batch": batch,
|
||||
"song_name": file_name,
|
||||
"full_path": f"{folder.full_path}/{each}",
|
||||
"icon": "icon-music",
|
||||
|
||||
}))
|
||||
TaskRecord.objects.bulk_create(bulk_set)
|
||||
task_list = TaskRecord.objects.filter(batch=batch).exclude(icon="icon-folder").all()
|
||||
for task in task_list:
|
||||
is_match = False
|
||||
for resource in source_list:
|
||||
print("开始匹配", resource)
|
||||
is_match = match_song(resource, task.full_path, select_mode)
|
||||
if is_match:
|
||||
task.state = "success"
|
||||
task.save()
|
||||
parent_path = os.path.dirname(task.full_path)
|
||||
Task.objects.update_or_create(full_path=task.full_path, defaults={
|
||||
"state": task.state,
|
||||
"parent_path": parent_path,
|
||||
"filename": os.path.basename(task.full_path)
|
||||
})
|
||||
break
|
||||
if not is_match:
|
||||
task.state = "failed"
|
||||
task.save()
|
||||
parent_path = os.path.dirname(task.full_path)
|
||||
Task.objects.update_or_create(full_path=task.full_path, defaults={
|
||||
"state": task.state,
|
||||
"parent_path": parent_path,
|
||||
"filename": os.path.basename(task.full_path)
|
||||
})
|
||||
|
||||
@@ -3,6 +3,10 @@ import datetime
|
||||
import os
|
||||
import time
|
||||
|
||||
import music_tag
|
||||
|
||||
from applications.task.services.update_ids import save_music
|
||||
|
||||
|
||||
def timestamp_to_dt(timestamp, format_type="%Y-%m-%d %H:%M:%S"):
|
||||
# 转换成localtime
|
||||
@@ -23,3 +27,61 @@ def exists_dir(dir_list):
|
||||
if os.path.isdir(_dir):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def match_song(resource, song_path, select_mode):
|
||||
from applications.task.services.music_resource import MusicResource
|
||||
|
||||
file = music_tag.load_file(song_path)
|
||||
file_name = song_path.split("/")[-1]
|
||||
file_title = file_name.split('.')[0]
|
||||
title = file["title"].value or file_title
|
||||
artist = file["artist"].value or ""
|
||||
album = file["album"].value or ""
|
||||
|
||||
songs = MusicResource(resource).fetch_id3_by_title(title)
|
||||
|
||||
is_match = False
|
||||
song_select = None
|
||||
for song in songs:
|
||||
if title == song["name"]:
|
||||
if select_mode == "simple":
|
||||
is_match = True
|
||||
song_select = song
|
||||
break
|
||||
else:
|
||||
if artist and (artist == song["artist"] or artist in song["artist"] or song["artist"] in artist):
|
||||
is_match = True
|
||||
song_select = song
|
||||
break
|
||||
elif album and (album == song["album"] or album in song["album"] or song["album"] in album):
|
||||
is_match = True
|
||||
song_select = song
|
||||
break
|
||||
elif title in song["name"]:
|
||||
if artist and (artist == song["artist"] or artist in song["artist"] or song["artist"] in artist):
|
||||
is_match = True
|
||||
song_select = song
|
||||
break
|
||||
elif album and (album == song["album"] or album in song["album"] or song["album"] in album):
|
||||
is_match = True
|
||||
song_select = song
|
||||
break
|
||||
elif song["name"] in title:
|
||||
if artist and (artist == song["artist"] or artist in song["artist"] or song["artist"] in artist):
|
||||
is_match = True
|
||||
song_select = song
|
||||
break
|
||||
elif album and (album == song["album"] or album in song["album"] or song["album"] in album):
|
||||
is_match = True
|
||||
song_select = song
|
||||
break
|
||||
else:
|
||||
continue
|
||||
if is_match:
|
||||
print(f"{title}>>>{song_select['name']}")
|
||||
song_select["filename"] = file_name
|
||||
song_select["file_full_path"] = song_path
|
||||
song_select["lyrics"] = MusicResource(resource).fetch_lyric(song_select["id"])
|
||||
save_music(file, song_select, False)
|
||||
return is_match
|
||||
|
||||
@@ -1,17 +1,19 @@
|
||||
import base64
|
||||
import copy
|
||||
import os
|
||||
import time
|
||||
|
||||
import music_tag
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.views.decorators.gzip import gzip_page
|
||||
from rest_framework.decorators import action
|
||||
|
||||
from applications.task.models import TaskRecord, Task
|
||||
from applications.task.serialziers import FileListSerializer, Id3Serializer, UpdateId3Serializer, \
|
||||
FetchId3ByTitleSerializer, FetchLlyricSerializer, BatchUpdateId3Serializer
|
||||
from applications.task.services.music_resource import MusicResource
|
||||
from applications.task.services.update_ids import update_music_info
|
||||
from applications.task.tasks import full_scan_folder, scan, clear_music
|
||||
from applications.task.tasks import full_scan_folder, scan, clear_music, batch_auto_tag_task
|
||||
from component.drf.viewsets import GenericViewSet
|
||||
from django_vue_cli.celery_app import app as celery_app
|
||||
|
||||
@@ -29,7 +31,7 @@ class TaskViewSets(GenericViewSet):
|
||||
return FetchId3ByTitleSerializer
|
||||
elif self.action == "fetch_lyric":
|
||||
return FetchLlyricSerializer
|
||||
elif self.action == "batch_update_id3":
|
||||
elif self.action in ["batch_update_id3", "batch_auto_update_id3"]:
|
||||
return BatchUpdateId3Serializer
|
||||
return FileListSerializer
|
||||
|
||||
@@ -47,8 +49,19 @@ class TaskViewSets(GenericViewSet):
|
||||
allow_type = ["flac", "mp3", "ape", "wav", "aiff", "wv", "tta", "mp4", "m4a", "ogg", "mpc",
|
||||
"opus", "wma", "dsf", "dff"]
|
||||
frc_map = {}
|
||||
for index, entry in enumerate(data, 1):
|
||||
file_data = []
|
||||
full_path_list = []
|
||||
for entry in data:
|
||||
each = entry.name
|
||||
file_data.append(each)
|
||||
full_path_list.append(f"{file_path}/{each}")
|
||||
file_type = each.split(".")[-1]
|
||||
file_name = ".".join(each.split(".")[:-1])
|
||||
if file_type in ["lrc", "txt"]:
|
||||
frc_map[file_name] = each
|
||||
task_map = dict(Task.objects.filter(parent_path=file_path).values_list("filename", "state"))
|
||||
for index, entry in enumerate(file_data, 1):
|
||||
each = entry
|
||||
file_type = each.split(".")[-1]
|
||||
file_name = ".".join(each.split(".")[:-1])
|
||||
if os.path.isdir(f"{file_path}/{each}"):
|
||||
@@ -57,11 +70,10 @@ class TaskViewSets(GenericViewSet):
|
||||
"name": each,
|
||||
"title": each,
|
||||
"icon": "icon-folder",
|
||||
"state": "null",
|
||||
"children": []
|
||||
})
|
||||
continue
|
||||
if file_type in ["lrc", "txt"]:
|
||||
frc_map[file_name] = each
|
||||
if file_type not in allow_type:
|
||||
continue
|
||||
if file_name in frc_map:
|
||||
@@ -72,7 +84,8 @@ class TaskViewSets(GenericViewSet):
|
||||
"id": index,
|
||||
"name": each,
|
||||
"title": each,
|
||||
"icon": icon
|
||||
"icon": icon,
|
||||
"state": task_map.get(each, "null")
|
||||
})
|
||||
res_data = [
|
||||
{
|
||||
@@ -155,12 +168,34 @@ class TaskViewSets(GenericViewSet):
|
||||
else:
|
||||
music_info.update({
|
||||
"file_full_path": f"{full_path}/{data.get('name')}",
|
||||
"filename": data.get('name')
|
||||
})
|
||||
music_id3_info.append(copy.deepcopy(music_info))
|
||||
update_music_info(music_id3_info)
|
||||
return self.success_response()
|
||||
|
||||
@action(methods=['POST'], detail=False)
|
||||
def batch_auto_update_id3(self, request, *args, **kwargs):
|
||||
validate_data = self.is_validated_data(request.data)
|
||||
full_path = validate_data['file_full_path']
|
||||
select_data = validate_data['select_data']
|
||||
music_info = validate_data['music_info']
|
||||
select_mode = music_info["select_mode"]
|
||||
source_list = music_info.get("source_list", [])
|
||||
timestamp = str(int(time.time() * 1000))
|
||||
bulk_set = []
|
||||
for each in select_data:
|
||||
name = each.get("name")
|
||||
song_name = ".".join(name.split(".")[:-1])
|
||||
bulk_set.append(TaskRecord(**{
|
||||
"song_name": song_name,
|
||||
"full_path": f"{full_path}/{name}",
|
||||
"icon": each.get("icon"),
|
||||
"batch": timestamp
|
||||
}))
|
||||
TaskRecord.objects.bulk_create(bulk_set, batch_size=500)
|
||||
batch_auto_tag_task(timestamp, source_list, select_mode)
|
||||
return self.success_response()
|
||||
|
||||
@action(methods=['POST'], detail=False)
|
||||
def fetch_lyric(self, request, *args, **kwargs):
|
||||
validate_data = self.is_validated_data(request.data)
|
||||
|
||||
Reference in New Issue
Block a user