Skip to content
This repository was archived by the owner on Nov 26, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ statsmodels==0.14.4
tabulate==0.9.0
pysnc==1.1.10
shortuuid==1.0.13
textract-py3==2.1.1
textract-py3==2.1.1
deltalake==1.0.2
google_cloud_bigquery==3.34.0
10 changes: 10 additions & 0 deletions src/alita_tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
from importlib import import_module

from httpx import get

from .github import get_tools as get_github, AlitaGitHubToolkit
from .openapi import get_tools as get_openapi
from .jira import get_tools as get_jira, JiraToolkit
Expand Down Expand Up @@ -44,6 +46,8 @@
from .carrier import get_tools as get_carrier, AlitaCarrierToolkit
from .ocr import get_tools as get_ocr, OCRToolkit
from .pptx import get_tools as get_pptx, PPTXToolkit
from .aws import get_tools as get_delta_lake, DeltaLakeToolkit
from .google import get_tools as get_bigquery, BigQueryToolkit

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -120,6 +124,10 @@ def get_tools(tools_list, alita: 'AlitaClient', llm: 'LLMLikeObject', *args, **k
tools.extend(get_ocr(tool))
elif tool['type'] == 'pptx':
tools.extend(get_pptx(tool))
elif tool['type'] == 'delta_lake':
tools.extend(get_delta_lake(tool))
elif tool['type'] == 'bigquery':
tools.extend(get_bigquery(tool))
else:
if tool.get("settings", {}).get("module"):
try:
Expand Down Expand Up @@ -175,4 +183,6 @@ def get_toolkits():
AlitaCarrierToolkit.toolkit_config_schema(),
OCRToolkit.toolkit_config_schema(),
PPTXToolkit.toolkit_config_schema(),
DeltaLakeToolkit.toolkit_config_schema(),
BigQueryToolkit.toolkit_config_schema(),
]
7 changes: 7 additions & 0 deletions src/alita_tools/aws/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .delta_lake import DeltaLakeToolkit

name = "aws"

def get_tools(tool_type, tool):
if tool_type == 'delta_lake':
return DeltaLakeToolkit().get_toolkit().get_tools()
136 changes: 136 additions & 0 deletions src/alita_tools/aws/delta_lake/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@

from functools import lru_cache
from typing import List, Optional, Type

from langchain_core.tools import BaseTool, BaseToolkit
from pydantic import BaseModel, Field, SecretStr, computed_field, field_validator

from ...utils import TOOLKIT_SPLITTER, clean_string, get_max_toolkit_length
from .api_wrapper import DeltaLakeApiWrapper
from .tool import DeltaLakeAction

name = "delta_lake"

@lru_cache(maxsize=1)
def get_available_tools() -> dict[str, dict]:
api_wrapper = DeltaLakeApiWrapper.model_construct()
available_tools: dict = {
x["name"]: x["args_schema"].model_json_schema()
for x in api_wrapper.get_available_tools()
}
return available_tools

toolkit_max_length = lru_cache(maxsize=1)(
lambda: get_max_toolkit_length(get_available_tools())
)

class DeltaLakeToolkitConfig(BaseModel):
class Config:
title = name
json_schema_extra = {
"metadata": {
"hidden": True,
"label": "AWS Delta Lake",
"icon_url": "delta-lake.svg",
"sections": {
"auth": {
"required": False,
"subsections": [
{"name": "AWS Access Key ID", "fields": ["aws_access_key_id"]},
{"name": "AWS Secret Access Key", "fields": ["aws_secret_access_key"]},
{"name": "AWS Session Token", "fields": ["aws_session_token"]},
{"name": "AWS Region", "fields": ["aws_region"]},
],
},
"connection": {
"required": False,
"subsections": [
{"name": "Delta Lake S3 Path", "fields": ["s3_path"]},
{"name": "Delta Lake Table Path", "fields": ["table_path"]},
],
},
},
}
}

aws_access_key_id: Optional[SecretStr] = Field(default=None, description="AWS access key ID", json_schema_extra={"secret": True, "configuration": True})
aws_secret_access_key: Optional[SecretStr] = Field(default=None, description="AWS secret access key", json_schema_extra={"secret": True, "configuration": True})
aws_session_token: Optional[SecretStr] = Field(default=None, description="AWS session token (optional)", json_schema_extra={"secret": True, "configuration": True})
aws_region: Optional[str] = Field(default=None, description="AWS region for Delta Lake storage", json_schema_extra={"configuration": True})
s3_path: Optional[str] = Field(default=None, description="S3 path to Delta Lake data (e.g., s3://bucket/path)", json_schema_extra={"configuration": True})
table_path: Optional[str] = Field(default=None, description="Delta Lake table path (if not using s3_path)", json_schema_extra={"configuration": True})
selected_tools: List[str] = Field(default=[], description="Selected tools", json_schema_extra={"args_schemas": get_available_tools()})

@field_validator("selected_tools", mode="before", check_fields=False)
@classmethod
def selected_tools_validator(cls, value: List[str]) -> list[str]:
return [i for i in value if i in get_available_tools()]

def _get_toolkit(tool) -> BaseToolkit:
return DeltaLakeToolkit().get_toolkit(
selected_tools=tool["settings"].get("selected_tools", []),
aws_access_key_id=tool["settings"].get("aws_access_key_id", None),
aws_secret_access_key=tool["settings"].get("aws_secret_access_key", None),
aws_session_token=tool["settings"].get("aws_session_token", None),
aws_region=tool["settings"].get("aws_region", None),
s3_path=tool["settings"].get("s3_path", None),
table_path=tool["settings"].get("table_path", None),
toolkit_name=tool.get("toolkit_name"),
)

def get_toolkit():
return DeltaLakeToolkit.toolkit_config_schema()

def get_tools(tool):
return _get_toolkit(tool).get_tools()

class DeltaLakeToolkit(BaseToolkit):
tools: List[BaseTool] = []
api_wrapper: Optional[DeltaLakeApiWrapper] = Field(default_factory=DeltaLakeApiWrapper.model_construct)
toolkit_name: Optional[str] = None

@computed_field
@property
def tool_prefix(self) -> str:
return (
clean_string(self.toolkit_name, toolkit_max_length()) + TOOLKIT_SPLITTER
if self.toolkit_name
else ""
)

@computed_field
@property
def available_tools(self) -> List[dict]:
return self.api_wrapper.get_available_tools()

@staticmethod
def toolkit_config_schema() -> Type[BaseModel]:
return DeltaLakeToolkitConfig

@classmethod
def get_toolkit(
cls,
selected_tools: list[str] | None = None,
toolkit_name: Optional[str] = None,
**kwargs,
) -> "DeltaLakeToolkit":
delta_lake_api_wrapper = DeltaLakeApiWrapper(**kwargs)
instance = cls(
tools=[], api_wrapper=delta_lake_api_wrapper, toolkit_name=toolkit_name
)
if selected_tools:
selected_tools = set(selected_tools)
for t in instance.available_tools:
if t["name"] in selected_tools:
instance.tools.append(
DeltaLakeAction(
api_wrapper=instance.api_wrapper,
name=instance.tool_prefix + t["name"],
description=f"S3 Path: {getattr(instance.api_wrapper, 's3_path', '')} Table Path: {getattr(instance.api_wrapper, 'table_path', '')}\n" + t["description"],
args_schema=t["args_schema"],
)
)
return instance

def get_tools(self):
return self.tools
Loading