diff --git a/pkg/storage/cache/cache.go b/pkg/storage/cache/cache.go index 5b412076bb2..10371447c99 100644 --- a/pkg/storage/cache/cache.go +++ b/pkg/storage/cache/cache.go @@ -44,15 +44,18 @@ var ( // Config contains the configuring for a cache type Config struct { - Store string `mapstructure:"cache_store"` - Nodes []string `mapstructure:"cache_nodes"` - Database string `mapstructure:"cache_database"` - Table string `mapstructure:"cache_table"` - TTL time.Duration `mapstructure:"cache_ttl"` - Size int `mapstructure:"cache_size"` - DisablePersistence bool `mapstructure:"cache_disable_persistence"` - AuthUsername string `mapstructure:"cache_auth_username"` - AuthPassword string `mapstructure:"cache_auth_password"` + Store string `mapstructure:"cache_store"` + Nodes []string `mapstructure:"cache_nodes"` + Database string `mapstructure:"cache_database"` + Table string `mapstructure:"cache_table"` + TTL time.Duration `mapstructure:"cache_ttl"` + Size int `mapstructure:"cache_size"` + DisablePersistence bool `mapstructure:"cache_disable_persistence"` + AuthUsername string `mapstructure:"cache_auth_username"` + AuthPassword string `mapstructure:"cache_auth_password"` + TLSEnabled bool `mapstructure:"cache_tls_enabled"` + TLSInsecure bool `mapstructure:"cache_tls_insecure"` + TLSRootCACertificate string `mapstructure:"cache_tls_root_ca_certificate"` } // Cache handles key value operations on caches @@ -243,5 +246,8 @@ func getStore(cfg Config) microstore.Store { store.Size(cfg.Size), store.DisablePersistence(cfg.DisablePersistence), store.Authentication(cfg.AuthUsername, cfg.AuthPassword), + store.TLSEnabled(cfg.TLSEnabled), + store.TLSInsecure(cfg.TLSInsecure), + store.TLSRootCA(cfg.TLSRootCACertificate), ) } diff --git a/pkg/storage/fs/posix/lookup/store_idcache.go b/pkg/storage/fs/posix/lookup/store_idcache.go index 7534c9da522..67563e1de9d 100644 --- a/pkg/storage/fs/posix/lookup/store_idcache.go +++ b/pkg/storage/fs/posix/lookup/store_idcache.go @@ -38,12 +38,16 @@ func NewStoreIDCache(c cache.Config) *StoreIDCache { return &StoreIDCache{ cache: store.Create( store.Store(c.Store), + store.TTL(c.TTL), store.Size(c.Size), microstore.Nodes(c.Nodes...), microstore.Database(c.Database), microstore.Table(c.Table), store.DisablePersistence(c.DisablePersistence), store.Authentication(c.AuthUsername, c.AuthPassword), + store.TLSEnabled(c.TLSEnabled), + store.TLSInsecure(c.TLSInsecure), + store.TLSRootCA(c.TLSRootCACertificate), ), } } diff --git a/pkg/storage/fs/posix/posix.go b/pkg/storage/fs/posix/posix.go index 26a6cad0cfa..ec95cd38419 100644 --- a/pkg/storage/fs/posix/posix.go +++ b/pkg/storage/fs/posix/posix.go @@ -131,6 +131,7 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s } tp, err := tree.New(lu, bs, um, trashbin, p, o, stream, store.Create( + // TODO use a NewStoreIDCache here? store.Store(o.IDCache.Store), store.TTL(o.IDCache.TTL), store.Size(o.IDCache.Size), @@ -139,6 +140,9 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s microstore.Table(o.IDCache.Table), store.DisablePersistence(o.IDCache.DisablePersistence), store.Authentication(o.IDCache.AuthUsername, o.IDCache.AuthPassword), + store.TLSEnabled(o.IDCache.TLSEnabled), + store.TLSInsecure(o.IDCache.TLSInsecure), + store.TLSRootCA(o.IDCache.TLSRootCACertificate), ), log) if err != nil { return nil, err diff --git a/pkg/storage/pkg/decomposedfs/decomposedfs.go b/pkg/storage/pkg/decomposedfs/decomposedfs.go index f799b4db443..aa6a210ee75 100644 --- a/pkg/storage/pkg/decomposedfs/decomposedfs.go +++ b/pkg/storage/pkg/decomposedfs/decomposedfs.go @@ -169,6 +169,9 @@ func NewDefault(m map[string]interface{}, bs node.Blobstore, es events.Stream, l microstore.Table(o.IDCache.Table), store.DisablePersistence(o.IDCache.DisablePersistence), store.Authentication(o.IDCache.AuthUsername, o.IDCache.AuthPassword), + store.TLSEnabled(o.IDCache.TLSEnabled), + store.TLSInsecure(o.IDCache.TLSInsecure), + store.TLSRootCA(o.IDCache.TLSRootCACertificate), ), log) aspects := aspects.Aspects{ diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 70ca922d8ee..12111c988ca 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -160,6 +160,9 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream, l microstore.Table(o.IDCache.Table), store.DisablePersistence(o.IDCache.DisablePersistence), store.Authentication(o.IDCache.AuthUsername, o.IDCache.AuthPassword), + store.TLSEnabled(o.IDCache.TLSEnabled), + store.TLSInsecure(o.IDCache.TLSInsecure), + store.TLSRootCA(o.IDCache.TLSRootCACertificate), ), log) permissionsSelector, err := pool.PermissionsSelector(o.PermissionsSVC, pool.WithTLSMode(o.PermTLSMode)) diff --git a/pkg/store/options.go b/pkg/store/options.go index 75ba99c6bbe..e1080441c93 100644 --- a/pkg/store/options.go +++ b/pkg/store/options.go @@ -103,3 +103,42 @@ func Authentication(username, password string) store.Option { o.Context = context.WithValue(o.Context, authenticationContextKey{}, []string{username, password}) } } + +type tlsEnabledContextKey struct{} + +// TLSEnabled configures whether to use TLS or not. Only supported by the `natsjs` and `natsjskv` implementations. +func TLSEnabled(enabled bool) store.Option { + return func(o *store.Options) { + if o.Context == nil { + o.Context = context.Background() + } + + o.Context = context.WithValue(o.Context, tlsEnabledContextKey{}, enabled) + } +} + +type tlsInsecureContextKey struct{} + +// TLSInsecure configures whether to skip TLS certificate verification. Only supported by the `natsjs` and `natsjskv` implementations. +func TLSInsecure(insecure bool) store.Option { + return func(o *store.Options) { + if o.Context == nil { + o.Context = context.Background() + } + + o.Context = context.WithValue(o.Context, tlsInsecureContextKey{}, insecure) + } +} + +type tlsRootCAContextKey struct{} + +// TLSRootCA configures the root CA certificate to use for TLS verification. Only supported by the `natsjs` and `natsjskv` implementations. +func TLSRootCA(rootCA string) store.Option { + return func(o *store.Options) { + if o.Context == nil { + o.Context = context.Background() + } + + o.Context = context.WithValue(o.Context, tlsRootCAContextKey{}, rootCA) + } +} diff --git a/pkg/store/store.go b/pkg/store/store.go index ebd2936a2ed..a493e6fa5ef 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -20,6 +20,7 @@ package store import ( "context" + "crypto/tls" "strings" "time" @@ -64,6 +65,11 @@ func Create(opts ...microstore.Option) microstore.Store { o(options) } + // ensure we have a logger + if options.Logger == nil { + options.Logger = logger.DefaultLogger + } + storeType, _ := options.Context.Value(typeContextKey{}).(string) switch storeType { @@ -118,51 +124,55 @@ func Create(opts ...microstore.Option) microstore.Store { } return *ocMemStore case TypeNatsJS: - ttl, _ := options.Context.Value(ttlContextKey{}).(time.Duration) - if mem, _ := options.Context.Value(disablePersistanceContextKey{}).(bool); mem { - opts = append(opts, natsjs.DefaultMemory()) - } - // TODO nats needs a DefaultTTL option as it does not support per Write TTL ... - // FIXME nats has restrictions on the key, we cannot use slashes AFAICT - // host, port, clusterid - natsOptions := nats.GetDefaultOptions() - natsOptions.Name = "TODO" // we can pass in the service name to allow identifying the client, but that requires adding a custom context option - if auth, ok := options.Context.Value(authenticationContextKey{}).([]string); ok && len(auth) == 2 { - natsOptions.User = auth[0] - natsOptions.Password = auth[1] - } + opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts) return natsjs.NewStore( append(opts, natsjs.NatsOptions(natsOptions), // always pass in properly initialized default nats options - natsjs.DefaultTTL(ttl))..., - ) // TODO test with OpenCloud nats + natsjs.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL + ) case TypeNatsJSKV: - // NOTE: nats needs a DefaultTTL option as it does not support per Write TTL ... - ttl, _ := options.Context.Value(ttlContextKey{}).(time.Duration) - if mem, _ := options.Context.Value(disablePersistanceContextKey{}).(bool); mem { - opts = append(opts, natsjskv.DefaultMemory()) - } - - natsOptions := nats.GetDefaultOptions() - natsOptions.Name = "TODO" // we can pass in the service name to allow identifying the client, but that requires adding a custom context option - if auth, ok := options.Context.Value(authenticationContextKey{}).([]string); ok && len(auth) == 2 { - natsOptions.User = auth[0] - natsOptions.Password = auth[1] - } + opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts) return natsjskv.NewStore( append(opts, natsjskv.NatsOptions(natsOptions), // always pass in properly initialized default nats options - natsjskv.EncodeKeys(), - natsjskv.DefaultTTL(ttl))..., + natsjskv.EncodeKeys(), // nats has restrictions on the key, we cannot use slashes + natsjskv.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL ) case TypeMemory, "mem", "": // allow existing short form and use as default return microstore.NewMemoryStore(opts...) default: - // try to log an error - if options.Logger == nil { - options.Logger = logger.DefaultLogger - } options.Logger.Logf(logger.ErrorLevel, "unknown store type: '%s', falling back to memory", storeType) return microstore.NewMemoryStore(opts...) } } + +func natsConfig(log logger.Logger, ctx context.Context, opts []microstore.Option) ([]microstore.Option, time.Duration, nats.Options) { + + if mem, _ := ctx.Value(disablePersistanceContextKey{}).(bool); mem { + opts = append(opts, natsjs.DefaultMemory()) + } + + ttl, _ := ctx.Value(ttlContextKey{}).(time.Duration) + + // preparing natsOptions before the switch to reuse the same code + natsOptions := nats.GetDefaultOptions() + natsOptions.Name = "TODO" // we can pass in the service name to allow identifying the client, but that requires adding a custom context option + if auth, ok := ctx.Value(authenticationContextKey{}).([]string); ok && len(auth) == 2 { + natsOptions.User = auth[0] + natsOptions.Password = auth[1] + } + if enableTLS, ok := ctx.Value(tlsEnabledContextKey{}).(bool); ok && enableTLS { + if rootca, ok := ctx.Value(tlsRootCAContextKey{}).(string); ok && rootca != "" { + // when root ca is configured use it. an insecure flag is ignored. + if err := nats.RootCAs(rootca)(&natsOptions); err != nil { + log.Log(logger.ErrorLevel, err) + } + } else { + // enable tls with insecure option + insecure := ctx.Value(tlsInsecureContextKey{}).(bool) + _ = nats.Secure(&tls.Config{MinVersion: tls.VersionTLS12, InsecureSkipVerify: insecure})(&natsOptions) + } + } + + return opts, ttl, natsOptions +}