diff --git a/dagger/dag_creator/airflow/operator_creators/databricks_dlt_creator.py b/dagger/dag_creator/airflow/operator_creators/declarative_pipeline_creator.py
similarity index 83%
rename from dagger/dag_creator/airflow/operator_creators/databricks_dlt_creator.py
rename to dagger/dag_creator/airflow/operator_creators/declarative_pipeline_creator.py
index 87a11ac..94ba721 100644
--- a/dagger/dag_creator/airflow/operator_creators/databricks_dlt_creator.py
+++ b/dagger/dag_creator/airflow/operator_creators/declarative_pipeline_creator.py
@@ -1,4 +1,4 @@
-"""Operator creator for Databricks DLT (Delta Live Tables) pipelines."""
+"""Operator creator for declarative pipelines (DLT/Delta Live Tables)."""

 import logging
 from typing import Any
@@ -6,7 +6,7 @@
 from airflow.models import BaseOperator, DAG

 from dagger.dag_creator.airflow.operator_creator import OperatorCreator
-from dagger.pipeline.tasks.databricks_dlt_task import DatabricksDLTTask
+from dagger.pipeline.tasks.declarative_pipeline_task import DeclarativePipelineTask

 _logger = logging.getLogger(__name__)

@@ -51,8 +51,8 @@ def _cancel_databricks_run(context: dict[str, Any]) -> None:
         _logger.error(f"Failed to cancel Databricks run {run_id}: {e}")


-class DatabricksDLTCreator(OperatorCreator):
-    """Creates operators for triggering Databricks DLT pipelines via Jobs.
+class DeclarativePipelineCreator(OperatorCreator):
+    """Creates operators for triggering declarative pipelines via Databricks Jobs.

     This creator uses DatabricksRunNowOperator to trigger a Databricks Job
     that wraps the DLT pipeline. The job is identified by name and must be
@@ -60,22 +60,22 @@ class DatabricksDLTCreator(OperatorCreator):

     Attributes:
         ref_name: Reference name used by OperatorFactory to match this creator
-            with DatabricksDLTTask instances.
+            with DeclarativePipelineTask instances.
     """

-    ref_name: str = "databricks_dlt"
+    ref_name: str = "declarative_pipeline"

-    def __init__(self, task: DatabricksDLTTask, dag: DAG) -> None:
-        """Initialize the DatabricksDLTCreator.
+    def __init__(self, task: DeclarativePipelineTask, dag: DAG) -> None:
+        """Initialize the DeclarativePipelineCreator.

         Args:
-            task: The DatabricksDLTTask containing pipeline configuration.
+            task: The DeclarativePipelineTask containing pipeline configuration.
             dag: The Airflow DAG this operator will belong to.
         """
         super().__init__(task, dag)

     def _create_operator(self, **kwargs: Any) -> BaseOperator:
-        """Create a DatabricksRunNowOperator for the DLT pipeline.
+        """Create a DatabricksRunNowOperator for the declarative pipeline.

         Creates an Airflow operator that triggers an existing Databricks Job
         by name. The job must have a pipeline_task that references the DLT
@@ -97,11 +97,11 @@ def _create_operator(self, **kwargs: Any) -> BaseOperator:
             DatabricksRunNowOperator,
         )

-        # Get task parameters - defaults are handled in DatabricksDLTTask
+        # Get task parameters - defaults are handled in DeclarativePipelineTask
         job_name: str = self._task.job_name
         if not job_name:
             raise ValueError(
-                f"job_name is required for DatabricksDLTTask '{self._task.name}'"
+                f"job_name is required for DeclarativePipelineTask '{self._task.name}'"
             )
         databricks_conn_id: str = self._task.databricks_conn_id
         wait_for_completion: bool = self._task.wait_for_completion
diff --git a/dagger/dag_creator/airflow/operator_factory.py b/dagger/dag_creator/airflow/operator_factory.py
index dd7344e..079e3a0 100644
--- a/dagger/dag_creator/airflow/operator_factory.py
+++ b/dagger/dag_creator/airflow/operator_factory.py
@@ -4,7 +4,7 @@
     airflow_op_creator,
     athena_transform_creator,
     batch_creator,
-    databricks_dlt_creator,
+    declarative_pipeline_creator,
     dbt_creator,
     dummy_creator,
     python_creator,
diff --git a/dagger/pipeline/task_factory.py b/dagger/pipeline/task_factory.py
index f5f80bb..f7a89f2 100644
--- a/dagger/pipeline/task_factory.py
+++ b/dagger/pipeline/task_factory.py
@@ -3,7 +3,7 @@
     airflow_op_task,
     athena_transform_task,
     batch_task,
-    databricks_dlt_task,
+    declarative_pipeline_task,
     dbt_task,
     dummy_task,
     python_task,
diff --git a/dagger/pipeline/tasks/databricks_dlt_task.py b/dagger/pipeline/tasks/declarative_pipeline_task.py
similarity index 87%
rename from dagger/pipeline/tasks/databricks_dlt_task.py
rename to dagger/pipeline/tasks/declarative_pipeline_task.py
index 4f0b113..5cb4e81 100644
--- a/dagger/pipeline/tasks/databricks_dlt_task.py
+++ b/dagger/pipeline/tasks/declarative_pipeline_task.py
@@ -1,4 +1,4 @@
-"""Task configuration for Databricks DLT (Delta Live Tables) pipelines."""
+"""Task configuration for declarative pipelines (DLT/Delta Live Tables)."""

 from typing import Any, Optional

@@ -6,8 +6,8 @@
 from dagger.utilities.config_validator import Attribute


-class DatabricksDLTTask(Task):
-    """Task configuration for triggering Databricks DLT pipelines via Jobs.
+class DeclarativePipelineTask(Task):
+    """Task configuration for triggering declarative pipelines via Databricks Jobs.

     This task type uses DatabricksRunNowOperator to trigger a Databricks Job
     that wraps the DLT pipeline. The job is identified by name and must be
@@ -23,27 +23,31 @@ class DatabricksDLTTask(Task):
         cancel_on_kill: Whether to cancel Databricks job if Airflow task is killed.

     Example YAML configuration:
-        type: databricks_dlt
+        type: declarative_pipeline
         description: Run DLT pipeline users
         inputs:
-          - type: athena
-            schema: ddb_changelogs
-            table: order_preference
-            follow_external_dependency: true
+          - type: s3
+            name: input_order_service_public_users
+            bucket: cho${ENV}-data-lake
+            path: pg_changelogs/kafka/order-service/order_service.public.users
         outputs:
           - type: databricks
-            catalog: ${ENV_MARTS}
-            schema: dlt_users
-            table: silver_order_preference
+            catalog: changelogs
+            schema: order_service_public
+            table: pg_users
+          - type: databricks
+            catalog: core
+            schema: order_service
+            table: pg_users
         task_parameters:
-          job_name: dlt-users
+          job_name: "[JOB] order-service-pipeline"
           databricks_conn_id: databricks_default
           wait_for_completion: true
           poll_interval_seconds: 30
           timeout_seconds: 3600
     """

-    ref_name: str = "databricks_dlt"
+    ref_name: str = "declarative_pipeline"

     @classmethod
     def init_attributes(cls, orig_cls: type) -> None:
@@ -106,7 +110,7 @@ def __init__(
         pipeline: Any,
         job_config: dict[str, Any],
     ) -> None:
-        """Initialize a DatabricksDLTTask instance.
+        """Initialize a DeclarativePipelineTask instance.

         Args:
             name: The task name (used as task_id in Airflow).
diff --git a/tests/dag_creator/airflow/operator_creators/test_databricks_dlt_creator.py b/tests/dag_creator/airflow/operator_creators/test_declarative_pipeline_creator.py
similarity index 92%
rename from tests/dag_creator/airflow/operator_creators/test_databricks_dlt_creator.py
rename to tests/dag_creator/airflow/operator_creators/test_declarative_pipeline_creator.py
index 39de91b..5addd03 100644
--- a/tests/dag_creator/airflow/operator_creators/test_databricks_dlt_creator.py
+++ b/tests/dag_creator/airflow/operator_creators/test_declarative_pipeline_creator.py
@@ -1,18 +1,18 @@
-"""Unit tests for DatabricksDLTCreator."""
+"""Unit tests for DeclarativePipelineCreator."""

 import sys
 import unittest
 from datetime import timedelta
 from unittest.mock import MagicMock, patch

-from dagger.dag_creator.airflow.operator_creators.databricks_dlt_creator import (
-    DatabricksDLTCreator,
+from dagger.dag_creator.airflow.operator_creators.declarative_pipeline_creator import (
+    DeclarativePipelineCreator,
     _cancel_databricks_run,
 )


-class TestDatabricksDLTCreator(unittest.TestCase):
-    """Test cases for DatabricksDLTCreator."""
+class TestDeclarativePipelineCreator(unittest.TestCase):
+    """Test cases for DeclarativePipelineCreator."""

     def setUp(self) -> None:
         """Set up test fixtures."""
@@ -35,7 +35,7 @@ def setUp(self) -> None:

     def test_ref_name(self) -> None:
         """Test that ref_name is correctly set."""
-        self.assertEqual(DatabricksDLTCreator.ref_name, "databricks_dlt")
+        self.assertEqual(DeclarativePipelineCreator.ref_name, "declarative_pipeline")

     @patch.dict(
         sys.modules,
@@ -49,7 +49,7 @@ def test_create_operator(self) -> None:
             "airflow.providers.databricks.operators.databricks"
         ].DatabricksRunNowOperator = mock_operator_class

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)
         operator = creator._create_operator()

         mock_operator_class.assert_called_once()
@@ -66,7 +66,7 @@ def test_create_operator_maps_task_properties(self) -> None:
             "airflow.providers.databricks.operators.databricks"
         ].DatabricksRunNowOperator = mock_operator_class

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)
         creator._create_operator()

         call_kwargs = mock_operator_class.call_args[1]
@@ -97,11 +97,11 @@ def test_create_operator_with_custom_values(self) -> None:
             "airflow.providers.databricks.operators.databricks"
         ].DatabricksRunNowOperator = mock_operator_class

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)
         creator._create_operator()

         call_kwargs = mock_operator_class.call_args[1]
@@ -115,7 +115,7 @@ def test_create_operator_empty_job_name_raises_error(self) -> None:
         """Test that empty job_name raises ValueError."""
         self.mock_task.job_name = ""

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)

         with self.assertRaises(ValueError) as context:
             creator._create_operator()
@@ -131,7 +131,7 @@ def test_create_operator_none_job_name_raises_error(self) -> None:
         """Test that None job_name raises ValueError."""
         self.mock_task.job_name = None

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)

         with self.assertRaises(ValueError) as context:
             creator._create_operator()
@@ -149,7 +149,7 @@ def test_create_operator_passes_kwargs(self) -> None:
             "airflow.providers.databricks.operators.databricks"
         ].DatabricksRunNowOperator = mock_operator_class

-        creator = DatabricksDLTCreator(self.mock_task, self.mock_dag)
+        creator = DeclarativePipelineCreator(self.mock_task, self.mock_dag)
         creator._create_operator(retries=3, retry_delay=60)

         call_kwargs = mock_operator_class.call_args[1]
diff --git a/tests/fixtures/pipeline/tasks/databricks_dlt_task.yaml b/tests/fixtures/pipeline/tasks/declarative_pipeline_task.yaml
similarity index 94%
rename from tests/fixtures/pipeline/tasks/databricks_dlt_task.yaml
rename to tests/fixtures/pipeline/tasks/declarative_pipeline_task.yaml
index 1902cf0..be20955 100644
--- a/tests/fixtures/pipeline/tasks/databricks_dlt_task.yaml
+++ b/tests/fixtures/pipeline/tasks/declarative_pipeline_task.yaml
@@ -1,4 +1,4 @@
-type: databricks_dlt
+type: declarative_pipeline
 description: Test DLT pipeline task
 inputs:
   - type: athena
diff --git a/tests/pipeline/tasks/test_databricks_dlt_task.py b/tests/pipeline/tasks/test_declarative_pipeline_task.py
similarity index 84%
rename from tests/pipeline/tasks/test_databricks_dlt_task.py
rename to tests/pipeline/tasks/test_declarative_pipeline_task.py
index a222148..5904351 100644
--- a/tests/pipeline/tasks/test_databricks_dlt_task.py
+++ b/tests/pipeline/tasks/test_declarative_pipeline_task.py
@@ -1,20 +1,20 @@
-"""Unit tests for DatabricksDLTTask."""
+"""Unit tests for DeclarativePipelineTask."""

 import unittest
 from unittest.mock import MagicMock

 import yaml

-from dagger.pipeline.tasks.databricks_dlt_task import DatabricksDLTTask
+from dagger.pipeline.tasks.declarative_pipeline_task import DeclarativePipelineTask


-class TestDatabricksDLTTask(unittest.TestCase):
-    """Test cases for DatabricksDLTTask."""
+class TestDeclarativePipelineTask(unittest.TestCase):
+    """Test cases for DeclarativePipelineTask."""

     def setUp(self) -> None:
         """Set up test fixtures."""
         with open(
-            "tests/fixtures/pipeline/tasks/databricks_dlt_task.yaml", "r"
+            "tests/fixtures/pipeline/tasks/declarative_pipeline_task.yaml", "r"
         ) as stream:
             self.config = yaml.safe_load(stream)

@@ -22,7 +22,7 @@ def setUp(self) -> None:
         self.mock_pipeline = MagicMock()
         self.mock_pipeline.directory = "tests/fixtures/pipeline/tasks"

-        self.task = DatabricksDLTTask(
+        self.task = DeclarativePipelineTask(
name="test_dlt_task", pipeline_name="test_pipeline", pipeline=self.mock_pipeline, @@ -31,7 +31,7 @@ def setUp(self) -> None: def test_ref_name(self) -> None: """Test that ref_name is correctly set.""" - self.assertEqual(DatabricksDLTTask.ref_name, "databricks_dlt") + self.assertEqual(DeclarativePipelineTask.ref_name, "declarative_pipeline") def test_job_name(self) -> None: """Test job_name property.""" @@ -66,13 +66,13 @@ def test_pipeline_name(self) -> None: self.assertEqual(self.task.pipeline_name, "test_pipeline") -class TestDatabricksDLTTaskDefaults(unittest.TestCase): - """Test cases for DatabricksDLTTask default values.""" +class TestDeclarativePipelineTaskDefaults(unittest.TestCase): + """Test cases for DeclarativePipelineTask default values.""" def setUp(self) -> None: """Set up test fixtures with minimal config.""" self.config = { - "type": "databricks_dlt", + "type": "declarative_pipeline", "description": "Test DLT task with defaults", "inputs": [], "outputs": [], @@ -86,7 +86,7 @@ def setUp(self) -> None: self.mock_pipeline = MagicMock() self.mock_pipeline.directory = "tests/fixtures/pipeline/tasks" - self.task = DatabricksDLTTask( + self.task = DeclarativePipelineTask( name="minimal_dlt_task", pipeline_name="test_pipeline", pipeline=self.mock_pipeline, @@ -114,13 +114,13 @@ def test_default_cancel_on_kill(self) -> None: self.assertTrue(self.task.cancel_on_kill) -class TestDatabricksDLTTaskBooleanHandling(unittest.TestCase): +class TestDeclarativePipelineTaskBooleanHandling(unittest.TestCase): """Test cases for boolean parameter handling edge cases.""" def test_wait_for_completion_false(self) -> None: """Test that wait_for_completion=false is correctly handled.""" config = { - "type": "databricks_dlt", + "type": "declarative_pipeline", "description": "Test", "inputs": [], "outputs": [], @@ -135,7 +135,7 @@ def test_wait_for_completion_false(self) -> None: mock_pipeline = MagicMock() mock_pipeline.directory = "tests/fixtures/pipeline/tasks" - task = DatabricksDLTTask( + task = DeclarativePipelineTask( name="test_task", pipeline_name="test_pipeline", pipeline=mock_pipeline, @@ -147,7 +147,7 @@ def test_wait_for_completion_false(self) -> None: def test_cancel_on_kill_false(self) -> None: """Test that cancel_on_kill=false is correctly handled.""" config = { - "type": "databricks_dlt", + "type": "declarative_pipeline", "description": "Test", "inputs": [], "outputs": [], @@ -162,7 +162,7 @@ def test_cancel_on_kill_false(self) -> None: mock_pipeline = MagicMock() mock_pipeline.directory = "tests/fixtures/pipeline/tasks" - task = DatabricksDLTTask( + task = DeclarativePipelineTask( name="test_task", pipeline_name="test_pipeline", pipeline=mock_pipeline,