1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-07-07 03:40:06 +02:00

Merge branch 'master' into upcoming/v4.10

This commit is contained in:
Alexander Graf 2022-10-05 20:02:51 +02:00
commit d09493e669
7 changed files with 110 additions and 30 deletions

View File

@ -18,7 +18,7 @@ jobs:
stale-issue-message: 'There has been no activity on this question for an extended period of time. This issue will be closed after further 14 days of inactivity.'
stale-issue-label: 'stale'
exempt-issue-labels: 'leave open'
days-before-stale: 15
days-before-stale: 21
days-before-close: -1
remove-stale-when-updated: false
- uses: actions/stale@v1
@ -30,5 +30,5 @@ jobs:
stale-pr-label: 'stale'
exempt-issue-label: 'leave open'
exempt-pr-label: 'leave open'
days-before-stale: 135
days-before-stale: 189
days-before-close: 14

View File

@ -127,6 +127,9 @@ Supporters
.. current-sponsors-start
| Instaloader is proudly sponsored by
| `@socialmethod <https://github.com/socialmethod>`__
See `Alex' GitHub Sponsors <https://github.com/sponsors/aandergr>`__ page for
how you can sponsor the development of Instaloader!

View File

@ -329,12 +329,13 @@ Miscellaneous Options
Read arguments from file `args.txt`, a shortcut to provide arguments from
file rather than command-line. This provides a convenient way to hide login
info from CLI, and can also be used to simplify management of long arguments.
You can provide more than one file at once, e.g.: ``+args1.txt +args2.txt``.
.. note::
Text file should separate arguments with line breaks.
args.txt example::
`args.txt` example::
--login=MYUSERNAME
--password=MYPASSWORD

View File

@ -1,7 +1,7 @@
"""Download pictures (or videos) along with their captions and other metadata from Instagram."""
__version__ = '4.9.1'
__version__ = '4.9.5'
try:

View File

@ -478,9 +478,8 @@ class Instaloader:
self.context.log(pcaption, end=' ', flush=True)
except UnicodeEncodeError:
self.context.log('txt', end=' ', flush=True)
with open(filename, 'wb') as text_file:
with BytesIO(bcaption) as bio:
shutil.copyfileobj(cast(IO, bio), text_file)
with open(filename, 'w', encoding='UTF-8') as fio:
fio.write(caption)
os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
def save_location(self, filename: str, location: PostLocation, mtime: datetime) -> None:
@ -563,7 +562,7 @@ class Instaloader:
if latest_stamps is None:
self.download_profilepic(profile)
return
profile_pic_basename = profile.profile_pic_url.split('/')[-1].split('?')[0]
profile_pic_basename = profile.profile_pic_url_no_iphone.split('/')[-1].split('?')[0]
saved_basename = latest_stamps.get_profile_pic(profile.username)
if saved_basename == profile_pic_basename:
return
@ -896,13 +895,18 @@ class Instaloader:
filename_template = os.path.join(dirname, self.format_filename(item, target=target))
filename = self.__prepare_filename(filename_template, lambda: item.url)
downloaded = False
if not item.is_video or self.download_video_thumbnails is True:
video_url_fetch_failed = False
if item.is_video and self.download_videos is True:
video_url = item.video_url
if video_url:
filename = self.__prepare_filename(filename_template, lambda: str(video_url))
downloaded |= (not _already_downloaded(filename + ".mp4") and
self.download_pic(filename=filename, url=video_url, mtime=date_local))
else:
video_url_fetch_failed = True
if video_url_fetch_failed or not item.is_video or self.download_video_thumbnails is True:
downloaded = (not _already_downloaded(filename + ".jpg") and
self.download_pic(filename=filename, url=item.url, mtime=date_local))
if item.is_video and self.download_videos is True:
filename = self.__prepare_filename(filename_template, lambda: str(item.video_url))
downloaded |= (not _already_downloaded(filename + ".mp4") and
self.download_pic(filename=filename, url=item.video_url, mtime=date_local))
# Save caption if desired
metadata_string = _ArbitraryItemFormatter(item).format(self.storyitem_metadata_txt_pattern).strip()
if metadata_string:
@ -1027,7 +1031,10 @@ class Instaloader:
enabled=self.resume_prefix is not None
) as (is_resuming, start_index):
for number, post in enumerate(posts, start=start_index + 1):
if (max_count is not None and number > max_count) or not takewhile(post):
should_stop = not takewhile(post)
if should_stop and post.is_pinned:
continue
if (max_count is not None and number > max_count) or should_stop:
break
if displayed_count is not None:
self.context.log("[{0:{w}d}/{1:{w}d}] ".format(number, displayed_count,
@ -1059,7 +1066,7 @@ class Instaloader:
except PostChangedException:
post_changed = True
continue
if fast_update and not downloaded and not post_changed:
if fast_update and not downloaded and not post_changed and not post.is_pinned:
# disengage fast_update for first post when resuming
if not is_resuming or number > 0:
break

View File

@ -76,7 +76,8 @@ class NodeIterator(Iterator[T]):
node_wrapper: Callable[[Dict], T],
query_variables: Optional[Dict[str, Any]] = None,
query_referer: Optional[str] = None,
first_data: Optional[Dict[str, Any]] = None):
first_data: Optional[Dict[str, Any]] = None,
is_first: Optional[Callable[[T, Optional[T]], bool]] = None):
self._context = context
self._query_hash = query_hash
self._edge_extractor = edge_extractor
@ -91,6 +92,7 @@ class NodeIterator(Iterator[T]):
else:
self._data = self._query()
self._first_node: Optional[Dict] = None
self._is_first = is_first
def _query(self, after: Optional[str] = None) -> Dict:
pagination_variables: Dict[str, Any] = {'first': NodeIterator._graphql_page_length}
@ -128,8 +130,12 @@ class NodeIterator(Iterator[T]):
self._page_index, self._total_index = page_index, total_index
raise
item = self._node_wrapper(node)
if self._first_node is None:
self._first_node = node
if self._is_first is not None:
if self._is_first(item, self.first_item):
self._first_node = node
else:
if self._first_node is None:
self._first_node = node
return item
if self._data['page_info']['has_next_page']:
query_response = self._query(self._data['page_info']['end_cursor'])
@ -168,7 +174,13 @@ class NodeIterator(Iterator[T]):
"""
If this iterator has produced any items, returns the first item produced.
It is possible to override what is considered the first item (for example, to consider the
newest item in case items are not in strict chronological order) by passing a callback
function as the `is_first` parameter when creating the class.
.. versionadded:: 4.8
.. versionchanged:: 4.9.2
What is considered the first item can be overridden.
"""
return self._node_wrapper(self._first_node) if self._first_node is not None else None

View File

@ -6,7 +6,7 @@ from contextlib import suppress
from datetime import datetime
from itertools import islice
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union
from typing import Any, Callable, Dict, Iterable, Iterator, List, NamedTuple, Optional, Tuple, Union
from unicodedata import normalize
from . import __version__
@ -340,7 +340,7 @@ class Post:
url = re.sub(r'([?&])se=\d+&?', r'\1', orig_url).rstrip('&')
return url
except (InstaloaderException, KeyError, IndexError) as err:
self._context.error('{} Unable to fetch high quality image version of {}.'.format(err, self))
self._context.error(f"Unable to fetch high quality image version of {self}: {err}")
return self._node["display_url"] if "display_url" in self._node else self._node["display_src"]
@property
@ -404,8 +404,7 @@ class Post:
orig_url = carousel_media[idx]['image_versions2']['candidates'][0]['url']
display_url = re.sub(r'([?&])se=\d+&?', r'\1', orig_url).rstrip('&')
except (InstaloaderException, KeyError, IndexError) as err:
self._context.error('{} Unable to fetch high quality image version of {}.'.format(
err, self))
self._context.error(f"Unable to fetch high quality image version of {self}: {err}")
yield PostSidecarNode(is_video=is_video, display_url=display_url,
video_url=node['video_url'] if is_video else None)
@ -677,6 +676,13 @@ class Post:
loc.get('lat'), loc.get('lng'))
return self._location
@property
def is_pinned(self) -> bool:
"""True if this Post has been pinned by at least one user.
.. versionadded: 4.9.2"""
return 'pinned_for_users' in self._node and bool(self._node['pinned_for_users'])
class Profile:
"""
@ -1001,11 +1007,18 @@ class Profile:
try:
return self._iphone_struct['hd_profile_pic_url_info']['url']
except (InstaloaderException, KeyError) as err:
self._context.error('{} Unable to fetch high quality profile pic.'.format(err))
self._context.error(f"Unable to fetch high quality profile pic: {err}")
return self._metadata("profile_pic_url_hd")
else:
return self._metadata("profile_pic_url_hd")
@property
def profile_pic_url_no_iphone(self) -> str:
"""Return URL of lower-quality profile picture.
.. versionadded:: 4.9.3"""
return self._metadata("profile_pic_url_hd")
def get_profile_pic_url(self) -> str:
""".. deprecated:: 4.0.3
@ -1025,6 +1038,7 @@ class Profile:
{'id': self.userid},
'https://www.instagram.com/{0}/'.format(self.username),
self._metadata('edge_owner_to_timeline_media'),
Profile._make_is_newest_checker()
)
def get_saved_posts(self) -> NodeIterator[Post]:
@ -1058,6 +1072,7 @@ class Profile:
lambda n: Post(self._context, n, self if int(n['owner']['id']) == self.userid else None),
{'id': self.userid},
'https://www.instagram.com/{0}/'.format(self.username),
is_first=Profile._make_is_newest_checker()
)
def get_igtv_posts(self) -> NodeIterator[Post]:
@ -1075,8 +1090,13 @@ class Profile:
{'id': self.userid},
'https://www.instagram.com/{0}/channel/'.format(self.username),
self._metadata('edge_felix_video_timeline'),
Profile._make_is_newest_checker()
)
@staticmethod
def _make_is_newest_checker() -> Callable[[Post, Optional[Post]], bool]:
return lambda post, first: first is None or post.date_local > first.date_local
def get_followers(self) -> NodeIterator['Profile']:
"""
Retrieve list of followers of given profile.
@ -1204,8 +1224,14 @@ class StoryItem:
if not self._context.is_logged_in:
raise LoginRequiredException("--login required to access iPhone media info endpoint.")
if not self._iphone_struct_:
data = self._context.get_iphone_json(path='api/v1/media/{}/info/'.format(self.mediaid), params={})
self._iphone_struct_ = data['items'][0]
data = self._context.get_iphone_json(
path='api/v1/feed/reels_media/?reel_ids={}'.format(self.owner_id), params={}
)
self._iphone_struct_ = {}
for item in data['reels'][str(self.owner_id)]['items']:
if item['pk'] == self.mediaid:
self._iphone_struct_ = item
break
return self._iphone_struct_
@property
@ -1262,13 +1288,14 @@ class StoryItem:
@property
def url(self) -> str:
"""URL of the picture / video thumbnail of the StoryItem"""
if self.typename == "GraphStoryImage" and self._context.iphone_support and self._context.is_logged_in:
if self.typename in ["GraphStoryImage", "StoryImage"] and \
self._context.iphone_support and self._context.is_logged_in:
try:
orig_url = self._iphone_struct['image_versions2']['candidates'][0]['url']
url = re.sub(r'([?&])se=\d+&?', r'\1', orig_url).rstrip('&')
return url
except (InstaloaderException, KeyError, IndexError) as err:
self._context.error('{} Unable to fetch high quality image version of {}.'.format(err, self))
self._context.error(f"Unable to fetch high quality image version of {self}: {err}")
return self._node['display_resources'][-1]['src']
@property
@ -1390,6 +1417,7 @@ class Story:
self._node = node
self._unique_id: Optional[str] = None
self._owner_profile: Optional[Profile] = None
self._iphone_struct_: Optional[Dict[str, Any]] = None
def __repr__(self):
return '<Story by {} changed {:%Y-%m-%d_%H-%M-%S_UTC}>'.format(self.owner_username, self.latest_media_utc)
@ -1460,9 +1488,23 @@ class Story:
"""The story owner's ID."""
return self.owner_profile.userid
def _fetch_iphone_struct(self) -> None:
if self._context.iphone_support and self._context.is_logged_in and not self._iphone_struct_:
data = self._context.get_iphone_json(
path='api/v1/feed/reels_media/?reel_ids={}'.format(self.owner_id), params={}
)
self._iphone_struct_ = data['reels'][str(self.owner_id)]
def get_items(self) -> Iterator[StoryItem]:
"""Retrieve all items from a story."""
yield from (StoryItem(self._context, item, self.owner_profile) for item in reversed(self._node['items']))
self._fetch_iphone_struct()
for item in reversed(self._node['items']):
if self._iphone_struct_ is not None:
for iphone_struct_item in self._iphone_struct_['items']:
if iphone_struct_item['pk'] == int(item['id']):
item['iphone_struct'] = iphone_struct_item
break
yield StoryItem(self._context, item, self.owner_profile)
class Highlight(Story):
@ -1492,6 +1534,7 @@ class Highlight(Story):
super().__init__(context, node)
self._owner_profile = owner
self._items: Optional[List[Dict[str, Any]]] = None
self._iphone_struct_: Optional[Dict[str, Any]] = None
def __repr__(self):
return '<Highlight by {}: {}>'.format(self.owner_username, self.title)
@ -1530,6 +1573,13 @@ class Highlight(Story):
"highlight_reel_ids": [str(self.unique_id)],
"precomposed_overlay": False})['data']['reels_media'][0]['items']
def _fetch_iphone_struct(self) -> None:
if self._context.iphone_support and self._context.is_logged_in and not self._iphone_struct_:
data = self._context.get_iphone_json(
path='api/v1/feed/reels_media/?reel_ids=highlight:{}'.format(self.unique_id), params={}
)
self._iphone_struct_ = data['reels']['highlight:{}'.format(self.unique_id)]
@property
def itemcount(self) -> int:
"""Count of items associated with the :class:`Highlight` instance."""
@ -1540,8 +1590,15 @@ class Highlight(Story):
def get_items(self) -> Iterator[StoryItem]:
"""Retrieve all associated highlight items."""
self._fetch_items()
self._fetch_iphone_struct()
assert self._items is not None
yield from (StoryItem(self._context, item, self.owner_profile) for item in self._items)
for item in self._items:
if self._iphone_struct_ is not None:
for iphone_struct_item in self._iphone_struct_['items']:
if iphone_struct_item['pk'] == int(item['id']):
item['iphone_struct'] = iphone_struct_item
break
yield StoryItem(self._context, item, self.owner_profile)
class Hashtag:
@ -1595,7 +1652,7 @@ class Hashtag:
def _obtain_metadata(self):
if not self._has_full_metadata:
self._node = self._query({"__a": 1})
self._node = self._query({"__a": 1, "__d": "dis"})
self._has_full_metadata = True
def _asdict(self):