mirror of
https://github.com/instaloader/instaloader.git
synced 2024-11-23 02:32:46 +01:00
Integrate mypy static type checker into CI
This commit is contained in:
parent
4f9d64a284
commit
4c72077976
@ -1,14 +1,15 @@
|
||||
dist: xenial
|
||||
language: python
|
||||
python:
|
||||
- "3.5"
|
||||
- "3.5-dev"
|
||||
- "3.6"
|
||||
- "3.6-dev"
|
||||
- "3.7"
|
||||
install:
|
||||
- pip install pylint~=2.3.1 requests
|
||||
- pip install pylint~=2.3.1 requests mypy
|
||||
- pip install -r docs/requirements.txt
|
||||
script:
|
||||
- python3 -m pylint -r n -d bad-whitespace,missing-docstring,too-many-arguments,locally-disabled,line-too-long,no-else-raise,too-many-public-methods,too-many-lines,too-many-instance-attributes,too-many-locals,too-many-branches,too-many-statements,inconsistent-return-statements,invalid-name,wildcard-import,unused-wildcard-import,no-else-return,cyclic-import,unnecessary-pass instaloader
|
||||
- python3 -m mypy -m instaloader
|
||||
- make -C docs html SPHINXOPTS="-W -n"
|
||||
deploy:
|
||||
- provider: pypi
|
||||
|
@ -6,7 +6,7 @@ __version__ = '4.2.4'
|
||||
|
||||
try:
|
||||
# pylint:disable=wrong-import-position
|
||||
import win_unicode_console
|
||||
import win_unicode_console # type: ignore
|
||||
except ImportError:
|
||||
pass
|
||||
else:
|
||||
|
@ -13,10 +13,10 @@ from datetime import datetime, timezone
|
||||
from functools import wraps
|
||||
from hashlib import md5
|
||||
from io import BytesIO
|
||||
from typing import Any, Callable, Iterator, List, Optional, Set, Union
|
||||
from typing import Any, Callable, ContextManager, Iterator, List, Optional, Set, Union, cast
|
||||
|
||||
import requests
|
||||
import urllib3
|
||||
import urllib3 # type: ignore
|
||||
|
||||
from .exceptions import *
|
||||
from .instaloadercontext import InstaloaderContext
|
||||
@ -45,8 +45,8 @@ def _requires_login(func: Callable) -> Callable:
|
||||
if not instaloader.context.is_logged_in:
|
||||
raise LoginRequiredException("--login=USERNAME required.")
|
||||
return func(instaloader, *args, **kwargs)
|
||||
# pylint:disable=no-member
|
||||
call.__doc__ += ":raises LoginRequiredException: If called without being logged in.\n"
|
||||
docstring_text = ":raises LoginRequiredException: If called without being logged in.\n"
|
||||
call.__doc__ = call.__doc__ + docstring_text if call.__doc__ is not None else docstring_text
|
||||
return call
|
||||
|
||||
|
||||
@ -186,7 +186,7 @@ class Instaloader:
|
||||
raise InvalidArgumentException("Commit mode requires JSON metadata to be saved.")
|
||||
|
||||
# Used to keep state in commit mode
|
||||
self._committed = None
|
||||
self._committed = None # type: Optional[bool]
|
||||
|
||||
@contextmanager
|
||||
def anonymous_copy(self):
|
||||
@ -299,11 +299,11 @@ class Instaloader:
|
||||
filename += '.txt'
|
||||
caption += '\n'
|
||||
pcaption = _elliptify(caption)
|
||||
caption = caption.encode("UTF-8")
|
||||
bcaption = caption.encode("UTF-8")
|
||||
with suppress(FileNotFoundError):
|
||||
with open(filename, 'rb') as file:
|
||||
file_caption = file.read()
|
||||
if file_caption.replace(b'\r\n', b'\n') == caption.replace(b'\r\n', b'\n'):
|
||||
if file_caption.replace(b'\r\n', b'\n') == bcaption.replace(b'\r\n', b'\n'):
|
||||
try:
|
||||
self.context.log(pcaption + ' unchanged', end=' ', flush=True)
|
||||
except UnicodeEncodeError:
|
||||
@ -327,7 +327,7 @@ class Instaloader:
|
||||
except UnicodeEncodeError:
|
||||
self.context.log('txt', end=' ', flush=True)
|
||||
with open(filename, 'wb') as text_file:
|
||||
shutil.copyfileobj(BytesIO(caption), text_file)
|
||||
shutil.copyfileobj(BytesIO(bcaption), text_file)
|
||||
os.utime(filename, (datetime.now().timestamp(), mtime.timestamp()))
|
||||
|
||||
def save_location(self, filename: str, location: PostLocation, mtime: datetime) -> None:
|
||||
@ -349,12 +349,12 @@ class Instaloader:
|
||||
return epoch.strftime('%Y-%m-%d_%H-%M-%S_UTC')
|
||||
|
||||
profile_pic_response = self.context.get_raw(profile.profile_pic_url)
|
||||
date_object = None # type: Optional[datetime]
|
||||
if 'Last-Modified' in profile_pic_response.headers:
|
||||
date_object = datetime.strptime(profile_pic_response.headers["Last-Modified"], '%a, %d %b %Y %H:%M:%S GMT')
|
||||
profile_pic_bytes = None
|
||||
profile_pic_identifier = _epoch_to_string(date_object)
|
||||
else:
|
||||
date_object = None
|
||||
profile_pic_bytes = profile_pic_response.content
|
||||
profile_pic_identifier = md5(profile_pic_bytes).hexdigest()[:16]
|
||||
profile_pic_extension = 'jpg'
|
||||
@ -383,6 +383,7 @@ class Instaloader:
|
||||
:param filename: Filename, or None to use default filename.
|
||||
"""
|
||||
if filename is None:
|
||||
assert self.context.username is not None
|
||||
filename = get_default_session_filename(self.context.username)
|
||||
dirname = os.path.dirname(filename)
|
||||
if dirname != '' and not os.path.exists(dirname):
|
||||
@ -513,6 +514,7 @@ class Instaloader:
|
||||
userids = list(edge["node"]["id"] for edge in data["feed_reels_tray"]["edge_reels_tray_to_reel"]["edges"])
|
||||
|
||||
def _userid_chunks():
|
||||
assert userids is not None
|
||||
userids_per_query = 100
|
||||
for i in range(0, len(userids), userids_per_query):
|
||||
yield userids[i:i + userids_per_query]
|
||||
@ -711,6 +713,7 @@ class Instaloader:
|
||||
"""
|
||||
self.context.log("Retrieving saved posts...")
|
||||
count = 1
|
||||
assert self.context.username is not None
|
||||
for post in Profile.from_username(self.context, self.context.username).get_saved_posts():
|
||||
if max_count is not None and count > max_count:
|
||||
break
|
||||
@ -946,10 +949,12 @@ class Instaloader:
|
||||
|
||||
.. versionadded:: 4.1"""
|
||||
|
||||
@contextmanager
|
||||
def _error_raiser(_str):
|
||||
yield
|
||||
|
||||
error_handler = _error_raiser if raise_errors else self.context.error_catcher
|
||||
error_handler = cast(Callable[[Optional[str]], ContextManager[None]],
|
||||
_error_raiser if raise_errors else self.context.error_catcher)
|
||||
|
||||
for profile in profiles:
|
||||
with error_handler(profile.username):
|
||||
|
@ -10,7 +10,7 @@ import time
|
||||
import urllib.parse
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Callable, Dict, Iterator, Optional, Union
|
||||
from typing import Any, Callable, Dict, Iterator, List, Optional, Union
|
||||
|
||||
import requests
|
||||
import requests.utils
|
||||
@ -22,7 +22,7 @@ def copy_session(session: requests.Session) -> requests.Session:
|
||||
"""Duplicates a requests.Session."""
|
||||
new = requests.Session()
|
||||
new.cookies = requests.utils.cookiejar_from_dict(requests.utils.dict_from_cookiejar(session.cookies))
|
||||
new.headers = session.headers.copy()
|
||||
new.headers = session.headers.copy() # type: ignore
|
||||
return new
|
||||
|
||||
|
||||
@ -60,17 +60,17 @@ class InstaloaderContext:
|
||||
self.two_factor_auth_pending = None
|
||||
|
||||
# error log, filled with error() and printed at the end of Instaloader.main()
|
||||
self.error_log = []
|
||||
self.error_log = [] # type: List[str]
|
||||
|
||||
# For the adaption of sleep intervals (rate control)
|
||||
self._graphql_query_timestamps = dict()
|
||||
self._graphql_earliest_next_request_time = 0
|
||||
self._graphql_query_timestamps = dict() # type: Dict[str, List[float]]
|
||||
self._graphql_earliest_next_request_time = 0.0
|
||||
|
||||
# Can be set to True for testing, disables supression of InstaloaderContext._error_catcher
|
||||
self.raise_all_errors = False
|
||||
|
||||
# Cache profile from id (mapping from id to Profile)
|
||||
self.profile_id_cache = dict()
|
||||
self.profile_id_cache = dict() # type: Dict[int, Any]
|
||||
|
||||
@contextmanager
|
||||
def anonymous_copy(self):
|
||||
@ -283,7 +283,7 @@ class InstaloaderContext:
|
||||
max_reqs = {'1cb6ec562846122743b61e492c85999f': 200, '33ba35852cb50da46f5b5e889df7d159': 200}
|
||||
return max_reqs.get(query_hash) or min(max_reqs.values())
|
||||
|
||||
def _graphql_query_waittime(self, query_hash: str, current_time: float, untracked_queries: bool = False) -> int:
|
||||
def _graphql_query_waittime(self, query_hash: str, current_time: float, untracked_queries: bool = False) -> float:
|
||||
"""Calculate time needed to wait before GraphQL query can be executed."""
|
||||
sliding_window = 660
|
||||
if query_hash not in self._graphql_query_timestamps:
|
||||
|
@ -23,10 +23,10 @@ PostCommentAnswer.created_at_utc.__doc__ = ":class:`~datetime.datetime` when com
|
||||
PostCommentAnswer.text.__doc__ = "Comment text."
|
||||
PostCommentAnswer.owner.__doc__ = "Owner :class:`Profile` of the comment."
|
||||
|
||||
PostComment = namedtuple('PostComment', (*PostCommentAnswer._fields, 'answers'))
|
||||
PostComment = namedtuple('PostComment', (*PostCommentAnswer._fields, 'answers')) # type: ignore
|
||||
for field in PostCommentAnswer._fields:
|
||||
getattr(PostComment, field).__doc__ = getattr(PostCommentAnswer, field).__doc__
|
||||
PostComment.answers.__doc__ = r"Iterator which yields all :class:`PostCommentAnswer`\ s for the comment."
|
||||
PostComment.answers.__doc__ = r"Iterator which yields all :class:`PostCommentAnswer`\ s for the comment." # type: ignore
|
||||
|
||||
PostLocation = namedtuple('PostLocation', ['id', 'name', 'slug', 'has_public_page', 'lat', 'lng'])
|
||||
PostLocation.id.__doc__ = "ID number of location."
|
||||
@ -67,9 +67,9 @@ class Post:
|
||||
self._context = context
|
||||
self._node = node
|
||||
self._owner_profile = owner_profile
|
||||
self._full_metadata_dict = None
|
||||
self._rhx_gis_str = None
|
||||
self._location = None
|
||||
self._full_metadata_dict = None # type: Optional[Dict[str, Any]]
|
||||
self._rhx_gis_str = None # type: Optional[str]
|
||||
self._location = None # type: Optional[PostLocation]
|
||||
|
||||
@classmethod
|
||||
def from_shortcode(cls, context: InstaloaderContext, shortcode: str):
|
||||
@ -140,11 +140,13 @@ class Post:
|
||||
@property
|
||||
def _full_metadata(self) -> Dict[str, Any]:
|
||||
self._obtain_metadata()
|
||||
assert self._full_metadata_dict is not None
|
||||
return self._full_metadata_dict
|
||||
|
||||
@property
|
||||
def _rhx_gis(self) -> str:
|
||||
self._obtain_metadata()
|
||||
assert self._rhx_gis_str is not None
|
||||
return self._rhx_gis_str
|
||||
|
||||
def _field(self, *keys) -> Any:
|
||||
@ -231,6 +233,7 @@ class Post:
|
||||
return self._node["edge_media_to_caption"]["edges"][0]["node"]["text"]
|
||||
elif "caption" in self._node:
|
||||
return self._node["caption"]
|
||||
return None
|
||||
|
||||
@property
|
||||
def caption_hashtags(self) -> List[str]:
|
||||
@ -279,6 +282,7 @@ class Post:
|
||||
"""URL of the video, or None."""
|
||||
if self.is_video:
|
||||
return self._field('video_url')
|
||||
return None
|
||||
|
||||
@property
|
||||
def viewer_has_liked(self) -> Optional[bool]:
|
||||
@ -424,7 +428,7 @@ class Profile:
|
||||
def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
|
||||
assert 'username' in node
|
||||
self._context = context
|
||||
self._has_public_story = None
|
||||
self._has_public_story = None # type: Optional[bool]
|
||||
self._node = node
|
||||
self._rhx_gis = None
|
||||
self._iphone_struct_ = None
|
||||
@ -604,6 +608,7 @@ class Profile:
|
||||
'https://www.instagram.com/{}/'.format(self.username),
|
||||
self._rhx_gis)
|
||||
self._has_public_story = data['data']['user']['has_public_story']
|
||||
assert self._has_public_story is not None
|
||||
return self._has_public_story
|
||||
|
||||
@property
|
||||
@ -770,6 +775,7 @@ class StoryItem:
|
||||
""":class:`Profile` instance of the story item's owner."""
|
||||
if not self._owner_profile:
|
||||
self._owner_profile = Profile.from_id(self._context, self._node['owner']['id'])
|
||||
assert self._owner_profile is not None
|
||||
return self._owner_profile
|
||||
|
||||
@property
|
||||
@ -832,6 +838,7 @@ class StoryItem:
|
||||
"""URL of the video, or None."""
|
||||
if self.is_video:
|
||||
return self._node['video_resources'][-1]['src']
|
||||
return None
|
||||
|
||||
|
||||
class Story:
|
||||
@ -858,8 +865,8 @@ class Story:
|
||||
def __init__(self, context: InstaloaderContext, node: Dict[str, Any]):
|
||||
self._context = context
|
||||
self._node = node
|
||||
self._unique_id = None
|
||||
self._owner_profile = None
|
||||
self._unique_id = None # type: Optional[str]
|
||||
self._owner_profile = None # type: Optional[Profile]
|
||||
|
||||
def __repr__(self):
|
||||
return '<Story by {} changed {:%Y-%m-%d_%H-%M-%S_UTC}>'.format(self.owner_username, self.latest_media_utc)
|
||||
@ -873,7 +880,7 @@ class Story:
|
||||
return hash(self.unique_id)
|
||||
|
||||
@property
|
||||
def unique_id(self) -> str:
|
||||
def unique_id(self) -> Union[str, int]:
|
||||
"""
|
||||
This ID only equals amongst :class:`Story` instances which have the same owner and the same set of
|
||||
:class:`StoryItem`. For all other :class:`Story` instances this ID is different.
|
||||
@ -889,12 +896,14 @@ class Story:
|
||||
"""Timestamp when the story has last been watched or None (local time zone)."""
|
||||
if self._node['seen']:
|
||||
return datetime.fromtimestamp(self._node['seen'])
|
||||
return None
|
||||
|
||||
@property
|
||||
def last_seen_utc(self) -> Optional[datetime]:
|
||||
"""Timestamp when the story has last been watched or None (UTC)."""
|
||||
if self._node['seen']:
|
||||
return datetime.utcfromtimestamp(self._node['seen'])
|
||||
return None
|
||||
|
||||
@property
|
||||
def latest_media_local(self) -> datetime:
|
||||
@ -959,7 +968,7 @@ class Highlight(Story):
|
||||
def __init__(self, context: InstaloaderContext, node: Dict[str, Any], owner: Optional[Profile] = None):
|
||||
super().__init__(context, node)
|
||||
self._owner_profile = owner
|
||||
self._items = None
|
||||
self._items = None # type: Optional[List[Dict[str, Any]]]
|
||||
|
||||
def __repr__(self):
|
||||
return '<Highlight by {}: {}>'.format(self.owner_username, self.title)
|
||||
@ -1002,11 +1011,13 @@ class Highlight(Story):
|
||||
def itemcount(self) -> int:
|
||||
"""Count of items associated with the :class:`Highlight` instance."""
|
||||
self._fetch_items()
|
||||
assert self._items is not None
|
||||
return len(self._items)
|
||||
|
||||
def get_items(self) -> Iterator[StoryItem]:
|
||||
"""Retrieve all associated highlight items."""
|
||||
self._fetch_items()
|
||||
assert self._items is not None
|
||||
yield from (StoryItem(self._context, item, self.owner_profile) for item in self._items)
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user