From 57e0f077a635ee30f37ebea71ddb70723831ecd8 Mon Sep 17 00:00:00 2001 From: pukkandan Date: Tue, 21 Jun 2022 17:02:56 +0530 Subject: [PATCH] [update] Expose more functionality to API --- pyinst.py | 1 - yt_dlp/compat/__init__.py | 2 +- yt_dlp/update.py | 268 ++++++++++++++++++++++---------------- yt_dlp/utils.py | 7 +- 4 files changed, 159 insertions(+), 119 deletions(-) diff --git a/pyinst.py b/pyinst.py index 292f5d719..d27e56555 100644 --- a/pyinst.py +++ b/pyinst.py @@ -5,7 +5,6 @@ from PyInstaller.__main__ import run as run_pyinstaller - OS_NAME, ARCH = sys.platform, platform.architecture()[0][:2] diff --git a/yt_dlp/compat/__init__.py b/yt_dlp/compat/__init__.py index 35875ed20..3a91fad0e 100644 --- a/yt_dlp/compat/__init__.py +++ b/yt_dlp/compat/__init__.py @@ -55,7 +55,7 @@ def compat_ord(c): def compat_realpath(path): while os.path.islink(path): path = os.path.abspath(os.readlink(path)) - return path + return os.path.realpath(path) else: compat_realpath = os.path.realpath diff --git a/yt_dlp/update.py b/yt_dlp/update.py index 85c676e00..68418ce98 100644 --- a/yt_dlp/update.py +++ b/yt_dlp/update.py @@ -11,8 +11,8 @@ from .utils import Popen, traverse_obj, version_tuple from .version import __version__ - -RELEASE_JSON_URL = 'https://api.github.com/repos/yt-dlp/yt-dlp/releases/latest' +REPOSITORY = 'yt-dlp/yt-dlp' +API_URL = f'https://api.github.com/repos/{REPOSITORY}/releases/latest' @functools.cache @@ -60,137 +60,176 @@ def is_non_updateable(): return _NON_UPDATEABLE_REASONS.get(detect_variant(), _NON_UPDATEABLE_REASONS['other']) -def run_update(ydl): - """ - Update the program file with the latest version from the repository - Returns whether the program should terminate - """ +def _sha256_file(path): + h = hashlib.sha256() + mv = memoryview(bytearray(128 * 1024)) + with open(os.path.realpath(path), 'rb', buffering=0) as f: + for n in iter(lambda: f.readinto(mv), 0): + h.update(mv[:n]) + return h.hexdigest() - def report_error(msg, expected=False): - ydl.report_error(msg, tb=False if expected else None) - def report_unable(action, expected=False): - report_error(f'Unable to {action}', expected) +class Updater: + def __init__(self, ydl): + self.ydl = ydl - def report_permission_error(file): - report_unable(f'write to {file}; Try running as administrator', True) + @functools.cached_property + def _new_version_info(self): + self.ydl.write_debug(f'Fetching release info: {API_URL}') + return json.loads(self.ydl.urlopen(API_URL).read().decode()) - def report_network_error(action, delim=';'): - report_unable(f'{action}{delim} Visit https://github.com/yt-dlp/yt-dlp/releases/latest', True) + @property + def current_version(self): + """Current version""" + return __version__ - def calc_sha256sum(path): - h = hashlib.sha256() - mv = memoryview(bytearray(128 * 1024)) - with open(os.path.realpath(path), 'rb', buffering=0) as f: - for n in iter(lambda: f.readinto(mv), 0): - h.update(mv[:n]) - return h.hexdigest() + @property + def new_version(self): + """Version of the latest release""" + return self._new_version_info['tag_name'] - try: - version_info = json.loads(ydl.urlopen(RELEASE_JSON_URL).read().decode()) - except Exception: - return report_network_error('obtain version info', delim='; Please try again later or') + @property + def has_update(self): + """Whether there is an update available""" + return version_tuple(__version__) < version_tuple(self.new_version) - version_id = version_info['tag_name'] - ydl.to_screen(f'Latest version: {version_id}, Current version: {__version__}') - if version_tuple(__version__) >= version_tuple(version_id): - ydl.to_screen(f'yt-dlp is up to date ({__version__})') - return + @functools.cached_property + def filename(self): + """Filename of the executable""" + return compat_realpath(_get_variant_and_executable_path()[1]) - err = is_non_updateable() - if err: - return report_error(err, True) - - variant, filename = _get_variant_and_executable_path() - filename = compat_realpath(filename) # Absolute path, following symlinks - - label = _FILE_SUFFIXES[variant] - if label and platform.architecture()[0][:2] == '32': - label = f'_x86{label}' - release_name = f'yt-dlp{label}' - - ydl.to_screen(f'Current Build Hash {calc_sha256sum(filename)}') - ydl.to_screen(f'Updating to version {version_id} ...') - - def get_file(name, fatal=True): - error = report_network_error if fatal else lambda _: None - url = traverse_obj( - version_info, ('assets', lambda _, v: v['name'] == name, 'browser_download_url'), get_all=False) + def _download(self, name=None): + name = name or self.release_name + url = traverse_obj(self._new_version_info, ( + 'assets', lambda _, v: v['name'] == name, 'browser_download_url'), get_all=False) if not url: - return error('fetch updates') - try: - return ydl.urlopen(url).read() - except OSError: - return error('download latest version') + raise Exception('Unable to find download URL') + self.ydl.write_debug(f'Downloading {name} from {url}') + return self.ydl.urlopen(url).read() - def verify(content): - if not content: - return False - hash_data = get_file('SHA2-256SUMS', fatal=False) or b'' - expected = dict(ln.split()[::-1] for ln in hash_data.decode().splitlines()).get(release_name) - if not expected: - ydl.report_warning('no hash information found for the release') - elif hashlib.sha256(content).hexdigest() != expected: - return report_network_error('verify the new executable') + @functools.cached_property + def release_name(self): + """The release filename""" + label = _FILE_SUFFIXES[detect_variant()] + if label and platform.architecture()[0][:2] == '32': + label = f'_x86{label}' + return f'yt-dlp{label}' + + @functools.cached_property + def release_hash(self): + """Hash of the latest release""" + hash_data = dict(ln.split()[::-1] for ln in self._download('SHA2-256SUMS').decode().splitlines()) + return hash_data[self.release_name] + + def _report_error(self, msg, expected=False): + self.ydl.report_error(msg, tb=False if expected else None) + + def _report_permission_error(self, file): + self._report_error(f'Unable to write to {file}; Try running as administrator', True) + + def _report_network_error(self, action, delim=';'): + self._report_error(f'Unable to {action}{delim} Visit https://github.com/{REPOSITORY}/releases/latest', True) + + def check_update(self): + """Report whether there is an update available""" + try: + self.ydl.to_screen( + f'Latest version: {self.new_version}, Current version: {self.current_version}') + except Exception: + return self._report_network_error('obtain version info', delim='; Please try again later or') + + if not self.has_update: + return self.ydl.to_screen(f'yt-dlp is up to date ({__version__})') + + if not is_non_updateable(): + self.ydl.to_screen(f'Current Build Hash {_sha256_file(self.filename)}') return True - directory = os.path.dirname(filename) - if not os.access(filename, os.W_OK): - return report_permission_error(filename) - elif not os.access(directory, os.W_OK): - return report_permission_error(directory) + def update(self): + """Update yt-dlp executable to the latest version""" + if not self.check_update(): + return + err = is_non_updateable() + if err: + return self._report_error(err, True) + self.ydl.to_screen(f'Updating to version {self.new_version} ...') - new_filename, old_filename = f'{filename}.new', f'{filename}.old' - if variant == 'zip': # Can be replaced in-place - new_filename, old_filename = filename, None + directory = os.path.dirname(self.filename) + if not os.access(self.filename, os.W_OK): + return self._report_permission_error(self.filename) + elif not os.access(directory, os.W_OK): + return self._report_permission_error(directory) - try: - if os.path.exists(old_filename or ''): - os.remove(old_filename) - except OSError: - return report_unable('remove the old version') + new_filename, old_filename = f'{self.filename}.new', f'{self.filename}.old' + if detect_variant() == 'zip': # Can be replaced in-place + new_filename, old_filename = self.filename, None - newcontent = get_file(release_name) - if not verify(newcontent): - return - try: - with open(new_filename, 'wb') as outf: - outf.write(newcontent) - except OSError: - return report_permission_error(new_filename) + try: + if os.path.exists(old_filename or ''): + os.remove(old_filename) + except OSError: + return self._report_error('Unable to remove the old version') - try: - if old_filename: - os.rename(filename, old_filename) - except OSError: - return report_unable('move current version') - try: - if old_filename: - os.rename(new_filename, filename) - except OSError: - report_unable('overwrite current version') - os.rename(old_filename, filename) - return + try: + newcontent = self._download() + except OSError: + return self._report_network_error('download latest version') + except Exception: + return self._report_network_error('fetch updates') - if variant not in ('win32_exe', 'py2exe'): - if old_filename: - os.remove(old_filename) - ydl.to_screen(f'Updated yt-dlp to version {version_id}; Restart yt-dlp to use the new version') - return + try: + expected_hash = self.release_hash + except Exception: + self.ydl.report_warning('no hash information found for the release') + else: + if hashlib.sha256(newcontent).hexdigest() != expected_hash: + return self._report_network_error('verify the new executable') - try: - # Continues to run in the background - Popen(f'ping 127.0.0.1 -n 5 -w 1000 & del /F "{old_filename}"', - shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) - ydl.to_screen(f'Updated yt-dlp to version {version_id}') - return True # Exit app - except OSError: - report_unable('delete the old version') + try: + with open(new_filename, 'wb') as outf: + outf.write(newcontent) + except OSError: + return self._report_permission_error(new_filename) + + try: + if old_filename: + os.rename(self.filename, old_filename) + except OSError: + return self._report_error('Unable to move current version') + try: + if old_filename: + os.rename(new_filename, self.filename) + except OSError: + self._report_error('Unable to overwrite current version') + return os.rename(old_filename, self.filename) + + if detect_variant() not in ('win32_exe', 'py2exe'): + if old_filename: + os.remove(old_filename) + self.ydl.to_screen(f'Updated yt-dlp to version {self.new_version}; Restart yt-dlp to use the new version') + return + + try: + # Continues to run in the background + Popen(f'ping 127.0.0.1 -n 5 -w 1000 & del /F "{old_filename}"', + shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + self.ydl.to_screen(f'Updated yt-dlp to version {self.new_version}') + return True # Exit app + except OSError: + self._report_unable('delete the old version') + + +def run_update(ydl): + """Update the program file with the latest version from the repository + @returns Whether there was a successfull update (No update = False) + """ + return Updater(ydl).update() # Deprecated def update_self(to_screen, verbose, opener): import traceback + from .utils import write_string write_string( @@ -202,12 +241,10 @@ def update_self(to_screen, verbose, opener): class FakeYDL(): to_screen = printfn - @staticmethod - def report_warning(msg, *args, **kwargs): + def report_warning(self, msg, *args, **kwargs): return printfn(f'WARNING: {msg}', *args, **kwargs) - @staticmethod - def report_error(msg, tb=None): + def report_error(self, msg, tb=None): printfn(f'ERROR: {msg}') if not verbose: return @@ -224,6 +261,9 @@ def report_error(msg, tb=None): if tb: printfn(tb) + def write_debug(self, msg, *args, **kwargs): + printfn(f'[debug] {msg}', *args, **kwargs) + def urlopen(self, url): return opener.open(url) diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py index 10bcd5f4e..dc6894d83 100644 --- a/yt_dlp/utils.py +++ b/yt_dlp/utils.py @@ -991,9 +991,10 @@ def make_HTTPS_handler(params, **kwargs): def bug_reports_message(before=';'): - msg = ('please report this issue on https://github.com/yt-dlp/yt-dlp/issues?q= , ' - 'filling out the appropriate issue template. ' - 'Confirm you are on the latest version using yt-dlp -U') + from .update import REPOSITORY + + msg = (f'please report this issue on https://github.com/{REPOSITORY}/issues?q= , ' + 'filling out the appropriate issue template. Confirm you are on the latest version using yt-dlp -U') before = before.rstrip() if not before or before.endswith(('.', '!', '?')):