1
0
mirror of https://github.com/mikf/gallery-dl.git synced 2024-11-22 10:42:34 +01:00

[instagram] move API related code into separate classes

may contain bugs and is probably incomplete for the GraphQL variant
This commit is contained in:
Mike Fährmann 2022-09-26 21:07:18 +02:00
parent ac45ed2764
commit 6f77193a24
No known key found for this signature in database
GPG Key ID: 5680CA389D365A88

View File

@ -34,13 +34,18 @@ class InstagramExtractor(Extractor):
def __init__(self, match):
Extractor.__init__(self, match)
self.item = match.group(1)
self.api = None
self.www_claim = "0"
self.csrf_token = util.generate_token()
self._logged_in = True
self._find_tags = re.compile(r"#\w+").findall
self._cursor = None
def items(self):
self.login()
self.api = (InstagramRestAPI(self) if self._logged_in else
InstagramGraphqlAPI(self))
data = self.metadata()
videos = self.config("videos", True)
previews = self.config("previews", False)
@ -51,7 +56,7 @@ class InstagramExtractor(Extractor):
if "__typename" in post:
post = self._parse_post_graphql(post)
else:
post = self._parse_post_api(post)
post = self._parse_post_rest(post)
post.update(data)
files = post.pop("_files")
@ -107,59 +112,6 @@ class InstagramExtractor(Extractor):
return response
def _request_api(self, endpoint, **kwargs):
url = "https://i.instagram.com/api" + endpoint
kwargs["headers"] = {
"X-CSRFToken" : self.csrf_token,
"X-Instagram-AJAX": "1006242110",
"X-IG-App-ID" : "936619743392459",
"X-ASBD-ID" : "198387",
"X-IG-WWW-Claim" : self.www_claim,
"Origin" : self.root,
"Referer" : self.root + "/",
}
kwargs["cookies"] = {
"csrftoken": self.csrf_token,
}
return self.request(url, **kwargs).json()
def _request_graphql(self, query_hash, variables):
url = self.root + "/graphql/query/"
params = {
"query_hash": query_hash,
"variables" : json.dumps(variables),
}
headers = {
"X-CSRFToken" : self.csrf_token,
"X-Instagram-AJAX": "1006242110",
"X-IG-App-ID" : "936619743392459",
"X-ASBD-ID" : "198387",
"X-IG-WWW-Claim" : self.www_claim,
"X-Requested-With": "XMLHttpRequest",
"Referer" : self.root + "/",
}
cookies = {
"csrftoken": self.csrf_token,
}
return self.request(
url, params=params, headers=headers, cookies=cookies,
).json()["data"]
@memcache(keyarg=1)
def _user_by_screen_name(self, screen_name):
endpoint = "/v1/users/web_profile_info/"
params = {"username": screen_name}
return self._request_api(endpoint, params=params)["data"]["user"]
def _uid_by_screen_name(self, screen_name):
if screen_name.startswith("id:"):
return screen_name[3:]
return self._user_by_screen_name(screen_name)["id"]
def _media_by_id(self, post_id):
endpoint = "/v1/media/{}/info/".format(post_id)
return self._pagination_api(endpoint)
def login(self):
self._username = None
if not self._check_cookies(self.cookienames):
@ -167,6 +119,8 @@ class InstagramExtractor(Extractor):
if username:
self._username = username
self._update_cookies(_login_impl(self, username, password))
else:
self._logged_in = False
self.session.cookies.set(
"csrftoken", self.csrf_token, domain=self.cookiedomain)
@ -174,13 +128,10 @@ class InstagramExtractor(Extractor):
typename = post["__typename"]
if post.get("is_video") and "video_url" not in post:
media = next(self._media_by_id(post["id"]))
return self._parse_post_api(media)
if typename == "GraphSidecar" and \
post = self.api.media(post["id"])
elif typename == "GraphSidecar" and \
"edge_sidecar_to_children" not in post:
media = next(self._media_by_id(post["id"]))
return self._parse_post_api(media)
post = self.api.media(post["id"])
pinned = post.get("pinned_for_users", ())
if pinned:
@ -251,7 +202,7 @@ class InstagramExtractor(Extractor):
return data
def _parse_post_api(self, post):
def _parse_post_rest(self, post):
if "items" in post:
items = post["items"]
reel_id = str(post["id"]).rpartition(":")[2]
@ -378,51 +329,6 @@ class InstagramExtractor(Extractor):
"username" : user["username"],
"full_name": user["full_name"]})
def _pagination_graphql(self, query_hash, variables):
cursor = self.config("cursor")
if cursor:
variables["after"] = cursor
while True:
data = next(iter(self._request_graphql(
query_hash, variables)["user"].values()))
for edge in data["edges"]:
yield edge["node"]
info = data["page_info"]
if not info["has_next_page"]:
return
elif not data["edges"]:
s = "" if self.item.endswith("s") else "s"
raise exception.StopExtraction(
"%s'%s posts are private", self.item, s)
variables["after"] = self._cursor = info["end_cursor"]
self.log.debug("Cursor: %s", self._cursor)
def _pagination_api(self, endpoint, params=None):
if params is None:
params = {}
while True:
data = self._request_api(endpoint, params=params)
yield from data["items"]
if not data["more_available"]:
return
params["max_id"] = data["next_max_id"]
def _pagination_api_post(self, endpoint, params, post=False):
while True:
data = self._request_api(endpoint, method="POST", data=params)
for item in data["items"]:
yield item["media"]
info = data["paging_info"]
if not info["more_available"]:
return
params["max_id"] = info["max_id"]
class InstagramUserExtractor(InstagramExtractor):
"""Extractor for an Instagram user profile"""
@ -457,10 +363,8 @@ class InstagramPostsExtractor(InstagramExtractor):
})
def posts(self):
endpoint = "/v1/feed/user/{}/".format(
self._uid_by_screen_name(self.item))
params = {"count": 30}
return self._pagination_api(endpoint, params)
uid = self.api.user_id(self.item)
return self.api.user_feed(uid)
class InstagramTaggedExtractor(InstagramExtractor):
@ -482,7 +386,7 @@ class InstagramTaggedExtractor(InstagramExtractor):
self.user_id = self.item[3:]
return {"tagged_owner_id": self.user_id}
user = self._user_by_screen_name(self.item)
user = self.api.user(self.item)
self.user_id = user["id"]
return {
@ -492,9 +396,7 @@ class InstagramTaggedExtractor(InstagramExtractor):
}
def posts(self):
endpoint = "/v1/usertags/{}/feed/".format(self.user_id)
params = {"count": 50}
return self._pagination_api(endpoint, params)
return self.api.user_tagged(self.user_id)
class InstagramChannelExtractor(InstagramExtractor):
@ -507,9 +409,8 @@ class InstagramChannelExtractor(InstagramExtractor):
})
def posts(self):
query_hash = "bc78b344a68ed16dd5d7f264681c4c76"
variables = {"id": self._uid_by_screen_name(self.item), "first": 50}
return self._pagination_graphql(query_hash, variables)
uid = self.api.user_id(self.item)
return self.api.user_clips(uid)
class InstagramSavedExtractor(InstagramExtractor):
@ -522,9 +423,7 @@ class InstagramSavedExtractor(InstagramExtractor):
)
def posts(self):
endpoint = "/v1/feed/saved/posts/"
for item in self._pagination_api(endpoint):
yield item["media"]
return self.api.user_saved()
class InstagramCollectionExtractor(InstagramExtractor):
@ -546,9 +445,7 @@ class InstagramCollectionExtractor(InstagramExtractor):
}
def posts(self):
endpoint = "/v1/feed/collection/{}/posts/".format(self.collection_id)
for item in self._pagination_api(endpoint):
yield item["media"]
return self.api.user_collection(self.collection_id)
class InstagramTagExtractor(InstagramExtractor):
@ -565,27 +462,7 @@ class InstagramTagExtractor(InstagramExtractor):
return {"tag": text.unquote(self.item)}
def posts(self):
endpoint = "/v1/tags/{}/sections/".format(self.item)
data = {
"include_persistent": "0",
"max_id" : None,
"page" : None,
"surface": "grid",
"tab" : "recent",
}
while True:
info = self._request_api(endpoint, method="POST", data=data)
for section in info["sections"]:
for media in section["layout_content"]["medias"]:
yield media["media"]
if not info.get("more_available"):
return
data["max_id"] = info["next_max_id"]
data["page"] = info["next_page"]
return self.api.tags_media(self.item)
class InstagramPostExtractor(InstagramExtractor):
@ -698,7 +575,7 @@ class InstagramPostExtractor(InstagramExtractor):
)
def posts(self):
return self._media_by_id(id_from_shortcode(self.item))
return self.api.media(id_from_shortcode(self.item))
class InstagramStoriesExtractor(InstagramExtractor):
@ -722,14 +599,12 @@ class InstagramStoriesExtractor(InstagramExtractor):
if self.highlight_id:
reel_id = "highlight:" + self.highlight_id
else:
reel_id = self._uid_by_screen_name(self.user)
reel_id = self.api.user_id(self.user)
endpoint = "/v1/feed/reels_media/"
params = {"reel_ids": reel_id}
reels = self._request_api(endpoint, params=params)["reels"]
reels = self.api.reels_media(reel_id)
if self.media_id:
reel = reels[reel_id]
if self.media_id and reels:
reel = reels[0]
for item in reel["items"]:
if item["pk"] == self.media_id:
reel["items"] = (item,)
@ -737,7 +612,7 @@ class InstagramStoriesExtractor(InstagramExtractor):
else:
raise exception.NotFoundError("story")
return reels.values()
return reels
class InstagramHighlightsExtractor(InstagramExtractor):
@ -747,22 +622,8 @@ class InstagramHighlightsExtractor(InstagramExtractor):
test = ("https://www.instagram.com/instagram/highlights",)
def posts(self):
endpoint = "/v1/highlights/{}/highlights_tray/".format(
self._uid_by_screen_name(self.item))
tray = self._request_api(endpoint)["tray"]
reel_ids = [highlight["id"] for highlight in tray]
# Anything above 30 responds with statuscode 400.
# 30 can work, however, sometimes the API will respond with 560 or 500.
chunk_size = 5
endpoint = "/v1/feed/reels_media/"
for offset in range(0, len(reel_ids), chunk_size):
chunk_ids = reel_ids[offset : offset+chunk_size]
params = {"reel_ids": chunk_ids}
reels = self._request_api(endpoint, params=params)["reels"]
for reel_id in chunk_ids:
yield reels[reel_id]
uid = self.api.user_id(self.item)
return self.api.highlights_media(uid)
class InstagramReelsExtractor(InstagramExtractor):
@ -775,13 +636,247 @@ class InstagramReelsExtractor(InstagramExtractor):
})
def posts(self):
endpoint = "/v1/clips/user/"
data = {
"target_user_id": self._uid_by_screen_name(self.item),
"page_size" : "50",
}
uid = self.api.user_id(self.item)
return self.api.user_clips(uid)
return self._pagination_api_post(endpoint, data)
class InstagramRestAPI():
def __init__(self, extractor):
self.extractor = extractor
def highlights_media(self, user_id):
chunk_size = 5
reel_ids = [hl["id"] for hl in self.highlights_tray(user_id)]
for offset in range(0, len(reel_ids), chunk_size):
yield from self.reels_media(
reel_ids[offset : offset+chunk_size])
def highlights_tray(self, user_id):
endpoint = "/v1/highlights/{}/highlights_tray/".format(user_id)
return self._call(endpoint)["tray"]
def media(self, post_id):
endpoint = "/v1/media/{}/info/".format(post_id)
return self._pagination(endpoint)
def reels_media(self, reel_ids):
endpoint = "/v1/feed/reels_media/"
params = {"reel_ids": reel_ids}
return self._call(endpoint, params=params)["reels_media"]
def tags_media(self, tag):
for section in self.tags_sections(tag):
for media in section["layout_content"]["medias"]:
yield media["media"]
def tags_sections(self, tag):
endpoint = "/v1/tags/{}/sections/".format(tag)
data = {
"include_persistent": "0",
"max_id" : None,
"page" : None,
"surface": "grid",
"tab" : "recent",
}
return self._pagination_sections(endpoint, data)
@memcache(keyarg=1)
def user(self, screen_name):
endpoint = "/v1/users/web_profile_info/"
params = {"username": screen_name}
return self._call(endpoint, params=params)["data"]["user"]
def user_id(self, screen_name):
if screen_name.startswith("id:"):
return screen_name[3:]
return self.user(screen_name)["id"]
def user_clips(self, user_id):
endpoint = "/v1/clips/user/"
data = {"target_user_id": user_id, "page_size": "50"}
return self._pagination_post(endpoint, data)
def user_collection(self, collection_id):
endpoint = "/v1/feed/collection/{}/posts/".format(collection_id)
params = {"count": 50}
return self._pagination(endpoint, params, media=True)
def user_feed(self, user_id):
endpoint = "/v1/feed/user/{}/".format(user_id)
params = {"count": 30}
return self._pagination(endpoint, params)
def user_saved(self):
endpoint = "/v1/feed/saved/posts/"
params = {"count": 50}
return self._pagination(endpoint, params, media=True)
def user_tagged(self, user_id):
endpoint = "/v1/usertags/{}/feed/".format(user_id)
params = {"count": 50}
return self._pagination(endpoint, params)
def _call(self, endpoint, **kwargs):
extr = self.extractor
url = "https://i.instagram.com/api" + endpoint
kwargs["headers"] = {
"X-CSRFToken" : extr.csrf_token,
"X-Instagram-AJAX": "1006242110",
"X-IG-App-ID" : "936619743392459",
"X-ASBD-ID" : "198387",
"X-IG-WWW-Claim" : extr.www_claim,
"Origin" : extr.root,
"Referer" : extr.root + "/",
}
kwargs["cookies"] = {
"csrftoken": extr.csrf_token,
}
return extr.request(url, **kwargs).json()
def _pagination(self, endpoint, params=None, media=False):
if params is None:
params = {}
while True:
data = self._call(endpoint, params=params)
if media:
for item in data["items"]:
yield item["media"]
else:
yield from data["items"]
if not data["more_available"]:
return
params["max_id"] = data["next_max_id"]
def _pagination_post(self, endpoint, params):
while True:
data = self._call(endpoint, method="POST", data=params)
for item in data["items"]:
yield item["media"]
info = data["paging_info"]
if not info.get("more_available"):
return
params["max_id"] = info["max_id"]
def _pagination_sections(self, endpoint, params):
while True:
info = self._call(endpoint, method="POST", data=params)
yield from info["sections"]
if not info.get("more_available"):
return
params["max_id"] = info["next_max_id"]
params["page"] = info["next_page"]
class InstagramGraphqlAPI():
def __init__(self, extractor):
self.extractor = extractor
self.user = InstagramRestAPI(extractor).user
self.user_collection = self.user_saved = self.reels_media = \
self.highlights_media = self._login_required
self._json_dumps = json.JSONEncoder(separators=(",", ":")).encode
@staticmethod
def _login_required(_=None):
raise exception.AuthorizationError("Login required")
def highlights_tray(self, user_id):
query_hash = "d4d88dc1500312af6f937f7b804c68c3"
variables = {
"user_id": user_id,
"include_chaining": False,
"include_reel": False,
"include_suggested_users": False,
"include_logged_out_extras": True,
"include_highlight_reels": True,
"include_live_status": False,
}
edges = (self._call(query_hash, variables)["user"]
["edge_highlight_reels"]["edges"])
return [edge["node"] for edge in edges]
def media(self, post_id):
query_hash = "9f8827793ef34641b2fb195d4d41151c"
variables = {
"shortcode": shortcode_from_id(post_id),
"child_comment_count": 3,
"fetch_comment_count": 40,
"parent_comment_count": 24,
"has_threaded_comments": True,
}
return (self._call(query_hash, variables)["shortcode_media"],)
def user_id(self, screen_name):
if screen_name.startswith("id:"):
return screen_name[3:]
return self.user(screen_name)["id"]
def user_clips(self, user_id):
query_hash = "bc78b344a68ed16dd5d7f264681c4c76"
variables = {"id": user_id, "first": 50}
return self._pagination(query_hash, variables)
def user_feed(self, user_id):
query_hash = "69cba40317214236af40e7efa697781d"
variables = {"id": user_id, "first": 50}
return self._pagination(query_hash, variables)
def _call(self, query_hash, variables):
extr = self.extractor
url = "https://www.instagram.com/graphql/query/"
params = {
"query_hash": query_hash,
"variables" : self._json_dumps(variables),
}
headers = {
"Accept" : "*/*",
"X-CSRFToken" : extr.csrf_token,
"X-Instagram-AJAX": "1006267176",
"X-IG-App-ID" : "936619743392459",
"X-ASBD-ID" : "198387",
"X-IG-WWW-Claim" : extr.www_claim,
"X-Requested-With": "XMLHttpRequest",
"Referer" : extr.root + "/",
}
cookies = {
"csrftoken": extr.csrf_token,
}
return extr.request(
url, params=params, headers=headers, cookies=cookies,
).json()["data"]
def _pagination(self, query_hash, variables):
cursor = self.extractor.config("cursor")
if cursor:
variables["after"] = cursor
while True:
data = next(iter(self._call(
query_hash, variables)["user"].values()))
for edge in data["edges"]:
yield edge["node"]
info = data["page_info"]
if not info["has_next_page"]:
return
elif not data["edges"]:
s = "" if self.item.endswith("s") else "s"
raise exception.StopExtraction(
"%s'%s posts are private", self.item, s)
variables["after"] = self._cursor = info["end_cursor"]
self.log.debug("Cursor: %s", self._cursor)
@cache(maxage=360*24*3600, keyarg=1)