1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-08-17 12:19:38 +02:00

Mini refactoring and docstrings

Closes #12.
This commit is contained in:
Alexander Graf 2016-09-18 16:35:25 +02:00
parent d5c13b1295
commit 1ff6dd9d30

View File

@ -52,11 +52,13 @@ class ConnectionException(InstaloaderException):
pass
def log(*msg, sep='', end='\n', flush=False, quiet=False):
def _log(*msg, sep='', end='\n', flush=False, quiet=False):
if not quiet:
print(*msg, sep=sep, end=end, flush=flush)
def get_json(name, session, max_id=0, sleep=True):
"""Return JSON of a profile"""
resp = session.get('http://www.instagram.com/'+name, \
params={'max_id': max_id})
if sleep:
@ -67,15 +69,10 @@ def get_json(name, session, max_id=0, sleep=True):
else:
return json.loads(match.group(0)[21:-2])
def get_last_id(data):
if len(data["entry_data"]) == 0 or \
len(data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]) == 0:
return None
else:
data = data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]
return int(data[len(data)-1]["id"])
def get_username_by_id(session, profile_id):
"""To get the current username of a profile, given its unique ID, this function can be used.
session is required to be a logged-in (i.e. non-anonymous) session."""
tempsession = copy_session(session)
tempsession.headers.update({'Content-Type' : 'application/json'})
resp = tempsession.post('https://www.instagram.com/query/', data='q=ig_user(' +
@ -93,23 +90,28 @@ def get_username_by_id(session, profile_id):
raise LoginRequiredException("Login required to determine username (id: " +
str(profile_id) + ").")
def get_id_by_username(profile):
"""Each Instagram profile has its own unique ID which stays unmodified even if a user changes
his/her username. To get said ID, given the profile's name, you may call this function."""
data = get_json(profile, get_anonymous_session())
if len(data["entry_data"]) == 0 or "ProfilePage" not in data("entry_data"):
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
return int(data['entry_data']['ProfilePage'][0]['user']['id'])
def epoch_to_string(epoch):
def _epoch_to_string(epoch):
return datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d_%H-%M-%S')
def get_file_extension(url):
match = re.search('\\.[a-z]*\\?', url)
if match is None:
return url[-3:]
else:
return match.group(0)[1:-1]
def get_followees(profile, session):
"""
Retrieve list of followees of given profile
:param profile: Name of profile to lookup followees
:param session: Session belonging to a user, i.e. not an anonymous session
:return: List of followees (list of dictionaries), as returned by instagram server
"""
tmpsession = copy_session(session)
data = get_json(profile, tmpsession)
profile_id = data['entry_data']['ProfilePage'][0]['user']['id']
@ -158,17 +160,21 @@ def get_followees(profile, session):
"unable to gather followees.")
raise LoginRequiredException("Login required to gather followees.")
def download_pic(name, url, date_epoch, outputlabel=None, quiet=False):
# Returns true, if file was actually downloaded, i.e. updated
"""Downloads and saves picture with given url under given directory with given timestamp.
Returns true, if file was actually downloaded, i.e. updated."""
if outputlabel is None:
outputlabel = epoch_to_string(date_epoch)
filename = name.lower() + '/' + epoch_to_string(date_epoch) + '.' + get_file_extension(url)
outputlabel = _epoch_to_string(date_epoch)
urlmatch = re.search('\\.[a-z]*\\?', url)
file_extension = url[-3:] if urlmatch is None else urlmatch.group(0)[1:-1]
filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '.' + file_extension
if os.path.isfile(filename):
log(outputlabel + ' exists', end=' ', flush=True, quiet=quiet)
_log(outputlabel + ' exists', end=' ', flush=True, quiet=quiet)
return False
resp = get_anonymous_session().get(url, stream=True)
if resp.status_code == 200:
log(outputlabel, end=' ', flush=True, quiet=quiet)
_log(outputlabel, end=' ', flush=True, quiet=quiet)
os.makedirs(name.lower(), exist_ok=True)
with open(filename, 'wb') as file:
resp.raw.decode_content = True
@ -178,8 +184,10 @@ def download_pic(name, url, date_epoch, outputlabel=None, quiet=False):
else:
raise ConnectionException("File \'" + url + "\' could not be downloaded.")
def save_caption(name, date_epoch, caption, shorter_output=False, quiet=False):
filename = name.lower() + '/' + epoch_to_string(date_epoch) + '.txt'
"""Updates picture caption"""
filename = name.lower() + '/' + _epoch_to_string(date_epoch) + '.txt'
pcaption = caption.replace('\n', ' ').strip()
caption = caption.encode("UTF-8")
if shorter_output:
@ -191,9 +199,9 @@ def save_caption(name, date_epoch, caption, shorter_output=False, quiet=False):
file_caption = file.read()
if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'):
try:
log(pcaption + ' unchanged', end=' ', flush=True, quiet=quiet)
_log(pcaption + ' unchanged', end=' ', flush=True, quiet=quiet)
except UnicodeEncodeError:
log('txt unchanged', end=' ', flush=True, quiet=quiet)
_log('txt unchanged', end=' ', flush=True, quiet=quiet)
return None
else:
def get_filename(index):
@ -205,27 +213,29 @@ def save_caption(name, date_epoch, caption, shorter_output=False, quiet=False):
for index in range(i, 0, -1):
os.rename(get_filename(index-1), get_filename(index))
try:
log(pcaption + ' updated', end=' ', flush=True, quiet=quiet)
_log(pcaption + ' updated', end=' ', flush=True, quiet=quiet)
except UnicodeEncodeError:
log('txt updated', end=' ', flush=True, quiet=quiet)
_log('txt updated', end=' ', flush=True, quiet=quiet)
except FileNotFoundError:
pass
try:
log(pcaption, end=' ', flush=True, quiet=quiet)
_log(pcaption, end=' ', flush=True, quiet=quiet)
except UnicodeEncodeError:
log('txt', end=' ', flush=True, quiet=quiet)
_log('txt', end=' ', flush=True, quiet=quiet)
os.makedirs(name.lower(), exist_ok=True)
with open(filename, 'wb') as text_file:
shutil.copyfileobj(BytesIO(caption), text_file)
os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch))
def download_profilepic(name, url, quiet=False):
"""Downloads and saves profile pic with given url."""
date_object = datetime.datetime.strptime(requests.head(url).headers["Last-Modified"], \
'%a, %d %b %Y %H:%M:%S GMT')
filename = name.lower() + '/' + epoch_to_string(date_object.timestamp()) + \
filename = name.lower() + '/' + _epoch_to_string(date_object.timestamp()) + \
'_UTC_profile_pic.' + url[-3:]
if os.path.isfile(filename):
log(filename + ' already exists', quiet=quiet)
_log(filename + ' already exists', quiet=quiet)
return None
match = re.search('http.*://.*instagram.*[^/]*\\.(com|net)/[^/]+/.', url)
if match is None:
@ -235,7 +245,7 @@ def download_profilepic(name, url, quiet=False):
url = url[:index] + 's2048x2048' + ('/' if offset == 0 else str()) + url[index+offset:]
resp = get_anonymous_session().get(url, stream=True)
if resp.status_code == 200:
log(filename, quiet=quiet)
_log(filename, quiet=quiet)
os.makedirs(name.lower(), exist_ok=True)
with open(filename, 'wb') as file:
resp.raw.decode_content = True
@ -244,12 +254,16 @@ def download_profilepic(name, url, quiet=False):
else:
raise ConnectionException("File \'" + url + "\' could not be downloaded.")
def get_default_session_filename(username):
"""Returns default session filename for given username."""
dirname = tempfile.gettempdir() + "/" + ".instaloader-" + getpass.getuser()
filename = dirname + "/" + "session-" + username
return filename
def save_session(session, username, filename=None, quiet=False):
"""Saves requests.Session object."""
if filename is None:
filename = get_default_session_filename(username)
dirname = os.path.dirname(filename)
@ -259,9 +273,11 @@ def save_session(session, username, filename=None, quiet=False):
with open(filename, 'wb') as sessionfile:
os.chmod(filename, 0o600)
pickle.dump(requests.utils.dict_from_cookiejar(session.cookies), sessionfile)
log("Saved session to %s." % filename, quiet=quiet)
_log("Saved session to %s." % filename, quiet=quiet)
def load_session(username, filename=None, quiet=False):
"""Returns loaded requests.Session object, or None if not found."""
if filename is None:
filename = get_default_session_filename(username)
try:
@ -270,19 +286,23 @@ def load_session(username, filename=None, quiet=False):
session.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile))
session.headers.update(default_http_header())
session.headers.update({'X-CSRFToken':session.cookies.get_dict()['csrftoken']})
log("Loaded session from %s." % filename, quiet=quiet)
_log("Loaded session from %s." % filename, quiet=quiet)
return session
except FileNotFoundError:
pass
def copy_session(session):
"""Duplicates a requests.Session."""
new = requests.Session()
new.cookies = \
requests.utils.cookiejar_from_dict(requests.utils.dict_from_cookiejar(session.cookies))
new.headers = session.headers
return new
def test_login(session):
"""Returns the Instagram username to which given requests.Session object belongs, or None."""
if session is None:
return
data = get_json(str(), session)
@ -291,7 +311,9 @@ def test_login(session):
time.sleep(4 * random.random() + 1)
return data['config']['viewer']['username']
def default_http_header(empty_session_only=False):
"""Returns default HTTP header we use for requests."""
user_agent = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
'(KHTML, like Gecko) Chrome/51.0.2704.79 Safari/537.36'
header = { 'Accept-Encoding' : 'gzip, deflate', \
@ -312,7 +334,9 @@ def default_http_header(empty_session_only=False):
del header['X-Requested-With']
return header
def get_anonymous_session():
"""Returns our default anonymous requests.Session object."""
session = requests.Session()
session.cookies.update({'sessionid' : '', 'mid' : '', 'ig_pr' : '1', \
'ig_vw' : '1920', 'csrftoken' : '', \
@ -320,6 +344,7 @@ def get_anonymous_session():
session.headers.update(default_http_header(empty_session_only=True))
return session
def get_session(user, passwd):
"""Log in to instagram with given username and password and return session object"""
session = requests.Session()
@ -342,38 +367,6 @@ def get_session(user, passwd):
else:
raise ConnectionException('Login error! Connection error!')
def check_id(profile, session, json_data, quiet):
profile_exists = len(json_data["entry_data"]) > 0 and "ProfilePage" in json_data["entry_data"]
is_logged_in = json_data["config"]["viewer"] is not None
try:
with open(profile + "/id", 'rb') as id_file:
profile_id = int(id_file.read())
if (not profile_exists) or \
(profile_id != int(json_data['entry_data']['ProfilePage'][0]['user']['id'])):
if is_logged_in:
newname = get_username_by_id(session, profile_id)
log("Profile {0} has changed its name to {1}.".format(profile, newname),
quiet=quiet)
os.rename(profile, newname)
return newname
if profile_exists:
raise ProfileNotExistsException("Profile {0} does not match the stored "
"unique ID {1}.".format(profile, profile_id))
raise ProfileNotExistsException("Profile {0} does not exist. Please login to "
"update profile name. Unique ID: {1}."
.format(profile, profile_id))
return profile
except FileNotFoundError:
pass
if profile_exists:
os.makedirs(profile.lower(), exist_ok=True)
with open(profile + "/id", 'w') as text_file:
profile_id = json_data['entry_data']['ProfilePage'][0]['user']['id']
text_file.write(profile_id+"\n")
log("Stored ID {0} for profile {1}.".format(profile_id, profile), quiet=quiet)
return profile
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
def get_feed_json(session, end_cursor=None, sleep=True):
"""
@ -440,13 +433,13 @@ def download_node(node, session, name,
if "caption" in node:
save_caption(name, node["date"], node["caption"], shorter_output, quiet)
else:
log("<no caption>", end=' ', flush=True, quiet=quiet)
_log("<no caption>", end=' ', flush=True, quiet=quiet)
if node["is_video"] and download_videos:
video_data = get_json('p/' + node["code"], session, sleep=sleep)
download_pic(name,
video_data['entry_data']['PostPage'][0]['media']['video_url'],
node["date"], 'mp4', quiet=quiet)
log(quiet=quiet)
_log(quiet=quiet)
return downloaded
@ -476,9 +469,9 @@ def download_feed_pics(session, max_count=None, fast_update=False, filter_func=N
return
name = node["owner"]["username"]
if filter_func is not None and filter_func(node):
log("<pic by %s skipped>" % name, flush=True, quiet=quiet)
_log("<pic by %s skipped>" % name, flush=True, quiet=quiet)
continue
log("[%3i] %s " % (count, name), end="", flush=True, quiet=quiet)
_log("[%3i] %s " % (count, name), end="", flush=True, quiet=quiet)
count += 1
downloaded = download_node(node, session, name,
download_videos=download_videos, sleep=sleep,
@ -489,6 +482,43 @@ def download_feed_pics(session, max_count=None, fast_update=False, filter_func=N
sleep=sleep)
def check_id(profile, session, json_data, quiet=False):
"""
Consult locally stored ID of profile with given name, check whether ID matches and whether name
has changed and return current name of the profile, and store ID of profile.
"""
profile_exists = len(json_data["entry_data"]) > 0 and "ProfilePage" in json_data["entry_data"]
is_logged_in = json_data["config"]["viewer"] is not None
try:
with open(profile + "/id", 'rb') as id_file:
profile_id = int(id_file.read())
if (not profile_exists) or \
(profile_id != int(json_data['entry_data']['ProfilePage'][0]['user']['id'])):
if is_logged_in:
newname = get_username_by_id(session, profile_id)
_log("Profile {0} has changed its name to {1}.".format(profile, newname),
quiet=quiet)
os.rename(profile, newname)
return newname
if profile_exists:
raise ProfileNotExistsException("Profile {0} does not match the stored "
"unique ID {1}.".format(profile, profile_id))
raise ProfileNotExistsException("Profile {0} does not exist. Please login to "
"update profile name. Unique ID: {1}."
.format(profile, profile_id))
return profile
except FileNotFoundError:
pass
if profile_exists:
os.makedirs(profile.lower(), exist_ok=True)
with open(profile + "/id", 'w') as text_file:
profile_id = json_data['entry_data']['ProfilePage'][0]['user']['id']
text_file.write(profile_id+"\n")
_log("Stored ID {0} for profile {1}.".format(profile_id, profile), quiet=quiet)
return profile
raise ProfileNotExistsException("Profile {0} does not exist.".format(profile))
def download(name, session, profile_pic_only=False, download_videos=True,
fast_update=False, shorter_output=False, sleep=True, quiet=False):
"""Download one profile"""
@ -516,17 +546,24 @@ def download(name, session, profile_pic_only=False, download_videos=True,
raise PrivateProfileNotFollowedException("Profile %s: private but not followed." % name)
else:
if data["config"]["viewer"] is not None:
log("profile %s could also be downloaded anonymously." % name, quiet=quiet)
_log("profile %s could also be downloaded anonymously." % name, quiet=quiet)
if ("nodes" not in data["entry_data"]["ProfilePage"][0]["user"]["media"] or
len(data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]) == 0) \
and not profile_pic_only:
raise ProfileHasNoPicsException("Profile %s: no pics found." % name)
# Iterate over pictures and download them
def get_last_id(data):
if len(data["entry_data"]) == 0 or \
len(data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]) == 0:
return None
else:
data = data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]
return int(data[len(data) - 1]["id"])
totalcount = data["entry_data"]["ProfilePage"][0]["user"]["media"]["count"]
count = 1
while get_last_id(data) is not None:
for node in data["entry_data"]["ProfilePage"][0]["user"]["media"]["nodes"]:
log("[%3i/%3i] " % (count, totalcount), end="", flush=True, quiet=quiet)
_log("[%3i/%3i] " % (count, totalcount), end="", flush=True, quiet=quiet)
count += 1
downloaded = download_node(node, session, name,
download_videos=download_videos, sleep=sleep,
@ -560,7 +597,7 @@ def download_profiles(profilelist, username=None, password=None, sessionfile=Non
session = load_session(username, sessionfile, quiet=quiet)
if username != test_login(session):
session = get_logged_in_session(username, password, quiet)
log("Logged in as %s." % username, quiet=quiet)
_log("Logged in as %s." % username, quiet=quiet)
else:
session = get_anonymous_session()
# Try block for KeyboardInterrupt (save session on ^C)
@ -570,16 +607,16 @@ def download_profiles(profilelist, username=None, password=None, sessionfile=Non
# Generate set of targets
for pentry in profilelist:
if pentry[0] == '@':
log("Retrieving followees of %s..." % pentry[1:], quiet=quiet)
_log("Retrieving followees of %s..." % pentry[1:], quiet=quiet)
followees = get_followees(pentry[1:], session)
targets.update([followee['username'] for followee in followees])
elif pentry == ":feed-all" and username is not None:
log("Retrieving pictures from your feed...", quiet=quiet)
_log("Retrieving pictures from your feed...", quiet=quiet)
download_feed_pics(session, fast_update=fast_update,
download_videos=download_videos, shorter_output=shorter_output,
sleep=sleep, quiet=quiet)
elif pentry == ":feed-liked" and username is not None:
log("Retrieving pictures you liked from your feed...", quiet=quiet)
_log("Retrieving pictures you liked from your feed...", quiet=quiet)
download_feed_pics(session, fast_update=fast_update,
filter_func=lambda node: not node["likes"]["viewer_has_liked"],
download_videos=download_videos, shorter_output=shorter_output,
@ -587,9 +624,9 @@ def download_profiles(profilelist, username=None, password=None, sessionfile=Non
else:
targets.add(pentry)
if len(targets) == 0:
log("No profiles to download given.", quiet=quiet)
_log("No profiles to download given.", quiet=quiet)
elif len(targets) > 1:
log("Downloading %i profiles..." % len(targets), quiet=quiet)
_log("Downloading %i profiles..." % len(targets), quiet=quiet)
# Iterate through targets list and download them
for target in targets:
try: