From f9d87e9a78db5d1f66842d7b3ee3c25f15bcc2c1 Mon Sep 17 00:00:00 2001 From: Johannes Gundlach Date: Fri, 22 Nov 2024 19:48:36 +0100 Subject: [PATCH 1/4] Use rclone --stats command for all transfer operations --- rclone_python/rclone.py | 3 +- rclone_python/utils.py | 134 +++++++++++++++------------------------- 2 files changed, 53 insertions(+), 84 deletions(-) diff --git a/rclone_python/rclone.py b/rclone_python/rclone.py index c98129d..9584d39 100644 --- a/rclone_python/rclone.py +++ b/rclone_python/rclone.py @@ -650,13 +650,14 @@ def _rclone_transfer_operation( # add global rclone flags if ignore_existing: command += " --ignore-existing" - command += " --progress" # in path command += f' "{in_path}"' # out path command += f' "{out_path}"' + command += " --stats 0.1s --stats-unit bytes --use-json-log -v" + # optional named arguments/flags command += utils.args2string(args) diff --git a/rclone_python/utils.py b/rclone_python/utils.py index 20ee748..5b63252 100644 --- a/rclone_python/utils.py +++ b/rclone_python/utils.py @@ -1,4 +1,4 @@ -import re +import json import subprocess from typing import Any, Callable, Dict, List, Optional, Tuple, Union from rich.progress import Progress, TaskID, Task @@ -56,31 +56,6 @@ def shorten_filepath(in_path: str, max_length: int) -> str: return in_path -def convert2bits(value: float, unit: str) -> float: - """Returns the corresponding bit value to a value with a certain binary prefix (based on powers of 2) like KiB or MiB. - - Args: - value (float): Bit value using a certain binary prefix like KiB or MiB. - unit (str): The binary prefix. - - Returns: - float: The corresponding bit value. - """ - exp = { - "B": 0, - "KiB": 1, - "MiB": 2, - "GiB": 3, - "TiB": 4, - "PiB": 5, - "EiB": 6, - "ZiB": 7, - "YiB": 8, - } - - return value * 1024 ** exp[unit] - - # ---------------------------------------------------------------------------- # # Progressbar related functions # # ---------------------------------------------------------------------------- # @@ -108,10 +83,12 @@ def rclone_progress( process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=stderr, shell=True ) - for line in iter(process.stdout.readline, b""): - var = line.decode() - valid, update_dict = extract_rclone_progress(buffer) + # rclone prints stats to stderr. each line is one update + for line in iter(process.stderr.readline, b""): + line = line.decode() + + valid, update_dict = extract_rclone_progress(line) if valid: if show_progress: @@ -124,12 +101,6 @@ def rclone_progress( if debug: pbar.log(buffer) - # reset the buffer - buffer = "" - else: - # buffer until we - buffer += var - if show_progress: complete_task(total_progress_id, pbar) for _, task_id in subprocesses.items(): @@ -140,48 +111,40 @@ def rclone_progress( return process -def extract_rclone_progress(buffer: str) -> Tuple[bool, Union[Dict[str, Any], None]]: - # matcher that checks if the progress update block is completely buffered yet (defines start and stop) - # it gets the sent bits, total bits, progress, transfer-speed and eta - reg_transferred = re.findall( - r"Transferred:\s+(\d+.\d+ \w+) \/ (\d+.\d+ \w+), (\d{1,3})%, (\d+.\d+ \w+\/\w+), ETA (\S+)", - buffer, - ) +def extract_rclone_progress(line: str) -> Tuple[bool, Union[Dict[str, Any], None]]: + try: + stats: Dict = json.loads(line).get("stats", None) + except ValueError: + stats = None - if reg_transferred: # transferred block is completely buffered + # wait until we know the total file size --> bytes > 0 + if stats is not None and stats.get("bytes", 0) > 0: # get the progress of the individual files - # matcher gets the currently transferring files and their individual progress - # returns list of tuples: (name, progress, file_size, unit) - prog_transferring = [] - prog_regex = re.findall( - r"\* +(.+):[ ]+(\d{1,3})% \/(\d+.\d+)([a-zA-Z]+),", buffer - ) - for item in prog_regex: - prog_transferring.append( - ( - item[0], - int(item[1]), - float(item[2]), - # the suffix B of the unit is missing for subprocesses - item[3] + "B", - ) + tasks = [] + for t in stats.get("transferring", []): + tasks.append( + { + "name": t["name"], + "total": t["size"], + "sent": t["bytes"], + "progress": t["percentage"], + } ) - out = {"prog_transferring": prog_transferring} - sent_bits, total_bits, progress, transfer_speed_str, eta = reg_transferred[0] - out["progress"] = float(progress.strip()) - out["total_bits"] = float(re.findall(r"\d+.\d+", total_bits)[0]) - out["sent_bits"] = float(re.findall(r"\d+.\d+", sent_bits)[0]) - out["unit_sent"] = re.findall(r"[a-zA-Z]+", sent_bits)[0] - out["unit_total"] = re.findall(r"[a-zA-Z]+", total_bits)[0] - out["transfer_speed"] = float(re.findall(r"\d+.\d+", transfer_speed_str)[0]) - out["transfer_speed_unit"] = re.findall( - r"[a-zA-Z]+/[a-zA-Z]+", transfer_speed_str - )[0] - out["eta"] = eta + out = { + "tasks": tasks, + "total": stats["totalBytes"], + "sent": stats["bytes"], + "progress": ( + stats["bytes"] / stats["totalBytes"] + if stats["bytes"] is not None + else 0 + ), + "transfer_speed": stats["speed"], + "rclone_output": stats, + } return True, out - else: return False, None @@ -251,33 +214,38 @@ def update_tasks( pbar.update( total_progress, - completed=convert2bits(update_dict["sent_bits"], update_dict["unit_sent"]), - total=convert2bits(update_dict["total_bits"], update_dict["unit_total"]), + completed=update_dict["sent"], + total=update_dict["total"], ) - sp_names = set() - for sp_file_name, sp_progress, sp_size, sp_unit in update_dict["prog_transferring"]: + task_names = set() + for task in update_dict["tasks"]: task_id = None - sp_names.add(sp_file_name) - if sp_file_name not in subprocesses: + task_name = task["name"] + task_size = task["total"] + task_progress = task["progress"] + + task_names.add(task_name) + + if task_name not in subprocesses: task_id = pbar.add_task(" ", visible=False) - subprocesses[sp_file_name] = task_id + subprocesses[task_name] = task_id else: - task_id = subprocesses[sp_file_name] + task_id = subprocesses[task_name] pbar.update( task_id, # set the description every time to reset the '├' - description=f" ├─{sp_file_name}", - completed=convert2bits(sp_size, sp_unit) * sp_progress / 100.0, - total=convert2bits(sp_size, sp_unit), + description=f" ├─{task_name}", + completed=task_size * task_progress / 100.0, + total=task_size, # hide subprocesses if we only upload a single file visible=len(subprocesses) > 1, ) # make all processes invisible that are no longer provided by rclone (bc. their upload completed) - missing = list(sorted(subprocesses.keys() - sp_names)) + missing = list(sorted(subprocesses.keys() - task_names)) for missing_sp_id in missing: pbar.update(subprocesses[missing_sp_id], visible=False) From 8392241d2f8653853d7598f186fd369515782c46 Mon Sep 17 00:00:00 2001 From: Johannes Gundlach Date: Fri, 22 Nov 2024 20:02:01 +0100 Subject: [PATCH 2/4] Add comments --- rclone_python/utils.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/rclone_python/utils.py b/rclone_python/utils.py index 5b63252..d18673e 100644 --- a/rclone_python/utils.py +++ b/rclone_python/utils.py @@ -112,6 +112,17 @@ def rclone_progress( def extract_rclone_progress(line: str) -> Tuple[bool, Union[Dict[str, Any], None]]: + """Extracts and returns the progress updates from the rclone transfer operation. + The returned Dictionary includes the original rclone stats output inside of "rclone_output". + All file sizes and speeds are give in bytes. + + Args: + line (str): One output line of the rclone transfer operation with the --use-json-log flag enabled. + + Returns: + Tuple[bool, Union[Dict[str, Any], None]]: The retrieved update Dictionary. + """ + try: stats: Dict = json.loads(line).get("stats", None) except ValueError: @@ -246,8 +257,8 @@ def update_tasks( # make all processes invisible that are no longer provided by rclone (bc. their upload completed) missing = list(sorted(subprocesses.keys() - task_names)) - for missing_sp_id in missing: - pbar.update(subprocesses[missing_sp_id], visible=False) + for missing_task_id in missing: + pbar.update(subprocesses[missing_task_id], visible=False) # change symbol for the last visible process for task in reversed(pbar.tasks): From 70f33be523f76ee9ddbf5d335714975e2c31b252 Mon Sep 17 00:00:00 2001 From: Johannes Gundlach Date: Fri, 22 Nov 2024 20:25:37 +0100 Subject: [PATCH 3/4] Fix ZeroDivisionError --- rclone_python/utils.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/rclone_python/utils.py b/rclone_python/utils.py index d18673e..a5348b3 100644 --- a/rclone_python/utils.py +++ b/rclone_python/utils.py @@ -128,8 +128,7 @@ def extract_rclone_progress(line: str) -> Tuple[bool, Union[Dict[str, Any], None except ValueError: stats = None - # wait until we know the total file size --> bytes > 0 - if stats is not None and stats.get("bytes", 0) > 0: + if stats is not None: # get the progress of the individual files tasks = [] for t in stats.get("transferring", []): @@ -147,9 +146,7 @@ def extract_rclone_progress(line: str) -> Tuple[bool, Union[Dict[str, Any], None "total": stats["totalBytes"], "sent": stats["bytes"], "progress": ( - stats["bytes"] / stats["totalBytes"] - if stats["bytes"] is not None - else 0 + stats["bytes"] / stats["totalBytes"] if stats["totalBytes"] != 0 else 0 ), "transfer_speed": stats["speed"], "rclone_output": stats, From 218edd245059509f2447e74436ae2a4127f66b75 Mon Sep 17 00:00:00 2001 From: Johannes Gundlach Date: Fri, 22 Nov 2024 20:31:48 +0100 Subject: [PATCH 4/4] Update version --- rclone_python/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclone_python/__init__.py b/rclone_python/__init__.py index b13f7d1..c93eba1 100644 --- a/rclone_python/__init__.py +++ b/rclone_python/__init__.py @@ -1 +1 @@ -VERSION = "0.1.15" +VERSION = "0.1.16"