diff --git a/cmd/crdsmigrator/example.yaml b/cmd/crdsmigrator/example.yaml new file mode 100644 index 00000000..384200f3 --- /dev/null +++ b/cmd/crdsmigrator/example.yaml @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2026 The Crossplane Authors +# +# SPDX-License-Identifier: CC0-1.0 + +{ + "locations.conditionalaccess.azuread.upbound.io":"v1beta2", + "invitations.invitations.azuread.upbound.io":"v1beta2", + "principals.serviceprincipals.azuread.upbound.io":"v1beta2" +} \ No newline at end of file diff --git a/cmd/crdsmigrator/main.go b/cmd/crdsmigrator/main.go new file mode 100644 index 00000000..122677df --- /dev/null +++ b/cmd/crdsmigrator/main.go @@ -0,0 +1,229 @@ +// SPDX-FileCopyrightText: 2026 The Crossplane Authors +// +// SPDX-License-Identifier: Apache-2.0 + +package main + +import ( + "context" + "log" + "os" + "path/filepath" + "strings" + "time" + + "github.com/alecthomas/kingpin/v2" + "github.com/crossplane/crossplane-runtime/v2/pkg/errors" + authv1 "k8s.io/api/authorization/v1" + extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/yaml" + + "github.com/crossplane/upjet/v2/pkg/config" +) + +var ( + app = kingpin.New("crds-migrator", "A CLI tool to manually update CRD storage versions for storage version migration") + + // Global flags + kubeconfig = app.Flag("kubeconfig", "Path to kubeconfig file. 
If not specified, uses in-cluster config or default kubeconfig location").String() + + // Update command + updateCmd = app.Command("update", "Update CRD status to reflect the current storage version") + updateCRDNames = updateCmd.Flag("crd-names", "Comma-separated list of CRD:version pairs (e.g., 'buckets.s3.aws.upbound.io:v1beta2,users.iam.aws.upbound.io:v1beta1')").String() + updateCRDFile = updateCmd.Flag("crd-file", "Path to YAML file containing CRD to storage version mappings").String() + updateRetries = updateCmd.Flag("retries", "Number of retry attempts (must be > 0)").Default("10").Int() + updateDuration = updateCmd.Flag("retry-duration", "Initial retry duration in seconds (must be > 0)").Default("1").Int() + updateFactor = updateCmd.Flag("retry-factor", "Retry backoff factor (must be > 1.0 for exponential growth)").Default("2.0").Float64() + updateJitter = updateCmd.Flag("retry-jitter", "Retry jitter between 0.0 and 1.0").Default("0.1").Float64() + skipPermCheck = updateCmd.Flag("skip-permission-check", "Skip permission check before updating CRD").Bool() +) + +func main() { + command := kingpin.MustParse(app.Parse(os.Args[1:])) + + if command == updateCmd.FullCommand() { + if err := runUpdate(); err != nil { + kingpin.FatalIfError(err, "Failed to update CRD storage") + } + } +} + +func runUpdate() error { //nolint:gocyclo // easier to follow as a unit + ctx := context.Background() + + // Parse CRD names and versions from flags + crdVersionMap, err := parseCRDMappings(*updateCRDNames, *updateCRDFile) + if err != nil { + return errors.Wrap(err, "failed to parse CRD mappings") + } + + if len(crdVersionMap) == 0 { + return errors.New("no CRD mappings provided. 
Use --crd-names or --crd-file") + } + + // Build Kubernetes client configuration + cfg, err := buildKubeConfig(*kubeconfig) + if err != nil { + return errors.Wrap(err, "failed to build kube config") + } + + // Register the CRD and authorization schemes + if err := extv1.AddToScheme(scheme.Scheme); err != nil { + return errors.Wrap(err, "failed to add extensions v1 to scheme") + } + if err := authv1.AddToScheme(scheme.Scheme); err != nil { + return errors.Wrap(err, "failed to add authv1 to scheme") + } + + // Create a Kubernetes client + kube, err := client.New(cfg, client.Options{Scheme: scheme.Scheme}) + if err != nil { + return errors.Wrap(err, "failed to create kube client") + } + + // Validate and configure retry backoff + if err := validateRetryConfig(*updateRetries, *updateDuration, *updateFactor, *updateJitter); err != nil { + return errors.Wrap(err, "invalid retry configuration") + } + + retryBackoff := wait.Backoff{ + Duration: time.Duration(*updateDuration) * time.Second, + Factor: *updateFactor, + Jitter: *updateJitter, + Steps: *updateRetries, + } + + log.Println("Starting CRD storage version migration", "crd-count", len(crdVersionMap)) + + // Process each CRD + var failedCRDs []string + for crdName, storageVersion := range crdVersionMap { + log.Println("Processing CRD", "crd-name", crdName, "target-storage-version", storageVersion) + + // Check permissions before attempting the update + if !*skipPermCheck { + hasPermission, err := config.CheckCRDStatusUpdatePermission(ctx, kube, crdName) + if err != nil { + log.Println("Permission check failed", "crd-name", crdName, "error", err) + failedCRDs = append(failedCRDs, crdName) + continue + } + + if !hasPermission { + log.Println("WARNING: The current user does not have sufficient permissions to patch CRD status.\n"+ + "Required permissions: patch verb on customresourcedefinitions/status subresource", + "crd-name", crdName) + failedCRDs = append(failedCRDs, crdName) + continue + } + } + + // Execute the CRD 
storage version update + if err := config.UpdateCRDStorageVersion(ctx, kube, retryBackoff, crdName, storageVersion); err != nil { + log.Println("Failed to update CRD storage version", "crd-name", crdName, "error", err) + failedCRDs = append(failedCRDs, crdName) + continue + } + + log.Println("Successfully updated CRD storage version", "crd-name", crdName, "storage-version", storageVersion) + } + + // Report results + successCount := len(crdVersionMap) - len(failedCRDs) + log.Println("CRD storage version migration completed", "total", len(crdVersionMap), "successful", successCount, "failed", len(failedCRDs)) + + if len(failedCRDs) > 0 { + log.Println("Failed CRDs", "crd-names", failedCRDs) + return errors.Errorf("failed to update %d CRD(s): %v", len(failedCRDs), failedCRDs) + } + + return nil +} + +func parseCRDMappings(crdNamesFlag, crdFile string) (map[string]string, error) { //nolint:gocyclo // easier to follow as a unit + // Validate that only one input method is used + if crdNamesFlag != "" && crdFile != "" { + return nil, errors.New("cannot use both --crd-names and --crd-file at the same time. Please use only one") + } + + crdVersionMap := make(map[string]string) + + // Parse from comma-separated flag (format: "crd1:version1,crd2:version2") + if crdNamesFlag != "" { + for _, pair := range strings.Split(crdNamesFlag, ",") { + trimmed := strings.TrimSpace(pair) + if trimmed == "" { + continue + } + + parts := strings.SplitN(trimmed, ":", 2) + if len(parts) != 2 { + return nil, errors.Errorf("invalid CRD:version format: %q. Expected format: 'crd-name:storage-version'", trimmed) + } + + crdName := strings.TrimSpace(parts[0]) + version := strings.TrimSpace(parts[1]) + + if crdName == "" || version == "" { + return nil, errors.Errorf("invalid CRD:version format: %q. 
Both CRD name and version must be non-empty", trimmed) + } + + crdVersionMap[crdName] = version + } + return crdVersionMap, nil + } + + // Parse from YAML file (format: crd-name: storage-version) + if crdFile != "" { + data, err := os.ReadFile(filepath.Clean(crdFile)) + if err != nil { + return nil, errors.Wrapf(err, "failed to read CRD file: %s", crdFile) + } + + if err := yaml.Unmarshal(data, &crdVersionMap); err != nil { + return nil, errors.Wrapf(err, "failed to parse YAML from file: %s", crdFile) + } + + return crdVersionMap, nil + } + + return nil, nil +} + +func validateRetryConfig(retries, duration int, factor, jitter float64) error { + if retries <= 0 { + return errors.Errorf("retries must be greater than 0, got %d", retries) + } + if duration <= 0 { + return errors.Errorf("retry-duration must be greater than 0, got %d", duration) + } + if factor <= 1.0 { + return errors.Errorf("retry-factor must be greater than 1.0 for exponential backoff, got %.2f", factor) + } + if jitter < 0.0 || jitter > 1.0 { + return errors.Errorf("retry-jitter must be between 0.0 and 1.0, got %.2f", jitter) + } + return nil +} + +func buildKubeConfig(kubeconfigPath string) (*rest.Config, error) { + // If kubeconfig path is not specified, use default location + if kubeconfigPath == "" { + // Try in-cluster config first + if cfg, err := rest.InClusterConfig(); err == nil { + return cfg, nil + } + + // Fall back to default kubeconfig location + if home := os.Getenv("HOME"); home != "" { + kubeconfigPath = filepath.Join(home, ".kube", "config") + } + } + + return clientcmd.BuildConfigFromFlags("", kubeconfigPath) +} diff --git a/go.mod b/go.mod index 426fe0de..f187051f 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.33.0 + k8s.io/apiextensions-apiserver v0.33.0 k8s.io/apimachinery v0.33.0 k8s.io/client-go v0.33.0 k8s.io/utils v0.0.0-20250321185631-1f6e0b77f77e @@ -121,7 +122,6 @@ require ( 
google.golang.org/protobuf v1.36.6 // indirect
 	gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
-	k8s.io/apiextensions-apiserver v0.33.0 // indirect
 	k8s.io/code-generator v0.33.0 // indirect
 	k8s.io/component-base v0.33.0 // indirect
 	k8s.io/gengo/v2 v2.0.0-20250207200755-1244d31929d7 // indirect
diff --git a/pkg/config/crds_migrator.go b/pkg/config/crds_migrator.go
new file mode 100644
index 00000000..d40d06de
--- /dev/null
+++ b/pkg/config/crds_migrator.go
@@ -0,0 +1,275 @@
+// SPDX-FileCopyrightText: 2026 The Crossplane Authors
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package config
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/crossplane/crossplane-runtime/v2/pkg/errors"
+	"github.com/crossplane/crossplane-runtime/v2/pkg/logging"
+	authv1 "k8s.io/api/authorization/v1"
+	extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	kerrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/discovery"
+	"k8s.io/client-go/restmapper"
+	"k8s.io/client-go/util/retry"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// All of this logic was borrowed from crossplane-runtime, because the
+// relevant files live in its internal package and cannot be imported. Once
+// they move out of the internal package, we can remove the duplication here.
+
+// CRDsMigrator makes sure the CRDs are using the latest storage version.
+type CRDsMigrator struct {
+	gvkList      []schema.GroupVersionKind
+	retryBackoff wait.Backoff
+}
+
+// CRDsMigratorOption is a functional option for configuring CRDsMigrator.
+type CRDsMigratorOption func(*CRDsMigrator)
+
+// WithRetryBackoff sets the retry backoff configuration.
+func WithRetryBackoff(backoff wait.Backoff) CRDsMigratorOption { + return func(c *CRDsMigrator) { + c.retryBackoff = backoff + } +} + +// NewCRDsMigrator returns a new *CRDsMigrator with default retry configuration. +func NewCRDsMigrator(gvkList []schema.GroupVersionKind, opts ...CRDsMigratorOption) *CRDsMigrator { + c := &CRDsMigrator{ + gvkList: gvkList, + retryBackoff: wait.Backoff{ + Duration: 1 * time.Second, + Factor: 2.0, + Jitter: 0.1, + Steps: 10, + }, + } + + for _, opt := range opts { + opt(c) + } + + return c +} + +// Run migrates CRDs to use the latest storage version by listing all resources +// of the old storage version, patching them to trigger conversion to the new +// storage version, and updating the CRD status to reflect only the new storage version. +func (c *CRDsMigrator) Run(ctx context.Context, logr logging.Logger, discoveryClient discovery.DiscoveryInterface, kube client.Client) error { //nolint:gocyclo // easier to follow as a unit + // Perform API discovery once before the loop to avoid expensive repeated discovery calls + groupResources, err := restmapper.GetAPIGroupResources(discoveryClient) + if err != nil { + return errors.Wrap(err, "failed to get API group resources") + } + mapper := restmapper.NewDiscoveryRESTMapper(groupResources) + + for _, gvk := range c.gvkList { + crdName, err := GetCRDNameFromGVK(mapper, gvk) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("failed to get CRD name from GVK %s", gvk.Kind)) + } + + var crd extv1.CustomResourceDefinition + if err := kube.Get(ctx, client.ObjectKey{Name: crdName}, &crd); err != nil { + if kerrors.IsNotFound(err) { + // nothing to do for this CRD + continue + } + + return errors.Wrapf(err, "cannot get %s crd", crdName) + } + + // Find the current storage version (the version marked as storage in the spec) + var storageVersion string + for _, v := range crd.Spec.Versions { + if v.Storage { + storageVersion = v.Name + break + } + } + if storageVersion == "" { + return 
errors.Errorf("no storage version found for CRD %s", crdName) + } + storedVersions := crd.Status.StoredVersions + + // Check if migration is needed by comparing stored versions with the current storage version + var needMigration bool + for _, storedVersion := range storedVersions { + if storedVersion != storageVersion { + needMigration = true + break + } + } + + if !needMigration { + logr.Debug("Skipping CRD migration for CRD because it has already been migrated", "crd-name", crdName) + continue + } + + logr.Debug("Storage version migration is starting", "crd", crdName) + // Prepare to list all resources of this CRD using the current storage version + resources := unstructured.UnstructuredList{} + resources.SetGroupVersionKind(schema.GroupVersionKind{ + Group: crd.Spec.Group, + Version: storageVersion, + Kind: crd.Spec.Names.ListKind, + }) + + // List all resources in batches and patch each one to trigger storage version migration. + // The empty patch causes the API server to read the resource in its stored version + // and write it back in the current storage version. 
+ var continueToken string + for { + // Retry resource listing with exponential backoff + listErr := retry.OnError(c.retryBackoff, func(err error) bool { return true }, func() error { + return kube.List(ctx, &resources, + client.Limit(500), + client.Continue(continueToken), + ) + }) + if listErr != nil { + return errors.Wrapf(listErr, "cannot list %s", resources.GroupVersionKind().String()) + } + + for i := range resources.Items { + // apply empty patch for storage version upgrade with retry + res := resources.Items[i] + patchErr := retry.OnError(c.retryBackoff, func(err error) bool { + return !kerrors.IsNotFound(err) + }, func() error { + return kube.Patch(ctx, &res, client.RawPatch(types.MergePatchType, []byte(`{}`))) + }) + if patchErr != nil { + if kerrors.IsNotFound(patchErr) { + continue + } + return errors.Wrapf(patchErr, "cannot patch %s %q", crd.Spec.Names.Kind, res.GetName()) + } + } + + continueToken = resources.GetContinue() + if continueToken == "" { + break + } + } + + // Check if the client has permission to update/patch CRD status before attempting the update + hasPermission, err := CheckCRDStatusUpdatePermission(ctx, kube, crdName) + if err != nil { + return errors.Wrapf(err, "permission check failed for CRD %s", crdName) + } + + if !hasPermission { + logr.Info(fmt.Sprintf("This client does not have permission to execute %s operation for patch", crdName)) + continue + } + + // Update CRD status to reflect that only the new storage version is stored + if err := UpdateCRDStorageVersion(ctx, kube, c.retryBackoff, crdName, storageVersion); err != nil { + return err + } + logr.Debug("Storage version migration completed", "crd", crdName) + } + return nil +} + +// GetCRDNameFromGVK returns the CRD name (e.g., "resources.group.example.com") for a given GroupVersionKind +// by using the provided REST mapper to find the REST mapping. 
+func GetCRDNameFromGVK(mapper meta.RESTMapper, gvk schema.GroupVersionKind) (string, error) { + mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return "", err + } + + return mapping.Resource.Resource + "." + mapping.Resource.Group, nil +} + +// UpdateCRDStorageVersion updates the CRD status to reflect only the specified storage version. +// It retries the update with exponential backoff and verifies the update was successful. +func UpdateCRDStorageVersion(ctx context.Context, kube client.Client, retryBackoff wait.Backoff, crdName, storageVersion string) error { + var crd extv1.CustomResourceDefinition + // Update CRD status to reflect that only the new storage version is stored + // Use retry for status updates as they can fail due to conflicts + statusUpdateErr := retry.OnError(retryBackoff, func(err error) bool { return true }, func() error { + // Re-fetch the CRD to get the latest version before patching + if err := kube.Get(ctx, client.ObjectKey{Name: crdName}, &crd); err != nil { + return err + } + origCrd := crd.DeepCopy() + crd.Status.StoredVersions = []string{storageVersion} + return kube.Status().Patch(ctx, &crd, client.MergeFrom(origCrd)) + }) + if statusUpdateErr != nil { + return errors.Wrapf(statusUpdateErr, "couldn't update %s crd", crd.Name) + } + + // One more check just to be sure we actually updated the crd + if err := kube.Get(ctx, client.ObjectKey{Name: crd.Name}, &crd); err != nil { + return errors.Wrapf(err, "cannot get %s crd to check", crd.Name) + } + + if len(crd.Status.StoredVersions) != 1 || crd.Status.StoredVersions[0] != storageVersion { + return errors.Errorf("was expecting CRD %q to only have %s, got instead: %v", crd.Name, storageVersion, crd.Status.StoredVersions) + } + return nil +} + +// CheckCRDStatusUpdatePermission checks if the current client has permission to update/patch +// the status subresource of the specified CRD using SelfSubjectAccessReview. 
+func CheckCRDStatusUpdatePermission(ctx context.Context, kube client.Client, crdName string) (bool, error) {
+	// Check for the 'patch' verb on the status subresource
+	ssar := &authv1.SelfSubjectAccessReview{
+		Spec: authv1.SelfSubjectAccessReviewSpec{
+			ResourceAttributes: &authv1.ResourceAttributes{
+				Group:       "apiextensions.k8s.io",
+				Resource:    "customresourcedefinitions",
+				Subresource: "status",
+				Name:        crdName,
+				Verb:        "patch",
+			},
+		},
+	}
+
+	if err := kube.Create(ctx, ssar); err != nil {
+		if kerrors.IsForbidden(err) || kerrors.IsUnauthorized(err) || kerrors.IsNotFound(err) {
+			return false, nil
+		}
+		return false, errors.Wrap(err, "failed to create SelfSubjectAccessReview for verb patch")
+	}
+
+	if !ssar.Status.Allowed {
+		return false, nil
+	}
+
+	return true, nil
+}
+
+// PrepareCRDsMigrator scans the provider's resources for any that have previous versions
+// and creates a CRDsMigrator to handle storage version migration for those resources.
+// It sets the StorageVersionMigrator field on the Provider with the configured migrator.
+func PrepareCRDsMigrator(pc *Provider) {
+	var gvkList []schema.GroupVersionKind
+	for _, r := range pc.Resources {
+		if len(r.PreviousVersions) != 0 {
+			gvkList = append(gvkList, schema.GroupVersionKind{
+				Group:   strings.ToLower(r.ShortGroup + "." + pc.RootGroup),
+				Version: r.CRDStorageVersion(),
+				Kind:    r.Kind,
+			})
+		}
+	}
+	pc.StorageVersionMigrator = NewCRDsMigrator(gvkList)
+}
diff --git a/pkg/config/provider.go b/pkg/config/provider.go
index a81e4b49..9ab7deba 100644
--- a/pkg/config/provider.go
+++ b/pkg/config/provider.go
@@ -179,6 +179,10 @@ type Provider struct {
 	// modify the Provider configuration based on the underlying Terraform
 	// resource schemas.
 	schemaTraversers []traverser.SchemaTraverser
+
+	// StorageVersionMigrator handles the migration of CRD resources from old
+	// storage versions to new storage versions when the CRD schema is updated.
+ StorageVersionMigrator *CRDsMigrator } // ReferenceInjector injects cross-resource references across the resources @@ -308,6 +312,14 @@ func WithExampleManifestConfiguration(emc ExampleManifestConfiguration) Provider } } +// WithStorageVersionMigrator configures a CRDsMigrator for handling storage version +// migrations of CRD resources when their schemas are updated. +func WithStorageVersionMigrator(migrator *CRDsMigrator) ProviderOption { + return func(p *Provider) { + p.StorageVersionMigrator = migrator + } +} + // NewProvider builds and returns a new Provider from provider // tfjson schema, that is generated using Terraform CLI with: // `terraform providers schema --json`