diff --git a/controllers/druid/dynamic_config.go b/controllers/druid/dynamic_config.go index 9ffb10c4..95ff7a67 100644 --- a/controllers/druid/dynamic_config.go +++ b/controllers/druid/dynamic_config.go @@ -31,7 +31,7 @@ func updateDruidDynamicConfigs( dynamicConfig := nodeConfig.DynamicConfig.Raw - svcName, err := druidapi.GetRouterSvcUrl(druid.Namespace, druid.Name, client) + svcName, err := druidapi.GetRouterSvcUrl(ctx, druid.Namespace, druid.Name, client) if err != nil { emitEvent.EmitEventGeneric( druid, diff --git a/controllers/ingestion/reconciler.go b/controllers/ingestion/reconciler.go index 1deca6d9..feb76db3 100644 --- a/controllers/ingestion/reconciler.go +++ b/controllers/ingestion/reconciler.go @@ -46,7 +46,7 @@ func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIng return err } - svcName, err := druidapi.GetRouterSvcUrl(di.Namespace, di.Spec.DruidClusterName, r.Client) + svcName, err := druidapi.GetRouterSvcUrl(ctx, di.Namespace, di.Spec.DruidClusterName, r.Client) if err != nil { return err } @@ -73,7 +73,7 @@ func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIng } else { if controllerutil.ContainsFinalizer(di, DruidIngestionControllerFinalizer) { // our finalizer is present, so lets handle any external dependency - svcName, err := druidapi.GetRouterSvcUrl(di.Namespace, di.Spec.DruidClusterName, r.Client) + svcName, err := druidapi.GetRouterSvcUrl(ctx, di.Namespace, di.Spec.DruidClusterName, r.Client) if err != nil { return err } diff --git a/pkg/druidapi/druidapi.go b/pkg/druidapi/druidapi.go index effda506..7e11dd04 100644 --- a/pkg/druidapi/druidapi.go +++ b/pkg/druidapi/druidapi.go @@ -6,6 +6,7 @@ import ( "fmt" "net/url" "path" + "strings" internalhttp "github.com/datainfrahq/druid-operator/pkg/http" v1 "k8s.io/api/core/v1" @@ -19,6 +20,12 @@ const ( OperatorPassword = "OperatorPassword" ) +// RouterConnectionInfo contains the inferred connection details for a Druid router +type RouterConnectionInfo struct { + Protocol string + Port string +} + type AuthType string const ( @@ -94,6 +101,122 @@ func GetAuthCreds( return internalhttp.BasicAuth{}, nil } +// parseProperties parses Java properties format string into a map +func parseProperties(props string) map[string]string { + result := make(map[string]string) + lines := strings.Split(props, "\n") + + for _, line := range lines { + line = strings.TrimSpace(line) + // Skip empty lines and comments + if line == "" || strings.HasPrefix(line, "#") { + continue + } + + // Split on first '=' character + parts := strings.SplitN(line, "=", 2) + if len(parts) == 2 { + key := strings.TrimSpace(parts[0]) + value := strings.TrimSpace(parts[1]) + result[key] = value + } + } + + return result +} + +// InferRouterConnectionFromConfig analyzes Druid configuration to determine router connection settings +func InferRouterConnectionFromConfig(commonConfig, routerConfig string) RouterConnectionInfo { + // Parse both configurations + commonProps := parseProperties(commonConfig) + routerProps := parseProperties(routerConfig) + + // Merge configurations, with router config taking precedence + mergedProps := make(map[string]string) + for k, v := range commonProps { + mergedProps[k] = v + } + for k, v := range routerProps { + mergedProps[k] = v + } + + // Default values + protocol := "http" + port := DruidRouterPort + + // Check TLS configuration + enableTlsPort := mergedProps["druid.enableTlsPort"] + enablePlaintextPort := mergedProps["druid.enablePlaintextPort"] + tlsPort := mergedProps["druid.tlsPort"] + plaintextPort := mergedProps["druid.plaintextPort"] + + // Determine protocol and port based on configuration + if enableTlsPort == "true" { + protocol = "https" + if tlsPort != "" { + port = tlsPort + } else { + // If TLS is enabled but no TLS port specified, check for druid.port + if druidPort := mergedProps["druid.port"]; druidPort != "" { + port = druidPort + } + } + } else if enablePlaintextPort != "false" { // Default is true if not explicitly disabled + protocol = "http" + if plaintextPort != "" { + port = plaintextPort + } else { + // Check for druid.port as fallback + if druidPort := mergedProps["druid.port"]; druidPort != "" { + port = druidPort + } + } + } + + return RouterConnectionInfo{ + Protocol: protocol, + Port: port, + } +} + +// GetRouterConfigFromCluster retrieves router configuration from Kubernetes ConfigMaps +func GetRouterConfigFromCluster(ctx context.Context, c client.Client, namespace, druidClusterName string) (RouterConnectionInfo, error) { + // Get common configuration + commonConfigMapName := fmt.Sprintf("%s-druid-common-config", druidClusterName) + commonConfigMap := &v1.ConfigMap{} + err := c.Get(ctx, types.NamespacedName{ + Namespace: namespace, + Name: commonConfigMapName, + }, commonConfigMap) + if err != nil { + return RouterConnectionInfo{}, fmt.Errorf("failed to get common config map %s: %w", commonConfigMapName, err) + } + + commonConfig := "" + if commonRuntimeProps, exists := commonConfigMap.Data["common.runtime.properties"]; exists { + commonConfig = commonRuntimeProps + } + + // Get router-specific configuration + routerConfigMapName := fmt.Sprintf("druid-%s-routers-config", druidClusterName) + routerConfigMap := &v1.ConfigMap{} + err = c.Get(ctx, types.NamespacedName{ + Namespace: namespace, + Name: routerConfigMapName, + }, routerConfigMap) + + routerConfig := "" + if err == nil { + // Router config map exists, get runtime properties + if routerRuntimeProps, exists := routerConfigMap.Data["runtime.properties"]; exists { + routerConfig = routerRuntimeProps + } + } + // If router config map doesn't exist, that's fine - we'll just use common config + + return InferRouterConnectionFromConfig(commonConfig, routerConfig), nil +} + // MakePath constructs the appropriate path for the specified Druid API. // Parameters: // @@ -123,9 +246,10 @@ func MakePath(baseURL, componentType, apiType string, additionalPaths ...string) return u.String() } -// GetRouterSvcUrl retrieves the URL of the Druid router service. +// GetRouterSvcUrl retrieves the URL of the Druid router service using configuration inference. // Parameters: // +// ctx: The context object. // namespace: The namespace of the Druid cluster. // druidClusterName: The name of the Druid cluster. // c: The Kubernetes client. @@ -133,7 +257,13 @@ func MakePath(baseURL, componentType, apiType string, additionalPaths ...string) // Returns: // // string: The URL of the Druid router service. -func GetRouterSvcUrl(namespace, druidClusterName string, c client.Client) (string, error) { +func GetRouterSvcUrl(ctx context.Context, namespace, druidClusterName string, c client.Client) (string, error) { + // Get router configuration from cluster ConfigMaps + routerInfo, err := GetRouterConfigFromCluster(ctx, c, namespace, druidClusterName) + if err != nil { + return "", fmt.Errorf("failed to infer router configuration: %w", err) + } + listOpts := []client.ListOption{ client.InNamespace(namespace), client.MatchingLabels(map[string]string{ @@ -142,7 +272,7 @@ func GetRouterSvcUrl(namespace, druidClusterName string, c client.Client) (strin }), } svcList := &v1.ServiceList{} - if err := c.List(context.Background(), svcList, listOpts...); err != nil { + if err := c.List(ctx, svcList, listOpts...); err != nil { return "", err } var svcName string @@ -155,7 +285,7 @@ func GetRouterSvcUrl(namespace, druidClusterName string, c client.Client) (strin return "", errors.New("router svc discovery fail") } - newName := "http://" + svcName + "." + namespace + ":" + DruidRouterPort + newName := routerInfo.Protocol + "://" + svcName + "." + namespace + ":" + routerInfo.Port return newName, nil } diff --git a/pkg/druidapi/druidapi_test.go b/pkg/druidapi/druidapi_test.go index 10a686c1..79f5d384 100644 --- a/pkg/druidapi/druidapi_test.go +++ b/pkg/druidapi/druidapi_test.go @@ -2,12 +2,13 @@ package druidapi import ( "context" + "testing" + internalhttp "github.com/datainfrahq/druid-operator/pkg/http" "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "testing" ) func TestGetAuthCreds(t *testing.T) { @@ -164,3 +165,170 @@ func TestMakePath(t *testing.T) { }) } } + +func TestParseProperties(t *testing.T) { + tests := []struct { + name string + input string + expected map[string]string + }{ + { + name: "empty string", + input: "", + expected: map[string]string{}, + }, + { + name: "simple properties", + input: `druid.host=localhost +druid.port=8082`, + expected: map[string]string{ + "druid.host": "localhost", + "druid.port": "8082", + }, + }, + { + name: "properties with comments and empty lines", + input: `# This is a comment +druid.host=localhost + +druid.port=8082 +# Another comment +druid.service=druid/broker`, + expected: map[string]string{ + "druid.host": "localhost", + "druid.port": "8082", + "druid.service": "druid/broker", + }, + }, + { + name: "properties with spaces", + input: `druid.host = localhost +druid.port= 8082 +druid.service =druid/broker`, + expected: map[string]string{ + "druid.host": "localhost", + "druid.port": "8082", + "druid.service": "druid/broker", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := parseProperties(tt.input) + + if len(result) != len(tt.expected) { + t.Errorf("expected %d properties, got %d", len(tt.expected), len(result)) + } + + for key, expectedValue := range tt.expected { + if actualValue, exists := result[key]; !exists { + t.Errorf("expected key %s not found", key) + } else if actualValue != expectedValue { + t.Errorf("for key %s, expected %s, got %s", key, expectedValue, actualValue) + } + } + }) + } +} + +func TestInferRouterConnectionFromConfig(t *testing.T) { + tests := []struct { + name string + commonConfig string + routerConfig string + expected RouterConnectionInfo + }{ + { + name: "default configuration", + commonConfig: "", + routerConfig: "", + expected: RouterConnectionInfo{ + Protocol: "http", + Port: "8088", + }, + }, + { + name: "HTTP with custom port in common config", + commonConfig: `druid.enablePlaintextPort=true +druid.plaintextPort=9090`, + routerConfig: "", + expected: RouterConnectionInfo{ + Protocol: "http", + Port: "9090", + }, + }, + { + name: "HTTPS enabled in common config", + commonConfig: `druid.enableTlsPort=true +druid.tlsPort=8443`, + routerConfig: "", + expected: RouterConnectionInfo{ + Protocol: "https", + Port: "8443", + }, + }, + { + name: "HTTPS enabled without specific TLS port", + commonConfig: `druid.enableTlsPort=true +druid.port=8283`, + routerConfig: "", + expected: RouterConnectionInfo{ + Protocol: "https", + Port: "8283", + }, + }, + { + name: "router config overrides common config", + commonConfig: `druid.enableTlsPort=false +druid.plaintextPort=8088`, + routerConfig: `druid.enableTlsPort=true +druid.tlsPort=8443`, + expected: RouterConnectionInfo{ + Protocol: "https", + Port: "8443", + }, + }, + { + name: "plaintext disabled, TLS enabled", + commonConfig: `druid.enablePlaintextPort=false +druid.enableTlsPort=true +druid.tlsPort=8443`, + routerConfig: "", + expected: RouterConnectionInfo{ + Protocol: "https", + Port: "8443", + }, + }, + { + name: "complex configuration with router override", + commonConfig: `# Common configuration +druid.host=localhost +druid.enableTlsPort=false +druid.plaintextPort=8088`, + routerConfig: `# Router specific configuration +druid.service=druid/router +druid.enableTlsPort=true +druid.tlsPort=8443 +druid.router.http.numConnections=50`, + expected: RouterConnectionInfo{ + Protocol: "https", + Port: "8443", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := InferRouterConnectionFromConfig(tt.commonConfig, tt.routerConfig) + + if result.Protocol != tt.expected.Protocol { + t.Errorf("expected protocol %s, got %s", tt.expected.Protocol, result.Protocol) + } + + if result.Port != tt.expected.Port { + t.Errorf("expected port %s, got %s", tt.expected.Port, result.Port) + } + }) + } +}