diff --git a/.env b/.env
index 2b42e8d..28994cc 100644
--- a/.env
+++ b/.env
@@ -1,23 +1,14 @@
-AIRFLOW_UID=502
-AIRFLOW_GID=0
-API_WORKERS=4
-API_PORT=5551
-API_TIMEOUT=10
+KGX_DATA_SETS="bdc-studies-kgx:v1.0"
+INPUT_DATA_SETS="heal-mds-studies:v1.0"
-DATA_DIR=./local_storage
+LAKEFS_ACCESS_KEY=""
+LAKEFS_SECRET_KEY=""
+LAKEFS_REPO=""
+LAKEFS_BRANCH=""
+LAKEFS_URL="https://lakefs.apps.renci.org"
-DUG_LOG_LEVEL=INFO
-
-ELASTICSEARCH_PASSWORD=12345
-ELASTICSEARCH_HOST=elasticsearch
-ELASTICSEARCH_USERNAME=elastic
-
-NBOOST_API_HOST=nboost
-
-REDIS_PASSWORD=weak
-REDIS_HOST=merge-redis-master
-REDIS_PORT=6379
-TRANQL_ACCESS_LOG=access.log
-TRANQL_ERROR_LOG=error.log
-ROGER_DUG__INPUTS_DATA__SETS=topmed:v1.0
\ No newline at end of file
+BIOMEGATRON_URL="https://med-nemo.apps.renci.org/annotate"
+SAPBERT_URL="https://sap-qdrant.apps.renci.org/annotate"
+NODE_NORM_URL="https://nodenormalization-sri.renci.org/get_normalized_nodes?conflate=false&description=true&curie="
+NAME_RES_URL="https://name-resolution-sri.renci.org/reverse_lookup"
\ No newline at end of file
diff --git a/.github/workflows/trivy-pr-scan.yml b/.github/workflows/trivy-pr-scan.yml
index 1e7bc06..b893516 100644
--- a/.github/workflows/trivy-pr-scan.yml
+++ b/.github/workflows/trivy-pr-scan.yml
@@ -61,7 +61,7 @@ jobs:
       # We still fail the job if results are found, so below will always run
       # unless manually canceled.
       - name: Upload Trivy scan results to GitHub Security tab
-        uses: github/codeql-action/upload-sarif@v2
+        uses: github/codeql-action/upload-sarif@v3
         if: '!cancelled()'
         with:
           sarif_file: 'trivy-results.sarif'
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index 47d2c13..7323534 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,33 +1,74 @@
-FROM bitnami/airflow:2.10.5-debian-12-r7
-
-USER root
-RUN apt-get update && apt-get install -y git nano vim gcc rustc cargo
-#RUN useradd -u 1001 -ms /bin/bash airflow && chown -R airflow /home/airflow
-COPY requirements.txt requirements.txt
-RUN source /opt/bitnami/airflow/venv/bin/activate && CARGO_HOME=/tmp/.cargo && \
-    pip install setuptools wheel && \
-    pip install -r requirements.txt
-
-RUN rm -f requirements.txt
-
-## Vul patches
-## Python lib patches on airflow python env
-RUN source /opt/bitnami/airflow/venv/bin/activate pip install --upgrade \
-    flask-appbuilder==4.5.3 \
-    cryptography==44.0.1 \
-    werkzeug==3.0.6 \
-    urllib3==2.2.2
-RUN source /opt/bitnami/airflow/venv/bin/activate pip uninstall -y \
-    apache-airflow-providers-mysql==6.2.0
-
-# Uninstall these from non airflow python env
-RUN pip install --upgrade \
-    flask-appbuilder==4.5.3 \
-    cryptography==44.0.1 \
-    werkzeug==3.0.6 \
-    urllib3==2.2.2
-RUN apt-get autoremove -y vim
-RUN apt-get autoremove -y binutils
-RUN apt-get autoremove -y linux-libc-dev
+# Use a Debian-based image for better compatibility
+FROM python:3.11.14-slim-trixie
+# Set Airflow version and home directory
+ARG AIRFLOW_VERSION=3.1.1
+ARG AIRFLOW_HOME=/opt/airflow
+
+# Environment variables
+ENV AIRFLOW_HOME=${AIRFLOW_HOME}
+ENV AIRFLOW__CORE__LOAD_EXAMPLES=False
+ENV AIRFLOW__CORE__EXECUTOR=LocalExecutor
+ENV AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
+ENV PYTHONUNBUFFERED=1
+
+# Create airflow user and directories
+RUN useradd --uid 50000 --home-dir ${AIRFLOW_HOME} --create-home airflow && \
+    mkdir -p ${AIRFLOW_HOME}/dags ${AIRFLOW_HOME}/logs ${AIRFLOW_HOME}/plugins ${AIRFLOW_HOME}/config
+
+# Install system dependencies
+RUN apt-get update && apt-get install -y \
+    --no-install-recommends \
+    build-essential \
+    libpq-dev \
+    libffi-dev \
+    libssl-dev \
+    curl \
+    tini \
+    tzdata \
+    git \
+    && rm -rf /var/lib/apt/lists/*
+
+# Upgrade pip tools
+RUN pip install --no-cache-dir --upgrade pip setuptools wheel
+
+# Install Airflow (with PostgreSQL, Celery, Redis support)
+RUN pip install --no-cache-dir \
+    "apache-airflow[postgres,celery,redis,fab]==${AIRFLOW_VERSION}" \
+    "apache-airflow-providers-cncf-kubernetes" \
+    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-3.11.txt"
+
+# Optional: install extra packages
+RUN pip install --no-cache-dir psycopg2-binary redis
+
+COPY ./requirements.txt /tmp/requirements.txt
+
+RUN pip install -r /tmp/requirements.txt
+
+RUN rm /tmp/requirements.txt
+
+
+
+RUN apt-get purge -y --auto-remove \
+    build-essential \
+    libpq-dev \
+    libffi-dev \
+    libssl-dev \
+    curl \
+    git && \
+    apt-get clean
+
+# Set ownership
+RUN chown -R airflow:airflow ${AIRFLOW_HOME}
+
+# Switch to airflow user
 USER airflow
+WORKDIR ${AIRFLOW_HOME}
+
+# Expose Airflow webserver port
+EXPOSE 8080
+
+# Use tini for signal handling
+ENTRYPOINT ["/usr/bin/tini", "--"]
+
+# Default command (Airflow 3.x uses `airflow api-server` instead of `airflow webserver`)
+CMD ["airflow", "api-server"]
diff --git a/dags/annotate_and_index.py b/dags/annotate_and_index.py
index b82c019..e4cdfd9 100644
--- a/dags/annotate_and_index.py
+++ b/dags/annotate_and_index.py
@@ -26,7 +26,7 @@
         "commitid_from": None,
         "commitid_to": None
     },
-    schedule_interval=None
+    # schedule_interval=None
 ) as dag:
     init = EmptyOperator(task_id="init", dag=dag)
     finish = EmptyOperator(task_id="finish", dag=dag)
@@ -65,7 +65,7 @@
         "commitid_from": None,
         "commitid_to": None
     },
-    schedule_interval=None
+    # schedule_interval=None
 ) as dag:

     init = EmptyOperator(task_id="init", dag=dag)
@@ -80,4 +80,7 @@ def print_context(ds=None, **kwargs):

     init >> create_python_task(dag, "get_from_lakefs", print_context) >> finish

-    #run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)
\ No newline at end of file
+    #run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)
+
+if __name__ == "__main__":
+    dag.test()
\ No newline at end of file
diff --git a/dags/knowledge_graph_build.py b/dags/knowledge_graph_build.py
index de28aa7..d94d226 100644
--- a/dags/knowledge_graph_build.py
+++ b/dags/knowledge_graph_build.py
@@ -15,7 +15,7 @@
 with DAG(
     dag_id='knowledge_graph_build',
     default_args=default_args,
-    schedule_interval=None
+    # schedule_interval=None
 ) as dag:
     """ Build the workflow tasks.
     """
diff --git a/dags/roger/config/__init__.py b/dags/roger/config/__init__.py
index 71111f3..cc017fc 100644
--- a/dags/roger/config/__init__.py
+++ b/dags/roger/config/__init__.py
@@ -3,7 +3,7 @@
 import warnings
 from dataclasses import dataclass, field
 from pathlib import Path
-from typing import Dict, Optional, List
+from typing import Dict, Optional, List, Union

 import yaml
 from dug.config import Config as DugConfig
@@ -33,7 +33,7 @@ class LakefsConfig(DictLike):
     secret_access_key: str
     branch: str
     repo: str
-    enabled: bool = False
+    enabled: Union[bool, str] = False

     def __post_init__(self):
         if isinstance(self.enabled, str):
@@ -135,8 +135,8 @@ class AnnotationConfig(DictLike):
     ])

     def __post_init__(self):
-        self.annotator_args["sapbert"]["bagel"]["enabled"] = self.annotator_args["sapbert"]["bagel"][
-            "enabled"].lower() == "true"
+        self.annotator_args["sapbert"]["bagel"]["enabled"] = str(self.annotator_args["sapbert"]["bagel"][
+            "enabled"]).lower() == "true"


 @dataclass
diff --git a/dags/roger/core/bulkload.py b/dags/roger/core/bulkload.py
index a8ddbc1..17bd179 100644
--- a/dags/roger/core/bulkload.py
+++ b/dags/roger/core/bulkload.py
@@ -53,6 +53,9 @@ def create_nodes_csv_file(self, input_data_path=None, output_data_path=None):
         merged_nodes_file = storage.merged_objects('nodes', input_data_path)
         counter = 1
         for node in storage.json_line_iter(merged_nodes_file):
+            if node.get('description'):
+                node['description'] = node['description'].replace('\n',
+                                                                   ' ')
             if not node.get('category'):
                 category_error_nodes.add(node['id'])
                 node['category'] = [BiolinkModel.root_type]
diff --git a/dags/roger/tasks.py b/dags/roger/tasks.py
index 2749a52..cd82838 100755
--- a/dags/roger/tasks.py
+++ b/dags/roger/tasks.py
@@ -1,20 +1,21 @@
-"Tasks and methods related to Airflow implementations of Roger"
+# Tasks and methods related to Airflow implementations of Roger
 import os
-
-from airflow.operators.python import PythonOperator
-from airflow.operators.empty import EmptyOperator
-from airflow.utils.task_group import TaskGroup
-from airflow.utils.dates import days_ago
-from airflow.models import DAG
-from airflow.models.dag import DagContext
-from airflow.models.taskinstance import TaskInstance
-from airflow.operators.bash import BashOperator
+from datetime import datetime
+from functools import partial
 from typing import Union
 from pathlib import Path
 import glob
 import shutil

+# Airflow 3.x - prefer provider imports and new public types
+from airflow.providers.standard.operators.python import PythonOperator
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.utils.task_group import TaskGroup
+from airflow.models import DAG
+from airflow.models.taskinstance import TaskInstance
+from airflow.providers.standard.operators.bash import BashOperator
+from airflow.utils.context import Context  # type: ignore

 from roger.config import config, RogerConfig
 from roger.logger import get_logger
@@ -22,13 +23,12 @@
 from avalon.mainoperations import put_files, LakeFsWrapper, get_files
 from lakefs_sdk.configuration import Configuration
 from lakefs_sdk.models.merge import Merge
-from functools import partial

 logger = get_logger()

 default_args = {
     'owner': 'RENCI',
-    'start_date': days_ago(1)
+    'start_date': datetime(2025, 1, 1)
 }


@@ -44,13 +44,17 @@ def task_wrapper(python_callable, **kwargs):
     pass_conf = kwargs.get('pass_conf', True)
     if config.lakefs_config.enabled:
         # get input path
-        input_data_path = generate_dir_name_from_task_instance(kwargs['ti'],
-                                                               roger_config=config,
-                                                               suffix='input')
+        input_data_path = generate_dir_name_from_task_instance(
+            kwargs['ti'],
+            roger_config=config,
+            suffix='input'
+        )
         # get output path from task id run id dag id combo
-        output_data_path = generate_dir_name_from_task_instance(kwargs['ti'],
-                                                                roger_config=config,
-                                                                suffix='output')
+        output_data_path = generate_dir_name_from_task_instance(
+            kwargs['ti'],
+            roger_config=config,
+            suffix='output'
+        )
     else:
         input_data_path, output_data_path = None, None
     # cast it to a path object
@@ -66,6 +70,7 @@ def task_wrapper(python_callable, **kwargs):
         return python_callable(config=config, **func_args)
     return python_callable(**func_args)

+
 def get_executor_config(data_path='/opt/airflow/share/data'):
     """ Get an executor configuration.
     :param annotations: Annotations to attach to the executor.
@@ -73,12 +78,11 @@ def get_executor_config(data_path='/opt/airflow/share/data'):
     """
     env_var_prefix = config.OS_VAR_PREFIX
     # based on environment set on scheduler pod, make secrets for worker pod
-    # this ensures passwords don't leak as pod templates.
     secrets_map = [{
         "secret_name_ref": "ELASTIC_SEARCH_PASSWORD_SECRET",
         "secret_key_ref": "ELASTIC_SEARCH_PASSWORD_SECRET_KEY",
         "env_var_name": f"{env_var_prefix}ELASTIC__SEARCH_PASSWORD"
-    },{
+    }, {
         "secret_name_ref": "REDIS_PASSWORD_SECRET",
         "secret_key_ref": "REDIS_PASSWORD_SECRET_KEY",
         "env_var_name": f"{env_var_prefix}REDISGRAPH_PASSWORD"
@@ -92,8 +96,8 @@ def get_executor_config(data_path='/opt/airflow/share/data'):
                 "name": secret["env_var_name"],
                 "valueFrom": {
                     "secretKeyRef": {
-                            "name": secret_name,
-                            "key": secret_key_name
+                        "name": secret_name,
+                        "key": secret_key_name
                     }
                 }})

@@ -104,6 +108,7 @@
     }
     return k8s_executor_config

+
 def init_lakefs_client(config: RogerConfig) -> LakeFsWrapper:
     configuration = Configuration()
     configuration.username = config.lakefs_config.access_key_id
@@ -123,47 +128,28 @@ def pagination_helper(page_fetcher, **kwargs):
             kwargs['after'] = resp.pagination.next_offset


-def avalon_commit_callback(context: DagContext, **kwargs):
-    client: LakeFsWrapper = init_lakefs_client(config=config)
+def avalon_commit_callback(context: Context, **kwargs):
+    client: LakeFsWrapper = init_lakefs_client(config=config)
     # now files have been processed,
     # this part should
     # get the out path of the task
-    local_path = str(generate_dir_name_from_task_instance(context['ti'],
-                                                          roger_config=config,
-                                                          suffix='output')).rstrip('/') + '/'
+    local_path = str(generate_dir_name_from_task_instance(
+        context['ti'],
+        roger_config=config,
+        suffix='output')).rstrip('/') + '/'
     task_id = context['ti'].task_id
     dag_id = context['ti'].dag_id
     run_id = context['ti'].run_id
-    # run id looks like 2023-10-18T17:35:14.890186+00:00
-    # normalized to 2023_10_18T17_35_14_890186_00_00
-    # since lakefs branch id must consist of letters, digits, underscores and dashes,
-    # and cannot start with a dash
-    run_id_normalized = run_id.replace('-','_').replace(':','_').replace('+','_').replace('.','_')
-    dag_id_normalized = dag_id.replace('-','_').replace(':','_').replace('+','_').replace('.','_')
-    task_id_normalized = task_id.replace('-','_').replace(':','_').replace('+','_').replace('.','_')
+    # normalize run/dag/task ids for branch name
+    run_id_normalized = run_id.replace('-', '_').replace(':', '_').replace('+', '_').replace('.', '_')
+    dag_id_normalized = dag_id.replace('-', '_').replace(':', '_').replace('+', '_').replace('.', '_')
+    task_id_normalized = task_id.replace('-', '_').replace(':', '_').replace('+', '_').replace('.', '_')
     temp_branch_name = f'{dag_id_normalized}_{task_id_normalized}_{run_id_normalized}'
-    # remote path to upload the files to.
     remote_path = f'{dag_id}/{task_id}/'
-    # merge destination branch
     branch = config.lakefs_config.branch
     repo = config.lakefs_config.repo
-    # This part pushes to a temp branch on the repo
-    # now we have the output path lets do some pushing but where ?
-    # right now lets stick to using one repo ,
-
-    # issue Vladmir pointed out if uploads to a single lakefs branch have not
-    # been finalized with commit,
-    # this would cause dirty commits if parallel tasks target the same branch.
-
-    # solution: Lakefs team suggested we commit to a different temp branch per
-    # task, and merge that branch.
-    # this callback function will do that for now.
-
-    # 1. put files into a temp branch.
-    # 2. make sure a commit happens.
-    # 3. merge that branch to master branch.
     logger.info("Pushing local path %s to %s@%s in %s dir",
                 local_path, repo, temp_branch_name, remote_path)
     put_files(
@@ -182,27 +168,22 @@
         source_branch_name=branch
     )

-    # see what changes are going to be pushed from this branch to main branch
     for diff in pagination_helper(client._client.refs_api.diff_refs,
                                   repository=repo,
                                   left_ref=branch,
                                   right_ref=temp_branch_name):
         logger.info("Diff: " + str(diff))
-
+
     try:
-        # merging temp branch to working branch
-        # the current working branch wins incase of conflicts
         merge = Merge(**{"strategy": "source-wins"})
         client._client.refs_api.merge_into_branch(repository=repo,
-                                                  source_ref=temp_branch_name,
-                                                  destination_branch=branch,
-                                                  merge=merge
-                                                  )
+                                                   source_ref=temp_branch_name,
+                                                   destination_branch=branch,
+                                                   merge=merge
+                                                   )
         logger.info(f"merged branch {temp_branch_name} into {branch}")
     except Exception as e:
-        # remove temp
         logger.error(e)
-        # delete temp branch
     finally:
         client._client.branches_api.delete_branch(
             repository=repo,
@@ -211,38 +192,43 @@
         logger.info(f"deleted temp branch {temp_branch_name}")

     logger.info(f"deleting local dir {local_path}")
-    files_to_clean = glob.glob(local_path + '**', recursive=True) + [local_path]
+    # cleanup local dirs
     clean_up(context, **kwargs)

-def clean_up(context: DagContext, **kwargs):
-    input_dir = str(generate_dir_name_from_task_instance(context['ti'],
-                                                         roger_config=config,
-                                                         suffix='output')).rstrip('/') + '/'
-    output_dir = str(generate_dir_name_from_task_instance(context['ti'],
-                                                          roger_config=config,
-                                                          suffix='input')).rstrip('/') + '/'
+
+def clean_up(context: Context, **kwargs):
+    input_dir = str(generate_dir_name_from_task_instance(
+        context['ti'],
+        roger_config=config,
+        suffix='output')).rstrip('/') + '/'
+    output_dir = str(generate_dir_name_from_task_instance(
+        context['ti'],
+        roger_config=config,
+        suffix='input')).rstrip('/') + '/'
     files_to_clean = glob.glob(input_dir + '**', recursive=True) + [input_dir]
     files_to_clean += glob.glob(output_dir + '**', recursive=True) + [output_dir]
     for f in files_to_clean:
         if os.path.exists(f):
             shutil.rmtree(f)

+
 def generate_dir_name_from_task_instance(task_instance: TaskInstance,
-                                         roger_config: RogerConfig, suffix:str):
+                                         roger_config: RogerConfig, suffix: str):
     # if lakefs is not enabled just return none so methods default to using
     # local dir structure.
     if not roger_config.lakefs_config.enabled:
         return None
-    root_data_dir = os.getenv("ROGER_DATA_DIR").rstrip('/')
+    root_data_dir = os.getenv("ROGER_DATA_DIR").rstrip('/')
     task_id = task_instance.task_id
     dag_id = task_instance.dag_id
     run_id = task_instance.run_id
-    try_number = task_instance._try_number
+    try_number = task_instance.try_number
     return Path(
         f"{root_data_dir}/{dag_id}_{task_id}_{run_id}_{try_number}_{suffix}")

-def setup_input_data(context, exec_conf):
+
+def setup_input_data(context: Context, exec_conf):
     logger.info("""
     - Figures out the task name and id,
     - find its data dependencies
@@ -253,40 +239,29 @@
     logger.info(">>> context")
     logger.info(context)

-    # Serves as a location where files the task will work on are placed.
-    # computed as ROGER_DATA_DIR + /current task instance name_input_dir
-
     input_dir = str(generate_dir_name_from_task_instance(
         context['ti'],
         roger_config=config,
         suffix="input"))
-    # Clear up files from previous run etc...
-
-    # create input dir
     os.makedirs(input_dir, exist_ok=True)
-    # Download files from lakefs and store them in this new input_path
     client = init_lakefs_client(config=config)
-    repos = exec_conf['repos']
-    dag_params = context["params"]
+    repos = exec_conf.get('repos', [])
+    dag_params = context.get("params", {})
     if dag_params.get("repository_id"):
         logger.info(">>> repository_id supplied. Overriding repo.")
-        repos=[{
+        repos = [{
             'repo': dag_params.get("repository_id"),
             'branch': dag_params.get("branch_name"),
             'commitid_from': dag_params.get("commitid_from"),
             'commitid_to': dag_params.get("commitid_to")
         }]
-    # if no external repo is provided we assume to get the upstream task dataset.
     if not repos or len(repos) == 0:
-        # merge destination branch
         branch = config.lakefs_config.branch
         repo = config.lakefs_config.repo
         task_instance: TaskInstance = context['ti']
-        # get upstream ids
         upstream_ids = task_instance.task.upstream_task_ids
         dag_id = task_instance.dag_id
-        # calculate remote dirs using dag_id + upstreams
         repos = [{
             'repo': repo,
             'branch': branch,
@@ -295,18 +270,13 @@
             'commitid_to': None
         } for upstream_id in upstream_ids]


-    # input_repo = exec_conf['input_repo']
-    # input_branch = exec_conf['input_branch']
-    # If input repo is provided use that as source of files
-    for repo in repos:
-        if not repo.get('path'):
-            # get all if path is not specified
-            repo['path'] = '*'
+    for r in repos:
+        if not r.get('path'):
+            r['path'] = '*'

     logger.info(f"repos : {repos}")
     logger.info(">>> start of downloading data")
     for r in repos:
-        # create path to download to ...
         if not os.path.exists(input_dir + f'/{r["repo"]}'):
             os.mkdir(input_dir + f'/{r["repo"]}')

@@ -325,46 +295,39 @@
     logger.info(">>> end of downloading data")

-
-def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos = {}, pass_conf=True, no_output_files=False):
+def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos=None, pass_conf=True, no_output_files=False):
     """ Create a python task.
     :param func_kwargs: additional arguments for callable.
     :param dag: dag to add task to.
     :param name: The name of the task.
     :param a_callable: The code to run in this task.
-    """
-
+    """
+
+    if external_repos is None:
+        external_repos = {}
+
     # these are actual arguments passed down to the task function
     op_kwargs = {
         "python_callable": a_callable,
         "to_string": True,
         "pass_conf": pass_conf
     }
-    # update / override some of the args passed to the task function by default
     if func_kwargs is None:
         func_kwargs = {}
     op_kwargs.update(func_kwargs)
-
-    # Python operator arguments , by default for non-lakefs config this is all we need.
     python_operator_args = {
-        "task_id": name,
-        "python_callable":task_wrapper,
-        # "executor_config" : get_executor_config(),
-        "dag": dag,
-        "provide_context" : True
+        "task_id": name,
+        "python_callable": task_wrapper,
+        # executor_config example left commented; fill if needed
+        "dag": dag,
     }
-    # if we have lakefs...
     if config.lakefs_config.enabled:
-
-        # repo and branch for pre-execution , to download input objects
         pre_exec_conf = {
             'repos': []
         }
         if external_repos:
-            # if the task is a root task , beginning of the dag...
-            # and we want to pull data from a different repo.
             pre_exec_conf = {
                 'repos': [{
                     'repo': r['name'],
@@ -374,18 +337,19 @@ def create_python_task(dag, name, a_callable, func_kwargs=None, external_repos =
             }

         pre_exec = partial(setup_input_data, exec_conf=pre_exec_conf)
-        # add pre_exec partial function as an argument to python executor conf
+        # pre_execute will be called with context -> partial keeps exec_conf fixed
         python_operator_args['pre_execute'] = pre_exec
-        python_operator_args['on_failure_callback'] = partial(clean_up, kwargs=op_kwargs)
-        # if the task has output files, we will add a commit callback
+
+        # pass fixed kwargs into partials so resulting callback accepts (context,)
+        python_operator_args['on_failure_callback'] = partial(clean_up, **op_kwargs)
         if not no_output_files:
-            python_operator_args['on_success_callback'] = partial(avalon_commit_callback, kwargs=op_kwargs)
-
-    # add kwargs
+            python_operator_args['on_success_callback'] = partial(avalon_commit_callback, **op_kwargs)
+
     python_operator_args["op_kwargs"] = op_kwargs
     return PythonOperator(**python_operator_args)


+
 def create_pipeline_taskgroup(
         dag,
         pipeline_class: type,
@@ -416,7 +380,6 @@
             f"index_{name}_variables",
             pipeline.index_variables,
             pass_conf=False,
-            # declare that this task will not generate files.
             no_output_files=True)
         index_variables_task.set_upstream(annotate_task)

@@ -425,9 +388,8 @@
             f"validate_{name}_index_variables",
             pipeline.validate_indexed_variables,
             pass_conf=False,
-            # declare that this task will not generate files.
             no_output_files=True
-            )
+        )
         validate_index_variables_task.set_upstream([annotate_task, index_variables_task])

         make_kgx_task = create_python_task(
@@ -441,7 +403,7 @@
             dag,
             f"crawl_{name}",
             pipeline.crawl_tranql,
-            pass_conf=False)
+        pass_conf=False)
         crawl_task.set_upstream(annotate_task)

         index_concepts_task = create_python_task(
@@ -449,7 +411,6 @@
             f"index_{name}_concepts",
             pipeline.index_concepts,
             pass_conf=False,
-            # declare that this task will not generate files.
             no_output_files=True)
         index_concepts_task.set_upstream(crawl_task)

@@ -458,12 +419,10 @@
             f"validate_{name}_index_concepts",
             pipeline.validate_indexed_concepts,
             pass_conf=False,
-            # declare that this task will not generate files.
             no_output_files=True
         )
         validate_index_concepts_task.set_upstream([crawl_task, index_concepts_task, annotate_task])

-
         complete_task = EmptyOperator(task_id=f"complete_{name}")
         complete_task.set_upstream(
             (make_kgx_task,
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 7c698ed..fd81f5f 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -1,207 +1,167 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
+version: '3.8'

-# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
-#
-# WARNING: This configuration is for local development. Do not use it in a production deployment.
-#
-# This configuration supports basic configuration using environment variables or an .env file
-# The following variables are supported:
-#
-# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
-#                      Default: apache/airflow:master-python3.8
-# AIRFLOW_UID - User ID in Airflow containers
-#               Default: 50000
-# AIRFLOW_GID - Group ID in Airflow containers
-#               Default: 50000
-# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account.
-#                              Default: airflow
-# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account.
-#                              Default: airflow
-#
-# Feel free to modify this file to suit your needs.
----
-version: '3'
-x-airflow-common:
-  &airflow-common
-  build:
-    dockerfile: Dockerfile
-    context: .
-  environment:
-    &airflow-common-env
-    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
-    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
-    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
-    AIRFLOW__CELERY__BROKER_URL: redis://:$REDIS_PASSWORD@redis:$REDIS_PORT/0
+x-airflow-common: &airflow-common
+  build: .
+  environment: &airflow_common_environment
+    AIRFLOW__CORE__EXECUTOR: LocalExecutor
+    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
     AIRFLOW__CORE__FERNET_KEY: ''
     AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
     AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
-
-    ROGER_DUG__INPUTS_DATA__SETS: "$ROGER_DUG__INPUTS_DATA__SETS"
-    ROGER_ELASTICSEARCH_HOST: "$ELASTIC_API_HOST"
-    ROGER_ELASTICSEARCH_PASSWORD: "$ELASTIC_PASSWORD"
-    ROGER_ELASTICSEARCH_NBOOST__HOST: "$NBOOST_API_HOST"
-    ROGER_REDISGRAPH_HOST: "$REDIS_HOST"
-    ROGER_REDISGRAPH_PASSWORD: "$REDIS_PASSWORD"
-    ROGER_KGX_DATASET__VERSION: "v3.0"
-    ROGER_DATA_DIR: "/opt/airflow/share/data"
+    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
+    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
+    AIRFLOW__CORE__SIMPLE_AUTH_MANAGER_USERS: 'admin:Admin'
+    ROGER_ELASTICSEARCH_HOST: "elasticsearch"
+    ROGER_ELASTICSEARCH_PASSWORD: ""
+    ROGER_ELASTICSEARCH_SCHEME: "http"
+    ROGER_ELASTICSEARCH_USERNAME: "elastic"
+    ROGER_REDISGRAPH_GRAPH: "test"
+    ROGER_REDISGRAPH_HOST: "redis-stack"
+    ROGER_REDISGRAPH_PASSWORD: ""
+    ROGER_REDISGRAPH_PORT: "6379"
+    ROGER_KGX_DATA__SETS: ${KGX_DATA_SETS}
+    ROGER_LAKEFS__CONFIG_ACCESS__KEY__ID: ${LAKEFS_ACCESS_KEY}
+    ROGER_LAKEFS__CONFIG_BRANCH: ${LAKEFS_BRANCH}
+    ROGER_LAKEFS__CONFIG_ENABLED: "true"
+    ROGER_LAKEFS__CONFIG_HOST: ${LAKEFS_URL}
+    ROGER_LAKEFS__CONFIG_REPO: ${LAKEFS_REPO}
+    ROGER_LAKEFS__CONFIG_SECRET__ACCESS__KEY: ${LAKEFS_SECRET_KEY}
+    ROGER_DUG__INPUTS_DATA__SETS: ${INPUT_DATA_SETS}
+    ROGER_ANNOTATION_ANNOTATOR__ARGS_SAPBERT_CLASSIFICATION__URL: ${BIOMEGATRON_URL}
+    ROGER_ANNOTATION_ANNOTATOR__ARGS_SAPBERT_ANNOTATOR__URL: ${SAPBERT_URL}
+    ROGER_ANNOTATION_NORMALIZER: ${NODE_NORM_URL}
+    ROGER_ANNOTATION_SYNONYM__SERVICE: ${NAME_RES_URL}
   volumes:
     - ./dags:/opt/airflow/dags
     - ./logs:/opt/airflow/logs
     - ./plugins:/opt/airflow/plugins
-    - ./data:/opt/airflow/share/data
-  user: root
+    - ./config:/opt/airflow/config
   depends_on:
-    redis:
-      condition: service_healthy
     postgres:
       condition: service_healthy
+  networks:
+    - airflow-network

 services:
   postgres:
-    image: postgres:13
+    image: postgres:15-alpine
     environment:
       POSTGRES_USER: airflow
       POSTGRES_PASSWORD: airflow
       POSTGRES_DB: airflow
     volumes:
       - postgres-db-volume:/var/lib/postgresql/data
-      - ${DATA_DIR}/elastic:/elastic
-      - ${DATA_DIR}/redis:/redis
     healthcheck:
       test: ["CMD", "pg_isready", "-U", "airflow"]
-      interval: 5s
+      interval: 10s
       retries: 5
-    restart: always
+      start_period: 5s
+    ports:
+      - "5432:5432"
+    networks:
+      - airflow-network

-  airflow-webserver:
-    <<: *airflow-common
-    command: webserver
+  # --- NEW: Elasticsearch Service ---
+  elasticsearch:
+    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.1
+    environment:
+      - discovery.type=single-node
+      - xpack.security.enabled=false
+      - ES_JAVA_OPTS=-Xms512m -Xmx512m  # Limit RAM usage for dev
+    volumes:
+      - elasticsearch-data:/usr/share/elasticsearch/data
     ports:
-      - 8080:8080
+      - "9200:9200"  # REST API
+      - "9300:9300"  # Internal transport
+    networks:
+      - airflow-network
+
+  # --- NEW: Redis Stack (includes RedisGraph/FalkorDB) ---
+  redis-stack:
+    image: redis/redis-stack:latest
+    volumes:
+      - redis-stack-data:/data
+    ports:
+      - "6379:6379"  # Redis port
+      - "8001:8001"  # RedisInsight UI
     healthcheck:
-      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
+      test: [ "CMD", "redis-cli", "ping" ]
       interval: 10s
-      timeout: 10s
       retries: 5
-    restart: always
-
-  airflow-scheduler:
-    <<: *airflow-common
-    command: scheduler
-    restart: always
-
-  airflow-worker:
-    <<: *airflow-common
-    command: celery worker
-    restart: always
+      start_period: 5s
+    networks:
+      - airflow-network

   airflow-init:
     <<: *airflow-common
-    command: version
-    environment:
-      <<: *airflow-common-env
-      _AIRFLOW_DB_UPGRADE: 'true'
-      _AIRFLOW_WWW_USER_CREATE: 'true'
-      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
-      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
+    entrypoint: /bin/bash
+    command:
+      - -c
+      - |
+        mkdir -p /opt/airflow/logs /opt/airflow/dags /opt/airflow/plugins
+        airflow db migrate
+    depends_on:
+      postgres:
+        condition: service_healthy

-  flower:
+  airflow-webserver:
     <<: *airflow-common
-    command: celery flower
+    command: airflow api-server
     ports:
-      - 5555:5555
+      - "8080:8080"
     healthcheck:
-      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
-      interval: 10s
+      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
+      interval: 30s
       timeout: 10s
       retries: 5
-    restart: always
+      start_period: 30s
+    depends_on:
+      airflow-init:
+        condition: service_completed_successfully

-  redis:
-    # image: redislabs/redisgraph:2.10.9 #Alternative Image
-    user: root
-    image: 'redis/redis-stack:6.2.4-v2'
-    command: "redis-server --requirepass $REDIS_PASSWORD --loadmodule /opt/redis-stack/lib/redisgraph.so"
-    environment:
-      - REDIS_ARGS=--requirepass $REDIS_PASSWORD
-    volumes:
-      - $DATA_DIR/redis:/data # FIX RDB Error on local
-    ports:
-      - $REDIS_PORT:$REDIS_PORT
+  airflow-scheduler:
+    <<: *airflow-common
+    command: airflow scheduler
     healthcheck:
-      test: ["CMD", "redis-cli", "ping"]
-      interval: 5s
-      timeout: 30s
-      retries: 50
-    restart: always
+      test: ["CMD", "airflow", "jobs", "check", "--job-type", "SchedulerJob", "--hostname", "$${HOSTNAME}"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    depends_on:
+      airflow-init:
+        condition: service_completed_successfully

-  dug:
-    image: containers.renci.org/helxplatform/dug:latest
+  airflow-triggerer:
+    <<: *airflow-common
+    command: airflow triggerer
+    healthcheck:
+      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
     depends_on:
-      - elasticsearch
-      - redis
-    restart: always
-    environment:
-      ELASTIC_API_HOST: "$ELASTIC_API_HOST"
-      ELASTIC_PASSWORD: "$ELASTIC_PASSWORD"
-      REDIS_HOST: "$REDIS_HOST"
-      REDIS_PASSWORD: "$REDIS_PASSWORD"
-      FLASK_ENV: "development"
-      PYTHONUNBUFFERED: "TRUE"
-    entrypoint: [ "gunicorn",
-         "--workers=$API_WORKERS", "--name=dug",
-         "--bind=0.0.0.0:$API_PORT", "--timeout=$API_TIMEOUT",
-         "--log-level=DEBUG", "-k", "uvicorn.workers.UvicornWorker", "--reload", "dug.server:APP"]
-    ports:
-      - $API_PORT:$API_PORT
+      airflow-init:
+        condition: service_completed_successfully

-  elasticsearch:
-    user: root
-    image: docker.elastic.co/elasticsearch/elasticsearch:8.5.2
-    environment:
-      - ELASTIC_PASSWORD=$ELASTIC_PASSWORD
-      - discovery.type=single-node
-      - xpack.security.enabled=true
-      - ingest.geoip.downloader.enabled=false
-    volumes:
-      - $DATA_DIR/elastic:/usr/share/elasticsearch/data
-    ports:
-      - '9200:9200'
-      - '9300:9300'
+  airflow-dag-processor:
+    <<: *airflow-common
+    command: airflow dag-processor
+    healthcheck:
+      test: [ "CMD", "airflow", "jobs", "check", "--job-type", "DagProcessorJob", "--hostname", "$${HOSTNAME}" ]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    depends_on:
+      airflow-init:
+        condition: service_completed_successfully

-  tranql:
-    image: containers.renci.org/helxplatform/tranql:rti-merge
-    ports:
-      - '8001:8001'
-    entrypoint: [
-        "gunicorn",
-        "--workers=4",
-        "--bind=0.0.0.0:8001",
-        "--timeout=300",
-        "--access-logfile=$TRANQL_ACCESS_LOG",
-        "--error-logfile=$TRANQL_ERROR_LOG",
-        "--log-level=debug",
-        "tranql.api:app",
-    ]
-    environment:
-      - REDIS_PASSWORD=$REDIS_PASSWORD
-    volumes:
-      - ./tranql-schema.yaml:/tranql/tranql/conf/schema.yaml

 volumes:
-  postgres-db-volume:
\ No newline at end of file
+  postgres-db-volume:
+  elasticsearch-data:  # <-- New volume for Elasticsearch
+  redis-stack-data:
+networks:
+  airflow-network:
+    driver: bridge
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 6327511..c8c705f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-elasticsearch==8.5.2
+elasticsearch>=8.5.2
 flatten-dict
 jsonpickle
 git+https://github.com/falkordb/falkordb-bulk-loader.git@v1.0.6
@@ -6,9 +6,15 @@ setuptools>=66
 pytest
 PyYAML
 git+https://github.com/helxplatform/dug@develop
-orjson==3.9.15
+orjson>=3.9.15
 git+https://github.com/helxplatform/kg_utils.git@v0.0.10
 git+https://github.com/helxplatform/python-stringcase@1.2.1
 bmt==1.4.4
-git+https://github.com/helxplatform/avalon.git@v1.1.0
+git+https://github.com/helxplatform/avalon.git@lakefs-1.71.0
 h11>=0.16.0
+starlette>=0.49.1
+datetime
+aiohttp
+#--- patch
+werkzeug==3.0.6
+cryptography>=44.0.1
\ No newline at end of file
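
A minimal local bring-up sketch, assuming the docker-compose.yaml and Dockerfile in this patch are applied as-is (service names and ports are taken from the compose file above):

    docker compose build                # build the Airflow 3.x image from the new Dockerfile
    docker compose up airflow-init      # one-shot init service: creates dirs and runs `airflow db migrate`
    docker compose up -d                # api-server on :8080, plus scheduler, dag-processor, triggerer, Postgres, Elasticsearch (:9200), Redis Stack (:6379, RedisInsight :8001)

The LakeFS and annotator settings still need real values in the new .env (LAKEFS_ACCESS_KEY, LAKEFS_SECRET_KEY, LAKEFS_REPO, LAKEFS_BRANCH) before the Roger DAGs can push or pull data through lakeFS.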