From 1177dbcac5b62154bef450fe0beee0adde0ab51d Mon Sep 17 00:00:00 2001 From: Aashish93-stack Date: Mon, 14 Nov 2022 12:59:25 -0800 Subject: [PATCH 1/2] Adding support for Certificate based Authentication for eventhub-scaler --- .../v1alpha1/triggerauthentication_types.go | 4 +- .../azure/azure_aad_workload_identity.go | 6 +- pkg/scalers/azure/azure_eventhub.go | 58 +++++++++++++++---- pkg/scalers/azure_eventhub_scaler.go | 29 ++++++---- 4 files changed, 70 insertions(+), 27 deletions(-) diff --git a/apis/keda/v1alpha1/triggerauthentication_types.go b/apis/keda/v1alpha1/triggerauthentication_types.go index 21f49c7f1b9..3b998fb3bdf 100644 --- a/apis/keda/v1alpha1/triggerauthentication_types.go +++ b/apis/keda/v1alpha1/triggerauthentication_types.go @@ -114,11 +114,11 @@ const ( // AuthPodIdentity allows users to select the platform native identity // mechanism type AuthPodIdentity struct { - Provider PodIdentityProvider `json:"provider"` + Provider PodIdentityProvider `json:"provider"` // +optional IdentityID string `json:"identityId"` // +optional - TenantID string `json:"tenantId"` + TenantID string `json:"tenantId"` } // AuthSecretTargetRef is used to authenticate using a reference to a secret diff --git a/pkg/scalers/azure/azure_aad_workload_identity.go b/pkg/scalers/azure/azure_aad_workload_identity.go index 05d2015a17b..9251d8e6650 100644 --- a/pkg/scalers/azure/azure_aad_workload_identity.go +++ b/pkg/scalers/azure/azure_aad_workload_identity.go @@ -138,9 +138,9 @@ type ADWorkloadIdentityCredential struct { ctx context.Context IdentityID string // +optional - TenantID string - Resource string - aadToken AADToken + TenantID string + Resource string + aadToken AADToken } func NewADWorkloadIdentityCredential(ctx context.Context, identityID, resource string) *ADWorkloadIdentityCredential { diff --git a/pkg/scalers/azure/azure_eventhub.go b/pkg/scalers/azure/azure_eventhub.go index 6ec0b4b818a..2059107a20b 100644 --- a/pkg/scalers/azure/azure_eventhub.go +++ b/pkg/scalers/azure/azure_eventhub.go @@ -9,15 +9,16 @@ import ( "github.com/Azure/azure-amqp-common-go/v3/aad" eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/Azure/go-autorest/autorest/azure" + "github.com/go-logr/logr" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) // EventHubInfo to keep event hub connection and resources type EventHubInfo struct { - EventHubConnection string - EventHubConsumerGroup string - StorageConnection string + EventHubConnection string + EventHubConsumerGroup string + StorageConnection string // +optional StorageAccountName string BlobStorageEndpoint string @@ -29,10 +30,13 @@ type EventHubInfo struct { ActiveDirectoryEndpoint string EventHubResourceURL string // +optional - CheckpointIdentityID string + CheckpointIdentityID string // +optional - CheckpointTenantID string - PodIdentity kedav1alpha1.AuthPodIdentity + CheckpointTenantID string + PodIdentity kedav1alpha1.AuthPodIdentity + // +optional + Certificate string + PrivateKey string } const ( @@ -40,15 +44,47 @@ const ( ) // GetEventHubClient returns eventhub client -func GetEventHubClient(ctx context.Context, info EventHubInfo) (*eventhub.Hub, error) { +func GetEventHubClient(logger logr.Logger, ctx context.Context, info EventHubInfo) (*eventhub.Hub, error) { switch info.PodIdentity.Provider { case "", kedav1alpha1.PodIdentityProviderNone: // The user wants to use a connectionstring, not a pod identity - hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection) - if err != nil { - return nil, fmt.Errorf("failed to create hub client: %s", err) + if info.Certificate != "" { + env := azure.Environment{ActiveDirectoryEndpoint: info.ActiveDirectoryEndpoint, ServiceBusEndpointSuffix: info.ServiceBusEndpointSuffix} + hubEnvOptions := eventhub.HubWithEnvironment(env) + // Since there is no connectionstring, then user wants to use AAD Pod identity + // Internally, the JWTProvider will use Managed Service Identity to authenticate if no Service Principal info supplied + envJWTProviderOption := aad.JWTProviderWithAzureEnvironment(&env) + resourceURLJWTProviderOption := aad.JWTProviderWithResourceURI(info.EventHubResourceURL) + clientIDJWTProviderOption := func(config *aad.TokenProviderConfiguration) error { + config.TenantID = info.PodIdentity.TenantID + config.ClientSecret = info.Certificate + config.CertificatePassword = "" + config.ClientID = "c373f16c-ab39-497c-b14e-04cd793ba425" // TODO pass via stuff + config.Env = &env + return nil + } + provider, aadErr := aad.NewJWTProvider(envJWTProviderOption, resourceURLJWTProviderOption, clientIDJWTProviderOption) + + if aadErr == nil { + token, err := provider.GetToken("12345-654783") // dummy change this + if err != nil { + logger.Error(err, "unable to get eventhub client") + } + logger.Info("Token retrieved from AAD: %s", token) + + return eventhub.NewHub(info.Namespace, info.EventHubName, provider, hubEnvOptions) + } + + return nil, aadErr + + } else { + hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection) + if err != nil { + return nil, fmt.Errorf("failed to create hub client: %s", err) + } + return hub, nil } - return hub, nil + case kedav1alpha1.PodIdentityProviderAzure: env := azure.Environment{ActiveDirectoryEndpoint: info.ActiveDirectoryEndpoint, ServiceBusEndpointSuffix: info.ServiceBusEndpointSuffix} hubEnvOptions := eventhub.HubWithEnvironment(env) diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 041b67f6b6a..28aff177231 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -77,7 +77,7 @@ func NewAzureEventHubScaler(ctx context.Context, config *ScalerConfig) (Scaler, return nil, fmt.Errorf("unable to get eventhub metadata: %s", err) } - hub, err := azure.GetEventHubClient(ctx, parsedMetadata.eventHubInfo) + hub, err := azure.GetEventHubClient(logger, ctx, parsedMetadata.eventHubInfo) if err != nil { return nil, fmt.Errorf("unable to get eventhub client: %s", err) } @@ -189,18 +189,25 @@ func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *Scaler switch config.PodIdentity.Provider { case "", v1alpha1.PodIdentityProviderNone: - if len(meta.eventHubInfo.StorageConnection) == 0 { - return fmt.Errorf("no storage connection string given") - } + if certValue, ok := config.AuthParams["secretCertRef"]; ok { + meta.eventHubInfo.Certificate = certValue + if privateKeyVal, ok := config.AuthParams["secretKeyRef"]; ok { + meta.eventHubInfo.PrivateKey = privateKeyVal + } + } else { + if len(meta.eventHubInfo.StorageConnection) == 0 { + return fmt.Errorf("no storage connection string given") + } - if config.AuthParams["connection"] != "" { - meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] - } else if config.TriggerMetadata["connectionFromEnv"] != "" { - meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] - } + if config.AuthParams["connection"] != "" { + meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] + } else if config.TriggerMetadata["connectionFromEnv"] != "" { + meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] + } - if len(meta.eventHubInfo.EventHubConnection) == 0 { - return fmt.Errorf("no event hub connection string given") + if len(meta.eventHubInfo.EventHubConnection) == 0 { + return fmt.Errorf("no event hub connection string given") + } } case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload: meta.eventHubInfo.StorageAccountName = "" From cf75ac3661aaeb08ce093c168da2d05eb060a82b Mon Sep 17 00:00:00 2001 From: Aashish93-stack Date: Thu, 17 Nov 2022 17:35:32 -0800 Subject: [PATCH 2/2] Added Certificate based authentication using Service Principals, in order to enable System Assigned Identites --- .../v1alpha1/triggerauthentication_types.go | 21 ++-- .../bases/keda.sh_triggerauthentications.yaml | 21 ++++ pkg/scalers/azure/azure_eventhub.go | 116 +++++++++++++----- pkg/scalers/azure_eventhub_scaler.go | 35 +++--- pkg/scaling/resolver/scale_resolvers.go | 12 +- 5 files changed, 145 insertions(+), 60 deletions(-) diff --git a/apis/keda/v1alpha1/triggerauthentication_types.go b/apis/keda/v1alpha1/triggerauthentication_types.go index 3b998fb3bdf..873da557caf 100644 --- a/apis/keda/v1alpha1/triggerauthentication_types.go +++ b/apis/keda/v1alpha1/triggerauthentication_types.go @@ -95,13 +95,14 @@ type PodIdentityProvider string // PodIdentityProviderNone specifies the default state when there is no Identity Provider // PodIdentityProvider specifies other available Identity providers const ( - PodIdentityProviderNone PodIdentityProvider = "none" - PodIdentityProviderAzure PodIdentityProvider = "azure" - PodIdentityProviderAzureWorkload PodIdentityProvider = "azure-workload" - PodIdentityProviderGCP PodIdentityProvider = "gcp" - PodIdentityProviderSpiffe PodIdentityProvider = "spiffe" - PodIdentityProviderAwsEKS PodIdentityProvider = "aws-eks" - PodIdentityProviderAwsKiam PodIdentityProvider = "aws-kiam" + PodIdentityProviderAzureServicePrincipal PodIdentityProvider = "azure-service-principal" + PodIdentityProviderNone PodIdentityProvider = "none" + PodIdentityProviderAzure PodIdentityProvider = "azure" + PodIdentityProviderAzureWorkload PodIdentityProvider = "azure-workload" + PodIdentityProviderGCP PodIdentityProvider = "gcp" + PodIdentityProviderSpiffe PodIdentityProvider = "spiffe" + PodIdentityProviderAwsEKS PodIdentityProvider = "aws-eks" + PodIdentityProviderAwsKiam PodIdentityProvider = "aws-kiam" ) // PodIdentityAnnotationEKS specifies aws role arn for aws-eks Identity Provider @@ -118,7 +119,13 @@ type AuthPodIdentity struct { // +optional IdentityID string `json:"identityId"` // +optional + ClientID string `json:"clientId"` + // +optional + Audience string `json:"audience"` + // +optional TenantID string `json:"tenantId"` + // +optional + SecretTargetRef []AuthSecretTargetRef `json:"secretTargetRef,omitempty"` } // AuthSecretTargetRef is used to authenticate using a reference to a secret diff --git a/config/crd/bases/keda.sh_triggerauthentications.yaml b/config/crd/bases/keda.sh_triggerauthentications.yaml index 90aaca081a7..9ce93763220 100644 --- a/config/crd/bases/keda.sh_triggerauthentications.yaml +++ b/config/crd/bases/keda.sh_triggerauthentications.yaml @@ -184,11 +184,32 @@ spec: description: AuthPodIdentity allows users to select the platform native identity mechanism properties: + audience: + type: string + clientId: + type: string identityId: type: string provider: description: PodIdentityProvider contains the list of providers type: string + secretTargetRef: + items: + description: AuthSecretTargetRef is used to authenticate using + a reference to a secret + properties: + key: + type: string + name: + type: string + parameter: + type: string + required: + - key + - name + - parameter + type: object + type: array tenantId: type: string required: diff --git a/pkg/scalers/azure/azure_eventhub.go b/pkg/scalers/azure/azure_eventhub.go index 2059107a20b..c59464f1827 100644 --- a/pkg/scalers/azure/azure_eventhub.go +++ b/pkg/scalers/azure/azure_eventhub.go @@ -2,12 +2,16 @@ package azure import ( "context" + "crypto/rsa" + "crypto/x509" + "encoding/pem" "errors" "fmt" "strings" "github.com/Azure/azure-amqp-common-go/v3/aad" eventhub "github.com/Azure/azure-event-hubs-go/v3" + "github.com/Azure/go-autorest/autorest/adal" "github.com/Azure/go-autorest/autorest/azure" "github.com/go-logr/logr" @@ -36,7 +40,6 @@ type EventHubInfo struct { PodIdentity kedav1alpha1.AuthPodIdentity // +optional Certificate string - PrivateKey string } const ( @@ -48,43 +51,55 @@ func GetEventHubClient(logger logr.Logger, ctx context.Context, info EventHubInf switch info.PodIdentity.Provider { case "", kedav1alpha1.PodIdentityProviderNone: // The user wants to use a connectionstring, not a pod identity - if info.Certificate != "" { - env := azure.Environment{ActiveDirectoryEndpoint: info.ActiveDirectoryEndpoint, ServiceBusEndpointSuffix: info.ServiceBusEndpointSuffix} - hubEnvOptions := eventhub.HubWithEnvironment(env) - // Since there is no connectionstring, then user wants to use AAD Pod identity - // Internally, the JWTProvider will use Managed Service Identity to authenticate if no Service Principal info supplied - envJWTProviderOption := aad.JWTProviderWithAzureEnvironment(&env) - resourceURLJWTProviderOption := aad.JWTProviderWithResourceURI(info.EventHubResourceURL) - clientIDJWTProviderOption := func(config *aad.TokenProviderConfiguration) error { - config.TenantID = info.PodIdentity.TenantID - config.ClientSecret = info.Certificate - config.CertificatePassword = "" - config.ClientID = "c373f16c-ab39-497c-b14e-04cd793ba425" // TODO pass via stuff - config.Env = &env - return nil - } - provider, aadErr := aad.NewJWTProvider(envJWTProviderOption, resourceURLJWTProviderOption, clientIDJWTProviderOption) + hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection) + if err != nil { + return nil, fmt.Errorf("failed to create hub client: %s", err) + } + return hub, nil + case kedav1alpha1.PodIdentityProviderAzureServicePrincipal: + env := azure.Environment{ActiveDirectoryEndpoint: info.ActiveDirectoryEndpoint, ServiceBusEndpointSuffix: info.ServiceBusEndpointSuffix} + hubEnvOptions := eventhub.HubWithEnvironment(env) - if aadErr == nil { - token, err := provider.GetToken("12345-654783") // dummy change this - if err != nil { - logger.Error(err, "unable to get eventhub client") - } - logger.Info("Token retrieved from AAD: %s", token) + envJWTProviderOption := aad.JWTProviderWithAzureEnvironment(&env) + resourceURLJWTProviderOption := aad.JWTProviderWithResourceURI(info.EventHubResourceURL) + oauthConfig, _ := adal.NewOAuthConfig(info.ActiveDirectoryEndpoint, info.PodIdentity.TenantID) + certificate, privateKey, err := LoadCertAndKeyFromSecret([]byte(info.Certificate)) - return eventhub.NewHub(info.Namespace, info.EventHubName, provider, hubEnvOptions) - } + if err != nil { + return nil, fmt.Errorf("unable to load certificate %v", err) + } + servicePrincipalToken, err := adal.NewServicePrincipalTokenFromCertificate(*oauthConfig, info.PodIdentity.ClientID, certificate, privateKey, info.EventHubResourceURL) + + if err != nil { + return nil, fmt.Errorf("failed to get oauth token from certificate auth: %v", err) + } + + aadFuncOption := aad.JWTProviderWithAADToken(servicePrincipalToken) + clientIDJWTProviderOption := func(config *aad.TokenProviderConfiguration) error { + config.TenantID = info.PodIdentity.TenantID + config.ClientID = info.PodIdentity.ClientID + config.Env = &env + return nil + } - return nil, aadErr + provider, aadErr := aad.NewJWTProvider(envJWTProviderOption, resourceURLJWTProviderOption, clientIDJWTProviderOption, aadFuncOption) + + if aadErr != nil { + return nil, fmt.Errorf("failed to get refresh oauth token from certificate auth: %v", err) + } - } else { - hub, err := eventhub.NewHubFromConnectionString(info.EventHubConnection) + if aadErr == nil { + token, err := provider.GetToken(info.PodIdentity.Audience) // dummy change this if err != nil { - return nil, fmt.Errorf("failed to create hub client: %s", err) + logger.Error(err, "unable to get eventhub client") } - return hub, nil + logger.Info("Token retrieved from AAD: %s", token) + + return eventhub.NewHub(info.Namespace, info.EventHubName, provider, hubEnvOptions) } + return nil, aadErr + case kedav1alpha1.PodIdentityProviderAzure: env := azure.Environment{ActiveDirectoryEndpoint: info.ActiveDirectoryEndpoint, ServiceBusEndpointSuffix: info.ServiceBusEndpointSuffix} hubEnvOptions := eventhub.HubWithEnvironment(env) @@ -116,6 +131,47 @@ func GetEventHubClient(logger logr.Logger, ctx context.Context, info EventHubInf return nil, fmt.Errorf("event hub does not support pod identity %v", info.PodIdentity) } +// LoadCertAndKeyFromSecret takes the encoded PEM and tries to extract the Certificate and Private Key +func LoadCertAndKeyFromSecret(pemBytes []byte) (*x509.Certificate, *rsa.PrivateKey, error) { + var certificate *x509.Certificate + var privateKey *rsa.PrivateKey + for len(pemBytes) > 0 { + data, block := pem.Decode(pemBytes) + if block == nil { + return nil, nil, fmt.Errorf("no certificate or private key block found") + } + if data.Type == "CERTIFICATE" { + var err error + certificate, err = x509.ParseCertificate(data.Bytes) + if err != nil { + return nil, nil, fmt.Errorf("error while decoding certificate %v", err) + } + if privateKey != nil { + break + } + } + if data.Type == "PRIVATE KEY" { + var err error + anypk, err := x509.ParsePKCS8PrivateKey(data.Bytes) + if err == nil { + switch key := anypk.(type) { + case *rsa.PrivateKey: + privateKey = key + default: + return nil, nil, fmt.Errorf("found unknown private key type in pkcs#8 wrapping") + } + } + if err != nil { + return nil, nil, fmt.Errorf("malformed private key detected %v", err) + } + + } + pemBytes = block + } + + return certificate, privateKey, nil +} + // ParseAzureEventHubConnectionString parses Event Hub connection string into (namespace, name) // Connection string should be in following format: // Endpoint=sb://eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secretKey123;EntityPath=eventhub-name diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 28aff177231..00985e126f3 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -189,27 +189,20 @@ func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *Scaler switch config.PodIdentity.Provider { case "", v1alpha1.PodIdentityProviderNone: - if certValue, ok := config.AuthParams["secretCertRef"]; ok { - meta.eventHubInfo.Certificate = certValue - if privateKeyVal, ok := config.AuthParams["secretKeyRef"]; ok { - meta.eventHubInfo.PrivateKey = privateKeyVal - } - } else { - if len(meta.eventHubInfo.StorageConnection) == 0 { - return fmt.Errorf("no storage connection string given") - } + if len(meta.eventHubInfo.StorageConnection) == 0 { + return fmt.Errorf("no storage connection string given") + } - if config.AuthParams["connection"] != "" { - meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] - } else if config.TriggerMetadata["connectionFromEnv"] != "" { - meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] - } + if config.AuthParams["connection"] != "" { + meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] + } else if config.TriggerMetadata["connectionFromEnv"] != "" { + meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] + } - if len(meta.eventHubInfo.EventHubConnection) == 0 { - return fmt.Errorf("no event hub connection string given") - } + if len(meta.eventHubInfo.EventHubConnection) == 0 { + return fmt.Errorf("no event hub connection string given") } - case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload: + case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload, v1alpha1.PodIdentityProviderAzureServicePrincipal: meta.eventHubInfo.StorageAccountName = "" if val, ok := config.TriggerMetadata["storageAccountName"]; ok { meta.eventHubInfo.StorageAccountName = val @@ -217,6 +210,12 @@ func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *Scaler logger.Info("no 'storageAccountName' provided to enable identity based authentication to Blob Storage. Attempting to use connection string instead") } + if val, ok := config.AuthParams["secretCertRef"]; ok { + meta.eventHubInfo.Certificate = val + } else if config.PodIdentity.Provider == v1alpha1.PodIdentityProviderAzureServicePrincipal { + return fmt.Errorf("no certificate secret reference provided for certificate based authentication") + } + if val, ok := config.TriggerMetadata["checkpointIdentityID"]; ok { meta.eventHubInfo.CheckpointIdentityID = val } else { diff --git a/pkg/scaling/resolver/scale_resolvers.go b/pkg/scaling/resolver/scale_resolvers.go index 1b47227cad1..f2cef216d07 100644 --- a/pkg/scaling/resolver/scale_resolvers.go +++ b/pkg/scaling/resolver/scale_resolvers.go @@ -169,6 +169,13 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge } else { if triggerAuthSpec.PodIdentity != nil { podIdentity = *triggerAuthSpec.PodIdentity + + // If PodIdentity is of type Azure ServicePrincipal, then the certificate needs to be passed as a kubernetes secret ref + if triggerAuthSpec.PodIdentity.SecretTargetRef != nil { + for _, e := range triggerAuthSpec.PodIdentity.SecretTargetRef { + result[e.Parameter] = resolveAuthSecret(ctx, client, logger, e.Name, triggerNamespace, e.Key) + } + } } if triggerAuthSpec.Env != nil { for _, e := range triggerAuthSpec.Env { @@ -465,11 +472,6 @@ func resolveAuthSecret(ctx context.Context, client client.Client, logger logr.Lo return "" } result := secret.Data[key] - - if result == nil { - return "" - } - return string(result) }