# Source: mirror of https://github.com/mikf/gallery-dl
# File: gallery_dl/extractor/fantia.py

# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://fantia.jp/"""
from .common import Extractor, Message
from .. import text, util
class FantiaExtractor(Extractor):
    """Base class for Fantia extractors"""
    category = "fantia"
    root = "https://fantia.jp"
    directory_fmt = ("{category}", "{fanclub_id}")
    filename_fmt = "{post_id}_{file_id}.{extension}"
    archive_fmt = "{post_id}_{file_id}"
    # class-level flag so the missing-cookie warning is logged only once
    _warning = True

    def items(self):
        """Yield a Directory message per post and a Url message per file"""
        self.headers = {
            "Accept" : "application/json, text/plain, */*",
            "Referer": self.root,
        }

        if self._warning:
            if not self._check_cookies(("_session_id",)):
                self.log.warning("no '_session_id' cookie set")
            # write through the class so sibling extractors stay silent too
            FantiaExtractor._warning = False

        for post_id in self.posts():
            full_response, post = self._get_post_data(post_id)
            yield Message.Directory, post
            post["num"] = 0
            for url, url_data in self._get_urls_from_post(full_response, post):
                post["num"] += 1
                # prefer the server-supplied filename; fall back to the URL
                fname = url_data["content_filename"] or url
                text.nameext_from_url(fname, url_data)
                url_data["file_url"] = url
                yield Message.Url, url, url_data

    def posts(self):
        """Return an iterable of post IDs (implemented by subclasses)"""

    def _pagination(self, url):
        """Yield post IDs from 'url', advancing 'page' until a page is empty"""
        params = {"page": 1}
        headers = self.headers

        while True:
            page = self.request(url, params=params, headers=headers).text
            self._csrf_token(page)

            post_id = None
            for post_id in text.extract_iter(
                    page, 'class="link-block" href="/posts/', '"'):
                yield post_id

            # no post ID extracted -> last page reached
            if not post_id:
                return
            params["page"] += 1

    def _csrf_token(self, page=None):
        """Extract the CSRF token from 'page' (or the site root) into headers"""
        if not page:
            page = self.request(self.root + "/").text
        self.headers["X-CSRF-Token"] = text.extr(
            page, 'name="csrf-token" content="', '"')

    def _get_post_data(self, post_id):
        """Fetch and process post data

        Returns a (full API response, metadata dict) pair.
        """
        url = self.root+"/api/v1/posts/"+post_id
        resp = self.request(url, headers=self.headers).json()["post"]
        post = {
            "post_id": resp["id"],
            "post_url": self.root + "/posts/" + str(resp["id"]),
            "post_title": resp["title"],
            "comment": resp["comment"],
            "rating": resp["rating"],
            "posted_at": resp["posted_at"],
            # e.g. "Fri, 17 Sep 2021 20:09:24 +0200"
            "date": text.parse_datetime(
                resp["posted_at"], "%a, %d %b %Y %H:%M:%S %z"),
            "fanclub_id": resp["fanclub"]["id"],
            "fanclub_user_id": resp["fanclub"]["user"]["id"],
            "fanclub_user_name": resp["fanclub"]["user"]["name"],
            "fanclub_name": resp["fanclub"]["name"],
            "fanclub_url": self.root+"/fanclubs/"+str(resp["fanclub"]["id"]),
            "tags": resp["tags"]
        }
        return resp, post

    def _get_urls_from_post(self, resp, post):
        """Extract individual URL data from the response

        Yields (url, metadata) pairs; 'post' is mutated in place for
        each yielded file, so consumers must process items immediately.
        """
        if "thumb" in resp and resp["thumb"] and "original" in resp["thumb"]:
            post["content_filename"] = ""
            post["content_category"] = "thumb"
            post["file_id"] = "thumb"
            yield resp["thumb"]["original"], post

        for content in resp["post_contents"]:
            post["content_category"] = content["category"]
            post["content_title"] = content["title"]
            post["content_filename"] = content.get("filename", "")
            post["content_id"] = content["id"]
            if "comment" in content:
                post["content_comment"] = content["comment"]

            if "post_content_photos" in content:
                for photo in content["post_content_photos"]:
                    post["file_id"] = photo["id"]
                    yield photo["url"]["original"], post

            if "download_uri" in content:
                post["file_id"] = content["id"]
                yield self.root+"/"+content["download_uri"], post

            if content["category"] == "blog" and "comment" in content:
                # blog comments are Quill deltas: a JSON object whose
                # "ops" list mixes plain-text and embedded-image inserts
                comment_json = util.json_loads(content["comment"])
                ops = comment_json.get("ops", ())

                # collect blogpost text first
                blog_text = ""
                for op in ops:
                    insert = op.get("insert")
                    if isinstance(insert, str):
                        blog_text += insert
                post["blogpost_text"] = blog_text

                # collect images
                for op in ops:
                    insert = op.get("insert")
                    if isinstance(insert, dict) and "fantiaImage" in insert:
                        img = insert["fantiaImage"]
                        post["file_id"] = img["id"]
                        yield "https://fantia.jp" + img["original_url"], post
class FantiaCreatorExtractor(FantiaExtractor):
    """Extractor for a Fantia creator's works"""
    subcategory = "creator"
    pattern = r"(?:https?://)?(?:www\.)?fantia\.jp/fanclubs/(\d+)"
    test = (
        ("https://fantia.jp/fanclubs/6939", {
            "range": "1-25",
            "count": ">= 25",
            "keyword": {
                "fanclub_user_id" : 52152,
                "tags" : list,
                "title" : str,
            },
        }),
    )

    def __init__(self, match):
        FantiaExtractor.__init__(self, match)
        self.creator_id = match.group(1)

    def posts(self):
        # paginate through all posts of this fanclub
        return self._pagination(
            "{}/fanclubs/{}/posts".format(self.root, self.creator_id))
class FantiaPostExtractor(FantiaExtractor):
    """Extractor for media from a single Fantia post"""
    subcategory = "post"
    pattern = r"(?:https?://)?(?:www\.)?fantia\.jp/posts/(\d+)"
    test = (
        ("https://fantia.jp/posts/508363", {
            "count": 6,
            "keyword": {
                "post_title": "zunda逆バニーでおしりコッショリ",
                "tags": list,
                "rating": "adult",
                "post_id": 508363
            },
        }),
    )

    def __init__(self, match):
        FantiaExtractor.__init__(self, match)
        self.post_id = match.group(1)

    def posts(self):
        # a single post has no listing page to scrape a token from,
        # so fetch one from the site root before returning the ID
        self._csrf_token()
        return (self.post_id,)