1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-10-03 22:07:11 +02:00

Refactor Profile to allow lazy initialization

This change also allows us to use the Profile class in more situations,
which makes the code better.
This commit is contained in:
Alexander Graf 2018-04-10 15:25:36 +02:00
parent dd1cbf0a1d
commit d90d67d619
3 changed files with 119 additions and 125 deletions

View File

@ -135,30 +135,8 @@ class Instaloader:
def __exit__(self, *args):
self.close()
def get_username_by_id(self, profile_id: int) -> str:
"""To get the current username of a profile, given its unique ID, this function can be used."""
data = self.context.graphql_query("472f257a40c653c64c666ce877d59d2b",
{'id': str(profile_id), 'first': 1})['data']['user']
if data:
data = data["edge_owner_to_timeline_media"]
else:
raise ProfileNotExistsException("No profile found, the user may have blocked you (ID: " +
str(profile_id) + ").")
if not data['edges']:
if data['count'] == 0:
raise ProfileHasNoPicsException("Profile with ID {0}: no pics found.".format(str(profile_id)))
else:
raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").")
else:
return Post.from_mediaid(self.context, int(data['edges'][0]["node"]["id"])).owner_username
def get_id_by_username(self, profile: str) -> int:
"""Each Instagram profile has its own unique ID which stays unmodified even if a user changes
his/her username. To get said ID, given the profile's name, you may call this function."""
return Profile(self.context, profile).userid
@_requires_login
def get_followers(self, profile: str) -> Iterator[Dict[str, Any]]:
def get_followers(self, profile: Profile) -> Iterator[Dict[str, Any]]:
"""
Retrieve list of followers of given profile.
To use this, one needs to be logged in and private profiles has to be followed,
@ -167,12 +145,12 @@ class Instaloader:
:param profile: Name of profile to lookup followers.
"""
yield from self.context.graphql_node_list("37479f2b8209594dde7facb0d904896a",
{'id': str(self.get_id_by_username(profile))},
'https://www.instagram.com/' + profile + '/',
{'id': str(profile.userid)},
'https://www.instagram.com/' + profile.username + '/',
lambda d: d['data']['user']['edge_followed_by'])
@_requires_login
def get_followees(self, profile: str) -> Iterator[Dict[str, Any]]:
def get_followees(self, profile: Profile) -> Iterator[Dict[str, Any]]:
"""
Retrieve list of followees (followings) of given profile.
To use this, one needs to be logged in and private profiles has to be followed,
@ -181,8 +159,8 @@ class Instaloader:
:param profile: Name of profile to lookup followers.
"""
yield from self.context.graphql_node_list("58712303d941c6855d4e888c5f0cd22f",
{'id': str(self.get_id_by_username(profile))},
'https://www.instagram.com/' + profile + '/',
{'id': str(profile.userid)},
'https://www.instagram.com/' + profile.username + '/',
lambda d: d['data']['user']['edge_follow'])
def download_pic(self, filename: str, url: str, mtime: datetime,
@ -608,7 +586,7 @@ class Instaloader:
"""
self.context.log("Retrieving saved posts...")
count = 1
for post in Profile(self.context, self.context.username).get_saved_posts():
for post in Profile.from_username(self.context, self.context.username).get_saved_posts():
if max_count is not None and count > max_count:
break
name = post.owner_username
@ -671,15 +649,17 @@ class Instaloader:
if fast_update and not downloaded:
break
def check_profile_id(self, profile_name: str, profile: Optional[Profile] = None) -> str:
def check_profile_id(self, profile_name: str) -> Profile:
"""
Consult locally stored ID of profile with given name, check whether ID matches and whether name
has changed and return current name of the profile, and store ID of profile.
:param profile_name: Profile name
:param profile: The :class:`Profile`, or None if the profile was not found
:return: current profile name, profile id
:return: Instance of current profile
"""
profile = None
with suppress(ProfileNotExistsException):
profile = Profile.from_username(self.context, profile_name)
profile_exists = profile is not None
if ((format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.dirname_pattern, 'target'))):
@ -698,7 +678,8 @@ class Instaloader:
else:
self.context.log("Trying to find profile {0} using its unique ID {1}.".format(profile_name,
profile_id))
newname = self.get_username_by_id(profile_id)
profile_from_id = Profile.from_id(self.context, profile_id)
newname = profile_from_id.username
self.context.log("Profile {0} has changed its name to {1}.".format(profile_name, newname))
if ((format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.dirname_pattern, 'target'))):
@ -709,8 +690,8 @@ class Instaloader:
else:
os.rename('{0}/{1}_id'.format(self.dirname_pattern.format(), profile_name.lower()),
'{0}/{1}_id'.format(self.dirname_pattern.format(), newname.lower()))
return newname
return profile_name
return profile_from_id
return profile
except FileNotFoundError:
pass
if profile_exists:
@ -719,7 +700,7 @@ class Instaloader:
with open(id_filename, 'w') as text_file:
text_file.write(str(profile.userid) + "\n")
self.context.log("Stored ID {0} for profile {1}.".format(profile.userid, profile_name))
return profile_name
return profile
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile_name))
def download_profile(self, profile_name: str,
@ -728,21 +709,13 @@ class Instaloader:
download_stories: bool = False, download_stories_only: bool = False,
filter_func: Optional[Callable[[Post], bool]] = None) -> None:
"""Download one profile"""
profile_name = profile_name.lower()
# Get profile main page json
profile = None
with suppress(ProfileNotExistsException):
# ProfileNotExistsException is raised again later in check_profile_id() when we search the profile, so we
# must suppress it here.
profile = Profile(self.context, profile_name)
# check if profile does exist or name has changed since last download
# and update name and json data if necessary
name_updated = self.check_profile_id(profile_name, profile)
if name_updated != profile_name:
profile_name = name_updated
profile = Profile(self.context, profile_name)
profile = self.check_profile_id(profile_name.lower())
profile_name = profile.username
if self.context.is_logged_in and profile.has_blocked_viewer and not profile.is_private:
# raising ProfileNotExistsException invokes "trying again anonymously" logic

View File

@ -43,6 +43,7 @@ class Post:
:param node: Node structure, as returned by Instagram.
:param owner_profile: The Profile of the owner, if already known at creation.
"""
assert 'shortcode' in node
self._context = context
self._node = node
self._owner_profile = owner_profile
@ -105,25 +106,25 @@ class Post:
d = d[key]
return d
@property
def owner_profile(self) -> 'Profile':
if not self._owner_profile:
owner_struct = self._field('owner')
if 'username' in owner_struct:
self._owner_profile = Profile(self._context, owner_struct)
else:
self._owner_profile = Profile.from_id(self._context, owner_struct['id'])
return self._owner_profile
@property
def owner_username(self) -> str:
"""The Post's lowercase owner name, or 'UNKNOWN'."""
try:
if self._owner_profile:
return self._owner_profile.username.lower()
return self._field('owner', 'username').lower()
except (InstaloaderException, KeyError, TypeError) as err:
if self._context.raise_all_errors:
raise err
self._context.error("Get owner name of {}: {} -- using \'UNKNOWN\'.".format(self, err))
return 'UNKNOWN'
"""The Post's lowercase owner name."""
return self.owner_profile.username
@property
def owner_id(self) -> int:
"""The ID of the Post's owner."""
if self._owner_profile:
return self._owner_profile.userid
return int(self._field('owner', 'id'))
return self.owner_profile.userid
@property
def date_local(self) -> datetime:
@ -288,28 +289,62 @@ class Profile:
This class implements == and is hashable.
"""
def __init__(self, context: InstaloaderContext, profile_name: str):
"""
Lookup Profile information and create Profile instance.
:param context: :class:`InstaloaderContext` instance used for queries etc.
:param identifier: Profile name (string).
"""
def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
assert 'username' in node
self._context = context
self._node = node
@classmethod
def from_username(cls, context: InstaloaderContext, username: str):
# pylint:disable=protected-access
profile = cls(context, {'username': username.lower()})
profile._obtain_metadata() # to raise ProfileNotExistException now in case username is invalid
return profile
@classmethod
def from_id(cls, context: InstaloaderContext, profile_id: int):
data = context.graphql_query("472f257a40c653c64c666ce877d59d2b",
{'id': str(profile_id), 'first': 1})['data']['user']
if data:
data = data["edge_owner_to_timeline_media"]
else:
raise ProfileNotExistsException("No profile found, the user may have blocked you (ID: " +
str(profile_id) + ").")
if not data['edges']:
if data['count'] == 0:
raise ProfileHasNoPicsException("Profile with ID {0}: no pics found.".format(str(profile_id)))
else:
raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").")
username = Post.from_mediaid(context, int(data['edges'][0]["node"]["id"])).owner_username
return cls(context, {'username': username.lower(), 'id': profile_id})
def _obtain_metadata(self):
try:
metadata = self._context.get_json('{}/'.format(profile_name), params={'__a': 1})
self._metadata = metadata['graphql'] if 'graphql' in metadata else metadata
metadata = self._context.get_json('{}/'.format(self.username), params={'__a': 1})
self._node = metadata['graphql']['user'] if 'graphql' in metadata else metadata['user']
except QueryReturnedNotFoundException:
raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name))
raise ProfileNotExistsException('Profile {} does not exist.'.format(self.username))
def _metadata(self, *keys) -> Any:
try:
d = self._node
for key in keys:
d = d[key]
return d
except KeyError:
self._obtain_metadata()
d = self._node
for key in keys:
d = d[key]
return d
@property
def userid(self) -> int:
return int(self._metadata['user']['id'])
return int(self._metadata('id'))
@property
def username(self) -> str:
return self._metadata['user']['username']
return self._metadata('username').lower()
def __repr__(self):
return '<Profile {} ({})>'.format(self.username, self.userid)
@ -324,50 +359,47 @@ class Profile:
@property
def is_private(self) -> bool:
return self._metadata['user']['is_private']
return self._metadata('is_private')
@property
def followed_by_viewer(self) -> bool:
return self._metadata['user']['followed_by_viewer']
return self._metadata('followed_by_viewer')
@property
def mediacount(self) -> int:
if "media" in self._metadata["user"]:
# backwards compatibility with old non-graphql structure
return self._metadata["user"]["media"]["count"]
return self._metadata["user"]["edge_owner_to_timeline_media"]["count"]
return self._metadata('edge_owner_to_timeline_media', 'count')
@property
def biography(self) -> str:
return self._metadata['user']['biography']
return self._metadata('biography')
@property
def blocked_by_viewer(self) -> bool:
return self._metadata['user']['blocked_by_viewer']
return self._metadata('blocked_by_viewer')
@property
def follows_viewer(self) -> bool:
return self._metadata['user']['follows_viewer']
return self._metadata('follows_viewer')
@property
def full_name(self) -> str:
return self._metadata['user']['full_name']
return self._metadata('full_name')
@property
def has_blocked_viewer(self) -> bool:
return self._metadata['user']['has_blocked_viewer']
return self._metadata('has_blocked_viewer')
@property
def has_requested_viewer(self) -> bool:
return self._metadata['user']['has_requested_viewer']
return self._metadata('has_requested_viewer')
@property
def is_verified(self) -> bool:
return self._metadata['user']['is_verified']
return self._metadata('is_verified')
@property
def requested_by_viewer(self) -> bool:
return self._metadata['user']['requested_by_viewer']
return self._metadata('requested_by_viewer')
def get_profile_pic_url(self) -> str:
"""Return URL of profile picture"""
@ -378,22 +410,14 @@ class Profile:
return data["user"]["hd_profile_pic_url_info"]["url"]
except (InstaloaderException, KeyError) as err:
self._context.error('{} Unable to fetch high quality profile pic.'.format(err))
return self._metadata["user"]["profile_pic_url_hd"] if "profile_pic_url_hd" in self._metadata["user"] \
else self._metadata["user"]["profile_pic_url"]
return self._metadata("profile_pic_url_hd")
def get_posts(self) -> Iterator[Post]:
"""Retrieve all posts from a profile."""
if 'media' in self._metadata['user']:
# backwards compatibility with old non-graphql structure
yield from (Post(self._context, node, owner_profile=self)
for node in self._metadata['user']['media']['nodes'])
has_next_page = self._metadata['user']['media']['page_info']['has_next_page']
end_cursor = self._metadata['user']['media']['page_info']['end_cursor']
else:
yield from (Post(self._context, edge['node'], owner_profile=self)
for edge in self._metadata['user']['edge_owner_to_timeline_media']['edges'])
has_next_page = self._metadata['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']
end_cursor = self._metadata['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
yield from (Post(self._context, edge['node'], owner_profile=self)
for edge in self._metadata('edge_owner_to_timeline_media', 'edges'))
has_next_page = self._metadata('edge_owner_to_timeline_media', 'page_info', 'has_next_page')
end_cursor = self._metadata('edge_owner_to_timeline_media', 'page_info', 'end_cursor')
while has_next_page:
# We do not use self.graphql_node_list() here, because profile_metadata
# lets us obtain the first 12 nodes 'for free'
@ -414,23 +438,18 @@ class Profile:
if self.username != self._context.username:
raise LoginRequiredException("--login={} required to get that profile's saved posts.".format(self.username))
data = self._metadata
while True:
if "edge_saved_media" in data["user"]:
is_edge = True
saved_media = data["user"]["edge_saved_media"]
else:
is_edge = False
saved_media = data["user"]["saved_media"]
if is_edge:
yield from (Post(self._context, edge["node"]) for edge in saved_media["edges"])
else:
yield from (Post(self._context, node) for node in saved_media["nodes"])
if not saved_media["page_info"]["has_next_page"]:
break
yield from (Post(self._context, edge['node'])
for edge in self._metadata('edge_saved_media', 'edges'))
has_next_page = self._metadata('edge_saved_media', 'page_info', 'has_next_page')
end_cursor = self._metadata('edge_saved_media', 'page_info', 'end_cursor')
while has_next_page:
data = self._context.graphql_query("f883d95537fbcd400f466f63d42bd8a1",
{'id': self.userid, 'first': GRAPHQL_PAGE_LENGTH,
'after': saved_media["page_info"]["end_cursor"]})['data']
{'id': self.userid,
'first': GRAPHQL_PAGE_LENGTH,
'after': end_cursor},
'https://www.instagram.com/{0}/'.format(self.username))
media = data['data']['user']['edge_saved_media']
yield from (Post(self._context, edge['node'])
for edge in media['edges'])
has_next_page = media['page_info']['has_next_page']
end_cursor = media['page_info']['end_cursor']

View File

@ -71,7 +71,7 @@ class TestInstaloader(unittest.TestCase):
def test_saved_paging(self):
self.L.load_session_from_file(OWN_USERNAME)
for count, post in enumerate(instaloader.Profile(self.L.context, OWN_USERNAME).get_saved_posts()):
for count, post in enumerate(instaloader.Profile.from_username(self.L.context, OWN_USERNAME).get_saved_posts()):
print(post)
if count == PAGING_MAX_COUNT:
break
@ -82,29 +82,31 @@ class TestInstaloader(unittest.TestCase):
def test_get_followees(self):
self.L.load_session_from_file(OWN_USERNAME)
for f in self.L.get_followees(OWN_USERNAME):
for f in self.L.get_followees(instaloader.Profile.from_username(self.L.context, OWN_USERNAME)):
print(f['username'])
def test_get_followers(self):
self.L.load_session_from_file(OWN_USERNAME)
for f in self.L.get_followers(OWN_USERNAME):
for f in self.L.get_followers(instaloader.Profile.from_username(self.L.context, OWN_USERNAME)):
print(f['username'])
def test_get_username_by_id(self):
self.assertEqual(PUBLIC_PROFILE.lower(), self.L.get_username_by_id(PUBLIC_PROFILE_ID))
self.assertEqual(PUBLIC_PROFILE.lower(),
instaloader.Profile.from_id(self.L.context, PUBLIC_PROFILE_ID).username)
def test_get_id_by_username(self):
self.assertEqual(PUBLIC_PROFILE_ID, self.L.get_id_by_username(PUBLIC_PROFILE))
self.assertEqual(PUBLIC_PROFILE_ID,
instaloader.Profile.from_username(self.L.context, PUBLIC_PROFILE).userid)
def test_get_likes(self):
self.L.load_session_from_file(OWN_USERNAME)
for post in instaloader.Profile(self.L.context, OWN_USERNAME).get_posts():
for post in instaloader.Profile.from_username(self.L.context, OWN_USERNAME).get_posts():
for like in post.get_likes():
print(like['username'])
break
def test_post_from_mediaid(self):
for post in instaloader.Profile(self.L.context, PUBLIC_PROFILE).get_posts():
for post in instaloader.Profile.from_username(self.L.context, PUBLIC_PROFILE).get_posts():
post2 = instaloader.Post.from_mediaid(self.L.context, post.mediaid)
self.assertEqual(post, post2)
break