1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-09-11 16:22:24 +02:00

Consistently use datetime type for handling dates

This commit is contained in:
Alexander Graf 2017-07-29 11:08:52 +02:00
parent 48d6f0226b
commit 66f69b5c21

View File

@ -2,7 +2,6 @@
"""Download pictures (or videos) along with their captions and other metadata from Instagram.""" """Download pictures (or videos) along with their captions and other metadata from Instagram."""
import datetime
import getpass import getpass
import json import json
import os import os
@ -16,6 +15,7 @@ import tempfile
import time import time
from argparse import ArgumentParser from argparse import ArgumentParser
from base64 import b64decode, b64encode from base64 import b64decode, b64encode
from datetime import datetime
from io import BytesIO from io import BytesIO
from typing import Any, Callable, Dict, List, Optional from typing import Any, Callable, Dict, List, Optional
@ -87,10 +87,6 @@ class ConnectionException(InstaloaderException):
pass pass
def _epoch_to_string(epoch: float) -> str:
return datetime.datetime.fromtimestamp(epoch).strftime('%Y-%m-%d_%H-%M-%S')
def get_default_session_filename(username: str) -> str: def get_default_session_filename(username: str) -> str:
"""Returns default session filename for given username.""" """Returns default session filename for given username."""
dirname = tempfile.gettempdir() + "/" + ".instaloader-" + getpass.getuser() dirname = tempfile.gettempdir() + "/" + ".instaloader-" + getpass.getuser()
@ -327,7 +323,7 @@ class Instaloader:
break break
return comments return comments
def download_pic(self, filename: str, url: str, date_epoch: float, def download_pic(self, filename: str, url: str, mtime: datetime,
filename_suffix: Optional[str] = None) -> bool: filename_suffix: Optional[str] = None) -> bool:
"""Downloads and saves picture with given url under given directory with given timestamp. """Downloads and saves picture with given url under given directory with given timestamp.
Returns true, if file was actually downloaded, i.e. updated.""" Returns true, if file was actually downloaded, i.e. updated."""
@ -345,7 +341,7 @@ class Instaloader:
with open(filename, 'wb') as file: with open(filename, 'wb') as file:
resp.raw.decode_content = True resp.raw.decode_content = True
shutil.copyfileobj(resp.raw, file) shutil.copyfileobj(resp.raw, file)
os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
return True return True
else: else:
raise ConnectionException("File \'" + url + "\' could not be downloaded.") raise ConnectionException("File \'" + url + "\' could not be downloaded.")
@ -373,7 +369,7 @@ class Instaloader:
file.write(json.dumps(unique_comments_list, indent=4)) file.write(json.dumps(unique_comments_list, indent=4))
self._log('comments', end=' ', flush=True) self._log('comments', end=' ', flush=True)
def save_caption(self, filename: str, date_epoch: float, caption: str) -> None: def save_caption(self, filename: str, mtime: datetime, caption: str) -> None:
"""Updates picture caption""" """Updates picture caption"""
filename += '.txt' filename += '.txt'
pcaption = caption.replace('\n', ' ').strip() pcaption = caption.replace('\n', ' ').strip()
@ -413,30 +409,34 @@ class Instaloader:
self._log('txt', end=' ', flush=True) self._log('txt', end=' ', flush=True)
with open(filename, 'wb') as text_file: with open(filename, 'wb') as text_file:
shutil.copyfileobj(BytesIO(caption), text_file) shutil.copyfileobj(BytesIO(caption), text_file)
os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
def save_location(self, filename: str, location_json: Dict[str, str], date_epoch: float) -> None: def save_location(self, filename: str, location_json: Dict[str, str], mtime: datetime) -> None:
filename += '_location.txt' filename += '_location.txt'
location_string = (location_json["name"] + "\n" + location_string = (location_json["name"] + "\n" +
"https://maps.google.com/maps?q={0},{1}&ll={0},{1}\n".format(location_json["lat"], "https://maps.google.com/maps?q={0},{1}&ll={0},{1}\n".format(location_json["lat"],
location_json["lng"])) location_json["lng"]))
with open(filename, 'wb') as text_file: with open(filename, 'wb') as text_file:
shutil.copyfileobj(BytesIO(location_string.encode()), text_file) shutil.copyfileobj(BytesIO(location_string.encode()), text_file)
os.utime(filename, (datetime.datetime.now().timestamp(), date_epoch)) os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
self._log('geo', end=' ', flush=True) self._log('geo', end=' ', flush=True)
def download_profilepic(self, name: str, url: str) -> None: def download_profilepic(self, name: str, url: str) -> None:
"""Downloads and saves profile pic with given url.""" """Downloads and saves profile pic with given url."""
date_object = datetime.datetime.strptime(requests.head(url).headers["Last-Modified"],
'%a, %d %b %Y %H:%M:%S GMT') def _epoch_to_string(epoch: datetime) -> str:
return epoch.strftime('%Y-%m-%d_%H-%M-%S')
date_object = datetime.strptime(requests.head(url).headers["Last-Modified"],
'%a, %d %b %Y %H:%M:%S GMT')
if ((format_string_contains_key(self.dirname_pattern, 'profile') or if ((format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.dirname_pattern, 'target'))): format_string_contains_key(self.dirname_pattern, 'target'))):
filename = '{0}/{1}_UTC_profile_pic.{2}'.format(self.dirname_pattern.format(profile=name.lower(), filename = '{0}/{1}_UTC_profile_pic.{2}'.format(self.dirname_pattern.format(profile=name.lower(),
target=name.lower()), target=name.lower()),
_epoch_to_string(date_object.timestamp()), url[-3:]) _epoch_to_string(date_object), url[-3:])
else: else:
filename = '{0}/{1}_{2}_UTC_profile_pic.{3}'.format(self.dirname_pattern.format(), name.lower(), filename = '{0}/{1}_{2}_UTC_profile_pic.{3}'.format(self.dirname_pattern.format(), name.lower(),
_epoch_to_string(date_object.timestamp()), url[-3:]) _epoch_to_string(date_object), url[-3:])
if os.path.isfile(filename): if os.path.isfile(filename):
self._log(filename + ' already exists') self._log(filename + ' already exists')
return None return None
@ -452,7 +452,7 @@ class Instaloader:
with open(filename, 'wb') as file: with open(filename, 'wb') as file:
resp.raw.decode_content = True resp.raw.decode_content = True
shutil.copyfileobj(resp.raw, file) shutil.copyfileobj(resp.raw, file)
os.utime(filename, (datetime.datetime.now().timestamp(), date_object.timestamp())) os.utime(filename, (datetime.now().timestamp(), date_object.timestamp()))
else: else:
raise ConnectionException("File \'" + url + "\' could not be downloaded.") raise ConnectionException("File \'" + url + "\' could not be downloaded.")
@ -573,10 +573,10 @@ class Instaloader:
else: else:
profilename = None profilename = None
profilename = profilename.lower() if profilename else None profilename = profilename.lower() if profilename else None
date = node["date"] if "date" in node else node["taken_at_timestamp"] date = datetime.fromtimestamp(node["date"] if "date" in node else node["taken_at_timestamp"])
dirname = self.dirname_pattern.format(profile=profilename, target=target.lower()) dirname = self.dirname_pattern.format(profile=profilename, target=target.lower())
filename = dirname + '/' + self.filename_pattern.format(profile=profilename, target=target.lower(), filename = dirname + '/' + self.filename_pattern.format(profile=profilename, target=target.lower(),
date=datetime.datetime.fromtimestamp(date), date=date,
shortcode=shortcode) shortcode=shortcode)
os.makedirs(os.path.dirname(filename), exist_ok=True) os.makedirs(os.path.dirname(filename), exist_ok=True)
if '__typename' in node: if '__typename' in node:
@ -591,7 +591,7 @@ class Instaloader:
for edge in media['edge_sidecar_to_children']['edges']: for edge in media['edge_sidecar_to_children']['edges']:
edge_downloaded = self.download_pic(filename=filename, edge_downloaded = self.download_pic(filename=filename,
url=edge['node']['display_url'], url=edge['node']['display_url'],
date_epoch=date, mtime=date,
filename_suffix=str(edge_number)) filename_suffix=str(edge_number))
downloaded = downloaded and edge_downloaded downloaded = downloaded and edge_downloaded
edge_number += 1 edge_number += 1
@ -599,13 +599,13 @@ class Instaloader:
url = node["display_url"] if "display_url" in node else node["display_src"] url = node["display_url"] if "display_url" in node else node["display_src"]
downloaded = self.download_pic(filename=filename, downloaded = self.download_pic(filename=filename,
url=url, url=url,
date_epoch=date) mtime=date)
else: else:
self._log("Warning: Unknown typename discovered:" + node['__typename']) self._log("Warning: Unknown typename discovered:" + node['__typename'])
downloaded = False downloaded = False
else: else:
# Node is an old image or video. # Node is an old image or video.
downloaded = self.download_pic(filename=filename, url=node["display_src"], date_epoch=date) downloaded = self.download_pic(filename=filename, url=node["display_src"], mtime=date)
if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]: if "edge_media_to_caption" in node and node["edge_media_to_caption"]["edges"]:
self.save_caption(filename, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"]) self.save_caption(filename, date, node["edge_media_to_caption"]["edges"][0]["node"]["text"])
elif "caption" in node: elif "caption" in node:
@ -616,7 +616,7 @@ class Instaloader:
video_data = self.get_json('p/' + shortcode) video_data = self.get_json('p/' + shortcode)
self.download_pic(filename=filename, self.download_pic(filename=filename,
url=video_data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['video_url'], url=video_data['entry_data']['PostPage'][0]['graphql']['shortcode_media']['video_url'],
date_epoch=date) mtime=date)
if geotags: if geotags:
location = self.get_location(shortcode) location = self.get_location(shortcode)
if location: if location:
@ -684,24 +684,24 @@ class Instaloader:
self._sleep() self._sleep()
shortcode = item["code"] if "code" in item else "no_code" shortcode = item["code"] if "code" in item else "no_code"
date = item["device_timestamp"] if "device_timestamp" in item else item["taken_at"] date_float = item["device_timestamp"] if "device_timestamp" in item else item["taken_at"]
try: try:
date_stamp = datetime.datetime.fromtimestamp(date) date = datetime.fromtimestamp(date_float)
except ValueError: except ValueError:
# device_timestamp seems to sometime be in milliseconds # device_timestamp seems to sometime be in milliseconds
date /= 1000 date_float /= 1000
date_stamp = datetime.datetime.fromtimestamp(date) date = datetime.fromtimestamp(date_float)
dirname = self.dirname_pattern.format(profile=name, target=':stories') dirname = self.dirname_pattern.format(profile=name, target=':stories')
filename = dirname + '/' + self.filename_pattern.format(profile=name, target=':stories', filename = dirname + '/' + self.filename_pattern.format(profile=name, target=':stories',
date=date_stamp, date=date,
shortcode=shortcode) shortcode=shortcode)
os.makedirs(os.path.dirname(filename), exist_ok=True) os.makedirs(os.path.dirname(filename), exist_ok=True)
if "image_versions2" in item: if "image_versions2" in item:
url = item["image_versions2"]["candidates"][0]["url"] url = item["image_versions2"]["candidates"][0]["url"]
downloaded = self.download_pic(filename=filename, downloaded = self.download_pic(filename=filename,
url=url, url=url,
date_epoch=date) mtime=date)
else: else:
self._log("Warning: Unable to find story image.") self._log("Warning: Unable to find story image.")
downloaded = False downloaded = False
@ -715,7 +715,7 @@ class Instaloader:
if "video_versions" in item and download_videos: if "video_versions" in item and download_videos:
downloaded = self.download_pic(filename=filename, downloaded = self.download_pic(filename=filename,
url=item["video_versions"][0]["url"], url=item["video_versions"][0]["url"],
date_epoch=date) mtime=date)
if "video_duration" in item and self.sleep and downloaded: if "video_duration" in item and self.sleep and downloaded:
time.sleep(item["video_duration"]) time.sleep(item["video_duration"])
if item["story_locations"]: if item["story_locations"]: