Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions internal/grpc/services/gateway/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ func (s *svc) CreateHome(ctx context.Context, req *provider.CreateHomeRequest) (
},
}
}

// pass storage_id to the storage provider to handle vault storage id
if storageId := utils.ReadPlainFromOpaque(req.GetOpaque(), "storage_id"); storageId != "" {
createReq.Opaque = utils.AppendPlainToOpaque(createReq.Opaque, "storage_id", storageId)
}

res, err := s.CreateStorageSpace(ctx, createReq)
if err != nil {
return &provider.CreateHomeResponse{
Expand Down Expand Up @@ -170,6 +176,11 @@ func (s *svc) CreateStorageSpace(ctx context.Context, req *provider.CreateStorag
}
}

if storageId := utils.ReadPlainFromOpaque(req.GetOpaque(), "storage_id"); storageId != "" {
space.Root = &provider.ResourceId{StorageId: storageId}
req.Opaque = utils.AppendPlainToOpaque(req.Opaque, "storage_id", storageId)
}

srClient, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {
return &provider.CreateStorageSpaceResponse{
Expand Down Expand Up @@ -247,6 +258,7 @@ func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSp
filters["path"] = path
}

hasFileIdFilter := false
for _, f := range req.Filters {
switch f.Type {
case provider.ListStorageSpacesRequest_Filter_TYPE_ID:
Expand All @@ -255,6 +267,7 @@ func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSp
continue
}
filters["storage_id"], filters["space_id"], filters["opaque_id"] = sid, spid, oid
hasFileIdFilter = true
case provider.ListStorageSpacesRequest_Filter_TYPE_OWNER:
filters["owner_idp"] = f.GetOwner().GetIdp()
filters["owner_id"] = f.GetOwner().GetOpaqueId()
Expand All @@ -270,6 +283,10 @@ func (s *svc) ListStorageSpaces(ctx context.Context, req *provider.ListStorageSp
}
}

if !hasFileIdFilter && utils.ReadPlainFromOpaque(req.Opaque, "storage_id") != "" {
filters["storage_id"] = utils.ReadPlainFromOpaque(req.Opaque, "storage_id")
}

c, err := s.getStorageRegistryClient(ctx, s.c.StorageRegistryEndpoint)
if err != nil {
return &provider.ListStorageSpacesResponse{
Expand Down
15 changes: 11 additions & 4 deletions internal/grpc/services/gateway/storageprovidercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
registry "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
ctxpkg "github.com/owncloud/reva/v2/pkg/ctx"
sdk "github.com/owncloud/reva/v2/pkg/sdk/common"
"github.com/owncloud/reva/v2/pkg/storage/cache"
"github.com/owncloud/reva/v2/pkg/storagespace"
"github.com/owncloud/reva/v2/pkg/utils"
"github.com/pkg/errors"
"google.golang.org/grpc"
Expand All @@ -41,15 +41,22 @@ type cachedRegistryClient struct {
}

func (c *cachedRegistryClient) ListStorageProviders(ctx context.Context, in *registry.ListStorageProvidersRequest, opts ...grpc.CallOption) (*registry.ListStorageProvidersResponse, error) {

spaceID := sdk.DecodeOpaqueMap(in.Opaque)["space_id"]
spaceID := utils.ReadPlainFromOpaque(in.GetOpaque(), "space_id")
resourceID := spaceID
if storageID := utils.ReadPlainFromOpaque(in.GetOpaque(), "storage_id"); storageID != "" {
if spaceID != "" {
resourceID = storagespace.FormatStorageID(storageID, spaceID)
} else {
resourceID = storageID
}
}

u, ok := ctxpkg.ContextGetUser(ctx)
if !ok {
return nil, errors.New("user not found in context")
}

key := c.cache.GetKey(u.GetId(), spaceID)
key := c.cache.GetKey(u.GetId(), resourceID)
if key != "" {
s := &registry.ListStorageProvidersResponse{}
if err := c.cache.PullFromCache(key, s); err == nil {
Expand Down
3 changes: 2 additions & 1 deletion internal/grpc/services/storageprovider/storageprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1"
provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/mitchellh/mapstructure"
"github.com/owncloud/reva/v2/pkg/appctx"
"github.com/owncloud/reva/v2/pkg/conversions"
ctxpkg "github.com/owncloud/reva/v2/pkg/ctx"
Expand All @@ -47,7 +48,6 @@ import (
"github.com/owncloud/reva/v2/pkg/storage/fs/registry"
"github.com/owncloud/reva/v2/pkg/storagespace"
"github.com/owncloud/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -787,6 +787,7 @@ func (s *Service) Stat(ctx context.Context, req *provider.StatRequest) (*provide
s.addMissingStorageProviderID(md.GetId(), nil)
s.addMissingStorageProviderID(md.GetParentId(), nil)
s.addMissingStorageProviderID(md.GetSpace().GetRoot(), nil)
s.addMissingStorageProviderID(md.GetSpace().GetRootInfo().GetId(), nil)

return &provider.StatResponse{
Status: status.NewOK(ctx),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ func (h *Handler) addSpaceMember(w http.ResponseWriter, r *http.Request, info *p
response.WriteOCSError(w, r, response.MetaNotFound.StatusCode, "error getting storage provider", err)
return
}

providerClient, err := h.getStorageProviderClient(p)
providerClient, err := pool.GetStorageProviderServiceClient(p.Address)
if err != nil {
response.WriteOCSError(w, r, response.MetaNotFound.StatusCode, "error getting storage provider client", err)
return
Expand Down Expand Up @@ -244,8 +243,7 @@ func (h *Handler) removeSpaceMember(w http.ResponseWriter, r *http.Request, spac
if ref.ResourceId.OpaqueId == "" {
ref.ResourceId.OpaqueId = ref.ResourceId.SpaceId
}

providerClient, err := h.getStorageProviderClient(prov)
providerClient, err := pool.GetStorageProviderServiceClient(prov.Address)
if err != nil {
response.WriteOCSError(w, r, response.MetaNotFound.StatusCode, "error getting storage provider client", err)
return
Expand Down Expand Up @@ -290,16 +288,6 @@ func (h *Handler) removeSpaceMember(w http.ResponseWriter, r *http.Request, spac
response.WriteOCSSuccess(w, r, nil)
}

func (h *Handler) getStorageProviderClient(p *registry.ProviderInfo) (provider.ProviderAPIClient, error) {
c, err := pool.GetStorageProviderServiceClient(p.Address)
if err != nil {
err = errors.Wrap(err, "shares spaces: error getting a storage provider client")
return nil, err
}

return c, nil
}

func (h *Handler) findProvider(ctx context.Context, ref *provider.Reference) (*registry.ProviderInfo, error) {
c, err := pool.GetStorageRegistryClient(h.storageRegistryAddr)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/events/postprocessing.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type PostprocessingStepFinished struct {
UploadID string
ExecutingUser *user.User
Filename string
ResourceID *provider.ResourceId

FinishedStep Postprocessingstep // name of the step
Result interface{} // result information see VirusscanResult for example
Expand Down Expand Up @@ -145,6 +146,7 @@ type VirusscanResult struct {
type PostprocessingFinished struct {
UploadID string
Filename string
ResourceID *provider.ResourceId
SpaceOwner *user.UserId
ExecutingUser *user.User
Result map[Postprocessingstep]interface{} // it is a map[step]Event
Expand Down
27 changes: 23 additions & 4 deletions pkg/storage/registry/spaces/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
providerpb "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
registrypb "github.com/cs3org/go-cs3apis/cs3/storage/registry/v1beta1"
typesv1beta1 "github.com/cs3org/go-cs3apis/cs3/types/v1beta1"
"github.com/mitchellh/mapstructure"
"github.com/owncloud/reva/v2/pkg/appctx"
ctxpkg "github.com/owncloud/reva/v2/pkg/ctx"
"github.com/owncloud/reva/v2/pkg/errtypes"
Expand All @@ -44,7 +45,6 @@ import (
pkgregistry "github.com/owncloud/reva/v2/pkg/storage/registry/registry"
"github.com/owncloud/reva/v2/pkg/storagespace"
"github.com/owncloud/reva/v2/pkg/utils"
"github.com/mitchellh/mapstructure"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -195,6 +195,18 @@ func (r *registry) GetProvider(ctx context.Context, space *providerpb.StorageSpa
if space.SpaceType != "" && spaceType != space.SpaceType {
continue
}

// Filter out vault spaces if no storageId is provided
if space.GetRoot().GetStorageId() != "" {
if space.GetRoot().GetStorageId() != provider.ProviderID {
continue
}
} else {
if strings.HasPrefix(sc.MountPoint, "/vault/") {
continue
}
}
Comment on lines +199 to +208
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This vault-space filter checks strings.HasPrefix(sc.MountPoint, "/vault/"), but MountPoint is documented as potentially being a regex (and it may also be configured as /vault without the trailing slash). In those cases this filter won't behave as intended. Consider filtering based on the resolved spacePath (or introducing an explicit config flag for vault spaces) instead of using a prefix check on the mountpoint pattern.

Copilot uses AI. Check for mistakes.

if space.Owner != nil {
user := ctxpkg.ContextMustGetUser(ctx)
spacePath, err = sc.SpacePath(user, space)
Expand Down Expand Up @@ -289,7 +301,7 @@ func (r *registry) ListProviders(ctx context.Context, filters map[string]string)
// return all providers
return r.findAllProviders(ctx, mask), nil
default:
return r.findProvidersForFilter(ctx, r.buildFilters(filters), unrestricted, mask), nil
return r.findProvidersForFilter(ctx, r.buildFilters(filters), filters["storage_id"], unrestricted, mask), nil
}
}

Expand Down Expand Up @@ -340,7 +352,7 @@ func (r *registry) buildFilters(filterMap map[string]string) []*providerpb.ListS
return filters
}

func (r *registry) findProvidersForFilter(ctx context.Context, filters []*providerpb.ListStorageSpacesRequest_Filter, unrestricted bool, _ string) []*registrypb.ProviderInfo {
func (r *registry) findProvidersForFilter(ctx context.Context, filters []*providerpb.ListStorageSpacesRequest_Filter, storageId string, unrestricted bool, _ string) []*registrypb.ProviderInfo {

var requestedSpaceType string
for _, f := range filters {
Expand All @@ -352,7 +364,10 @@ func (r *registry) findProvidersForFilter(ctx context.Context, filters []*provid
currentUser := ctxpkg.ContextMustGetUser(ctx)
providerInfos := []*registrypb.ProviderInfo{}
for address, provider := range r.c.Providers {

// skip mismatching storageproviders
if storageId != "" && storageId != provider.ProviderID {
continue
}
// when a specific space type is requested we may skip this provider altogether if it is not configured for that type
// we have to ignore a space type filter with +grant or +mountpoint type because they can live on any provider
if requestedSpaceType != "" && !strings.HasPrefix(requestedSpaceType, "+") {
Expand Down Expand Up @@ -385,6 +400,10 @@ func (r *registry) findProvidersForFilter(ctx context.Context, filters []*provid
if sc, ok = provider.Spaces[space.SpaceType]; !ok {
continue
}
// Filter out vault spaces if no storageId is provided
if storageId == "" && strings.HasPrefix(sc.MountPoint, "/vault/") {
continue
}
Comment on lines +403 to +406
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: vault-space filtering uses strings.HasPrefix(sc.MountPoint, "/vault/") even though MountPoint can be a regex / may not have a trailing slash. This can lead to vault spaces leaking into results when storageId == "". Consider basing the check on the resolved spacePath (or an explicit vault flag) instead of the mountpoint pattern string.

Copilot uses AI. Check for mistakes.
spacePath, err = sc.SpacePath(currentUser, space)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Interface("provider", provider).Interface("space", space).Msg("failed to execute template, continuing")
Expand Down
14 changes: 13 additions & 1 deletion pkg/storage/utils/decomposedfs/decomposedfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func New(o *options.Options, aspects aspects.Aspects, log *zerolog.Logger) (stor
return nil, errors.New("need nats for async file processing")
}

ch, err := events.Consume(fs.stream, "dcfs", _registeredEvents...)
ch, err := events.Consume(fs.stream, o.Events.ConsumerGroup, _registeredEvents...)
if err != nil {
return nil, err
}
Expand All @@ -285,6 +285,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
switch ev := event.Event.(type) {
case events.PostprocessingFinished:
sublog := log.With().Str("event", "PostprocessingFinished").Str("uploadid", ev.UploadID).Logger()
if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID {
sublog.Debug().Msg("ignoring event for different storage")
continue
}
Comment on lines 287 to +291
session, err := fs.sessionStore.Get(ctx, ev.UploadID)
if err != nil {
sublog.Error().Err(err).Msg("Failed to get upload")
Expand Down Expand Up @@ -450,6 +454,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
session.Cleanup(true, !ev.KeepUpload, !ev.KeepUpload, true)
case events.RevertRevision:
sublog := log.With().Str("event", "RevertRevision").Interface("nodeid", ev.ResourceID).Logger()
if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID {
sublog.Debug().Msg("ignoring event for different storage")
continue
}
n, err := fs.lu.NodeFromID(ctx, ev.ResourceID)
if err != nil {
sublog.Error().Err(err).Msg("Failed to get node")
Expand All @@ -462,6 +470,10 @@ func (fs *Decomposedfs) Postprocessing(ch <-chan events.Event) {
}
case events.PostprocessingStepFinished:
sublog := log.With().Str("event", "PostprocessingStepFinished").Str("uploadid", ev.UploadID).Logger()
if ev.ResourceID != nil && ev.ResourceID.GetStorageId() != "" && ev.ResourceID.GetStorageId() != fs.o.MountID {
sublog.Debug().Msg("ignoring event for different storage")
continue
}
if ev.FinishedStep != events.PPStepAntivirus {
// atm we are only interested in antivirus results
continue
Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/utils/decomposedfs/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"strings"
"time"

"github.com/mitchellh/mapstructure"
"github.com/owncloud/reva/v2/pkg/rgrpc/todo/pool"
"github.com/owncloud/reva/v2/pkg/sharedconf"
"github.com/owncloud/reva/v2/pkg/storage/cache"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -103,7 +103,8 @@ type AsyncPropagatorOptions struct {

// EventOptions are the configurable options for events
type EventOptions struct {
NumConsumers int `mapstructure:"numconsumers"`
NumConsumers int `mapstructure:"numconsumers"`
ConsumerGroup string `mapstructure:"consumer_group"`
}

// TokenOptions are the configurable option for tokens
Expand Down Expand Up @@ -172,5 +173,9 @@ func New(m map[string]interface{}) (*Options, error) {
o.UploadDirectory = filepath.Join(o.Root, "uploads")
}

if o.Events.ConsumerGroup == "" {
o.Events.ConsumerGroup = "dcfs"
}

return o, nil
}
3 changes: 3 additions & 0 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ var (
// OCMStorageSpaceID is the space id used by the ocmreceived storageprovider
OCMStorageSpaceID = "89f37a33-858b-45fa-8890-a1f2b27d90e1"

// VaultStorageProviderID is the storage id used by the vault storageprovider
VaultStorageProviderID = "1a01c2c4-4309-4483-a845-842fd56d8622"

// SpaceGrant is used to signal the storageprovider that the grant is on a space
SpaceGrant struct{}
)
Expand Down