From 3258b35407a54b24097dac4c0e45c1146f0bb808 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergen=20Yal=C3=A7=C4=B1n?= Date: Tue, 13 Jan 2026 11:34:44 +0300 Subject: [PATCH 1/7] Add storage version migration support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sergen Yalçın --- go.mod | 2 +- pkg/config/crds_migrator.go | 171 ++++++++++++++++++++++++++++++++++++ pkg/config/provider.go | 12 +++ 3 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 pkg/config/crds_migrator.go diff --git a/go.mod b/go.mod index 426fe0de..f187051f 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.33.0 + k8s.io/apiextensions-apiserver v0.33.0 k8s.io/apimachinery v0.33.0 k8s.io/client-go v0.33.0 k8s.io/utils v0.0.0-20250321185631-1f6e0b77f77e @@ -121,7 +122,6 @@ require ( google.golang.org/protobuf v1.36.6 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - k8s.io/apiextensions-apiserver v0.33.0 // indirect k8s.io/code-generator v0.33.0 // indirect k8s.io/component-base v0.33.0 // indirect k8s.io/gengo/v2 v2.0.0-20250207200755-1244d31929d7 // indirect diff --git a/pkg/config/crds_migrator.go b/pkg/config/crds_migrator.go new file mode 100644 index 00000000..e2f79cde --- /dev/null +++ b/pkg/config/crds_migrator.go @@ -0,0 +1,171 @@ +// SPDX-FileCopyrightText: 2026 The Crossplane Authors +// +// SPDX-License-Identifier: Apache-2.0 + +package config + +import ( + "context" + "fmt" + + "github.com/crossplane/crossplane-runtime/v2/pkg/errors" + "github.com/crossplane/crossplane-runtime/v2/pkg/logging" + extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/discovery" + "k8s.io/client-go/restmapper" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// The all logic was borrowed from crossplane-runtime. Because the relevant +// files are in the internal package in crossplane-runtime. When we move them +// outside the internal package, we can remove the duplication here. + +// CRDsMigrator makes sure the CRDs are using the latest storage version. +type CRDsMigrator struct { + gvkList []schema.GroupVersionKind +} + +// NewCRDsMigrator returns a new *CRDsMigrator. +func NewCRDsMigrator(gvkList []schema.GroupVersionKind) *CRDsMigrator { + c := &CRDsMigrator{ + gvkList: gvkList, + } + + return c +} + +// Run migrates CRDs to use the latest storage version by listing all resources +// of the old storage version, patching them to trigger conversion to the new +// storage version, and updating the CRD status to reflect only the new storage version. +func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryClient discovery.DiscoveryInterface, kube client.Client) error { //nolint:gocyclo // easier to follow as a unit + for _, gvk := range c.gvkList { + crdName, err := GetCRDNameFromGVK(discoveryClient, gvk) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("failed to get CRD name from GVK %s", gvk.Kind)) + } + + var crd extv1.CustomResourceDefinition + if err := kube.Get(ctx, client.ObjectKey{Name: crdName}, &crd); err != nil { + if kerrors.IsNotFound(err) { + // nothing to do for this CRD + continue + } + + return errors.Wrapf(err, "cannot get %s crd", crdName) + } + + // Find the current storage version (the version marked as storage in the spec) + var storageVersion string + for _, v := range crd.Spec.Versions { + if v.Storage { + storageVersion = v.Name + break + } + } + storedVersions := crd.Status.StoredVersions + + // Check if migration is needed by comparing stored versions with the current storage version + var needMigration bool + for _, storedVersion := range storedVersions { + if storedVersion != storageVersion { + needMigration = true + } + } + + if !needMigration { + continue + } + + logr.Debug("Storage version migration is starting", "crd", crdName) + // Prepare to list all resources of this CRD using the current storage version + resources := unstructured.UnstructuredList{} + resources.SetGroupVersionKind(schema.GroupVersionKind{ + Group: crd.Spec.Group, + Version: storageVersion, + Kind: crd.Spec.Names.ListKind, + }) + + // List all resources in batches and patch each one to trigger storage version migration. + // The empty patch causes the API server to read the resource in its stored version + // and write it back in the current storage version. + var continueToken string + for { + if err := kube.List(ctx, &resources, + client.Limit(500), + client.Continue(continueToken), + ); err != nil { + return errors.Wrapf(err, "cannot list %s", resources.GroupVersionKind().String()) + } + + for i := range resources.Items { + // apply empty patch for storage version upgrade + res := resources.Items[i] + if err := kube.Patch(ctx, &res, client.RawPatch(types.MergePatchType, []byte(`{}`))); err != nil { + return errors.Wrapf(err, "cannot patch %s %q", crd.Spec.Names.Kind, res.GetName()) + } + } + + continueToken = resources.GetContinue() + if continueToken == "" { + break + } + } + + // Update CRD status to reflect that only the new storage version is stored + origCrd := crd.DeepCopy() + + crd.Status.StoredVersions = []string{storageVersion} + if err := kube.Status().Patch(ctx, &crd, client.MergeFrom(origCrd)); err != nil { + return errors.Wrapf(err, "couldn't update %s crd", crd.Name) + } + + // One more check just to be sure we actually updated the crd + if err := kube.Get(ctx, client.ObjectKey{Name: crd.Name}, &crd); err != nil { + return errors.Wrapf(err, "cannot get %s crd to check", crd.Name) + } + + if len(crd.Status.StoredVersions) != 1 || crd.Status.StoredVersions[0] != storageVersion { + return errors.Errorf("was expecting CRD %q to only have %s, got instead: %v", crd.Name, storageVersion, crd.Status.StoredVersions) + } + logr.Debug("Storage version migration completed", "crd", crdName) + } + return nil +} + +// GetCRDNameFromGVK returns the CRD name (e.g., "resources.group.example.com") for a given GroupVersionKind +// by using the discovery client to find the REST mapping. +func GetCRDNameFromGVK(discoveryClient discovery.DiscoveryInterface, gvk schema.GroupVersionKind) (string, error) { + groupResources, err := restmapper.GetAPIGroupResources(discoveryClient) + if err != nil { + return "", err + } + + mapper := restmapper.NewDiscoveryRESTMapper(groupResources) + mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return "", err + } + + return mapping.Resource.Resource + "." + mapping.Resource.Group, nil +} + +// PrepareCRDsMigrator scans the provider's resources for any that have previous versions +// and creates a CRDsMigrator to handle storage version migration for those resources. +// It sets the StorageVersionMigrator field on the Provider with the configured migrator. +func PrepareCRDsMigrator(pc *Provider) { + var gvkList []schema.GroupVersionKind + for _, r := range pc.Resources { + if len(r.PreviousVersions) != 0 { + gvkList = append(gvkList, schema.GroupVersionKind{ + Group: r.ShortGroup + "." + pc.RootGroup, + Version: r.CRDStorageVersion(), + Kind: r.Kind, + }) + } + } + pc.StorageVersionMigrator = NewCRDsMigrator(gvkList) +} diff --git a/pkg/config/provider.go b/pkg/config/provider.go index a81e4b49..9ab7deba 100644 --- a/pkg/config/provider.go +++ b/pkg/config/provider.go @@ -179,6 +179,10 @@ type Provider struct { // modify the Provider configuration based on the underlying Terraform // resource schemas. schemaTraversers []traverser.SchemaTraverser + + // StorageVersionMigrator handles the migration of CRD resources from old + // storage versions to new storage versions when the CRD schema is updated. + StorageVersionMigrator *CRDsMigrator } // ReferenceInjector injects cross-resource references across the resources @@ -308,6 +312,14 @@ func WithExampleManifestConfiguration(emc ExampleManifestConfiguration) Provider } } +// WithStorageVersionMigrator configures a CRDsMigrator for handling storage version +// migrations of CRD resources when their schemas are updated. +func WithStorageVersionMigrator(migrator *CRDsMigrator) ProviderOption { + return func(p *Provider) { + p.StorageVersionMigrator = migrator + } +} + // NewProvider builds and returns a new Provider from provider // tfjson schema, that is generated using Terraform CLI with: // `terraform providers schema --json` From 738bbecb16b20f5b1ffe3fa3e0d8f4e7a85c299e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergen=20Yal=C3=A7=C4=B1n?= Date: Thu, 15 Jan 2026 15:58:44 +0300 Subject: [PATCH 2/7] Prepare mapper just one time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sergen Yalçın --- pkg/config/crds_migrator.go | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/pkg/config/crds_migrator.go b/pkg/config/crds_migrator.go index e2f79cde..7eb647c6 100644 --- a/pkg/config/crds_migrator.go +++ b/pkg/config/crds_migrator.go @@ -7,11 +7,13 @@ package config import ( "context" "fmt" + "strings" "github.com/crossplane/crossplane-runtime/v2/pkg/errors" "github.com/crossplane/crossplane-runtime/v2/pkg/logging" extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -42,8 +44,15 @@ func NewCRDsMigrator(gvkList []schema.GroupVersionKind) *CRDsMigrator { // of the old storage version, patching them to trigger conversion to the new // storage version, and updating the CRD status to reflect only the new storage version. func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryClient discovery.DiscoveryInterface, kube client.Client) error { //nolint:gocyclo // easier to follow as a unit + // Perform API discovery once before the loop to avoid expensive repeated discovery calls + groupResources, err := restmapper.GetAPIGroupResources(discoveryClient) + if err != nil { + return errors.Wrap(err, "failed to get API group resources") + } + mapper := restmapper.NewDiscoveryRESTMapper(groupResources) + for _, gvk := range c.gvkList { - crdName, err := GetCRDNameFromGVK(discoveryClient, gvk) + crdName, err := GetCRDNameFromGVK(mapper, gvk) if err != nil { return errors.Wrap(err, fmt.Sprintf("failed to get CRD name from GVK %s", gvk.Kind)) } @@ -66,6 +75,9 @@ func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryCl break } } + if storageVersion == "" { + return errors.Errorf("no storage version found for CRD %s", crdName) + } storedVersions := crd.Status.StoredVersions // Check if migration is needed by comparing stored versions with the current storage version @@ -73,6 +85,7 @@ func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryCl for _, storedVersion := range storedVersions { if storedVersion != storageVersion { needMigration = true + break } } @@ -137,14 +150,8 @@ func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryCl } // GetCRDNameFromGVK returns the CRD name (e.g., "resources.group.example.com") for a given GroupVersionKind -// by using the discovery client to find the REST mapping. -func GetCRDNameFromGVK(discoveryClient discovery.DiscoveryInterface, gvk schema.GroupVersionKind) (string, error) { - groupResources, err := restmapper.GetAPIGroupResources(discoveryClient) - if err != nil { - return "", err - } - - mapper := restmapper.NewDiscoveryRESTMapper(groupResources) +// by using the provided REST mapper to find the REST mapping. +func GetCRDNameFromGVK(mapper meta.RESTMapper, gvk schema.GroupVersionKind) (string, error) { mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return "", err @@ -161,7 +168,7 @@ func PrepareCRDsMigrator(pc *Provider) { for _, r := range pc.Resources { if len(r.PreviousVersions) != 0 { gvkList = append(gvkList, schema.GroupVersionKind{ - Group: r.ShortGroup + "." + pc.RootGroup, + Group: strings.ToLower(r.ShortGroup + "." + pc.RootGroup), Version: r.CRDStorageVersion(), Kind: r.Kind, }) From e3d30b6fe9aefd3c20615919841aa7b82a4c9f21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergen=20Yal=C3=A7=C4=B1n?= Date: Fri, 23 Jan 2026 12:34:34 +0300 Subject: [PATCH 3/7] Add verb check and tool for manual storage version migration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sergen Yalçın --- cmd/crdsmigrator/example.yaml | 5 + cmd/crdsmigrator/main.go | 209 ++++++++++++++++++++++++++++++++++ pkg/config/crds_migrator.go | 143 +++++++++++++++++++---- 3 files changed, 334 insertions(+), 23 deletions(-) create mode 100644 cmd/crdsmigrator/example.yaml create mode 100644 cmd/crdsmigrator/main.go diff --git a/cmd/crdsmigrator/example.yaml b/cmd/crdsmigrator/example.yaml new file mode 100644 index 00000000..dc0f1b37 --- /dev/null +++ b/cmd/crdsmigrator/example.yaml @@ -0,0 +1,5 @@ +{ + "locations.conditionalaccess.azuread.upbound.io":"v1beta2", + "invitations.invitations.azuread.upbound.io":"v1beta2", + "principals.serviceprincipals.azuread.upbound.io":"v1beta2" +} \ No newline at end of file diff --git a/cmd/crdsmigrator/main.go b/cmd/crdsmigrator/main.go new file mode 100644 index 00000000..cde9a1f1 --- /dev/null +++ b/cmd/crdsmigrator/main.go @@ -0,0 +1,209 @@ +// SPDX-FileCopyrightText: 2026 The Crossplane Authors +// +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "context" + "log" + "os" + "path/filepath" + "strings" + "time" + + "github.com/alecthomas/kingpin/v2" + "github.com/crossplane/crossplane-runtime/v2/pkg/errors" + authv1 "k8s.io/api/authorization/v1" + extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/yaml" + + "github.com/crossplane/upjet/v2/pkg/config" +) + +var ( + app = kingpin.New("crds-migrator", "A CLI tool to manually update CRD storage versions for storage version migration") + + // Global flags + kubeconfig = app.Flag("kubeconfig", "Path to kubeconfig file. If not specified, uses in-cluster config or default kubeconfig location").String() + + // Update command + updateCmd = app.Command("update", "Update CRD status to reflect the current storage version") + updateCRDNames = updateCmd.Flag("crd-names", "Comma-separated list of CRD:version pairs (e.g., 'buckets.s3.aws.upbound.io:v1beta2,users.iam.aws.upbound.io:v1beta1')").String() + updateCRDFile = updateCmd.Flag("crd-file", "Path to YAML file containing CRD to storage version mappings").String() + updateRetries = updateCmd.Flag("retries", "Number of retry attempts").Default("5").Int() + updateDuration = updateCmd.Flag("retry-duration", "Initial retry duration in seconds").Default("1").Int() + updateFactor = updateCmd.Flag("retry-factor", "Retry backoff factor").Default("2.0").Float64() + updateJitter = updateCmd.Flag("retry-jitter", "Retry jitter (0.0-1.0)").Default("0.1").Float64() + skipPermCheck = updateCmd.Flag("skip-permission-check", "Skip permission check before updating CRD").Bool() +) + +func main() { + command := kingpin.MustParse(app.Parse(os.Args[1:])) + + if command == updateCmd.FullCommand() { + if err := runUpdate(); err != nil { + kingpin.FatalIfError(err, "Failed to update CRD storage") + } + } +} + +func runUpdate() error { //nolint:gocyclo // easier to follow as a unit + ctx := context.Background() + + // Parse CRD names and versions from flags + crdVersionMap, err := parseCRDMappings(*updateCRDNames, *updateCRDFile) + if err != nil { + return errors.Wrap(err, "failed to parse CRD mappings") + } + + if len(crdVersionMap) == 0 { + return errors.New("no CRD mappings provided. Use --crd-names or --crd-file") + } + + // Build Kubernetes client configuration + cfg, err := buildKubeConfig(*kubeconfig) + if err != nil { + return errors.Wrap(err, "failed to build kube config") + } + + // Register the CRD and authorization schemes + if err := extv1.AddToScheme(scheme.Scheme); err != nil { + return errors.Wrap(err, "failed to add extensions v1 to scheme") + } + if err := authv1.AddToScheme(scheme.Scheme); err != nil { + return errors.Wrap(err, "failed to add authv1 to scheme") + } + + // Create a Kubernetes client + kube, err := client.New(cfg, client.Options{Scheme: scheme.Scheme}) + if err != nil { + return errors.Wrap(err, "failed to create kube client") + } + + // Configure retry backoff + retryBackoff := wait.Backoff{ + Duration: time.Duration(*updateDuration) * time.Second, + Factor: *updateFactor, + Jitter: *updateJitter, + Steps: *updateRetries, + } + + log.Println("Starting CRD storage version migration", "crd-count", len(crdVersionMap)) + + // Process each CRD + var failedCRDs []string + for crdName, storageVersion := range crdVersionMap { + log.Println("Processing CRD", "crd-name", crdName, "target-storage-version", storageVersion) + + // Check permissions before attempting the update + if !*skipPermCheck { + hasPermission, err := config.CheckCRDStatusUpdatePermission(ctx, kube, crdName) + if err != nil { + log.Println("Permission check failed", "crd-name", crdName, "error", err) + failedCRDs = append(failedCRDs, crdName) + continue + } + + if !hasPermission { + log.Println("WARNING: The current user does not have sufficient permissions to patch CRD status.\n"+ + "Required permissions: patch verb on customresourcedefinitions/status subresource", + "crd-name", crdName) + failedCRDs = append(failedCRDs, crdName) + continue + } + } + + // Execute the CRD storage version update + if err := config.UpdateCRDStorageVersion(ctx, kube, retryBackoff, crdName, storageVersion); err != nil { + log.Println("Failed to update CRD storage version", "crd-name", crdName, "error", err) + failedCRDs = append(failedCRDs, crdName) + continue + } + + log.Println("Successfully updated CRD storage version", "crd-name", crdName, "storage-version", storageVersion) + } + + // Report results + successCount := len(crdVersionMap) - len(failedCRDs) + log.Println("CRD storage version migration completed", "total", len(crdVersionMap), "successful", successCount, "failed", len(failedCRDs)) + + if len(failedCRDs) > 0 { + log.Println("Failed CRDs", "crd-names", failedCRDs) + return errors.Errorf("failed to update %d CRD(s): %v", len(failedCRDs), failedCRDs) + } + + return nil +} + +func parseCRDMappings(crdNamesFlag, crdFile string) (map[string]string, error) { //nolint:gocyclo // easier to follow as a unit + // Validate that only one input method is used + if crdNamesFlag != "" && crdFile != "" { + return nil, errors.New("cannot use both --crd-names and --crd-file at the same time. Please use only one") + } + + crdVersionMap := make(map[string]string) + + // Parse from comma-separated flag (format: "crd1:version1,crd2:version2") + if crdNamesFlag != "" { + for _, pair := range strings.Split(crdNamesFlag, ",") { + trimmed := strings.TrimSpace(pair) + if trimmed == "" { + continue + } + + parts := strings.SplitN(trimmed, ":", 2) + if len(parts) != 2 { + return nil, errors.Errorf("invalid CRD:version format: %q. Expected format: 'crd-name:storage-version'", trimmed) + } + + crdName := strings.TrimSpace(parts[0]) + version := strings.TrimSpace(parts[1]) + + if crdName == "" || version == "" { + return nil, errors.Errorf("invalid CRD:version format: %q. Both CRD name and version must be non-empty", trimmed) + } + + crdVersionMap[crdName] = version + } + return crdVersionMap, nil + } + + // Parse from YAML file (format: crd-name: storage-version) + if crdFile != "" { + data, err := os.ReadFile(filepath.Clean(crdFile)) + if err != nil { + return nil, errors.Wrapf(err, "failed to read CRD file: %s", crdFile) + } + + if err := yaml.Unmarshal(data, &crdVersionMap); err != nil { + return nil, errors.Wrapf(err, "failed to parse YAML from file: %s", crdFile) + } + + return crdVersionMap, nil + } + + return nil, nil +} + +func buildKubeConfig(kubeconfigPath string) (*rest.Config, error) { + // If kubeconfig path is not specified, use default location + if kubeconfigPath == "" { + // Try in-cluster config first + if cfg, err := rest.InClusterConfig(); err == nil { + return cfg, nil + } + + // Fall back to default kubeconfig location + if home := os.Getenv("HOME"); home != "" { + kubeconfigPath = filepath.Join(home, ".kube", "config") + } + } + + return clientcmd.BuildConfigFromFlags("", kubeconfigPath) +} diff --git a/pkg/config/crds_migrator.go b/pkg/config/crds_migrator.go index 7eb647c6..c0b4fa9c 100644 --- a/pkg/config/crds_migrator.go +++ b/pkg/config/crds_migrator.go @@ -8,17 +8,21 @@ import ( "context" "fmt" "strings" + "time" "github.com/crossplane/crossplane-runtime/v2/pkg/errors" "github.com/crossplane/crossplane-runtime/v2/pkg/logging" + authv1 "k8s.io/api/authorization/v1" extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/discovery" "k8s.io/client-go/restmapper" + "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -28,13 +32,42 @@ import ( // CRDsMigrator makes sure the CRDs are using the latest storage version. type CRDsMigrator struct { - gvkList []schema.GroupVersionKind + gvkList []schema.GroupVersionKind + maxRetries int + retryBackoff wait.Backoff } -// NewCRDsMigrator returns a new *CRDsMigrator. -func NewCRDsMigrator(gvkList []schema.GroupVersionKind) *CRDsMigrator { +// CRDsMigratorOption is a functional option for configuring CRDsMigrator. +type CRDsMigratorOption func(*CRDsMigrator) + +// WithMaxRetries sets the maximum number of retries for transient failures. +func WithMaxRetries(maxRetries int) CRDsMigratorOption { + return func(c *CRDsMigrator) { + c.maxRetries = maxRetries + } +} + +// WithRetryBackoff sets the retry backoff configuration. +func WithRetryBackoff(backoff wait.Backoff) CRDsMigratorOption { + return func(c *CRDsMigrator) { + c.retryBackoff = backoff + } +} + +// NewCRDsMigrator returns a new *CRDsMigrator with default retry configuration. +func NewCRDsMigrator(gvkList []schema.GroupVersionKind, opts ...CRDsMigratorOption) *CRDsMigrator { c := &CRDsMigrator{ - gvkList: gvkList, + gvkList: gvkList, + maxRetries: 10, + retryBackoff: wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2.0, + Jitter: 0.1, + }, + } + + for _, opt := range opts { + opt(c) } return c @@ -90,6 +123,7 @@ func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryCl } if !needMigration { + logr.Debug("Skipping CRD migration for CRD because it has already been migrated", crdName) continue } @@ -107,18 +141,25 @@ func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryCl // and write it back in the current storage version. var continueToken string for { - if err := kube.List(ctx, &resources, - client.Limit(500), - client.Continue(continueToken), - ); err != nil { - return errors.Wrapf(err, "cannot list %s", resources.GroupVersionKind().String()) + // Retry resource listing with exponential backoff + listErr := retry.OnError(c.retryBackoff, func(err error) bool { return true }, func() error { + return kube.List(ctx, &resources, + client.Limit(500), + client.Continue(continueToken), + ) + }) + if listErr != nil { + return errors.Wrapf(listErr, "cannot list %s", resources.GroupVersionKind().String()) } for i := range resources.Items { - // apply empty patch for storage version upgrade + // apply empty patch for storage version upgrade with retry res := resources.Items[i] - if err := kube.Patch(ctx, &res, client.RawPatch(types.MergePatchType, []byte(`{}`))); err != nil { - return errors.Wrapf(err, "cannot patch %s %q", crd.Spec.Names.Kind, res.GetName()) + patchErr := retry.OnError(c.retryBackoff, func(err error) bool { return true }, func() error { + return kube.Patch(ctx, &res, client.RawPatch(types.MergePatchType, []byte(`{}`))) + }) + if patchErr != nil { + return errors.Wrapf(patchErr, "cannot patch %s %q", crd.Spec.Names.Kind, res.GetName()) } } @@ -128,21 +169,20 @@ func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryCl } } - // Update CRD status to reflect that only the new storage version is stored - origCrd := crd.DeepCopy() - - crd.Status.StoredVersions = []string{storageVersion} - if err := kube.Status().Patch(ctx, &crd, client.MergeFrom(origCrd)); err != nil { - return errors.Wrapf(err, "couldn't update %s crd", crd.Name) + // Check if the client has permission to update/patch CRD status before attempting the update + hasPermission, err := CheckCRDStatusUpdatePermission(ctx, kube, crdName) + if err != nil { + return errors.Wrapf(err, "permission check failed for CRD %s", crdName) } - // One more check just to be sure we actually updated the crd - if err := kube.Get(ctx, client.ObjectKey{Name: crd.Name}, &crd); err != nil { - return errors.Wrapf(err, "cannot get %s crd to check", crd.Name) + if !hasPermission { + logr.Info(fmt.Sprintf("This client does not have permission to execute %s operation for patch", crdName)) + continue } - if len(crd.Status.StoredVersions) != 1 || crd.Status.StoredVersions[0] != storageVersion { - return errors.Errorf("was expecting CRD %q to only have %s, got instead: %v", crd.Name, storageVersion, crd.Status.StoredVersions) + // Update CRD status to reflect that only the new storage version is stored + if err := UpdateCRDStorageVersion(ctx, kube, c.retryBackoff, crdName, storageVersion); err != nil { + return err } logr.Debug("Storage version migration completed", "crd", crdName) } @@ -160,6 +200,63 @@ func GetCRDNameFromGVK(mapper meta.RESTMapper, gvk schema.GroupVersionKind) (str return mapping.Resource.Resource + "." + mapping.Resource.Group, nil } +// UpdateCRDStorageVersion updates the CRD status to reflect only the specified storage version. +// It retries the update with exponential backoff and verifies the update was successful. +func UpdateCRDStorageVersion(ctx context.Context, kube client.Client, retryBackoff wait.Backoff, crdName, storageVersion string) error { + var crd extv1.CustomResourceDefinition + // Update CRD status to reflect that only the new storage version is stored + // Use retry for status updates as they can fail due to conflicts + statusUpdateErr := retry.OnError(retryBackoff, func(err error) bool { return true }, func() error { + // Re-fetch the CRD to get the latest version before patching + if err := kube.Get(ctx, client.ObjectKey{Name: crdName}, &crd); err != nil { + return err + } + origCrd := crd.DeepCopy() + crd.Status.StoredVersions = []string{storageVersion} + return kube.Status().Patch(ctx, &crd, client.MergeFrom(origCrd)) + }) + if statusUpdateErr != nil { + return errors.Wrapf(statusUpdateErr, "couldn't update %s crd", crd.Name) + } + + // One more check just to be sure we actually updated the crd + if err := kube.Get(ctx, client.ObjectKey{Name: crd.Name}, &crd); err != nil { + return errors.Wrapf(err, "cannot get %s crd to check", crd.Name) + } + + if len(crd.Status.StoredVersions) != 1 || crd.Status.StoredVersions[0] != storageVersion { + return errors.Errorf("was expecting CRD %q to only have %s, got instead: %v", crd.Name, storageVersion, crd.Status.StoredVersions) + } + return nil +} + +// CheckCRDStatusUpdatePermission checks if the current client has permission to update/patch +// the status subresource of the specified CRD using SelfSubjectAccessReview. +func CheckCRDStatusUpdatePermission(ctx context.Context, kube client.Client, crdName string) (bool, error) { + // Check for both 'patch' verb on the status subresource + ssar := &authv1.SelfSubjectAccessReview{ + Spec: authv1.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authv1.ResourceAttributes{ + Group: "apiextensions.k8s.io", + Resource: "customresourcedefinitions", + Subresource: "status", + Name: crdName, + Verb: "patch", + }, + }, + } + + if err := kube.Create(ctx, ssar); err != nil { + return false, errors.Wrap(err, "failed to create SelfSubjectAccessReview for verb patch") + } + + if !ssar.Status.Allowed { + return false, nil + } + + return true, nil +} + // PrepareCRDsMigrator scans the provider's resources for any that have previous versions // and creates a CRDsMigrator to handle storage version migration for those resources. // It sets the StorageVersionMigrator field on the Provider with the configured migrator. From cc354067647033b48688188a7a1a1df987ad91bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergen=20Yal=C3=A7=C4=B1n?= Date: Fri, 23 Jan 2026 13:01:09 +0300 Subject: [PATCH 4/7] Add licence statement MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sergen Yalçın --- cmd/crdsmigrator/example.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cmd/crdsmigrator/example.yaml b/cmd/crdsmigrator/example.yaml index dc0f1b37..384200f3 100644 --- a/cmd/crdsmigrator/example.yaml +++ b/cmd/crdsmigrator/example.yaml @@ -1,3 +1,7 @@ +# SPDX-FileCopyrightText: 2026 The Crossplane Authors +# +# SPDX-License-Identifier: CC0-1.0 + { "locations.conditionalaccess.azuread.upbound.io":"v1beta2", "invitations.invitations.azuread.upbound.io":"v1beta2", From 80703f83fa750770059eeb1ec171a507dd68721c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergen=20Yal=C3=A7=C4=B1n?= Date: Fri, 23 Jan 2026 13:28:45 +0300 Subject: [PATCH 5/7] Fix retry logic and add validations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sergen Yalçın --- cmd/crdsmigrator/main.go | 30 +++++++++++++++++++++++++----- pkg/config/crds_migrator.go | 12 ++---------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/cmd/crdsmigrator/main.go b/cmd/crdsmigrator/main.go index cde9a1f1..122677df 100644 --- a/cmd/crdsmigrator/main.go +++ b/cmd/crdsmigrator/main.go @@ -36,10 +36,10 @@ var ( updateCmd = app.Command("update", "Update CRD status to reflect the current storage version") updateCRDNames = updateCmd.Flag("crd-names", "Comma-separated list of CRD:version pairs (e.g., 'buckets.s3.aws.upbound.io:v1beta2,users.iam.aws.upbound.io:v1beta1')").String() updateCRDFile = updateCmd.Flag("crd-file", "Path to YAML file containing CRD to storage version mappings").String() - updateRetries = updateCmd.Flag("retries", "Number of retry attempts").Default("5").Int() - updateDuration = updateCmd.Flag("retry-duration", "Initial retry duration in seconds").Default("1").Int() - updateFactor = updateCmd.Flag("retry-factor", "Retry backoff factor").Default("2.0").Float64() - updateJitter = updateCmd.Flag("retry-jitter", "Retry jitter (0.0-1.0)").Default("0.1").Float64() + updateRetries = updateCmd.Flag("retries", "Number of retry attempts (must be > 0)").Default("10").Int() + updateDuration = updateCmd.Flag("retry-duration", "Initial retry duration in seconds (must be > 0)").Default("1").Int() + updateFactor = updateCmd.Flag("retry-factor", "Retry backoff factor (must be > 1.0 for exponential growth)").Default("2.0").Float64() + updateJitter = updateCmd.Flag("retry-jitter", "Retry jitter between 0.0 and 1.0").Default("0.1").Float64() skipPermCheck = updateCmd.Flag("skip-permission-check", "Skip permission check before updating CRD").Bool() ) @@ -86,7 +86,11 @@ func runUpdate() error { //nolint:gocyclo // easier to follow as a unit return errors.Wrap(err, "failed to create kube client") } - // Configure retry backoff + // Validate and configure retry backoff + if err := validateRetryConfig(*updateRetries, *updateDuration, *updateFactor, *updateJitter); err != nil { + return errors.Wrap(err, "invalid retry configuration") + } + retryBackoff := wait.Backoff{ Duration: time.Duration(*updateDuration) * time.Second, Factor: *updateFactor, @@ -191,6 +195,22 @@ func parseCRDMappings(crdNamesFlag, crdFile string) (map[string]string, error) { return nil, nil } +func validateRetryConfig(retries, duration int, factor, jitter float64) error { + if retries <= 0 { + return errors.Errorf("retries must be greater than 0, got %d", retries) + } + if duration <= 0 { + return errors.Errorf("retry-duration must be greater than 0, got %d", duration) + } + if factor <= 1.0 { + return errors.Errorf("retry-factor must be greater than 1.0 for exponential backoff, got %.2f", factor) + } + if jitter < 0.0 || jitter > 1.0 { + return errors.Errorf("retry-jitter must be between 0.0 and 1.0, got %.2f", jitter) + } + return nil +} + func buildKubeConfig(kubeconfigPath string) (*rest.Config, error) { // If kubeconfig path is not specified, use default location if kubeconfigPath == "" { diff --git a/pkg/config/crds_migrator.go b/pkg/config/crds_migrator.go index c0b4fa9c..7f0e6814 100644 --- a/pkg/config/crds_migrator.go +++ b/pkg/config/crds_migrator.go @@ -33,20 +33,12 @@ import ( // CRDsMigrator makes sure the CRDs are using the latest storage version. type CRDsMigrator struct { gvkList []schema.GroupVersionKind - maxRetries int retryBackoff wait.Backoff } // CRDsMigratorOption is a functional option for configuring CRDsMigrator. type CRDsMigratorOption func(*CRDsMigrator) -// WithMaxRetries sets the maximum number of retries for transient failures. -func WithMaxRetries(maxRetries int) CRDsMigratorOption { - return func(c *CRDsMigrator) { - c.maxRetries = maxRetries - } -} - // WithRetryBackoff sets the retry backoff configuration. func WithRetryBackoff(backoff wait.Backoff) CRDsMigratorOption { return func(c *CRDsMigrator) { @@ -57,12 +49,12 @@ func WithRetryBackoff(backoff wait.Backoff) CRDsMigratorOption { // NewCRDsMigrator returns a new *CRDsMigrator with default retry configuration. func NewCRDsMigrator(gvkList []schema.GroupVersionKind, opts ...CRDsMigratorOption) *CRDsMigrator { c := &CRDsMigrator{ - gvkList: gvkList, - maxRetries: 10, + gvkList: gvkList, retryBackoff: wait.Backoff{ Duration: 1 * time.Second, Factor: 2.0, Jitter: 0.1, + Steps: 10, }, } From 15b3aeef46409393fa51725569ac0b0c06186b5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergen=20Yal=C3=A7=C4=B1n?= Date: Fri, 23 Jan 2026 14:00:45 +0300 Subject: [PATCH 6/7] Add error type checks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sergen Yalçın --- pkg/config/crds_migrator.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/config/crds_migrator.go b/pkg/config/crds_migrator.go index 7f0e6814..921ced06 100644 --- a/pkg/config/crds_migrator.go +++ b/pkg/config/crds_migrator.go @@ -147,10 +147,15 @@ func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryCl for i := range resources.Items { // apply empty patch for storage version upgrade with retry res := resources.Items[i] - patchErr := retry.OnError(c.retryBackoff, func(err error) bool { return true }, func() error { + patchErr := retry.OnError(c.retryBackoff, func(err error) bool { + return !kerrors.IsNotFound(err) + }, func() error { return kube.Patch(ctx, &res, client.RawPatch(types.MergePatchType, []byte(`{}`))) }) if patchErr != nil { + if kerrors.IsNotFound(patchErr) { + continue + } return errors.Wrapf(patchErr, "cannot patch %s %q", crd.Spec.Names.Kind, res.GetName()) } } @@ -239,6 +244,9 @@ func CheckCRDStatusUpdatePermission(ctx context.Context, kube client.Client, crd } if err := kube.Create(ctx, ssar); err != nil { + if kerrors.IsForbidden(err) || kerrors.IsUnauthorized(err) || kerrors.IsNotFound(err) { + return false, nil + } return false, errors.Wrap(err, "failed to create SelfSubjectAccessReview for verb patch") } From 2b630225b1df72f56794a29a2c6ae4b8b10a3683 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergen=20Yal=C3=A7=C4=B1n?= Date: Fri, 23 Jan 2026 14:10:19 +0300 Subject: [PATCH 7/7] Add log key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sergen Yalçın --- pkg/config/crds_migrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/crds_migrator.go b/pkg/config/crds_migrator.go index 921ced06..d40d06de 100644 --- a/pkg/config/crds_migrator.go +++ b/pkg/config/crds_migrator.go @@ -115,7 +115,7 @@ func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryCl } if !needMigration { - logr.Debug("Skipping CRD migration for CRD because it has already been migrated", crdName) + logr.Debug("Skipping CRD migration for CRD because it has already been migrated", "crd-name", crdName) continue }