1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-07-07 11:42:38 +02:00

Fix anonymous GraphQL queries

Port of 73ec884ea4 to v4-dev.
This commit is contained in:
Alexander Graf 2018-04-11 21:24:36 +02:00
parent 020830d591
commit 9b3014d5bf
5 changed files with 103 additions and 47 deletions

View File

@ -91,7 +91,8 @@ def _main(instaloader: Instaloader, targetlist: List[str],
with instaloader.context.error_catcher(target):
if target[0] == '@':
instaloader.context.log("Retrieving followees of %s..." % target[1:])
followees = instaloader.get_followees(Profile.from_username(instaloader.context, target[1:]))
profile = Profile.from_username(instaloader.context, target[1:])
followees = profile.get_followees()
profiles.update([followee['username'] for followee in followees])
elif target[0] == '#':
instaloader.download_hashtag(hashtag=target[1:], max_count=max_count, fast_update=fast_update,

View File

@ -135,34 +135,6 @@ class Instaloader:
def __exit__(self, *args):
self.close()
@_requires_login
def get_followers(self, profile: Profile) -> Iterator[Dict[str, Any]]:
    """
    Retrieve the followers of the given profile.

    To use this, one needs to be logged in, and private profiles have to be followed,
    otherwise this returns an empty list.

    :param profile: Profile whose followers are looked up.
    :return: Iterator over the follower nodes of the profile's ``edge_followed_by`` edge.
    """
    yield from self.context.graphql_node_list("37479f2b8209594dde7facb0d904896a",
                                              {'id': str(profile.userid)},
                                              'https://www.instagram.com/' + profile.username + '/',
                                              lambda d: d['data']['user']['edge_followed_by'])
@_requires_login
def get_followees(self, profile: Profile) -> Iterator[Dict[str, Any]]:
    """
    Retrieve the followees (followings) of the given profile.

    To use this, one needs to be logged in, and private profiles have to be followed,
    otherwise this returns an empty list.

    :param profile: Profile whose followees are looked up.
    :return: Iterator over the followee nodes of the profile's ``edge_follow`` edge.
    """
    yield from self.context.graphql_node_list("58712303d941c6855d4e888c5f0cd22f",
                                              {'id': str(profile.userid)},
                                              'https://www.instagram.com/' + profile.username + '/',
                                              lambda d: d['data']['user']['edge_follow'])
def download_pic(self, filename: str, url: str, mtime: datetime,
filename_alt: Optional[str] = None, filename_suffix: Optional[str] = None) -> bool:
"""Downloads and saves picture with given url under given directory with given timestamp.
@ -600,10 +572,12 @@ class Instaloader:
@_requires_login
def get_explore_posts(self) -> Iterator[Post]:
"""Get Posts which are worthy of exploring suggested by Instagram."""
data = self.context.get_json('explore/', {})
yield from (Post(self.context, node)
for node in self.context.graphql_node_list("df0dcc250c2b18d9fd27c5581ef33c7c",
{}, 'https://www.instagram.com/explore/',
lambda d: d['data']['user']['edge_web_discover_media']))
lambda d: d['data']['user']['edge_web_discover_media'],
data['rhx_gis']))
def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]:
"""Get Posts associated with a #hashtag."""

View File

@ -1,6 +1,8 @@
import hashlib
import json
import pickle
import random
import re
import shutil
import sys
import textwrap
@ -234,7 +236,14 @@ class InstaloaderContext:
raise TooManyRequestsException("429 - Too Many Requests")
if resp.status_code != 200:
raise ConnectionException("HTTP error code {}.".format(resp.status_code))
resp_json = resp.json()
is_html_query = not is_graphql_query and not "__a" in params and host == "www.instagram.com"
if is_html_query:
match = re.search(r'window\._sharedData = (.*);</script>', resp.text)
if match is None:
raise ConnectionException("Could not find \"window._sharedData\" in html response.")
return json.loads(match.group(1))
else:
resp_json = resp.json()
if 'status' in resp_json and resp_json['status'] != "ok":
if 'message' in resp_json:
raise ConnectionException("Returned \"{}\" status, message \"{}\".".format(resp_json['status'],
@ -265,13 +274,14 @@ class InstaloaderContext:
raise ConnectionException(error_string)
def graphql_query(self, query_hash: str, variables: Dict[str, Any],
referer: Optional[str] = None) -> Dict[str, Any]:
referer: Optional[str] = None, rhx_gis: Optional[str] = None) -> Dict[str, Any]:
"""
Do a GraphQL Query.
:param query_hash: Query identifying hash.
:param variables: Variables for the Query.
:param referer: HTTP Referer, or None.
:param rhx_gis: 'rhx_gis' variable as somewhere returned by Instagram, needed to 'sign' request
:return: The server's response dictionary.
"""
tmpsession = copy_session(self._session)
@ -283,9 +293,18 @@ class InstaloaderContext:
tmpsession.headers['accept'] = '*/*'
if referer is not None:
tmpsession.headers['referer'] = urllib.parse.quote(referer)
variables_json = json.dumps(variables, separators=(',', ':'))
if rhx_gis:
#self.log("rhx_gis {} query_hash {}".format(rhx_gis, query_hash))
values = "{}:{}:{}:{}".format(rhx_gis, tmpsession.cookies['csrftoken'], self.user_agent, variables_json)
x_instagram_gis = hashlib.md5(values.encode()).hexdigest()
tmpsession.headers['x-instagram-gis'] = x_instagram_gis
resp_json = self.get_json('graphql/query',
params={'query_hash': query_hash,
'variables': json.dumps(variables, separators=(',', ':'))},
'variables': variables_json},
session=tmpsession)
tmpsession.close()
if 'status' not in resp_json:
@ -295,17 +314,18 @@ class InstaloaderContext:
def graphql_node_list(self, query_hash: str, query_variables: Dict[str, Any],
query_referer: Optional[str],
edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]],
rhx_gis: Optional[str] = None,
first_data: Optional[Dict[str, Any]] = None) -> Iterator[Dict[str, Any]]:
"""Retrieve a list of GraphQL nodes."""
query_variables['first'] = GRAPHQL_PAGE_LENGTH
if first_data:
data = first_data
else:
data = edge_extractor(self.graphql_query(query_hash, query_variables, query_referer))
data = edge_extractor(self.graphql_query(query_hash, query_variables, query_referer, rhx_gis))
yield from (edge['node'] for edge in data['edges'])
while data['page_info']['has_next_page']:
query_variables['after'] = data['page_info']['end_cursor']
data = edge_extractor(self.graphql_query(query_hash, query_variables, query_referer))
data = edge_extractor(self.graphql_query(query_hash, query_variables, query_referer, rhx_gis))
yield from (edge['node'] for edge in data['edges'])
def get_and_write_raw(self, url: str, filename: str, _attempt=1) -> None:

View File

@ -56,6 +56,7 @@ class Post:
self._node = node
self._owner_profile = owner_profile
self._full_metadata_dict = None
self._rhx_gis_str = None
@classmethod
def from_shortcode(cls, context: InstaloaderContext, shortcode: str):
@ -91,16 +92,22 @@ class Post:
def __hash__(self) -> int:
return hash(self.shortcode)
def _obtain_metadata(self):
    """Load the post page once and cache its full metadata and its ``rhx_gis`` value."""
    if self._full_metadata_dict:
        # Metadata has already been fetched; nothing to do.
        return
    page_json = self._context.get_json("p/{0}/".format(self.shortcode), params={})
    post_page = page_json['entry_data']['PostPage'][0]
    self._full_metadata_dict = post_page['graphql']['shortcode_media']
    self._rhx_gis_str = page_json['rhx_gis']
@property
def _full_metadata(self) -> Dict[str, Any]:
if not self._full_metadata_dict:
pic_json = self._context.get_json("p/{0}/".format(self.shortcode), params={'__a': 1})
if "graphql" in pic_json:
self._full_metadata_dict = pic_json["graphql"]["shortcode_media"]
else:
self._full_metadata_dict = pic_json["media"]
self._obtain_metadata()
return self._full_metadata_dict
@property
def _rhx_gis(self) -> str:
    """The ``rhx_gis`` value stored by :meth:`_obtain_metadata`, which is invoked first."""
    self._obtain_metadata()
    rhx_gis_value = self._rhx_gis_str
    return rhx_gis_value
def _field(self, *keys) -> Any:
"""Lookups given fields in _node, and if not found in _full_metadata. Raises KeyError if not found anywhere."""
try:
@ -252,7 +259,8 @@ class Post:
yield from self._context.graphql_node_list("33ba35852cb50da46f5b5e889df7d159",
{'shortcode': self.shortcode},
'https://www.instagram.com/p/' + self.shortcode + '/',
lambda d: d['data']['shortcode_media']['edge_media_to_comment'])
lambda d: d['data']['shortcode_media']['edge_media_to_comment'],
self._rhx_gis)
def get_likes(self) -> Iterator[Dict[str, Any]]:
"""Iterate over all likes of the post.
@ -270,7 +278,8 @@ class Post:
return
yield from self._context.graphql_node_list("1cb6ec562846122743b61e492c85999f", {'shortcode': self.shortcode},
'https://www.instagram.com/p/' + self.shortcode + '/',
lambda d: d['data']['shortcode_media']['edge_liked_by'])
lambda d: d['data']['shortcode_media']['edge_liked_by'],
self._rhx_gis)
def get_location(self) -> Optional[Dict[str, str]]:
"""If the Post has a location, returns a dictionary with fields 'lat' and 'lng'."""
@ -311,6 +320,7 @@ class Profile:
assert 'username' in node
self._context = context
self._node = node
self._rhx_gis = None
@classmethod
def from_username(cls, context: InstaloaderContext, username: str):
@ -340,8 +350,10 @@ class Profile:
def _obtain_metadata(self):
try:
metadata = self._context.get_json('{}/'.format(self.username), params={'__a': 1})
self._node = metadata['graphql']['user'] if 'graphql' in metadata else metadata['user']
if not self._rhx_gis:
metadata = self._context.get_json('{}/'.format(self.username), params={})
self._node = metadata['entry_data']['ProfilePage'][0]['graphql']['user']
self._rhx_gis = metadata['rhx_gis']
except QueryReturnedNotFoundException:
raise ProfileNotExistsException('Profile {} does not exist.'.format(self.username))
@ -434,11 +446,13 @@ class Profile:
def get_posts(self) -> Iterator[Post]:
    """Yield every post of this profile as a :class:`Post` owned by this profile."""
    self._obtain_metadata()
    # The timeline edge from the profile metadata is passed as the first page of results.
    timeline_nodes = self._context.graphql_node_list(
        "472f257a40c653c64c666ce877d59d2b",
        {'id': self.userid},
        'https://www.instagram.com/{0}/'.format(self.username),
        lambda d: d['data']['user']['edge_owner_to_timeline_media'],
        self._rhx_gis,
        self._metadata('edge_owner_to_timeline_media'))
    for timeline_node in timeline_nodes:
        yield Post(self._context, timeline_node, self)
def get_saved_posts(self) -> Iterator[Post]:
@ -447,13 +461,51 @@ class Profile:
if self.username != self._context.username:
raise LoginRequiredException("--login={} required to get that profile's saved posts.".format(self.username))
self._obtain_metadata()
yield from (Post(self._context, node) for node in
self._context.graphql_node_list("f883d95537fbcd400f466f63d42bd8a1",
{'id': self.userid},
'https://www.instagram.com/{0}/'.format(self.username),
lambda d: d['data']['user']['edge_saved_media'],
self._rhx_gis,
self._metadata('edge_saved_media')))
def get_followers(self) -> Iterator[Dict[str, Any]]:
    """
    Retrieve the followers of this profile.

    To use this, one needs to be logged in, and private profiles have to be followed,
    otherwise this returns an empty list.

    :return: Iterator over the follower nodes of this profile's ``edge_followed_by`` edge.
    :raises LoginRequiredException: If the context is not logged in. As this is a generator,
       the exception is raised when iteration starts, not when the method is called.
    """
    if not self._context.is_logged_in:
        raise LoginRequiredException("--login required to get a profile's followers.")
    # Load the profile metadata before building the query arguments below.
    self._obtain_metadata()
    yield from self._context.graphql_node_list("37479f2b8209594dde7facb0d904896a",
                                               {'id': str(self.userid)},
                                               'https://www.instagram.com/' + self.username + '/',
                                               lambda d: d['data']['user']['edge_followed_by'],
                                               self._rhx_gis)
def get_followees(self) -> Iterator[Dict[str, Any]]:
    """
    Retrieve the followees (followings) of this profile.

    To use this, one needs to be logged in, and private profiles have to be followed,
    otherwise this returns an empty list.

    :return: Iterator over the followee nodes of this profile's ``edge_follow`` edge.
    :raises LoginRequiredException: If the context is not logged in. As this is a generator,
       the exception is raised when iteration starts, not when the method is called.
    """
    if not self._context.is_logged_in:
        raise LoginRequiredException("--login required to get a profile's followees.")
    # Load the profile metadata before building the query arguments below.
    self._obtain_metadata()
    yield from self._context.graphql_node_list("58712303d941c6855d4e888c5f0cd22f",
                                               {'id': str(self.userid)},
                                               'https://www.instagram.com/' + self.username + '/',
                                               lambda d: d['data']['user']['edge_follow'],
                                               self._rhx_gis)
class StoryItem:
"""

View File

@ -82,12 +82,14 @@ class TestInstaloader(unittest.TestCase):
def test_get_followees(self):
self.L.load_session_from_file(OWN_USERNAME)
for f in self.L.get_followees(instaloader.Profile.from_username(self.L.context, OWN_USERNAME)):
profile = instaloader.Profile.from_username(self.L.context, OWN_USERNAME)
for f in profile.get_followees():
print(f['username'])
def test_get_followers(self):
self.L.load_session_from_file(OWN_USERNAME)
for f in self.L.get_followers(instaloader.Profile.from_username(self.L.context, OWN_USERNAME)):
profile = instaloader.Profile.from_username(self.L.context, OWN_USERNAME)
for f in profile.get_followers():
print(f['username'])
def test_get_username_by_id(self):
@ -112,6 +114,13 @@ class TestInstaloader(unittest.TestCase):
self.assertEqual(post, post2)
break
def test_explore_paging(self):
    """Iterate the explore feed, printing at most PAGING_MAX_COUNT + 1 posts."""
    self.L.load_session_from_file(OWN_USERNAME)
    explore_posts = self.L.get_explore_posts()
    for index, explore_post in enumerate(explore_posts):
        print(explore_post)
        if index == PAGING_MAX_COUNT:
            # Enough posts were consumed; stop iterating.
            return
if __name__ == '__main__':
unittest.main()