mirror of
https://github.com/xhongc/music-tag-web.git
synced 2026-04-25 03:00:54 +08:00
feature:支持音乐指纹识别,即使没有元数据也可以识别音乐
This commit is contained in:
@@ -12,6 +12,7 @@ DFF等音频格式。
|
||||
- 该版本是我自用的小工具,如果你也有这个需求,可以使用。欢迎提出issues,我会满足你的需求,在我的能力范围内。
|
||||
- 支持音乐标签来源 网易云音乐,QQ音乐,咪咕音乐, 酷狗音乐, 酷我音乐
|
||||
- 支持批量自动修改音乐标签
|
||||
- 支持音乐指纹识别,即使没有元数据也可以识别音乐
|
||||
# 🦀 Show Project
|
||||
|
||||
[【音乐标签Web|Music Tag Web】](http://42.193.218.103:8002/#/)
|
||||
|
||||
@@ -37,6 +37,7 @@ class BatchUpdateId3Serializer(serializers.Serializer):
|
||||
class FetchId3ByTitleSerializer(serializers.Serializer):
|
||||
title = serializers.CharField(required=True)
|
||||
resource = serializers.CharField(required=True)
|
||||
full_path = serializers.CharField(required=False, allow_null=True, allow_blank=True)
|
||||
|
||||
|
||||
class FetchLlyricSerializer(serializers.Serializer):
|
||||
|
||||
22
applications/task/services/acoust.py
Normal file
22
applications/task/services/acoust.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from component.mz.run import get_acoustid
|
||||
|
||||
|
||||
class AcoustidClient:
|
||||
def fetch_id3_by_title(self, title):
|
||||
songs = []
|
||||
res = get_acoustid(title)
|
||||
for each in res:
|
||||
songs.append({
|
||||
"id": each[1],
|
||||
"name": each[2],
|
||||
"artist": each[3],
|
||||
"artist_id": "",
|
||||
"album": each[4],
|
||||
"album_id": "",
|
||||
"album_img": "",
|
||||
"year": "",
|
||||
})
|
||||
return songs
|
||||
|
||||
def fetch_lyric(self, song_id):
|
||||
raise Exception("暂不支持该音乐平台")
|
||||
@@ -1,6 +1,7 @@
|
||||
import requests
|
||||
import base64
|
||||
|
||||
from applications.task.services.acoust import AcoustidClient
|
||||
from applications.task.services.kugou import KugouClient
|
||||
from applications.task.services.kuwo import KuwoClient
|
||||
from applications.task.services.qm import QQMusicApi
|
||||
@@ -23,6 +24,8 @@ class MusicResource:
|
||||
return KugouClient()
|
||||
elif info == "kuwo":
|
||||
return KuwoClient()
|
||||
elif info == "acoustid":
|
||||
return AcoustidClient()
|
||||
raise Exception("暂不支持该音乐平台")
|
||||
|
||||
def fetch_lyric(self, song_id):
|
||||
|
||||
@@ -211,8 +211,11 @@ class TaskViewSets(GenericViewSet):
|
||||
def fetch_id3_by_title(self, request, *args, **kwargs):
|
||||
validate_data = self.is_validated_data(request.data)
|
||||
resource = validate_data["resource"]
|
||||
|
||||
full_path = validate_data.get("full_path", "")
|
||||
title = validate_data["title"]
|
||||
|
||||
if resource == "acoustid":
|
||||
title = full_path
|
||||
songs = MusicResource(resource).fetch_id3_by_title(title)
|
||||
return self.success_response(data=songs)
|
||||
|
||||
|
||||
0
component/mz/__init__.py
Normal file
0
component/mz/__init__.py
Normal file
491
component/mz/acoustid.py
Normal file
491
component/mz/acoustid.py
Normal file
@@ -0,0 +1,491 @@
|
||||
# This file is part of pyacoustid.
|
||||
# Copyright 2014, Adrian Sampson.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining
|
||||
# a copy of this software and associated documentation files (the
|
||||
# "Software"), to deal in the Software without restriction, including
|
||||
# without limitation the rights to use, copy, modify, merge, publish,
|
||||
# distribute, sublicense, and/or sell copies of the Software, and to
|
||||
# permit persons to whom the Software is furnished to do so, subject to
|
||||
# the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be
|
||||
# included in all copies or substantial portions of the Software.
|
||||
|
||||
from __future__ import division
|
||||
from __future__ import absolute_import
|
||||
from typing import List
|
||||
|
||||
import os
|
||||
import json
|
||||
import requests
|
||||
import contextlib
|
||||
import errno
|
||||
|
||||
try:
|
||||
import audioread
|
||||
|
||||
have_audioread = True
|
||||
except ImportError:
|
||||
have_audioread = False
|
||||
try:
|
||||
import chromaprint
|
||||
|
||||
have_chromaprint = True
|
||||
except ImportError:
|
||||
have_chromaprint = False
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
import gzip
|
||||
from io import BytesIO
|
||||
|
||||
API_BASE_URL = 'http://api.acoustid.org/v2/'
|
||||
DEFAULT_META = ['recordings', 'releasegroups']
|
||||
REQUEST_INTERVAL = 0.33 # 3 requests/second.
|
||||
MAX_AUDIO_LENGTH = 120 # Seconds.
|
||||
FPCALC_COMMAND = 'fpcalc'
|
||||
FPCALC_ENVVAR = 'FPCALC'
|
||||
MAX_BIT_ERROR = 2 # comparison settings
|
||||
MAX_ALIGN_OFFSET = 120
|
||||
|
||||
|
||||
# Exceptions.
|
||||
|
||||
class AcoustidError(Exception):
|
||||
"""Base for exceptions in this module."""
|
||||
|
||||
|
||||
class FingerprintGenerationError(AcoustidError):
|
||||
"""The audio could not be fingerprinted."""
|
||||
|
||||
|
||||
class NoBackendError(FingerprintGenerationError):
|
||||
"""The audio could not be fingerprinted because neither the
|
||||
Chromaprint library nor the fpcalc command-line tool is installed.
|
||||
"""
|
||||
|
||||
|
||||
class FingerprintSubmissionError(AcoustidError):
|
||||
"""Missing required data for a fingerprint submission."""
|
||||
|
||||
|
||||
class WebServiceError(AcoustidError):
|
||||
"""The Web service request failed. The field ``message`` contains a
|
||||
description of the error. If this is an error that was specifically
|
||||
sent by the acoustid server, then the ``code`` field contains the
|
||||
acoustid error code.
|
||||
"""
|
||||
|
||||
def __init__(self, message, response=None):
|
||||
"""Create an error for the given HTTP response body, if
|
||||
provided, with the ``message`` as a fallback.
|
||||
"""
|
||||
if response:
|
||||
# Try to parse the JSON error response.
|
||||
try:
|
||||
data = json.loads(response)
|
||||
except ValueError:
|
||||
pass
|
||||
else:
|
||||
if isinstance(data.get('error'), dict):
|
||||
error = data['error']
|
||||
if 'message' in error:
|
||||
message = error['message']
|
||||
if 'code' in error:
|
||||
self.code = error['code']
|
||||
|
||||
super(WebServiceError, self).__init__(message)
|
||||
self.message = message
|
||||
|
||||
|
||||
# Endpoint configuration.
|
||||
|
||||
def set_base_url(url):
|
||||
"""Set the URL of the API server to query."""
|
||||
if not url.endswith('/'):
|
||||
url += '/'
|
||||
global API_BASE_URL
|
||||
API_BASE_URL = url
|
||||
|
||||
|
||||
def _get_lookup_url():
|
||||
"""Get the URL of the lookup API endpoint."""
|
||||
return API_BASE_URL + 'lookup'
|
||||
|
||||
|
||||
def _get_submit_url():
|
||||
"""Get the URL of the submission API endpoint."""
|
||||
return API_BASE_URL + 'submit'
|
||||
|
||||
|
||||
def _get_submission_status_url():
|
||||
"""Get the URL of the submission status API endpoint."""
|
||||
return API_BASE_URL + 'submission_status'
|
||||
|
||||
|
||||
# Compressed HTTP request bodies.
|
||||
|
||||
def _compress(data):
|
||||
"""Compress a bytestring to a gzip archive."""
|
||||
sio = BytesIO()
|
||||
with contextlib.closing(gzip.GzipFile(fileobj=sio, mode='wb')) as f:
|
||||
f.write(data)
|
||||
return sio.getvalue()
|
||||
|
||||
|
||||
class CompressedHTTPAdapter(requests.adapters.HTTPAdapter):
|
||||
"""An `HTTPAdapter` that compresses request bodies with gzip. The
|
||||
Content-Encoding header is set accordingly.
|
||||
"""
|
||||
|
||||
def add_headers(self, request, **kwargs):
|
||||
body = request.body
|
||||
if not isinstance(body, bytes):
|
||||
body = body.encode('utf8')
|
||||
request.prepare_body(_compress(body), None)
|
||||
request.headers['Content-Encoding'] = 'gzip'
|
||||
|
||||
|
||||
# Utilities.
|
||||
|
||||
class _rate_limit(object): # noqa: N801
|
||||
"""A decorator that limits the rate at which the function may be
|
||||
called. The rate is controlled by the REQUEST_INTERVAL module-level
|
||||
constant; set the value to zero to disable rate limiting. The
|
||||
limiting is thread-safe; only one thread may be in the function at a
|
||||
time (acts like a monitor in this sense).
|
||||
"""
|
||||
|
||||
def __init__(self, fun):
|
||||
self.fun = fun
|
||||
self.last_call = 0.0
|
||||
self.lock = threading.Lock()
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
with self.lock:
|
||||
# Wait until request_rate time has passed since last_call,
|
||||
# then update last_call.
|
||||
since_last_call = time.time() - self.last_call
|
||||
if since_last_call < REQUEST_INTERVAL:
|
||||
time.sleep(REQUEST_INTERVAL - since_last_call)
|
||||
self.last_call = time.time()
|
||||
|
||||
# Call the original function.
|
||||
return self.fun(*args, **kwargs)
|
||||
|
||||
|
||||
@_rate_limit
|
||||
def _api_request(url, params, timeout=None):
|
||||
"""Makes a POST request for the URL with the given form parameters,
|
||||
which are encoded as compressed form data, and returns a parsed JSON
|
||||
response. May raise a WebServiceError if the request fails.
|
||||
If the specified timeout passes, then raises a TimeoutError.
|
||||
"""
|
||||
headers = {
|
||||
'Accept-Encoding': 'gzip',
|
||||
"Content-Type": "application/x-www-form-urlencoded"
|
||||
}
|
||||
|
||||
with requests.Session() as session:
|
||||
session.mount('http://', CompressedHTTPAdapter())
|
||||
try:
|
||||
if isinstance(params.get('meta'), list):
|
||||
params['meta'] = ' '.join(params['meta'])
|
||||
response = session.post(url,
|
||||
data=params,
|
||||
headers=headers,
|
||||
timeout=timeout)
|
||||
except requests.exceptions.RequestException as exc:
|
||||
raise WebServiceError("HTTP request failed: {0}".format(exc))
|
||||
except requests.exceptions.ReadTimeout:
|
||||
raise WebServiceError(
|
||||
"HTTP request timed out ({0}s)".format(timeout)
|
||||
)
|
||||
|
||||
try:
|
||||
return response.json()
|
||||
except ValueError:
|
||||
raise WebServiceError('response is not valid JSON')
|
||||
|
||||
|
||||
# Main API.
|
||||
|
||||
def fingerprint(samplerate, channels, pcmiter, maxlength=MAX_AUDIO_LENGTH):
|
||||
"""Fingerprint audio data given its sample rate and number of
|
||||
channels. pcmiter should be an iterable containing blocks of PCM
|
||||
data as byte strings. Raises a FingerprintGenerationError if
|
||||
anything goes wrong.
|
||||
"""
|
||||
# Maximum number of samples to decode.
|
||||
endposition = samplerate * channels * maxlength
|
||||
|
||||
try:
|
||||
fper = chromaprint.Fingerprinter()
|
||||
fper.start(samplerate, channels)
|
||||
|
||||
position = 0 # Samples of audio fed to the fingerprinter.
|
||||
for block in pcmiter:
|
||||
fper.feed(block)
|
||||
position += len(block) // 2 # 2 bytes/sample.
|
||||
if position >= endposition:
|
||||
break
|
||||
|
||||
return fper.finish()
|
||||
except chromaprint.FingerprintError:
|
||||
raise FingerprintGenerationError("fingerprint calculation failed")
|
||||
|
||||
|
||||
def lookup(apikey, fingerprint, duration, meta=DEFAULT_META, timeout=None):
|
||||
"""Look up a fingerprint with the Acoustid Web service. Returns the
|
||||
Python object reflecting the response JSON data. To get more data
|
||||
back, ``meta`` can be a list of keywords from this list: recordings,
|
||||
recordingids, releases, releaseids, releasegroups, releasegroupids,
|
||||
tracks, compress, usermeta, sources.
|
||||
"""
|
||||
params = {
|
||||
'format': 'json',
|
||||
'client': apikey,
|
||||
'duration': int(duration),
|
||||
'fingerprint': fingerprint,
|
||||
'meta': meta,
|
||||
}
|
||||
return _api_request(_get_lookup_url(), params, timeout)
|
||||
|
||||
|
||||
def parse_lookup_result(data):
|
||||
"""Given a parsed JSON response, generate tuples containing the match
|
||||
score, the MusicBrainz recording ID, the title of the recording, and
|
||||
the artist name of the recording. Multiple artist names are joined
|
||||
by join phrases as displayed on web page. If an artist is not available,
|
||||
the last item is None. If the response is incomplete, raises a
|
||||
WebServiceError.
|
||||
"""
|
||||
if data['status'] != 'ok':
|
||||
raise WebServiceError("status: %s" % data['status'])
|
||||
if 'results' not in data:
|
||||
raise WebServiceError("results not included")
|
||||
|
||||
for result in data['results']:
|
||||
score = result['score']
|
||||
if 'recordings' not in result:
|
||||
# No recording attached. This result is not very useful.
|
||||
continue
|
||||
|
||||
for recording in result['recordings']:
|
||||
# Get the artist if available.
|
||||
artists = recording.get("artists")
|
||||
release_groups = recording.get('releasegroups', [])
|
||||
if artists:
|
||||
artist_name = "".join(
|
||||
[
|
||||
artist["name"] + artist.get("joinphrase", "")
|
||||
for artist in artists
|
||||
]
|
||||
)
|
||||
else:
|
||||
artist_name = None
|
||||
release_group_title = None
|
||||
if release_groups:
|
||||
release_group = release_groups[0]
|
||||
release_group_title = release_group.get('title')
|
||||
release_group_id = release_group.get('id')
|
||||
yield score, recording['id'], recording.get('title'), artist_name, release_group_title
|
||||
|
||||
|
||||
def _fingerprint_file_audioread(path, maxlength):
|
||||
"""Fingerprint a file by using audioread and chromaprint."""
|
||||
try:
|
||||
with audioread.audio_open(path) as f:
|
||||
duration = f.duration
|
||||
fp = fingerprint(f.samplerate, f.channels, iter(f), maxlength)
|
||||
except audioread.DecodeError:
|
||||
raise FingerprintGenerationError("audio could not be decoded")
|
||||
return duration, fp
|
||||
|
||||
|
||||
def _fingerprint_file_fpcalc(path, maxlength):
|
||||
"""Fingerprint a file by calling the fpcalc application."""
|
||||
fpcalc = os.environ.get(FPCALC_ENVVAR, FPCALC_COMMAND)
|
||||
command = [fpcalc, "-length", str(maxlength), path]
|
||||
try:
|
||||
with open(os.devnull, 'wb') as devnull:
|
||||
proc = subprocess.Popen(command, stdout=subprocess.PIPE,
|
||||
stderr=devnull)
|
||||
output, _ = proc.communicate()
|
||||
except OSError as exc:
|
||||
if exc.errno == errno.ENOENT:
|
||||
raise NoBackendError("fpcalc not found")
|
||||
else:
|
||||
raise FingerprintGenerationError("fpcalc invocation failed: %s" %
|
||||
str(exc))
|
||||
except UnicodeEncodeError:
|
||||
# Due to a bug in Python 2's subprocess on Windows, Unicode
|
||||
# filenames can fail to encode on that platform. See:
|
||||
# http://bugs.python.org/issue1759845
|
||||
raise FingerprintGenerationError("argument encoding failed")
|
||||
retcode = proc.poll()
|
||||
if retcode:
|
||||
raise FingerprintGenerationError("fpcalc exited with status %i" %
|
||||
retcode)
|
||||
|
||||
duration = fp = None
|
||||
for line in output.splitlines():
|
||||
try:
|
||||
parts = line.split(b'=', 1)
|
||||
except ValueError:
|
||||
raise FingerprintGenerationError("malformed fpcalc output")
|
||||
if parts[0] == b'DURATION':
|
||||
try:
|
||||
duration = float(parts[1])
|
||||
except ValueError:
|
||||
raise FingerprintGenerationError("fpcalc duration not numeric")
|
||||
elif parts[0] == b'FINGERPRINT':
|
||||
fp = parts[1]
|
||||
|
||||
if duration is None or fp is None:
|
||||
raise FingerprintGenerationError("missing fpcalc output")
|
||||
return duration, fp
|
||||
|
||||
|
||||
def fingerprint_file(path, maxlength=MAX_AUDIO_LENGTH, force_fpcalc=False):
|
||||
"""Fingerprint a file either using the Chromaprint dynamic library
|
||||
or the fpcalc command-line tool, whichever is available (unless
|
||||
``force_fpcalc`` is specified). Returns the duration and the
|
||||
fingerprint.
|
||||
"""
|
||||
path = os.path.abspath(os.path.expanduser(path))
|
||||
if have_audioread and have_chromaprint and not force_fpcalc:
|
||||
return _fingerprint_file_audioread(path, maxlength)
|
||||
else:
|
||||
return _fingerprint_file_fpcalc(path, maxlength)
|
||||
|
||||
|
||||
def _popcount(x) -> int:
|
||||
"""count 1s in binary encoding of x"""
|
||||
return bin(x).count('1')
|
||||
|
||||
|
||||
def _match_fingerprints(a: List[int], b: List[int]) -> float:
|
||||
"""Compare two Chromaprint fingerprints, given as numbers.
|
||||
|
||||
For more details, see:
|
||||
https://essentia.upf.edu/tutorial_fingerprinting_chromaprint.html
|
||||
|
||||
:param a: decompressed fingerprint
|
||||
:param b: decompressed fingerprint
|
||||
:return: similarity score [0,1]
|
||||
"""
|
||||
asize = len(a)
|
||||
bsize = len(b)
|
||||
numcounts = asize + bsize + 1
|
||||
counts = [0] * numcounts
|
||||
|
||||
for i in range(asize):
|
||||
jbegin = max(0, i - MAX_ALIGN_OFFSET)
|
||||
jend = min(bsize, i + MAX_ALIGN_OFFSET)
|
||||
for j in range(jbegin, jend):
|
||||
biterror = _popcount(a[i] ^ b[j]) # xor operator
|
||||
if biterror <= MAX_BIT_ERROR:
|
||||
offset = i - j + bsize
|
||||
counts[offset] += 1
|
||||
topcount = counts.max()
|
||||
return topcount / min(asize, bsize)
|
||||
|
||||
|
||||
def compare_fingerprints(a, b) -> float:
|
||||
"""Compare two fingerprints produced by `fingerprint_file`.
|
||||
|
||||
:param a: A pair produced by `fingerprint_file`.
|
||||
:param b: A second such pair.
|
||||
:return: similarity score [0,1]
|
||||
"""
|
||||
if not have_chromaprint:
|
||||
raise ModuleNotFoundError("function needs chromaprint")
|
||||
|
||||
# decompress fingerprints
|
||||
a = [int(x) for x in chromaprint.decode_fingerprint(a)[0]]
|
||||
b = [int(x) for x in chromaprint.decode_fingerprint(b)[0]]
|
||||
return _match_fingerprints(a, b)
|
||||
|
||||
|
||||
def match(apikey, path, meta=DEFAULT_META, parse=True, force_fpcalc=False,
|
||||
timeout=None):
|
||||
"""Look up the metadata for an audio file. If ``parse`` is true,
|
||||
then ``parse_lookup_result`` is used to return an iterator over
|
||||
small tuple of relevant information; otherwise, the full parsed JSON
|
||||
response is returned. Fingerprinting uses either the Chromaprint
|
||||
library or the fpcalc command-line tool; if ``force_fpcalc`` is
|
||||
true, only the latter will be used. To get more data back, ``meta``
|
||||
can be a list of keywords from this list: recordings, recordingids,
|
||||
releases, releaseids, releasegroups, releasegroupids, tracks,
|
||||
compress, usermeta, sources.
|
||||
"""
|
||||
duration, fp = fingerprint_file(path, force_fpcalc=force_fpcalc)
|
||||
response = lookup(apikey, fp, duration, meta, timeout)
|
||||
if parse:
|
||||
return parse_lookup_result(response)
|
||||
else:
|
||||
return response
|
||||
|
||||
|
||||
def submit(apikey, userkey, data, timeout=None):
|
||||
"""Submit a fingerprint to the acoustid server. The ``apikey`` and
|
||||
``userkey`` parameters are API keys for the application and the
|
||||
submitting user, respectively.
|
||||
|
||||
``data`` may be either a single dictionary or a list of
|
||||
dictionaries. In either case, each dictionary must contain a
|
||||
``fingerprint`` key and a ``duration`` key and may include the
|
||||
following: ``puid``, ``mbid``, ``track``, ``artist``, ``album``,
|
||||
``albumartist``, ``year``, ``trackno``, ``discno``, ``fileformat``,
|
||||
``bitrate``
|
||||
|
||||
If the required keys are not present in a dictionary, a
|
||||
FingerprintSubmissionError is raised.
|
||||
|
||||
Returns the parsed JSON response.
|
||||
"""
|
||||
if isinstance(data, dict):
|
||||
data = [data]
|
||||
|
||||
args = {
|
||||
'format': 'json',
|
||||
'client': apikey,
|
||||
'user': userkey,
|
||||
}
|
||||
|
||||
# Build up "field.#" parameters corresponding to the parameters
|
||||
# given in each dictionary.
|
||||
for i, d in enumerate(data):
|
||||
if "duration" not in d or "fingerprint" not in d:
|
||||
raise FingerprintSubmissionError("missing required parameters")
|
||||
|
||||
# The duration needs to be an integer.
|
||||
d["duration"] = int(d["duration"])
|
||||
|
||||
for k, v in d.items():
|
||||
args["%s.%s" % (k, i)] = v
|
||||
|
||||
response = _api_request(_get_submit_url(), args, timeout)
|
||||
if response.get('status') != 'ok':
|
||||
try:
|
||||
code = response['error']['code']
|
||||
message = response['error']['message']
|
||||
except KeyError:
|
||||
raise WebServiceError("response: {0}".format(response))
|
||||
raise WebServiceError("error {0}: {1}".format(code, message))
|
||||
return response
|
||||
|
||||
|
||||
def get_submission_status(apikey, submission_id, timeout=None):
|
||||
"""Get the status of a submission to the acoustid server.
|
||||
``submission_id`` is the id of a fingerprint submission, as returned
|
||||
in the response object of a call to the ``submit`` endpoint.
|
||||
"""
|
||||
params = {
|
||||
'format': 'json',
|
||||
'client': apikey,
|
||||
'id': submission_id,
|
||||
}
|
||||
return _api_request(_get_submission_status_url(), params, timeout)
|
||||
66
component/mz/aidmatch.py
Normal file
66
component/mz/aidmatch.py
Normal file
@@ -0,0 +1,66 @@
|
||||
from __future__ import print_function
|
||||
# This file is part of pyacoustid.
|
||||
# Copyright 2011, Adrian Sampson.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining
|
||||
# a copy of this software and associated documentation files (the
|
||||
# "Software"), to deal in the Software without restriction, including
|
||||
# without limitation the rights to use, copy, modify, merge, publish,
|
||||
# distribute, sublicense, and/or sell copies of the Software, and to
|
||||
# permit persons to whom the Software is furnished to do so, subject to
|
||||
# the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be
|
||||
# included in all copies or substantial portions of the Software.
|
||||
|
||||
"""Example script that identifies metadata for files specified on the
|
||||
command line.
|
||||
"""
|
||||
import acoustid
|
||||
import sys
|
||||
|
||||
# API key for this demo script only. Get your own API key at the
|
||||
# Acoustid Web for your application.
|
||||
# http://acoustid.org/
|
||||
API_KEY = 'cSpUJKpD'
|
||||
|
||||
|
||||
# Python 2/3 Unicode compatibility: this `print_` function forces a
|
||||
# unicode string into a byte string for printing on Python 2, avoiding
|
||||
# errors in the process, and does nothing on Python 3, where
|
||||
# stdout/stderr are text streams (and there's not much we can do about
|
||||
# that).
|
||||
if sys.version_info[0] < 3:
|
||||
def print_(s):
|
||||
print(s.encode(sys.stdout.encoding, 'replace'))
|
||||
else:
|
||||
def print_(s):
|
||||
print(s)
|
||||
|
||||
|
||||
def aidmatch(filename):
|
||||
try:
|
||||
results = acoustid.match(API_KEY, filename)
|
||||
except acoustid.NoBackendError:
|
||||
print("chromaprint library/tool not found", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except acoustid.FingerprintGenerationError:
|
||||
print("fingerprint could not be calculated", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
except acoustid.WebServiceError as exc:
|
||||
print("web service request failed:", exc.message, file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
first = True
|
||||
for score, rid, title, artist in results:
|
||||
if first:
|
||||
first = False
|
||||
else:
|
||||
print()
|
||||
print_('%s - %s' % (artist, title))
|
||||
print_('http://musicbrainz.org/recording/%s' % rid)
|
||||
print_('Score: %i%%' % (int(score * 100)))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
aidmatch(sys.argv[1])
|
||||
255
component/mz/chromaprint.py
Normal file
255
component/mz/chromaprint.py
Normal file
@@ -0,0 +1,255 @@
|
||||
# Copyright (C) 2011 Lukas Lalinsky
|
||||
# (Minor modifications by Adrian Sampson.)
|
||||
# Distributed under the MIT license, see the LICENSE file for details.
|
||||
|
||||
"""Low-level ctypes wrapper from the chromaprint library."""
|
||||
|
||||
import sys
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
BUFFER_TYPES = (memoryview, bytearray,)
|
||||
BYTES_TYPE = bytes
|
||||
elif sys.version_info[1] >= 7:
|
||||
BUFFER_TYPES = (buffer, memoryview, bytearray,) # noqa: F821
|
||||
BYTES_TYPE = str
|
||||
else:
|
||||
BUFFER_TYPES = (buffer, bytearray,) # noqa: F821
|
||||
BYTES_TYPE = str
|
||||
|
||||
|
||||
# Find the base library and declare prototypes.
|
||||
|
||||
def _guess_lib_name():
|
||||
if sys.platform == 'darwin':
|
||||
return ('libchromaprint.1.dylib', 'libchromaprint.0.dylib')
|
||||
elif sys.platform == 'win32':
|
||||
return ('chromaprint.dll', 'libchromaprint.dll')
|
||||
elif sys.platform == 'cygwin':
|
||||
return ('libchromaprint.dll.a', 'cygchromaprint-1.dll',
|
||||
'cygchromaprint-0.dll')
|
||||
return ('libchromaprint.so.1', 'libchromaprint.so.0')
|
||||
|
||||
|
||||
def _load_library(name):
|
||||
"""Try to load a dynamic library with ctypes, or return None if the
|
||||
library is not available.
|
||||
"""
|
||||
if sys.platform == 'win32':
|
||||
# On Windows since Python 3.8, we need an extra call to
|
||||
# `find_library` to search standard library paths.
|
||||
name = ctypes.util.find_library(name)
|
||||
if not name:
|
||||
return None
|
||||
|
||||
try:
|
||||
return ctypes.cdll.LoadLibrary(name)
|
||||
except OSError:
|
||||
return None
|
||||
|
||||
|
||||
for name in _guess_lib_name():
|
||||
_libchromaprint = _load_library(name)
|
||||
if _libchromaprint:
|
||||
break
|
||||
else:
|
||||
raise ImportError("couldn't find libchromaprint")
|
||||
|
||||
|
||||
_libchromaprint.chromaprint_get_version.argtypes = ()
|
||||
_libchromaprint.chromaprint_get_version.restype = ctypes.c_char_p
|
||||
|
||||
_libchromaprint.chromaprint_new.argtypes = (ctypes.c_int,)
|
||||
_libchromaprint.chromaprint_new.restype = ctypes.c_void_p
|
||||
|
||||
_libchromaprint.chromaprint_free.argtypes = (ctypes.c_void_p,)
|
||||
_libchromaprint.chromaprint_free.restype = None
|
||||
|
||||
_libchromaprint.chromaprint_start.argtypes = \
|
||||
(ctypes.c_void_p, ctypes.c_int, ctypes.c_int)
|
||||
_libchromaprint.chromaprint_start.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_feed.argtypes = \
|
||||
(ctypes.c_void_p, ctypes.POINTER(ctypes.c_char), ctypes.c_int)
|
||||
_libchromaprint.chromaprint_feed.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_finish.argtypes = (ctypes.c_void_p,)
|
||||
_libchromaprint.chromaprint_finish.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_get_fingerprint.argtypes = \
|
||||
(ctypes.c_void_p, ctypes.POINTER(ctypes.c_char_p))
|
||||
_libchromaprint.chromaprint_get_fingerprint.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_decode_fingerprint.argtypes = \
|
||||
(ctypes.POINTER(ctypes.c_char), ctypes.c_int,
|
||||
ctypes.POINTER(ctypes.POINTER(ctypes.c_uint32)),
|
||||
ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int), ctypes.c_int)
|
||||
_libchromaprint.chromaprint_decode_fingerprint.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_encode_fingerprint.argtypes = \
|
||||
(ctypes.POINTER(ctypes.c_int32), ctypes.c_int, ctypes.c_int,
|
||||
ctypes.POINTER(ctypes.POINTER(ctypes.c_char)),
|
||||
ctypes.POINTER(ctypes.c_int), ctypes.c_int)
|
||||
_libchromaprint.chromaprint_encode_fingerprint.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_hash_fingerprint.argtypes = \
|
||||
(ctypes.POINTER(ctypes.c_int32), ctypes.c_int,
|
||||
ctypes.POINTER(ctypes.c_uint32))
|
||||
_libchromaprint.chromaprint_hash_fingerprint.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_dealloc.argtypes = (ctypes.c_void_p,)
|
||||
_libchromaprint.chromaprint_dealloc.restype = None
|
||||
|
||||
|
||||
# Main interface.
|
||||
|
||||
class FingerprintError(Exception):
|
||||
"""Raised when a call to the underlying library fails."""
|
||||
|
||||
|
||||
def _check(res):
|
||||
"""Check the result of a library call, raising an error if the call
|
||||
failed.
|
||||
"""
|
||||
if res != 1:
|
||||
raise FingerprintError()
|
||||
|
||||
|
||||
class Fingerprinter(object):
|
||||
|
||||
ALGORITHM_TEST1 = 0
|
||||
ALGORITHM_TEST2 = 1
|
||||
ALGORITHM_TEST3 = 2
|
||||
ALGORITHM_DEFAULT = ALGORITHM_TEST2
|
||||
|
||||
def __init__(self, algorithm=ALGORITHM_DEFAULT):
|
||||
self._ctx = _libchromaprint.chromaprint_new(algorithm)
|
||||
|
||||
def __del__(self):
|
||||
_libchromaprint.chromaprint_free(self._ctx)
|
||||
del self._ctx
|
||||
|
||||
def start(self, sample_rate, num_channels):
|
||||
"""Initialize the fingerprinter with the given audio parameters.
|
||||
"""
|
||||
_check(_libchromaprint.chromaprint_start(
|
||||
self._ctx, sample_rate, num_channels
|
||||
))
|
||||
|
||||
def feed(self, data):
|
||||
"""Send raw PCM audio data to the fingerprinter. Data may be
|
||||
either a bytestring or a buffer object.
|
||||
"""
|
||||
if isinstance(data, BUFFER_TYPES):
|
||||
data = BYTES_TYPE(data)
|
||||
elif not isinstance(data, bytes):
|
||||
raise TypeError('data must be bytes, buffer, or memoryview')
|
||||
_check(_libchromaprint.chromaprint_feed(
|
||||
self._ctx, data, len(data) // 2
|
||||
))
|
||||
|
||||
def finish(self):
|
||||
"""Finish the fingerprint generation process and retrieve the
|
||||
resulting fignerprint as a bytestring.
|
||||
"""
|
||||
_check(_libchromaprint.chromaprint_finish(self._ctx))
|
||||
fingerprint_ptr = ctypes.c_char_p()
|
||||
_check(_libchromaprint.chromaprint_get_fingerprint(
|
||||
self._ctx, ctypes.byref(fingerprint_ptr)
|
||||
))
|
||||
fingerprint = fingerprint_ptr.value
|
||||
_libchromaprint.chromaprint_dealloc(fingerprint_ptr)
|
||||
return fingerprint
|
||||
|
||||
|
||||
def decode_fingerprint(data, base64=True):
|
||||
"""Uncompress and optionally decode a fingerprint.
|
||||
|
||||
Args:
|
||||
data: An encoded fingerprint in bytes.
|
||||
base64: Whether to base64-decode the fingerprint.
|
||||
|
||||
Returns:
|
||||
A tuple containing the decoded raw fingerprint as an array
|
||||
of unsigned 32-bit integers, and an int representing the chromaprint
|
||||
algorithm used to generate the fingerprint.
|
||||
"""
|
||||
result_ptr = ctypes.POINTER(ctypes.c_uint32)()
|
||||
result_size = ctypes.c_int()
|
||||
algorithm = ctypes.c_int()
|
||||
_check(_libchromaprint.chromaprint_decode_fingerprint(
|
||||
data, len(data), ctypes.byref(result_ptr), ctypes.byref(result_size),
|
||||
ctypes.byref(algorithm), 1 if base64 else 0
|
||||
))
|
||||
result = result_ptr[:result_size.value]
|
||||
_libchromaprint.chromaprint_dealloc(result_ptr)
|
||||
return result, algorithm.value
|
||||
|
||||
|
||||
def encode_fingerprint(fingerprint, algorithm, base64=True):
|
||||
"""Compress and optionally encode a fingerprint.
|
||||
|
||||
Args:
|
||||
fingerprint: A bytestring with the fingerprint.
|
||||
algorithm: An int flag choosing the algorithm to use.
|
||||
base64: Whether to base64-encode the fingerprint.
|
||||
|
||||
Returns:
|
||||
A bytestring with the encoded fingerprint.
|
||||
"""
|
||||
fp_array = (ctypes.c_int * len(fingerprint))()
|
||||
for i in range(len(fingerprint)):
|
||||
fp_array[i] = fingerprint[i]
|
||||
result_ptr = ctypes.POINTER(ctypes.c_char)()
|
||||
result_size = ctypes.c_int()
|
||||
_check(_libchromaprint.chromaprint_encode_fingerprint(
|
||||
fp_array, len(fingerprint), algorithm, ctypes.byref(result_ptr),
|
||||
ctypes.byref(result_size), 1 if base64 else 0
|
||||
))
|
||||
result = result_ptr[:result_size.value]
|
||||
_libchromaprint.chromaprint_dealloc(result_ptr)
|
||||
return result
|
||||
|
||||
|
||||
def hash_fingerprint(fingerprint):
|
||||
"""Generate a single 32-bit hash for a raw, decoded fingerprint.
|
||||
|
||||
If two fingerprints are similar, their hashes generated by this
|
||||
function will also be similar. If they are significantly different,
|
||||
their hashes will most likely be significantly different as well
|
||||
(but clients should not rely on this).
|
||||
|
||||
Compare two hashes with their Hamming distance, i.e., by counting
|
||||
the bits in which they differ.
|
||||
|
||||
Args:
|
||||
fingerprint: A list of ints for the raw, decoded fingerprint.
|
||||
|
||||
Returns:
|
||||
A 32-bit integer hash.
|
||||
|
||||
Example usage:
|
||||
audio_fingerprint = <get fingerprint with Fingerprinter>
|
||||
decoded_fingerprint, algo = decode_fingerprint(audio_fingerprint)
|
||||
first_fingerprint_hash = hash_fingerprint(decoded_fingerprint)
|
||||
|
||||
second_fingerprint_hash = <repeat steps for second audio file>
|
||||
|
||||
# Compare the binary strings using Hamming distance.
|
||||
first_fp_binary = format(first_fingerprint_hash, 'b')
|
||||
second_fp_binary = format(second_fingerprint_hash, 'b')
|
||||
|
||||
# This value will be between 0 and 32 and represent the POPCNT.
|
||||
# A value > 15 indicates the two fingerprints are very different.
|
||||
bin(int(first_fp_binary,2)^int(second_fp_binary,2)).count
|
||||
"""
|
||||
|
||||
fp_array = (ctypes.c_int * len(fingerprint))()
|
||||
for i in range(len(fingerprint)):
|
||||
fp_array[i] = fingerprint[i]
|
||||
result_hash = ctypes.c_uint32()
|
||||
_check(_libchromaprint.chromaprint_hash_fingerprint(
|
||||
fp_array, len(fingerprint), ctypes.byref(result_hash)
|
||||
))
|
||||
return result_hash.value
|
||||
BIN
component/mz/fpcalc
Executable file
BIN
component/mz/fpcalc
Executable file
Binary file not shown.
319
component/mz/fpcalc.py
Normal file
319
component/mz/fpcalc.py
Normal file
@@ -0,0 +1,319 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# This file is part of pyacoustid.
|
||||
# Copyright 2012, Lukas Lalinsky.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining
|
||||
# a copy of this software and associated documentation files (the
|
||||
# "Software"), to deal in the Software without restriction, including
|
||||
# without limitation the rights to use, copy, modify, merge, publish,
|
||||
# distribute, sublicense, and/or sell copies of the Software, and to
|
||||
# permit persons to whom the Software is furnished to do so, subject to
|
||||
# the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be
|
||||
# included in all copies or substantial portions of the Software.
|
||||
|
||||
"""Simple script for calculating audio fingerprints, using the same
|
||||
arguments/output as the fpcalc utility from Chromaprint."""
|
||||
|
||||
from __future__ import division
|
||||
from __future__ import absolute_import
|
||||
from __future__ import print_function
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
import acoustid
|
||||
# Copyright (C) 2011 Lukas Lalinsky
|
||||
# (Minor modifications by Adrian Sampson.)
|
||||
# Distributed under the MIT license, see the LICENSE file for details.
|
||||
|
||||
"""Low-level ctypes wrapper from the chromaprint library."""
|
||||
|
||||
import sys
|
||||
import ctypes
|
||||
import ctypes.util
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
BUFFER_TYPES = (memoryview, bytearray,)
|
||||
BYTES_TYPE = bytes
|
||||
elif sys.version_info[1] >= 7:
|
||||
BUFFER_TYPES = (buffer, memoryview, bytearray,) # noqa: F821
|
||||
BYTES_TYPE = str
|
||||
else:
|
||||
BUFFER_TYPES = (buffer, bytearray,) # noqa: F821
|
||||
BYTES_TYPE = str
|
||||
|
||||
|
||||
# Find the base library and declare prototypes.
|
||||
|
||||
def _guess_lib_name():
|
||||
if sys.platform == 'darwin':
|
||||
return ('libchromaprint.1.dylib', 'libchromaprint.0.dylib')
|
||||
elif sys.platform == 'win32':
|
||||
return ('chromaprint.dll', 'libchromaprint.dll')
|
||||
elif sys.platform == 'cygwin':
|
||||
return ('libchromaprint.dll.a', 'cygchromaprint-1.dll',
|
||||
'cygchromaprint-0.dll')
|
||||
return ('libchromaprint.so.1', 'libchromaprint.so.0')
|
||||
|
||||
|
||||
def _load_library(name):
|
||||
"""Try to load a dynamic library with ctypes, or return None if the
|
||||
library is not available.
|
||||
"""
|
||||
if sys.platform == 'win32':
|
||||
# On Windows since Python 3.8, we need an extra call to
|
||||
# `find_library` to search standard library paths.
|
||||
name = ctypes.util.find_library(name)
|
||||
if not name:
|
||||
return None
|
||||
|
||||
try:
|
||||
return ctypes.cdll.LoadLibrary(name)
|
||||
except OSError:
|
||||
return None
|
||||
|
||||
|
||||
for name in _guess_lib_name():
|
||||
_libchromaprint = _load_library(name)
|
||||
if _libchromaprint:
|
||||
break
|
||||
else:
|
||||
raise ImportError("couldn't find libchromaprint")
|
||||
|
||||
|
||||
_libchromaprint.chromaprint_get_version.argtypes = ()
|
||||
_libchromaprint.chromaprint_get_version.restype = ctypes.c_char_p
|
||||
|
||||
_libchromaprint.chromaprint_new.argtypes = (ctypes.c_int,)
|
||||
_libchromaprint.chromaprint_new.restype = ctypes.c_void_p
|
||||
|
||||
_libchromaprint.chromaprint_free.argtypes = (ctypes.c_void_p,)
|
||||
_libchromaprint.chromaprint_free.restype = None
|
||||
|
||||
_libchromaprint.chromaprint_start.argtypes = \
|
||||
(ctypes.c_void_p, ctypes.c_int, ctypes.c_int)
|
||||
_libchromaprint.chromaprint_start.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_feed.argtypes = \
|
||||
(ctypes.c_void_p, ctypes.POINTER(ctypes.c_char), ctypes.c_int)
|
||||
_libchromaprint.chromaprint_feed.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_finish.argtypes = (ctypes.c_void_p,)
|
||||
_libchromaprint.chromaprint_finish.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_get_fingerprint.argtypes = \
|
||||
(ctypes.c_void_p, ctypes.POINTER(ctypes.c_char_p))
|
||||
_libchromaprint.chromaprint_get_fingerprint.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_decode_fingerprint.argtypes = \
|
||||
(ctypes.POINTER(ctypes.c_char), ctypes.c_int,
|
||||
ctypes.POINTER(ctypes.POINTER(ctypes.c_uint32)),
|
||||
ctypes.POINTER(ctypes.c_int), ctypes.POINTER(ctypes.c_int), ctypes.c_int)
|
||||
_libchromaprint.chromaprint_decode_fingerprint.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_encode_fingerprint.argtypes = \
|
||||
(ctypes.POINTER(ctypes.c_int32), ctypes.c_int, ctypes.c_int,
|
||||
ctypes.POINTER(ctypes.POINTER(ctypes.c_char)),
|
||||
ctypes.POINTER(ctypes.c_int), ctypes.c_int)
|
||||
_libchromaprint.chromaprint_encode_fingerprint.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_hash_fingerprint.argtypes = \
|
||||
(ctypes.POINTER(ctypes.c_int32), ctypes.c_int,
|
||||
ctypes.POINTER(ctypes.c_uint32))
|
||||
_libchromaprint.chromaprint_hash_fingerprint.restype = ctypes.c_int
|
||||
|
||||
_libchromaprint.chromaprint_dealloc.argtypes = (ctypes.c_void_p,)
|
||||
_libchromaprint.chromaprint_dealloc.restype = None
|
||||
|
||||
|
||||
# Main interface.
|
||||
|
||||
class FingerprintError(Exception):
|
||||
"""Raised when a call to the underlying library fails."""
|
||||
|
||||
|
||||
def _check(res):
|
||||
"""Check the result of a library call, raising an error if the call
|
||||
failed.
|
||||
"""
|
||||
if res != 1:
|
||||
raise FingerprintError()
|
||||
|
||||
|
||||
class Fingerprinter(object):
|
||||
|
||||
ALGORITHM_TEST1 = 0
|
||||
ALGORITHM_TEST2 = 1
|
||||
ALGORITHM_TEST3 = 2
|
||||
ALGORITHM_DEFAULT = ALGORITHM_TEST2
|
||||
|
||||
def __init__(self, algorithm=ALGORITHM_DEFAULT):
|
||||
self._ctx = _libchromaprint.chromaprint_new(algorithm)
|
||||
|
||||
def __del__(self):
|
||||
_libchromaprint.chromaprint_free(self._ctx)
|
||||
del self._ctx
|
||||
|
||||
def start(self, sample_rate, num_channels):
|
||||
"""Initialize the fingerprinter with the given audio parameters.
|
||||
"""
|
||||
_check(_libchromaprint.chromaprint_start(
|
||||
self._ctx, sample_rate, num_channels
|
||||
))
|
||||
|
||||
def feed(self, data):
|
||||
"""Send raw PCM audio data to the fingerprinter. Data may be
|
||||
either a bytestring or a buffer object.
|
||||
"""
|
||||
if isinstance(data, BUFFER_TYPES):
|
||||
data = BYTES_TYPE(data)
|
||||
elif not isinstance(data, bytes):
|
||||
raise TypeError('data must be bytes, buffer, or memoryview')
|
||||
_check(_libchromaprint.chromaprint_feed(
|
||||
self._ctx, data, len(data) // 2
|
||||
))
|
||||
|
||||
def finish(self):
|
||||
"""Finish the fingerprint generation process and retrieve the
|
||||
resulting fignerprint as a bytestring.
|
||||
"""
|
||||
_check(_libchromaprint.chromaprint_finish(self._ctx))
|
||||
fingerprint_ptr = ctypes.c_char_p()
|
||||
_check(_libchromaprint.chromaprint_get_fingerprint(
|
||||
self._ctx, ctypes.byref(fingerprint_ptr)
|
||||
))
|
||||
fingerprint = fingerprint_ptr.value
|
||||
_libchromaprint.chromaprint_dealloc(fingerprint_ptr)
|
||||
return fingerprint
|
||||
|
||||
|
||||
def decode_fingerprint(data, base64=True):
|
||||
"""Uncompress and optionally decode a fingerprint.
|
||||
|
||||
Args:
|
||||
data: An encoded fingerprint in bytes.
|
||||
base64: Whether to base64-decode the fingerprint.
|
||||
|
||||
Returns:
|
||||
A tuple containing the decoded raw fingerprint as an array
|
||||
of unsigned 32-bit integers, and an int representing the chromaprint
|
||||
algorithm used to generate the fingerprint.
|
||||
"""
|
||||
result_ptr = ctypes.POINTER(ctypes.c_uint32)()
|
||||
result_size = ctypes.c_int()
|
||||
algorithm = ctypes.c_int()
|
||||
_check(_libchromaprint.chromaprint_decode_fingerprint(
|
||||
data, len(data), ctypes.byref(result_ptr), ctypes.byref(result_size),
|
||||
ctypes.byref(algorithm), 1 if base64 else 0
|
||||
))
|
||||
result = result_ptr[:result_size.value]
|
||||
_libchromaprint.chromaprint_dealloc(result_ptr)
|
||||
return result, algorithm.value
|
||||
|
||||
|
||||
def encode_fingerprint(fingerprint, algorithm, base64=True):
|
||||
"""Compress and optionally encode a fingerprint.
|
||||
|
||||
Args:
|
||||
fingerprint: A bytestring with the fingerprint.
|
||||
algorithm: An int flag choosing the algorithm to use.
|
||||
base64: Whether to base64-encode the fingerprint.
|
||||
|
||||
Returns:
|
||||
A bytestring with the encoded fingerprint.
|
||||
"""
|
||||
fp_array = (ctypes.c_int * len(fingerprint))()
|
||||
for i in range(len(fingerprint)):
|
||||
fp_array[i] = fingerprint[i]
|
||||
result_ptr = ctypes.POINTER(ctypes.c_char)()
|
||||
result_size = ctypes.c_int()
|
||||
_check(_libchromaprint.chromaprint_encode_fingerprint(
|
||||
fp_array, len(fingerprint), algorithm, ctypes.byref(result_ptr),
|
||||
ctypes.byref(result_size), 1 if base64 else 0
|
||||
))
|
||||
result = result_ptr[:result_size.value]
|
||||
_libchromaprint.chromaprint_dealloc(result_ptr)
|
||||
return result
|
||||
|
||||
|
||||
def hash_fingerprint(fingerprint):
|
||||
"""Generate a single 32-bit hash for a raw, decoded fingerprint.
|
||||
|
||||
If two fingerprints are similar, their hashes generated by this
|
||||
function will also be similar. If they are significantly different,
|
||||
their hashes will most likely be significantly different as well
|
||||
(but clients should not rely on this).
|
||||
|
||||
Compare two hashes with their Hamming distance, i.e., by counting
|
||||
the bits in which they differ.
|
||||
|
||||
Args:
|
||||
fingerprint: A list of ints for the raw, decoded fingerprint.
|
||||
|
||||
Returns:
|
||||
A 32-bit integer hash.
|
||||
|
||||
Example usage:
|
||||
audio_fingerprint = <get fingerprint with Fingerprinter>
|
||||
decoded_fingerprint, algo = decode_fingerprint(audio_fingerprint)
|
||||
first_fingerprint_hash = hash_fingerprint(decoded_fingerprint)
|
||||
|
||||
second_fingerprint_hash = <repeat steps for second audio file>
|
||||
|
||||
# Compare the binary strings using Hamming distance.
|
||||
first_fp_binary = format(first_fingerprint_hash, 'b')
|
||||
second_fp_binary = format(second_fingerprint_hash, 'b')
|
||||
|
||||
# This value will be between 0 and 32 and represent the POPCNT.
|
||||
# A value > 15 indicates the two fingerprints are very different.
|
||||
bin(int(first_fp_binary,2)^int(second_fp_binary,2)).count
|
||||
"""
|
||||
|
||||
fp_array = (ctypes.c_int * len(fingerprint))()
|
||||
for i in range(len(fingerprint)):
|
||||
fp_array[i] = fingerprint[i]
|
||||
result_hash = ctypes.c_uint32()
|
||||
_check(_libchromaprint.chromaprint_hash_fingerprint(
|
||||
fp_array, len(fingerprint), ctypes.byref(result_hash)
|
||||
))
|
||||
return result_hash.value
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('-length', metavar='SECS', type=int, default=120,
|
||||
help='length of the audio data used for fingerprint '
|
||||
'calculation (default 120)')
|
||||
parser.add_argument('-raw', action='store_true',
|
||||
help='output the raw uncompressed fingerprint')
|
||||
parser.add_argument('paths', metavar='FILE', nargs='+',
|
||||
help='audio file to be fingerprinted')
|
||||
|
||||
args = parser.parse_args()
|
||||
# make gst not try to parse the args
|
||||
del sys.argv[1:]
|
||||
|
||||
first = True
|
||||
for i, path in enumerate(args.paths):
|
||||
try:
|
||||
duration, fp = acoustid.fingerprint_file(path, args.length)
|
||||
except Exception:
|
||||
print("ERROR: unable to calculate fingerprint "
|
||||
"for file %s, skipping" % path, file=sys.stderr)
|
||||
continue
|
||||
if args.raw:
|
||||
raw_fp = chromaprint.decode_fingerprint(fp)[0]
|
||||
fp = ','.join(map(str, raw_fp))
|
||||
if not first:
|
||||
print
|
||||
first = False
|
||||
print('FILE=%s' % path)
|
||||
print('DURATION=%d' % duration)
|
||||
print('FINGERPRINT=%s' % fp.decode('utf8'))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
12
component/mz/run.py
Normal file
12
component/mz/run.py
Normal file
@@ -0,0 +1,12 @@
|
||||
import os
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
os.environ["FPCALC"] = os.path.join(settings.BASE_DIR, "component", "mz", "fpcalc")
|
||||
from component.mz import acoustid
|
||||
|
||||
apikey = "cSpUJKpD"
|
||||
|
||||
|
||||
def get_acoustid(path):
|
||||
return acoustid.match(apikey, path)
|
||||
@@ -339,10 +339,12 @@
|
||||
searchWord: '',
|
||||
treeListOne: [],
|
||||
filePath: '/app/media/',
|
||||
fullPath: '',
|
||||
bakDir: '/app/media/',
|
||||
fileName: '',
|
||||
resource: 'netease',
|
||||
resourceList: [
|
||||
{id: 'acoustid', name: '音乐指纹识别'},
|
||||
{id: 'netease', name: '网易云音乐'},
|
||||
{id: 'migu', name: '咪咕音乐'},
|
||||
{id: 'qmusic', name: 'QQ音乐'},
|
||||
@@ -425,6 +427,7 @@
|
||||
}
|
||||
this.musicInfo = this.baseMusicInfo
|
||||
this.fileName = node.name
|
||||
this.fullPath = this.filePath + '/' + node.name
|
||||
this.$api.Task.musicId3({'file_path': this.filePath, 'file_name': node.name}).then((res) => {
|
||||
console.log(res)
|
||||
this.musicInfo = res.data
|
||||
@@ -512,7 +515,7 @@
|
||||
return
|
||||
}
|
||||
this.fadeShowDetail = false
|
||||
this.$api.Task.fetchId3Title({title: this.musicInfo.title, resource: this.resource}).then((res) => {
|
||||
this.$api.Task.fetchId3Title({title: this.musicInfo.title, resource: this.resource, full_path: this.fullPath}).then((res) => {
|
||||
this.fadeShowDetail = true
|
||||
this.SongList = res.data
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user