From 1dd7569988787f203db17101107211448d13a992 Mon Sep 17 00:00:00 2001 From: Javier Hernandez Date: Tue, 24 Mar 2026 15:49:43 +0100 Subject: [PATCH 1/2] Fix #626: Use explicit DuckDB connections for thread-safe operations Replace duckdb.query() (shared default connection) with explicit per-call duckdb.connect(":memory:") in Aggregation and Analytic operators to prevent PendingRequest errors in multi-threaded usage. --- src/vtlengine/Operators/Aggregation.py | 13 +- src/vtlengine/Operators/Analytic.py | 6 +- tests/Concurrency/__init__.py | 0 tests/Concurrency/test_thread_safety.py | 200 ++++++++++++++++++++++++ 4 files changed, 216 insertions(+), 3 deletions(-) create mode 100644 tests/Concurrency/__init__.py create mode 100644 tests/Concurrency/test_thread_safety.py diff --git a/src/vtlengine/Operators/Aggregation.py b/src/vtlengine/Operators/Aggregation.py index 3dc8d9277..37e942ce9 100644 --- a/src/vtlengine/Operators/Aggregation.py +++ b/src/vtlengine/Operators/Aggregation.py @@ -198,7 +198,12 @@ def _agg_func( ) else: query = f"SELECT COUNT() AS int_var from df {grouping}" - return duckdb.query(query).to_df() + conn = duckdb.connect(database=":memory:", read_only=False) + try: + conn.register("df", df) + return conn.execute(query).fetchdf() + finally: + conn.close() if measure_names is not None and len(measure_names) > 0: functions = "" @@ -226,13 +231,17 @@ def _agg_func( f"SELECT {', '.join(grouping_names or [])} from df {grouping} {having_expression}" ) + conn = duckdb.connect(database=":memory:", read_only=False) try: - result = duckdb.query(query).to_df() + conn.register("df", df) + result = conn.execute(query).fetchdf() except RuntimeError as e: if "Conversion" in e.args[0]: raise RunTimeError("2-3-8", op=cls.op, msg=e.args[0].split(":")[-1]) else: raise RunTimeError("2-1-1-1", op=cls.op, error=e) + finally: + conn.close() return result @classmethod diff --git a/src/vtlengine/Operators/Analytic.py b/src/vtlengine/Operators/Analytic.py index 578ba06b5..c916b9a15 100644 --- a/src/vtlengine/Operators/Analytic.py +++ b/src/vtlengine/Operators/Analytic.py @@ -297,13 +297,17 @@ def analyticfunc( if cls.op == COUNT: df[measure_names] = df[measure_names].fillna(-1) + conn = duckdb.connect(database=":memory:", read_only=False) try: - result = duckdb.query(query).to_df() + conn.register("df", df) + result = conn.execute(query).fetchdf() except RuntimeError as e: if "Conversion" in e.args[0]: raise RunTimeError("2-3-8", op=cls.op, msg=e.args[0].split(":")[-1]) else: raise RunTimeError("2-1-1-1", op=cls.op, error=e) + finally: + conn.close() if cls.op == RATIO_TO_REPORT: for col_name in measure_names: arr = pa.array(result[col_name]) diff --git a/tests/Concurrency/__init__.py b/tests/Concurrency/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/Concurrency/test_thread_safety.py b/tests/Concurrency/test_thread_safety.py new file mode 100644 index 000000000..8fd3fc7af --- /dev/null +++ b/tests/Concurrency/test_thread_safety.py @@ -0,0 +1,200 @@ +"""Thread-safety tests for DuckDB operations. + +Verifies that concurrent vtlengine executions using DuckDB (aggregation, analytic, +and eval operations) work correctly when called from multiple threads. + +See: https://github.com/Meaningful-Data/vtlengine/issues/626 +""" + +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any, Dict + +import pandas as pd + +from vtlengine import run + +AGG_SCRIPT = """ + DS_r <- sum(DS_1 group by Id_1); +""" + +AGG_DATA_STRUCTURES: Dict[str, Any] = { + "datasets": [ + { + "name": "DS_1", + "DataStructure": [ + {"name": "Id_1", "type": "Integer", "role": "Identifier", "nullable": False}, + {"name": "Id_2", "type": "String", "role": "Identifier", "nullable": False}, + {"name": "Me_1", "type": "Number", "role": "Measure", "nullable": True}, + ], + } + ] +} + +AGG_DATAPOINTS = { + "DS_1": pd.DataFrame( + { + "Id_1": [1, 1, 2, 2, 3], + "Id_2": ["A", "B", "A", "B", "A"], + "Me_1": [10.0, 20.0, 30.0, 40.0, 50.0], + } + ) +} + +ANALYTIC_SCRIPT = """ + DS_r <- first_value(DS_1 over (partition by Id_1 order by Id_2 asc)); +""" + +ANALYTIC_DATA_STRUCTURES: Dict[str, Any] = { + "datasets": [ + { + "name": "DS_1", + "DataStructure": [ + {"name": "Id_1", "type": "Integer", "role": "Identifier", "nullable": False}, + {"name": "Id_2", "type": "String", "role": "Identifier", "nullable": False}, + {"name": "Me_1", "type": "Number", "role": "Measure", "nullable": True}, + ], + } + ] +} + +ANALYTIC_DATAPOINTS = { + "DS_1": pd.DataFrame( + { + "Id_1": [1, 1, 2, 2, 3], + "Id_2": ["A", "B", "A", "B", "A"], + "Me_1": [10.0, 20.0, 30.0, 40.0, 50.0], + } + ) +} + +EVAL_SCRIPT = """ + DS_r <- eval(SQL1(DS_1) language "SQL" + returns dataset { + identifier Id_1, + measure Me_1 + }); +""" + +EVAL_DATA_STRUCTURES: Dict[str, Any] = { + "datasets": [ + { + "name": "DS_1", + "DataStructure": [ + {"name": "Id_1", "type": "Integer", "role": "Identifier", "nullable": False}, + {"name": "Me_1", "type": "Number", "role": "Measure", "nullable": True}, + ], + } + ] +} + +EVAL_DATAPOINTS = {"DS_1": pd.DataFrame({"Id_1": [1, 2, 3], "Me_1": [10.0, 20.0, 30.0]})} + +EVAL_EXTERNAL_ROUTINES = [{"name": "SQL1", "query": "SELECT Id_1, Me_1 * 2 AS Me_1 FROM DS_1"}] + + +def _run_agg(thread_id: int) -> Dict[str, Any]: + result = run( + script=AGG_SCRIPT, + data_structures=AGG_DATA_STRUCTURES, + datapoints=AGG_DATAPOINTS, + ) + return {"thread_id": thread_id, "result": result} + + +def _run_analytic(thread_id: int) -> Dict[str, Any]: + result = run( + script=ANALYTIC_SCRIPT, + data_structures=ANALYTIC_DATA_STRUCTURES, + datapoints=ANALYTIC_DATAPOINTS, + ) + return {"thread_id": thread_id, "result": result} + + +def _run_eval(thread_id: int) -> Dict[str, Any]: + result = run( + script=EVAL_SCRIPT, + data_structures=EVAL_DATA_STRUCTURES, + datapoints=EVAL_DATAPOINTS, + external_routines=EVAL_EXTERNAL_ROUTINES, + ) + return {"thread_id": thread_id, "result": result} + + +class TestConcurrentAggregation: + """Test concurrent aggregation operations (issue #626).""" + + def test_two_threads(self) -> None: + with ThreadPoolExecutor(max_workers=2) as executor: + futures = [executor.submit(_run_agg, i) for i in range(2)] + results = [f.result() for f in as_completed(futures)] + + assert len(results) == 2 + for r in results: + ds_r = r["result"]["DS_r"].data + assert len(ds_r) == 3 + assert set(ds_r["Id_1"].tolist()) == {1, 2, 3} + expected_sums = {1: 30.0, 2: 70.0, 3: 50.0} + for _, row in ds_r.iterrows(): + assert row["Me_1"] == expected_sums[row["Id_1"]] + + +class TestConcurrentAnalytic: + """Test concurrent analytic operations (issue #626).""" + + def test_two_threads(self) -> None: + with ThreadPoolExecutor(max_workers=2) as executor: + futures = [executor.submit(_run_analytic, i) for i in range(2)] + results = [f.result() for f in as_completed(futures)] + + assert len(results) == 2 + for r in results: + ds_r = r["result"]["DS_r"].data + assert len(ds_r) == 5 + + +class TestConcurrentMixed: + """Test mixed concurrent operations (aggregation + analytic).""" + + def test_aggregation_and_analytic(self) -> None: + with ThreadPoolExecutor(max_workers=2) as executor: + agg_future = executor.submit(_run_agg, 0) + analytic_future = executor.submit(_run_analytic, 1) + agg_result = agg_future.result() + analytic_result = analytic_future.result() + + assert len(agg_result["result"]["DS_r"].data) == 3 + assert len(analytic_result["result"]["DS_r"].data) == 5 + + +class TestConcurrentEval: + """Test concurrent external routine (eval) operations.""" + + def test_two_threads(self) -> None: + with ThreadPoolExecutor(max_workers=2) as executor: + futures = [executor.submit(_run_eval, i) for i in range(2)] + results = [f.result() for f in as_completed(futures)] + + assert len(results) == 2 + for r in results: + ds_r = r["result"]["DS_r"].data + assert len(ds_r) == 3 + expected = {1: 20.0, 2: 40.0, 3: 60.0} + for _, row in ds_r.iterrows(): + assert row["Me_1"] == expected[row["Id_1"]] + + +class TestConcurrentStress: + """Stress test with higher concurrency.""" + + def test_ten_concurrent_aggregations(self) -> None: + with ThreadPoolExecutor(max_workers=10) as executor: + futures = [executor.submit(_run_agg, i) for i in range(10)] + results = [f.result() for f in as_completed(futures)] + + assert len(results) == 10 + expected_sums = {1: 30.0, 2: 70.0, 3: 50.0} + for r in results: + ds_r = r["result"]["DS_r"].data + assert len(ds_r) == 3 + for _, row in ds_r.iterrows(): + assert row["Me_1"] == expected_sums[row["Id_1"]] From 7a48775ff7f5776de7e33009e717dcdea38bbed3 Mon Sep 17 00:00:00 2001 From: Javier Hernandez Date: Wed, 25 Mar 2026 11:39:02 +0100 Subject: [PATCH 2/2] Simplify concurrency tests --- tests/Concurrency/test_thread_safety.py | 199 ++++++------------------ 1 file changed, 46 insertions(+), 153 deletions(-) diff --git a/tests/Concurrency/test_thread_safety.py b/tests/Concurrency/test_thread_safety.py index 8fd3fc7af..f831cbd7a 100644 --- a/tests/Concurrency/test_thread_safety.py +++ b/tests/Concurrency/test_thread_safety.py @@ -1,23 +1,17 @@ -"""Thread-safety tests for DuckDB operations. +"""Thread-safety tests for DuckDB operations (issue #626). -Verifies that concurrent vtlengine executions using DuckDB (aggregation, analytic, -and eval operations) work correctly when called from multiple threads. - -See: https://github.com/Meaningful-Data/vtlengine/issues/626 +Verifies that concurrent vtlengine executions using DuckDB work correctly +when called from multiple threads in the same process. """ from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Any, Dict +from typing import Any, Dict, Optional import pandas as pd from vtlengine import run -AGG_SCRIPT = """ - DS_r <- sum(DS_1 group by Id_1); -""" - -AGG_DATA_STRUCTURES: Dict[str, Any] = { +DATA_STRUCTURES: Dict[str, Any] = { "datasets": [ { "name": "DS_1", @@ -30,7 +24,7 @@ ] } -AGG_DATAPOINTS = { +DATAPOINTS: Dict[str, Any] = { "DS_1": pd.DataFrame( { "Id_1": [1, 1, 2, 2, 3], @@ -40,161 +34,60 @@ ) } -ANALYTIC_SCRIPT = """ - DS_r <- first_value(DS_1 over (partition by Id_1 order by Id_2 asc)); -""" - -ANALYTIC_DATA_STRUCTURES: Dict[str, Any] = { - "datasets": [ - { - "name": "DS_1", - "DataStructure": [ - {"name": "Id_1", "type": "Integer", "role": "Identifier", "nullable": False}, - {"name": "Id_2", "type": "String", "role": "Identifier", "nullable": False}, - {"name": "Me_1", "type": "Number", "role": "Measure", "nullable": True}, - ], - } - ] -} - -ANALYTIC_DATAPOINTS = { - "DS_1": pd.DataFrame( - { - "Id_1": [1, 1, 2, 2, 3], - "Id_2": ["A", "B", "A", "B", "A"], - "Me_1": [10.0, 20.0, 30.0, 40.0, 50.0], - } - ) -} - -EVAL_SCRIPT = """ - DS_r <- eval(SQL1(DS_1) language "SQL" - returns dataset { - identifier Id_1, - measure Me_1 - }); -""" - -EVAL_DATA_STRUCTURES: Dict[str, Any] = { - "datasets": [ - { - "name": "DS_1", - "DataStructure": [ - {"name": "Id_1", "type": "Integer", "role": "Identifier", "nullable": False}, - {"name": "Me_1", "type": "Number", "role": "Measure", "nullable": True}, - ], - } - ] -} - -EVAL_DATAPOINTS = {"DS_1": pd.DataFrame({"Id_1": [1, 2, 3], "Me_1": [10.0, 20.0, 30.0]})} -EVAL_EXTERNAL_ROUTINES = [{"name": "SQL1", "query": "SELECT Id_1, Me_1 * 2 AS Me_1 FROM DS_1"}] +def _run_concurrent( + script: str, + workers: int = 2, + external_routines: Optional[list] = None, # type: ignore[type-arg] +) -> list: # type: ignore[type-arg] + def task(tid: int) -> Dict[str, Any]: + return run( + script=script, + data_structures=DATA_STRUCTURES, + datapoints=DATAPOINTS, + external_routines=external_routines, + ) - -def _run_agg(thread_id: int) -> Dict[str, Any]: - result = run( - script=AGG_SCRIPT, - data_structures=AGG_DATA_STRUCTURES, - datapoints=AGG_DATAPOINTS, - ) - return {"thread_id": thread_id, "result": result} - - -def _run_analytic(thread_id: int) -> Dict[str, Any]: - result = run( - script=ANALYTIC_SCRIPT, - data_structures=ANALYTIC_DATA_STRUCTURES, - datapoints=ANALYTIC_DATAPOINTS, - ) - return {"thread_id": thread_id, "result": result} - - -def _run_eval(thread_id: int) -> Dict[str, Any]: - result = run( - script=EVAL_SCRIPT, - data_structures=EVAL_DATA_STRUCTURES, - datapoints=EVAL_DATAPOINTS, - external_routines=EVAL_EXTERNAL_ROUTINES, - ) - return {"thread_id": thread_id, "result": result} + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = [executor.submit(task, i) for i in range(workers)] + return [f.result() for f in as_completed(futures)] class TestConcurrentAggregation: - """Test concurrent aggregation operations (issue #626).""" - - def test_two_threads(self) -> None: - with ThreadPoolExecutor(max_workers=2) as executor: - futures = [executor.submit(_run_agg, i) for i in range(2)] - results = [f.result() for f in as_completed(futures)] - + def test_concurrent_sum(self) -> None: + results = _run_concurrent("DS_r <- sum(DS_1 group by Id_1);") assert len(results) == 2 for r in results: - ds_r = r["result"]["DS_r"].data - assert len(ds_r) == 3 - assert set(ds_r["Id_1"].tolist()) == {1, 2, 3} - expected_sums = {1: 30.0, 2: 70.0, 3: 50.0} - for _, row in ds_r.iterrows(): - assert row["Me_1"] == expected_sums[row["Id_1"]] + df = r["DS_r"].data + assert len(df) == 3 + sums = dict(zip(df["Id_1"], df["Me_1"])) + assert sums == {1: 30.0, 2: 70.0, 3: 50.0} class TestConcurrentAnalytic: - """Test concurrent analytic operations (issue #626).""" - - def test_two_threads(self) -> None: - with ThreadPoolExecutor(max_workers=2) as executor: - futures = [executor.submit(_run_analytic, i) for i in range(2)] - results = [f.result() for f in as_completed(futures)] - + def test_concurrent_first_value(self) -> None: + script = "DS_r <- first_value(DS_1 over (partition by Id_1 order by Id_2 asc));" + results = _run_concurrent(script) assert len(results) == 2 for r in results: - ds_r = r["result"]["DS_r"].data - assert len(ds_r) == 5 - - -class TestConcurrentMixed: - """Test mixed concurrent operations (aggregation + analytic).""" - - def test_aggregation_and_analytic(self) -> None: - with ThreadPoolExecutor(max_workers=2) as executor: - agg_future = executor.submit(_run_agg, 0) - analytic_future = executor.submit(_run_analytic, 1) - agg_result = agg_future.result() - analytic_result = analytic_future.result() - - assert len(agg_result["result"]["DS_r"].data) == 3 - assert len(analytic_result["result"]["DS_r"].data) == 5 + assert len(r["DS_r"].data) == 5 class TestConcurrentEval: - """Test concurrent external routine (eval) operations.""" - - def test_two_threads(self) -> None: - with ThreadPoolExecutor(max_workers=2) as executor: - futures = [executor.submit(_run_eval, i) for i in range(2)] - results = [f.result() for f in as_completed(futures)] - + def test_concurrent_eval(self) -> None: + script = """ + DS_r <- eval(SQL1(DS_1) language "SQL" + returns dataset { + identifier Id_1, + identifier Id_2, + measure Me_1 + }); + """ + routines = [{"name": "SQL1", "query": "SELECT Id_1, Id_2, Me_1 * 2 AS Me_1 FROM DS_1"}] + results = _run_concurrent(script, external_routines=routines) assert len(results) == 2 for r in results: - ds_r = r["result"]["DS_r"].data - assert len(ds_r) == 3 - expected = {1: 20.0, 2: 40.0, 3: 60.0} - for _, row in ds_r.iterrows(): - assert row["Me_1"] == expected[row["Id_1"]] - - -class TestConcurrentStress: - """Stress test with higher concurrency.""" - - def test_ten_concurrent_aggregations(self) -> None: - with ThreadPoolExecutor(max_workers=10) as executor: - futures = [executor.submit(_run_agg, i) for i in range(10)] - results = [f.result() for f in as_completed(futures)] - - assert len(results) == 10 - expected_sums = {1: 30.0, 2: 70.0, 3: 50.0} - for r in results: - ds_r = r["result"]["DS_r"].data - assert len(ds_r) == 3 - for _, row in ds_r.iterrows(): - assert row["Me_1"] == expected_sums[row["Id_1"]] + df = r["DS_r"].data + assert len(df) == 5 + doubled = dict(zip(df["Id_2"], df["Me_1"])) + assert doubled["A"] in {20.0, 60.0, 100.0}