from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, Optional, TypeVar

if TYPE_CHECKING:
    # Only referenced by the annotation on SectionIterator.__init__.
    from .instaloadercontext import InstaloaderContext

T = TypeVar('T')


class SectionIterator(Iterator[T]):
    """Iterator for the new 'sections'-style responses.

    A page of data is a dictionary holding a ``sections`` list, where every section carries its items in
    ``section['layout_content']['medias']`` and each item wraps the actual structure under ``'media'``. While the page
    reports ``more_available``, the next page is requested with ``max_id`` set to the page's ``next_max_id``.

    :param context: Object whose ``get_json(path, params=...)`` is used to fetch pages.
    :param sections_extractor: Maps the raw JSON response to the page dictionary described above.
    :param media_wrapper: Turns one ``media`` dictionary into the item that is yielded.
    :param query_path: Path passed to ``get_json`` for every request.
    :param first_data: Already-extracted first page. If it is ``None`` (or empty), the first page is queried.

    .. versionadded:: 4.9"""

    def __init__(self,
                 context: "InstaloaderContext",
                 sections_extractor: Callable[[Dict[str, Any]], Dict[str, Any]],
                 media_wrapper: Callable[[Dict], T],
                 query_path: str,
                 first_data: Optional[Dict[str, Any]] = None):
        self._context = context
        self._sections_extractor = sections_extractor
        self._media_wrapper = media_wrapper
        self._query_path = query_path
        # Any falsy first_data (None or an empty dict) triggers a request for the first page.
        self._data = first_data or self._query()
        # Position in the current page: index of the section, and index of the media within that section.
        self._page_index = 0
        self._section_index = 0

    def __iter__(self):
        return self

    def _query(self, max_id: Optional[str] = None) -> Dict[str, Any]:
        """Fetch one page and return it after applying the sections extractor.

        :param max_id: Pagination cursor; omitted from the request parameters when ``None``."""
        pagination_variables = {"max_id": max_id} if max_id is not None else {}
        return self._sections_extractor(
            self._context.get_json(self._query_path, params={"__a": 1, **pagination_variables})
        )

    def __next__(self) -> T:
        """Return the next wrapped media, fetching further pages as needed.

        Sections whose ``medias`` list is empty are skipped instead of raising :class:`IndexError`, and pages that
        contain no media are skipped iteratively rather than by recursion.

        :raises StopIteration: When the current page is exhausted and reports no more data."""
        while True:
            sections = self._data['sections']
            while self._page_index < len(sections):
                medias = sections[self._page_index]['layout_content']['medias']
                if self._section_index < len(medias):
                    media = medias[self._section_index]['media']
                    # Advance before wrapping, so a failing wrapper does not make us yield the same media again.
                    self._section_index += 1
                    return self._media_wrapper(media)
                # This section is exhausted (or was empty): continue with the next one.
                self._section_index = 0
                self._page_index += 1
            if not self._data['more_available']:
                raise StopIteration()
            # Query first, so the position is only reset once the new page has actually been obtained.
            next_data = self._query(self._data["next_max_id"])
            self._page_index, self._section_index, self._data = 0, 0, next_data
@classmethod
def from_iphone_struct(cls, context: "InstaloaderContext", media: Dict[str, Any]):
    """Create a post from a given iphone_struct.

    The structure is translated into a node dictionary using the key names the class is constructed with elsewhere
    (``shortcode``, ``id``, ``__typename``, ...); the untouched structure is kept under ``iphone_struct``.

    :param context: Context handed to the constructed post (and to its owner profile).
    :param media: The iphone_struct of the post. ``code``, ``pk``, ``media_type``, ``taken_at``, ``has_liked`` and
       ``like_count`` are required; image, video and carousel information is optional.
    :raises KeyError: If a required key is missing or ``media_type`` is not one of 1, 2 or 8.

    .. versionadded:: 4.9"""
    media_types = {
        1: "GraphImage",
        2: "GraphVideo",
        8: "GraphSidecar",
    }
    typename = media_types[media["media_type"]]
    caption = media.get("caption")
    fake_node = {
        "shortcode": media["code"],
        "id": media["pk"],
        "__typename": typename,
        "is_video": typename == "GraphVideo",
        "date": media["taken_at"],
        "caption": caption.get("text") if caption is not None else None,
        "title": media.get("title"),
        "viewer_has_liked": media["has_liked"],
        "edge_media_preview_like": {"count": media["like_count"]},
        "iphone_struct": media,
    }
    # The optional parts are best-effort. IndexError is suppressed as well as KeyError, because an empty
    # 'candidates' or 'video_versions' list is just another way of the information being absent.
    with suppress(KeyError, IndexError):
        fake_node["display_url"] = media['image_versions2']['candidates'][0]['url']
    with suppress(KeyError, IndexError):
        fake_node["video_url"] = media['video_versions'][-1]['url']
        fake_node["video_duration"] = media["video_duration"]
        fake_node["video_view_count"] = media["view_count"]
    with suppress(KeyError, IndexError):
        fake_node["edge_sidecar_to_children"] = {"edges": [{"node": {
            "display_url": node['image_versions2']['candidates'][0]['url'],
            "is_video": media_types[node["media_type"]] == "GraphVideo",
        }} for node in media["carousel_media"]]}
    owner = Profile.from_iphone_struct(context, media["user"]) if "user" in media else None
    return cls(context, fake_node, owner)
@classmethod
def from_iphone_struct(cls, context: "InstaloaderContext", media: Dict[str, Any]):
    """Create a profile from a given iphone_struct.

    :param context: Context handed to the constructed profile.
    :param media: The user structure; ``pk``, ``username``, ``is_private``, ``full_name`` and ``profile_pic_url``
       are required. It is additionally kept unchanged under ``iphone_struct``.
    :raises KeyError: If one of the required keys is missing.

    .. versionadded:: 4.9"""
    # (node key, iphone_struct key) pairs, in the order in which they are copied.
    field_map = (
        ("id", "pk"),
        ("username", "username"),
        ("is_private", "is_private"),
        ("full_name", "full_name"),
        ("profile_pic_url_hd", "profile_pic_url"),
    )
    node = {node_key: media[struct_key] for node_key, struct_key in field_map}
    node["iphone_struct"] = media
    return cls(context, node)
def get_posts(self) -> Iterator["Post"]:
    """Yields the recent posts associated with this hashtag.

    If the metadata provides ``edge_hashtag_to_media`` (with ``edges`` and ``page_info``), posts are taken from
    there and further pages are requested using the ``end_cursor`` as ``max_id``. Otherwise the posts are read from
    the sections-style ``recent`` metadata using a :class:`SectionIterator`.

    :raises KeyError: If a later page lacks ``edge_hashtag_to_media``. Such an error is propagated rather than
       answered by starting over with the sections-style fallback."""
    # Only the detection of the response format is guarded. Keeping the pagination loop inside the try block
    # would turn any KeyError on a later page into a silent restart using the other format.
    try:
        self._metadata("edge_hashtag_to_media", "edges")
        self._metadata("edge_hashtag_to_media", "page_info")
        conn = self._metadata("edge_hashtag_to_media")
    except KeyError:
        yield from SectionIterator(
            self._context,
            lambda d: d["data"]["recent"],
            lambda m: Post.from_iphone_struct(self._context, m),
            f"explore/tags/{self.name}/",
            self._metadata("recent"),
        )
        return
    yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
    while conn["page_info"]["has_next_page"]:
        data = self._query({'__a': 1, 'max_id': conn["page_info"]["end_cursor"]})
        conn = data["edge_hashtag_to_media"]
        yield from (Post(self._context, edge["node"]) for edge in conn["edges"])