Patterns and recommendations for writing maintainable, testable Crossplane compositions with function-starlark.
The standard composition structure follows Extract-Transform-Emit: read inputs, build configurations, emit resources:
```python
# 1. Extract: Read input from XR
region = get(oxr, "spec.region", "us-east-1")
env = get(oxr, "spec.environment", "dev")

# 2. Transform: Build resource configurations
bucket_config = {
    "apiVersion": "s3.aws.upbound.io/v1beta1",
    "kind": "Bucket",
    "spec": {"forProvider": {"region": region}},
}

# 3. Emit: Register resources
Resource("bucket", bucket_config)
```

Use plain `if` statements for environment-specific or optional resources:
```python
if env == "prod":
    Resource("monitoring", {
        "apiVersion": "monitoring.example.io/v1",
        "kind": "Dashboard",
        "spec": {"forProvider": {"region": region, "enabled": True}},
    })
```

Capture `ResourceRef` objects for dependency chains:
```python
xr_name = get(oxr, "metadata.name", "unknown")  # added so the snippet is self-contained
count = get(oxr, "spec.bucketCount", 3)         # assumed XR field for this example

bucket_refs = []
for i in range(count):
    ref = Resource("bucket-%d" % i, {
        "apiVersion": "s3.aws.upbound.io/v1beta1",
        "kind": "Bucket",
        "metadata": {"name": "%s-bucket-%d" % (xr_name, i)},
        "spec": {"forProvider": {"region": region}},
    })
    bucket_refs.append(ref)

# Aggregator depends on all buckets
Resource("aggregator", {
    "apiVersion": "lambda.aws.upbound.io/v1beta1",
    "kind": "Function",
    "spec": {"forProvider": {"region": region}},
}, depends_on=bucket_refs)
```

Extract repeated logic into functions at the top of the script:
```python
def make_bucket(name, region, tags={}):
    return {
        "apiVersion": "s3.aws.upbound.io/v1beta1",
        "kind": "Bucket",
        "metadata": {"name": name},
        "spec": {"forProvider": {"region": region, "tags": tags}},
    }

Resource("data", make_bucket("data-bucket", region))
Resource("logs", make_bucket("logs-bucket", region, tags={"Purpose": "logging"}))
```

Always set a condition and emit a summary event at the end of the script:
```python
count = 10
set_condition("Ready", "True", "Available", "Created %d resources" % count)
emit_event("Normal", "Composition reconciled successfully")
```

This provides visibility into composition health via `kubectl describe` and
XR status conditions.
For writing structured status fields, use `set_xr_status()` instead of direct
`dxr["status"]` assignment. It auto-creates intermediate dicts and preserves
sibling fields:
```python
set_xr_status("atProvider.projectId", project_id)
set_xr_status("atProvider.arn", arn)
set_xr_status("region", region)
```

Let auto-injection handle `crossplane.io/*` labels. Do not manually set them
-- function-starlark injects `crossplane.io/composite`, `crossplane.io/claim-name`,
and `crossplane.io/claim-namespace` automatically on every `Resource()` call.
To read labels with dotted keys like `app.kubernetes.io/name`, use
`get_label()` instead of `get()`, which splits paths on dots:

```python
# Correct -- looks up the literal key in the labels map
name = get_label(oxr, "app.kubernetes.io/name", "unknown")

# Also works for annotations
ext_name = get_annotation(oxr, "crossplane.io/external-name", "")
```

Both return the default when the key, the labels/annotations map, or the metadata is missing. See the builtins reference for full details.
Use the `labels=` kwarg for team, cost-center, or environment labels that
should apply to all resources:

```python
common_labels = {"team": "platform", "cost-center": "eng-123", "env": env}
Resource("bucket", {...}, labels=common_labels)
Resource("topic", {...}, labels=common_labels)
```

Use body `metadata.labels` for resource-specific labels that vary per
resource (e.g., index labels in a loop):
```python
for i in range(count):
    Resource("bucket-%d" % i, {
        "apiVersion": "s3.aws.upbound.io/v1beta1",
        "kind": "Bucket",
        "metadata": {"labels": {"index": str(i)}},
        "spec": {"forProvider": {"region": region}},
    }, labels=common_labels)
# Result: index label from body + common labels from kwarg + crossplane auto-labels
```

Use `labels=None` only when you need exact control over labels -- for example,
when migrating existing resources that must not gain new labels:
```python
Resource("legacy-bucket", {...}, labels=None)
```

If your `labels=` kwarg uses a key that collides with `crossplane.io/*`, a
Warning event is emitted. This is usually a mistake -- let auto-injection
handle Crossplane labels.
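A minimal sketch of the collision case (resource name and label value are illustrative):

```python
# Triggers a Warning event -- crossplane.io/composite is auto-injected,
# so setting it manually in labels= collides with the injected value
Resource("bucket", {...}, labels={"crossplane.io/composite": "my-xr"})
```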
A depends on B:

```python
b_ref = Resource("database", {...})
Resource("schema", {...}, depends_on=[b_ref])
```

Multiple resources depend on one parent:
```python
parent_ref = Resource("vpc", {...})
Resource("subnet-a", {...}, depends_on=[parent_ref])
Resource("subnet-b", {...}, depends_on=[parent_ref])
Resource("subnet-c", {...}, depends_on=[parent_ref])
```

One resource depends on many:
```python
ref1 = Resource("subnet-a", {...})
ref2 = Resource("subnet-b", {...})
ref3 = Resource("subnet-c", {...})
Resource("route-table", {...}, depends_on=[ref1, ref2, ref3])
```

When a resource is wrapped in a kubernetes.crossplane.io Object, the Object
appears in observed state before the inner resource has its status populated.
Use tuple syntax to wait for a specific field instead of manual observed-state
guards:
```python
# Instead of:
# group_oid = get(observed, "group.status.atProvider.manifest.status.atProvider.objectId", "")
# if group_oid:
#     Resource("mapping", {...})

# Use tuple syntax:
group = Resource("group", object_body)
Resource("mapping", {
    "spec": {"forProvider": {"groupId": get(observed, "group.status.atProvider.manifest.status.atProvider.objectId", "")}},
}, depends_on=[(group, "status.atProvider.manifest.status.atProvider.objectId")])
```

This is cleaner and defers the SAML mapping until the field is truthy, while still generating Usage resources for deletion ordering.
function-starlark detects cycles in the dependency graph and reports a fatal error. Ensure your dependency graph is a DAG.
The default 10s TTL works for most resources. Increase for slow-provisioning resources (e.g., RDS instances, EKS clusters):
```yaml
apiVersion: starlark.fn.crossplane.io/v1alpha1
kind: StarlarkInput
spec:
  sequencingTTL: "60s"  # default: 10s
```

The `crossplane render` CLI runs compositions locally against a function Docker
image without needing a Kubernetes cluster. This is the primary testing
workflow for function-starlark compositions.
- Build the function Docker image:

  ```shell
  make build
  ```

- Create example fixtures. The project's own `example/` directory is a working template:
  - `example/xr.yaml` -- sample XR input
  - `example/composition.yaml` -- composition to test
  - `example/functions.yaml` -- function reference with Docker runtime
  - `example/expected-output.yaml` -- expected render output
- Run the render:

  ```shell
  crossplane render example/xr.yaml example/composition.yaml example/functions.yaml
  ```

Use `--include-function-results` to see events and results in the output:
```shell
crossplane render example/xr.yaml example/composition.yaml example/functions.yaml \
  --include-function-results
```

Use `make render-check` to diff render output against expected output. Add this
to your CI pipeline:
```shell
make render-check
```

This builds the Docker image, runs `crossplane render`, and diffs the output
against `example/expected-output.yaml`. Any unexpected change causes a failure.
- When updating compositions, update `example/expected-output.yaml` to match. The diff shows you exactly what changed.
- Keep fixture XRs minimal -- test one pattern per XR, not every feature at once.
- Use the existing example/ directory as a starting template for your own composition tests.
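A fixture XR can stay very small. For example (the apiVersion, kind, and spec fields here are illustrative -- match your own XRD):

```yaml
# example/xr.yaml -- minimal XR input for crossplane render
apiVersion: example.crossplane.io/v1
kind: XBucket
metadata:
  name: test-bucket
spec:
  region: us-east-1
  environment: dev
```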
| Composition size | Recommendation |
|---|---|
| Small (< 100 lines) | Inline source is fine |
| Medium (100-300 lines) | Extract helpers into inline modules (spec.modules) |
| Large (300+ lines) or shared | Package as OCI modules -- see OCI module distribution |
Use standard library modules for common patterns (networking, naming, labels, conditions) rather than reimplementing. See the standard library reference.
When multiple schema packages export the same type name (common with cloud
providers that define Account, Network, or Subnet across API groups),
use namespace alias imports to avoid name conflicts:
```python
# Problem: both modules export "Account" -- flat star imports clash
# load("schemas-azure:v2.5.0/storage/v1.star", "*")
# load("schemas-azure:v2.5.0/cosmosdb/v1.star", "*")

# Solution: namespace alias imports keep each provider's types separate
load("schemas-azure:v2.5.0/storage/v1.star", storage="*")
load("schemas-azure:v2.5.0/cosmosdb/v1.star", cosmosdb="*")

storage.Account(location="eastus", account_replication_type="LRS")
cosmosdb.Account(location="eastus", kind="GlobalDocumentDB")
```

Use one namespace per API group or provider package. This mirrors how Go and Python organize types by package path and makes it clear which provider each type belongs to.
Starlark has no try/except. Use defensive coding patterns instead.

Use `get()` with defaults instead of direct dict access:

```python
# Safe -- returns "us-east-1" if the path does not exist
region = get(oxr, "spec.region", "us-east-1")

# Unsafe -- errors out if spec or region is missing
region = oxr["spec"]["region"]
```

For observed resources, use `get_observed()` to avoid manual existence checks:
```python
# One call instead of checking "bucket" in observed first
arn = get_observed("bucket", "status.atProvider.arn", "pending")
```

Use `if "key" in dict:` before accessing optional fields:
```python
if "monitoring" in get(oxr, "spec", {}):
    Resource("dashboard", {...})
```

Use `fatal()` for unrecoverable errors with clear messages:
```python
region = get(oxr, "spec.region")
if not region:
    fatal("spec.region is required but was not provided")
```

Use `emit_event("Warning", ...)` for situations that are not fatal but should
be visible:
```python
if count > 100:
    emit_event("Warning", "Creating %d resources -- consider splitting into smaller compositions" % count)
```

Use schemas when field accuracy matters:
- Production resources -- storage accounts, databases, networking rules where a typo causes silent misconfiguration
- Frequently-edited compositions -- schemas catch regressions when multiple people modify the same composition
- Resources with many similar field names -- `accountTier` vs `accountKind` vs `accountReplicationType` are easy to confuse
Use plain dicts when the overhead is not worth it:
- Simple resources with 2-3 obvious fields
- Prototyping -- schemas can be added later without changing resource output
- Well-understood structures that rarely change
Start with your most error-prone resource. Add schemas incrementally -- you do not need to schema-validate every resource in a composition. Schema-validated and plain dict resources mix freely:
```python
# Schema-validated -- catches typos in storage account fields
sa = StorageAccountSpec(location=location, account_replication_type="LRS")
Resource("storage-account", {
    "spec": {"forProvider": sa},
    # ...
})

# Plain dict -- simple resource, schema not needed
Resource("resource-group", {
    "spec": {"forProvider": {"location": location}},
    # ...
})
```

Define sub-schemas for nested structures. Keep schema definitions at the top of the script, before Extract-Transform-Emit:
```python
# 1. Schema definitions (top of script)
NetworkRules = schema("NetworkRules",
    default_action=field(type="string", enum=["Allow", "Deny"]),
)
StorageAccountSpec = schema("StorageAccountSpec",
    location=field(type="string", required=True),
    network_rules=field(type=NetworkRules),
)

# 2. Extract: Read input from XR
location = get(oxr, "spec.location", "eastus")

# 3. Transform + Emit: Build and register resources
sa = StorageAccountSpec(location=location, network_rules=NetworkRules(default_action="Deny"))
Resource("storage-account", {"spec": {"forProvider": sa}})
```

For shared schemas across compositions, schema definitions can be placed in
modules loaded via load(). See module system for details.
Non-deterministic names (randAlpha, UUID) cause resource churn across
reconciliation cycles. Use crypto.stable_id() for deterministic suffixes
derived from composite inputs.
```python
xr_name = get(oxr, "metadata.name", "unknown")
region = get(oxr, "spec.region", "us-east-1")

# Short deterministic ID from composite inputs -- same every reconciliation
suffix = crypto.stable_id(xr_name + "-" + region)
Resource("bucket", {
    "apiVersion": "s3.aws.upbound.io/v1beta1",
    "kind": "Bucket",
    "metadata": {"name": "data-%s" % suffix},
    "spec": {"forProvider": {"region": region}},
})
```

`stable_id` generates a deterministic lowercase alphanumeric ID from a seed.
The same seed always produces the same ID. Use it wherever you need a short
unique suffix derived from XR inputs. The `length` parameter controls output
length (1-64 chars, default 8).
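When the default 8 characters is too short for your naming scheme, pass `length` explicitly (a sketch using the parameter described above):

```python
# 12-character deterministic suffix from the same seed
suffix = crypto.stable_id(xr_name + "-" + region, length=12)
```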
Platform defaults must merge recursively with user overrides without mutating
either dict. Use dict.deep_merge() for nested structures.
```python
defaults = {
    "region": "us-east-1",
    "tags": {"managed-by": "crossplane", "env": "dev"},
    "networking": {"vpcCidr": "10.0.0.0/16", "subnetBits": 8},
}
user = get(oxr, "spec.parameters", {})
merged = dict.deep_merge(defaults, user)
# user's tags merge INTO defaults.tags -- both dicts preserved
```

`deep_merge` recursively merges nested dicts with right-wins semantics. Both
inputs are unchanged. Use `dict.merge()` for a shallow merge (top-level keys
only).
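For illustration, the right-wins recursion that `deep_merge` performs can be modeled in plain Python (a simplified sketch of the documented semantics, not the builtin's actual implementation -- Starlark itself has no `isinstance`):

```python
def deep_merge(left, right):
    """Recursively merge right into a copy of left; right wins on conflicts."""
    merged = dict(left)  # shallow copy so neither input is mutated
    for key, value in right.items():
        if isinstance(merged.get(key), dict) and isinstance(value, dict):
            merged[key] = deep_merge(merged[key], value)  # recurse into nested dicts
        else:
            merged[key] = value  # right wins for scalars, lists, and type mismatches
    return merged

defaults = {"region": "us-east-1", "tags": {"env": "dev", "managed-by": "crossplane"}}
user = {"tags": {"env": "prod"}}
merged = deep_merge(defaults, user)
# merged["tags"] == {"env": "prod", "managed-by": "crossplane"}; inputs unchanged
```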
Extracting account IDs, regions, or resource names from AWS ARNs, Azure
resource IDs, or URIs requires fragile string splitting. Use
regex.find_groups() for structured extraction.
```python
arn = get_observed("role", "status.atProvider.arn", "")
groups = regex.find_groups(r"arn:aws:iam::(\d+):role/(.*)", arn)
if groups:
    account_id = groups[0]
    role_name = groups[1]
```

`find_groups` returns capture group strings from the first match, or None
if no match. Use regex.match() for boolean checks, regex.replace_all()
for transformations. Patterns use Go RE2 syntax (not PCRE).
Checking readiness across multiple composed resources requires iterating observed state and parsing condition arrays. The conditions stdlib simplifies this to a single function call.
```python
load("starlark-stdlib:v1/conditions.star", "all_ready", "any_degraded", "degraded")

if any_degraded(["database", "cache"]):
    degraded("SubsystemFailing", "One or more data stores is not healthy")
elif all_ready():
    set_condition("Ready", "True", "Available", "All resources ready")
```

`all_ready()` returns True when every listed resource (or all observed, if
None) has Ready=True. any_degraded() returns True when any has
Ready=False or Synced=False. With None argument and zero observed
resources, all_ready returns False (first-reconcile safety).
Accessing observed resources that do not exist yet (first reconciliation) crashes the script. Use the v1.8 observed helpers for safe access.
```python
# Branch safely on existence
if is_observed("database"):
    db_host = get_observed("database", "status.atProvider.address", "")
    db_ready = get_condition("database", "Ready")
else:
    db_host = "pending"
    db_ready = None

# Or get the full body with a safe default
db = observed_body("database", default={})
```

`is_observed()` checks existence without field access. `observed_body()`
returns the full body dict or a default. get_condition() returns None
when the resource or condition is missing. All three are safe on first
reconciliation when observed is empty.
The default 10s requeue is too fast for slow-provisioning resources (RDS,
EKS) or too slow for time-sensitive operations. Use set_response_ttl() to
tune the interval based on resource state.
```python
# Fast polling while waiting for a slow resource
cluster_ready = get_condition("cluster", "Ready")  # None when not yet observed
if not is_observed("cluster"):
    set_response_ttl("15s")  # first reconcile -- medium poll
elif cluster_ready and cluster_ready["status"] != "True":
    set_response_ttl("30s")  # provisioning -- slower poll
else:
    set_response_ttl("5m")   # ready -- slow poll
```

`set_response_ttl()` overrides the default sequencingTTL. Accepts Go
duration strings ("30s", "5m") or int seconds. Last call wins if called
multiple times.
Replace manual None-guarding with recursive dict.compact. Set optional fields
to None and let compact prune them at any depth.
```python
# Before: manual None-guarding
spec = {
    "replicas": replicas,
}
if annotations:
    spec["metadata"] = {"annotations": annotations}
if volumes:
    spec["template"] = {"spec": {"volumes": volumes}}

# After: recursive dict.compact
spec = dict.compact({
    "replicas": replicas,
    "metadata": {
        "annotations": annotations if annotations else None,
    },
    "template": {
        "spec": {
            "volumes": volumes if volumes else None,
        },
    },
})
```

Empty strings, lists, and dicts are preserved -- these carry intent in
Kubernetes manifests (e.g., resources: {} means "no limits", not "omit the
field"). See builtins reference for the
full signature and behavior details.
Three progressive patterns for controlling resource emission declaratively, from simple conditional skipping to cliff-guard preservation.
(a) Simple conditional emission with when/skip_reason:
Skip a resource when a feature is disabled. The `when=` and `skip_reason=`
kwargs replace wrapping `Resource()` in if/else blocks or calling
`skip_resource()`:

```python
# Skip resource when feature is disabled
feature_enabled = get(oxr, "spec.features.monitoring", False)
Resource("monitoring-stack", monitoring_body,
    when=feature_enabled, skip_reason="monitoring disabled in spec")
```

(b) Cliff guard with preserve_observed:
When config comes from an extra resource that may not exist on the first
reconciliation (e.g., Azure connection config), use preserve_observed to keep
the resource alive while the config source is temporarily unavailable:
```python
# Extra resource may not exist on first reconciliation
azure_config = get_extra_resource("azure-conn", "data.config", None)
body = {
    "apiVersion": "nop.crossplane.io/v1alpha1",
    "kind": "NopResource",
    "spec": {"forProvider": {"config": azure_config}},
} if azure_config else None

Resource("azure-dep", body, preserve_observed=True)
# First reconcile (no extra resource yet): body=None -- emits the observed body
# if it exists, skips if not
# Subsequent reconciles: body is a dict -- emitted normally (preserve_observed
# is a no-op)
```

(c) Combined: when + preserve_observed:
Gate on an explicit toggle while also preserving the observed body when config is absent:
```python
# Gate + preserve: skip when explicitly disabled, preserve observed when body absent
enabled = get(oxr, "spec.features.cache", True)
cache_config = get(oxr, "spec.cacheConfig", None)
body = build_cache(cache_config) if cache_config else None
Resource("cache", body,
    when=enabled, skip_reason="cache disabled",
    preserve_observed=True)
```

See the builtins reference for the full behavior
state table covering all combinations of when, body, and
preserve_observed.
- Builtins reference -- complete function signatures
- Features guide -- detailed coverage of depends_on, labels, connection details, namespace modules, and metrics
- Migration cheatsheet -- Sprig/KCL to function-starlark helper mapping
- Module system -- load(), OCI modules, standard library
- Deployment guide -- cluster deployment and metrics setup