1
0
mirror of https://github.com/instaloader/instaloader.git synced 2024-08-16 19:59:40 +02:00

Adjust structure to handle new threaded comments

This is needed because of a structure change by Instagram.
Fixes #272.
This commit is contained in:
André Koch-Kramer 2019-03-31 11:02:20 +02:00
parent 9421ad5fda
commit 15d9cd8949
4 changed files with 108 additions and 34 deletions

View File

@ -122,6 +122,11 @@ Additionally, the following trivial structures are defined:
.. autoclass:: PostComment
:no-show-inheritance:
:inherited-members:
:exclude-members: count, index
.. autoclass:: PostCommentAnswer
:no-show-inheritance:
.. autoclass:: PostLocation
:no-show-inheritance:

View File

@ -15,5 +15,5 @@ else:
from .exceptions import *
from .instaloader import Instaloader
from .instaloadercontext import InstaloaderContext
from .structures import (Highlight, Post, PostSidecarNode, PostComment, PostLocation, Profile, Story, StoryItem,
from .structures import (Highlight, Post, PostSidecarNode, PostComment, PostCommentAnswer, PostLocation, Profile, Story, StoryItem,
load_structure_from_file, save_structure_to_file)

View File

@ -240,11 +240,33 @@ class Instaloader:
self.context.log('json', end=' ', flush=True)
def update_comments(self, filename: str, post: Post) -> None:
def _postcomment_asdict(comment):
# Build the JSON-serialisable dict for a single comment or comment answer.
# 'created_at' is stored as an integer Unix timestamp: created_at_utc is tagged
# as UTC before conversion. 'owner' is expanded through its _asdict() method.
def _postcommentanswer_asdict(comment):
return {'id': comment.id,
'created_at': int(comment.created_at_utc.replace(tzinfo=timezone.utc).timestamp()),
'text': comment.text,
'owner': comment.owner._asdict()}
# Build the dict for a top-level comment: the same keys that
# _postcommentanswer_asdict() produces, plus an 'answers' list holding the
# serialised answers of the comment, sorted by numeric id in descending order.
def _postcomment_asdict(comment):
return {**_postcommentanswer_asdict(comment),
'answers': sorted([_postcommentanswer_asdict(answer) for answer in comment.answers],
key=lambda t: int(t['id']),
reverse=True)}
def get_unique_comments(comments, combine_answers=False):
    """Return the given comment dicts sorted and with duplicates removed.

    The result is ordered newest first (descending ``created_at``); entries
    with the same timestamp are ordered by ascending numeric ``id``. Of
    several adjacent entries sharing an id, the first one in that order is
    kept.

    :param comments: Iterable of dicts that each have an ``id`` and a
       ``created_at`` key. Any iterable is accepted, including generators.
    :param combine_answers: If true, the ``answers`` lists of duplicated
       entries are merged, deduplicated with this same function, and stored
       on the kept entry (the kept dict is updated in place).
    :return: A new list of unique comment dicts; empty if there is no input.
    """
    # A single composite key gives the same order as sorting by id first and
    # then stably by created_at in reverse. Sorting the iterable directly
    # (instead of testing its truthiness) also copes with empty generators.
    ordered = sorted(comments, key=lambda t: (-int(t['created_at']), int(t['id'])))
    unique = []
    for entry in ordered:
        # Compare ids numerically, consistent with the sort key above.
        if not unique or int(unique[-1]['id']) != int(entry['id']):
            unique.append(entry)
        elif combine_answers:
            # Merge into a fresh list so the caller's answer lists are never
            # extended behind its back.
            merged = list(unique[-1].get('answers') or [])
            merged.extend(entry.get('answers') or [])
            unique[-1]['answers'] = get_unique_comments(merged)
    return unique
filename += '_comments.json'
try:
with open(filename) as fp:
@ -253,18 +275,10 @@ class Instaloader:
comments = list()
comments.extend(_postcomment_asdict(comment) for comment in post.get_comments())
if comments:
comments_list = sorted(sorted(list(comments), key=lambda t: int(t['id'])),
key=lambda t: int(t['created_at']), reverse=True)
unique_comments_list = [comments_list[0]]
#for comment in comments_list:
# if unique_comments_list[-1]['id'] != comment['id']:
# unique_comments_list.append(comment)
#file.write(json.dumps(unique_comments_list, indent=4))
for x, y in zip(comments_list[:-1], comments_list[1:]):
if x['id'] != y['id']:
unique_comments_list.append(y)
comments = get_unique_comments(comments, combine_answers=True)
# Entries that were loaded from a comments file written before answers were
# stored, and that were not fetched again, carry no 'answers' key. Treat them
# as having no answers instead of iterating over None (TypeError).
answer_ids = {int(answer['id']) for comment in comments for answer in (comment.get('answers') or [])}
with open(filename, 'w') as file:
file.write(json.dumps(unique_comments_list, indent=4))
file.write(json.dumps(list(filter(lambda t: int(t['id']) not in answer_ids, comments)), indent=4))
self.context.log('comments', end=' ', flush=True)
def save_caption(self, filename: str, mtime: datetime, caption: str) -> None:

View File

@ -4,6 +4,8 @@ import re
from base64 import b64decode, b64encode
from collections import namedtuple
from datetime import datetime
from functools import reduce
from operator import add
from typing import Any, Dict, Iterator, List, Optional, Union
from . import __version__
@ -17,11 +19,11 @@ PostSidecarNode.is_video.__doc__ = "Whether this node is a video."
PostSidecarNode.display_url.__doc__ = "URL of image or video thumbnail."
PostSidecarNode.video_url.__doc__ = "URL of video or None."
PostComment = namedtuple('PostComment', ['id', 'created_at_utc', 'text', 'owner'])
PostComment.id.__doc__ = "ID number of comment."
PostComment.created_at_utc.__doc__ = ":class:`~datetime.datetime` when comment was created (UTC)."
PostComment.text.__doc__ = "Comment text."
PostComment.owner.__doc__ = "Owner :class:`Profile` of the comment."
PostCommentAnswer = namedtuple('PostCommentAnswer', ['id', 'created_at_utc', 'text', 'owner'])
PostCommentAnswer.id.__doc__ = "ID number of comment."
PostCommentAnswer.created_at_utc.__doc__ = ":class:`~datetime.datetime` when comment was created (UTC)."
PostCommentAnswer.text.__doc__ = "Comment text."
PostCommentAnswer.owner.__doc__ = "Owner :class:`Profile` of the comment."
PostLocation = namedtuple('PostLocation', ['id', 'name', 'slug', 'has_public_page', 'lat', 'lng'])
PostLocation.id.__doc__ = "ID number of location."
@ -32,6 +34,21 @@ PostLocation.lat.__doc__ = "Latitude (:class:`float`)."
PostLocation.lng.__doc__ = "Longitude (:class:`float`)."
class PostComment(namedtuple('PostComment', (*PostCommentAnswer._fields, 'answers'))):
    # A namedtuple with all fields of PostCommentAnswer followed by 'answers'.
    # Documented with comments rather than a docstring because the module
    # assigns PostComment.__doc__ right after the class definition.
    __slots__ = ()

    def __new__(cls, pca: PostCommentAnswer, answers: Iterator[PostCommentAnswer]):
        # Copy every field of the given PostCommentAnswer and append the
        # answers iterator as the last field.
        # Zero-argument super() resolves relative to this class for any cls;
        # the former super(cls, PostComment) had its arguments reversed and
        # raised TypeError as soon as cls was a subclass of PostComment.
        return super().__new__(cls,
                               *(getattr(pca, field) for field in PostCommentAnswer._fields),
                               answers)
PostComment.__doc__ = PostComment.__bases__[0].__doc__
for field in PostCommentAnswer._fields:
getattr(PostComment, field).__doc__ = getattr(PostCommentAnswer, field).__doc__
PostComment.answers.__doc__ = r"Iterator which yields all :class:`PostCommentAnswer`\ s for the comment."
class Post:
"""
Structure containing information about an Instagram post.
@ -283,34 +300,72 @@ class Post:
@property
def comments(self) -> int:
"""Comment count"""
return self._field('edge_media_to_comment', 'count')
"""Comment count including answers"""
try:
return self._field('edge_media_to_parent_comment', 'count')
except KeyError:
return self._field('edge_media_to_comment', 'count')
def get_comments(self) -> Iterator[PostComment]:
"""Iterate over all comments of the post.
r"""Iterate over all comments of the post.
Each comment is represented by a PostComment namedtuple with fields text (string), created_at (datetime),
id (int) and owner (:class:`Profile`).
id (int), owner (:class:`Profile`) and answers (:class:`~typing.Iterator`\ [:class:`PostCommentAnswer`])
if available.
"""
# Convert a raw comment node (dict) into a PostCommentAnswer. The node's
# 'created_at' value is passed to datetime.utcfromtimestamp(), which yields a
# naive datetime in UTC; 'owner' is wrapped in a Profile bound to this context.
def _postcommentanswer(node):
return PostCommentAnswer(id=int(node['id']),
created_at_utc=datetime.utcfromtimestamp(node['created_at']),
text=node['text'],
owner=Profile(self._context, node['owner']))
# Generator yielding a PostCommentAnswer for every answer of the given comment
# node. Yields nothing if the node has no 'edge_threaded_comments' entry or its
# count is zero. Answers embedded in the node are used when they are complete;
# otherwise they are fetched through graphql_node_list() using the comment id.
def _postcommentanswers(node):
if 'edge_threaded_comments' not in node:
return
answer_count = node['edge_threaded_comments']['count']
if answer_count == 0:
# Avoid doing additional requests if there are no comment answers
return
answer_edges = node['edge_threaded_comments']['edges']
if answer_count == len(answer_edges):
# If the comment's metadata already contains all answers, don't do GraphQL requests to obtain them
yield from (_postcommentanswer(comment['node']) for comment in answer_edges)
return
yield from (_postcommentanswer(answer_node) for answer_node in
self._context.graphql_node_list("51fdd02b67508306ad4484ff574a0b62",
{'comment_id': node['id']},
'https://www.instagram.com/p/' + self.shortcode + '/',
lambda d: d['data']['comment']['edge_threaded_comments']))
def _postcomment(node):
return PostComment(id=int(node['id']),
created_at_utc=datetime.utcfromtimestamp(node['created_at']),
text=node['text'],
owner=Profile(self._context, node['owner']))
return PostComment(_postcommentanswer(node),
answers=_postcommentanswers(node))
if self.comments == 0:
# Avoid doing additional requests if there are no comments
return
comment_edges = self._field('edge_media_to_comment', 'edges')
if self.comments == len(comment_edges):
# If the Post's metadata already contains all comments, don't do GraphQL requests to obtain them
try:
comment_edges = self._field('edge_media_to_parent_comment', 'edges')
answers_count = reduce(add, [edge['node']['edge_threaded_comments']['count'] for edge in comment_edges], 0)
threaded_comments_available = True
except KeyError:
comment_edges = self._field('edge_media_to_comment', 'edges')
answers_count = 0
threaded_comments_available = False
if self.comments == len(comment_edges) + answers_count:
# If the Post's metadata already contains all parent comments, don't do GraphQL requests to obtain them
yield from (_postcomment(comment['node']) for comment in comment_edges)
return
yield from (_postcomment(node) for node in
self._context.graphql_node_list("33ba35852cb50da46f5b5e889df7d159",
{'shortcode': self.shortcode},
'https://www.instagram.com/p/' + self.shortcode + '/',
lambda d: d['data']['shortcode_media']['edge_media_to_comment'],
self._rhx_gis))
self._context.graphql_node_list(
"97b41c52301f77ce508f55e66d17620e" if threaded_comments_available
else "f0986789a5c5d17c2400faebf16efd0d",
{'shortcode': self.shortcode},
'https://www.instagram.com/p/' + self.shortcode + '/',
lambda d:
d['data']['shortcode_media'][
'edge_media_to_parent_comment' if threaded_comments_available else 'edge_media_to_comment'],
self._rhx_gis))
def get_likes(self) -> Iterator['Profile']:
"""Iterate over all likes of the post. A :class:`Profile` instance of each likee is yielded."""