diff --git a/ci/githubstats/query.py b/ci/githubstats/query.py
index 20e2c62895d4..1faaf306547d 100755
--- a/ci/githubstats/query.py
+++ b/ci/githubstats/query.py
@@ -18,6 +18,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from pathlib import Path
+from queue import Queue
 from typing import List, Optional
 
 import codeowners
@@ -457,56 +458,63 @@ def process_log(
     group_name = None
     summary = None
     vm_ipv6s = {}
-    lines = download_to_path.read_text().strip().splitlines()
-    last_line = lines[-1] if len(lines) > 0 else None
 
     # system-tests have structured logs with JSON objects that we can parse to get more detailed error summaries
     # and to determine the group (testnet) name for downloading the IC logs from ElasticSearch.
     # Non-system-tests just get annotated with the last line of the log which usually contains the error message.
     if test_target.startswith("//rs/tests/"):
-        for line in lines:
-            try:
-                # Here we try parsing a timestamp from the first 23 characters of a line
-                # assuming the line looks something like: "2026-02-03 13:55:09.645 INFO..."
-                last_seen_timestamp = datetime.strptime(line[:TIMESTAMP_LEN], "%Y-%m-%d %H:%M:%S.%f")
-            except (ValueError, pd.errors.ParserError):
-                continue
-
-            ix = line[TIMESTAMP_LEN:].find("{")
-            if ix == -1:
-                continue
-            obj = line[TIMESTAMP_LEN + ix :]
+        with open(download_to_path, "r", encoding="utf-8") as f:
+            for line in f:
+                if len(line) < TIMESTAMP_LEN:
+                    continue
+                try:
+                    # Here we try parsing a timestamp from the first 23 characters of a line
+                    # assuming the line looks something like: "2026-02-03 13:55:09.645 INFO..."
+                    last_seen_timestamp = datetime.strptime(line[:TIMESTAMP_LEN], "%Y-%m-%d %H:%M:%S.%f")
+                except ValueError:
+                    continue
 
-            try:
-                log_event = LogEvent.from_json(obj)
-                match log_event.event_name:
-                    case "infra_group_name_created_event":
-                        group_name = GroupName.from_dict(log_event.body).group
-                        test_start_time = last_seen_timestamp
-                    case "farm_vm_created_event":
-                        farm_vm_created = FarmVMCreated.from_dict(log_event.body)
-                        vm_ipv6s[farm_vm_created.vm_name] = farm_vm_created.ipv6
-                    case "json_report_created_event":
-                        summary = SystemGroupSummary.from_dict(log_event.body)
-                        break
-            except (ValueError, dacite.DaciteError):
-                continue
+                ix = line.find("{", TIMESTAMP_LEN)
+                if ix == -1:
+                    continue
+                obj = line[ix:]
+
+                try:
+                    log_event = LogEvent.from_json(obj)
+                    match log_event.event_name:
+                        case "infra_group_name_created_event":
+                            group_name = GroupName.from_dict(log_event.body).group
+                            test_start_time = last_seen_timestamp
+                        case "farm_vm_created_event":
+                            farm_vm_created = FarmVMCreated.from_dict(log_event.body)
+                            vm_ipv6s[farm_vm_created.vm_name] = farm_vm_created.ipv6
+                        case "json_report_created_event":
+                            summary = SystemGroupSummary.from_dict(log_event.body)
+                            break
+                except (ValueError, dacite.DaciteError):
+                    continue
 
         if group_name is not None and download_ic_logs:
            # If it's a system-test, we want to download the IC logs from ElasticSearch to get more context on the failure.
            # We fork a thread for downloading the IC logs to speed up the whole process instead of doing it sequentially after downloading all test logs.
             download_ic_log_executor.submit(
-                download_ic_logs_for_system_test,
+                download_and_process_ic_logs_for_system_test,
                 attempt_dir,
                 group_name,
                 test_start_time,
                 last_seen_timestamp,
                 vm_ipv6s,
             )
+    else:
+        # Efficiently get the last line of the log:
+        parts = download_to_path.read_text().rstrip().rsplit(sep="\n", maxsplit=1)
+        line = (parts[0] if len(parts) == 1 else parts[1]).lstrip()
+        if line == "":
+            line = None
 
     with row["lock"]:
         row["error_summaries"][attempt_num] = (
-            summary if summary is not None else last_line if attempt_status == FAILED else None
+            summary if summary is not None else line if attempt_status == FAILED else None
         )
@@ -622,15 +630,24 @@ def shorten(msg: str, max_length: int) -> str:
     return msg
 
 
-def download_ic_logs_for_system_test(
+def download_and_process_ic_logs_for_system_test(
     attempt_dir: Path,
     group_name: str,
     test_start_time: datetime,
     test_end_time: datetime,
     vm_ipv6s: dict[str, str],
 ):
-    ic_logs_dir = attempt_dir / "ic_logs"
-    ic_logs_dir.mkdir(exist_ok=True)
+    # Create a queue for passing hits from download thread to processing thread
+    hits_queue = Queue(maxsize=100)  # Limit memory usage with bounded queue
+
+    # Start processing thread
+    processing_thread = threading.Thread(
+        target=process_elasticsearch_hits_from_queue, args=(attempt_dir, hits_queue, vm_ipv6s)
+    )
+    processing_thread.start()
+
+    gte = test_start_time.isoformat()
+    lte = test_end_time.isoformat()
 
     elasticsearch_query = {
         "size": 10000,
@@ -641,8 +658,8 @@
                     {
                         "range": {
                             "timestamp": {
-                                "gte": test_start_time.isoformat(),
-                                "lte": test_end_time.isoformat(),
+                                "gte": gte,
+                                "lte": lte,
                             }
                         }
                     },
@@ -655,56 +672,93 @@
     }
 
     try:
+        # Download logs from ElasticSearch (Producer):
         url = "https://elasticsearch.testnet.dfinity.network/testnet-vector-push-*/_search"
         params = {"filter_path": "hits.hits"}
-        all_hits = []
-        while True:
-            response = requests.post(url, params=params, json=elasticsearch_query, timeout=60)
-            if not response.ok:
+        try:
+            while True:
                 print(
-                    f"Failed to download IC logs for {group_name}: {response.status_code} {response.text}",
+                    f"Downloading IC logs for attempt {attempt_dir.name} for testnet {group_name} from ElasticSearch between {gte} - {lte} with search_after={elasticsearch_query.get('search_after', None)} ...",
                     file=sys.stderr,
                 )
-                return
+                response = requests.post(url, params=params, json=elasticsearch_query, timeout=60)
 
-            hits = response.json().get("hits", {}).get("hits", [])
-            all_hits.extend(hits)
+                if not response.ok:
+                    print(
+                        f"Failed to download IC logs for {group_name}: {response.status_code} {response.text}",
+                        file=sys.stderr,
+                    )
+                    return
 
-            if len(hits) < elasticsearch_query["size"]:
-                break
+                hits = response.json().get("hits", {}).get("hits", [])
 
-            last_hit = hits[-1]
-            elasticsearch_query["search_after"] = last_hit["sort"]
+                # Push hits to queue for concurrent processing
+                hits_queue.put(hits)
 
-        logs_by_node = {}
-        for hit in all_hits:
+                if len(hits) < elasticsearch_query["size"]:
+                    break
+
+                last_hit = hits[-1]
+                elasticsearch_query["search_after"] = last_hit["sort"]
+
+        finally:
+            # Always send sentinel to signal completion, even if download fails
+            hits_queue.put(None)
+            # Wait for processing thread to finish
+            processing_thread.join()
+
+    except requests.exceptions.RequestException as e:
+        print(f"Error downloading IC logs for {group_name}: {e}", file=sys.stderr)
+    except json.JSONDecodeError as e:
+        print(f"Error parsing JSON response for {group_name}: {e}", file=sys.stderr)
+
+
+def process_elasticsearch_hits_from_queue(
+    attempt_dir: Path,
+    hits_queue: Queue,
+    vm_ipv6s: dict[str, str],
+):
+    """Consumer thread: Process ElasticSearch hits from queue and write IC node log files."""
+    logs_by_node = {}
+
+    ic_logs_dir = attempt_dir / "ic_logs"
+    ic_logs_dir.mkdir(exist_ok=True)
+
+    while True:
+        hits = hits_queue.get()
+        if hits is None:
+            break
+        for hit in hits:
+            # Sentinel value signals end of download
+            if hit is None:
+                break
+
+            # Process the hit
             if "_source" not in hit:
                 continue
             source = hit["_source"]
             if "ic_node" not in source or "timestamp" not in source or "MESSAGE" not in source:
                 continue
+
             node = source["ic_node"]
             try:
                 timestamp = datetime.strptime(source["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
             except ValueError:
                 continue
-            logs_by_node.setdefault(node, []).append((timestamp, source["MESSAGE"]))
-
-        for node, messages in logs_by_node.items():
-            log_file = ic_logs_dir / f"{node}.log"
-            if node in vm_ipv6s:
-                ipv6_symlink_path = ic_logs_dir / f"{vm_ipv6s[node]}.log"
-                ipv6_symlink_path.symlink_to(log_file.name)
-            log_file.write_text(
-                "\n".join([f"{timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')} {msg}" for timestamp, msg in messages])
-            )
-            print(f"Downloaded {len(messages)} log entries for node {node} to {log_file}", file=sys.stderr)
+            logs_by_node.setdefault(node, []).append((timestamp, source["MESSAGE"]))
 
-    except requests.exceptions.RequestException as e:
-        print(f"Error downloading IC logs for {group_name}: {e}", file=sys.stderr)
-    except json.JSONDecodeError as e:
-        print(f"Error parsing JSON response for {group_name}: {e}", file=sys.stderr)
+    # Write all log files after processing all hits
+    for node, messages in logs_by_node.items():
+        log_file = ic_logs_dir / f"{node}.log"
+        if node in vm_ipv6s:
+            ipv6_symlink_path = ic_logs_dir / f"{vm_ipv6s[node]}.log"
+            ipv6_symlink_path.symlink_to(log_file.name)
+        log_file.write_text(
+            "\n".join([f"{timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')} {msg}" for timestamp, msg in messages])
+        )
+        print(f"Downloaded {len(messages)} log entries for node {node} to {log_file}", file=sys.stderr)
 
 
 def write_log_dir_readme(readme_path: Path, test_target: str, df: pd.DataFrame, timestamp: datetime.timestamp):
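The central pattern in this patch is a producer/consumer pipeline: the download loop pushes batches of ElasticSearch hits onto a bounded Queue, a separate thread consumes them as they arrive, and a None sentinel put inside a finally: block guarantees the consumer always terminates, even when the download fails part way. The following is only a minimal sketch of that Queue/sentinel/join sequence, not code from the patch; the fetch_page helper and its fake batches are hypothetical stand-ins for the paginated ElasticSearch requests.

import threading
from queue import Queue


def fetch_page(page: int) -> list[dict]:
    # Hypothetical stand-in for one paginated request; empty list means "no more data".
    return [{"id": page * 10 + i} for i in range(10)] if page < 3 else []


def producer(q: Queue) -> None:
    try:
        page = 0
        while True:
            batch = fetch_page(page)
            if not batch:
                break
            q.put(batch)  # Blocks when the bounded queue is full, capping memory use.
            page += 1
    finally:
        q.put(None)  # Sentinel: always signals completion, even if fetching raised.


def consumer(q: Queue, out: list) -> None:
    while True:
        batch = q.get()
        if batch is None:  # Sentinel received: producer is done (or gave up).
            break
        out.extend(batch)


if __name__ == "__main__":
    q: Queue = Queue(maxsize=100)
    results: list[dict] = []
    t = threading.Thread(target=consumer, args=(q, results))
    t.start()
    producer(q)
    t.join()  # Wait for the consumer to drain the queue and exit.
    print(f"processed {len(results)} records")

Bounding the queue (maxsize=100 in the patch) is what keeps memory flat: if processing falls behind, the producer blocks on put() instead of buffering every downloaded batch.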
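The else: branch's last-line extraction leans on rsplit(sep="\n", maxsplit=1) returning a single element when the log contains only one line, which is why it falls back to parts[0]. A small self-contained check of that behaviour, using made-up log texts rather than real CI output, might look like this:

def last_line(text: str) -> str | None:
    # Split once from the right so only the tail of the file is scanned for a newline.
    parts = text.rstrip().rsplit(sep="\n", maxsplit=1)
    # One element means a single-line (or empty) log; two elements means [rest, last line].
    line = (parts[0] if len(parts) == 1 else parts[1]).lstrip()
    return line if line != "" else None


assert last_line("only line\n") == "only line"
assert last_line("first\nsecond\nlast error message\n") == "last error message"
assert last_line("\n\n") is None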