1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-07-07 11:42:38 +02:00

Handle pinned posts with --fast-update and --latest-stamps (#1586)

Instead of hard-coding a check for pinned items when storing the first
item, this allows the creator of NodeIterator to specify how the comparison
should be made.

This also allows storing the newest post (whatever its status is),
instead of simply storing the newest non-pinned post, which prevents
redownloads in case a user adds only a pinned post between runs.
This commit is contained in:
Eduardo Kalinowski 2022-07-04 15:02:41 -03:00 committed by GitHub
parent 8649a10113
commit da0dcb106c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 6 deletions

View File

@ -1010,7 +1010,10 @@ class Instaloader:
enabled=self.resume_prefix is not None
) as (is_resuming, start_index):
for number, post in enumerate(posts, start=start_index + 1):
if (max_count is not None and number > max_count) or not takewhile(post):
should_stop = not takewhile(post)
if should_stop and post.is_pinned:
continue
if (max_count is not None and number > max_count) or should_stop:
break
if displayed_count is not None:
self.context.log("[{0:{w}d}/{1:{w}d}] ".format(number, displayed_count,
@ -1042,7 +1045,7 @@ class Instaloader:
except PostChangedException:
post_changed = True
continue
if fast_update and not downloaded and not post_changed:
if fast_update and not downloaded and not post_changed and not post.is_pinned:
# disengage fast_update for first post when resuming
if not is_resuming or number > 0:
break

View File

@ -76,7 +76,8 @@ class NodeIterator(Iterator[T]):
node_wrapper: Callable[[Dict], T],
query_variables: Optional[Dict[str, Any]] = None,
query_referer: Optional[str] = None,
first_data: Optional[Dict[str, Any]] = None):
first_data: Optional[Dict[str, Any]] = None,
is_first: Optional[Callable[[T], bool]] = None):
self._context = context
self._query_hash = query_hash
self._edge_extractor = edge_extractor
@ -91,6 +92,7 @@ class NodeIterator(Iterator[T]):
else:
self._data = self._query()
self._first_node: Optional[Dict] = None
self._is_first = is_first
def _query(self, after: Optional[str] = None) -> Dict:
pagination_variables = {'first': NodeIterator._graphql_page_length} # type: Dict[str, Any]
@ -128,8 +130,12 @@ class NodeIterator(Iterator[T]):
self._page_index, self._total_index = page_index, total_index
raise
item = self._node_wrapper(node)
if self._first_node is None:
self._first_node = node
if self._is_first is not None:
if self._is_first(item):
self._first_node = node
else:
if self._first_node is None:
self._first_node = node
return item
if self._data['page_info']['has_next_page']:
query_response = self._query(self._data['page_info']['end_cursor'])
@ -168,7 +174,13 @@ class NodeIterator(Iterator[T]):
"""
If this iterator has produced any items, returns the first item produced.
It is possible to override what is considered the first item (for example, to consider the
newest item in case items are not in strict chronological order) by passing a callback
function as the `is_first` parameter when creating the class.
.. versionadded:: 4.8
.. versionchanged:: 4.9.2
What is considered the first item can be overridden.
"""
return self._node_wrapper(self._first_node) if self._first_node is not None else None

View File

@ -7,7 +7,7 @@ from contextlib import suppress
from datetime import datetime
from itertools import islice
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple, Union
from unicodedata import normalize
from . import __version__
@ -644,6 +644,13 @@ class Post:
loc.get('lat'), loc.get('lng'))
return self._location
@property
def is_pinned(self) -> bool:
    """True if this Post has been pinned by at least one user.

    Reads the ``pinned_for_users`` entry of the post's node. A missing key and
    an empty (falsy) value are both reported as "not pinned".

    .. versionadded:: 4.9.2
    """
    # .get() handles the absent-key case in a single lookup; bool() maps an
    # empty or None value to False.
    return bool(self._node.get('pinned_for_users'))
class Profile:
"""
@ -970,6 +977,7 @@ class Profile:
{'id': self.userid},
'https://www.instagram.com/{0}/'.format(self.username),
self._metadata('edge_owner_to_timeline_media'),
Profile._make_is_newest_checker()
)
def get_saved_posts(self) -> NodeIterator[Post]:
@ -1003,6 +1011,7 @@ class Profile:
lambda n: Post(self._context, n, self if int(n['owner']['id']) == self.userid else None),
{'id': self.userid},
'https://www.instagram.com/{0}/'.format(self.username),
is_first=Profile._make_is_newest_checker()
)
def get_igtv_posts(self) -> NodeIterator[Post]:
@ -1020,8 +1029,22 @@ class Profile:
{'id': self.userid},
'https://www.instagram.com/{0}/channel/'.format(self.username),
self._metadata('edge_felix_video_timeline'),
Profile._make_is_newest_checker()
)
@staticmethod
def _make_is_newest_checker() -> Callable[[Post], bool]:
    """Build a stateful predicate that recognises the newest post seen so far.

    The returned callable remembers the latest ``date_local`` among all posts
    it has been called with. It returns True for the first post it sees and
    for every later post whose ``date_local`` is strictly greater than the
    remembered one (updating the remembered date); otherwise it returns False.

    Each call to this factory yields an independent checker with its own state.
    """
    latest_seen: Optional[datetime] = None

    def is_newest(p: Post) -> bool:
        nonlocal latest_seen
        candidate = p.date_local
        # Guard clause: a date is already recorded and this post does not beat it.
        if latest_seen is not None and not (candidate > latest_seen):
            return False
        latest_seen = candidate
        return True

    return is_newest
def get_followers(self) -> NodeIterator['Profile']:
"""
Retrieve list of followers of given profile.