1
0
mirror of https://github.com/mikf/gallery-dl.git synced 2024-11-25 12:12:34 +01:00
gallery-dl/gallery_dl/extractor/cohost.py
Mike Fährmann d68bb78f44
use 'True if COND else False' for bool conversions
faster than 'bool(COND)', especially on older Pythons < 3.11
2024-10-14 20:57:47 +02:00

223 lines
7.4 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2024 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://cohost.org/"""
from .common import Extractor, Message
from .. import text, util
BASE_PATTERN = r"(?:https?://)?(?:www\.)?cohost\.org"
class CohostExtractor(Extractor):
"""Base class for cohost extractors"""
category = "cohost"
root = "https://cohost.org"
directory_fmt = ("{category}", "{postingProject[handle]}")
filename_fmt = ("{postId}_{headline:?/_/[b:200]}{num}.{extension}")
archive_fmt = "{postId}_{num}"
def _init(self):
self.replies = self.config("replies", True)
self.pinned = self.config("pinned", False)
self.shares = self.config("shares", False)
self.asks = self.config("asks", True)
def items(self):
for post in self.posts():
reason = post.get("limitedVisibilityReason")
if reason and reason != "none":
if reason == "log-in-first":
reason = ("This page's posts are visible only to users "
"who are logged in.")
self.log.warning('%s: "%s"', post["postId"], reason)
files = self._extract_files(post)
post["count"] = len(files)
post["date"] = text.parse_datetime(
post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
yield Message.Directory, post
for post["num"], file in enumerate(files, 1):
url = file["fileURL"]
post.update(file)
text.nameext_from_url(url, post)
yield Message.Url, url, post
def posts(self):
return ()
def _request_api(self, endpoint, input):
url = "{}/api/v1/trpc/{}".format(self.root, endpoint)
params = {"batch": "1", "input": util.json_dumps({"0": input})}
headers = {"content-type": "application/json"}
data = self.request(url, params=params, headers=headers).json()
return data[0]["result"]["data"]
def _extract_files(self, post):
files = []
self._extract_blocks(post, files)
if self.shares and post.get("shareTree"):
for share in post["shareTree"]:
self._extract_blocks(share, files, share)
del post["shareTree"]
return files
def _extract_blocks(self, post, files, shared=None):
post["content"] = content = []
for block in post.pop("blocks") or ():
try:
type = block["type"]
if type == "attachment":
file = block["attachment"].copy()
file["shared"] = shared
files.append(file)
elif type == "attachment-row":
for att in block["attachments"]:
file = att["attachment"].copy()
file["shared"] = shared
files.append(file)
elif type == "markdown":
content.append(block["markdown"]["content"])
elif type == "ask":
post["ask"] = block["ask"]
else:
self.log.debug("%s: Unsupported block type '%s'",
post["postId"], type)
except Exception as exc:
self.log.debug("%s: %s", exc.__class__.__name__, exc)
class CohostUserExtractor(CohostExtractor):
"""Extractor for media from a cohost user"""
subcategory = "user"
pattern = BASE_PATTERN + r"/([^/?#]+)/?(?:$|\?|#)"
example = "https://cohost.org/USER"
def posts(self):
empty = 0
params = {
"projectHandle": self.groups[0],
"page": 0,
"options": {
"pinnedPostsAtTop" : True if self.pinned else False,
"hideReplies" : not self.replies,
"hideShares" : not self.shares,
"hideAsks" : not self.asks,
"viewingOnProjectPage": True,
},
}
while True:
data = self._request_api("posts.profilePosts", params)
posts = data["posts"]
if posts:
empty = 0
yield from posts
else:
empty += 1
pagination = data["pagination"]
if not pagination.get("morePagesForward"):
return
if empty >= 3:
return self.log.debug("Empty API results")
params["page"] = pagination["nextPage"]
class CohostPostExtractor(CohostExtractor):
"""Extractor for media from a single cohost post"""
subcategory = "post"
pattern = BASE_PATTERN + r"/([^/?#]+)/post/(\d+)"
example = "https://cohost.org/USER/post/12345"
def posts(self):
endpoint = "posts.singlePost"
params = {
"handle": self.groups[0],
"postId": int(self.groups[1]),
}
data = self._request_api(endpoint, params)
post = data["post"]
try:
post["comments"] = data["comments"][self.groups[1]]
except LookupError:
post["comments"] = ()
return (post,)
class CohostTagExtractor(CohostExtractor):
"""Extractor for tagged posts"""
subcategory = "tag"
pattern = BASE_PATTERN + r"/([^/?#]+)/tagged/([^/?#]+)(?:\?([^#]+))?"
example = "https://cohost.org/USER/tagged/TAG"
def posts(self):
user, tag, query = self.groups
url = "{}/{}/tagged/{}".format(self.root, user, tag)
params = text.parse_query(query)
post_feed_key = ("tagged-post-feed" if user == "rc" else
"project-tagged-post-feed")
while True:
page = self.request(url, params=params).text
data = util.json_loads(text.extr(
page, 'id="__COHOST_LOADER_STATE__">', '</script>'))
try:
feed = data[post_feed_key]
except KeyError:
feed = data.popitem()[1]
yield from feed["posts"]
pagination = feed["paginationMode"]
if not pagination.get("morePagesForward"):
return
params["refTimestamp"] = pagination["refTimestamp"]
params["skipPosts"] = \
pagination["currentSkip"] + pagination["idealPageStride"]
class CohostLikesExtractor(CohostExtractor):
"""Extractor for liked posts"""
subcategory = "likes"
pattern = BASE_PATTERN + r"/rc/liked-posts"
example = "https://cohost.org/rc/liked-posts"
def posts(self):
url = "{}/rc/liked-posts".format(self.root)
params = {}
while True:
page = self.request(url, params=params).text
data = util.json_loads(text.extr(
page, 'id="__COHOST_LOADER_STATE__">', '</script>'))
try:
feed = data["liked-posts-feed"]
except KeyError:
feed = data.popitem()[1]
yield from feed["posts"]
pagination = feed["paginationMode"]
if not pagination.get("morePagesForward"):
return
params["refTimestamp"] = pagination["refTimestamp"]
params["skipPosts"] = \
pagination["currentSkip"] + pagination["idealPageStride"]