From 9b3014d5bf00ea667f609d17032c0ba22b77cda1 Mon Sep 17 00:00:00 2001 From: Alexander Graf Date: Wed, 11 Apr 2018 21:24:36 +0200 Subject: [PATCH] Fix anonymous GraphQL queries Port of 73ec884ea49cf099570b6b2dd4c6f1f751ae4b96 to v4-dev. --- instaloader/__main__.py | 3 +- instaloader/instaloader.py | 32 ++------------ instaloader/instaloadercontext.py | 30 ++++++++++--- instaloader/structures.py | 72 ++++++++++++++++++++++++++----- test/instaloader_unittests.py | 13 +++++- 5 files changed, 103 insertions(+), 47 deletions(-) diff --git a/instaloader/__main__.py b/instaloader/__main__.py index 20e61ae..8e95899 100644 --- a/instaloader/__main__.py +++ b/instaloader/__main__.py @@ -91,7 +91,8 @@ def _main(instaloader: Instaloader, targetlist: List[str], with instaloader.context.error_catcher(target): if target[0] == '@': instaloader.context.log("Retrieving followees of %s..." % target[1:]) - followees = instaloader.get_followees(Profile.from_username(instaloader.context, target[1:])) + profile = Profile.from_username(instaloader.context, target[1:]) + followees = profile.get_followees() profiles.update([followee['username'] for followee in followees]) elif target[0] == '#': instaloader.download_hashtag(hashtag=target[1:], max_count=max_count, fast_update=fast_update, diff --git a/instaloader/instaloader.py b/instaloader/instaloader.py index c9305b4..7fd9eef 100644 --- a/instaloader/instaloader.py +++ b/instaloader/instaloader.py @@ -135,34 +135,6 @@ class Instaloader: def __exit__(self, *args): self.close() - @_requires_login - def get_followers(self, profile: Profile) -> Iterator[Dict[str, Any]]: - """ - Retrieve list of followers of given profile. - To use this, one needs to be logged in and private profiles has to be followed, - otherwise this returns an empty list. - - :param profile: Name of profile to lookup followers. - """ - yield from self.context.graphql_node_list("37479f2b8209594dde7facb0d904896a", - {'id': str(profile.userid)}, - 'https://www.instagram.com/' + profile.username + '/', - lambda d: d['data']['user']['edge_followed_by']) - - @_requires_login - def get_followees(self, profile: Profile) -> Iterator[Dict[str, Any]]: - """ - Retrieve list of followees (followings) of given profile. - To use this, one needs to be logged in and private profiles has to be followed, - otherwise this returns an empty list. - - :param profile: Name of profile to lookup followers. - """ - yield from self.context.graphql_node_list("58712303d941c6855d4e888c5f0cd22f", - {'id': str(profile.userid)}, - 'https://www.instagram.com/' + profile.username + '/', - lambda d: d['data']['user']['edge_follow']) - def download_pic(self, filename: str, url: str, mtime: datetime, filename_alt: Optional[str] = None, filename_suffix: Optional[str] = None) -> bool: """Downloads and saves picture with given url under given directory with given timestamp. @@ -600,10 +572,12 @@ class Instaloader: @_requires_login def get_explore_posts(self) -> Iterator[Post]: """Get Posts which are worthy of exploring suggested by Instagram.""" + data = self.context.get_json('explore/', {}) yield from (Post(self.context, node) for node in self.context.graphql_node_list("df0dcc250c2b18d9fd27c5581ef33c7c", {}, 'https://www.instagram.com/explore/', - lambda d: d['data']['user']['edge_web_discover_media'])) + lambda d: d['data']['user']['edge_web_discover_media'], + data['rhx_gis'])) def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]: """Get Posts associated with a #hashtag.""" diff --git a/instaloader/instaloadercontext.py b/instaloader/instaloadercontext.py index 789b9db..684a930 100644 --- a/instaloader/instaloadercontext.py +++ b/instaloader/instaloadercontext.py @@ -1,6 +1,8 @@ +import hashlib import json import pickle import random +import re import shutil import sys import textwrap @@ -234,7 +236,14 @@ class InstaloaderContext: raise TooManyRequestsException("429 - Too Many Requests") if resp.status_code != 200: raise ConnectionException("HTTP error code {}.".format(resp.status_code)) - resp_json = resp.json() + is_html_query = not is_graphql_query and not "__a" in params and host == "www.instagram.com" + if is_html_query: + match = re.search(r'window\._sharedData = (.*);', resp.text) + if match is None: + raise ConnectionException("Could not find \"window._sharedData\" in html response.") + return json.loads(match.group(1)) + else: + resp_json = resp.json() if 'status' in resp_json and resp_json['status'] != "ok": if 'message' in resp_json: raise ConnectionException("Returned \"{}\" status, message \"{}\".".format(resp_json['status'], @@ -265,13 +274,14 @@ class InstaloaderContext: raise ConnectionException(error_string) def graphql_query(self, query_hash: str, variables: Dict[str, Any], - referer: Optional[str] = None) -> Dict[str, Any]: + referer: Optional[str] = None, rhx_gis: Optional[str] = None) -> Dict[str, Any]: """ Do a GraphQL Query. :param query_hash: Query identifying hash. :param variables: Variables for the Query. :param referer: HTTP Referer, or None. + :param rhx_gis: 'rhx_gis' variable as somewhere returned by Instagram, needed to 'sign' request :return: The server's response dictionary. """ tmpsession = copy_session(self._session) @@ -283,9 +293,18 @@ class InstaloaderContext: tmpsession.headers['accept'] = '*/*' if referer is not None: tmpsession.headers['referer'] = urllib.parse.quote(referer) + + variables_json = json.dumps(variables, separators=(',', ':')) + + if rhx_gis: + #self.log("rhx_gis {} query_hash {}".format(rhx_gis, query_hash)) + values = "{}:{}:{}:{}".format(rhx_gis, tmpsession.cookies['csrftoken'], self.user_agent, variables_json) + x_instagram_gis = hashlib.md5(values.encode()).hexdigest() + tmpsession.headers['x-instagram-gis'] = x_instagram_gis + resp_json = self.get_json('graphql/query', params={'query_hash': query_hash, - 'variables': json.dumps(variables, separators=(',', ':'))}, + 'variables': variables_json}, session=tmpsession) tmpsession.close() if 'status' not in resp_json: @@ -295,17 +314,18 @@ class InstaloaderContext: def graphql_node_list(self, query_hash: str, query_variables: Dict[str, Any], query_referer: Optional[str], edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]], + rhx_gis: Optional[str] = None, first_data: Optional[Dict[str, Any]] = None) -> Iterator[Dict[str, Any]]: """Retrieve a list of GraphQL nodes.""" query_variables['first'] = GRAPHQL_PAGE_LENGTH if first_data: data = first_data else: - data = edge_extractor(self.graphql_query(query_hash, query_variables, query_referer)) + data = edge_extractor(self.graphql_query(query_hash, query_variables, query_referer, rhx_gis)) yield from (edge['node'] for edge in data['edges']) while data['page_info']['has_next_page']: query_variables['after'] = data['page_info']['end_cursor'] - data = edge_extractor(self.graphql_query(query_hash, query_variables, query_referer)) + data = edge_extractor(self.graphql_query(query_hash, query_variables, query_referer, rhx_gis)) yield from (edge['node'] for edge in data['edges']) def get_and_write_raw(self, url: str, filename: str, _attempt=1) -> None: diff --git a/instaloader/structures.py b/instaloader/structures.py index 19ae390..910177f 100644 --- a/instaloader/structures.py +++ b/instaloader/structures.py @@ -56,6 +56,7 @@ class Post: self._node = node self._owner_profile = owner_profile self._full_metadata_dict = None + self._rhx_gis_str = None @classmethod def from_shortcode(cls, context: InstaloaderContext, shortcode: str): @@ -91,16 +92,22 @@ class Post: def __hash__(self) -> int: return hash(self.shortcode) + def _obtain_metadata(self): + if not self._full_metadata_dict: + pic_json = self._context.get_json("p/{0}/".format(self.shortcode), params={}) + self._full_metadata_dict = pic_json['entry_data']['PostPage'][0]['graphql']['shortcode_media'] + self._rhx_gis_str = pic_json['rhx_gis'] + @property def _full_metadata(self) -> Dict[str, Any]: - if not self._full_metadata_dict: - pic_json = self._context.get_json("p/{0}/".format(self.shortcode), params={'__a': 1}) - if "graphql" in pic_json: - self._full_metadata_dict = pic_json["graphql"]["shortcode_media"] - else: - self._full_metadata_dict = pic_json["media"] + self._obtain_metadata() return self._full_metadata_dict + @property + def _rhx_gis(self) -> str: + self._obtain_metadata() + return self._rhx_gis_str + def _field(self, *keys) -> Any: """Lookups given fields in _node, and if not found in _full_metadata. Raises KeyError if not found anywhere.""" try: @@ -252,7 +259,8 @@ class Post: yield from self._context.graphql_node_list("33ba35852cb50da46f5b5e889df7d159", {'shortcode': self.shortcode}, 'https://www.instagram.com/p/' + self.shortcode + '/', - lambda d: d['data']['shortcode_media']['edge_media_to_comment']) + lambda d: d['data']['shortcode_media']['edge_media_to_comment'], + self._rhx_gis) def get_likes(self) -> Iterator[Dict[str, Any]]: """Iterate over all likes of the post. @@ -270,7 +278,8 @@ class Post: return yield from self._context.graphql_node_list("1cb6ec562846122743b61e492c85999f", {'shortcode': self.shortcode}, 'https://www.instagram.com/p/' + self.shortcode + '/', - lambda d: d['data']['shortcode_media']['edge_liked_by']) + lambda d: d['data']['shortcode_media']['edge_liked_by'], + self._rhx_gis) def get_location(self) -> Optional[Dict[str, str]]: """If the Post has a location, returns a dictionary with fields 'lat' and 'lng'.""" @@ -311,6 +320,7 @@ class Profile: assert 'username' in node self._context = context self._node = node + self._rhx_gis = None @classmethod def from_username(cls, context: InstaloaderContext, username: str): @@ -340,8 +350,10 @@ class Profile: def _obtain_metadata(self): try: - metadata = self._context.get_json('{}/'.format(self.username), params={'__a': 1}) - self._node = metadata['graphql']['user'] if 'graphql' in metadata else metadata['user'] + if not self._rhx_gis: + metadata = self._context.get_json('{}/'.format(self.username), params={}) + self._node = metadata['entry_data']['ProfilePage'][0]['graphql']['user'] + self._rhx_gis = metadata['rhx_gis'] except QueryReturnedNotFoundException: raise ProfileNotExistsException('Profile {} does not exist.'.format(self.username)) @@ -434,11 +446,13 @@ class Profile: def get_posts(self) -> Iterator[Post]: """Retrieve all posts from a profile.""" + self._obtain_metadata() yield from (Post(self._context, node, self) for node in self._context.graphql_node_list("472f257a40c653c64c666ce877d59d2b", {'id': self.userid}, 'https://www.instagram.com/{0}/'.format(self.username), lambda d: d['data']['user']['edge_owner_to_timeline_media'], + self._rhx_gis, self._metadata('edge_owner_to_timeline_media'))) def get_saved_posts(self) -> Iterator[Post]: @@ -447,13 +461,51 @@ class Profile: if self.username != self._context.username: raise LoginRequiredException("--login={} required to get that profile's saved posts.".format(self.username)) + self._obtain_metadata() yield from (Post(self._context, node) for node in self._context.graphql_node_list("f883d95537fbcd400f466f63d42bd8a1", {'id': self.userid}, 'https://www.instagram.com/{0}/'.format(self.username), lambda d: d['data']['user']['edge_saved_media'], + self._rhx_gis, self._metadata('edge_saved_media'))) + def get_followers(self) -> Iterator[Dict[str, Any]]: + """ + Retrieve list of followers of given profile. + To use this, one needs to be logged in and private profiles has to be followed, + otherwise this returns an empty list. + + :param profile: Name of profile to lookup followers. + """ + if not self._context.is_logged_in: + raise LoginRequiredException("--login required to get a profile's followers.") + self._obtain_metadata() + yield from self._context.graphql_node_list("37479f2b8209594dde7facb0d904896a", + {'id': str(self.userid)}, + 'https://www.instagram.com/' + self.username + '/', + lambda d: d['data']['user']['edge_followed_by'], + self._rhx_gis) + + def get_followees(self) -> Iterator[Dict[str, Any]]: + """ + Retrieve list of followees (followings) of given profile. + To use this, one needs to be logged in and private profiles has to be followed, + otherwise this returns an empty list. + + :param profile: Name of profile to lookup followers. + """ + if not self._context.is_logged_in: + raise LoginRequiredException("--login required to get a profile's followees.") + self._obtain_metadata() + yield from self._context.graphql_node_list("58712303d941c6855d4e888c5f0cd22f", + {'id': str(self.userid)}, + 'https://www.instagram.com/' + self.username + '/', + lambda d: d['data']['user']['edge_follow'], + self._rhx_gis) + + + class StoryItem: """ diff --git a/test/instaloader_unittests.py b/test/instaloader_unittests.py index 8e949a5..63772ba 100644 --- a/test/instaloader_unittests.py +++ b/test/instaloader_unittests.py @@ -82,12 +82,14 @@ class TestInstaloader(unittest.TestCase): def test_get_followees(self): self.L.load_session_from_file(OWN_USERNAME) - for f in self.L.get_followees(instaloader.Profile.from_username(self.L.context, OWN_USERNAME)): + profile = instaloader.Profile.from_username(self.L.context, OWN_USERNAME) + for f in profile.get_followees(): print(f['username']) def test_get_followers(self): self.L.load_session_from_file(OWN_USERNAME) - for f in self.L.get_followers(instaloader.Profile.from_username(self.L.context, OWN_USERNAME)): + profile = instaloader.Profile.from_username(self.L.context, OWN_USERNAME) + for f in profile.get_followers(): print(f['username']) def test_get_username_by_id(self): @@ -112,6 +114,13 @@ class TestInstaloader(unittest.TestCase): self.assertEqual(post, post2) break + def test_explore_paging(self): + self.L.load_session_from_file(OWN_USERNAME) + for count, post in enumerate(self.L.get_explore_posts()): + print(post) + if count == PAGING_MAX_COUNT: + break + if __name__ == '__main__': unittest.main()