1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-10-03 22:07:11 +02:00

Profile class abstracting profile structure

This commit is contained in:
Alexander Graf 2018-03-18 16:31:32 +01:00
parent 279de0a641
commit 760688d9e6
2 changed files with 228 additions and 140 deletions

View File

@ -37,8 +37,8 @@ certain source::
L.download_post(post, target='#cat')
Besides :func:`Instaloader.get_hashtag_posts`, there is
:func:`Instaloader.get_feed_posts`, :func:`Instaloader.get_profile_posts` and
:func:`Instaloader.get_saved_posts`.
:func:`Instaloader.get_feed_posts`, :func:`Profile.get_posts` and
:func:`Profile.get_saved_posts`.
Also, :class:`Post` instances can be created with :func:`Post.from_shortcode`
and :func:`Post.from_mediaid`.
@ -77,6 +77,12 @@ contribution to
.. autoclass:: Instaloader
:no-show-inheritance:
``Profile`` Class
^^^^^^^^^^^^^^^^^
.. autoclass:: Profile
:no-show-inheritance:
``Post`` Class
^^^^^^^^^^^^^^

View File

@ -22,7 +22,7 @@ from datetime import datetime
from enum import Enum
from io import BytesIO
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, Iterator, List, Optional, Union
import requests
import requests.utils
@ -169,8 +169,8 @@ class Post:
"""
Structure containing information about an Instagram post.
Created by Instaloader methods :meth:`.get_profile_posts`, :meth:`.get_hashtag_posts`, :meth:`.get_feed_posts` and
:meth:`.get_saved_posts`.
Created by methods :meth:`Profile.get_posts`, :meth:`Instaloader.get_hashtag_posts`,
:meth:`Instaloader.get_feed_posts` and :meth:`Profile.get_saved_posts`.
Posts are linked to an :class:`Instaloader` instance which is used for error logging and obtaining of additional
metadata, if required. This class unifies access to the properties associated with a post. It implements == and is
hashable.
@ -182,17 +182,16 @@ class Post:
LOGIN_REQUIRING_PROPERTIES = ["viewer_has_liked"]
def __init__(self, instaloader: 'Instaloader', node: Dict[str, Any],
profile: Optional[str] = None, profile_id: Optional[int] = None):
owner_profile: Optional['Profile'] = None):
"""Create a Post instance from a node structure as returned by Instagram.
:param instaloader: :class:`Instaloader` instance used for additional queries if neccessary.
:param node: Node structure.
:param profile: The name of the owner, if already known at creation.
:param node: Node structure, as returned by Instagram.
:param owner_profile: The Profile of the owner, if already known at creation.
"""
self._instaloader = instaloader
self._node = node
self._profile = profile
self._profile_id = profile_id
self._owner_profile = owner_profile
self._full_metadata_dict = None
@classmethod
@ -257,8 +256,8 @@ class Post:
def owner_username(self) -> str:
"""The Post's lowercase owner name, or 'UNKNOWN'."""
try:
if self._profile:
return self._profile.lower()
if self._owner_profile:
return self._owner_profile.username.lower()
return self._field('owner', 'username').lower()
except (InstaloaderException, KeyError, TypeError) as err:
self._instaloader.error("Get owner name of {}: {} -- using \'UNKNOWN\'.".format(self, err))
@ -267,8 +266,8 @@ class Post:
@property
def owner_id(self) -> int:
"""The ID of the Post's owner."""
if self._profile_id:
return self._profile_id
if self._owner_profile:
return self._owner_profile.userid
return int(self._field('owner', 'id'))
@property
@ -423,6 +422,155 @@ class Post:
jsondict[prop] = val.isoformat()
return jsondict
class Profile:
"""
An Instagram Profile.
Instances are linked to an :class:`Instaloader` instance. This class implements == and is hashable.
"""
def __init__(self, instaloader: 'Instaloader', identifier: Union[str, int]):
"""
Lookup Profile information and create Profile instance.
:param instaloader: :class:`Instaloader` instance used for queries etc.
:param identifier: Profile name (string) or Profile ID (integer).
"""
self._instaloader = instaloader
profile_name = identifier if isinstance(identifier, str) else \
self._instaloader.get_username_by_id(identifier)
try:
metadata = self._instaloader.get_json('{}/'.format(profile_name), params={'__a': 1})
self._metadata = metadata['graphql'] if 'graphql' in metadata else metadata
except QueryReturnedNotFoundException:
raise ProfileNotExistsException('Profile {} does not exist.'.format(identifier))
@property
def userid(self) -> int:
return int(self._metadata['user']['id'])
@property
def username(self) -> str:
return self._metadata['user']['username']
def __repr__(self):
return '<Profile {} ({})>'.format(self.username, self.userid)
def __eq__(self, o: object) -> bool:
if isinstance(o, Profile):
return self.userid == o.userid
return NotImplemented
def __hash__(self) -> int:
return hash(self.userid)
@property
def is_private(self) -> bool:
return self._metadata['user']['is_private']
@property
def followed_by_viewer(self) -> bool:
return self._metadata['user']['followed_by_viewer']
@property
def mediacount(self) -> int:
if "media" in self._metadata["user"]:
# backwards compatibility with old non-graphql structure
return self._metadata["user"]["media"]["count"]
return self._metadata["user"]["edge_owner_to_timeline_media"]["count"]
@property
def biography(self) -> str:
return self._metadata['user']['biography']
@property
def blocked_by_viewer(self) -> bool:
return self._metadata['user']['blocked_by_viewer']
@property
def follows_viewer(self) -> bool:
return self._metadata['user']['follows_viewer']
@property
def full_name(self) -> str:
return self._metadata['user']['full_name']
@property
def has_blocked_viewer(self) -> bool:
return self._metadata['user']['has_blocked_viewer']
@property
def has_requested_viewer(self) -> bool:
return self._metadata['user']['has_requested_viewer']
@property
def is_verified(self) -> bool:
return self._metadata['user']['is_verified']
@property
def requested_by_viewer(self) -> bool:
return self._metadata['user']['requested_by_viewer']
@property
def profile_pic_url(self) -> str:
return self._metadata["user"]["profile_pic_url_hd"] if "profile_pic_url_hd" in self._metadata["user"] \
else self._metadata["user"]["profile_pic_url"]
def get_posts(self) -> Iterator[Post]:
"""Retrieve all posts from a profile."""
if 'media' in self._metadata['user']:
# backwards compatibility with old non-graphql structure
yield from (Post(self._instaloader, node, owner_profile=self)
for node in self._metadata['user']['media']['nodes'])
has_next_page = self._metadata['user']['media']['page_info']['has_next_page']
end_cursor = self._metadata['user']['media']['page_info']['end_cursor']
else:
yield from (Post(self._instaloader, edge['node'], owner_profile=self)
for edge in self._metadata['user']['edge_owner_to_timeline_media']['edges'])
has_next_page = self._metadata['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']
end_cursor = self._metadata['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
while has_next_page:
# We do not use self.graphql_node_list() here, because profile_metadata
# lets us obtain the first 12 nodes 'for free'
data = self._instaloader.graphql_query(17888483320059182, {'id': self.userid,
'first': Instaloader.GRAPHQL_PAGE_LENGTH,
'after': end_cursor},
'https://www.instagram.com/{0}/'.format(self.username))
media = data['data']['user']['edge_owner_to_timeline_media']
yield from (Post(self._instaloader, edge['node'], owner_profile=self)
for edge in media['edges'])
has_next_page = media['page_info']['has_next_page']
end_cursor = media['page_info']['end_cursor']
def get_saved_posts(self) -> Iterator[Post]:
"""Get Posts that are marked as saved by the user."""
if self.username != self._instaloader.username:
return
data = self._metadata
while True:
if "edge_saved_media" in data["user"]:
is_edge = True
saved_media = data["user"]["edge_saved_media"]
else:
is_edge = False
saved_media = data["user"]["saved_media"]
if is_edge:
yield from (Post(self._instaloader, edge["node"]) for edge in saved_media["edges"])
else:
yield from (Post(self._instaloader, node) for node in saved_media["nodes"])
if not saved_media["page_info"]["has_next_page"]:
break
data = self._instaloader.graphql_query("f883d95537fbcd400f466f63d42bd8a1",
{'id': self.userid, 'first': Instaloader.GRAPHQL_PAGE_LENGTH,
'after': saved_media["page_info"]["end_cursor"]})['data']
class Tristate(Enum):
"""Tri-state to encode whether we should save certain information, i.e. videos, captions, comments or geotags.
@ -721,7 +869,7 @@ class Instaloader:
def get_id_by_username(self, profile: str) -> int:
"""Each Instagram profile has its own unique ID which stays unmodified even if a user changes
his/her username. To get said ID, given the profile's name, you may call this function."""
return int(self.get_profile_metadata(profile)['user']['id'])
return Profile(self, profile).userid
def graphql_node_list(self, query_identifier: Union[int, str], query_variables: Dict[str, Any],
query_referer: Optional[str],
@ -881,35 +1029,32 @@ class Instaloader:
os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
self._log('geo', end=' ', flush=True)
def download_profilepic(self, name: str, profile_metadata: Dict[str, Any]) -> None:
def download_profilepic(self, profile: Profile) -> None:
"""Downloads and saves profile pic."""
url = profile_metadata["user"]["profile_pic_url_hd"] if "profile_pic_url_hd" in profile_metadata["user"] \
else profile_metadata["user"]["profile_pic_url"]
def _epoch_to_string(epoch: datetime) -> str:
return epoch.strftime('%Y-%m-%d_%H-%M-%S')
date_object = datetime.strptime(self._get_anonymous_session().head(url).headers["Last-Modified"],
date_object = datetime.strptime(self._get_anonymous_session().head(profile.profile_pic_url).headers["Last-Modified"],
'%a, %d %b %Y %H:%M:%S GMT')
if ((format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.dirname_pattern, 'target'))):
filename = '{0}/{1}_UTC_profile_pic.{2}'.format(self.dirname_pattern.format(profile=name.lower(),
target=name.lower()),
_epoch_to_string(date_object), url[-3:])
filename = '{0}/{1}_UTC_profile_pic.{2}'.format(self.dirname_pattern.format(profile=profile.username.lower(),
target=profile.username.lower()),
_epoch_to_string(date_object), profile.profile_pic_url[-3:])
else:
filename = '{0}/{1}_{2}_UTC_profile_pic.{3}'.format(self.dirname_pattern.format(), name.lower(),
_epoch_to_string(date_object), url[-3:])
filename = '{0}/{1}_{2}_UTC_profile_pic.{3}'.format(self.dirname_pattern.format(), profile.username.lower(),
_epoch_to_string(date_object), profile.profile_pic_url[-3:])
if os.path.isfile(filename):
self._log(filename + ' already exists')
return None
url_best = re.sub(r'/s([1-9][0-9]{2})x\1/', '/s2048x2048/', url)
url_best = re.sub(r'/s([1-9][0-9]{2})x\1/', '/s2048x2048/', profile.profile_pic_url)
url_best = re.sub(r'/vp/[a-f0-9]{32}/[A-F0-9]{8}/', '/', url_best) # remove signature
try:
self._get_and_write_raw(url_best, filename)
except (QueryReturnedForbiddenException, QueryReturnedNotFoundException) as err:
self.error('{} Retrying with lower quality version.'.format(err))
self._get_and_write_raw(url, filename)
self._get_and_write_raw(profile.profile_pic_url, filename)
os.utime(filename, (datetime.now().timestamp(), date_object.timestamp()))
self._log('') # log output of _get_and_write_raw() does not produce \n
@ -1241,31 +1386,6 @@ class Instaloader:
if fast_update and not downloaded:
break
def get_saved_posts(self) -> Iterator[Post]:
"""Get Posts that are marked as saved by the user."""
data = self.get_profile_metadata(self.username)
user_id = data["user"]["id"]
while True:
if "edge_saved_media" in data["user"]:
is_edge = True
saved_media = data["user"]["edge_saved_media"]
else:
is_edge = False
saved_media = data["user"]["saved_media"]
if is_edge:
yield from (Post(self, edge["node"]) for edge in saved_media["edges"])
else:
yield from (Post(self, node) for node in saved_media["nodes"])
if not saved_media["page_info"]["has_next_page"]:
break
data = self.graphql_query("f883d95537fbcd400f466f63d42bd8a1",
{'id': user_id, 'first': Instaloader.GRAPHQL_PAGE_LENGTH,
'after': saved_media["page_info"]["end_cursor"]})['data']
def download_saved_posts(self, max_count: int = None, fast_update: bool = False,
filter_func: Optional[Callable[[Post], bool]] = None) -> None:
"""Download user's saved pictures.
@ -1275,7 +1395,7 @@ class Instaloader:
:param filter_func: function(post), which returns True if given picture should be downloaded
"""
count = 1
for post in self.get_saved_posts():
for post in Profile(self, self.username).get_saved_posts():
if max_count is not None and count > max_count:
break
name = post.owner_username
@ -1327,156 +1447,118 @@ class Instaloader:
if fast_update and not downloaded:
break
def check_profile_id(self, profile: str, profile_metadata: Optional[Dict[str, Any]] = None) -> Tuple[str, int]:
def check_profile_id(self, profile_name: str, profile: Optional[Profile] = None) -> str:
"""
Consult locally stored ID of profile with given name, check whether ID matches and whether name
has changed and return current name of the profile, and store ID of profile.
:param profile: Profile name
:param profile_metadata:
The profile's metadata (:meth:`get_profile_metadata`), or None if the profile was not found
:param profile_name: Profile name
:param profile: The :class:`Profile`, or None if the profile was not found
:return: current profile name, profile id
"""
profile_exists = profile_metadata is not None
profile_exists = profile is not None
if ((format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.dirname_pattern, 'target'))):
id_filename = '{0}/id'.format(self.dirname_pattern.format(profile=profile.lower(),
target=profile.lower()))
id_filename = '{0}/id'.format(self.dirname_pattern.format(profile=profile_name.lower(),
target=profile_name.lower()))
else:
id_filename = '{0}/{1}_id'.format(self.dirname_pattern.format(), profile.lower())
id_filename = '{0}/{1}_id'.format(self.dirname_pattern.format(), profile_name.lower())
try:
with open(id_filename, 'rb') as id_file:
profile_id = int(id_file.read())
if (not profile_exists) or \
(profile_id != int(profile_metadata['user']['id'])):
(profile_id != profile.userid):
if profile_exists:
self._log("Profile {0} does not match the stored unique ID {1}.".format(profile, profile_id))
self._log("Profile {0} does not match the stored unique ID {1}.".format(profile_name, profile_id))
else:
self._log("Trying to find profile {0} using its unique ID {1}.".format(profile, profile_id))
self._log("Trying to find profile {0} using its unique ID {1}.".format(profile_name, profile_id))
newname = self.get_username_by_id(profile_id)
self._log("Profile {0} has changed its name to {1}.".format(profile, newname))
self._log("Profile {0} has changed its name to {1}.".format(profile_name, newname))
if ((format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.dirname_pattern, 'target'))):
os.rename(self.dirname_pattern.format(profile=profile.lower(),
target=profile.lower()),
os.rename(self.dirname_pattern.format(profile=profile_name.lower(),
target=profile_name.lower()),
self.dirname_pattern.format(profile=newname.lower(),
target=newname.lower()))
else:
os.rename('{0}/{1}_id'.format(self.dirname_pattern.format(), profile.lower()),
os.rename('{0}/{1}_id'.format(self.dirname_pattern.format(), profile_name.lower()),
'{0}/{1}_id'.format(self.dirname_pattern.format(), newname.lower()))
return newname, profile_id
return profile, profile_id
return newname
return profile_name
except FileNotFoundError:
pass
if profile_exists:
os.makedirs(self.dirname_pattern.format(profile=profile.lower(),
target=profile.lower()), exist_ok=True)
os.makedirs(self.dirname_pattern.format(profile=profile_name.lower(),
target=profile_name.lower()), exist_ok=True)
with open(id_filename, 'w') as text_file:
profile_id = profile_metadata['user']['id']
text_file.write(profile_id + "\n")
self._log("Stored ID {0} for profile {1}.".format(profile_id, profile))
return profile, profile_id
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
text_file.write(str(profile.userid) + "\n")
self._log("Stored ID {0} for profile {1}.".format(profile.userid, profile_name))
return profile_name
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile_name))
def get_profile_metadata(self, profile_name: str) -> Dict[str, Any]:
"""Retrieves a profile's metadata, for use with e.g. :meth:`get_profile_posts` and :meth:`check_profile_id`."""
try:
metadata = self.get_json('{}/'.format(profile_name), params={'__a': 1})
return metadata['graphql'] if 'graphql' in metadata else metadata
except QueryReturnedNotFoundException:
raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name))
def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Post]:
"""Retrieve all posts from a profile."""
profile_name = profile_metadata['user']['username']
profile_id = int(profile_metadata['user']['id'])
if 'media' in profile_metadata['user']:
# backwards compatibility with old non-graphql structure
yield from (Post(self, node, profile=profile_name, profile_id=profile_id)
for node in profile_metadata['user']['media']['nodes'])
has_next_page = profile_metadata['user']['media']['page_info']['has_next_page']
end_cursor = profile_metadata['user']['media']['page_info']['end_cursor']
else:
yield from (Post(self, edge['node'], profile=profile_name, profile_id=profile_id)
for edge in profile_metadata['user']['edge_owner_to_timeline_media']['edges'])
has_next_page = profile_metadata['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']
end_cursor = profile_metadata['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
while has_next_page:
# We do not use self.graphql_node_list() here, because profile_metadata
# lets us obtain the first 12 nodes 'for free'
data = self.graphql_query(17888483320059182, {'id': profile_metadata['user']['id'],
'first': Instaloader.GRAPHQL_PAGE_LENGTH,
'after': end_cursor},
'https://www.instagram.com/{0}/'.format(profile_name))
media = data['data']['user']['edge_owner_to_timeline_media']
yield from (Post(self, edge['node'], profile=profile_name, profile_id=profile_id)
for edge in media['edges'])
has_next_page = media['page_info']['has_next_page']
end_cursor = media['page_info']['end_cursor']
def download_profile(self, name: str,
def download_profile(self, profile_name: str,
profile_pic: bool = True, profile_pic_only: bool = False,
fast_update: bool = False,
download_stories: bool = False, download_stories_only: bool = False,
filter_func: Optional[Callable[[Post], bool]] = None) -> None:
"""Download one profile"""
name = name.lower()
profile_name = profile_name.lower()
# Get profile main page json
profile_metadata = None
profile = None
with suppress(ProfileNotExistsException):
# ProfileNotExistsException is raised again later in check_profile_id() when we search the profile, so we
# must suppress it here.
profile_metadata = self.get_profile_metadata(name)
profile = Profile(self, profile_name)
# check if profile does exist or name has changed since last download
# and update name and json data if necessary
name_updated, profile_id = self.check_profile_id(name, profile_metadata)
if name_updated != name:
name = name_updated
profile_metadata = self.get_profile_metadata(name)
name_updated = self.check_profile_id(profile_name, profile)
if name_updated != profile_name:
profile_name = name_updated
profile = Profile(self, profile_name)
if self.is_logged_in and profile.has_blocked_viewer and not profile.is_private:
# raising ProfileNotExistsException invokes "trying again anonymously" logic
raise ProfileNotExistsException("Profile {} has blocked you".format(profile_name))
# Download profile picture
if profile_pic or profile_pic_only:
with self._error_catcher('Download profile picture of {}'.format(name)):
self.download_profilepic(name, profile_metadata)
with self._error_catcher('Download profile picture of {}'.format(profile_name)):
self.download_profilepic(profile)
if profile_pic_only:
return
# Catch some errors
if profile_metadata["user"]["is_private"]:
if profile.is_private:
if not self.is_logged_in:
raise LoginRequiredException("profile %s requires login" % name)
if not profile_metadata["user"]["followed_by_viewer"] and \
self.username != profile_metadata["user"]["username"]:
raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name)
raise LoginRequiredException("profile %s requires login" % profile_name)
if not profile.followed_by_viewer and \
self.username != profile.username:
raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % profile_name)
else:
if self.is_logged_in and not (download_stories or download_stories_only):
self._log("profile %s could also be downloaded anonymously." % name)
self._log("profile %s could also be downloaded anonymously." % profile_name)
# Download stories, if requested
if download_stories or download_stories_only:
with self._error_catcher("Download stories of {}".format(name)):
self.download_stories(userids=[profile_id], filename_target=name, fast_update=fast_update)
with self._error_catcher("Download stories of {}".format(profile_name)):
self.download_stories(userids=[profile.userid], filename_target=profile_name, fast_update=fast_update)
if download_stories_only:
return
# Iterate over pictures and download them
self._log("Retrieving posts from profile {}.".format(name))
if "media" in profile_metadata["user"]:
# backwards compatibility with old non-graphql structure
totalcount = profile_metadata["user"]["media"]["count"]
else:
totalcount = profile_metadata["user"]["edge_owner_to_timeline_media"]["count"]
self._log("Retrieving posts from profile {}.".format(profile_name))
totalcount = profile.mediacount
count = 1
for post in self.get_profile_posts(profile_metadata):
for post in profile.get_posts():
self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
count += 1
if filter_func is not None and not filter_func(post):
self._log('<skipped>')
continue
with self._error_catcher('Download profile {}'.format(name)):
downloaded = self.download_post(post, target=name)
with self._error_catcher('Download profile {}'.format(profile_name)):
downloaded = self.download_post(post, target=profile_name)
if fast_update and not downloaded:
break