From 58882f508e9d533772dbe0c242551c205684e6e1 Mon Sep 17 00:00:00 2001 From: Alexander Graf Date: Sun, 6 Aug 2017 19:27:46 +0200 Subject: [PATCH] Major code cleanup Remove many code duplications, merely by using more pythonic idioms. Use GraphQL more often. Better cope with errors: All requests can be retried; failed requests do not cause program termination; all error strings are repeated to the user at the end of execution. download_post() (formerly download_node()) does not repeat node metadata request (before this commit, this request was executed up to three times). --- README.rst | 16 +- instaloader.py | 710 +++++++++++++++++++++++++------------------------ 2 files changed, 367 insertions(+), 359 deletions(-) diff --git a/README.rst b/README.rst index a7ed7c4..d6ce865 100644 --- a/README.rst +++ b/README.rst @@ -235,15 +235,13 @@ For example, to get a list of all followees and a list of all followers of a pro loader.interactive_login(USERNAME) # Retrieve followees - followees = loader.get_followees(PROFILE) print(PROFILE + " follows these profiles:") - for f in followees: + for f in loader.get_followees(PROFILE): print("\t%s\t%s" % (f['username'], f['full_name'])) # Retrieve followers - followers = loader.get_followers(PROFILE) print("Followers of " + PROFILE + ":") - for f in followers: + for f in loader.get_followers(PROFILE): print("\t%s\t%s" % (f['username'], f['full_name'])) Then, you may download all pictures of all followees with @@ -252,7 +250,7 @@ Then, you may download all pictures of all followees with for f in followees: try: - loader.download(f['username']) + loader.download_profile(f['username']) except instaloader.NonfatalException: pass @@ -260,9 +258,11 @@ You could also download your last 20 liked pics with .. code:: python - loader.download_feed_pics(max_count=20, fast_update=True, - filter_func=lambda node: - not node["likes"]["viewer_has_liked"] if "likes" in node else not node["viewer_has_liked"]) + loader.download_feed_posts(max_count=20, fast_update=True, + filter_func=lambda node: + not node["likes"]["viewer_has_liked"] + if "likes" in node else + not node["viewer_has_liked"]) To download the last 20 pictures with hashtag #cat, do diff --git a/instaloader.py b/instaloader.py index 3f00b50..3dfeb1d 100755 --- a/instaloader.py +++ b/instaloader.py @@ -15,9 +15,10 @@ import tempfile import time from argparse import ArgumentParser from base64 import b64decode, b64encode +from contextlib import contextmanager, suppress from datetime import datetime from io import BytesIO -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple import requests import requests.utils @@ -52,11 +53,11 @@ class NonfatalException(InstaloaderException): pass -class ProfileNotExistsException(NonfatalException): +class QueryReturnedNotFoundException(InstaloaderException): pass -class ProfileAccessDeniedException(NonfatalException): +class ProfileNotExistsException(NonfatalException): pass @@ -88,7 +89,7 @@ class BadCredentialsException(InstaloaderException): pass -class ConnectionException(InstaloaderException): +class ConnectionException(NonfatalException): pass @@ -149,17 +150,41 @@ class Instaloader: self.dirname_pattern = dirname_pattern if dirname_pattern is not None else '{target}' self.filename_pattern = filename_pattern.replace('{date}', '{date:%Y-%m-%d_%H-%M-%S}') \ if filename_pattern is not None else '{date:%Y-%m-%d_%H-%M-%S}' + self.error_log = [] def _log(self, *msg, sep='', end='\n', flush=False): + """Log a message to stdout that can be suppressed with --quiet.""" if not self.quiet: print(*msg, sep=sep, end=end, flush=flush) + def _error(self, msg: str): + """Log a non-fatal error message to stderr, which is repeated at program termination.""" + print(msg, file=sys.stderr) + self.error_log.append(msg) + + @contextmanager + def _error_catcher(self, extra_info: Optional[str] = None): + """ + Context manager to catch, output and record NonfatalExceptions. + + :param extra_info: String to prefix error message with.""" + try: + yield + except NonfatalException as err: + if extra_info: + self._error('{}: {}'.format(extra_info, err)) + else: + self._error('{}'.format(err)) + def _sleep(self): """Sleep a short, random time if self.sleep is set. Called before each request to the instagram.com.""" if self.sleep: - time.sleep(random.uniform(0.25, 2.0)) + time.sleep(random.uniform(0.5, 1.75)) def _get_and_write_raw(self, url: str, filename: str, tries: int = 3) -> None: + """Downloads raw data. + + :raises ConnectionException: When download repeatedly failed.""" try: resp = self.get_anonymous_session().get(url, stream=True) if resp.status_code == 200: @@ -168,27 +193,43 @@ class Instaloader: resp.raw.decode_content = True shutil.copyfileobj(resp.raw, file) else: - raise ConnectionException("Request returned HTTP error code {}.".format(resp.status_code)) + raise ConnectionException("HTTP error code {}.".format(resp.status_code)) except (urllib3.exceptions.HTTPError, requests.exceptions.RequestException, ConnectionException) as err: - print("URL: {}\n{}".format(url, err), file=sys.stderr) + error_string = "URL {}: {}".format(url, err) if tries <= 1: - raise NodeUnavailableException + raise ConnectionException(error_string) + else: + self._error(error_string) self._sleep() self._get_and_write_raw(url, filename, tries - 1) - def get_json(self, name: str, session: requests.Session = None, - max_id: Optional[str] = None) -> Optional[Dict[str, Any]]: - """Return JSON of a profile""" - if session is None: - session = self.session - self._sleep() - if not max_id: - resp = session.get('https://www.instagram.com/' + name) - else: - resp = session.get('https://www.instagram.com/' + name, params={'max_id': max_id}) - match = re.search('window\\._sharedData = .*<', resp.text) - if match is not None: - return json.loads(match.group(0)[21:-2]) + def _get_json(self, url: str, params: Optional[Dict[str, Any]] = None, + session: Optional[requests.Session] = None, tries: int = 3) -> Dict[str, Any]: + """JSON request to Instagram. + + :param url: URL, relative to https://www.instagram.com/ + :param params: GET parameters + :param session: Session to use, or None to use self.session + :param tries: Maximum number of attempts until a exception is raised + :return: Decoded response dictionary + """ + sess = session if session else self.session + try: + self._sleep() + resp = sess.get('https://www.instagram.com/' + url, params=params) + if resp.status_code == 404: + raise QueryReturnedNotFoundException("404") + if resp.status_code != 200: + raise ConnectionException("HTTP error code {}.".format(resp.status_code)) + return resp.json() + except (ConnectionException, json.decoder.JSONDecodeError) as err: + error_string = "JSON Query to {}: {}".format(url, err) + if tries <= 1: + raise ConnectionException(error_string) + else: + self._error(error_string) + self._sleep() + self._get_json(url, params, sess, tries - 1) def default_http_header(self, empty_session_only: bool = False) -> Dict[str, str]: """Returns default HTTP header we use for requests.""" @@ -238,13 +279,10 @@ class Instaloader: tmpsession.headers['accept'] = '*/*' if referer is not None: tmpsession.headers['referer'] = referer - self._sleep() - response = tmpsession.get('https://www.instagram.com/graphql/query', - params={'query_id': query_id, - 'variables': json.dumps(variables, separators=(',', ':'))}) - if response.status_code != 200: - raise ConnectionException("GraphQL query returned HTTP error code {}.".format(response.status_code)) - return response.json() + return self._get_json('graphql/query', + params={'query_id': query_id, + 'variables': json.dumps(variables, separators=(',', ':'))}, + session=tmpsession) def get_username_by_id(self, profile_id: int) -> str: """To get the current username of a profile, given its unique ID, this function can be used.""" @@ -261,89 +299,49 @@ class Instaloader: raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").") else: shortcode = mediaid_to_shortcode(int(data['edges'][0]["node"]["id"])) - data = self.get_json("p/" + shortcode) - return data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['owner']['username'] + return self.get_post_metadata(shortcode)['owner']['username'] def get_id_by_username(self, profile: str) -> int: """Each Instagram profile has its own unique ID which stays unmodified even if a user changes his/her username. To get said ID, given the profile's name, you may call this function.""" - data = self.get_json(profile, session=self.get_anonymous_session()) - if "ProfilePage" not in data["entry_data"]: - raise ProfileNotExistsException("Profile {0} does not exist.".format(profile)) - return int(data['entry_data']['ProfilePage'][0]['user']['id']) + return int(self.get_profile_metadata(profile)['user']['id']) - def get_followers(self, profile: str) -> List[Dict[str, Any]]: + def graphql_node_list(self, query_id: int, query_variables: Dict[str, Any], query_referer: Optional[str], + edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Iterator[Dict[str, Any]]: + query_variables['first'] = 500 + data = self.graphql_query(query_id, query_variables, query_referer) + while True: + edge_struct = edge_extractor(data) + yield from [edge['node'] for edge in edge_struct['edges']] + if edge_struct['page_info']['has_next_page']: + query_variables['after'] = edge_struct['page_info']['end_cursor'] + data = self.graphql_query(query_id, query_variables, query_referer) + else: + break + + def get_followers(self, profile: str) -> Iterator[Dict[str, Any]]: """ Retrieve list of followers of given profile. To use this, one needs to be logged in and private profiles has to be followed, otherwise this returns an empty list. :param profile: Name of profile to lookup followers. - :return: List of followers (list of dictionaries). """ - profile_id = self.get_id_by_username(profile) - data = self.graphql_query(17851374694183129, {'id': str(profile_id), - 'first': 500}, - referer='https://www.instagram.com/' + profile + '/') - followers = [] - while True: - edge_followed_by = data['data']['user']['edge_followed_by'] - followers.extend([follower['node'] for follower in edge_followed_by['edges']]) - page_info = edge_followed_by['page_info'] - if page_info['has_next_page']: - data = self.graphql_query(17851374694183129, {'id': str(profile_id), - 'first': 500, - 'after': page_info['end_cursor']}, - referer='https://www.instagram.com/' + profile + '/') - else: - break - return followers + yield from self.graphql_node_list(17851374694183129, {'id': str(self.get_id_by_username(profile))}, + 'https://www.instagram.com/' + profile + '/', + lambda d: d['data']['user']['edge_followed_by']) - def get_followees(self, profile: str) -> List[Dict[str, Any]]: + def get_followees(self, profile: str) -> Iterator[Dict[str, Any]]: """ Retrieve list of followees (followings) of given profile. To use this, one needs to be logged in and private profiles has to be followed, otherwise this returns an empty list. :param profile: Name of profile to lookup followers. - :return: List of followees (list of dictionaries). """ - profile_id = self.get_id_by_username(profile) - data = self.graphql_query(17874545323001329, {'id': profile_id, - 'first': 500}, - referer='https://www.instagram.com/' + profile + '/') - followees = [] - while True: - edge_follow = data['data']['user']['edge_follow'] - followees.extend([followee['node'] for followee in edge_follow['edges']]) - page_info = edge_follow['page_info'] - if page_info['has_next_page']: - data = self.graphql_query(17874545323001329, {'id': profile_id, - 'first': 500, - 'after': page_info['end_cursor']}, - referer='https://www.instagram.com/' + profile + '/') - else: - break - return followees - - def get_comments(self, shortcode: str) -> List[Dict[str, Any]]: - """Retrieve comments of node with given shortcode.""" - data = self.graphql_query(17852405266163336, {'shortcode': shortcode, - 'first': 500}, - referer='https://www.instagram.com/p/' + shortcode + '/') - comments = [] - while True: - edge_media_to_comment = data['data']['shortcode_media']['edge_media_to_comment'] - comments.extend([comment['node'] for comment in edge_media_to_comment['edges']]) - page_info = edge_media_to_comment['page_info'] - if page_info['has_next_page']: - data = self.graphql_query(17852405266163336, {'shortcode': shortcode, - 'first': 500, - 'after': page_info['end_cursor']}, - referer='https://www.instagram.com/p/' + shortcode + '/') - else: - break - return comments + yield from self.graphql_node_list(17874545323001329, {'id': str(self.get_id_by_username(profile))}, + 'https://www.instagram.com/' + profile + '/', + lambda d: d['data']['user']['edge_follow']) def download_pic(self, filename: str, url: str, mtime: datetime, filename_suffix: Optional[str] = None) -> bool: @@ -361,6 +359,12 @@ class Instaloader: os.utime(filename, (datetime.now().timestamp(), mtime.timestamp())) return True + def get_comments(self, shortcode: str) -> Iterator[Dict[str, Any]]: + """Retrieve comments of node with given shortcode.""" + yield from self.graphql_node_list(17852405266163336, {'shortcode': shortcode}, + 'https://www.instagram.com/p/' + shortcode + '/', + lambda d: d['data']['shortcode_media']['edge_media_to_comment']) + def update_comments(self, filename: str, shortcode: str) -> None: filename += '_comments.json' try: @@ -393,7 +397,7 @@ class Instaloader: pcaption = "txt" else: pcaption = '[' + ((pcaption[:29] + u"\u2026") if len(pcaption) > 31 else pcaption) + ']' - try: + with suppress(FileNotFoundError): with open(filename, 'rb') as file: file_caption = file.read() if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'): @@ -416,8 +420,6 @@ class Instaloader: self._log(pcaption + ' updated', end=' ', flush=True) except UnicodeEncodeError: self._log('txt updated', end=' ', flush=True) - except FileNotFoundError: - pass try: self._log(pcaption, end=' ', flush=True) except UnicodeEncodeError: @@ -463,6 +465,7 @@ class Instaloader: url = url[:index] + 's2048x2048' + ('/' if offset == 0 else str()) + url[index + offset:] self._get_and_write_raw(url, filename) os.utime(filename, (datetime.now().timestamp(), date_object.timestamp())) + self._log('') # log output of _get_and_write_raw() does not produce \n def save_session_to_file(self, filename: Optional[str] = None) -> None: """Saves requests.Session object.""" @@ -495,14 +498,11 @@ class Instaloader: self.session = session self.username = username - def test_login(self, session: requests.Session) -> Optional[str]: + def test_login(self, session: Optional[requests.Session]) -> Optional[str]: """Returns the Instagram username to which given requests.Session object belongs, or None.""" - if self.session is None: - return - data = self.get_json(str(), session=session) - if data['config']['viewer'] is None: - return - return data['config']['viewer']['username'] + if session: + data = self._get_json('', params={'__a': 1}, session=session) + return data['graphql']['user']['username'] if 'graphql' in data else None def login(self, user: str, passwd: str) -> None: """Log in to instagram with given username and password and internally store session object""" @@ -527,51 +527,34 @@ class Instaloader: else: raise ConnectionException('Login error! Connection error!') - def get_feed_json(self, end_cursor: str = None) -> Dict[str, Any]: - """ - Get JSON of the user's feed. + def get_post_metadata(self, shortcode: str, tries: int = 3) -> Dict[str, Any]: + """Get full metadata of the post associated with given shortcode. - :param end_cursor: The end cursor, as from json["feed"]["media"]["page_info"]["end_cursor"] - :return: JSON - """ - if end_cursor is None: - return self.get_json(str())["entry_data"]["FeedPage"][0] - return self.graphql_query(17863003771166879, {'fetch_media_item_count': 12, - 'fetch_media_item_cursor': end_cursor, - 'fetch_comment_count': 4, - 'fetch_like': 10}) - - def get_node_metadata(self, node_code: str, tries: int = 3) -> Dict[str, Any]: - pic_json = self.get_json("p/" + node_code) + :raises NodeUnavailableException: If the data cannot be retrieved.""" + pic_json = self._get_json("p/{0}/".format(shortcode), params={'__a': 1}) try: - media = pic_json["entry_data"]["PostPage"][0]["graphql"]["shortcode_media"] \ - if "graphql" in pic_json["entry_data"]["PostPage"][0] \ - else pic_json["entry_data"]["PostPage"][0]["media"] + media = pic_json["graphql"]["shortcode_media"] if "graphql" in pic_json else pic_json["media"] except KeyError as err: - print(err, file=sys.stderr) print(json.dumps(pic_json, indent=4), file=sys.stderr) + error_string = "Post {}: {}".format(shortcode, err) if tries <= 1: - raise NodeUnavailableException + raise NodeUnavailableException(error_string) + else: + self._error(error_string) self._sleep() - media = self.get_node_metadata(node_code, tries - 1) + media = self.get_post_metadata(shortcode, tries - 1) return media - def get_location(self, node_code: str) -> Dict[str, str]: - try: - media = self.get_node_metadata(node_code) - except NodeUnavailableException: - print("Unable to lookup location for node \"https://www.instagram.com/p/{}/\".".format(node_code), - file=sys.stderr) - return dict() - if media["location"] is not None: - location_json = self.get_json("explore/locations/" + - media["location"]["id"]) - return location_json["entry_data"]["LocationsPage"][0]["location"] + def get_location(self, post_metadata: Dict[str, Any]) -> Optional[Dict[str, str]]: + if post_metadata["location"] is not None: + location_json = self._get_json("explore/locations/{0}/".format(post_metadata["location"]["id"]), + params={'__a': 1}) + return location_json["location"] - def download_node(self, node: Dict[str, Any], profile: Optional[str], target: str, + def download_post(self, node: Dict[str, Any], profile: Optional[str], target: str, download_videos: bool = True, geotags: bool = False, download_comments: bool = False) -> bool: """ - Download everything associated with one instagram node, i.e. picture, caption and video. + Download everything associated with one instagram post node, i.e. picture, caption and video. :param node: Node, as from media->nodes list in instagram's JSONs :param profile: Name of profile to which this node belongs @@ -585,36 +568,34 @@ class Instaloader: needs_profilename = (format_string_contains_key(self.dirname_pattern, 'profile') or format_string_contains_key(self.filename_pattern, 'profile')) shortcode = node['shortcode'] if 'shortcode' in node else node['code'] + post_metadata = None if needs_profilename: if already_has_profilename: profilename = profile if profile is not None else node['owner']['username'] + profilename = profilename.lower() else: try: - metadata = self.get_node_metadata(shortcode) - profilename = metadata['owner']['username'] - except NodeUnavailableException: - print("Unable to gather profilename for node " - "\"https://www.instagram.com/p/{}/\".".format(shortcode), file=sys.stderr) + post_metadata = self.get_post_metadata(shortcode) + profilename = post_metadata['owner']['username'].lower() + except (NonfatalException, KeyError) as err: + self._error("Unable to get owner name of post {}: {} -- using \'UNKNOWN\'.".format(shortcode, err)) profilename = 'UNKNOWN' else: profilename = None - profilename = profilename.lower() if profilename else None date = datetime.fromtimestamp(node["date"] if "date" in node else node["taken_at_timestamp"]) dirname = self.dirname_pattern.format(profile=profilename, target=target.lower()) filename = dirname + '/' + self.filename_pattern.format(profile=profilename, target=target.lower(), date=date, shortcode=shortcode) os.makedirs(os.path.dirname(filename), exist_ok=True) + url = node["display_url"] if "display_url" in node else node["display_src"] if '__typename' in node: if node['__typename'] == 'GraphSidecar': - self._sleep() - sidecar_data = self.session.get('https://www.instagram.com/p/' + shortcode + '/', - params={'__a': 1}).json() + if not post_metadata: + post_metadata = self.get_post_metadata(shortcode) edge_number = 1 downloaded = True - media = sidecar_data["graphql"]["shortcode_media"] if "graphql" in sidecar_data else sidecar_data[ - "media"] - for edge in media['edge_sidecar_to_children']['edges']: + for edge in post_metadata['edge_sidecar_to_children']['edges']: edge_downloaded = self.download_pic(filename=filename, url=edge['node']['display_url'], mtime=date, @@ -622,7 +603,6 @@ class Instaloader: downloaded = downloaded and edge_downloaded edge_number += 1 elif node['__typename'] in ['GraphImage', 'GraphVideo']: - url = node["display_url"] if "display_url" in node else node["display_src"] downloaded = self.download_pic(filename=filename, url=url, mtime=date) @@ -631,7 +611,7 @@ class Instaloader: downloaded = False else: # Node is an old image or video. - downloaded = self.download_pic(filename=filename, url=node["display_src"], mtime=date) + downloaded = self.download_pic(filename=filename, url=url, mtime=date) if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]: self.save_caption(filename, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"]) elif "caption" in node: @@ -639,12 +619,15 @@ class Instaloader: else: self._log("", end=' ', flush=True) if node["is_video"] and download_videos: - video_data = self.get_json('p/' + shortcode) + if not post_metadata: + post_metadata = self.get_post_metadata(shortcode) self.download_pic(filename=filename, - url=video_data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['video_url'], + url=post_metadata['video_url'], mtime=date) if geotags: - location = self.get_location(shortcode) + if not post_metadata: + post_metadata = self.get_post_metadata(shortcode) + location = self.get_location(post_metadata) if location: self.save_location(filename, location, date) if download_comments: @@ -652,6 +635,41 @@ class Instaloader: self._log() return downloaded + def get_stories(self, userids: Optional[List[int]] = None) -> Iterator[Dict[str, Any]]: + """Get available stories from followees or all stories of users whose ID are given. + Does not mark stories as seen. + To use this, one needs to be logged in + + :param userids: List of user IDs to be processed in terms of downloading their stories, or None. + """ + tempsession = copy_session(self.session) + header = tempsession.headers + header['User-Agent'] = 'Instagram 10.3.2 (iPhone7,2; iPhone OS 9_3_3; en_US; en-US; scale=2.00; 750x1334) ' \ + 'AppleWebKit/420+' + del header['Host'] + del header['Origin'] + del header['X-Instagram-AJAX'] + del header['X-Requested-With'] + + def _get(url): + self._sleep() + resp = tempsession.get(url) + if resp.status_code != 200: + raise ConnectionException('Failed to fetch stories.') + return json.loads(resp.text) + + url_reel_media = 'https://i.instagram.com/api/v1/feed/user/{0}/reel_media/' + url_reels_tray = 'https://i.instagram.com/api/v1/feed/reels_tray/' + if userids is not None: + for userid in userids: + yield _get(url_reel_media.format(userid)) + else: + data = _get(url_reels_tray) + if not 'tray' in data: + raise BadResponseException('Bad story reel JSON.') + for user in data["tray"]: + yield user if "items" in user else _get(url_reel_media.format(user['user']['pk'])) + def download_stories(self, userids: Optional[List[int]] = None, download_videos: bool = True, @@ -671,35 +689,7 @@ class Instaloader: if self.username is None: raise LoginRequiredException('Login required to download stories') - tempsession = copy_session(self.session) - header = tempsession.headers - header['User-Agent'] = 'Instagram 10.3.2 (iPhone7,2; iPhone OS 9_3_3; en_US; en-US; scale=2.00; 750x1334) ' \ - 'AppleWebKit/420+' - del header['Host'] - del header['Origin'] - del header['X-Instagram-AJAX'] - del header['X-Requested-With'] - - def _user_stories(): - def _get(url): - self._sleep() - resp = tempsession.get(url) - if resp.status_code != 200: - raise ConnectionException('Failed to fetch stories.') - return json.loads(resp.text) - url_reel_media = 'https://i.instagram.com/api/v1/feed/user/{0}/reel_media/' - url_reels_tray = 'https://i.instagram.com/api/v1/feed/reels_tray/' - if userids is not None: - for userid in userids: - yield _get(url_reel_media.format(userid)) - else: - data = _get(url_reels_tray) - if not 'tray' in data: - raise BadResponseException('Bad story reel JSON.') - for user in data["tray"]: - yield user if "items" in user else _get(url_reel_media.format(user['user']['pk'])) - - for user_stories in _user_stories(): + for user_stories in self.get_stories(userids): if "items" not in user_stories: continue name = user_stories["user"]["username"].lower() @@ -710,7 +700,6 @@ class Instaloader: self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True) count += 1 - self._sleep() shortcode = item["code"] if "code" in item else "no_code" date_float = item["device_timestamp"] if "device_timestamp" in item else item["taken_at"] @@ -726,7 +715,7 @@ class Instaloader: date=date, shortcode=shortcode) os.makedirs(os.path.dirname(filename), exist_ok=True) - try: + with self._error_catcher('Download story {} from user {}'.format(shortcode, name)): if "image_versions2" in item: url = item["image_versions2"]["candidates"][0]["url"] downloaded = self.download_pic(filename=filename, @@ -746,45 +735,19 @@ class Instaloader: downloaded = self.download_pic(filename=filename, url=item["video_versions"][0]["url"], mtime=date) - if "video_duration" in item and self.sleep and downloaded: - time.sleep(item["video_duration"]) - except NodeUnavailableException: - print("Unable to download node \"https://www.instagram.com/p/{}/\" of user {} from stories." - .format(shortcode, name), file=sys.stderr) - continue - if item["story_locations"]: - location = item["story_locations"][0]["location"] - if location: - self.save_location(filename, location, date) - self._log() - if fast_update and not downloaded: - break + if item["story_locations"]: + location = item["story_locations"][0]["location"] + if location: + self.save_location(filename, location, date) + self._log() + if fast_update and not downloaded: + break - def download_feed_pics(self, max_count: int = None, fast_update: bool = False, - filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None, - download_videos: bool = True, geotags: bool = False, - download_comments: bool = False) -> None: - """ - Download pictures from the user's feed. + def get_feed_posts(self) -> Iterator[Dict[str, Any]]: + """Get Posts of the user's feed.""" - Example to download up to the 20 pics the user last liked: - >>> loader = Instaloader() - >>> loader.load_session_from_file('USER') - >>> loader.download_feed_pics(max_count=20, fast_update=True, - >>> filter_func=lambda node: - >>> not node["likes"]["viewer_has_liked"] - >>> if "likes" in node else - >>> not node["viewer_has_liked"]) + data = self._get_json('', params={'__a': 1}) - :param max_count: Maximum count of pictures to download - :param fast_update: If true, abort when first already-downloaded picture is encountered - :param filter_func: function(node), which returns True if given picture should not be downloaded - :param download_videos: True, if videos should be downloaded - :param geotags: Download geotags - :param download_comments: Update comments - """ - data = self.get_feed_json() - count = 1 while True: if "graphql" in data: is_edge = True @@ -795,34 +758,64 @@ class Instaloader: else: is_edge = False feed = data["feed"]["media"] - for edge_or_node in feed["edges"] if is_edge else feed["nodes"]: - if max_count is not None and count > max_count: - return - node = edge_or_node["node"] if is_edge else edge_or_node - name = node["owner"]["username"] - if filter_func is not None and filter_func(node): - self._log("" % name, flush=True) - continue - self._log("[%3i] %s " % (count, name), end="", flush=True) - count += 1 - try: - downloaded = self.download_node(node, profile=name, target=':feed', - download_videos=download_videos, geotags=geotags, - download_comments=download_comments) - except NodeUnavailableException: - print("Unable to download node \"https://www.instagram.com/p/{}/\" of user {} from feed." - .format(node['shortcode'], name), file=sys.stderr) - continue - if fast_update and not downloaded: - return + + if is_edge: + yield from [edge["node"] for edge in feed["edges"]] + else: + yield from [node for node in feed["nodes"]] + if not feed["page_info"]["has_next_page"]: break - data = self.get_feed_json(end_cursor=feed["page_info"]["end_cursor"]) + data = self.graphql_query(17863003771166879, {'fetch_media_item_count': 12, + 'fetch_media_item_cursor': feed["page_info"]["end_cursor"], + 'fetch_comment_count': 4, + 'fetch_like': 10}) - def get_hashtag_json(self, hashtag: str, - max_id: Optional[str] = None) -> Optional[Dict[str, Any]]: - """Return JSON of a #hashtag""" - return self.get_json(name='explore/tags/{0}/'.format(hashtag), max_id=max_id) + def download_feed_posts(self, max_count: int = None, fast_update: bool = False, + filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None, + download_videos: bool = True, geotags: bool = False, + download_comments: bool = False) -> None: + """ + Download pictures from the user's feed. + + Example to download up to the 20 pics the user last liked: + >>> loader = Instaloader() + >>> loader.load_session_from_file('USER') + >>> loader.download_feed_posts(max_count=20, fast_update=True, + >>> filter_func=lambda post: + >>> not post["likes"]["viewer_has_liked"] + >>> if "likes" in post else + >>> not post["viewer_has_liked"]) + + :param max_count: Maximum count of pictures to download + :param fast_update: If true, abort when first already-downloaded picture is encountered + :param filter_func: function(post), which returns True if given picture should not be downloaded + :param download_videos: True, if videos should be downloaded + :param geotags: Download geotags + :param download_comments: Update comments + """ + count = 1 + for post in self.get_feed_posts(): + if max_count is not None and count > max_count: + break + name = post["owner"]["username"] + if filter_func is not None and filter_func(post): + self._log("" % name, flush=True) + continue + self._log("[%3i] %s " % (count, name), end="", flush=True) + count += 1 + with self._error_catcher('Download feed'): + downloaded = self.download_post(post, profile=name, target=':feed', + download_videos=download_videos, geotags=geotags, + download_comments=download_comments) + if fast_update and not downloaded: + break + + def get_hashtag_posts(self, hashtag: str) -> Iterator[Dict[str, Any]]: + """Get Posts associated with a #hashtag.""" + yield from self.graphql_node_list(17875800862117404, {'tag_name': hashtag}, + 'https://www.instagram.com/explore/tags/' + hashtag + '/', + lambda d: d['data']['hashtag']['edge_hashtag_to_media']) def download_hashtag(self, hashtag: str, max_count: Optional[int] = None, @@ -837,48 +830,38 @@ class Instaloader: :param hashtag: Hashtag to download, without leading '#' :param max_count: Maximum count of pictures to download - :param filter_func: function(node), which returns True if given picture should not be downloaded + :param filter_func: function(post), which returns True if given picture should not be downloaded :param fast_update: If true, abort when first already-downloaded picture is encountered :param download_videos: True, if videos should be downloaded :param geotags: Download geotags :param download_comments: Update comments """ - data = self.get_hashtag_json(hashtag) count = 1 - while data: - for node in data['entry_data']['TagPage'][0]['tag']['media']['nodes']: - if max_count is not None and count > max_count: - return - self._log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True) - if filter_func is not None and filter_func(node): - self._log('') - continue - count += 1 - try: - downloaded = self.download_node(node=node, profile=None, target='#'+hashtag, - download_videos=download_videos, geotags=geotags, - download_comments=download_comments) - except NodeUnavailableException: - print("Unable to download node \"https://www.instagram.com/p/{}/\" " - "while downloading hashtag \"{}\".".format(node['shortcode'], hashtag), file=sys.stderr) - continue - if fast_update and not downloaded: - return - if data['entry_data']['TagPage'][0]['tag']['media']['page_info']['has_next_page']: - data = self.get_hashtag_json(hashtag, - max_id=data['entry_data']['TagPage'][0]['tag']['media']['page_info'][ - 'end_cursor']) - else: + for post in self.get_hashtag_posts(hashtag): + if max_count is not None and count > max_count: break + self._log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True) + if filter_func is not None and filter_func(post): + self._log('') + continue + count += 1 + with self._error_catcher('Download hashtag #{}'.format(hashtag)): + downloaded = self.download_post(node=post, profile=None, target='#' + hashtag, + download_videos=download_videos, geotags=geotags, + download_comments=download_comments) + if fast_update and not downloaded: + break - def check_id(self, profile: str, json_data: Dict[str, Any]) -> Tuple[str, int]: + def check_profile_id(self, profile: str, profile_metadata: Optional[Dict[str, Any]] = None) -> Tuple[str, int]: """ Consult locally stored ID of profile with given name, check whether ID matches and whether name has changed and return current name of the profile, and store ID of profile. + :param profile: Profile name + :param profile_metadata: The profile's metadata (get_profile_metadata()), or None if the profile was not found :return: current profile name, profile id """ - profile_exists = "ProfilePage" in json_data["entry_data"] + profile_exists = profile_metadata is not None if ((format_string_contains_key(self.dirname_pattern, 'profile') or format_string_contains_key(self.dirname_pattern, 'target'))): id_filename = '{0}/id'.format(self.dirname_pattern.format(profile=profile.lower(), @@ -889,7 +872,7 @@ class Instaloader: with open(id_filename, 'rb') as id_file: profile_id = int(id_file.read()) if (not profile_exists) or \ - (profile_id != int(json_data['entry_data']['ProfilePage'][0]['user']['id'])): + (profile_id != int(profile_metadata['user']['id'])): if profile_exists: self._log("Profile {0} does not match the stored unique ID {1}.".format(profile, profile_id)) else: @@ -913,75 +896,95 @@ class Instaloader: os.makedirs(self.dirname_pattern.format(profile=profile.lower(), target=profile.lower()), exist_ok=True) with open(id_filename, 'w') as text_file: - profile_id = json_data['entry_data']['ProfilePage'][0]['user']['id'] + profile_id = profile_metadata['user']['id'] text_file.write(profile_id + "\n") self._log("Stored ID {0} for profile {1}.".format(profile_id, profile)) return profile, profile_id raise ProfileNotExistsException("Profile {0} does not exist.".format(profile)) - def download(self, name: str, - profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False, - download_comments: bool = False, fast_update: bool = False, - download_stories: bool = False, download_stories_only: bool = False) -> None: + def get_profile_metadata(self, profile_name: str) -> Dict[str, Any]: + """Retrieves a profile's metadata, for use with e.g. get_profile_posts() and check_profile_id().""" + try: + return self._get_json('{}/'.format(profile_name), params={'__a': 1}) + except QueryReturnedNotFoundException: + raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name)) + + def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Dict[str, Any]]: + """Retrieve all posts from a profile.""" + yield from profile_metadata['user']['media']['nodes'] + has_next_page = profile_metadata['user']['media']['page_info']['has_next_page'] + end_cursor = profile_metadata['user']['media']['page_info']['end_cursor'] + while has_next_page: + data = self.graphql_query(17888483320059182, {'id': profile_metadata['user']['id'], + 'first': 500, + 'after': end_cursor}, + 'https://www.instagram.com/{0}/'.format(profile_metadata['user']['username'])) + media = data['data']['user']['edge_owner_to_timeline_media'] + yield from [edge['node'] for edge in media['edges']] + has_next_page = media['page_info']['has_next_page'] + end_cursor = media['page_info']['end_cursor'] + + def download_profile(self, name: str, + profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False, + download_comments: bool = False, fast_update: bool = False, + download_stories: bool = False, download_stories_only: bool = False) -> None: """Download one profile""" + # Get profile main page json - data = self.get_json(name) + profile_metadata = None + with suppress(ProfileNotExistsException): + # ProfileNotExistsException is raised again later in check_profile_id() when we search the profile, so we + # must suppress it. + profile_metadata = self.get_profile_metadata(name) + # check if profile does exist or name has changed since last download # and update name and json data if necessary - name_updated, profile_id = self.check_id(name, data) + name_updated, profile_id = self.check_profile_id(name, profile_metadata) if name_updated != name: name = name_updated - data = self.get_json(name) + profile_metadata = self.get_profile_metadata(name) + # Download profile picture - try: - self.download_profilepic(name, data["entry_data"]["ProfilePage"][0]["user"]["profile_pic_url"]) - except NodeUnavailableException: - print("Unable to download profilepic of user {}.".format(name), file=sys.stderr) + with self._error_catcher('Download profile picture of {}'.format(name)): + self.download_profilepic(name, profile_metadata["user"]["profile_pic_url"]) if profile_pic_only: return + # Catch some errors - if data["entry_data"]["ProfilePage"][0]["user"]["is_private"]: - if data["config"]["viewer"] is None: + if profile_metadata["user"]["is_private"]: + if self.username is None: raise LoginRequiredException("profile %s requires login" % name) - if not data["entry_data"]["ProfilePage"][0]["user"]["followed_by_viewer"] and \ - self.username != data["entry_data"]["ProfilePage"][0]["user"]["username"]: + if not profile_metadata["user"]["followed_by_viewer"] and \ + self.username != profile_metadata["user"]["username"]: raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name) else: - if data["config"]["viewer"] is not None and not (download_stories or download_stories_only): + if self.username is not None and not (download_stories or download_stories_only): self._log("profile %s could also be downloaded anonymously." % name) + + # Download stories, if requested if download_stories or download_stories_only: self.download_stories(userids=[profile_id], filename_target=name, download_videos=download_videos, fast_update=fast_update) if download_stories_only: return - if ("nodes" not in data["entry_data"]["ProfilePage"][0]["user"]["media"] or - not data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]) \ - and not profile_pic_only: + + if ("nodes" not in profile_metadata["user"]["media"] or + not profile_metadata["user"]["media"]["nodes"]): raise ProfileHasNoPicsException("Profile %s: no pics found." % name) # Iterate over pictures and download them self._log("Retrieving posts from profile {}.".format(name)) - def get_last_id(data): - if data["entry_data"] and data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]: - return data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"][-1]["id"] - - totalcount = data["entry_data"]["ProfilePage"][0]["user"]["media"]["count"] + totalcount = profile_metadata["user"]["media"]["count"] count = 1 - while get_last_id(data) is not None: - for node in data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]: - self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True) - count += 1 - try: - downloaded = self.download_node(node=node, profile=name, target=name, - download_videos=download_videos, geotags=geotags, - download_comments=download_comments) - except NodeUnavailableException: - print("Unable to download node \"https://www.instagram.com/p/{}/\" of user {}." - .format(node['shortcode'], name), file=sys.stderr) - continue + for post in self.get_profile_posts(profile_metadata): + self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True) + count += 1 + with self._error_catcher('Download profile {}'.format(name)): + downloaded = self.download_post(node=post, profile=name, target=name, + download_videos=download_videos, geotags=geotags, + download_comments=download_comments) if fast_update and not downloaded: - return - data = self.get_json(name, max_id=get_last_id(data)) + break def interactive_login(self, username: str) -> None: """Logs in and internally stores session, asking user for password interactively. @@ -998,12 +1001,12 @@ class Instaloader: print(err, file=sys.stderr) password = None - def download_profiles(self, profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None, - sessionfile: Optional[str] = None, max_count: Optional[int] = None, - profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False, - download_comments: bool = False, - fast_update: bool = False, - stories: bool = False, stories_only: bool = False) -> None: + def main(self, profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None, + sessionfile: Optional[str] = None, max_count: Optional[int] = None, + profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False, + download_comments: bool = False, + fast_update: bool = False, + stories: bool = False, stories_only: bool = False) -> None: """Download set of profiles and handle sessions""" # Login, if desired if username is not None: @@ -1020,46 +1023,51 @@ class Instaloader: self.interactive_login(username) self._log("Logged in as %s." % username) # Try block for KeyboardInterrupt (save session on ^C) - failedtargets = [] targets = set() try: # Generate set of targets for pentry in profilelist: if pentry[0] == '#': self._log("Retrieving pictures with hashtag {0}".format(pentry)) - self.download_hashtag(hashtag=pentry[1:], max_count=max_count, fast_update=fast_update, - download_videos=download_videos, geotags=geotags, - download_comments=download_comments) + with self._error_catcher(): + self.download_hashtag(hashtag=pentry[1:], max_count=max_count, fast_update=fast_update, + download_videos=download_videos, geotags=geotags, + download_comments=download_comments) elif pentry[0] == '@': if username is not None: self._log("Retrieving followees of %s..." % pentry[1:]) - followees = self.get_followees(pentry[1:]) - targets.update([followee['username'] for followee in followees]) + with self._error_catcher(): + followees = self.get_followees(pentry[1:]) + targets.update([followee['username'] for followee in followees]) else: print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr) elif pentry == ":feed-all": if username is not None: self._log("Retrieving pictures from your feed...") - self.download_feed_pics(fast_update=fast_update, max_count=max_count, - download_videos=download_videos, geotags=geotags, - download_comments=download_comments) + with self._error_catcher(): + self.download_feed_posts(fast_update=fast_update, max_count=max_count, + download_videos=download_videos, geotags=geotags, + download_comments=download_comments) else: print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr) elif pentry == ":feed-liked": if username is not None: self._log("Retrieving pictures you liked from your feed...") - self.download_feed_pics(fast_update=fast_update, max_count=max_count, - filter_func=lambda node: - not node["likes"]["viewer_has_liked"] - if "likes" in node - else not node["viewer_has_liked"], - download_videos=download_videos, geotags=geotags, - download_comments=download_comments) + def liked_filter(node): + if "likes" in node: + return not node["likes"]["viewer_has_liked"] + return not node["viewer_has_liked"] + with self._error_catcher(): + self.download_feed_posts(fast_update=fast_update, max_count=max_count, + filter_func=liked_filter, + download_videos=download_videos, geotags=geotags, + download_comments=download_comments) else: print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr) elif pentry == ":stories": if username is not None: - self.download_stories(download_videos=download_videos, fast_update=fast_update) + with self._error_catcher(): + self.download_stories(download_videos=download_videos, fast_update=fast_update) else: print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr) else: @@ -1068,31 +1076,31 @@ class Instaloader: self._log("Downloading %i profiles..." % len(targets)) # Iterate through targets list and download them for target in targets: - try: + with self._error_catcher(): try: - self.download(target, profile_pic_only, download_videos, - geotags, download_comments, fast_update, stories, stories_only) + self.download_profile(target, profile_pic_only, download_videos, + geotags, download_comments, fast_update, stories, stories_only) except ProfileNotExistsException as err: if username is not None: self._log(err) self._log("Trying again anonymously, helps in case you are just blocked.") anonymous_loader = Instaloader(self.sleep, self.quiet, self.shorter_output, self.user_agent, self.dirname_pattern, self.filename_pattern) - anonymous_loader.download(target, profile_pic_only, download_videos, - geotags, download_comments, fast_update) + anonymous_loader.error_log = self.error_log + with self._error_catcher(): + anonymous_loader.download_profile(target, profile_pic_only, download_videos, + geotags, download_comments, fast_update) else: raise err - except NonfatalException as err: - failedtargets.append(target) - print(err, file=sys.stderr) except KeyboardInterrupt: print("\nInterrupted by user.", file=sys.stderr) - if len(targets) > 1 and failedtargets: - print("Errors occured (see above) while downloading profiles: %s." % - ", ".join(failedtargets), file=sys.stderr) # Save session if it is useful if username is not None: self.save_session_to_file(sessionfile) + if self.error_log: + print("\nErrors occured:", file=sys.stderr) + for err in self.error_log: + print(err, file=sys.stderr) def main(): @@ -1194,11 +1202,11 @@ def main(): loader = Instaloader(sleep=not args.no_sleep, quiet=args.quiet, shorter_output=args.shorter_output, user_agent=args.user_agent, dirname_pattern=args.dirname_pattern, filename_pattern=args.filename_pattern) - loader.download_profiles(args.profile, args.login.lower() if args.login is not None else None, args.password, - args.sessionfile, - int(args.count) if args.count is not None else None, - args.profile_pic_only, not args.skip_videos, args.geotags, args.comments, - args.fast_update, args.stories, args.stories_only) + loader.main(args.profile, args.login.lower() if args.login is not None else None, args.password, + args.sessionfile, + int(args.count) if args.count is not None else None, + args.profile_pic_only, not args.skip_videos, args.geotags, args.comments, + args.fast_update, args.stories, args.stories_only) except InstaloaderException as err: raise SystemExit("Fatal error: %s" % err)