1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-08-18 04:39:39 +02:00

Fix download of hashtags and locations

Fixes #1080, fixes #1129, closes #1240.
This commit is contained in:
Alexander Graf 2021-11-12 20:17:24 +01:00
parent d6fd4c560c
commit 5d18857695
3 changed files with 165 additions and 38 deletions

View File

@ -22,6 +22,7 @@ from .exceptions import *
from .instaloadercontext import InstaloaderContext, RateController
from .lateststamps import LatestStamps
from .nodeiterator import NodeIterator, resumable_iteration
from .sectioniterator import SectionIterator
from .structures import (Hashtag, Highlight, JsonExportable, Post, PostLocation, Profile, Story, StoryItem,
load_structure_from_file, save_structure_to_file, PostSidecarNode, TitlePic)
@ -1088,18 +1089,12 @@ class Instaloader:
.. versionchanged:: 4.2.9
Require being logged in (as required by Instagram)
"""
has_next_page = True
end_cursor = None
while has_next_page:
if end_cursor:
params = {'__a': 1, 'max_id': end_cursor}
else:
params = {'__a': 1}
location_data = self.context.get_json('explore/locations/{0}/'.format(location),
params)['graphql']['location']['edge_location_to_media']
yield from (Post(self.context, edge['node']) for edge in location_data['edges'])
has_next_page = location_data['page_info']['has_next_page']
end_cursor = location_data['page_info']['end_cursor']
yield from SectionIterator(
self.context,
lambda d: d["native_location_data"]["recent"],
lambda m: Post.from_iphone_struct(self.context, m),
f"explore/locations/{location}/",
)
@_requires_login
def download_location(self, location: str,

View File

@ -0,0 +1,46 @@
from typing import Any, Callable, Dict, Iterator, Optional, TypeVar
from .instaloadercontext import InstaloaderContext
T = TypeVar('T')


class SectionIterator(Iterator[T]):
    """Iterator for the new 'sections'-style responses.

    A response page is a dict with a ``sections`` list. Every section holds its items in
    ``layout_content`` -> ``medias``, and every item carries the raw structure under ``media``.
    As long as a page has ``more_available`` set, the following page is requested with the page's
    ``next_max_id`` passed as ``max_id``.

    .. versionadded:: 4.9"""

    def __init__(self,
                 context: 'InstaloaderContext',
                 sections_extractor: Callable[[Dict[str, Any]], Dict[str, Any]],
                 media_wrapper: Callable[[Dict], T],
                 query_path: str,
                 first_data: Optional[Dict[str, Any]] = None):
        """
        :param context: Object whose ``get_json()`` is used to fetch pages.
        :param sections_extractor: Maps a raw JSON response to the dict holding ``sections``.
        :param media_wrapper: Turns one raw ``media`` structure into the item that is yielded.
        :param query_path: Path passed to ``get_json()`` for every page request.
        :param first_data: Already-extracted first page; if omitted (or falsy), it is queried.
        """
        self._context = context
        self._sections_extractor = sections_extractor
        self._media_wrapper = media_wrapper
        self._query_path = query_path
        # _query() relies on the attributes assigned above, so this has to come after them.
        self._data = first_data or self._query()
        self._page_index = 0      # index into self._data['sections']
        self._section_index = 0   # index into the current section's 'medias'

    def __iter__(self):
        return self

    def _query(self, max_id: Optional[str] = None) -> Dict[str, Any]:
        """Fetch one page (the first one if `max_id` is None) and return its extracted sections dict."""
        params = {"__a": 1}  # type: Dict[str, Any]
        if max_id is not None:
            params["max_id"] = max_id
        return self._sections_extractor(self._context.get_json(self._query_path, params=params))

    def __next__(self) -> T:
        """Return the next wrapped media, fetching further pages as needed.

        :raises StopIteration: When the current page is used up and reports no further page.
        """
        # Loop instead of recursing, so that consecutive pages without any media do not grow the call stack.
        while True:
            sections = self._data['sections']
            while self._page_index < len(sections):
                medias = sections[self._page_index]['layout_content']['medias']
                if self._section_index < len(medias):
                    media = medias[self._section_index]['media']
                    self._section_index += 1
                    return self._media_wrapper(media)
                # Section is used up (or was empty from the start): continue with the next one.
                self._section_index = 0
                self._page_index += 1
            if not self._data['more_available']:
                raise StopIteration()
            # Query first and reset the indices afterwards, so a failing request leaves the state untouched.
            self._data = self._query(self._data["next_max_id"])
            self._page_index = 0
            self._section_index = 0

View File

@ -3,7 +3,9 @@ import lzma
import re
from base64 import b64decode, b64encode
from collections import namedtuple
from contextlib import suppress
from datetime import datetime
from itertools import islice
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
@ -11,6 +13,7 @@ from . import __version__
from .exceptions import *
from .instaloadercontext import InstaloaderContext
from .nodeiterator import FrozenNodeIterator, NodeIterator
from .sectioniterator import SectionIterator
PostSidecarNode = namedtuple('PostSidecarNode', ['is_video', 'display_url', 'video_url'])
PostSidecarNode.__doc__ = "Item of a Sidecar Post."
@ -89,6 +92,41 @@ class Post:
"""Create a post object from a given mediaid"""
return cls.from_shortcode(context, Post.mediaid_to_shortcode(mediaid))
@classmethod
def from_iphone_struct(cls, context: 'InstaloaderContext', media: Dict[str, Any]):
    """Create a post from a given iphone_struct.

    The structure is translated into a node dict using the field names the rest of the class works with
    (``shortcode``, ``__typename``, ``display_url``, ...); the untouched structure is kept under
    ``iphone_struct``.

    :param context: Context passed on to the created post (and to its owner profile, if any).
    :param media: The iphone_struct describing the post.
    :raises KeyError: If a mandatory field is missing or ``media_type`` is not one of 1, 2, 8.

    .. versionadded:: 4.9"""
    media_types = {
        1: "GraphImage",
        2: "GraphVideo",
        8: "GraphSidecar",
    }
    typename = media_types[media["media_type"]]
    caption = media.get("caption")
    fake_node = {
        "shortcode": media["code"],
        "id": media["pk"],
        "__typename": typename,
        "is_video": typename == "GraphVideo",
        "date": media["taken_at"],
        "caption": caption.get("text") if caption is not None else None,
        "title": media.get("title"),
        "viewer_has_liked": media["has_liked"],
        "edge_media_preview_like": {"count": media["like_count"]},
        "iphone_struct": media,
    }
    # The remaining fields are optional. Besides a missing key (KeyError), tolerate a value that is
    # present but null (TypeError) or an empty list (IndexError) and simply leave the field out.
    optional = (KeyError, IndexError, TypeError)
    with suppress(*optional):
        fake_node["display_url"] = media['image_versions2']['candidates'][0]['url']
    with suppress(*optional):
        fake_node["video_url"] = media['video_versions'][-1]['url']
        fake_node["video_duration"] = media["video_duration"]
        fake_node["video_view_count"] = media["view_count"]
    with suppress(*optional):
        fake_node["edge_sidecar_to_children"] = {"edges": [{"node": {
            "display_url": node['image_versions2']['candidates'][0]['url'],
            "is_video": media_types[node["media_type"]] == "GraphVideo",
        }} for node in media["carousel_media"]]}
    owner_profile = Profile.from_iphone_struct(context, media["user"]) if "user" in media else None
    return cls(context, fake_node, owner_profile)
@staticmethod
def shortcode_to_mediaid(code: str) -> int:
if len(code) > 11:
@ -665,6 +703,20 @@ class Profile:
context.profile_id_cache[profile_id] = profile
return profile
@classmethod
def from_iphone_struct(cls, context: InstaloaderContext, media: Dict[str, Any]):
    """Create a profile from a given iphone_struct.

    Copies the ``pk``, ``username``, ``is_private``, ``full_name`` and ``profile_pic_url`` fields into a node
    dict under the names ``id``, ``username``, ``is_private``, ``full_name`` and ``profile_pic_url_hd``, and
    keeps the untouched structure under ``iphone_struct``.

    :param context: Context passed on to the created profile.
    :param media: The iphone_struct describing the user.
    :raises KeyError: If one of the copied fields is missing.

    .. versionadded:: 4.9"""
    # (node key, key in the iphone_struct) -- evaluated in this order
    field_map = (
        ("id", "pk"),
        ("username", "username"),
        ("is_private", "is_private"),
        ("full_name", "full_name"),
        ("profile_pic_url_hd", "profile_pic_url"),
    )
    node = {target: media[source] for target, source in field_map}
    node["iphone_struct"] = media
    return cls(context, node)
@classmethod
def own_profile(cls, context: InstaloaderContext):
"""Return own profile if logged-in.
@ -1359,6 +1411,9 @@ class Hashtag:
L.download_post(post, target="#"+hashtag.name)
Also, this class implements == and is hashable.
.. versionchanged:: 4.9
Removed ``get_related_tags()`` and ``is_top_media_only`` as these features were removed from Instagram.
"""
def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
assert "name" in node
@ -1387,8 +1442,8 @@ class Hashtag:
return self._node["name"].lower()
def _query(self, params):
return self._context.get_json("explore/tags/{0}/".format(self.name),
params)["graphql"]["hashtag"]
json_response = self._context.get_json("explore/tags/{0}/".format(self.name), params)
return json_response["graphql"]["hashtag"] if "graphql" in json_response else json_response["data"]
def _obtain_metadata(self):
if not self._has_full_metadata:
@ -1399,7 +1454,9 @@ class Hashtag:
json_node = self._node.copy()
# remove posts
json_node.pop("edge_hashtag_to_top_posts", None)
json_node.pop("top", None)
json_node.pop("edge_hashtag_to_media", None)
json_node.pop("recent", None)
return json_node
def __repr__(self):
@ -1435,30 +1492,33 @@ class Hashtag:
return self._metadata("profile_pic_url")
@property
def description(self) -> str:
def description(self) -> Optional[str]:
return self._metadata("description")
@property
def allow_following(self) -> bool:
return self._metadata("allow_following")
return bool(self._metadata("allow_following"))
@property
def is_following(self) -> bool:
return self._metadata("is_following")
@property
def is_top_media_only(self) -> bool:
return self._metadata("is_top_media_only")
def get_related_tags(self) -> Iterator["Hashtag"]:
"""Yields similar hashtags."""
yield from (Hashtag(self._context, edge["node"])
for edge in self._metadata("edge_hashtag_to_related_tags", "edges"))
try:
return self._metadata("is_following")
except KeyError:
return bool(self._metadata("following"))
def get_top_posts(self) -> Iterator[Post]:
"""Yields the top posts of the hashtag."""
yield from (Post(self._context, edge["node"])
for edge in self._metadata("edge_hashtag_to_top_posts", "edges"))
try:
yield from (Post(self._context, edge["node"])
for edge in self._metadata("edge_hashtag_to_top_posts", "edges"))
except KeyError:
yield from SectionIterator(
self._context,
lambda d: d["data"]["top"],
lambda m: Post.from_iphone_struct(self._context, m),
f"explore/tags/{self.name}/",
self._metadata("top"),
)
@property
def mediacount(self) -> int:
@ -1468,22 +1528,34 @@ class Hashtag:
The number of posts with a certain hashtag may differ from the number of posts that can actually be accessed, as
the hashtag count might include private posts
"""
return self._metadata("edge_hashtag_to_media", "count")
try:
return self._metadata("edge_hashtag_to_media", "count")
except KeyError:
return self._metadata("media_count")
def get_posts(self) -> Iterator[Post]:
"""Yields the posts associated with this hashtag."""
self._metadata("edge_hashtag_to_media", "edges")
self._metadata("edge_hashtag_to_media", "page_info")
conn = self._metadata("edge_hashtag_to_media")
yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
while conn["page_info"]["has_next_page"]:
data = self._query({'__a': 1, 'max_id': conn["page_info"]["end_cursor"]})
conn = data["edge_hashtag_to_media"]
"""Yields the recent posts associated with this hashtag."""
try:
self._metadata("edge_hashtag_to_media", "edges")
self._metadata("edge_hashtag_to_media", "page_info")
conn = self._metadata("edge_hashtag_to_media")
yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
while conn["page_info"]["has_next_page"]:
data = self._query({'__a': 1, 'max_id': conn["page_info"]["end_cursor"]})
conn = data["edge_hashtag_to_media"]
yield from (Post(self._context, edge["node"]) for edge in conn["edges"])
except KeyError:
yield from SectionIterator(
self._context,
lambda d: d["data"]["recent"],
lambda m: Post.from_iphone_struct(self._context, m),
f"explore/tags/{self.name}/",
self._metadata("recent"),
)
def get_all_posts(self) -> Iterator[Post]:
"""Yields all posts, i.e. all most recent posts and the top posts, in almost-chronological order."""
sorted_top_posts = iter(sorted(self.get_top_posts(), key=lambda p: p.date_utc, reverse=True))
sorted_top_posts = iter(sorted(islice(self.get_top_posts(), 9), key=lambda p: p.date_utc, reverse=True))
other_posts = self.get_posts()
next_top = next(sorted_top_posts, None)
next_other = next(other_posts, None)
@ -1510,6 +1582,20 @@ class Hashtag:
yield next_other
next_other = next(other_posts, None)
def get_posts_resumable(self) -> NodeIterator[Post]:
    """Get the recent posts of the hashtag in a resumable fashion.

    Builds a :class:`NodeIterator` that reads ``data.hashtag.edge_hashtag_to_media`` from each response,
    wraps every node in a :class:`Post` and passes this hashtag's name as ``tag_name``.

    :rtype: NodeIterator[Post]

    .. versionadded:: 4.9"""
    tag = self.name

    def extract_edge(response):
        return response['data']['hashtag']['edge_hashtag_to_media']

    def wrap_node(node):
        return Post(self._context, node)

    # NOTE(review): the second argument is presumably the GraphQL query hash and the last one the
    # referer URL -- confirm against NodeIterator's signature.
    return NodeIterator(
        self._context,
        "9b498c08113f1e09617a1703c22b2f32",
        extract_edge,
        wrap_node,
        {'tag_name': tag},
        f"https://www.instagram.com/explore/tags/{tag}/",
    )
class TopSearchResults:
"""