180 changes: 117 additions & 63 deletions ci/githubstats/query.py
@@ -18,6 +18,7 @@
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from queue import Queue
from typing import List, Optional

import codeowners
@@ -457,56 +458,63 @@ def process_log(
group_name = None
summary = None
vm_ipv6s = {}
lines = download_to_path.read_text().strip().splitlines()
last_line = lines[-1] if len(lines) > 0 else None

# system-tests have structured logs with JSON objects that we can parse to get more detailed error summaries
# and to determine the group (testnet) name for downloading the IC logs from ElasticSearch.
# Non-system-tests just get annotated with the last line of the log which usually contains the error message.
if test_target.startswith("//rs/tests/"):
for line in lines:
try:
# Here we try parsing a timestamp from the first 23 characters of a line
# assuming the line looks something like: "2026-02-03 13:55:09.645 INFO..."
last_seen_timestamp = datetime.strptime(line[:TIMESTAMP_LEN], "%Y-%m-%d %H:%M:%S.%f")
except (ValueError, pd.errors.ParserError):
continue

ix = line[TIMESTAMP_LEN:].find("{")
if ix == -1:
continue
obj = line[TIMESTAMP_LEN + ix :]
with open(download_to_path, "r", encoding="utf-8") as f:
for line in f:
if len(line) < TIMESTAMP_LEN:
continue
try:
# Here we try parsing a timestamp from the first 23 characters of a line
# assuming the line looks something like: "2026-02-03 13:55:09.645 INFO..."
last_seen_timestamp = datetime.strptime(line[:TIMESTAMP_LEN], "%Y-%m-%d %H:%M:%S.%f")
except ValueError:
continue

try:
log_event = LogEvent.from_json(obj)
match log_event.event_name:
case "infra_group_name_created_event":
group_name = GroupName.from_dict(log_event.body).group
test_start_time = last_seen_timestamp
case "farm_vm_created_event":
farm_vm_created = FarmVMCreated.from_dict(log_event.body)
vm_ipv6s[farm_vm_created.vm_name] = farm_vm_created.ipv6
case "json_report_created_event":
summary = SystemGroupSummary.from_dict(log_event.body)
break
except (ValueError, dacite.DaciteError):
continue
ix = line.find("{", TIMESTAMP_LEN)
if ix == -1:
continue
obj = line[ix:]

try:
log_event = LogEvent.from_json(obj)
match log_event.event_name:
case "infra_group_name_created_event":
group_name = GroupName.from_dict(log_event.body).group
test_start_time = last_seen_timestamp
case "farm_vm_created_event":
farm_vm_created = FarmVMCreated.from_dict(log_event.body)
vm_ipv6s[farm_vm_created.vm_name] = farm_vm_created.ipv6
case "json_report_created_event":
summary = SystemGroupSummary.from_dict(log_event.body)
break
except (ValueError, dacite.DaciteError):
continue

if group_name is not None and download_ic_logs:
# If it's a system-test, we want to download the IC logs from ElasticSearch to get more context on the failure.
# We fork a thread for downloading the IC logs to speed up the whole process instead of doing it sequentially after downloading all test logs.
download_ic_log_executor.submit(
download_ic_logs_for_system_test,
download_and_process_ic_logs_for_system_test,
attempt_dir,
group_name,
test_start_time,
last_seen_timestamp,
vm_ipv6s,
)
else:
# Efficiently get the last line of the log:
parts = download_to_path.read_text().rstrip().rsplit(sep="\n", maxsplit=1)
line = (parts[0] if len(parts) == 1 else parts[1]).lstrip()
if line == "":
line = None

with row["lock"]:
row["error_summaries"][attempt_num] = (
summary if summary is not None else last_line if attempt_status == FAILED else None
summary if summary is not None else line if attempt_status == FAILED else None
)
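For reference, a minimal, self-contained sketch of the line format the loop above assumes: a 23-character timestamp prefix, free text, and an embedded JSON event. The sample line is made up, and json.loads stands in for the module's LogEvent.from_json / dataclass parsing:

import json
from datetime import datetime

TIMESTAMP_LEN = 23  # len("2026-02-03 13:55:09.645")


def parse_structured_line(line: str):
    """Return (timestamp, event) for a structured log line, or None otherwise."""
    if len(line) < TIMESTAMP_LEN:
        return None
    try:
        ts = datetime.strptime(line[:TIMESTAMP_LEN], "%Y-%m-%d %H:%M:%S.%f")
    except ValueError:
        return None
    ix = line.find("{", TIMESTAMP_LEN)
    if ix == -1:
        return None
    try:
        return ts, json.loads(line[ix:])
    except json.JSONDecodeError:
        return None


# Hypothetical example line:
sample = '2026-02-03 13:55:09.645 INFO {"event_name": "infra_group_name_created_event", "body": {"group": "example-group"}}'
parsed = parse_structured_line(sample)
if parsed is not None:
    ts, event = parsed
    print(ts, event["event_name"], event["body"]["group"])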


@@ -622,15 +630,24 @@ def shorten(msg: str, max_length: int) -> str:
return msg


def download_ic_logs_for_system_test(
def download_and_process_ic_logs_for_system_test(
attempt_dir: Path,
group_name: str,
test_start_time: datetime,
test_end_time: datetime,
vm_ipv6s: dict[str, str],
):
ic_logs_dir = attempt_dir / "ic_logs"
ic_logs_dir.mkdir(exist_ok=True)
# Create a queue for passing hits from download thread to processing thread
hits_queue = Queue(maxsize=100) # Limit memory usage with bounded queue

# Start processing thread
processing_thread = threading.Thread(
target=process_elasticsearch_hits_from_queue, args=(attempt_dir, hits_queue, vm_ipv6s)
)
processing_thread.start()

gte = test_start_time.isoformat()
lte = test_end_time.isoformat()

elasticsearch_query = {
"size": 10000,
@@ -641,8 +658,8 @@ def download_ic_logs_for_system_test(
{
"range": {
"timestamp": {
"gte": test_start_time.isoformat(),
"lte": test_end_time.isoformat(),
"gte": gte,
"lte": lte,
}
}
},
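The query above, together with the loop in the next hunk, pages through results with ElasticSearch's search_after mechanism. A sketch of that pagination pattern, assuming the (collapsed) query also contains a "sort" clause so that each hit carries a "sort" key, and using a placeholder URL and caller-supplied query dict:

import requests


def fetch_hit_batches(url: str, query: dict, timeout: int = 60):
    """Yield batches of hits, requesting the next page via search_after until a short page arrives."""
    while True:
        response = requests.post(url, params={"filter_path": "hits.hits"}, json=query, timeout=timeout)
        response.raise_for_status()
        hits = response.json().get("hits", {}).get("hits", [])
        yield hits
        if len(hits) < query["size"]:
            break  # last (short) page
        # Each hit carries a "sort" value when the query has a "sort" clause;
        # passing the last one back as "search_after" asks for the following page.
        query["search_after"] = hits[-1]["sort"]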
@@ -655,56 +672,93 @@ def download_ic_logs_for_system_test(
}

try:
# Download logs from ElasticSearch (Producer):
url = "https://elasticsearch.testnet.dfinity.network/testnet-vector-push-*/_search"
params = {"filter_path": "hits.hits"}
all_hits = []
while True:
response = requests.post(url, params=params, json=elasticsearch_query, timeout=60)

if not response.ok:
try:
while True:
print(
f"Failed to download IC logs for {group_name}: {response.status_code} {response.text}",
f"Downloading IC logs for attempt {attempt_dir.name} for testnet {group_name} from ElasticSearch between {gte} - {lte} with search_after={elasticsearch_query.get('search_after', None)} ...",
file=sys.stderr,
)
return
response = requests.post(url, params=params, json=elasticsearch_query, timeout=60)

hits = response.json().get("hits", {}).get("hits", [])
all_hits.extend(hits)
if not response.ok:
print(
f"Failed to download IC logs for {group_name}: {response.status_code} {response.text}",
file=sys.stderr,
)
return

if len(hits) < elasticsearch_query["size"]:
break
hits = response.json().get("hits", {}).get("hits", [])

last_hit = hits[-1]
elasticsearch_query["search_after"] = last_hit["sort"]
# Push hits to queue for concurrent processing
hits_queue.put(hits)

logs_by_node = {}
for hit in all_hits:
if len(hits) < elasticsearch_query["size"]:
break

last_hit = hits[-1]
elasticsearch_query["search_after"] = last_hit["sort"]

finally:
# Always send sentinel to signal completion, even if download fails
hits_queue.put(None)
# Wait for processing thread to finish
processing_thread.join()

except requests.exceptions.RequestException as e:
print(f"Error downloading IC logs for {group_name}: {e}", file=sys.stderr)
except json.JSONDecodeError as e:
print(f"Error parsing JSON response for {group_name}: {e}", file=sys.stderr)


def process_elasticsearch_hits_from_queue(
attempt_dir: Path,
hits_queue: Queue,
vm_ipv6s: dict[str, str],
):
"""Consumer thread: Process ElasticSearch hits from queue and write IC node log files."""
logs_by_node = {}

ic_logs_dir = attempt_dir / "ic_logs"
ic_logs_dir.mkdir(exist_ok=True)

while True:
hits = hits_queue.get()
if hits is None:
break
for hit in hits:
# Sentinel value signals end of download
if hit is None:
break

# Process the hit
if "_source" not in hit:
continue
source = hit["_source"]
if "ic_node" not in source or "timestamp" not in source or "MESSAGE" not in source:
continue

node = source["ic_node"]
try:
timestamp = datetime.strptime(source["timestamp"], "%Y-%m-%dT%H:%M:%S.%fZ")
except ValueError:
continue
logs_by_node.setdefault(node, []).append((timestamp, source["MESSAGE"]))

for node, messages in logs_by_node.items():
log_file = ic_logs_dir / f"{node}.log"
if node in vm_ipv6s:
ipv6_symlink_path = ic_logs_dir / f"{vm_ipv6s[node]}.log"
ipv6_symlink_path.symlink_to(log_file.name)
log_file.write_text(
"\n".join([f"{timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')} {msg}" for timestamp, msg in messages])
)
print(f"Downloaded {len(messages)} log entries for node {node} to {log_file}", file=sys.stderr)
logs_by_node.setdefault(node, []).append((timestamp, source["MESSAGE"]))

except requests.exceptions.RequestException as e:
print(f"Error downloading IC logs for {group_name}: {e}", file=sys.stderr)
except json.JSONDecodeError as e:
print(f"Error parsing JSON response for {group_name}: {e}", file=sys.stderr)
# Write all log files after processing all hits
for node, messages in logs_by_node.items():
log_file = ic_logs_dir / f"{node}.log"
if node in vm_ipv6s:
ipv6_symlink_path = ic_logs_dir / f"{vm_ipv6s[node]}.log"
ipv6_symlink_path.symlink_to(log_file.name)
log_file.write_text(
"\n".join([f"{timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')} {msg}" for timestamp, msg in messages])
)
print(f"Downloaded {len(messages)} log entries for node {node} to {log_file}", file=sys.stderr)


def write_log_dir_readme(readme_path: Path, test_target: str, df: pd.DataFrame, timestamp: datetime.timestamp):