Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions apis/keda/v1alpha1/triggerauthentication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,14 @@ type PodIdentityProvider string
// PodIdentityProviderNone specifies the default state when there is no Identity Provider
// PodIdentityProvider<IDENTITY_PROVIDER> specifies other available Identity providers
const (
PodIdentityProviderNone PodIdentityProvider = "none"
PodIdentityProviderAzure PodIdentityProvider = "azure"
PodIdentityProviderAzureWorkload PodIdentityProvider = "azure-workload"
PodIdentityProviderGCP PodIdentityProvider = "gcp"
PodIdentityProviderSpiffe PodIdentityProvider = "spiffe"
PodIdentityProviderAwsEKS PodIdentityProvider = "aws-eks"
PodIdentityProviderAwsKiam PodIdentityProvider = "aws-kiam"
PodIdentityProviderAzureServicePrincipal PodIdentityProvider = "azure-service-principal"
PodIdentityProviderNone PodIdentityProvider = "none"
PodIdentityProviderAzure PodIdentityProvider = "azure"
PodIdentityProviderAzureWorkload PodIdentityProvider = "azure-workload"
PodIdentityProviderGCP PodIdentityProvider = "gcp"
PodIdentityProviderSpiffe PodIdentityProvider = "spiffe"
PodIdentityProviderAwsEKS PodIdentityProvider = "aws-eks"
PodIdentityProviderAwsKiam PodIdentityProvider = "aws-kiam"
)

// PodIdentityAnnotationEKS specifies aws role arn for aws-eks Identity Provider
Expand All @@ -114,11 +115,17 @@ const (
// AuthPodIdentity allows users to select the platform native identity
// mechanism
type AuthPodIdentity struct {
Provider PodIdentityProvider `json:"provider"`
Provider PodIdentityProvider `json:"provider"`
// +optional
IdentityID string `json:"identityId"`
// +optional
TenantID string `json:"tenantId"`
ClientID string `json:"clientId"`
// +optional
Audience string `json:"audience"`
// +optional
TenantID string `json:"tenantId"`
// +optional
SecretTargetRef []AuthSecretTargetRef `json:"secretTargetRef,omitempty"`
}

// AuthSecretTargetRef is used to authenticate using a reference to a secret
Expand Down
21 changes: 21 additions & 0 deletions config/crd/bases/keda.sh_triggerauthentications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,32 @@ spec:
description: AuthPodIdentity allows users to select the platform native
identity mechanism
properties:
audience:
type: string
clientId:
type: string
identityId:
type: string
provider:
description: PodIdentityProvider contains the list of providers
type: string
secretTargetRef:
items:
description: AuthSecretTargetRef is used to authenticate using
a reference to a secret
properties:
key:
type: string
name:
type: string
parameter:
type: string
required:
- key
- name
- parameter
type: object
type: array
tenantId:
type: string
required:
Expand Down
6 changes: 3 additions & 3 deletions pkg/scalers/azure/azure_aad_workload_identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ type ADWorkloadIdentityCredential struct {
ctx context.Context
IdentityID string
// +optional
TenantID string
Resource string
aadToken AADToken
TenantID string
Resource string
aadToken AADToken
}

func NewADWorkloadIdentityCredential(ctx context.Context, identityID, resource string) *ADWorkloadIdentityCredential {
Expand Down
106 changes: 99 additions & 7 deletions pkg/scalers/azure/azure_eventhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,27 @@ package azure

import (
"context"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"errors"
"fmt"
"strings"

"github.com/Azure/azure-amqp-common-go/v3/aad"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/go-logr/logr"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

// EventHubInfo to keep event hub connection and resources
type EventHubInfo struct {
EventHubConnection string
EventHubConsumerGroup string
StorageConnection string
EventHubConnection string
EventHubConsumerGroup string
StorageConnection string
// +optional
StorageAccountName string
BlobStorageEndpoint string
Expand All @@ -29,18 +34,20 @@ type EventHubInfo struct {
ActiveDirectoryEndpoint string
EventHubResourceURL string
// +optional
CheckpointIdentityID string
CheckpointIdentityID string
// +optional
CheckpointTenantID string
PodIdentity kedav1alpha1.AuthPodIdentity
CheckpointTenantID string
PodIdentity kedav1alpha1.AuthPodIdentity
// +optional
Certificate string
}

const (
DefaultEventhubResourceURL = "https://eventhubs.azure.net/"
)

// GetEventHubClient returns eventhub client
func GetEventHubClient(ctx context.Context, info EventHubInfo) (*eventhub.Hub, error) {
func GetEventHubClient(logger logr.Logger, ctx context.Context, info EventHubInfo) (*eventhub.Hub, error) {
switch info.PodIdentity.Provider {
case "", kedav1alpha1.PodIdentityProviderNone:
// The user wants to use a connectionstring, not a pod identity
Expand All @@ -49,6 +56,50 @@ func GetEventHubClient(ctx context.Context, info EventHubInfo) (*eventhub.Hub, e
return nil, fmt.Errorf("failed to create hub client: %s", err)
}
return hub, nil
case kedav1alpha1.PodIdentityProviderAzureServicePrincipal:
env := azure.Environment{ActiveDirectoryEndpoint: info.ActiveDirectoryEndpoint, ServiceBusEndpointSuffix: info.ServiceBusEndpointSuffix}
hubEnvOptions := eventhub.HubWithEnvironment(env)

envJWTProviderOption := aad.JWTProviderWithAzureEnvironment(&env)
resourceURLJWTProviderOption := aad.JWTProviderWithResourceURI(info.EventHubResourceURL)
oauthConfig, _ := adal.NewOAuthConfig(info.ActiveDirectoryEndpoint, info.PodIdentity.TenantID)
certificate, privateKey, err := LoadCertAndKeyFromSecret([]byte(info.Certificate))

if err != nil {
return nil, fmt.Errorf("unable to load certificate %v", err)
}
servicePrincipalToken, err := adal.NewServicePrincipalTokenFromCertificate(*oauthConfig, info.PodIdentity.ClientID, certificate, privateKey, info.EventHubResourceURL)

if err != nil {
return nil, fmt.Errorf("failed to get oauth token from certificate auth: %v", err)
}

aadFuncOption := aad.JWTProviderWithAADToken(servicePrincipalToken)
clientIDJWTProviderOption := func(config *aad.TokenProviderConfiguration) error {
config.TenantID = info.PodIdentity.TenantID
config.ClientID = info.PodIdentity.ClientID
config.Env = &env
return nil
}

provider, aadErr := aad.NewJWTProvider(envJWTProviderOption, resourceURLJWTProviderOption, clientIDJWTProviderOption, aadFuncOption)

if aadErr != nil {
return nil, fmt.Errorf("failed to get refresh oauth token from certificate auth: %v", err)
}

if aadErr == nil {
token, err := provider.GetToken(info.PodIdentity.Audience) // dummy change this
if err != nil {
logger.Error(err, "unable to get eventhub client")
}
logger.Info("Token retrieved from AAD: %s", token)

return eventhub.NewHub(info.Namespace, info.EventHubName, provider, hubEnvOptions)
}

return nil, aadErr

case kedav1alpha1.PodIdentityProviderAzure:
env := azure.Environment{ActiveDirectoryEndpoint: info.ActiveDirectoryEndpoint, ServiceBusEndpointSuffix: info.ServiceBusEndpointSuffix}
hubEnvOptions := eventhub.HubWithEnvironment(env)
Expand Down Expand Up @@ -80,6 +131,47 @@ func GetEventHubClient(ctx context.Context, info EventHubInfo) (*eventhub.Hub, e
return nil, fmt.Errorf("event hub does not support pod identity %v", info.PodIdentity)
}

// LoadCertAndKeyFromSecret takes the encoded PEM and tries to extract the Certificate and Private Key
func LoadCertAndKeyFromSecret(pemBytes []byte) (*x509.Certificate, *rsa.PrivateKey, error) {
var certificate *x509.Certificate
var privateKey *rsa.PrivateKey
for len(pemBytes) > 0 {
data, block := pem.Decode(pemBytes)
if block == nil {
return nil, nil, fmt.Errorf("no certificate or private key block found")
}
if data.Type == "CERTIFICATE" {
var err error
certificate, err = x509.ParseCertificate(data.Bytes)
if err != nil {
return nil, nil, fmt.Errorf("error while decoding certificate %v", err)
}
if privateKey != nil {
break
}
}
if data.Type == "PRIVATE KEY" {
var err error
anypk, err := x509.ParsePKCS8PrivateKey(data.Bytes)
if err == nil {
switch key := anypk.(type) {
case *rsa.PrivateKey:
privateKey = key
default:
return nil, nil, fmt.Errorf("found unknown private key type in pkcs#8 wrapping")
}
}
if err != nil {
return nil, nil, fmt.Errorf("malformed private key detected %v", err)
}

}
pemBytes = block
}

return certificate, privateKey, nil
}

// ParseAzureEventHubConnectionString parses Event Hub connection string into (namespace, name)
// Connection string should be in following format:
// Endpoint=sb://eventhub-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=secretKey123;EntityPath=eventhub-name
Expand Down
10 changes: 8 additions & 2 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewAzureEventHubScaler(ctx context.Context, config *ScalerConfig) (Scaler,
return nil, fmt.Errorf("unable to get eventhub metadata: %s", err)
}

hub, err := azure.GetEventHubClient(ctx, parsedMetadata.eventHubInfo)
hub, err := azure.GetEventHubClient(logger, ctx, parsedMetadata.eventHubInfo)
if err != nil {
return nil, fmt.Errorf("unable to get eventhub client: %s", err)
}
Expand Down Expand Up @@ -202,14 +202,20 @@ func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *Scaler
if len(meta.eventHubInfo.EventHubConnection) == 0 {
return fmt.Errorf("no event hub connection string given")
}
case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload:
case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload, v1alpha1.PodIdentityProviderAzureServicePrincipal:
meta.eventHubInfo.StorageAccountName = ""
if val, ok := config.TriggerMetadata["storageAccountName"]; ok {
meta.eventHubInfo.StorageAccountName = val
} else {
logger.Info("no 'storageAccountName' provided to enable identity based authentication to Blob Storage. Attempting to use connection string instead")
}

if val, ok := config.AuthParams["secretCertRef"]; ok {
meta.eventHubInfo.Certificate = val
} else if config.PodIdentity.Provider == v1alpha1.PodIdentityProviderAzureServicePrincipal {
return fmt.Errorf("no certificate secret reference provided for certificate based authentication")
}

if val, ok := config.TriggerMetadata["checkpointIdentityID"]; ok {
meta.eventHubInfo.CheckpointIdentityID = val
} else {
Expand Down
12 changes: 7 additions & 5 deletions pkg/scaling/resolver/scale_resolvers.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ func resolveAuthRef(ctx context.Context, client client.Client, logger logr.Logge
} else {
if triggerAuthSpec.PodIdentity != nil {
podIdentity = *triggerAuthSpec.PodIdentity

// If PodIdentity is of type Azure ServicePrincipal, then the certificate needs to be passed as a kubernetes secret ref
if triggerAuthSpec.PodIdentity.SecretTargetRef != nil {
for _, e := range triggerAuthSpec.PodIdentity.SecretTargetRef {
result[e.Parameter] = resolveAuthSecret(ctx, client, logger, e.Name, triggerNamespace, e.Key)
}
}
}
if triggerAuthSpec.Env != nil {
for _, e := range triggerAuthSpec.Env {
Expand Down Expand Up @@ -465,11 +472,6 @@ func resolveAuthSecret(ctx context.Context, client client.Client, logger logr.Lo
return ""
}
result := secret.Data[key]

if result == nil {
return ""
}

return string(result)
}

Expand Down