diff --git a/instaloader/instaloader.py b/instaloader/instaloader.py index 90d2620..16d5fc1 100644 --- a/instaloader/instaloader.py +++ b/instaloader/instaloader.py @@ -135,30 +135,8 @@ class Instaloader: def __exit__(self, *args): self.close() - def get_username_by_id(self, profile_id: int) -> str: - """To get the current username of a profile, given its unique ID, this function can be used.""" - data = self.context.graphql_query("472f257a40c653c64c666ce877d59d2b", - {'id': str(profile_id), 'first': 1})['data']['user'] - if data: - data = data["edge_owner_to_timeline_media"] - else: - raise ProfileNotExistsException("No profile found, the user may have blocked you (ID: " + - str(profile_id) + ").") - if not data['edges']: - if data['count'] == 0: - raise ProfileHasNoPicsException("Profile with ID {0}: no pics found.".format(str(profile_id))) - else: - raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").") - else: - return Post.from_mediaid(self.context, int(data['edges'][0]["node"]["id"])).owner_username - - def get_id_by_username(self, profile: str) -> int: - """Each Instagram profile has its own unique ID which stays unmodified even if a user changes - his/her username. To get said ID, given the profile's name, you may call this function.""" - return Profile(self.context, profile).userid - @_requires_login - def get_followers(self, profile: str) -> Iterator[Dict[str, Any]]: + def get_followers(self, profile: Profile) -> Iterator[Dict[str, Any]]: """ Retrieve list of followers of given profile. To use this, one needs to be logged in and private profiles has to be followed, @@ -167,12 +145,12 @@ class Instaloader: :param profile: Name of profile to lookup followers. 
""" yield from self.context.graphql_node_list("37479f2b8209594dde7facb0d904896a", - {'id': str(self.get_id_by_username(profile))}, - 'https://www.instagram.com/' + profile + '/', + {'id': str(profile.userid)}, + 'https://www.instagram.com/' + profile.username + '/', lambda d: d['data']['user']['edge_followed_by']) @_requires_login - def get_followees(self, profile: str) -> Iterator[Dict[str, Any]]: + def get_followees(self, profile: Profile) -> Iterator[Dict[str, Any]]: """ Retrieve list of followees (followings) of given profile. To use this, one needs to be logged in and private profiles has to be followed, @@ -181,8 +159,8 @@ class Instaloader: :param profile: Name of profile to lookup followers. """ yield from self.context.graphql_node_list("58712303d941c6855d4e888c5f0cd22f", - {'id': str(self.get_id_by_username(profile))}, - 'https://www.instagram.com/' + profile + '/', + {'id': str(profile.userid)}, + 'https://www.instagram.com/' + profile.username + '/', lambda d: d['data']['user']['edge_follow']) def download_pic(self, filename: str, url: str, mtime: datetime, @@ -608,7 +586,7 @@ class Instaloader: """ self.context.log("Retrieving saved posts...") count = 1 - for post in Profile(self.context, self.context.username).get_saved_posts(): + for post in Profile.from_username(self.context, self.context.username).get_saved_posts(): if max_count is not None and count > max_count: break name = post.owner_username @@ -671,15 +649,17 @@ class Instaloader: if fast_update and not downloaded: break - def check_profile_id(self, profile_name: str, profile: Optional[Profile] = None) -> str: + def check_profile_id(self, profile_name: str) -> Profile: """ Consult locally stored ID of profile with given name, check whether ID matches and whether name has changed and return current name of the profile, and store ID of profile. 
:param profile_name: Profile name - :param profile: The :class:`Profile`, or None if the profile was not found - :return: current profile name, profile id + :return: Instance of current profile """ + profile = None + with suppress(ProfileNotExistsException): + profile = Profile.from_username(self.context, profile_name) profile_exists = profile is not None if ((format_string_contains_key(self.dirname_pattern, 'profile') or format_string_contains_key(self.dirname_pattern, 'target'))): @@ -698,7 +678,8 @@ class Instaloader: else: self.context.log("Trying to find profile {0} using its unique ID {1}.".format(profile_name, profile_id)) - newname = self.get_username_by_id(profile_id) + profile_from_id = Profile.from_id(self.context, profile_id) + newname = profile_from_id.username self.context.log("Profile {0} has changed its name to {1}.".format(profile_name, newname)) if ((format_string_contains_key(self.dirname_pattern, 'profile') or format_string_contains_key(self.dirname_pattern, 'target'))): @@ -709,8 +690,8 @@ class Instaloader: else: os.rename('{0}/{1}_id'.format(self.dirname_pattern.format(), profile_name.lower()), '{0}/{1}_id'.format(self.dirname_pattern.format(), newname.lower())) - return newname - return profile_name + return profile_from_id + return profile except FileNotFoundError: pass if profile_exists: @@ -719,7 +700,7 @@ class Instaloader: with open(id_filename, 'w') as text_file: text_file.write(str(profile.userid) + "\n") self.context.log("Stored ID {0} for profile {1}.".format(profile.userid, profile_name)) - return profile_name + return profile raise ProfileNotExistsException("Profile {0} does not exist.".format(profile_name)) def download_profile(self, profile_name: str, @@ -728,21 +709,13 @@ class Instaloader: download_stories: bool = False, download_stories_only: bool = False, filter_func: Optional[Callable[[Post], bool]] = None) -> None: """Download one profile""" - profile_name = profile_name.lower() # Get profile main page json - profile = 
None - with suppress(ProfileNotExistsException): - # ProfileNotExistsException is raised again later in check_profile_id() when we search the profile, so we - # must suppress it here. - profile = Profile(self.context, profile_name) - # check if profile does exist or name has changed since last download # and update name and json data if necessary - name_updated = self.check_profile_id(profile_name, profile) - if name_updated != profile_name: - profile_name = name_updated - profile = Profile(self.context, profile_name) + profile = self.check_profile_id(profile_name.lower()) + + profile_name = profile.username if self.context.is_logged_in and profile.has_blocked_viewer and not profile.is_private: # raising ProfileNotExistsException invokes "trying again anonymously" logic diff --git a/instaloader/structures.py b/instaloader/structures.py index 50fe26d..0824dc6 100644 --- a/instaloader/structures.py +++ b/instaloader/structures.py @@ -43,6 +43,7 @@ class Post: :param node: Node structure, as returned by Instagram. :param owner_profile: The Profile of the owner, if already known at creation. 
""" + assert 'shortcode' in node self._context = context self._node = node self._owner_profile = owner_profile @@ -105,25 +106,25 @@ class Post: d = d[key] return d + @property + def owner_profile(self) -> 'Profile': + if not self._owner_profile: + owner_struct = self._field('owner') + if 'username' in owner_struct: + self._owner_profile = Profile(self._context, owner_struct) + else: + self._owner_profile = Profile.from_id(self._context, owner_struct['id']) + return self._owner_profile + @property def owner_username(self) -> str: - """The Post's lowercase owner name, or 'UNKNOWN'.""" - try: - if self._owner_profile: - return self._owner_profile.username.lower() - return self._field('owner', 'username').lower() - except (InstaloaderException, KeyError, TypeError) as err: - if self._context.raise_all_errors: - raise err - self._context.error("Get owner name of {}: {} -- using \'UNKNOWN\'.".format(self, err)) - return 'UNKNOWN' + """The Post's lowercase owner name.""" + return self.owner_profile.username @property def owner_id(self) -> int: """The ID of the Post's owner.""" - if self._owner_profile: - return self._owner_profile.userid - return int(self._field('owner', 'id')) + return self.owner_profile.userid @property def date_local(self) -> datetime: @@ -288,28 +289,62 @@ class Profile: This class implements == and is hashable. """ - def __init__(self, context: InstaloaderContext, profile_name: str): - """ - Lookup Profile information and create Profile instance. - - :param context: :class:`InstaloaderContext` instance used for queries etc. - :param identifier: Profile name (string). 
- """ + def __init__(self, context: InstaloaderContext, node: Dict[str, Any]): + assert 'username' in node self._context = context + self._node = node + @classmethod + def from_username(cls, context: InstaloaderContext, username: str): + # pylint:disable=protected-access + profile = cls(context, {'username': username.lower()}) + profile._obtain_metadata() # to raise ProfileNotExistsException now in case username is invalid + return profile + + @classmethod + def from_id(cls, context: InstaloaderContext, profile_id: int): + data = context.graphql_query("472f257a40c653c64c666ce877d59d2b", + {'id': str(profile_id), 'first': 1})['data']['user'] + if data: + data = data["edge_owner_to_timeline_media"] + else: + raise ProfileNotExistsException("No profile found, the user may have blocked you (ID: " + + str(profile_id) + ").") + if not data['edges']: + if data['count'] == 0: + raise ProfileHasNoPicsException("Profile with ID {0}: no pics found.".format(str(profile_id))) + else: + raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").") + username = Post.from_mediaid(context, int(data['edges'][0]["node"]["id"])).owner_username + return cls(context, {'username': username.lower(), 'id': profile_id}) + + def _obtain_metadata(self): try: - metadata = self._context.get_json('{}/'.format(profile_name), params={'__a': 1}) - self._metadata = metadata['graphql'] if 'graphql' in metadata else metadata + metadata = self._context.get_json('{}/'.format(self.username), params={'__a': 1}) + self._node = metadata['graphql']['user'] if 'graphql' in metadata else metadata['user'] except QueryReturnedNotFoundException: - raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name)) + raise ProfileNotExistsException('Profile {} does not exist.'.format(self.username)) + + def _metadata(self, *keys) -> Any: + try: + d = self._node + for key in keys: + d = d[key] + return d + except KeyError: + self._obtain_metadata() + d = 
self._node + for key in keys: + d = d[key] + return d @property def userid(self) -> int: - return int(self._metadata['user']['id']) + return int(self._metadata('id')) @property def username(self) -> str: - return self._metadata['user']['username'] + return self._metadata('username').lower() def __repr__(self): return '<Profile {} ({})>'.format(self.username, self.userid) @@ -324,50 +359,47 @@ class Profile: @property def is_private(self) -> bool: - return self._metadata['user']['is_private'] + return self._metadata('is_private') @property def followed_by_viewer(self) -> bool: - return self._metadata['user']['followed_by_viewer'] + return self._metadata('followed_by_viewer') @property def mediacount(self) -> int: - if "media" in self._metadata["user"]: - # backwards compatibility with old non-graphql structure - return self._metadata["user"]["media"]["count"] - return self._metadata["user"]["edge_owner_to_timeline_media"]["count"] + return self._metadata('edge_owner_to_timeline_media', 'count') @property def biography(self) -> str: - return self._metadata['user']['biography'] + return self._metadata('biography') @property def blocked_by_viewer(self) -> bool: - return self._metadata['user']['blocked_by_viewer'] + return self._metadata('blocked_by_viewer') @property def follows_viewer(self) -> bool: - return self._metadata['user']['follows_viewer'] + return self._metadata('follows_viewer') @property def full_name(self) -> str: - return self._metadata['user']['full_name'] + return self._metadata('full_name') @property def has_blocked_viewer(self) -> bool: - return self._metadata['user']['has_blocked_viewer'] + return self._metadata('has_blocked_viewer') @property def has_requested_viewer(self) -> bool: - return self._metadata['user']['has_requested_viewer'] + return self._metadata('has_requested_viewer') @property def is_verified(self) -> bool: - return self._metadata['user']['is_verified'] + return self._metadata('is_verified') @property def requested_by_viewer(self) -> bool: - return 
self._metadata['user']['requested_by_viewer'] + return self._metadata('requested_by_viewer') def get_profile_pic_url(self) -> str: """Return URL of profile picture""" @@ -378,22 +410,14 @@ class Profile: return data["user"]["hd_profile_pic_url_info"]["url"] except (InstaloaderException, KeyError) as err: self._context.error('{} Unable to fetch high quality profile pic.'.format(err)) - return self._metadata["user"]["profile_pic_url_hd"] if "profile_pic_url_hd" in self._metadata["user"] \ - else self._metadata["user"]["profile_pic_url"] + return self._metadata("profile_pic_url_hd") def get_posts(self) -> Iterator[Post]: """Retrieve all posts from a profile.""" - if 'media' in self._metadata['user']: - # backwards compatibility with old non-graphql structure - yield from (Post(self._context, node, owner_profile=self) - for node in self._metadata['user']['media']['nodes']) - has_next_page = self._metadata['user']['media']['page_info']['has_next_page'] - end_cursor = self._metadata['user']['media']['page_info']['end_cursor'] - else: - yield from (Post(self._context, edge['node'], owner_profile=self) - for edge in self._metadata['user']['edge_owner_to_timeline_media']['edges']) - has_next_page = self._metadata['user']['edge_owner_to_timeline_media']['page_info']['has_next_page'] - end_cursor = self._metadata['user']['edge_owner_to_timeline_media']['page_info']['end_cursor'] + yield from (Post(self._context, edge['node'], owner_profile=self) + for edge in self._metadata('edge_owner_to_timeline_media', 'edges')) + has_next_page = self._metadata('edge_owner_to_timeline_media', 'page_info', 'has_next_page') + end_cursor = self._metadata('edge_owner_to_timeline_media', 'page_info', 'end_cursor') while has_next_page: # We do not use self.graphql_node_list() here, because profile_metadata # lets us obtain the first 12 nodes 'for free' @@ -414,23 +438,18 @@ class Profile: if self.username != self._context.username: raise LoginRequiredException("--login={} required to get that 
profile's saved posts.".format(self.username)) - data = self._metadata - - while True: - if "edge_saved_media" in data["user"]: - is_edge = True - saved_media = data["user"]["edge_saved_media"] - else: - is_edge = False - saved_media = data["user"]["saved_media"] - - if is_edge: - yield from (Post(self._context, edge["node"]) for edge in saved_media["edges"]) - else: - yield from (Post(self._context, node) for node in saved_media["nodes"]) - - if not saved_media["page_info"]["has_next_page"]: - break + yield from (Post(self._context, edge['node']) + for edge in self._metadata('edge_saved_media', 'edges')) + has_next_page = self._metadata('edge_saved_media', 'page_info', 'has_next_page') + end_cursor = self._metadata('edge_saved_media', 'page_info', 'end_cursor') + while has_next_page: data = self._context.graphql_query("f883d95537fbcd400f466f63d42bd8a1", - {'id': self.userid, 'first': GRAPHQL_PAGE_LENGTH, - 'after': saved_media["page_info"]["end_cursor"]})['data'] + {'id': self.userid, + 'first': GRAPHQL_PAGE_LENGTH, + 'after': end_cursor}, + 'https://www.instagram.com/{0}/'.format(self.username)) + media = data['data']['user']['edge_saved_media'] + yield from (Post(self._context, edge['node']) + for edge in media['edges']) + has_next_page = media['page_info']['has_next_page'] + end_cursor = media['page_info']['end_cursor'] diff --git a/test/instaloader-unittests.py b/test/instaloader-unittests.py index 846200e..1e2690b 100644 --- a/test/instaloader-unittests.py +++ b/test/instaloader-unittests.py @@ -71,7 +71,7 @@ class TestInstaloader(unittest.TestCase): def test_saved_paging(self): self.L.load_session_from_file(OWN_USERNAME) - for count, post in enumerate(instaloader.Profile(self.L.context, OWN_USERNAME).get_saved_posts()): + for count, post in enumerate(instaloader.Profile.from_username(self.L.context, OWN_USERNAME).get_saved_posts()): print(post) if count == PAGING_MAX_COUNT: break @@ -82,29 +82,31 @@ class TestInstaloader(unittest.TestCase): def 
test_get_followees(self): self.L.load_session_from_file(OWN_USERNAME) - for f in self.L.get_followees(OWN_USERNAME): + for f in self.L.get_followees(instaloader.Profile.from_username(self.L.context, OWN_USERNAME)): print(f['username']) def test_get_followers(self): self.L.load_session_from_file(OWN_USERNAME) - for f in self.L.get_followers(OWN_USERNAME): + for f in self.L.get_followers(instaloader.Profile.from_username(self.L.context, OWN_USERNAME)): print(f['username']) def test_get_username_by_id(self): - self.assertEqual(PUBLIC_PROFILE.lower(), self.L.get_username_by_id(PUBLIC_PROFILE_ID)) + self.assertEqual(PUBLIC_PROFILE.lower(), + instaloader.Profile.from_id(self.L.context, PUBLIC_PROFILE_ID).username) def test_get_id_by_username(self): - self.assertEqual(PUBLIC_PROFILE_ID, self.L.get_id_by_username(PUBLIC_PROFILE)) + self.assertEqual(PUBLIC_PROFILE_ID, + instaloader.Profile.from_username(self.L.context, PUBLIC_PROFILE).userid) def test_get_likes(self): self.L.load_session_from_file(OWN_USERNAME) - for post in instaloader.Profile(self.L.context, OWN_USERNAME).get_posts(): + for post in instaloader.Profile.from_username(self.L.context, OWN_USERNAME).get_posts(): for like in post.get_likes(): print(like['username']) break def test_post_from_mediaid(self): - for post in instaloader.Profile(self.L.context, PUBLIC_PROFILE).get_posts(): + for post in instaloader.Profile.from_username(self.L.context, PUBLIC_PROFILE).get_posts(): post2 = instaloader.Post.from_mediaid(self.L.context, post.mediaid) self.assertEqual(post, post2) break