1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-08-18 20:59:38 +02:00

Download best-quality video (#1232)

Co-authored-by: Alexander Graf <17130992+aandergr@users.noreply.github.com>
This commit is contained in:
fireattack 2021-08-04 10:42:14 -05:00 committed by GitHub
parent ae39ab9893
commit 327fcfd8e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 60 additions and 6 deletions

View File

@ -527,6 +527,28 @@ class InstaloaderContext:
:raises ConnectionException: When download repeatedly failed."""
self.write_raw(self.get_raw(url), filename)
def head(self, url: str, allow_redirects: bool = False) -> requests.Response:
    """HEAD a URL anonymously.

    :param url: The URL to send the HEAD request to.
    :param allow_redirects: Passed through to the session's ``head`` call.
    :return: The response; only returned when its status code is 200.
    :raises QueryReturnedNotFoundException: When the server responds with a 404.
    :raises QueryReturnedForbiddenException: When the server responds with a 403.
    :raises ConnectionException: When request failed, i.e. the request could not be
       completed or the server answered with any other non-200 status code.

    .. versionadded:: 4.7.6
    """
    with self.get_anonymous_session() as anonymous_session:
        try:
            resp = anonymous_session.head(url, allow_redirects=allow_redirects)
        except requests.exceptions.RequestException as err:
            # Transport-level failures (DNS, timeout, reset, ...) are reported through the
            # documented exception type, so callers that only catch InstaloaderException
            # subclasses can still apply their fallback.
            raise ConnectionException("HEAD request for {} failed: {}".format(url, err)) from err
    if resp.status_code == 200:
        return resp
    if resp.status_code == 403:
        # suspected invalid URL signature
        raise QueryReturnedForbiddenException("403 when accessing {}.".format(url))
    if resp.status_code == 404:
        # 404 not worth retrying.
        raise QueryReturnedNotFoundException("404 when accessing {}.".format(url))
    raise ConnectionException("HTTP error code {}.".format(resp.status_code))
@property
def root_rhx_gis(self) -> Optional[str]:
"""rhx_gis string returned in the / query."""

View File

@ -4,7 +4,7 @@ import re
from base64 import b64decode, b64encode
from collections import namedtuple
from datetime import datetime
from typing import Any, Dict, Iterable, Iterator, List, Optional, Union
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
from . import __version__
from .exceptions import *
@ -374,13 +374,26 @@ class Post:
def video_url(self) -> Optional[str]:
    """URL of the video, or None.

    The default URL comes from the ``video_url`` field. When logged in, the URLs listed
    under ``video_versions`` of the iPhone struct are added as further candidates. If
    there is more than one distinct candidate, each is probed with a HEAD request and
    the one reporting the largest ``Content-Length`` is returned.

    Failures while collecting or probing the additional candidates are reported through
    the context's ``error()`` and never prevent falling back to the default URL.
    """
    if not self.is_video:
        return None

    version_urls = [self._field('video_url')]
    if self._context.is_logged_in:
        try:
            # Build the list first so a failure half-way through adds nothing.
            iphone_urls = [version['url'] for version in self._iphone_struct['video_versions']]
        except (InstaloaderException, KeyError, IndexError) as err:
            self._context.error('{} Unable to fetch high quality video version of {}.'.format(err, self))
        else:
            version_urls.extend(iphone_urls)

    # Drop duplicates up front (keeping first occurrence) so every URL is probed at most once.
    unique_urls: List[str] = []
    for version_url in version_urls:
        if version_url not in unique_urls:
            unique_urls.append(version_url)

    if len(unique_urls) == 1:
        # Nothing to compare against: skip the HEAD round trip.
        return unique_urls[0]

    url_candidates: List[Tuple[int, str]] = []
    for idx, version_url in enumerate(unique_urls):
        try:
            response = self._context.head(version_url, allow_redirects=True)
            # A missing header counts as size 0; a malformed one raises ValueError.
            size = int(response.headers.get('Content-Length', 0))
        except (InstaloaderException, KeyError, IndexError, ValueError) as err:
            self._context.error(f"Video URL candidate {idx+1}/{len(unique_urls)} for {self}: {err}")
            continue
        url_candidates.append((size, version_url))

    if not url_candidates:
        # All candidates fail: Fallback to default URL and handle errors later at the actual download attempt
        return unique_urls[0]
    # Largest reported size wins; equal sizes are decided by URL string order, as tuple sorting would.
    return max(url_candidates)[1]
@property
@ -1103,7 +1116,26 @@ class StoryItem:
def video_url(self) -> Optional[str]:
    """URL of the video, or None.

    The default URL is the ``src`` of the last ``video_resources`` entry of the node.
    When logged in, the URLs listed under ``video_versions`` of the iPhone struct are
    added as further candidates. If there is more than one distinct candidate, each is
    probed with a HEAD request and the one reporting the largest ``Content-Length`` is
    returned.

    Failures while collecting or probing the additional candidates are reported through
    the context's ``error()`` and never prevent falling back to the default URL.
    """
    if not self.is_video:
        return None

    version_urls = [self._node['video_resources'][-1]['src']]
    if self._context.is_logged_in:
        try:
            # Build the list first so a failure half-way through adds nothing.
            iphone_urls = [version['url'] for version in self._iphone_struct['video_versions']]
        except (InstaloaderException, KeyError, IndexError) as err:
            self._context.error(f"Unable to fetch high quality video version of {self}: {err}")
        else:
            version_urls.extend(iphone_urls)

    # Drop duplicates up front (keeping first occurrence) so every URL is probed at most once.
    unique_urls: List[str] = []
    for version_url in version_urls:
        if version_url not in unique_urls:
            unique_urls.append(version_url)

    if len(unique_urls) == 1:
        # Nothing to compare against: skip the HEAD round trip.
        return unique_urls[0]

    url_candidates: List[Tuple[int, str]] = []
    for idx, version_url in enumerate(unique_urls):
        try:
            response = self._context.head(version_url, allow_redirects=True)
            # A missing header counts as size 0; a malformed one raises ValueError.
            size = int(response.headers.get('Content-Length', 0))
        except (InstaloaderException, KeyError, IndexError, ValueError) as err:
            self._context.error(f"Video URL candidate {idx+1}/{len(unique_urls)} for {self}: {err}")
            continue
        url_candidates.append((size, version_url))

    if not url_candidates:
        # All candidates fail: Fallback to default URL and handle errors later at the actual download attempt
        return unique_urls[0]
    # Largest reported size wins; equal sizes are decided by URL string order, as tuple sorting would.
    return max(url_candidates)[1]