1
0
mirror of https://github.com/mikf/gallery-dl.git synced 2024-11-24 19:52:32 +01:00

[civitai] add 'post' extractors (#6279)

- https://civitai.com/posts/12345
- https://civitai.com/user/USER/posts
This commit is contained in:
Mike Fährmann 2024-10-06 17:48:48 +02:00
parent b12d65ade2
commit 9757eacce1
No known key found for this signature in database
GPG Key ID: 5680CA389D365A88
3 changed files with 127 additions and 33 deletions

View File

@ -166,7 +166,7 @@ Consider all listed sites to potentially be NSFW.
<tr> <tr>
<td>Civitai</td> <td>Civitai</td>
<td>https://www.civitai.com/</td> <td>https://www.civitai.com/</td>
<td>individual Images, Models, Search Results, Tag Searches (Images), Tag Searches (Models), User Profiles, User Images, User Models</td> <td>individual Images, Models, Posts, Search Results, Tag Searches (Images), Tag Searches (Models), User Profiles, User Images, User Models, User Posts</td>
<td></td> <td></td>
</tr> </tr>
<tr> <tr>

View File

@ -22,7 +22,7 @@ class CivitaiExtractor(Extractor):
category = "civitai" category = "civitai"
root = "https://civitai.com" root = "https://civitai.com"
directory_fmt = ("{category}", "{username|user[username]}", "images") directory_fmt = ("{category}", "{username|user[username]}", "images")
filename_fmt = "{id}.{extension}" filename_fmt = "{file[id]|id|filename}.{extension}"
archive_fmt = "{hash}" archive_fmt = "{hash}"
request_interval = (0.5, 1.5) request_interval = (0.5, 1.5)
@ -53,6 +53,30 @@ class CivitaiExtractor(Extractor):
yield Message.Queue, url, data yield Message.Queue, url, data
return return
posts = self.posts()
if posts:
for post in posts:
if "images" in post:
images = post["images"]
else:
images = self.api.images_post(post["id"])
post = self.api.post(post["id"])
post["date"] = text.parse_datetime(
post["publishedAt"], "%Y-%m-%dT%H:%M:%S.%fZ")
data = {
"post": post,
"user": post["user"],
}
del post["user"]
yield Message.Directory, data
for file in self._image_results(images):
file.update(data)
yield Message.Url, file["url"], file
return
images = self.images() images = self.images()
if images: if images:
for image in images: for image in images:
@ -68,6 +92,9 @@ class CivitaiExtractor(Extractor):
def models(self): def models(self):
return () return ()
def posts(self):
    """Return the posts to process.

    The base implementation yields nothing; post-oriented subclasses
    override this hook to supply post objects.
    """
    return ()
def images(self): def images(self):
return () return ()
@ -87,6 +114,19 @@ class CivitaiExtractor(Extractor):
url, self._image_quality, name) url, self._image_quality, name)
) )
def _image_results(self, images):
    """Yield one metadata dict per entry of *images*.

    Each yielded dict carries a 1-based "num", the original image object
    under "file", and the download URL under "url", on top of whatever
    text.nameext_from_url() adds ("filename" and "extension" are read
    back from it below).
    """
    for index, image in enumerate(images, 1):
        info = text.nameext_from_url(image["url"], {
            "num" : index,
            "file": image,
            "url" : self._url(image),
        })

        # No extension could be derived from the URL:
        # fall back to the extractor's configured image extension.
        if not info["extension"]:
            info["extension"] = self._image_ext

        # Image objects without an explicit ID get one from a
        # purely numeric filename.
        if "id" not in image and info["filename"].isdecimal():
            image["id"] = text.parse_int(info["filename"])

        yield info
class CivitaiModelExtractor(CivitaiExtractor): class CivitaiModelExtractor(CivitaiExtractor):
subcategory = "model" subcategory = "model"
@ -189,17 +229,6 @@ class CivitaiModelExtractor(CivitaiExtractor):
images = self.api.images_gallery(model, version, user) images = self.api.images_gallery(model, version, user)
return self._image_results(images) return self._image_results(images)
def _image_results(self, images):
for num, file in enumerate(images, 1):
data = text.nameext_from_url(file["url"], {
"num" : num,
"file": file,
"url" : self._url(file),
})
if "id" not in file and data["filename"].isdecimal():
file["id"] = text.parse_int(data["filename"])
yield data
def _validate_file_model(self, response): def _validate_file_model(self, response):
if response.headers.get("Content-Type", "").startswith("text/html"): if response.headers.get("Content-Type", "").startswith("text/html"):
alert = text.extr( alert = text.extr(
@ -223,6 +252,17 @@ class CivitaiImageExtractor(CivitaiExtractor):
return self.api.image(self.groups[0]) return self.api.image(self.groups[0])
class CivitaiPostExtractor(CivitaiExtractor):
    """Extractor for a single civitai post"""
    subcategory = "post"
    directory_fmt = (
        "{category}",
        "{username|user[username]}",
        "posts",
        "{post[id]}{post[title]:? //}",
    )
    pattern = BASE_PATTERN + r"/posts/(\d+)"
    example = "https://civitai.com/posts/12345"

    def posts(self):
        """Return a 1-tuple holding a stub post with the ID from the URL."""
        post_id = int(self.groups[0])
        return ({"id": post_id},)
class CivitaiTagModelsExtractor(CivitaiExtractor): class CivitaiTagModelsExtractor(CivitaiExtractor):
subcategory = "tag-models" subcategory = "tag-models"
pattern = BASE_PATTERN + r"/(?:tag/|models\?tag=)([^/?&#]+)" pattern = BASE_PATTERN + r"/(?:tag/|models\?tag=)([^/?&#]+)"
@ -265,8 +305,9 @@ class CivitaiUserExtractor(CivitaiExtractor):
base = "{}/user/{}/".format(self.root, self.groups[0]) base = "{}/user/{}/".format(self.root, self.groups[0])
return self._dispatch_extractors(( return self._dispatch_extractors((
(CivitaiUserModelsExtractor, base + "models"), (CivitaiUserModelsExtractor, base + "models"),
(CivitaiUserPostsExtractor , base + "posts"),
(CivitaiUserImagesExtractor, base + "images"), (CivitaiUserImagesExtractor, base + "images"),
), ("user-models", "user-images")) ), ("user-models", "user-posts"))
class CivitaiUserModelsExtractor(CivitaiExtractor): class CivitaiUserModelsExtractor(CivitaiExtractor):
@ -280,6 +321,19 @@ class CivitaiUserModelsExtractor(CivitaiExtractor):
return self.api.models(params) return self.api.models(params)
class CivitaiUserPostsExtractor(CivitaiExtractor):
    """Extractor for the posts of a civitai user"""
    subcategory = "user-posts"
    directory_fmt = (
        "{category}",
        "{username|user[username]}",
        "posts",
        "{post[id]}{post[title]:? //}",
    )
    pattern = USER_PATTERN + r"/posts/?(?:\?([^#]+))?"
    example = "https://civitai.com/user/USER/posts"

    def posts(self):
        """Query the API for this user's posts.

        Query-string parameters from the URL are forwarded as-is,
        with "username" set from the URL path.
        """
        query = self.groups[1]
        params = text.parse_query(query)
        params["username"] = text.unquote(self.groups[0])
        return self.api.posts(params)
class CivitaiUserImagesExtractor(CivitaiExtractor): class CivitaiUserImagesExtractor(CivitaiExtractor):
subcategory = "user-images" subcategory = "user-images"
pattern = USER_PATTERN + r"/images/?(?:\?([^#]+))?" pattern = USER_PATTERN + r"/images/?(?:\?([^#]+))?"
@ -372,7 +426,7 @@ class CivitaiTrpcAPI():
self.root = extractor.root + "/api/trpc/" self.root = extractor.root + "/api/trpc/"
self.headers = { self.headers = {
"content-type" : "application/json", "content-type" : "application/json",
"x-client-version": "5.0.94", "x-client-version": "5.0.146",
"x-client-date" : "", "x-client-date" : "",
"x-client" : "web", "x-client" : "web",
"x-fingerprint" : "undefined", "x-fingerprint" : "undefined",
@ -398,7 +452,7 @@ class CivitaiTrpcAPI():
endpoint = "image.getInfinite" endpoint = "image.getInfinite"
if defaults: if defaults:
params_ = { params = self._merge_params(params, {
"useIndex" : True, "useIndex" : True,
"period" : "AllTime", "period" : "AllTime",
"sort" : "Newest", "sort" : "Newest",
@ -407,12 +461,9 @@ class CivitaiTrpcAPI():
"fromPlatform" : False, # Made On-Site "fromPlatform" : False, # Made On-Site
"browsingLevel": self.nsfw, "browsingLevel": self.nsfw,
"include" : ["cosmetics"], "include" : ["cosmetics"],
} })
params_.update(params)
else:
params_ = params
return self._pagination(endpoint, params_) return self._pagination(endpoint, params)
def images_gallery(self, model, version, user): def images_gallery(self, model, version, user):
endpoint = "image.getImagesAsPostsInfinite" endpoint = "image.getImagesAsPostsInfinite"
@ -429,6 +480,13 @@ class CivitaiTrpcAPI():
for post in self._pagination(endpoint, params): for post in self._pagination(endpoint, params):
yield from post["images"] yield from post["images"]
def images_post(self, post_id):
    """Return the images belonging to the post with the given ID.

    Delegates to self.images() with a "postId" filter and
    "pending" enabled.
    """
    return self.images({
        "postId" : int(post_id),
        "pending": True,
    })
def model(self, model_id): def model(self, model_id):
endpoint = "model.getById" endpoint = "model.getById"
params = {"id": int(model_id)} params = {"id": int(model_id)}
@ -443,7 +501,7 @@ class CivitaiTrpcAPI():
endpoint = "model.getAll" endpoint = "model.getAll"
if defaults: if defaults:
params_ = { params = self._merge_params(params, {
"period" : "AllTime", "period" : "AllTime",
"periodMode" : "published", "periodMode" : "published",
"sort" : "Newest", "sort" : "Newest",
@ -454,36 +512,71 @@ class CivitaiTrpcAPI():
"fromPlatform" : False, "fromPlatform" : False,
"supportsGeneration": False, "supportsGeneration": False,
"browsingLevel": self.nsfw, "browsingLevel": self.nsfw,
} })
params_.update(params)
else:
params_ = params
return self._pagination(endpoint, params_) return self._pagination(endpoint, params)
def post(self, post_id):
    """Fetch a single post object through the 'post.get' endpoint."""
    return self._call("post.get", {"id": int(post_id)})
def posts(self, params, defaults=True):
    """Paginate over posts from the 'post.getInfinite' endpoint.

    With *defaults* enabled, *params* is merged on top of a set of
    default query values; otherwise it is sent unchanged.
    """
    if defaults:
        base = {
            "browsingLevel": self.nsfw,
            "period"       : "AllTime",
            "periodMode"   : "published",
            "sort"         : "Newest",
            "followed"     : False,
            "draftOnly"    : False,
            "pending"      : True,
            "include"      : ["cosmetics"],
        }
        params = self._merge_params(params, base)

    # type hint for the cursor value, passed along to _pagination()
    cursor_meta = {"cursor": ("Date",)}
    return self._pagination("post.getInfinite", params, cursor_meta)
def user(self, username): def user(self, username):
endpoint = "user.getCreator" endpoint = "user.getCreator"
params = {"username": username} params = {"username": username}
return (self._call(endpoint, params),) return (self._call(endpoint, params),)
def _call(self, endpoint, params): def _call(self, endpoint, params, meta=None):
url = self.root + endpoint url = self.root + endpoint
headers = self.headers headers = self.headers
params = {"input": util.json_dumps({"json": params})}
if meta:
input = {"json": params, "meta": {"values": meta}}
else:
input = {"json": params}
params = {"input": util.json_dumps(input)}
headers["x-client-date"] = str(int(time.time() * 1000)) headers["x-client-date"] = str(int(time.time() * 1000))
response = self.extractor.request(url, headers=headers, params=params) response = self.extractor.request(url, params=params, headers=headers)
return response.json()["result"]["data"]["json"] return response.json()["result"]["data"]["json"]
def _pagination(self, endpoint, params): def _pagination(self, endpoint, params, meta=None):
if "cursor" not in params:
params["cursor"] = None
meta_ = {"cursor": ("undefined",)}
while True: while True:
data = self._call(endpoint, params) data = self._call(endpoint, params, meta_)
yield from data["items"] yield from data["items"]
try: try:
if not data["nextCursor"]: if not data["nextCursor"]:
return return
params["cursor"] = data["nextCursor"]
except KeyError: except KeyError:
return return
params["cursor"] = data["nextCursor"]
meta_ = meta
def _merge_params(self, params_user, params_default):
params_default.update(params_user)
return params_default

View File

@ -202,6 +202,7 @@ SUBCATEGORY_MAP = {
"tag-images": "Tag Searches (Images)", "tag-images": "Tag Searches (Images)",
"user-models": "User Models", "user-models": "User Models",
"user-images": "User Images", "user-images": "User Images",
"user-posts": "User Posts",
}, },
"coomerparty": { "coomerparty": {
"discord" : "", "discord" : "",