Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 45 additions & 4 deletions libs/executors/garf/executors/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import pydantic
import smart_open
import yaml
from garf.core import query_editor
from garf.executors import utils
from garf.executors.execution_context import ExecutionContext
from typing_extensions import Self


class Config(pydantic.BaseModel):
Expand All @@ -35,18 +38,56 @@ class Config(pydantic.BaseModel):
"""

sources: dict[str, ExecutionContext]
global_parameters: ExecutionContext | None = None

@classmethod
def from_file(cls, path: str | pathlib.Path | os.PathLike[str]) -> Config:
"""Builds config from local or remote yaml file."""
with smart_open.open(path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
return Config(sources=data)
return Config(**data)

def save(self, path: str | pathlib.Path | os.PathLike[str]) -> str:
"""Saves config to local or remote yaml file."""
with smart_open.open(path, 'w', encoding='utf-8') as f:
yaml.dump(
self.model_dump(exclude_none=True).get('sources'), f, encoding='utf-8'
)
yaml.dump(self.model_dump(exclude_none=True), f, encoding='utf-8')
return f'Config is saved to {str(path)}'

def expand(self) -> Self:
if global_parameters := self.global_parameters:
if query_parameters := global_parameters.query_parameters:
common_parameters = {
k: v for k, v in query_parameters.model_dump().items() if v
}
for k, s in self.sources.items():
source_parameters = {
k: v for k, v in s.query_parameters.model_dump().items() if v
}
source_joined_parameters = utils.merge_dicts(
dict(common_parameters), source_parameters
)
s.query_parameters = query_editor.GarfQueryParameters(
**source_joined_parameters
)
if writer_parameters := global_parameters.writer_parameters:
common_parameters = {k: v for k, v in writer_parameters.items() if v}
for k, s in self.sources.items():
writer_parameters = {
k: v for k, v in s.writer_parameters.items() if v
}
writer_joined_parameters = utils.merge_dicts(
dict(common_parameters), writer_parameters
)
s.writer_parameters = writer_joined_parameters
if fetcher_parameters := global_parameters.fetcher_parameters:
common_parameters = {k: v for k, v in fetcher_parameters.items() if v}
for k, s in self.sources.items():
fetcher_parameters = {
k: v for k, v in s.fetcher_parameters.items() if v
}
fetcher_joined_parameters = utils.merge_dicts(
dict(common_parameters), fetcher_parameters
)
s.fetcher_parameters = fetcher_joined_parameters

return self
2 changes: 1 addition & 1 deletion libs/executors/garf/executors/entrypoints/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def main():

extra_parameters = utils.ParamsParser().parse_all(kwargs)
execution_workflow = workflow.Workflow.from_file(
workflow_file, context=extra_parameters
workflow_file, context=extra_parameters, config_file=args.config
)
workflow_skip = args.workflow_skip if args.workflow_skip else None
workflow_include = args.workflow_include if args.workflow_include else None
Expand Down
26 changes: 21 additions & 5 deletions libs/executors/garf/executors/entrypoints/typer_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,13 @@
]


def _init_runner(file, context=None) -> workflow_runner.WorkflowRunner:
def _init_runner(
file, context=None, config=None
) -> workflow_runner.WorkflowRunner:
wf_parent = pathlib.Path.cwd() / pathlib.Path(file).parent
execution_workflow = workflow.Workflow.from_file(path=file, context=context)
execution_workflow = workflow.Workflow.from_file(
path=file, context=context, config_file=config
)
return workflow_runner.WorkflowRunner(
execution_workflow=execution_workflow, wf_parent=wf_parent
)
Expand Down Expand Up @@ -274,6 +278,10 @@ def run(
str,
typer.Option('--workflow', '-f', help='Workflow YAML file'),
],
config: Annotated[
Optional[str],
typer.Option('--config', '-c', help='Yaml file with parameters'),
] = None,
include: Annotated[
Optional[str],
typer.Option('--include', '-i', help='Steps of workflow to execute'),
Expand All @@ -298,7 +306,7 @@ def run(
)
garf_logger.addHandler(initialize_logger())
context = utils.ParamsParser().parse_all(ctx.args)
_init_runner(file, context).run(
_init_runner(file, context, config).run(
enable_cache=enable_cache,
cache_ttl_seconds=cache_ttl_seconds,
selected_aliases=include,
Expand All @@ -320,12 +328,16 @@ def compile(
Optional[str],
typer.Option('--output-workflow', '-o', help='Output workflow YAML file'),
] = None,
config: Annotated[
Optional[str],
typer.Option('--config', '-c', help='Yaml file with parameters'),
] = None,
):
"""Compiles workflow."""
if not output_file:
output_file = pathlib.Path(file).stem / '_compiled.yaml'
context = utils.ParamsParser().parse_all(ctx.args)
_init_runner(file, context).compile(output_file)
_init_runner(file, context, config).compile(output_file)


@workflow_app.command(
Expand All @@ -341,6 +353,10 @@ def deploy(
Optional[str],
typer.Option('--output-workflow', '-o', help='Output workflow YAML file'),
] = None,
config: Annotated[
Optional[str],
typer.Option('--config', '-c', help='Yaml file with parameters'),
] = None,
embed_queries: Annotated[
bool,
typer.Option(
Expand All @@ -352,7 +368,7 @@ def deploy(
if not output_file:
output_file = pathlib.Path(file).stem / '_gcp.yaml'
context = utils.ParamsParser().parse_all(ctx.args)
_init_runner(file, context).deploy(
_init_runner(file, context, config).deploy(
path=output_file, embed_queries=embed_queries
)

Expand Down
33 changes: 33 additions & 0 deletions libs/executors/garf/executors/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Common utilities for garf executors."""

import copy
from typing import Any


def merge_dicts(dict1: dict[str, Any], dict2: dict[str, Any]) -> dict[str, Any]:
"""Performs deep merging of nested dicts."""
result = copy.deepcopy(dict1)
for key, value in dict2.items():
if (
key in result
and isinstance(result[key], dict)
and isinstance(value, dict)
):
result[key] = merge_dicts(result[key], value)
else:
result[key] = value
return result
52 changes: 27 additions & 25 deletions libs/executors/garf/executors/workflows/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

from __future__ import annotations

import copy
import os
import pathlib
from collections import defaultdict
Expand All @@ -25,7 +24,7 @@
import smart_open
import yaml
from garf.core import query_editor
from garf.executors import exceptions
from garf.executors import config, exceptions, utils
from garf.executors.execution_context import ExecutionContext
from garf.io import reader

Expand Down Expand Up @@ -166,35 +165,49 @@ class Workflow(pydantic.BaseModel):

steps: list[ExecutionStep]
context: dict[str, dict[str, Any]] | None = None
execution_config: config.Config | None = None
prefix: str | pathlib.Path | None = pydantic.Field(
default=None, excluded=True
)

def model_post_init(self, __context__) -> None:
if self.execution_config:
self.execution_config.expand()
config_parameters = self.execution_config.sources
else:
config_parameters = {}
custom_parameters = defaultdict(dict)
if context := self.context:
custom_parameters = defaultdict(dict)
if custom_macros := context.get('macro'):
custom_parameters['query_parameters']['macro'] = custom_macros
if custom_templates := context.get('template'):
custom_parameters['query_parameters']['template'] = custom_templates

steps = self.steps
for i, step in enumerate(steps):
steps = self.steps
for i, step in enumerate(steps):
if context:
if fetcher_parameters := context.get(step.fetcher):
custom_parameters['fetcher_parameters'] = fetcher_parameters
if writer_parameters := context.get(step.writer):
custom_parameters['writer_parameters'] = writer_parameters

res = _merge_dicts(
step.model_dump(exclude_none=True), custom_parameters
if source_config_parameters := config_parameters.get(step.fetcher):
res = utils.merge_dicts(
step.model_dump(exclude_none=True),
source_config_parameters.model_dump(exclude_none=True),
)
steps[i] = ExecutionStep(**res)
else:
res = step.model_dump(exclude_none=True)
res = utils.merge_dicts(res, custom_parameters)

steps[i] = ExecutionStep(**res)

@classmethod
def from_file(
cls,
path: str | pathlib.Path | os.PathLike[str],
context: dict[str, dict[str, Any]] | None = None,
config_file: str | pathlib.Path | os.PathLike[str] | None = None,
) -> Workflow:
"""Builds workflow from local or remote yaml file."""
with smart_open.open(path, 'r', encoding='utf-8') as f:
Expand All @@ -203,7 +216,12 @@ def from_file(
if isinstance(path, str):
path = pathlib.Path(path)
return Workflow(
steps=data.get('steps'), context=context, prefix=path.parent
steps=data.get('steps'),
context=context,
prefix=path.parent,
execution_config=config.Config.from_file(config_file)
if config_file
else None,
)
except pydantic.ValidationError as e:
raise GarfWorkflowError(f'Incorrect workflow:\n {e}') from e
Expand All @@ -229,19 +247,3 @@ def compile(self) -> None:
else:
new_queries.append(query.to_query(self.prefix))
step.queries = new_queries


def _merge_dicts(
dict1: dict[str, Any], dict2: dict[str, Any]
) -> dict[str, Any]:
result = copy.deepcopy(dict1)
for key, value in dict2.items():
if (
key in result
and isinstance(result[key], dict)
and isinstance(value, dict)
):
result[key] = _merge_dicts(result[key], value)
else:
result[key] = value
return result
1 change: 1 addition & 0 deletions libs/executors/tests/end-to-end/query.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
SELECT
resource,
dimensions.name AS name,
dimensions.values AS values,
metrics.clicks AS clicks
FROM resource
12 changes: 8 additions & 4 deletions libs/executors/tests/end-to-end/test.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
"resource": "Campaign A",
"dimensions": {
"name": "Ad Group 1",
"id": 101
"id": 101,
"values": [1, 2]
},
"metrics": {
"clicks": 1500,
Expand All @@ -14,7 +15,8 @@
"resource": "Campaign B",
"dimensions": {
"name": "Ad Group 2",
"id": 102
"id": 102,
"values": [1, 2]
},
"metrics": {
"clicks": 2300,
Expand All @@ -25,7 +27,8 @@
"resource": "Campaign C",
"dimensions": {
"name": "Ad Group 3",
"id": 103
"id": 103,
"values": [1, 2]
},
"metrics": {
"clicks": 800,
Expand All @@ -36,7 +39,8 @@
"resource": "Campaign A",
"dimensions": {
"name": "Ad Group 4",
"id": 104
"id": 104,
"values": [1, 2]
},
"metrics": {
"clicks": 3200,
Expand Down
Loading
Loading