From 2c282cd60a53436581586c148967bb321844f783 Mon Sep 17 00:00:00 2001
From: Max
Date: Fri, 24 Oct 2025 11:47:02 -0700
Subject: [PATCH 1/7] use is_disjoint

---
 apps/worker/services/processing/merging.py          | 9 +++++++--
 apps/worker/services/report/raw_upload_processor.py | 2 ++
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/apps/worker/services/processing/merging.py b/apps/worker/services/processing/merging.py
index bc5b63d4d2..d958289f9c 100644
--- a/apps/worker/services/processing/merging.py
+++ b/apps/worker/services/processing/merging.py
@@ -77,10 +77,15 @@ def merge_reports(
         joined = get_joined_flag(commit_yaml, session.flags or [])
         log.info(
-            "merge_reports: Merging report",
+            "merge_reports: Merging report with disjoint optimization",
             extra={"joined": joined},
         )
-        master_report.merge(report, joined)
+        # Use is_disjoint=True to defer the expensive merge operation until finish_merge()
+        master_report.merge(report, joined, is_disjoint=True)
+
+    # Now perform the actual merge of all disjoint coverage records in one pass
+    log.info("merge_reports: Finishing merge of disjoint records")
+    master_report.finish_merge()

     log.info("merge_reports: Returning merge result")
     return master_report, MergeResult(session_mapping, deleted_sessions)

diff --git a/apps/worker/services/report/raw_upload_processor.py b/apps/worker/services/report/raw_upload_processor.py
index a1ee23d12b..40f8898004 100644
--- a/apps/worker/services/report/raw_upload_processor.py
+++ b/apps/worker/services/report/raw_upload_processor.py
@@ -84,6 +84,8 @@ def process_raw_upload(
         if len(report_from_file._files) > len(report._files):
             report_from_file, report = report, report_from_file
+        # Cannot use is_disjoint=True here because multiple coverage files
+        # in one upload can report on the same source files with the same session ID
         report.merge(report_from_file)

     if not report:

From 7cd8e4b6afb25717f7200a419717ca4c5424e01b Mon Sep 17 00:00:00 2001
From: Max
Date: Fri, 24 Oct 2025 11:57:46 -0700
Subject: [PATCH 2/7] increase blocking_timeout

---
 apps/worker/tasks/upload_finisher.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py
index c128f99f3d..a1ef70b9c7 100644
--- a/apps/worker/tasks/upload_finisher.py
+++ b/apps/worker/tasks/upload_finisher.py
@@ -556,7 +556,7 @@ def get_report_lock(repoid: int, commitid: str, hard_time_limit: int) -> Lock:
     return redis_connection.lock(
         lock_name,
         timeout=timeout,
-        blocking_timeout=5,
+        blocking_timeout=30,
     )

From 45b8ffb2f4e1641709f505b796f5ed5f1cd79698 Mon Sep 17 00:00:00 2001
From: Max
Date: Mon, 27 Oct 2025 11:20:28 -0700
Subject: [PATCH 3/7] deduplicate sessions

---
 libs/shared/shared/reports/resources.py |  8 ++++++-
 libs/shared/shared/utils/merge.py       | 31 +++++++++++++++++++++++--
 2 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/libs/shared/shared/reports/resources.py b/libs/shared/shared/reports/resources.py
index f85b9f120e..43ba7ef5eb 100644
--- a/libs/shared/shared/reports/resources.py
+++ b/libs/shared/shared/reports/resources.py
@@ -13,7 +13,11 @@
 from shared.reports.types import ReportLine, ReportTotals
 from shared.utils.flare import report_to_flare
 from shared.utils.make_network_file import make_network_file
-from shared.utils.merge import get_complexity_from_sessions, get_coverage_from_sessions
+from shared.utils.merge import (
+    deduplicate_sessions,
+    get_complexity_from_sessions,
+    get_coverage_from_sessions,
+)
 from shared.utils.migrate import migrate_totals
 from shared.utils.sessions import Session, SessionType
 from shared.utils.totals import agg_totals
@@ -302,6 +306,8 @@ def finish_merge(self):
                 continue
             for line in file._parsed_lines:
                 if isinstance(line, ReportLine) and line.coverage is None:
+                    # Deduplicate and merge any duplicate session IDs before calculating coverage
+                    line.sessions = deduplicate_sessions(line.sessions)
                     line.coverage = get_coverage_from_sessions(line.sessions)
                     line.complexity = get_complexity_from_sessions(line.sessions)

diff --git a/libs/shared/shared/utils/merge.py b/libs/shared/shared/utils/merge.py
index c2ba96375e..3ec234b061 100644
--- a/libs/shared/shared/utils/merge.py
+++ b/libs/shared/shared/utils/merge.py
@@ -158,7 +158,9 @@ def merge_missed_branches(sessions: list[LineSession]) -> list | None:
     return list(missed_branches) if missed_branches is not None else None


-def merge_line(l1, l2, joined=True, is_disjoint=False):
+def merge_line(
+    l1: ReportLine, l2: ReportLine, joined: bool = True, is_disjoint: bool = False
+) -> ReportLine:
     if not l1 or not l2:
         return l1 or l2
@@ -178,7 +180,7 @@
     )


-def merge_line_session(s1, s2):
+def merge_line_session(s1: LineSession, s2: LineSession) -> LineSession:
     s1b = s1.branches
     s2b = s2.branches
     if s1b is None and s2b is None:
@@ -323,6 +325,31 @@ def partials_to_line(partials):
     return f"{v}/{ln}"


+def deduplicate_sessions(sessions: list[LineSession]) -> list[LineSession]:
+    """
+    Deduplicates sessions by ID, merging any duplicates using merge_line_session.
+    This handles cases where the same session ID appears multiple times due to
+    incorrect usage of the is_disjoint=True optimization.
+
+    Returns a list of sessions with unique IDs.
+    """
+    if not sessions:
+        return sessions
+
+    # Group sessions by ID
+    sessions_by_id: dict[int, LineSession] = {}
+    for session in sessions:
+        if session.id in sessions_by_id:
+            # Duplicate detected - merge them
+            sessions_by_id[session.id] = merge_line_session(
+                sessions_by_id[session.id], session
+            )
+        else:
+            sessions_by_id[session.id] = session
+
+    return list(sessions_by_id.values())
+
+
 def get_complexity_from_sessions(sessions):
     _type = type(sessions[0].complexity)
     if _type is int:

From dcd293021c4e2a2d0a13baa2b7b95c9a5c64bb0f Mon Sep 17 00:00:00 2001
From: Max
Date: Mon, 27 Oct 2025 11:59:27 -0700
Subject: [PATCH 4/7] tests

---
 libs/shared/tests/unit/utils/test_merge.py | 86 ++++++++++++++++++++++
 1 file changed, 86 insertions(+)

diff --git a/libs/shared/tests/unit/utils/test_merge.py b/libs/shared/tests/unit/utils/test_merge.py
index c6c043be8a..f726dc1764 100644
--- a/libs/shared/tests/unit/utils/test_merge.py
+++ b/libs/shared/tests/unit/utils/test_merge.py
@@ -7,6 +7,7 @@
     LineType,
     ReportLine,
     branch_type,
+    deduplicate_sessions,
     get_complexity_from_sessions,
     get_coverage_from_sessions,
     line_type,
@@ -347,3 +348,88 @@ def test_get_coverage_from_sessions():
         )
         == "2/2"
     )
+
+
+@pytest.mark.unit
+@pytest.mark.parametrize(
+    "sessions, expected_len, expected_results",
+    [
+        # No duplicates - sessions with unique IDs returned unchanged
+        (
+            [
+                LineSession(0, 1, [1, 2]),
+                LineSession(1, "2/2"),
+                LineSession(2, 0, [1, 2, 3]),
+            ],
+            3,
+            {0: (1, [1, 2]), 1: ("2/2", None), 2: (0, [1, 2, 3])},
+        ),
+        # Simple duplicate - two sessions with same ID merged
+        (
+            [LineSession(0, "1/2", ["exit"]), LineSession(1, 1), LineSession(0, "2/2")],
+            2,
+            {0: ("2/2", []), 1: (1, None)},
+        ),
+        # Multiple duplicates - three sessions with same ID all merged
+        (
+            [
+                LineSession(0, "1/3", [1, 2]),
+                LineSession(1, 1),
+                LineSession(0, "1/3", [2, 3]),
+                LineSession(0, "1/3", [3]),
+            ],
+            2,
+            {0: ("3/3", []), 1: (1, None)},
+        ),
+        (
+            [LineSession(0, 5), LineSession(0, 3)],
+            1,
+            {0: (5, None)},
+        ),
+        (
+            [LineSession(0, "1/2", [5, 7]), LineSession(0, "1/2", [7, 9])],
+            1,
+            {0: ("1/2", [7])},
+        ),
+        (
+            [LineSession(0, 1, None), LineSession(0, 1, [5])],
+            1,
+            {0: (1, [])},
+        ),
+        (
+            [
+                LineSession(2, 1),
+                LineSession(0, 1),
+                LineSession(1, 1),
+                LineSession(0, 1),
+            ],
+            3,
+            {2: (1, None), 0: (1, None), 1: (1, None)},
+        ),
+    ],
+)
+def test_deduplicate_sessions(sessions, expected_len, expected_results):
+    result = deduplicate_sessions(sessions)
+    assert len(result) == expected_len
+
+    for session in result:
+        assert session.id in expected_results
+        expected_coverage, expected_branches = expected_results[session.id]
+        assert session.coverage == expected_coverage
+        if expected_branches is not None:
+            assert session.branches == expected_branches
+
+
+@pytest.mark.unit
+@pytest.mark.parametrize(
+    "sessions, expected",
+    [
+        # Empty list
+        ([], []),
+        # Single session unchanged
+        ([LineSession(0, 1, [1])], [LineSession(0, 1, [1])]),
+    ],
+)
+def test_deduplicate_sessions_edge_cases(sessions, expected):
+    result = deduplicate_sessions(sessions)
+    assert result == expected

From 5c14c7469ceb966f813952298837552e733977eb Mon Sep 17 00:00:00 2001
From: Max
Date: Tue, 28 Oct 2025 13:46:03 -0700
Subject: [PATCH 5/7] shadow mode validation

---
 apps/worker/services/processing/merging.py | 186 ++++++++++++++++++++-
 1 file changed, 179 insertions(+), 7 deletions(-)

diff --git a/apps/worker/services/processing/merging.py b/apps/worker/services/processing/merging.py
index d958289f9c..ef9b92dde8 100644
--- a/apps/worker/services/processing/merging.py
+++ b/apps/worker/services/processing/merging.py
@@ -1,7 +1,9 @@
 import functools
 import logging
+from copy import copy
 from decimal import Decimal

+import orjson
 import sentry_sdk
 from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy.orm import Session as DbSession
@@ -20,6 +22,78 @@
 log = logging.getLogger(__name__)


+def _compare_reports(original_report: Report, optimized_report: Report):
+    """
+    Compare two reports to ensure they produce identical coverage data.
+    Raises an exception if the reports differ.
+    """
+    # Compare sessions
+    if set(original_report.sessions.keys()) != set(optimized_report.sessions.keys()):
+        raise ValueError(
+            f"Shadow mode validation failed: Session IDs differ. "
+            f"Original: {set(original_report.sessions.keys())}, "
+            f"Optimized: {set(optimized_report.sessions.keys())}"
+        )
+
+    # Compare files
+    original_files = set(original_report.files)
+    optimized_files = set(optimized_report.files)
+    if original_files != optimized_files:
+        raise ValueError(
+            f"Shadow mode validation failed: File lists differ. "
+            f"Missing in optimized: {original_files - optimized_files}, "
+            f"Extra in optimized: {optimized_files - original_files}"
+        )
+
+    # Compare totals
+    original_totals = original_report.totals
+    optimized_totals = optimized_report.totals
+    if original_totals.astuple() != optimized_totals.astuple():
+        raise ValueError(
+            f"Shadow mode validation failed: Report totals differ. "
" + f"Original: {original_totals.asdict()}, " + f"Optimized: {optimized_totals.asdict()}" + ) + + # Compare each file in detail + for filename in original_files: + original_file = original_report.get(filename) + optimized_file = optimized_report.get(filename) + + # Compare file totals + if original_file.totals.astuple() != optimized_file.totals.astuple(): + raise ValueError( + f"Shadow mode validation failed: File totals differ for '{filename}'. " + f"Original: {original_file.totals.asdict()}, " + f"Optimized: {optimized_file.totals.asdict()}" + ) + + # Compare line-by-line coverage + original_lines = dict(original_file.lines) + optimized_lines = dict(optimized_file.lines) + + if set(original_lines.keys()) != set(optimized_lines.keys()): + raise ValueError( + f"Shadow mode validation failed: Line numbers differ for '{filename}'. " + f"Original has {len(original_lines)} lines, optimized has {len(optimized_lines)} lines" + ) + + line_nums = original_lines.keys() + for line_num in line_nums: + original_line = original_lines[line_num] + optimized_line = optimized_lines[line_num] + + # Compare coverage data for each line + if str(original_line) != str(optimized_line): + raise ValueError( + f"Shadow mode validation failed: Line coverage differs for '{filename}' line {line_num}. " + f"Original: {original_line}, " + f"Optimized: {optimized_line}" + ) + + log.info("merge_reports: Shadow mode validation passed - reports are identical!") + + @sentry_sdk.trace def merge_reports( commit_yaml: UserYaml, @@ -33,7 +107,58 @@ def merge_reports( commit_yaml, master_report, intermediate_reports ) - log.info("merge_reports: Merging intermediate reports") + # Save the initial state for shadow mode testing + log.info("merge_reports: Saving initial state for shadow mode testing") + + # Save master_report state + if not master_report.is_empty(): + initial_report_json, initial_chunks, initial_totals = master_report.serialize() + initial_report_data = orjson.loads(initial_report_json) + # Need to save initial sessions separately since they'll be modified + initial_sessions = { + sid: copy(session) for sid, session in master_report.sessions.items() + } + else: + initial_report_json = None + initial_chunks = None + initial_totals = None + initial_report_data = None + initial_sessions = {} + + # Save copies of all intermediate reports BEFORE they get modified + saved_intermediate_reports = [] + for intermediate_report in intermediate_reports: + if not intermediate_report.report.is_empty(): + report_json, chunks, totals = intermediate_report.report.serialize() + saved_intermediate_reports.append( + { + "upload_id": intermediate_report.upload_id, + "report_json": report_json, + "chunks": chunks, + "totals": totals, + "report_data": orjson.loads(report_json), + "sessions": { + sid: copy(session) + for sid, session in intermediate_report.report.sessions.items() + }, + } + ) + else: + saved_intermediate_reports.append( + { + "upload_id": intermediate_report.upload_id, + "report_json": None, + "chunks": None, + "totals": None, + "report_data": None, + "sessions": {}, + } + ) + + # ============================================================================ + # ORIGINAL PATH: Using is_disjoint=False (current production behavior) + # ============================================================================ + log.info("merge_reports: Merging intermediate reports (original path)") for intermediate_report in intermediate_reports: report = intermediate_report.report if report.is_empty(): @@ -77,15 +202,62 @@ def merge_reports( 
         joined = get_joined_flag(commit_yaml, session.flags or [])
         log.info(
-            "merge_reports: Merging report with disjoint optimization",
+            "merge_reports: Merging report",
             extra={"joined": joined},
         )
-        # Use is_disjoint=True to defer the expensive merge operation until finish_merge()
-        master_report.merge(report, joined, is_disjoint=True)
+        master_report.merge(report, joined, is_disjoint=False)
+
+    # ============================================================================
+    # OPTIMIZED PATH: Using is_disjoint=True (shadow mode testing)
+    # ============================================================================
+    log.info("merge_reports: Starting shadow mode testing with is_disjoint=True")
+
+    # Recreate the initial state
+    if initial_report_data is not None:
+        optimized_report = Report(
+            files=initial_report_data.get("files"),
+            sessions=initial_sessions,
+            totals=initial_totals,
+            chunks=initial_chunks.decode(),
+        )
+    else:
+        optimized_report = Report()
+
+    # Re-run the merge with optimized path using saved copies
+    for saved_report in saved_intermediate_reports:
+        # Recreate the report from saved state
+        if saved_report["report_data"] is None:
+            continue
+
+        report_copy = Report(
+            files=saved_report["report_data"].get("files"),
+            sessions=saved_report["sessions"],
+            totals=saved_report["totals"],
+            chunks=saved_report["chunks"].decode(),
+        )
+
+        old_sessionid = next(iter(report_copy.sessions))
+        new_sessionid = session_mapping[saved_report["upload_id"]]
+
+        if optimized_report.is_empty() and old_sessionid == new_sessionid:
+            optimized_report = report_copy
+            continue
+
+        report_copy.change_sessionid(old_sessionid, new_sessionid)
+        session_copy = report_copy.sessions[new_sessionid]
+        optimized_report.add_session(session_copy, use_id_from_session=True)
+
+        joined = get_joined_flag(commit_yaml, session_copy.flags or [])
+        optimized_report.merge(report_copy, joined, is_disjoint=True)
+
+    log.info("merge_reports: Finishing merge of disjoint records (optimized path)")
+    optimized_report.finish_merge()

-    # Now perform the actual merge of all disjoint coverage records in one pass
-    log.info("merge_reports: Finishing merge of disjoint records")
-    master_report.finish_merge()
+    # ============================================================================
+    # COMPARISON: Validate that both paths produce the same result
+    # ============================================================================
+    log.info("merge_reports: Comparing original and optimized reports")
+    _compare_reports(master_report, optimized_report)

     log.info("merge_reports: Returning merge result")
     return master_report, MergeResult(session_mapping, deleted_sessions)

From 0005f5292b04d7356893d9a7ce1a88e73c88b595 Mon Sep 17 00:00:00 2001
From: Max
Date: Tue, 28 Oct 2025 14:32:37 -0700
Subject: [PATCH 6/7] unit test for compare

---
 apps/worker/services/processing/merging.py      |   4 +-
 .../services/tests/test_processing_merging.py   | 156 ++++++++++++++++++
 2 files changed, 158 insertions(+), 2 deletions(-)
 create mode 100644 apps/worker/services/tests/test_processing_merging.py

diff --git a/apps/worker/services/processing/merging.py b/apps/worker/services/processing/merging.py
index ef9b92dde8..7e12445d61 100644
--- a/apps/worker/services/processing/merging.py
+++ b/apps/worker/services/processing/merging.py
@@ -22,7 +22,7 @@
 log = logging.getLogger(__name__)


-def _compare_reports(original_report: Report, optimized_report: Report):
+def compare_reports(original_report: Report, optimized_report: Report):
     """
     Compare two reports to ensure they produce identical coverage data.
     Raises an exception if the reports differ.
@@ -257,7 +257,7 @@ def merge_reports(
     # COMPARISON: Validate that both paths produce the same result
     # ============================================================================
     log.info("merge_reports: Comparing original and optimized reports")
-    _compare_reports(master_report, optimized_report)
+    compare_reports(master_report, optimized_report)

     log.info("merge_reports: Returning merge result")
     return master_report, MergeResult(session_mapping, deleted_sessions)
diff --git a/apps/worker/services/tests/test_processing_merging.py b/apps/worker/services/tests/test_processing_merging.py
new file mode 100644
index 0000000000..12c61c73bd
--- /dev/null
+++ b/apps/worker/services/tests/test_processing_merging.py
@@ -0,0 +1,156 @@
+import pytest
+
+from services.processing.merging import compare_reports
+from shared.reports.reportfile import ReportFile
+from shared.reports.resources import Report
+from shared.reports.types import ReportLine
+from shared.utils.sessions import Session, SessionType
+
+
+def _create_report(session_id=0, filename="file.py", lines=None, flags=None):
+    """Helper to create a report with specified parameters."""
+    report = Report()
+    session = Session(
+        id=session_id, flags=flags or [], session_type=SessionType.uploaded
+    )
+    report.add_session(session, use_id_from_session=True)
+
+    if lines:
+        report_file = ReportFile(filename)
+        for line_num, coverage, sessions in lines:
+            report_file.append(line_num, ReportLine.create(coverage, None, sessions))
+        report.append(report_file)
+
+    return report
+
+
+@pytest.mark.unit
+@pytest.mark.parametrize(
+    "report1_params, report2_params",
+    [
+        # Identical simple reports
+        (
+            {"session_id": 0, "filename": "file.py", "lines": [(1, 1, [[0, 1]])]},
+            {"session_id": 0, "filename": "file.py", "lines": [(1, 1, [[0, 1]])]},
+        ),
+        # Empty reports
+        (
+            {"session_id": 0, "filename": "file.py", "lines": []},
+            {"session_id": 0, "filename": "file.py", "lines": []},
+        ),
+        # Multiple lines with different coverage types
+        (
+            {
+                "session_id": 0,
+                "filename": "module.py",
+                "lines": [
+                    (1, 1, [[0, 1]]),
+                    (2, 0, [[0, 0]]),
+                    (3, "1/2", [[0, "1/2", ["exit"]]]),
+                ],
+            },
+            {
+                "session_id": 0,
+                "filename": "module.py",
+                "lines": [
+                    (1, 1, [[0, 1]]),
+                    (2, 0, [[0, 0]]),
+                    (3, "1/2", [[0, "1/2", ["exit"]]]),
+                ],
+            },
+        ),
+        # Reports with flags
+        (
+            {
+                "session_id": 0,
+                "filename": "file.py",
+                "lines": [(1, 1, [[0, 1]])],
+                "flags": ["unit"],
+            },
+            {
+                "session_id": 0,
+                "filename": "file.py",
+                "lines": [(1, 1, [[0, 1]])],
+                "flags": ["unit"],
+            },
+        ),
+    ],
+)
+def test_compare_reports_identical_succeeds(report1_params, report2_params):
+    """Test that comparing identical reports does not raise an exception."""
+    report1 = _create_report(**report1_params)
+    report2 = _create_report(**report2_params)
+
+    # Should not raise
+    compare_reports(report1, report2)
+
+
+@pytest.mark.unit
+@pytest.mark.parametrize(
+    "report1_params, report2_params, expected_error",
+    [
+        # Different session IDs
+        (
+            {"session_id": 0, "filename": "file.py", "lines": [(1, 1, [[0, 1]])]},
+            {"session_id": 1, "filename": "file.py", "lines": [(1, 1, [[1, 1]])]},
+            "Session IDs differ",
+        ),
+        # Different file lists
+        (
+            {"session_id": 0, "filename": "file1.py", "lines": [(1, 1, [[0, 1]])]},
+            {"session_id": 0, "filename": "file2.py", "lines": [(1, 1, [[0, 1]])]},
+            "File lists differ",
+        ),
+        # Different line numbers
+        (
+            {"session_id": 0, "filename": "file.py", "lines": [(1, 1, [[0, 1]])]},
+            {"session_id": 0, "filename": "file.py", "lines": [(2, 1, [[0, 1]])]},
+            "Line numbers differ",
+        ),
+        # Different coverage values
+        (
+            {"session_id": 0, "filename": "file.py", "lines": [(1, 1, [[0, 1]])]},
+            {"session_id": 0, "filename": "file.py", "lines": [(1, 0, [[0, 0]])]},
+            "differ",
+        ),
+        # Different branches
+        (
+            {
+                "session_id": 0,
+                "filename": "file.py",
+                "lines": [(1, "1/2", [[0, "1/2", ["exit"]]])],
+            },
+            {
+                "session_id": 0,
+                "filename": "file.py",
+                "lines": [(1, "1/2", [[0, "1/2", ["1"]]])],
+            },
+            "Line coverage differs",
+        ),
+        # Different report totals (different coverage)
+        (
+            {
+                "session_id": 0,
+                "filename": "file.py",
+                "lines": [(1, 1, [[0, 1]]), (2, 1, [[0, 1]])],
+            },
+            {
+                "session_id": 0,
+                "filename": "file.py",
+                "lines": [(1, 1, [[0, 1]]), (2, 0, [[0, 0]])],
+            },
+            "Report totals differ",
+        ),
+    ],
+)
+def test_compare_reports_differences_raise_error(
+    report1_params, report2_params, expected_error
+):
+    """Test that comparing different reports raises ValueError with appropriate message."""
+    report1 = _create_report(**report1_params)
+    report2 = _create_report(**report2_params)
+
+    with pytest.raises(ValueError) as exc_info:
+        compare_reports(report1, report2)
+
+    assert expected_error in str(exc_info.value)

From 6962fa59216610648f8130198f7bdb00c32219e3 Mon Sep 17 00:00:00 2001
From: Max
Date: Thu, 30 Oct 2025 11:18:29 -0700
Subject: [PATCH 7/7] try catch

---
 apps/worker/services/processing/merging.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/apps/worker/services/processing/merging.py b/apps/worker/services/processing/merging.py
index 7e12445d61..f7bd76af77 100644
--- a/apps/worker/services/processing/merging.py
+++ b/apps/worker/services/processing/merging.py
@@ -256,8 +256,12 @@ def merge_reports(
     # ============================================================================
     # COMPARISON: Validate that both paths produce the same result
     # ============================================================================
     log.info("merge_reports: Comparing original and optimized reports")
-    compare_reports(master_report, optimized_report)
+    try:
+        compare_reports(master_report, optimized_report)
+    except Exception as e:
+        log.error("merge_reports: Shadow mode validation failed", exc_info=True)
+        sentry_sdk.capture_exception(e)

     log.info("merge_reports: Returning merge result")
     return master_report, MergeResult(session_mapping, deleted_sessions)
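
A quick illustration of what the deduplicate_sessions helper from patch 3 does at
a call site, reusing the values of one of the parametrized cases in patch 4. This
is a minimal sketch, not part of the series: the import paths are assumptions
based on the file paths in the diffs above, and the LineSession(id, coverage,
branches) constructor is inferred from the tests.

    from shared.reports.types import LineSession  # assumed location of LineSession
    from shared.utils.merge import deduplicate_sessions

    # Two records for session 0 on the same line - the duplicate-session shape
    # that merging non-disjoint reports with is_disjoint=True can produce.
    sessions = [LineSession(0, "1/2", [5, 7]), LineSession(0, "1/2", [7, 9])]

    deduped = deduplicate_sessions(sessions)

    # One record per session ID survives; merge_line_session intersects the
    # missed-branch lists, so only branch 7 is still reported as missed.
    assert len(deduped) == 1
    assert deduped[0].branches == [7]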