Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions examples/05-orchestrator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# 05 — Orchestrator Pattern

An orchestrator workflow where independent Tasks run in parallel and a final
Task synthesizes their outputs. This demonstrates the `dependsOn` field and
prompt templates that reference dependency results.

## Use Case

Break a large task into smaller, independent research or work steps that run
concurrently, then combine their outputs in a single orchestrator Task.

## How It Works

```
research-api-design ──┐
├──▶ synthesize-design
research-data-model ──┘
```

1. **Stage 1** — `research-api-design` and `research-data-model` run in
parallel (no dependencies).
2. **Stage 2** — `synthesize-design` has `dependsOn` set to both stage-1 Tasks.
It stays in the **Waiting** phase until both dependencies succeed, then its
prompt template is rendered with the dependency outputs before the agent
starts.

## Prompt Templates

Tasks that declare `dependsOn` can use Go `text/template` syntax in their
prompt to reference dependency outputs:

```yaml
prompt: |
# Iterate over output lines
{{ range (index .Deps "research-api-design" "Outputs") }}- {{ . }}
{{ end }}

# Access a specific structured result by key
Schema: {{ index .Deps "research-data-model" "Results" "schema" }}
```

Available template data per dependency:

| Key | Type | Description |
|-----------|-------------------|--------------------------------------|
| `Outputs` | `[]string` | Free-form output lines from the agent |
| `Results` | `map[string]string` | Structured key-value results |
| `Name` | `string` | The dependency Task name |

## Resources

| File | Kind | Purpose |
|------|------|---------|
| `secret.yaml` | Secret | Anthropic API key for all Tasks |
| `tasks.yaml` | Task (×3) | Two research Tasks and one orchestrator Task |

## Steps

1. **Edit `secret.yaml`** — replace the placeholder with your real Anthropic API key.

2. **Apply the resources:**

```bash
kubectl apply -f examples/05-orchestrator/
```

3. **Watch the Tasks:**

```bash
kubectl get tasks -w
```

You should see both research Tasks start immediately, while
`synthesize-design` stays in `Waiting` until they succeed.

4. **Stream the orchestrator logs:**

```bash
kubectl logs -l job-name=synthesize-design -f
```

5. **Cleanup:**

```bash
kubectl delete -f examples/05-orchestrator/
```
8 changes: 8 additions & 0 deletions examples/05-orchestrator/secret.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: v1
kind: Secret
metadata:
name: orchestrator-api-key
type: Opaque
stringData:
# TODO: Replace with your Anthropic API key
ANTHROPIC_API_KEY: "sk-ant-REPLACE-ME"
62 changes: 62 additions & 0 deletions examples/05-orchestrator/tasks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
### Stage 1 — Independent research tasks (run in parallel)

apiVersion: axon.io/v1alpha1
kind: Task
metadata:
name: research-api-design
spec:
type: claude-code
prompt: |
Research best practices for designing a REST API for a bookstore.
Cover endpoint naming, pagination, error responses, and versioning.
Be concise — produce a bullet-point summary.
credentials:
type: api-key
secretRef:
name: orchestrator-api-key

---
apiVersion: axon.io/v1alpha1
kind: Task
metadata:
name: research-data-model
spec:
type: claude-code
prompt: |
Design a data model for a bookstore application.
Include entities for books, authors, categories, and inventory.
Output the schema as a concise list of tables and columns.
credentials:
type: api-key
secretRef:
name: orchestrator-api-key

---
### Stage 2 — Orchestrator task that synthesizes results from stage 1

apiVersion: axon.io/v1alpha1
kind: Task
metadata:
name: synthesize-design
spec:
type: claude-code
dependsOn:
- research-api-design
- research-data-model
prompt: |
You are the orchestrator. Two research tasks have completed:

## API Design Research
{{ range (index .Deps "research-api-design" "Outputs") }}- {{ . }}
{{ end }}

## Data Model Research
{{ range (index .Deps "research-data-model" "Outputs") }}- {{ . }}
{{ end }}

Combine both into a single, coherent design document for a bookstore API.
Include the data model, endpoint list, and example request/response pairs.
credentials:
type: api-key
secretRef:
name: orchestrator-api-key
1 change: 1 addition & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Ready-to-use patterns and YAML manifests for orchestrating AI agents with Axon.
| [02-task-with-workspace](02-task-with-workspace/) | Run a Task that clones a git repo and can create PRs |
| [03-taskspawner-github-issues](03-taskspawner-github-issues/) | Automatically create Tasks from labeled GitHub issues |
| [04-taskspawner-cron](04-taskspawner-cron/) | Run agent tasks on a cron schedule |
| [05-orchestrator](05-orchestrator/) | Fan-out/fan-in pattern with `dependsOn` and prompt templates |

## How to Use

Expand Down
159 changes: 159 additions & 0 deletions test/integration/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

Expand Down Expand Up @@ -2216,4 +2217,162 @@ var _ = Describe("Task Controller", func() {
Expect(mainContainer.Args[0]).To(Equal("Review branch feature-456"))
})
})

Context("When creating an orchestrator Task with multiple dependencies (fan-out/fan-in)", func() {
It("Should wait for all dependencies before creating its Job", func() {
By("Creating a namespace")
ns := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "test-task-orchestrator",
},
}
Expect(k8sClient.Create(ctx, ns)).Should(Succeed())

By("Creating a Secret with API key")
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "anthropic-api-key",
Namespace: ns.Name,
},
StringData: map[string]string{
"ANTHROPIC_API_KEY": "test-api-key",
},
}
Expect(k8sClient.Create(ctx, secret)).Should(Succeed())

By("Creating two independent upstream tasks (fan-out)")
taskAlpha := &axonv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "orch-alpha",
Namespace: ns.Name,
},
Spec: axonv1alpha1.TaskSpec{
Type: "claude-code",
Prompt: "Research alpha topic",
Credentials: axonv1alpha1.Credentials{
Type: axonv1alpha1.CredentialTypeAPIKey,
SecretRef: axonv1alpha1.SecretReference{Name: "anthropic-api-key"},
},
},
}
taskBeta := &axonv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "orch-beta",
Namespace: ns.Name,
},
Spec: axonv1alpha1.TaskSpec{
Type: "claude-code",
Prompt: "Research beta topic",
Credentials: axonv1alpha1.Credentials{
Type: axonv1alpha1.CredentialTypeAPIKey,
SecretRef: axonv1alpha1.SecretReference{Name: "anthropic-api-key"},
},
},
}
Expect(k8sClient.Create(ctx, taskAlpha)).Should(Succeed())
Expect(k8sClient.Create(ctx, taskBeta)).Should(Succeed())

By("Creating orchestrator task that depends on both (fan-in)")
orchestrator := &axonv1alpha1.Task{
ObjectMeta: metav1.ObjectMeta{
Name: "orch-synthesize",
Namespace: ns.Name,
},
Spec: axonv1alpha1.TaskSpec{
Type: "claude-code",
DependsOn: []string{"orch-alpha", "orch-beta"},
Prompt: "Synthesize results from alpha and beta",
Credentials: axonv1alpha1.Credentials{
Type: axonv1alpha1.CredentialTypeAPIKey,
SecretRef: axonv1alpha1.SecretReference{Name: "anthropic-api-key"},
},
},
}
Expect(k8sClient.Create(ctx, orchestrator)).Should(Succeed())

By("Verifying orchestrator enters Waiting phase")
orchKey := types.NamespacedName{Name: "orch-synthesize", Namespace: ns.Name}
Eventually(func() axonv1alpha1.TaskPhase {
var t axonv1alpha1.Task
if err := k8sClient.Get(ctx, orchKey, &t); err != nil {
return ""
}
return t.Status.Phase
}, timeout, interval).Should(Equal(axonv1alpha1.TaskPhaseWaiting))

By("Verifying orchestrator Job does not exist yet")
orchJobKey := types.NamespacedName{Name: "orch-synthesize", Namespace: ns.Name}
Consistently(func() bool {
var job batchv1.Job
return apierrors.IsNotFound(k8sClient.Get(ctx, orchJobKey, &job))
}, "2s", interval).Should(BeTrue())

By("Completing only Task Alpha — orchestrator should remain Waiting")
jobAlphaKey := types.NamespacedName{Name: "orch-alpha", Namespace: ns.Name}
var jobAlpha batchv1.Job
Eventually(func() bool {
return k8sClient.Get(ctx, jobAlphaKey, &jobAlpha) == nil
}, timeout, interval).Should(BeTrue())
Eventually(func() error {
if err := k8sClient.Get(ctx, jobAlphaKey, &jobAlpha); err != nil {
return err
}
jobAlpha.Status.Succeeded = 1
return k8sClient.Status().Update(ctx, &jobAlpha)
}, timeout, interval).Should(Succeed())

alphaKey := types.NamespacedName{Name: "orch-alpha", Namespace: ns.Name}
Eventually(func() axonv1alpha1.TaskPhase {
var t axonv1alpha1.Task
if err := k8sClient.Get(ctx, alphaKey, &t); err != nil {
return ""
}
return t.Status.Phase
}, timeout, interval).Should(Equal(axonv1alpha1.TaskPhaseSucceeded))

By("Verifying orchestrator still has no Job (one dep remaining)")
Consistently(func() bool {
var job batchv1.Job
return apierrors.IsNotFound(k8sClient.Get(ctx, orchJobKey, &job))
}, "2s", interval).Should(BeTrue())

By("Completing Task Beta — orchestrator should now start")
jobBetaKey := types.NamespacedName{Name: "orch-beta", Namespace: ns.Name}
var jobBeta batchv1.Job
Eventually(func() bool {
return k8sClient.Get(ctx, jobBetaKey, &jobBeta) == nil
}, timeout, interval).Should(BeTrue())
Eventually(func() error {
if err := k8sClient.Get(ctx, jobBetaKey, &jobBeta); err != nil {
return err
}
jobBeta.Status.Succeeded = 1
return k8sClient.Status().Update(ctx, &jobBeta)
}, timeout, interval).Should(Succeed())

betaKey := types.NamespacedName{Name: "orch-beta", Namespace: ns.Name}
Eventually(func() axonv1alpha1.TaskPhase {
var t axonv1alpha1.Task
if err := k8sClient.Get(ctx, betaKey, &t); err != nil {
return ""
}
return t.Status.Phase
}, timeout, interval).Should(Equal(axonv1alpha1.TaskPhaseSucceeded))

By("Verifying orchestrator Job is created after all dependencies succeed")
var orchJob batchv1.Job
Eventually(func() bool {
return k8sClient.Get(ctx, orchJobKey, &orchJob) == nil
}, 2*timeout, interval).Should(BeTrue())

By("Verifying orchestrator task transitions out of Waiting")
Eventually(func() axonv1alpha1.TaskPhase {
var t axonv1alpha1.Task
if err := k8sClient.Get(ctx, orchKey, &t); err != nil {
return ""
}
return t.Status.Phase
}, timeout, interval).ShouldNot(Equal(axonv1alpha1.TaskPhaseWaiting))
})
})
})