From bd50a52b0d7247cdbf205eb851ce33ae4b89c516 Mon Sep 17 00:00:00 2001
From: The Hatsune Daishi
Date: Wed, 22 Sep 2021 23:12:04 +0900
Subject: [PATCH] Basic framework for simultaneous download of multiple formats (#1036)

Authored by: nao20010128nao
---
Review notes (this section is ignored when the patch is applied):
 - download_and_append_fragments_multiple() returns the combined result of
   all jobs instead of an unconditional True.
 - report_progress() keeps is_last_line=True for the final "100%" line so the
   single-line progress output is still terminated by a newline.
 - MultilinePrinter.end() only repositions the cursor when the stream supports
   cursor control; UP/DOWN are None otherwise and multiplying them raised
   TypeError.
 - MultilinePrinter.print_at_line() records the length of the text written
   after a line break, so a shorter follow-up on the same line is padded.
 - MultilinePrinter._isatty() no longer swallows KeyboardInterrupt/SystemExit.
 - BreaklineStatusPrinter numbers lines like MultilinePrinter (lines - 1).
 - Line counts are unchanged, so the hunk headers and diffstat are untouched;
   the "index" lines still carry the blob ids of the original submission.

 yt_dlp/downloader/common.py   |  33 +++++++--
 yt_dlp/downloader/fragment.py |  53 ++++++++++++-
 yt_dlp/downloader/http.py     |   2 +
 yt_dlp/minicurses.py          | 135 ++++++++++++++++++++++++++++++++++
 yt_dlp/utils.py               |   8 ++
 5 files changed, 224 insertions(+), 7 deletions(-)
 create mode 100644 yt_dlp/minicurses.py

diff --git a/yt_dlp/downloader/common.py b/yt_dlp/downloader/common.py
index ce914bd4a..53e83d2c3 100644
--- a/yt_dlp/downloader/common.py
+++ b/yt_dlp/downloader/common.py
@@ -16,6 +16,11 @@
     shell_quote,
     timeconvert,
 )
+from ..minicurses import (
+    MultilinePrinter,
+    QuietMultilinePrinter,
+    BreaklineStatusPrinter
+)
 
 
 class FileDownloader(object):
@@ -68,6 +73,7 @@ def __init__(self, ydl, params):
         self.ydl = ydl
         self._progress_hooks = []
         self.params = params
+        self._multiline = None
         self.add_progress_hook(self.report_progress)
 
     @staticmethod
@@ -236,12 +242,28 @@ def report_destination(self, filename):
         """Report destination filename."""
         self.to_screen('[download] Destination: ' + filename)
 
-    def _report_progress_status(self, msg, is_last_line=False):
+    def _prepare_multiline_status(self, lines):
+        if self.params.get('quiet'):
+            self._multiline = QuietMultilinePrinter()
+        elif self.params.get('progress_with_newline', False):
+            self._multiline = BreaklineStatusPrinter(sys.stderr, lines)
+        elif self.params.get('noprogress', False):
+            self._multiline = None
+        else:
+            self._multiline = MultilinePrinter(sys.stderr, lines)
+
+    def _finish_multiline_status(self):
+        if self._multiline is not None:
+            self._multiline.end()
+
+    def _report_progress_status(self, msg, is_last_line=False, progress_line=None):
         fullmsg = '[download] ' + msg
         if self.params.get('progress_with_newline', False):
             self.to_screen(fullmsg)
+        elif progress_line is not None and self._multiline is not None:
+            self._multiline.print_at_line(fullmsg, progress_line)
         else:
-            if compat_os_name == 'nt':
+            if compat_os_name == 'nt' or not sys.stderr.isatty():
                 prev_len = getattr(self, '_report_progress_prev_line_length',
                                    0)
                 if prev_len > len(fullmsg):
@@ -249,7 +271,7 @@ def _report_progress_status(self, msg, is_last_line=False):
                 self._report_progress_prev_line_length = len(fullmsg)
                 clear_line = '\r'
             else:
-                clear_line = ('\r\x1b[K' if sys.stderr.isatty() else '\r')
+                clear_line = '\r\x1b[K'
             self.to_screen(clear_line + fullmsg, skip_eol=not is_last_line)
         self.to_console_title('yt-dlp ' + msg)
 
@@ -266,7 +288,8 @@ def report_progress(self, s):
                     s['_elapsed_str'] = self.format_seconds(s['elapsed'])
                     msg_template += ' in %(_elapsed_str)s'
                 self._report_progress_status(
-                    msg_template % s, is_last_line=True)
+                    msg_template % s, is_last_line=True, progress_line=s.get('progress_idx'))
+            return
 
         if self.params.get('noprogress'):
             return
@@ -311,7 +334,7 @@ def report_progress(self, s):
                 else:
                     msg_template = '%(_percent_str)s % at %(_speed_str)s ETA %(_eta_str)s'
 
-        self._report_progress_status(msg_template % s)
+        self._report_progress_status(msg_template % s, progress_line=s.get('progress_idx'))
 
     def report_resuming_byte(self, resume_len):
         """Report attempt to resume at given byte."""
diff --git a/yt_dlp/downloader/fragment.py b/yt_dlp/downloader/fragment.py
index ebdef27db..31f946792 100644
--- a/yt_dlp/downloader/fragment.py
+++ b/yt_dlp/downloader/fragment.py
@@ -3,6 +3,7 @@
 import os
 import time
 import json
+from math import ceil
 
 try:
     import concurrent.futures
@@ -120,6 +121,7 @@ def _download_fragment(self, ctx, frag_url, info_dict, headers=None, request_dat
             'url': frag_url,
             'http_headers': headers or info_dict.get('http_headers'),
             'request_data': request_data,
+            'ctx_id': ctx.get('ctx_id'),
         }
         success = ctx['dl'].download(fragment_filename, fragment_info_dict)
         if not success:
@@ -219,6 +221,7 @@ def _prepare_frag_download(self, ctx):
     def _start_frag_download(self, ctx, info_dict):
         resume_len = ctx['complete_frags_downloaded_bytes']
         total_frags = ctx['total_frags']
+        ctx_id = ctx.get('ctx_id')
         # This dict stores the download progress, it's updated by the progress
         # hook
         state = {
@@ -242,6 +245,12 @@ def frag_progress_hook(s):
             if s['status'] not in ('downloading', 'finished'):
                 return
 
+            if ctx_id is not None and s.get('ctx_id') != ctx_id:
+                return
+
+            state['max_progress'] = ctx.get('max_progress')
+            state['progress_idx'] = ctx.get('progress_idx')
+
             time_now = time.time()
             state['elapsed'] = time_now - start
             frag_total_bytes = s.get('total_bytes') or 0
@@ -301,6 +310,9 @@ def _finish_frag_download(self, ctx, info_dict):
             'filename': ctx['filename'],
             'status': 'finished',
             'elapsed': elapsed,
+            'ctx_id': ctx.get('ctx_id'),
+            'max_progress': ctx.get('max_progress'),
+            'progress_idx': ctx.get('progress_idx'),
         }, info_dict)
 
     def _prepare_external_frag_download(self, ctx):
@@ -347,7 +359,44 @@ def decrypt_fragment(fragment, frag_content):
 
         return decrypt_fragment
 
-    def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None):
+    def download_and_append_fragments_multiple(self, *args, pack_func=None, finish_func=None):
+        '''
+        @params (ctx1, fragments1, info_dict1), (ctx2, fragments2, info_dict2), ...
+                all args must be either tuple or list
+        '''
+        max_progress = len(args)
+        if max_progress == 1:
+            return self.download_and_append_fragments(*args[0], pack_func=pack_func, finish_func=finish_func)
+        max_workers = self.params.get('concurrent_fragment_downloads', max_progress)
+        self._prepare_multiline_status(max_progress)
+
+        def thread_func(idx, ctx, fragments, info_dict, tpe):
+            ctx['max_progress'] = max_progress
+            ctx['progress_idx'] = idx
+            return self.download_and_append_fragments(ctx, fragments, info_dict, pack_func=pack_func, finish_func=finish_func, tpe=tpe)
+
+        class FTPE(concurrent.futures.ThreadPoolExecutor):
+            # has to stop this or it's going to wait on the worker thread itself
+            def __exit__(self, exc_type, exc_val, exc_tb):
+                pass
+
+        spins = []
+        for idx, (ctx, fragments, info_dict) in enumerate(args):
+            tpe = FTPE(ceil(max_workers / max_progress))
+            job = tpe.submit(thread_func, idx, ctx, fragments, info_dict, tpe)
+            spins.append((tpe, job))
+
+        result = True
+        for tpe, job in spins:
+            try:
+                result = result and job.result()
+            finally:
+                tpe.shutdown(wait=True)
+
+        self._finish_multiline_status()
+        return result
+
+    def download_and_append_fragments(self, ctx, fragments, info_dict, *, pack_func=None, finish_func=None, tpe=None):
         fragment_retries = self.params.get('fragment_retries', 0)
         is_fatal = (lambda idx: idx == 0) if self.params.get('skip_unavailable_fragments', True) else (lambda _: True)
         if not pack_func:
@@ -416,7 +465,7 @@ def _download_fragment(fragment):
                 return fragment, frag_content, frag_index, ctx_copy.get('fragment_filename_sanitized')
 
             self.report_warning('The download speed shown is only of one thread. This is a known issue and patches are welcome')
-            with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
+            with tpe or concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
                 for fragment, frag_content, frag_index, frag_filename in pool.map(_download_fragment, fragments):
                     ctx['fragment_filename_sanitized'] = frag_filename
                     ctx['fragment_index'] = frag_index
diff --git a/yt_dlp/downloader/http.py b/yt_dlp/downloader/http.py
index 1edb0f91f..9e79051ad 100644
--- a/yt_dlp/downloader/http.py
+++ b/yt_dlp/downloader/http.py
@@ -310,6 +310,7 @@ def retry(e):
                     'eta': eta,
                     'speed': speed,
                     'elapsed': now - ctx.start_time,
+                    'ctx_id': info_dict.get('ctx_id'),
                 }, info_dict)
 
                 if data_len is not None and byte_counter == data_len:
@@ -357,6 +358,7 @@ def retry(e):
                 'filename': ctx.filename,
                 'status': 'finished',
                 'elapsed': time.time() - ctx.start_time,
+                'ctx_id': info_dict.get('ctx_id'),
             }, info_dict)
 
             return True
diff --git a/yt_dlp/minicurses.py b/yt_dlp/minicurses.py
new file mode 100644
index 000000000..74ad891c9
--- /dev/null
+++ b/yt_dlp/minicurses.py
@@ -0,0 +1,135 @@
+import os
+
+from threading import Lock
+from .utils import compat_os_name, get_windows_version
+
+
+class MultilinePrinterBase():
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.end()
+
+    def print_at_line(self, text, pos):
+        pass
+
+    def end(self):
+        pass
+
+
+class MultilinePrinter(MultilinePrinterBase):
+
+    def __init__(self, stream, lines):
+        """
+        @param stream stream to write to
+        @param lines number of lines to be written
+        """
+        self.stream = stream
+
+        is_win10 = compat_os_name == 'nt' and get_windows_version() >= (10, )
+        self.CARRIAGE_RETURN = '\r'
+        if os.getenv('TERM') and self._isatty() or is_win10:
+            # reason not to use curses https://github.com/yt-dlp/yt-dlp/pull/1036#discussion_r713851492
+            # escape sequences for Win10 https://docs.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
+            self.UP = '\x1b[A'
+            self.DOWN = '\n'
+            self.ERASE_LINE = '\x1b[K'
+            self._HAVE_FULLCAP = self._isatty() or is_win10
+        else:
+            self.UP = self.DOWN = self.ERASE_LINE = None
+            self._HAVE_FULLCAP = False
+
+        # lines are numbered from top to bottom, counting from 0 to self.maximum
+        self.maximum = lines - 1
+        self.lastline = 0
+        self.lastlength = 0
+
+        self.movelock = Lock()
+
+    @property
+    def have_fullcap(self):
+        """
+        True if the TTY is allowing to control cursor,
+        so that multiline progress works
+        """
+        return self._HAVE_FULLCAP
+
+    def _isatty(self):
+        try:
+            return self.stream.isatty()
+        except Exception:
+            return False
+
+    def _move_cursor(self, dest):
+        current = min(self.lastline, self.maximum)
+        self.stream.write(self.CARRIAGE_RETURN)
+        if current == dest:
+            # current and dest are at same position, no need to move cursor
+            return
+        elif current > dest:
+            # when maximum == 2,
+            # 0. dest
+            # 1.
+            # 2. current
+            self.stream.write(self.UP * (current - dest))
+        elif current < dest:
+            # when maximum == 2,
+            # 0. current
+            # 1.
+            # 2. dest
+            self.stream.write(self.DOWN * (dest - current))
+        self.lastline = dest
+
+    def print_at_line(self, text, pos):
+        with self.movelock:
+            if self.have_fullcap:
+                self._move_cursor(pos)
+                self.stream.write(self.ERASE_LINE)
+                self.stream.write(text)
+            else:
+                if self.maximum != 0:
+                    # let user know about which line is updating the status
+                    text = f'{pos + 1}: {text}'
+                textlen = len(text)
+                if self.lastline == pos:
+                    # move cursor at the start of progress when writing to same line
+                    self.stream.write(self.CARRIAGE_RETURN)
+                    if self.lastlength > textlen:
+                        text += ' ' * (self.lastlength - textlen)
+                    self.lastlength = textlen
+                else:
+                    # otherwise, break the line
+                    self.stream.write('\n')
+                    self.lastlength = textlen
+                self.stream.write(text)
+                self.lastline = pos
+
+    def end(self):
+        with self.movelock:
+            # the cursor can only be moved to the last line with full TTY control
+            if self.have_fullcap:
+                self._move_cursor(self.maximum)
+            self.stream.write('\n')
+
+
+class QuietMultilinePrinter(MultilinePrinterBase):
+    def __init__(self):
+        self.have_fullcap = True
+
+
+class BreaklineStatusPrinter(MultilinePrinterBase):
+
+    def __init__(self, stream, lines):
+        """
+        @param stream stream to write to
+        """
+        self.stream = stream
+        self.maximum = lines - 1
+        self.have_fullcap = True
+
+    def print_at_line(self, text, pos):
+        if self.maximum != 0:
+            # let user know about which line is updating the status
+            text = f'{pos + 1}: {text}'
+        self.stream.write(text + '\n')
diff --git a/yt_dlp/utils.py b/yt_dlp/utils.py
index de0213b14..9eb47fccb 100644
--- a/yt_dlp/utils.py
+++ b/yt_dlp/utils.py
@@ -6373,3 +6373,11 @@ def traverse_dict(dictn, keys, casesense=True):
 
 def variadic(x, allowed_types=(str, bytes)):
     return x if isinstance(x, collections.abc.Iterable) and not isinstance(x, allowed_types) else (x,)
+
+
+def get_windows_version():
+    ''' Get Windows version. None if it's not running on Windows '''
+    if compat_os_name == 'nt':
+        return version_tuple(platform.win32_ver()[1])
+    else:
+        return None