1
0
mirror of https://github.com/mikf/gallery-dl.git synced 2024-11-23 19:22:32 +01:00
gallery-dl/gallery_dl/extractor/weibo.py

201 lines
7.2 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2019-2022 Mike Fährmann
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation.
"""Extractors for https://www.weibo.com/"""
from .common import Extractor, Message
from .. import text, exception
import itertools
import json
class WeiboExtractor(Extractor):
category = "weibo"
directory_fmt = ("{category}", "{user[screen_name]}")
filename_fmt = "{status[id]}_{num:>02}.{extension}"
archive_fmt = "{status[id]}_{num}"
root = "https://m.weibo.cn"
request_interval = (1.0, 2.0)
def __init__(self, match):
Extractor.__init__(self, match)
self.retweets = self.config("retweets", True)
self.videos = self.config("videos", True)
def items(self):
original_retweets = (self.retweets == "original")
for status in self.statuses():
if self.retweets and "retweeted_status" in status:
if original_retweets:
status = status["retweeted_status"]
files = self._files_from_status(status)
else:
files = itertools.chain(
self._files_from_status(status),
self._files_from_status(status["retweeted_status"]),
)
else:
files = self._files_from_status(status)
for num, file in enumerate(files, 1):
if num == 1:
status["date"] = text.parse_datetime(
status["created_at"], "%a %b %d %H:%M:%S %z %Y")
yield Message.Directory, status
file["status"] = status
file["num"] = num
yield Message.Url, file["url"], file
def statuses(self):
"""Returns an iterable containing all relevant 'status' objects"""
def _status_by_id(self, status_id):
url = "{}/detail/{}".format(self.root, status_id)
page = self.request(url, fatal=False).text
data = text.extract(page, "var $render_data = [", "][0] || {};")[0]
return json.loads(data)["status"] if data else None
def _files_from_status(self, status):
page_info = status.pop("page_info", ())
if "pics" in status:
if len(status["pics"]) < status["pic_num"]:
status = self._status_by_id(status["id"]) or status
for image in status.pop("pics"):
pid = image["pid"]
if "large" in image:
image = image["large"]
geo = image.get("geo") or {}
yield text.nameext_from_url(image["url"], {
"url" : image["url"],
"pid" : pid,
"width" : text.parse_int(geo.get("width")),
"height": text.parse_int(geo.get("height")),
})
if self.videos and "media_info" in page_info:
info = page_info["media_info"]
url = info.get("stream_url_hd") or info.get("stream_url")
if url:
data = text.nameext_from_url(url, {
"url" : url,
"pid" : 0,
"width" : 0,
"height": 0,
})
if data["extension"] == "m3u8":
data["extension"] = "mp4"
data["url"] = "ytdl:" + url
data["_ytdl_extra"] = {"protocol": "m3u8_native"}
yield data
class WeiboUserExtractor(WeiboExtractor):
"""Extractor for all images of a user on weibo.cn"""
subcategory = "user"
pattern = (r"(?:https?://)?(?:www\.|m\.)?weibo\.c(?:om|n)"
r"/(?:u|p(?:rofile)?)/(\d+)")
test = (
("https://m.weibo.cn/u/2314621010", {
"range": "1-30",
}),
# deleted (#2521)
("https://weibo.com/u/7500315942", {
"count": 0,
}),
("https://m.weibo.cn/profile/2314621010"),
("https://m.weibo.cn/p/2304132314621010_-_WEIBO_SECOND_PROFILE_WEIBO"),
("https://www.weibo.com/p/1003062314621010/home"),
)
def __init__(self, match):
WeiboExtractor.__init__(self, match)
self.user_id = match.group(1)[-10:]
def statuses(self):
url = self.root + "/api/container/getIndex"
headers = {
"Accept": "application/json, text/plain, */*",
"X-Requested-With": "XMLHttpRequest",
"MWeibo-Pwa": "1",
"X-XSRF-TOKEN": None,
"Referer": "{}/u/{}".format(self.root, self.user_id),
}
params = {
"type": "uid",
"value": self.user_id,
"containerid": "107603" + self.user_id,
}
while True:
response = self.request(url, params=params, headers=headers)
headers["X-XSRF-TOKEN"] = response.cookies.get("XSRF-TOKEN")
data = response.json()
if not data.get("ok"):
self.log.debug(response.content)
if "since_id" not in params: # first iteration
raise exception.StopExtraction(
'"%s"', data.get("msg") or "unknown error")
data = data["data"]
for card in data["cards"]:
if "mblog" in card:
yield card["mblog"]
info = data.get("cardlistInfo")
if not info:
# occasionally weibo returns an empty response
# repeating the same request usually/eventually yields
# the correct response.
continue
params["since_id"] = sid = info.get("since_id")
if not sid:
return
class WeiboStatusExtractor(WeiboExtractor):
"""Extractor for images from a status on weibo.cn"""
subcategory = "status"
pattern = (r"(?:https?://)?(?:www\.|m\.)?weibo\.c(?:om|n)"
r"/(?:detail|status|\d+)/(\w+)")
test = (
("https://m.weibo.cn/detail/4323047042991618", {
"pattern": r"https?://wx\d+.sinaimg.cn/large/\w+.jpg",
"keyword": {"status": {"date": "dt:2018-12-30 13:56:36"}},
}),
("https://m.weibo.cn/detail/4339748116375525", {
"pattern": r"https?://f.us.sinaimg.cn/\w+\.mp4\?label=mp4_hd",
}),
# unavailable video (#427)
("https://m.weibo.cn/status/4268682979207023", {
"exception": exception.NotFoundError,
}),
# non-numeric status ID (#664)
("https://weibo.com/3314883543/Iy7fj4qVg"),
# original retweets (#1542)
("https://m.weibo.cn/detail/4600272267522211", {
"options": (("retweets", "original"),),
"keyword": {"status": {"id": "4600167083287033"}},
}),
("https://m.weibo.cn/status/4339748116375525"),
("https://m.weibo.cn/5746766133/4339748116375525"),
)
def __init__(self, match):
WeiboExtractor.__init__(self, match)
self.status_id = match.group(1)
def statuses(self):
status = self._status_by_id(self.status_id)
if not status:
raise exception.NotFoundError("status")
return (status,)