diff --git a/instaloader/instaloadercontext.py b/instaloader/instaloadercontext.py index 27474b1..39f4361 100644 --- a/instaloader/instaloadercontext.py +++ b/instaloader/instaloadercontext.py @@ -529,6 +529,28 @@ class InstaloaderContext: :raises ConnectionException: When download repeatedly failed.""" self.write_raw(self.get_raw(url), filename) + def head(self, url: str, allow_redirects: bool = False) -> requests.Response: + """HEAD a URL anonymously. + + :raises QueryReturnedNotFoundException: When the server responds with a 404. + :raises QueryReturnedForbiddenException: When the server responds with a 403. + :raises ConnectionException: When request failed. + + .. versionadded:: 4.7.6 + """ + with self.get_anonymous_session() as anonymous_session: + resp = anonymous_session.head(url, allow_redirects=allow_redirects) + if resp.status_code == 200: + return resp + else: + if resp.status_code == 403: + # suspected invalid URL signature + raise QueryReturnedForbiddenException("403 when accessing {}.".format(url)) + if resp.status_code == 404: + # 404 not worth retrying. + raise QueryReturnedNotFoundException("404 when accessing {}.".format(url)) + raise ConnectionException("HTTP error code {}.".format(resp.status_code)) + @property def root_rhx_gis(self) -> Optional[str]: """rhx_gis string returned in the / query.""" diff --git a/instaloader/structures.py b/instaloader/structures.py index de21e5c..7de0f2f 100644 --- a/instaloader/structures.py +++ b/instaloader/structures.py @@ -5,7 +5,7 @@ from base64 import b64decode, b64encode from collections import namedtuple from datetime import datetime from pathlib import Path -from typing import Any, Dict, Iterable, Iterator, List, Optional, Union +from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union from . import __version__ from .exceptions import * @@ -377,13 +377,28 @@ class Post: def video_url(self) -> Optional[str]: """URL of the video, or None.""" if self.is_video: + version_urls = [self._field('video_url')] if self._context.iphone_support and self._context.is_logged_in: + version_urls.extend(version['url'] for version in self._iphone_struct['video_versions']) + else: + return version_urls[0] + url_candidates: List[Tuple[int, str]] = [] + for idx, version_url in enumerate(version_urls): + if any(url_candidate[1] == version_url for url_candidate in url_candidates): + # Skip duplicates + continue try: - url = self._iphone_struct['video_versions'][0]['url'] - return url + url_candidates.append(( + int(self._context.head(version_url, allow_redirects=True).headers.get('Content-Length', 0)), + version_url + )) except (InstaloaderException, KeyError, IndexError) as err: - self._context.error('{} Unable to fetch high quality video version of {}.'.format(err, self)) - return self._field('video_url') + self._context.error(f"Video URL candidate {idx+1}/{len(version_urls)} for {self}: {err}") + if not url_candidates: + # All candidates fail: Fallback to default URL and handle errors later at the actual download attempt + return version_urls[0] + url_candidates.sort() + return url_candidates[-1][1] return None @property @@ -556,8 +571,8 @@ class Post: return None location_id = int(loc['id']) if any(k not in loc for k in ('name', 'slug', 'has_public_page', 'lat', 'lng')): - loc = self._context.get_json("explore/locations/{0}/".format(location_id), - params={'__a': 1})['graphql']['location'] + loc.update(self._context.get_json("explore/locations/{0}/".format(location_id), + params={'__a': 1})['native_location_data']['location_info']) self._location = PostLocation(location_id, loc['name'], loc['slug'], loc['has_public_page'], loc['lat'], loc['lng']) return self._location @@ -1110,7 +1125,28 @@ class StoryItem: def video_url(self) -> Optional[str]: """URL of the video, or None.""" if self.is_video: - return self._node['video_resources'][-1]['src'] + version_urls = [self._node['video_resources'][-1]['src']] + if self._context.iphone_support and self._context.is_logged_in: + version_urls.extend(version['url'] for version in self._iphone_struct['video_versions']) + else: + return version_urls[0] + url_candidates: List[Tuple[int, str]] = [] + for idx, version_url in enumerate(version_urls): + if any(url_candidate[1] == version_url for url_candidate in url_candidates): + # Skip duplicates + continue + try: + url_candidates.append(( + int(self._context.head(version_url, allow_redirects=True).headers.get('Content-Length', 0)), + version_url + )) + except (InstaloaderException, KeyError, IndexError) as err: + self._context.error(f"Video URL candidate {idx+1}/{len(version_urls)} for {self}: {err}") + if not url_candidates: + # All candidates fail: Fallback to default URL and handle errors later at the actual download attempt + return version_urls[0] + url_candidates.sort() + return url_candidates[-1][1] return None