diff --git a/examples/05-orchestrator/README.md b/examples/05-orchestrator/README.md new file mode 100644 index 00000000..2c468111 --- /dev/null +++ b/examples/05-orchestrator/README.md @@ -0,0 +1,86 @@ +# 05 — Orchestrator Pattern + +An orchestrator workflow where independent Tasks run in parallel and a final +Task synthesizes their outputs. This demonstrates the `dependsOn` field and +prompt templates that reference dependency results. + +## Use Case + +Break a large task into smaller, independent research or work steps that run +concurrently, then combine their outputs in a single orchestrator Task. + +## How It Works + +``` +research-api-design ──┐ + ├──▶ synthesize-design +research-data-model ──┘ +``` + +1. **Stage 1** — `research-api-design` and `research-data-model` run in + parallel (no dependencies). +2. **Stage 2** — `synthesize-design` has `dependsOn` set to both stage-1 Tasks. + It stays in the **Waiting** phase until both dependencies succeed, then its + prompt template is rendered with the dependency outputs before the agent + starts. + +## Prompt Templates + +Tasks that declare `dependsOn` can use Go `text/template` syntax in their +prompt to reference dependency outputs: + +```yaml +prompt: | + # Iterate over output lines + {{ range (index .Deps "research-api-design" "Outputs") }}- {{ . }} + {{ end }} + + # Access a specific structured result by key + Schema: {{ index .Deps "research-data-model" "Results" "schema" }} +``` + +Available template data per dependency: + +| Key | Type | Description | +|-----------|-------------------|--------------------------------------| +| `Outputs` | `[]string` | Free-form output lines from the agent | +| `Results` | `map[string]string` | Structured key-value results | +| `Name` | `string` | The dependency Task name | + +## Resources + +| File | Kind | Purpose | +|------|------|---------| +| `secret.yaml` | Secret | Anthropic API key for all Tasks | +| `tasks.yaml` | Task (×3) | Two research Tasks and one orchestrator Task | + +## Steps + +1. **Edit `secret.yaml`** — replace the placeholder with your real Anthropic API key. + +2. **Apply the resources:** + +```bash +kubectl apply -f examples/05-orchestrator/ +``` + +3. **Watch the Tasks:** + +```bash +kubectl get tasks -w +``` + +You should see both research Tasks start immediately, while +`synthesize-design` stays in `Waiting` until they succeed. + +4. **Stream the orchestrator logs:** + +```bash +kubectl logs -l job-name=synthesize-design -f +``` + +5. **Cleanup:** + +```bash +kubectl delete -f examples/05-orchestrator/ +``` diff --git a/examples/05-orchestrator/secret.yaml b/examples/05-orchestrator/secret.yaml new file mode 100644 index 00000000..e5ec6140 --- /dev/null +++ b/examples/05-orchestrator/secret.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Secret +metadata: + name: orchestrator-api-key +type: Opaque +stringData: + # TODO: Replace with your Anthropic API key + ANTHROPIC_API_KEY: "sk-ant-REPLACE-ME" diff --git a/examples/05-orchestrator/tasks.yaml b/examples/05-orchestrator/tasks.yaml new file mode 100644 index 00000000..2d869435 --- /dev/null +++ b/examples/05-orchestrator/tasks.yaml @@ -0,0 +1,62 @@ +### Stage 1 — Independent research tasks (run in parallel) + +apiVersion: axon.io/v1alpha1 +kind: Task +metadata: + name: research-api-design +spec: + type: claude-code + prompt: | + Research best practices for designing a REST API for a bookstore. + Cover endpoint naming, pagination, error responses, and versioning. + Be concise — produce a bullet-point summary. + credentials: + type: api-key + secretRef: + name: orchestrator-api-key + +--- +apiVersion: axon.io/v1alpha1 +kind: Task +metadata: + name: research-data-model +spec: + type: claude-code + prompt: | + Design a data model for a bookstore application. + Include entities for books, authors, categories, and inventory. + Output the schema as a concise list of tables and columns. + credentials: + type: api-key + secretRef: + name: orchestrator-api-key + +--- +### Stage 2 — Orchestrator task that synthesizes results from stage 1 + +apiVersion: axon.io/v1alpha1 +kind: Task +metadata: + name: synthesize-design +spec: + type: claude-code + dependsOn: + - research-api-design + - research-data-model + prompt: | + You are the orchestrator. Two research tasks have completed: + + ## API Design Research + {{ range (index .Deps "research-api-design" "Outputs") }}- {{ . }} + {{ end }} + + ## Data Model Research + {{ range (index .Deps "research-data-model" "Outputs") }}- {{ . }} + {{ end }} + + Combine both into a single, coherent design document for a bookstore API. + Include the data model, endpoint list, and example request/response pairs. + credentials: + type: api-key + secretRef: + name: orchestrator-api-key diff --git a/examples/README.md b/examples/README.md index 997fa7e0..b3c78753 100644 --- a/examples/README.md +++ b/examples/README.md @@ -15,6 +15,7 @@ Ready-to-use patterns and YAML manifests for orchestrating AI agents with Axon. | [02-task-with-workspace](02-task-with-workspace/) | Run a Task that clones a git repo and can create PRs | | [03-taskspawner-github-issues](03-taskspawner-github-issues/) | Automatically create Tasks from labeled GitHub issues | | [04-taskspawner-cron](04-taskspawner-cron/) | Run agent tasks on a cron schedule | +| [05-orchestrator](05-orchestrator/) | Fan-out/fan-in pattern with `dependsOn` and prompt templates | ## How to Use diff --git a/test/integration/task_test.go b/test/integration/task_test.go index 1ff5f8c8..8aff489a 100644 --- a/test/integration/task_test.go +++ b/test/integration/task_test.go @@ -13,6 +13,7 @@ import ( . "github.com/onsi/gomega" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -2216,4 +2217,162 @@ var _ = Describe("Task Controller", func() { Expect(mainContainer.Args[0]).To(Equal("Review branch feature-456")) }) }) + + Context("When creating an orchestrator Task with multiple dependencies (fan-out/fan-in)", func() { + It("Should wait for all dependencies before creating its Job", func() { + By("Creating a namespace") + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-task-orchestrator", + }, + } + Expect(k8sClient.Create(ctx, ns)).Should(Succeed()) + + By("Creating a Secret with API key") + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "anthropic-api-key", + Namespace: ns.Name, + }, + StringData: map[string]string{ + "ANTHROPIC_API_KEY": "test-api-key", + }, + } + Expect(k8sClient.Create(ctx, secret)).Should(Succeed()) + + By("Creating two independent upstream tasks (fan-out)") + taskAlpha := &axonv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "orch-alpha", + Namespace: ns.Name, + }, + Spec: axonv1alpha1.TaskSpec{ + Type: "claude-code", + Prompt: "Research alpha topic", + Credentials: axonv1alpha1.Credentials{ + Type: axonv1alpha1.CredentialTypeAPIKey, + SecretRef: axonv1alpha1.SecretReference{Name: "anthropic-api-key"}, + }, + }, + } + taskBeta := &axonv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "orch-beta", + Namespace: ns.Name, + }, + Spec: axonv1alpha1.TaskSpec{ + Type: "claude-code", + Prompt: "Research beta topic", + Credentials: axonv1alpha1.Credentials{ + Type: axonv1alpha1.CredentialTypeAPIKey, + SecretRef: axonv1alpha1.SecretReference{Name: "anthropic-api-key"}, + }, + }, + } + Expect(k8sClient.Create(ctx, taskAlpha)).Should(Succeed()) + Expect(k8sClient.Create(ctx, taskBeta)).Should(Succeed()) + + By("Creating orchestrator task that depends on both (fan-in)") + orchestrator := &axonv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "orch-synthesize", + Namespace: ns.Name, + }, + Spec: axonv1alpha1.TaskSpec{ + Type: "claude-code", + DependsOn: []string{"orch-alpha", "orch-beta"}, + Prompt: "Synthesize results from alpha and beta", + Credentials: axonv1alpha1.Credentials{ + Type: axonv1alpha1.CredentialTypeAPIKey, + SecretRef: axonv1alpha1.SecretReference{Name: "anthropic-api-key"}, + }, + }, + } + Expect(k8sClient.Create(ctx, orchestrator)).Should(Succeed()) + + By("Verifying orchestrator enters Waiting phase") + orchKey := types.NamespacedName{Name: "orch-synthesize", Namespace: ns.Name} + Eventually(func() axonv1alpha1.TaskPhase { + var t axonv1alpha1.Task + if err := k8sClient.Get(ctx, orchKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(axonv1alpha1.TaskPhaseWaiting)) + + By("Verifying orchestrator Job does not exist yet") + orchJobKey := types.NamespacedName{Name: "orch-synthesize", Namespace: ns.Name} + Consistently(func() bool { + var job batchv1.Job + return apierrors.IsNotFound(k8sClient.Get(ctx, orchJobKey, &job)) + }, "2s", interval).Should(BeTrue()) + + By("Completing only Task Alpha — orchestrator should remain Waiting") + jobAlphaKey := types.NamespacedName{Name: "orch-alpha", Namespace: ns.Name} + var jobAlpha batchv1.Job + Eventually(func() bool { + return k8sClient.Get(ctx, jobAlphaKey, &jobAlpha) == nil + }, timeout, interval).Should(BeTrue()) + Eventually(func() error { + if err := k8sClient.Get(ctx, jobAlphaKey, &jobAlpha); err != nil { + return err + } + jobAlpha.Status.Succeeded = 1 + return k8sClient.Status().Update(ctx, &jobAlpha) + }, timeout, interval).Should(Succeed()) + + alphaKey := types.NamespacedName{Name: "orch-alpha", Namespace: ns.Name} + Eventually(func() axonv1alpha1.TaskPhase { + var t axonv1alpha1.Task + if err := k8sClient.Get(ctx, alphaKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(axonv1alpha1.TaskPhaseSucceeded)) + + By("Verifying orchestrator still has no Job (one dep remaining)") + Consistently(func() bool { + var job batchv1.Job + return apierrors.IsNotFound(k8sClient.Get(ctx, orchJobKey, &job)) + }, "2s", interval).Should(BeTrue()) + + By("Completing Task Beta — orchestrator should now start") + jobBetaKey := types.NamespacedName{Name: "orch-beta", Namespace: ns.Name} + var jobBeta batchv1.Job + Eventually(func() bool { + return k8sClient.Get(ctx, jobBetaKey, &jobBeta) == nil + }, timeout, interval).Should(BeTrue()) + Eventually(func() error { + if err := k8sClient.Get(ctx, jobBetaKey, &jobBeta); err != nil { + return err + } + jobBeta.Status.Succeeded = 1 + return k8sClient.Status().Update(ctx, &jobBeta) + }, timeout, interval).Should(Succeed()) + + betaKey := types.NamespacedName{Name: "orch-beta", Namespace: ns.Name} + Eventually(func() axonv1alpha1.TaskPhase { + var t axonv1alpha1.Task + if err := k8sClient.Get(ctx, betaKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).Should(Equal(axonv1alpha1.TaskPhaseSucceeded)) + + By("Verifying orchestrator Job is created after all dependencies succeed") + var orchJob batchv1.Job + Eventually(func() bool { + return k8sClient.Get(ctx, orchJobKey, &orchJob) == nil + }, 2*timeout, interval).Should(BeTrue()) + + By("Verifying orchestrator task transitions out of Waiting") + Eventually(func() axonv1alpha1.TaskPhase { + var t axonv1alpha1.Task + if err := k8sClient.Get(ctx, orchKey, &t); err != nil { + return "" + } + return t.Status.Phase + }, timeout, interval).ShouldNot(Equal(axonv1alpha1.TaskPhaseWaiting)) + }) + }) })