1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-08-18 12:49:38 +02:00

Major code cleanup

Remove many code duplications, merely by using more pythonic idioms.

Use GraphQL more often.

Better cope with errors: All requests can be retried; failed requests do
not cause program termination; all error strings are repeated to the
user at the end of execution.

download_post() (formerly download_node()) does not repeat node metadata
request (before this commit, this request was executed up to three
times).
This commit is contained in:
Alexander Graf 2017-08-06 19:27:46 +02:00
parent 5d83a4ccf6
commit 58882f508e
2 changed files with 367 additions and 359 deletions

View File

@ -235,15 +235,13 @@ For example, to get a list of all followees and a list of all followers of a pro
loader.interactive_login(USERNAME)
# Retrieve followees
followees = loader.get_followees(PROFILE)
print(PROFILE + " follows these profiles:")
for f in followees:
for f in loader.get_followees(PROFILE):
print("\t%s\t%s" % (f['username'], f['full_name']))
# Retrieve followers
followers = loader.get_followers(PROFILE)
print("Followers of " + PROFILE + ":")
for f in followers:
for f in loader.get_followers(PROFILE):
print("\t%s\t%s" % (f['username'], f['full_name']))
Then, you may download all pictures of all followees with
@ -252,7 +250,7 @@ Then, you may download all pictures of all followees with
for f in followees:
try:
loader.download(f['username'])
loader.download_profile(f['username'])
except instaloader.NonfatalException:
pass
@ -260,9 +258,11 @@ You could also download your last 20 liked pics with
.. code:: python
loader.download_feed_pics(max_count=20, fast_update=True,
loader.download_feed_posts(max_count=20, fast_update=True,
filter_func=lambda node:
not node["likes"]["viewer_has_liked"] if "likes" in node else not node["viewer_has_liked"])
not node["likes"]["viewer_has_liked"]
if "likes" in node else
not node["viewer_has_liked"])
To download the last 20 pictures with hashtag #cat, do

View File

@ -15,9 +15,10 @@ import tempfile
import time
from argparse import ArgumentParser
from base64 import b64decode, b64encode
from contextlib import contextmanager, suppress
from datetime import datetime
from io import BytesIO
from typing import Any, Callable, Dict, List, Optional, Tuple
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple
import requests
import requests.utils
@ -52,11 +53,11 @@ class NonfatalException(InstaloaderException):
pass
class ProfileNotExistsException(NonfatalException):
class QueryReturnedNotFoundException(InstaloaderException):
pass
class ProfileAccessDeniedException(NonfatalException):
class ProfileNotExistsException(NonfatalException):
pass
@ -88,7 +89,7 @@ class BadCredentialsException(InstaloaderException):
pass
class ConnectionException(InstaloaderException):
class ConnectionException(NonfatalException):
pass
@ -149,17 +150,41 @@ class Instaloader:
self.dirname_pattern = dirname_pattern if dirname_pattern is not None else '{target}'
self.filename_pattern = filename_pattern.replace('{date}', '{date:%Y-%m-%d_%H-%M-%S}') \
if filename_pattern is not None else '{date:%Y-%m-%d_%H-%M-%S}'
self.error_log = []
def _log(self, *msg, sep='', end='\n', flush=False):
"""Log a message to stdout that can be suppressed with --quiet."""
if not self.quiet:
print(*msg, sep=sep, end=end, flush=flush)
def _error(self, msg: str):
"""Log a non-fatal error message to stderr, which is repeated at program termination."""
print(msg, file=sys.stderr)
self.error_log.append(msg)
@contextmanager
def _error_catcher(self, extra_info: Optional[str] = None):
    """
    Context manager to catch, output and record NonfatalExceptions.

    :param extra_info: String to prefix error message with.
    """
    try:
        yield
    except NonfatalException as err:
        # Prefix with the caller-supplied context when one was given.
        message = '{}: {}'.format(extra_info, err) if extra_info else '{}'.format(err)
        self._error(message)
def _sleep(self):
"""Sleep a short, random time if self.sleep is set. Called before each request to the instagram.com."""
if self.sleep:
time.sleep(random.uniform(0.25, 2.0))
time.sleep(random.uniform(0.5, 1.75))
def _get_and_write_raw(self, url: str, filename: str, tries: int = 3) -> None:
"""Downloads raw data.
:raises ConnectionException: When download repeatedly failed."""
try:
resp = self.get_anonymous_session().get(url, stream=True)
if resp.status_code == 200:
@ -168,27 +193,43 @@ class Instaloader:
resp.raw.decode_content = True
shutil.copyfileobj(resp.raw, file)
else:
raise ConnectionException("Request returned HTTP error code {}.".format(resp.status_code))
raise ConnectionException("HTTP error code {}.".format(resp.status_code))
except (urllib3.exceptions.HTTPError, requests.exceptions.RequestException, ConnectionException) as err:
print("URL: {}\n{}".format(url, err), file=sys.stderr)
error_string = "URL {}: {}".format(url, err)
if tries <= 1:
raise NodeUnavailableException
raise ConnectionException(error_string)
else:
self._error(error_string)
self._sleep()
self._get_and_write_raw(url, filename, tries - 1)
def _get_json(self, url: str, params: Optional[Dict[str, Any]] = None,
              session: Optional[requests.Session] = None, tries: int = 3) -> Dict[str, Any]:
    """JSON request to Instagram.

    :param url: URL, relative to https://www.instagram.com/
    :param params: GET parameters
    :param session: Session to use, or None to use self.session
    :param tries: Maximum number of attempts until an exception is raised
    :return: Decoded response dictionary
    :raises QueryReturnedNotFoundException: When the server answered with HTTP 404 (never retried).
    :raises ConnectionException: When the request repeatedly failed.
    """
    sess = session if session else self.session
    try:
        self._sleep()
        resp = sess.get('https://www.instagram.com/' + url, params=params)
        # 404 means the resource is definitely absent; it is deliberately
        # not in the except tuple below, so it propagates without retry.
        if resp.status_code == 404:
            raise QueryReturnedNotFoundException("404")
        if resp.status_code != 200:
            raise ConnectionException("HTTP error code {}.".format(resp.status_code))
        return resp.json()
    except (ConnectionException, json.decoder.JSONDecodeError) as err:
        error_string = "JSON Query to {}: {}".format(url, err)
        if tries <= 1:
            raise ConnectionException(error_string)
        self._error(error_string)
        self._sleep()
        # BUG FIX: the recursive retry's result was discarded, so every
        # retried call returned None; propagate the value to the caller.
        return self._get_json(url, params, sess, tries - 1)
def default_http_header(self, empty_session_only: bool = False) -> Dict[str, str]:
"""Returns default HTTP header we use for requests."""
@ -238,13 +279,10 @@ class Instaloader:
tmpsession.headers['accept'] = '*/*'
if referer is not None:
tmpsession.headers['referer'] = referer
self._sleep()
response = tmpsession.get('https://www.instagram.com/graphql/query',
return self._get_json('graphql/query',
params={'query_id': query_id,
'variables': json.dumps(variables, separators=(',', ':'))})
if response.status_code != 200:
raise ConnectionException("GraphQL query returned HTTP error code {}.".format(response.status_code))
return response.json()
'variables': json.dumps(variables, separators=(',', ':'))},
session=tmpsession)
def get_username_by_id(self, profile_id: int) -> str:
"""To get the current username of a profile, given its unique ID, this function can be used."""
@ -261,89 +299,49 @@ class Instaloader:
raise LoginRequiredException("Login required to determine username (ID: " + str(profile_id) + ").")
else:
shortcode = mediaid_to_shortcode(int(data['edges'][0]["node"]["id"]))
data = self.get_json("p/" + shortcode)
return data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['owner']['username']
return self.get_post_metadata(shortcode)['owner']['username']
def get_id_by_username(self, profile: str) -> int:
    """Each Instagram profile has its own unique ID which stays unmodified even if a user changes
    his/her username. To get said ID, given the profile's name, you may call this function.

    :param profile: Profile name.
    :return: The profile's integer ID.
    """
    # Consolidated from interleaved diff residue: delegate the lookup to
    # get_profile_metadata() instead of parsing the raw page JSON.
    return int(self.get_profile_metadata(profile)['user']['id'])
def get_followers(self, profile: str) -> List[Dict[str, Any]]:
def graphql_node_list(self, query_id: int, query_variables: Dict[str, Any], query_referer: Optional[str],
                      edge_extractor: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Lazily iterate a paginated GraphQL edge list, yielding one node dict at a time.

    :param query_id: Numeric GraphQL query ID.
    :param query_variables: Variables dict for the query; 'first' and 'after' are managed here.
    :param query_referer: Referer URL passed through to graphql_query(), or None.
    :param edge_extractor: Callable mapping a response dict to its edge structure
                           (must expose 'edges' and 'page_info').
    """
    query_variables['first'] = 500
    has_more = True
    while has_more:
        response = self.graphql_query(query_id, query_variables, query_referer)
        struct = edge_extractor(response)
        for edge in struct['edges']:
            yield edge['node']
        page_info = struct['page_info']
        has_more = page_info['has_next_page']
        if has_more:
            # Resume the next request where this page ended.
            query_variables['after'] = page_info['end_cursor']
def get_followers(self, profile: str) -> Iterator[Dict[str, Any]]:
    """
    Retrieve list of followers of given profile.
    To use this, one needs to be logged in and private profiles have to be followed,
    otherwise this returns an empty list.

    :param profile: Name of profile to lookup followers.
    """
    # Consolidated from interleaved diff residue (manual pagination loop
    # removed); pagination is handled by graphql_node_list().
    yield from self.graphql_node_list(17851374694183129, {'id': str(self.get_id_by_username(profile))},
                                      'https://www.instagram.com/' + profile + '/',
                                      lambda d: d['data']['user']['edge_followed_by'])
def get_followees(self, profile: str) -> Iterator[Dict[str, Any]]:
    """
    Retrieve list of followees (followings) of given profile.
    To use this, one needs to be logged in and private profiles have to be followed,
    otherwise this returns an empty list.

    :param profile: Name of profile to lookup followers.
    """
    # Consolidated from interleaved diff residue (manual pagination loop and
    # a stranded legacy get_comments body removed); pagination is handled by
    # graphql_node_list().
    yield from self.graphql_node_list(17874545323001329, {'id': str(self.get_id_by_username(profile))},
                                      'https://www.instagram.com/' + profile + '/',
                                      lambda d: d['data']['user']['edge_follow'])
def download_pic(self, filename: str, url: str, mtime: datetime,
filename_suffix: Optional[str] = None) -> bool:
@ -361,6 +359,12 @@ class Instaloader:
os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
return True
def get_comments(self, shortcode: str) -> Iterator[Dict[str, Any]]:
    """Retrieve comments of node with given shortcode, one comment dict at a time."""
    referer = 'https://www.instagram.com/p/' + shortcode + '/'
    return self.graphql_node_list(17852405266163336, {'shortcode': shortcode},
                                  referer,
                                  lambda d: d['data']['shortcode_media']['edge_media_to_comment'])
def update_comments(self, filename: str, shortcode: str) -> None:
filename += '_comments.json'
try:
@ -393,7 +397,7 @@ class Instaloader:
pcaption = "txt"
else:
pcaption = '[' + ((pcaption[:29] + u"\u2026") if len(pcaption) > 31 else pcaption) + ']'
try:
with suppress(FileNotFoundError):
with open(filename, 'rb') as file:
file_caption = file.read()
if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'):
@ -416,8 +420,6 @@ class Instaloader:
self._log(pcaption + ' updated', end=' ', flush=True)
except UnicodeEncodeError:
self._log('txt updated', end=' ', flush=True)
except FileNotFoundError:
pass
try:
self._log(pcaption, end=' ', flush=True)
except UnicodeEncodeError:
@ -463,6 +465,7 @@ class Instaloader:
url = url[:index] + 's2048x2048' + ('/' if offset == 0 else str()) + url[index + offset:]
self._get_and_write_raw(url, filename)
os.utime(filename, (datetime.now().timestamp(), date_object.timestamp()))
self._log('') # log output of _get_and_write_raw() does not produce \n
def save_session_to_file(self, filename: Optional[str] = None) -> None:
"""Saves requests.Session object."""
@ -495,14 +498,11 @@ class Instaloader:
self.session = session
self.username = username
def test_login(self, session: Optional[requests.Session]) -> Optional[str]:
    """Returns the Instagram username to which given requests.Session object belongs, or None.

    :param session: Session to probe, or None (returns None immediately).
    """
    # Consolidated from interleaved diff residue: query the profile root with
    # __a=1 and read the username from the GraphQL viewer structure.
    if session:
        data = self._get_json('', params={'__a': 1}, session=session)
        return data['graphql']['user']['username'] if 'graphql' in data else None
def login(self, user: str, passwd: str) -> None:
"""Log in to instagram with given username and password and internally store session object"""
@ -527,51 +527,34 @@ class Instaloader:
else:
raise ConnectionException('Login error! Connection error!')
def get_post_metadata(self, shortcode: str, tries: int = 3) -> Dict[str, Any]:
    """Get full metadata of the post associated with given shortcode.

    :param shortcode: Shortcode of the post (instagram.com/p/<shortcode>/).
    :param tries: Maximum number of attempts before giving up.
    :raises NodeUnavailableException: If the data cannot be retrieved.
    """
    # Consolidated from interleaved diff residue (legacy get_feed_json and the
    # pre-rename get_node_metadata body removed).
    pic_json = self._get_json("p/{0}/".format(shortcode), params={'__a': 1})
    try:
        # Newer responses wrap the post under 'graphql'; older ones use 'media'.
        media = pic_json["graphql"]["shortcode_media"] if "graphql" in pic_json else pic_json["media"]
    except KeyError as err:
        error_string = "Post {}: {}".format(shortcode, err)
        if tries <= 1:
            raise NodeUnavailableException(error_string)
        else:
            self._error(error_string)
            self._sleep()
            media = self.get_post_metadata(shortcode, tries - 1)
    return media
def get_location(self, post_metadata: Dict[str, Any]) -> Optional[Dict[str, str]]:
    """Look up the location record of a post, or return None if it carries no location.

    :param post_metadata: Full post metadata, as from get_post_metadata().
    :return: The location dictionary, or None when the post has no location tag.
    """
    # Consolidated from interleaved diff residue (shortcode-based legacy body
    # removed); made the no-location return explicit.
    if post_metadata["location"] is not None:
        location_json = self._get_json("explore/locations/{0}/".format(post_metadata["location"]["id"]),
                                       params={'__a': 1})
        return location_json["location"]
    return None
def download_node(self, node: Dict[str, Any], profile: Optional[str], target: str,
def download_post(self, node: Dict[str, Any], profile: Optional[str], target: str,
download_videos: bool = True, geotags: bool = False, download_comments: bool = False) -> bool:
"""
Download everything associated with one instagram node, i.e. picture, caption and video.
Download everything associated with one instagram post node, i.e. picture, caption and video.
:param node: Node, as from media->nodes list in instagram's JSONs
:param profile: Name of profile to which this node belongs
@ -585,36 +568,34 @@ class Instaloader:
needs_profilename = (format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.filename_pattern, 'profile'))
shortcode = node['shortcode'] if 'shortcode' in node else node['code']
post_metadata = None
if needs_profilename:
if already_has_profilename:
profilename = profile if profile is not None else node['owner']['username']
profilename = profilename.lower()
else:
try:
metadata = self.get_node_metadata(shortcode)
profilename = metadata['owner']['username']
except NodeUnavailableException:
print("Unable to gather profilename for node "
"\"https://www.instagram.com/p/{}/\".".format(shortcode), file=sys.stderr)
post_metadata = self.get_post_metadata(shortcode)
profilename = post_metadata['owner']['username'].lower()
except (NonfatalException, KeyError) as err:
self._error("Unable to get owner name of post {}: {} -- using \'UNKNOWN\'.".format(shortcode, err))
profilename = 'UNKNOWN'
else:
profilename = None
profilename = profilename.lower() if profilename else None
date = datetime.fromtimestamp(node["date"] if "date" in node else node["taken_at_timestamp"])
dirname = self.dirname_pattern.format(profile=profilename, target=target.lower())
filename = dirname + '/' + self.filename_pattern.format(profile=profilename, target=target.lower(),
date=date,
shortcode=shortcode)
os.makedirs(os.path.dirname(filename), exist_ok=True)
url = node["display_url"] if "display_url" in node else node["display_src"]
if '__typename' in node:
if node['__typename'] == 'GraphSidecar':
self._sleep()
sidecar_data = self.session.get('https://www.instagram.com/p/' + shortcode + '/',
params={'__a': 1}).json()
if not post_metadata:
post_metadata = self.get_post_metadata(shortcode)
edge_number = 1
downloaded = True
media = sidecar_data["graphql"]["shortcode_media"] if "graphql" in sidecar_data else sidecar_data[
"media"]
for edge in media['edge_sidecar_to_children']['edges']:
for edge in post_metadata['edge_sidecar_to_children']['edges']:
edge_downloaded = self.download_pic(filename=filename,
url=edge['node']['display_url'],
mtime=date,
@ -622,7 +603,6 @@ class Instaloader:
downloaded = downloaded and edge_downloaded
edge_number += 1
elif node['__typename'] in ['GraphImage', 'GraphVideo']:
url = node["display_url"] if "display_url" in node else node["display_src"]
downloaded = self.download_pic(filename=filename,
url=url,
mtime=date)
@ -631,7 +611,7 @@ class Instaloader:
downloaded = False
else:
# Node is an old image or video.
downloaded = self.download_pic(filename=filename, url=node["display_src"], mtime=date)
downloaded = self.download_pic(filename=filename, url=url, mtime=date)
if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]:
self.save_caption(filename, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"])
elif "caption" in node:
@ -639,12 +619,15 @@ class Instaloader:
else:
self._log("<no caption>", end=' ', flush=True)
if node["is_video"] and download_videos:
video_data = self.get_json('p/' + shortcode)
if not post_metadata:
post_metadata = self.get_post_metadata(shortcode)
self.download_pic(filename=filename,
url=video_data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['video_url'],
url=post_metadata['video_url'],
mtime=date)
if geotags:
location = self.get_location(shortcode)
if not post_metadata:
post_metadata = self.get_post_metadata(shortcode)
location = self.get_location(post_metadata)
if location:
self.save_location(filename, location, date)
if download_comments:
@ -652,6 +635,41 @@ class Instaloader:
self._log()
return downloaded
def get_stories(self, userids: Optional[List[int]] = None) -> Iterator[Dict[str, Any]]:
    """Get available stories from followees or all stories of users whose IDs are given.
    Does not mark stories as seen.
    To use this, one needs to be logged in.

    :param userids: List of user IDs to be processed in terms of downloading their stories, or None.
    """
    # Stories are served only by the i.instagram.com mobile API, so impersonate
    # the official iPhone app on a copy of the logged-in session.
    tempsession = copy_session(self.session)
    header = tempsession.headers
    header['User-Agent'] = 'Instagram 10.3.2 (iPhone7,2; iPhone OS 9_3_3; en_US; en-US; scale=2.00; 750x1334) ' \
                           'AppleWebKit/420+'
    # Remove browser-session headers that the mobile endpoint does not expect.
    del header['Host']
    del header['Origin']
    del header['X-Instagram-AJAX']
    del header['X-Requested-With']

    def _get(url):
        # Rate-limited GET returning the decoded JSON body;
        # raises ConnectionException on any non-200 status.
        self._sleep()
        resp = tempsession.get(url)
        if resp.status_code != 200:
            raise ConnectionException('Failed to fetch stories.')
        return json.loads(resp.text)

    url_reel_media = 'https://i.instagram.com/api/v1/feed/user/{0}/reel_media/'
    url_reels_tray = 'https://i.instagram.com/api/v1/feed/reels_tray/'
    if userids is not None:
        # Explicit user list: one reel_media request per user.
        for userid in userids:
            yield _get(url_reel_media.format(userid))
    else:
        # No list given: walk the logged-in user's story tray.
        data = _get(url_reels_tray)
        if not 'tray' in data:
            raise BadResponseException('Bad story reel JSON.')
        for user in data["tray"]:
            # Tray entries may already embed the story items; otherwise
            # fetch them with a dedicated reel_media request.
            yield user if "items" in user else _get(url_reel_media.format(user['user']['pk']))
def download_stories(self,
userids: Optional[List[int]] = None,
download_videos: bool = True,
@ -671,35 +689,7 @@ class Instaloader:
if self.username is None:
raise LoginRequiredException('Login required to download stories')
tempsession = copy_session(self.session)
header = tempsession.headers
header['User-Agent'] = 'Instagram 10.3.2 (iPhone7,2; iPhone OS 9_3_3; en_US; en-US; scale=2.00; 750x1334) ' \
'AppleWebKit/420+'
del header['Host']
del header['Origin']
del header['X-Instagram-AJAX']
del header['X-Requested-With']
def _user_stories():
def _get(url):
self._sleep()
resp = tempsession.get(url)
if resp.status_code != 200:
raise ConnectionException('Failed to fetch stories.')
return json.loads(resp.text)
url_reel_media = 'https://i.instagram.com/api/v1/feed/user/{0}/reel_media/'
url_reels_tray = 'https://i.instagram.com/api/v1/feed/reels_tray/'
if userids is not None:
for userid in userids:
yield _get(url_reel_media.format(userid))
else:
data = _get(url_reels_tray)
if not 'tray' in data:
raise BadResponseException('Bad story reel JSON.')
for user in data["tray"]:
yield user if "items" in user else _get(url_reel_media.format(user['user']['pk']))
for user_stories in _user_stories():
for user_stories in self.get_stories(userids):
if "items" not in user_stories:
continue
name = user_stories["user"]["username"].lower()
@ -710,7 +700,6 @@ class Instaloader:
self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
count += 1
self._sleep()
shortcode = item["code"] if "code" in item else "no_code"
date_float = item["device_timestamp"] if "device_timestamp" in item else item["taken_at"]
@ -726,7 +715,7 @@ class Instaloader:
date=date,
shortcode=shortcode)
os.makedirs(os.path.dirname(filename), exist_ok=True)
try:
with self._error_catcher('Download story {} from user {}'.format(shortcode, name)):
if "image_versions2" in item:
url = item["image_versions2"]["candidates"][0]["url"]
downloaded = self.download_pic(filename=filename,
@ -746,12 +735,6 @@ class Instaloader:
downloaded = self.download_pic(filename=filename,
url=item["video_versions"][0]["url"],
mtime=date)
if "video_duration" in item and self.sleep and downloaded:
time.sleep(item["video_duration"])
except NodeUnavailableException:
print("Unable to download node \"https://www.instagram.com/p/{}/\" of user {} from stories."
.format(shortcode, name), file=sys.stderr)
continue
if item["story_locations"]:
location = item["story_locations"][0]["location"]
if location:
@ -760,31 +743,11 @@ class Instaloader:
if fast_update and not downloaded:
break
def download_feed_pics(self, max_count: int = None, fast_update: bool = False,
filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None,
download_videos: bool = True, geotags: bool = False,
download_comments: bool = False) -> None:
"""
Download pictures from the user's feed.
def get_feed_posts(self) -> Iterator[Dict[str, Any]]:
"""Get Posts of the user's feed."""
Example to download up to the 20 pics the user last liked:
>>> loader = Instaloader()
>>> loader.load_session_from_file('USER')
>>> loader.download_feed_pics(max_count=20, fast_update=True,
>>> filter_func=lambda node:
>>> not node["likes"]["viewer_has_liked"]
>>> if "likes" in node else
>>> not node["viewer_has_liked"])
data = self._get_json('', params={'__a': 1})
:param max_count: Maximum count of pictures to download
:param fast_update: If true, abort when first already-downloaded picture is encountered
:param filter_func: function(node), which returns True if given picture should not be downloaded
:param download_videos: True, if videos should be downloaded
:param geotags: Download geotags
:param download_comments: Update comments
"""
data = self.get_feed_json()
count = 1
while True:
if "graphql" in data:
is_edge = True
@ -795,34 +758,64 @@ class Instaloader:
else:
is_edge = False
feed = data["feed"]["media"]
for edge_or_node in feed["edges"] if is_edge else feed["nodes"]:
if is_edge:
yield from [edge["node"] for edge in feed["edges"]]
else:
yield from [node for node in feed["nodes"]]
if not feed["page_info"]["has_next_page"]:
break
data = self.graphql_query(17863003771166879, {'fetch_media_item_count': 12,
'fetch_media_item_cursor': feed["page_info"]["end_cursor"],
'fetch_comment_count': 4,
'fetch_like': 10})
def download_feed_posts(self, max_count: int = None, fast_update: bool = False,
filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None,
download_videos: bool = True, geotags: bool = False,
download_comments: bool = False) -> None:
"""
Download pictures from the user's feed.
Example to download up to the 20 pics the user last liked:
>>> loader = Instaloader()
>>> loader.load_session_from_file('USER')
>>> loader.download_feed_posts(max_count=20, fast_update=True,
>>> filter_func=lambda post:
>>> not post["likes"]["viewer_has_liked"]
>>> if "likes" in post else
>>> not post["viewer_has_liked"])
:param max_count: Maximum count of pictures to download
:param fast_update: If true, abort when first already-downloaded picture is encountered
:param filter_func: function(post), which returns True if given picture should not be downloaded
:param download_videos: True, if videos should be downloaded
:param geotags: Download geotags
:param download_comments: Update comments
"""
count = 1
for post in self.get_feed_posts():
if max_count is not None and count > max_count:
return
node = edge_or_node["node"] if is_edge else edge_or_node
name = node["owner"]["username"]
if filter_func is not None and filter_func(node):
break
name = post["owner"]["username"]
if filter_func is not None and filter_func(post):
self._log("<pic by %s skipped>" % name, flush=True)
continue
self._log("[%3i] %s " % (count, name), end="", flush=True)
count += 1
try:
downloaded = self.download_node(node, profile=name, target=':feed',
with self._error_catcher('Download feed'):
downloaded = self.download_post(post, profile=name, target=':feed',
download_videos=download_videos, geotags=geotags,
download_comments=download_comments)
except NodeUnavailableException:
print("Unable to download node \"https://www.instagram.com/p/{}/\" of user {} from feed."
.format(node['shortcode'], name), file=sys.stderr)
continue
if fast_update and not downloaded:
return
if not feed["page_info"]["has_next_page"]:
break
data = self.get_feed_json(end_cursor=feed["page_info"]["end_cursor"])
def get_hashtag_json(self, hashtag: str,
max_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
"""Return JSON of a #hashtag"""
return self.get_json(name='explore/tags/{0}/'.format(hashtag), max_id=max_id)
def get_hashtag_posts(self, hashtag: str) -> Iterator[Dict[str, Any]]:
    """Get Posts associated with a #hashtag, one node dict at a time."""
    referer = 'https://www.instagram.com/explore/tags/' + hashtag + '/'
    return self.graphql_node_list(17875800862117404, {'tag_name': hashtag},
                                  referer,
                                  lambda d: d['data']['hashtag']['edge_hashtag_to_media'])
def download_hashtag(self, hashtag: str,
max_count: Optional[int] = None,
@ -837,48 +830,38 @@ class Instaloader:
:param hashtag: Hashtag to download, without leading '#'
:param max_count: Maximum count of pictures to download
:param filter_func: function(node), which returns True if given picture should not be downloaded
:param filter_func: function(post), which returns True if given picture should not be downloaded
:param fast_update: If true, abort when first already-downloaded picture is encountered
:param download_videos: True, if videos should be downloaded
:param geotags: Download geotags
:param download_comments: Update comments
"""
data = self.get_hashtag_json(hashtag)
count = 1
while data:
for node in data['entry_data']['TagPage'][0]['tag']['media']['nodes']:
for post in self.get_hashtag_posts(hashtag):
if max_count is not None and count > max_count:
return
break
self._log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True)
if filter_func is not None and filter_func(node):
if filter_func is not None and filter_func(post):
self._log('<skipped>')
continue
count += 1
try:
downloaded = self.download_node(node=node, profile=None, target='#'+hashtag,
with self._error_catcher('Download hashtag #{}'.format(hashtag)):
downloaded = self.download_post(node=post, profile=None, target='#' + hashtag,
download_videos=download_videos, geotags=geotags,
download_comments=download_comments)
except NodeUnavailableException:
print("Unable to download node \"https://www.instagram.com/p/{}/\" "
"while downloading hashtag \"{}\".".format(node['shortcode'], hashtag), file=sys.stderr)
continue
if fast_update and not downloaded:
return
if data['entry_data']['TagPage'][0]['tag']['media']['page_info']['has_next_page']:
data = self.get_hashtag_json(hashtag,
max_id=data['entry_data']['TagPage'][0]['tag']['media']['page_info'][
'end_cursor'])
else:
break
def check_id(self, profile: str, json_data: Dict[str, Any]) -> Tuple[str, int]:
def check_profile_id(self, profile: str, profile_metadata: Optional[Dict[str, Any]] = None) -> Tuple[str, int]:
"""
Consult locally stored ID of profile with given name, check whether ID matches and whether name
has changed and return current name of the profile, and store ID of profile.
:param profile: Profile name
:param profile_metadata: The profile's metadata (get_profile_metadata()), or None if the profile was not found
:return: current profile name, profile id
"""
profile_exists = "ProfilePage" in json_data["entry_data"]
profile_exists = profile_metadata is not None
if ((format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.dirname_pattern, 'target'))):
id_filename = '{0}/id'.format(self.dirname_pattern.format(profile=profile.lower(),
@ -889,7 +872,7 @@ class Instaloader:
with open(id_filename, 'rb') as id_file:
profile_id = int(id_file.read())
if (not profile_exists) or \
(profile_id != int(json_data['entry_data']['ProfilePage'][0]['user']['id'])):
(profile_id != int(profile_metadata['user']['id'])):
if profile_exists:
self._log("Profile {0} does not match the stored unique ID {1}.".format(profile, profile_id))
else:
@ -913,75 +896,95 @@ class Instaloader:
os.makedirs(self.dirname_pattern.format(profile=profile.lower(),
target=profile.lower()), exist_ok=True)
with open(id_filename, 'w') as text_file:
profile_id = json_data['entry_data']['ProfilePage'][0]['user']['id']
profile_id = profile_metadata['user']['id']
text_file.write(profile_id + "\n")
self._log("Stored ID {0} for profile {1}.".format(profile_id, profile))
return profile, profile_id
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
def download(self, name: str,
def get_profile_metadata(self, profile_name: str) -> Dict[str, Any]:
"""Retrieves a profile's metadata, for use with e.g. get_profile_posts() and check_profile_id()."""
try:
return self._get_json('{}/'.format(profile_name), params={'__a': 1})
except QueryReturnedNotFoundException:
raise ProfileNotExistsException('Profile {} does not exist.'.format(profile_name))
def get_profile_posts(self, profile_metadata: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
"""Retrieve all posts from a profile."""
yield from profile_metadata['user']['media']['nodes']
has_next_page = profile_metadata['user']['media']['page_info']['has_next_page']
end_cursor = profile_metadata['user']['media']['page_info']['end_cursor']
while has_next_page:
data = self.graphql_query(17888483320059182, {'id': profile_metadata['user']['id'],
'first': 500,
'after': end_cursor},
'https://www.instagram.com/{0}/'.format(profile_metadata['user']['username']))
media = data['data']['user']['edge_owner_to_timeline_media']
yield from [edge['node'] for edge in media['edges']]
has_next_page = media['page_info']['has_next_page']
end_cursor = media['page_info']['end_cursor']
def download_profile(self, name: str,
profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False,
download_comments: bool = False, fast_update: bool = False,
download_stories: bool = False, download_stories_only: bool = False) -> None:
"""Download one profile"""
# Get profile main page json
data = self.get_json(name)
profile_metadata = None
with suppress(ProfileNotExistsException):
# ProfileNotExistsException is raised again later in check_profile_id() when we search the profile, so we
# must suppress it.
profile_metadata = self.get_profile_metadata(name)
# check if profile does exist or name has changed since last download
# and update name and json data if necessary
name_updated, profile_id = self.check_id(name, data)
name_updated, profile_id = self.check_profile_id(name, profile_metadata)
if name_updated != name:
name = name_updated
data = self.get_json(name)
profile_metadata = self.get_profile_metadata(name)
# Download profile picture
try:
self.download_profilepic(name, data["entry_data"]["ProfilePage"][0]["user"]["profile_pic_url"])
except NodeUnavailableException:
print("Unable to download profilepic of user {}.".format(name), file=sys.stderr)
with self._error_catcher('Download profile picture of {}'.format(name)):
self.download_profilepic(name, profile_metadata["user"]["profile_pic_url"])
if profile_pic_only:
return
# Catch some errors
if data["entry_data"]["ProfilePage"][0]["user"]["is_private"]:
if data["config"]["viewer"] is None:
if profile_metadata["user"]["is_private"]:
if self.username is None:
raise LoginRequiredException("profile %s requires login" % name)
if not data["entry_data"]["ProfilePage"][0]["user"]["followed_by_viewer"] and \
self.username != data["entry_data"]["ProfilePage"][0]["user"]["username"]:
if not profile_metadata["user"]["followed_by_viewer"] and \
self.username != profile_metadata["user"]["username"]:
raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name)
else:
if data["config"]["viewer"] is not None and not (download_stories or download_stories_only):
if self.username is not None and not (download_stories or download_stories_only):
self._log("profile %s could also be downloaded anonymously." % name)
# Download stories, if requested
if download_stories or download_stories_only:
self.download_stories(userids=[profile_id], filename_target=name,
download_videos=download_videos, fast_update=fast_update)
if download_stories_only:
return
if ("nodes" not in data["entry_data"]["ProfilePage"][0]["user"]["media"] or
not data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]) \
and not profile_pic_only:
if ("nodes" not in profile_metadata["user"]["media"] or
not profile_metadata["user"]["media"]["nodes"]):
raise ProfileHasNoPicsException("Profile %s: no pics found." % name)
# Iterate over pictures and download them
self._log("Retrieving posts from profile {}.".format(name))
def get_last_id(data):
if data["entry_data"] and data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]:
return data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"][-1]["id"]
totalcount = data["entry_data"]["ProfilePage"][0]["user"]["media"]["count"]
totalcount = profile_metadata["user"]["media"]["count"]
count = 1
while get_last_id(data) is not None:
for node in data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]:
for post in self.get_profile_posts(profile_metadata):
self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
count += 1
try:
downloaded = self.download_node(node=node, profile=name, target=name,
with self._error_catcher('Download profile {}'.format(name)):
downloaded = self.download_post(node=post, profile=name, target=name,
download_videos=download_videos, geotags=geotags,
download_comments=download_comments)
except NodeUnavailableException:
print("Unable to download node \"https://www.instagram.com/p/{}/\" of user {}."
.format(node['shortcode'], name), file=sys.stderr)
continue
if fast_update and not downloaded:
return
data = self.get_json(name, max_id=get_last_id(data))
break
def interactive_login(self, username: str) -> None:
"""Logs in and internally stores session, asking user for password interactively.
@ -998,7 +1001,7 @@ class Instaloader:
print(err, file=sys.stderr)
password = None
def download_profiles(self, profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None,
def main(self, profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None,
sessionfile: Optional[str] = None, max_count: Optional[int] = None,
profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False,
download_comments: bool = False,
@ -1020,19 +1023,20 @@ class Instaloader:
self.interactive_login(username)
self._log("Logged in as %s." % username)
# Try block for KeyboardInterrupt (save session on ^C)
failedtargets = []
targets = set()
try:
# Generate set of targets
for pentry in profilelist:
if pentry[0] == '#':
self._log("Retrieving pictures with hashtag {0}".format(pentry))
with self._error_catcher():
self.download_hashtag(hashtag=pentry[1:], max_count=max_count, fast_update=fast_update,
download_videos=download_videos, geotags=geotags,
download_comments=download_comments)
elif pentry[0] == '@':
if username is not None:
self._log("Retrieving followees of %s..." % pentry[1:])
with self._error_catcher():
followees = self.get_followees(pentry[1:])
targets.update([followee['username'] for followee in followees])
else:
@ -1040,7 +1044,8 @@ class Instaloader:
elif pentry == ":feed-all":
if username is not None:
self._log("Retrieving pictures from your feed...")
self.download_feed_pics(fast_update=fast_update, max_count=max_count,
with self._error_catcher():
self.download_feed_posts(fast_update=fast_update, max_count=max_count,
download_videos=download_videos, geotags=geotags,
download_comments=download_comments)
else:
@ -1048,17 +1053,20 @@ class Instaloader:
elif pentry == ":feed-liked":
if username is not None:
self._log("Retrieving pictures you liked from your feed...")
self.download_feed_pics(fast_update=fast_update, max_count=max_count,
filter_func=lambda node:
not node["likes"]["viewer_has_liked"]
if "likes" in node
else not node["viewer_has_liked"],
def liked_filter(node):
if "likes" in node:
return not node["likes"]["viewer_has_liked"]
return not node["viewer_has_liked"]
with self._error_catcher():
self.download_feed_posts(fast_update=fast_update, max_count=max_count,
filter_func=liked_filter,
download_videos=download_videos, geotags=geotags,
download_comments=download_comments)
else:
print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr)
elif pentry == ":stories":
if username is not None:
with self._error_catcher():
self.download_stories(download_videos=download_videos, fast_update=fast_update)
else:
print("--login=USERNAME required to download {}.".format(pentry), file=sys.stderr)
@ -1068,9 +1076,9 @@ class Instaloader:
self._log("Downloading %i profiles..." % len(targets))
# Iterate through targets list and download them
for target in targets:
with self._error_catcher():
try:
try:
self.download(target, profile_pic_only, download_videos,
self.download_profile(target, profile_pic_only, download_videos,
geotags, download_comments, fast_update, stories, stories_only)
except ProfileNotExistsException as err:
if username is not None:
@ -1078,21 +1086,21 @@ class Instaloader:
self._log("Trying again anonymously, helps in case you are just blocked.")
anonymous_loader = Instaloader(self.sleep, self.quiet, self.shorter_output,
self.user_agent, self.dirname_pattern, self.filename_pattern)
anonymous_loader.download(target, profile_pic_only, download_videos,
anonymous_loader.error_log = self.error_log
with self._error_catcher():
anonymous_loader.download_profile(target, profile_pic_only, download_videos,
geotags, download_comments, fast_update)
else:
raise err
except NonfatalException as err:
failedtargets.append(target)
print(err, file=sys.stderr)
except KeyboardInterrupt:
print("\nInterrupted by user.", file=sys.stderr)
if len(targets) > 1 and failedtargets:
print("Errors occured (see above) while downloading profiles: %s." %
", ".join(failedtargets), file=sys.stderr)
# Save session if it is useful
if username is not None:
self.save_session_to_file(sessionfile)
if self.error_log:
print("\nErrors occured:", file=sys.stderr)
for err in self.error_log:
print(err, file=sys.stderr)
def main():
@ -1194,7 +1202,7 @@ def main():
loader = Instaloader(sleep=not args.no_sleep, quiet=args.quiet, shorter_output=args.shorter_output,
user_agent=args.user_agent,
dirname_pattern=args.dirname_pattern, filename_pattern=args.filename_pattern)
loader.download_profiles(args.profile, args.login.lower() if args.login is not None else None, args.password,
loader.main(args.profile, args.login.lower() if args.login is not None else None, args.password,
args.sessionfile,
int(args.count) if args.count is not None else None,
args.profile_pic_only, not args.skip_videos, args.geotags, args.comments,