From b3cbc47c249d871888cd098e2670fed3b7493876 Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Fri, 14 Jun 2024 15:14:06 +0200 Subject: [PATCH 1/2] feat: splited print dag tasks --- airflow/dags/example_dags/__init__.py | 0 airflow/dags/example_dags/print/__init__.py | 0 airflow/dags/example_dags/print/dag.py | 25 +++++++++++++++ .../dags/example_dags/print/tasks/__init__.py | 3 ++ .../example_dags/print/tasks/print_context.py | 11 +++++++ .../example_dags/print/tasks/print_finish.py | 9 ++++++ .../example_dags/print/tasks/print_params.py | 13 ++++++++ airflow/dags/print_operator.py | 31 ------------------- 8 files changed, 61 insertions(+), 31 deletions(-) create mode 100644 airflow/dags/example_dags/__init__.py create mode 100644 airflow/dags/example_dags/print/__init__.py create mode 100644 airflow/dags/example_dags/print/dag.py create mode 100644 airflow/dags/example_dags/print/tasks/__init__.py create mode 100644 airflow/dags/example_dags/print/tasks/print_context.py create mode 100644 airflow/dags/example_dags/print/tasks/print_finish.py create mode 100644 airflow/dags/example_dags/print/tasks/print_params.py delete mode 100644 airflow/dags/print_operator.py diff --git a/airflow/dags/example_dags/__init__.py b/airflow/dags/example_dags/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/airflow/dags/example_dags/print/__init__.py b/airflow/dags/example_dags/print/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/airflow/dags/example_dags/print/dag.py b/airflow/dags/example_dags/print/dag.py new file mode 100644 index 000000000..8d891a3ca --- /dev/null +++ b/airflow/dags/example_dags/print/dag.py @@ -0,0 +1,25 @@ +"""Example DAG demonstrating the usage of the BashOperator.""" +from __future__ import annotations + +import logging +from datetime import datetime + +from airflow.decorators import dag + +from example_dags.print.tasks import ( + print_context, + print_params, + print_finish, +) + +logger = logging.getLogger(__name__) + + +@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False, dag_id="print") +def taskflow(): + r1 = print_context() + r2 = print_params(r1) + print_finish(r2) + + +taskflow() diff --git a/airflow/dags/example_dags/print/tasks/__init__.py b/airflow/dags/example_dags/print/tasks/__init__.py new file mode 100644 index 000000000..9e3333b7f --- /dev/null +++ b/airflow/dags/example_dags/print/tasks/__init__.py @@ -0,0 +1,3 @@ +from .print_context import print_context +from .print_params import print_params +from .print_finish import print_finish diff --git a/airflow/dags/example_dags/print/tasks/print_context.py b/airflow/dags/example_dags/print/tasks/print_context.py new file mode 100644 index 000000000..4f0b8093f --- /dev/null +++ b/airflow/dags/example_dags/print/tasks/print_context.py @@ -0,0 +1,11 @@ +import logging +from airflow.decorators import task +from pprint import pformat + +logger = logging.getLogger(__name__) + + +@task +def print_context(**context) -> dict: + logger.info('PARAMS: ' + pformat(context)) + return context.get('params', {}) or {'empty': True} diff --git a/airflow/dags/example_dags/print/tasks/print_finish.py b/airflow/dags/example_dags/print/tasks/print_finish.py new file mode 100644 index 000000000..a55a2efb6 --- /dev/null +++ b/airflow/dags/example_dags/print/tasks/print_finish.py @@ -0,0 +1,9 @@ +import logging +from airflow.decorators import task + +logger = logging.getLogger(__name__) + + +@task +def print_finish(data: str): + logger.info(data.upper()) diff --git a/airflow/dags/example_dags/print/tasks/print_params.py b/airflow/dags/example_dags/print/tasks/print_params.py new file mode 100644 index 000000000..45da11746 --- /dev/null +++ b/airflow/dags/example_dags/print/tasks/print_params.py @@ -0,0 +1,13 @@ +import logging +from airflow.decorators import task +from pprint import pformat + +logger = logging.getLogger(__name__) + + +@task +def print_params(data: dict, **kwargs) -> str: + logger.info('PARAMS: ' + pformat(data)) + logger.info('KWARGS: ' + pformat(kwargs)) + return "finish" +"" \ No newline at end of file diff --git a/airflow/dags/print_operator.py b/airflow/dags/print_operator.py deleted file mode 100644 index 04ab937a4..000000000 --- a/airflow/dags/print_operator.py +++ /dev/null @@ -1,31 +0,0 @@ -"""Example DAG demonstrating the usage of the BashOperator.""" -from __future__ import annotations - -from datetime import datetime -from pprint import pprint - -from airflow.decorators import dag, task - - -@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False, dag_id="print") -def taskflow(): - @task - def print_context(**context) -> dict: - pprint(context) - return context.get('params', {}) - - @task - def print_params(data: dict) -> str: - pprint(data) - return "finish" - - @task - def print_finish(data: str): - print(data) - - r1 = print_context() - r2 = print_params(r1) - print_finish(r2) - - -taskflow() From 4bc44af15e9d2f9ec54ce78c749d3e09a3c7f56b Mon Sep 17 00:00:00 2001 From: SunnyCapt Date: Fri, 14 Jun 2024 15:18:14 +0200 Subject: [PATCH 2/2] fix: linters --- airflow/dags/example_dags/print/dag.py | 16 ++++++++-------- .../dags/example_dags/print/tasks/__init__.py | 2 +- .../example_dags/print/tasks/print_context.py | 7 ++++--- .../example_dags/print/tasks/print_finish.py | 1 + .../example_dags/print/tasks/print_params.py | 11 +++++++---- 5 files changed, 21 insertions(+), 16 deletions(-) diff --git a/airflow/dags/example_dags/print/dag.py b/airflow/dags/example_dags/print/dag.py index 8d891a3ca..973f520c8 100644 --- a/airflow/dags/example_dags/print/dag.py +++ b/airflow/dags/example_dags/print/dag.py @@ -1,21 +1,21 @@ """Example DAG demonstrating the usage of the BashOperator.""" from __future__ import annotations -import logging +import logging from datetime import datetime from airflow.decorators import dag - -from example_dags.print.tasks import ( - print_context, - print_params, - print_finish, -) +from example_dags.print.tasks import print_context, print_finish, print_params logger = logging.getLogger(__name__) -@dag(schedule="@daily", start_date=datetime(2021, 12, 1), catchup=False, dag_id="print") +@dag( + schedule="@daily", + start_date=datetime(2021, 12, 1), + catchup=False, + dag_id="print", +) def taskflow(): r1 = print_context() r2 = print_params(r1) diff --git a/airflow/dags/example_dags/print/tasks/__init__.py b/airflow/dags/example_dags/print/tasks/__init__.py index 9e3333b7f..85041f71b 100644 --- a/airflow/dags/example_dags/print/tasks/__init__.py +++ b/airflow/dags/example_dags/print/tasks/__init__.py @@ -1,3 +1,3 @@ from .print_context import print_context -from .print_params import print_params from .print_finish import print_finish +from .print_params import print_params diff --git a/airflow/dags/example_dags/print/tasks/print_context.py b/airflow/dags/example_dags/print/tasks/print_context.py index 4f0b8093f..4e94cb703 100644 --- a/airflow/dags/example_dags/print/tasks/print_context.py +++ b/airflow/dags/example_dags/print/tasks/print_context.py @@ -1,11 +1,12 @@ import logging -from airflow.decorators import task from pprint import pformat +from airflow.decorators import task + logger = logging.getLogger(__name__) @task def print_context(**context) -> dict: - logger.info('PARAMS: ' + pformat(context)) - return context.get('params', {}) or {'empty': True} + logger.info("PARAMS: " + pformat(context)) + return context.get("params", {}) or {"empty": True} diff --git a/airflow/dags/example_dags/print/tasks/print_finish.py b/airflow/dags/example_dags/print/tasks/print_finish.py index a55a2efb6..df5cd82fb 100644 --- a/airflow/dags/example_dags/print/tasks/print_finish.py +++ b/airflow/dags/example_dags/print/tasks/print_finish.py @@ -1,4 +1,5 @@ import logging + from airflow.decorators import task logger = logging.getLogger(__name__) diff --git a/airflow/dags/example_dags/print/tasks/print_params.py b/airflow/dags/example_dags/print/tasks/print_params.py index 45da11746..41b585d06 100644 --- a/airflow/dags/example_dags/print/tasks/print_params.py +++ b/airflow/dags/example_dags/print/tasks/print_params.py @@ -1,13 +1,16 @@ import logging -from airflow.decorators import task from pprint import pformat +from airflow.decorators import task + logger = logging.getLogger(__name__) @task def print_params(data: dict, **kwargs) -> str: - logger.info('PARAMS: ' + pformat(data)) - logger.info('KWARGS: ' + pformat(kwargs)) + logger.info("PARAMS: " + pformat(data)) + logger.info("KWARGS: " + pformat(kwargs)) return "finish" -"" \ No newline at end of file + + +""