From bf967dfbd6a43205acd432b93f5df9a36526adca Mon Sep 17 00:00:00 2001 From: Andrei Markin Date: Thu, 12 Mar 2026 13:25:24 +0400 Subject: [PATCH] feat(executors): add support for config in workflows --- libs/executors/garf/executors/config.py | 49 ++++++- .../garf/executors/entrypoints/cli.py | 2 +- .../garf/executors/entrypoints/typer_cli.py | 26 +++- libs/executors/garf/executors/utils.py | 33 +++++ .../garf/executors/workflows/workflow.py | 52 ++++---- libs/executors/tests/end-to-end/query.sql | 1 + libs/executors/tests/end-to-end/test.json | 12 +- libs/executors/tests/end-to-end/test_cli.py | 86 ++++++++++--- .../tests/end-to-end/test_config.yaml | 14 +- .../executors/tests/end-to-end/test_query.sql | 1 + .../tests/end-to-end/test_workflow.yaml | 5 + libs/executors/tests/unit/test_config.py | 121 +++++++++++++----- .../tests/unit/workflows/test_workflow.py | 58 +++++++++ 13 files changed, 370 insertions(+), 90 deletions(-) create mode 100644 libs/executors/garf/executors/utils.py diff --git a/libs/executors/garf/executors/config.py b/libs/executors/garf/executors/config.py index 50db200a..7a3aa7dd 100644 --- a/libs/executors/garf/executors/config.py +++ b/libs/executors/garf/executors/config.py @@ -24,7 +24,10 @@ import pydantic import smart_open import yaml +from garf.core import query_editor +from garf.executors import utils from garf.executors.execution_context import ExecutionContext +from typing_extensions import Self class Config(pydantic.BaseModel): @@ -35,18 +38,56 @@ class Config(pydantic.BaseModel): """ sources: dict[str, ExecutionContext] + global_parameters: ExecutionContext | None = None @classmethod def from_file(cls, path: str | pathlib.Path | os.PathLike[str]) -> Config: """Builds config from local or remote yaml file.""" with smart_open.open(path, 'r', encoding='utf-8') as f: data = yaml.safe_load(f) - return Config(sources=data) + return Config(**data) def save(self, path: str | pathlib.Path | os.PathLike[str]) -> str: """Saves config to local or remote yaml file.""" with smart_open.open(path, 'w', encoding='utf-8') as f: - yaml.dump( - self.model_dump(exclude_none=True).get('sources'), f, encoding='utf-8' - ) + yaml.dump(self.model_dump(exclude_none=True), f, encoding='utf-8') return f'Config is saved to {str(path)}' + + def expand(self) -> Self: + if global_parameters := self.global_parameters: + if query_parameters := global_parameters.query_parameters: + common_parameters = { + k: v for k, v in query_parameters.model_dump().items() if v + } + for k, s in self.sources.items(): + source_parameters = { + k: v for k, v in s.query_parameters.model_dump().items() if v + } + source_joined_parameters = utils.merge_dicts( + dict(common_parameters), source_parameters + ) + s.query_parameters = query_editor.GarfQueryParameters( + **source_joined_parameters + ) + if writer_parameters := global_parameters.writer_parameters: + common_parameters = {k: v for k, v in writer_parameters.items() if v} + for k, s in self.sources.items(): + writer_parameters = { + k: v for k, v in s.writer_parameters.items() if v + } + writer_joined_parameters = utils.merge_dicts( + dict(common_parameters), writer_parameters + ) + s.writer_parameters = writer_joined_parameters + if fetcher_parameters := global_parameters.fetcher_parameters: + common_parameters = {k: v for k, v in fetcher_parameters.items() if v} + for k, s in self.sources.items(): + fetcher_parameters = { + k: v for k, v in s.fetcher_parameters.items() if v + } + fetcher_joined_parameters = utils.merge_dicts( + dict(common_parameters), fetcher_parameters + ) + s.fetcher_parameters = fetcher_joined_parameters + + return self diff --git a/libs/executors/garf/executors/entrypoints/cli.py b/libs/executors/garf/executors/entrypoints/cli.py index 74262c3a..cb5d7289 100644 --- a/libs/executors/garf/executors/entrypoints/cli.py +++ b/libs/executors/garf/executors/entrypoints/cli.py @@ -119,7 +119,7 @@ def main(): extra_parameters = utils.ParamsParser().parse_all(kwargs) execution_workflow = workflow.Workflow.from_file( - workflow_file, context=extra_parameters + workflow_file, context=extra_parameters, config_file=args.config ) workflow_skip = args.workflow_skip if args.workflow_skip else None workflow_include = args.workflow_include if args.workflow_include else None diff --git a/libs/executors/garf/executors/entrypoints/typer_cli.py b/libs/executors/garf/executors/entrypoints/typer_cli.py index b3c98403..c9964538 100644 --- a/libs/executors/garf/executors/entrypoints/typer_cli.py +++ b/libs/executors/garf/executors/entrypoints/typer_cli.py @@ -108,9 +108,13 @@ ] -def _init_runner(file, context=None) -> workflow_runner.WorkflowRunner: +def _init_runner( + file, context=None, config=None +) -> workflow_runner.WorkflowRunner: wf_parent = pathlib.Path.cwd() / pathlib.Path(file).parent - execution_workflow = workflow.Workflow.from_file(path=file, context=context) + execution_workflow = workflow.Workflow.from_file( + path=file, context=context, config_file=config + ) return workflow_runner.WorkflowRunner( execution_workflow=execution_workflow, wf_parent=wf_parent ) @@ -274,6 +278,10 @@ def run( str, typer.Option('--workflow', '-f', help='Workflow YAML file'), ], + config: Annotated[ + Optional[str], + typer.Option('--config', '-c', help='Yaml file with parameters'), + ] = None, include: Annotated[ Optional[str], typer.Option('--include', '-i', help='Steps of workflow to execute'), @@ -298,7 +306,7 @@ def run( ) garf_logger.addHandler(initialize_logger()) context = utils.ParamsParser().parse_all(ctx.args) - _init_runner(file, context).run( + _init_runner(file, context, config).run( enable_cache=enable_cache, cache_ttl_seconds=cache_ttl_seconds, selected_aliases=include, @@ -320,12 +328,16 @@ def compile( Optional[str], typer.Option('--output-workflow', '-o', help='Output workflow YAML file'), ] = None, + config: Annotated[ + Optional[str], + typer.Option('--config', '-c', help='Yaml file with parameters'), + ] = None, ): """Compiles workflow.""" if not output_file: output_file = pathlib.Path(file).stem / '_compiled.yaml' context = utils.ParamsParser().parse_all(ctx.args) - _init_runner(file, context).compile(output_file) + _init_runner(file, context, config).compile(output_file) @workflow_app.command( @@ -341,6 +353,10 @@ def deploy( Optional[str], typer.Option('--output-workflow', '-o', help='Output workflow YAML file'), ] = None, + config: Annotated[ + Optional[str], + typer.Option('--config', '-c', help='Yaml file with parameters'), + ] = None, embed_queries: Annotated[ bool, typer.Option( @@ -352,7 +368,7 @@ def deploy( if not output_file: output_file = pathlib.Path(file).stem / '_gcp.yaml' context = utils.ParamsParser().parse_all(ctx.args) - _init_runner(file, context).deploy( + _init_runner(file, context, config).deploy( path=output_file, embed_queries=embed_queries ) diff --git a/libs/executors/garf/executors/utils.py b/libs/executors/garf/executors/utils.py new file mode 100644 index 00000000..2071d14c --- /dev/null +++ b/libs/executors/garf/executors/utils.py @@ -0,0 +1,33 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Common utilities for garf executors.""" + +import copy +from typing import Any + + +def merge_dicts(dict1: dict[str, Any], dict2: dict[str, Any]) -> dict[str, Any]: + """Performs deep merging of nested dicts.""" + result = copy.deepcopy(dict1) + for key, value in dict2.items(): + if ( + key in result + and isinstance(result[key], dict) + and isinstance(value, dict) + ): + result[key] = merge_dicts(result[key], value) + else: + result[key] = value + return result diff --git a/libs/executors/garf/executors/workflows/workflow.py b/libs/executors/garf/executors/workflows/workflow.py index ad20fae9..cf44fe66 100644 --- a/libs/executors/garf/executors/workflows/workflow.py +++ b/libs/executors/garf/executors/workflows/workflow.py @@ -15,7 +15,6 @@ from __future__ import annotations -import copy import os import pathlib from collections import defaultdict @@ -25,7 +24,7 @@ import smart_open import yaml from garf.core import query_editor -from garf.executors import exceptions +from garf.executors import config, exceptions, utils from garf.executors.execution_context import ExecutionContext from garf.io import reader @@ -166,35 +165,49 @@ class Workflow(pydantic.BaseModel): steps: list[ExecutionStep] context: dict[str, dict[str, Any]] | None = None + execution_config: config.Config | None = None prefix: str | pathlib.Path | None = pydantic.Field( default=None, excluded=True ) def model_post_init(self, __context__) -> None: + if self.execution_config: + self.execution_config.expand() + config_parameters = self.execution_config.sources + else: + config_parameters = {} + custom_parameters = defaultdict(dict) if context := self.context: - custom_parameters = defaultdict(dict) if custom_macros := context.get('macro'): custom_parameters['query_parameters']['macro'] = custom_macros if custom_templates := context.get('template'): custom_parameters['query_parameters']['template'] = custom_templates - steps = self.steps - for i, step in enumerate(steps): + steps = self.steps + for i, step in enumerate(steps): + if context: if fetcher_parameters := context.get(step.fetcher): custom_parameters['fetcher_parameters'] = fetcher_parameters if writer_parameters := context.get(step.writer): custom_parameters['writer_parameters'] = writer_parameters - res = _merge_dicts( - step.model_dump(exclude_none=True), custom_parameters + if source_config_parameters := config_parameters.get(step.fetcher): + res = utils.merge_dicts( + step.model_dump(exclude_none=True), + source_config_parameters.model_dump(exclude_none=True), ) - steps[i] = ExecutionStep(**res) + else: + res = step.model_dump(exclude_none=True) + res = utils.merge_dicts(res, custom_parameters) + + steps[i] = ExecutionStep(**res) @classmethod def from_file( cls, path: str | pathlib.Path | os.PathLike[str], context: dict[str, dict[str, Any]] | None = None, + config_file: str | pathlib.Path | os.PathLike[str] | None = None, ) -> Workflow: """Builds workflow from local or remote yaml file.""" with smart_open.open(path, 'r', encoding='utf-8') as f: @@ -203,7 +216,12 @@ def from_file( if isinstance(path, str): path = pathlib.Path(path) return Workflow( - steps=data.get('steps'), context=context, prefix=path.parent + steps=data.get('steps'), + context=context, + prefix=path.parent, + execution_config=config.Config.from_file(config_file) + if config_file + else None, ) except pydantic.ValidationError as e: raise GarfWorkflowError(f'Incorrect workflow:\n {e}') from e @@ -229,19 +247,3 @@ def compile(self) -> None: else: new_queries.append(query.to_query(self.prefix)) step.queries = new_queries - - -def _merge_dicts( - dict1: dict[str, Any], dict2: dict[str, Any] -) -> dict[str, Any]: - result = copy.deepcopy(dict1) - for key, value in dict2.items(): - if ( - key in result - and isinstance(result[key], dict) - and isinstance(value, dict) - ): - result[key] = _merge_dicts(result[key], value) - else: - result[key] = value - return result diff --git a/libs/executors/tests/end-to-end/query.sql b/libs/executors/tests/end-to-end/query.sql index 00107d47..c2986416 100644 --- a/libs/executors/tests/end-to-end/query.sql +++ b/libs/executors/tests/end-to-end/query.sql @@ -1,5 +1,6 @@ SELECT resource, dimensions.name AS name, + dimensions.values AS values, metrics.clicks AS clicks FROM resource diff --git a/libs/executors/tests/end-to-end/test.json b/libs/executors/tests/end-to-end/test.json index 57e0a70a..75888d92 100644 --- a/libs/executors/tests/end-to-end/test.json +++ b/libs/executors/tests/end-to-end/test.json @@ -3,7 +3,8 @@ "resource": "Campaign A", "dimensions": { "name": "Ad Group 1", - "id": 101 + "id": 101, + "values": [1, 2] }, "metrics": { "clicks": 1500, @@ -14,7 +15,8 @@ "resource": "Campaign B", "dimensions": { "name": "Ad Group 2", - "id": 102 + "id": 102, + "values": [1, 2] }, "metrics": { "clicks": 2300, @@ -25,7 +27,8 @@ "resource": "Campaign C", "dimensions": { "name": "Ad Group 3", - "id": 103 + "id": 103, + "values": [1, 2] }, "metrics": { "clicks": 800, @@ -36,7 +39,8 @@ "resource": "Campaign A", "dimensions": { "name": "Ad Group 4", - "id": 104 + "id": 104, + "values": [1, 2] }, "metrics": { "clicks": 3200, diff --git a/libs/executors/tests/end-to-end/test_cli.py b/libs/executors/tests/end-to-end/test_cli.py index c5f5d72f..1471832e 100644 --- a/libs/executors/tests/end-to-end/test_cli.py +++ b/libs/executors/tests/end-to-end/test_cli.py @@ -32,28 +32,58 @@ def _skip_python_39(command): class TestCli: query = ( - 'SELECT resource, dimensions.name AS name, metrics.clicks AS clicks ' - 'FROM resource' + 'SELECT resource, dimensions.name AS name, dimensions.values AS values, ' + 'metrics.clicks AS clicks FROM resource' ) expected_output = [ { 'resource': 'Campaign A', 'name': 'Ad Group 1', + 'values': '1|2', 'clicks': 1500, }, { 'resource': 'Campaign B', 'name': 'Ad Group 2', + 'values': '1|2', 'clicks': 2300, }, { 'resource': 'Campaign C', 'name': 'Ad Group 3', + 'values': '1|2', 'clicks': 800, }, { 'resource': 'Campaign A', 'name': 'Ad Group 4', + 'values': '1|2', + 'clicks': 3200, + }, + ] + expected_output_arrays = [ + { + 'resource': 'Campaign A', + 'name': 'Ad Group 1', + 'values': [1, 2], + 'clicks': 1500, + }, + { + 'resource': 'Campaign B', + 'name': 'Ad Group 2', + 'values': [1, 2], + 'clicks': 2300, + }, + { + 'resource': 'Campaign C', + 'name': 'Ad Group 3', + 'values': [1, 2], + 'clicks': 800, + }, + { + 'resource': 'Campaign A', + 'name': 'Ad Group 4', + 'values': [1, 2], 'clicks': 3200, }, ] @@ -109,18 +139,7 @@ def test_fake_source_from_config(self, tmp_path, command): query_path = tmp_path / 'query.sql' with pathlib.Path.open(query_path, 'w', encoding='utf-8') as f: f.write(self.query) - test_config = _SCRIPT_PATH / 'test_config.yaml' - with open(test_config, 'r', encoding='utf-8') as f: - config_data = yaml.safe_load(f) - original_data_location = config_data['fake']['fetcher_parameters'][ - 'data_location' - ] - config_data['fake']['fetcher_parameters']['data_location'] = str( - _SCRIPT_PATH / original_data_location - ) - tmp_config = tmp_path / 'config.yaml' - with open(tmp_config, 'w', encoding='utf-8') as f: - yaml.dump(config_data, f, encoding='utf-8') + tmp_config = _prepare_config(tmp_path) command = ( f'{command} {str(query_path)} --source fake ' f'-c {str(tmp_config)} ' @@ -135,7 +154,7 @@ def test_fake_source_from_config(self, tmp_path, command): ) assert result.returncode == 0 - assert json.loads(result.stdout) == self.expected_output + assert json.loads(result.stdout) == self.expected_output_arrays @pytest.mark.parametrize('command', ['garf -w', 'grf workflow run -f']) def test_fake_source_from_workflow(self, command): @@ -154,3 +173,40 @@ def test_fake_source_from_workflow(self, command): for output in result.stdout.split('\n'): if output: assert json.loads(output) == self.expected_output + + @pytest.mark.parametrize('command', ['garf -w', 'grf workflow run -f']) + def test_fake_source_from_workflow_with_config(self, command, tmp_path): + _skip_python_39(command) + workflow_path = _SCRIPT_PATH / 'test_workflow.yaml' + tmp_config = _prepare_config(tmp_path) + command = ( + f'{command} {str(workflow_path)} --loglevel ERROR -c {str(tmp_config)}' + ) + result = subprocess.run( + command, + shell=True, + check=False, + capture_output=True, + text=True, + ) + + assert result.returncode == 0 + for output in result.stdout.split('\n'): + if output: + assert json.loads(output) == self.expected_output_arrays + + +def _prepare_config(path): + test_config = _SCRIPT_PATH / 'test_config.yaml' + with open(test_config, 'r', encoding='utf-8') as f: + config_data = yaml.safe_load(f) + original_data_location = config_data['sources']['fake'][ + 'fetcher_parameters' + ]['data_location'] + config_data['sources']['fake']['fetcher_parameters']['data_location'] = str( + _SCRIPT_PATH / original_data_location + ) + tmp_config = path / 'config.yaml' + with open(tmp_config, 'w', encoding='utf-8') as f: + yaml.dump(config_data, f, encoding='utf-8') + return tmp_config diff --git a/libs/executors/tests/end-to-end/test_config.yaml b/libs/executors/tests/end-to-end/test_config.yaml index d18ba16d..a1a92b5a 100644 --- a/libs/executors/tests/end-to-end/test_config.yaml +++ b/libs/executors/tests/end-to-end/test_config.yaml @@ -1,6 +1,8 @@ -fake: - writer: console - writer_parameters: - format: json - fetcher_parameters: - data_location: test.json +sources: + fake: + writer: console + writer_parameters: + format: json + array_handling: arrays + fetcher_parameters: + data_location: test.json diff --git a/libs/executors/tests/end-to-end/test_query.sql b/libs/executors/tests/end-to-end/test_query.sql index 00107d47..c2986416 100644 --- a/libs/executors/tests/end-to-end/test_query.sql +++ b/libs/executors/tests/end-to-end/test_query.sql @@ -1,5 +1,6 @@ SELECT resource, dimensions.name AS name, + dimensions.values AS values, metrics.clicks AS clicks FROM resource diff --git a/libs/executors/tests/end-to-end/test_workflow.yaml b/libs/executors/tests/end-to-end/test_workflow.yaml index f604aa8c..c24e67f3 100644 --- a/libs/executors/tests/end-to-end/test_workflow.yaml +++ b/libs/executors/tests/end-to-end/test_workflow.yaml @@ -19,6 +19,7 @@ steps: SELECT resource, dimensions.name AS name, + dimensions.values AS values, metrics.clicks AS clicks FROM resource <<: [*empty_context, *writer_defaults] @@ -28,6 +29,7 @@ steps: dimensions: name: Ad Group 1 id: 101 + values: [1, 2] metrics: clicks: 1500 cost: 250.75 @@ -35,6 +37,7 @@ steps: dimensions: name: Ad Group 2 id: 102 + values: [1, 2] metrics: clicks: 2300 cost: 410.20 @@ -42,6 +45,7 @@ steps: dimensions: name: Ad Group 3 id: 103 + values: [1, 2] metrics: clicks: 800 cost: 120.50 @@ -49,6 +53,7 @@ steps: dimensions: name: Ad Group 4 id: 104 + values: [1, 2] metrics: clicks: 3200 cost: 600.00 diff --git a/libs/executors/tests/unit/test_config.py b/libs/executors/tests/unit/test_config.py index d4bc52dd..4e59700a 100644 --- a/libs/executors/tests/unit/test_config.py +++ b/libs/executors/tests/unit/test_config.py @@ -20,53 +20,114 @@ class TestConfig: def test_from_file_returns_correct_context_from_data(self, tmp_path): tmp_config = tmp_path / 'config.yaml' data = { - 'api': { - 'query_parameters': { - 'macro': { - 'start_date': '2025-01-01', + 'sources': { + 'api': { + 'query_parameters': { + 'macro': { + 'start_date': '2025-01-01', + }, + 'template': { + 'cohorts': 1, + }, }, - 'template': { - 'cohorts': 1, + 'fetcher_parameters': { + 'id': [1, 2, 3], }, - }, - 'fetcher_parameters': { - 'id': [1, 2, 3], - }, - 'writer': 'csv', - 'writer_parameters': { - 'destination_folder': '/tmp', - }, + 'writer': 'csv', + 'writer_parameters': { + 'destination_folder': '/tmp', + }, + } } } with open(tmp_config, 'w', encoding='utf-8') as f: yaml.dump(data, f, encoding='utf-8') config = Config.from_file(tmp_config) - expected_config = Config(sources=data) + expected_config = Config(**data) assert config == expected_config def test_save_returns_correct_data(self, tmp_path): tmp_config = tmp_path / 'config.yaml' data = { - 'api': { - 'query_parameters': { - 'macro': { - 'start_date': '2025-01-01', + 'sources': { + 'api': { + 'query_parameters': { + 'macro': { + 'start_date': '2025-01-01', + }, + 'template': { + 'cohorts': 1, + }, }, - 'template': { - 'cohorts': 1, + 'fetcher_parameters': { + 'id': [1, 2, 3], }, - }, - 'fetcher_parameters': { - 'id': [1, 2, 3], - }, - 'writer': 'csv', - 'writer_parameters': { - 'destination_folder': '/tmp', - }, + 'writer': 'csv', + 'writer_parameters': { + 'destination_folder': '/tmp', + }, + } } } - config = Config(sources=data) + config = Config(**data) config.save(tmp_config) with open(tmp_config, 'r', encoding='utf-8') as f: config_data = yaml.safe_load(f) assert config_data == data + + def test_expand(self): + data = { + 'global_parameters': { + 'query_parameters': { + 'macro': { + 'start_date': '2025-01-01', + 'end_date': '2025-12-31', + }, + }, + 'writer_parameters': { + 'destination_folder': '/tmp/data', + }, + }, + 'sources': { + 'api': { + 'query_parameters': { + 'macro': { + 'start_date': '2025-12-01', + }, + 'template': { + 'cohorts': 1, + }, + }, + 'writer': 'csv', + 'writer_parameters': { + 'destination_folder': '/tmp', + }, + }, + 'api2': { + 'writer': 'csv', + }, + }, + } + config = Config(**data).expand() + expected_query_parameters = { + 'macro': { + 'start_date': '2025-12-01', + 'end_date': '2025-12-31', + }, + 'template': { + 'cohorts': 1, + }, + } + assert ( + config.sources.get('api').query_parameters.model_dump() + == expected_query_parameters + ) + assert config.sources.get('api').writer_parameters == { + 'destination_folder': '/tmp', + } + assert config.sources.get('api2').query_parameters.model_dump().get( + 'macro' + ) == data.get('global_parameters').get('query_parameters').get('macro') + assert config.sources.get('api2').writer_parameters == { + 'destination_folder': '/tmp/data', + } diff --git a/libs/executors/tests/unit/workflows/test_workflow.py b/libs/executors/tests/unit/workflows/test_workflow.py index 70a3b931..bd87d83f 100644 --- a/libs/executors/tests/unit/workflows/test_workflow.py +++ b/libs/executors/tests/unit/workflows/test_workflow.py @@ -15,6 +15,7 @@ import pathlib import yaml +from garf.executors import config from garf.executors.workflows.workflow import ( ExecutionStep, Query, @@ -97,6 +98,63 @@ def test_init_with_context(self): assert step.fetcher_parameters.get('id') == new_ids assert step.writer_parameters.get('destination_folder') == new_folder + def test_init_with_context_and_config(self): + test_config = config.Config( + global_parameters={ + 'query_parameters': { + 'macro': { + 'min_impressions': 1, + } + }, + 'writer_parameters': { + 'destination_folder': '/tmp/data', + 'array_handling': 'arrays', + }, + }, + sources={ + 'api': { + 'query_parameters': { + 'macro': {'start_date': '2025-01-01'}, + 'template': { + 'custom_conversions': ['One', 'Two'], + }, + } + } + }, + ) + new_start_date = '2026-01-01' + new_cohort = '2' + new_ids = [4, 5, 6] + new_folder = '/app' + workflow = Workflow( + steps=self.data.get('steps'), + context={ + 'macro': {'start_date': new_start_date}, + 'template': { + 'cohorts': new_cohort, + }, + 'api': {'id': new_ids}, + 'csv': {'destination_folder': new_folder}, + }, + execution_config=test_config, + ) + + step = workflow.steps[0] + assert step.query_parameters.macro == { + 'start_date': new_start_date, + 'end_date': '2025-12-31', + 'min_impressions': 1, + } + assert step.query_parameters.template == { + 'cohorts': new_cohort, + 'custom_conversions': ['One', 'Two'], + } + assert step.fetcher_parameters.get('id') == new_ids + assert step.writer_parameters == { + 'destination_folder': new_folder, + 'array_handling': 'arrays', + } + def test_compile(self): workflow = Workflow( steps=[