diff --git a/src/vtlengine/Operators/Aggregation.py b/src/vtlengine/Operators/Aggregation.py index 3dc8d927..37e942ce 100644 --- a/src/vtlengine/Operators/Aggregation.py +++ b/src/vtlengine/Operators/Aggregation.py @@ -198,7 +198,12 @@ def _agg_func( ) else: query = f"SELECT COUNT() AS int_var from df {grouping}" - return duckdb.query(query).to_df() + conn = duckdb.connect(database=":memory:", read_only=False) + try: + conn.register("df", df) + return conn.execute(query).fetchdf() + finally: + conn.close() if measure_names is not None and len(measure_names) > 0: functions = "" @@ -226,13 +231,17 @@ def _agg_func( f"SELECT {', '.join(grouping_names or [])} from df {grouping} {having_expression}" ) + conn = duckdb.connect(database=":memory:", read_only=False) try: - result = duckdb.query(query).to_df() + conn.register("df", df) + result = conn.execute(query).fetchdf() except RuntimeError as e: if "Conversion" in e.args[0]: raise RunTimeError("2-3-8", op=cls.op, msg=e.args[0].split(":")[-1]) else: raise RunTimeError("2-1-1-1", op=cls.op, error=e) + finally: + conn.close() return result @classmethod diff --git a/src/vtlengine/Operators/Analytic.py b/src/vtlengine/Operators/Analytic.py index 578ba06b..c916b9a1 100644 --- a/src/vtlengine/Operators/Analytic.py +++ b/src/vtlengine/Operators/Analytic.py @@ -297,13 +297,17 @@ def analyticfunc( if cls.op == COUNT: df[measure_names] = df[measure_names].fillna(-1) + conn = duckdb.connect(database=":memory:", read_only=False) try: - result = duckdb.query(query).to_df() + conn.register("df", df) + result = conn.execute(query).fetchdf() except RuntimeError as e: if "Conversion" in e.args[0]: raise RunTimeError("2-3-8", op=cls.op, msg=e.args[0].split(":")[-1]) else: raise RunTimeError("2-1-1-1", op=cls.op, error=e) + finally: + conn.close() if cls.op == RATIO_TO_REPORT: for col_name in measure_names: arr = pa.array(result[col_name]) diff --git a/tests/Concurrency/__init__.py b/tests/Concurrency/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/Concurrency/test_thread_safety.py b/tests/Concurrency/test_thread_safety.py new file mode 100644 index 00000000..f831cbd7 --- /dev/null +++ b/tests/Concurrency/test_thread_safety.py @@ -0,0 +1,93 @@ +"""Thread-safety tests for DuckDB operations (issue #626). + +Verifies that concurrent vtlengine executions using DuckDB work correctly +when called from multiple threads in the same process. +""" + +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Any, Dict, Optional + +import pandas as pd + +from vtlengine import run + +DATA_STRUCTURES: Dict[str, Any] = { + "datasets": [ + { + "name": "DS_1", + "DataStructure": [ + {"name": "Id_1", "type": "Integer", "role": "Identifier", "nullable": False}, + {"name": "Id_2", "type": "String", "role": "Identifier", "nullable": False}, + {"name": "Me_1", "type": "Number", "role": "Measure", "nullable": True}, + ], + } + ] +} + +DATAPOINTS: Dict[str, Any] = { + "DS_1": pd.DataFrame( + { + "Id_1": [1, 1, 2, 2, 3], + "Id_2": ["A", "B", "A", "B", "A"], + "Me_1": [10.0, 20.0, 30.0, 40.0, 50.0], + } + ) +} + + +def _run_concurrent( + script: str, + workers: int = 2, + external_routines: Optional[list] = None, # type: ignore[type-arg] +) -> list: # type: ignore[type-arg] + def task(tid: int) -> Dict[str, Any]: + return run( + script=script, + data_structures=DATA_STRUCTURES, + datapoints=DATAPOINTS, + external_routines=external_routines, + ) + + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = [executor.submit(task, i) for i in range(workers)] + return [f.result() for f in as_completed(futures)] + + +class TestConcurrentAggregation: + def test_concurrent_sum(self) -> None: + results = _run_concurrent("DS_r <- sum(DS_1 group by Id_1);") + assert len(results) == 2 + for r in results: + df = r["DS_r"].data + assert len(df) == 3 + sums = dict(zip(df["Id_1"], df["Me_1"])) + assert sums == {1: 30.0, 2: 70.0, 3: 50.0} + + +class TestConcurrentAnalytic: + def test_concurrent_first_value(self) -> None: + script = "DS_r <- first_value(DS_1 over (partition by Id_1 order by Id_2 asc));" + results = _run_concurrent(script) + assert len(results) == 2 + for r in results: + assert len(r["DS_r"].data) == 5 + + +class TestConcurrentEval: + def test_concurrent_eval(self) -> None: + script = """ + DS_r <- eval(SQL1(DS_1) language "SQL" + returns dataset { + identifier Id_1, + identifier Id_2, + measure Me_1 + }); + """ + routines = [{"name": "SQL1", "query": "SELECT Id_1, Id_2, Me_1 * 2 AS Me_1 FROM DS_1"}] + results = _run_concurrent(script, external_routines=routines) + assert len(results) == 2 + for r in results: + df = r["DS_r"].data + assert len(df) == 5 + doubled = dict(zip(df["Id_2"], df["Me_1"])) + assert doubled["A"] in {20.0, 60.0, 100.0}