From 0f05308b60391713940b0b9d4716a07e6e454d9f Mon Sep 17 00:00:00 2001 From: "Leslie P. Polzer" Date: Tue, 6 Jan 2026 13:39:35 +0000 Subject: [PATCH 1/3] Add OSS-Fuzz infrastructure for Airflow Adds base infrastructure for OSS-Fuzz fuzzing under `scripts/ossfuzz/`. Includes: - pyproject.toml with proper Python packaging (Private :: Do Not Upload) - Dependencies on apache-airflow-core, apache-airflow-providers-standard, atheris - Entry points for uv run support - README documenting security model alignment and local testing --- pyproject.toml | 1 + scripts/ossfuzz/README.md | 72 +++++++++++++++++++++++++ scripts/ossfuzz/pyproject.toml | 56 +++++++++++++++++++ scripts/ossfuzz/src/ossfuzz/__init__.py | 0 4 files changed, 129 insertions(+) create mode 100644 scripts/ossfuzz/README.md create mode 100644 scripts/ossfuzz/pyproject.toml create mode 100644 scripts/ossfuzz/src/ossfuzz/__init__.py diff --git a/pyproject.toml b/pyproject.toml index 959ab2e07e0bd..ac02ee1f7df87 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1504,6 +1504,7 @@ members = [ "shared/secrets_backend", "shared/secrets_masker", "shared/timezones", + "scripts/ossfuzz", # Automatically generated provider workspace members (update_airflow_pyproject_toml.py) "providers/airbyte", "providers/alibaba", diff --git a/scripts/ossfuzz/README.md b/scripts/ossfuzz/README.md new file mode 100644 index 0000000000000..d4d2193c4e351 --- /dev/null +++ b/scripts/ossfuzz/README.md @@ -0,0 +1,72 @@ +# Airflow OSS-Fuzz fuzzers + +This directory contains the upstream-owned fuzz targets used by OSS-Fuzz for +Apache Airflow. + +## Security Model Alignment + +These fuzzers target code paths with **clear security boundaries** per +Airflow's [security model](../../airflow-core/docs/security/security_model.rst): + +- **DAG Serialization/Deserialization**: Used by Scheduler and API Server with + schema validation. Input comes from DAG parsing and caching. +- **Connection URI Parsing**: Used when creating/updating connections via API. + +We explicitly **avoid** fuzzing code paths in the "DAG author trust zone" +where Airflow's policy is that DAG authors can execute arbitrary code. + +## What's here + +- `src/ossfuzz/*_fuzz.py`: Atheris fuzz targets. +- `*.dict`: Optional libFuzzer dictionaries for structured inputs. +- `*.options`: libFuzzer options (e.g. `max_len`) tuned per target. +- `seed_corpus//...`: Small seed corpora that get zipped and uploaded to + OSS-Fuzz for each target. + +## Fuzzers + +| Fuzzer | Target | Security Boundary | +|--------|--------|-------------------| +| `serialized_dag_fuzz` | `DagSerialization.from_dict()` | Schema validation | +| `connection_uri_fuzz` | `Connection._parse_from_uri()` | API input validation | + +## Supported engines / sanitizers (Python constraints) + +Airflow is fuzzed as a **Python** OSS-Fuzz project. Practically, this means: + +- **Fuzzing engine**: `libfuzzer` (Atheris). Other engines (AFL/honggfuzz) are + not typically used/supported for Python targets in OSS-Fuzz. +- **Sanitizers**: `address`, `undefined`, `coverage`, `introspector` are the + relevant modes. **MSan (`memory`) is not supported** for Python OSS-Fuzz + projects. + +## Running locally + +From the repository root, use `uv run` which will automatically set up the +virtual environment and install dependencies: + +```bash +# Run serialized_dag_fuzz (quick test, 60 seconds) +uv run --package apache-airflow-ossfuzz serialized_dag_fuzz -max_total_time=60 + +# Run connection_uri_fuzz +uv run --package apache-airflow-ossfuzz connection_uri_fuzz -max_total_time=60 + +# Run with a corpus directory +mkdir -p corpus/serialized_dag_fuzz +uv run --package apache-airflow-ossfuzz serialized_dag_fuzz corpus/serialized_dag_fuzz -max_total_time=300 +``` + +## Running with OSS-Fuzz helper + +From a checkout of `google/oss-fuzz`: + +```bash +# Build + basic validation: +python3 infra/helper.py build_fuzzers --clean --sanitizer address airflow /path/to/airflow +python3 infra/helper.py check_build --sanitizer address airflow + +# Coverage build + validation: +python3 infra/helper.py build_fuzzers --clean --sanitizer coverage airflow /path/to/airflow +python3 infra/helper.py check_build --sanitizer coverage airflow +``` diff --git a/scripts/ossfuzz/pyproject.toml b/scripts/ossfuzz/pyproject.toml new file mode 100644 index 0000000000000..3c9f98d1690f2 --- /dev/null +++ b/scripts/ossfuzz/pyproject.toml @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[project] +name = "apache-airflow-ossfuzz" +description = "OSS-Fuzz harnesses for Apache Airflow" +version = "0.0" +readme = "README.md" +classifiers = [ + "Private :: Do Not Upload", +] + +dependencies = [ + "apache-airflow-core", + "apache-airflow-providers-standard", + "atheris>=2.3.0; python_version >= '3.11'", + "pendulum>=3.0.0", +] + +[project.scripts] +serialized_dag_fuzz = "ossfuzz.serialized_dag_fuzz:main" +connection_uri_fuzz = "ossfuzz.connection_uri_fuzz:main" + +[dependency-groups] +dev = [ + "apache-airflow-devel-common", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/ossfuzz"] + +[tool.ruff] +extend = "../../pyproject.toml" +src = ["src"] + +[tool.ruff.lint.per-file-ignores] +# Ignore Doc rules et al for anything outside of tests +"!src/*" = ["D", "S101", "TRY002"] diff --git a/scripts/ossfuzz/src/ossfuzz/__init__.py b/scripts/ossfuzz/src/ossfuzz/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d From 15971f3ece4fa21090cc9536e4cd5581864d547e Mon Sep 17 00:00:00 2001 From: "Leslie P. Polzer" Date: Tue, 6 Jan 2026 13:39:48 +0000 Subject: [PATCH 2/3] Add serialized_dag_fuzz OSS-Fuzz target Fuzzer for DAG serialization/deserialization targeting `DagSerialization.from_dict()`. Used by Scheduler and API Server with schema validation. Input comes from DAG parsing and caching. Includes: - `.options` with max_len tuning - `.dict` for structured input fuzzing - Seed corpus with minimal DAG JSON --- .../serialized_dag_fuzz/minimal.json | 1 + scripts/ossfuzz/serialized_dag_fuzz.dict | 14 ++ scripts/ossfuzz/serialized_dag_fuzz.options | 2 + .../src/ossfuzz/serialized_dag_fuzz.py | 197 ++++++++++++++++++ 4 files changed, 214 insertions(+) create mode 100644 scripts/ossfuzz/seed_corpus/serialized_dag_fuzz/minimal.json create mode 100644 scripts/ossfuzz/serialized_dag_fuzz.dict create mode 100644 scripts/ossfuzz/serialized_dag_fuzz.options create mode 100644 scripts/ossfuzz/src/ossfuzz/serialized_dag_fuzz.py diff --git a/scripts/ossfuzz/seed_corpus/serialized_dag_fuzz/minimal.json b/scripts/ossfuzz/seed_corpus/serialized_dag_fuzz/minimal.json new file mode 100644 index 0000000000000..6843a01314920 --- /dev/null +++ b/scripts/ossfuzz/seed_corpus/serialized_dag_fuzz/minimal.json @@ -0,0 +1 @@ +{"__version": 3, "dag": {"dag_id": "test_dag", "timezone": "UTC", "schedule": "@daily", "start_date": "2020-01-01T00:00:00+00:00"}} diff --git a/scripts/ossfuzz/serialized_dag_fuzz.dict b/scripts/ossfuzz/serialized_dag_fuzz.dict new file mode 100644 index 0000000000000..7c97bb9d84803 --- /dev/null +++ b/scripts/ossfuzz/serialized_dag_fuzz.dict @@ -0,0 +1,14 @@ +"dag_id" +"task_id" +"schedule" +"start_date" +"end_date" +"catchup" +"description" +"default_args" +"params" +"tasks" +"__type" +"__var" +"@daily" +"@hourly" diff --git a/scripts/ossfuzz/serialized_dag_fuzz.options b/scripts/ossfuzz/serialized_dag_fuzz.options new file mode 100644 index 0000000000000..8ea8588375d7e --- /dev/null +++ b/scripts/ossfuzz/serialized_dag_fuzz.options @@ -0,0 +1,2 @@ +[libfuzzer] +max_len = 8192 diff --git a/scripts/ossfuzz/src/ossfuzz/serialized_dag_fuzz.py b/scripts/ossfuzz/src/ossfuzz/serialized_dag_fuzz.py new file mode 100644 index 0000000000000..575cde6f188ec --- /dev/null +++ b/scripts/ossfuzz/src/ossfuzz/serialized_dag_fuzz.py @@ -0,0 +1,197 @@ +#!/usr/bin/python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Fuzzer for DAG serialization/deserialization. + +This targets DagSerialization.from_dict() which is used by the Scheduler and +API Server to load serialized DAGs. The security boundary is the schema +validation and type checking during deserialization. + +Target: airflow.serialization.serialized_objects.DagSerialization +Security boundary: Schema validation +""" + +import os +import sys +from datetime import timedelta + +import atheris +import pendulum + +os.environ.setdefault("AIRFLOW_HOME", "/tmp/airflow") + +with atheris.instrument_imports(include=["airflow"], enable_loader_override=False): + from airflow import DAG + from airflow.exceptions import AirflowException, DeserializationError + from airflow.serialization.serialized_objects import DagSerialization + + try: + from airflow.providers.standard.operators.empty import EmptyOperator as DummyOperator + from airflow.providers.standard.operators.python import PythonOperator + except ImportError: + try: + from airflow.operators.empty import EmptyOperator as DummyOperator + from airflow.operators.python import PythonOperator + except ImportError: + from airflow.operators.dummy_operator import DummyOperator + from airflow.operators.python_operator import PythonOperator + + +def _py_func(): + return None + + +def _collect_containers(obj, dicts: list[dict], lists: list[list], depth: int = 0) -> None: + if depth > 32: + return + if isinstance(obj, dict): + dicts.append(obj) + for v in obj.values(): + _collect_containers(v, dicts, lists, depth + 1) + elif isinstance(obj, list): + lists.append(obj) + for v in obj: + _collect_containers(v, dicts, lists, depth + 1) + + +def _fuzz_value(fdp: atheris.FuzzedDataProvider, depth: int = 0): + if depth > 2: + return None + choice = fdp.ConsumeIntInRange(0, 7) + if choice == 0: + return None + if choice == 1: + return fdp.ConsumeBool() + if choice == 2: + return fdp.ConsumeIntInRange(-1024, 1024) + if choice == 3: + return fdp.ConsumeString(256) + if choice == 4: + return fdp.ConsumeFloat() + if choice == 5: + return [_fuzz_value(fdp, depth + 1) for _ in range(fdp.ConsumeIntInRange(0, 8))] + return { + fdp.ConsumeString(16): _fuzz_value(fdp, depth + 1) for _ in range(fdp.ConsumeIntInRange(0, 8)) + } + + +def _mutate(serialized: dict, fdp: atheris.FuzzedDataProvider) -> None: + dicts: list[dict] = [] + lists: list[list] = [] + _collect_containers(serialized, dicts, lists) + + if not dicts and not lists: + return + + action = fdp.ConsumeIntInRange(0, 7) + if action in (0, 1, 2, 3) and dicts: + d = dicts[fdp.ConsumeIntInRange(0, len(dicts) - 1)] + if action == 0 and d: + k = list(d.keys())[fdp.ConsumeIntInRange(0, len(d) - 1)] + d.pop(k, None) + elif action == 1 and d: + k = list(d.keys())[fdp.ConsumeIntInRange(0, len(d) - 1)] + d[k] = _fuzz_value(fdp) + elif action == 2: + d[fdp.ConsumeString(16)] = _fuzz_value(fdp) + else: + # Rename key. + if d: + k = list(d.keys())[fdp.ConsumeIntInRange(0, len(d) - 1)] + v = d.pop(k) + d[fdp.ConsumeString(16) or k] = v + return + + if lists: + lst = lists[fdp.ConsumeIntInRange(0, len(lists) - 1)] + if action == 4 and lst: + idx = fdp.ConsumeIntInRange(0, len(lst) - 1) + lst[idx] = _fuzz_value(fdp) + elif action == 5 and len(lst) < 128: + lst.append(_fuzz_value(fdp)) + elif action == 6 and lst: + idx = fdp.ConsumeIntInRange(0, len(lst) - 1) + del lst[idx] + elif action == 7 and lst: + del lst[fdp.ConsumeIntInRange(0, len(lst) - 1) :] + + +def TestInput(input_bytes: bytes): + if len(input_bytes) > 8192: + return + + fdp = atheris.FuzzedDataProvider(input_bytes) + start_date = pendulum.datetime(2020, 1, 1, tz="UTC") + + schedule = fdp.PickValueInList([None, "@daily", "@hourly", "*/5 * * * *"]) + if fdp.ConsumeBool(): + schedule = fdp.ConsumeString(64) or schedule + + try: + with DAG( + fdp.ConsumeString(64) or "dag", + schedule=schedule, + start_date=start_date, + catchup=fdp.ConsumeBool(), + ) as dag: + t1 = DummyOperator(task_id=fdp.ConsumeString(64) or "t1") + t2 = PythonOperator(task_id=fdp.ConsumeString(64) or "t2", python_callable=_py_func) + t1 >> t2 + dag.description = fdp.ConsumeString(128) + dag.dagrun_timeout = timedelta(seconds=fdp.ConsumeIntInRange(0, 3600)) + except Exception: + return + + try: + serialized = DagSerialization.to_dict(dag) + except Exception: + return + + for _ in range(fdp.ConsumeIntInRange(0, 32)): + try: + _mutate(serialized, fdp) + except Exception: + break + + try: + deserialized = DagSerialization.from_dict(serialized) + _ = deserialized.dag_id + if getattr(deserialized, "task_dict", None): + _ = list(deserialized.task_dict.keys())[:3] + except ( + AirflowException, + DeserializationError, + ValueError, + TypeError, + KeyError, + RecursionError, + AttributeError, + IndexError, + UnicodeDecodeError, + ): + return + + +def main(): + atheris.Setup(sys.argv, TestInput, enable_python_coverage=True) + atheris.Fuzz() + + +if __name__ == "__main__": + main() From bb46319dc53d7a4dde25c1106be1f27d7055dd0d Mon Sep 17 00:00:00 2001 From: "Leslie P. Polzer" Date: Tue, 6 Jan 2026 13:41:20 +0000 Subject: [PATCH 3/3] Add connection_uri_fuzz OSS-Fuzz target Adds fuzzer for Connection URI parsing which is a security boundary (API input validation). Target: Connection._parse_from_uri() and sanitize_conn_id() Includes dictionary, options file, and seed corpus. --- scripts/ossfuzz/connection_uri_fuzz.dict | 19 +++ scripts/ossfuzz/connection_uri_fuzz.options | 2 + .../connection_uri_fuzz/http_uri.txt | 1 + .../connection_uri_fuzz/postgres_uri.txt | 1 + .../connection_uri_fuzz/with_extra.txt | 1 + .../src/ossfuzz/connection_uri_fuzz.py | 130 ++++++++++++++++++ 6 files changed, 154 insertions(+) create mode 100644 scripts/ossfuzz/connection_uri_fuzz.dict create mode 100644 scripts/ossfuzz/connection_uri_fuzz.options create mode 100644 scripts/ossfuzz/seed_corpus/connection_uri_fuzz/http_uri.txt create mode 100644 scripts/ossfuzz/seed_corpus/connection_uri_fuzz/postgres_uri.txt create mode 100644 scripts/ossfuzz/seed_corpus/connection_uri_fuzz/with_extra.txt create mode 100644 scripts/ossfuzz/src/ossfuzz/connection_uri_fuzz.py diff --git a/scripts/ossfuzz/connection_uri_fuzz.dict b/scripts/ossfuzz/connection_uri_fuzz.dict new file mode 100644 index 0000000000000..7a4a3c4d9c669 --- /dev/null +++ b/scripts/ossfuzz/connection_uri_fuzz.dict @@ -0,0 +1,19 @@ +"postgres" +"postgresql" +"mysql" +"mssql" +"sqlite" +"http" +"https" +"ssh" +"s3" +"gcs" +"wasb" +"://" +"@" +":" +"/" +"?" +"&" +"=" +"__extra__" diff --git a/scripts/ossfuzz/connection_uri_fuzz.options b/scripts/ossfuzz/connection_uri_fuzz.options new file mode 100644 index 0000000000000..60bd9b0b2fa5c --- /dev/null +++ b/scripts/ossfuzz/connection_uri_fuzz.options @@ -0,0 +1,2 @@ +[libfuzzer] +max_len = 2048 diff --git a/scripts/ossfuzz/seed_corpus/connection_uri_fuzz/http_uri.txt b/scripts/ossfuzz/seed_corpus/connection_uri_fuzz/http_uri.txt new file mode 100644 index 0000000000000..d4c49b6283f51 --- /dev/null +++ b/scripts/ossfuzz/seed_corpus/connection_uri_fuzz/http_uri.txt @@ -0,0 +1 @@ +http://api.example.com/v1?timeout=30 diff --git a/scripts/ossfuzz/seed_corpus/connection_uri_fuzz/postgres_uri.txt b/scripts/ossfuzz/seed_corpus/connection_uri_fuzz/postgres_uri.txt new file mode 100644 index 0000000000000..a2dd858f69add --- /dev/null +++ b/scripts/ossfuzz/seed_corpus/connection_uri_fuzz/postgres_uri.txt @@ -0,0 +1 @@ +postgres://user:password@localhost:5432/mydb diff --git a/scripts/ossfuzz/seed_corpus/connection_uri_fuzz/with_extra.txt b/scripts/ossfuzz/seed_corpus/connection_uri_fuzz/with_extra.txt new file mode 100644 index 0000000000000..14b38934dc118 --- /dev/null +++ b/scripts/ossfuzz/seed_corpus/connection_uri_fuzz/with_extra.txt @@ -0,0 +1 @@ +mysql://root:secret@db.local:3306/app?__extra__={"ssl": true} diff --git a/scripts/ossfuzz/src/ossfuzz/connection_uri_fuzz.py b/scripts/ossfuzz/src/ossfuzz/connection_uri_fuzz.py new file mode 100644 index 0000000000000..8b050c8925e5a --- /dev/null +++ b/scripts/ossfuzz/src/ossfuzz/connection_uri_fuzz.py @@ -0,0 +1,130 @@ +#!/usr/bin/python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Fuzzer for Airflow Connection URI parsing. + +This targets the Connection._parse_from_uri() method which is used when +creating connections via the API. Input comes from authenticated API users. + +Target: airflow.models.connection.Connection (uri parameter) +Security boundary: API input validation +""" + +import os +import sys +from urllib.parse import urlencode + +import atheris + +os.environ.setdefault("AIRFLOW_HOME", "/tmp/airflow") + +_CONN_TYPES = [ + "postgres", + "postgresql", + "mysql", + "mssql", + "sqlite", + "http", + "https", + "ssh", + "s3", + "gcs", + "wasb", +] +_HOST_PROTOCOLS = ["http", "https", "tcp", "udp"] + + +def _maybe_consume_str(fdp: atheris.FuzzedDataProvider, max_len: int) -> str | None: + if not fdp.ConsumeBool(): + return None + return fdp.ConsumeString(max_len) + + +def _build_query(fdp: atheris.FuzzedDataProvider) -> str: + params: dict[str, str] = {} + for _ in range(fdp.ConsumeIntInRange(0, 8)): + k = fdp.ConsumeString(16) + v = fdp.ConsumeString(64) + if k: + params[k] = v + if not params: + return "" + return "?" + urlencode(params, doseq=False) + + +def _build_uri(fdp: atheris.FuzzedDataProvider) -> str: + conn_type = fdp.PickValueInList(_CONN_TYPES) + host_protocol = fdp.PickValueInList(_HOST_PROTOCOLS) if fdp.ConsumeBool() else None + + login = _maybe_consume_str(fdp, 32) or "" + password = _maybe_consume_str(fdp, 32) or "" + host = _maybe_consume_str(fdp, 64) or "" + schema = _maybe_consume_str(fdp, 64) or "" + port = fdp.ConsumeIntInRange(0, 65535) if fdp.ConsumeBool() else None + + authority = "" + if login or password: + authority = login + if password or fdp.ConsumeBool(): + authority += ":" + password + authority += "@" + + host_block = host + if port is not None and (host_block or authority): + host_block += f":{port}" + if schema: + host_block += f"/{schema}" + + uri = f"{conn_type}://" + if host_protocol: + uri += f"{host_protocol}://" + uri += authority + host_block + uri += _build_query(fdp) + return uri + + +with atheris.instrument_imports(include=["airflow"], enable_loader_override=False): + from airflow.exceptions import AirflowException + from airflow.models.connection import Connection, sanitize_conn_id + + +def TestInput(input_bytes: bytes): + fdp = atheris.FuzzedDataProvider(input_bytes) + + # Test sanitize_conn_id with fuzzed input + conn_id = fdp.ConsumeString(256) + _ = sanitize_conn_id(conn_id) + + # Test Connection URI parsing + uri = _build_uri(fdp) + try: + conn = Connection(conn_id=conn_id, uri=uri) + _ = conn.get_uri() + _ = conn.extra_dejson + except (AirflowException, ValueError, TypeError, UnicodeError): + return + + +def main(): + atheris.Setup(sys.argv, TestInput, enable_python_coverage=True) + atheris.Fuzz() + + +if __name__ == "__main__": + main()