1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-10-03 22:07:11 +02:00

Fix anonymous GraphQL queries

Closes #94.
This commit is contained in:
André Koch-Kramer 2018-04-11 14:19:24 +02:00
parent 74d5e35eb8
commit 73ec884ea4

View File

@@ -3,6 +3,7 @@
"""Download pictures (or videos) along with their captions and other metadata from Instagram."""
import ast
import getpass
import hashlib
import json
import os
import pickle
@@ -201,6 +202,7 @@ class Post:
self._profile = profile
self._profile_id = profile_id
self._full_metadata_dict = None
self._rhx_gis = None
@classmethod
def from_shortcode(cls, instaloader: 'Instaloader', shortcode: str):
@@ -239,11 +241,9 @@ class Post:
@property
def _full_metadata(self) -> Dict[str, Any]:
if not self._full_metadata_dict:
pic_json = self._instaloader.get_json("p/{0}/".format(self.shortcode), params={'__a': 1})
if "graphql" in pic_json:
self._full_metadata_dict = pic_json["graphql"]["shortcode_media"]
else:
self._full_metadata_dict = pic_json["media"]
pic_json = self._instaloader.get_json("p/{0}/".format(self.shortcode), params={})
self._full_metadata_dict = pic_json['entry_data']['PostPage'][0]['graphql']['shortcode_media']
self._rhx_gis = pic_json['rhx_gis']
return self._full_metadata_dict
def _field(self, *keys) -> Any:
@@ -387,7 +387,8 @@ class Post:
return
yield from self._instaloader.graphql_node_list(17852405266163336, {'shortcode': self.shortcode},
'https://www.instagram.com/p/' + self.shortcode + '/',
lambda d: d['data']['shortcode_media']['edge_media_to_comment'])
lambda d: d['data']['shortcode_media']['edge_media_to_comment'],
rhx_gis=self._rhx_gis)
def get_likes(self) -> Iterator[Dict[str, Any]]:
"""Iterate over all likes of the post.
@@ -405,7 +406,8 @@ class Post:
return
yield from self._instaloader.graphql_node_list("1cb6ec562846122743b61e492c85999f", {'shortcode': self.shortcode},
'https://www.instagram.com/p/' + self.shortcode + '/',
lambda d: d['data']['shortcode_media']['edge_liked_by'])
lambda d: d['data']['shortcode_media']['edge_liked_by'],
rhx_gis=self._rhx_gis)
def get_location(self) -> Optional[Dict[str, str]]:
"""If the Post has a location, returns a dictionary with fields 'lat' and 'lng'."""
@@ -602,7 +604,7 @@ class Instaloader:
:raises QueryReturnedNotFoundException: When the server responds with a 404.
:raises ConnectionException: When query repeatedly failed.
"""
def graphql_query_waittime(query_id: int, untracked_queries: bool = False) -> int:
def graphql_query_waittime(query_id: Union[int, str], untracked_queries: bool = False) -> int:
sliding_window = 660
timestamps = self.previous_queries.get(query_id)
if not timestamps:
@@ -613,9 +615,9 @@ class Instaloader:
if len(timestamps) < 100 and not untracked_queries:
return 0
return round(min(timestamps) + sliding_window - current_time) + 6
is_graphql_query = 'query_id' in params and 'graphql/query' in path
is_graphql_query = 'graphql/query' in path
if is_graphql_query:
query_id = params['query_id']
query_id = params['query_id'] if 'query_id' in params else params['query_hash']
waittime = graphql_query_waittime(query_id)
if waittime > 0:
self._log('\nToo many queries in the last time. Need to wait {} seconds.'.format(waittime))
@@ -635,7 +637,13 @@ class Instaloader:
raise TooManyRequests("429 - Too Many Requests")
if resp.status_code != 200:
raise ConnectionException("HTTP error code {}.".format(resp.status_code))
resp_json = resp.json()
if not is_graphql_query and not "__a" in params and host == "www.instagram.com":
match = re.search(r'window\._sharedData = (.*);</script>', resp.text)
if match is None:
raise ConnectionException("Could not find \"window._sharedData\" in html response.")
return json.loads(match.group(1))
else:
resp_json = resp.json()
if 'status' in resp_json and resp_json['status'] != "ok":
if 'message' in resp_json:
raise ConnectionException("Returned \"{}\" status, message \"{}\".".format(resp_json['status'],
@@ -695,7 +703,7 @@ class Instaloader:
return session
def graphql_query(self, query_identifier: Union[int, str], variables: Dict[str, Any],
referer: Optional[str] = None) -> Dict[str, Any]:
referer: Optional[str] = None, rhx_gis: Optional[str] = None) -> Dict[str, Any]:
"""
Do a GraphQL Query.
@@ -713,9 +721,18 @@ class Instaloader:
tmpsession.headers['accept'] = '*/*'
if referer is not None:
tmpsession.headers['referer'] = urllib.parse.quote(referer)
variables_json = json.dumps(variables, separators=(',', ':'))
if rhx_gis:
values = "{}:{}:{}:{}".format(rhx_gis, tmpsession.cookies['csrftoken'], self.user_agent, variables_json)
x_instagram_gis = hashlib.md5(values.encode()).hexdigest()
tmpsession.cookies.set('ig_pr', '2')
tmpsession.headers['x-instagram-gis'] = x_instagram_gis
resp_json = self.get_json('graphql/query',
params={'query_id' if isinstance(query_identifier, int) else 'query_hash': query_identifier,
'variables': json.dumps(variables, separators=(',', ':'))},
'variables': variables_json},
session=tmpsession)
if 'status' not in resp_json:
self.error("GraphQL response did not contain a \"status\" field.")
@@ -740,20 +757,21 @@ class Instaloader:
def get_id_by_username(self, profile: str) -> int:
"""Each Instagram profile has its own unique ID which stays unmodified even if a user changes
his/her username. To get said ID, given the profile's name, you may call this function."""
return int(self.get_profile_metadata(profile)['user']['id'])
return int(self.get_profile_metadata(profile)[0]['user']['id'])
def graphql_node_list(self, query_identifier: Union[int, str], query_variables: Dict[str, Any],
query_referer: Optional[str],
edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]],
rhx_gis: Optional[str] = None) -> Iterator[Dict[str, Any]]:
"""Retrieve a list of GraphQL nodes."""
query_variables['first'] = Instaloader.GRAPHQL_PAGE_LENGTH
data = self.graphql_query(query_identifier, query_variables, query_referer)
data = self.graphql_query(query_identifier, query_variables, query_referer, rhx_gis)
while True:
edge_struct = edge_extractor(data)
yield from [edge['node'] for edge in edge_struct['edges']]
if edge_struct['page_info']['has_next_page']:
query_variables['after'] = edge_struct['page_info']['end_cursor']
data = self.graphql_query(query_identifier, query_variables, query_referer)
data = self.graphql_query(query_identifier, query_variables, query_referer, rhx_gis)
else:
break
@@ -1257,7 +1275,7 @@ class Instaloader:
if not self.is_logged_in:
return
data = self.get_profile_metadata(self.username)
data, full_metadata = self.get_profile_metadata(self.username)
user_id = data["user"]["id"]
while True:
@@ -1277,7 +1295,8 @@ class Instaloader:
break
data = self.graphql_query("f883d95537fbcd400f466f63d42bd8a1",
{'id': user_id, 'first': Instaloader.GRAPHQL_PAGE_LENGTH,
'after': saved_media["page_info"]["end_cursor"]})['data']
'after': saved_media["page_info"]["end_cursor"]},
rhx_gis=full_metadata['rhx_gis'])['data']
def download_saved_posts(self, max_count: int = None, fast_update: bool = False,
filter_func: Optional[Callable[[Post], bool]] = None) -> None:
@@ -1403,15 +1422,15 @@ class Instaloader:
return profile, profile_id
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
def get_profile_metadata(self, profile_name: str) -> Dict[str, Any]:
def get_profile_metadata(self, profile_name: str) -> Tuple[Dict[str, Any], Dict[str, Any]]:
"""Retrieves a profile's metadata, for use with e.g. :meth:`get_profile_posts` and :meth:`check_profile_id`."""
try:
metadata = self.get_json('{}/'.format(profile_name), params={'__a': 1})
return metadata['graphql'] if 'graphql' in metadata else metadata
metadata = self.get_json('{}/'.format(profile_name), params={})
return metadata['entry_data']['ProfilePage'][0]['graphql'], metadata
except QueryReturnedNotFoundException:
raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name))
def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Post]:
def get_profile_posts(self, profile_metadata: Dict[str, Any], rhx_gis: str) -> Iterator[Post]:
"""Retrieve all posts from a profile."""
profile_name = profile_metadata['user']['username']
profile_id = int(profile_metadata['user']['id'])
@@ -1432,7 +1451,7 @@ class Instaloader:
data = self.graphql_query(17888483320059182, {'id': profile_metadata['user']['id'],
'first': Instaloader.GRAPHQL_PAGE_LENGTH,
'after': end_cursor},
'https://www.instagram.com/{0}/'.format(profile_name))
referer='https://www.instagram.com/{0}/'.format(profile_name), rhx_gis=rhx_gis)
media = data['data']['user']['edge_owner_to_timeline_media']
yield from (Post(self, edge['node'], profile=profile_name, profile_id=profile_id)
for edge in media['edges'])
@@ -1452,14 +1471,14 @@ class Instaloader:
with suppress(ProfileNotExistsException):
# ProfileNotExistsException is raised again later in check_profile_id() when we search the profile, so we
# must suppress it here.
profile_metadata = self.get_profile_metadata(name)
profile_metadata, full_metadata = self.get_profile_metadata(name)
# check if profile does exist or name has changed since last download
# and update name and json data if necessary
name_updated, profile_id = self.check_profile_id(name, profile_metadata)
if name_updated != name:
name = name_updated
profile_metadata = self.get_profile_metadata(name)
profile_metadata, full_metadata = self.get_profile_metadata(name)
# Download profile picture
if profile_pic or profile_pic_only:
@@ -1494,7 +1513,7 @@ class Instaloader:
else:
totalcount = profile_metadata["user"]["edge_owner_to_timeline_media"]["count"]
count = 1
for post in self.get_profile_posts(profile_metadata):
for post in self.get_profile_posts(profile_metadata, rhx_gis=full_metadata['rhx_gis']):
self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
count += 1
if filter_func is not None and not filter_func(post):