dagger/dag_creator/airflow/operator_creators/{databricks_dlt_creator.py → declarative_pipeline_creator.py}
@@ -1,12 +1,12 @@
-"""Operator creator for Databricks DLT (Delta Live Tables) pipelines."""
+"""Operator creator for declarative pipelines (DLT/Delta Live Tables)."""

 import logging
 from typing import Any

 from airflow.models import BaseOperator, DAG

 from dagger.dag_creator.airflow.operator_creator import OperatorCreator
-from dagger.pipeline.tasks.databricks_dlt_task import DatabricksDLTTask
+from dagger.pipeline.tasks.declarative_pipeline_task import DeclarativePipelineTask

 _logger = logging.getLogger(__name__)
@@ -51,31 +51,31 @@ def _cancel_databricks_run(context: dict[str, Any]) -> None:
         _logger.error(f"Failed to cancel Databricks run {run_id}: {e}")


-class DatabricksDLTCreator(OperatorCreator):
-    """Creates operators for triggering Databricks DLT pipelines via Jobs.
+class DeclarativePipelineCreator(OperatorCreator):
+    """Creates operators for triggering declarative pipelines via Databricks Jobs.

     This creator uses DatabricksRunNowOperator to trigger a Databricks Job
     that wraps the DLT pipeline. The job is identified by name and must be
     defined in the Databricks Asset Bundle.

     Attributes:
         ref_name: Reference name used by OperatorFactory to match this creator
-            with DatabricksDLTTask instances.
+            with DeclarativePipelineTask instances.
     """

-    ref_name: str = "databricks_dlt"
+    ref_name: str = "declarative_pipeline"

-    def __init__(self, task: DatabricksDLTTask, dag: DAG) -> None:
-        """Initialize the DatabricksDLTCreator.
+    def __init__(self, task: DeclarativePipelineTask, dag: DAG) -> None:
+        """Initialize the DeclarativePipelineCreator.

         Args:
-            task: The DatabricksDLTTask containing pipeline configuration.
+            task: The DeclarativePipelineTask containing pipeline configuration.
             dag: The Airflow DAG this operator will belong to.
         """
         super().__init__(task, dag)

     def _create_operator(self, **kwargs: Any) -> BaseOperator:
-        """Create a DatabricksRunNowOperator for the DLT pipeline.
+        """Create a DatabricksRunNowOperator for the declarative pipeline.

         Creates an Airflow operator that triggers an existing Databricks Job
         by name. The job must have a pipeline_task that references the DLT
@@ -97,11 +97,11 @@ def _create_operator(self, **kwargs: Any) -> BaseOperator:
             DatabricksRunNowOperator,
         )

-        # Get task parameters - defaults are handled in DatabricksDLTTask
+        # Get task parameters - defaults are handled in DeclarativePipelineTask
         job_name: str = self._task.job_name
         if not job_name:
             raise ValueError(
-                f"job_name is required for DeclarativePipelineTask '{self._task.name}'"
             )
         databricks_conn_id: str = self._task.databricks_conn_id
         wait_for_completion: bool = self._task.wait_for_completion
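The diff folds `_create_operator` away right after the parameter reads. For orientation, here is a hedged sketch of how the remaining body plausibly assembles the operator. Everything past the visible `job_name`/`databricks_conn_id`/`wait_for_completion` lookups — the `task_id`, the mapping onto `wait_for_termination`/`polling_period_seconds`, and the `build_run_now_operator` helper name itself — is an assumption based on the task attributes documented above and the public `DatabricksRunNowOperator` API, not code from this PR.

```python
# Hedged sketch, not code from this PR: `task` and `dag` mirror the creator's
# self._task / self._dag.
from typing import Any

from airflow.models import BaseOperator, DAG


def build_run_now_operator(task: Any, dag: DAG, **kwargs: Any) -> BaseOperator:
    # The provider import is deferred, as in the creator above.
    from airflow.providers.databricks.operators.databricks import (
        DatabricksRunNowOperator,
    )

    job_name: str = task.job_name
    if not job_name:
        raise ValueError(
            f"job_name is required for DeclarativePipelineTask '{task.name}'"
        )

    return DatabricksRunNowOperator(
        task_id=task.name,  # assumed; dagger may derive task ids differently
        job_name=job_name,  # the operator resolves the job id by name
        databricks_conn_id=task.databricks_conn_id,
        # Assumed mapping of the task's wait_for_completion / poll_interval_seconds
        # onto the operator's wait_for_termination / polling_period_seconds.
        wait_for_termination=task.wait_for_completion,
        polling_period_seconds=task.poll_interval_seconds,
        dag=dag,
        **kwargs,
    )
```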
2 changes: 1 addition & 1 deletion dagger/dag_creator/airflow/operator_factory.py
@@ -4,7 +4,7 @@
     airflow_op_creator,
     athena_transform_creator,
     batch_creator,
-    databricks_dlt_creator,
+    declarative_pipeline_creator,
     dbt_creator,
     dummy_creator,
     python_creator,
2 changes: 1 addition & 1 deletion dagger/pipeline/task_factory.py
@@ -3,7 +3,7 @@
     airflow_op_task,
     athena_transform_task,
     batch_task,
-    databricks_dlt_task,
+    declarative_pipeline_task,
     dbt_task,
     dummy_task,
     python_task,
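Both factories need only this one-line module swap because creators and tasks are matched to the YAML `type:` value through their `ref_name` class attribute. A minimal sketch of that pattern follows, assuming dagger builds a plain dict registry; the factory internals are not shown in this diff, so the registry shape is an assumption.

```python
# Assumed pattern, not dagger's actual factory code: a ref_name -> class
# registry is what makes the rename a one-line change per factory.
from typing import Type


class OperatorCreator:
    """Stand-in for dagger's OperatorCreator base class."""

    ref_name: str = ""


class DeclarativePipelineCreator(OperatorCreator):
    ref_name = "declarative_pipeline"


# Collect every registered creator, keyed by its ref_name.
CREATORS: dict[str, Type[OperatorCreator]] = {
    cls.ref_name: cls for cls in OperatorCreator.__subclasses__()
}

# A pipeline YAML with `type: declarative_pipeline` now resolves here:
assert CREATORS["declarative_pipeline"] is DeclarativePipelineCreator
```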
dagger/pipeline/tasks/{databricks_dlt_task.py → declarative_pipeline_task.py}
@@ -1,13 +1,13 @@
-"""Task configuration for Databricks DLT (Delta Live Tables) pipelines."""
+"""Task configuration for declarative pipelines (DLT/Delta Live Tables)."""

 from typing import Any, Optional

 from dagger.pipeline.task import Task
 from dagger.utilities.config_validator import Attribute


-class DatabricksDLTTask(Task):
-    """Task configuration for triggering Databricks DLT pipelines via Jobs.
+class DeclarativePipelineTask(Task):
+    """Task configuration for triggering declarative pipelines via Databricks Jobs.

     This task type uses DatabricksRunNowOperator to trigger a Databricks Job
     that wraps the DLT pipeline. The job is identified by name and must be
@@ -23,27 +23,31 @@ class DatabricksDLTTask(Task):
         cancel_on_kill: Whether to cancel Databricks job if Airflow task is killed.

     Example YAML configuration:
-        type: databricks_dlt
+        type: declarative_pipeline
         description: Run DLT pipeline users
         inputs:
           - type: athena
             schema: ddb_changelogs
             table: order_preference
             follow_external_dependency: true
           - type: s3
             name: input_order_service_public_users
             bucket: cho${ENV}-data-lake
             path: pg_changelogs/kafka/order-service/order_service.public.users
         outputs:
           - type: databricks
-            catalog: ${ENV_MARTS}
-            schema: dlt_users
-            table: silver_order_preference
+            catalog: changelogs
+            schema: order_service_public
+            table: pg_users
+          - type: databricks
+            catalog: core
+            schema: order_service
+            table: pg_users
         task_parameters:
-            job_name: dlt-users
+            job_name: "[JOB] order-service-pipeline"
             databricks_conn_id: databricks_default
             wait_for_completion: true
             poll_interval_seconds: 30
             timeout_seconds: 3600
     """

-    ref_name: str = "databricks_dlt"
+    ref_name: str = "declarative_pipeline"

     @classmethod
     def init_attributes(cls, orig_cls: type) -> None:
@@ -106,7 +110,7 @@ def __init__(
         pipeline: Any,
         job_config: dict[str, Any],
     ) -> None:
-        """Initialize a DatabricksDLTTask instance.
+        """Initialize a DeclarativePipelineTask instance.

         Args:
             name: The task name (used as task_id in Airflow).
@@ -1,18 +1,18 @@
-"""Unit tests for DatabricksDLTCreator."""
+"""Unit tests for DeclarativePipelineCreator."""

 import sys
 import unittest
 from datetime import timedelta
 from unittest.mock import MagicMock, patch

-from dagger.dag_creator.airflow.operator_creators.databricks_dlt_creator import (
-    DatabricksDLTCreator,
+from dagger.dag_creator.airflow.operator_creators.declarative_pipeline_creator import (
+    DeclarativePipelineCreator,
     _cancel_databricks_run,
 )


-class TestDatabricksDLTCreator(unittest.TestCase):
-    """Test cases for DatabricksDLTCreator."""
+class TestDeclarativePipelineCreator(unittest.TestCase):
+    """Test cases for DeclarativePipelineCreator."""

     def setUp(self) -> None:
         """Set up test fixtures."""
@@ -35,7 +35,7 @@ def setUp(self) -> None:

     def test_ref_name(self) -> None:
         """Test that ref_name is correctly set."""
-        self.assertEqual(DatabricksDLTCreator.ref_name, "databricks_dlt")
+        self.assertEqual(DeclarativePipelineCreator.ref_name, "declarative_pipeline")

     @patch.dict(
         sys.modules,
@@ -49,7 +49,7 @@ def test_create_operator(self) -> None:
             "airflow.providers.databricks.operators.databricks"
         ].DatabricksRunNowOperator = mock_operator_class

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)
         operator = creator._create_operator()

         mock_operator_class.assert_called_once()
@@ -66,7 +66,7 @@ def test_create_operator_maps_task_properties(self) -> None:
             "airflow.providers.databricks.operators.databricks"
         ].DatabricksRunNowOperator = mock_operator_class

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)
         creator._create_operator()

         call_kwargs = mock_operator_class.call_args[1]
@@ -97,7 +97,7 @@ def test_create_operator_with_custom_values(self) -> None:
             "airflow.providers.databricks.operators.databricks"
         ].DatabricksRunNowOperator = mock_operator_class

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)
         creator._create_operator()

         call_kwargs = mock_operator_class.call_args[1]
@@ -115,7 +115,7 @@ def test_create_operator_empty_job_name_raises_error(self) -> None:
         """Test that empty job_name raises ValueError."""
         self.mock_task.job_name = ""

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)

         with self.assertRaises(ValueError) as context:
             creator._create_operator()
@@ -131,7 +131,7 @@ def test_create_operator_none_job_name_raises_error(self) -> None:
         """Test that None job_name raises ValueError."""
         self.mock_task.job_name = None

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)

         with self.assertRaises(ValueError) as context:
             creator._create_operator()
@@ -149,7 +149,7 @@ def test_create_operator_passes_kwargs(self) -> None:
             "airflow.providers.databricks.operators.databricks"
         ].DatabricksRunNowOperator = mock_operator_class

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)
         creator._create_operator(retries=3, retry_delay=60)

         call_kwargs = mock_operator_class.call_args[1]
tests/fixtures/pipeline/tasks/{databricks_dlt_task.yaml → declarative_pipeline_task.yaml}
@@ -1,4 +1,4 @@
-type: databricks_dlt
+type: declarative_pipeline
 description: Test DLT pipeline task
 inputs:
   - type: athena
@@ -1,28 +1,28 @@
-"""Unit tests for DatabricksDLTTask."""
+"""Unit tests for DeclarativePipelineTask."""

 import unittest
 from unittest.mock import MagicMock

 import yaml

-from dagger.pipeline.tasks.databricks_dlt_task import DatabricksDLTTask
+from dagger.pipeline.tasks.declarative_pipeline_task import DeclarativePipelineTask


-class TestDatabricksDLTTask(unittest.TestCase):
-    """Test cases for DatabricksDLTTask."""
+class TestDeclarativePipelineTask(unittest.TestCase):
+    """Test cases for DeclarativePipelineTask."""

     def setUp(self) -> None:
         """Set up test fixtures."""
         with open(
-            "tests/fixtures/pipeline/tasks/databricks_dlt_task.yaml", "r"
+            "tests/fixtures/pipeline/tasks/declarative_pipeline_task.yaml", "r"
         ) as stream:
             self.config = yaml.safe_load(stream)

         # Create a mock pipeline object
         self.mock_pipeline = MagicMock()
         self.mock_pipeline.directory = "tests/fixtures/pipeline/tasks"

-        self.task = DatabricksDLTTask(
+        self.task = DeclarativePipelineTask(
             name="test_dlt_task",
             pipeline_name="test_pipeline",
             pipeline=self.mock_pipeline,
@@ -31,7 +31,7 @@ def setUp(self) -> None:

     def test_ref_name(self) -> None:
         """Test that ref_name is correctly set."""
-        self.assertEqual(DatabricksDLTTask.ref_name, "databricks_dlt")
+        self.assertEqual(DeclarativePipelineTask.ref_name, "declarative_pipeline")

     def test_job_name(self) -> None:
         """Test job_name property."""
@@ -66,13 +66,13 @@ def test_pipeline_name(self) -> None:
         self.assertEqual(self.task.pipeline_name, "test_pipeline")


-class TestDatabricksDLTTaskDefaults(unittest.TestCase):
-    """Test cases for DatabricksDLTTask default values."""
+class TestDeclarativePipelineTaskDefaults(unittest.TestCase):
+    """Test cases for DeclarativePipelineTask default values."""

     def setUp(self) -> None:
         """Set up test fixtures with minimal config."""
         self.config = {
-            "type": "databricks_dlt",
+            "type": "declarative_pipeline",
             "description": "Test DLT task with defaults",
             "inputs": [],
             "outputs": [],
@@ -86,7 +86,7 @@ def setUp(self) -> None:
         self.mock_pipeline = MagicMock()
         self.mock_pipeline.directory = "tests/fixtures/pipeline/tasks"

-        self.task = DatabricksDLTTask(
+        self.task = DeclarativePipelineTask(
             name="minimal_dlt_task",
             pipeline_name="test_pipeline",
             pipeline=self.mock_pipeline,
@@ -114,13 +114,13 @@ def test_default_cancel_on_kill(self) -> None:
         self.assertTrue(self.task.cancel_on_kill)


-class TestDatabricksDLTTaskBooleanHandling(unittest.TestCase):
+class TestDeclarativePipelineTaskBooleanHandling(unittest.TestCase):
     """Test cases for boolean parameter handling edge cases."""

     def test_wait_for_completion_false(self) -> None:
         """Test that wait_for_completion=false is correctly handled."""
         config = {
-            "type": "databricks_dlt",
+            "type": "declarative_pipeline",
             "description": "Test",
             "inputs": [],
             "outputs": [],
@@ -135,7 +135,7 @@ def test_wait_for_completion_false(self) -> None:
         mock_pipeline = MagicMock()
         mock_pipeline.directory = "tests/fixtures/pipeline/tasks"

-        task = DatabricksDLTTask(
+        task = DeclarativePipelineTask(
             name="test_task",
             pipeline_name="test_pipeline",
             pipeline=mock_pipeline,
@@ -147,7 +147,7 @@ def test_cancel_on_kill_false(self) -> None:
     def test_cancel_on_kill_false(self) -> None:
         """Test that cancel_on_kill=false is correctly handled."""
         config = {
-            "type": "databricks_dlt",
+            "type": "declarative_pipeline",
             "description": "Test",
             "inputs": [],
             "outputs": [],
@@ -162,7 +162,7 @@ def test_cancel_on_kill_false(self) -> None:
         mock_pipeline = MagicMock()
         mock_pipeline.directory = "tests/fixtures/pipeline/tasks"

-        task = DatabricksDLTTask(
+        task = DeclarativePipelineTask(
             name="test_task",
             pipeline_name="test_pipeline",
             pipeline=mock_pipeline,
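The creator tests above also import `_cancel_databricks_run`, whose body is folded out of this diff except for its error-logging line. Below is a hedged reconstruction of what such an on-kill callback typically looks like; the XCom key and the `DatabricksHook.cancel_run` usage are assumptions based on the Databricks provider's conventions, not lines from this PR.

```python
# Hedged sketch: only the _logger.error(...) line of _cancel_databricks_run is
# visible in the diff above; the lookup and cancellation below are assumptions.
import logging
from typing import Any

_logger = logging.getLogger(__name__)


def _cancel_databricks_run(context: dict[str, Any]) -> None:
    """Cancel the Databricks run recorded by the triggering operator."""
    from airflow.providers.databricks.hooks.databricks import DatabricksHook

    # DatabricksRunNowOperator pushes its run id to XCom under the "run_id" key.
    run_id = context["ti"].xcom_pull(key="run_id")
    if run_id is None:
        return
    try:
        # Uses the default databricks_default connection.
        DatabricksHook().cancel_run(run_id)
    except Exception as e:
        _logger.error(f"Failed to cancel Databricks run {run_id}: {e}")
```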