1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-11-20 01:02:30 +01:00

added proxy support

This commit is contained in:
saravananravi08 2024-09-26 21:24:03 +05:30
parent c5dacb5f6d
commit 3d53ed39b0
19 changed files with 6133 additions and 7 deletions

View File

@ -0,0 +1,38 @@
"""Download pictures (or videos) along with their captions and other metadata from Instagram."""
# Package version string; the command line interface prints it for --version.
__version__ = '4.13.1'

# win_unicode_console is an optional dependency: when it can be imported it is
# enabled, and when it is missing the ImportError is deliberately ignored.
try:
    # pylint:disable=wrong-import-position
    import win_unicode_console  # type: ignore
except ImportError:
    pass
else:
    win_unicode_console.enable()
from .exceptions import *
from .instaloader import Instaloader as Instaloader
from .instaloadercontext import (InstaloaderContext as InstaloaderContext,
RateController as RateController)
from .lateststamps import LatestStamps as LatestStamps
from .nodeiterator import (NodeIterator as NodeIterator,
FrozenNodeIterator as FrozenNodeIterator,
resumable_iteration as resumable_iteration)
from .structures import (Hashtag as Hashtag,
Highlight as Highlight,
Post as Post,
PostSidecarNode as PostSidecarNode,
PostComment as PostComment,
PostCommentAnswer as PostCommentAnswer,
PostLocation as PostLocation,
Profile as Profile,
Story as Story,
StoryItem as StoryItem,
TopSearchResults as TopSearchResults,
TitlePic as TitlePic,
load_structure_from_file as load_structure_from_file,
save_structure_to_file as save_structure_to_file,
load_structure as load_structure,
get_json_structure as get_json_structure)

View File

@ -0,0 +1,597 @@
"""Download pictures (or videos) along with their captions and other metadata from Instagram."""
import ast
import datetime
import os
import re
import sys
from argparse import ArgumentParser, ArgumentTypeError, SUPPRESS
from enum import IntEnum
from typing import List, Optional
from . import (AbortDownloadException, BadCredentialsException, Instaloader, InstaloaderException,
InvalidArgumentException, LoginException, Post, Profile, ProfileNotExistsException, StoryItem,
TwoFactorAuthRequiredException, __version__, load_structure_from_file)
from .instaloader import (get_default_session_filename, get_default_stamps_filename)
from .instaloadercontext import default_user_agent
from .lateststamps import LatestStamps
# browser_cookie3 is an optional dependency. bc3_library records whether the
# import succeeded, so that _main() can reject --load-cookies with a clear
# InvalidArgumentException instead of failing on the missing module.
try:
    import browser_cookie3
    bc3_library = True
except ImportError:
    bc3_library = False
class ExitCode(IntEnum):
    """Process exit codes that main() passes to sys.exit()."""
    # Initial value in _main(); kept when nothing went wrong.
    SUCCESS = 0
    # Set by main() when the loader reports stored (non-fatal) errors.
    NON_FATAL_ERROR = 1
    # InvalidArgumentException in main(), or no targets given while not logged in.
    INIT_FAILURE = 2
    # LoginException (including its subclasses) reached main().
    LOGIN_FAILURE = 3
    # AbortDownloadException was raised inside the download loop.
    DOWNLOAD_ABORTED = 4
    # KeyboardInterrupt during interactive login or during downloading.
    USER_ABORTED = 5
    # Any other InstaloaderException that reached main().
    UNEXPECTED_ERROR = 99
def usage_string():
    """Return the short usage synopsis shown by the argument parser.

    The program name is taken from ``sys.argv[0]``; when the package is run as
    ``python -m`` (basename ``__main__.py``) the name ``instaloader`` is used
    instead. Continuation lines are padded to the width of the program name.
    """
    # NOTE: duplicated in README.rst and docs/index.rst
    program = os.path.basename(sys.argv[0])
    if program == "__main__.py":
        program = "instaloader"
    template = """
{0} [--comments] [--geotags]
{2:{1}} [--stories] [--highlights] [--tagged] [--igtv]
{2:{1}} [--login YOUR-USERNAME] [--fast-update]
{2:{1}} profile | "#hashtag" | %%location_id | :stories | :feed | :saved
{0} --help"""
    # {2:{1}} renders the empty string padded to len(program) characters.
    return template.format(program, len(program), '')
def http_status_code_list(code_list_str: str) -> List[int]:
    """Parse a comma-separated list of HTTP status codes (argparse ``type=`` helper).

    :param code_list_str: Text such as ``"302,404"``.
    :return: The codes as integers, in the order given.
    :raises ValueError: If an entry is not an integer literal.
    :raises ArgumentTypeError: If an integer lies outside ``100..599``.
    """
    # Convert every entry first, so a malformed entry is reported before any range check.
    codes = list(map(int, code_list_str.split(',')))
    out_of_range = next((value for value in codes if value < 100 or value > 599), None)
    if out_of_range is not None:
        raise ArgumentTypeError("Invalid HTTP status code: {}".format(out_of_range))
    return codes
def filterstr_to_filterfunc(filter_str: str, item_type: type):
    """Turn a ``--post-filter=...`` / ``--storyitem-filter=...`` expression into a predicate.

    The expression is parsed in ``eval`` mode. Every bare name other than
    ``datetime`` is rewritten into an attribute load on ``item``, and the
    result is compiled once. The returned callable evaluates that code with
    the given object bound to ``item`` and ``datetime.datetime`` bound to
    ``datetime``.

    :param filter_str: Python expression given on the command line.
    :param item_type: Class whose attributes may be referenced by name.
    :return: Function mapping an item to ``bool``.
    :raises InvalidArgumentException: If a name is used in a non-load context
       or is not an attribute of ``item_type``.
    """
    source_label = '<command line filter parameter>'

    class _NameToItemAttribute(ast.NodeTransformer):
        """Rewrites ``name`` into ``item.name`` after validating it."""

        def visit_Name(self, node: ast.Name):
            if not isinstance(node.ctx, ast.Load):
                raise InvalidArgumentException("Invalid filter: Modifying variables ({}) not allowed.".format(node.id))
            if node.id == "datetime":
                # Left untouched; resolved from the eval namespace below.
                return node
            if not hasattr(item_type, node.id):
                raise InvalidArgumentException("Invalid filter: {} not a {} attribute.".format(node.id,
                                                                                               item_type.__name__))
            item_ref = ast.copy_location(ast.Name('item', ast.Load()), node)
            load_ctx = ast.copy_location(ast.Load(), node)
            return ast.copy_location(ast.Attribute(item_ref, node.id, load_ctx), node)

    parsed = ast.parse(filter_str, filename=source_label, mode='eval')
    compiled_filter = compile(_NameToItemAttribute().visit(parsed), filename=source_label, mode='eval')

    def filterfunc(item) -> bool:
        # A fresh namespace is built for each call.
        namespace = {'item': item, 'datetime': datetime.datetime}
        # pylint:disable=eval-used
        return bool(eval(compiled_filter, namespace))

    return filterfunc
def get_cookies_from_instagram(domain, browser, cookie_file='', cookie_name=''):
    """Read cookies for ``domain`` from a browser's cookie store via browser_cookie3.

    :param domain: Substring that a cookie's domain must contain to be selected.
    :param browser: Lower-case browser name; must be one of the supported names below.
    :param cookie_file: Passed through to the browser_cookie3 loader as ``cookie_file``.
    :param cookie_name: If non-empty, only the value of that cookie is returned.
    :return: Dict mapping cookie names to values, or - when ``cookie_name`` is
       given - that cookie's value (``{}`` if no such cookie was found).
    :raises InvalidArgumentException: If the browser is not supported, or the
       installed browser_cookie3 does not provide a loader for it.
    :raises LoginException: If no matching cookies were found.
    """
    supported_browsers = ("brave", "chrome", "chromium", "edge", "firefox", "librewolf",
                          "opera", "opera_gx", "safari", "vivaldi")
    # Look the loader up lazily: resolving every browser_cookie3.<browser> attribute up front
    # makes *all* browsers fail with AttributeError when the installed browser_cookie3 lacks
    # just one of them.
    load_cookies = getattr(browser_cookie3, browser, None) if browser in supported_browsers else None
    if load_cookies is None:
        raise InvalidArgumentException("Loading cookies from the specified browser failed\n"
                                       "Supported browsers are Brave, Chrome, Chromium, Edge, Firefox, LibreWolf, "
                                       "Opera, Opera_GX, Safari and Vivaldi")
    # If several selected cookies share a name, the one seen last wins.
    cookies = {cookie.name: cookie.value
               for cookie in load_cookies(cookie_file=cookie_file)
               if domain in cookie.domain}
    if not cookies:
        raise LoginException(f"No cookies found for Instagram in {browser}, "
                             f"Are you logged in successfully in {browser}?")
    print(f"Cookies loaded successfully from {browser}")
    if cookie_name:
        return cookies.get(cookie_name, {})
    return cookies
def import_session(browser, instaloader, cookiefile):
    """Log ``instaloader`` in using Instagram cookies taken from a browser.

    The cookies are installed into the loader's context and verified with
    ``test_login()``; on success the context's username is set and a hint on
    how to reuse the session is printed.

    :param browser: Browser name handed to :func:`get_cookies_from_instagram`.
    :param instaloader: Loader whose context receives the cookies.
    :param cookiefile: Cookie file handed to :func:`get_cookies_from_instagram`.
    :raises LoginException: If the imported cookies do not yield a logged-in session.
    """
    browser_cookies = get_cookies_from_instagram('instagram', browser, cookiefile)
    if browser_cookies is None:
        return
    instaloader.context.update_cookies(browser_cookies)
    logged_in_as = instaloader.test_login()
    if not logged_in_as:
        raise LoginException(f"Not logged in. Are you logged in successfully in {browser}?")
    instaloader.context.username = logged_in_as
    print(f"{logged_in_as} has been successfully logged in.")
    print(f"Next time use --login={logged_in_as} to reuse the same session.")
def _main(instaloader: Instaloader, targetlist: List[str],
          username: Optional[str] = None, password: Optional[str] = None,
          sessionfile: Optional[str] = None,
          download_profile_pic: bool = True, download_posts=True,
          download_stories: bool = False,
          download_highlights: bool = False,
          download_tagged: bool = False,
          download_igtv: bool = False,
          fast_update: bool = False,
          latest_stamps_file: Optional[str] = None,
          max_count: Optional[int] = None, post_filter_str: Optional[str] = None,
          storyitem_filter_str: Optional[str] = None,
          browser: Optional[str] = None,
          cookiefile: Optional[str] = None) -> ExitCode:
    """Download set of profiles, hashtags etc. and handle logging in and session files if desired.

    :param instaloader: Configured loader that performs all downloads.
    :param targetlist: Targets as given on the command line (profile names, ``@profile``,
       ``#hashtag``, ``%location_id``, ``-shortcode``, ``:feed``, ``:stories``, ``:saved`` or
       paths to ``.json`` / ``.json.xz`` files).
    :param username: Login name; ``None`` skips the login step.
    :param password: Password for a non-interactive login; ``None`` triggers an interactive login.
    :param sessionfile: Session file path handed to the loader for loading and saving.
    :param latest_stamps_file: If given, a :class:`LatestStamps` object is created from it.
    :param max_count: Forwarded to the hashtag, location, feed and saved-posts downloads.
    :param post_filter_str: Filter expression evaluated against :class:`Post` objects.
    :param storyitem_filter_str: Filter expression evaluated against :class:`StoryItem` objects.
    :param browser: Browser to import cookies from; requires browser_cookie3.
    :param cookiefile: Cookie file passed on to the cookie import.
    :return: :class:`ExitCode` describing the outcome.
    :raises InvalidArgumentException: If ``browser`` is given but browser_cookie3 is unavailable,
       or if a filter expression is invalid.
    """
    # Parse and generate filter function
    post_filter = None
    if post_filter_str is not None:
        post_filter = filterstr_to_filterfunc(post_filter_str, Post)
        instaloader.context.log('Only download posts with property "{}".'.format(post_filter_str))
    storyitem_filter = None
    if storyitem_filter_str is not None:
        storyitem_filter = filterstr_to_filterfunc(storyitem_filter_str, StoryItem)
        instaloader.context.log('Only download storyitems with property "{}".'.format(storyitem_filter_str))
    latest_stamps = None
    if latest_stamps_file is not None:
        latest_stamps = LatestStamps(latest_stamps_file)
        instaloader.context.log(f"Using latest stamps from {latest_stamps_file}.")
    # load cookies if browser is not None
    if browser and bc3_library:
        import_session(browser.lower(), instaloader, cookiefile)
    elif browser and not bc3_library:
        raise InvalidArgumentException("browser_cookie3 library is needed to load cookies from browsers")
    # Login, if desired
    if username is not None:
        # An unusual username only produces a warning; the login is still attempted.
        if not re.match(r"^[A-Za-z0-9._]+$", username):
            instaloader.context.error("Warning: Parameter \"{}\" for --login is not a valid username.".format(username))
        try:
            instaloader.load_session_from_file(username, sessionfile)
        except FileNotFoundError as err:
            # Only report the missing file if the user explicitly named one.
            if sessionfile is not None:
                print(err, file=sys.stderr)
            instaloader.context.log("Session file does not exist yet - Logging in.")
        # Log in afresh if no session was loaded or the loaded session belongs to someone else.
        if not instaloader.context.is_logged_in or username != instaloader.test_login():
            if password is not None:
                try:
                    instaloader.login(username, password)
                except TwoFactorAuthRequiredException:
                    # https://github.com/instaloader/instaloader/issues/1217
                    instaloader.context.error("Warning: There have been reports of 2FA currently not working. "
                                              "Consider importing session cookies from your browser with "
                                              "--load-cookies.")
                    # Keep asking until a code is accepted.
                    while True:
                        try:
                            code = input("Enter 2FA verification code: ")
                            instaloader.two_factor_login(code)
                            break
                        except BadCredentialsException as err:
                            print(err, file=sys.stderr)
                            pass
            else:
                try:
                    instaloader.interactive_login(username)
                except KeyboardInterrupt:
                    print("\nInterrupted by user.", file=sys.stderr)
                    return ExitCode.USER_ABORTED
        instaloader.context.log("Logged in as %s." % username)
    # since 4.2.9 login is required for geotags
    if instaloader.download_geotags and not instaloader.context.is_logged_in:
        instaloader.context.error("Warning: Login is required to download geotags of posts.")
    # Try block for KeyboardInterrupt (save session on ^C)
    profiles = set()
    # Profiles that are downloaded with an anonymous loader after the logged-in lookup failed.
    anonymous_retry_profiles = set()
    exit_code = ExitCode.SUCCESS
    try:
        # Generate set of profiles, already downloading non-profile targets
        for target in targetlist:
            # Existing JSON files are re-downloaded from their stored structure.
            if (target.endswith('.json') or target.endswith('.json.xz')) and os.path.isfile(target):
                with instaloader.context.error_catcher(target):
                    structure = load_structure_from_file(instaloader.context, target)
                    if isinstance(structure, Post):
                        if post_filter is not None and not post_filter(structure):
                            instaloader.context.log("<{} ({}) skipped>".format(structure, target), flush=True)
                            continue
                        instaloader.context.log("Downloading {} ({})".format(structure, target))
                        instaloader.download_post(structure, os.path.dirname(target))
                    elif isinstance(structure, StoryItem):
                        if storyitem_filter is not None and not storyitem_filter(structure):
                            instaloader.context.log("<{} ({}) skipped>".format(structure, target), flush=True)
                            continue
                        instaloader.context.log("Attempting to download {} ({})".format(structure, target))
                        instaloader.download_storyitem(structure, os.path.dirname(target))
                    elif isinstance(structure, Profile):
                        raise InvalidArgumentException("Profile JSON are ignored. Pass \"{}\" to download that profile"
                                                       .format(structure.username))
                    else:
                        raise InvalidArgumentException("{} JSON file not supported as target"
                                                       .format(structure.__class__.__name__))
                continue
            # strip '/' characters to be more shell-autocompletion-friendly
            target = target.rstrip('/')
            with instaloader.context.error_catcher(target):
                if re.match(r"^@[A-Za-z0-9._]+$", target):
                    # @profile: queue every followee of that profile.
                    instaloader.context.log("Retrieving followees of %s..." % target[1:])
                    profile = Profile.from_username(instaloader.context, target[1:])
                    for followee in profile.get_followees():
                        instaloader.save_profile_id(followee)
                        profiles.add(followee)
                elif re.match(r"^#\w+$", target):
                    instaloader.download_hashtag(hashtag=target[1:], max_count=max_count, fast_update=fast_update,
                                                 post_filter=post_filter,
                                                 profile_pic=download_profile_pic, posts=download_posts)
                elif re.match(r"^-[A-Za-z0-9-_]+$", target):
                    # -shortcode: a single post; the target string itself is passed as download target.
                    instaloader.download_post(Post.from_shortcode(instaloader.context, target[1:]), target)
                elif re.match(r"^%[0-9]+$", target):
                    instaloader.download_location(location=target[1:], max_count=max_count, fast_update=fast_update,
                                                  post_filter=post_filter)
                elif target == ":feed":
                    instaloader.download_feed_posts(fast_update=fast_update, max_count=max_count,
                                                    post_filter=post_filter)
                elif target == ":stories":
                    instaloader.download_stories(fast_update=fast_update, storyitem_filter=storyitem_filter)
                elif target == ":saved":
                    instaloader.download_saved_posts(fast_update=fast_update, max_count=max_count,
                                                     post_filter=post_filter)
                elif re.match(r"^[A-Za-z0-9._]+$", target):
                    try:
                        profile = instaloader.check_profile_id(target, latest_stamps)
                        if instaloader.context.is_logged_in and profile.has_blocked_viewer:
                            if download_profile_pic or ((download_posts or download_tagged or download_igtv)
                                                        and not profile.is_private):
                                # Raised on purpose to reach the anonymous-retry handler below.
                                raise ProfileNotExistsException("{} blocked you; But we download her anonymously."
                                                                .format(target))
                            else:
                                instaloader.context.error("{} blocked you.".format(target))
                        else:
                            profiles.add(profile)
                    except ProfileNotExistsException as err:
                        # Not only our profile.has_blocked_viewer condition raises ProfileNotExistsException,
                        # check_profile_id() also does, since access to blocked profile may be responded with 404.
                        if instaloader.context.is_logged_in and (download_profile_pic or download_posts or
                                                                 download_tagged or download_igtv):
                            instaloader.context.log(err)
                            instaloader.context.log("Trying again anonymously, helps in case you are just blocked.")
                            with instaloader.anonymous_copy() as anonymous_loader:
                                with instaloader.context.error_catcher():
                                    anonymous_retry_profiles.add(anonymous_loader.check_profile_id(target,
                                                                                                   latest_stamps))
                                    instaloader.context.error("Warning: {} will be downloaded anonymously (\"{}\")."
                                                              .format(target, err))
                        else:
                            raise
                else:
                    # Nothing matched: name the kind of target in the error by its prefix character.
                    target_type = {
                        '#': 'hashtag',
                        '%': 'location',
                        '-': 'shortcode',
                    }.get(target[0], 'username')
                    raise ProfileNotExistsException('Invalid {} {}'.format(target_type, target))
        if len(profiles) > 1:
            instaloader.context.log("Downloading {} profiles: {}".format(len(profiles),
                                                                         ' '.join([p.username for p in profiles])))
        if instaloader.context.iphone_support and profiles and (download_profile_pic or download_posts) and \
           not instaloader.context.is_logged_in:
            instaloader.context.log("Hint: Login to download higher-quality versions of pictures.")
        instaloader.download_profiles(profiles,
                                      download_profile_pic, download_posts, download_tagged, download_igtv,
                                      download_highlights, download_stories,
                                      fast_update, post_filter, storyitem_filter, latest_stamps=latest_stamps)
        if anonymous_retry_profiles:
            instaloader.context.log("Downloading anonymously: {}"
                                    .format(' '.join([p.username for p in anonymous_retry_profiles])))
            # Highlights, stories and the storyitem filter are not passed to the anonymous download.
            with instaloader.anonymous_copy() as anonymous_loader:
                anonymous_loader.download_profiles(anonymous_retry_profiles,
                                                   download_profile_pic, download_posts, download_tagged, download_igtv,
                                                   fast_update=fast_update, post_filter=post_filter,
                                                   latest_stamps=latest_stamps)
    except KeyboardInterrupt:
        print("\nInterrupted by user.", file=sys.stderr)
        exit_code = ExitCode.USER_ABORTED
    except AbortDownloadException as exc:
        print("\nDownload aborted: {}.".format(exc), file=sys.stderr)
        exit_code = ExitCode.DOWNLOAD_ABORTED
    # Save session if it is useful
    if instaloader.context.is_logged_in:
        instaloader.save_session_to_file(sessionfile)
    # User might be confused if Instaloader does nothing
    if not targetlist:
        if instaloader.context.is_logged_in:
            # Instaloader did at least save a session file
            instaloader.context.log("No targets were specified, thus nothing has been downloaded.")
        else:
            # Instaloader did not do anything
            instaloader.context.log("usage:" + usage_string())
            exit_code = ExitCode.INIT_FAILURE
    return exit_code
def main():
    """Command line entry point: parse arguments, run :func:`_main` and exit.

    Builds the argument parser, validates option combinations, constructs the
    :class:`Instaloader` instance and delegates the actual work to
    :func:`_main`. The process always terminates through ``sys.exit()`` with
    an :class:`ExitCode`; Instaloader exceptions are reported on stderr and
    mapped to the corresponding exit code.
    """
    # Arguments may also be read from a file given as "+args.txt" (fromfile_prefix_chars).
    parser = ArgumentParser(description=__doc__, add_help=False, usage=usage_string(),
                            epilog="The complete documentation can be found at "
                                   "https://instaloader.github.io/.",
                            fromfile_prefix_chars='+')

    g_targets = parser.add_argument_group("What to Download",
                                          "Specify a list of targets. For each of these, Instaloader creates a folder "
                                          "and downloads all posts. The following targets are supported:")
    # Only args.profile is read further below; the other positionals in this group are never
    # accessed and appear to exist only to document the target syntaxes in --help.
    g_targets.add_argument('profile', nargs='*',
                           help="Download profile. If an already-downloaded profile has been renamed, Instaloader "
                                "automatically finds it by its unique ID and renames the folder likewise.")
    g_targets.add_argument('_at_profile', nargs='*', metavar="@profile",
                           help="Download all followees of profile. Requires login. "
                                "Consider using :feed rather than @yourself.")
    g_targets.add_argument('_hashtag', nargs='*', metavar='"#hashtag"', help="Download #hashtag.")
    g_targets.add_argument('_location', nargs='*', metavar='%location_id',
                           help="Download %%location_id. Requires login.")
    g_targets.add_argument('_feed', nargs='*', metavar=":feed",
                           help="Download pictures from your feed. Requires login.")
    g_targets.add_argument('_stories', nargs='*', metavar=":stories",
                           help="Download the stories of your followees. Requires login.")
    g_targets.add_argument('_saved', nargs='*', metavar=":saved",
                           help="Download the posts that you marked as saved. Requires login.")
    g_targets.add_argument('_singlepost', nargs='*', metavar="-- -shortcode",
                           help="Download the post with the given shortcode")
    g_targets.add_argument('_json', nargs='*', metavar="filename.json[.xz]",
                           help="Re-Download the given object.")
    g_targets.add_argument('_fromfile', nargs='*', metavar="+args.txt",
                           help="Read targets (and options) from given textfile.")

    # Both groups are created up front; options are then added to them in interleaved order.
    g_post = parser.add_argument_group("What to Download of each Post")
    g_prof = parser.add_argument_group("What to Download of each Profile")

    g_prof.add_argument('-P', '--profile-pic-only', action='store_true',
                        help=SUPPRESS)
    g_prof.add_argument('--no-posts', action='store_true',
                        help="Do not download regular posts.")
    g_prof.add_argument('--no-profile-pic', action='store_true',
                        help='Do not download profile picture.')
    g_post.add_argument('--slide', action='store',
                        help='Set what image/interval of a sidecar you want to download.')
    g_post.add_argument('--no-pictures', action='store_true',
                        help='Do not download post pictures. Cannot be used together with --fast-update. '
                             'Implies --no-video-thumbnails, does not imply --no-videos.')
    g_post.add_argument('-V', '--no-videos', action='store_true',
                        help='Do not download videos.')
    g_post.add_argument('--no-video-thumbnails', action='store_true',
                        help='Do not download thumbnails of videos.')
    g_post.add_argument('-G', '--geotags', action='store_true',
                        help='Download geotags when available. Geotags are stored as a '
                             'text file with the location\'s name and a Google Maps link. '
                             'This requires an additional request to the Instagram '
                             'server for each picture. Requires login.')
    g_post.add_argument('-C', '--comments', action='store_true',
                        help='Download and update comments for each post. '
                             'This requires an additional request to the Instagram '
                             'server for each post, which is why it is disabled by default. Requires login.')
    g_post.add_argument('--no-captions', action='store_true',
                        help='Do not create txt files.')
    g_post.add_argument('--post-metadata-txt', action='append',
                        help='Template to write in txt file for each Post.')
    g_post.add_argument('--storyitem-metadata-txt', action='append',
                        help='Template to write in txt file for each StoryItem.')
    g_post.add_argument('--no-metadata-json', action='store_true',
                        help='Do not create a JSON file containing the metadata of each post.')
    g_post.add_argument('--metadata-json', action='store_true',
                        help=SUPPRESS)
    g_post.add_argument('--no-compress-json', action='store_true',
                        help='Do not xz compress JSON files, rather create pretty formatted JSONs.')
    g_prof.add_argument('-s', '--stories', action='store_true',
                        help='Also download stories of each profile that is downloaded. Requires login.')
    g_prof.add_argument('--stories-only', action='store_true',
                        help=SUPPRESS)
    g_prof.add_argument('--highlights', action='store_true',
                        help='Also download highlights of each profile that is downloaded. Requires login.')
    g_prof.add_argument('--tagged', action='store_true',
                        help='Also download posts where each profile is tagged.')
    g_prof.add_argument('--igtv', action='store_true',
                        help='Also download IGTV videos.')

    g_cond = parser.add_argument_group("Which Posts to Download")
    g_cond.add_argument('-F', '--fast-update', action='store_true',
                        help='For each target, stop when encountering the first already-downloaded picture. This '
                             'flag is recommended when you use Instaloader to update your personal Instagram archive.')
    g_cond.add_argument('--latest-stamps', nargs='?', metavar='STAMPSFILE', const=get_default_stamps_filename(),
                        help='Store the timestamps of latest media scraped for each profile. This allows updating '
                             'your personal Instagram archive even if you delete the destination directories. '
                             'If STAMPSFILE is not provided, defaults to ' + get_default_stamps_filename())
    g_cond.add_argument('--post-filter', '--only-if', metavar='filter',
                        help='Expression that, if given, must evaluate to True for each post to be downloaded. Must be '
                             'a syntactically valid python expression. Variables are evaluated to '
                             'instaloader.Post attributes. Example: --post-filter=viewer_has_liked.')
    g_cond.add_argument('--storyitem-filter', metavar='filter',
                        help='Expression that, if given, must evaluate to True for each storyitem to be downloaded. '
                             'Must be a syntactically valid python expression. Variables are evaluated to '
                             'instaloader.StoryItem attributes.')
    g_cond.add_argument('-c', '--count',
                        help='Do not attempt to download more than COUNT posts. '
                             'Applies to #hashtag, %%location_id, :feed, and :saved.')

    g_login = parser.add_argument_group('Login (Download Private Profiles)',
                                        'Instaloader can login to Instagram. This allows downloading private profiles. '
                                        'To login, pass the --login option. Your session cookie (not your password!) '
                                        'will be saved to a local file to be reused next time you want Instaloader '
                                        'to login. Instead of --login, the --load-cookies option can be used to '
                                        'import a session from a browser.')
    g_login.add_argument('-l', '--login', metavar='YOUR-USERNAME',
                         help='Login name (profile name) for your Instagram account.')
    g_login.add_argument('-b', '--load-cookies', metavar='BROWSER-NAME',
                         help='Browser name to load cookies from Instagram')
    g_login.add_argument('-B', '--cookiefile', metavar='COOKIE-FILE',
                         help='Cookie file of a profile to load cookies')
    g_login.add_argument('-f', '--sessionfile',
                         help='Path for loading and storing session key file. '
                              'Defaults to ' + get_default_session_filename("<login_name>"))
    g_login.add_argument('-p', '--password', metavar='YOUR-PASSWORD',
                         help='Password for your Instagram account. Without this option, '
                              'you\'ll be prompted for your password interactively if '
                              'there is not yet a valid session file.')

    g_how = parser.add_argument_group('How to Download')
    g_how.add_argument('--dirname-pattern',
                       help='Name of directory where to store posts. {profile} is replaced by the profile name, '
                            '{target} is replaced by the target you specified, i.e. either :feed, #hashtag or the '
                            'profile name. Defaults to \'{target}\'.')
    # The adjacent literals are concatenated verbatim, so each piece must carry its own
    # trailing separator ("name, " / ":feed, "), matching the --dirname-pattern wording.
    g_how.add_argument('--filename-pattern',
                       help='Prefix of filenames for posts and stories, relative to the directory given with '
                            '--dirname-pattern. {profile} is replaced by the profile name, '
                            '{target} is replaced by the target you specified, i.e. either :feed, '
                            '#hashtag or the profile name. Defaults to \'{date_utc}_UTC\'')
    g_how.add_argument('--title-pattern',
                       help='Prefix of filenames for profile pics, hashtag profile pics, and highlight covers. '
                            'Defaults to \'{date_utc}_UTC_{typename}\' if --dirname-pattern contains \'{target}\' '
                            'or \'{dirname}\', or if --dirname-pattern is not specified. Otherwise defaults to '
                            '\'{target}_{date_utc}_UTC_{typename}\'.')
    g_how.add_argument('--resume-prefix', metavar='PREFIX',
                       help='Prefix for filenames that are used to save the information to resume an interrupted '
                            'download.')
    g_how.add_argument('--sanitize-paths', action='store_true',
                       help='Sanitize paths so that the resulting file and directory names are valid on both '
                            'Windows and Unix.')
    g_how.add_argument('--no-resume', action='store_true',
                       help='Do not resume a previously-aborted download iteration, and do not save such information '
                            'when interrupted.')
    g_how.add_argument('--use-aged-resume-files', action='store_true', help=SUPPRESS)
    g_how.add_argument('--user-agent',
                       help='User Agent to use for HTTP requests. Defaults to \'{}\'.'.format(default_user_agent()))
    g_how.add_argument('-S', '--no-sleep', action='store_true', help=SUPPRESS)
    g_how.add_argument('--max-connection-attempts', metavar='N', type=int, default=3,
                       help='Maximum number of connection attempts until a request is aborted. Defaults to 3. If a '
                            'connection fails, it can be manually skipped by hitting CTRL+C. Set this to 0 to retry '
                            'infinitely.')
    g_how.add_argument('--commit-mode', action='store_true', help=SUPPRESS)
    g_how.add_argument('--request-timeout', metavar='N', type=float, default=300.0,
                       help='Seconds to wait before timing out a connection request. Defaults to 300.')
    g_how.add_argument('--abort-on', type=http_status_code_list, metavar="STATUS_CODES",
                       help='Comma-separated list of HTTP status codes that cause Instaloader to abort, bypassing all '
                            'retry logic.')
    g_how.add_argument('--no-iphone', action='store_true',
                       help='Do not attempt to download iPhone version of images and videos.')

    g_misc = parser.add_argument_group('Miscellaneous Options')
    g_misc.add_argument('-q', '--quiet', action='store_true',
                        help='Disable user interaction, i.e. do not print messages (except errors) and fail '
                             'if login credentials are needed but not given. This makes Instaloader suitable as a '
                             'cron job.')
    g_misc.add_argument('-h', '--help', action='help', help='Show this help message and exit.')
    g_misc.add_argument('--version', action='version', help='Show version number and exit.',
                        version=__version__)

    args = parser.parse_args()
    try:
        # Stories need a login: without one, --stories is dropped with a notice, while
        # --stories-only aborts (the explanation has already been printed above).
        if (args.login is None and args.load_cookies is None) and (args.stories or args.stories_only):
            print("Login is required to download stories.", file=sys.stderr)
            args.stories = False
            if args.stories_only:
                raise InvalidArgumentException()

        if ':feed-all' in args.profile or ':feed-liked' in args.profile:
            raise InvalidArgumentException(":feed-all and :feed-liked were removed. Use :feed as target and "
                                           "eventually --post-filter=viewer_has_liked.")

        # Options given several times are joined into one multi-line template.
        post_metadata_txt_pattern = '\n'.join(args.post_metadata_txt) if args.post_metadata_txt else None
        storyitem_metadata_txt_pattern = '\n'.join(args.storyitem_metadata_txt) if args.storyitem_metadata_txt else None

        if args.no_captions:
            if not (post_metadata_txt_pattern or storyitem_metadata_txt_pattern):
                # --no-captions is passed on to the loader as empty templates.
                post_metadata_txt_pattern = ''
                storyitem_metadata_txt_pattern = ''
            else:
                raise InvalidArgumentException("--no-captions and --post-metadata-txt or --storyitem-metadata-txt "
                                               "given; That contradicts.")

        if args.no_resume and args.resume_prefix:
            raise InvalidArgumentException("--no-resume and --resume-prefix given; That contradicts.")
        resume_prefix = (args.resume_prefix if args.resume_prefix else 'iterator') if not args.no_resume else None

        if args.no_pictures and args.fast_update:
            raise InvalidArgumentException('--no-pictures and --fast-update cannot be used together.')

        if args.login and args.load_cookies:
            raise InvalidArgumentException('--load-cookies and --login cannot be used together.')

        # Determine what to download
        download_profile_pic = not args.no_profile_pic or args.profile_pic_only
        download_posts = not (args.no_posts or args.stories_only or args.profile_pic_only)
        download_stories = args.stories or args.stories_only

        loader = Instaloader(sleep=not args.no_sleep, quiet=args.quiet, user_agent=args.user_agent,
                             dirname_pattern=args.dirname_pattern, filename_pattern=args.filename_pattern,
                             download_pictures=not args.no_pictures,
                             download_videos=not args.no_videos, download_video_thumbnails=not args.no_video_thumbnails,
                             download_geotags=args.geotags,
                             download_comments=args.comments, save_metadata=not args.no_metadata_json,
                             compress_json=not args.no_compress_json,
                             post_metadata_txt_pattern=post_metadata_txt_pattern,
                             storyitem_metadata_txt_pattern=storyitem_metadata_txt_pattern,
                             max_connection_attempts=args.max_connection_attempts,
                             request_timeout=args.request_timeout,
                             resume_prefix=resume_prefix,
                             check_resume_bbd=not args.use_aged_resume_files,
                             slide=args.slide,
                             fatal_status_codes=args.abort_on,
                             iphone_support=not args.no_iphone,
                             title_pattern=args.title_pattern,
                             sanitize_paths=args.sanitize_paths)
        # NOTE(review): loader.close() below is skipped when _main() raises; confirm whether
        # the loader should be closed on the error paths as well.
        exit_code = _main(loader,
                          args.profile,
                          username=args.login.lower() if args.login is not None else None,
                          password=args.password,
                          sessionfile=args.sessionfile,
                          download_profile_pic=download_profile_pic,
                          download_posts=download_posts,
                          download_stories=download_stories,
                          download_highlights=args.highlights,
                          download_tagged=args.tagged,
                          download_igtv=args.igtv,
                          fast_update=args.fast_update,
                          latest_stamps_file=args.latest_stamps,
                          max_count=int(args.count) if args.count is not None else None,
                          post_filter_str=args.post_filter,
                          storyitem_filter_str=args.storyitem_filter,
                          browser=args.load_cookies,
                          cookiefile=args.cookiefile)
        loader.close()
        # Stored errors override whatever exit code _main() returned.
        if loader.has_stored_errors:
            exit_code = ExitCode.NON_FATAL_ERROR
    except InvalidArgumentException as err:
        print(err, file=sys.stderr)
        exit_code = ExitCode.INIT_FAILURE
    except LoginException as err:
        print(err, file=sys.stderr)
        exit_code = ExitCode.LOGIN_FAILURE
    except InstaloaderException as err:
        # Diagnostics go to stderr, like in the handlers above.
        print("Fatal error: %s" % err, file=sys.stderr)
        exit_code = ExitCode.UNEXPECTED_ERROR
    sys.exit(exit_code)
# Run the command line interface when this module is executed as a script.
if __name__ == "__main__":
    main()

View File

@ -0,0 +1,84 @@
class InstaloaderException(Exception):
    """Base exception for this script.

    :note: This exception should not be raised directly."""
    pass


class QueryReturnedBadRequestException(InstaloaderException):
    pass


class QueryReturnedForbiddenException(InstaloaderException):
    pass


class ProfileNotExistsException(InstaloaderException):
    pass


class ProfileHasNoPicsException(InstaloaderException):
    """
    .. deprecated:: 4.2.2
       Not raised anymore.
    """
    pass


class PrivateProfileNotFollowedException(InstaloaderException):
    pass


class LoginRequiredException(InstaloaderException):
    pass


# Base class of the login-related exceptions below; catching it also catches
# TwoFactorAuthRequiredException and BadCredentialsException.
class LoginException(InstaloaderException):
    pass


class TwoFactorAuthRequiredException(LoginException):
    pass


class InvalidArgumentException(InstaloaderException):
    pass


class BadResponseException(InstaloaderException):
    pass


class BadCredentialsException(LoginException):
    pass


# Base class of the connection-related exceptions below
# (QueryReturnedNotFoundException, TooManyRequestsException).
class ConnectionException(InstaloaderException):
    pass


class PostChangedException(InstaloaderException):
    """.. versionadded:: 4.2.2"""
    pass


class QueryReturnedNotFoundException(ConnectionException):
    pass


class TooManyRequestsException(ConnectionException):
    pass


class IPhoneSupportDisabledException(InstaloaderException):
    pass


class AbortDownloadException(Exception):
    """
    Exception that is not caught in the error catchers inside the download loop and so aborts the
    download loop.

    This exception is not a subclass of ``InstaloaderException``.

    .. versionadded:: 4.7
    """
    pass

File diff suppressed because it is too large. (Load Diff)

View File

@ -0,0 +1,875 @@
import json
import os
import pickle
import random
import shutil
import sys
import textwrap
import time
import urllib.parse
import uuid
from contextlib import contextmanager, suppress
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Dict, Iterator, List, Optional, Union
import requests
import requests.utils
from .exceptions import *
def copy_session(session: requests.Session, request_timeout: Optional[float] = None) -> requests.Session:
    """Create a new requests.Session carrying over the cookies and headers of ``session``.

    :param session: Session to duplicate.
    :param request_timeout: Timeout bound as default ``timeout`` for the copy's ``request()``.
    :return: The new session.
    """
    # NOTE(review): only cookies and headers are carried over; other session settings
    # (e.g. proxies) are not copied here - confirm this is intended.
    clone = requests.Session()
    cookie_dict = requests.utils.dict_from_cookiejar(session.cookies)
    clone.cookies = requests.utils.cookiejar_from_dict(cookie_dict)
    clone.headers = session.headers.copy()  # type: ignore
    # Override default timeout behavior.
    # Need to silence mypy bug for this. See: https://github.com/python/mypy/issues/2427
    request_with_timeout = partial(clone.request, timeout=request_timeout)
    clone.request = request_with_timeout  # type: ignore
    return clone
def default_user_agent() -> str:
    """Return the User-Agent string used when none is configured explicitly."""
    platform_part = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
    browser_part = '(KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36'
    return platform_part + browser_part
def default_iphone_headers() -> Dict[str, Any]:
    """Return a fresh set of HTTP headers imitating the Instagram iOS app.

    Most values are constants. Three are computed on every call: a random
    ``x-ig-connection-speed``, a random ``x-pigeon-session-id`` (UUID4) and
    ``x-ig-timezone-offset``, the local UTC offset in seconds.

    :return: New dict mapping header names to string values.
    """
    # Use total_seconds() rather than the .seconds attribute: timedelta normalizes negative
    # durations to days=-1 plus positive seconds, so .seconds reports 68400 for UTC-5
    # instead of -18000.
    utc_offset = datetime.now().astimezone().utcoffset() or timedelta(seconds=0)
    return {'User-Agent': ('Instagram 273.0.0.16.70 (iPad13,8; iOS 16_3; en_US; en-US; '
                           'scale=2.00; 2048x2732; 452417278) AppleWebKit/420+'),
            'x-ads-opt-out': '1',
            'x-bloks-is-panorama-enabled': 'true',
            'x-bloks-version-id': '01507c21540f73e2216b6f62a11a5b5e51aa85491b72475c080da35b1228ddd6',
            'x-fb-client-ip': 'True',
            'x-fb-connection-type': 'wifi',
            'x-fb-http-engine': 'Liger',
            'x-fb-server-cluster': 'True',
            'x-fb': '1',
            'x-ig-abr-connection-speed-kbps': '2',
            'x-ig-app-id': '124024574287414',
            'x-ig-app-locale': 'en-US',
            'x-ig-app-startup-country': 'US',
            'x-ig-bandwidth-speed-kbps': '0.000',
            'x-ig-capabilities': '36r/F/8=',
            'x-ig-connection-speed': '{}kbps'.format(random.randint(1000, 20000)),
            'x-ig-connection-type': 'WiFi',
            'x-ig-device-locale': 'en-US',
            'x-ig-mapped-locale': 'en-US',
            'x-ig-timezone-offset': str(int(utc_offset.total_seconds())),
            'x-ig-www-claim': '0',
            'x-pigeon-session-id': str(uuid.uuid4()),
            'x-tigon-is-retry': 'False',
            'x-whatsapp': '0'}
class InstaloaderContext:
    """Class providing methods for (error) logging and low-level communication with Instagram.

    It is not thought to be instantiated directly, rather :class:`Instaloader` instances maintain a context
    object.

    For logging, it provides :meth:`log`, :meth:`error`, :meth:`error_catcher`.

    It provides low-level communication routines :meth:`get_json`, :meth:`graphql_query`, :meth:`graphql_node_list`,
    :meth:`get_and_write_raw` and implements mechanisms for rate controlling and error handling.

    Further, it provides methods for logging in and general session handles, which are used by those routines in
    class :class:`Instaloader`.

    :param proxy: Optional proxy mapping that is merged into the ``proxies`` of every session this context
       creates, or None to not configure a proxy.
    :param sleep: Whether to sleep a short random time before each request.
    :param quiet: Suppress non-error output.
    :param user_agent: User-Agent to send, or None for :func:`default_user_agent`.
    :param max_connection_attempts: Number of attempts for a JSON query before giving up.
    :param request_timeout: Timeout in seconds applied to every request.
    :param rate_controller: Factory receiving this context and returning the rate controller to use.
    :param fatal_status_codes: HTTP status codes that abort the download immediately.
    :param iphone_support: Stored as :attr:`iphone_support`.
    """

    def __init__(self, proxy: Optional[dict] = None, sleep: bool = True, quiet: bool = False,
                 user_agent: Optional[str] = None,
                 max_connection_attempts: int = 3, request_timeout: float = 300.0,
                 rate_controller: Optional[Callable[["InstaloaderContext"], "RateController"]] = None,
                 fatal_status_codes: Optional[List[int]] = None,
                 iphone_support: bool = True):

        self.user_agent = user_agent if user_agent is not None else default_user_agent()
        self.request_timeout = request_timeout
        # Remember the proxy before the first session is built, so that every session created later
        # (login, loaded session, anonymous copies) can be configured the same way.
        self.proxy = proxy
        self._session = self.get_anonymous_session(proxy)
        self.username = None
        self.user_id = None
        self.sleep = sleep
        self.quiet = quiet
        self.max_connection_attempts = max_connection_attempts
        self._graphql_page_length = 50
        self.two_factor_auth_pending = None
        self.iphone_support = iphone_support
        self.iphone_headers = default_iphone_headers()

        # error log, filled with error() and printed at the end of Instaloader.main()
        self.error_log: List[str] = []

        self._rate_controller = rate_controller(self) if rate_controller is not None else RateController(self)

        # Can be set to True for testing, disables suppression of InstaloaderContext._error_catcher
        self.raise_all_errors = False

        # HTTP status codes that should cause an AbortDownloadException
        self.fatal_status_codes = fatal_status_codes or []

        # Cache profile from id (mapping from id to Profile)
        self.profile_id_cache: Dict[int, Any] = dict()

    @contextmanager
    def anonymous_copy(self):
        """Context manager that temporarily turns this context into an anonymous (not logged-in) one.

        The current session, username, user id and iPhone headers are saved, replaced by anonymous defaults
        for the duration of the ``with`` block, and restored afterwards. The temporary session is closed.
        """
        session = self._session
        username = self.username
        user_id = self.user_id
        iphone_headers = self.iphone_headers
        self._session = self.get_anonymous_session(self.proxy)
        self.username = None
        self.user_id = None
        self.iphone_headers = default_iphone_headers()
        try:
            yield self
        finally:
            self._session.close()
            self.username = username
            self._session = session
            self.user_id = user_id
            self.iphone_headers = iphone_headers

    @property
    def is_logged_in(self) -> bool:
        """True, if this Instaloader instance is logged in."""
        return bool(self.username)

    def log(self, *msg, sep='', end='\n', flush=False):
        """Log a message to stdout that can be suppressed with --quiet."""
        if not self.quiet:
            print(*msg, sep=sep, end=end, flush=flush)

    def error(self, msg, repeat_at_end=True):
        """Log a non-fatal error message to stderr, which is repeated at program termination.

        :param msg: Message to be printed.
        :param repeat_at_end: Set to false if the message should be printed, but not repeated at program termination."""
        print(msg, file=sys.stderr)
        if repeat_at_end:
            self.error_log.append(msg)

    @property
    def has_stored_errors(self) -> bool:
        """Returns whether any error has been reported and stored to be repeated at program termination.

        .. versionadded:: 4.12"""
        return bool(self.error_log)

    def close(self):
        """Print error log and close session"""
        if self.error_log and not self.quiet:
            print("\nErrors or warnings occurred:", file=sys.stderr)
            for err in self.error_log:
                print(err, file=sys.stderr)
        self._session.close()

    @contextmanager
    def error_catcher(self, extra_info: Optional[str] = None):
        """
        Context manager to catch, print and record InstaloaderExceptions.

        :param extra_info: String to prefix error message with."""
        try:
            yield
        except InstaloaderException as err:
            if extra_info:
                self.error('{}: {}'.format(extra_info, err))
            else:
                self.error('{}'.format(err))
            if self.raise_all_errors:
                raise

    def _default_http_header(self, empty_session_only: bool = False) -> Dict[str, str]:
        """Returns default HTTP header we use for requests.

        :param empty_session_only: If True, the ``Host``, ``Origin``, ``X-Instagram-AJAX`` and
           ``X-Requested-With`` headers are left out.
        """
        header = {'Accept-Encoding': 'gzip, deflate',
                  'Accept-Language': 'en-US,en;q=0.8',
                  'Connection': 'keep-alive',
                  'Content-Length': '0',
                  'Host': 'www.instagram.com',
                  'Origin': 'https://www.instagram.com',
                  'Referer': 'https://www.instagram.com/',
                  'User-Agent': self.user_agent,
                  'X-Instagram-AJAX': '1',
                  'X-Requested-With': 'XMLHttpRequest'}
        if empty_session_only:
            del header['Host']
            del header['Origin']
            del header['X-Instagram-AJAX']
            del header['X-Requested-With']
        return header

    def _temporary_session(self) -> requests.Session:
        """Return a throw-away copy of the current session that uses the same proxies."""
        tmpsession = copy_session(self._session, self.request_timeout)
        # Make sure the copy takes the same network route as the live session, independent of which
        # attributes copy_session() carries over.
        tmpsession.proxies.update(self._session.proxies)
        return tmpsession

    def get_anonymous_session(self, proxy: Optional[dict] = None) -> requests.Session:
        """Returns our default anonymous requests.Session object.

        :param proxy: Proxy mapping for the new session. If None, the proxy this context was created with
           is used (if any).
        """
        if proxy is None:
            proxy = self.proxy
        session = requests.Session()
        session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1',
                                'ig_vw': '1920', 'csrftoken': '',
                                's_network': '', 'ds_user_id': ''})
        # dict.update(None) raises TypeError, so only touch the proxies when there is something to set.
        if proxy:
            session.proxies.update(proxy)
        session.headers.update(self._default_http_header(empty_session_only=True))
        # Override default timeout behavior.
        # Need to silence mypy bug for this. See: https://github.com/python/mypy/issues/2427
        session.request = partial(session.request, timeout=self.request_timeout)  # type: ignore
        return session

    def save_session(self):
        """Not meant to be used directly, use :meth:`Instaloader.save_session`."""
        return requests.utils.dict_from_cookiejar(self._session.cookies)

    def update_cookies(self, cookie):
        """.. versionadded:: 4.11"""
        self._session.cookies.update(cookie)

    def load_session(self, username, sessiondata):
        """Not meant to be used directly, use :meth:`Instaloader.load_session`."""
        session = requests.Session()
        session.cookies = requests.utils.cookiejar_from_dict(sessiondata)
        session.headers.update(self._default_http_header())
        session.headers.update({'X-CSRFToken': session.cookies.get_dict()['csrftoken']})
        # A restored session must use the configured proxy just like the anonymous one it replaces.
        if self.proxy:
            session.proxies.update(self.proxy)
        # Override default timeout behavior.
        # Need to silence mypy bug for this. See: https://github.com/python/mypy/issues/2427
        session.request = partial(session.request, timeout=self.request_timeout)  # type: ignore
        self._session = session
        self.username = username

    def save_session_to_file(self, sessionfile):
        """Not meant to be used directly, use :meth:`Instaloader.save_session_to_file`."""
        pickle.dump(self.save_session(), sessionfile)

    def load_session_from_file(self, username, sessionfile):
        """Not meant to be used directly, use :meth:`Instaloader.load_session_from_file`."""
        # NOTE: unpickling executes arbitrary code; only load session files from a trusted source.
        self.load_session(username, pickle.load(sessionfile))

    def test_login(self) -> Optional[str]:
        """Not meant to be used directly, use :meth:`Instaloader.test_login`."""
        try:
            data = self.graphql_query("d6f4427fbe92d846298cf93df0b937d3", {})
            return data["data"]["user"]["username"] if data["data"]["user"] is not None else None
        except (AbortDownloadException, ConnectionException) as err:
            self.error(f"Error when checking if logged in: {err}")
            return None

    def login(self, user, passwd):
        """Not meant to be used directly, use :meth:`Instaloader.login`.

        :raises BadCredentialsException: If the provided password is wrong.
        :raises TwoFactorAuthRequiredException: First step of 2FA login done, now call
           :meth:`Instaloader.two_factor_login`.
        :raises LoginException: An error happened during login (for example, and invalid response).
           Or if the provided username does not exist.

        .. versionchanged:: 4.12
           Raises LoginException instead of ConnectionException when an error happens.
           Raises LoginException instead of InvalidArgumentException when the username does not exist.
        """
        # pylint:disable=import-outside-toplevel
        import http.client
        # pylint:disable=protected-access
        http.client._MAXHEADERS = 200
        session = requests.Session()
        session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1',
                                'ig_vw': '1920', 'ig_cb': '1', 'csrftoken': '',
                                's_network': '', 'ds_user_id': ''})
        session.headers.update(self._default_http_header())
        # The login requests, and the logged-in session that results from them, must use the proxy too.
        if self.proxy:
            session.proxies.update(self.proxy)
        # Override default timeout behavior.
        # Need to silence mypy bug for this. See: https://github.com/python/mypy/issues/2427
        session.request = partial(session.request, timeout=self.request_timeout)  # type: ignore

        # Make a request to Instagram's root URL, which will set the session's csrftoken cookie
        # Not using self.get_json() here, because we need to access the cookie
        session.get('https://www.instagram.com/')
        # Add session's csrftoken cookie to session headers
        csrf_token = session.cookies.get_dict()['csrftoken']
        session.headers.update({'X-CSRFToken': csrf_token})

        self.do_sleep()
        # Workaround credits to pgrimaud.
        # See: https://github.com/pgrimaud/instagram-user-feed/commit/96ad4cf54d1ad331b337f325c73e664999a6d066
        enc_password = '#PWD_INSTAGRAM_BROWSER:0:{}:{}'.format(int(datetime.now().timestamp()), passwd)
        login = session.post('https://www.instagram.com/api/v1/web/accounts/login/ajax/',
                             data={'enc_password': enc_password, 'username': user}, allow_redirects=True)
        try:
            resp_json = login.json()

        except json.decoder.JSONDecodeError as err:
            raise LoginException(
                "Login error: JSON decode fail, {} - {}.".format(login.status_code, login.reason)
            ) from err
        if resp_json.get('two_factor_required'):
            two_factor_session = copy_session(session, self.request_timeout)
            # Keep the second login step on the same network route as the first one.
            two_factor_session.proxies.update(session.proxies)
            two_factor_session.headers.update({'X-CSRFToken': csrf_token})
            two_factor_session.cookies.update({'csrftoken': csrf_token})
            self.two_factor_auth_pending = (two_factor_session,
                                            user,
                                            resp_json['two_factor_info']['two_factor_identifier'])
            raise TwoFactorAuthRequiredException("Login error: two-factor authentication required.")
        if resp_json.get('checkpoint_url'):
            raise LoginException(
                f"Login: Checkpoint required. Point your browser to {resp_json.get('checkpoint_url')} - "
                f"follow the instructions, then retry."
            )
        if resp_json['status'] != 'ok':
            if 'message' in resp_json:
                raise LoginException("Login error: \"{}\" status, message \"{}\".".format(resp_json['status'],
                                                                                         resp_json['message']))
            else:
                raise LoginException("Login error: \"{}\" status.".format(resp_json['status']))
        if 'authenticated' not in resp_json:
            # Issue #472
            if 'message' in resp_json:
                raise LoginException("Login error: Unexpected response, \"{}\".".format(resp_json['message']))
            else:
                raise LoginException("Login error: Unexpected response, this might indicate a blocked IP.")
        if not resp_json['authenticated']:
            if resp_json['user']:
                # '{"authenticated": false, "user": true, "status": "ok"}'
                raise BadCredentialsException('Login error: Wrong password.')
            else:
                # '{"authenticated": false, "user": false, "status": "ok"}'
                # Raise LoginException rather than BadCredentialException, because BadCredentialException
                # triggers re-asking of password in Instaloader.interactive_login(), which makes no sense if the
                # username is invalid.
                raise LoginException('Login error: User {} does not exist.'.format(user))
        # '{"authenticated": true, "user": true, "userId": ..., "oneTapPrompt": false, "status": "ok"}'
        session.headers.update({'X-CSRFToken': login.cookies['csrftoken']})
        self._session = session
        self.username = user
        self.user_id = resp_json['userId']

    def two_factor_login(self, two_factor_code):
        """Second step of login if 2FA is enabled.
        Not meant to be used directly, use :meth:`Instaloader.two_factor_login`.

        :raises InvalidArgumentException: No two-factor authentication pending.
        :raises BadCredentialsException: 2FA verification code invalid.

        .. versionadded:: 4.2"""
        if not self.two_factor_auth_pending:
            raise InvalidArgumentException("No two-factor authentication pending.")
        (session, user, two_factor_id) = self.two_factor_auth_pending

        login = session.post('https://www.instagram.com/accounts/login/ajax/two_factor/',
                             data={'username': user, 'verificationCode': two_factor_code, 'identifier': two_factor_id},
                             allow_redirects=True)
        resp_json = login.json()
        if resp_json['status'] != 'ok':
            if 'message' in resp_json:
                raise BadCredentialsException("2FA error: {}".format(resp_json['message']))
            else:
                raise BadCredentialsException("2FA error: \"{}\" status.".format(resp_json['status']))
        session.headers.update({'X-CSRFToken': login.cookies['csrftoken']})
        self._session = session
        self.username = user
        self.two_factor_auth_pending = None

    def do_sleep(self):
        """Sleep a short time if self.sleep is set. Called before each request to instagram.com."""
        if self.sleep:
            time.sleep(min(random.expovariate(0.6), 15.0))

    @staticmethod
    def _response_error(resp: requests.Response) -> str:
        """Build a human-readable description of a failed response.

        Includes the ``status`` and ``message`` fields of the JSON body when the body is valid JSON.
        """
        extra_from_json: Optional[str] = None
        with suppress(json.decoder.JSONDecodeError):
            resp_json = resp.json()
            if "status" in resp_json:
                extra_from_json = (
                    f"\"{resp_json['status']}\" status, message \"{resp_json['message']}\""
                    if "message" in resp_json
                    else f"\"{resp_json['status']}\" status"
                )
        return (
            f"{resp.status_code} {resp.reason}"
            f"{f' - {extra_from_json}' if extra_from_json is not None else ''}"
            f" when accessing {resp.url}"
        )

    def get_json(self, path: str, params: Dict[str, Any], host: str = 'www.instagram.com',
                 session: Optional[requests.Session] = None, _attempt=1,
                 response_headers: Optional[Dict[str, Any]] = None,
                 use_post: bool = False) -> Dict[str, Any]:
        """JSON request to Instagram.

        :param path: URL, relative to the given domain which defaults to www.instagram.com/
        :param params: request parameters
        :param host: Domain part of the URL from where to download the requested JSON; defaults to www.instagram.com
        :param session: Session to use, or None to use self.session
        :param response_headers: If given, this dictionary is cleared and filled with the response headers.
        :param use_post: Use POST instead of GET to make the request
        :return: Decoded response dictionary
        :raises QueryReturnedBadRequestException: When the server responds with a 400.
        :raises QueryReturnedNotFoundException: When the server responds with a 404.
        :raises ConnectionException: When query repeatedly failed.

        .. versionchanged:: 4.13
           Added `use_post` parameter.
        """
        is_graphql_query = 'query_hash' in params and 'graphql/query' in path
        is_doc_id_query = 'doc_id' in params and 'graphql/query' in path
        is_iphone_query = host == 'i.instagram.com'
        is_other_query = not is_graphql_query and not is_doc_id_query and host == "www.instagram.com"
        sess = session if session else self._session
        try:
            self.do_sleep()
            if is_graphql_query:
                self._rate_controller.wait_before_query(params['query_hash'])
            if is_doc_id_query:
                self._rate_controller.wait_before_query(params['doc_id'])
            if is_iphone_query:
                self._rate_controller.wait_before_query('iphone')
            if is_other_query:
                self._rate_controller.wait_before_query('other')
            if use_post:
                resp = sess.post('https://{0}/{1}'.format(host, path), data=params, allow_redirects=False)
            else:
                resp = sess.get('https://{0}/{1}'.format(host, path), params=params, allow_redirects=False)
            if resp.status_code in self.fatal_status_codes:
                redirect = " redirect to {}".format(resp.headers['location']) if 'location' in resp.headers else ""
                body = ""
                # A fatal response is not guaranteed to carry a Content-Type header.
                if resp.headers.get('Content-Type', '').startswith('application/json'):
                    # Quote at most 500 characters and mark truncation with an ellipsis (U+2026).
                    body = ': ' + resp.text[:500] + ('\u2026' if len(resp.text) > 500 else '')
                raise AbortDownloadException("Query to https://{}/{} responded with \"{} {}\"{}{}".format(
                    host, path, resp.status_code, resp.reason, redirect, body
                ))
            while resp.is_redirect:
                redirect_url = resp.headers['location']
                self.log('\nHTTP redirect from https://{0}/{1} to {2}'.format(host, path, redirect_url))
                if (redirect_url.startswith('https://www.instagram.com/accounts/login') or
                        redirect_url.startswith('https://i.instagram.com/accounts/login')):
                    if not self.is_logged_in:
                        raise LoginRequiredException("Redirected to login page. Use --login or --load-cookies.")
                    raise AbortDownloadException("Redirected to login page. You've been logged out, please wait " +
                                                 "some time, recreate the session and try again")
                if redirect_url.startswith('https://{}/'.format(host)):
                    resp = sess.get(redirect_url if redirect_url.endswith('/') else redirect_url + '/',
                                    params=params, allow_redirects=False)
                else:
                    break
            if response_headers is not None:
                response_headers.clear()
                response_headers.update(resp.headers)
            if resp.status_code == 400:
                raise QueryReturnedBadRequestException(self._response_error(resp))
            if resp.status_code == 404:
                raise QueryReturnedNotFoundException(self._response_error(resp))
            if resp.status_code == 429:
                raise TooManyRequestsException(self._response_error(resp))
            if resp.status_code != 200:
                raise ConnectionException(self._response_error(resp))
            else:
                resp_json = resp.json()
            if 'status' in resp_json and resp_json['status'] != "ok":
                raise ConnectionException(self._response_error(resp))
            return resp_json
        except (ConnectionException, json.decoder.JSONDecodeError, requests.exceptions.RequestException) as err:
            error_string = "JSON Query to {}: {}".format(path, err)
            if _attempt == self.max_connection_attempts:
                if isinstance(err, QueryReturnedNotFoundException):
                    raise QueryReturnedNotFoundException(error_string) from err
                else:
                    raise ConnectionException(error_string) from err
            self.error(error_string + " [retrying; skip with ^C]", repeat_at_end=False)
            try:
                if isinstance(err, TooManyRequestsException):
                    if is_graphql_query:
                        self._rate_controller.handle_429(params['query_hash'])
                    if is_doc_id_query:
                        self._rate_controller.handle_429(params['doc_id'])
                    if is_iphone_query:
                        self._rate_controller.handle_429('iphone')
                    if is_other_query:
                        self._rate_controller.handle_429('other')
                # The retry must use the same HTTP method as the failed attempt.
                return self.get_json(path=path, params=params, host=host, session=sess, _attempt=_attempt + 1,
                                     response_headers=response_headers, use_post=use_post)
            except KeyboardInterrupt:
                self.error("[skipped by user]", repeat_at_end=False)
                raise ConnectionException(error_string) from err

    def graphql_query(self, query_hash: str, variables: Dict[str, Any],
                      referer: Optional[str] = None) -> Dict[str, Any]:
        """
        Do a GraphQL Query.

        :param query_hash: Query identifying hash.
        :param variables: Variables for the Query.
        :param referer: HTTP Referer, or None.
        :return: The server's response dictionary.

        .. versionchanged:: 4.13.1
           Removed the `rhx_gis` parameter.
        """
        with self._temporary_session() as tmpsession:
            tmpsession.headers.update(self._default_http_header(empty_session_only=True))
            del tmpsession.headers['Connection']
            del tmpsession.headers['Content-Length']
            tmpsession.headers['authority'] = 'www.instagram.com'
            tmpsession.headers['scheme'] = 'https'
            tmpsession.headers['accept'] = '*/*'
            if referer is not None:
                tmpsession.headers['referer'] = urllib.parse.quote(referer)

            variables_json = json.dumps(variables, separators=(',', ':'))

            resp_json = self.get_json('graphql/query',
                                      params={'query_hash': query_hash,
                                              'variables': variables_json},
                                      session=tmpsession)
        if 'status' not in resp_json:
            self.error("GraphQL response did not contain a \"status\" field.")
        return resp_json

    def doc_id_graphql_query(self, doc_id: str, variables: Dict[str, Any],
                             referer: Optional[str] = None) -> Dict[str, Any]:
        """
        Do a doc_id-based GraphQL Query using method POST.

        .. versionadded:: 4.13

        :param doc_id: doc_id for the query.
        :param variables: Variables for the Query.
        :param referer: HTTP Referer, or None.
        :return: The server's response dictionary.
        """
        with self._temporary_session() as tmpsession:
            tmpsession.headers.update(self._default_http_header(empty_session_only=True))
            del tmpsession.headers['Connection']
            del tmpsession.headers['Content-Length']
            tmpsession.headers['authority'] = 'www.instagram.com'
            tmpsession.headers['scheme'] = 'https'
            tmpsession.headers['accept'] = '*/*'
            if referer is not None:
                tmpsession.headers['referer'] = urllib.parse.quote(referer)

            variables_json = json.dumps(variables, separators=(',', ':'))

            resp_json = self.get_json('graphql/query',
                                      params={'variables': variables_json,
                                              'doc_id': doc_id,
                                              'server_timestamps': 'true'},
                                      session=tmpsession,
                                      use_post=True)
        if 'status' not in resp_json:
            self.error("GraphQL response did not contain a \"status\" field.")
        return resp_json

    def graphql_node_list(self, query_hash: str, query_variables: Dict[str, Any],
                          query_referer: Optional[str],
                          edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]],
                          _rhx_gis: Optional[str] = None,
                          first_data: Optional[Dict[str, Any]] = None) -> Iterator[Dict[str, Any]]:
        """
        Retrieve a list of GraphQL nodes.

        .. deprecated:: 4.5
           Use :class:`NodeIterator` instead, which provides more functionality.
        """

        def _query():
            # Note: query_variables is modified in place ('first' here, 'after' in the paging loop below).
            query_variables['first'] = self._graphql_page_length
            try:
                return edge_extractor(self.graphql_query(query_hash, query_variables, query_referer))
            except QueryReturnedBadRequestException:
                new_page_length = int(self._graphql_page_length / 2)
                if new_page_length >= 12:
                    self._graphql_page_length = new_page_length
                    self.error("HTTP Error 400 (Bad Request) on GraphQL Query. Retrying with shorter page length.",
                               repeat_at_end=False)
                    return _query()
                else:
                    raise

        if first_data:
            data = first_data
        else:
            data = _query()
        yield from (edge['node'] for edge in data['edges'])
        while data['page_info']['has_next_page']:
            query_variables['after'] = data['page_info']['end_cursor']
            data = _query()
            yield from (edge['node'] for edge in data['edges'])

    def get_iphone_json(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """JSON request to ``i.instagram.com``.

        :param path: URL, relative to ``i.instagram.com/``
        :param params: GET parameters
        :return: Decoded response dictionary
        :raises QueryReturnedBadRequestException: When the server responds with a 400.
        :raises QueryReturnedNotFoundException: When the server responds with a 404.
        :raises ConnectionException: When query repeatedly failed.

        .. versionadded:: 4.2.1"""
        with self._temporary_session() as tempsession:
            # Set headers to simulate an API request from iPad
            tempsession.headers['ig-intended-user-id'] = str(self.user_id)
            tempsession.headers['x-pigeon-rawclienttime'] = '{:.6f}'.format(time.time())

            # Add headers obtained from previous iPad request
            tempsession.headers.update(self.iphone_headers)

            # Extract key information from cookies if we haven't got it already from a previous request
            header_cookies_mapping = {'x-mid': 'mid',
                                      'ig-u-ds-user-id': 'ds_user_id',
                                      'x-ig-device-id': 'ig_did',
                                      'x-ig-family-device-id': 'ig_did',
                                      'family_device_id': 'ig_did'}

            # Map the cookie value to the matching HTTP request header
            cookies = tempsession.cookies.get_dict().copy()
            for key, value in header_cookies_mapping.items():
                if value in cookies:
                    if key not in tempsession.headers:
                        tempsession.headers[key] = cookies[value]
                    else:
                        # Remove the cookie value if it's already specified as a header
                        tempsession.cookies.pop(value, None)

            # Edge case for ig-u-rur header due to special string encoding in cookie
            if 'rur' in cookies:
                if 'ig-u-rur' not in tempsession.headers:
                    tempsession.headers['ig-u-rur'] = cookies['rur'].strip('\"').encode('utf-8') \
                                                                    .decode('unicode_escape')
                else:
                    tempsession.cookies.pop('rur', None)

            # Remove headers specific to Desktop version
            for header in ['Host', 'Origin', 'X-Instagram-AJAX', 'X-Requested-With', 'Referer']:
                tempsession.headers.pop(header, None)

            # No need for cookies if we have a bearer token
            if 'authorization' in tempsession.headers:
                tempsession.cookies.clear()

            response_headers = dict()  # type: Dict[str, Any]
            response = self.get_json(path, params, 'i.instagram.com', tempsession, response_headers=response_headers)

            # Extract the ig-set-* headers and use them in the next request
            for key, value in response_headers.items():
                if key.startswith('ig-set-'):
                    self.iphone_headers[key.replace('ig-set-', '')] = value
                elif key.startswith('x-ig-set-'):
                    self.iphone_headers[key.replace('x-ig-set-', 'x-ig-')] = value

            return response

    def write_raw(self, resp: Union[bytes, requests.Response], filename: str) -> None:
        """Write raw response data into a file.

        The data is first written to ``filename + '.temp'``, which is then moved over ``filename``.

        .. versionadded:: 4.2.1"""
        self.log(filename, end=' ', flush=True)
        with open(filename + '.temp', 'wb') as file:
            if isinstance(resp, requests.Response):
                shutil.copyfileobj(resp.raw, file)
            else:
                file.write(resp)
        os.replace(filename + '.temp', filename)

    def get_raw(self, url: str, _attempt=1) -> requests.Response:
        """Downloads a file anonymously.

        :raises QueryReturnedNotFoundException: When the server responds with a 404.
        :raises QueryReturnedForbiddenException: When the server responds with a 403.
        :raises ConnectionException: When download failed.

        .. versionadded:: 4.2.1"""
        with self.get_anonymous_session(self.proxy) as anonymous_session:
            resp = anonymous_session.get(url, stream=True)
        if resp.status_code == 200:
            resp.raw.decode_content = True
            return resp
        else:
            if resp.status_code == 403:
                # suspected invalid URL signature
                raise QueryReturnedForbiddenException(self._response_error(resp))
            if resp.status_code == 404:
                # 404 not worth retrying.
                raise QueryReturnedNotFoundException(self._response_error(resp))
            raise ConnectionException(self._response_error(resp))

    def get_and_write_raw(self, url: str, filename: str) -> None:
        """Downloads and writes anonymously-requested raw data into a file.

        :raises QueryReturnedNotFoundException: When the server responds with a 404.
        :raises QueryReturnedForbiddenException: When the server responds with a 403.
        :raises ConnectionException: When download repeatedly failed."""
        self.write_raw(self.get_raw(url), filename)

    def head(self, url: str, allow_redirects: bool = False) -> requests.Response:
        """HEAD a URL anonymously.

        :raises QueryReturnedNotFoundException: When the server responds with a 404.
        :raises QueryReturnedForbiddenException: When the server responds with a 403.
        :raises ConnectionException: When request failed.

        .. versionadded:: 4.7.6
        """
        with self.get_anonymous_session(self.proxy) as anonymous_session:
            resp = anonymous_session.head(url, allow_redirects=allow_redirects)
        if resp.status_code == 200:
            return resp
        else:
            if resp.status_code == 403:
                # suspected invalid URL signature
                raise QueryReturnedForbiddenException(self._response_error(resp))
            if resp.status_code == 404:
                # 404 not worth retrying.
                raise QueryReturnedNotFoundException(self._response_error(resp))
            raise ConnectionException(self._response_error(resp))
class RateController:
    """
    Class providing request tracking and rate controlling to stay within rate limits.

    It can be overridden to change Instaloader's behavior regarding rate limits, for example to raise a custom
    exception when the rate limit is hit::

       import instaloader

       class MyRateController(instaloader.RateController):
           def sleep(self, secs):
               raise MyCustomException()

       L = instaloader.Instaloader(rate_controller=lambda ctx: MyRateController(ctx))
    """

    def __init__(self, context: "InstaloaderContext"):
        self._context = context
        # Request timestamps (time.monotonic() values) grouped by query type.
        self._query_timestamps: Dict[str, List[float]] = dict()
        # Earliest times at which the next request may be made; raised after a 429 response.
        self._earliest_next_request_time = 0.0
        self._iphone_earliest_next_request_time = 0.0

    def sleep(self, secs: float):
        """Wait given number of seconds."""
        # Not static, to allow for the behavior of this method to depend on context-inherent properties, such as
        # whether we are logged in.
        time.sleep(secs)

    def _dump_query_timestamps(self, current_time: float, failed_query_type: str):
        """Report, per query type, how many requests were made within several recent time windows.

        The row of ``failed_query_type`` is marked with an asterisk.
        """
        windows = [10, 11, 20, 22, 30, 60]
        self._context.error("Number of requests within last {} minutes grouped by type:"
                            .format('/'.join(str(w) for w in windows)),
                            repeat_at_end=False)
        for query_type, times in self._query_timestamps.items():
            reqs_in_sliding_window = [sum(t > current_time - w * 60 for t in times) for w in windows]
            self._context.error(" {} {:>32}: {}".format(
                "*" if query_type == failed_query_type else " ",
                query_type,
                " ".join("{:4}".format(reqs) for reqs in reqs_in_sliding_window)
            ), repeat_at_end=False)

    def count_per_sliding_window(self, query_type: str) -> int:
        """Return how many requests of the given type can be done within a sliding window of 11 minutes.

        This is called by :meth:`RateController.query_waittime` and allows to simply customize wait times before queries
        at query_type granularity. Consider overriding :meth:`RateController.query_waittime` directly if you need more
        control."""
        # Not static, to allow for the count_per_sliding_window to depend on context-inherent properties, such as
        # whether we are logged in.
        return 75 if query_type == 'other' else 200

    def _reqs_in_sliding_window(self, query_type: Optional[str], current_time: float, window: float) -> List[float]:
        """Return the tracked timestamps that lie within the last ``window`` seconds.

        :param query_type: Type whose timestamps to consider, or None for all GraphQL queries.
        """
        if query_type is not None:
            # timestamps of type query_type
            relevant_timestamps = self._query_timestamps[query_type]
        else:
            # all GraphQL queries, i.e. not 'iphone' or 'other'
            graphql_query_timestamps = filter(lambda tp: tp[0] not in ['iphone', 'other'],
                                              self._query_timestamps.items())
            relevant_timestamps = [t for times in (tp[1] for tp in graphql_query_timestamps) for t in times]
        return list(filter(lambda t: t > current_time - window, relevant_timestamps))

    def query_waittime(self, query_type: str, current_time: float, untracked_queries: bool = False) -> float:
        """Calculate time needed to wait before query can be executed.

        :param query_type: Type of the query that is about to be made.
        :param current_time: Current time, on the same clock as the tracked timestamps.
        :param untracked_queries: True if requests that we did not track must have happened (set when a 429
           response is handled).
        :return: Number of seconds to wait, never negative.
        """
        per_type_sliding_window = 660
        iphone_sliding_window = 1800
        if query_type not in self._query_timestamps:
            self._query_timestamps[query_type] = []
        # Forget timestamps older than one hour.
        self._query_timestamps[query_type] = list(filter(lambda t: t > current_time - 60 * 60,
                                                         self._query_timestamps[query_type]))

        def per_type_next_request_time():
            reqs_in_sliding_window = self._reqs_in_sliding_window(query_type, current_time, per_type_sliding_window)
            if len(reqs_in_sliding_window) < self.count_per_sliding_window(query_type):
                return 0.0
            else:
                return min(reqs_in_sliding_window) + per_type_sliding_window + 6

        def gql_accumulated_next_request_time():
            if query_type in ['iphone', 'other']:
                return 0.0
            gql_accumulated_sliding_window = 600
            gql_accumulated_max_count = 275
            reqs_in_sliding_window = self._reqs_in_sliding_window(None, current_time, gql_accumulated_sliding_window)
            if len(reqs_in_sliding_window) < gql_accumulated_max_count:
                return 0.0
            else:
                return min(reqs_in_sliding_window) + gql_accumulated_sliding_window

        def untracked_next_request_time():
            if untracked_queries:
                # If no request of this type is tracked within the window, fall back to the current time
                # as the window start instead of failing with "min() arg is an empty sequence".
                if query_type == "iphone":
                    reqs_in_sliding_window = self._reqs_in_sliding_window(query_type, current_time,
                                                                          iphone_sliding_window)
                    self._iphone_earliest_next_request_time = (
                        min(reqs_in_sliding_window, default=current_time) + iphone_sliding_window + 18
                    )
                else:
                    reqs_in_sliding_window = self._reqs_in_sliding_window(query_type, current_time,
                                                                          per_type_sliding_window)
                    self._earliest_next_request_time = (
                        min(reqs_in_sliding_window, default=current_time) + per_type_sliding_window + 6
                    )
            return max(self._iphone_earliest_next_request_time, self._earliest_next_request_time)

        def iphone_next_request():
            if query_type == "iphone":
                reqs_in_sliding_window = self._reqs_in_sliding_window(query_type, current_time, iphone_sliding_window)
                if len(reqs_in_sliding_window) >= 199:
                    return min(reqs_in_sliding_window) + iphone_sliding_window + 18
            return 0.0

        return max(0.0,
                   max(
                       per_type_next_request_time(),
                       gql_accumulated_next_request_time(),
                       untracked_next_request_time(),
                       iphone_next_request(),
                   ) - current_time)

    def wait_before_query(self, query_type: str) -> None:
        """This method is called before a query to Instagram.

        It calls :meth:`RateController.query_waittime` to determine the time needed to wait and then calls
        :meth:`RateController.sleep` to wait until the request can be made."""
        waittime = self.query_waittime(query_type, time.monotonic(), False)
        assert waittime >= 0
        if waittime > 15:
            formatted_waittime = ("{} seconds".format(round(waittime)) if waittime <= 666 else
                                  "{} minutes".format(round(waittime / 60)))
            self._context.log("\nToo many queries in the last time. Need to wait {}, until {:%H:%M}."
                              .format(formatted_waittime, datetime.now() + timedelta(seconds=waittime)))
        if waittime > 0:
            self.sleep(waittime)
        # Record the request that is about to be made.
        if query_type not in self._query_timestamps:
            self._query_timestamps[query_type] = [time.monotonic()]
        else:
            self._query_timestamps[query_type].append(time.monotonic())

    def handle_429(self, query_type: str) -> None:
        """This method is called to handle a 429 Too Many Requests response.

        It calls :meth:`RateController.query_waittime` to determine the time needed to wait and then calls
        :meth:`RateController.sleep` to wait until we can repeat the same request."""
        current_time = time.monotonic()
        waittime = self.query_waittime(query_type, current_time, True)
        assert waittime >= 0
        self._dump_query_timestamps(current_time, query_type)
        text_for_429 = ("Instagram responded with HTTP error \"429 - Too Many Requests\". Please do not run multiple "
                        "instances of Instaloader in parallel or within short sequence. Also, do not use any Instagram "
                        "App while Instaloader is running.")
        self._context.error(textwrap.fill(text_for_429), repeat_at_end=False)
        if waittime > 1.5:
            formatted_waittime = ("{} seconds".format(round(waittime)) if waittime <= 666 else
                                  "{} minutes".format(round(waittime / 60)))
            self._context.error("The request will be retried in {}, at {:%H:%M}."
                                .format(formatted_waittime, datetime.now() + timedelta(seconds=waittime)),
                                repeat_at_end=False)
        if waittime > 0:
            self.sleep(waittime)

View File

@ -0,0 +1,117 @@
import configparser
from datetime import datetime, timezone
from typing import Optional
from os.path import dirname
from os import makedirs
class LatestStamps:
    """LatestStamps class.

    Convenience class for retrieving and storing data from the :option:`--latest-stamps` file.

    The file is an INI-style file (handled by :class:`configparser.ConfigParser`) with one section per
    profile name. Every modifying method writes the file back immediately.

    :param latest_stamps_file: path to file.

    .. versionadded:: 4.8"""
    PROFILE_ID = 'profile-id'
    PROFILE_PIC = 'profile-pic'
    POST_TIMESTAMP = 'post-timestamp'
    TAGGED_TIMESTAMP = 'tagged-timestamp'
    IGTV_TIMESTAMP = 'igtv-timestamp'
    STORY_TIMESTAMP = 'story-timestamp'
    ISO_FORMAT = '%Y-%m-%dT%H:%M:%S.%f%z'

    def __init__(self, latest_stamps_file):
        self.file = latest_stamps_file
        self.data = configparser.ConfigParser()
        self.data.read(latest_stamps_file)

    def _save(self):
        """Writes the current data to the file, creating its parent directory if needed."""
        if dn := dirname(self.file):
            makedirs(dn, exist_ok=True)
        with open(self.file, 'w') as f:
            self.data.write(f)

    def _ensure_section(self, section: str):
        """Creates the given section unless it already exists."""
        if not self.data.has_section(section):
            self.data.add_section(section)

    def get_profile_id(self, profile_name: str) -> Optional[int]:
        """Returns stored ID of profile, or None if nothing (valid) is stored."""
        try:
            return self.data.getint(profile_name, self.PROFILE_ID)
        except (configparser.Error, ValueError):
            return None

    def save_profile_id(self, profile_name: str, profile_id: int):
        """Stores ID of profile."""
        self._ensure_section(profile_name)
        self.data.set(profile_name, self.PROFILE_ID, str(profile_id))
        self._save()

    def rename_profile(self, old_profile: str, new_profile: str):
        """Renames a profile, i.e. moves all known options of the old section to the new one.

        Renaming a profile to its own name is a no-op."""
        if old_profile == new_profile:
            # Copying a section onto itself and then removing it would wipe the profile's data.
            return
        self._ensure_section(new_profile)
        for option in [self.PROFILE_ID, self.PROFILE_PIC, self.POST_TIMESTAMP,
                       self.TAGGED_TIMESTAMP, self.IGTV_TIMESTAMP, self.STORY_TIMESTAMP]:
            if self.data.has_option(old_profile, option):
                value = self.data.get(old_profile, option)
                self.data.set(new_profile, option, value)
        self.data.remove_section(old_profile)
        self._save()

    def _get_timestamp(self, section: str, key: str) -> datetime:
        """Returns the stored timestamp, or the Unix epoch (UTC) if it is missing or unparsable."""
        try:
            return datetime.strptime(self.data.get(section, key), self.ISO_FORMAT)
        except (configparser.Error, ValueError):
            return datetime.fromtimestamp(0, timezone.utc)

    def _set_timestamp(self, section: str, key: str, timestamp: datetime):
        """Stores the timestamp in :attr:`ISO_FORMAT`. Naive timestamps are taken as local time."""
        if timestamp.utcoffset() is None:
            # A naive datetime would be written without UTC offset, which _get_timestamp() cannot parse
            # back ('%z' requires an offset), so the stamp would silently read as the epoch.
            timestamp = timestamp.astimezone()
        self._ensure_section(section)
        self.data.set(section, key, timestamp.strftime(self.ISO_FORMAT))
        self._save()

    def get_last_post_timestamp(self, profile_name: str) -> datetime:
        """Returns timestamp of last download of a profile's posts."""
        return self._get_timestamp(profile_name, self.POST_TIMESTAMP)

    def set_last_post_timestamp(self, profile_name: str, timestamp: datetime):
        """Sets timestamp of last download of a profile's posts."""
        self._set_timestamp(profile_name, self.POST_TIMESTAMP, timestamp)

    def get_last_tagged_timestamp(self, profile_name: str) -> datetime:
        """Returns timestamp of last download of a profile's tagged posts."""
        return self._get_timestamp(profile_name, self.TAGGED_TIMESTAMP)

    def set_last_tagged_timestamp(self, profile_name: str, timestamp: datetime):
        """Sets timestamp of last download of a profile's tagged posts."""
        self._set_timestamp(profile_name, self.TAGGED_TIMESTAMP, timestamp)

    def get_last_igtv_timestamp(self, profile_name: str) -> datetime:
        """Returns timestamp of last download of a profile's igtv posts."""
        return self._get_timestamp(profile_name, self.IGTV_TIMESTAMP)

    def set_last_igtv_timestamp(self, profile_name: str, timestamp: datetime):
        """Sets timestamp of last download of a profile's igtv posts."""
        self._set_timestamp(profile_name, self.IGTV_TIMESTAMP, timestamp)

    def get_last_story_timestamp(self, profile_name: str) -> datetime:
        """Returns timestamp of last download of a profile's stories."""
        return self._get_timestamp(profile_name, self.STORY_TIMESTAMP)

    def set_last_story_timestamp(self, profile_name: str, timestamp: datetime):
        """Sets timestamp of last download of a profile's stories."""
        self._set_timestamp(profile_name, self.STORY_TIMESTAMP, timestamp)

    def get_profile_pic(self, profile_name: str) -> str:
        """Returns filename of profile's last downloaded profile pic, or an empty string if unknown."""
        try:
            return self.data.get(profile_name, self.PROFILE_PIC)
        except configparser.Error:
            return ""

    def set_profile_pic(self, profile_name: str, profile_pic: str):
        """Sets filename of profile's last downloaded profile pic."""
        self._ensure_section(profile_name)
        self.data.set(profile_name, self.PROFILE_PIC, profile_pic)
        self._save()

View File

@ -0,0 +1,329 @@
import base64
import hashlib
import json
import os
from contextlib import contextmanager
from datetime import datetime, timedelta
from lzma import LZMAError
from typing import Any, Callable, Dict, Iterable, Iterator, NamedTuple, Optional, Tuple, TypeVar
from .exceptions import AbortDownloadException, InvalidArgumentException
from .instaloadercontext import InstaloaderContext
class FrozenNodeIterator(NamedTuple):
    """Snapshot of a :class:`NodeIterator`, as returned by :meth:`NodeIterator.freeze` and accepted by
    :meth:`NodeIterator.thaw`.

    The two trailing fields default to ``None`` so that data lacking them can still be turned into a
    ``FrozenNodeIterator`` with ``FrozenNodeIterator(**data)`` instead of raising :class:`TypeError`."""
    query_hash: Optional[str]
    query_variables: Dict
    query_referer: Optional[str]
    context_username: Optional[str]
    total_index: int
    best_before: Optional[float]
    remaining_data: Optional[Dict]
    first_node: Optional[Dict] = None
    doc_id: Optional[str] = None


# Per-field documentation is attached to the generated field descriptors.
FrozenNodeIterator.query_hash.__doc__ = """The GraphQL ``query_hash`` parameter."""
FrozenNodeIterator.query_variables.__doc__ = """The GraphQL ``query_variables`` parameter."""
FrozenNodeIterator.query_referer.__doc__ = """The HTTP referer used for the GraphQL query."""
FrozenNodeIterator.context_username.__doc__ = """The username who created the iterator, or ``None``."""
FrozenNodeIterator.total_index.__doc__ = """Number of items that have already been returned."""
FrozenNodeIterator.best_before.__doc__ = """Date when parts of the stored nodes might have expired."""
FrozenNodeIterator.remaining_data.__doc__ = \
    """The already-retrieved, yet-unprocessed ``edges`` and the ``page_info`` at time of freezing."""
FrozenNodeIterator.first_node.__doc__ = """Node data of the first item, if an item has been produced."""
FrozenNodeIterator.doc_id.__doc__ = """The GraphQL ``doc_id`` parameter."""
T = TypeVar('T')


class NodeIterator(Iterator[T]):
    """
    Iterate the nodes within edges in a GraphQL pagination. Instances of this class are returned by many (but not all)
    of Instaloader's :class:`Post`-returning functions (such as :meth:`Profile.get_posts` etc.).

    What makes this iterator special is its ability to freeze/store its current state, e.g. to interrupt an iteration,
    and later thaw/resume from where it left off.

    You can freeze a NodeIterator with :meth:`NodeIterator.freeze`::

       post_iterator = profile.get_posts()
       try:
           for post in post_iterator:
               do_something_with(post)
       except KeyboardInterrupt:
           save("resume_information.json", post_iterator.freeze())

    and later reuse it with :meth:`NodeIterator.thaw` on an equally-constructed NodeIterator::

       post_iterator = profile.get_posts()
       post_iterator.thaw(load("resume_information.json"))

    (an appropriate method to load and save the :class:`FrozenNodeIterator` is e.g.
    :func:`load_structure_from_file` and :func:`save_structure_to_file`.)

    A :class:`FrozenNodeIterator` can only be thawed with a matching NodeIterator, i.e. a NodeIterator instance that
    has been constructed with the same parameters as the instance that is represented by the
    :class:`FrozenNodeIterator` in question. This is to ensure that an iteration cannot be resumed in a wrong,
    unmatching loop. As a quick way to distinguish iterators that are saved e.g. in files, there is the
    :attr:`NodeIterator.magic` string: Two NodeIterators are matching if and only if they have the same magic.

    See also :func:`resumable_iteration` for a high-level context manager that handles a resumable iteration.

    .. versionchanged:: 4.13
       Included support for `doc_id`-based queries (using POST method).
    """

    _graphql_page_length = 12
    _shelf_life = timedelta(days=29)

    def __init__(self,
                 context: InstaloaderContext,
                 query_hash: Optional[str],
                 edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]],
                 node_wrapper: Callable[[Dict], T],
                 query_variables: Optional[Dict[str, Any]] = None,
                 query_referer: Optional[str] = None,
                 first_data: Optional[Dict[str, Any]] = None,
                 is_first: Optional[Callable[[T, Optional[T]], bool]] = None,
                 doc_id: Optional[str] = None):
        """
        :param context: Context used to send the queries.
        :param query_hash: GraphQL ``query_hash``; used when `doc_id` is not given.
        :param edge_extractor: Applied to each query response; its result is the page data, from which ``edges``
           and, if present, ``page_info`` and ``count`` are read.
        :param node_wrapper: Turns the ``node`` of an edge into the item that is yielded.
        :param query_variables: Variables sent with every query, merged with the pagination variables.
        :param query_referer: Referer passed along with every query.
        :param first_data: Already-available data of the first page. If omitted, the first page is queried
           right away in the constructor.
        :param is_first: Called with each produced item and the current :attr:`first_item`; if it returns true,
           that item's node becomes the new first node.
        :param doc_id: GraphQL ``doc_id``; if given, it takes precedence over `query_hash`.
        """
        self._context = context
        self._query_hash = query_hash
        self._doc_id = doc_id
        self._edge_extractor = edge_extractor
        self._node_wrapper = node_wrapper
        self._query_variables = query_variables if query_variables is not None else {}
        self._query_referer = query_referer
        self._page_index = 0
        self._total_index = 0
        if first_data is not None:
            self._data = first_data
            self._best_before = datetime.now() + NodeIterator._shelf_life
        else:
            # _query() sets self._best_before as a side effect.
            self._data = self._query()
        self._first_node: Optional[Dict] = None
        self._is_first = is_first

    def _query(self, after: Optional[str] = None) -> Dict:
        """Fetches one page, starting after the given cursor, using `doc_id` if set and `query_hash` otherwise."""
        if self._doc_id is not None:
            return self._query_doc_id(self._doc_id, after)
        else:
            assert self._query_hash is not None
            return self._query_query_hash(self._query_hash, after)

    def _query_doc_id(self, doc_id: str, after: Optional[str] = None) -> Dict:
        """Fetches one page with a ``doc_id``-based query and refreshes the best-before date."""
        pagination_variables: Dict[str, Any] = {'__relay_internal__pv__PolarisFeedShareMenurelayprovider': False}
        if after is not None:
            pagination_variables['after'] = after
            pagination_variables['before'] = None
            # Same page length as used for query_hash-based queries and reported by page_length().
            pagination_variables['first'] = NodeIterator._graphql_page_length
            pagination_variables['last'] = None
        data = self._edge_extractor(
            self._context.doc_id_graphql_query(
                doc_id, {**self._query_variables, **pagination_variables}, self._query_referer
            )
        )
        self._best_before = datetime.now() + NodeIterator._shelf_life
        return data

    def _query_query_hash(self, query_hash: str, after: Optional[str] = None) -> Dict:
        """Fetches one page with a ``query_hash``-based query and refreshes the best-before date."""
        pagination_variables: Dict[str, Any] = {'first': NodeIterator._graphql_page_length}
        if after is not None:
            pagination_variables['after'] = after
        data = self._edge_extractor(
            self._context.graphql_query(
                query_hash, {**self._query_variables, **pagination_variables}, self._query_referer
            )
        )
        self._best_before = datetime.now() + NodeIterator._shelf_life
        return data

    def __iter__(self):
        return self

    def __next__(self) -> T:
        if self._page_index < len(self._data['edges']):
            node = self._data['edges'][self._page_index]['node']
            page_index, total_index = self._page_index, self._total_index
            try:
                self._page_index += 1
                self._total_index += 1
            except KeyboardInterrupt:
                # Roll back so that both counters stay consistent for a subsequent freeze().
                self._page_index, self._total_index = page_index, total_index
                raise
            item = self._node_wrapper(node)
            if self._is_first is not None:
                if self._is_first(item, self.first_item):
                    self._first_node = node
            else:
                if self._first_node is None:
                    self._first_node = node
            return item
        if self._data.get('page_info', {}).get('has_next_page'):
            query_response = self._query(self._data['page_info']['end_cursor'])
            # A response that repeats the current page or contains no edges ends the iteration.
            if self._data['edges'] != query_response['edges'] and len(query_response['edges']) > 0:
                page_index, data = self._page_index, self._data
                try:
                    self._page_index = 0
                    self._data = query_response
                except KeyboardInterrupt:
                    self._page_index, self._data = page_index, data
                    raise
                return self.__next__()
        raise StopIteration()

    @property
    def count(self) -> Optional[int]:
        """The ``count`` as returned by Instagram. This is not always the total count this iterator will yield."""
        return self._data.get('count') if self._data is not None else None

    @property
    def total_index(self) -> int:
        """Number of items that have already been returned."""
        return self._total_index

    @property
    def magic(self) -> str:
        """Magic string for easily identifying a matching iterator file for resuming (hash of some parameters)."""
        # NOTE(review): doc_id is not part of the hash although thaw() compares it; adding it would change
        # the magic of existing iterators, so it is left as is.
        magic_hash = hashlib.blake2b(digest_size=6)
        magic_hash.update(json.dumps(
            [self._query_hash, self._query_variables, self._query_referer, self._context.username]
        ).encode())
        return base64.urlsafe_b64encode(magic_hash.digest()).decode()

    @property
    def first_item(self) -> Optional[T]:
        """
        If this iterator has produced any items, returns the first item produced.

        It is possible to override what is considered the first item (for example, to consider the
        newest item in case items are not in strict chronological order) by passing a callback
        function as the `is_first` parameter when creating the class.

        .. versionadded:: 4.8
        .. versionchanged:: 4.9.2
           What is considered the first item can be overridden.
        """
        return self._node_wrapper(self._first_node) if self._first_node is not None else None

    @staticmethod
    def page_length() -> int:
        """Number of items requested per page."""
        return NodeIterator._graphql_page_length

    def freeze(self) -> FrozenNodeIterator:
        """Freeze the iterator for later resuming.

        The snapshot starts one item before the current position, so the most recently returned item is
        yielded again after :meth:`thaw`."""
        remaining_data = None
        if self._data is not None:
            remaining_data = {**self._data,
                              'edges': (self._data['edges'][(max(self._page_index - 1, 0)):])}
        return FrozenNodeIterator(
            query_hash=self._query_hash,
            query_variables=self._query_variables,
            query_referer=self._query_referer,
            context_username=self._context.username,
            total_index=max(self.total_index - 1, 0),
            best_before=self._best_before.timestamp() if self._best_before else None,
            remaining_data=remaining_data,
            first_node=self._first_node,
            doc_id=self._doc_id,
        )

    def thaw(self, frozen: FrozenNodeIterator) -> None:
        """
        Use this iterator for resuming from earlier iteration.

        :raises InvalidArgumentException:
           If

           - the iterator on which this method is called has already been used, or
           - the given :class:`FrozenNodeIterator` does not match, i.e. belongs to a different iteration.
        """
        if self._total_index or self._page_index:
            raise InvalidArgumentException("thaw() called on already-used iterator.")
        if (self._query_hash != frozen.query_hash or
                self._query_variables != frozen.query_variables or
                self._query_referer != frozen.query_referer or
                self._context.username != frozen.context_username or
                self._doc_id != frozen.doc_id):
            raise InvalidArgumentException("Mismatching resume information.")
        if not frozen.best_before:
            raise InvalidArgumentException("\"best before\" date missing.")
        if frozen.remaining_data is None:
            raise InvalidArgumentException("\"remaining_data\" missing.")
        self._total_index = frozen.total_index
        self._best_before = datetime.fromtimestamp(frozen.best_before)
        self._data = frozen.remaining_data
        if frozen.first_node is not None:
            self._first_node = frozen.first_node
@contextmanager
def resumable_iteration(context: InstaloaderContext,
                        iterator: Iterable,
                        load: Callable[[InstaloaderContext, str], Any],
                        save: Callable[[FrozenNodeIterator, str], None],
                        format_path: Callable[[str], str],
                        check_bbd: bool = True,
                        enabled: bool = True) -> Iterator[Tuple[bool, int]]:
    """
    Context manager wrapping an iteration so that it can be interrupted with a :class:`KeyboardInterrupt` or an
    :class:`AbortDownloadException` and be resumed later.

    On entry, a resume file matching the iterator (if one exists) is loaded and thawed into the iterator. If the
    body is interrupted, the iterator's frozen state is saved to the resume file and the exception is re-raised.
    If the body completes, a resume file that existed on entry is deleted. Example::

       post_iterator = profile.get_posts()
       with resumable_iteration(
               context=L.context,
               iterator=post_iterator,
               load=lambda _, path: FrozenNodeIterator(**json.load(open(path))),
               save=lambda fni, path: json.dump(fni._asdict(), open(path, 'w')),
               format_path=lambda magic: "resume_info_{}.json".format(magic)
       ) as (is_resuming, start_index):
           for post in post_iterator:
               do_something_with(post)

    The value bound by ``as`` is the tuple (is_resuming, start_index).

    If the passed iterator is not a :class:`NodeIterator`, only the inner body is executed, as if
    ``resumable_iteration`` had not been used.

    :param context: The :class:`InstaloaderContext`.
    :param iterator: The fresh :class:`NodeIterator`.
    :param load: Loads a FrozenNodeIterator from given path. The object is ignored if it has a different type.
    :param save: Saves the given FrozenNodeIterator to the given path.
    :param format_path: Returns the path to the resume file for the given magic.
    :param check_bbd: Whether to check the best before date and reject an expired FrozenNodeIterator.
    :param enabled: Set to False to disable all functionality and simply execute the inner body.

    .. versionchanged:: 4.7
       Also interrupt on :class:`AbortDownloadException`.
    """
    if not (enabled and isinstance(iterator, NodeIterator)):
        yield False, 0
        return

    resume_path = format_path(iterator.magic)
    had_resume_file = os.path.isfile(resume_path)
    resumed, first_index = False, 0

    if had_resume_file:
        try:
            frozen = load(context, resume_path)
            if not isinstance(frozen, FrozenNodeIterator):
                raise InvalidArgumentException("Invalid type.")
            expired = (check_bbd and frozen.best_before
                       and datetime.fromtimestamp(frozen.best_before) < datetime.now())
            if expired:
                raise InvalidArgumentException("\"Best before\" date exceeded.")
            iterator.thaw(frozen)
            resumed = True
            first_index = iterator.total_index
            context.log("Resuming from {}.".format(resume_path))
        except (InvalidArgumentException, LZMAError, json.decoder.JSONDecodeError, EOFError) as exc:
            # An unusable resume file only produces a warning; the iteration then starts from scratch.
            context.error("Warning: Not resuming from {}: {}".format(resume_path, exc))

    try:
        yield resumed, first_index
    except (KeyboardInterrupt, AbortDownloadException):
        target_dir = os.path.dirname(resume_path)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        save(iterator.freeze(), resume_path)
        context.log("\nSaved resume information to {}.".format(resume_path))
        raise

    if had_resume_file:
        os.unlink(resume_path)
        context.log("Iteration complete, deleted resume information file {}.".format(resume_path))

View File

View File

@ -0,0 +1,46 @@
from typing import Any, Callable, Dict, Iterator, Optional, TypeVar
from .instaloadercontext import InstaloaderContext
T = TypeVar('T')


class SectionIterator(Iterator[T]):
    """Iterator for the new 'sections'-style responses.

    Yields the wrapped ``media`` entries of every section of every page, fetching further pages as long as
    the response data says that more are available.

    .. versionadded:: 4.9"""

    def __init__(self,
                 context: InstaloaderContext,
                 sections_extractor: Callable[[Dict[str, Any]], Dict[str, Any]],
                 media_wrapper: Callable[[Dict], T],
                 query_path: str,
                 first_data: Optional[Dict[str, Any]] = None):
        """
        :param context: Context used to send the queries.
        :param sections_extractor: Applied to each JSON response; its result is the page data, from which
           ``sections``, ``more_available`` and ``next_max_id`` are read.
        :param media_wrapper: Turns a ``media`` entry into the item that is yielded.
        :param query_path: Path that is queried for each page.
        :param first_data: Already-available data of the first page. If omitted (or empty), the first page
           is queried right away in the constructor.
        """
        self._context = context
        self._sections_extractor = sections_extractor
        self._media_wrapper = media_wrapper
        self._query_path = query_path
        self._data = first_data or self._query()
        # _page_index selects the section within the current page data, _section_index the media within
        # that section.
        self._page_index = 0
        self._section_index = 0

    def __iter__(self):
        return self

    def _query(self, max_id: Optional[str] = None) -> Dict[str, Any]:
        """Fetches the page identified by `max_id`, or the first page if it is None."""
        pagination_variables = {"max_id": max_id} if max_id is not None else {}
        return self._sections_extractor(
            self._context.get_json(self._query_path, params={"__a": 1, "__d": "dis", **pagination_variables})
        )

    def __next__(self) -> T:
        # Implemented as a loop rather than by recursion, so that neither a run of empty pages nor
        # sections without any media can raise anything but StopIteration.
        while True:
            sections = self._data['sections']
            while self._page_index < len(sections):
                medias = sections[self._page_index]['layout_content']['medias']
                if self._section_index < len(medias):
                    media = medias[self._section_index]['media']
                    self._section_index += 1
                    if self._section_index >= len(medias):
                        self._section_index = 0
                        self._page_index += 1
                    return self._media_wrapper(media)
                # Section without (further) media: continue with the next section.
                self._section_index = 0
                self._page_index += 1
            if not self._data['more_available']:
                raise StopIteration()
            self._page_index, self._section_index, self._data = 0, 0, self._query(self._data["next_max_id"])

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,185 @@
Metadata-Version: 2.1
Name: instaloader
Version: 4.13.1
Summary: Download pictures (or videos) along with their captions and other metadata from Instagram.
Home-page: https://instaloader.github.io/
Author: Alexander Graf, André Koch-Kramer
Author-email: mail@agraf.me, koch-kramer@web.de
License: MIT
Keywords: instagram,instagram-scraper,instagram-client,instagram-feed,downloader,videos,photos,pictures,instagram-user-photos,instagram-photos,instagram-metadata,instagram-downloader,instagram-stories
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Console
Classifier: Intended Audience :: End Users/Desktop
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: Programming Language :: Python :: 3 :: Only
Classifier: Topic :: Internet
Classifier: Topic :: Multimedia :: Graphics
Requires-Python: >=3.8
License-File: LICENSE
License-File: AUTHORS.md
Requires-Dist: requests>=2.4
Provides-Extra: browser-cookie3
Requires-Dist: browser_cookie3>=0.19.1; extra == "browser-cookie3"
.. image:: https://raw.githubusercontent.com/instaloader/instaloader/master/docs/logo_heading.png
.. badges-start
|pypi| |pyversion| |license| |aur| |contributors| |downloads|
.. |pypi| image:: https://img.shields.io/pypi/v/instaloader.svg
:alt: Instaloader PyPI Project Page
:target: https://pypi.org/project/instaloader/
.. |license| image:: https://img.shields.io/github/license/instaloader/instaloader.svg
:alt: MIT License
:target: https://github.com/instaloader/instaloader/blob/master/LICENSE
.. |pyversion| image:: https://img.shields.io/pypi/pyversions/instaloader.svg
:alt: Supported Python Versions
.. |contributors| image:: https://img.shields.io/github/contributors/instaloader/instaloader.svg
:alt: Contributor Count
:target: https://github.com/instaloader/instaloader/graphs/contributors
.. |aur| image:: https://img.shields.io/aur/version/instaloader.svg
:alt: Arch User Repository Package
:target: https://aur.archlinux.org/packages/instaloader/
.. |downloads| image:: https://pepy.tech/badge/instaloader/month
:alt: PyPI Download Count
:target: https://pepy.tech/project/instaloader
.. badges-end
::
$ pip3 install instaloader
$ instaloader profile [profile ...]
**Instaloader**
- downloads **public and private profiles, hashtags, user stories,
feeds and saved media**,
- downloads **comments, geotags and captions** of each post,
- automatically **detects profile name changes** and renames the target
directory accordingly,
- allows **fine-grained customization** of filters and where to store
downloaded media,
- automatically **resumes previously-interrupted** download iterations.
::
instaloader [--comments] [--geotags]
[--stories] [--highlights] [--tagged] [--igtv]
[--login YOUR-USERNAME] [--fast-update]
profile | "#hashtag" | :stories | :feed | :saved
`Instaloader Documentation <https://instaloader.github.io/>`__
How to Automatically Download Pictures from Instagram
-----------------------------------------------------
To **download all pictures and videos of a profile**, as well as the
**profile picture**, do
::
instaloader profile [profile ...]
where ``profile`` is the name of a profile you want to download. Instead
of only one profile, you may also specify a list of profiles.
To later **update your local copy** of those profiles, you may run
::
instaloader --fast-update profile [profile ...]
If ``--fast-update`` is given, Instaloader stops when arriving at the
first already-downloaded picture.
Alternatively, you can use ``--latest-stamps`` to have Instaloader store
the time each profile was last downloaded and only download newer media:
::
instaloader --latest-stamps -- profile [profile ...]
With this option it's possible to move or delete downloaded media and still keep
the archive updated.
When updating profiles, Instaloader
automatically **detects profile name changes** and renames the target directory
accordingly.
Instaloader can also be used to **download private profiles**. To do so,
invoke it with
::
instaloader --login=your_username profile [profile ...]
When logging in, Instaloader **stores the session cookies** in a file in your
temporary directory, which will be reused later the next time ``--login``
is given. So you can download private profiles **non-interactively** when you
already have a valid session cookie file.
`Instaloader Documentation <https://instaloader.github.io/basic-usage.html>`__
Contributing
------------
As an open source project, Instaloader heavily depends on the contributions from
its community. See
`contributing <https://instaloader.github.io/contributing.html>`__
for how you may help Instaloader to become an even greater tool.
Supporters
----------
.. current-sponsors-start
| Instaloader is proudly sponsored by
| `@rocketapi-io <https://github.com/rocketapi-io>`__
See `Alex' GitHub Sponsors <https://github.com/sponsors/aandergr>`__ page for
how you can sponsor the development of Instaloader!
.. current-sponsors-end
It is a pleasure for us to share Instaloader with the world, and we are proud
to have attracted such an active and motivating community, with so many users
who share their suggestions and ideas with us. Buying a community-sponsored beer
or coffee from time to time is very likely to further raise our passion for the
development of Instaloader.
| For donations, we provide a GitHub Sponsors page, a PayPal.Me link and a Bitcoin address.
| GitHub Sponsors: `Sponsor @aandergr on GitHub Sponsors <https://github.com/sponsors/aandergr>`__
| PayPal: `PayPal.me/aandergr <https://www.paypal.me/aandergr>`__
| BTC: 1Nst4LoadeYzrKjJ1DX9CpbLXBYE9RKLwY
Disclaimer
----------
.. disclaimer-start
Instaloader is in no way affiliated with, authorized, maintained or endorsed by Instagram or any of its affiliates or
subsidiaries. This is an independent and unofficial project. Use at your own risk.
Instaloader is licensed under an MIT license. Refer to ``LICENSE`` file for more information.
.. disclaimer-end

View File

@ -0,0 +1,21 @@
AUTHORS.md
LICENSE
README.rst
setup.py
instaloader/__init__.py
instaloader/__main__.py
instaloader/exceptions.py
instaloader/instaloader.py
instaloader/instaloadercontext.py
instaloader/lateststamps.py
instaloader/nodeiterator.py
instaloader/py.typed
instaloader/sectioniterator.py
instaloader/structures.py
instaloader.egg-info/PKG-INFO
instaloader.egg-info/SOURCES.txt
instaloader.egg-info/dependency_links.txt
instaloader.egg-info/entry_points.txt
instaloader.egg-info/not-zip-safe
instaloader.egg-info/requires.txt
instaloader.egg-info/top_level.txt

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,2 @@
[console_scripts]
instaloader = instaloader.__main__:main

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,4 @@
requests>=2.4
[browser_cookie3]
browser_cookie3>=0.19.1

View File

@ -0,0 +1 @@
instaloader

View File

@ -209,6 +209,7 @@ class Instaloader:
"""
def __init__(self,
proxy:Optional[dict] = None,
sleep: bool = True,
quiet: bool = False,
user_agent: Optional[str] = None,
@ -234,7 +235,7 @@ class Instaloader:
title_pattern: Optional[str] = None,
sanitize_paths: bool = False):
self.context = InstaloaderContext(sleep, quiet, user_agent, max_connection_attempts,
self.context = InstaloaderContext(proxy,sleep, quiet, user_agent, max_connection_attempts,
request_timeout, rate_controller, fatal_status_codes,
iphone_support)

View File

@ -78,7 +78,7 @@ class InstaloaderContext:
class :class:`Instaloader`.
"""
def __init__(self, sleep: bool = True, quiet: bool = False, user_agent: Optional[str] = None,
def __init__(self,proxy:Optional[dict], sleep: bool = True, quiet: bool = False, user_agent: Optional[str] = None,
max_connection_attempts: int = 3, request_timeout: float = 300.0,
rate_controller: Optional[Callable[["InstaloaderContext"], "RateController"]] = None,
fatal_status_codes: Optional[List[int]] = None,
@ -86,7 +86,7 @@ class InstaloaderContext:
self.user_agent = user_agent if user_agent is not None else default_user_agent()
self.request_timeout = request_timeout
self._session = self.get_anonymous_session()
self._session = self.get_anonymous_session(proxy)
self.username = None
self.user_id = None
self.sleep = sleep
@ -96,6 +96,7 @@ class InstaloaderContext:
self.two_factor_auth_pending = None
self.iphone_support = iphone_support
self.iphone_headers = default_iphone_headers()
self.proxy = proxy
# error log, filled with error() and printed at the end of Instaloader.main()
self.error_log: List[str] = []
@ -117,7 +118,7 @@ class InstaloaderContext:
username = self.username
user_id = self.user_id
iphone_headers = self.iphone_headers
self._session = self.get_anonymous_session()
self._session = self.get_anonymous_session(self.proxy)
self.username = None
self.user_id = None
self.iphone_headers = default_iphone_headers()
@ -199,12 +200,13 @@ class InstaloaderContext:
del header['X-Requested-With']
return header
def get_anonymous_session(self) -> requests.Session:
def get_anonymous_session(self,proxy) -> requests.Session:
"""Returns our default anonymous requests.Session object."""
session = requests.Session()
session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1',
'ig_vw': '1920', 'csrftoken': '',
's_network': '', 'ds_user_id': ''})
session.proxies.update(proxy)
session.headers.update(self._default_http_header(empty_session_only=True))
# Override default timeout behavior.
# Need to silence mypy bug for this. See: https://github.com/python/mypy/issues/2427
@ -670,7 +672,7 @@ class InstaloaderContext:
:raises ConnectionException: When download failed.
.. versionadded:: 4.2.1"""
with self.get_anonymous_session() as anonymous_session:
with self.get_anonymous_session(self.proxy) as anonymous_session:
resp = anonymous_session.get(url, stream=True)
if resp.status_code == 200:
resp.raw.decode_content = True
@ -701,7 +703,7 @@ class InstaloaderContext:
.. versionadded:: 4.7.6
"""
with self.get_anonymous_session() as anonymous_session:
with self.get_anonymous_session(self.proxy) as anonymous_session:
resp = anonymous_session.head(url, allow_redirects=allow_redirects)
if resp.status_code == 200:
return resp