1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-08-18 12:49:38 +02:00

Story and StoryItem classes to represent stories

This commit is contained in:
André Koch-Kramer 2018-04-10 20:22:57 +02:00
parent d90d67d619
commit 5b9590a768
2 changed files with 202 additions and 23 deletions

View File

@ -16,7 +16,7 @@ from typing import Any, Callable, Dict, Iterator, List, Optional
from .exceptions import * from .exceptions import *
from .instaloadercontext import InstaloaderContext from .instaloadercontext import InstaloaderContext
from .structures import Post, Profile, mediaid_to_shortcode from .structures import Post, Profile, Story, StoryItem
def get_default_session_filename(username: str) -> str: def get_default_session_filename(username: str) -> str:
@ -430,7 +430,7 @@ class Instaloader:
return downloaded return downloaded
@_requires_login @_requires_login
def get_stories(self, userids: Optional[List[int]] = None) -> Iterator[Dict[str, Any]]: def get_stories(self, userids: Optional[List[int]] = None) -> Iterator[Story]:
"""Get available stories from followees or all stories of users whose ID are given. """Get available stories from followees or all stories of users whose ID are given.
Does not mark stories as seen. Does not mark stories as seen.
To use this, one needs to be logged in To use this, one needs to be logged in
@ -448,8 +448,7 @@ class Instaloader:
stories = self.context.graphql_query("bf41e22b1c4ba4c9f31b844ebb7d9056", stories = self.context.graphql_query("bf41e22b1c4ba4c9f31b844ebb7d9056",
{"reel_ids": userids, "precomposed_overlay": False})["data"] {"reel_ids": userids, "precomposed_overlay": False})["data"]
for media in stories["reels_media"]: yield from (Story(self.context, media) for media in stories['reels_media'])
yield media
@_requires_login @_requires_login
def download_stories(self, def download_stories(self,
@ -473,52 +472,50 @@ class Instaloader:
raise InvalidArgumentException("The \"post\" keyword is not supported in the filename pattern when " raise InvalidArgumentException("The \"post\" keyword is not supported in the filename pattern when "
"downloading stories.") "downloading stories.")
for user_stories in self.get_stories(userids): for user_story in self.get_stories(userids):
if "items" not in user_stories: name = user_story.owner_username
raise BadResponseException('Bad reel media JSON.')
name = user_stories["user"]["username"].lower()
self.context.log("Retrieving stories from profile {}.".format(name)) self.context.log("Retrieving stories from profile {}.".format(name))
totalcount = len(user_stories["items"]) totalcount = user_story.itemcount
count = 1 count = 1
for item in user_stories["items"]: for item in user_story.get_items():
self.context.log("[%3i/%3i] " % (count, totalcount), end="", flush=True) self.context.log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
count += 1 count += 1
with self.context.error_catcher('Download story from user {}'.format(name)): with self.context.error_catcher('Download story from user {}'.format(name)):
downloaded = self.download_story(item, filename_target, name) downloaded = self.download_story(item, filename_target)
if fast_update and not downloaded: if fast_update and not downloaded:
break break
def download_story(self, item: Dict[str, Any], target: str, profile: str) -> bool: def download_story(self, item: StoryItem, target: str) -> bool:
"""Download one user story. """Download one user story.
:param item: Story item, as in story['items'] for story in :meth:`get_stories` :param item: Story item, as in story['items'] for story in :meth:`get_stories`
:param target: Replacement for {target} in dirname_pattern and filename_pattern :param target: Replacement for {target} in dirname_pattern and filename_pattern
:param profile: Owner profile name
:return: True if something was downloaded, False otherwise, i.e. file was already there :return: True if something was downloaded, False otherwise, i.e. file was already there
""" """
shortcode = mediaid_to_shortcode(int(item["id"])) owner_name = item.owner_username
date_local = datetime.fromtimestamp(item["taken_at_timestamp"]) shortcode = item.shortcode
date_utc = datetime.utcfromtimestamp(item["taken_at_timestamp"]) date_local = item.date_local
dirname = self.dirname_pattern.format(profile=profile, target=target) date_utc = item.date_utc
filename = dirname + '/' + self.filename_pattern.format(profile=profile, target=target, dirname = self.dirname_pattern.format(profile=owner_name, target=target)
filename = dirname + '/' + self.filename_pattern.format(profile=owner_name, target=target,
date_utc=date_utc, date_utc=date_utc,
shortcode=shortcode) shortcode=shortcode)
filename_old = dirname + '/' + self.filename_pattern_old.format(profile=profile, target=target, filename_old = dirname + '/' + self.filename_pattern_old.format(profile=owner_name, target=target,
date_utc=date_local, date_utc=date_local,
shortcode=shortcode) shortcode=shortcode)
os.makedirs(os.path.dirname(filename), exist_ok=True) os.makedirs(os.path.dirname(filename), exist_ok=True)
downloaded = False downloaded = False
if not item["is_video"] or self.download_video_thumbnails is Tristate.always: if not item.is_video or self.download_video_thumbnails is Tristate.always:
url = item["display_resources"][-1]["src"] url = item.url
downloaded = self.download_pic(filename=filename, downloaded = self.download_pic(filename=filename,
filename_alt=filename_old, filename_alt=filename_old,
url=url, url=url,
mtime=date_local) mtime=date_local)
if item["is_video"] and self.download_videos is Tristate.always: if item.is_video and self.download_videos is Tristate.always:
downloaded |= self.download_pic(filename=filename, downloaded |= self.download_pic(filename=filename,
filename_alt=filename_old, filename_alt=filename_old,
url=item["video_resources"][-1]["src"], url=item.video_url,
mtime=date_local) mtime=date_local)
self.context.log() self.context.log()
return downloaded return downloaded

View File

@ -453,3 +453,185 @@ class Profile:
for edge in media['edges']) for edge in media['edges'])
has_next_page = media['page_info']['has_next_page'] has_next_page = media['page_info']['has_next_page']
end_cursor = media['page_info']['end_cursor'] end_cursor = media['page_info']['end_cursor']
class StoryItem:
"""
Structure containing information about a user story item i.e. image or video.
Created by method :meth:`Story.get_items`. This class implements == and is hashable.
:param context: :class:`InstaloaderContext` instance used for additional queries if necessary.
:param node: Dictionary containing the available information of the story item.
:param owner_profile: :class:`Profile` instance representing the story owner.
"""
def __init__(self, context: InstaloaderContext, node: Dict[str, Any], owner_profile: Optional[Profile] = None):
self._context = context
self._node = node
self._owner_profile = owner_profile
@property
def mediaid(self) -> int:
"""The mediaid is a decimal representation of the media shortcode."""
return int(self._node['id'])
@property
def shortcode(self) -> str:
return mediaid_to_shortcode(self.mediaid)
def __repr__(self):
return '<StoryItem {}>'.format(self.mediaid)
def __eq__(self, o: object) -> bool:
if isinstance(o, StoryItem):
return self.mediaid == o.mediaid
return NotImplemented
def __hash__(self) -> int:
return hash(self.mediaid)
@property
def owner_profile(self) -> Profile:
""":class:`Profile` instance of the story item's owner."""
if not self._owner_profile:
self._owner_profile = Profile.from_id(self._context, self._node['owner']['id'])
return self._owner_profile
@property
def owner_username(self) -> str:
"""The StoryItem owner's lowercase name."""
return self.owner_profile.username
@property
def owner_id(self) -> int:
"""The ID of the StoryItem owner."""
return self.owner_profile.userid
@property
def date_local(self) -> datetime:
"""Timestamp when the StoryItem was created (local time zone)."""
return datetime.fromtimestamp(self._node['taken_at_timestamp'])
@property
def date_utc(self) -> datetime:
"""Timestamp when the StoryItem was created (UTC)."""
return datetime.utcfromtimestamp(self._node['taken_at_timestamp'])
@property
def expiring_local(self) -> datetime:
"""Timestamp when the StoryItem will get unavailable (local time zone)."""
return datetime.fromtimestamp(self._node['expiring_at_timestamp'])
@property
def expiring_utc(self) -> datetime:
"""Timestamp when the StoryItem will get unavailable (UTC)."""
return datetime.utcfromtimestamp(self._node['expiring_at_timestamp'])
@property
def url(self) -> str:
"""URL of the picture / video thumbnail of the StoryItem"""
return self._node['display_resources'][-1]['src']
@property
def typename(self) -> str:
"""Type of post, GraphStoryImage or GraphStoryVideo"""
return self._node['__typename']
@property
def is_video(self) -> bool:
"""True if the StoryItem is a video."""
return self._node['is_video']
@property
def video_url(self) -> Optional[str]:
"""URL of the video, or None."""
if self.is_video:
return self._node['video_resources'][-1]['src']
class Story:
"""
Structure representing a user story with its associated items.
Provides methods for accessing story properties, as well as :meth:`Story.get_items`.
This class implements == and is hashable.
:param context: :class:`InstaloaderContext` instance used for additional queries if necessary.
:param node: Dictionary containing the available information of the story as returned by Instagram.
"""
def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
self._context = context
self._node = node
self._unique_id = None
def __repr__(self):
return '<Story by {} changed {:%Y-%m-%d_%H-%M-%S_UTC}>'.format(self.owner_username, self.latest_media_utc)
def __eq__(self, o: object) -> bool:
if isinstance(o, Story):
return self.unique_id == o.unique_id
return NotImplemented
def __hash__(self) -> int:
return hash(self._unique_id)
@property
def unique_id(self) -> str:
"""
This ID only equals amongst :class:`Story` instances which have the same owner and the same set of
:class:`StoryItem`s. For all other :class:`Story` instances this ID is different.
"""
if not self._unique_id:
id_list = [item.mediaid for item in self.get_items()]
id_list.sort()
self._unique_id = str().join([str(self.owner_id)] + list(map(str, id_list)))
return self._unique_id
@property
def last_seen_local(self) -> Optional[datetime]:
"""Timestamp when the story has last been watched or None (local time zone)."""
if self._node['seen']:
return datetime.fromtimestamp(self._node['seen'])
@property
def last_seen_utc(self) -> Optional[datetime]:
"""Timestamp when the story has last been watched or None (UTC)."""
if self._node['seen']:
return datetime.utcfromtimestamp(self._node['seen'])
@property
def latest_media_local(self) -> datetime:
"""Timestamp when the last item of the story was created (local time zone)."""
return datetime.fromtimestamp(self._node['latest_reel_media'])
@property
def latest_media_utc(self) -> datetime:
"""Timestamp when the last item of the story was created (UTC)."""
return datetime.utcfromtimestamp(self._node['latest_reel_media'])
@property
def itemcount(self) -> int:
"""Count of items associated with the :class:`Story` instance."""
return len(self._node['items'])
@property
def owner_profile(self) -> Profile:
""":class:`Profile` instance of the story owner."""
return Profile(self._context, self._node['user'])
@property
def owner_username(self) -> str:
"""The story owner's lowercase username."""
return self.owner_profile.username
@property
def owner_id(self) -> int:
"""The story owner's ID."""
return self.owner_profile.userid
def get_items(self) -> Iterator[StoryItem]:
"""Retrieve all items from a story."""
yield from (StoryItem(self._context, item, self.owner_profile) for item in self._node['items'])