diff --git a/poetry.lock b/poetry.lock index ac0e164..20e38f4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -4456,13 +4456,13 @@ files = [ [[package]] name = "skillpacks" -version = "0.1.100" +version = "0.1.101" description = "Pluggable skills for AI agents" optional = false python-versions = "<4.0,>=3.10" files = [ - {file = "skillpacks-0.1.100-py3-none-any.whl", hash = "sha256:f0d0f849618b175fcb4f6b783ad6c379c9832b893715bd9f64c524c8fe88c862"}, - {file = "skillpacks-0.1.100.tar.gz", hash = "sha256:5dafdb7b31f49044c874ea2f9223d273d57edf5adaed9d002793703126000e38"}, + {file = "skillpacks-0.1.101-py3-none-any.whl", hash = "sha256:5f1e5bcfdebe8a04f2deb8185e241bb4e8fb7cc71efc037d607a2923841e3b9b"}, + {file = "skillpacks-0.1.101.tar.gz", hash = "sha256:f9d0d27869ecb72e3823e0b654cf902ed29efa1db5152987c5812614234dbe9f"}, ] [package.dependencies] @@ -4622,13 +4622,13 @@ widechars = ["wcwidth"] [[package]] name = "taskara" -version = "0.1.187" +version = "0.1.189" description = "Task management for AI agents" optional = false python-versions = "<4.0,>=3.10" files = [ - {file = "taskara-0.1.187-py3-none-any.whl", hash = "sha256:63d64ab179ff572a288200625176b9e89e6d0150665fb826738feca73537dace"}, - {file = "taskara-0.1.187.tar.gz", hash = "sha256:1562463103963d3d81ecdff9c03dccc861c7548346fc6e79263f9b611bb98188"}, + {file = "taskara-0.1.189-py3-none-any.whl", hash = "sha256:cd6a4d78b5eeaeaebdc3af74e5e27489dbcadcf53ad26a28f59ff5a69ae7c3cb"}, + {file = "taskara-0.1.189.tar.gz", hash = "sha256:e70865bdc8e62d6ab85fe3e45c9c1d5bb651c7dbce471046fb515aa5b1d769fa"}, ] [package.dependencies] @@ -4637,7 +4637,7 @@ namesgenerator = ">=0.3,<0.4" pydantic = ">=2.6.4,<3.0.0" redis = ">=5.2.0,<6.0.0" shortuuid = ">=1.0.13,<2.0.0" -skillpacks = ">=0.1.100,<0.2.0" +skillpacks = ">=0.1.101,<0.2.0" sqlalchemy = ">=2.0.29,<3.0.0" threadmem = ">=0.2.26,<0.3.0" tqdm = ">=4.66.4,<5.0.0" @@ -5564,4 +5564,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "jaraco.test", "more [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "dd9d806471a9b301e3fb7bd7ce2c5f46b6b4611eda8b2e83d381a3058765a0c6" +content-hash = "972edc8d60adeee8d35616f3ed58fedd52d236c7b66009cc4ec713c601665bbf" diff --git a/pyproject.toml b/pyproject.toml index c12fb18..8814f82 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "surfkit" -version = "0.1.293" +version = "0.1.294" description = "A toolkit for building AI agents that use devices" authors = ["Patrick Barker ", "Jeffrey Huckabay "] license = "MIT" @@ -20,7 +20,7 @@ litellm = "^1.35.8" rich = "^13.7.1" tqdm = "^4.66.4" agentdesk = "^0.2.107" -taskara = "^0.1.187" +taskara = "^0.1.189" [tool.poetry.group.dev.dependencies] @@ -40,6 +40,14 @@ surfkit = "surfkit.cli.main:app" lint = "scripts.lint:main" +[tool.pyright] +reportUnknownParameterType = false +reportMissingTypeArgument = false +reportUnknownMemberType = false +reportUnknownVariableType = false +reportUnknownArgumentType = false + + [tool.isort] line_length = 88 profile = "black" \ No newline at end of file diff --git a/surfkit/agent.py b/surfkit/agent.py index ca0dbca..a50f63f 100644 --- a/surfkit/agent.py +++ b/surfkit/agent.py @@ -5,6 +5,8 @@ from pydantic import BaseModel from taskara import Task +from .skill import Skill + C = TypeVar("C", bound="BaseModel") T = TypeVar("T", bound="TaskAgent") @@ -16,6 +18,17 @@ class TaskAgent(Generic[C, T], ABC): def name(cls) -> str: return cls.__name__ + def learn_skill( + self, + skill: Skill, + ): + """Learn a skill + + Args: + skill (Skill): The skill + """ + raise NotImplementedError("Subclasses must implement this method") + @abstractmethod def solve_task( self, diff --git a/surfkit/db/models.py b/surfkit/db/models.py index fc538a6..c4dce52 100644 --- a/surfkit/db/models.py +++ b/surfkit/db/models.py @@ -7,6 +7,25 @@ Base = declarative_base() +class SkillRecord(Base): + __tablename__ = "skills" + + id = Column(String, primary_key=True) + owner_id = Column(String, nullable=False) + name = Column(String, nullable=False) + status = Column(String, nullable=False) + description = Column(String, nullable=False) + requirements = Column(String, nullable=True) + agent_type = Column(String, nullable=False) + threads = Column(String, nullable=True) + generating_tasks = Column(Boolean, nullable=False) + tasks = Column(String, nullable=True) + min_demos = Column(Integer, nullable=False) + demos_outstanding = Column(Integer, nullable=False) + created = Column(Float, default=time.time) + updated = Column(Float, default=time.time) + + class AgentTypeRecord(Base): __tablename__ = "agent_types" diff --git a/surfkit/runtime/agent/base.py b/surfkit/runtime/agent/base.py index ab22b1f..5e7eaf1 100644 --- a/surfkit/runtime/agent/base.py +++ b/surfkit/runtime/agent/base.py @@ -14,6 +14,7 @@ V1AgentInstance, V1AgentType, V1RuntimeConnect, + V1Skill, V1SolveTask, ) from surfkit.types import AgentType @@ -334,7 +335,6 @@ def refresh(self) -> None: class AgentRuntime(Generic[R, C], ABC): - @classmethod def name(cls) -> str: return cls.__name__ @@ -404,6 +404,25 @@ def run( """ pass + @abstractmethod + def learn_skill( + self, + name: str, + skill: V1Skill, + follow_logs: bool = False, + attach: bool = False, + ) -> None: + """Learn a skill + + Args: + name (str): Name of the agent + skill (V1Skill): The skill + follow_logs (bool, optional): Whether to follow the logs. Defaults to False. + attach (bool, optional): Whether to attach the current process to the agent + If this process dies the agent will also die. Defaults to False. + """ + pass + @abstractmethod def solve_task( self, diff --git a/surfkit/runtime/agent/kube.py b/surfkit/runtime/agent/kube.py index afb5049..7e5005d 100644 --- a/surfkit/runtime/agent/kube.py +++ b/surfkit/runtime/agent/kube.py @@ -30,6 +30,7 @@ V1AgentType, V1ResourceLimits, V1ResourceRequests, + V1Skill, V1SolveTask, ) from surfkit.types import AgentType @@ -841,6 +842,15 @@ def run( # } # return headers + def learn_skill( + self, + name: str, + skill: V1Skill, + follow_logs: bool = False, + attach: bool = False, + ) -> None: + pass + def solve_task( self, name: str, diff --git a/surfkit/server/models.py b/surfkit/server/models.py index 8cd1664..ac788d9 100644 --- a/surfkit/server/models.py +++ b/surfkit/server/models.py @@ -1,7 +1,8 @@ from typing import Any, Dict, List, Optional -from pydantic import BaseModel +from pydantic import BaseModel, Field from taskara import V1Task +from threadmem import V1RoleThread class V1Action(BaseModel): @@ -175,3 +176,50 @@ class V1Meta(BaseModel): owner_id: Optional[str] = None created: float updated: float + + +class UserTasks(BaseModel): + """A list of tasks for a user story""" + + tasks: List[str] = Field(description="A list of tasks for a user story") + + +class UserTask(BaseModel): + """A task for a user story""" + + task: str = Field(description="A task for a user story") + + +class V1Skill(BaseModel): + id: str + name: str + description: str + requirements: List[str] + tasks: List[V1Task] + threads: List[V1RoleThread] = [] + status: Optional[str] = None + min_demos: Optional[int] = None + demos_outstanding: Optional[int] = None + owner_id: Optional[str] = None + generating_tasks: Optional[bool] = None + agent_type: str + remote: Optional[str] = None + created: int + updated: int + + +class V1UpdateSkill(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + requirements: Optional[List[str]] = None + tasks: Optional[List[str]] = None + threads: Optional[List[str]] = None + status: Optional[str] = None + min_demos: Optional[int] = None + demos_outstanding: Optional[int] = None + + +class V1LearnSkill(BaseModel): + skill_id: str + remote: Optional[str] = None + agent: Optional[V1Agent] = None diff --git a/surfkit/server/routes.py b/surfkit/server/routes.py index d3c9927..f8cf72c 100644 --- a/surfkit/server/routes.py +++ b/surfkit/server/routes.py @@ -1,10 +1,9 @@ import logging import os import time -from typing import Annotated, Type +from typing import Annotated, Optional, Type -from agentdesk import ConnectConfig -from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException +from fastapi import APIRouter, BackgroundTasks, Depends from taskara import Task, TaskStatus from taskara.server.models import V1Task, V1Tasks, V1TaskUpdate from tenacity import retry, stop_after_attempt, wait_fixed @@ -12,7 +11,8 @@ from surfkit.agent import TaskAgent from surfkit.auth.transport import get_user_dependency from surfkit.env import AGENTESEA_HUB_API_KEY_ENV -from surfkit.server.models import V1SolveTask, V1UserProfile +from surfkit.server.models import V1Agent, V1LearnSkill, V1SolveTask, V1UserProfile +from surfkit.skill import Skill DEBUG_ENV_VAR = os.getenv("DEBUG", "false").lower() == "true" log_level = logging.DEBUG if DEBUG_ENV_VAR else logging.INFO @@ -40,6 +40,38 @@ async def root(): async def health(): return {"status": "ok"} + @api_router.post("/v1/learn") + async def learn_skill( + current_user: Annotated[V1UserProfile, Depends(get_user_dependency())], + background_tasks: BackgroundTasks, + skill_model: V1LearnSkill, + ): + logger.info( + f"learning skill: {skill_model.model_dump()} with user {current_user.email}" + ) + + found = Skill.find(remote=skill_model.remote, id=skill_model.skill_id) + if not found: + raise Exception(f"Skill {skill_model.skill_id} not found") + + skill = found[0] + + background_tasks.add_task(_learn_skill, skill, current_user, skill_model.agent) + + def _learn_skill( + skill: Skill, current_user: V1UserProfile, v1_agent: Optional[V1Agent] = None + ): + if v1_agent: + config = Agent.config_type().model_validate(v1_agent.config) + agent = Agent.from_config(config=config) + else: + agent = Agent.default() + + try: + agent.learn_skill(skill) + except Exception as e: + logger.error(f"error learning skill: {e}") + @api_router.post("/v1/tasks") async def solve_task( current_user: Annotated[V1UserProfile, Depends(get_user_dependency())], diff --git a/surfkit/skill.py b/surfkit/skill.py new file mode 100644 index 0000000..f88965c --- /dev/null +++ b/surfkit/skill.py @@ -0,0 +1,428 @@ +import json +import time +from dataclasses import asdict +from datetime import datetime +from enum import Enum +from typing import List, Optional + +import requests +from mllm import Router +from shortuuid import uuid +from sqlalchemy import asc +from taskara import Task, TaskStatus +from threadmem import RoleThread + +from surfkit.db.conn import WithDB +from surfkit.db.models import SkillRecord +from surfkit.server.models import UserTask, UserTasks, V1Skill, V1UpdateSkill + + +class SkillStatus(Enum): + """Skill status""" + + COMPLETED = "completed" + TRAINING = "training" + NEEDS_DEFINITION = "needs_definition" + CREATED = "created" + FINISHED = "finished" + CANCELED = "canceled" + REVIEW = "review" + + +class Skill(WithDB): + """An agent skill""" + + def __init__( + self, + description: Optional[str] = None, + requirements: Optional[list[str]] = None, + name: Optional[str] = None, + id: Optional[str] = None, + status: SkillStatus = SkillStatus.NEEDS_DEFINITION, + agent_type: Optional[str] = None, + owner_id: Optional[str] = None, + min_demos: Optional[int] = None, + demos_outstanding: Optional[int] = None, + remote: Optional[str] = None, + ): + self.description = description or "" + self.name = name + self.generating_tasks = False + if not name: + self.name = self._get_name() + self.status = status + self.requirements = requirements or [] + self.tasks: List[Task] = [] + self.owner_id = owner_id + self.agent_type = agent_type + if not self.agent_type: + self.agent_type = "foo" + self.min_demos = min_demos if min_demos is not None else 100 + self.demos_outstanding = ( + demos_outstanding if demos_outstanding is not None else 5 + ) + self.remote = remote + + self.id = id or uuid() + self.created = int(time.time()) + self.updated = int(time.time()) + + def _get_name(self) -> str: + router = Router( + [ + "gemini/gemini-2.0-flash-exp", + "anthropic/claude-3-5-sonnet-20240620", + "gpt-4o", + ] + ) + print("generating Name") + thread = RoleThread() + thread.post( + role="user", + msg=f"Please generate a name for this skill description that is no longer than 5 words: '{self.description}'", + ) + resp = router.chat(thread, model="gemini/gemini-2.0-flash-exp") + print( + "Get Name Chat response", asdict(resp), flush=True + ) # TODO test pydantic dump + return resp.msg.text + + def to_v1(self) -> V1Skill: + return V1Skill( + id=self.id, + name=self.name, # type: ignore + description=self.description, + requirements=self.requirements, + agent_type=self.agent_type, # type: ignore + tasks=[task.to_v1() for task in self.tasks], + threads=[thread.to_v1() for thread in self.threads], + status=self.status.value, + generating_tasks=self.generating_tasks, + min_demos=self.min_demos, + demos_outstanding=self.demos_outstanding, + owner_id=self.owner_id, + created=self.created, + updated=self.updated, + remote=self.remote, + ) + + @classmethod + def from_v1(cls, data: V1Skill, owner_id: Optional[str] = None) -> "Skill": + skill_status = ( + SkillStatus(data.status) if data.status else SkillStatus.NEEDS_DEFINITION + ) + out = cls.__new__(cls) + out.id = data.id + out.name = data.name + out.description = data.description + out.requirements = data.requirements + out.agent_type = data.agent_type + out.owner_id = owner_id + out.tasks = [Task.find(id=task.id)[0] for task in data.tasks] + out.threads = [RoleThread.find(id=thread.id)[0] for thread in data.threads] + out.status = skill_status + out.min_demos = data.min_demos + out.demos_outstanding = data.demos_outstanding + out.created = data.created + out.updated = data.updated + out.remote = data.remote + return out + + def to_record(self) -> SkillRecord: + return SkillRecord( + id=self.id, + owner_id=self.owner_id, + name=self.name, + description=self.description, + requirements=json.dumps(self.requirements), + agent_type=self.agent_type, + threads=json.dumps([thread._id for thread in self.threads]), # type: ignore + tasks=json.dumps([task.id for task in self.tasks]), + generating_tasks=self.generating_tasks, + status=self.status.value, + min_demos=self.min_demos, + demos_outstanding=self.demos_outstanding, + created=self.created, + updated=self.updated, + ) + + @classmethod + def from_record(cls, record: SkillRecord) -> "Skill": + thread_ids = json.loads(str(record.threads)) + threads = [RoleThread.find(id=thread_id)[0] for thread_id in thread_ids] + + task_ids = json.loads(str(record.tasks)) + + tasks = Task.find_many_lite(task_ids) + valid_task_ids = [] + + if len(tasks) < len(task_ids): + try: + print(f"updating tasks for skill {record.id}", flush=True) + task_map = {task.id: task for task in tasks} + for task_id in task_ids: + if not task_map[task_id]: + print(f"Task {task_id} not found, removing from skill") + continue + + valid_task_ids.append(task_id) + + record.tasks = json.dumps(valid_task_ids) # type: ignore + for db in cls.get_db(): + db.merge(record) + db.commit() + print(f"updated tasks for skill {record.id}", flush=True) + except Exception as e: + print(f"Error updating tasks for skill {record.id}: {e}", flush=True) + + requirements = json.loads(str(record.requirements)) + + out = cls.__new__(cls) + out.id = record.id + out.name = record.name + out.owner_id = record.owner_id + out.description = record.description + out.requirements = requirements + out.agent_type = record.agent_type + out.threads = threads + out.tasks = tasks + out.generating_tasks = record.generating_tasks + out.status = SkillStatus(record.status) + out.min_demos = record.min_demos + out.demos_outstanding = record.demos_outstanding + out.created = record.created + out.updated = record.updated + return out + + def save(self): + for db in self.get_db(): + record = self.to_record() + db.merge(record) + db.commit() + + @classmethod + def find(cls, remote: Optional[str] = None, **kwargs) -> List["Skill"]: # type: ignore + print("running find for skills", flush=True) + + if remote: + resp = requests.get(f"{remote}/v1/skills") + skills = [cls.from_v1(skill) for skill in resp.json()] + for key, value in kwargs.items(): + skills = [ + skill for skill in skills if getattr(skill, key, None) == value + ] + + for skill in skills: + skill.remote = remote + + return skills + + for db in cls.get_db(): + records = ( + db.query(SkillRecord) + .filter_by(**kwargs) + .order_by(asc(SkillRecord.created)) + .all() + ) + print("skills found in db", flush=True) + return [cls.from_record(record) for record in records] + + raise ValueError("no session") + + def update(self, data: V1UpdateSkill): + if data.name: + self.name = data.name + if data.description: + self.description = data.description + if data.requirements: + self.requirements = data.requirements + if data.threads: + self.threads = [ + RoleThread.find(id=thread_id)[0] for thread_id in data.threads + ] + if data.tasks: + self.tasks = [Task.find(id=task_id)[0] for task_id in data.tasks] + if data.status: + self.status = SkillStatus(data.status) + if data.min_demos: + self.min_demos = data.min_demos + if data.demos_outstanding: + self.demos_outstanding = data.demos_outstanding + + self.save() + + def refresh(self): + """ + Refresh the object state from the database. + """ + found = self.find(id=self.id) + if not found: + raise ValueError("Skill not found") + + new = found[0] + self.name = new.name + self.description = new.description + self.requirements = new.requirements + self.threads = new.threads + self.tasks = new.tasks + self.created = new.created + self.updated = new.updated + self.owner_id = new.owner_id + self.agent_type = new.agent_type + self.generating_tasks = new.generating_tasks + self.status = new.status + self.min_demos = new.min_demos + self.demos_outstanding = new.demos_outstanding + return self + + def set_generating_tasks(self, input: bool): + if self.generating_tasks != input: + self.generating_tasks = input + self.save() + + def generate_tasks( + self, + n_permutations: int = 1, + assigned_to: Optional[str] = None, + assigned_type: Optional[str] = None, + ) -> List[Task]: + self.set_generating_tasks(True) + router = Router( + [ + "gemini/gemini-2.0-flash-exp", + "anthropic/claude-3-5-sonnet-20240620", + "gpt-4o", + ] + ) + + if len(self.requirements) > 0: + print( + f"Generating tasks for skill: '{self.description}', skill ID: {self.id} with requirements: {self.requirements}", + flush=True, + ) + current_date = datetime.now().strftime("%B %d, %Y") + prompt = ( + f"Given the agent skill '{self.description}', and the " + f"configurable requirements that the agent skill encompasses '{json.dumps(self.requirements)}', " + "Please generate tasks that a user could take which will excercise this skill, " + "our goal is to train and get good at using a skill" + f"Today's date is {current_date}. " + "For example, if the skill is 'search for stays on airbnb' " + "and a requirement is 'find stays within a travel window' then a task " + "might be 'Find the most popular available stays on Airbnb between October 12th to October 14th' " + f"Please return a raw json object that conforms to the schema {UserTasks.model_json_schema()}. " + f"Please return {n_permutations} for the tasks, for example if I need 2 stories you would return " + '{"tasks": ["Find stays from october 2nd to 3rd", "Find stays from January 15th-17th"]}' + ) + thread = RoleThread(owner_id=self.owner_id) + thread.post("user", prompt) + + response = router.chat( + thread, model="gemini/gemini-2.0-flash-exp", expect=UserTasks + ) + + if not response.parsed: + self.set_generating_tasks(False) + raise ValueError(f"unable to parse response: {response}") + + print( + f"Generated tasks: {response.parsed.model_dump_json()} for skill ID {self.id}", + flush=True, + ) + + gen_tasks = response.parsed.tasks + if not gen_tasks: + self.set_generating_tasks(False) + raise ValueError(f"no tasks generated for skill ID {self.id}") + gen_tasks = gen_tasks[:n_permutations] + + if not self.owner_id: + self.set_generating_tasks(False) + raise ValueError( + f"Owner ID must be set on skill ID {self.id} to generate tasks" + ) + + out: List[Task] = [] + for task in gen_tasks: + tsk = Task( + task, + owner_id=self.owner_id, + review_requirements=[ # TODO commenting this out for now since we are only doing user tasks + # ReviewRequirement( + # number_required=1, users=[self.owner_id] + # ) # TODO: make this configurable + ], + assigned_to=assigned_to if assigned_to else self.owner_id, + assigned_type=assigned_type if assigned_type else "user", + labels={"skill": self.id}, + ) + tsk.status = TaskStatus.IN_QUEUE + self.tasks.append(tsk) + tsk.save() + print( + f"task saved for skill ID: {self.id}", + tsk.to_v1().model_dump_json(), + flush=True, + ) + out.append(tsk) + self.generating_tasks = False + self.save() + + return out + + else: + print(f"Generating tasks for skill: {self.description}", flush=True) + current_date = datetime.now().strftime("%B %d, %Y") + prompt = ( + f"Given the agent skill '{self.description}'" + "Please generate a task that a agent could do which will excercise this skill, " + "our goal is to test whether the agent can perform the skill " + f"Today's date is {current_date}. " + "For example, if the skill is 'search for stays on airbnb' " + "and a requirement is 'find stays within a travel window' then a task " + "might be 'Find the most popular available stays on Airbnb between October 12th to October 14th' " + f"Please return a raw json object that conforms to the schema {UserTask.model_json_schema()}. " + f"For example: " + '{"task": "Find the most popular stays from october 2nd to 3rd"}' + ) + thread = RoleThread(owner_id=self.owner_id) + thread.post("user", prompt) + + response = router.chat( + thread, model="gemini/gemini-2.0-flash-exp", expect=UserTask + ) + + if not response.parsed: + raise ValueError(f"unable to parse response: {response}") + + if not self.owner_id: + raise ValueError("Owner ID must be set on story to generate tasks") + + task = Task( + response.parsed.task, + owner_id=self.owner_id, + review_requirements=[ # TODO commenting this out for now since we are only doing user tasks + # ReviewRequirement( + # number_required=1, users=[self.owner_id] + # ) # TODO: make this configurable + ], + assigned_to=assigned_to if assigned_to else self.owner_id, + assigned_type=assigned_type if assigned_type else "user", + labels={"skill": self.id}, + ) + task.status = TaskStatus.IN_QUEUE + self.tasks.append(task) + task.save() + print("task saved", task.to_v1().model_dump_json(), flush=True) + self.generating_tasks = False + self.save() + print(f"Generated task: {task.id}", flush=True) + return [task] + + def delete(self, owner_id: str): + for db in self.get_db(): + record = ( + db.query(SkillRecord).filter_by(id=self.id, owner_id=owner_id).first() + ) + db.delete(record) + db.commit()