diff --git a/Makefile b/Makefile index eb36ae7..81d9444 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,8 @@ MINIO_ROOT_PASSWORD ?= minioadmin MINIO_BUCKET ?= kafscale KAFSCALE_KIND_CLUSTER ?= kafscale-demo KAFSCALE_DEMO_NAMESPACE ?= kafscale-demo +KAFSCALE_UI_USERNAME ?= kafscaleadmin +KAFSCALE_UI_PASSWORD ?= kafscale BROKER_PORT ?= 39092 BROKER_PORTS ?= 39092 39093 39094 @@ -292,6 +294,8 @@ demo-platform: docker-build ## Launch a full platform demo on kind (operator HA --set operator.image.tag=$$OPERATOR_TAG \ --set console.image.repository=$$CONSOLE_REPO \ --set console.image.tag=$$CONSOLE_TAG \ + --set console.auth.username=$(KAFSCALE_UI_USERNAME) \ + --set console.auth.password=$(KAFSCALE_UI_PASSWORD) \ --set operator.etcdEndpoints[0]= @OPERATOR_DEPLOY=$$(kubectl -n $(KAFSCALE_DEMO_NAMESPACE) get deployments -l app.kubernetes.io/component=operator -o jsonpath='{.items[0].metadata.name}'); \ kubectl -n $(KAFSCALE_DEMO_NAMESPACE) set env deployment/$$OPERATOR_DEPLOY \ diff --git a/addons/processors/iceberg-processor/cmd/processor/main.go b/addons/processors/iceberg-processor/cmd/processor/main.go index 8d56f06..17cbb03 100644 --- a/addons/processors/iceberg-processor/cmd/processor/main.go +++ b/addons/processors/iceberg-processor/cmd/processor/main.go @@ -21,6 +21,7 @@ import ( "log" "os" "os/signal" + "strings" "syscall" "github.com/novatechflow/kafscale/addons/processors/iceberg-processor/internal/config" @@ -58,5 +59,17 @@ func envOrDefault(key, fallback string) string { if value := os.Getenv(key); value != "" { return value } + warnDefaultEnv(key, fallback) return fallback } + +func warnDefaultEnv(key, fallback string) { + if !isProdEnv() { + return + } + log.Printf("using default for unset env %s=%s", key, fallback) +} + +func isProdEnv() bool { + return strings.EqualFold(strings.TrimSpace(os.Getenv("KAFSCALE_ENV")), "prod") +} diff --git a/cmd/broker/main.go b/cmd/broker/main.go index 27b3ced..d4a760a 100644 --- a/cmd/broker/main.go +++ b/cmd/broker/main.go @@ -1634,7 +1634,10 @@ func parseEnvInt(name string, fallback int) int { if parsed, err := strconv.Atoi(val); err == nil { return parsed } + warnDefaultEnv(name, val, fmt.Sprintf("%d", fallback)) + return fallback } + warnDefaultEnv(name, "", fmt.Sprintf("%d", fallback)) return fallback } @@ -1643,7 +1646,10 @@ func parseEnvInt32(name string, fallback int32) int32 { if parsed, err := strconv.ParseInt(val, 10, 32); err == nil { return int32(parsed) } + warnDefaultEnv(name, val, fmt.Sprintf("%d", fallback)) + return fallback } + warnDefaultEnv(name, "", fmt.Sprintf("%d", fallback)) return fallback } @@ -1660,6 +1666,7 @@ func envOrDefault(name, fallback string) string { if val := strings.TrimSpace(os.Getenv(name)); val != "" { return val } + warnDefaultEnv(name, "", fallback) return fallback } @@ -1668,7 +1675,10 @@ func parseEnvFloat(name string, fallback float64) float64 { if parsed, err := strconv.ParseFloat(val, 64); err == nil { return parsed } + warnDefaultEnv(name, val, fmt.Sprintf("%g", fallback)) + return fallback } + warnDefaultEnv(name, "", fmt.Sprintf("%g", fallback)) return fallback } @@ -1680,10 +1690,28 @@ func parseEnvBool(name string, fallback bool) bool { case "0", "false", "no", "off": return false } + warnDefaultEnv(name, val, fmt.Sprintf("%t", fallback)) + return fallback } + warnDefaultEnv(name, "", fmt.Sprintf("%t", fallback)) return fallback } +func warnDefaultEnv(name, value, fallback string) { + if !isProdEnv() { + return + } + if value == "" { + slog.Warn("using default for unset env", "env", name, "default", fallback) + return + } + slog.Warn("using default for invalid env", "env", name, "value", value, "default", fallback) +} + +func isProdEnv() bool { + return strings.EqualFold(strings.TrimSpace(os.Getenv("KAFSCALE_ENV")), "prod") +} + type apiVersionSupport struct { key int16 minVersion, maxVersion int16 diff --git a/cmd/console/main.go b/cmd/console/main.go index 1e8961d..9c9e95b 100644 --- a/cmd/console/main.go +++ b/cmd/console/main.go @@ -56,9 +56,21 @@ func envOrDefault(key, fallback string) string { if val := os.Getenv(key); val != "" { return val } + warnDefaultEnv(key, fallback) return fallback } +func warnDefaultEnv(key, fallback string) { + if !isProdEnv() { + return + } + log.Printf("using default for unset env %s=%s", key, fallback) +} + +func isProdEnv() bool { + return strings.EqualFold(strings.TrimSpace(os.Getenv("KAFSCALE_ENV")), "prod") +} + func buildMetadataStore(ctx context.Context) (metadata.Store, error) { cfg, ok := consoleEtcdConfigFromEnv() if !ok { diff --git a/cmd/mcp/main.go b/cmd/mcp/main.go index f5a8694..3aba0a5 100644 --- a/cmd/mcp/main.go +++ b/cmd/mcp/main.go @@ -104,6 +104,7 @@ func envOrDefault(key, fallback string) string { if val := os.Getenv(key); val != "" { return val } + warnDefaultEnv(key, fallback) return fallback } @@ -115,6 +116,17 @@ func parseDurationEnv(key string) (time.Duration, error) { return time.ParseDuration(val) } +func warnDefaultEnv(key, fallback string) { + if !isProdEnv() { + return + } + log.Printf("using default for unset env %s=%s", key, fallback) +} + +func isProdEnv() bool { + return strings.EqualFold(strings.TrimSpace(os.Getenv("KAFSCALE_ENV")), "prod") +} + func buildMetadataStore(ctx context.Context) (metadata.Store, error) { cfg, ok := mcpEtcdConfigFromEnv() if !ok { diff --git a/pkg/operator/cluster_controller.go b/pkg/operator/cluster_controller.go index ec30c72..f3271cc 100644 --- a/pkg/operator/cluster_controller.go +++ b/pkg/operator/cluster_controller.go @@ -148,7 +148,6 @@ func (r *ClusterReconciler) brokerContainer(cluster *kafscalev1alpha1.KafscaleCl {Name: "KAFSCALE_ETCD_ENDPOINTS", Value: strings.Join(endpoints, ",")}, {Name: "KAFSCALE_BROKER_HOST", Value: brokerHost}, {Name: "KAFSCALE_BROKER_PORT", Value: fmt.Sprintf("%d", brokerPort)}, - {Name: "KAFSCALE_BROKER_ADDR", Value: ":9092"}, {Name: "KAFSCALE_METRICS_ADDR", Value: ":9093"}, } if strings.TrimSpace(cluster.Spec.S3.Endpoint) != "" {