diff --git a/README.rst b/README.rst index 9fd2e63..7d4e9dc 100644 --- a/README.rst +++ b/README.rst @@ -260,10 +260,7 @@ You could also download your last 20 liked pics with .. code:: python loader.download_feed_posts(max_count=20, fast_update=True, - filter_func=lambda node: - not node["likes"]["viewer_has_liked"] - if "likes" in node else - not node["viewer_has_liked"]) + filter_func=lambda post: post.viewer_has_liked) To download the last 20 pictures with hashtag #cat, do diff --git a/instaloader.py b/instaloader.py index a7a4715..d35530a 100755 --- a/instaloader.py +++ b/instaloader.py @@ -123,6 +123,144 @@ def format_string_contains_key(format_string: str, key: str) -> bool: return False +class Post: + """ + Structure containing information about an Instagram post. + + Created by Instaloader methods get_profile_posts(), get_hashtag_posts(), get_feed_posts(). Posts are linked to + an Instaloader instance which is used for error logging and obtaining of additional metadata, if required. + This class unifies access to the properties associated with a post. It implements == and is hashable. + """ + + def __init__(self, instaloader: 'Instaloader', node: Dict[str, Any], profile: Optional[str] = None): + """Create a Post instance from a node structure as returned by Instagram. + + :param instaloader: Instaloader instance used for additional queries if neccessary. + :param node: Node structure. + :param profile: The name of the owner, if already known at creation. + """ + self._instaloader = instaloader + self._node = node + self._profile = profile + self._full_metadata_dict = None + + @classmethod + def from_shortcode(cls, instaloader: 'Instaloader', shortcode: str): + """Create a post object from a given shortcode""" + # pylint:disable=protected-access + post = cls(instaloader, {'shortcode': shortcode}) + post._node = post._full_metadata + return post + + @classmethod + def from_mediaid(cls, instaloader: 'Instaloader', mediaid: int): + """Create a post object from a given mediaid""" + return cls.from_shortcode(instaloader, mediaid_to_shortcode(mediaid)) + + @property + def shortcode(self) -> str: + return self._node['shortcode'] if 'shortcode' in self._node else self._node['code'] + + def __repr__(self): + return ''.format(self.shortcode) + + def __eq__(self, o: object) -> bool: + if isinstance(o, Post): + return self.shortcode == o.shortcode + return NotImplemented + + def __hash__(self) -> int: + return hash(self.shortcode) + + @property + def _full_metadata(self) -> Dict[str, Any]: + if not self._full_metadata_dict: + pic_json = self._instaloader.get_json("p/{0}/".format(self.shortcode), params={'__a': 1}) + if "graphql" in pic_json: + self._full_metadata_dict = pic_json["graphql"]["shortcode_media"] + else: + self._full_metadata_dict = pic_json["media"] + return self._full_metadata_dict + + @property + def owner_username(self) -> str: + """The Post's lowercase owner name, or 'UNKNOWN'.""" + try: + if self._profile: + return self._profile.lower() + if 'owner' in self._node and 'username' in self._node['owner']: + return self._node['owner']['username'].lower() + return self._full_metadata['owner']['username'].lower() + except (InstaloaderException, KeyError, TypeError) as err: + self._instaloader.error("Get owner name of {}: {} -- using \'UNKNOWN\'.".format(self, err)) + return 'UNKNOWN' + + @property + def date(self) -> datetime: + return datetime.fromtimestamp(self._node["date"] if "date" in self._node else self._node["taken_at_timestamp"]) + + @property + def url(self) -> str: + return self._node["display_url"] if "display_url" in self._node else self._node["display_src"] + + @property + def typename(self) -> str: + """Type of post, GraphImage, GraphVideo or GraphSidecar""" + if '__typename' in self._node: + return self._node['__typename'] + # if __typename is not in node, it is an old image or video + return 'GraphImage' + + @property + def sidecar_edges(self) -> List[Dict[str, Any]]: + return self._full_metadata['edge_sidecar_to_children']['edges'] + + @property + def caption(self) -> Optional[str]: + if "edge_media_to_caption" in self._node and self._node["edge_media_to_caption"]["edges"]: + return self._node["edge_media_to_caption"]["edges"][0]["node"]["text"] + elif "caption" in self._node: + return self._node["caption"] + + @property + def is_video(self) -> bool: + return self._node['is_video'] + + @property + def video_url(self) -> str: + return self._full_metadata['video_url'] + + @property + def viewer_has_liked(self) -> bool: + """Whether the viewer has liked the post. + + :raises LoginRequiredException: if not logged in.""" + if not self._instaloader.is_logged_in: + raise LoginRequiredException("Login required to obtain whether viewer has liked {}.".format(self)) + if 'likes' in self._node: + return self._node['likes']['viewer_has_liked'] + if 'viewer_has_liked' in self._node: + return self._node['viewer_has_liked'] + return self._full_metadata['viewer_has_liked'] + + def get_comments(self) -> Iterator[Dict[str, Any]]: + comments_in_metadata = self._full_metadata['edge_media_to_comment'] + if comments_in_metadata['count'] == len(comments_in_metadata['edges']): + # If the Post's metadata already contains all comments, don't do GraphQL requests to obtain them + yield from (comment['node'] for comment in comments_in_metadata['edges']) + yield from self._instaloader.graphql_node_list(17852405266163336, {'shortcode': self.shortcode}, + 'https://www.instagram.com/p/' + self.shortcode + '/', + lambda d: d['data']['shortcode_media']['edge_media_to_comment']) + + def get_location(self) -> Optional[Dict[str, str]]: + """If the Post has a location, returns a dictionary with fields 'lat' and 'lng'.""" + loc_dict = self._full_metadata["location"] + if loc_dict is not None: + location_json = self._instaloader.get_json("explore/locations/{0}/".format(loc_dict["id"]), + params={'__a': 1}) + return location_json["location"] + + class Tristate(Enum): """Tri-state to encode whether we should save certain information, i.e. videos, captions, comments or geotags. @@ -161,9 +299,13 @@ class Instaloader: self.download_captions = download_captions self.download_comments = download_comments - # error log, filled with _error() and printed at the end of Instaloader.main() + # error log, filled with error() and printed at the end of Instaloader.main() self.error_log = [] + @property + def is_logged_in(self) -> bool: + return bool(self.username) + @contextmanager def anonymous_copy(self): """Yield an anonymous, otherwise equally-configured copy of an Instaloader instance; Then copy its error log.""" @@ -179,7 +321,7 @@ class Instaloader: if not self.quiet: print(*msg, sep=sep, end=end, flush=flush) - def _error(self, msg): + def error(self, msg): """Log a non-fatal error message to stderr, which is repeated at program termination.""" print(msg, file=sys.stderr) self.error_log.append(msg) @@ -194,9 +336,9 @@ class Instaloader: yield except InstaloaderException as err: if extra_info: - self._error('{}: {}'.format(extra_info, err)) + self.error('{}: {}'.format(extra_info, err)) else: - self._error('{}'.format(err)) + self.error('{}'.format(err)) def _sleep(self): """Sleep a short, random time if self.sleep is set. Called before each request to the instagram.com.""" @@ -225,12 +367,12 @@ class Instaloader: if tries <= 1: raise ConnectionException(error_string) else: - self._error(error_string + " [retrying]") + self.error(error_string + " [retrying]") self._sleep() self._get_and_write_raw(url, filename, tries - 1) - def _get_json(self, url: str, params: Optional[Dict[str, Any]] = None, - session: Optional[requests.Session] = None, tries: int = 3) -> Dict[str, Any]: + def get_json(self, url: str, params: Optional[Dict[str, Any]] = None, + session: Optional[requests.Session] = None, tries: int = 3) -> Dict[str, Any]: """JSON request to Instagram. :param url: URL, relative to https://www.instagram.com/ @@ -262,9 +404,9 @@ class Instaloader: if tries <= 1: raise ConnectionException(error_string) else: - self._error(error_string + " [retrying]") + self.error(error_string + " [retrying]") self._sleep() - self._get_json(url, params, sess, tries - 1) + self.get_json(url, params, sess, tries - 1) def _default_http_header(self, empty_session_only: bool = False) -> Dict[str, str]: """Returns default HTTP header we use for requests.""" @@ -314,11 +456,11 @@ class Instaloader: tmpsession.headers['accept'] = '*/*' if referer is not None: tmpsession.headers['referer'] = referer - resp_json = self._get_json('graphql/query', params={'query_id': query_id, - 'variables': json.dumps(variables, separators=(',', ':'))}, - session=tmpsession) + resp_json = self.get_json('graphql/query', params={'query_id': query_id, + 'variables': json.dumps(variables, separators=(',', ':'))}, + session=tmpsession) if 'status' not in resp_json: - self._error("GraphQL response did not contain a \"status\" field.") + self.error("GraphQL response did not contain a \"status\" field.") return resp_json def get_username_by_id(self, profile_id: int) -> str: @@ -335,8 +477,7 @@ class Instaloader: else: raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").") else: - shortcode = mediaid_to_shortcode(int(data['edges'][0]["node"]["id"])) - return self.get_post_metadata(shortcode)['owner']['username'] + return Post.from_mediaid(self, int(data['edges'][0]["node"]["id"])).owner_username def get_id_by_username(self, profile: str) -> int: """Each Instagram profile has its own unique ID which stays unmodified even if a user changes @@ -396,19 +537,13 @@ class Instaloader: os.utime(filename, (datetime.now().timestamp(), mtime.timestamp())) return True - def get_comments(self, shortcode: str) -> Iterator[Dict[str, Any]]: - """Retrieve comments of node with given shortcode.""" - yield from self.graphql_node_list(17852405266163336, {'shortcode': shortcode}, - 'https://www.instagram.com/p/' + shortcode + '/', - lambda d: d['data']['shortcode_media']['edge_media_to_comment']) - - def update_comments(self, filename: str, shortcode: str) -> None: + def update_comments(self, filename: str, post: Post) -> None: filename += '_comments.json' try: comments = json.load(open(filename)) except FileNotFoundError: comments = list() - comments.extend(self.get_comments(shortcode)) + comments.extend(post.get_comments()) if comments: with open(filename, 'w') as file: comments_list = sorted(sorted(list(comments), key=lambda t: t['id']), @@ -533,7 +668,7 @@ class Instaloader: def test_login(self, session: Optional[requests.Session]) -> Optional[str]: """Returns the Instagram username to which given requests.Session object belongs, or None.""" if session: - data = self._get_json('', params={'__a': 1}, session=session) + data = self.get_json('', params={'__a': 1}, session=session) return data['graphql']['user']['username'] if 'graphql' in data else None def login(self, user: str, passwd: str) -> None: @@ -559,96 +694,63 @@ class Instaloader: else: raise ConnectionException('Login error! Connection error!') - def get_post_metadata(self, shortcode: str) -> Dict[str, Any]: - """Get full metadata of the post associated with given shortcode.""" - pic_json = self._get_json("p/{0}/".format(shortcode), params={'__a': 1}) - media = pic_json["graphql"]["shortcode_media"] if "graphql" in pic_json else pic_json["media"] - return media - - def get_location(self, post_metadata: Dict[str, Any]) -> Optional[Dict[str, str]]: - if post_metadata["location"] is not None: - location_json = self._get_json("explore/locations/{0}/".format(post_metadata["location"]["id"]), - params={'__a': 1}) - return location_json["location"] - - def download_post(self, node: Dict[str, Any], profile: Optional[str], target: str) -> bool: + def download_post(self, post: Post, target: str) -> bool: """ Download everything associated with one instagram post node, i.e. picture, caption and video. - :param node: Node, as from media->nodes list in instagram's JSONs - :param profile: Name of profile to which this node belongs + :param post: Post to download. :param target: Target name, i.e. profile name, #hashtag, :feed; for filename. :return: True if something was downloaded, False otherwise, i.e. file was already there """ - already_has_profilename = profile is not None or ('owner' in node and 'username' in node['owner']) + + # Format dirname and filename. post.owner_username might do an additional request, so only access it, if + # {profile} is part of the dirname pattern or filename pattern. needs_profilename = (format_string_contains_key(self.dirname_pattern, 'profile') or format_string_contains_key(self.filename_pattern, 'profile')) - shortcode = node['shortcode'] if 'shortcode' in node else node['code'] - post_metadata = None - if needs_profilename: - if already_has_profilename: - profilename = profile if profile is not None else node['owner']['username'] - profilename = profilename.lower() - else: - try: - post_metadata = self.get_post_metadata(shortcode) - profilename = post_metadata['owner']['username'].lower() - except (InstaloaderException, KeyError, TypeError) as err: - self._error("Unable to get owner name of post {}: {} -- using \'UNKNOWN\'.".format(shortcode, err)) - profilename = 'UNKNOWN' - else: - profilename = None - date = datetime.fromtimestamp(node["date"] if "date" in node else node["taken_at_timestamp"]) + profilename = post.owner_username if needs_profilename else None dirname = self.dirname_pattern.format(profile=profilename, target=target.lower()) filename = dirname + '/' + self.filename_pattern.format(profile=profilename, target=target.lower(), - date=date, - shortcode=shortcode) + date=post.date, shortcode=post.shortcode) os.makedirs(os.path.dirname(filename), exist_ok=True) - url = node["display_url"] if "display_url" in node else node["display_src"] - if '__typename' in node: - if node['__typename'] == 'GraphSidecar': - if not post_metadata: - post_metadata = self.get_post_metadata(shortcode) - edge_number = 1 - downloaded = True - for edge in post_metadata['edge_sidecar_to_children']['edges']: - edge_downloaded = self.download_pic(filename=filename, - url=edge['node']['display_url'], - mtime=date, - filename_suffix=str(edge_number)) - downloaded = downloaded and edge_downloaded - edge_number += 1 - elif node['__typename'] in ['GraphImage', 'GraphVideo']: - downloaded = self.download_pic(filename=filename, - url=url, - mtime=date) - else: - self._log("Warning: Unknown typename discovered:" + node['__typename']) - downloaded = False + + # Download the image(s) / video thumbnail + if post.typename == 'GraphSidecar': + edge_number = 1 + downloaded = True + for edge in post.sidecar_edges: + edge_downloaded = self.download_pic(filename=filename, + url=edge['node']['display_url'], + mtime=post.date, + filename_suffix=str(edge_number)) + downloaded = downloaded and edge_downloaded + edge_number += 1 + elif post.typename in ['GraphImage', 'GraphVideo']: + downloaded = self.download_pic(filename=filename, url=post.url, mtime=post.date) else: - # Node is an old image or video. - downloaded = self.download_pic(filename=filename, url=url, mtime=date) + self.error("Warning: {0} has unknown typename: {1}".format(post, post.typename)) + downloaded = False + + # Save caption if desired if self.download_captions is not Tristate.never: - if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]: - self.save_caption(filename, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"]) - elif "caption" in node: - self.save_caption(filename, date, node["caption"]) + if post.caption: + self.save_caption(filename, post.date, post.caption) else: self._log("", end=' ', flush=True) - if node["is_video"] and self.download_videos is Tristate.always: - if not post_metadata: - post_metadata = self.get_post_metadata(shortcode) - self.download_pic(filename=filename, - url=post_metadata['video_url'], - mtime=date) + + # Download video if desired + if post.is_video and self.download_videos is Tristate.always: + self.download_pic(filename=filename, url=post.video_url, mtime=post.date) + + # Download geotags if desired if self.download_geotags is Tristate.always: - if not post_metadata: - post_metadata = self.get_post_metadata(shortcode) - location = self.get_location(post_metadata) + location = post.get_location() if location: - self.save_location(filename, location, date) + self.save_location(filename, location, post.date) + + # Update comments if desired if self.download_comments is Tristate.always: - self.update_comments(filename, shortcode) + self.update_comments(filename, post) + self._log() return downloaded @@ -701,7 +803,7 @@ class Instaloader: :param filename_target: Replacement for {target} in dirname_pattern and filename_pattern """ - if self.username is None: + if not self.is_logged_in: raise LoginRequiredException('Login required to download stories') for user_stories in self.get_stories(userids): @@ -759,10 +861,10 @@ class Instaloader: if fast_update and not downloaded: break - def get_feed_posts(self) -> Iterator[Dict[str, Any]]: + def get_feed_posts(self) -> Iterator[Post]: """Get Posts of the user's feed.""" - data = self._get_json('', params={'__a': 1}) + data = self.get_json('', params={'__a': 1}) while True: if "graphql" in data: @@ -776,9 +878,9 @@ class Instaloader: feed = data["feed"]["media"] if is_edge: - yield from [edge["node"] for edge in feed["edges"]] + yield from (Post(self, edge["node"]) for edge in feed["edges"]) else: - yield from [node for node in feed["nodes"]] + yield from (Post(self, node) for node in feed["nodes"]) if not feed["page_info"]["has_next_page"]: break @@ -788,7 +890,7 @@ class Instaloader: 'fetch_like': 10}) def download_feed_posts(self, max_count: int = None, fast_update: bool = False, - filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None) -> None: + filter_func: Optional[Callable[[Post], bool]] = None) -> None: """ Download pictures from the user's feed. @@ -796,39 +898,37 @@ class Instaloader: >>> loader = Instaloader() >>> loader.load_session_from_file('USER') >>> loader.download_feed_posts(max_count=20, fast_update=True, - >>> filter_func=lambda post: - >>> not post["likes"]["viewer_has_liked"] - >>> if "likes" in post else - >>> not post["viewer_has_liked"]) + >>> filter_func=lambda post: post.viewer_has_liked) :param max_count: Maximum count of pictures to download :param fast_update: If true, abort when first already-downloaded picture is encountered - :param filter_func: function(post), which returns True if given picture should not be downloaded + :param filter_func: function(post), which returns True if given picture should be downloaded """ count = 1 for post in self.get_feed_posts(): if max_count is not None and count > max_count: break - name = post["owner"]["username"] - if filter_func is not None and filter_func(post): + name = post.owner_username + if filter_func is not None and not filter_func(post): self._log("" % name, flush=True) continue self._log("[%3i] %s " % (count, name), end="", flush=True) count += 1 with self._error_catcher('Download feed'): - downloaded = self.download_post(post, profile=name, target=':feed') + downloaded = self.download_post(post, target=':feed') if fast_update and not downloaded: break - def get_hashtag_posts(self, hashtag: str) -> Iterator[Dict[str, Any]]: + def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]: """Get Posts associated with a #hashtag.""" - yield from self.graphql_node_list(17875800862117404, {'tag_name': hashtag}, - 'https://www.instagram.com/explore/tags/' + hashtag + '/', - lambda d: d['data']['hashtag']['edge_hashtag_to_media']) + yield from (Post(self, node) for node in + self.graphql_node_list(17875800862117404, {'tag_name': hashtag}, + 'https://www.instagram.com/explore/tags/{0}/'.format(hashtag), + lambda d: d['data']['hashtag']['edge_hashtag_to_media'])) def download_hashtag(self, hashtag: str, max_count: Optional[int] = None, - filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None, + filter_func: Optional[Callable[[Post], bool]] = None, fast_update: bool = False) -> None: """Download pictures of one hashtag. @@ -838,7 +938,7 @@ class Instaloader: :param hashtag: Hashtag to download, without leading '#' :param max_count: Maximum count of pictures to download - :param filter_func: function(post), which returns True if given picture should not be downloaded + :param filter_func: function(post), which returns True if given picture should be downloaded :param fast_update: If true, abort when first already-downloaded picture is encountered """ count = 1 @@ -846,12 +946,12 @@ class Instaloader: if max_count is not None and count > max_count: break self._log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True) - if filter_func is not None and filter_func(post): + if filter_func is not None and not filter_func(post): self._log('') continue count += 1 with self._error_catcher('Download hashtag #{}'.format(hashtag)): - downloaded = self.download_post(node=post, profile=None, target='#' + hashtag) + downloaded = self.download_post(post, target='#' + hashtag) if fast_update and not downloaded: break @@ -908,13 +1008,14 @@ class Instaloader: def get_profile_metadata(self, profile_name: str) -> Dict[str, Any]: """Retrieves a profile's metadata, for use with e.g. get_profile_posts() and check_profile_id().""" try: - return self._get_json('{}/'.format(profile_name), params={'__a': 1}) + return self.get_json('{}/'.format(profile_name), params={'__a': 1}) except QueryReturnedNotFoundException: raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name)) - def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Dict[str, Any]]: + def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Post]: """Retrieve all posts from a profile.""" - yield from profile_metadata['user']['media']['nodes'] + profile_name = profile_metadata['user']['username'] + yield from (Post(self, node, profile=profile_name) for node in profile_metadata['user']['media']['nodes']) has_next_page = profile_metadata['user']['media']['page_info']['has_next_page'] end_cursor = profile_metadata['user']['media']['page_info']['end_cursor'] while has_next_page: @@ -923,15 +1024,16 @@ class Instaloader: data = self.graphql_query(17888483320059182, {'id': profile_metadata['user']['id'], 'first': 500, 'after': end_cursor}, - 'https://www.instagram.com/{0}/'.format(profile_metadata['user']['username'])) + 'https://www.instagram.com/{0}/'.format(profile_name)) media = data['data']['user']['edge_owner_to_timeline_media'] - yield from [edge['node'] for edge in media['edges']] + yield from (Post(self, edge['node'], profile=profile_name) for edge in media['edges']) has_next_page = media['page_info']['has_next_page'] end_cursor = media['page_info']['end_cursor'] def download_profile(self, name: str, profile_pic_only: bool = False, fast_update: bool = False, - download_stories: bool = False, download_stories_only: bool = False) -> None: + download_stories: bool = False, download_stories_only: bool = False, + filter_func: Optional[Callable[[Post], bool]] = None) -> None: """Download one profile""" # Get profile main page json @@ -956,13 +1058,13 @@ class Instaloader: # Catch some errors if profile_metadata["user"]["is_private"]: - if self.username is None: + if not self.is_logged_in: raise LoginRequiredException("profile %s requires login" % name) if not profile_metadata["user"]["followed_by_viewer"] and \ self.username != profile_metadata["user"]["username"]: raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name) else: - if self.username is not None and not (download_stories or download_stories_only): + if self.is_logged_in and not (download_stories or download_stories_only): self._log("profile %s could also be downloaded anonymously." % name) # Download stories, if requested @@ -982,8 +1084,11 @@ class Instaloader: for post in self.get_profile_posts(profile_metadata): self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True) count += 1 + if filter_func is not None and not filter_func(post): + self._log('') + continue with self._error_catcher('Download profile {}'.format(name)): - downloaded = self.download_post(node=post, profile=name, target=name) + downloaded = self.download_post(post, target=name) if fast_update and not downloaded: break @@ -1015,7 +1120,7 @@ class Instaloader: if sessionfile is not None: print(err, file=sys.stderr) self._log("Session file does not exist yet - Logging in.") - if self.username is None or username != self.test_login(self.session): + if not self.is_logged_in or username != self.test_login(self.session): if password is not None: self.login(username, password) else: @@ -1048,13 +1153,9 @@ class Instaloader: elif pentry == ":feed-liked": if username is not None: self._log("Retrieving pictures you liked from your feed...") - def liked_filter(node): - if "likes" in node: - return not node["likes"]["viewer_has_liked"] - return not node["viewer_has_liked"] with self._error_catcher(): self.download_feed_posts(fast_update=fast_update, max_count=max_count, - filter_func=liked_filter) + filter_func=lambda post: post.viewer_has_liked) else: print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr) elif pentry == ":stories":