1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-11-05 01:42:29 +01:00
instaloader/instaloader/instaloadercontext.py
2018-09-06 09:17:38 +02:00

433 lines
21 KiB
Python

import hashlib
import json
import pickle
import random
import re
import shutil
import sys
import textwrap
import time
import urllib.parse
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Iterator, Optional
import requests
import requests.utils
import urllib3
from .exceptions import *
def copy_session(session: requests.Session) -> requests.Session:
    """Create an independent clone of *session*, carrying over its cookies and headers."""
    clone = requests.Session()
    cookie_dict = requests.utils.dict_from_cookiejar(session.cookies)
    clone.cookies = requests.utils.cookiejar_from_dict(cookie_dict)
    clone.headers = session.headers.copy()
    return clone
def default_user_agent() -> str:
    """Return the User-Agent string that is sent with HTTP requests by default."""
    return ('Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36')
class InstaloaderContext:
"""Class providing methods for (error) logging and low-level communication with Instagram.
It is not intended to be instantiated directly; rather, :class:`Instaloader` instances maintain a context
object.
For logging, it provides :meth:`log`, :meth:`error`, :meth:`error_catcher`.
It provides low-level communication routines :meth:`get_json`, :meth:`graphql_query`, :meth:`graphql_node_list`,
:meth:`get_and_write_raw` and implements mechanisms for rate controlling and error handling.
Further, it provides methods for logging in and general session handling, which are used by the routines in
class :class:`Instaloader`.
"""
def __init__(self, sleep: bool = True, quiet: bool = False, user_agent: Optional[str] = None,
             graphql_count_per_slidingwindow: Optional[int] = None, max_connection_attempts: int = 3):
    """Set up the anonymous session, rate-control bookkeeping and logging state."""
    self.user_agent = default_user_agent() if user_agent is None else user_agent
    self._session = self.get_anonymous_session()
    self.username = None
    self.sleep = sleep
    self.quiet = quiet
    self.max_connection_attempts = max_connection_attempts
    self._graphql_page_length = 50
    self.graphql_count_per_slidingwindow = graphql_count_per_slidingwindow or 200
    self._root_rhx_gis = None
    # Non-fatal error messages collected by error(); printed again at the end of Instaloader.main().
    self.error_log = []
    # Timestamps of recent GraphQL queries, used to adapt sleep intervals (rate control).
    self.query_timestamps = []
    # Testing hook: when True, error_catcher() re-raises instead of suppressing
    # InstaloaderContext._error_catcher exceptions.
    self.raise_all_errors = False
    # Cache mapping profile id -> Profile, so repeated lookups skip the network.
    self.profile_id_cache = {}
@contextmanager
def anonymous_copy(self):
    """Context manager that temporarily swaps in an anonymous session.

    The logged-in session and username are restored when the block exits."""
    saved_session = self._session
    saved_username = self.username
    self._session = self.get_anonymous_session()
    self.username = None
    try:
        yield self
    finally:
        # Dispose of the temporary anonymous session before restoring state.
        self._session.close()
        self.username = saved_username
        self._session = saved_session
@property
def is_logged_in(self) -> bool:
    """True, if this Instaloader instance is logged in."""
    # username is None (or empty) while anonymous; any non-empty name means logged in.
    return True if self.username else False
def log(self, *msg, sep='', end='\n', flush=False):
    """Log a message to stdout that can be suppressed with --quiet."""
    if self.quiet:
        return
    print(*msg, sep=sep, end=end, flush=flush)
def error(self, msg, repeat_at_end=True):
    """Log a non-fatal error message to stderr, which is repeated at program termination.

    :param msg: Message to be printed.
    :param repeat_at_end: Set to false if the message should be printed, but not repeated at program termination."""
    print(msg, file=sys.stderr)
    if not repeat_at_end:
        return
    # Remember the message so close() can replay it when the program finishes.
    self.error_log.append(msg)
def close(self):
    """Print the collected error log to stderr (unless quiet) and close the HTTP session."""
    if self.error_log and not self.quiet:
        # Fixed typo in the user-facing message: "occured" -> "occurred".
        print("\nErrors occurred:", file=sys.stderr)
        for err in self.error_log:
            print(err, file=sys.stderr)
    self._session.close()
@contextmanager
def error_catcher(self, extra_info: Optional[str] = None):
    """
    Context manager to catch, print and record InstaloaderExceptions.

    :param extra_info: String to prefix error message with."""
    try:
        yield
    except InstaloaderException as err:
        prefix = '{}: '.format(extra_info) if extra_info else ''
        self.error('{}{}'.format(prefix, err))
        # Testing hook: propagate instead of suppressing when raise_all_errors is set.
        if self.raise_all_errors:
            raise
def _default_http_header(self, empty_session_only: bool = False) -> Dict[str, str]:
    """Returns default HTTP header we use for requests."""
    header = {
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive',
        'Content-Length': '0',
        'Host': 'www.instagram.com',
        'Origin': 'https://www.instagram.com',
        'Referer': 'https://www.instagram.com/',
        'User-Agent': self.user_agent,
        'X-Instagram-AJAX': '1',
        'X-Requested-With': 'XMLHttpRequest',
    }
    if empty_session_only:
        # For bare (session-less) requests these browsing-session fields are dropped.
        for field in ('Host', 'Origin', 'Referer', 'X-Instagram-AJAX', 'X-Requested-With'):
            del header[field]
    return header
def get_anonymous_session(self) -> requests.Session:
    """Returns our default anonymous requests.Session object."""
    anonymous = requests.Session()
    # Blank session cookies mimic a fresh, not-logged-in browser.
    anonymous.cookies.update({
        'sessionid': '', 'mid': '', 'ig_pr': '1',
        'ig_vw': '1920', 'csrftoken': '',
        's_network': '', 'ds_user_id': '',
    })
    anonymous.headers.update(self._default_http_header(empty_session_only=True))
    return anonymous
def save_session_to_file(self, sessionfile):
    """Not meant to be used directly, use :meth:`Instaloader.save_session_to_file`."""
    # Persist only the cookies; headers are reconstructed on load.
    cookies = requests.utils.dict_from_cookiejar(self._session.cookies)
    pickle.dump(cookies, sessionfile)
def load_session_from_file(self, username, sessionfile):
    """Not meant to be used directly, use :meth:`Instaloader.load_session_from_file`."""
    restored = requests.Session()
    restored.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile))
    restored.headers.update(self._default_http_header())
    # The CSRF token must be mirrored from the cookie into the request headers.
    csrf_token = restored.cookies.get_dict()['csrftoken']
    restored.headers.update({'X-CSRFToken': csrf_token})
    self._session = restored
    self.username = username
def test_login(self) -> Optional[str]:
    """Not meant to be used directly, use :meth:`Instaloader.test_login`."""
    data = self.graphql_query("d6f4427fbe92d846298cf93df0b937d3", {})
    user = data["data"]["user"]
    if user is None:
        # Session is not (or no longer) logged in.
        return None
    return user["username"]
def login(self, user, passwd):
    """Not meant to be used directly, use :meth:`Instaloader.login`.

    Performs the login POST against Instagram and, on success, installs the
    authenticated session as this context's session and remembers the username.

    :raises InvalidArgumentException: If the provided username does not exist.
    :raises BadCredentialsException: If the provided password is wrong.
    :raises ConnectionException: If connection to Instagram failed."""
    import http.client
    # pylint:disable=protected-access
    # Instagram responses can exceed http.client's default header limit of 100.
    http.client._MAXHEADERS = 200
    session = requests.Session()
    # Start from the same blank cookie set as an anonymous session.
    session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1',
                            'ig_vw': '1920', 'csrftoken': '',
                            's_network': '', 'ds_user_id': ''})
    session.headers.update(self._default_http_header())
    # Obtain a CSRF token from the front page (fetched via self.get_json on the current,
    # still-anonymous context session) and attach it to the new session's headers.
    session.headers.update({'X-CSRFToken': self.get_json('', {})['config']['csrf_token']})
    # Not using self.get_json() here, because we need to access csrftoken cookie
    self._sleep()
    login = session.post('https://www.instagram.com/accounts/login/ajax/',
                         data={'password': passwd, 'username': user}, allow_redirects=True)
    if login.status_code != 200:
        raise ConnectionException("Login error: {} {}".format(login.status_code, login.reason))
    resp_json = login.json()
    if resp_json['status'] != 'ok':
        if 'message' in resp_json:
            raise ConnectionException("Login error: \"{}\" status, message \"{}\".".format(resp_json['status'],
                                                                                           resp_json['message']))
        else:
            raise ConnectionException("Login error: \"{}\" status.".format(resp_json['status']))
    if not resp_json['authenticated']:
        if resp_json['user']:
            # '{"authenticated": false, "user": true, "status": "ok"}'
            raise BadCredentialsException('Login error: Wrong password.')
        else:
            # '{"authenticated": false, "user": false, "status": "ok"}'
            # Raise InvalidArgumentException rather than BadCredentialException, because BadCredentialException
            # triggers re-asking of password in Instaloader.interactive_login(), which makes no sense if the
            # username is invalid.
            raise InvalidArgumentException('Login error: User {} does not exist.'.format(user))
    # '{"authenticated": true, "user": true, "userId": ..., "oneTapPrompt": false, "status": "ok"}'
    # Adopt the fresh CSRF token issued by the login response.
    session.headers.update({'X-CSRFToken': login.cookies['csrftoken']})
    self._session = session
    self.username = user
def _sleep(self):
    """Sleep a short time if self.sleep is set. Called before each request to instagram.com."""
    if not self.sleep:
        return
    # Exponentially distributed pause (rate 0.7, i.e. ~1.4s mean), capped at 5 seconds.
    time.sleep(min(random.expovariate(0.7), 5.0))
def get_json(self, path: str, params: Dict[str, Any], host: str = 'www.instagram.com',
             session: Optional[requests.Session] = None, _attempt=1) -> Dict[str, Any]:
    """JSON request to Instagram.

    Handles GraphQL rate limiting, same-host redirects, HTML-embedded JSON
    extraction, and retries up to max_connection_attempts on failure.

    :param path: URL, relative to the given domain which defaults to www.instagram.com/
    :param params: GET parameters
    :param host: Domain part of the URL from where to download the requested JSON; defaults to www.instagram.com
    :param session: Session to use, or None to use self.session
    :return: Decoded response dictionary
    :raises QueryReturnedBadRequestException: When the server responds with a 400.
    :raises QueryReturnedNotFoundException: When the server responds with a 404.
    :raises ConnectionException: When query repeatedly failed.
    """
    def graphql_query_waittime(untracked_queries: bool = False) -> int:
        # Seconds to wait so that at most graphql_count_per_slidingwindow queries
        # fall into any `sliding_window`-second interval. With untracked_queries=True
        # (used after a 429), assume the window is fully used and wait for the
        # oldest tracked query to age out.
        sliding_window = 660
        if not self.query_timestamps:
            return sliding_window if untracked_queries else 0
        current_time = time.monotonic()
        # Discard timestamps that have aged out of the sliding window.
        self.query_timestamps = list(filter(lambda t: t > current_time - sliding_window, self.query_timestamps))
        if len(self.query_timestamps) < self.graphql_count_per_slidingwindow and not untracked_queries:
            return 0
        # Wait until the oldest tracked query leaves the window, plus a 6s safety margin.
        return round(min(self.query_timestamps) + sliding_window - current_time) + 6
    is_graphql_query = 'query_hash' in params and 'graphql/query' in path
    # some queries are not rate limited if invoked anonymously:
    query_not_limited = is_graphql_query and not self.is_logged_in \
                        and params['query_hash'] in ['9ca88e465c3f866a76f7adee3871bdd8']
    if is_graphql_query and not query_not_limited:
        waittime = graphql_query_waittime()
        if waittime > 0:
            self.log('\nToo many queries in the last time. Need to wait {} seconds, until {:%H:%M}.'
                     .format(waittime, datetime.now() + timedelta(seconds=waittime)))
            time.sleep(waittime)
        # Record this query for future rate-control decisions.
        if self.query_timestamps is not None:
            self.query_timestamps.append(time.monotonic())
        else:
            self.query_timestamps = [time.monotonic()]
    sess = session if session else self._session
    try:
        self._sleep()
        resp = sess.get('https://{0}/{1}'.format(host, path), params=params, allow_redirects=False)
        # Follow redirects manually, but only while they stay on the same host.
        while resp.is_redirect:
            redirect_url = resp.headers['location']
            self.log('\nHTTP redirect from https://{0}/{1} to {2}'.format(host, path, redirect_url))
            if redirect_url.startswith('https://{}/'.format(host)):
                resp = sess.get(redirect_url if redirect_url.endswith('/') else redirect_url + '/',
                                params=params, allow_redirects=False)
            else:
                break
        if resp.status_code == 400:
            raise QueryReturnedBadRequestException("400 Bad Request")
        if resp.status_code == 404:
            raise QueryReturnedNotFoundException("404 Not Found")
        if resp.status_code == 429:
            raise TooManyRequestsException("429 Too Many Requests")
        if resp.status_code != 200:
            raise ConnectionException("HTTP error code {}.".format(resp.status_code))
        # Plain page requests (no query_hash, no "__a" parameter) return HTML;
        # the JSON payload is embedded in the window._sharedData script variable.
        is_html_query = not is_graphql_query and not "__a" in params and host == "www.instagram.com"
        if is_html_query:
            match = re.search(r'window\._sharedData = (.*);</script>', resp.text)
            if match is None:
                raise ConnectionException("Could not find \"window._sharedData\" in html response.")
            return json.loads(match.group(1))
        else:
            resp_json = resp.json()
            if 'status' in resp_json and resp_json['status'] != "ok":
                if 'message' in resp_json:
                    raise ConnectionException("Returned \"{}\" status, message \"{}\".".format(resp_json['status'],
                                                                                               resp_json['message']))
                else:
                    raise ConnectionException("Returned \"{}\" status.".format(resp_json['status']))
            return resp_json
    except (ConnectionException, json.decoder.JSONDecodeError, requests.exceptions.RequestException) as err:
        error_string = "JSON Query to {}: {}".format(path, err)
        if _attempt == self.max_connection_attempts:
            raise ConnectionException(error_string) from err
        self.error(error_string + " [retrying; skip with ^C]", repeat_at_end=False)
        text_for_429 = ("HTTP error code 429 was returned because too many queries occured in the last time. "
                        "Please do not use Instagram in your browser or run multiple instances of Instaloader "
                        "in parallel.")
        try:
            if isinstance(err, TooManyRequestsException):
                print(textwrap.fill(text_for_429), file=sys.stderr)
                if is_graphql_query:
                    print("Made {} GraphQL requests.".format(len(self.query_timestamps)), file=sys.stderr)
                # After a 429, wait for the whole sliding window to clear before retrying.
                waittime = graphql_query_waittime(untracked_queries=True)
                if waittime > 0:
                    self.log('The request will be retried in {} seconds, at {:%H:%M}.'
                             .format(waittime, datetime.now() + timedelta(seconds=waittime)))
                    time.sleep(waittime)
            self._sleep()
            # Recursive retry, bumping _attempt so the retry count is bounded.
            return self.get_json(path=path, params=params, host=host, session=sess, _attempt=_attempt + 1)
        except KeyboardInterrupt:
            self.error("[skipped by user]", repeat_at_end=False)
            raise ConnectionException(error_string) from err
def graphql_query(self, query_hash: str, variables: Dict[str, Any],
                  referer: Optional[str] = None, rhx_gis: Optional[str] = None) -> Dict[str, Any]:
    """
    Do a GraphQL Query.

    :param query_hash: Query identifying hash.
    :param variables: Variables for the Query.
    :param referer: HTTP Referer, or None.
    :param rhx_gis: 'rhx_gis' variable as somewhere returned by Instagram, needed to 'sign' request
    :return: The server's response dictionary.
    """
    # Use a throwaway copy of the session so header tweaks don't leak into self._session.
    with copy_session(self._session) as tmpsession:
        tmpsession.headers.update(self._default_http_header(empty_session_only=True))
        del tmpsession.headers['Connection']
        del tmpsession.headers['Content-Length']
        tmpsession.headers['authority'] = 'www.instagram.com'
        tmpsession.headers['scheme'] = 'https'
        tmpsession.headers['accept'] = '*/*'
        if referer is not None:
            tmpsession.headers['referer'] = urllib.parse.quote(referer)
        # Compact JSON (no spaces): the GIS signature below depends on this exact serialization.
        variables_json = json.dumps(variables, separators=(',', ':'))
        if rhx_gis:
            #self.log("rhx_gis {} query_hash {}".format(rhx_gis, query_hash))
            # 'Sign' the request: x-instagram-gis = md5("<rhx_gis>:<variables_json>").
            values = "{}:{}".format(rhx_gis, variables_json)
            x_instagram_gis = hashlib.md5(values.encode()).hexdigest()
            tmpsession.headers['x-instagram-gis'] = x_instagram_gis
        resp_json = self.get_json('graphql/query',
                                  params={'query_hash': query_hash,
                                          'variables': variables_json},
                                  session=tmpsession)
    if 'status' not in resp_json:
        self.error("GraphQL response did not contain a \"status\" field.")
    return resp_json
def graphql_node_list(self, query_hash: str, query_variables: Dict[str, Any],
                      query_referer: Optional[str],
                      edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]],
                      rhx_gis: Optional[str] = None,
                      first_data: Optional[Dict[str, Any]] = None) -> Iterator[Dict[str, Any]]:
    """Retrieve a list of GraphQL nodes.

    Lazily pages through a GraphQL connection, yielding each edge's 'node' dict.

    :param query_hash: Query identifying hash.
    :param query_variables: Variables for the query; 'first' and 'after' are managed here.
    :param query_referer: HTTP Referer for the queries, or None.
    :param edge_extractor: Maps the full GraphQL response onto the paged sub-structure
       that contains 'edges' and 'page_info'.
    :param rhx_gis: 'rhx_gis' string used to sign the requests, or None.
    :param first_data: Already-fetched first page, to avoid re-querying it.
    """
    def _query():
        # Fetch one page. On HTTP 400, halve the page length (not below 12) and retry.
        query_variables['first'] = self._graphql_page_length
        try:
            return edge_extractor(self.graphql_query(query_hash, query_variables, query_referer, rhx_gis))
        except QueryReturnedBadRequestException:
            new_page_length = int(self._graphql_page_length / 2)
            if new_page_length >= 12:
                # Shrink the shared page length so subsequent queries use it too.
                self._graphql_page_length = new_page_length
                self.error("HTTP Error 400 (Bad Request) on GraphQL Query. Retrying with shorter page length.",
                           repeat_at_end=False)
                return _query()
            else:
                raise
    if first_data:
        data = first_data
    else:
        data = _query()
    yield from (edge['node'] for edge in data['edges'])
    while data['page_info']['has_next_page']:
        # Continue from the cursor where the previous page ended.
        query_variables['after'] = data['page_info']['end_cursor']
        data = _query()
        yield from (edge['node'] for edge in data['edges'])
def get_and_write_raw(self, url: str, filename: str, _attempt=1) -> None:
    """Downloads raw data.

    :raises QueryReturnedNotFoundException: When the server responds with a 404.
    :raises QueryReturnedForbiddenException: When the server responds with a 403.
    :raises ConnectionException: When download repeatedly failed."""
    try:
        with self.get_anonymous_session() as anonymous_session:
            resp = anonymous_session.get(url, stream=True)
            if resp.status_code != 200:
                if resp.status_code == 403:
                    # suspected invalid URL signature
                    raise QueryReturnedForbiddenException("403 when accessing {}.".format(url))
                if resp.status_code == 404:
                    # 404 not worth retrying.
                    raise QueryReturnedNotFoundException("404 when accessing {}.".format(url))
                raise ConnectionException("HTTP error code {}.".format(resp.status_code))
            self.log(filename, end=' ', flush=True)
            # Let requests decode gzip/deflate transparently while streaming to disk.
            resp.raw.decode_content = True
            with open(filename, 'wb') as file:
                shutil.copyfileobj(resp.raw, file)
    except (urllib3.exceptions.HTTPError, requests.exceptions.RequestException, ConnectionException) as err:
        error_string = "URL {}: {}".format(url, err)
        if _attempt == self.max_connection_attempts:
            raise ConnectionException(error_string) from err
        self.error(error_string + " [retrying; skip with ^C]", repeat_at_end=False)
        try:
            self._sleep()
            # Bounded recursive retry.
            self.get_and_write_raw(url, filename, _attempt + 1)
        except KeyboardInterrupt:
            self.error("[skipped by user]", repeat_at_end=False)
            raise ConnectionException(error_string) from err
@property
def root_rhx_gis(self) -> Optional[str]:
    """rhx_gis string returned in the / query."""
    if self.is_logged_in:
        # At the moment, rhx_gis seems to be required for anonymous requests only. By returning None when logged
        # in, we can save the root_rhx_gis lookup query.
        return None
    # Lazily fetch and cache the value from the front page on first use.
    if not self._root_rhx_gis:
        self._root_rhx_gis = self.get_json('', {})['rhx_gis']
    return self._root_rhx_gis