Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions pkg/storage/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ var (

// Config contains the configuring for a cache
type Config struct {
Store string `mapstructure:"cache_store"`
Nodes []string `mapstructure:"cache_nodes"`
Database string `mapstructure:"cache_database"`
Table string `mapstructure:"cache_table"`
TTL time.Duration `mapstructure:"cache_ttl"`
Size int `mapstructure:"cache_size"`
DisablePersistence bool `mapstructure:"cache_disable_persistence"`
AuthUsername string `mapstructure:"cache_auth_username"`
AuthPassword string `mapstructure:"cache_auth_password"`
Store string `mapstructure:"cache_store"`
Nodes []string `mapstructure:"cache_nodes"`
Database string `mapstructure:"cache_database"`
Table string `mapstructure:"cache_table"`
TTL time.Duration `mapstructure:"cache_ttl"`
Size int `mapstructure:"cache_size"`
DisablePersistence bool `mapstructure:"cache_disable_persistence"`
AuthUsername string `mapstructure:"cache_auth_username"`
AuthPassword string `mapstructure:"cache_auth_password"`
TLSEnabled bool `mapstructure:"cache_tls_enabled"`
TLSInsecure bool `mapstructure:"cache_tls_insecure"`
TLSRootCACertificate string `mapstructure:"cache_tls_root_ca_certificate"`
}

// Cache handles key value operations on caches
Expand Down Expand Up @@ -243,5 +246,8 @@ func getStore(cfg Config) microstore.Store {
store.Size(cfg.Size),
store.DisablePersistence(cfg.DisablePersistence),
store.Authentication(cfg.AuthUsername, cfg.AuthPassword),
store.TLSEnabled(cfg.TLSEnabled),
store.TLSInsecure(cfg.TLSInsecure),
store.TLSRootCA(cfg.TLSRootCACertificate),
)
}
4 changes: 4 additions & 0 deletions pkg/storage/fs/posix/lookup/store_idcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ func NewStoreIDCache(c cache.Config) *StoreIDCache {
return &StoreIDCache{
cache: store.Create(
store.Store(c.Store),
store.TTL(c.TTL),
store.Size(c.Size),
microstore.Nodes(c.Nodes...),
microstore.Database(c.Database),
microstore.Table(c.Table),
store.DisablePersistence(c.DisablePersistence),
store.Authentication(c.AuthUsername, c.AuthPassword),
store.TLSEnabled(c.TLSEnabled),
store.TLSInsecure(c.TLSInsecure),
store.TLSRootCA(c.TLSRootCACertificate),
),
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/fs/posix/posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s
}

tp, err := tree.New(lu, bs, um, trashbin, p, o, stream, store.Create(
// TODO use a NewStoreIDCache here?
store.Store(o.IDCache.Store),
store.TTL(o.IDCache.TTL),
store.Size(o.IDCache.Size),
Expand All @@ -139,6 +140,9 @@ func New(m map[string]interface{}, stream events.Stream, log *zerolog.Logger) (s
microstore.Table(o.IDCache.Table),
store.DisablePersistence(o.IDCache.DisablePersistence),
store.Authentication(o.IDCache.AuthUsername, o.IDCache.AuthPassword),
store.TLSEnabled(o.IDCache.TLSEnabled),
store.TLSInsecure(o.IDCache.TLSInsecure),
store.TLSRootCA(o.IDCache.TLSRootCACertificate),
), log)
if err != nil {
return nil, err
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/pkg/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ func NewDefault(m map[string]interface{}, bs node.Blobstore, es events.Stream, l
microstore.Table(o.IDCache.Table),
store.DisablePersistence(o.IDCache.DisablePersistence),
store.Authentication(o.IDCache.AuthUsername, o.IDCache.AuthPassword),
store.TLSEnabled(o.IDCache.TLSEnabled),
store.TLSInsecure(o.IDCache.TLSInsecure),
store.TLSRootCA(o.IDCache.TLSRootCACertificate),
), log)

aspects := aspects.Aspects{
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ func NewDefault(m map[string]interface{}, bs tree.Blobstore, es events.Stream, l
microstore.Table(o.IDCache.Table),
store.DisablePersistence(o.IDCache.DisablePersistence),
store.Authentication(o.IDCache.AuthUsername, o.IDCache.AuthPassword),
store.TLSEnabled(o.IDCache.TLSEnabled),
store.TLSInsecure(o.IDCache.TLSInsecure),
store.TLSRootCA(o.IDCache.TLSRootCACertificate),
), log)

permissionsSelector, err := pool.PermissionsSelector(o.PermissionsSVC, pool.WithTLSMode(o.PermTLSMode))
Expand Down
39 changes: 39 additions & 0 deletions pkg/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,42 @@ func Authentication(username, password string) store.Option {
o.Context = context.WithValue(o.Context, authenticationContextKey{}, []string{username, password})
}
}

type tlsEnabledContextKey struct{}

// TLSEnabled configures whether to use TLS or not. Only supported by the `natsjs` and `natsjskv` implementations.
func TLSEnabled(enabled bool) store.Option {
return func(o *store.Options) {
if o.Context == nil {
o.Context = context.Background()
}

o.Context = context.WithValue(o.Context, tlsEnabledContextKey{}, enabled)
}
}

type tlsInsecureContextKey struct{}

// TLSInsecure configures whether to skip TLS certificate verification. Only supported by the `natsjs` and `natsjskv` implementations.
func TLSInsecure(insecure bool) store.Option {
return func(o *store.Options) {
if o.Context == nil {
o.Context = context.Background()
}

o.Context = context.WithValue(o.Context, tlsInsecureContextKey{}, insecure)
}
}

type tlsRootCAContextKey struct{}

// TLSRootCA configures the root CA certificate to use for TLS verification. Only supported by the `natsjs` and `natsjskv` implementations.
func TLSRootCA(rootCA string) store.Option {
return func(o *store.Options) {
if o.Context == nil {
o.Context = context.Background()
}

o.Context = context.WithValue(o.Context, tlsRootCAContextKey{}, rootCA)
}
}
76 changes: 43 additions & 33 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package store

import (
"context"
"crypto/tls"
"strings"
"time"

Expand Down Expand Up @@ -64,6 +65,11 @@ func Create(opts ...microstore.Option) microstore.Store {
o(options)
}

// ensure we have a logger
if options.Logger == nil {
options.Logger = logger.DefaultLogger
}

storeType, _ := options.Context.Value(typeContextKey{}).(string)

switch storeType {
Expand Down Expand Up @@ -118,51 +124,55 @@ func Create(opts ...microstore.Option) microstore.Store {
}
return *ocMemStore
case TypeNatsJS:
ttl, _ := options.Context.Value(ttlContextKey{}).(time.Duration)
if mem, _ := options.Context.Value(disablePersistanceContextKey{}).(bool); mem {
opts = append(opts, natsjs.DefaultMemory())
}
// TODO nats needs a DefaultTTL option as it does not support per Write TTL ...
// FIXME nats has restrictions on the key, we cannot use slashes AFAICT
// host, port, clusterid
natsOptions := nats.GetDefaultOptions()
natsOptions.Name = "TODO" // we can pass in the service name to allow identifying the client, but that requires adding a custom context option
if auth, ok := options.Context.Value(authenticationContextKey{}).([]string); ok && len(auth) == 2 {
natsOptions.User = auth[0]
natsOptions.Password = auth[1]
}
opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts)
return natsjs.NewStore(
append(opts,
natsjs.NatsOptions(natsOptions), // always pass in properly initialized default nats options
natsjs.DefaultTTL(ttl))...,
) // TODO test with OpenCloud nats
natsjs.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL
)
case TypeNatsJSKV:
// NOTE: nats needs a DefaultTTL option as it does not support per Write TTL ...
ttl, _ := options.Context.Value(ttlContextKey{}).(time.Duration)
if mem, _ := options.Context.Value(disablePersistanceContextKey{}).(bool); mem {
opts = append(opts, natsjskv.DefaultMemory())
}

natsOptions := nats.GetDefaultOptions()
natsOptions.Name = "TODO" // we can pass in the service name to allow identifying the client, but that requires adding a custom context option
if auth, ok := options.Context.Value(authenticationContextKey{}).([]string); ok && len(auth) == 2 {
natsOptions.User = auth[0]
natsOptions.Password = auth[1]
}
opts, ttl, natsOptions := natsConfig(options.Logger, options.Context, opts)
return natsjskv.NewStore(
append(opts,
natsjskv.NatsOptions(natsOptions), // always pass in properly initialized default nats options
natsjskv.EncodeKeys(),
natsjskv.DefaultTTL(ttl))...,
natsjskv.EncodeKeys(), // nats has restrictions on the key, we cannot use slashes
natsjskv.DefaultTTL(ttl))..., // nats needs a DefaultTTL option as it does not support per Write TTL
)
case TypeMemory, "mem", "": // allow existing short form and use as default
return microstore.NewMemoryStore(opts...)
default:
// try to log an error
if options.Logger == nil {
options.Logger = logger.DefaultLogger
}
options.Logger.Logf(logger.ErrorLevel, "unknown store type: '%s', falling back to memory", storeType)
return microstore.NewMemoryStore(opts...)
}
}

func natsConfig(log logger.Logger, ctx context.Context, opts []microstore.Option) ([]microstore.Option, time.Duration, nats.Options) {

if mem, _ := ctx.Value(disablePersistanceContextKey{}).(bool); mem {
opts = append(opts, natsjs.DefaultMemory())
}

ttl, _ := ctx.Value(ttlContextKey{}).(time.Duration)

// preparing natsOptions before the switch to reuse the same code
natsOptions := nats.GetDefaultOptions()
natsOptions.Name = "TODO" // we can pass in the service name to allow identifying the client, but that requires adding a custom context option
if auth, ok := ctx.Value(authenticationContextKey{}).([]string); ok && len(auth) == 2 {
natsOptions.User = auth[0]
natsOptions.Password = auth[1]
}
if enableTLS, ok := ctx.Value(tlsEnabledContextKey{}).(bool); ok && enableTLS {
if rootca, ok := ctx.Value(tlsRootCAContextKey{}).(string); ok && rootca != "" {
// when root ca is configured use it. an insecure flag is ignored.
if err := nats.RootCAs(rootca)(&natsOptions); err != nil {
log.Log(logger.ErrorLevel, err)
}
} else {
// enable tls with insecure option
insecure := ctx.Value(tlsInsecureContextKey{}).(bool)
_ = nats.Secure(&tls.Config{MinVersion: tls.VersionTLS12, InsecureSkipVerify: insecure})(&natsOptions)
}
}

return opts, ttl, natsOptions
}