diff --git a/yt_dlp/extractor/extrememusic.py b/yt_dlp/extractor/extrememusic.py index d49512ee6..aed6545a5 100644 --- a/yt_dlp/extractor/extrememusic.py +++ b/yt_dlp/extractor/extrememusic.py @@ -17,113 +17,111 @@ class ExtremeMusicBaseIE(InfoExtractor): _API_URL = 'https://snapi.extrememusic.com' _REQUEST_HEADERS = None + _REQUIRE_VERSION = [] - def _process_version_arg(self, arg): - self._version_requested = arg - - def _set_request_headers(self, url, video_id, country=None): - if not self._REQUEST_HEADERS: - # the site serves different versions of the same playlist id due to geo-restriction, - # so use user's own country code or geo_bypass_country code - if not country: - country = self._download_webpage('https://ipapi.co/country_code', video_id) - env = self._download_json('https://www.extrememusic.com/env', video_id) - self._REQUEST_HEADERS = { - 'Accept': 'application/json', - 'Origin': 'https://www.extrememusic.com', - 'Referer': url, - 'Sec-Fetch-Mode': 'cors', - 'X-API-Auth': env['token'], - 'X-Site-Id': 4, - 'X-Viewer-Country': country.upper(), - } - return self._REQUEST_HEADERS + def _initialize(self, url, video_id, country=None): + self._REQUIRE_VERSION = self._configuration_arg('ver') or self._configuration_arg('version') + # This site serves different versions of the same playlist id due to geo-restriction + # use user's own country code if no code (geo_bypass_country or pre-defined country code) is provided + if not country: + country = self._download_webpage('https://ipapi.co/country_code', video_id) + env = self._download_json('https://www.extrememusic.com/env', video_id) + self._REQUEST_HEADERS = { + 'Accept': 'application/json', + 'Origin': 'https://www.extrememusic.com', + 'Referer': url, + 'Sec-Fetch-Mode': 'cors', + 'X-API-Auth': env['token'], + 'X-Site-Id': 4, + 'X-Viewer-Country': country.upper(), + } def _get_album_data(self, album_id, video_id, fatal=True): - self._process_version_arg(self._configuration_arg('ver') or self._configuration_arg('version')) album = self._download_json(f'{self._API_URL}/albums/{album_id}', video_id, fatal=fatal, - note='Downloading album data', headers=self._REQUEST_HEADERS) + note='Downloading album data', errnote='Unable to download album data', + headers=self._REQUEST_HEADERS) or {} if video_id == album_id: bio = self._download_json(f'{self._API_URL}/albums/{album_id}/bio', video_id, fatal=False, - note='Downloading album data', headers=self._REQUEST_HEADERS) - return merge_dicts(album, bio or {}) + note='Downloading album data', errnote='Unable to download album data', + headers=self._REQUEST_HEADERS) or {} + return merge_dicts(album, bio) else: return album def _extract_track(self, album_data, track_id=None, version_id=None): - album_data = album_data or {} if 'tracks' in album_data and 'track_sounds' in album_data: if not track_id and version_id: track_id = traverse_obj(album_data['track_sounds'], (lambda _, v: v['id'] == int(version_id), 'track_id', {int}), get_all=False) - track = traverse_obj(album_data['tracks'], - (lambda _, v: v['id'] == int(track_id), {dict}), get_all=False) - info = {**traverse_obj(track, { - 'track': ('title', {str}), - 'track_number': ('sort_order', {lambda v: v + 1}, {int}), - 'track_id': ('track_no', {str}), - 'description': ('description', {lambda v: str_or_none(v) or None}), - 'artists': ('artists', {lambda v: v or traverse_obj(album_data, ('album', 'artist'))}, - {lambda v: (v if isinstance(v, list) else [v]) if v else None}), - 'composers': ('composers', ..., 'name'), - 'genres': (('genre', 'subgenre'), ..., 'label'), - 'tag': ('keywords', ..., 'label'), - 'album': ('album_title', {lambda v: str_or_none(v) or None}), - }), **traverse_obj(album_data, { - 'album_artists': ('album', 'artist', {lambda v: [v] if v else None}), - 'upload_date': ('album', 'created', {unified_strdate}), - })} - entries, thumbnails = [], [] - for image in traverse_obj(track, ('images', 'default')): - thumbnails.append(traverse_obj(image, { - 'url': ('url', {url_or_none}), - 'width': ('width', {int_or_none}), - 'height': ('height', {int_or_none}), - })) - for idx, sound_id in enumerate([version_id] if version_id else track['track_sound_ids']): - if sound := traverse_obj(album_data['track_sounds'], - (lambda _, v: v['id'] == int(sound_id) and v['track_id'] == int(track_id), - {dict}), get_all=False): - if (version_id - or (not version_id and ((not self._version_requested and idx == 0) - or 'all' in self._version_requested - or sound['version_type'].lower() in self._version_requested))): - formats = [] - for audio_url in traverse_obj(sound, ('assets', 'audio', ('preview_url', - 'preview_url_hls'))): - if determine_ext(audio_url) == 'm3u8': - m3u8_url = re.sub(r'\.m3u8\?.*', '/HLS/128_v4.m3u8', audio_url) - for f in self._extract_m3u8_formats(m3u8_url, sound_id, 'mpeg', fatal=False): + if track := traverse_obj(album_data['tracks'], + (lambda _, v: v['id'] == int(track_id), {dict}), get_all=False): + info = {**traverse_obj(track, { + 'track': ('title', {str}), + 'track_number': ('sort_order', {lambda v: v + 1}, {int}), + 'track_id': ('track_no', {str}), + 'description': ('description', {lambda v: str_or_none(v) or None}), + 'artists': ('artists', {lambda v: v or traverse_obj(album_data, ('album', 'artist'))}, + {lambda v: (v if isinstance(v, list) else [v]) if v else None}), + 'composers': ('composers', ..., 'name'), + 'genres': (('genre', 'subgenre'), ..., 'label'), + 'tag': ('keywords', ..., 'label'), + 'album': ('album_title', {lambda v: str_or_none(v) or None}), + }), **traverse_obj(album_data, { + 'album_artists': ('album', 'artist', {lambda v: [v] if v else None}), + 'upload_date': ('album', 'created', {unified_strdate}), + })} + entries, thumbnails = [], [] + for image in traverse_obj(track, ('images', 'default')): + thumbnails.append(traverse_obj(image, { + 'url': ('url', {url_or_none}), + 'width': ('width', {int_or_none}), + 'height': ('height', {int_or_none}), + })) + if not self._REQUIRE_VERSION: + version_id = version_id or traverse_obj(track, 'default_track_sound_id', ('track_sound_ids', 0)) + for sound_id in [version_id] if version_id else track['track_sound_ids']: + if sound := traverse_obj(album_data['track_sounds'], + (lambda _, v: v['id'] == int(sound_id) and v['track_id'] == int(track_id), + {dict}), get_all=False): + if version_id or any(x in self._REQUIRE_VERSION for x in ['all', sound['version_type'].lower()]): + formats = [] + for audio_url in traverse_obj(sound, ('assets', 'audio', ('preview_url', + 'preview_url_hls'))): + if determine_ext(audio_url) == 'm3u8': + m3u8_url = re.sub(r'\.m3u8\?.*', '/HLS/128_v4.m3u8', audio_url) + for f in self._extract_m3u8_formats(m3u8_url, sound_id, 'mpeg', fatal=False): + formats.append({ + **f, + 'vcodec': 'none', + 'perference': -2, + }) + else: formats.append({ - **f, + 'url': audio_url, 'vcodec': 'none', - 'perference': -2, }) - else: - formats.append({ - 'url': audio_url, - 'vcodec': 'none', - }) - entries.append({ - 'id': str(sound_id), - 'title': join_nonempty('title', 'version_type', from_dict=sound, delim=' - '), - 'alt_title': sound['version_type'], - **info, - 'thumbnails': thumbnails, - 'duration': sound.get('duration'), - 'formats': formats, - 'webpage_url': f"https://www.extrememusic.com/albums/{track['album_id']}?item={track_id}&ver={sound_id}", - }) + entries.append({ + 'id': str(sound_id), + 'title': join_nonempty('title', 'version_type', from_dict=sound, delim=' - '), + 'alt_title': sound['version_type'], + **info, + 'thumbnails': thumbnails, + 'duration': sound.get('duration'), + 'formats': formats, + 'webpage_url': f"https://www.extrememusic.com/albums/{track['album_id']}?item={track_id}&ver={sound_id}", + }) - if len(entries) > 1: - return { - 'id': track_id, - **info, - 'entries': entries, - '_type': 'playlist', - } - elif len(entries) == 1: - return entries[0] + if len(entries) > 1: + return { + 'id': track_id, + **info, + 'entries': entries, + '_type': 'playlist', + } + elif len(entries) == 1: + return entries[0] + else: + self.raise_no_formats('Track data not found', video_id=track_id) return [] @@ -210,7 +208,7 @@ class ExtremeMusicIE(ExtremeMusicBaseIE): def _real_extract(self, url): album_id, track_id, version_id = self._match_valid_url(url).group('album', 'id', 'ver') - self._set_request_headers(url, track_id or version_id, self.get_param('geo_bypass_country') or 'DE') + self._initialize(url, track_id or version_id, self.get_param('geo_bypass_country') or 'DE') album_data = self._get_album_data(album_id, track_id or version_id) if result := self._extract_track(album_data, track_id, version_id): return result @@ -243,7 +241,7 @@ class ExtremeMusicAIE(ExtremeMusicBaseIE): def _real_extract(self, url): album_id = self._match_id(url) - self._set_request_headers(url, album_id, self.get_param('geo_bypass_country') or 'DE') + self._initialize(url, album_id, self.get_param('geo_bypass_country') or 'DE') album_data = self._get_album_data(album_id, album_id) entries = [] @@ -297,7 +295,7 @@ class ExtremeMusicPIE(ExtremeMusicBaseIE): def _real_extract(self, url): playlist_id = self._match_id(url) - headers = self._set_request_headers(url, playlist_id, self.get_param('geo_bypass_country')) + self._initialize(url, playlist_id, self.get_param('geo_bypass_country')) def playlist_query(playlist_id, offset, limit): # playlist api: https://snapi.extrememusic.com/playlists?id={playlist_id}&range={offset}%2C{limit}' @@ -306,7 +304,7 @@ def playlist_query(playlist_id, offset, limit): note=f'Downloading item {offset + 1}-{offset + limit}', query={ 'id': playlist_id, 'range': f'{offset},{limit}', - }, headers=headers) + }, headers=self._REQUEST_HEADERS) thumbnails, entries = [], [] album_data, track_done, limit = {}, [], 50 @@ -329,11 +327,11 @@ def playlist_query(playlist_id, offset, limit): else: entries.append(track) track_done.append(track_id) - if len(entries) >= playlist['playlist']['playlist_items_count']: + if len(track_done) >= playlist['playlist']['playlist_items_count']: break if entries: - if len(entries) < playlist['playlist']['playlist_items_count']: + if len(track_done) < playlist['playlist']['playlist_items_count']: self.report_warning('This playlist has geo-restricted items. Try using --xff to specify a different country code.') for image in traverse_obj(playlist['playlist'], ('images', 'square')):