Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
583e6f4
fix: add permissions to get/list/watch any resources
JoseSzycho Feb 24, 2026
400e236
chore: crete pf-all command and update nats-queue to get reindex queu…
JoseSzycho Feb 24, 2026
1d732f3
feat: add RBAC to resource-indexer
JoseSzycho Feb 24, 2026
480dde6
refactor: centralize Kubernetes cache startup and synchronization for…
JoseSzycho Feb 24, 2026
52f2517
feat: introduce SearchQuery against MeiliSearch
JoseSzycho Feb 25, 2026
c05ce96
feat: add NATS TLS configuration options and apply them to the NATS c…
JoseSzycho Feb 27, 2026
714b498
feat: Add `LeaderElectionNamespace` option and flag for controller ma…
JoseSzycho Feb 27, 2026
8cda70b
feat: Add NATS TLS CA, certificate, and key configuration for secure …
JoseSzycho Feb 27, 2026
d21cc0b
feat: Add resource GVK to policy evaluation results and transformed d…
JoseSzycho Feb 27, 2026
0163b33
fix: Enable OpenAPI schema generation and update code generation tool…
JoseSzycho Mar 2, 2026
f5b032d
chore: consolidate kustomize components into core-control-plane overlay
JoseSzycho Mar 3, 2026
0ce40b9
feat: Introduce new IAM roles (viewer, editor, admin) and define the …
JoseSzycho Mar 3, 2026
68cc460
feat: Add NATS URL, TLS, and leader election configuration to control…
JoseSzycho Mar 3, 2026
d0f2eae
feat: Add Meilisearch domain argument and environment variable to the…
JoseSzycho Mar 3, 2026
1c26183
feat: add meilisearch-domain as env pattern
JoseSzycho Mar 3, 2026
a74d692
chore: update NATS URL in controller-manager deployment and Meilisear…
JoseSzycho Mar 3, 2026
0e089eb
chore: update diagrams with latest plantuml renderer
JoseSzycho Mar 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-apiserver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ jobs:
with:
bundle-name: ghcr.io/datum-cloud/search-kustomize
bundle-path: config
image-overlays: config/base
image-overlays: config/overlays/core-control-plane
image-name: ghcr.io/datum-cloud/search
secrets: inherit
68 changes: 28 additions & 40 deletions Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -122,37 +122,18 @@ tasks:
# Generate RBAC rules for the controllers.
- echo "Generating RBAC rules for the controllers..."
- "\"{{.TOOL_DIR}}/controller-gen\" rbac:roleName=milo-controller-manager paths=\"./internal/controllers/...\" output:dir=\"./config/overlays/controller-manager/core-control-plane/rbac\""
- "\"{{.TOOL_DIR}}/controller-gen\" rbac:roleName=milo-resource-indexer paths=\"./internal/indexer/...\" output:dir=\"./config/overlays/resource-indexer/core-control-plane/rbac\""
- task: generate:openapi
silent: true

generate:openapi:
desc: Generate OpenAPI definitions for search API types
deps:
- task: install-go-tool
vars:
NAME: openapi-gen
PACKAGE: k8s.io/code-generator/cmd/openapi-gen
VERSION: v0.23.0
desc: Generate Kubernetes OpenAPI definitions
cmds:
- echo "Generating OpenAPI definitions..."
- |
set -e
# Packages to generate OpenAPI for
PACKAGES=(
"pkg/apis/search/v1alpha1"
)

for REL_DIR in "${PACKAGES[@]}"; do
PKG="go.miloapis.net/search/$REL_DIR"
echo "Generating OpenAPI for $PKG..."

"{{.TOOL_DIR}}/openapi-gen" \
--input-dirs "$PKG,k8s.io/apimachinery/pkg/apis/meta/v1,k8s.io/apimachinery/pkg/runtime,k8s.io/apimachinery/pkg/version" \
--output-package "$REL_DIR" \
--output-base "." \
--output-file-base "zz_generated.openapi" \
--go-header-file "hack/boilerplate.go.txt"
done
echo "🔄 Generating Kubernetes OpenAPI definitions..."
./hack/update-codegen.sh
echo "✅ OpenAPI generation complete"
silent: true

# Test tasks
Expand Down Expand Up @@ -376,6 +357,7 @@ tasks:
echo " Etcd: task test-infra:kubectl -- logs -l app.kubernetes.io/name=etcd -n etcd-system -f"
echo " Search API Server: task test-infra:kubectl -- logs -l app.kubernetes.io/name=search-apiserver -n search-system -f"
echo " Search Controller: task test-infra:kubectl -- logs -l app.kubernetes.io/name=search-controller-manager -n search-system -f"
echo " Search Indexer: task test-infra:kubectl -- logs -l app.kubernetes.io/name=resource-indexer -n search-system -f"

dev:generate-webhook-certs:
desc: Generate all certificates for webhook server
Expand Down Expand Up @@ -409,7 +391,9 @@ tasks:
--metrics-bind-address=:8085 \
--health-probe-bind-address=:8086 \
--leader-elect=false \
--meilisearch-domain="http://127.0.0.1:7700"
--meilisearch-domain="http://127.0.0.1:7700" \
--nats-url="nats://127.0.0.1:4222"

silent: true

dev:run-indexer:
Expand All @@ -422,21 +406,10 @@ tasks:
# Trap to kill background processes (port-forward) on exit
trap 'kill $(jobs -p)' EXIT

echo "🚀 Starting NATS port-forward in background..."
task dev:pf-nats > /dev/null 2>&1 &

echo "🚀 Starting Meilisearch port-forward in background..."
task dev:pf-meilisearch > /dev/null 2>&1 &

echo "Waiting for port-forward to be ready..."
sleep 5

echo "🚀 Running indexer against local NATS..."
MEILISEARCH_API_KEY=search-master-key \
go run ./cmd/search indexer \
--nats-url="nats://127.0.0.1:4222" \
--nats-subject="audit.>" \
--nats-queue-group="search-indexer" \
--nats-audit-consumer-name="search-indexer" \
--nats-reindex-consumer-name="search-reindexer" \
--nats-stream-name="AUDIT_EVENTS" \
Expand Down Expand Up @@ -472,6 +445,15 @@ tasks:
- echo "Port forwarding Etcd to localhost:2379..."
- task test-infra:kubectl -- port-forward -n etcd-system svc/etcd 2379:2379

dev:pf-all:
desc: Port forward all dependencies for local development
cmds:
- |
task dev:pf-meilisearch &
task dev:pf-nats &
task dev:pf-etcd &
wait

dev:run-apiserver:
desc: Run the API server locally (requires dev:pf-etcd running)
cmds:
Expand All @@ -495,6 +477,7 @@ tasks:
- |
# Use KUBECONFIG if set, otherwise fallback to default
KCFG=${KUBECONFIG:-$HOME/.kube/config}
export MEILISEARCH_API_KEY=search-master-key
go run ./cmd/search serve \
--etcd-servers http://127.0.0.1:2379 \
--secure-port 9443 \
Expand All @@ -504,7 +487,9 @@ tasks:
--authorization-kubeconfig="$KCFG" \
--kubeconfig="$KCFG" \
--client-ca-file="{{.CERTS_DIR}}/kind-ca.crt" \
--authorization-always-allow-paths=/healthz,/readyz,/livez,/openapi,/openapi/v2,/openapi/v3,/apis,/api
--authorization-always-allow-paths=/healthz,/readyz,/livez,/openapi,/openapi/v2,/openapi/v3,/apis,/api \
--meilisearch-domain="http://127.0.0.1:7700" \


dev:undeploy:
desc: Undeploy Search server from test-infra cluster
Expand Down Expand Up @@ -660,7 +645,6 @@ tasks:
deps:
- dev:build
- dev:load
- dev:deploy
cmds:
- |
set -e
Expand All @@ -671,15 +655,19 @@ tasks:
# Restart the deployment to pick up new image
task test-infra:kubectl -- rollout restart deployment/search-apiserver -n search-system
task test-infra:kubectl -- rollout restart deployment/search-controller-manager -n search-system
task test-infra:kubectl -- rollout restart deployment/resource-indexer -n search-system

# Wait for rollout to complete
echo "Waiting for rollout to complete..."
task test-infra:kubectl -- rollout status deployment/search-apiserver -n search-system --timeout=1000s
task test-infra:kubectl -- rollout status deployment/search-controller-manager -n search-system --timeout=1000s
task test-infra:kubectl -- rollout status deployment/resource-indexer -n search-system --timeout=1000s

echo "✅ Redeployment complete!"
echo "Check logs with: task test-infra:kubectl -- logs -n search-system -l app.kubernetes.io/name=search-controller-manager"
echo "Check pods with: task test-infra:kubectl -- get pods -n search-system"

dev:nats-queue:
desc: View the NATS queue for the indexer
cmds:
- task test-infra:kubectl -- exec -n nats-system $(task test-infra:kubectl -- get pods -n nats-system -l app.kubernetes.io/component=nats-box -o jsonpath="{.items[0].metadata.name}") -- nats consumer info AUDIT_EVENTS search-indexer
- task test-infra:kubectl -- exec -n nats-system $(task test-infra:kubectl -- get pods -n nats-system -l app.kubernetes.io/component=nats-box -o jsonpath="{.items[0].metadata.name}") -- nats consumer info AUDIT_EVENTS search-indexer
- task test-infra:kubectl -- exec -n nats-system $(task test-infra:kubectl -- get pods -n nats-system -l app.kubernetes.io/component=nats-box -o jsonpath="{.items[0].metadata.name}") -- nats consumer info REINDEX_EVENTS search-reindexer
45 changes: 37 additions & 8 deletions cmd/search/indexer/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/klog/v2"
runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client/config"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
)

// ResourceIndexerOptions holds the configuration for the resource indexer.
Expand All @@ -25,6 +26,9 @@ type ResourceIndexerOptions struct {
NatsURL string
NatsAuditConsumerName string
NatsStreamName string
NatsTLSCA string
NatsTLSCert string
NatsTLSKey string

// NATS re-index consumer settings (separate REINDEX_EVENTS stream)
NatsReindexStream string
Expand Down Expand Up @@ -72,6 +76,9 @@ func (o *ResourceIndexerOptions) AddFlags(fs *pflag.FlagSet) {

fs.StringVar(&o.NatsReindexStream, "nats-reindex-stream", o.NatsReindexStream, "The JetStream stream name for re-index messages.")
fs.StringVar(&o.NatsReindexConsumerName, "nats-reindex-consumer-name", o.NatsReindexConsumerName, "The name of the re-index JetStream consumer (must match the manifest).")
fs.StringVar(&o.NatsTLSCA, "nats-tls-ca", o.NatsTLSCA, "The path to the NATS TLS CA file.")
fs.StringVar(&o.NatsTLSCert, "nats-tls-cert", o.NatsTLSCert, "The path to the NATS TLS certificate file.")
fs.StringVar(&o.NatsTLSKey, "nats-tls-key", o.NatsTLSKey, "The path to the NATS TLS key file.")

fs.StringVar(&o.MeilisearchDomain, "meilisearch-domain", o.MeilisearchDomain, "Domain of the Meilisearch instance.")
fs.DurationVar(&o.MeilisearchTaskWaitTimeout, "meilisearch-task-wait-timeout", o.MeilisearchTaskWaitTimeout, "Timeout for waiting for Meilisearch tasks to complete.")
Expand Down Expand Up @@ -159,6 +166,7 @@ func NewIndexerCommand() *cobra.Command {

// Run starts the indexer consumer
func Run(o *ResourceIndexerOptions, ctx context.Context) error {
ctrllog.SetLogger(klog.NewKlogr())
// Build a scheme and REST config for the controller-runtime cache.
scheme := runtime.NewScheme()
if err := searchv1alpha1.AddToScheme(scheme); err != nil {
Expand Down Expand Up @@ -188,21 +196,42 @@ func Run(o *ResourceIndexerOptions, ctx context.Context) error {
return fmt.Errorf("failed to create policy cache: %w", err)
}

go func() {
if err := indexPolicyCache.Start(ctx); err != nil {
klog.Errorf("Index Policy cache stopped: %v", err)
}
}()
// Register handlers for both caches. They share the same underlying informer.
if err := indexPolicyCache.RegisterHandlers(ctx); err != nil {
return fmt.Errorf("failed to register index policy handlers: %w", err)
}
if err := reindexPolicyCache.RegisterHandlers(ctx); err != nil {
return fmt.Errorf("failed to register reindex policy handlers: %w", err)
}

// Start the shared cache and wait for it to be synced.
go func() {
if err := reindexPolicyCache.Start(ctx); err != nil {
klog.Errorf("Reindex Policy cache stopped: %v", err)
klog.Info("Starting shared Kubernetes cache...")
if err := k8sCache.Start(ctx); err != nil {
klog.Fatalf("Kubernetes cache stopped with error: %v", err)
}
}()

klog.Info("Waiting for cache to sync...")
if !k8sCache.WaitForCacheSync(ctx) {
return fmt.Errorf("failed to sync Kubernetes cache")
}
klog.Info("Cache synced successfully")

// Connect to NATS
klog.Infof("Connecting to NATS at %s...", o.NatsURL)
nc, err := nats.Connect(o.NatsURL)

var natsOpts []nats.Option
if o.NatsTLSCert != "" && o.NatsTLSKey != "" {
if o.NatsTLSCA != "" {
klog.Infof("Using NATS TLS CA from %s", o.NatsTLSCA)
natsOpts = append(natsOpts, nats.RootCAs(o.NatsTLSCA))
}
klog.Infof("Using NATS TLS cert from %s and key from %s", o.NatsTLSCert, o.NatsTLSKey)
natsOpts = append(natsOpts, nats.ClientCert(o.NatsTLSCert, o.NatsTLSKey))
}

nc, err := nats.Connect(o.NatsURL, natsOpts...)
if err != nil {
return fmt.Errorf("failed to connect to NATS: %w", err)
}
Expand Down
Loading