diff --git a/README.rst b/README.rst
index a7ed7c4..3e6835a 100644
--- a/README.rst
+++ b/README.rst
@@ -1,21 +1,13 @@
Instaloader
===========
-Download pictures (or videos) along with their captions and other metadata
-from Instagram.
Installation
------------
-Instaloader is written in Python, thus ensure having
-`Python `__ (at least version 3.5) installed.
-
-If you intend to use this tool under Windows, it is recommended
-to install
-`win-unicode-console `__.
-
-If you have `pip `__ installed, you
-may install Instaloader using
+Instaloader requires `Python <https://www.python.org/>`__, at least
+version 3.5. If you have `pip <https://pypi.python.org/pypi/pip>`__
+installed, you may install Instaloader using
::
@@ -34,7 +26,7 @@ Instaloader requires
`requests <http://python-requests.org/>`__, which
will be installed automatically, if it is not already installed.
-How to automatically download pictures from Instagram
+How to Automatically Download Pictures from Instagram
-----------------------------------------------------
To **download all pictures and videos of a profile**, as well as the
@@ -53,8 +45,10 @@ To later **update your local copy** of that profiles, you may run
instaloader --fast-update profile [profile ...]
-When ``--fast-update`` is given, Instaloader stops when arriving at
-the first already-downloaded picture.
+If ``--fast-update`` is given, Instaloader stops when arriving at the
+first already-downloaded picture. When updating profiles, Instaloader
+automatically **detects profile name changes** and renames the target
+directory accordingly.
Instaloader can also be used to **download private profiles**. To do so,
invoke it with
@@ -63,57 +57,96 @@ invoke it with
instaloader --login=your_username profile [profile ...]
-When invoked like this, it also **stores the session cookies** in a file
-in your temporary directory, which will be reused later when ``--login`` is given. So
-you can download private profiles **non-interactively** when you already
-have a valid session cookie file.
+When logging in, Instaloader **stores the session cookies** in a file in
+your temporary directory, which will be reused the next time
+``--login`` is given. So you can download private profiles
+**non-interactively** when you already have a valid session cookie file.
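The idea behind the session file can be sketched in plain Python. Note that the filename scheme and cookie contents below are illustrative only, not Instaloader's actual on-disk format:

```python
import json
import os
import tempfile

# Illustrative sketch only: persist session cookies as JSON in the temporary
# directory, keyed by username. Instaloader's real file format may differ.
def save_session(cookies, username):
    path = os.path.join(tempfile.gettempdir(), '.instaloader-session-' + username)
    with open(path, 'w') as f:
        json.dump(cookies, f)
    return path

def load_session(username):
    path = os.path.join(tempfile.gettempdir(), '.instaloader-session-' + username)
    with open(path) as f:
        return json.load(f)

save_session({'sessionid': 'example-token'}, 'your_username')
print(load_session('your_username')['sessionid'])  # example-token
```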
-Besides downloading private profiles, being logged in allows to
-**download stories**:
+What to Download
+^^^^^^^^^^^^^^^^
-::
+Instaloader does not only download media by profile. More generally, you
+may specify the following targets:
- instaloader --login=your_username --stories profile [profile ...]
+- ``profile``: Public profile, or private profile with ``--login``,
-You may also download
-**the most recent pictures by hashtag**:
+- ``"#hashtag"``: Posts with a certain **hashtag** (the quotes are
+  usually necessary),
-::
+- ``:stories``: The currently-visible **stories** of your followees
+ (requires ``--login``),
- instaloader "#hashtag"
+- ``:feed``: Your **feed** (requires ``--login``),
-This downloads the requested posts into a directory named according to the specified
-hashtag and the filenames correspond to the timestamp of the posts.
-As with all download tasks, this behavior can easily be customized, for example
-encode the poster's profile name in the filenames:
+- ``@profile``: All profiles which are followed by ``profile``, i.e. the
+ *followees* of ``profile`` (requires ``--login``).
+
+Instaloader goes through all media matching the specified targets and
+downloads the pictures and videos along with their captions. You can
+specify ``--comments`` to also **download comments** of each post and
+``--geotags`` to **download geotags** of each post and save them as a
+Google Maps link. For each profile you download, ``--stories``
+instructs Instaloader to **download the user's stories**.
+
+Filename Specification
+^^^^^^^^^^^^^^^^^^^^^^
+
+For each target, Instaloader creates a directory named after the target,
+i.e. ``profile``, ``#hashtag``, ``:feed``, etc., and therein saves the
+posts in files named after each post's timestamp.
+
+``--dirname-pattern`` allows configuring the directory name of each
+target. The default is ``--dirname-pattern={target}``. In the dirname
+pattern, the token ``{target}`` is replaced by the target name, and
+``{profile}`` is replaced by the owner of the post which is downloaded.
+
+``--filename-pattern`` configures the path of the post's files relative
+to the target directory. The default is ``--filename-pattern={date}``.
+The tokens ``{target}`` and ``{profile}`` are replaced like in the
+dirname pattern. Further, the tokens ``{date}`` and ``{shortcode}`` are
+defined.
+
+For example, encode the poster's profile name in the filenames with:
::
instaloader --filename-pattern={date}_{profile} "#hashtag"
-If you want to **download all followees of a given profile**, call
+The pattern string is formatted with Python's string formatter. This
+gives additional flexibility for pattern specification. For example,
+strptime-style formatting options are supported for the post's
+timestamp. The default for ``{date}`` is ``{date:%Y-%m-%d_%H-%M-%S}``.
+
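The underlying mechanism is ordinary ``str.format``; a quick sketch outside of Instaloader shows how a strptime-style pattern expands:

```python
from datetime import datetime

# Sketch of how a --filename-pattern string is expanded: Python's string
# formatter passes the format spec after ':' on to datetime.__format__,
# which accepts strftime/strptime-style directives.
pattern = "{date:%Y-%m-%d_%H-%M-%S}_{profile}"
filename = pattern.format(date=datetime(2017, 7, 1, 12, 30, 5), profile="example")
print(filename)  # 2017-07-01_12-30-05_example
```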
+Filter Posts
+^^^^^^^^^^^^
+
+The ``--only-if`` option allows specifying criteria that posts have to
+meet to be downloaded. If not given, all posts are downloaded. It must
+be a boolean Python expression where the variables ``likes``,
+``comments``, ``viewer_has_liked``, ``is_video``, ``date``, and some
+more (see class ``instaloader.Post`` for a full list) are defined.
+
+A few examples:
+
+To **download the pictures from your feed that you have liked**:
::
- instaloader --login=your_username @profile
+ instaloader --login=your_username --only-if=viewer_has_liked :feed
-To **download all the pictures from your feed which you have liked**, call
+Or you might only want to download **posts that either you liked or were
+liked by many others**:
::
- instaloader --login=your_username :feed-liked
+ instaloader --login=your_username --only-if="likes>100 or viewer_has_liked" profile
-or to **download all pictures from your feed**:
+Or you may **skip videos**:
::
- instaloader --login=your_username :feed-all
+ instaloader --only-if="not is_video" target
-**Download all stories** from the profiles you follow:
-
-::
-
- instaloader --login=your_username --filename-pattern={date}_{profile} :stories
Advanced Options
----------------
@@ -134,25 +167,34 @@ has been renamed, Instaloader automatically **finds it by its unique ID** and
renames the folder likewise.
Instead of a *profile* or a *#hashtag*, the special targets
-``:feed-all`` (pictures from your feed),
-``:feed-liked`` (pictures from your feed which you liked), and
+``:feed`` (pictures from your feed) and
``:stories`` (stories of your followees) can be specified.
--profile-pic-only Only download profile picture.
---skip-videos Do not download videos.
+--no-videos Do not download videos.
--geotags **Download geotags** when available. Geotags are stored as
a text file with the location's name and a Google Maps
link. This requires an additional request to the
Instagram server for each picture, which is why it is
disabled by default.
+--no-geotags Do not store geotags, even if they can be obtained
+ without any additional request.
--comments Download and update comments for each post. This
requires an additional request to the Instagram server
for each post, which is why it is disabled by default.
+--no-captions Do not store media captions, although no additional
+ request is needed to obtain them.
--stories Also **download stories** of each profile that is
downloaded. Requires ``--login``.
--stories-only Rather than downloading regular posts of each
specified profile, only download stories.
Requires ``--login``.
+--only-if filter Expression that, if given, must evaluate to True for each post to
+ be downloaded. Must be a syntactically valid Python
+ expression. Variables are evaluated to
+ ``instaloader.Post`` attributes.
+ Example: ``--only-if=viewer_has_liked``.
+
When to Stop Downloading
^^^^^^^^^^^^^^^^^^^^^^^^
@@ -204,13 +246,10 @@ How to Download
Defaults to ``{date:%Y-%m-%d_%H-%M-%S}``.
--user-agent USER_AGENT User Agent to use for HTTP requests. Per default,
Instaloader pretends being Chrome/51.
---no-sleep Do not sleep between requests to Instagram's servers.
- This makes downloading faster, but may be suspicious.
Miscellaneous Options
^^^^^^^^^^^^^^^^^^^^^
---shorter-output Do not display captions while downloading.
--quiet Disable user interaction, i.e. do not print messages
(except errors) and fail if login credentials are
needed but not given. This makes Instaloader
@@ -234,35 +273,29 @@ For example, to get a list of all followees and a list of all followers of a pro
# Login
loader.interactive_login(USERNAME)
- # Retrieve followees
- followees = loader.get_followees(PROFILE)
+ # Print followees
print(PROFILE + " follows these profiles:")
- for f in followees:
+ for f in loader.get_followees(PROFILE):
print("\t%s\t%s" % (f['username'], f['full_name']))
- # Retrieve followers
- followers = loader.get_followers(PROFILE)
+ # Print followers
print("Followers of " + PROFILE + ":")
- for f in followers:
+ for f in loader.get_followers(PROFILE):
print("\t%s\t%s" % (f['username'], f['full_name']))
Then, you may download all pictures of all followees with
.. code:: python
- for f in followees:
- try:
- loader.download(f['username'])
- except instaloader.NonfatalException:
- pass
+ for f in loader.get_followees(PROFILE):
+ loader.download_profile(f['username'])
You could also download your last 20 liked pics with
.. code:: python
- loader.download_feed_pics(max_count=20, fast_update=True,
- filter_func=lambda node:
- not node["likes"]["viewer_has_liked"] if "likes" in node else not node["viewer_has_liked"])
+ loader.download_feed_posts(max_count=20, fast_update=True,
+ filter_func=lambda post: post.viewer_has_liked)
To download the last 20 pictures with hashtag #cat, do
@@ -296,4 +329,6 @@ Disclaimer
----------
This code is in no way affiliated with, authorized, maintained or endorsed by Instagram or any of its affiliates or
-subsidiaries. This is an independent and unofficial project. Use at your own risk.
\ No newline at end of file
+subsidiaries. This is an independent and unofficial project. Use at your own risk.
+
+Instaloader is licensed under an MIT license. Refer to the ``LICENSE`` file for more information.
diff --git a/instaloader.py b/instaloader.py
index be60fe3..49a5e35 100755
--- a/instaloader.py
+++ b/instaloader.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""Download pictures (or videos) along with their captions and other metadata from Instagram."""
-
+import ast
import getpass
import json
import os
@@ -12,25 +12,25 @@ import shutil
import string
import sys
import tempfile
+import textwrap
import time
-from argparse import ArgumentParser
+import urllib.parse
+from argparse import ArgumentParser, SUPPRESS
from base64 import b64decode, b64encode
+from contextlib import contextmanager, suppress
from datetime import datetime
+from enum import Enum
+
from io import BytesIO
-from typing import Any, Callable, Dict, List, Optional, Tuple
+from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple
import requests
import requests.utils
import urllib3
-# To get version from setup.py for instaloader --version
-import pkg_resources
-try:
- # pylint:disable=no-member
- __version__ = pkg_resources.get_distribution('instaloader').version
-except pkg_resources.DistributionNotFound:
- __version__ = 'Run ./setup.py --version'
+__version__ = '3.0rc0'
+
try:
# pylint:disable=wrong-import-position
@@ -47,40 +47,31 @@ class InstaloaderException(Exception):
pass
-class NonfatalException(InstaloaderException):
- """Base exception for errors which should not cause instaloader to stop"""
+class QueryReturnedNotFoundException(InstaloaderException):
pass
-class ProfileNotExistsException(NonfatalException):
+class ProfileNotExistsException(InstaloaderException):
pass
-class ProfileAccessDeniedException(NonfatalException):
+class ProfileHasNoPicsException(InstaloaderException):
pass
-class ProfileHasNoPicsException(NonfatalException):
+class PrivateProfileNotFollowedException(InstaloaderException):
pass
-class PrivateProfileNotFollowedException(NonfatalException):
+class LoginRequiredException(InstaloaderException):
pass
-class LoginRequiredException(NonfatalException):
+class InvalidArgumentException(InstaloaderException):
pass
-class InvalidArgumentException(NonfatalException):
- pass
-
-
-class BadResponseException(NonfatalException):
- pass
-
-
-class NodeUnavailableException(NonfatalException):
+class BadResponseException(InstaloaderException):
pass
@@ -92,6 +83,10 @@ class ConnectionException(InstaloaderException):
pass
+class TooManyRequests(ConnectionException):
+ pass
+
+
def get_default_session_filename(username: str) -> str:
"""Returns default session filename for given username."""
dirname = tempfile.gettempdir() + "/" + ".instaloader-" + getpass.getuser()
@@ -134,64 +129,386 @@ def format_string_contains_key(format_string: str, key: str) -> bool:
return False
+def filterstr_to_filterfunc(filter_str: str, logged_in: bool) -> Callable[['Post'], bool]:
+ """Takes an --only-if=... filter specification and makes a filter_func Callable out of it."""
+
+ # The filter_str is parsed, then all names occurring in its AST are replaced by attribute lookups on 'post'.
+ # A function Post -> bool is returned which evaluates the filter with the post as 'post' in its namespace.
+
+ class TransformFilterAst(ast.NodeTransformer):
+ def visit_Name(self, node: ast.Name):
+ # pylint:disable=invalid-name,no-self-use
+ if not isinstance(node.ctx, ast.Load):
+ raise InvalidArgumentException("Invalid filter: Modifying variables ({}) not allowed.".format(node.id))
+ if not hasattr(Post, node.id):
+ raise InvalidArgumentException("Invalid filter: Name {} is not defined.".format(node.id))
+ if node.id in Post.LOGIN_REQUIRING_PROPERTIES and not logged_in:
+ raise InvalidArgumentException("Invalid filter: Name {} requires being logged in.".format(node.id))
+ new_node = ast.Attribute(ast.copy_location(ast.Name('post', ast.Load()), node), node.id,
+ ast.copy_location(ast.Load(), node))
+ return ast.copy_location(new_node, node)
+
+ input_filename = '<--only-if parameter>'
+ compiled_filter = compile(TransformFilterAst().visit(ast.parse(filter_str, filename=input_filename, mode='eval')),
+ filename=input_filename, mode='eval')
+
+ def filterfunc(post: 'Post') -> bool:
+ # pylint:disable=eval-used
+ return bool(eval(compiled_filter, {'post': post}))
+
+ return filterfunc
+
+
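The name-rewriting idea used by ``filterstr_to_filterfunc`` can be shown in a standalone sketch; ``FakePost`` below is a stand-in for ``instaloader.Post``, and the helper is simplified (no validation of names against the Post class):

```python
import ast

class RewriteNames(ast.NodeTransformer):
    """Replace every bare name in an expression with an attribute lookup on 'post'."""
    def visit_Name(self, node):
        return ast.copy_location(
            ast.Attribute(value=ast.Name('post', ast.Load()), attr=node.id, ctx=ast.Load()),
            node)

def make_filter(expr):
    # Parse the filter, rewrite its names, then compile it into an eval-able code object.
    tree = ast.fix_missing_locations(RewriteNames().visit(ast.parse(expr, mode='eval')))
    code = compile(tree, '<filter>', 'eval')
    return lambda post: bool(eval(code, {'post': post}))

class FakePost:  # stand-in for instaloader.Post
    likes = 150
    is_video = False

keep = make_filter("likes > 100 and not is_video")
print(keep(FakePost()))  # True
```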
+class Post:
+ """
+ Structure containing information about an Instagram post.
+
+ Created by Instaloader methods get_profile_posts(), get_hashtag_posts(), get_feed_posts(). Posts are linked to
+ an Instaloader instance which is used for error logging and obtaining additional metadata, if required.
+ This class unifies access to the properties associated with a post. It implements == and is hashable.
+ """
+
+ LOGIN_REQUIRING_PROPERTIES = ["viewer_has_liked"]
+
+ def __init__(self, instaloader: 'Instaloader', node: Dict[str, Any], profile: Optional[str] = None):
+ """Create a Post instance from a node structure as returned by Instagram.
+
+ :param instaloader: Instaloader instance used for additional queries if necessary.
+ :param node: Node structure.
+ :param profile: The name of the owner, if already known at creation.
+ """
+ self._instaloader = instaloader
+ self._node = node
+ self._profile = profile
+ self._full_metadata_dict = None
+
+ @classmethod
+ def from_shortcode(cls, instaloader: 'Instaloader', shortcode: str):
+ """Create a post object from a given shortcode"""
+ # pylint:disable=protected-access
+ post = cls(instaloader, {'shortcode': shortcode})
+ post._node = post._full_metadata
+ return post
+
+ @classmethod
+ def from_mediaid(cls, instaloader: 'Instaloader', mediaid: int):
+ """Create a post object from a given mediaid"""
+ return cls.from_shortcode(instaloader, mediaid_to_shortcode(mediaid))
+
+ @property
+ def shortcode(self) -> str:
+ return self._node['shortcode'] if 'shortcode' in self._node else self._node['code']
+
+ def __repr__(self):
+ return '<Post {}>'.format(self.shortcode)
+
+ def __eq__(self, o: object) -> bool:
+ if isinstance(o, Post):
+ return self.shortcode == o.shortcode
+ return NotImplemented
+
+ def __hash__(self) -> int:
+ return hash(self.shortcode)
+
+ @property
+ def _full_metadata(self) -> Dict[str, Any]:
+ if not self._full_metadata_dict:
+ pic_json = self._instaloader.get_json("p/{0}/".format(self.shortcode), params={'__a': 1})
+ if "graphql" in pic_json:
+ self._full_metadata_dict = pic_json["graphql"]["shortcode_media"]
+ else:
+ self._full_metadata_dict = pic_json["media"]
+ return self._full_metadata_dict
+
+ def _field(self, *keys) -> Any:
+ """Lookups given fields in _node, and if not found in _full_metadata. Raises KeyError if not found anywhere."""
+ # pylint:disable=invalid-name
+ try:
+ d = self._node
+ for key in keys:
+ d = d[key]
+ return d
+ except KeyError:
+ d = self._full_metadata
+ for key in keys:
+ d = d[key]
+ return d
+
+ @property
+ def owner_username(self) -> str:
+ """The Post's lowercase owner name, or 'UNKNOWN'."""
+ try:
+ if self._profile:
+ return self._profile.lower()
+ return self._field('owner', 'username').lower()
+ except (InstaloaderException, KeyError, TypeError) as err:
+ self._instaloader.error("Get owner name of {}: {} -- using \'UNKNOWN\'.".format(self, err))
+ return 'UNKNOWN'
+
+ @property
+ def date(self) -> datetime:
+ return datetime.fromtimestamp(self._node["date"] if "date" in self._node else self._node["taken_at_timestamp"])
+
+ @property
+ def url(self) -> str:
+ return self._node["display_url"] if "display_url" in self._node else self._node["display_src"]
+
+ @property
+ def typename(self) -> str:
+ """Type of post, GraphImage, GraphVideo or GraphSidecar"""
+ if '__typename' in self._node:
+ return self._node['__typename']
+ # if __typename is not in node, it is an old image or video
+ return 'GraphImage'
+
+ def get_sidecar_edges(self) -> List[Dict[str, Any]]:
+ return self._field('edge_sidecar_to_children', 'edges')
+
+ @property
+ def caption(self) -> Optional[str]:
+ if "edge_media_to_caption" in self._node and self._node["edge_media_to_caption"]["edges"]:
+ return self._node["edge_media_to_caption"]["edges"][0]["node"]["text"]
+ elif "caption" in self._node:
+ return self._node["caption"]
+
+ @property
+ def is_video(self) -> bool:
+ return self._node['is_video']
+
+ @property
+ def video_url(self) -> Optional[str]:
+ if self.is_video:
+ return self._field('video_url')
+
+ @property
+ def viewer_has_liked(self) -> Optional[bool]:
+ """Whether the viewer has liked the post, or None if not logged in."""
+ if not self._instaloader.is_logged_in:
+ return None
+ if 'likes' in self._node and 'viewer_has_liked' in self._node['likes']:
+ return self._node['likes']['viewer_has_liked']
+ return self._field('viewer_has_liked')
+
+ @property
+ def likes(self) -> int:
+ """Likes count"""
+ return self._field('edge_media_preview_like', 'count')
+
+ @property
+ def comments(self) -> int:
+ """Comment count"""
+ return self._field('edge_media_to_comment', 'count')
+
+ def get_comments(self) -> Iterator[Dict[str, Any]]:
+ """Iterate over all comments of the post."""
+ if self.comments == 0:
+ # Avoid doing additional requests if there are no comments
+ return
+ comment_edges = self._field('edge_media_to_comment', 'edges')
+ if self.comments == len(comment_edges):
+ # If the Post's metadata already contains all comments, don't do GraphQL requests to obtain them
+ yield from (comment['node'] for comment in comment_edges)
+ return
+ yield from self._instaloader.graphql_node_list(17852405266163336, {'shortcode': self.shortcode},
+ 'https://www.instagram.com/p/' + self.shortcode + '/',
+ lambda d: d['data']['shortcode_media']['edge_media_to_comment'])
+
+ def get_location(self) -> Optional[Dict[str, str]]:
+ """If the Post has a location, returns a dictionary with fields 'lat' and 'lng'."""
+ loc_dict = self._field("location")
+ if loc_dict is not None:
+ location_json = self._instaloader.get_json("explore/locations/{0}/".format(loc_dict["id"]),
+ params={'__a': 1})
+ return location_json["location"]
+
+
+class Tristate(Enum):
+ """Tri-state to encode whether we should save certain information, i.e. videos, captions, comments or geotags.
+
+ never: Do not save, even if the information is available without any additional request,
+ no_extra_query: Save if and only if available without doing additional queries,
+ always: Save (and query, if necessary).
+ """
+ never = 0
+ no_extra_query = 1
+ always = 2
+
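A hypothetical helper shows how such a tri-state could drive a save decision; the ``should_save`` function below is illustrative only, not part of the module:

```python
from enum import Enum

class Tristate(Enum):
    never = 0
    no_extra_query = 1
    always = 2

def should_save(setting, needs_extra_query):
    # Save on 'always'; on 'no_extra_query', only when no extra request is needed.
    if setting is Tristate.always:
        return True
    if setting is Tristate.never:
        return False
    return not needs_extra_query

print(should_save(Tristate.no_extra_query, needs_extra_query=True))   # False
print(should_save(Tristate.no_extra_query, needs_extra_query=False))  # True
```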
+
class Instaloader:
def __init__(self,
- sleep: bool = True, quiet: bool = False, shorter_output: bool = False,
+ sleep: bool = True, quiet: bool = False,
user_agent: Optional[str] = None,
dirname_pattern: Optional[str] = None,
- filename_pattern: Optional[str] = None):
+ filename_pattern: Optional[str] = None,
+ download_videos: Tristate = Tristate.always,
+ download_geotags: Tristate = Tristate.no_extra_query,
+ download_captions: Tristate = Tristate.no_extra_query,
+ download_comments: Tristate = Tristate.no_extra_query):
+
+ # configuration parameters
self.user_agent = user_agent if user_agent is not None else default_user_agent()
- self.session = self.get_anonymous_session()
+ self.session = self._get_anonymous_session()
self.username = None
self.sleep = sleep
self.quiet = quiet
- self.shorter_output = shorter_output
self.dirname_pattern = dirname_pattern if dirname_pattern is not None else '{target}'
self.filename_pattern = filename_pattern.replace('{date}', '{date:%Y-%m-%d_%H-%M-%S}') \
if filename_pattern is not None else '{date:%Y-%m-%d_%H-%M-%S}'
+ self.download_videos = download_videos
+ self.download_geotags = download_geotags
+ self.download_captions = download_captions
+ self.download_comments = download_comments
self.previous_queries = dict()
+ # error log, filled with error() and printed at the end of Instaloader.main()
+ self.error_log = []
+
+ # For the adaptation of sleep intervals (rate control)
+ self.request_count = 0
+ self.last_request_time = 0
+
+ @property
+ def is_logged_in(self) -> bool:
+ return bool(self.username)
+
+ @contextmanager
+ def anonymous_copy(self):
+ """Yield an anonymous, otherwise equally-configured copy of an Instaloader instance; Then copy its error log."""
+ new_loader = Instaloader(self.sleep, self.quiet, self.user_agent,
+ self.dirname_pattern, self.filename_pattern,
+ self.download_videos, self.download_geotags,
+ self.download_captions, self.download_comments)
+ new_loader.previous_queries = self.previous_queries
+ yield new_loader
+ self.error_log.extend(new_loader.error_log)
+ self.previous_queries = new_loader.previous_queries
+
def _log(self, *msg, sep='', end='\n', flush=False):
+ """Log a message to stdout that can be suppressed with --quiet."""
if not self.quiet:
print(*msg, sep=sep, end=end, flush=flush)
+ def error(self, msg):
+ """Log a non-fatal error message to stderr, which is repeated at program termination."""
+ print(msg, file=sys.stderr)
+ self.error_log.append(msg)
+
+ @contextmanager
+ def _error_catcher(self, extra_info: Optional[str] = None):
+ """
+ Context manager to catch, print and record InstaloaderExceptions.
+
+ :param extra_info: String to prefix error message with."""
+ try:
+ yield
+ except InstaloaderException as err:
+ if extra_info:
+ self.error('{}: {}'.format(extra_info, err))
+ else:
+ self.error('{}'.format(err))
+
def _sleep(self):
- """Sleep a short, random time if self.sleep is set. Called before each request to the instagram.com."""
+ """Sleep a short time if self.sleep is set. Called before each request to instagram.com."""
if self.sleep:
- time.sleep(random.uniform(0.25, 2.0))
+ time.sleep(random.uniform(0.5, 3))
def _get_and_write_raw(self, url: str, filename: str, tries: int = 3) -> None:
+ """Downloads raw data.
+
+ :raises QueryReturnedNotFoundException: When the server responds with a 404.
+ :raises ConnectionException: When download repeatedly failed."""
try:
- resp = self.get_anonymous_session().get(url, stream=True)
+ resp = self._get_anonymous_session().get(url, stream=True)
if resp.status_code == 200:
self._log(filename, end=' ', flush=True)
with open(filename, 'wb') as file:
resp.raw.decode_content = True
shutil.copyfileobj(resp.raw, file)
else:
- raise ConnectionException("Request returned HTTP error code {}.".format(resp.status_code))
+ if resp.status_code == 404:
+ # 404 not worth retrying.
+ raise QueryReturnedNotFoundException("404 when accessing {}.".format(url))
+ raise ConnectionException("HTTP error code {}.".format(resp.status_code))
except (urllib3.exceptions.HTTPError, requests.exceptions.RequestException, ConnectionException) as err:
- print("URL: {}\n{}".format(url, err), file=sys.stderr)
+ error_string = "URL {}: {}".format(url, err)
if tries <= 1:
- raise NodeUnavailableException
+ raise ConnectionException(error_string)
+ else:
+ self.error(error_string + " [retrying]")
self._sleep()
self._get_and_write_raw(url, filename, tries - 1)
- def get_json(self, name: str, session: requests.Session = None,
- max_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
- """Return JSON of a profile"""
- if session is None:
- session = self.session
- self._sleep()
- if not max_id:
- resp = session.get('https://www.instagram.com/' + name)
- else:
- resp = session.get('https://www.instagram.com/' + name, params={'max_id': max_id})
- match = re.search('window\\._sharedData = .*<', resp.text)
- if match is not None:
- return json.loads(match.group(0)[21:-2])
+ def get_json(self, url: str, params: Dict[str, Any],
+ session: Optional[requests.Session] = None, tries: int = 3) -> Dict[str, Any]:
+ """JSON request to Instagram.
- def default_http_header(self, empty_session_only: bool = False) -> Dict[str, str]:
+ :param url: URL, relative to https://www.instagram.com/
+ :param params: GET parameters
+ :param session: Session to use, or None to use self.session
+ :param tries: Maximum number of attempts until an exception is raised
+ :return: Decoded response dictionary
+ :raises QueryReturnedNotFoundException: When the server responds with a 404.
+ :raises ConnectionException: When query repeatedly failed.
+ """
+ def graphql_query_waittime(query_id: int, untracked_queries: bool = False) -> int:
+ sliding_window = 660
+ timestamps = self.previous_queries.get(query_id)
+ if not timestamps:
+ return sliding_window if untracked_queries else 0
+ current_time = time.monotonic()
+ timestamps = list(filter(lambda t: t > current_time - sliding_window, timestamps))
+ self.previous_queries[query_id] = timestamps
+ if len(timestamps) < 100 and not untracked_queries:
+ return 0
+ return round(min(timestamps) + sliding_window - current_time) + 6
+ is_graphql_query = 'query_id' in params and 'graphql/query' in url
+ if is_graphql_query:
+ query_id = params['query_id']
+ waittime = graphql_query_waittime(query_id)
+ if waittime > 0:
+ self._log('\nToo many queries within the last few minutes. Need to wait {} seconds.'.format(waittime))
+ time.sleep(waittime)
+ timestamp_list = self.previous_queries.get(query_id)
+ if timestamp_list is not None:
+ timestamp_list.append(time.monotonic())
+ else:
+ self.previous_queries[query_id] = [time.monotonic()]
+ sess = session if session else self.session
+ try:
+ self._sleep()
+ resp = sess.get('https://www.instagram.com/' + url, params=params)
+ if resp.status_code == 404:
+ raise QueryReturnedNotFoundException("404")
+ if resp.status_code == 429:
+ raise TooManyRequests("429 - Too Many Requests")
+ if resp.status_code != 200:
+ raise ConnectionException("HTTP error code {}.".format(resp.status_code))
+ resp_json = resp.json()
+ if 'status' in resp_json and resp_json['status'] != "ok":
+ if 'message' in resp_json:
+ raise ConnectionException("Returned \"{}\" status, message \"{}\".".format(resp_json['status'],
+ resp_json['message']))
+ else:
+ raise ConnectionException("Returned \"{}\" status.".format(resp_json['status']))
+ return resp_json
+ except (ConnectionException, json.decoder.JSONDecodeError, requests.exceptions.RequestException) as err:
+ error_string = "JSON Query to {}: {}".format(url, err)
+ if tries <= 1:
+ raise ConnectionException(error_string)
+ self.error(error_string + " [retrying]")
+ if isinstance(err, TooManyRequests):
+ text_for_429 = ("HTTP error code 429 was returned because too many queries occurred recently. "
+ "Please do not use Instagram in your browser or run multiple instances of Instaloader "
+ "in parallel.")
+ print(textwrap.fill(text_for_429), file=sys.stderr)
+ if is_graphql_query:
+ waittime = graphql_query_waittime(query_id=params['query_id'], untracked_queries=True)
+ if waittime > 0:
+ self._log('The request will be retried in {} seconds.'.format(waittime))
+ time.sleep(waittime)
+ self._sleep()
+ return self.get_json(url, params, sess, tries - 1)
+
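The rate control in ``graphql_query_waittime`` follows a classic sliding-window pattern; a standalone sketch, with made-up limits rather than Instaloader's actual ones, is:

```python
import time

class SlidingWindowLimiter:
    """Sketch of sliding-window rate control: allow at most `limit` requests per `window` seconds."""
    def __init__(self, limit=100, window=660.0):
        self.limit = limit
        self.window = window
        self.timestamps = []

    def waittime(self, now=None):
        now = time.monotonic() if now is None else now
        # Drop timestamps that have left the window.
        self.timestamps = [t for t in self.timestamps if t > now - self.window]
        if len(self.timestamps) < self.limit:
            return 0.0
        # Wait until the oldest tracked request leaves the window.
        return min(self.timestamps) + self.window - now

    def record(self, now=None):
        self.timestamps.append(time.monotonic() if now is None else now)

limiter = SlidingWindowLimiter(limit=2, window=10.0)
limiter.record(now=0.0)
limiter.record(now=1.0)
print(limiter.waittime(now=5.0))  # 5.0
```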
+ def _default_http_header(self, empty_session_only: bool = False) -> Dict[str, str]:
"""Returns default HTTP header we use for requests."""
header = {'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-US,en;q=0.8',
@@ -211,86 +528,40 @@ class Instaloader:
del header['X-Requested-With']
return header
- def get_anonymous_session(self) -> requests.Session:
+ def _get_anonymous_session(self) -> requests.Session:
"""Returns our default anonymous requests.Session object."""
session = requests.Session()
session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1',
'ig_vw': '1920', 'csrftoken': '',
's_network': '', 'ds_user_id': ''})
- session.headers.update(self.default_http_header(empty_session_only=True))
+ session.headers.update(self._default_http_header(empty_session_only=True))
return session
def graphql_query(self, query_id: int, variables: Dict[str, Any],
- referer: Optional[str] = None, tries: int = 3) -> Dict[str, Any]:
+ referer: Optional[str] = None) -> Dict[str, Any]:
"""
Do a GraphQL Query.
:param query_id: Query ID.
:param variables: Variables for the Query.
:param referer: HTTP Referer, or None.
- :param tries: Number of total tries. Retries if a RequestException occurs.
:return: The server's response dictionary.
"""
tmpsession = copy_session(self.session)
- tmpsession.headers.update(self.default_http_header(empty_session_only=True))
+ tmpsession.headers.update(self._default_http_header(empty_session_only=True))
del tmpsession.headers['Connection']
del tmpsession.headers['Content-Length']
tmpsession.headers['authority'] = 'www.instagram.com'
tmpsession.headers['scheme'] = 'https'
tmpsession.headers['accept'] = '*/*'
if referer is not None:
- tmpsession.headers['referer'] = referer
- sliding_window = 660
- def graphql_query_waittime(query_id: int) -> int:
- timestamp_list = self.previous_queries.get(query_id)
- if not timestamp_list:
- return 0
- current_time = datetime.now().timestamp()
- timestamp_list = list(filter(lambda t: t > current_time - sliding_window, timestamp_list))
- self.previous_queries[query_id] = timestamp_list
- if len(timestamp_list) < 100:
- return 0
- return round(min(timestamp_list) + sliding_window - current_time) + 6
- waittime = graphql_query_waittime(query_id)
- if waittime > 0:
- self._log('\nToo many queries in the last time. Need to wait {} seconds.'.format(waittime), flush=True)
- time.sleep(waittime)
- self._sleep()
- try:
- response = tmpsession.get('https://www.instagram.com/graphql/query',
- params={'query_id': query_id,
- 'variables': json.dumps(variables, separators=(',', ':'))})
- except requests.exceptions.RequestException as err:
- print(err, file=sys.stderr)
- if tries <= 1:
- raise ConnectionException('GraphQL query {} failed multiple times with following variables:\n{}'
- .format(query_id, json.dumps(variables, indent=4)))
- self._sleep()
- return self.graphql_query(query_id, variables, referer, tries - 1)
- else:
- if response.status_code != 200:
- if response.status_code == 429:
- print('GraphQL query returned HTTP error code 429 because too many queries occured '
- 'in the last time.', file=sys.stderr)
- waittime = round(min(self.previous_queries.get(query_id))
- + sliding_window - datetime.now().timestamp()) + 6 \
- if self.previous_queries.get(query_id) else sliding_window
- if waittime > 0:
- sys.stderr.flush()
- self._log('Please do not use Instagram in your browser or run multiple instances '
- 'of Instaloader in parallel while using options that require GraphQL '
- 'queries.\nNeed to wait {} seconds.'.format(waittime), flush=True)
- time.sleep(waittime)
- return self.graphql_query(query_id, variables, referer, tries)
- else:
- raise ConnectionException("GraphQL query returned HTTP error code {}.".format(response.status_code))
- return response.json()
- finally:
- timestamp_list = self.previous_queries.get(query_id)
- if timestamp_list:
- self.previous_queries.get(query_id).append(datetime.now().timestamp())
- else:
- self.previous_queries[query_id] = [datetime.now().timestamp()]
+ tmpsession.headers['referer'] = urllib.parse.quote(referer)
+ resp_json = self.get_json('graphql/query', params={'query_id': query_id,
+ 'variables': json.dumps(variables, separators=(',', ':'))},
+ session=tmpsession)
+ if 'status' not in resp_json:
+ self.error("GraphQL response did not contain a \"status\" field.")
+ return resp_json
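The removed inline rate limiter (`graphql_query_waittime` above) is still worth keeping in mind when calling `graphql_query()` in a loop. The same sliding-window idea can be sketched standalone; `SlidingWindowLimiter` is a hypothetical helper name, not part of Instaloader:

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most max_calls requests per `window` seconds; a standalone
    sketch of the sliding-window logic removed from graphql_query()."""

    def __init__(self, max_calls: int = 100, window: int = 660) -> None:
        self.max_calls = max_calls
        self.window = window
        self.timestamps = deque()

    def waittime(self, now: float = None) -> int:
        """Seconds to wait before the next call is allowed (0 if allowed now)."""
        now = time.time() if now is None else now
        # Forget timestamps that have slid out of the window.
        while self.timestamps and self.timestamps[0] <= now - self.window:
            self.timestamps.popleft()
        if len(self.timestamps) < self.max_calls:
            return 0
        # The oldest call leaves the window at timestamps[0] + window; the +6
        # is the same safety margin the removed code used.
        return round(self.timestamps[0] + self.window - now) + 6

    def record(self, now: float = None) -> None:
        """Register that a call was just made."""
        self.timestamps.append(time.time() if now is None else now)
```

Calling `waittime()` before and `record()` after each query reproduces the removed behavior without entangling it with the request code.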
def get_username_by_id(self, profile_id: int) -> str:
"""Return the current username of a profile, given its unique ID."""
@@ -306,101 +577,49 @@ class Instaloader:
else:
raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").")
else:
- shortcode = mediaid_to_shortcode(int(data['edges'][0]["node"]["id"]))
- data = self.get_json("p/" + shortcode)
- return data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['owner']['username']
+ return Post.from_mediaid(self, int(data['edges'][0]["node"]["id"])).owner_username
def get_id_by_username(self, profile: str) -> int:
"""Each Instagram profile has its own unique ID which stays unmodified even if a user changes
their username. To get said ID, given the profile's name, you may call this function."""
- data = self.get_json(profile, session=self.get_anonymous_session())
- if "ProfilePage" not in data["entry_data"]:
- raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
- return int(data['entry_data']['ProfilePage'][0]['user']['id'])
+ return int(self.get_profile_metadata(profile)['user']['id'])
- def get_followers(self, profile: str) -> List[Dict[str, Any]]:
+ def graphql_node_list(self, query_id: int, query_variables: Dict[str, Any], query_referer: Optional[str],
+ edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
+ query_variables['first'] = 500
+ data = self.graphql_query(query_id, query_variables, query_referer)
+ while True:
+ edge_struct = edge_extractor(data)
+ yield from [edge['node'] for edge in edge_struct['edges']]
+ if edge_struct['page_info']['has_next_page']:
+ query_variables['after'] = edge_struct['page_info']['end_cursor']
+ data = self.graphql_query(query_id, query_variables, query_referer)
+ else:
+ break
+
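The new `graphql_node_list()` centralizes cursor-based pagination that was previously duplicated per endpoint. The pattern in isolation, with `fetch` standing in for `graphql_query()` (a sketch, not Instaloader API):

```python
from typing import Any, Callable, Dict, Iterator


def paginate(fetch: Callable[[Dict[str, Any]], Dict[str, Any]],
             variables: Dict[str, Any],
             edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Yield nodes page by page, following end_cursor until has_next_page is False."""
    data = fetch(variables)
    while True:
        edge_struct = edge_extractor(data)
        yield from (edge['node'] for edge in edge_struct['edges'])
        if not edge_struct['page_info']['has_next_page']:
            break
        # Resume the next request where the previous page ended.
        variables['after'] = edge_struct['page_info']['end_cursor']
        data = fetch(variables)
```

Because it is a generator, no request is issued for pages the caller never consumes.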
+ def get_followers(self, profile: str) -> Iterator[Dict[str, Any]]:
"""
Retrieve followers of given profile.
To use this, one needs to be logged in and private profiles have to be followed;
otherwise this yields an empty iterator.
:param profile: Name of profile to lookup followers.
- :return: List of followers (list of dictionaries).
"""
- profile_id = self.get_id_by_username(profile)
- data = self.graphql_query(17851374694183129, {'id': str(profile_id),
- 'first': 500},
- referer='https://www.instagram.com/' + profile + '/')
- followers = []
- while True:
- edge_followed_by = data['data']['user']['edge_followed_by']
- followers.extend([follower['node'] for follower in edge_followed_by['edges']])
- page_info = edge_followed_by['page_info']
- if page_info['has_next_page']:
- data = self.graphql_query(17851374694183129, {'id': str(profile_id),
- 'first': 500,
- 'after': page_info['end_cursor']},
- referer='https://www.instagram.com/' + profile + '/')
- else:
- break
- return followers
+ yield from self.graphql_node_list(17851374694183129, {'id': str(self.get_id_by_username(profile))},
+ 'https://www.instagram.com/' + profile + '/',
+ lambda d: d['data']['user']['edge_followed_by'])
- def get_followees(self, profile: str) -> List[Dict[str, Any]]:
+ def get_followees(self, profile: str) -> Iterator[Dict[str, Any]]:
"""
Retrieve followees (followings) of given profile.
To use this, one needs to be logged in and private profiles have to be followed;
otherwise this yields an empty iterator.
:param profile: Name of profile to lookup followees.
- :return: List of followees (list of dictionaries).
"""
- profile_id = self.get_id_by_username(profile)
- data = self.graphql_query(17874545323001329, {'id': profile_id,
- 'first': 500},
- referer='https://www.instagram.com/' + profile + '/')
- followees = []
- while True:
- edge_follow = data['data']['user']['edge_follow']
- followees.extend([followee['node'] for followee in edge_follow['edges']])
- page_info = edge_follow['page_info']
- if page_info['has_next_page']:
- data = self.graphql_query(17874545323001329, {'id': profile_id,
- 'first': 500,
- 'after': page_info['end_cursor']},
- referer='https://www.instagram.com/' + profile + '/')
- else:
- break
- return followees
-
- def get_comments(self, shortcode: str, tries: int = 3) -> List[Dict[str, Any]]:
- """Retrieve comments of node with given shortcode."""
- data = self.get_node_metadata(shortcode)
- if data['edge_media_to_comment']['count'] == len(data['edge_media_to_comment']['edges']):
- return [comment['node'] for comment in data['edge_media_to_comment']['edges']]
- data = self.graphql_query(17852405266163336, {'shortcode': shortcode,
- 'first': 500},
- referer='https://www.instagram.com/p/' + shortcode + '/')
- comments = []
- while True:
- try:
- edge_media_to_comment = data['data']['shortcode_media']['edge_media_to_comment']
- except KeyError as err:
- print('Missing key {} in response of GraphQL query for retrieving comments for node \'{}\'.'
- .format(err, shortcode), file=sys.stderr)
- if tries <= 1:
- raise BadResponseException('GraphQL query returned strange JSON for shortcode {}:\n{}'.format(shortcode, data))
- self._sleep()
- return self.get_comments(shortcode, tries - 1)
- comments.extend([comment['node'] for comment in edge_media_to_comment['edges']])
- page_info = edge_media_to_comment['page_info']
- if page_info['has_next_page']:
- data = self.graphql_query(17852405266163336, {'shortcode': shortcode,
- 'first': 500,
- 'after': page_info['end_cursor']},
- referer='https://www.instagram.com/p/' + shortcode + '/')
- else:
- break
- return comments
+ yield from self.graphql_node_list(17874545323001329, {'id': str(self.get_id_by_username(profile))},
+ 'https://www.instagram.com/' + profile + '/',
+ lambda d: d['data']['user']['edge_follow'])
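`get_followers()` and `get_followees()` now return lazy iterators instead of lists, so a consumer only pays for the pages it actually reads. The consumption pattern, shown with a stand-in generator in place of a live, logged-in query:

```python
from itertools import islice


def follower_nodes():
    """Stand-in for get_followers(): yields node dicts lazily."""
    for i in range(100000):
        yield {'id': i, 'username': 'user{}'.format(i)}


# Taking the first 10 followers no longer materializes the whole list:
first_ten = list(islice(follower_nodes(), 10))
```

With the real methods, the same `islice` call would stop paginating after the first GraphQL page.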
def download_pic(self, filename: str, url: str, mtime: datetime,
filename_suffix: Optional[str] = None) -> bool:
@@ -418,13 +637,13 @@ class Instaloader:
os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
return True
- def update_comments(self, filename: str, shortcode: str) -> None:
+ def update_comments(self, filename: str, post: Post) -> None:
filename += '_comments.json'
try:
comments = json.load(open(filename))
except FileNotFoundError:
comments = list()
- comments.extend(self.get_comments(shortcode))
+ comments.extend(post.get_comments())
if comments:
with open(filename, 'w') as file:
comments_list = sorted(sorted(list(comments), key=lambda t: t['id']),
@@ -446,11 +665,8 @@ class Instaloader:
filename += '.txt'
pcaption = caption.replace('\n', ' ').strip()
caption = caption.encode("UTF-8")
- if self.shorter_output:
- pcaption = "txt"
- else:
- pcaption = '[' + ((pcaption[:29] + u"\u2026") if len(pcaption) > 31 else pcaption) + ']'
- try:
+ pcaption = '[' + ((pcaption[:29] + u"\u2026") if len(pcaption) > 31 else pcaption) + ']'
+ with suppress(FileNotFoundError):
with open(filename, 'rb') as file:
file_caption = file.read()
if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'):
@@ -473,8 +689,6 @@ class Instaloader:
self._log(pcaption + ' updated', end=' ', flush=True)
except UnicodeEncodeError:
self._log('txt updated', end=' ', flush=True)
- except FileNotFoundError:
- pass
try:
self._log(pcaption, end=' ', flush=True)
except UnicodeEncodeError:
@@ -499,7 +713,7 @@ class Instaloader:
def _epoch_to_string(epoch: datetime) -> str:
return epoch.strftime('%Y-%m-%d_%H-%M-%S')
- date_object = datetime.strptime(requests.head(url).headers["Last-Modified"],
+ date_object = datetime.strptime(self._get_anonymous_session().head(url).headers["Last-Modified"],
'%a, %d %b %Y %H:%M:%S GMT')
if ((format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.dirname_pattern, 'target'))):
@@ -512,14 +726,10 @@ class Instaloader:
if os.path.isfile(filename):
self._log(filename + ' already exists')
return None
- match = re.search('http.*://.*instagram.*[^/]*\\.(com|net)/[^/]+/.', url)
- if match is None:
- raise ConnectionException("URL \'" + url + "\' could not be processed.")
- index = len(match.group(0)) - 1
- offset = 8 if match.group(0)[-1:] == 's' else 0
- url = url[:index] + 's2048x2048' + ('/' if offset == 0 else str()) + url[index + offset:]
+ url = re.sub(r'/s([1-9][0-9]{2})x\1/', '/s2048x2048/', url)
self._get_and_write_raw(url, filename)
os.utime(filename, (datetime.now().timestamp(), date_object.timestamp()))
+ self._log('') # log output of _get_and_write_raw() does not produce \n
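The new one-liner replaces the manual URL surgery of the removed code: profile-picture URLs carry a square size segment such as `/s640x640/`, which the backreference `\1` matches only when width equals height, so `re.sub` can rewrite it to the largest size in one step. A standalone sketch (the example URL is illustrative):

```python
import re


def enlarge(url: str) -> str:
    """Rewrite a square size segment sNNNxNNN (100-999 px) to s2048x2048."""
    return re.sub(r'/s([1-9][0-9]{2})x\1/', '/s2048x2048/', url)
```

A non-square segment such as `/s640x480/` is deliberately left untouched, because `\1` fails to match.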
def save_session_to_file(self, filename: Optional[str] = None) -> None:
"""Saves requests.Session object."""
@@ -546,20 +756,17 @@ class Instaloader:
with open(filename, 'rb') as sessionfile:
session = requests.Session()
session.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile))
- session.headers.update(self.default_http_header())
+ session.headers.update(self._default_http_header())
session.headers.update({'X-CSRFToken': session.cookies.get_dict()['csrftoken']})
self._log("Loaded session from %s." % filename)
self.session = session
self.username = username
- def test_login(self, session: requests.Session) -> Optional[str]:
+ def test_login(self, session: Optional[requests.Session]) -> Optional[str]:
"""Returns the Instagram username to which given requests.Session object belongs, or None."""
- if self.session is None:
- return
- data = self.get_json(str(), session=session)
- if data['config']['viewer'] is None:
- return
- return data['config']['viewer']['username']
+ if session:
+ data = self.get_json('', params={'__a': 1}, session=session)
+ return data['graphql']['user']['username'] if 'graphql' in data else None
def login(self, user: str, passwd: str) -> None:
"""Log in to instagram with given username and password and internally store session object"""
@@ -567,7 +774,7 @@ class Instaloader:
session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1',
'ig_vw': '1920', 'csrftoken': '',
's_network': '', 'ds_user_id': ''})
- session.headers.update(self.default_http_header())
+ session.headers.update(self._default_http_header())
self._sleep()
resp = session.get('https://www.instagram.com/')
session.headers.update({'X-CSRFToken': resp.cookies['csrftoken']})
@@ -584,150 +791,73 @@ class Instaloader:
else:
raise ConnectionException('Login error! Connection error!')
- def get_feed_json(self, end_cursor: str = None) -> Dict[str, Any]:
+ def download_post(self, post: Post, target: str) -> bool:
"""
- Get JSON of the user's feed.
Download everything associated with one Instagram post node, i.e. picture, caption and video.
- :param end_cursor: The end cursor, as from json["feed"]["media"]["page_info"]["end_cursor"]
- :return: JSON
- """
- if end_cursor is None:
- return self.get_json(str())["entry_data"]["FeedPage"][0]
- return self.graphql_query(17863003771166879, {'fetch_media_item_count': 12,
- 'fetch_media_item_cursor': end_cursor,
- 'fetch_comment_count': 4,
- 'fetch_like': 10})
-
- def get_node_metadata(self, node_code: str, tries: int = 3) -> Dict[str, Any]:
- pic_json = self.get_json("p/" + node_code)
- try:
- media = pic_json["entry_data"]["PostPage"][0]["graphql"]["shortcode_media"] \
- if "graphql" in pic_json["entry_data"]["PostPage"][0] \
- else pic_json["entry_data"]["PostPage"][0]["media"]
- except (KeyError, TypeError) as err:
- print(err, file=sys.stderr)
- print(json.dumps(pic_json, indent=4), file=sys.stderr)
- if tries <= 1:
- raise NodeUnavailableException
- self._sleep()
- media = self.get_node_metadata(node_code, tries - 1)
- return media
-
- def get_location(self, node_code: str) -> Dict[str, str]:
- try:
- media = self.get_node_metadata(node_code)
- except NodeUnavailableException:
- print("Unable to lookup location for node \"https://www.instagram.com/p/{}/\".".format(node_code),
- file=sys.stderr)
- return dict()
- if media["location"] is not None:
- location_json = self.get_json("explore/locations/" +
- media["location"]["id"])
- return location_json["entry_data"]["LocationsPage"][0]["location"]
-
- def download_node(self, node: Dict[str, Any], profile: Optional[str], target: str,
- download_videos: bool = True, geotags: bool = False, download_comments: bool = False) -> bool:
- """
- Download everything associated with one instagram node, i.e. picture, caption and video.
-
- :param node: Node, as from media->nodes list in instagram's JSONs
- :param profile: Name of profile to which this node belongs
+ :param post: Post to download.
:param target: Target name, i.e. profile name, #hashtag, :feed; for filename.
- :param download_videos: True, if videos should be downloaded
- :param geotags: Download geotags
- :param download_comments: Update comments
:return: True if something was downloaded, False otherwise, i.e. file was already there
"""
- already_has_profilename = profile is not None or ('owner' in node and 'username' in node['owner'])
+
+ # Format dirname and filename. post.owner_username might do an additional request, so only access it if
# {profile} is part of the dirname pattern or filename pattern.
needs_profilename = (format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.filename_pattern, 'profile'))
- shortcode = node['shortcode'] if 'shortcode' in node else node['code']
- if needs_profilename:
- if already_has_profilename:
- profilename = profile if profile is not None else node['owner']['username']
- else:
- try:
- metadata = self.get_node_metadata(shortcode)
- profilename = metadata['owner']['username']
- except NodeUnavailableException:
- print("Unable to gather profilename for node "
- "\"https://www.instagram.com/p/{}/\".".format(shortcode), file=sys.stderr)
- profilename = 'UNKNOWN'
- else:
- profilename = None
- profilename = profilename.lower() if profilename else None
- date = datetime.fromtimestamp(node["date"] if "date" in node else node["taken_at_timestamp"])
+ profilename = post.owner_username if needs_profilename else None
dirname = self.dirname_pattern.format(profile=profilename, target=target.lower())
filename = dirname + '/' + self.filename_pattern.format(profile=profilename, target=target.lower(),
- date=date,
- shortcode=shortcode)
+ date=post.date, shortcode=post.shortcode)
os.makedirs(os.path.dirname(filename), exist_ok=True)
- if '__typename' in node:
- if node['__typename'] == 'GraphSidecar':
- self._sleep()
- sidecar_data = self.session.get('https://www.instagram.com/p/' + shortcode + '/',
- params={'__a': 1}).json()
- edge_number = 1
- downloaded = True
- media = sidecar_data["graphql"]["shortcode_media"] if "graphql" in sidecar_data else sidecar_data[
- "media"]
- for edge in media['edge_sidecar_to_children']['edges']:
- edge_downloaded = self.download_pic(filename=filename,
- url=edge['node']['display_url'],
- mtime=date,
- filename_suffix=str(edge_number))
- downloaded = downloaded and edge_downloaded
- edge_number += 1
- elif node['__typename'] in ['GraphImage', 'GraphVideo']:
- url = node["display_url"] if "display_url" in node else node["display_src"]
- downloaded = self.download_pic(filename=filename,
- url=url,
- mtime=date)
+
+ # Download the image(s) / video thumbnail
+ if post.typename == 'GraphSidecar':
+ edge_number = 1
+ downloaded = True
+ for edge in post.get_sidecar_edges():
+ edge_downloaded = self.download_pic(filename=filename,
+ url=edge['node']['display_url'],
+ mtime=post.date,
+ filename_suffix=str(edge_number))
+ downloaded = downloaded and edge_downloaded
+ edge_number += 1
+ elif post.typename in ['GraphImage', 'GraphVideo']:
+ downloaded = self.download_pic(filename=filename, url=post.url, mtime=post.date)
+ else:
+ self.error("Warning: {0} has unknown typename: {1}".format(post, post.typename))
+ downloaded = False
+
+ # Save caption if desired
+ if self.download_captions is not Tristate.never:
+ if post.caption:
+ self.save_caption(filename, post.date, post.caption)
else:
- self._log("Warning: Unknown typename discovered:" + node['__typename'])
- downloaded = False
- else:
- # Node is an old image or video.
- downloaded = self.download_pic(filename=filename, url=node["display_src"], mtime=date)
- if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]:
- self.save_caption(filename, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"])
- elif "caption" in node:
- self.save_caption(filename, date, node["caption"])
- else:
- self._log("", end=' ', flush=True)
- if node["is_video"] and download_videos:
- video_data = self.get_json('p/' + shortcode)
- self.download_pic(filename=filename,
- url=video_data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['video_url'],
- mtime=date)
- if geotags:
- location = self.get_location(shortcode)
+ self._log("", end=' ', flush=True)
+
+ # Download video if desired
+ if post.is_video and self.download_videos is Tristate.always:
+ self.download_pic(filename=filename, url=post.video_url, mtime=post.date)
+
+ # Download geotags if desired
+ if self.download_geotags is Tristate.always:
+ location = post.get_location()
if location:
- self.save_location(filename, location, date)
- if download_comments:
- self.update_comments(filename, shortcode)
+ self.save_location(filename, location, post.date)
+
+ # Update comments if desired
+ if self.download_comments is Tristate.always:
+ self.update_comments(filename, post)
+
self._log()
return downloaded
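`download_post()` consults Tristate-valued options (`download_videos`, `download_geotags`, `download_comments`, `download_captions`) instead of the former boolean parameters. The enum definition is outside this hunk; a plausible sketch, with member names inferred from the comparisons above (`no_extra_query` in particular is an assumption):

```python
from enum import Enum


class Tristate(Enum):
    """never: skip entirely; no_extra_query: only if no additional request is
    needed; always: download unconditionally. (Member set assumed.)"""
    never = 0
    no_extra_query = 1
    always = 2
```

Comparing with `is` (as in `self.download_videos is Tristate.always`) relies on enum members being singletons.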
- def download_stories(self,
- userids: Optional[List[int]] = None,
- download_videos: bool = True,
- fast_update: bool = False,
- filename_target: str = ':stories') -> None:
- """
- Download available stories from user followees or all stories of users whose ID are given.
+ def get_stories(self, userids: Optional[List[int]] = None) -> Iterator[Dict[str, Any]]:
+ """Get available stories from followees, or all stories of users whose IDs are given.
Does not mark stories as seen.
To use this, one needs to be logged in.
- :param userids: List of user IDs to be processed in terms of downloading their stories
- :param download_videos: True, if videos should be downloaded
- :param fast_update: If true, abort when first already-downloaded picture is encountered
- :param filename_target: Replacement for {target} in dirname_pattern and filename_pattern
+ :param userids: List of user IDs to be processed in terms of downloading their stories, or None.
"""
-
- if self.username is None:
- raise LoginRequiredException('Login required to download stories')
-
tempsession = copy_session(self.session)
header = tempsession.headers
header['User-Agent'] = 'Instagram 10.3.2 (iPhone7,2; iPhone OS 9_3_3; en_US; en-US; scale=2.00; 750x1334) ' \
@@ -737,53 +867,67 @@ class Instaloader:
del header['X-Instagram-AJAX']
del header['X-Requested-With']
- def _user_stories():
- def _get(url):
- self._sleep()
- resp = tempsession.get(url)
- if resp.status_code != 200:
- raise ConnectionException('Failed to fetch stories.')
- return json.loads(resp.text)
- url_reel_media = 'https://i.instagram.com/api/v1/feed/user/{0}/reel_media/'
- url_reels_tray = 'https://i.instagram.com/api/v1/feed/reels_tray/'
- if userids is not None:
- for userid in userids:
- yield _get(url_reel_media.format(userid))
- else:
- data = _get(url_reels_tray)
- if not 'tray' in data:
- raise BadResponseException('Bad story reel JSON.')
- for user in data["tray"]:
- yield user if "items" in user else _get(url_reel_media.format(user['user']['pk']))
+ def _get(url):
+ self._sleep()
+ resp = tempsession.get(url)
+ if resp.status_code != 200:
+ raise ConnectionException('Failed to fetch stories.')
+ return json.loads(resp.text)
- for user_stories in _user_stories():
+ url_reel_media = 'https://i.instagram.com/api/v1/feed/user/{0}/reel_media/'
+ url_reels_tray = 'https://i.instagram.com/api/v1/feed/reels_tray/'
+ if userids is not None:
+ for userid in userids:
+ yield _get(url_reel_media.format(userid))
+ else:
+ data = _get(url_reels_tray)
+ if 'tray' not in data:
+ raise BadResponseException('Bad story reel JSON.')
+ for user in data["tray"]:
+ yield user if "items" in user else _get(url_reel_media.format(user['user']['pk']))
+
+ def download_stories(self,
+ userids: Optional[List[int]] = None,
+ fast_update: bool = False,
+ filename_target: str = ':stories') -> None:
+ """
+ Download available stories from followees, or all stories of users whose IDs are given.
+ Does not mark stories as seen.
+ To use this, one needs to be logged in.
+
+ :param userids: List of user IDs to be processed in terms of downloading their stories
+ :param fast_update: If true, abort when first already-downloaded picture is encountered
+ :param filename_target: Replacement for {target} in dirname_pattern and filename_pattern
+ """
+
+ if not self.is_logged_in:
+ raise LoginRequiredException('Login required to download stories')
+
+ for user_stories in self.get_stories(userids):
if "items" not in user_stories:
- continue
+ raise BadResponseException('Bad reel media JSON.')
name = user_stories["user"]["username"].lower()
self._log("Retrieving stories from profile {}.".format(name))
- totalcount = len(user_stories["items"]) if "items" in user_stories else 0
+ totalcount = len(user_stories["items"])
count = 1
for item in user_stories["items"]:
self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
count += 1
- self._sleep()
shortcode = item["code"] if "code" in item else "no_code"
date_float = item["device_timestamp"] if "device_timestamp" in item else item["taken_at"]
- if date_float < 10000000000:
- date = datetime.fromtimestamp(date_float)
- else:
- # device_timestamp seems to sometime be in milliseconds
+ if date_float > 10000000000:
+ # device_timestamp seems to sometimes be in milliseconds
date_float /= 1000
- date = datetime.fromtimestamp(date_float)
+ date = datetime.fromtimestamp(date_float)
dirname = self.dirname_pattern.format(profile=name, target=filename_target)
filename = dirname + '/' + self.filename_pattern.format(profile=name, target=filename_target,
date=date,
shortcode=shortcode)
os.makedirs(os.path.dirname(filename), exist_ok=True)
- try:
+ with self._error_catcher('Download story {} from user {}'.format(shortcode, name)):
if "image_versions2" in item:
url = item["image_versions2"]["candidates"][0]["url"]
downloaded = self.download_pic(filename=filename,
@@ -792,56 +936,31 @@ class Instaloader:
else:
self._log("Warning: Unable to find story image.")
downloaded = False
- if "caption" in item and item["caption"] is not None:
+ if "caption" in item and item["caption"] is not None and \
+ self.download_captions is not Tristate.never:
caption = item["caption"]
if isinstance(caption, dict) and "text" in caption:
caption = caption["text"]
self.save_caption(filename, date, caption)
else:
self._log("", end=' ', flush=True)
- if "video_versions" in item and download_videos:
+ if "video_versions" in item and self.download_videos is Tristate.always:
downloaded = self.download_pic(filename=filename,
url=item["video_versions"][0]["url"],
mtime=date)
- if "video_duration" in item and self.sleep and downloaded:
- time.sleep(item["video_duration"])
- except NodeUnavailableException:
- print("Unable to download node \"https://www.instagram.com/p/{}/\" of user {} from stories."
- .format(shortcode, name), file=sys.stderr)
- continue
- if item["story_locations"]:
- location = item["story_locations"][0]["location"]
- if location:
- self.save_location(filename, location, date)
- self._log()
- if fast_update and not downloaded:
- break
+ if item["story_locations"] and self.download_geotags is not Tristate.never:
+ location = item["story_locations"][0]["location"]
+ if location:
+ self.save_location(filename, location, date)
+ self._log()
+ if fast_update and not downloaded:
+ break
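The millisecond heuristic in the story loop above can be isolated: epoch values in seconds stay below 10^10 until roughly the year 2286, so anything larger is treated as milliseconds. A sketch:

```python
from datetime import datetime


def story_date(ts: float) -> datetime:
    """Normalize device_timestamp, which is sometimes given in milliseconds."""
    if ts > 10000000000:  # implausible as seconds, so it must be milliseconds
        ts /= 1000
    return datetime.fromtimestamp(ts)
```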
- def download_feed_pics(self, max_count: int = None, fast_update: bool = False,
- filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None,
- download_videos: bool = True, geotags: bool = False,
- download_comments: bool = False) -> None:
- """
- Download pictures from the user's feed.
+ def get_feed_posts(self) -> Iterator[Post]:
+ """Get Posts of the user's feed."""
- Example to download up to the 20 pics the user last liked:
- >>> loader = Instaloader()
- >>> loader.load_session_from_file('USER')
- >>> loader.download_feed_pics(max_count=20, fast_update=True,
- >>> filter_func=lambda node:
- >>> not node["likes"]["viewer_has_liked"]
- >>> if "likes" in node else
- >>> not node["viewer_has_liked"])
+ data = self.get_json('', params={'__a': 1})
- :param max_count: Maximum count of pictures to download
- :param fast_update: If true, abort when first already-downloaded picture is encountered
- :param filter_func: function(node), which returns True if given picture should not be downloaded
- :param download_videos: True, if videos should be downloaded
- :param geotags: Download geotags
- :param download_comments: Update comments
- """
- data = self.get_feed_json()
- count = 1
while True:
if "graphql" in data:
is_edge = True
@@ -852,40 +971,60 @@ class Instaloader:
else:
is_edge = False
feed = data["feed"]["media"]
- for edge_or_node in feed["edges"] if is_edge else feed["nodes"]:
- if max_count is not None and count > max_count:
- return
- node = edge_or_node["node"] if is_edge else edge_or_node
- name = node["owner"]["username"]
- if filter_func is not None and filter_func(node):
- self._log("<pic by %s skipped>" % name, flush=True)
- continue
- self._log("[%3i] %s " % (count, name), end="", flush=True)
- count += 1
- try:
- downloaded = self.download_node(node, profile=name, target=':feed',
- download_videos=download_videos, geotags=geotags,
- download_comments=download_comments)
- except NodeUnavailableException:
- print("Unable to download node \"https://www.instagram.com/p/{}/\" of user {} from feed."
- .format(node['shortcode'] if 'shortcode' in node else node['code'], name), file=sys.stderr)
- continue
- if fast_update and not downloaded:
- return
+
+ if is_edge:
+ yield from (Post(self, edge["node"]) for edge in feed["edges"])
+ else:
+ yield from (Post(self, node) for node in feed["nodes"])
+
if not feed["page_info"]["has_next_page"]:
break
- data = self.get_feed_json(end_cursor=feed["page_info"]["end_cursor"])
+ data = self.graphql_query(17863003771166879, {'fetch_media_item_count': 12,
+ 'fetch_media_item_cursor': feed["page_info"]["end_cursor"],
+ 'fetch_comment_count': 4,
+ 'fetch_like': 10})
- def get_hashtag_json(self, hashtag: str,
- max_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
- """Return JSON of a #hashtag"""
- return self.get_json(name='explore/tags/{0}/'.format(hashtag), max_id=max_id)
+ def download_feed_posts(self, max_count: int = None, fast_update: bool = False,
+ filter_func: Optional[Callable[[Post], bool]] = None) -> None:
+ """
+ Download pictures from the user's feed.
+
+ Example to download up to the 20 pics the user last liked:
+ >>> loader = Instaloader()
+ >>> loader.load_session_from_file('USER')
+ >>> loader.download_feed_posts(max_count=20, fast_update=True,
+ >>> filter_func=lambda post: post.viewer_has_liked)
+
+ :param max_count: Maximum count of pictures to download
+ :param fast_update: If true, abort when first already-downloaded picture is encountered
+ :param filter_func: function(post), which returns True if given picture should be downloaded
+ """
+ count = 1
+ for post in self.get_feed_posts():
+ if max_count is not None and count > max_count:
+ break
+ name = post.owner_username
+ if filter_func is not None and not filter_func(post):
+ self._log("<pic by %s skipped>" % name, flush=True)
+ continue
+ self._log("[%3i] %s " % (count, name), end="", flush=True)
+ count += 1
+ with self._error_catcher('Download feed'):
+ downloaded = self.download_post(post, target=':feed')
+ if fast_update and not downloaded:
+ break
+
+ def get_hashtag_posts(self, hashtag: str) -> Iterator[Post]:
+ """Get Posts associated with a #hashtag."""
+ yield from (Post(self, node) for node in
+ self.graphql_node_list(17875800862117404, {'tag_name': hashtag},
+ 'https://www.instagram.com/explore/tags/{0}/'.format(hashtag),
+ lambda d: d['data']['hashtag']['edge_hashtag_to_media']))
def download_hashtag(self, hashtag: str,
max_count: Optional[int] = None,
- filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None,
- fast_update: bool = False, download_videos: bool = True, geotags: bool = False,
- download_comments: bool = False) -> None:
+ filter_func: Optional[Callable[[Post], bool]] = None,
+ fast_update: bool = False) -> None:
"""Download pictures of one hashtag.
To download the last 30 pictures with hashtag #cat, do
@@ -894,49 +1033,34 @@ class Instaloader:
:param hashtag: Hashtag to download, without leading '#'
:param max_count: Maximum count of pictures to download
- :param filter_func: function(node), which returns True if given picture should not be downloaded
+ :param filter_func: function(post), which returns True if given picture should be downloaded
:param fast_update: If true, abort when first already-downloaded picture is encountered
- :param download_videos: True, if videos should be downloaded
- :param geotags: Download geotags
- :param download_comments: Update comments
"""
- data = self.get_hashtag_json(hashtag)
+ hashtag = hashtag.lower()
count = 1
- while data:
- for node in data['entry_data']['TagPage'][0]['tag']['media']['nodes']:
- if max_count is not None and count > max_count:
- return
- self._log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True)
- if filter_func is not None and filter_func(node):
- self._log('')
- continue
- count += 1
- try:
- downloaded = self.download_node(node=node, profile=None, target='#'+hashtag,
- download_videos=download_videos, geotags=geotags,
- download_comments=download_comments)
- except NodeUnavailableException:
- print("Unable to download node \"https://www.instagram.com/p/{}/\" "
- "while downloading hashtag \"{}\"."
- .format(node['shortcode'] if 'shortcode' in node else node['code'], hashtag), file=sys.stderr)
- continue
- if fast_update and not downloaded:
- return
- if data['entry_data']['TagPage'][0]['tag']['media']['page_info']['has_next_page']:
- data = self.get_hashtag_json(hashtag,
- max_id=data['entry_data']['TagPage'][0]['tag']['media']['page_info'][
- 'end_cursor'])
- else:
+ for post in self.get_hashtag_posts(hashtag):
+ if max_count is not None and count > max_count:
break
+ self._log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True)
+ if filter_func is not None and not filter_func(post):
+ self._log('')
+ continue
+ count += 1
+ with self._error_catcher('Download hashtag #{}'.format(hashtag)):
+ downloaded = self.download_post(post, target='#' + hashtag)
+ if fast_update and not downloaded:
+ break
- def check_id(self, profile: str, json_data: Dict[str, Any]) -> Tuple[str, int]:
+ def check_profile_id(self, profile: str, profile_metadata: Optional[Dict[str, Any]] = None) -> Tuple[str, int]:
"""
Consult the locally stored ID of the profile with the given name, check whether the ID matches
and whether the name has changed, store the profile's ID, and return the current profile name and ID.
+ :param profile: Profile name
+ :param profile_metadata: The profile's metadata (get_profile_metadata()), or None if the profile was not found
:return: current profile name, profile id
"""
- profile_exists = "ProfilePage" in json_data["entry_data"]
+ profile_exists = profile_metadata is not None
if ((format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.dirname_pattern, 'target'))):
id_filename = '{0}/id'.format(self.dirname_pattern.format(profile=profile.lower(),
@@ -947,7 +1071,7 @@ class Instaloader:
with open(id_filename, 'rb') as id_file:
profile_id = int(id_file.read())
if (not profile_exists) or \
- (profile_id != int(json_data['entry_data']['ProfilePage'][0]['user']['id'])):
+ (profile_id != int(profile_metadata['user']['id'])):
if profile_exists:
self._log("Profile {0} does not match the stored unique ID {1}.".format(profile, profile_id))
else:
@@ -971,75 +1095,100 @@ class Instaloader:
os.makedirs(self.dirname_pattern.format(profile=profile.lower(),
target=profile.lower()), exist_ok=True)
with open(id_filename, 'w') as text_file:
- profile_id = json_data['entry_data']['ProfilePage'][0]['user']['id']
+ profile_id = profile_metadata['user']['id']
text_file.write(profile_id + "\n")
self._log("Stored ID {0} for profile {1}.".format(profile_id, profile))
return profile, profile_id
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
- def download(self, name: str,
- profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False,
- download_comments: bool = False, fast_update: bool = False,
- download_stories: bool = False, download_stories_only: bool = False) -> None:
+ def get_profile_metadata(self, profile_name: str) -> Dict[str, Any]:
+ """Retrieves a profile's metadata, for use with e.g. get_profile_posts() and check_profile_id()."""
+ try:
+ return self.get_json('{}/'.format(profile_name), params={'__a': 1})
+ except QueryReturnedNotFoundException:
+ raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name))
+
+ def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Post]:
+ """Retrieve all posts from a profile."""
+ profile_name = profile_metadata['user']['username']
+ yield from (Post(self, node, profile=profile_name) for node in profile_metadata['user']['media']['nodes'])
+ has_next_page = profile_metadata['user']['media']['page_info']['has_next_page']
+ end_cursor = profile_metadata['user']['media']['page_info']['end_cursor']
+ while has_next_page:
+ # We do not use self.graphql_node_list() here, because profile_metadata
+ # lets us obtain the first 12 nodes 'for free'
+ data = self.graphql_query(17888483320059182, {'id': profile_metadata['user']['id'],
+ 'first': 500,
+ 'after': end_cursor},
+ 'https://www.instagram.com/{0}/'.format(profile_name))
+ media = data['data']['user']['edge_owner_to_timeline_media']
+ yield from (Post(self, edge['node'], profile=profile_name) for edge in media['edges'])
+ has_next_page = media['page_info']['has_next_page']
+ end_cursor = media['page_info']['end_cursor']
+
+ def download_profile(self, name: str,
+ profile_pic_only: bool = False, fast_update: bool = False,
+ download_stories: bool = False, download_stories_only: bool = False,
+ filter_func: Optional[Callable[[Post], bool]] = None) -> None:
"""Download one profile"""
+ name = name.lower()
+
# Get profile main page json
- data = self.get_json(name)
+ profile_metadata = None
+ with suppress(ProfileNotExistsException):
+ # ProfileNotExistsException is raised again later in check_profile_id() if the profile cannot be
+ # found, so it is safe to suppress it here.
+ profile_metadata = self.get_profile_metadata(name)
+
# check if profile does exist or name has changed since last download
# and update name and json data if necessary
- name_updated, profile_id = self.check_id(name, data)
+ name_updated, profile_id = self.check_profile_id(name, profile_metadata)
if name_updated != name:
name = name_updated
- data = self.get_json(name)
+ profile_metadata = self.get_profile_metadata(name)
+
# Download profile picture
- try:
- self.download_profilepic(name, data["entry_data"]["ProfilePage"][0]["user"]["profile_pic_url"])
- except NodeUnavailableException:
- print("Unable to download profilepic of user {}.".format(name), file=sys.stderr)
+ with self._error_catcher('Download profile picture of {}'.format(name)):
+ self.download_profilepic(name, profile_metadata["user"]["profile_pic_url"])
if profile_pic_only:
return
+
# Catch some errors
- if data["entry_data"]["ProfilePage"][0]["user"]["is_private"]:
- if data["config"]["viewer"] is None:
+ if profile_metadata["user"]["is_private"]:
+ if not self.is_logged_in:
raise LoginRequiredException("profile %s requires login" % name)
- if not data["entry_data"]["ProfilePage"][0]["user"]["followed_by_viewer"] and \
- self.username != data["entry_data"]["ProfilePage"][0]["user"]["username"]:
+ if not profile_metadata["user"]["followed_by_viewer"] and \
+ self.username != profile_metadata["user"]["username"]:
raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name)
else:
- if data["config"]["viewer"] is not None and not (download_stories or download_stories_only):
+ if self.is_logged_in and not (download_stories or download_stories_only):
self._log("profile %s could also be downloaded anonymously." % name)
+
+ # Download stories, if requested
if download_stories or download_stories_only:
- self.download_stories(userids=[profile_id], filename_target=name,
- download_videos=download_videos, fast_update=fast_update)
+ with self._error_catcher("Download stories of {}".format(name)):
+ self.download_stories(userids=[profile_id], filename_target=name, fast_update=fast_update)
if download_stories_only:
return
- if ("nodes" not in data["entry_data"]["ProfilePage"][0]["user"]["media"] or
- not data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]) \
- and not profile_pic_only:
+
+ if ("nodes" not in profile_metadata["user"]["media"] or
+ not profile_metadata["user"]["media"]["nodes"]):
raise ProfileHasNoPicsException("Profile %s: no pics found." % name)
# Iterate over pictures and download them
self._log("Retrieving posts from profile {}.".format(name))
- def get_last_id(data):
- if data["entry_data"] and data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]:
- return data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"][-1]["id"]
-
- totalcount = data["entry_data"]["ProfilePage"][0]["user"]["media"]["count"]
+ totalcount = profile_metadata["user"]["media"]["count"]
count = 1
- while get_last_id(data) is not None:
- for node in data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]:
- self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
- count += 1
- try:
- downloaded = self.download_node(node=node, profile=name, target=name,
- download_videos=download_videos, geotags=geotags,
- download_comments=download_comments)
- except NodeUnavailableException:
- print("Unable to download node \"https://www.instagram.com/p/{}/\" of user {}."
- .format(node['shortcode'] if 'shortcode' in node else node['code'], name), file=sys.stderr)
- continue
+ for post in self.get_profile_posts(profile_metadata):
+ self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
+ count += 1
+ if filter_func is not None and not filter_func(post):
+ self._log('')
+ continue
+ with self._error_catcher('Download profile {}'.format(name)):
+ downloaded = self.download_post(post, target=name)
if fast_update and not downloaded:
- return
- data = self.get_json(name, max_id=get_last_id(data))
+ break
def interactive_login(self, username: str) -> None:
"""Logs in and internally stores session, asking user for password interactively.
@@ -1056,13 +1205,18 @@ class Instaloader:
print(err, file=sys.stderr)
password = None
- def download_profiles(self, profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None,
- sessionfile: Optional[str] = None, max_count: Optional[int] = None,
- profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False,
- download_comments: bool = False,
- fast_update: bool = False,
- stories: bool = False, stories_only: bool = False) -> None:
- """Download set of profiles and handle sessions"""
+ def main(self, profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None,
+ sessionfile: Optional[str] = None, max_count: Optional[int] = None,
+ profile_pic_only: bool = False, fast_update: bool = False,
+ stories: bool = False, stories_only: bool = False,
+ filter_str: Optional[str] = None) -> None:
+ """Download a set of profiles, hashtags, etc., handling login and session files if desired."""
+ # Parse and generate filter function
+ if filter_str is not None:
+ filter_func = filterstr_to_filterfunc(filter_str, username is not None)
+ self._log('Only downloading posts with property "{}".'.format(filter_str))
+ else:
+ filter_func = None
# Login, if desired
if username is not None:
try:
@@ -1071,86 +1225,73 @@ class Instaloader:
if sessionfile is not None:
print(err, file=sys.stderr)
self._log("Session file does not exist yet - Logging in.")
- if username != self.test_login(self.session):
+ if not self.is_logged_in or username != self.test_login(self.session):
if password is not None:
self.login(username, password)
else:
self.interactive_login(username)
self._log("Logged in as %s." % username)
# Try block for KeyboardInterrupt (save session on ^C)
- failedtargets = []
targets = set()
try:
# Generate set of targets
for pentry in profilelist:
if pentry[0] == '#':
self._log("Retrieving pictures with hashtag {0}".format(pentry))
- self.download_hashtag(hashtag=pentry[1:], max_count=max_count, fast_update=fast_update,
- download_videos=download_videos, geotags=geotags,
- download_comments=download_comments)
+ with self._error_catcher():
+ self.download_hashtag(hashtag=pentry[1:], max_count=max_count, fast_update=fast_update,
+ filter_func=filter_func)
elif pentry[0] == '@':
if username is not None:
self._log("Retrieving followees of %s..." % pentry[1:])
- followees = self.get_followees(pentry[1:])
- targets.update([followee['username'] for followee in followees])
+ with self._error_catcher():
+ followees = self.get_followees(pentry[1:])
+ targets.update([followee['username'] for followee in followees])
else:
print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr)
- elif pentry == ":feed-all":
+ elif pentry == ":feed":
if username is not None:
self._log("Retrieving pictures from your feed...")
- self.download_feed_pics(fast_update=fast_update, max_count=max_count,
- download_videos=download_videos, geotags=geotags,
- download_comments=download_comments)
- else:
- print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr)
- elif pentry == ":feed-liked":
- if username is not None:
- self._log("Retrieving pictures you liked from your feed...")
- self.download_feed_pics(fast_update=fast_update, max_count=max_count,
- filter_func=lambda node:
- not node["likes"]["viewer_has_liked"]
- if "likes" in node
- else not node["viewer_has_liked"],
- download_videos=download_videos, geotags=geotags,
- download_comments=download_comments)
+ with self._error_catcher():
+ self.download_feed_posts(fast_update=fast_update, max_count=max_count,
+ filter_func=filter_func)
else:
print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr)
elif pentry == ":stories":
if username is not None:
- self.download_stories(download_videos=download_videos, fast_update=fast_update)
+ with self._error_catcher():
+ self.download_stories(fast_update=fast_update)
else:
print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr)
else:
targets.add(pentry)
if len(targets) > 1:
- self._log("Downloading %i profiles..." % len(targets))
+ self._log("Downloading {} profiles: {}".format(len(targets), ', '.join(targets)))
# Iterate through targets list and download them
for target in targets:
- try:
+ with self._error_catcher():
try:
- self.download(target, profile_pic_only, download_videos,
- geotags, download_comments, fast_update, stories, stories_only)
+ self.download_profile(target, profile_pic_only, fast_update, stories, stories_only,
+ filter_func=filter_func)
except ProfileNotExistsException as err:
if username is not None:
self._log(err)
self._log("Trying again anonymously, helps in case you are just blocked.")
- anonymous_loader = Instaloader(self.sleep, self.quiet, self.shorter_output,
- self.user_agent, self.dirname_pattern, self.filename_pattern)
- anonymous_loader.download(target, profile_pic_only, download_videos,
- geotags, download_comments, fast_update)
+ with self.anonymous_copy() as anonymous_loader:
+ with self._error_catcher():
+ anonymous_loader.download_profile(target, profile_pic_only, fast_update,
+ filter_func=filter_func)
else:
raise err
- except NonfatalException as err:
- failedtargets.append(target)
- print(err, file=sys.stderr)
except KeyboardInterrupt:
print("\nInterrupted by user.", file=sys.stderr)
- if len(targets) > 1 and failedtargets:
- print("Errors occured (see above) while downloading profiles: %s." %
- ", ".join(failedtargets), file=sys.stderr)
# Save session if it is useful
if username is not None:
self.save_session_to_file(sessionfile)
+ if self.error_log:
+ print("\nErrors occurred:", file=sys.stderr)
+ for err in self.error_log:
+ print(err, file=sys.stderr)
def main():
@@ -1167,27 +1308,35 @@ def main():
g_what.add_argument('profile', nargs='*', metavar='profile|#hashtag',
help='Name of profile or #hashtag to download. '
'Alternatively, if --login is given: @ to download all followees of '
- '; the special targets :feed-all or :feed-liked to '
+ '; the special targets :feed to '
'download pictures from your feed; or :stories to download the stories of your '
'followees.')
g_what.add_argument('-P', '--profile-pic-only', action='store_true',
help='Only download profile picture.')
- g_what.add_argument('-V', '--skip-videos', action='store_true',
+ g_what.add_argument('-V', '--no-videos', action='store_true',
help='Do not download videos.')
g_what.add_argument('-G', '--geotags', action='store_true',
help='Download geotags when available. Geotags are stored as a '
'text file with the location\'s name and a Google Maps link. '
'This requires an additional request to the Instagram '
'server for each picture, which is why it is disabled by default.')
+ g_what.add_argument('--no-geotags', action='store_true',
+ help='Do not store geotags, even if they can be obtained without any additional request.')
g_what.add_argument('-C', '--comments', action='store_true',
help='Download and update comments for each post. '
'This requires an additional request to the Instagram '
'server for each post, which is why it is disabled by default.')
+ g_what.add_argument('--no-captions', action='store_true',
+ help='Do not store media captions, although no additional request is needed to obtain them.')
g_what.add_argument('-s', '--stories', action='store_true',
help='Also download stories of each profile that is downloaded. Requires --login.')
g_what.add_argument('--stories-only', action='store_true',
help='Rather than downloading regular posts of each specified profile, only download '
'stories. Requires --login.')
+ g_what.add_argument('--only-if', metavar='filter',
+ help='Expression that, if given, must evaluate to True for each post to be downloaded. Must be '
+ 'a syntactically valid Python expression. Variable names are interpreted as '
+ 'instaloader.Post attributes. Example: --only-if=viewer_has_liked.')
g_stop = parser.add_argument_group('When to Stop Downloading',
'If none of these options are given, Instaloader goes through all pictures '
@@ -1227,13 +1376,9 @@ def main():
'\'{date:%%Y-%%m-%%d_%%H-%%M-%%S}\'.')
g_how.add_argument('--user-agent',
help='User Agent to use for HTTP requests. Defaults to \'{}\'.'.format(default_user_agent()))
- g_how.add_argument('-S', '--no-sleep', action='store_true',
- help='Do not sleep between requests to Instagram\'s servers. This makes downloading faster, but '
- 'may be suspicious.')
+ g_how.add_argument('-S', '--no-sleep', action='store_true', help=SUPPRESS)
g_misc = parser.add_argument_group('Miscellaneous Options')
- g_misc.add_argument('-O', '--shorter-output', action='store_true',
- help='Do not display captions while downloading.')
g_misc.add_argument('-q', '--quiet', action='store_true',
help='Disable user interaction, i.e. do not print messages (except errors) and fail '
'if login credentials are needed but not given. This makes Instaloader suitable as a '
@@ -1249,14 +1394,33 @@ def main():
args.stories = False
if args.stories_only:
raise SystemExit(1)
- loader = Instaloader(sleep=not args.no_sleep, quiet=args.quiet, shorter_output=args.shorter_output,
+
+ if ':feed-all' in args.profile or ':feed-liked' in args.profile:
+ raise SystemExit(":feed-all and :feed-liked were removed. Use :feed as target and, "
+ "if appropriate, --only-if=viewer_has_liked.")
+
+ download_videos = Tristate.always if not args.no_videos else Tristate.no_extra_query
+ download_comments = Tristate.always if args.comments else Tristate.no_extra_query
+ download_captions = Tristate.no_extra_query if not args.no_captions else Tristate.never
+
+ if args.geotags and args.no_geotags:
+ raise SystemExit("--geotags and --no-geotags given. I am confused and refuse to work.")
+ elif args.geotags:
+ download_geotags = Tristate.always
+ elif args.no_geotags:
+ download_geotags = Tristate.never
+ else:
+ download_geotags = Tristate.no_extra_query
+
+ loader = Instaloader(sleep=not args.no_sleep, quiet=args.quiet,
user_agent=args.user_agent,
- dirname_pattern=args.dirname_pattern, filename_pattern=args.filename_pattern)
- loader.download_profiles(args.profile, args.login.lower() if args.login is not None else None, args.password,
- args.sessionfile,
- int(args.count) if args.count is not None else None,
- args.profile_pic_only, not args.skip_videos, args.geotags, args.comments,
- args.fast_update, args.stories, args.stories_only)
+ dirname_pattern=args.dirname_pattern, filename_pattern=args.filename_pattern,
+ download_videos=download_videos, download_geotags=download_geotags,
+ download_captions=download_captions, download_comments=download_comments)
+ loader.main(args.profile, args.login.lower() if args.login is not None else None, args.password,
+ args.sessionfile,
+ int(args.count) if args.count is not None else None,
+ args.profile_pic_only, args.fast_update, args.stories, args.stories_only, args.only_if)
except InstaloaderException as err:
raise SystemExit("Fatal error: %s" % err)
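The hunks above call `filterstr_to_filterfunc` to turn the `--only-if` string into a predicate, but the helper itself is not part of this excerpt. A minimal sketch of how such a helper could work, rewriting each bare name in the expression into an attribute lookup on the post under test (the name and exact behavior here are assumptions, not the patch's actual implementation):

```python
import ast


def filterstr_to_filterfunc(filter_str):
    """Compile a --only-if expression into a predicate over post objects.

    Hypothetical sketch: every bare name in the expression becomes an
    attribute lookup on the post, so 'viewer_has_liked and not is_video'
    is evaluated as 'post.viewer_has_liked and not post.is_video'.
    """
    class TransformFilterAst(ast.NodeTransformer):
        def visit_Name(self, node):
            # Replace a bare name with post.<name>.
            new_node = ast.Attribute(value=ast.Name(id='post', ctx=ast.Load()),
                                     attr=node.id, ctx=ast.Load())
            return ast.copy_location(new_node, node)

    # Parse once so syntax errors surface at startup, not once per post.
    filter_ast = TransformFilterAst().visit(ast.parse(filter_str, mode='eval'))
    code = compile(ast.fix_missing_locations(filter_ast), '<--only-if>', 'eval')

    def filterfunc(post):
        return bool(eval(code, {'post': post}))

    return filterfunc
```

The real helper in this changeset may differ, for instance in how it treats attributes that require login; this only illustrates the AST-rewriting idea behind evaluating variables as `instaloader.Post` attributes.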
diff --git a/setup.py b/setup.py
index 1777767..248fccb 100755
--- a/setup.py
+++ b/setup.py
@@ -1,14 +1,39 @@
#!/usr/bin/env python3
+import re
import sys
+import os
+import platform
from setuptools import setup
+
+SRC = os.path.abspath(os.path.dirname(__file__))
+
+
+def get_version():
+ with open(os.path.join(SRC, 'instaloader.py')) as f:
+ for line in f:
+ m = re.match("__version__ = '(.*)'", line)
+ if m:
+ return m.group(1)
+ raise SystemExit("Could not find version string.")
+
+
if sys.version_info < (3, 5):
sys.exit('Instaloader requires Python >= 3.5.')
+requirements = ['requests>=2.4']
+
+if platform.system() == 'Windows' and sys.version_info < (3, 6):
+ requirements.append('win_unicode_console')
+
+ keywords = ['instagram', 'instagram-scraper', 'instagram-client', 'instagram-feed', 'downloader', 'videos', 'photos',
+ 'pictures', 'instagram-user-photos', 'instagram-photos', 'instagram-metadata', 'instagram-downloader',
+ 'instagram-stories']
+
setup(
name='instaloader',
- version='2.2.3',
+ version=get_version(),
py_modules=['instaloader'],
url='https://github.com/Thammus/instaloader',
license='MIT',
@@ -16,12 +41,12 @@ setup(
author_email='mail@agraf.me, koch-kramer@web.de',
description='Download pictures (or videos) along with their captions and other metadata '
'from Instagram.',
- long_description=open('README.rst').read(),
- install_requires=['requests>=2.4'],
+ long_description=open(os.path.join(SRC, 'README.rst')).read(),
+ install_requires=requirements,
python_requires='>=3.5',
entry_points={'console_scripts': ['instaloader=instaloader:main']},
zip_safe=True,
- keywords='instagram downloader',
+ keywords=keywords,
classifiers=[
'Development Status :: 5 - Production/Stable',
'Environment :: Console',