diff --git a/airflow/dags/example_dags/__init__.py b/airflow/dags/example_dags/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/airflow/dags/example_dags/print/__init__.py b/airflow/dags/example_dags/print/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/airflow/dags/example_dags/print/dag.py b/airflow/dags/example_dags/print/dag.py new file mode 100644 index 000000000..973f520c8 --- /dev/null +++ b/airflow/dags/example_dags/print/dag.py @@ -0,0 +1,25 @@ +"""Example DAG demonstrating the usage of the BashOperator.""" +from __future__ import annotations + +import logging +from datetime import datetime + +from airflow.decorators import dag +from example_dags.print.tasks import print_context, print_finish, print_params + +logger = logging.getLogger(__name__) + + +@dag( + schedule="@daily", + start_date=datetime(2021, 12, 1), + catchup=False, + dag_id="print", +) +def taskflow(): + r1 = print_context() + r2 = print_params(r1) + print_finish(r2) + + +taskflow() diff --git a/airflow/dags/example_dags/print/tasks/__init__.py b/airflow/dags/example_dags/print/tasks/__init__.py new file mode 100644 index 000000000..85041f71b --- /dev/null +++ b/airflow/dags/example_dags/print/tasks/__init__.py @@ -0,0 +1,3 @@ +from .print_context import print_context +from .print_finish import print_finish +from .print_params import print_params diff --git a/airflow/dags/example_dags/print/tasks/print_context.py b/airflow/dags/example_dags/print/tasks/print_context.py new file mode 100644 index 000000000..4e94cb703 --- /dev/null +++ b/airflow/dags/example_dags/print/tasks/print_context.py @@ -0,0 +1,12 @@ +import logging +from pprint import pformat + +from airflow.decorators import task + +logger = logging.getLogger(__name__) + + +@task +def print_context(**context) -> dict: + logger.info("PARAMS: " + pformat(context)) + return context.get("params", {}) or {"empty": True} diff --git a/airflow/dags/example_dags/print/tasks/print_finish.py b/airflow/dags/example_dags/print/tasks/print_finish.py new file mode 100644 index 000000000..df5cd82fb --- /dev/null +++ b/airflow/dags/example_dags/print/tasks/print_finish.py @@ -0,0 +1,10 @@ +import logging + +from airflow.decorators import task + +logger = logging.getLogger(__name__) + + +@task +def print_finish(data: str): + logger.info(data.upper()) diff --git a/airflow/dags/example_dags/print/tasks/print_params.py b/airflow/dags/example_dags/print/tasks/print_params.py new file mode 100644 index 000000000..41b585d06 --- /dev/null +++ b/airflow/dags/example_dags/print/tasks/print_params.py @@ -0,0 +1,16 @@ +import logging +from pprint import pformat + +from airflow.decorators import task + +logger = logging.getLogger(__name__) + + +@task +def print_params(data: dict, **kwargs) -> str: + logger.info("PARAMS: " + pformat(data)) + logger.info("KWARGS: " + pformat(kwargs)) + return "finish" + + +"" diff --git a/airflow/dags/print_operator.py b/airflow/dags/print_operator.py deleted file mode 100644 index 04ab937a4..000000000 --- a/airflow/dags/print_operator.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Example DAG demonstrating the usage of the BashOperator.""" -from __future__ import annotations - -from datetime import datetime -from pprint import pprint - -from airflow.decorators import dag, task - - -@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False, dag_id="print") -def taskflow(): - @task - def print_context(**context) -> dict: - pprint(context) - return context.get('params', {}) - - @task - def print_params(data: dict) -> str: - pprint(data) - return "finish" - - @task - def print_finish(data: str): - print(data) - - r1 = print_context() - r2 = print_params(r1) - print_finish(r2) - - -taskflow()