From 7f28c771412c26cbb44da867f9325f9acde99547 Mon Sep 17 00:00:00 2001
From: Bas van Dijk
Date: Thu, 12 Feb 2026 12:27:45 +0000
Subject: [PATCH 1/4] chore: //ci/githubstats:query more perf improvements

---
 ci/githubstats/query.py | 65 +++++++++++++++++++++++------------------
 1 file changed, 36 insertions(+), 29 deletions(-)

diff --git a/ci/githubstats/query.py b/ci/githubstats/query.py
index 20e2c62895d4..3696adfb3858 100755
--- a/ci/githubstats/query.py
+++ b/ci/githubstats/query.py
@@ -457,40 +457,41 @@ def process_log(
     group_name = None
     summary = None
     vm_ipv6s = {}
-    lines = download_to_path.read_text().strip().splitlines()
-    last_line = lines[-1] if len(lines) > 0 else None
 
     # system-tests have structured logs with JSON objects that we can parse to get more detailed error summaries
     # and to determine the group (testnet) name for downloading the IC logs from ElasticSearch.
     # Non-system-tests just get annotated with the last line of the log which usually contains the error message.
     if test_target.startswith("//rs/tests/"):
-        for line in lines:
-            try:
-                # Here we try parsing a timestamp from the first 23 characters of a line
-                # assuming the line looks something like: "2026-02-03 13:55:09.645 INFO..."
-                last_seen_timestamp = datetime.strptime(line[:TIMESTAMP_LEN], "%Y-%m-%d %H:%M:%S.%f")
-            except (ValueError, pd.errors.ParserError):
-                continue
-
-            ix = line[TIMESTAMP_LEN:].find("{")
-            if ix == -1:
-                continue
-            obj = line[TIMESTAMP_LEN + ix :]
+        with open(download_to_path, 'r', encoding='utf-8') as f:
+            for line in f:
+                if len(line) < TIMESTAMP_LEN:
+                    continue
+                try:
+                    # Here we try parsing a timestamp from the first 23 characters of a line
+                    # assuming the line looks something like: "2026-02-03 13:55:09.645 INFO..."
+                    last_seen_timestamp = datetime.strptime(line[:TIMESTAMP_LEN], "%Y-%m-%d %H:%M:%S.%f")
+                except ValueError:
+                    continue
 
-            try:
-                log_event = LogEvent.from_json(obj)
-                match log_event.event_name:
-                    case "infra_group_name_created_event":
-                        group_name = GroupName.from_dict(log_event.body).group
-                        test_start_time = last_seen_timestamp
-                    case "farm_vm_created_event":
-                        farm_vm_created = FarmVMCreated.from_dict(log_event.body)
-                        vm_ipv6s[farm_vm_created.vm_name] = farm_vm_created.ipv6
-                    case "json_report_created_event":
-                        summary = SystemGroupSummary.from_dict(log_event.body)
-                        break
-            except (ValueError, dacite.DaciteError):
-                continue
+                ix = line.find("{", TIMESTAMP_LEN)
+                if ix == -1:
+                    continue
+                obj = line[ix:]
+
+                try:
+                    log_event = LogEvent.from_json(obj)
+                    match log_event.event_name:
+                        case "infra_group_name_created_event":
+                            group_name = GroupName.from_dict(log_event.body).group
+                            test_start_time = last_seen_timestamp
+                        case "farm_vm_created_event":
+                            farm_vm_created = FarmVMCreated.from_dict(log_event.body)
+                            vm_ipv6s[farm_vm_created.vm_name] = farm_vm_created.ipv6
+                        case "json_report_created_event":
+                            summary = SystemGroupSummary.from_dict(log_event.body)
+                            break
+                except (ValueError, dacite.DaciteError):
+                    continue
 
     if group_name is not None and download_ic_logs:
         # If it's a system-test, we want to download the IC logs from ElasticSearch to get more context on the failure.
@@ -503,10 +504,16 @@ def process_log(
             last_seen_timestamp,
             vm_ipv6s,
         )
+    else:
+        # Efficiently get the last line of the log:
+        parts = download_to_path.read_text().rstrip().rsplit(sep="\n", maxsplit=1)
+        line = (parts[0] if len(parts) == 1 else parts[1]).lstrip()
+        if line == "":
+            line = None
 
     with row["lock"]:
         row["error_summaries"][attempt_num] = (
-            summary if summary is not None else last_line if attempt_status == FAILED else None
+            summary if summary is not None else line if attempt_status == FAILED else None
         )

From cec3de240c476eaca2767a3f65ec2394f6a818d2 Mon Sep 17 00:00:00 2001
From: Bas van Dijk
Date: Thu, 12 Feb 2026 14:42:06 +0000
Subject: [PATCH 2/4] logging

---
 ci/githubstats/query.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/ci/githubstats/query.py b/ci/githubstats/query.py
index 3696adfb3858..224a185fa773 100755
--- a/ci/githubstats/query.py
+++ b/ci/githubstats/query.py
@@ -639,6 +639,9 @@ def download_ic_logs_for_system_test(
     ic_logs_dir = attempt_dir / "ic_logs"
     ic_logs_dir.mkdir(exist_ok=True)
 
+    gte = test_start_time.isoformat()
+    lte = test_end_time.isoformat()
+
     elasticsearch_query = {
         "size": 10000,
         "query": {
@@ -648,8 +651,8 @@ def download_ic_logs_for_system_test(
                     {
                         "range": {
                             "timestamp": {
-                                "gte": test_start_time.isoformat(),
-                                "lte": test_end_time.isoformat(),
+                                "gte": gte,
+                                "lte": lte,
                             }
                         }
                     },
@@ -666,6 +669,10 @@ def download_ic_logs_for_system_test(
         params = {"filter_path": "hits.hits"}
         all_hits = []
         while True:
+            print(
+                f"Downloading IC logs for attempt {attempt_dir.name} for testnet {group_name} from ElasticSearch between {gte} - {lte} with search_after={elasticsearch_query.get('search_after', None)} ...",
+                file=sys.stderr,
+            )
             response = requests.post(url, params=params, json=elasticsearch_query, timeout=60)
 
             if not response.ok:

From c9932be19999c1473cbf8dba2a3dea9fb425235a Mon Sep 17 00:00:00 2001
From: Bas van Dijk
Date: Thu, 12 Feb 2026 15:05:36 +0000
Subject: [PATCH 3/4] Concurrent downloading and processing of IC logs

---
 ci/githubstats/query.py | 105 ++++++++++++++++++++++++++--------------
 1 file changed, 68 insertions(+), 37 deletions(-)

diff --git a/ci/githubstats/query.py b/ci/githubstats/query.py
index 224a185fa773..f2e99a5307eb 100755
--- a/ci/githubstats/query.py
+++ b/ci/githubstats/query.py
@@ -18,6 +18,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from pathlib import Path
+from queue import Queue
 from typing import List, Optional
 
 import codeowners
@@ -462,7 +463,7 @@ def process_log(
     # and to determine the group (testnet) name for downloading the IC logs from ElasticSearch.
     # Non-system-tests just get annotated with the last line of the log which usually contains the error message.
     if test_target.startswith("//rs/tests/"):
-        with open(download_to_path, 'r', encoding='utf-8') as f:
+        with open(download_to_path, "r", encoding="utf-8") as f:
             for line in f:
                 if len(line) < TIMESTAMP_LEN:
                     continue
                 try:
@@ -664,47 +665,36 @@ def download_ic_logs_for_system_test(
         "sort": [{"timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos"}}, {"_doc": "asc"}],
     }
 
-    try:
-        url = "https://elasticsearch.testnet.dfinity.network/testnet-vector-push-*/_search"
-        params = {"filter_path": "hits.hits"}
-        all_hits = []
-        while True:
-            print(
-                f"Downloading IC logs for attempt {attempt_dir.name} for testnet {group_name} from ElasticSearch between {gte} - {lte} with search_after={elasticsearch_query.get('search_after', None)} ...",
-                file=sys.stderr,
-            )
-            response = requests.post(url, params=params, json=elasticsearch_query, timeout=60)
-
-            if not response.ok:
-                print(
-                    f"Failed to download IC logs for {group_name}: {response.status_code} {response.text}",
-                    file=sys.stderr,
-                )
-                return
+    # Create a queue for passing hits from download thread to processing thread
+    hits_queue = Queue(maxsize=1000)  # Limit memory usage with bounded queue
 
-            hits = response.json().get("hits", {}).get("hits", [])
-            all_hits.extend(hits)
+    def process_hits_from_queue():
+        """Consumer thread: Process hits from queue and write log files."""
+        logs_by_node = {}
 
-            if len(hits) < elasticsearch_query["size"]:
-                break
+        while True:
+            hits = hits_queue.get()
+            for hit in hits:
+                # Sentinel value signals end of download
+                if hit is None:
+                    break
+
+                # Process the hit
+                if "_source" not in hit:
+                    continue
+                source = hit["_source"]
+                if "ic_node" not in source or "timestamp" not in source or "MESSAGE" not in source:
+                    continue
 
-            last_hit = hits[-1]
-            elasticsearch_query["search_after"] = last_hit["sort"]
+                node = source["ic_node"]
+                try:
+                    timestamp = datetime.strptime(source["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
+                except ValueError:
+                    continue
 
-        logs_by_node = {}
-        for hit in all_hits:
-            if "_source" not in hit:
-                continue
-            source = hit["_source"]
-            if "ic_node" not in source or "timestamp" not in source or "MESSAGE" not in source:
-                continue
-            node = source["ic_node"]
-            try:
-                timestamp = datetime.strptime(source["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
-            except ValueError:
-                continue
-            logs_by_node.setdefault(node, []).append((timestamp, source["MESSAGE"]))
+                logs_by_node.setdefault(node, []).append((timestamp, source["MESSAGE"]))
 
+        # Write all log files after processing all hits
         for node, messages in logs_by_node.items():
             log_file = ic_logs_dir / f"{node}.log"
             if node in vm_ipv6s:
                 ipv6_symlink_path = ic_logs_dir / f"{vm_ipv6s[node]}.log"
                 ipv6_symlink_path.symlink_to(log_file.name)
             log_file.write_text(
                 "\n".join([f"{timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')} {msg}" for timestamp, msg in messages])
             )
             print(f"Downloaded {len(messages)} log entries for node {node} to {log_file}", file=sys.stderr)
 
+    # Start processing thread
+    processing_thread = threading.Thread(target=process_hits_from_queue)
+    processing_thread.start()
+
+    try:
+        # Download logs from ElasticSearch (Producer):
+        url = "https://elasticsearch.testnet.dfinity.network/testnet-vector-push-*/_search"
+        params = {"filter_path": "hits.hits"}
+
+        try:
+            while True:
+                print(
+                    f"Downloading IC logs for attempt {attempt_dir.name} for testnet {group_name} from ElasticSearch between {gte} - {lte} with search_after={elasticsearch_query.get('search_after', None)} ...",
+                    file=sys.stderr,
+                )
+                response = requests.post(url, params=params, json=elasticsearch_query, timeout=60)
+
+                if not response.ok:
+                    print(
+                        f"Failed to download IC logs for {group_name}: {response.status_code} {response.text}",
+                        file=sys.stderr,
+                    )
+                    return
+
+                hits = response.json().get("hits", {}).get("hits", [])
+
+                # Push hits to queue for concurrent processing
+                hits_queue.put(hits)
+
+                if len(hits) < elasticsearch_query["size"]:
+                    break
+
+                last_hit = hits[-1]
+                elasticsearch_query["search_after"] = last_hit["sort"]
+
+        finally:
+            # Always send sentinel to signal completion, even if download fails
+            hits_queue.put(None)
+            # Wait for processing thread to finish
+            processing_thread.join()
+
     except requests.exceptions.RequestException as e:
         print(f"Error downloading IC logs for {group_name}: {e}", file=sys.stderr)
     except json.JSONDecodeError as e:

From 763c2f99b2edf83d146e951cff64d635e5f1c3ed Mon Sep 17 00:00:00 2001
From: Bas van Dijk
Date: Thu, 12 Feb 2026 16:19:07 +0000
Subject: [PATCH 4/4] refactor

---
 ci/githubstats/query.py | 105 ++++++++++++++++++++++------------------
 1 file changed, 57 insertions(+), 48 deletions(-)

diff --git a/ci/githubstats/query.py b/ci/githubstats/query.py
index f2e99a5307eb..1faaf306547d 100755
--- a/ci/githubstats/query.py
+++ b/ci/githubstats/query.py
@@ -498,7 +498,7 @@ def process_log(
         # If it's a system-test, we want to download the IC logs from ElasticSearch to get more context on the failure.
         # We fork a thread for downloading the IC logs to speed up the whole process instead of doing it sequentially after downloading all test logs.
         download_ic_log_executor.submit(
-            download_ic_logs_for_system_test,
+            download_and_process_ic_logs_for_system_test,
             attempt_dir,
             group_name,
             test_start_time,
@@ -630,15 +630,21 @@ def shorten(msg: str, max_length: int) -> str:
     return msg
 
 
-def download_ic_logs_for_system_test(
+def download_and_process_ic_logs_for_system_test(
     attempt_dir: Path,
     group_name: str,
     test_start_time: datetime,
     test_end_time: datetime,
     vm_ipv6s: dict[str, str],
 ):
-    ic_logs_dir = attempt_dir / "ic_logs"
-    ic_logs_dir.mkdir(exist_ok=True)
+    # Create a queue for passing hits from download thread to processing thread
+    hits_queue = Queue(maxsize=100)  # Limit memory usage with bounded queue
+
+    # Start processing thread
+    processing_thread = threading.Thread(
+        target=process_elasticsearch_hits_from_queue, args=(attempt_dir, hits_queue, vm_ipv6s)
+    )
+    processing_thread.start()
 
     gte = test_start_time.isoformat()
     lte = test_end_time.isoformat()
@@ -665,50 +671,6 @@ def download_and_process_ic_logs_for_system_test(
         "sort": [{"timestamp": {"order": "asc", "format": "strict_date_optional_time_nanos"}}, {"_doc": "asc"}],
     }
 
-    # Create a queue for passing hits from download thread to processing thread
-    hits_queue = Queue(maxsize=1000)  # Limit memory usage with bounded queue
-
-    def process_hits_from_queue():
-        """Consumer thread: Process hits from queue and write log files."""
-        logs_by_node = {}
-
-        while True:
-            hits = hits_queue.get()
-            for hit in hits:
-                # Sentinel value signals end of download
-                if hit is None:
-                    break
-
-                # Process the hit
-                if "_source" not in hit:
-                    continue
-                source = hit["_source"]
-                if "ic_node" not in source or "timestamp" not in source or "MESSAGE" not in source:
-                    continue
-
-                node = source["ic_node"]
-                try:
-                    timestamp = datetime.strptime(source["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
-                except ValueError:
-                    continue
-
-                logs_by_node.setdefault(node, []).append((timestamp, source["MESSAGE"]))
-
-        # Write all log files after processing all hits
-        for node, messages in logs_by_node.items():
-            log_file = ic_logs_dir / f"{node}.log"
-            if node in vm_ipv6s:
-                ipv6_symlink_path = ic_logs_dir / f"{vm_ipv6s[node]}.log"
-                ipv6_symlink_path.symlink_to(log_file.name)
-            log_file.write_text(
"\n".join([f"{timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')} {msg}" for timestamp, msg in messages]) - ) - print(f"Downloaded {len(messages)} log entries for node {node} to {log_file}", file=sys.stderr) - - # Start processing thread - processing_thread = threading.Thread(target=process_hits_from_queue) - processing_thread.start() - try: # Download logs from ElasticSearch (Producer): url = "https://elasticsearch.testnet.dfinity.network/testnet-vector-push-*/_search" @@ -752,6 +714,53 @@ def process_hits_from_queue(): print(f"Error parsing JSON response for {group_name}: {e}", file=sys.stderr) +def process_elasticsearch_hits_from_queue( + attempt_dir: Path, + hits_queue: Queue, + vm_ipv6s: dict[str, str], +): + """Consumer thread: Process ElasticSearch hits from queue and write IC node log files.""" + logs_by_node = {} + + ic_logs_dir = attempt_dir / "ic_logs" + ic_logs_dir.mkdir(exist_ok=True) + + while True: + hits = hits_queue.get() + if hits is None: + break + for hit in hits: + # Sentinel value signals end of download + if hit is None: + break + + # Process the hit + if "_source" not in hit: + continue + source = hit["_source"] + if "ic_node" not in source or "timestamp" not in source or "MESSAGE" not in source: + continue + + node = source["ic_node"] + try: + timestamp = datetime.strptime(source["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ") + except ValueError: + continue + + logs_by_node.setdefault(node, []).append((timestamp, source["MESSAGE"])) + + # Write all log files after processing all hits + for node, messages in logs_by_node.items(): + log_file = ic_logs_dir / f"{node}.log" + if node in vm_ipv6s: + ipv6_symlink_path = ic_logs_dir / f"{vm_ipv6s[node]}.log" + ipv6_symlink_path.symlink_to(log_file.name) + log_file.write_text( + "\n".join([f"{timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')} {msg}" for timestamp, msg in messages]) + ) + print(f"Downloaded {len(messages)} log entries for node {node} to {log_file}", file=sys.stderr) + + def write_log_dir_readme(readme_path: Path, test_target: str, df: pd.DataFrame, timestamp: datetime.timestamp): """ Write a nice README.md in the log output directory describing the //ci/githubstats:query invocation