Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ MINIO_ROOT_PASSWORD ?= minioadmin
MINIO_BUCKET ?= kafscale
KAFSCALE_KIND_CLUSTER ?= kafscale-demo
KAFSCALE_DEMO_NAMESPACE ?= kafscale-demo
KAFSCALE_UI_USERNAME ?= kafscaleadmin
KAFSCALE_UI_PASSWORD ?= kafscale
BROKER_PORT ?= 39092
BROKER_PORTS ?= 39092 39093 39094

Expand Down Expand Up @@ -292,6 +294,8 @@ demo-platform: docker-build ## Launch a full platform demo on kind (operator HA
--set operator.image.tag=$$OPERATOR_TAG \
--set console.image.repository=$$CONSOLE_REPO \
--set console.image.tag=$$CONSOLE_TAG \
--set console.auth.username=$(KAFSCALE_UI_USERNAME) \
--set console.auth.password=$(KAFSCALE_UI_PASSWORD) \
--set operator.etcdEndpoints[0]=
@OPERATOR_DEPLOY=$$(kubectl -n $(KAFSCALE_DEMO_NAMESPACE) get deployments -l app.kubernetes.io/component=operator -o jsonpath='{.items[0].metadata.name}'); \
kubectl -n $(KAFSCALE_DEMO_NAMESPACE) set env deployment/$$OPERATOR_DEPLOY \
Expand Down
13 changes: 13 additions & 0 deletions addons/processors/iceberg-processor/cmd/processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"log"
"os"
"os/signal"
"strings"
"syscall"

"github.com/novatechflow/kafscale/addons/processors/iceberg-processor/internal/config"
Expand Down Expand Up @@ -58,5 +59,17 @@ func envOrDefault(key, fallback string) string {
if value := os.Getenv(key); value != "" {
return value
}
warnDefaultEnv(key, fallback)
return fallback
}

func warnDefaultEnv(key, fallback string) {
if !isProdEnv() {
return
}
log.Printf("using default for unset env %s=%s", key, fallback)
}

func isProdEnv() bool {
return strings.EqualFold(strings.TrimSpace(os.Getenv("KAFSCALE_ENV")), "prod")
}
28 changes: 28 additions & 0 deletions cmd/broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1634,7 +1634,10 @@ func parseEnvInt(name string, fallback int) int {
if parsed, err := strconv.Atoi(val); err == nil {
return parsed
}
warnDefaultEnv(name, val, fmt.Sprintf("%d", fallback))
return fallback
}
warnDefaultEnv(name, "", fmt.Sprintf("%d", fallback))
return fallback
}

Expand All @@ -1643,7 +1646,10 @@ func parseEnvInt32(name string, fallback int32) int32 {
if parsed, err := strconv.ParseInt(val, 10, 32); err == nil {
return int32(parsed)
}
warnDefaultEnv(name, val, fmt.Sprintf("%d", fallback))
return fallback
}
warnDefaultEnv(name, "", fmt.Sprintf("%d", fallback))
return fallback
}

Expand All @@ -1660,6 +1666,7 @@ func envOrDefault(name, fallback string) string {
if val := strings.TrimSpace(os.Getenv(name)); val != "" {
return val
}
warnDefaultEnv(name, "", fallback)
return fallback
}

Expand All @@ -1668,7 +1675,10 @@ func parseEnvFloat(name string, fallback float64) float64 {
if parsed, err := strconv.ParseFloat(val, 64); err == nil {
return parsed
}
warnDefaultEnv(name, val, fmt.Sprintf("%g", fallback))
return fallback
}
warnDefaultEnv(name, "", fmt.Sprintf("%g", fallback))
return fallback
}

Expand All @@ -1680,10 +1690,28 @@ func parseEnvBool(name string, fallback bool) bool {
case "0", "false", "no", "off":
return false
}
warnDefaultEnv(name, val, fmt.Sprintf("%t", fallback))
return fallback
}
warnDefaultEnv(name, "", fmt.Sprintf("%t", fallback))
return fallback
}

func warnDefaultEnv(name, value, fallback string) {
if !isProdEnv() {
return
}
if value == "" {
slog.Warn("using default for unset env", "env", name, "default", fallback)
return
}
slog.Warn("using default for invalid env", "env", name, "value", value, "default", fallback)
}

func isProdEnv() bool {
return strings.EqualFold(strings.TrimSpace(os.Getenv("KAFSCALE_ENV")), "prod")
}

type apiVersionSupport struct {
key int16
minVersion, maxVersion int16
Expand Down
12 changes: 12 additions & 0 deletions cmd/console/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,21 @@ func envOrDefault(key, fallback string) string {
if val := os.Getenv(key); val != "" {
return val
}
warnDefaultEnv(key, fallback)
return fallback
}

func warnDefaultEnv(key, fallback string) {
if !isProdEnv() {
return
}
log.Printf("using default for unset env %s=%s", key, fallback)
}

func isProdEnv() bool {
return strings.EqualFold(strings.TrimSpace(os.Getenv("KAFSCALE_ENV")), "prod")
}

func buildMetadataStore(ctx context.Context) (metadata.Store, error) {
cfg, ok := consoleEtcdConfigFromEnv()
if !ok {
Expand Down
12 changes: 12 additions & 0 deletions cmd/mcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func envOrDefault(key, fallback string) string {
if val := os.Getenv(key); val != "" {
return val
}
warnDefaultEnv(key, fallback)
return fallback
}

Expand All @@ -115,6 +116,17 @@ func parseDurationEnv(key string) (time.Duration, error) {
return time.ParseDuration(val)
}

func warnDefaultEnv(key, fallback string) {
if !isProdEnv() {
return
}
log.Printf("using default for unset env %s=%s", key, fallback)
}

func isProdEnv() bool {
return strings.EqualFold(strings.TrimSpace(os.Getenv("KAFSCALE_ENV")), "prod")
}

func buildMetadataStore(ctx context.Context) (metadata.Store, error) {
cfg, ok := mcpEtcdConfigFromEnv()
if !ok {
Expand Down
1 change: 0 additions & 1 deletion pkg/operator/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ func (r *ClusterReconciler) brokerContainer(cluster *kafscalev1alpha1.KafscaleCl
{Name: "KAFSCALE_ETCD_ENDPOINTS", Value: strings.Join(endpoints, ",")},
{Name: "KAFSCALE_BROKER_HOST", Value: brokerHost},
{Name: "KAFSCALE_BROKER_PORT", Value: fmt.Sprintf("%d", brokerPort)},
{Name: "KAFSCALE_BROKER_ADDR", Value: ":9092"},
{Name: "KAFSCALE_METRICS_ADDR", Value: ":9093"},
}
if strings.TrimSpace(cluster.Spec.S3.Endpoint) != "" {
Expand Down