mirror of
https://github.com/mikf/gallery-dl.git
synced 2024-11-22 02:32:33 +01:00
update PathFormat class
- change 'has_extension' from a simple flag/bool to a field that contains the original filename extension - rename 'keywords' to 'kwdict' and some other stuff as well - inline 'adjust_path()' - put enumeration index before filename extension (#306)
This commit is contained in:
parent
423f68f585
commit
0bb873757a
@ -114,7 +114,7 @@ class HttpDownloader(DownloaderBase):
|
||||
size = text.parse_int(size)
|
||||
|
||||
# set missing filename extension
|
||||
if not pathfmt.has_extension:
|
||||
if not pathfmt.extension:
|
||||
pathfmt.set_extension(self.get_extension(response))
|
||||
if pathfmt.exists():
|
||||
pathfmt.temppath = ""
|
||||
@ -153,7 +153,7 @@ class HttpDownloader(DownloaderBase):
|
||||
|
||||
# check filename extension
|
||||
if self.adjust_extension:
|
||||
adj_ext = self.check_extension(file, pathfmt)
|
||||
adj_ext = self.check_extension(file, pathfmt.extension)
|
||||
if adj_ext:
|
||||
pathfmt.set_extension(adj_ext)
|
||||
|
||||
@ -161,7 +161,7 @@ class HttpDownloader(DownloaderBase):
|
||||
|
||||
self.downloading = False
|
||||
if self.mtime:
|
||||
pathfmt.keywords["_mtime"] = response.headers.get("Last-Modified")
|
||||
pathfmt.kwdict["_mtime"] = response.headers.get("Last-Modified")
|
||||
return True
|
||||
|
||||
def receive(self, response, file):
|
||||
@ -197,9 +197,8 @@ class HttpDownloader(DownloaderBase):
|
||||
return "txt"
|
||||
|
||||
@staticmethod
|
||||
def check_extension(file, pathfmt):
|
||||
def check_extension(file, extension):
|
||||
"""Check filename extension against fileheader"""
|
||||
extension = pathfmt.keywords["extension"]
|
||||
if extension in FILETYPE_CHECK:
|
||||
file.seek(0)
|
||||
header = file.read(8)
|
||||
|
@ -50,7 +50,7 @@ class YoutubeDLDownloader(DownloaderBase):
|
||||
return False
|
||||
|
||||
if "entries" in info_dict:
|
||||
index = pathfmt.keywords.get("_ytdl_index")
|
||||
index = pathfmt.kwdict.get("_ytdl_index")
|
||||
if index is None:
|
||||
return self._download_playlist(pathfmt, info_dict)
|
||||
else:
|
||||
@ -59,7 +59,7 @@ class YoutubeDLDownloader(DownloaderBase):
|
||||
|
||||
def _download_video(self, pathfmt, info_dict):
|
||||
if "url" in info_dict:
|
||||
text.nameext_from_url(info_dict["url"], pathfmt.keywords)
|
||||
text.nameext_from_url(info_dict["url"], pathfmt.kwdict)
|
||||
pathfmt.set_extension(info_dict["ext"])
|
||||
if pathfmt.exists():
|
||||
pathfmt.temppath = ""
|
||||
|
@ -196,7 +196,7 @@ class DownloadJob(Job):
|
||||
archive = self.archive
|
||||
|
||||
# prepare download
|
||||
pathfmt.set_keywords(keywords)
|
||||
pathfmt.set_filename(keywords)
|
||||
|
||||
if postprocessors:
|
||||
for pp in postprocessors:
|
||||
@ -364,7 +364,7 @@ class SimulationJob(DownloadJob):
|
||||
"""Simulate the extraction process without downloading anything"""
|
||||
|
||||
def handle_url(self, url, keywords, fallback=None):
|
||||
self.pathfmt.set_keywords(keywords)
|
||||
self.pathfmt.set_filename(keywords)
|
||||
self.out.skip(self.pathfmt.path)
|
||||
if self.sleep:
|
||||
time.sleep(self.sleep)
|
||||
|
@ -33,7 +33,7 @@ class ClassifyPP(PostProcessor):
|
||||
}
|
||||
|
||||
def prepare(self, pathfmt):
|
||||
ext = pathfmt.keywords.get("extension")
|
||||
ext = pathfmt.extension
|
||||
|
||||
if ext in self.mapping:
|
||||
self._dir = pathfmt.realdirectory + os.sep + self.mapping[ext]
|
||||
|
@ -36,15 +36,14 @@ class MetadataPP(PostProcessor):
|
||||
def run(self, pathfmt):
|
||||
path = "{}.{}".format(pathfmt.realpath, self.extension)
|
||||
with open(path, "w", encoding="utf-8") as file:
|
||||
self.write(file, pathfmt)
|
||||
self.write(file, pathfmt.kwdict)
|
||||
|
||||
def _write_custom(self, file, pathfmt):
|
||||
output = self.formatter.format_map(pathfmt.keywords)
|
||||
def _write_custom(self, file, kwdict):
|
||||
output = self.formatter.format_map(kwdict)
|
||||
file.write(output)
|
||||
|
||||
def _write_tags(self, file, pathfmt):
|
||||
kwds = pathfmt.keywords
|
||||
tags = kwds.get("tags") or kwds.get("tag_string")
|
||||
def _write_tags(self, file, kwdict):
|
||||
tags = kwdict.get("tags") or kwdict.get("tag_string")
|
||||
|
||||
if not tags:
|
||||
return
|
||||
@ -58,8 +57,8 @@ class MetadataPP(PostProcessor):
|
||||
file.write("\n".join(tags))
|
||||
file.write("\n")
|
||||
|
||||
def _write_json(self, file, pathfmt):
|
||||
util.dump_json(pathfmt.keywords, file, self.ascii, self.indent)
|
||||
def _write_json(self, file, kwdict):
|
||||
util.dump_json(kwdict, file, self.ascii, self.indent)
|
||||
|
||||
|
||||
__postprocessor__ = MetadataPP
|
||||
|
@ -19,9 +19,9 @@ class MtimePP(PostProcessor):
|
||||
self.key = options.get("key", "date")
|
||||
|
||||
def run(self, pathfmt):
|
||||
mtime = pathfmt.keywords.get(self.key)
|
||||
mtime = pathfmt.kwdict.get(self.key)
|
||||
ts = getattr(mtime, "timestamp", None)
|
||||
pathfmt.keywords["_mtime"] = ts() if ts else parse_int(mtime)
|
||||
pathfmt.kwdict["_mtime"] = ts() if ts else parse_int(mtime)
|
||||
|
||||
|
||||
__postprocessor__ = MtimePP
|
||||
|
@ -52,13 +52,13 @@ class UgoiraPP(PostProcessor):
|
||||
def prepare(self, pathfmt):
|
||||
self._frames = None
|
||||
|
||||
if pathfmt.keywords["extension"] != "zip":
|
||||
if pathfmt.extension != "zip":
|
||||
return
|
||||
|
||||
if "frames" in pathfmt.keywords:
|
||||
self._frames = pathfmt.keywords["frames"]
|
||||
elif "pixiv_ugoira_frame_data" in pathfmt.keywords:
|
||||
self._frames = pathfmt.keywords["pixiv_ugoira_frame_data"]["data"]
|
||||
if "frames" in pathfmt.kwdict:
|
||||
self._frames = pathfmt.kwdict["frames"]
|
||||
elif "pixiv_ugoira_frame_data" in pathfmt.kwdict:
|
||||
self._frames = pathfmt.kwdict["pixiv_ugoira_frame_data"]["data"]
|
||||
else:
|
||||
return
|
||||
|
||||
|
@ -523,13 +523,12 @@ class PathFormat():
|
||||
except Exception as exc:
|
||||
raise exception.FormatError(exc, "filename")
|
||||
|
||||
self.delete = False
|
||||
self.has_extension = False
|
||||
self.keywords = {}
|
||||
self.filename = ""
|
||||
self.directory = self.realdirectory = ""
|
||||
self.filename = ""
|
||||
self.extension = ""
|
||||
self.kwdict = {}
|
||||
self.delete = False
|
||||
self.path = self.realpath = self.temppath = ""
|
||||
self.suffix = ""
|
||||
|
||||
self.basedirectory = expand_path(
|
||||
extractor.config("base-directory", (".", "gallery-dl")))
|
||||
@ -563,9 +562,9 @@ class PathFormat():
|
||||
|
||||
def exists(self, archive=None):
|
||||
"""Return True if the file exists on disk or in 'archive'"""
|
||||
if archive and archive.check(self.keywords):
|
||||
if archive and archive.check(self.kwdict):
|
||||
return self.fix_extension()
|
||||
if self.has_extension and os.path.exists(self.realpath):
|
||||
if self.extension and os.path.exists(self.realpath):
|
||||
return self.check_file()
|
||||
return False
|
||||
|
||||
@ -576,71 +575,79 @@ class PathFormat():
|
||||
def _enum_file(self):
|
||||
num = 1
|
||||
while True:
|
||||
suffix = "." + str(num)
|
||||
rpath = self.realpath + suffix
|
||||
if not os.path.exists(rpath):
|
||||
self.path += suffix
|
||||
self.realpath = rpath
|
||||
self.suffix = suffix
|
||||
self.set_extension("{}.{}".format(num, self.extension), False)
|
||||
if not os.path.exists(self.realpath):
|
||||
return False
|
||||
num += 1
|
||||
|
||||
def set_directory(self, keywords):
|
||||
def set_directory(self, kwdict):
|
||||
"""Build directory path and create it if necessary"""
|
||||
|
||||
# Build path segments by applying 'kwdict' to directory format strings
|
||||
try:
|
||||
segments = [
|
||||
self.clean_path(
|
||||
Formatter(segment, self.kwdefault)
|
||||
.format_map(keywords).strip())
|
||||
.format_map(kwdict).strip())
|
||||
for segment in self.directory_fmt
|
||||
]
|
||||
except Exception as exc:
|
||||
raise exception.FormatError(exc, "directory")
|
||||
|
||||
self.directory = os.path.join(
|
||||
self.basedirectory,
|
||||
*segments
|
||||
)
|
||||
# Join path segements
|
||||
self.directory = os.path.join(self.basedirectory, *segments)
|
||||
|
||||
# remove trailing path separator;
|
||||
# Remove trailing path separator;
|
||||
# occurs if the last argument to os.path.join() is an empty string
|
||||
if self.directory[-1] == os.sep:
|
||||
self.directory = self.directory[:-1]
|
||||
|
||||
self.realdirectory = self.adjust_path(self.directory)
|
||||
# Enable longer-than-260-character paths on Windows
|
||||
if os.name == "nt":
|
||||
self.realdirectory = "\\\\?\\" + os.path.abspath(self.directory)
|
||||
else:
|
||||
self.realdirectory = self.directory
|
||||
|
||||
# Create directory tree
|
||||
os.makedirs(self.realdirectory, exist_ok=True)
|
||||
|
||||
def set_keywords(self, keywords):
|
||||
"""Set filename keywords"""
|
||||
self.keywords = keywords
|
||||
self.temppath = self.suffix = ""
|
||||
self.has_extension = bool(keywords.get("extension"))
|
||||
if self.has_extension:
|
||||
def set_filename(self, kwdict):
|
||||
"""Set general filename data"""
|
||||
self.kwdict = kwdict
|
||||
self.temppath = ""
|
||||
self.extension = kwdict["extension"]
|
||||
|
||||
if self.extension:
|
||||
self.build_path()
|
||||
|
||||
def set_extension(self, extension, real=True):
|
||||
"""Set the 'extension' keyword"""
|
||||
self.has_extension = real
|
||||
self.keywords["extension"] = extension
|
||||
"""Set filename extension"""
|
||||
if real:
|
||||
self.extension = extension
|
||||
self.kwdict["extension"] = extension
|
||||
self.build_path()
|
||||
|
||||
def fix_extension(self, _=None):
|
||||
if not self.has_extension:
|
||||
self.set_extension("")
|
||||
"""Fix filenames without a given filename extension"""
|
||||
if not self.extension:
|
||||
self.set_extension("", False)
|
||||
if self.path[-1] == ".":
|
||||
self.path = self.path[:-1]
|
||||
self.temppath = self.realpath = self.realpath[:-1]
|
||||
return True
|
||||
|
||||
def build_path(self):
|
||||
"""Use filename-keywords and directory to build a full path"""
|
||||
"""Use filename metadata and directory to build a full path"""
|
||||
|
||||
# Apply 'kwdict' to filename format string
|
||||
try:
|
||||
self.filename = self.clean_path(
|
||||
self.formatter.format_map(self.keywords))
|
||||
self.formatter.format_map(self.kwdict))
|
||||
except Exception as exc:
|
||||
raise exception.FormatError(exc, "filename")
|
||||
|
||||
filename = os.sep + self.filename + self.suffix
|
||||
# Combine directory and filename to full paths
|
||||
filename = os.sep + self.filename
|
||||
self.path = self.directory + filename
|
||||
self.realpath = self.realdirectory + filename
|
||||
if not self.temppath:
|
||||
@ -648,7 +655,7 @@ class PathFormat():
|
||||
|
||||
def part_enable(self, part_directory=None):
|
||||
"""Enable .part file usage"""
|
||||
if self.has_extension:
|
||||
if self.extension:
|
||||
self.temppath += ".part"
|
||||
else:
|
||||
self.set_extension("part", False)
|
||||
@ -674,16 +681,16 @@ class PathFormat():
|
||||
return
|
||||
|
||||
if self.temppath != self.realpath:
|
||||
# move temp file to its actual location
|
||||
# Move temp file to its actual location
|
||||
try:
|
||||
os.replace(self.temppath, self.realpath)
|
||||
except OSError:
|
||||
shutil.copyfile(self.temppath, self.realpath)
|
||||
os.unlink(self.temppath)
|
||||
|
||||
if "_mtime" in self.keywords:
|
||||
# set file modification time
|
||||
mtime = self.keywords["_mtime"]
|
||||
if "_mtime" in self.kwdict:
|
||||
# Set file modification time
|
||||
mtime = self.kwdict["_mtime"]
|
||||
if mtime:
|
||||
try:
|
||||
if isinstance(mtime, str):
|
||||
@ -692,11 +699,6 @@ class PathFormat():
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def adjust_path(path):
|
||||
"""Enable longer-than-260-character paths on windows"""
|
||||
return "\\\\?\\" + os.path.abspath(path) if os.name == "nt" else path
|
||||
|
||||
|
||||
class DownloadArchive():
|
||||
|
||||
|
@ -120,7 +120,7 @@ class TestDownloaderBase(unittest.TestCase):
|
||||
}
|
||||
pathfmt = PathFormat(cls.extractor)
|
||||
pathfmt.set_directory(kwdict)
|
||||
pathfmt.set_keywords(kwdict)
|
||||
pathfmt.set_filename(kwdict)
|
||||
|
||||
if content:
|
||||
mode = "w" + ("b" if isinstance(content, bytes) else "")
|
||||
@ -145,7 +145,7 @@ class TestDownloaderBase(unittest.TestCase):
|
||||
|
||||
# test filename extension
|
||||
self.assertEqual(
|
||||
pathfmt.keywords["extension"],
|
||||
pathfmt.extension,
|
||||
expected_extension,
|
||||
)
|
||||
self.assertEqual(
|
||||
|
@ -218,8 +218,8 @@ class TestPathfmt():
|
||||
self.hashobj = hashobj
|
||||
self.path = ""
|
||||
self.size = 0
|
||||
self.keywords = {}
|
||||
self.has_extension = True
|
||||
self.kwdict = {}
|
||||
self.extension = "jpg"
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
Loading…
Reference in New Issue
Block a user