1
0
mirror of https://github.com/mikf/gallery-dl.git synced 2024-11-23 11:12:40 +01:00
gallery-dl/gallery_dl/util.py

862 lines
23 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# Copyright 2017-2022 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
2017-03-28 13:12:44 +02:00
"""Utility functions and classes"""
import re
2017-03-28 13:12:44 +02:00
import os
import sys
import json
import time
import random
2018-01-29 22:13:06 +01:00
import sqlite3
import binascii
import datetime
2021-06-03 22:34:58 +02:00
import functools
2017-12-03 01:38:24 +01:00
import itertools
import subprocess
import urllib.parse
from http.cookiejar import Cookie
from email.utils import mktime_tz, parsedate_tz
2022-05-04 12:09:56 +02:00
from . import text, exception
2018-03-14 13:17:34 +01:00
def bencode(num, alphabet="0123456789"):
"""Encode an integer into a base-N encoded string"""
data = ""
base = len(alphabet)
while num:
num, remainder = divmod(num, base)
data = alphabet[remainder] + data
return data
def bdecode(data, alphabet="0123456789"):
"""Decode a base-N encoded string ( N = len(alphabet) )"""
num = 0
base = len(alphabet)
for c in data:
num *= base
num += alphabet.index(c)
return num
2017-12-03 01:38:24 +01:00
def advance(iterable, num):
2020-10-29 23:26:11 +01:00
""""Advance 'iterable' by 'num' steps"""
2017-12-03 01:38:24 +01:00
iterator = iter(iterable)
next(itertools.islice(iterator, num, num), None)
return iterator
2020-10-29 23:26:11 +01:00
def unique(iterable):
"""Yield unique elements from 'iterable' while preserving order"""
seen = set()
add = seen.add
for element in iterable:
if element not in seen:
add(element)
yield element
2021-03-02 23:01:38 +01:00
def unique_sequence(iterable):
"""Yield sequentially unique elements from 'iterable'"""
last = None
for element in iterable:
if element != last:
last = element
yield element
def contains(values, elements, separator=" "):
"""Returns True if at least one of 'elements' is contained in 'values'"""
if isinstance(values, str):
values = values.split(separator)
if not isinstance(elements, (tuple, list)):
return elements in values
for e in elements:
if e in values:
return True
return False
2019-10-28 15:06:17 +01:00
def raises(cls):
"""Returns a function that raises 'cls' as exception"""
def wrap(*args):
raise cls(*args)
return wrap
def identity(x):
"""Returns its argument"""
return x
def true(_):
"""Always returns True"""
return True
def false(_):
"""Always returns False"""
return False
def noop():
"""Does nothing"""
def generate_token(size=16):
"""Generate a random token with hexadecimal digits"""
data = random.getrandbits(size * 8).to_bytes(size, "big")
return binascii.hexlify(data).decode()
def format_value(value, suffixes="kMGTPEZY"):
2021-07-26 01:55:02 +02:00
value = format(value)
value_len = len(value)
index = value_len - 4
if index >= 0:
offset = (value_len - 1) % 3 + 1
return (value[:offset] + "." + value[offset:offset+2] +
suffixes[index // 3])
return value
2021-07-26 01:55:02 +02:00
def combine_dict(a, b):
"""Recursively combine the contents of 'b' into 'a'"""
for key, value in b.items():
if key in a and isinstance(value, dict) and isinstance(a[key], dict):
combine_dict(a[key], value)
else:
a[key] = value
2017-08-13 14:31:22 +02:00
return a
def transform_dict(a, func):
"""Recursively apply 'func' to all values in 'a'"""
for key, value in a.items():
if isinstance(value, dict):
transform_dict(value, func)
else:
a[key] = func(value)
def filter_dict(a):
"""Return a copy of 'a' without "private" entries"""
return {k: v for k, v in a.items() if k[0] != "_"}
def delete_items(obj, keys):
"""Remove all 'keys' from 'obj'"""
for key in keys:
if key in obj:
del obj[key]
def enumerate_reversed(iterable, start=0, length=None):
"""Enumerate 'iterable' and return its elements in reverse order"""
if length is None:
length = len(iterable)
try:
iterable = zip(range(start-1+length, start-1, -1), reversed(iterable))
except TypeError:
iterable = list(zip(range(start, start+length), iterable))
iterable.reverse()
return iterable
2019-02-14 11:15:19 +01:00
def number_to_string(value, numbers=(int, float)):
"""Convert numbers (int, float) to string; Return everything else as is."""
2019-02-14 11:15:19 +01:00
return str(value) if value.__class__ in numbers else value
def to_string(value):
"""str() with "better" defaults"""
if not value:
return ""
if value.__class__ is list:
try:
return ", ".join(value)
except Exception:
return ", ".join(map(str, value))
return str(value)
def datetime_to_timestamp(dt):
"""Convert naive UTC datetime to timestamp"""
return (dt - EPOCH) / SECOND
def datetime_to_timestamp_string(dt):
"""Convert naive UTC datetime to timestamp string"""
try:
return str((dt - EPOCH) // SECOND)
except Exception:
return ""
def dump_json(obj, fp=sys.stdout, ensure_ascii=True, indent=4):
"""Serialize 'obj' as JSON and write it to 'fp'"""
json.dump(
obj, fp,
ensure_ascii=ensure_ascii,
indent=indent,
default=str,
sort_keys=True,
)
fp.write("\n")
def dump_response(response, fp, *,
headers=False, content=True, hide_auth=True):
"""Write the contents of 'response' into a file-like object"""
if headers:
request = response.request
req_headers = request.headers.copy()
res_headers = response.headers.copy()
outfmt = """\
{request.method} {request.url}
Status: {response.status_code} {response.reason}
Request Headers
---------------
{request_headers}
Response Headers
----------------
{response_headers}
"""
if hide_auth:
authorization = req_headers.get("Authorization")
if authorization:
atype, sep, _ = authorization.partition(" ")
req_headers["Authorization"] = atype + " ***" if sep else "***"
cookie = req_headers.get("Cookie")
if cookie:
req_headers["Cookie"] = ";".join(
c.partition("=")[0] + "=***"
for c in cookie.split(";")
)
set_cookie = res_headers.get("Set-Cookie")
if set_cookie:
res_headers["Set-Cookie"] = re.sub(
r"(^|, )([^ =]+)=[^,;]*", r"\1\2=***", set_cookie,
)
fp.write(outfmt.format(
request=request,
response=response,
request_headers="\n".join(
name + ": " + value
for name, value in req_headers.items()
),
response_headers="\n".join(
name + ": " + value
for name, value in res_headers.items()
),
).encode())
if content:
if headers:
fp.write(b"\nContent\n-------\n")
fp.write(response.content)
2022-11-07 16:33:26 +01:00
def extract_headers(response):
headers = response.headers
data = dict(headers)
hcd = headers.get("content-disposition")
if hcd:
name = text.extr(hcd, 'filename="', '"')
if name:
text.nameext_from_url(name, data)
hlm = headers.get("last-modified")
if hlm:
data["date"] = datetime.datetime(*parsedate_tz(hlm)[:6])
return data
@functools.lru_cache(maxsize=None)
def git_head():
try:
out, err = subprocess.Popen(
("git", "rev-parse", "--short", "HEAD"),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=os.path.dirname(os.path.abspath(__file__)),
).communicate()
if out and not err:
return out.decode().rstrip()
except (OSError, subprocess.SubprocessError):
pass
return None
2017-10-26 00:04:28 +02:00
def expand_path(path):
"""Expand environment variables and tildes (~)"""
if not path:
return path
if not isinstance(path, str):
path = os.path.join(*path)
2017-10-26 00:04:28 +02:00
return os.path.expandvars(os.path.expanduser(path))
def remove_file(path):
try:
os.unlink(path)
except OSError:
pass
def remove_directory(path):
try:
os.rmdir(path)
except OSError:
pass
def set_mtime(path, mtime):
try:
if isinstance(mtime, str):
mtime = mktime_tz(parsedate_tz(mtime))
os.utime(path, (time.time(), mtime))
except Exception:
pass
def cookiestxt_load(fp, cookiejar):
"""Parse a Netscape cookies.txt file and add its Cookies to 'cookiejar'"""
set_cookie = cookiejar.set_cookie
for line in fp:
line = line.lstrip(" ")
# strip '#HttpOnly_'
if line.startswith("#HttpOnly_"):
line = line[10:]
# ignore empty lines and comments
if not line or line[0] in ("#", "$", "\n"):
continue
# strip trailing '\n'
if line[-1] == "\n":
line = line[:-1]
domain, domain_specified, path, secure, expires, name, value = \
line.split("\t")
if not name:
name = value
value = None
set_cookie(Cookie(
0, name, value,
None, False,
domain,
domain_specified == "TRUE",
domain.startswith("."),
path, False,
secure == "TRUE",
None if expires == "0" or not expires else expires,
False, None, None, {},
))
def cookiestxt_store(fp, cookies):
"""Write 'cookies' in Netscape cookies.txt format to 'fp'"""
write = fp.write
write("# Netscape HTTP Cookie File\n\n")
for cookie in cookies:
if not cookie.domain:
continue
if cookie.value is None:
name = ""
value = cookie.name
else:
name = cookie.name
value = cookie.value
write("\t".join((
cookie.domain,
"TRUE" if cookie.domain.startswith(".") else "FALSE",
cookie.path,
"TRUE" if cookie.secure else "FALSE",
"0" if cookie.expires is None else str(cookie.expires),
name,
value + "\n",
)))
def code_to_language(code, default=None):
2017-03-28 13:12:44 +02:00
"""Map an ISO 639-1 language code to its actual name"""
return CODES.get((code or "").lower(), default)
2017-03-28 13:12:44 +02:00
def language_to_code(lang, default=None):
2017-03-28 13:12:44 +02:00
"""Map a language name to its ISO 639-1 code"""
if lang is None:
return default
2017-03-28 13:12:44 +02:00
lang = lang.capitalize()
for code, language in CODES.items():
2017-03-28 13:12:44 +02:00
if language == lang:
return code
return default
CODES = {
2017-03-28 13:12:44 +02:00
"ar": "Arabic",
"bg": "Bulgarian",
"ca": "Catalan",
2017-03-28 13:12:44 +02:00
"cs": "Czech",
"da": "Danish",
"de": "German",
"el": "Greek",
"en": "English",
"es": "Spanish",
"fi": "Finnish",
"fr": "French",
"he": "Hebrew",
"hu": "Hungarian",
"id": "Indonesian",
"it": "Italian",
"ja": "Japanese",
2017-03-28 13:12:44 +02:00
"ko": "Korean",
"ms": "Malay",
"nl": "Dutch",
"no": "Norwegian",
"pl": "Polish",
"pt": "Portuguese",
"ro": "Romanian",
"ru": "Russian",
"sv": "Swedish",
"th": "Thai",
"tr": "Turkish",
"vi": "Vietnamese",
"zh": "Chinese",
}
def parse_inputfile(file, log):
"""Filter and process strings from an input file.
Lines starting with '#' and empty lines will be ignored.
Lines starting with '-' will be interpreted as a key-value pair separated
by an '='. where 'key' is a dot-separated option name and 'value' is a
JSON-parsable value. These configuration options will be applied while
processing the next URL.
Lines starting with '-G' are the same as above, except these options will
be applied for *all* following URLs, i.e. they are Global.
Everything else will be used as a potential URL.
Example input file:
# settings global options
-G base-directory = "/tmp/"
-G skip = false
# setting local options for the next URL
-filename="spaces_are_optional.jpg"
-skip = true
https://example.org/
# next URL uses default filename and 'skip' is false.
https://example.com/index.htm # comment1
https://example.com/404.htm # comment2
"""
gconf = []
lconf = []
strip_comment = None
for line in file:
line = line.strip()
if not line or line[0] == "#":
# empty line or comment
continue
elif line[0] == "-":
# config spec
if len(line) >= 2 and line[1] == "G":
conf = gconf
line = line[2:]
else:
conf = lconf
line = line[1:]
key, sep, value = line.partition("=")
if not sep:
log.warning("input file: invalid <key>=<value> pair: %s", line)
continue
try:
value = json.loads(value.strip())
except ValueError as exc:
log.warning("input file: unable to parse '%s': %s", value, exc)
continue
key = key.strip().split(".")
conf.append((key[:-1], key[-1], value))
else:
# url
if " #" in line or "\t#" in line:
if strip_comment is None:
strip_comment = re.compile(r"\s+#.*").sub
line = strip_comment("", line)
if gconf or lconf:
yield ExtendedUrl(line, gconf, lconf)
gconf = []
lconf = []
else:
yield line
2022-12-06 18:44:46 +01:00
class CustomNone():
"""None-style type that supports more operations than regular None"""
2019-02-14 11:15:19 +01:00
__slots__ = ()
def __getattribute__(self, _):
return self
def __getitem__(self, _):
return self
2022-12-06 18:44:46 +01:00
def __iter__(self):
return self
def __call__(self, *args, **kwargs):
return self
@staticmethod
def __next__():
raise StopIteration
@staticmethod
def __bool__():
return False
2022-12-06 18:44:46 +01:00
@staticmethod
def __len__():
return 0
@staticmethod
def __format__(_):
return "None"
@staticmethod
def __str__():
return "None"
__repr__ = __str__
2022-12-06 18:44:46 +01:00
NONE = CustomNone()
EPOCH = datetime.datetime(1970, 1, 1)
SECOND = datetime.timedelta(0, 1)
2020-05-19 21:42:11 +02:00
WINDOWS = (os.name == "nt")
2020-05-19 21:47:18 +02:00
SENTINEL = object()
2021-06-03 22:34:58 +02:00
SPECIAL_EXTRACTORS = {"oauth", "recursive", "test"}
GLOBALS = {
"contains" : contains,
2021-06-03 22:34:58 +02:00
"parse_int": text.parse_int,
"urlsplit" : urllib.parse.urlsplit,
"datetime" : datetime.datetime,
"timedelta": datetime.timedelta,
2021-06-03 22:34:58 +02:00
"abort" : raises(exception.StopExtraction),
"terminate": raises(exception.TerminateExtraction),
"re" : re,
}
def compile_expression(expr, name="<expr>", globals=GLOBALS):
code_object = compile(expr, name, "eval")
return functools.partial(eval, code_object, globals)
def build_duration_func(duration, min=0.0):
if not duration:
if min:
return lambda: min
return None
if isinstance(duration, str):
lower, _, upper = duration.partition("-")
lower = float(lower)
else:
try:
lower, upper = duration
except TypeError:
lower, upper = duration, None
if upper:
upper = float(upper)
return functools.partial(
random.uniform,
lower if lower > min else min,
upper if upper > min else min,
)
else:
if lower < min:
lower = min
return lambda: lower
def build_extractor_filter(categories, negate=True, special=None):
"""Build a function that takes an Extractor class as argument
and returns True if that class is allowed by 'categories'
"""
if isinstance(categories, str):
categories = categories.split(",")
catset = set() # set of categories / basecategories
subset = set() # set of subcategories
catsub = [] # list of category-subcategory pairs
for item in categories:
category, _, subcategory = item.partition(":")
if category and category != "*":
if subcategory and subcategory != "*":
catsub.append((category, subcategory))
else:
catset.add(category)
elif subcategory and subcategory != "*":
subset.add(subcategory)
if special:
catset |= special
elif not catset and not subset and not catsub:
return true if negate else false
tests = []
if negate:
if catset:
tests.append(lambda extr:
extr.category not in catset and
extr.basecategory not in catset)
if subset:
tests.append(lambda extr: extr.subcategory not in subset)
else:
if catset:
tests.append(lambda extr:
extr.category in catset or
extr.basecategory in catset)
if subset:
tests.append(lambda extr: extr.subcategory in subset)
if catsub:
def test(extr):
for category, subcategory in catsub:
if category in (extr.category, extr.basecategory) and \
subcategory == extr.subcategory:
return not negate
return negate
tests.append(test)
if len(tests) == 1:
return tests[0]
if negate:
return lambda extr: all(t(extr) for t in tests)
else:
return lambda extr: any(t(extr) for t in tests)
def build_proxy_map(proxies, log=None):
"""Generate a proxy map"""
if not proxies:
return None
if isinstance(proxies, str):
if "://" not in proxies:
proxies = "http://" + proxies.lstrip("/")
return {"http": proxies, "https": proxies}
if isinstance(proxies, dict):
for scheme, proxy in proxies.items():
if "://" not in proxy:
proxies[scheme] = "http://" + proxy.lstrip("/")
return proxies
if log:
log.warning("invalid proxy specifier: %s", proxies)
def build_predicate(predicates):
if not predicates:
return lambda url, kwdict: True
elif len(predicates) == 1:
return predicates[0]
return functools.partial(chain_predicates, predicates)
def chain_predicates(predicates, url, kwdict):
for pred in predicates:
if not pred(url, kwdict):
return False
return True
class RangePredicate():
2022-12-23 22:39:32 +01:00
"""Predicate; True if the current index is in the given range(s)"""
def __init__(self, rangespec):
2022-12-23 22:39:32 +01:00
self.ranges = ranges = self._parse(rangespec)
self.index = 0
2022-12-23 22:39:32 +01:00
if ranges:
# technically wrong, but good enough for now
# and evaluating min/max for a large range is slow
2022-12-23 22:39:32 +01:00
self.lower = min(r.start for r in ranges)
self.upper = max(r.stop for r in ranges) - 1
else:
2022-12-23 22:39:32 +01:00
self.lower = self.upper = 0
2022-12-23 22:39:32 +01:00
def __call__(self, _url, _kwdict):
self.index = index = self.index + 1
2022-12-23 22:39:32 +01:00
if index > self.upper:
raise exception.StopExtraction()
2022-12-23 22:39:32 +01:00
for range in self.ranges:
if index in range:
return True
return False
2017-03-28 13:12:44 +02:00
@staticmethod
2022-12-23 22:39:32 +01:00
def _parse(rangespec):
"""Parse an integer range string and return the resulting ranges
Examples:
_parse("-2,4,6-8,10-") -> [(1,3), (4,5), (6,9), (10,INTMAX)]
_parse(" - 3 , 4- 4, 2-6") -> [(1,4), (4,5), (2,7)]
_parse("1:2,4:8:2") -> [(1,1), (4,7,2)]
"""
ranges = []
2022-12-23 22:39:32 +01:00
append = ranges.append
if isinstance(rangespec, str):
rangespec = rangespec.split(",")
2022-12-23 22:39:32 +01:00
for group in rangespec:
if not group:
continue
2022-12-23 22:39:32 +01:00
elif ":" in group:
start, _, stop = group.partition(":")
stop, _, step = stop.partition(":")
2022-12-23 22:39:32 +01:00
append(range(
int(start) if start.strip() else 1,
int(stop) if stop.strip() else sys.maxsize,
int(step) if step.strip() else 1,
2022-12-23 22:39:32 +01:00
))
elif "-" in group:
start, _, stop = group.partition("-")
append(range(
int(start) if start.strip() else 1,
int(stop) + 1 if stop.strip() else sys.maxsize,
))
else:
start = int(group)
append(range(start, start+1))
return ranges
2017-03-28 13:12:44 +02:00
class UniquePredicate():
"""Predicate; True if given URL has not been encountered before"""
def __init__(self):
self.urls = set()
def __call__(self, url, _):
if url.startswith("text:"):
return True
if url not in self.urls:
self.urls.add(url)
return True
return False
class FilterPredicate():
"""Predicate; True if evaluating the given expression returns True"""
2021-06-03 22:34:58 +02:00
def __init__(self, expr, target="image"):
if not isinstance(expr, str):
expr = "(" + ") and (".join(expr) + ")"
name = "<{} filter>".format(target)
2021-06-03 22:34:58 +02:00
self.expr = compile_expression(expr, name)
2021-06-03 22:34:58 +02:00
def __call__(self, _, kwdict):
try:
2021-06-03 22:34:58 +02:00
return self.expr(kwdict)
except exception.GalleryDLException:
raise
except Exception as exc:
raise exception.FilterError(exc)
class ExtendedUrl():
"""URL with attached config key-value pairs"""
def __init__(self, url, gconf, lconf):
self.value, self.gconfig, self.lconfig = url, gconf, lconf
def __str__(self):
return self.value
2018-01-29 22:13:06 +01:00
class DownloadArchive():
def __init__(self, path, format_string, cache_key="_archive_key"):
try:
con = sqlite3.connect(path, timeout=60, check_same_thread=False)
except sqlite3.OperationalError:
os.makedirs(os.path.dirname(path))
con = sqlite3.connect(path, timeout=60, check_same_thread=False)
2018-01-29 22:13:06 +01:00
con.isolation_level = None
2019-09-10 22:26:40 +02:00
self.close = con.close
2018-01-29 22:13:06 +01:00
self.cursor = con.cursor()
2022-05-04 12:09:56 +02:00
from . import formatter
self.keygen = formatter.parse(format_string).format_map
self._cache_key = cache_key
try:
self.cursor.execute("CREATE TABLE IF NOT EXISTS archive "
"(entry TEXT PRIMARY KEY) WITHOUT ROWID")
except sqlite3.OperationalError:
# fallback for missing WITHOUT ROWID support (#553)
self.cursor.execute("CREATE TABLE IF NOT EXISTS archive "
"(entry TEXT PRIMARY KEY)")
2018-01-29 22:13:06 +01:00
def check(self, kwdict):
"""Return True if the item described by 'kwdict' exists in archive"""
key = kwdict[self._cache_key] = self.keygen(kwdict)
2018-01-29 22:13:06 +01:00
self.cursor.execute(
"SELECT 1 FROM archive WHERE entry=? LIMIT 1", (key,))
2018-01-29 22:13:06 +01:00
return self.cursor.fetchone()
def add(self, kwdict):
"""Add item described by 'kwdict' to archive"""
key = kwdict.get(self._cache_key) or self.keygen(kwdict)
2018-01-29 22:13:06 +01:00
self.cursor.execute(
"INSERT OR IGNORE INTO archive (entry) VALUES (?)", (key,))