1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-09-11 16:22:24 +02:00

New formatter for filename pattern

This commit is contained in:
Alexander Graf 2018-04-13 21:19:04 +02:00
parent df1cdb5d48
commit 80f701150c
2 changed files with 75 additions and 117 deletions

View File

@ -11,7 +11,7 @@ from contextlib import contextmanager, suppress
from datetime import datetime
from functools import wraps
from io import BytesIO
from typing import Any, Callable, Dict, Iterator, List, Optional
from typing import Callable, Dict, Iterator, List, Optional, Union
from .exceptions import *
from .instaloadercontext import InstaloaderContext
@ -25,7 +25,7 @@ def get_default_session_filename(username: str) -> str:
return filename.lower()
def format_string_contains_key(format_string: '_PathPattern', key: str) -> bool:
def format_string_contains_key(format_string: str, key: str) -> bool:
# pylint:disable=unused-variable
for literal_text, field_name, format_spec, conversion in string.Formatter().parse(format_string):
if field_name and (field_name == key or field_name.startswith(key + '.')):
@ -45,13 +45,31 @@ def _requires_login(func: Callable) -> Callable:
return call
class _PathPattern(str):
"""Class overriding :meth:`str.format` for character substitution in paths for Windows, see issue #84."""
class _PostPathFormatter(string.Formatter):
def __init__(self, post: Union[Post, StoryItem], target: str):
    """Store the item and the target name that pattern fields are resolved against.

    :param post: Post or StoryItem whose attributes are looked up for ``{<attribute>}`` fields.
    :param target: String substituted for the ``{target}`` field.
    """
    self._target = target
    self._post = post
def format(self, *args: Any, **kwargs: Any) -> str:
ret = super().format(*args, **kwargs)
def vformat(self, format_string, args, kwargs):
"""Override :meth:`string.Formatter.vformat` for character substitution in paths for Windows, see issue #84."""
ret = super().vformat(format_string, args, kwargs)
return ret.replace(':', '\ua789') if platform.system() == 'Windows' else ret
def get_field(self, field_name, args, kwargs):
    """Override :meth:`string.Formatter.get_field` to substitute {target} and {<POST_ATTRIBUTE>}.

    :param field_name: Name of the replacement field as written in the pattern.
    :param args: Positional arguments of the format call; only used by the fallback.
    :param kwargs: Keyword arguments of the format call; only used by the fallback.
    :return: Tuple ``(value, used_key)``; ``used_key`` is ``None`` for fields resolved here.
    """
    if field_name == "target":
        return self._target, None
    # Only names defined on both Post and StoryItem are taken from the wrapped item.
    if hasattr(Post, field_name) and hasattr(StoryItem, field_name):
        # getattr() is the regular attribute lookup; the attribute is read only
        # when the field actually occurs in the pattern.
        return getattr(self._post, field_name), None
    # Everything else is resolved by the standard Formatter machinery.
    return super().get_field(field_name, args, kwargs)
def format_field(self, value, format_spec):
"""Override :meth:`string.Formatter.format_field` to have our
default format_spec for :class:`datetime.Datetime` objects."""
if isinstance(value, datetime) and not format_spec:
return super().format_field(value, '%Y-%m-%d_%H-%M-%S')
return super().format_field(value, format_spec)
class Instaloader:
@ -72,19 +90,8 @@ class Instaloader:
self.context = InstaloaderContext(sleep, quiet, user_agent, max_connection_attempts)
# configuration parameters
self.dirname_pattern = _PathPattern(dirname_pattern if dirname_pattern is not None else '{target}')
if filename_pattern is not None:
filename_pattern = re.sub(r"({(?:post\.)?date)([:}])", r"\1_utc\2", filename_pattern)
self.filename_pattern_old = filename_pattern.replace('{date_utc}', '{date_utc:%Y-%m-%d_%H-%M-%S}')
self.filename_pattern_old = _PathPattern(re.sub(r"(?i)({(?:post\.)?date_utc:[^}]*?)_UTC",
r"\1", self.filename_pattern_old))
filename_pattern = re.sub(r"(?i)({(date_utc|post\.date_utc):(?![^}]*UTC[^}]*).*?)}",
r"\1_UTC}", filename_pattern)
self.filename_pattern = _PathPattern(filename_pattern.replace('{date_utc}',
'{date_utc:%Y-%m-%d_%H-%M-%S_UTC}'))
else:
self.filename_pattern = _PathPattern('{date_utc:%Y-%m-%d_%H-%M-%S_UTC}')
self.filename_pattern_old = _PathPattern('{date_utc:%Y-%m-%d_%H-%M-%S}')
self.dirname_pattern = dirname_pattern or "{target}"
self.filename_pattern = filename_pattern or "{date_utc}_UTC"
self.download_videos = download_videos
self.download_video_thumbnails = download_video_thumbnails
self.download_geotags = download_geotags
@ -119,25 +126,17 @@ class Instaloader:
def __exit__(self, *args):
"""Context-manager exit: close this instance via :meth:`close`; the exception details in ``args`` are not inspected."""
self.close()
def download_pic(self, filename: str, url: str, mtime: datetime,
filename_alt: Optional[str] = None, filename_suffix: Optional[str] = None) -> bool:
def download_pic(self, filename: str, url: str, mtime: datetime, filename_suffix: Optional[str] = None) -> bool:
"""Downloads and saves picture with given url under given directory with given timestamp.
Returns true, if file was actually downloaded, i.e. updated."""
urlmatch = re.search('\\.[a-z0-9]*\\?', url)
file_extension = url[-3:] if urlmatch is None else urlmatch.group(0)[1:-1]
if filename_suffix is not None:
filename += '_' + filename_suffix
if filename_alt is not None:
filename_alt += '_' + filename_suffix
filename += '.' + file_extension
if os.path.isfile(filename):
self.context.log(filename + ' exists', end=' ', flush=True)
return False
if filename_alt is not None:
filename_alt += '.' + file_extension
if os.path.isfile(filename_alt):
self.context.log(filename_alt + 'exists', end=' ', flush=True)
return False
self.context.get_and_write_raw(url, filename)
os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
return True
@ -153,20 +152,15 @@ class Instaloader:
# log 'json ' message when saving Post or StoryItem
self.context.log('json', end=' ', flush=True)
def update_comments(self, filename: str, post: Post, filename_alt: Optional[str] = None) -> None:
def update_comments(self, filename: str, post: Post) -> None:
filename += '_comments.json'
try:
filename_current = filename + '_comments.json'
comments = json.load(open(filename_current))
comments = json.load(open(filename))
except FileNotFoundError:
try:
filename_current = filename_alt + '_comments.json'
comments = json.load(open(filename_current))
except (FileNotFoundError, TypeError):
filename_current = filename + '_comments.json'
comments = list()
comments = list()
comments.extend(post.get_comments())
if comments:
with open(filename_current, 'w') as file:
with open(filename, 'w') as file:
comments_list = sorted(sorted(list(comments), key=lambda t: t['id']),
key=lambda t: t['created_at'], reverse=True)
unique_comments_list = [comments_list[0]]
@ -178,25 +172,17 @@ class Instaloader:
if x['id'] != y['id']:
unique_comments_list.append(y)
file.write(json.dumps(unique_comments_list, indent=4))
os.rename(filename_current, filename + '_comments.json')
self.context.log('comments', end=' ', flush=True)
def save_caption(self, filename: str, mtime: datetime, caption: str, filename_alt: Optional[str] = None) -> None:
def save_caption(self, filename: str, mtime: datetime, caption: str) -> None:
"""Updates picture caption"""
filename += '.txt'
if filename_alt is not None:
filename_alt += '.txt'
pcaption = caption.replace('\n', ' ').strip()
caption = caption.encode("UTF-8")
pcaption = '[' + ((pcaption[:29] + u"\u2026") if len(pcaption) > 31 else pcaption) + ']'
with suppress(FileNotFoundError):
try:
with open(filename, 'rb') as file:
file_caption = file.read()
except FileNotFoundError:
if filename_alt is not None:
with open(filename_alt, 'rb') as file:
file_caption = file.read()
with open(filename, 'rb') as file:
file_caption = file.read()
if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'):
try:
self.context.log(pcaption + ' unchanged', end=' ', flush=True)
@ -204,22 +190,15 @@ class Instaloader:
self.context.log('txt unchanged', end=' ', flush=True)
return None
else:
def get_filename(file, index):
return file if index == 0 else (file[:-4] + '_old_' +
(str(0) if index < 10 else str()) + str(index) + file[-4:])
def get_filename(index):
return filename if index == 0 else (filename[:-4] + '_old_' +
(str(0) if index < 10 else str()) + str(index) + filename[-4:])
i = 0
file_exists_list = []
while True:
file_exists_list.append(1 if os.path.isfile(get_filename(filename, i)) else 0)
if not file_exists_list[i] and filename_alt is not None:
file_exists_list[i] = 2 if os.path.isfile(get_filename(filename_alt, i)) else 0
if not file_exists_list[i]:
break
while os.path.isfile(get_filename(i)):
i = i + 1
for index in range(i, 0, -1):
os.rename(get_filename(filename if file_exists_list[index - 1] % 2 else filename_alt, index - 1),
get_filename(filename, index))
os.rename(get_filename(index - 1), get_filename(index))
try:
self.context.log(pcaption + ' updated', end=' ', flush=True)
except UnicodeEncodeError:
@ -311,21 +290,8 @@ class Instaloader:
:return: True if something was downloaded, False otherwise, i.e. file was already there
"""
# Format dirname and filename. post.owner_username might do an additional request, so only access it, if
# {profile} is part of the dirname pattern or filename pattern.
needs_profilename = (format_string_contains_key(self.dirname_pattern, 'profile') or
format_string_contains_key(self.filename_pattern, 'profile'))
profilename = post.owner_username if needs_profilename else None
dirname = self.dirname_pattern.format(profile=profilename, target=target.lower())
filename = dirname + '/' + self.filename_pattern.format(profile=profilename, target=target.lower(),
date_utc=post.date_utc,
shortcode=post.shortcode,
post=post)
filename_old = dirname + '/' + self.filename_pattern_old.replace("{post.date_utc", "{date_utc") \
.format(profile=profilename, target=target.lower(),
date_utc=post.date_local,
shortcode=post.shortcode,
post=post)
dirname = _PostPathFormatter(post, target).format(self.dirname_pattern)
filename = dirname + '/' + _PostPathFormatter(post, target).format(self.filename_pattern)
os.makedirs(os.path.dirname(filename), exist_ok=True)
# Download the image(s) / video thumbnail and videos within sidecars if desired
@ -335,41 +301,31 @@ class Instaloader:
for edge in post.get_sidecar_edges():
# Download picture or video thumbnail
if not edge['node']['is_video'] or self.download_video_thumbnails is True:
downloaded |= self.download_pic(filename=filename,
filename_alt=filename_old,
url=edge['node']['display_url'],
mtime=post.date_local,
filename_suffix=str(edge_number))
downloaded |= self.download_pic(filename=filename, url=edge['node']['display_url'],
mtime=post.date_local, filename_suffix=str(edge_number))
# Additionally download video if available and desired
if edge['node']['is_video'] and self.download_videos is True:
downloaded |= self.download_pic(filename=filename,
filename_alt=filename_old,
url=edge['node']['video_url'],
mtime=post.date_local,
filename_suffix=str(edge_number))
downloaded |= self.download_pic(filename=filename, url=edge['node']['video_url'],
mtime=post.date_local, filename_suffix=str(edge_number))
edge_number += 1
elif post.typename == 'GraphImage':
downloaded = self.download_pic(filename=filename, filename_alt=filename_old,
url=post.url, mtime=post.date_local)
downloaded = self.download_pic(filename=filename, url=post.url, mtime=post.date_local)
elif post.typename == 'GraphVideo':
if self.download_video_thumbnails is True:
downloaded = self.download_pic(filename=filename, filename_alt=filename_old,
url=post.url, mtime=post.date_local)
downloaded = self.download_pic(filename=filename, url=post.url, mtime=post.date_local)
else:
self.context.error("Warning: {0} has unknown typename: {1}".format(post, post.typename))
# Save caption if desired
if self.save_captions is not False:
if post.caption:
self.save_caption(filename=filename, filename_alt=filename_old,
mtime=post.date_local, caption=post.caption)
self.save_caption(filename=filename, mtime=post.date_local, caption=post.caption)
else:
self.context.log("<no caption>", end=' ', flush=True)
# Download video if desired
if post.is_video and self.download_videos is True:
downloaded |= self.download_pic(filename=filename, filename_alt=filename_old,
url=post.video_url, mtime=post.date_local)
downloaded |= self.download_pic(filename=filename, url=post.video_url, mtime=post.date_local)
# Download geotags if desired
if self.download_geotags is True:
@ -379,7 +335,7 @@ class Instaloader:
# Update comments if desired
if self.download_comments is True:
self.update_comments(filename=filename, filename_alt=filename_old, post=post)
self.update_comments(filename=filename, post=post)
# Save metadata as JSON if desired.
if self.save_metadata is not False:
@ -427,10 +383,6 @@ class Instaloader:
if not userids:
self.context.log("Retrieving all visible stories...")
if format_string_contains_key(self.filename_pattern, 'post'):
raise InvalidArgumentException("The \"post\" keyword is not supported in the filename pattern when "
"downloading stories.")
for user_story in self.get_stories(userids):
name = user_story.owner_username
self.context.log("Retrieving stories from profile {}.".format(name))
@ -452,30 +404,16 @@ class Instaloader:
:return: True if something was downloaded, False otherwise, i.e. file was already there
"""
owner_name = item.owner_username
shortcode = item.shortcode
date_local = item.date_local
date_utc = item.date_utc
dirname = self.dirname_pattern.format(profile=owner_name, target=target)
filename = dirname + '/' + self.filename_pattern.format(profile=owner_name, target=target,
date_utc=date_utc,
shortcode=shortcode)
filename_old = dirname + '/' + self.filename_pattern_old.format(profile=owner_name, target=target,
date_utc=date_local,
shortcode=shortcode)
dirname = _PostPathFormatter(item, target).format(self.dirname_pattern)
filename = dirname + '/' + _PostPathFormatter(item, target).format(self.filename_pattern)
os.makedirs(os.path.dirname(filename), exist_ok=True)
downloaded = False
if not item.is_video or self.download_video_thumbnails is True:
url = item.url
downloaded = self.download_pic(filename=filename,
filename_alt=filename_old,
url=url,
mtime=date_local)
downloaded = self.download_pic(filename=filename, url=url, mtime=date_local)
if item.is_video and self.download_videos is True:
downloaded |= self.download_pic(filename=filename,
filename_alt=filename_old,
url=item.video_url,
mtime=date_local)
downloaded |= self.download_pic(filename=filename, url=item.video_url, mtime=date_local)
# Save metadata as JSON if desired.
if self.save_metadata is not False:
self.save_metadata_json(filename, item)

View File

@ -165,6 +165,16 @@ class Post:
"""Timestamp when the post was created (UTC)."""
return datetime.utcfromtimestamp(self._node["date"] if "date" in self._node else self._node["taken_at_timestamp"])
@property
def date(self) -> datetime:
"""Synonym to :attr:`.date_utc`, i.e. the timestamp when the post was created (UTC)."""
return self.date_utc
@property
def profile(self) -> str:
"""Synonym to :attr:`.owner_username`."""
return self.owner_username
@property
def url(self) -> str:
"""URL of the picture / video thumbnail of the post"""
@ -573,6 +583,16 @@ class StoryItem:
"""Timestamp when the StoryItem was created (UTC)."""
return datetime.utcfromtimestamp(self._node['taken_at_timestamp'])
@property
def date(self) -> datetime:
"""Synonym to :attr:`.date_utc`, i.e. the timestamp when the StoryItem was created (UTC)."""
return self.date_utc
@property
def profile(self) -> str:
"""Synonym to :attr:`.owner_username`."""
return self.owner_username
@property
def expiring_local(self) -> datetime:
"""Timestamp when the StoryItem will get unavailable (local time zone)."""