Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion controllers/druid/dynamic_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func updateDruidDynamicConfigs(

dynamicConfig := nodeConfig.DynamicConfig.Raw

svcName, err := druidapi.GetRouterSvcUrl(druid.Namespace, druid.Name, client)
svcName, err := druidapi.GetRouterSvcUrl(ctx, druid.Namespace, druid.Name, client)
if err != nil {
emitEvent.EmitEventGeneric(
druid,
Expand Down
4 changes: 2 additions & 2 deletions controllers/ingestion/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIng
return err
}

svcName, err := druidapi.GetRouterSvcUrl(di.Namespace, di.Spec.DruidClusterName, r.Client)
svcName, err := druidapi.GetRouterSvcUrl(ctx, di.Namespace, di.Spec.DruidClusterName, r.Client)
if err != nil {
return err
}
Expand All @@ -73,7 +73,7 @@ func (r *DruidIngestionReconciler) do(ctx context.Context, di *v1alpha1.DruidIng
} else {
if controllerutil.ContainsFinalizer(di, DruidIngestionControllerFinalizer) {
// our finalizer is present, so lets handle any external dependency
svcName, err := druidapi.GetRouterSvcUrl(di.Namespace, di.Spec.DruidClusterName, r.Client)
svcName, err := druidapi.GetRouterSvcUrl(ctx, di.Namespace, di.Spec.DruidClusterName, r.Client)
if err != nil {
return err
}
Expand Down
138 changes: 134 additions & 4 deletions pkg/druidapi/druidapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/url"
"path"
"strings"

internalhttp "github.com/datainfrahq/druid-operator/pkg/http"
v1 "k8s.io/api/core/v1"
Expand All @@ -19,6 +20,12 @@ const (
OperatorPassword = "OperatorPassword"
)

// RouterConnectionInfo contains the inferred connection details for a Druid router
type RouterConnectionInfo struct {
Protocol string
Port string
}

type AuthType string

const (
Expand Down Expand Up @@ -94,6 +101,122 @@ func GetAuthCreds(
return internalhttp.BasicAuth{}, nil
}

// parseProperties parses Java properties format string into a map
func parseProperties(props string) map[string]string {
result := make(map[string]string)
lines := strings.Split(props, "\n")

for _, line := range lines {
line = strings.TrimSpace(line)
// Skip empty lines and comments
if line == "" || strings.HasPrefix(line, "#") {
continue
}

// Split on first '=' character
parts := strings.SplitN(line, "=", 2)
if len(parts) == 2 {
key := strings.TrimSpace(parts[0])
value := strings.TrimSpace(parts[1])
result[key] = value
}
}

return result
}

// InferRouterConnectionFromConfig analyzes Druid configuration to determine router connection settings
func InferRouterConnectionFromConfig(commonConfig, routerConfig string) RouterConnectionInfo {
// Parse both configurations
commonProps := parseProperties(commonConfig)
routerProps := parseProperties(routerConfig)

// Merge configurations, with router config taking precedence
mergedProps := make(map[string]string)
for k, v := range commonProps {
mergedProps[k] = v
}
for k, v := range routerProps {
mergedProps[k] = v
}

// Default values
protocol := "http"
port := DruidRouterPort

// Check TLS configuration
enableTlsPort := mergedProps["druid.enableTlsPort"]
enablePlaintextPort := mergedProps["druid.enablePlaintextPort"]
tlsPort := mergedProps["druid.tlsPort"]
plaintextPort := mergedProps["druid.plaintextPort"]

// Determine protocol and port based on configuration
if enableTlsPort == "true" {
protocol = "https"
if tlsPort != "" {
port = tlsPort
} else {
// If TLS is enabled but no TLS port specified, check for druid.port
if druidPort := mergedProps["druid.port"]; druidPort != "" {
port = druidPort
}
}
} else if enablePlaintextPort != "false" { // Default is true if not explicitly disabled
protocol = "http"
if plaintextPort != "" {
port = plaintextPort
} else {
// Check for druid.port as fallback
if druidPort := mergedProps["druid.port"]; druidPort != "" {
port = druidPort
}
}
}

return RouterConnectionInfo{
Protocol: protocol,
Port: port,
}
}

// GetRouterConfigFromCluster retrieves router configuration from Kubernetes ConfigMaps
func GetRouterConfigFromCluster(ctx context.Context, c client.Client, namespace, druidClusterName string) (RouterConnectionInfo, error) {
// Get common configuration
commonConfigMapName := fmt.Sprintf("%s-druid-common-config", druidClusterName)
commonConfigMap := &v1.ConfigMap{}
err := c.Get(ctx, types.NamespacedName{
Namespace: namespace,
Name: commonConfigMapName,
}, commonConfigMap)
if err != nil {
return RouterConnectionInfo{}, fmt.Errorf("failed to get common config map %s: %w", commonConfigMapName, err)
}

commonConfig := ""
if commonRuntimeProps, exists := commonConfigMap.Data["common.runtime.properties"]; exists {
commonConfig = commonRuntimeProps
}

// Get router-specific configuration
routerConfigMapName := fmt.Sprintf("druid-%s-routers-config", druidClusterName)
routerConfigMap := &v1.ConfigMap{}
err = c.Get(ctx, types.NamespacedName{
Namespace: namespace,
Name: routerConfigMapName,
}, routerConfigMap)

routerConfig := ""
if err == nil {
// Router config map exists, get runtime properties
if routerRuntimeProps, exists := routerConfigMap.Data["runtime.properties"]; exists {
routerConfig = routerRuntimeProps
}
}
// If router config map doesn't exist, that's fine - we'll just use common config

return InferRouterConnectionFromConfig(commonConfig, routerConfig), nil
}

// MakePath constructs the appropriate path for the specified Druid API.
// Parameters:
//
Expand Down Expand Up @@ -123,17 +246,24 @@ func MakePath(baseURL, componentType, apiType string, additionalPaths ...string)
return u.String()
}

// GetRouterSvcUrl retrieves the URL of the Druid router service.
// GetRouterSvcUrl retrieves the URL of the Druid router service using configuration inference.
// Parameters:
//
// ctx: The context object.
// namespace: The namespace of the Druid cluster.
// druidClusterName: The name of the Druid cluster.
// c: The Kubernetes client.
//
// Returns:
//
// string: The URL of the Druid router service.
func GetRouterSvcUrl(namespace, druidClusterName string, c client.Client) (string, error) {
func GetRouterSvcUrl(ctx context.Context, namespace, druidClusterName string, c client.Client) (string, error) {
// Get router configuration from cluster ConfigMaps
routerInfo, err := GetRouterConfigFromCluster(ctx, c, namespace, druidClusterName)
if err != nil {
return "", fmt.Errorf("failed to infer router configuration: %w", err)
}

listOpts := []client.ListOption{
client.InNamespace(namespace),
client.MatchingLabels(map[string]string{
Expand All @@ -142,7 +272,7 @@ func GetRouterSvcUrl(namespace, druidClusterName string, c client.Client) (strin
}),
}
svcList := &v1.ServiceList{}
if err := c.List(context.Background(), svcList, listOpts...); err != nil {
if err := c.List(ctx, svcList, listOpts...); err != nil {
return "", err
}
var svcName string
Expand All @@ -155,7 +285,7 @@ func GetRouterSvcUrl(namespace, druidClusterName string, c client.Client) (strin
return "", errors.New("router svc discovery fail")
}

newName := "http://" + svcName + "." + namespace + ":" + DruidRouterPort
newName := routerInfo.Protocol + "://" + svcName + "." + namespace + ":" + routerInfo.Port

return newName, nil
}
Loading