
Refactor Instaloader's methods into a class

Alexander Graf 2017-06-24 22:43:40 +02:00
parent 52492456ed
commit caf75a8135
3 changed files with 713 additions and 720 deletions

.travis.yml

@@ -9,4 +9,4 @@ python:
 install:
   - pip install pylint requests
 script:
-  - python3 -m pylint -r n -d bad-whitespace,bad-continuation,missing-docstring,multiple-imports,too-many-arguments,locally-disabled,line-too-long instaloader
+  - python3 -m pylint -r n -d bad-whitespace,missing-docstring,too-many-arguments,locally-disabled,line-too-long,too-many-public-methods instaloader

README.rst

@@ -136,11 +136,14 @@ their follower count, do
     import instaloader

-    # login
-    session = instaloader.get_logged_in_session(USERNAME)
-
-    # get followees
-    followees = instaloader.get_followees(PROFILE, session)
+    # Get instance
+    loader = instaloader.Instaloader()
+
+    # Login
+    loader.interactive_login(USERNAME)
+
+    # Retrieve followees
+    followees = loader.get_followees(PROFILE)
     for f in followees:
         print("%i\t%s\t%s" % (f['follower_count'], f['username'], f['full_name']))
@@ -150,7 +153,7 @@ Then, you may download all pictures of all followees with
     for f in followees:
         try:
-            instaloader.download(f['username'], session)
+            loader.download(f['username'])
         except instaloader.NonfatalException:
             pass
@@ -158,7 +161,7 @@ You could also download your last 20 liked pics with
 .. code:: python

-    instaloader.download_feed_pics(session, max_count=20, fast_update=True,
+    loader.download_feed_pics(max_count=20, fast_update=True,
                                    filter_func=lambda node:
                                    not node["likes"]["viewer_has_liked"] if "likes" in node else not node["viewer_has_liked"])
@@ -166,7 +169,7 @@ To download the last 20 pictures with hashtag #cat, do
 .. code:: python

-    instaloader.download_hashtag('cat', session=instaloader.get_anonymous_session(), max_count=20)
+    loader.download_hashtag('cat', max_count=20)

 Each Instagram profile has its own unique ID which stays unmodified even
 if a user changes his/her username. To get said ID, given the profile's
@@ -174,7 +177,7 @@ name, you may call
 .. code:: python

-    instaloader.get_id_by_username(PROFILE_NAME)
+    loader.get_id_by_username(PROFILE_NAME)

 ``get_followees()`` also returns unique IDs for all loaded followees. To
 get the current username of a profile, given this unique ID
@@ -182,4 +185,4 @@ get the current username of a profile, given this unique ID
 .. code:: python

-    instaloader.get_username_by_id(session, followees[0]['id'])
+    loader.get_username_by_id(followees[0]['id'])
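
Putting these updated snippets together, a full session with the refactored API might look like the following sketch (``USERNAME`` and ``PROFILE`` are placeholders, as above; it assumes the ``Instaloader`` class introduced by this commit):

.. code:: python

    import instaloader

    # Get an instance and log in (prompts for the password on the terminal)
    loader = instaloader.Instaloader()
    loader.interactive_login(USERNAME)

    # List followees and download each of their profiles
    for f in loader.get_followees(PROFILE):
        try:
            loader.download(f['username'])
        except instaloader.NonfatalException:
            pass

    # Resolve a profile name to its permanent ID and back to its current name
    profile_id = loader.get_id_by_username(PROFILE)
    current_name = loader.get_username_by_id(profile_id)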

instaloader.py

@@ -3,13 +3,24 @@
"""Tool to download pictures (or videos) and captions from Instagram, from a given set """Tool to download pictures (or videos) and captions from Instagram, from a given set
of profiles (even if private), from your feed or from all followees of a given profile.""" of profiles (even if private), from your feed or from all followees of a given profile."""
import re, json, datetime, shutil, os, time, random, sys, pickle, getpass, tempfile import datetime
import getpass
import json
import os
import pickle
import random
import re
import shutil
import sys
import tempfile
import time
from argparse import ArgumentParser from argparse import ArgumentParser
from io import BytesIO from io import BytesIO
from numbers import Real from typing import Any, Callable, Dict, List, Optional
from typing import List, Optional, Any, Dict, Callable
import requests
import requests.utils
import requests, requests.utils
# To get version from setup.py for instaloader --version # To get version from setup.py for instaloader --version
import pkg_resources import pkg_resources
@@ -33,58 +44,128 @@ class InstaloaderException(Exception):
"""Base exception for this script""" """Base exception for this script"""
pass pass
class NonfatalException(InstaloaderException): class NonfatalException(InstaloaderException):
"""Base exception for errors which should not cause instaloader to stop""" """Base exception for errors which should not cause instaloader to stop"""
pass pass
class ProfileNotExistsException(NonfatalException): class ProfileNotExistsException(NonfatalException):
pass pass
class ProfileAccessDeniedException(NonfatalException): class ProfileAccessDeniedException(NonfatalException):
pass pass
class ProfileHasNoPicsException(NonfatalException): class ProfileHasNoPicsException(NonfatalException):
pass pass
class PrivateProfileNotFollowedException(NonfatalException): class PrivateProfileNotFollowedException(NonfatalException):
pass pass
class LoginRequiredException(NonfatalException): class LoginRequiredException(NonfatalException):
pass pass
class BadCredentialsException(InstaloaderException): class BadCredentialsException(InstaloaderException):
pass pass
class ConnectionException(InstaloaderException): class ConnectionException(InstaloaderException):
pass pass
def _log(*msg, sep='', end='\n', flush=False, quiet=False): def _epoch_to_string(epoch: float) -> str:
if not quiet: return datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d_%H-%M-%S')
def get_default_session_filename(username: str) -> str:
"""Returns default session filename for given username."""
dirname = tempfile.gettempdir() + "/" + ".instaloader-" + getpass.getuser()
filename = dirname + "/" + "session-" + username
return filename
def copy_session(session: requests.Session) -> requests.Session:
"""Duplicates a requests.Session."""
new = requests.Session()
new.cookies = \
requests.utils.cookiejar_from_dict(requests.utils.dict_from_cookiejar(session.cookies))
new.headers = session.headers
return new
def default_http_header(empty_session_only: bool = False) -> Dict[str, str]:
"""Returns default HTTP header we use for requests."""
user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
'(KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36'
header = {'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-US,en;q=0.8',
'Connection': 'keep-alive',
'Content-Length': '0',
'Host': 'www.instagram.com',
'Origin': 'https://www.instagram.com',
'Referer': 'https://www.instagram.com/',
'User-Agent': user_agent,
'X-Instagram-AJAX': '1',
'X-Requested-With': 'XMLHttpRequest'}
if empty_session_only:
del header['Host']
del header['Origin']
del header['Referer']
del header['X-Instagram-AJAX']
del header['X-Requested-With']
return header
def get_anonymous_session() -> requests.Session:
"""Returns our default anonymous requests.Session object."""
session = requests.Session()
session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1',
'ig_vw': '1920', 'csrftoken': '',
's_network': '', 'ds_user_id': ''})
session.headers.update(default_http_header(empty_session_only=True))
return session
class Instaloader:
def __init__(self,
sleep: bool = True, quiet: bool = False, shorter_output: bool = False):
self.session = get_anonymous_session()
self.username = None
self.sleep = sleep
self.quiet = quiet
self.shorter_output = shorter_output
def _log(self, *msg, sep='', end='\n', flush=False):
if not self.quiet:
print(*msg, sep=sep, end=end, flush=flush) print(*msg, sep=sep, end=end, flush=flush)
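
As a quick illustration of the constructor above (a sketch, not part of the diff): the flags that the old module-level functions accepted individually are now set once per instance.

.. code:: python

    # Quiet batch run: no sleeping between requests, no progress output,
    # and captions are not echoed to the log (shorter_output).
    loader = Instaloader(sleep=False, quiet=True, shorter_output=True)

    # The instance starts with an anonymous requests.Session;
    # loader.username stays None until login() or load_session_from_file().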
def get_json(self, name: str, session: requests.Session = None,
def get_json(name: str, session: requests.Session, max_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
max_id: Optional[str] = None, sleep: bool = True) -> Optional[Dict[str, Any]]:
"""Return JSON of a profile""" """Return JSON of a profile"""
if session is None:
session = self.session
if not max_id: if not max_id:
resp = session.get('https://www.instagram.com/' + name) resp = session.get('https://www.instagram.com/' + name)
else: else:
resp = session.get('https://www.instagram.com/' + name, params={'max_id': max_id}) resp = session.get('https://www.instagram.com/' + name, params={'max_id': max_id})
if sleep: if self.sleep:
time.sleep(4 * random.random() + 1) time.sleep(4 * random.random() + 1)
match = re.search('window\\._sharedData = .*<', resp.text) match = re.search('window\\._sharedData = .*<', resp.text)
if match is not None: if match is not None:
return json.loads(match.group(0)[21:-2]) return json.loads(match.group(0)[21:-2])
def get_username_by_id(self, profile_id: int) -> str:
def get_username_by_id(session: requests.Session, profile_id: int) -> str:
"""To get the current username of a profile, given its unique ID, this function can be used. """To get the current username of a profile, given its unique ID, this function can be used.
session is required to be a logged-in (i.e. non-anonymous) session.""" session is required to be a logged-in (i.e. non-anonymous) session."""
tempsession = copy_session(session) tempsession = copy_session(self.session)
tempsession.headers.update({'Content-Type': 'application/x-www-form-urlencoded'}) tempsession.headers.update({'Content-Type': 'application/x-www-form-urlencoded'})
resp = tempsession.post('https://www.instagram.com/query/', data='q=ig_user(' + resp = tempsession.post('https://www.instagram.com/query/',
str(profile_id) +')+%7B%0A++username%0A%7D%0A') data='q=ig_user(' + str(profile_id) + ')+%7B%0A++username%0A%7D%0A')
if resp.status_code == 200: if resp.status_code == 200:
data = json.loads(resp.text) data = json.loads(resp.text)
if 'username' in data: if 'username' in data:
@@ -92,36 +173,29 @@ def get_username_by_id(session: requests.Session, profile_id: int) -> str:
raise ProfileNotExistsException("No profile found, the user may have blocked " + raise ProfileNotExistsException("No profile found, the user may have blocked " +
"you (id: " + str(profile_id) + ").") "you (id: " + str(profile_id) + ").")
else: else:
if test_login(session): if self.test_login(self.session):
raise ProfileAccessDeniedException("Username could not be determined due to error {0} (id: {1})." raise ProfileAccessDeniedException("Username could not be determined due to error {0} (id: {1})."
.format(str(resp.status_code), str(profile_id))) .format(str(resp.status_code), str(profile_id)))
raise LoginRequiredException("Login required to determine username (id: " + raise LoginRequiredException("Login required to determine username (id: " +
str(profile_id) + ").") str(profile_id) + ").")
def get_id_by_username(self, profile: str) -> int:
def get_id_by_username(profile: str) -> int:
"""Each Instagram profile has its own unique ID which stays unmodified even if a user changes """Each Instagram profile has its own unique ID which stays unmodified even if a user changes
his/her username. To get said ID, given the profile's name, you may call this function.""" his/her username. To get said ID, given the profile's name, you may call this function."""
data = get_json(profile, get_anonymous_session()) data = self.get_json(profile, session=get_anonymous_session())
if "ProfilePage" not in data["entry_data"]: if "ProfilePage" not in data["entry_data"]:
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile)) raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
return int(data['entry_data']['ProfilePage'][0]['user']['id']) return int(data['entry_data']['ProfilePage'][0]['user']['id'])
def get_followees(self, profile: str) -> List[Dict[str, Any]]:
def _epoch_to_string(epoch: Real) -> str:
return datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d_%H-%M-%S')
def get_followees(profile: str, session: requests.Session) -> List[Dict[str, Any]]:
""" """
Retrieve list of followees of given profile Retrieve list of followees of given profile
:param profile: Name of profile to lookup followees :param profile: Name of profile to lookup followees
:param session: Session belonging to a user, i.e. not an anonymous session
:return: List of followees (list of dictionaries), as returned by instagram server :return: List of followees (list of dictionaries), as returned by instagram server
""" """
tmpsession = copy_session(session) tmpsession = copy_session(self.session)
data = get_json(profile, tmpsession) data = self.get_json(profile, session=tmpsession)
profile_id = data['entry_data']['ProfilePage'][0]['user']['id'] profile_id = data['entry_data']['ProfilePage'][0]['user']['id']
query = ["q=ig_user(" + profile_id + ")+%7B%0A" query = ["q=ig_user(" + profile_id + ")+%7B%0A"
"++follows.", "++follows.",
@@ -155,21 +229,20 @@ def get_followees(profile: str, session: requests.Session) -> List[Dict[str, Any
followee['follower_count'] = followee.pop('followed_by')['count'] followee['follower_count'] = followee.pop('followed_by')['count']
followees = followees + [followee] followees = followees + [followee]
if data['follows']['page_info']['has_next_page']: if data['follows']['page_info']['has_next_page']:
resp = tmpsession.post('https://www.instagram.com/query/', data=query[0] resp = tmpsession.post('https://www.instagram.com/query/',
+ "after(" data="{0}after({1}%2C+{2}".format(query[0],
+ data['follows']['page_info']['end_cursor'] data['follows']['page_info']['end_cursor'],
+ "%2C+" + query[1] ) query[1]))
data = json.loads(resp.text) data = json.loads(resp.text)
else: else:
break break
return followees return followees
if test_login(tmpsession): if self.test_login(tmpsession):
raise ConnectionException("ConnectionError(" + str(resp.status_code) + "): " raise ConnectionException("ConnectionError(" + str(resp.status_code) + "): "
"unable to gather followees.") "unable to gather followees.")
raise LoginRequiredException("Login required to gather followees.") raise LoginRequiredException("Login required to gather followees.")
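
A short usage sketch for ``get_followees()``: listing followees needs a logged-in session, so this assumes a session file saved earlier for the (placeholder) account name.

.. code:: python

    loader = Instaloader()
    loader.load_session_from_file('your_account')   # placeholder account name
    for followee in loader.get_followees('some_profile'):
        print(followee['username'], followee['follower_count'])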
def download_pic(self, name: str, url: str, date_epoch: float, outputlabel: Optional[str] = None,
def download_pic(name: str, url: str, date_epoch: Real, outputlabel: Optional[str] = None, quiet: bool = False,
filename_suffix: Optional[str] = None) -> bool: filename_suffix: Optional[str] = None) -> bool:
"""Downloads and saves picture with given url under given directory with given timestamp. """Downloads and saves picture with given url under given directory with given timestamp.
Returns true, if file was actually downloaded, i.e. updated.""" Returns true, if file was actually downloaded, i.e. updated."""
@@ -182,11 +255,11 @@ def download_pic(name: str, url: str, date_epoch: Real, outputlabel: Optional[st
filename += '_' + filename_suffix filename += '_' + filename_suffix
filename += '.' + file_extension filename += '.' + file_extension
if os.path.isfile(filename): if os.path.isfile(filename):
_log(outputlabel + ' exists', end=' ', flush=True, quiet=quiet) self._log(outputlabel + ' exists', end=' ', flush=True)
return False return False
resp = get_anonymous_session().get(url, stream=True) resp = get_anonymous_session().get(url, stream=True)
if resp.status_code == 200: if resp.status_code == 200:
_log(outputlabel, end=' ', flush=True, quiet=quiet) self._log(outputlabel, end=' ', flush=True)
os.makedirs(name.lower(), exist_ok=True) os.makedirs(name.lower(), exist_ok=True)
with open(filename, 'wb') as file: with open(filename, 'wb') as file:
resp.raw.decode_content = True resp.raw.decode_content = True
@@ -196,13 +269,12 @@ def download_pic(name: str, url: str, date_epoch: Real, outputlabel: Optional[st
else: else:
raise ConnectionException("File \'" + url + "\' could not be downloaded.") raise ConnectionException("File \'" + url + "\' could not be downloaded.")
def save_caption(self, name: str, date_epoch: float, caption: str) -> None:
def save_caption(name: str, date_epoch: Real, caption: str, shorter_output: bool = False, quiet: bool = False) -> None:
"""Updates picture caption""" """Updates picture caption"""
filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '.txt' filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '.txt'
pcaption = caption.replace('\n', ' ').strip() pcaption = caption.replace('\n', ' ').strip()
caption = caption.encode("UTF-8") caption = caption.encode("UTF-8")
if shorter_output: if self.shorter_output:
pcaption = "txt" pcaption = "txt"
else: else:
pcaption = '[' + ((pcaption[:29] + u"\u2026") if len(pcaption) > 31 else pcaption) + ']' pcaption = '[' + ((pcaption[:29] + u"\u2026") if len(pcaption) > 31 else pcaption) + ']'
@@ -211,55 +283,53 @@ def save_caption(name: str, date_epoch: Real, caption: str, shorter_output: bool
file_caption = file.read() file_caption = file.read()
if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'): if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'):
try: try:
_log(pcaption + ' unchanged', end=' ', flush=True, quiet=quiet) self._log(pcaption + ' unchanged', end=' ', flush=True)
except UnicodeEncodeError: except UnicodeEncodeError:
_log('txt unchanged', end=' ', flush=True, quiet=quiet) self._log('txt unchanged', end=' ', flush=True)
return None return None
else: else:
def get_filename(index): def get_filename(index):
return filename if index == 0 else (filename[:-4] + '_old_' + return filename if index == 0 else (filename[:-4] + '_old_' +
(str(0) if index < 10 else str()) + str(index) + filename[-4:]) (str(0) if index < 10 else str()) + str(index) + filename[-4:])
i = 0 i = 0
while os.path.isfile(get_filename(i)): while os.path.isfile(get_filename(i)):
i = i + 1 i = i + 1
for index in range(i, 0, -1): for index in range(i, 0, -1):
os.rename(get_filename(index - 1), get_filename(index)) os.rename(get_filename(index - 1), get_filename(index))
try: try:
_log(pcaption + ' updated', end=' ', flush=True, quiet=quiet) self._log(pcaption + ' updated', end=' ', flush=True)
except UnicodeEncodeError: except UnicodeEncodeError:
_log('txt updated', end=' ', flush=True, quiet=quiet) self._log('txt updated', end=' ', flush=True)
except FileNotFoundError: except FileNotFoundError:
pass pass
try: try:
_log(pcaption, end=' ', flush=True, quiet=quiet) self._log(pcaption, end=' ', flush=True)
except UnicodeEncodeError: except UnicodeEncodeError:
_log('txt', end=' ', flush=True, quiet=quiet) self._log('txt', end=' ', flush=True)
os.makedirs(name.lower(), exist_ok=True) os.makedirs(name.lower(), exist_ok=True)
with open(filename, 'wb') as text_file: with open(filename, 'wb') as text_file:
shutil.copyfileobj(BytesIO(caption), text_file) shutil.copyfileobj(BytesIO(caption), text_file)
os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch))
def save_location(self, name: str, location_json: Dict[str, str], date_epoch: float) -> None:
def save_location(name: str, location_json: Dict[str, str], date_epoch: Real, quiet: bool = False) -> None:
filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '_location.txt' filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '_location.txt'
location_string = location_json["name"]+"\n" + \ location_string = (location_json["name"] + "\n" +
"https://maps.google.com/maps?q={0},{1}&ll={0},{1}\n" \ "https://maps.google.com/maps?q={0},{1}&ll={0},{1}\n".format(location_json["lat"],
.format(location_json["lat"], location_json["lng"]) location_json["lng"]))
os.makedirs(name.lower(), exist_ok=True) os.makedirs(name.lower(), exist_ok=True)
with open(filename, 'wb') as text_file: with open(filename, 'wb') as text_file:
shutil.copyfileobj(BytesIO(location_string.encode()), text_file) shutil.copyfileobj(BytesIO(location_string.encode()), text_file)
os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch))
_log('geo', end=' ', flush=True, quiet=quiet) self._log('geo', end=' ', flush=True)
def download_profilepic(self, name: str, url: str) -> None:
def download_profilepic(name: str, url: str, quiet: bool = False) -> None:
"""Downloads and saves profile pic with given url.""" """Downloads and saves profile pic with given url."""
date_object = datetime.datetime.strptime(requests.head(url).headers["Last-Modified"], \ date_object = datetime.datetime.strptime(requests.head(url).headers["Last-Modified"],
'%a, %d %b %Y %H:%M:%S GMT') '%a, %d %b %Y %H:%M:%S GMT')
filename = name.lower() + '/' + _epoch_to_string(date_object.timestamp()) + \ filename = name.lower() + '/' + _epoch_to_string(date_object.timestamp()) + '_UTC_profile_pic.' + url[-3:]
'_UTC_profile_pic.' + url[-3:]
if os.path.isfile(filename): if os.path.isfile(filename):
_log(filename + ' already exists', quiet=quiet) self._log(filename + ' already exists')
return None return None
match = re.search('http.*://.*instagram.*[^/]*\\.(com|net)/[^/]+/.', url) match = re.search('http.*://.*instagram.*[^/]*\\.(com|net)/[^/]+/.', url)
if match is None: if match is None:
@@ -269,7 +339,7 @@ def download_profilepic(name: str, url: str, quiet: bool = False) -> None:
url = url[:index] + 's2048x2048' + ('/' if offset == 0 else str()) + url[index + offset:] url = url[:index] + 's2048x2048' + ('/' if offset == 0 else str()) + url[index + offset:]
resp = get_anonymous_session().get(url, stream=True) resp = get_anonymous_session().get(url, stream=True)
if resp.status_code == 200: if resp.status_code == 200:
_log(filename, quiet=quiet) self._log(filename)
os.makedirs(name.lower(), exist_ok=True) os.makedirs(name.lower(), exist_ok=True)
with open(filename, 'wb') as file: with open(filename, 'wb') as file:
resp.raw.decode_content = True resp.raw.decode_content = True
@@ -278,132 +348,76 @@ def download_profilepic(name: str, url: str, quiet: bool = False) -> None:
else: else:
raise ConnectionException("File \'" + url + "\' could not be downloaded.") raise ConnectionException("File \'" + url + "\' could not be downloaded.")
def save_session_to_file(self, filename: Optional[str] = None) -> None:
def get_default_session_filename(username: str) -> str:
"""Returns default session filename for given username."""
dirname = tempfile.gettempdir() + "/" + ".instaloader-" + getpass.getuser()
filename = dirname + "/" + "session-" + username
return filename
def save_session(session: requests.Session, username: str, filename: Optional[str] = None, quiet: bool = False) -> None:
"""Saves requests.Session object.""" """Saves requests.Session object."""
if filename is None: if filename is None:
filename = get_default_session_filename(username) filename = get_default_session_filename(self.username)
dirname = os.path.dirname(filename) dirname = os.path.dirname(filename)
if dirname != '' and not os.path.exists(dirname): if dirname != '' and not os.path.exists(dirname):
os.makedirs(dirname) os.makedirs(dirname)
os.chmod(dirname, 0o700) os.chmod(dirname, 0o700)
with open(filename, 'wb') as sessionfile: with open(filename, 'wb') as sessionfile:
os.chmod(filename, 0o600) os.chmod(filename, 0o600)
pickle.dump(requests.utils.dict_from_cookiejar(session.cookies), sessionfile) pickle.dump(requests.utils.dict_from_cookiejar(self.session.cookies), sessionfile)
_log("Saved session to %s." % filename, quiet=quiet) self._log("Saved session to %s." % filename)
def load_session_from_file(self, username: str, filename: Optional[str] = None) -> None:
def load_session(username: str, filename: Optional[str] = None, quiet: bool = False) -> requests.Session:
"""Returns loaded requests.Session object, or None if not found.""" """Returns loaded requests.Session object, or None if not found."""
self.username = username
if filename is None: if filename is None:
filename = get_default_session_filename(username) filename = get_default_session_filename(username)
try:
with open(filename, 'rb') as sessionfile: with open(filename, 'rb') as sessionfile:
session = requests.Session() session = requests.Session()
session.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile)) session.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile))
session.headers.update(default_http_header()) session.headers.update(default_http_header())
session.headers.update({'X-CSRFToken': session.cookies.get_dict()['csrftoken']}) session.headers.update({'X-CSRFToken': session.cookies.get_dict()['csrftoken']})
_log("Loaded session from %s." % filename, quiet=quiet) self._log("Loaded session from %s." % filename)
return session self.session = session
except FileNotFoundError: self.username = username
pass
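
These two methods replace the old save_session()/load_session() helpers; a sketch of the intended round trip (account name and password are placeholders):

.. code:: python

    loader = Instaloader()
    loader.login('your_account', 'your_password')
    loader.save_session_to_file()        # defaults to a per-user file under the temp dir

    # Later, in a fresh process:
    loader = Instaloader()
    loader.load_session_from_file('your_account')   # restores the cookies into loader.session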
def test_login(self, session: requests.Session) -> Optional[str]:
def copy_session(session: requests.Session) -> requests.Session:
"""Duplicates a requests.Session."""
new = requests.Session()
new.cookies = \
requests.utils.cookiejar_from_dict(requests.utils.dict_from_cookiejar(session.cookies))
new.headers = session.headers
return new
def test_login(session: requests.Session) -> Optional[str]:
"""Returns the Instagram username to which given requests.Session object belongs, or None.""" """Returns the Instagram username to which given requests.Session object belongs, or None."""
if session is None: if self.session is None:
return return
data = get_json(str(), session) data = self.get_json(str(), session=session)
if data['config']['viewer'] is None: if data['config']['viewer'] is None:
return return
time.sleep(4 * random.random() + 1) time.sleep(4 * random.random() + 1)
return data['config']['viewer']['username'] return data['config']['viewer']['username']
def login(self, user: str, passwd: str) -> None:
def default_http_header(empty_session_only: bool = False) -> Dict[str, str]:
"""Returns default HTTP header we use for requests."""
user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
'(KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36'
header = { 'Accept-Encoding' : 'gzip, deflate', \
'Accept-Language' : 'en-US,en;q=0.8', \
'Connection' : 'keep-alive', \
'Content-Length' : '0', \
'Host' : 'www.instagram.com', \
'Origin' : 'https://www.instagram.com', \
'Referer' : 'https://www.instagram.com/', \
'User-Agent' : user_agent, \
'X-Instagram-AJAX' : '1', \
'X-Requested-With' : 'XMLHttpRequest'}
if empty_session_only:
del header['Host']
del header['Origin']
del header['Referer']
del header['X-Instagram-AJAX']
del header['X-Requested-With']
return header
def get_anonymous_session() -> requests.Session:
"""Returns our default anonymous requests.Session object."""
session = requests.Session()
session.cookies.update({'sessionid' : '', 'mid' : '', 'ig_pr' : '1', \
'ig_vw' : '1920', 'csrftoken' : '', \
's_network' : '', 'ds_user_id' : ''})
session.headers.update(default_http_header(empty_session_only=True))
return session
def get_session(user: str, passwd: str) -> requests.Session:
"""Log in to instagram with given username and password and return session object""" """Log in to instagram with given username and password and return session object"""
session = requests.Session() session = requests.Session()
session.cookies.update({'sessionid' : '', 'mid' : '', 'ig_pr' : '1', \ session.cookies.update({'sessionid': '', 'mid': '', 'ig_pr': '1',
'ig_vw' : '1920', 'csrftoken' : '', \ 'ig_vw': '1920', 'csrftoken': '',
's_network': '', 'ds_user_id': ''}) 's_network': '', 'ds_user_id': ''})
session.headers.update(default_http_header()) session.headers.update(default_http_header())
resp = session.get('https://www.instagram.com/') resp = session.get('https://www.instagram.com/')
session.headers.update({'X-CSRFToken': resp.cookies['csrftoken']}) session.headers.update({'X-CSRFToken': resp.cookies['csrftoken']})
time.sleep(9 * random.random() + 3) time.sleep(9 * random.random() + 3)
login = session.post('https://www.instagram.com/accounts/login/ajax/', \ login = session.post('https://www.instagram.com/accounts/login/ajax/',
data={'password': passwd, 'username': user}, allow_redirects=True) data={'password': passwd, 'username': user}, allow_redirects=True)
session.headers.update({'X-CSRFToken': login.cookies['csrftoken']}) session.headers.update({'X-CSRFToken': login.cookies['csrftoken']})
time.sleep(5 * random.random()) time.sleep(5 * random.random())
if login.status_code == 200: if login.status_code == 200:
if user == test_login(session): if user == self.test_login(session):
return session self.username = user
self.session = session
else: else:
raise BadCredentialsException('Login error! Check your credentials!') raise BadCredentialsException('Login error! Check your credentials!')
else: else:
raise ConnectionException('Login error! Connection error!') raise ConnectionException('Login error! Connection error!')
def get_feed_json(self, end_cursor: str = None) -> Dict[str, Any]:
def get_feed_json(session: requests.Session, end_cursor: str = None, sleep: bool = True) -> Dict[str, Any]:
""" """
Get JSON of the user's feed. Get JSON of the user's feed.
:param session: Session belonging to a user, i.e. not an anonymous session
:param end_cursor: The end cursor, as from json["feed"]["media"]["page_info"]["end_cursor"] :param end_cursor: The end cursor, as from json["feed"]["media"]["page_info"]["end_cursor"]
:param sleep: Sleep between requests to instagram server
:return: JSON :return: JSON
""" """
if end_cursor is None: if end_cursor is None:
return get_json(str(), session, sleep=sleep)["entry_data"]["FeedPage"][0] return self.get_json(str())["entry_data"]["FeedPage"][0]
tmpsession = copy_session(session) tmpsession = copy_session(self.session)
query = "q=ig_me()+%7B%0A++feed+%7B%0A++++media.after(" + end_cursor + "%2C+12)+%7B%0A" + \ query = "q=ig_me()+%7B%0A++feed+%7B%0A++++media.after(" + end_cursor + "%2C+12)+%7B%0A" + \
"++++++nodes+%7B%0A++++++++id%2C%0A++++++++caption%2C%0A++++++++code%2C%0A++++++++" + \ "++++++nodes+%7B%0A++++++++id%2C%0A++++++++caption%2C%0A++++++++code%2C%0A++++++++" + \
"comments.last(4)+%7B%0A++++++++++count%2C%0A++++++++++nodes+%7B%0A++++++++++++" + \ "comments.last(4)+%7B%0A++++++++++count%2C%0A++++++++++nodes+%7B%0A++++++++++++" + \
@@ -432,112 +446,105 @@ def get_feed_json(session: requests.Session, end_cursor: str = None, sleep: bool
tmpsession.headers.update({'Referer': 'https://www.instagram.com/'}) tmpsession.headers.update({'Referer': 'https://www.instagram.com/'})
tmpsession.headers.update({'Content-Type': 'application/x-www-form-urlencoded'}) tmpsession.headers.update({'Content-Type': 'application/x-www-form-urlencoded'})
resp = tmpsession.post('https://www.instagram.com/query/', data=query) resp = tmpsession.post('https://www.instagram.com/query/', data=query)
if sleep: if self.sleep:
time.sleep(4 * random.random() + 1) time.sleep(4 * random.random() + 1)
return json.loads(resp.text) return json.loads(resp.text)
def get_location(self, node_code: str) -> Dict[str, str]:
def get_location(session: requests.Session, node_code: str, sleep: bool = True) -> Dict[str, str]: pic_json = self.get_json("p/" + node_code)
pic_json = get_json("p/" + node_code, session, sleep=sleep)
media = pic_json["entry_data"]["PostPage"][0]["graphql"]["shortcode_media"] \ media = pic_json["entry_data"]["PostPage"][0]["graphql"]["shortcode_media"] \
if "graphql" in pic_json["entry_data"]["PostPage"][0] \ if "graphql" in pic_json["entry_data"]["PostPage"][0] \
else pic_json["entry_data"]["PostPage"][0]["media"] else pic_json["entry_data"]["PostPage"][0]["media"]
if media["location"] is not None: if media["location"] is not None:
location_json = get_json("explore/locations/" + location_json = self.get_json("explore/locations/" +
media["location"]["id"], media["location"]["id"])
session, sleep=sleep)
return location_json["entry_data"]["LocationsPage"][0]["location"] return location_json["entry_data"]["LocationsPage"][0]["location"]
def download_node(self, node: Dict[str, Any], name: str,
def download_node(node: Dict[str, Any], session: requests.Session, name: str, download_videos: bool = True, geotags: bool = False) -> bool:
download_videos: bool = True, geotags: bool = False,
sleep: bool = True, shorter_output: bool = False, quiet: bool = False) -> bool:
""" """
Download everything associated with one instagram node, i.e. picture, caption and video. Download everything associated with one instagram node, i.e. picture, caption and video.
:param node: Node, as from media->nodes list in instagram's JSONs :param node: Node, as from media->nodes list in instagram's JSONs
:param session: Session
:param name: Name of profile to which this node belongs :param name: Name of profile to which this node belongs
:param download_videos: True, if videos should be downloaded :param download_videos: True, if videos should be downloaded
:param geotags: Download geotags :param geotags: Download geotags
:param sleep: Sleep between requests to instagram server
:param shorter_output: Shorten log output by not printing captions
:param quiet: Suppress output
:return: True if something was downloaded, False otherwise, i.e. file was already there :return: True if something was downloaded, False otherwise, i.e. file was already there
""" """
# pylint:disable=too-many-branches,too-many-locals # pylint:disable=too-many-branches,too-many-locals
date = node["date"] if "date" in node else node["taken_at_timestamp"] date = node["date"] if "date" in node else node["taken_at_timestamp"]
if '__typename' in node: if '__typename' in node:
if node['__typename'] == 'GraphSidecar': if node['__typename'] == 'GraphSidecar':
sidecar_data = session.get('https://www.instagram.com/p/' + node['code'] + '/', params={'__a': 1}).json() sidecar_data = self.session.get('https://www.instagram.com/p/' + node['code'] + '/',
params={'__a': 1}).json()
edge_number = 1 edge_number = 1
downloaded = True downloaded = True
media = sidecar_data["graphql"]["shortcode_media"] if "graphql" in sidecar_data else sidecar_data["media"] media = sidecar_data["graphql"]["shortcode_media"] if "graphql" in sidecar_data else sidecar_data[
"media"]
for edge in media['edge_sidecar_to_children']['edges']: for edge in media['edge_sidecar_to_children']['edges']:
edge_downloaded = download_pic(name, edge['node']['display_url'],date, edge_downloaded = self.download_pic(name, edge['node']['display_url'], date,
filename_suffix=str(edge_number), quiet=quiet, filename_suffix=str(edge_number),
outputlabel=(str(edge_number) if edge_number != 1 else None)) outputlabel=(str(edge_number) if edge_number != 1 else None))
downloaded = downloaded and edge_downloaded downloaded = downloaded and edge_downloaded
edge_number += 1 edge_number += 1
if sleep: if self.sleep:
time.sleep(1.75 * random.random() + 0.25) time.sleep(1.75 * random.random() + 0.25)
elif node['__typename'] in ['GraphImage', 'GraphVideo']: elif node['__typename'] in ['GraphImage', 'GraphVideo']:
downloaded = download_pic(name, node["display_url"] if "display_url" in node else node["display_src"], downloaded = self.download_pic(name,
date, quiet=quiet) node["display_url"] if "display_url" in node else node["display_src"],
if sleep: date)
if self.sleep:
time.sleep(1.75 * random.random() + 0.25) time.sleep(1.75 * random.random() + 0.25)
else: else:
_log("Warning: Unknown typename discovered:" + node['__typename']) self._log("Warning: Unknown typename discovered:" + node['__typename'])
downloaded = False downloaded = False
else: else:
# Node is an old image or video. # Node is an old image or video.
downloaded = download_pic(name, node["display_src"], date, quiet=quiet) downloaded = self.download_pic(name, node["display_src"], date)
if sleep: if self.sleep:
time.sleep(1.75 * random.random() + 0.25) time.sleep(1.75 * random.random() + 0.25)
if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]: if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]:
save_caption(name, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"], shorter_output, quiet) self.save_caption(name, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"])
elif "caption" in node: elif "caption" in node:
save_caption(name, date, node["caption"], shorter_output, quiet) self.save_caption(name, date, node["caption"])
else: else:
_log("<no caption>", end=' ', flush=True, quiet=quiet) self._log("<no caption>", end=' ', flush=True)
node_code = node['shortcode'] if 'shortcode' in node else node['code'] node_code = node['shortcode'] if 'shortcode' in node else node['code']
if node["is_video"] and download_videos: if node["is_video"] and download_videos:
video_data = get_json('p/' + node_code, session, sleep=sleep) video_data = self.get_json('p/' + node_code)
download_pic(name, self.download_pic(name,
video_data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['video_url'], video_data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['video_url'],
date, 'mp4', quiet=quiet) date, 'mp4')
if geotags: if geotags:
location = get_location(session, node_code, sleep) location = self.get_location(node_code)
if location: if location:
save_location(name, location, date, quiet=quiet) self.save_location(name, location, date)
_log(quiet=quiet) self._log()
return downloaded return downloaded
def download_feed_pics(self, max_count: int = None, fast_update: bool = False,
def download_feed_pics(session: requests.Session, max_count: int = None, fast_update: bool = False,
filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None, filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None,
download_videos: bool = True, geotags: bool = False, download_videos: bool = True, geotags: bool = False) -> None:
shorter_output: bool = False, sleep: bool = True, quiet: bool = False) -> None:
""" """
Download pictures from the user's feed. Download pictures from the user's feed.
Example to download up to the 20 pics the user last liked: Example to download up to the 20 pics the user last liked:
>>> download_feed_pics(load_session('USER'), max_count=20, fast_update=True, >>> loader = Instaloader()
>>> loader.load_session_from_file('USER')
>>> loader.download_feed_pics(max_count=20, fast_update=True,
>>> filter_func=lambda node: >>> filter_func=lambda node:
>>> not node["likes"]["viewer_has_liked"] if "likes" in node else not node["viewer_has_liked"]) >>> not node["likes"]["viewer_has_liked"]
>>> if "likes" in node else
>>> not node["viewer_has_liked"])
:param session: Session belonging to a user, i.e. not an anonymous session
:param max_count: Maximum count of pictures to download :param max_count: Maximum count of pictures to download
:param fast_update: If true, abort when first already-downloaded picture is encountered :param fast_update: If true, abort when first already-downloaded picture is encountered
:param filter_func: function(node), which returns True if given picture should not be downloaded :param filter_func: function(node), which returns True if given picture should not be downloaded
:param download_videos: True, if videos should be downloaded :param download_videos: True, if videos should be downloaded
:param geotags: Download geotags :param geotags: Download geotags
:param shorter_output: Shorten log output by not printing captions
:param sleep: Sleep between requests to instagram server
:param quiet: Suppress output
""" """
# pylint:disable=too-many-locals # pylint:disable=too-many-locals
data = get_feed_json(session, sleep=sleep) data = self.get_feed_json()
count = 1 count = 1
while True: while True:
if "graphql" in data: if "graphql" in data:
@@ -552,71 +559,63 @@ def download_feed_pics(session: requests.Session, max_count: int = None, fast_up
node = edge_or_node["node"] if is_edge else edge_or_node node = edge_or_node["node"] if is_edge else edge_or_node
name = node["owner"]["username"] name = node["owner"]["username"]
if filter_func is not None and filter_func(node): if filter_func is not None and filter_func(node):
_log("<pic by %s skipped>" % name, flush=True, quiet=quiet) self._log("<pic by %s skipped>" % name, flush=True)
continue continue
_log("[%3i] %s " % (count, name), end="", flush=True, quiet=quiet) self._log("[%3i] %s " % (count, name), end="", flush=True)
count += 1 count += 1
downloaded = download_node(node, session, name, downloaded = self.download_node(node, name,
download_videos=download_videos, geotags=geotags, download_videos=download_videos, geotags=geotags)
sleep=sleep, shorter_output=shorter_output, quiet=quiet)
if fast_update and not downloaded: if fast_update and not downloaded:
return return
if not feed["page_info"]["has_next_page"]: if not feed["page_info"]["has_next_page"]:
break break
data = get_feed_json(session, end_cursor=feed["page_info"]["end_cursor"], sleep=sleep) data = self.get_feed_json(end_cursor=feed["page_info"]["end_cursor"])
def get_hashtag_json(self, hashtag: str,
def get_hashtag_json(hashtag: str, session: requests.Session, max_id: Optional[str] = None) -> Optional[Dict[str, Any]]:
max_id: Optional[str] = None, sleep: bool = True) -> Optional[Dict[str, Any]]:
"""Return JSON of a #hashtag""" """Return JSON of a #hashtag"""
return get_json(name='explore/tags/{0}/'.format(hashtag), session=session, max_id=max_id, sleep=sleep) return self.get_json(name='explore/tags/{0}/'.format(hashtag), max_id=max_id)
def download_hashtag(self, hashtag: str,
def download_hashtag(hashtag: str, session: requests.Session,
max_count: Optional[int] = None, max_count: Optional[int] = None,
filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None, filter_func: Optional[Callable[[Dict[str, Dict[str, Any]]], bool]] = None,
fast_update: bool = False, download_videos: bool = True, geotags: bool = False, fast_update: bool = False, download_videos: bool = True, geotags: bool = False) -> None:
shorter_output: bool = False, sleep: bool = True, quiet: bool = False) -> None:
"""Download pictures of one hashtag. """Download pictures of one hashtag.
To download the last 30 pictures with hashtag #cat, do To download the last 30 pictures with hashtag #cat, do
>>> download_hashtag('cat', session=get_anonymous_session(), max_count=30) >>> loader = Instaloader()
>>> loader.download_hashtag('cat', max_count=30)
:param hashtag: Hashtag to download, without leading '#' :param hashtag: Hashtag to download, without leading '#'
:param session: Session belonging to a user, i.e. not an anonymous session
:param max_count: Maximum count of pictures to download :param max_count: Maximum count of pictures to download
:param filter_func: function(node), which returns True if given picture should not be downloaded :param filter_func: function(node), which returns True if given picture should not be downloaded
:param fast_update: If true, abort when first already-downloaded picture is encountered :param fast_update: If true, abort when first already-downloaded picture is encountered
:param download_videos: True, if videos should be downloaded :param download_videos: True, if videos should be downloaded
:param geotags: Download geotags :param geotags: Download geotags
:param shorter_output: Shorten log output by not printing captions
:param sleep: Sleep between requests to instagram server
:param quiet: Suppress output
""" """
data = get_hashtag_json(hashtag, session, sleep=sleep) data = self.get_hashtag_json(hashtag)
count = 1 count = 1
while data: while data:
for node in data['entry_data']['TagPage'][0]['tag']['media']['nodes']: for node in data['entry_data']['TagPage'][0]['tag']['media']['nodes']:
if max_count is not None and count > max_count: if max_count is not None and count > max_count:
return return
_log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True, quiet=quiet) self._log('[{0:3d}] #{1} '.format(count, hashtag), end='', flush=True)
if filter_func is not None and filter_func(node): if filter_func is not None and filter_func(node):
_log('<skipped>', quiet=quiet) self._log('<skipped>')
continue continue
count += 1 count += 1
downloaded = download_node(node, session, '#{0}'.format(hashtag), downloaded = self.download_node(node, '#{0}'.format(hashtag),
download_videos=download_videos, geotags=geotags, sleep=sleep, download_videos=download_videos, geotags=geotags)
shorter_output=shorter_output, quiet=quiet)
if fast_update and not downloaded: if fast_update and not downloaded:
return return
if data['entry_data']['TagPage'][0]['tag']['media']['page_info']['has_next_page']: if data['entry_data']['TagPage'][0]['tag']['media']['page_info']['has_next_page']:
data = get_hashtag_json(hashtag, session, sleep=sleep, data = self.get_hashtag_json(hashtag,
max_id=data['entry_data']['TagPage'][0]['tag']['media']['page_info']['end_cursor']) max_id=data['entry_data']['TagPage'][0]['tag']['media']['page_info'][
'end_cursor'])
else: else:
break break
def check_id(self, profile: str, json_data: Dict[str, Any]) -> str:
def check_id(profile: str, session: requests.Session, json_data: Dict[str, Any], quiet: bool = False) -> str:
""" """
Consult locally stored ID of profile with given name, check whether ID matches and whether name Consult locally stored ID of profile with given name, check whether ID matches and whether name
has changed and return current name of the profile, and store ID of profile. has changed and return current name of the profile, and store ID of profile.
@@ -629,9 +628,8 @@ def check_id(profile: str, session: requests.Session, json_data: Dict[str, Any],
if (not profile_exists) or \ if (not profile_exists) or \
(profile_id != int(json_data['entry_data']['ProfilePage'][0]['user']['id'])): (profile_id != int(json_data['entry_data']['ProfilePage'][0]['user']['id'])):
if is_logged_in: if is_logged_in:
newname = get_username_by_id(session, profile_id) newname = self.get_username_by_id(profile_id)
_log("Profile {0} has changed its name to {1}.".format(profile, newname), self._log("Profile {0} has changed its name to {1}.".format(profile, newname))
quiet=quiet)
os.rename(profile, newname) os.rename(profile, newname)
return newname return newname
if profile_exists: if profile_exists:
@@ -648,29 +646,26 @@ def check_id(profile: str, session: requests.Session, json_data: Dict[str, Any],
with open(profile + "/id", 'w') as text_file: with open(profile + "/id", 'w') as text_file:
profile_id = json_data['entry_data']['ProfilePage'][0]['user']['id'] profile_id = json_data['entry_data']['ProfilePage'][0]['user']['id']
text_file.write(profile_id + "\n") text_file.write(profile_id + "\n")
_log("Stored ID {0} for profile {1}.".format(profile_id, profile), quiet=quiet) self._log("Stored ID {0} for profile {1}.".format(profile_id, profile))
return profile return profile
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile)) raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
def download(self, name: str,
def download(name: str, session: requests.Session,
profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False, profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False,
fast_update: bool = False, shorter_output: bool = False, sleep: bool = True, fast_update: bool = False) -> None:
quiet: bool = False) -> None:
"""Download one profile""" """Download one profile"""
# pylint:disable=too-many-branches,too-many-locals # pylint:disable=too-many-branches,too-many-locals
# Get profile main page json # Get profile main page json
data = get_json(name, session, sleep=sleep) data = self.get_json(name)
# check if profile does exist or name has changed since last download # check if profile does exist or name has changed since last download
# and update name and json data if necessary # and update name and json data if necessary
name_updated = check_id(name, session, data, quiet=quiet) name_updated = self.check_id(name, data)
if name_updated != name: if name_updated != name:
name = name_updated name = name_updated
data = get_json(name, session, sleep=sleep) data = self.get_json(name)
# Download profile picture # Download profile picture
download_profilepic(name, data["entry_data"]["ProfilePage"][0]["user"]["profile_pic_url"], self.download_profilepic(name, data["entry_data"]["ProfilePage"][0]["user"]["profile_pic_url"])
quiet=quiet) if self.sleep:
if sleep:
time.sleep(1.75 * random.random() + 0.25) time.sleep(1.75 * random.random() + 0.25)
if profile_pic_only: if profile_pic_only:
return return
@@ -682,60 +677,55 @@ def download(name: str, session: requests.Session,
raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name) raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name)
else: else:
if data["config"]["viewer"] is not None: if data["config"]["viewer"] is not None:
_log("profile %s could also be downloaded anonymously." % name, quiet=quiet) self._log("profile %s could also be downloaded anonymously." % name)
if ("nodes" not in data["entry_data"]["ProfilePage"][0]["user"]["media"] or if ("nodes" not in data["entry_data"]["ProfilePage"][0]["user"]["media"] or
not data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]) \ not data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]) \
and not profile_pic_only: and not profile_pic_only:
raise ProfileHasNoPicsException("Profile %s: no pics found." % name) raise ProfileHasNoPicsException("Profile %s: no pics found." % name)
# Iterate over pictures and download them # Iterate over pictures and download them
def get_last_id(data): def get_last_id(data):
if data["entry_data"] and data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]: if data["entry_data"] and data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]:
return data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"][-1]["id"] return data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"][-1]["id"]
totalcount = data["entry_data"]["ProfilePage"][0]["user"]["media"]["count"] totalcount = data["entry_data"]["ProfilePage"][0]["user"]["media"]["count"]
count = 1 count = 1
while get_last_id(data) is not None: while get_last_id(data) is not None:
for node in data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]: for node in data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]:
_log("[%3i/%3i] " % (count, totalcount), end="", flush=True, quiet=quiet) self._log("[%3i/%3i] " % (count, totalcount), end="", flush=True)
count += 1 count += 1
downloaded = download_node(node, session, name, downloaded = self.download_node(node, name,
download_videos=download_videos, geotags=geotags, download_videos=download_videos, geotags=geotags)
sleep=sleep, shorter_output=shorter_output, quiet=quiet)
if fast_update and not downloaded: if fast_update and not downloaded:
return return
data = get_json(name, session, max_id=get_last_id(data), sleep=sleep) data = self.get_json(name, max_id=get_last_id(data))
def interactive_login(self, username: str, password: Optional[str] = None) -> None:
def get_logged_in_session(username: str, password: Optional[str] = None, quiet: bool = False) -> requests.Session:
"""Logs in and returns session, asking user for password if needed""" """Logs in and returns session, asking user for password if needed"""
if password is not None: if password is not None:
return get_session(username, password) self.login(username, password)
if quiet: if self.quiet:
raise LoginRequiredException("Quiet mode requires given password or valid " raise LoginRequiredException("Quiet mode requires given password or valid session file.")
"session file.")
while password is None: while password is None:
password = getpass.getpass(prompt="Enter Instagram password for %s: " % username) password = getpass.getpass(prompt="Enter Instagram password for %s: " % username)
try: try:
return get_session(username, password) self.login(username, password)
except BadCredentialsException as err: except BadCredentialsException as err:
print(err, file=sys.stderr) print(err, file=sys.stderr)
password = None password = None
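
A sketch of how interactive_login() is meant to be driven, mirroring what download_profiles() does below (the account name is a placeholder):

.. code:: python

    loader = Instaloader()
    if loader.test_login(loader.session) != 'your_account':
        # Prompts on the terminal unless a password is passed explicitly.
        loader.interactive_login('your_account')
    loader.download('some_profile', fast_update=True)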
def download_profiles(self, profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None,
def download_profiles(profilelist: List[str], username: Optional[str] = None, password: Optional[str] = None,
sessionfile: Optional[str] = None, max_count: Optional[int] = None, sessionfile: Optional[str] = None, max_count: Optional[int] = None,
profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False, profile_pic_only: bool = False, download_videos: bool = True, geotags: bool = False,
fast_update: bool = False, fast_update: bool = False) -> None:
sleep: bool = True, shorter_output: bool = False, quiet: bool = False) -> None:
"""Download set of profiles and handle sessions""" """Download set of profiles and handle sessions"""
# pylint:disable=too-many-branches,too-many-locals # pylint:disable=too-many-branches,too-many-locals
# Login, if desired # Login, if desired
if username is not None: if username is not None:
session = load_session(username, sessionfile, quiet=quiet) self.load_session_from_file(username, sessionfile)
if username != test_login(session): if username != self.test_login(self.session):
session = get_logged_in_session(username, password, quiet) self.interactive_login(username, password)
_log("Logged in as %s." % username, quiet=quiet) self._log("Logged in as %s." % username)
else:
session = get_anonymous_session()
# Try block for KeyboardInterrupt (save session on ^C) # Try block for KeyboardInterrupt (save session on ^C)
failedtargets = [] failedtargets = []
targets = set() targets = set()
@@ -743,43 +733,42 @@ def download_profiles(profilelist: List[str], username: Optional[str] = None, pa
# Generate set of targets # Generate set of targets
for pentry in profilelist: for pentry in profilelist:
if pentry[0] == '#': if pentry[0] == '#':
_log("Retrieving pictures with hashtag {0}".format(pentry), quiet=quiet) self._log("Retrieving pictures with hashtag {0}".format(pentry))
download_hashtag(hashtag=pentry[1:], session=session, max_count=max_count, fast_update=fast_update, self.download_hashtag(hashtag=pentry[1:], max_count=max_count, fast_update=fast_update,
download_videos=download_videos, geotags=geotags, shorter_output=shorter_output, download_videos=download_videos, geotags=geotags)
sleep=sleep, quiet=quiet)
elif pentry[0] == '@' and username is not None: elif pentry[0] == '@' and username is not None:
_log("Retrieving followees of %s..." % pentry[1:], quiet=quiet) self._log("Retrieving followees of %s..." % pentry[1:])
followees = get_followees(pentry[1:], session) followees = self.get_followees(pentry[1:])
targets.update([followee['username'] for followee in followees]) targets.update([followee['username'] for followee in followees])
elif pentry == ":feed-all" and username is not None: elif pentry == ":feed-all" and username is not None:
_log("Retrieving pictures from your feed...", quiet=quiet) self._log("Retrieving pictures from your feed...")
download_feed_pics(session, fast_update=fast_update, max_count=max_count, self.download_feed_pics(fast_update=fast_update, max_count=max_count,
download_videos=download_videos, geotags=geotags, download_videos=download_videos, geotags=geotags)
shorter_output=shorter_output, sleep=sleep, quiet=quiet)
elif pentry == ":feed-liked" and username is not None: elif pentry == ":feed-liked" and username is not None:
_log("Retrieving pictures you liked from your feed...", quiet=quiet) self._log("Retrieving pictures you liked from your feed...")
download_feed_pics(session, fast_update=fast_update, max_count=max_count, self.download_feed_pics(fast_update=fast_update, max_count=max_count,
filter_func=lambda node: filter_func=lambda node:
not node["likes"]["viewer_has_liked"] not node["likes"]["viewer_has_liked"]
if "likes" in node if "likes" in node
else not node["viewer_has_liked"], else not node["viewer_has_liked"],
download_videos=download_videos, geotags=geotags, download_videos=download_videos, geotags=geotags)
shorter_output=shorter_output, sleep=sleep, quiet=quiet)
else: else:
targets.add(pentry) targets.add(pentry)
if len(targets) > 1: if len(targets) > 1:
_log("Downloading %i profiles..." % len(targets), quiet=quiet) self._log("Downloading %i profiles..." % len(targets))
# Iterate through targets list and download them # Iterate through targets list and download them
for target in targets: for target in targets:
try: try:
try: try:
download(target, session, profile_pic_only, download_videos, self.download(target, profile_pic_only, download_videos,
geotags, fast_update, shorter_output, sleep, quiet) geotags, fast_update)
except ProfileNotExistsException as err: except ProfileNotExistsException as err:
if username is not None: if username is not None:
_log("\"Profile not exists\" - Trying again anonymously, helps in case you are just blocked") self._log(
download(target, get_anonymous_session(), profile_pic_only, download_videos, "\"Profile not exists\" - Trying again anonymously, helps in case you are just blocked")
geotags, fast_update, shorter_output, sleep, quiet) anonymous_loader = Instaloader(self.sleep, self.quiet, self.shorter_output)
anonymous_loader.download(target, profile_pic_only, download_videos,
geotags, fast_update)
else: else:
raise err raise err
except NonfatalException as err: except NonfatalException as err:
@@ -792,7 +781,8 @@ def download_profiles(profilelist: List[str], username: Optional[str] = None, pa
", ".join(failedtargets), file=sys.stderr) ", ".join(failedtargets), file=sys.stderr)
# Save session if it is useful # Save session if it is useful
if username is not None: if username is not None:
save_session(session, username, sessionfile, quiet=quiet) self.save_session_to_file(sessionfile)
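
For completeness, a sketch of calling the batch entry point directly rather than via main(); the targets and account name are placeholders, and '#cat' / ':feed-liked' use the special-target syntax handled in the loop above.

.. code:: python

    loader = Instaloader(sleep=True, quiet=False, shorter_output=False)
    loader.download_profiles(['some_profile', '#cat', ':feed-liked'],
                             username='your_account',
                             max_count=50, fast_update=True)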
def main(): def main():
parser = ArgumentParser(description=__doc__, parser = ArgumentParser(description=__doc__,
@@ -806,17 +796,16 @@ def main():
parser.add_argument('--version', action='version', parser.add_argument('--version', action='version',
version=__version__) version=__version__)
parser.add_argument('-l', '--login', metavar='YOUR-USERNAME', parser.add_argument('-l', '--login', metavar='YOUR-USERNAME',
help='Login name for your Instagram account. Not needed to download public '\ help='Login name for your Instagram account. Not needed to download public '
'profiles, but if you want to download private profiles or all followees of '\ 'profiles, but if you want to download private profiles or all followees of '
'some profile, you have to specify a username used to login.') 'some profile, you have to specify a username used to login.')
parser.add_argument('-p', '--password', metavar='YOUR-PASSWORD', parser.add_argument('-p', '--password', metavar='YOUR-PASSWORD',
help='Password for your Instagram account. If --login is given and there is '\ help='Password for your Instagram account. If --login is given and there is '
'not yet a valid session file, you\'ll be prompted for your password if '\ 'not yet a valid session file, you\'ll be prompted for your password if '
'--password is not given. Specifying this option without --login has no '\ '--password is not given. Specifying this option without --login has no '
'effect.') 'effect.')
parser.add_argument('-f', '--sessionfile', parser.add_argument('-f', '--sessionfile',
help='File to store session key, defaults to '+ \ help='File to store session key, defaults to ' + get_default_session_filename("<login_name>"))
get_default_session_filename("<login_name>"))
parser.add_argument('-P', '--profile-pic-only', action='store_true', parser.add_argument('-P', '--profile-pic-only', action='store_true',
help='Only download profile picture') help='Only download profile picture')
parser.add_argument('-V', '--skip-videos', action='store_true', parser.add_argument('-V', '--skip-videos', action='store_true',
@@ -833,16 +822,17 @@ def main():
parser.add_argument('-O', '--shorter-output', action='store_true', parser.add_argument('-O', '--shorter-output', action='store_true',
help='Do not display captions while downloading') help='Do not display captions while downloading')
parser.add_argument('-q', '--quiet', action='store_true', parser.add_argument('-q', '--quiet', action='store_true',
help='Disable user interaction, i.e. do not print messages (except errors) and fail ' \ help='Disable user interaction, i.e. do not print messages (except errors) and fail '
'if login credentials are needed but not given.') 'if login credentials are needed but not given.')
args = parser.parse_args() args = parser.parse_args()
try: try:
download_profiles(args.profile, args.login, args.password, args.sessionfile, loader = Instaloader(not args.no_sleep, args.quiet, args.shorter_output)
loader.download_profiles(args.profile, args.login, args.password, args.sessionfile,
int(args.count) if args.count is not None else None, int(args.count) if args.count is not None else None,
args.profile_pic_only, not args.skip_videos, args.geotags, args.fast_update, args.profile_pic_only, not args.skip_videos, args.geotags, args.fast_update)
not args.no_sleep, args.shorter_output, args.quiet)
except InstaloaderException as err: except InstaloaderException as err:
raise SystemExit("Fatal error: %s" % err) raise SystemExit("Fatal error: %s" % err)
if __name__ == "__main__": if __name__ == "__main__":
main() main()