From 1130aeb14e0fb496703f31551366597c6d292e3e Mon Sep 17 00:00:00 2001 From: Roman Perekhod <2403905@gmail.com> Date: Tue, 24 Mar 2026 16:30:01 +0100 Subject: [PATCH 1/3] feat: Separate the storage-users and graph to handle vault storage --- .../grpc/services/gateway/storageprovider.go | 17 ++++++++++++ .../services/gateway/storageprovidercache.go | 8 +++--- .../storageprovider/storageprovider.go | 3 ++- .../handlers/apps/sharing/shares/spaces.go | 16 ++---------- pkg/events/postprocessing.go | 2 ++ pkg/storage/registry/spaces/spaces.go | 26 ++++++++++++++++--- .../utils/decomposedfs/decomposedfs.go | 14 +++++++++- .../utils/decomposedfs/options/options.go | 9 +++++-- pkg/utils/utils.go | 3 +++ 9 files changed, 74 insertions(+), 24 deletions(-) diff --git a/internal/grpc/services/gateway/storageprovider.go b/internal/grpc/services/gateway/storageprovider.go index 4a8d15df4f..c45f26e306 100644 --- a/internal/grpc/services/gateway/storageprovider.go +++ b/internal/grpc/services/gateway/storageprovider.go @@ -143,6 +143,12 @@ func (s *svc) CreateHome(ctx context.Context, req *provider.CreateHomeRequest) ( }, } } + + // pass storage_id to the storage provider to handle vault storage id + if storageId := utils.ReadPlainFromOpaque(req.GetOpaque(), "storage_id"); storageId != "" { + createReq.Opaque = utils.AppendPlainToOpaque(createReq.Opaque, "storage_id", storageId) + } + res, err := s.CreateStorageSpace(ctx, createReq) if err != nil { return &provider.CreateHomeResponse{ @@ -170,6 +176,11 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag } } + if storageId := utils.ReadPlainFromOpaque(req.GetOpaque(), "storage_id"); storageId != "" { + space.Root = &provider.ResourceId{StorageId: storageId} + req.Opaque = utils.AppendPlainToOpaque(req.Opaque, "storage_id", storageId) + } + srClient, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint) if err != nil { return &provider.CreateStorageSpaceResponse{ @@ -247,6 +258,7 @@ func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSp filters["path"] = path } + hasFileIdFilter := false for _, f := range req.Filters { switch f.Type { case provider.ListStorageSpacesRequest_Filter_TYPE_ID: @@ -255,6 +267,7 @@ func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSp continue } filters["storage_id"], filters["space_id"], filters["opaque_id"] = sid, spid, oid + hasFileIdFilter = true case provider.ListStorageSpacesRequest_Filter_TYPE_OWNER: filters["owner_idp"] = f.GetOwner().GetIdp() filters["owner_id"] = f.GetOwner().GetOpaqueId() @@ -270,6 +283,10 @@ func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSp } } + if !hasFileIdFilter && utils.ReadPlainFromOpaque(req.Opaque, "storage_id") != "" { + filters["storage_id"] = utils.ReadPlainFromOpaque(req.Opaque, "storage_id") + } + c, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint) if err != nil { return &provider.ListStorageSpacesResponse{ diff --git a/internal/grpc/services/gateway/storageprovidercache.go b/internal/grpc/services/gateway/storageprovidercache.go index ba76690e23..b412a43efd 100644 --- a/internal/grpc/services/gateway/storageprovidercache.go +++ b/internal/grpc/services/gateway/storageprovidercache.go @@ -24,8 +24,8 @@ import ( rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1" ctxpkg "github.com/owncloud/reva/v2/pkg/ctx" - sdk "github.com/owncloud/reva/v2/pkg/sdk/common" "github.com/owncloud/reva/v2/pkg/storage/cache" + "github.com/owncloud/reva/v2/pkg/storagespace" "github.com/owncloud/reva/v2/pkg/utils" "github.com/pkg/errors" "google.golang.org/grpc" @@ -41,8 +41,10 @@ type cachedRegistryClient struct { } func (c *cachedRegistryClient) ListStorageProviders(ctx context.Context, in *registry.ListStorageProvidersRequest, opts ...grpc.CallOption) (*registry.ListStorageProvidersResponse, error) { - - spaceID := sdk.DecodeOpaqueMap(in.Opaque)["space_id"] + spaceID := utils.ReadPlainFromOpaque(in.GetOpaque(), "space_id") + if storageID := utils.ReadPlainFromOpaque(in.GetOpaque(), "storage_id"); storageID != "" { + spaceID = storagespace.FormatStorageID(storageID, spaceID) + } u, ok := ctxpkg.ContextGetUser(ctx) if !ok { diff --git a/internal/grpc/services/storageprovider/storageprovider.go b/internal/grpc/services/storageprovider/storageprovider.go index d790bf2c1d..2bf7025e9b 100644 --- a/internal/grpc/services/storageprovider/storageprovider.go +++ b/internal/grpc/services/storageprovider/storageprovider.go @@ -33,6 +33,7 @@ import ( rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/mitchellh/mapstructure" "github.com/owncloud/reva/v2/pkg/appctx" "github.com/owncloud/reva/v2/pkg/conversions" ctxpkg "github.com/owncloud/reva/v2/pkg/ctx" @@ -47,7 +48,6 @@ import ( "github.com/owncloud/reva/v2/pkg/storage/fs/registry" "github.com/owncloud/reva/v2/pkg/storagespace" "github.com/owncloud/reva/v2/pkg/utils" - "github.com/mitchellh/mapstructure" "github.com/pkg/errors" "github.com/rs/zerolog" "go.opentelemetry.io/otel/attribute" @@ -787,6 +787,7 @@ func (s *Service) Stat(ctx context.Context, req *provider.StatRequest) (*provide s.addMissingStorageProviderID(md.GetId(), nil) s.addMissingStorageProviderID(md.GetParentId(), nil) s.addMissingStorageProviderID(md.GetSpace().GetRoot(), nil) + s.addMissingStorageProviderID(md.GetSpace().GetRootInfo().GetId(), nil) return &provider.StatResponse{ Status: status.NewOK(ctx), diff --git a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/spaces.go b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/spaces.go index 41c92b936c..12093c6123 100644 --- a/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/spaces.go +++ b/internal/http/services/owncloud/ocs/handlers/apps/sharing/shares/spaces.go @@ -137,8 +137,7 @@ func (h *Handler) addSpaceMember(w http.ResponseWriter, r *http.Request, info *p response.WriteOCSError(w, r, response.MetaNotFound.StatusCode, "error getting storage provider", err) return } - - providerClient, err := h.getStorageProviderClient(p) + providerClient, err := pool.GetStorageProviderServiceClient(p.Address) if err != nil { response.WriteOCSError(w, r, response.MetaNotFound.StatusCode, "error getting storage provider client", err) return @@ -244,8 +243,7 @@ func (h *Handler) removeSpaceMember(w http.ResponseWriter, r *http.Request, spac if ref.ResourceId.OpaqueId == "" { ref.ResourceId.OpaqueId = ref.ResourceId.SpaceId } - - providerClient, err := h.getStorageProviderClient(prov) + providerClient, err := pool.GetStorageProviderServiceClient(prov.Address) if err != nil { response.WriteOCSError(w, r, response.MetaNotFound.StatusCode, "error getting storage provider client", err) return @@ -290,16 +288,6 @@ func (h *Handler) removeSpaceMember(w http.ResponseWriter, r *http.Request, spac response.WriteOCSSuccess(w, r, nil) } -func (h *Handler) getStorageProviderClient(p *registry.ProviderInfo) (provider.ProviderAPIClient, error) { - c, err := pool.GetStorageProviderServiceClient(p.Address) - if err != nil { - err = errors.Wrap(err, "shares spaces: error getting a storage provider client") - return nil, err - } - - return c, nil -} - func (h *Handler) findProvider(ctx context.Context, ref *provider.Reference) (*registry.ProviderInfo, error) { c, err := pool.GetStorageRegistryClient(h.storageRegistryAddr) if err != nil { diff --git a/pkg/events/postprocessing.go b/pkg/events/postprocessing.go index f4268920a3..64318cb948 100644 --- a/pkg/events/postprocessing.go +++ b/pkg/events/postprocessing.go @@ -103,6 +103,7 @@ type PostprocessingStepFinished struct { UploadID string ExecutingUser *user.User Filename string + ResourceID *provider.ResourceId FinishedStep Postprocessingstep // name of the step Result interface{} // result information see VirusscanResult for example @@ -145,6 +146,7 @@ type VirusscanResult struct { type PostprocessingFinished struct { UploadID string Filename string + ResourceID *provider.ResourceId SpaceOwner *user.UserId ExecutingUser *user.User Result map[Postprocessingstep]interface{} // it is a map[step]Event diff --git a/pkg/storage/registry/spaces/spaces.go b/pkg/storage/registry/spaces/spaces.go index ac586e96d3..3ba2ef6600 100644 --- a/pkg/storage/registry/spaces/spaces.go +++ b/pkg/storage/registry/spaces/spaces.go @@ -34,6 +34,7 @@ import ( providerpb "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" registrypb "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1" typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + "github.com/mitchellh/mapstructure" "github.com/owncloud/reva/v2/pkg/appctx" ctxpkg "github.com/owncloud/reva/v2/pkg/ctx" "github.com/owncloud/reva/v2/pkg/errtypes" @@ -44,7 +45,6 @@ import ( pkgregistry "github.com/owncloud/reva/v2/pkg/storage/registry/registry" "github.com/owncloud/reva/v2/pkg/storagespace" "github.com/owncloud/reva/v2/pkg/utils" - "github.com/mitchellh/mapstructure" "google.golang.org/grpc" ) @@ -195,6 +195,18 @@ func (r *registry) GetProvider(ctx context.Context, space *providerpb.StorageSpa if space.SpaceType != "" && spaceType != space.SpaceType { continue } + + // Filter out vault spaces if no storageId is provided + if space.GetRoot().GetStorageId() != "" { + if space.GetRoot().GetStorageId() != provider.ProviderID { + continue + } + } else { + if strings.HasPrefix(sc.MountPoint, "/vault/") { + continue + } + } + if space.Owner != nil { user := ctxpkg.ContextMustGetUser(ctx) spacePath, err = sc.SpacePath(user, space) @@ -289,7 +301,7 @@ func (r *registry) ListProviders(ctx context.Context, filters map[string]string) // return all providers return r.findAllProviders(ctx, mask), nil default: - return r.findProvidersForFilter(ctx, r.buildFilters(filters), unrestricted, mask), nil + return r.findProvidersForFilter(ctx, r.buildFilters(filters), filters["storage_id"], unrestricted, mask), nil } } @@ -340,7 +352,7 @@ func (r *registry) buildFilters(filterMap map[string]string) []*providerpb.ListS return filters } -func (r *registry) findProvidersForFilter(ctx context.Context, filters []*providerpb.ListStorageSpacesRequest_Filter, unrestricted bool, _ string) []*registrypb.ProviderInfo { +func (r *registry) findProvidersForFilter(ctx context.Context, filters []*providerpb.ListStorageSpacesRequest_Filter, storageId string, unrestricted bool, _ string) []*registrypb.ProviderInfo { var requestedSpaceType string for _, f := range filters { @@ -357,6 +369,10 @@ func (r *registry) findProvidersForFilter(ctx context.Context, filters []*provid // we have to ignore a space type filter with +grant or +mountpoint type because they can live on any provider if requestedSpaceType != "" && !strings.HasPrefix(requestedSpaceType, "+") { found := false + if storageId != "" && storageId != provider.ProviderID { + // skip mismatching storageproviders + continue + } for spaceType := range provider.Spaces { if spaceType == requestedSpaceType { found = true @@ -385,6 +401,10 @@ func (r *registry) findProvidersForFilter(ctx context.Context, filters []*provid if sc, ok = provider.Spaces[space.SpaceType]; !ok { continue } + // Filter out vault spaces if no storageId is provided + if storageId == "" && strings.HasPrefix(sc.MountPoint, "/vault/") { + continue + } spacePath, err = sc.SpacePath(currentUser, space) if err != nil { appctx.GetLogger(ctx).Error().Err(err).Interface("provider", provider).Interface("space", space).Msg("failed to execute template, continuing") diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 79dcc454a7..c4c4fd1e08 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -258,7 +258,7 @@ func New(o *options.Options, aspects aspects.Aspects, log *zerolog.Logger) (stor return nil, errors.New("need nats for async file processing") } - ch, err := events.Consume(fs.stream, "dcfs", _registeredEvents...) + ch, err := events.Consume(fs.stream, o.Events.ConsumerGroup, _registeredEvents...) if err != nil { return nil, err } @@ -285,6 +285,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { switch ev := event.Event.(type) { case events.PostprocessingFinished: sublog := log.With().Str("event", "PostprocessingFinished").Str("uploadid", ev.UploadID).Logger() + if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID { + sublog.Debug().Msg("ignoring event for different storage") + continue + } session, err := fs.sessionStore.Get(ctx, ev.UploadID) if err != nil { sublog.Error().Err(err).Msg("Failed to get upload") @@ -450,6 +454,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { session.Cleanup(true, !ev.KeepUpload, !ev.KeepUpload, true) case events.RevertRevision: sublog := log.With().Str("event", "RevertRevision").Interface("nodeid", ev.ResourceID).Logger() + if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID { + sublog.Debug().Msg("ignoring event for different storage") + continue + } n, err := fs.lu.NodeFromID(ctx, ev.ResourceID) if err != nil { sublog.Error().Err(err).Msg("Failed to get node") @@ -462,6 +470,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) { } case events.PostprocessingStepFinished: sublog := log.With().Str("event", "PostprocessingStepFinished").Str("uploadid", ev.UploadID).Logger() + if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID { + sublog.Debug().Msg("ignoring event for different storage") + continue + } if ev.FinishedStep != events.PPStepAntivirus { // atm we are only interested in antivirus results continue diff --git a/pkg/storage/utils/decomposedfs/options/options.go b/pkg/storage/utils/decomposedfs/options/options.go index 5c76a383ea..210f206813 100644 --- a/pkg/storage/utils/decomposedfs/options/options.go +++ b/pkg/storage/utils/decomposedfs/options/options.go @@ -23,10 +23,10 @@ import ( "strings" "time" + "github.com/mitchellh/mapstructure" "github.com/owncloud/reva/v2/pkg/rgrpc/todo/pool" "github.com/owncloud/reva/v2/pkg/sharedconf" "github.com/owncloud/reva/v2/pkg/storage/cache" - "github.com/mitchellh/mapstructure" "github.com/pkg/errors" ) @@ -103,7 +103,8 @@ type AsyncPropagatorOptions struct { // EventOptions are the configurable options for events type EventOptions struct { - NumConsumers int `mapstructure:"numconsumers"` + NumConsumers int `mapstructure:"numconsumers"` + ConsumerGroup string `mapstructure:"consumer_group"` } // TokenOptions are the configurable option for tokens @@ -172,5 +173,9 @@ func New(m map[string]interface{}) (*Options, error) { o.UploadDirectory = filepath.Join(o.Root, "uploads") } + if o.Events.ConsumerGroup == "" { + o.Events.ConsumerGroup = "dcfs" + } + return o, nil } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index c103136874..c562636e8b 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -64,6 +64,9 @@ var ( // OCMStorageSpaceID is the space id used by the ocmreceived storageprovider OCMStorageSpaceID = "89f37a33-858b-45fa-8890-a1f2b27d90e1" + // VaultStorageProviderID is the storage id used by the vault storageprovider + VaultStorageProviderID = "1a01c2c4-4309-4483-a845-842fd56d8622" + // SpaceGrant is used to signal the storageprovider that the grant is on a space SpaceGrant struct{} ) From 4338c98c90aacd4d665ab4499ea14fe37e7d87f8 Mon Sep 17 00:00:00 2001 From: Roman Perekhod <2403905@gmail.com> Date: Tue, 24 Mar 2026 18:16:35 +0100 Subject: [PATCH 2/3] update ListStorageProviders cache --- internal/grpc/services/gateway/storageprovidercache.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/grpc/services/gateway/storageprovidercache.go b/internal/grpc/services/gateway/storageprovidercache.go index b412a43efd..855a7832cf 100644 --- a/internal/grpc/services/gateway/storageprovidercache.go +++ b/internal/grpc/services/gateway/storageprovidercache.go @@ -42,8 +42,13 @@ type cachedRegistryClient struct { func (c *cachedRegistryClient) ListStorageProviders(ctx context.Context, in *registry.ListStorageProvidersRequest, opts ...grpc.CallOption) (*registry.ListStorageProvidersResponse, error) { spaceID := utils.ReadPlainFromOpaque(in.GetOpaque(), "space_id") + resourceID := spaceID if storageID := utils.ReadPlainFromOpaque(in.GetOpaque(), "storage_id"); storageID != "" { - spaceID = storagespace.FormatStorageID(storageID, spaceID) + if spaceID != "" { + resourceID = storagespace.FormatStorageID(storageID, spaceID) + } else { + resourceID = storageID + } } u, ok := ctxpkg.ContextGetUser(ctx) @@ -51,7 +56,7 @@ func (c *cachedRegistryClient) ListStorageProviders(ctx context.Context, in *reg return nil, errors.New("user not found in context") } - key := c.cache.GetKey(u.GetId(), spaceID) + key := c.cache.GetKey(u.GetId(), resourceID) if key != "" { s := ®istry.ListStorageProvidersResponse{} if err := c.cache.PullFromCache(key, s); err == nil { From cc61754843205bbe7979307934d163a45171642c Mon Sep 17 00:00:00 2001 From: Roman Perekhod <2403905@gmail.com> Date: Tue, 24 Mar 2026 18:33:35 +0100 Subject: [PATCH 3/3] update findProvidersForFilter --- pkg/storage/registry/spaces/spaces.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/storage/registry/spaces/spaces.go b/pkg/storage/registry/spaces/spaces.go index 3ba2ef6600..d2b0cbc7fb 100644 --- a/pkg/storage/registry/spaces/spaces.go +++ b/pkg/storage/registry/spaces/spaces.go @@ -364,15 +364,14 @@ func (r *registry) findProvidersForFilter(ctx context.Context, filters []*provid currentUser := ctxpkg.ContextMustGetUser(ctx) providerInfos := []*registrypb.ProviderInfo{} for address, provider := range r.c.Providers { - + // skip mismatching storageproviders + if storageId != "" && storageId != provider.ProviderID { + continue + } // when a specific space type is requested we may skip this provider altogether if it is not configured for that type // we have to ignore a space type filter with +grant or +mountpoint type because they can live on any provider if requestedSpaceType != "" && !strings.HasPrefix(requestedSpaceType, "+") { found := false - if storageId != "" && storageId != provider.ProviderID { - // skip mismatching storageproviders - continue - } for spaceType := range provider.Spaces { if spaceType == requestedSpaceType { found = true