Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
f424285
refactor(mcp): migrate to fastmcp-extensions library
devin-ai-integration[bot] Jan 19, 2026
07e450a
refactor(mcp): rename modules to preserve original domain names
devin-ai-integration[bot] Jan 19, 2026
c586b8b
feat(mcp): add backward-compatible config args and filters for legacy…
devin-ai-integration[bot] Jan 19, 2026
7d27a44
fix(mcp): move type-only imports to TYPE_CHECKING block
devin-ai-integration[bot] Jan 19, 2026
8d77e00
refactor(mcp): use constants for env vars and config names
devin-ai-integration[bot] Jan 20, 2026
bc02fa1
fix(mcp): add noqa comments to all side-effect imports
devin-ai-integration[bot] Jan 20, 2026
15b6c39
fix(mcp): remove unnecessary noqa comments from side-effect imports
devin-ai-integration[bot] Jan 20, 2026
2d3509b
refactor(mcp): use explicit registration pattern instead of side-effe…
devin-ai-integration[bot] Jan 20, 2026
45db195
fix(mcp): handle potential None from get_mcp_config() calls
devin-ai-integration[bot] Jan 20, 2026
e3f941f
refactor(mcp): delete bin/test_mcp_tool.py in favor of fastmcp-extens…
devin-ai-integration[bot] Jan 20, 2026
07ec4d0
refactor(mcp): delete header extraction functions and use get_mcp_con…
devin-ai-integration[bot] Jan 20, 2026
b4721c9
refactor(mcp): move config args and filters from server.py to _tool_u…
devin-ai-integration[bot] Jan 20, 2026
d2b9578
refactor(mcp): rename initialize_secrets() to load_secrets_to_env_vars()
devin-ai-integration[bot] Jan 20, 2026
94c9cf9
refactor(mcp): move resolver functions to _arg_resolvers.py
devin-ai-integration[bot] Jan 20, 2026
c6f9bc7
refactor(mcp): inline config args in server.py and use multiline list…
devin-ai-integration[bot] Jan 20, 2026
07fcf9b
refactor(mcp): simplify register_cloud_tools by evaluating workspace_…
devin-ai-integration[bot] Jan 20, 2026
b12ba50
refactor(mcp): rename _util.py to _config.py
devin-ai-integration[bot] Jan 20, 2026
a856bf5
fix(mcp): address CodeRabbit feedback
devin-ai-integration[bot] Jan 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 54 additions & 22 deletions airbyte/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,36 +258,68 @@ def _str_to_bool(value: str) -> bool:

# MCP (Model Context Protocol) Constants

MCP_TOOL_DOMAINS: list[str] = ["cloud", "local", "registry"]
"""Valid MCP tool domains available in the server.
MCP_READONLY_MODE_ENV_VAR: str = "AIRBYTE_CLOUD_MCP_READONLY_MODE"
"""Environment variable to enable read-only mode for the MCP server.

- `cloud`: Tools for managing Airbyte Cloud resources (sources, destinations, connections)
- `local`: Tools for local operations (connector validation, caching, SQL queries)
- `registry`: Tools for querying the Airbyte connector registry
When set to "1" or "true", only tools with readOnlyHint=True will be available.
"""

AIRBYTE_MCP_DOMAINS: list[str] | None = [
d.strip().lower() for d in os.getenv("AIRBYTE_MCP_DOMAINS", "").split(",") if d.strip()
] or None
"""Enabled MCP tool domains from the `AIRBYTE_MCP_DOMAINS` environment variable.
MCP_DOMAINS_DISABLED_ENV_VAR: str = "AIRBYTE_MCP_DOMAINS_DISABLED"
"""Environment variable to disable specific MCP tool domains.

Accepts a comma-separated list of domain names (e.g., "registry,cloud").
Accepts a comma-separated list of domain names (e.g., "local,registry").
Tools from these domains will not be advertised by the MCP server.
"""

MCP_DOMAINS_ENV_VAR: str = "AIRBYTE_MCP_DOMAINS"
"""Environment variable to enable specific MCP tool domains.

Accepts a comma-separated list of domain names (e.g., "cloud,registry").
If set, only tools from these domains will be advertised by the MCP server.
If not set (None), all domains are enabled by default.
"""

Values are case-insensitive and whitespace is trimmed.
MCP_WORKSPACE_ID_HEADER: str = "X-Airbyte-Workspace-Id"
"""HTTP header key for passing workspace ID to the MCP server.

This allows per-request workspace ID configuration when using HTTP transport.
"""

AIRBYTE_MCP_DOMAINS_DISABLED: list[str] | None = [
d.strip().lower() for d in os.getenv("AIRBYTE_MCP_DOMAINS_DISABLED", "").split(",") if d.strip()
] or None
"""Disabled MCP tool domains from the `AIRBYTE_MCP_DOMAINS_DISABLED` environment variable.
# MCP Config Arg Names (used with get_mcp_config)

Accepts a comma-separated list of domain names (e.g., "registry").
Tools from these domains will not be advertised by the MCP server.
MCP_CONFIG_READONLY_MODE: str = "airbyte_readonly_mode"
"""Config arg name for the legacy AIRBYTE_CLOUD_MCP_READONLY_MODE setting."""

When both `AIRBYTE_MCP_DOMAINS` and `AIRBYTE_MCP_DOMAINS_DISABLED` are set,
the disabled list takes precedence (subtracts from the enabled list).
MCP_CONFIG_EXCLUDE_MODULES: str = "airbyte_exclude_modules"
"""Config arg name for the legacy AIRBYTE_MCP_DOMAINS_DISABLED setting."""

Values are case-insensitive and whitespace is trimmed.
"""
MCP_CONFIG_INCLUDE_MODULES: str = "airbyte_include_modules"
"""Config arg name for the legacy AIRBYTE_MCP_DOMAINS setting."""

MCP_CONFIG_WORKSPACE_ID: str = "workspace_id"
"""Config arg name for the workspace ID setting."""

MCP_CONFIG_BEARER_TOKEN: str = "bearer_token"
"""Config arg name for the bearer token setting."""

MCP_CONFIG_CLIENT_ID: str = "client_id"
"""Config arg name for the client ID setting."""

MCP_CONFIG_CLIENT_SECRET: str = "client_secret"
"""Config arg name for the client secret setting."""

MCP_CONFIG_API_URL: str = "api_url"
"""Config arg name for the API URL setting."""

# MCP HTTP Header Keys for credentials

MCP_BEARER_TOKEN_HEADER: str = "Authorization"
"""HTTP header key for bearer token (standard Authorization header)."""

MCP_CLIENT_ID_HEADER: str = "X-Airbyte-Cloud-Client-Id"
"""HTTP header key for client ID."""

MCP_CLIENT_SECRET_HEADER: str = "X-Airbyte-Cloud-Client-Secret"
"""HTTP header key for client secret."""

MCP_API_URL_HEADER: str = "X-Airbyte-Cloud-Api-Url"
"""HTTP header key for API URL."""
8 changes: 4 additions & 4 deletions airbyte/mcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,13 @@
""" # noqa: D415

from airbyte.mcp import cloud_ops, connector_registry, local_ops, server
from airbyte.mcp import cloud, local, registry, server


__all__: list[str] = [
"cloud_ops",
"connector_registry",
"local_ops",
"cloud",
"local",
"registry",
"server",
]

Expand Down
52 changes: 0 additions & 52 deletions airbyte/mcp/_annotations.py

This file was deleted.

173 changes: 173 additions & 0 deletions airbyte/mcp/_arg_resolvers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Argument resolver functions for MCP tools.

This module provides functions to resolve and validate arguments passed to MCP tools,
including connector configurations and list-of-strings arguments.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any, overload

import yaml

from airbyte.secrets.hydration import deep_update, detect_hardcoded_secrets
from airbyte.secrets.util import get_secret


# Hint: Null result if input is Null
@overload
def resolve_list_of_strings(value: None) -> None: ...


# Hint: Non-null result if input is non-null
@overload
def resolve_list_of_strings(value: str | list[str] | set[str]) -> list[str]: ...


def resolve_list_of_strings(value: str | list[str] | set[str] | None) -> list[str] | None:
"""Resolve a string or list of strings to a list of strings.

This method will handle three types of input:

1. A list of strings (e.g., ["stream1", "stream2"]) will be returned as-is.
2. None or empty input will return None.
3. A single CSV string (e.g., "stream1,stream2") will be split into a list.
4. A JSON string (e.g., '["stream1", "stream2"]') will be parsed into a list.
5. If the input is empty or None, an empty list will be returned.

Args:
value: A string or list of strings.
"""
if value is None:
return None

if isinstance(value, list):
return value

if isinstance(value, set):
return list(value)

if not isinstance(value, str):
raise TypeError(
"Expected a string, list of strings, a set of strings, or None. "
f"Got '{type(value).__name__}': {value}"
)

value = value.strip()
if not value:
return []

if value.startswith("[") and value.endswith("]"):
# Try to parse as JSON array:
try:
parsed = json.loads(value)
if isinstance(parsed, list) and all(isinstance(item, str) for item in parsed):
return parsed
except json.JSONDecodeError as ex:
raise ValueError(f"Invalid JSON array: {value}") from ex

# Fallback to CSV split:
return [item.strip() for item in value.split(",") if item.strip()]


def resolve_connector_config( # noqa: PLR0912
config: dict | str | None = None,
config_file: str | Path | None = None,
config_secret_name: str | None = None,
config_spec_jsonschema: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Resolve a configuration dictionary, JSON string, or file path to a dictionary.

Returns:
Resolved configuration dictionary (empty if no inputs provided)

Raises:
ValueError: If JSON parsing fails or a provided input is invalid

We reject hardcoded secrets in a config dict if we detect them.
"""
config_dict: dict[str, Any] = {}

if config is None and config_file is None and config_secret_name is None:
return {}

if config_file is not None:
if isinstance(config_file, str):
config_file = Path(config_file)

if not isinstance(config_file, Path):
raise ValueError(
f"config_file must be a string or Path object, got: {type(config_file).__name__}"
)

if not config_file.exists():
raise FileNotFoundError(f"Configuration file not found: {config_file}")

def _raise_invalid_type(file_config: object) -> None:
raise TypeError(
f"Configuration file must contain a valid JSON/YAML object, "
f"got: {type(file_config).__name__}"
)

try:
file_config = yaml.safe_load(config_file.read_text())
if not isinstance(file_config, dict):
_raise_invalid_type(file_config)
config_dict.update(file_config)
except Exception as e:
raise ValueError(f"Error reading configuration file {config_file}: {e}") from e

if config is not None:
if isinstance(config, dict):
config_dict.update(config)
elif isinstance(config, str):
try:
parsed_config = json.loads(config)
if not isinstance(parsed_config, dict):
raise TypeError(
f"Parsed JSON config must be an object/dict, "
f"got: {type(parsed_config).__name__}"
)
config_dict.update(parsed_config)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in config parameter: {e}") from e
else:
raise ValueError(f"Config must be a dict or JSON string, got: {type(config).__name__}")

if config_dict and config_spec_jsonschema is not None:
hardcoded_secrets: list[list[str]] = detect_hardcoded_secrets(
config=config_dict,
spec_json_schema=config_spec_jsonschema,
)
if hardcoded_secrets:
error_msg = "Configuration contains hardcoded secrets in fields: "
error_msg += ", ".join(
[".".join(hardcoded_secret) for hardcoded_secret in hardcoded_secrets]
)

error_msg += (
"Please use environment variables instead. For example:\n"
"To set a secret via reference, set its value to "
"`secret_reference::ENV_VAR_NAME`.\n"
)
raise ValueError(error_msg)

if config_secret_name is not None:
# Assume this is a secret name that points to a JSON/YAML config.
secret_config = yaml.safe_load(str(get_secret(config_secret_name)))
if not isinstance(secret_config, dict):
raise ValueError(
f"Secret '{config_secret_name}' must contain a valid JSON or YAML object, "
f"but got: {type(secret_config).__name__}"
)

# Merge the secret config into the main config:
deep_update(
config_dict,
secret_config,
)

return config_dict
Loading