From caf75a8135af395d000dbf3cc4afb2ef03179b1d Mon Sep 17 00:00:00 2001 From: Alexander Graf Date: Sat, 24 Jun 2017 22:43:40 +0200 Subject: [PATCH] Refactor Instaloader's methods into a class --- .travis.yml | 2 +- README.rst | 23 +- instaloader.py | 1408 ++++++++++++++++++++++++------------------------ 3 files changed, 713 insertions(+), 720 deletions(-) diff --git a/.travis.yml b/.travis.yml index 04ee698..d3201cd 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,4 +9,4 @@ python: install: - pip install pylint requests script: - - python3 -m pylint -r n -d bad-whitespace,bad-continuation,missing-docstring,multiple-imports,too-many-arguments,locally-disabled,line-too-long instaloader + - python3 -m pylint -r n -d bad-whitespace,missing-docstring,too-many-arguments,locally-disabled,line-too-long,too-many-public-methods instaloader diff --git a/README.rst b/README.rst index 76df367..3931979 100644 --- a/README.rst +++ b/README.rst @@ -136,11 +136,14 @@ their follower count, do import instaloader - # login - session = instaloader.get_logged_in_session(USERNAME) + # Get instance + loader = instaloader.Instaloader() - # get followees - followees = instaloader.get_followees(PROFILE, session) + # Login + loader.interactive_login(USERNAME) + + # Retrieve followees + followees = loader.get_followees(PROFILE) for f in followees: print("%i\t%s\t%s" % (f['follower_count'], f['username'], f['full_name'])) @@ -150,7 +153,7 @@ Then, you may download all pictures of all followees with for f in followees: try: - instaloader.download(f['username'], session) + loader.download(f['username']) except instaloader.NonfatalException: pass @@ -158,15 +161,15 @@ You could also download your last 20 liked pics with .. 
code:: python - instaloader.download_feed_pics(session, max_count=20, fast_update=True, - filter_func=lambda node: + loader.download_feed_pics(max_count=20, fast_update=True, + filter_func=lambda node: not node["likes"]["viewer_has_liked"] if "likes" in node else not node["viewer_has_liked"]) To download the last 20 pictures with hashtag #cat, do .. code:: python - instaloader.download_hashtag('cat', session=instaloader.get_anonymous_session(), max_count=20) + loader.download_hashtag('cat', max_count=20) Each Instagram profile has its own unique ID which stays unmodified even if a user changes his/her username. To get said ID, given the profile's @@ -174,7 +177,7 @@ name, you may call .. code:: python - instaloader.get_id_by_username(PROFILE_NAME) + loader.get_id_by_username(PROFILE_NAME) ``get_followees()`` also returns unique IDs for all loaded followees. To get the current username of a profile, given this unique ID @@ -182,4 +185,4 @@ get the current username of a profile, given this unique ID .. 
code:: python - instaloader.get_username_by_id(session, followees[0]['id']) + loader.get_username_by_id(followees[0]['id']) diff --git a/instaloader.py b/instaloader.py index d4edf8f..3fba648 100755 --- a/instaloader.py +++ b/instaloader.py @@ -3,13 +3,24 @@ """Tool to download pictures (or videos) and captions from Instagram, from a given set of profiles (even if private), from your feed or from all followees of a given profile.""" -import re, json, datetime, shutil, os, time, random, sys, pickle, getpass, tempfile +import datetime +import getpass +import json +import os +import pickle +import random +import re +import shutil +import sys +import tempfile +import time from argparse import ArgumentParser from io import BytesIO -from numbers import Real -from typing import List, Optional, Any, Dict, Callable +from typing import Any, Callable, Dict, List, Optional + +import requests +import requests.utils -import requests, requests.utils # To get version from setup.py for instaloader --version import pkg_resources @@ -33,252 +44,44 @@ class InstaloaderException(Exception): """Base exception for this script""" pass + class NonfatalException(InstaloaderException): """Base exception for errors which should not cause instaloader to stop""" pass + class ProfileNotExistsException(NonfatalException): pass + class ProfileAccessDeniedException(NonfatalException): pass + class ProfileHasNoPicsException(NonfatalException): pass + class PrivateProfileNotFollowedException(NonfatalException): pass + class LoginRequiredException(NonfatalException): pass + class BadCredentialsException(InstaloaderException): pass + class ConnectionException(InstaloaderException): pass -def _log(*msg, sep='', end='\n', flush=False, quiet=False): - if not quiet: - print(*msg, sep=sep, end=end, flush=flush) - - -def get_json(name: str, session: requests.Session, - max_id: Optional[str] = None, sleep: bool = True) -> Optional[Dict[str, Any]]: - """Return JSON of a profile""" - if not max_id: - resp = 
session.get('https://www.instagram.com/'+name) - else: - resp = session.get('https://www.instagram.com/'+name, params={'max_id': max_id}) - if sleep: - time.sleep(4 * random.random() + 1) - match = re.search('window\\._sharedData = .*<', resp.text) - if match is not None: - return json.loads(match.group(0)[21:-2]) - - -def get_username_by_id(session: requests.Session, profile_id: int) -> str: - """To get the current username of a profile, given its unique ID, this function can be used. - session is required to be a logged-in (i.e. non-anonymous) session.""" - tempsession = copy_session(session) - tempsession.headers.update({'Content-Type' : 'application/x-www-form-urlencoded'}) - resp = tempsession.post('https://www.instagram.com/query/', data='q=ig_user(' + - str(profile_id) +')+%7B%0A++username%0A%7D%0A') - if resp.status_code == 200: - data = json.loads(resp.text) - if 'username' in data: - return json.loads(resp.text)['username'] - raise ProfileNotExistsException("No profile found, the user may have blocked " + - "you (id: " + str(profile_id) + ").") - else: - if test_login(session): - raise ProfileAccessDeniedException("Username could not be determined due to error {0} (id: {1})." - .format(str(resp.status_code), str(profile_id))) - raise LoginRequiredException("Login required to determine username (id: " + - str(profile_id) + ").") - - -def get_id_by_username(profile: str) -> int: - """Each Instagram profile has its own unique ID which stays unmodified even if a user changes - his/her username. 
To get said ID, given the profile's name, you may call this function.""" - data = get_json(profile, get_anonymous_session()) - if "ProfilePage" not in data["entry_data"]: - raise ProfileNotExistsException("Profile {0} does not exist.".format(profile)) - return int(data['entry_data']['ProfilePage'][0]['user']['id']) - - -def _epoch_to_string(epoch: Real) -> str: +def _epoch_to_string(epoch: float) -> str: return datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d_%H-%M-%S') -def get_followees(profile: str, session: requests.Session) -> List[Dict[str, Any]]: - """ - Retrieve list of followees of given profile - - :param profile: Name of profile to lookup followees - :param session: Session belonging to a user, i.e. not an anonymous session - :return: List of followees (list of dictionaries), as returned by instagram server - """ - tmpsession = copy_session(session) - data = get_json(profile, tmpsession) - profile_id = data['entry_data']['ProfilePage'][0]['user']['id'] - query = ["q=ig_user(" + profile_id + ")+%7B%0A" - "++follows.", - str(data['entry_data']['ProfilePage'][0]['user']['follows']['count']) + - ")+%7B%0A" - "++++count%2C%0A" - "++++page_info+%7B%0A" - "++++++end_cursor%2C%0A" - "++++++has_next_page%0A" - "++++%7D%2C%0A" - "++++nodes+%7B%0A" - "++++++id%2C%0A" - "++++++full_name%2C%0A" - "++++++username%2C%0A" - "++++++followed_by+%7B%0A" - "++++++++count%0A" - "++++++%7D%0A" - "++++%7D%0A" - "++%7D%0A" - "%7D%0A" - "&ref=relationships%3A%3Afollow_list"] - tmpsession.headers.update(default_http_header()) - tmpsession.headers.update({'Referer' : 'https://www.instagram.com/'+profile+'/following/'}) - tmpsession.headers.update({'Content-Type' : 'application/x-www-form-urlencoded'}) - resp = tmpsession.post('https://www.instagram.com/query/', data=query[0]+"first("+query[1]) - if resp.status_code == 200: - data = json.loads(resp.text) - followees = [] - while True: - for followee in data['follows']['nodes']: - followee['follower_count'] = 
followee.pop('followed_by')['count'] - followees = followees + [followee] - if data['follows']['page_info']['has_next_page']: - resp = tmpsession.post('https://www.instagram.com/query/', data=query[0] - + "after(" - + data['follows']['page_info']['end_cursor'] - + "%2C+" + query[1] ) - data = json.loads(resp.text) - else: - break - return followees - if test_login(tmpsession): - raise ConnectionException("ConnectionError("+str(resp.status_code)+"): " - "unable to gather followees.") - raise LoginRequiredException("Login required to gather followees.") - - -def download_pic(name: str, url: str, date_epoch: Real, outputlabel: Optional[str] = None, quiet: bool = False, - filename_suffix: Optional[str] = None) -> bool: - """Downloads and saves picture with given url under given directory with given timestamp. - Returns true, if file was actually downloaded, i.e. updated.""" - if outputlabel is None: - outputlabel = _epoch_to_string(date_epoch) - urlmatch = re.search('\\.[a-z]*\\?', url) - file_extension = url[-3:] if urlmatch is None else urlmatch.group(0)[1:-1] - filename = name.lower() + '/' + _epoch_to_string(date_epoch) - if filename_suffix is not None: - filename += '_' + filename_suffix - filename += '.' 
+ file_extension - if os.path.isfile(filename): - _log(outputlabel + ' exists', end=' ', flush=True, quiet=quiet) - return False - resp = get_anonymous_session().get(url, stream=True) - if resp.status_code == 200: - _log(outputlabel, end=' ', flush=True, quiet=quiet) - os.makedirs(name.lower(), exist_ok=True) - with open(filename, 'wb') as file: - resp.raw.decode_content = True - shutil.copyfileobj(resp.raw, file) - os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) - return True - else: - raise ConnectionException("File \'" + url + "\' could not be downloaded.") - - -def save_caption(name: str, date_epoch: Real, caption: str, shorter_output: bool = False, quiet: bool = False) -> None: - """Updates picture caption""" - filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '.txt' - pcaption = caption.replace('\n', ' ').strip() - caption = caption.encode("UTF-8") - if shorter_output: - pcaption = "txt" - else: - pcaption = '[' + ((pcaption[:29]+u"\u2026") if len(pcaption)>31 else pcaption) + ']' - try: - with open(filename, 'rb') as file: - file_caption = file.read() - if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'): - try: - _log(pcaption + ' unchanged', end=' ', flush=True, quiet=quiet) - except UnicodeEncodeError: - _log('txt unchanged', end=' ', flush=True, quiet=quiet) - return None - else: - def get_filename(index): - return filename if index==0 else (filename[:-4] + '_old_' + - (str(0) if index<10 else str()) + str(index) + filename[-4:]) - i = 0 - while os.path.isfile(get_filename(i)): - i = i + 1 - for index in range(i, 0, -1): - os.rename(get_filename(index-1), get_filename(index)) - try: - _log(pcaption + ' updated', end=' ', flush=True, quiet=quiet) - except UnicodeEncodeError: - _log('txt updated', end=' ', flush=True, quiet=quiet) - except FileNotFoundError: - pass - try: - _log(pcaption, end=' ', flush=True, quiet=quiet) - except UnicodeEncodeError: - _log('txt', end=' ', flush=True, quiet=quiet) - 
os.makedirs(name.lower(), exist_ok=True) - with open(filename, 'wb') as text_file: - shutil.copyfileobj(BytesIO(caption), text_file) - os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) - - -def save_location(name: str, location_json: Dict[str, str], date_epoch: Real, quiet: bool = False) -> None: - filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '_location.txt' - location_string = location_json["name"]+"\n" + \ - "https://maps.google.com/maps?q={0},{1}&ll={0},{1}\n" \ - .format(location_json["lat"], location_json["lng"]) - os.makedirs(name.lower(), exist_ok=True) - with open(filename, 'wb') as text_file: - shutil.copyfileobj(BytesIO(location_string.encode()), text_file) - os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) - _log('geo', end=' ', flush=True, quiet=quiet) - - -def download_profilepic(name: str, url: str, quiet: bool = False) -> None: - """Downloads and saves profile pic with given url.""" - date_object = datetime.datetime.strptime(requests.head(url).headers["Last-Modified"], \ - '%a, %d %b %Y %H:%M:%S GMT') - filename = name.lower() + '/' + _epoch_to_string(date_object.timestamp()) + \ - '_UTC_profile_pic.' 
+ url[-3:] - if os.path.isfile(filename): - _log(filename + ' already exists', quiet=quiet) - return None - match = re.search('http.*://.*instagram.*[^/]*\\.(com|net)/[^/]+/.', url) - if match is None: - raise ConnectionException("URL \'" + url + "\' could not be processed.") - index = len(match.group(0))-1 - offset = 8 if match.group(0)[-1:] == 's' else 0 - url = url[:index] + 's2048x2048' + ('/' if offset == 0 else str()) + url[index+offset:] - resp = get_anonymous_session().get(url, stream=True) - if resp.status_code == 200: - _log(filename, quiet=quiet) - os.makedirs(name.lower(), exist_ok=True) - with open(filename, 'wb') as file: - resp.raw.decode_content = True - shutil.copyfileobj(resp.raw, file) - os.utime(filename, (datetime.datetime.now().timestamp(), date_object.timestamp())) - else: - raise ConnectionException("File \'" + url + "\' could not be downloaded.") - - def get_default_session_filename(username: str) -> str: """Returns default session filename for given username.""" dirname = tempfile.gettempdir() + "/" + ".instaloader-" + getpass.getuser() @@ -286,70 +89,29 @@ def get_default_session_filename(username: str) -> str: return filename -def save_session(session: requests.Session, username: str, filename: Optional[str] = None, quiet: bool = False) -> None: - """Saves requests.Session object.""" - if filename is None: - filename = get_default_session_filename(username) - dirname = os.path.dirname(filename) - if dirname != '' and not os.path.exists(dirname): - os.makedirs(dirname) - os.chmod(dirname, 0o700) - with open(filename, 'wb') as sessionfile: - os.chmod(filename, 0o600) - pickle.dump(requests.utils.dict_from_cookiejar(session.cookies), sessionfile) - _log("Saved session to %s." 
% filename, quiet=quiet) - - -def load_session(username: str, filename: Optional[str] = None, quiet: bool = False) -> requests.Session: - """Returns loaded requests.Session object, or None if not found.""" - if filename is None: - filename = get_default_session_filename(username) - try: - with open(filename, 'rb') as sessionfile: - session = requests.Session() - session.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile)) - session.headers.update(default_http_header()) - session.headers.update({'X-CSRFToken':session.cookies.get_dict()['csrftoken']}) - _log("Loaded session from %s." % filename, quiet=quiet) - return session - except FileNotFoundError: - pass - - def copy_session(session: requests.Session) -> requests.Session: """Duplicates a requests.Session.""" new = requests.Session() new.cookies = \ - requests.utils.cookiejar_from_dict(requests.utils.dict_from_cookiejar(session.cookies)) + requests.utils.cookiejar_from_dict(requests.utils.dict_from_cookiejar(session.cookies)) new.headers = session.headers return new -def test_login(session: requests.Session) -> Optional[str]: - """Returns the Instagram username to which given requests.Session object belongs, or None.""" - if session is None: - return - data = get_json(str(), session) - if data['config']['viewer'] is None: - return - time.sleep(4 * random.random() + 1) - return data['config']['viewer']['username'] - - def default_http_header(empty_session_only: bool = False) -> Dict[str, str]: """Returns default HTTP header we use for requests.""" user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \ - '(KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36' - header = { 'Accept-Encoding' : 'gzip, deflate', \ - 'Accept-Language' : 'en-US,en;q=0.8', \ - 'Connection' : 'keep-alive', \ - 'Content-Length' : '0', \ - 'Host' : 'www.instagram.com', \ - 'Origin' : 'https://www.instagram.com', \ - 'Referer' : 'https://www.instagram.com/', \ - 'User-Agent' : user_agent, \ - 'X-Instagram-AJAX' 
: '1', \ - 'X-Requested-With' : 'XMLHttpRequest'} + '(KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36' + header = {'Accept-Encoding': 'gzip, deflate', + 'Accept-Language': 'en-US,en;q=0.8', + 'Connection': 'keep-alive', + 'Content-Length': '0', + 'Host': 'www.instagram.com', + 'Origin': 'https://www.instagram.com', + 'Referer': 'https://www.instagram.com/', + 'User-Agent': user_agent, + 'X-Instagram-AJAX': '1', + 'X-Requested-With': 'XMLHttpRequest'} if empty_session_only: del header['Host'] del header['Origin'] @@ -362,437 +124,665 @@ def default_http_header(empty_session_only: bool = False) -> Dict[str, str]: def get_anonymous_session() -> requests.Session: """Returns our default anonymous requests.Session object.""" session = requests.Session() - session.cookies.update({'sessionid' : '', 'mid' : '', 'ig_pr' : '1', \ - 'ig_vw' : '1920', 'csrftoken' : '', \ - 's_network' : '', 'ds_user_id' : ''}) + session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1', + 'ig_vw': '1920', 'csrftoken': '', + 's_network': '', 'ds_user_id': ''}) session.headers.update(default_http_header(empty_session_only=True)) return session -def get_session(user: str, passwd: str) -> requests.Session: - """Log in to instagram with given username and password and return session object""" - session = requests.Session() - session.cookies.update({'sessionid' : '', 'mid' : '', 'ig_pr' : '1', \ - 'ig_vw' : '1920', 'csrftoken' : '', \ - 's_network' : '', 'ds_user_id' : ''}) - session.headers.update(default_http_header()) - resp = session.get('https://www.instagram.com/') - session.headers.update({'X-CSRFToken':resp.cookies['csrftoken']}) - time.sleep(9 * random.random() + 3) - login = session.post('https://www.instagram.com/accounts/login/ajax/', \ - data={'password':passwd,'username':user}, allow_redirects=True) - session.headers.update({'X-CSRFToken':login.cookies['csrftoken']}) - time.sleep(5 * random.random()) - if login.status_code == 200: - if user == test_login(session): - return 
session +class Instaloader: + def __init__(self, + sleep: bool = True, quiet: bool = False, shorter_output: bool = False): + self.session = get_anonymous_session() + self.username = None + self.sleep = sleep + self.quiet = quiet + self.shorter_output = shorter_output + + def _log(self, *msg, sep='', end='\n', flush=False): + if not self.quiet: + print(*msg, sep=sep, end=end, flush=flush) + + def get_json(self, name: str, session: requests.Session = None, + max_id: Optional[str] = None) -> Optional[Dict[str, Any]]: + """Return JSON of a profile""" + if session is None: + session = self.session + if not max_id: + resp = session.get('https://www.instagram.com/' + name) else: - raise BadCredentialsException('Login error! Check your credentials!') - else: - raise ConnectionException('Login error! Connection error!') + resp = session.get('https://www.instagram.com/' + name, params={'max_id': max_id}) + if self.sleep: + time.sleep(4 * random.random() + 1) + match = re.search('window\\._sharedData = .*<', resp.text) + if match is not None: + return json.loads(match.group(0)[21:-2]) - -def get_feed_json(session: requests.Session, end_cursor: str = None, sleep: bool = True) -> Dict[str, Any]: - """ - Get JSON of the user's feed. - - :param session: Session belonging to a user, i.e. 
not an anonymous session - :param end_cursor: The end cursor, as from json["feed"]["media"]["page_info"]["end_cursor"] - :param sleep: Sleep between requests to instagram server - :return: JSON - """ - if end_cursor is None: - return get_json(str(), session, sleep=sleep)["entry_data"]["FeedPage"][0] - tmpsession = copy_session(session) - query = "q=ig_me()+%7B%0A++feed+%7B%0A++++media.after(" + end_cursor + "%2C+12)+%7B%0A"+\ - "++++++nodes+%7B%0A++++++++id%2C%0A++++++++caption%2C%0A++++++++code%2C%0A++++++++"+\ - "comments.last(4)+%7B%0A++++++++++count%2C%0A++++++++++nodes+%7B%0A++++++++++++"+\ - "id%2C%0A++++++++++++created_at%2C%0A++++++++++++text%2C%0A++++++++++++"+\ - "user+%7B%0A++++++++++++++id%2C%0A++++++++++++++profile_pic_url%2C%0A++++++++++++++"+\ - "username%0A++++++++++++%7D%0A++++++++++%7D%2C%0A++++++++++"+\ - "page_info%0A++++++++%7D%2C%0A++++++++comments_disabled%2C%0A++++++++"+\ - "date%2C%0A++++++++dimensions+%7B%0A++++++++++height%2C%0A++++++++++"+\ - "width%0A++++++++%7D%2C%0A++++++++display_src%2C%0A++++++++is_video%2C%0A++++++++"+\ - "likes+%7B%0A++++++++++count%2C%0A++++++++++nodes+%7B%0A++++++++++++"+\ - "user+%7B%0A++++++++++++++id%2C%0A++++++++++++++profile_pic_url%2C%0A++++++++++++++"+\ - "username%0A++++++++++++%7D%0A++++++++++%7D%2C%0A++++++++++"+\ - "viewer_has_liked%0A++++++++%7D%2C%0A++++++++location+%7B%0A++++++++++"+\ - "id%2C%0A++++++++++has_public_page%2C%0A++++++++++name%0A++++++++%7D%2C%0A++++++++"+\ - "owner+%7B%0A++++++++++id%2C%0A++++++++++blocked_by_viewer%2C%0A++++++++++"+\ - "followed_by_viewer%2C%0A++++++++++full_name%2C%0A++++++++++"+\ - "has_blocked_viewer%2C%0A++++++++++is_private%2C%0A++++++++++"+\ - "profile_pic_url%2C%0A++++++++++requested_by_viewer%2C%0A++++++++++"+\ - "username%0A++++++++%7D%2C%0A++++++++usertags+%7B%0A++++++++++"+\ - "nodes+%7B%0A++++++++++++user+%7B%0A++++++++++++++"+\ - "username%0A++++++++++++%7D%2C%0A++++++++++++x%2C%0A++++++++++++y%0A++++++++++"+\ - 
"%7D%0A++++++++%7D%2C%0A++++++++video_url%2C%0A++++++++"+\ - "video_views%0A++++++%7D%2C%0A++++++page_info%0A++++%7D%0A++%7D%2C%0A++id%2C%0A++"+\ - "profile_pic_url%2C%0A++username%0A%7D%0A&ref=feed::show" - tmpsession.headers.update(default_http_header()) - tmpsession.headers.update({'Referer' : 'https://www.instagram.com/'}) - tmpsession.headers.update({'Content-Type' : 'application/x-www-form-urlencoded'}) - resp = tmpsession.post('https://www.instagram.com/query/', data=query) - if sleep: - time.sleep(4 * random.random() + 1) - return json.loads(resp.text) - - -def get_location(session: requests.Session, node_code: str, sleep: bool = True) -> Dict[str, str]: - pic_json = get_json("p/" + node_code, session, sleep=sleep) - media = pic_json["entry_data"]["PostPage"][0]["graphql"]["shortcode_media"] \ - if "graphql" in pic_json["entry_data"]["PostPage"][0] \ - else pic_json["entry_data"]["PostPage"][0]["media"] - if media["location"] is not None: - location_json = get_json("explore/locations/" + - media["location"]["id"], - session, sleep=sleep) - return location_json["entry_data"]["LocationsPage"][0]["location"] - - -def download_node(node: Dict[str, Any], session: requests.Session, name: str, - download_videos: bool = True, geotags: bool = False, - sleep: bool = True, shorter_output: bool = False, quiet: bool = False) -> bool: - """ - Download everything associated with one instagram node, i.e. picture, caption and video. - - :param node: Node, as from media->nodes list in instagram's JSONs - :param session: Session - :param name: Name of profile to which this node belongs - :param download_videos: True, if videos should be downloaded - :param geotags: Download geotags - :param sleep: Sleep between requests to instagram server - :param shorter_output: Shorten log output by not printing captions - :param quiet: Suppress output - :return: True if something was downloaded, False otherwise, i.e. 
file was already there - """ - # pylint:disable=too-many-branches,too-many-locals - date = node["date"] if "date" in node else node["taken_at_timestamp"] - if '__typename' in node: - if node['__typename'] == 'GraphSidecar': - sidecar_data = session.get('https://www.instagram.com/p/' + node['code'] + '/', params={'__a': 1}).json() - edge_number = 1 - downloaded = True - media = sidecar_data["graphql"]["shortcode_media"] if "graphql" in sidecar_data else sidecar_data["media"] - for edge in media['edge_sidecar_to_children']['edges']: - edge_downloaded = download_pic(name, edge['node']['display_url'],date, - filename_suffix=str(edge_number), quiet=quiet, - outputlabel=(str(edge_number) if edge_number != 1 else None)) - downloaded = downloaded and edge_downloaded - edge_number += 1 - if sleep: - time.sleep(1.75 * random.random() + 0.25) - elif node['__typename'] in ['GraphImage', 'GraphVideo']: - downloaded = download_pic(name, node["display_url"] if "display_url" in node else node["display_src"], - date, quiet=quiet) - if sleep: - time.sleep(1.75 * random.random() + 0.25) + def get_username_by_id(self, profile_id: int) -> str: + """To get the current username of a profile, given its unique ID, this function can be used. + session is required to be a logged-in (i.e. non-anonymous) session.""" + tempsession = copy_session(self.session) + tempsession.headers.update({'Content-Type': 'application/x-www-form-urlencoded'}) + resp = tempsession.post('https://www.instagram.com/query/', + data='q=ig_user(' + str(profile_id) + ')+%7B%0A++username%0A%7D%0A') + if resp.status_code == 200: + data = json.loads(resp.text) + if 'username' in data: + return json.loads(resp.text)['username'] + raise ProfileNotExistsException("No profile found, the user may have blocked " + + "you (id: " + str(profile_id) + ").") else: - _log("Warning: Unknown typename discovered:" + node['__typename']) - downloaded = False - else: - # Node is an old image or video. 
- downloaded = download_pic(name, node["display_src"], date, quiet=quiet) - if sleep: - time.sleep(1.75 * random.random() + 0.25) - if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]: - save_caption(name, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"], shorter_output, quiet) - elif "caption" in node: - save_caption(name, date, node["caption"], shorter_output, quiet) - else: - _log("", end=' ', flush=True, quiet=quiet) - node_code = node['shortcode'] if 'shortcode' in node else node['code'] - if node["is_video"] and download_videos: - video_data = get_json('p/' + node_code, session, sleep=sleep) - download_pic(name, - video_data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['video_url'], - date, 'mp4', quiet=quiet) - if geotags: - location = get_location(session, node_code, sleep) - if location: - save_location(name, location, date, quiet=quiet) - _log(quiet=quiet) - return downloaded + if self.test_login(self.session): + raise ProfileAccessDeniedException("Username could not be determined due to error {0} (id: {1})." + .format(str(resp.status_code), str(profile_id))) + raise LoginRequiredException("Login required to determine username (id: " + + str(profile_id) + ").") + def get_id_by_username(self, profile: str) -> int: + """Each Instagram profile has its own unique ID which stays unmodified even if a user changes + his/her username. 
To get said ID, given the profile's name, you may call this function.""" + data = self.get_json(profile, session=get_anonymous_session()) + if "ProfilePage" not in data["entry_data"]: + raise ProfileNotExistsException("Profile {0} does not exist.".format(profile)) + return int(data['entry_data']['ProfilePage'][0]['user']['id']) -def download_feed_pics(session: requests.Session, max_count: int = None, fast_update: bool = False, - filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None, - download_videos: bool = True, geotags: bool = False, - shorter_output: bool = False, sleep: bool = True, quiet: bool = False) -> None: - """ - Download pictures from the user's feed. + def get_followees(self, profile: str) -> List[Dict[str, Any]]: + """ + Retrieve list of followees of given profile - Example to download up to the 20 pics the user last liked: - >>> download_feed_pics(load_session('USER'), max_count=20, fast_update=True, - >>> filter_func=lambda node: - >>> not node["likes"]["viewer_has_liked"] if "likes" in node else not node["viewer_has_liked"]) + :param profile: Name of profile to lookup followees + :return: List of followees (list of dictionaries), as returned by instagram server + """ + tmpsession = copy_session(self.session) + data = self.get_json(profile, session=tmpsession) + profile_id = data['entry_data']['ProfilePage'][0]['user']['id'] + query = ["q=ig_user(" + profile_id + ")+%7B%0A" + "++follows.", + str(data['entry_data']['ProfilePage'][0]['user']['follows']['count']) + + ")+%7B%0A" + "++++count%2C%0A" + "++++page_info+%7B%0A" + "++++++end_cursor%2C%0A" + "++++++has_next_page%0A" + "++++%7D%2C%0A" + "++++nodes+%7B%0A" + "++++++id%2C%0A" + "++++++full_name%2C%0A" + "++++++username%2C%0A" + "++++++followed_by+%7B%0A" + "++++++++count%0A" + "++++++%7D%0A" + "++++%7D%0A" + "++%7D%0A" + "%7D%0A" + "&ref=relationships%3A%3Afollow_list"] + tmpsession.headers.update(default_http_header()) + tmpsession.headers.update({'Referer': 
'https://www.instagram.com/' + profile + '/following/'}) + tmpsession.headers.update({'Content-Type': 'application/x-www-form-urlencoded'}) + resp = tmpsession.post('https://www.instagram.com/query/', data=query[0] + "first(" + query[1]) + if resp.status_code == 200: + data = json.loads(resp.text) + followees = [] + while True: + for followee in data['follows']['nodes']: + followee['follower_count'] = followee.pop('followed_by')['count'] + followees = followees + [followee] + if data['follows']['page_info']['has_next_page']: + resp = tmpsession.post('https://www.instagram.com/query/', + data="{0}after({1}%2C+{2}".format(query[0], + data['follows']['page_info']['end_cursor'], + query[1])) + data = json.loads(resp.text) + else: + break + return followees + if self.test_login(tmpsession): + raise ConnectionException("ConnectionError(" + str(resp.status_code) + "): " + "unable to gather followees.") + raise LoginRequiredException("Login required to gather followees.") - :param session: Session belonging to a user, i.e. 
not an anonymous session - :param max_count: Maximum count of pictures to download - :param fast_update: If true, abort when first already-downloaded picture is encountered - :param filter_func: function(node), which returns True if given picture should not be downloaded - :param download_videos: True, if videos should be downloaded - :param geotags: Download geotags - :param shorter_output: Shorten log output by not printing captions - :param sleep: Sleep between requests to instagram server - :param quiet: Suppress output - """ - # pylint:disable=too-many-locals - data = get_feed_json(session, sleep=sleep) - count = 1 - while True: - if "graphql" in data: - is_edge = True - feed = data["graphql"]["user"]["edge_web_feed_timeline"] + def download_pic(self, name: str, url: str, date_epoch: float, outputlabel: Optional[str] = None, + filename_suffix: Optional[str] = None) -> bool: + """Downloads and saves picture with given url under given directory with given timestamp. + Returns true, if file was actually downloaded, i.e. updated.""" + if outputlabel is None: + outputlabel = _epoch_to_string(date_epoch) + urlmatch = re.search('\\.[a-z]*\\?', url) + file_extension = url[-3:] if urlmatch is None else urlmatch.group(0)[1:-1] + filename = name.lower() + '/' + _epoch_to_string(date_epoch) + if filename_suffix is not None: + filename += '_' + filename_suffix + filename += '.' 
+ file_extension + if os.path.isfile(filename): + self._log(outputlabel + ' exists', end=' ', flush=True) + return False + resp = get_anonymous_session().get(url, stream=True) + if resp.status_code == 200: + self._log(outputlabel, end=' ', flush=True) + os.makedirs(name.lower(), exist_ok=True) + with open(filename, 'wb') as file: + resp.raw.decode_content = True + shutil.copyfileobj(resp.raw, file) + os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) + return True else: - is_edge = False - feed = data["feed"]["media"] - for edge_or_node in feed["edges"] if is_edge else feed["nodes"]: - if max_count is not None and count > max_count: - return - node = edge_or_node["node"] if is_edge else edge_or_node - name = node["owner"]["username"] - if filter_func is not None and filter_func(node): - _log("" % name, flush=True, quiet=quiet) - continue - _log("[%3i] %s " % (count, name), end="", flush=True, quiet=quiet) - count += 1 - downloaded = download_node(node, session, name, - download_videos=download_videos, geotags=geotags, - sleep=sleep, shorter_output=shorter_output, quiet=quiet) - if fast_update and not downloaded: - return - if not feed["page_info"]["has_next_page"]: - break - data = get_feed_json(session, end_cursor=feed["page_info"]["end_cursor"], sleep=sleep) + raise ConnectionException("File \'" + url + "\' could not be downloaded.") - -def get_hashtag_json(hashtag: str, session: requests.Session, - max_id: Optional[str] = None, sleep: bool = True) -> Optional[Dict[str, Any]]: - """Return JSON of a #hashtag""" - return get_json(name='explore/tags/{0}/'.format(hashtag), session=session, max_id=max_id, sleep=sleep) - - -def download_hashtag(hashtag: str, session: requests.Session, - max_count: Optional[int] = None, - filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None, - fast_update: bool = False, download_videos: bool = True, geotags: bool = False, - shorter_output: bool = False, sleep: bool = True, quiet: bool = False) -> 
None: - """Download pictures of one hashtag. - - To download the last 30 pictures with hashtag #cat, do - >>> download_hashtag('cat', session=get_anonymous_session(), max_count=30) - - :param hashtag: Hashtag to download, without leading '#' - :param session: Session belonging to a user, i.e. not an anonymous session - :param max_count: Maximum count of pictures to download - :param filter_func: function(node), which returns True if given picture should not be downloaded - :param fast_update: If true, abort when first already-downloaded picture is encountered - :param download_videos: True, if videos should be downloaded - :param geotags: Download geotags - :param shorter_output: Shorten log output by not printing captions - :param sleep: Sleep between requests to instagram server - :param quiet: Suppress output - """ - data = get_hashtag_json(hashtag, session, sleep=sleep) - count = 1 - while data: - for node in data['entry_data']['TagPage'][0]['tag']['media']['nodes']: - if max_count is not None and count > max_count: - return - _log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True, quiet=quiet) - if filter_func is not None and filter_func(node): - _log('', quiet=quiet) - continue - count += 1 - downloaded = download_node(node, session, '#{0}'.format(hashtag), - download_videos=download_videos, geotags=geotags, sleep=sleep, - shorter_output=shorter_output, quiet=quiet) - if fast_update and not downloaded: - return - if data['entry_data']['TagPage'][0]['tag']['media']['page_info']['has_next_page']: - data = get_hashtag_json(hashtag, session, sleep=sleep, - max_id=data['entry_data']['TagPage'][0]['tag']['media']['page_info']['end_cursor']) + def save_caption(self, name: str, date_epoch: float, caption: str) -> None: + """Updates picture caption""" + filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '.txt' + pcaption = caption.replace('\n', ' ').strip() + caption = caption.encode("UTF-8") + if self.shorter_output: + pcaption = "txt" else: - 
break - - -def check_id(profile: str, session: requests.Session, json_data: Dict[str, Any], quiet: bool = False) -> str: - """ - Consult locally stored ID of profile with given name, check whether ID matches and whether name - has changed and return current name of the profile, and store ID of profile. - """ - profile_exists = len(json_data["entry_data"]) > 0 and "ProfilePage" in json_data["entry_data"] - is_logged_in = json_data["config"]["viewer"] is not None - try: - with open(profile + "/id", 'rb') as id_file: - profile_id = int(id_file.read()) - if (not profile_exists) or \ - (profile_id != int(json_data['entry_data']['ProfilePage'][0]['user']['id'])): - if is_logged_in: - newname = get_username_by_id(session, profile_id) - _log("Profile {0} has changed its name to {1}.".format(profile, newname), - quiet=quiet) - os.rename(profile, newname) - return newname - if profile_exists: - raise ProfileNotExistsException("Profile {0} does not match the stored " - "unique ID {1}.".format(profile, profile_id)) - raise ProfileNotExistsException("Profile {0} does not exist. Please login to " - "update profile name. Unique ID: {1}." 
- .format(profile, profile_id)) - return profile - except FileNotFoundError: - pass - if profile_exists: - os.makedirs(profile.lower(), exist_ok=True) - with open(profile + "/id", 'w') as text_file: - profile_id = json_data['entry_data']['ProfilePage'][0]['user']['id'] - text_file.write(profile_id+"\n") - _log("Stored ID {0} for profile {1}.".format(profile_id, profile), quiet=quiet) - return profile - raise ProfileNotExistsException("Profile {0} does not exist.".format(profile)) - - -def download(name: str, session: requests.Session, - profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False, - fast_update: bool = False, shorter_output: bool = False, sleep: bool = True, - quiet: bool = False) -> None: - """Download one profile""" - # pylint:disable=too-many-branches,too-many-locals - # Get profile main page json - data = get_json(name, session, sleep=sleep) - # check if profile does exist or name has changed since last download - # and update name and json data if necessary - name_updated = check_id(name, session, data, quiet=quiet) - if name_updated != name: - name = name_updated - data = get_json(name, session, sleep=sleep) - # Download profile picture - download_profilepic(name, data["entry_data"]["ProfilePage"][0]["user"]["profile_pic_url"], - quiet=quiet) - if sleep: - time.sleep(1.75 * random.random() + 0.25) - if profile_pic_only: - return - # Catch some errors - if data["entry_data"]["ProfilePage"][0]["user"]["is_private"]: - if data["config"]["viewer"] is None: - raise LoginRequiredException("profile %s requires login" % name) - if not data["entry_data"]["ProfilePage"][0]["user"]["followed_by_viewer"]: - raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name) - else: - if data["config"]["viewer"] is not None: - _log("profile %s could also be downloaded anonymously." 
% name, quiet=quiet) - if ("nodes" not in data["entry_data"]["ProfilePage"][0]["user"]["media"] or - not data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]) \ - and not profile_pic_only: - raise ProfileHasNoPicsException("Profile %s: no pics found." % name) - # Iterate over pictures and download them - def get_last_id(data): - if data["entry_data"] and data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]: - return data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"][-1]["id"] - totalcount = data["entry_data"]["ProfilePage"][0]["user"]["media"]["count"] - count = 1 - while get_last_id(data) is not None: - for node in data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]: - _log("[%3i/%3i] " % (count, totalcount), end="", flush=True, quiet=quiet) - count += 1 - downloaded = download_node(node, session, name, - download_videos=download_videos, geotags=geotags, - sleep=sleep, shorter_output=shorter_output, quiet=quiet) - if fast_update and not downloaded: - return - data = get_json(name, session, max_id=get_last_id(data), sleep=sleep) - - -def get_logged_in_session(username: str, password: Optional[str] = None, quiet: bool = False) -> requests.Session: - """Logs in and returns session, asking user for password if needed""" - if password is not None: - return get_session(username, password) - if quiet: - raise LoginRequiredException("Quiet mode requires given password or valid " - "session file.") - while password is None: - password = getpass.getpass(prompt="Enter Instagram password for %s: " % username) + pcaption = '[' + ((pcaption[:29] + u"\u2026") if len(pcaption) > 31 else pcaption) + ']' try: - return get_session(username, password) - except BadCredentialsException as err: - print(err, file=sys.stderr) - password = None - - -def download_profiles(profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None, - sessionfile: Optional[str] = None, max_count: Optional[int] = None, - profile_pic_only: 
bool = False, download_videos: bool = True, geotags: bool = False, - fast_update: bool = False, - sleep: bool = True, shorter_output: bool = False, quiet: bool = False) -> None: - """Download set of profiles and handle sessions""" - # pylint:disable=too-many-branches,too-many-locals - # Login, if desired - if username is not None: - session = load_session(username, sessionfile, quiet=quiet) - if username != test_login(session): - session = get_logged_in_session(username, password, quiet) - _log("Logged in as %s." % username, quiet=quiet) - else: - session = get_anonymous_session() - # Try block for KeyboardInterrupt (save session on ^C) - failedtargets = [] - targets = set() - try: - # Generate set of targets - for pentry in profilelist: - if pentry[0] == '#': - _log("Retrieving pictures with hashtag {0}".format(pentry), quiet=quiet) - download_hashtag(hashtag=pentry[1:], session=session, max_count=max_count, fast_update=fast_update, - download_videos=download_videos, geotags=geotags, shorter_output=shorter_output, - sleep=sleep, quiet=quiet) - elif pentry[0] == '@' and username is not None: - _log("Retrieving followees of %s..." 
% pentry[1:], quiet=quiet) - followees = get_followees(pentry[1:], session) - targets.update([followee['username'] for followee in followees]) - elif pentry == ":feed-all" and username is not None: - _log("Retrieving pictures from your feed...", quiet=quiet) - download_feed_pics(session, fast_update=fast_update, max_count=max_count, - download_videos=download_videos, geotags=geotags, - shorter_output=shorter_output, sleep=sleep, quiet=quiet) - elif pentry == ":feed-liked" and username is not None: - _log("Retrieving pictures you liked from your feed...", quiet=quiet) - download_feed_pics(session, fast_update=fast_update, max_count=max_count, - filter_func=lambda node: - not node["likes"]["viewer_has_liked"] - if "likes" in node - else not node["viewer_has_liked"], - download_videos=download_videos, geotags=geotags, - shorter_output=shorter_output, sleep=sleep, quiet=quiet) - else: - targets.add(pentry) - if len(targets) > 1: - _log("Downloading %i profiles..." % len(targets), quiet=quiet) - # Iterate through targets list and download them - for target in targets: - try: + with open(filename, 'rb') as file: + file_caption = file.read() + if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'): try: - download(target, session, profile_pic_only, download_videos, - geotags, fast_update, shorter_output, sleep, quiet) - except ProfileNotExistsException as err: - if username is not None: - _log("\"Profile not exists\" - Trying again anonymously, helps in case you are just blocked") - download(target, get_anonymous_session(), profile_pic_only, download_videos, - geotags, fast_update, shorter_output, sleep, quiet) - else: - raise err - except NonfatalException as err: - failedtargets.append(target) + self._log(pcaption + ' unchanged', end=' ', flush=True) + except UnicodeEncodeError: + self._log('txt unchanged', end=' ', flush=True) + return None + else: + def get_filename(index): + return filename if index == 0 else (filename[:-4] + '_old_' + + (str(0) 
if index < 10 else str()) + str(index) + filename[-4:]) + + i = 0 + while os.path.isfile(get_filename(i)): + i = i + 1 + for index in range(i, 0, -1): + os.rename(get_filename(index - 1), get_filename(index)) + try: + self._log(pcaption + ' updated', end=' ', flush=True) + except UnicodeEncodeError: + self._log('txt updated', end=' ', flush=True) + except FileNotFoundError: + pass + try: + self._log(pcaption, end=' ', flush=True) + except UnicodeEncodeError: + self._log('txt', end=' ', flush=True) + os.makedirs(name.lower(), exist_ok=True) + with open(filename, 'wb') as text_file: + shutil.copyfileobj(BytesIO(caption), text_file) + os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) + + def save_location(self, name: str, location_json: Dict[str, str], date_epoch: float) -> None: + filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '_location.txt' + location_string = (location_json["name"] + "\n" + + "https://maps.google.com/maps?q={0},{1}&ll={0},{1}\n".format(location_json["lat"], + location_json["lng"])) + os.makedirs(name.lower(), exist_ok=True) + with open(filename, 'wb') as text_file: + shutil.copyfileobj(BytesIO(location_string.encode()), text_file) + os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) + self._log('geo', end=' ', flush=True) + + def download_profilepic(self, name: str, url: str) -> None: + """Downloads and saves profile pic with given url.""" + date_object = datetime.datetime.strptime(requests.head(url).headers["Last-Modified"], + '%a, %d %b %Y %H:%M:%S GMT') + filename = name.lower() + '/' + _epoch_to_string(date_object.timestamp()) + '_UTC_profile_pic.' 
+ url[-3:] + if os.path.isfile(filename): + self._log(filename + ' already exists') + return None + match = re.search('http.*://.*instagram.*[^/]*\\.(com|net)/[^/]+/.', url) + if match is None: + raise ConnectionException("URL \'" + url + "\' could not be processed.") + index = len(match.group(0)) - 1 + offset = 8 if match.group(0)[-1:] == 's' else 0 + url = url[:index] + 's2048x2048' + ('/' if offset == 0 else str()) + url[index + offset:] + resp = get_anonymous_session().get(url, stream=True) + if resp.status_code == 200: + self._log(filename) + os.makedirs(name.lower(), exist_ok=True) + with open(filename, 'wb') as file: + resp.raw.decode_content = True + shutil.copyfileobj(resp.raw, file) + os.utime(filename, (datetime.datetime.now().timestamp(), date_object.timestamp())) + else: + raise ConnectionException("File \'" + url + "\' could not be downloaded.") + + def save_session_to_file(self, filename: Optional[str] = None) -> None: + """Saves requests.Session object.""" + if filename is None: + filename = get_default_session_filename(self.username) + dirname = os.path.dirname(filename) + if dirname != '' and not os.path.exists(dirname): + os.makedirs(dirname) + os.chmod(dirname, 0o700) + with open(filename, 'wb') as sessionfile: + os.chmod(filename, 0o600) + pickle.dump(requests.utils.dict_from_cookiejar(self.session.cookies), sessionfile) + self._log("Saved session to %s." 
% filename) + + def load_session_from_file(self, username: str, filename: Optional[str] = None) -> None: + """Returns loaded requests.Session object, or None if not found.""" + self.username = username + if filename is None: + filename = get_default_session_filename(username) + with open(filename, 'rb') as sessionfile: + session = requests.Session() + session.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile)) + session.headers.update(default_http_header()) + session.headers.update({'X-CSRFToken': session.cookies.get_dict()['csrftoken']}) + self._log("Loaded session from %s." % filename) + self.session = session + self.username = username + + def test_login(self, session: requests.Session) -> Optional[str]: + """Returns the Instagram username to which given requests.Session object belongs, or None.""" + if self.session is None: + return + data = self.get_json(str(), session=session) + if data['config']['viewer'] is None: + return + time.sleep(4 * random.random() + 1) + return data['config']['viewer']['username'] + + def login(self, user: str, passwd: str) -> None: + """Log in to instagram with given username and password and return session object""" + session = requests.Session() + session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1', + 'ig_vw': '1920', 'csrftoken': '', + 's_network': '', 'ds_user_id': ''}) + session.headers.update(default_http_header()) + resp = session.get('https://www.instagram.com/') + session.headers.update({'X-CSRFToken': resp.cookies['csrftoken']}) + time.sleep(9 * random.random() + 3) + login = session.post('https://www.instagram.com/accounts/login/ajax/', + data={'password': passwd, 'username': user}, allow_redirects=True) + session.headers.update({'X-CSRFToken': login.cookies['csrftoken']}) + time.sleep(5 * random.random()) + if login.status_code == 200: + if user == self.test_login(session): + self.username = user + self.session = session + else: + raise BadCredentialsException('Login error! 
Check your credentials!') + else: + raise ConnectionException('Login error! Connection error!') + + def get_feed_json(self, end_cursor: str = None) -> Dict[str, Any]: + """ + Get JSON of the user's feed. + + :param end_cursor: The end cursor, as from json["feed"]["media"]["page_info"]["end_cursor"] + :return: JSON + """ + if end_cursor is None: + return self.get_json(str())["entry_data"]["FeedPage"][0] + tmpsession = copy_session(self.session) + query = "q=ig_me()+%7B%0A++feed+%7B%0A++++media.after(" + end_cursor + "%2C+12)+%7B%0A" + \ + "++++++nodes+%7B%0A++++++++id%2C%0A++++++++caption%2C%0A++++++++code%2C%0A++++++++" + \ + "comments.last(4)+%7B%0A++++++++++count%2C%0A++++++++++nodes+%7B%0A++++++++++++" + \ + "id%2C%0A++++++++++++created_at%2C%0A++++++++++++text%2C%0A++++++++++++" + \ + "user+%7B%0A++++++++++++++id%2C%0A++++++++++++++profile_pic_url%2C%0A++++++++++++++" + \ + "username%0A++++++++++++%7D%0A++++++++++%7D%2C%0A++++++++++" + \ + "page_info%0A++++++++%7D%2C%0A++++++++comments_disabled%2C%0A++++++++" + \ + "date%2C%0A++++++++dimensions+%7B%0A++++++++++height%2C%0A++++++++++" + \ + "width%0A++++++++%7D%2C%0A++++++++display_src%2C%0A++++++++is_video%2C%0A++++++++" + \ + "likes+%7B%0A++++++++++count%2C%0A++++++++++nodes+%7B%0A++++++++++++" + \ + "user+%7B%0A++++++++++++++id%2C%0A++++++++++++++profile_pic_url%2C%0A++++++++++++++" + \ + "username%0A++++++++++++%7D%0A++++++++++%7D%2C%0A++++++++++" + \ + "viewer_has_liked%0A++++++++%7D%2C%0A++++++++location+%7B%0A++++++++++" + \ + "id%2C%0A++++++++++has_public_page%2C%0A++++++++++name%0A++++++++%7D%2C%0A++++++++" + \ + "owner+%7B%0A++++++++++id%2C%0A++++++++++blocked_by_viewer%2C%0A++++++++++" + \ + "followed_by_viewer%2C%0A++++++++++full_name%2C%0A++++++++++" + \ + "has_blocked_viewer%2C%0A++++++++++is_private%2C%0A++++++++++" + \ + "profile_pic_url%2C%0A++++++++++requested_by_viewer%2C%0A++++++++++" + \ + "username%0A++++++++%7D%2C%0A++++++++usertags+%7B%0A++++++++++" + \ + 
"nodes+%7B%0A++++++++++++user+%7B%0A++++++++++++++" + \ + "username%0A++++++++++++%7D%2C%0A++++++++++++x%2C%0A++++++++++++y%0A++++++++++" + \ + "%7D%0A++++++++%7D%2C%0A++++++++video_url%2C%0A++++++++" + \ + "video_views%0A++++++%7D%2C%0A++++++page_info%0A++++%7D%0A++%7D%2C%0A++id%2C%0A++" + \ + "profile_pic_url%2C%0A++username%0A%7D%0A&ref=feed::show" + tmpsession.headers.update(default_http_header()) + tmpsession.headers.update({'Referer': 'https://www.instagram.com/'}) + tmpsession.headers.update({'Content-Type': 'application/x-www-form-urlencoded'}) + resp = tmpsession.post('https://www.instagram.com/query/', data=query) + if self.sleep: + time.sleep(4 * random.random() + 1) + return json.loads(resp.text) + + def get_location(self, node_code: str) -> Dict[str, str]: + pic_json = self.get_json("p/" + node_code) + media = pic_json["entry_data"]["PostPage"][0]["graphql"]["shortcode_media"] \ + if "graphql" in pic_json["entry_data"]["PostPage"][0] \ + else pic_json["entry_data"]["PostPage"][0]["media"] + if media["location"] is not None: + location_json = self.get_json("explore/locations/" + + media["location"]["id"]) + return location_json["entry_data"]["LocationsPage"][0]["location"] + + def download_node(self, node: Dict[str, Any], name: str, + download_videos: bool = True, geotags: bool = False) -> bool: + """ + Download everything associated with one instagram node, i.e. picture, caption and video. + + :param node: Node, as from media->nodes list in instagram's JSONs + :param name: Name of profile to which this node belongs + :param download_videos: True, if videos should be downloaded + :param geotags: Download geotags + :return: True if something was downloaded, False otherwise, i.e. 
file was already there + """ + # pylint:disable=too-many-branches,too-many-locals + date = node["date"] if "date" in node else node["taken_at_timestamp"] + if '__typename' in node: + if node['__typename'] == 'GraphSidecar': + sidecar_data = self.session.get('https://www.instagram.com/p/' + node['code'] + '/', + params={'__a': 1}).json() + edge_number = 1 + downloaded = True + media = sidecar_data["graphql"]["shortcode_media"] if "graphql" in sidecar_data else sidecar_data[ + "media"] + for edge in media['edge_sidecar_to_children']['edges']: + edge_downloaded = self.download_pic(name, edge['node']['display_url'], date, + filename_suffix=str(edge_number), + outputlabel=(str(edge_number) if edge_number != 1 else None)) + downloaded = downloaded and edge_downloaded + edge_number += 1 + if self.sleep: + time.sleep(1.75 * random.random() + 0.25) + elif node['__typename'] in ['GraphImage', 'GraphVideo']: + downloaded = self.download_pic(name, + node["display_url"] if "display_url" in node else node["display_src"], + date) + if self.sleep: + time.sleep(1.75 * random.random() + 0.25) + else: + self._log("Warning: Unknown typename discovered:" + node['__typename']) + downloaded = False + else: + # Node is an old image or video. 
+ downloaded = self.download_pic(name, node["display_src"], date) + if self.sleep: + time.sleep(1.75 * random.random() + 0.25) + if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]: + self.save_caption(name, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"]) + elif "caption" in node: + self.save_caption(name, date, node["caption"]) + else: + self._log("", end=' ', flush=True) + node_code = node['shortcode'] if 'shortcode' in node else node['code'] + if node["is_video"] and download_videos: + video_data = self.get_json('p/' + node_code) + self.download_pic(name, + video_data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['video_url'], + date, 'mp4') + if geotags: + location = self.get_location(node_code) + if location: + self.save_location(name, location, date) + self._log() + return downloaded + + def download_feed_pics(self, max_count: int = None, fast_update: bool = False, + filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None, + download_videos: bool = True, geotags: bool = False) -> None: + """ + Download pictures from the user's feed. 
+ + Example to download up to the 20 pics the user last liked: + >>> loader = Instaloader() + >>> loader.load_session_from_file('USER') + >>> loader.download_feed_pics(max_count=20, fast_update=True, + >>> filter_func=lambda node: + >>> not node["likes"]["viewer_has_liked"] + >>> if "likes" in node else + >>> not node["viewer_has_liked"]) + + :param max_count: Maximum count of pictures to download + :param fast_update: If true, abort when first already-downloaded picture is encountered + :param filter_func: function(node), which returns True if given picture should not be downloaded + :param download_videos: True, if videos should be downloaded + :param geotags: Download geotags + """ + # pylint:disable=too-many-locals + data = self.get_feed_json() + count = 1 + while True: + if "graphql" in data: + is_edge = True + feed = data["graphql"]["user"]["edge_web_feed_timeline"] + else: + is_edge = False + feed = data["feed"]["media"] + for edge_or_node in feed["edges"] if is_edge else feed["nodes"]: + if max_count is not None and count > max_count: + return + node = edge_or_node["node"] if is_edge else edge_or_node + name = node["owner"]["username"] + if filter_func is not None and filter_func(node): + self._log("" % name, flush=True) + continue + self._log("[%3i] %s " % (count, name), end="", flush=True) + count += 1 + downloaded = self.download_node(node, name, + download_videos=download_videos, geotags=geotags) + if fast_update and not downloaded: + return + if not feed["page_info"]["has_next_page"]: + break + data = self.get_feed_json(end_cursor=feed["page_info"]["end_cursor"]) + + def get_hashtag_json(self, hashtag: str, + max_id: Optional[str] = None) -> Optional[Dict[str, Any]]: + """Return JSON of a #hashtag""" + return self.get_json(name='explore/tags/{0}/'.format(hashtag), max_id=max_id) + + def download_hashtag(self, hashtag: str, + max_count: Optional[int] = None, + filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None, + fast_update: bool 
= False, download_videos: bool = True, geotags: bool = False) -> None: + """Download pictures of one hashtag. + + To download the last 30 pictures with hashtag #cat, do + >>> loader = Instaloader() + >>> loader.download_hashtag('cat', max_count=30) + + :param hashtag: Hashtag to download, without leading '#' + :param max_count: Maximum count of pictures to download + :param filter_func: function(node), which returns True if given picture should not be downloaded + :param fast_update: If true, abort when first already-downloaded picture is encountered + :param download_videos: True, if videos should be downloaded + :param geotags: Download geotags + """ + data = self.get_hashtag_json(hashtag) + count = 1 + while data: + for node in data['entry_data']['TagPage'][0]['tag']['media']['nodes']: + if max_count is not None and count > max_count: + return + self._log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True) + if filter_func is not None and filter_func(node): + self._log('') + continue + count += 1 + downloaded = self.download_node(node, '#{0}'.format(hashtag), + download_videos=download_videos, geotags=geotags) + if fast_update and not downloaded: + return + if data['entry_data']['TagPage'][0]['tag']['media']['page_info']['has_next_page']: + data = self.get_hashtag_json(hashtag, + max_id=data['entry_data']['TagPage'][0]['tag']['media']['page_info'][ + 'end_cursor']) + else: + break + + def check_id(self, profile: str, json_data: Dict[str, Any]) -> str: + """ + Consult locally stored ID of profile with given name, check whether ID matches and whether name + has changed and return current name of the profile, and store ID of profile. 
+ """ + profile_exists = len(json_data["entry_data"]) > 0 and "ProfilePage" in json_data["entry_data"] + is_logged_in = json_data["config"]["viewer"] is not None + try: + with open(profile + "/id", 'rb') as id_file: + profile_id = int(id_file.read()) + if (not profile_exists) or \ + (profile_id != int(json_data['entry_data']['ProfilePage'][0]['user']['id'])): + if is_logged_in: + newname = self.get_username_by_id(profile_id) + self._log("Profile {0} has changed its name to {1}.".format(profile, newname)) + os.rename(profile, newname) + return newname + if profile_exists: + raise ProfileNotExistsException("Profile {0} does not match the stored " + "unique ID {1}.".format(profile, profile_id)) + raise ProfileNotExistsException("Profile {0} does not exist. Please login to " + "update profile name. Unique ID: {1}." + .format(profile, profile_id)) + return profile + except FileNotFoundError: + pass + if profile_exists: + os.makedirs(profile.lower(), exist_ok=True) + with open(profile + "/id", 'w') as text_file: + profile_id = json_data['entry_data']['ProfilePage'][0]['user']['id'] + text_file.write(profile_id + "\n") + self._log("Stored ID {0} for profile {1}.".format(profile_id, profile)) + return profile + raise ProfileNotExistsException("Profile {0} does not exist.".format(profile)) + + def download(self, name: str, + profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False, + fast_update: bool = False) -> None: + """Download one profile""" + # pylint:disable=too-many-branches,too-many-locals + # Get profile main page json + data = self.get_json(name) + # check if profile does exist or name has changed since last download + # and update name and json data if necessary + name_updated = self.check_id(name, data) + if name_updated != name: + name = name_updated + data = self.get_json(name) + # Download profile picture + self.download_profilepic(name, data["entry_data"]["ProfilePage"][0]["user"]["profile_pic_url"]) + if self.sleep: + 
time.sleep(1.75 * random.random() + 0.25) + if profile_pic_only: + return + # Catch some errors + if data["entry_data"]["ProfilePage"][0]["user"]["is_private"]: + if data["config"]["viewer"] is None: + raise LoginRequiredException("profile %s requires login" % name) + if not data["entry_data"]["ProfilePage"][0]["user"]["followed_by_viewer"]: + raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name) + else: + if data["config"]["viewer"] is not None: + self._log("profile %s could also be downloaded anonymously." % name) + if ("nodes" not in data["entry_data"]["ProfilePage"][0]["user"]["media"] or + not data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]) \ + and not profile_pic_only: + raise ProfileHasNoPicsException("Profile %s: no pics found." % name) + + # Iterate over pictures and download them + def get_last_id(data): + if data["entry_data"] and data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]: + return data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"][-1]["id"] + + totalcount = data["entry_data"]["ProfilePage"][0]["user"]["media"]["count"] + count = 1 + while get_last_id(data) is not None: + for node in data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]: + self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True) + count += 1 + downloaded = self.download_node(node, name, + download_videos=download_videos, geotags=geotags) + if fast_update and not downloaded: + return + data = self.get_json(name, max_id=get_last_id(data)) + + def interactive_login(self, username: str, password: Optional[str] = None) -> None: + """Logs in and returns session, asking user for password if needed""" + if password is not None: + self.login(username, password) + if self.quiet: + raise LoginRequiredException("Quiet mode requires given password or valid session file.") + while password is None: + password = getpass.getpass(prompt="Enter Instagram password for %s: " % username) + try: + 
self.login(username, password) + except BadCredentialsException as err: print(err, file=sys.stderr) - except KeyboardInterrupt: - print("\nInterrupted by user.", file=sys.stderr) - if len(targets) > 1 and failedtargets: - print("Errors occured (see above) while downloading profiles: %s." % - ", ".join(failedtargets), file=sys.stderr) - # Save session if it is useful - if username is not None: - save_session(session, username, sessionfile, quiet=quiet) + password = None + + def download_profiles(self, profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None, + sessionfile: Optional[str] = None, max_count: Optional[int] = None, + profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False, + fast_update: bool = False) -> None: + """Download set of profiles and handle sessions""" + # pylint:disable=too-many-branches,too-many-locals + # Login, if desired + if username is not None: + self.load_session_from_file(username, sessionfile) + if username != self.test_login(self.session): + self.interactive_login(username, password) + self._log("Logged in as %s." % username) + # Try block for KeyboardInterrupt (save session on ^C) + failedtargets = [] + targets = set() + try: + # Generate set of targets + for pentry in profilelist: + if pentry[0] == '#': + self._log("Retrieving pictures with hashtag {0}".format(pentry)) + self.download_hashtag(hashtag=pentry[1:], max_count=max_count, fast_update=fast_update, + download_videos=download_videos, geotags=geotags) + elif pentry[0] == '@' and username is not None: + self._log("Retrieving followees of %s..." 
% pentry[1:]) + followees = self.get_followees(pentry[1:]) + targets.update([followee['username'] for followee in followees]) + elif pentry == ":feed-all" and username is not None: + self._log("Retrieving pictures from your feed...") + self.download_feed_pics(fast_update=fast_update, max_count=max_count, + download_videos=download_videos, geotags=geotags) + elif pentry == ":feed-liked" and username is not None: + self._log("Retrieving pictures you liked from your feed...") + self.download_feed_pics(fast_update=fast_update, max_count=max_count, + filter_func=lambda node: + not node["likes"]["viewer_has_liked"] + if "likes" in node + else not node["viewer_has_liked"], + download_videos=download_videos, geotags=geotags) + else: + targets.add(pentry) + if len(targets) > 1: + self._log("Downloading %i profiles..." % len(targets)) + # Iterate through targets list and download them + for target in targets: + try: + try: + self.download(target, profile_pic_only, download_videos, + geotags, fast_update) + except ProfileNotExistsException as err: + if username is not None: + self._log( + "\"Profile not exists\" - Trying again anonymously, helps in case you are just blocked") + anonymous_loader = Instaloader(self.sleep, self.quiet, self.shorter_output) + anonymous_loader.download(target, profile_pic_only, download_videos, + geotags, fast_update) + else: + raise err + except NonfatalException as err: + failedtargets.append(target) + print(err, file=sys.stderr) + except KeyboardInterrupt: + print("\nInterrupted by user.", file=sys.stderr) + if len(targets) > 1 and failedtargets: + print("Errors occured (see above) while downloading profiles: %s." 
% + ", ".join(failedtargets), file=sys.stderr) + # Save session if it is useful + if username is not None: + self.save_session_to_file(sessionfile) + def main(): parser = ArgumentParser(description=__doc__, @@ -806,43 +796,43 @@ def main(): parser.add_argument('--version', action='version', version=__version__) parser.add_argument('-l', '--login', metavar='YOUR-USERNAME', - help='Login name for your Instagram account. Not needed to download public '\ - 'profiles, but if you want to download private profiles or all followees of '\ - 'some profile, you have to specify a username used to login.') + help='Login name for your Instagram account. Not needed to download public ' + 'profiles, but if you want to download private profiles or all followees of ' + 'some profile, you have to specify a username used to login.') parser.add_argument('-p', '--password', metavar='YOUR-PASSWORD', - help='Password for your Instagram account. If --login is given and there is '\ - 'not yet a valid session file, you\'ll be prompted for your password if '\ - '--password is not given. Specifying this option without --login has no '\ - 'effect.') + help='Password for your Instagram account. If --login is given and there is ' + 'not yet a valid session file, you\'ll be prompted for your password if ' + '--password is not given. 
Specifying this option without --login has no ' + 'effect.') parser.add_argument('-f', '--sessionfile', - help='File to store session key, defaults to '+ \ - get_default_session_filename("")) + help='File to store session key, defaults to ' + get_default_session_filename("")) parser.add_argument('-P', '--profile-pic-only', action='store_true', - help='Only download profile picture') + help='Only download profile picture') parser.add_argument('-V', '--skip-videos', action='store_true', - help='Do not download videos') + help='Do not download videos') parser.add_argument('-G', '--geotags', action='store_true', - help='Store geotags when available') + help='Store geotags when available') parser.add_argument('-F', '--fast-update', action='store_true', - help='Abort at encounter of first already-downloaded picture') + help='Abort at encounter of first already-downloaded picture') parser.add_argument('-c', '--count', help='Do not attempt to download more than COUNT posts. ' 'Applies only to #hashtag, :feed-all and :feed-liked.') parser.add_argument('-S', '--no-sleep', action='store_true', - help='Do not sleep between actual downloads of pictures') + help='Do not sleep between actual downloads of pictures') parser.add_argument('-O', '--shorter-output', action='store_true', - help='Do not display captions while downloading') + help='Do not display captions while downloading') parser.add_argument('-q', '--quiet', action='store_true', - help='Disable user interaction, i.e. do not print messages (except errors) and fail ' \ - 'if login credentials are needed but not given.') + help='Disable user interaction, i.e. 
do not print messages (except errors) and fail ' + 'if login credentials are needed but not given.') args = parser.parse_args() try: - download_profiles(args.profile, args.login, args.password, args.sessionfile, - int(args.count) if args.count is not None else None, - args.profile_pic_only, not args.skip_videos, args.geotags, args.fast_update, - not args.no_sleep, args.shorter_output, args.quiet) + loader = Instaloader(not args.no_sleep, args.quiet, args.shorter_output) + loader.download_profiles(args.profile, args.login, args.password, args.sessionfile, + int(args.count) if args.count is not None else None, + args.profile_pic_only, not args.skip_videos, args.geotags, args.fast_update) except InstaloaderException as err: raise SystemExit("Fatal error: %s" % err) + if __name__ == "__main__": main()