1
0
mirror of https://github.com/mikf/gallery-dl.git synced 2024-11-23 11:12:40 +01:00
gallery-dl/gallery_dl/extractor/foolfuuka.py
2023-09-19 00:02:04 +02:00

253 lines
7.8 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2019-2023 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for FoolFuuka 4chan archives"""
from .common import BaseExtractor, Message
from .. import text
import itertools
class FoolfuukaExtractor(BaseExtractor):
    """Base extractor for FoolFuuka based boards/archives

    Subclasses provide metadata() and posts(); items() turns every post
    that has a 'media' entry into a Url message.
    """
    basecategory = "foolfuuka"
    filename_fmt = "{timestamp_ms} {filename_media}.{extension}"
    archive_fmt = "{board[shortname]}_{num}_{timestamp}"
    external = "default"

    def __init__(self, match):
        BaseExtractor.__init__(self, match)
        if self.category == "b4k":
            # for this category 'remote_media_link' is returned unchanged
            # instead of being requested and parsed by remote()
            self.remote = self._remote_direct

    def items(self):
        """Yield a Directory message, then one Url message per media post"""
        yield Message.Directory, self.metadata()

        for post in self.posts():
            media = post["media"]
            if not media:
                # nothing to download for this post
                continue

            url = media["media_link"]
            if not url and "remote_media_link" in media:
                url = self.remote(media)
            # 'url' can still be None or empty here (no 'media_link' and no
            # usable remote link); guard the str method call so that such a
            # post does not raise AttributeError and abort the whole run
            if url and url.startswith("/"):
                url = self.root + url

            post["filename"], _, post["extension"] = \
                media["media"].rpartition(".")
            post["filename_media"] = media["media_filename"].rpartition(".")[0]
            # numeric stem of 'media_orig', e.g. "1234567890123.jpg"
            # NOTE(review): presumably a millisecond timestamp - confirm
            post["timestamp_ms"] = text.parse_int(
                media["media_orig"].rpartition(".")[0])
            yield Message.Url, url, post

    def metadata(self):
        """Return general metadata"""

    def posts(self):
        """Return an iterable with all relevant posts"""

    def remote(self, media):
        """Resolve a remote media link

        Requests media["remote_media_link"] and returns the text between
        the page's meta-refresh 'url=' marker and the next double quote.
        """
        needle = '<meta http-equiv="Refresh" content="0; url='
        page = self.request(media["remote_media_link"]).text
        return text.extr(page, needle, '"')

    @staticmethod
    def _remote_direct(media):
        """Return 'remote_media_link' as-is, without any request"""
        return media["remote_media_link"]
# Table of supported FoolFuuka instances, passed to FoolfuukaExtractor.update().
# Each entry has a "root" (the site's base URL) and a "pattern" (a regex
# fragment matching the host names of that site).
# The returned BASE_PATTERN is used as the prefix of every 'pattern'
# attribute of the extractor classes below.
# NOTE(review): update() is not defined in this file (presumably inherited
# from BaseExtractor) - confirm that the keys become the 'category' value
# that FoolfuukaExtractor.__init__ compares against "b4k".
BASE_PATTERN = FoolfuukaExtractor.update({
"4plebs": {
"root": "https://archive.4plebs.org",
"pattern": r"(?:archive\.)?4plebs\.org",
},
"archivedmoe": {
"root": "https://archived.moe",
"pattern": r"archived\.moe",
},
"archiveofsins": {
"root": "https://archiveofsins.com",
"pattern": r"(?:www\.)?archiveofsins\.com",
},
"b4k": {
"root": "https://arch.b4k.co",
"pattern": r"arch\.b4k\.co",
},
"desuarchive": {
"root": "https://desuarchive.org",
"pattern": r"desuarchive\.org",
},
"fireden": {
"root": "https://boards.fireden.net",
"pattern": r"boards\.fireden\.net",
},
"palanq": {
"root": "https://archive.palanq.win",
"pattern": r"archive\.palanq\.win",
},
"rbt": {
"root": "https://rbt.asia",
"pattern": r"(?:rbt\.asia|(?:archive\.)?rebeccablacktech\.com)",
},
"thebarchive": {
"root": "https://thebarchive.com",
"pattern": r"thebarchive\.com",
},
})
class FoolfuukaThreadExtractor(FoolfuukaExtractor):
    """Extractor for a single thread on FoolFuuka based boards/archives"""
    subcategory = "thread"
    directory_fmt = ("{category}", "{board[shortname]}",
                     "{thread_num} {title|comment[:50]}")
    pattern = BASE_PATTERN + r"/([^/?#]+)/thread/(\d+)"
    example = "https://archived.moe/a/thread/12345/"

    def __init__(self, match):
        FoolfuukaExtractor.__init__(self, match)
        # board name and thread number are the last two matched groups
        last = match.lastindex
        self.board, self.thread = match.group(last - 1, last)
        self.data = None

    def metadata(self):
        """Fetch the thread from the API, cache it, and return its OP"""
        response = self.request(
            self.root + "/_/api/chan/thread/",
            params={"board": self.board, "num": self.thread},
        )
        # the JSON response is keyed by thread number
        self.data = thread = response.json()[self.thread]
        return thread["op"]

    def posts(self):
        """Return the OP followed by all replies ordered by timestamp

        Relies on metadata() having stored the thread in 'self.data'.
        """
        op = (self.data["op"],)
        replies = self.data.get("posts")
        if not replies:
            return op
        ordered = sorted(replies.values(), key=lambda p: p["timestamp"])
        return itertools.chain(op, ordered)
class FoolfuukaBoardExtractor(FoolfuukaExtractor):
    """Extractor for all threads of a board on FoolFuuka based archives"""
    subcategory = "board"
    pattern = BASE_PATTERN + r"/([^/?#]+)/\d*$"
    example = "https://archived.moe/a/"

    def __init__(self, match):
        FoolfuukaExtractor.__init__(self, match)
        self.board = match.group(match.lastindex)

    def items(self):
        """Queue every thread listed on the board's index pages

        Index pages are requested starting at page 1 until one of them is
        empty or cannot be decoded as JSON.
        """
        root, board = self.root, self.board
        index_base = "{}/_/api/chan/index/?board={}&page=".format(
            root, board)
        thread_base = "{}/{}/thread/".format(root, board)

        page = 0
        while True:
            page += 1
            with self.request(index_base + format(page)) as response:
                try:
                    threads = response.json()
                except ValueError:
                    # undecodable body is treated like an empty page
                    threads = None

            if not threads:
                return

            for num, thread in threads.items():
                thread_url = thread_base + format(num)
                thread["url"] = thread_url
                # delegate each queued URL to the thread extractor
                thread["_extractor"] = FoolfuukaThreadExtractor
                yield Message.Queue, thread_url, thread
class FoolfuukaSearchExtractor(FoolfuukaExtractor):
    """Extractor for search results on FoolFuuka based boards/archives"""
    subcategory = "search"
    directory_fmt = ("{category}", "search", "{search}")
    pattern = BASE_PATTERN + r"/([^/?#]+)/search((?:/[^/?#]+/[^/?#]+)+)"
    example = "https://archived.moe/_/search/text/QUERY/"
    request_interval = 1.0

    def __init__(self, match):
        FoolfuukaExtractor.__init__(self, match)
        last = match.lastindex

        # "/key1/value1/key2/value2" splits into
        # ["", "key1", "value1", "key2", "value2"]:
        # keys sit at odd indices, their values at the following even ones
        segments = match.group(last).split("/")
        self.params = params = {
            key: text.unescape(value)
            for key, value in zip(segments[1::2], segments[2::2])
        }

        # "_" as board means no restriction to a single board
        board = match.group(last - 1)
        if board != "_":
            params["boards"] = board

    def metadata(self):
        """Return the search text used for the target directory"""
        return {"search": self.params.get("text", "")}

    def posts(self):
        """Yield posts from consecutive search result pages

        Stops on an undecodable response, an API error, an unexpected
        result type, or a page with three or fewer posts.
        """
        url = self.root + "/_/api/chan/search/"
        params = self.params.copy()
        params["page"] = text.parse_int(params.get("page"), 1)
        params.setdefault("filter", "text")

        while True:
            try:
                data = self.request(url, params=params).json()
            except ValueError:
                return

            # results are either an object keyed by "0" or a list
            if isinstance(data, dict):
                if data.get("error"):
                    return
                first = "0"
            elif isinstance(data, list):
                first = 0
            else:
                return

            posts = data[first]["posts"]
            yield from posts

            if len(posts) <= 3:
                return
            params["page"] += 1
class FoolfuukaGalleryExtractor(FoolfuukaExtractor):
    """Base extractor for FoolFuuka galleries"""
    subcategory = "gallery"
    directory_fmt = ("{category}", "{board}", "gallery")
    pattern = BASE_PATTERN + r"/([^/?#]+)/gallery(?:/(\d+))?"
    example = "https://archived.moe/a/gallery"

    def __init__(self, match):
        FoolfuukaExtractor.__init__(self, match)

        # The board and the optional page number are the final two capture
        # groups of 'pattern'. Read them by position instead of guessing
        # from 'match.lastindex' and isdecimal(): with a numeric board name
        # and no page (".../3/gallery"), the board itself would be taken
        # for the page number and the board read from an unrelated group.
        board, page = match.groups()[-2:]
        self.board = board
        if page is None:
            # no page given: walk through all pages, starting at 1
            self.pages = map(format, itertools.count(1))
        else:
            self.pages = (page,)

    def metadata(self):
        """Return the board name used for the target directory"""
        return {"board": self.board}

    def posts(self):
        """Yield posts from each gallery page until an empty page is hit"""
        base = "{}/_/api/chan/gallery/?board={}&page=".format(
            self.root, self.board)

        for page in self.pages:
            with self.request(base + page) as response:
                posts = response.json()
            if not posts:
                return
            yield from posts