From f04335df889679ff3f5f52cca67634d0aa034a21 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Mon, 19 May 2025 15:22:31 +0530 Subject: [PATCH 1/2] parallel execution of resource clean up --- controllers/druid/handler.go | 104 +++--------- controllers/druid/resource_cleanup.go | 222 ++++++++++++++++++++++++++ 2 files changed, 240 insertions(+), 86 deletions(-) create mode 100644 controllers/druid/resource_cleanup.go diff --git a/controllers/druid/handler.go b/controllers/druid/handler.go index 8fc251dc..e2c33d96 100644 --- a/controllers/druid/handler.go +++ b/controllers/druid/handler.go @@ -253,92 +253,24 @@ func deployDruidCluster(ctx context.Context, sdk client.Client, m *v1alpha1.Drui } } - //update status and delete unwanted resources - updatedStatus := v1alpha1.DruidClusterStatus{} - - updatedStatus.StatefulSets = deleteUnusedResources(ctx, sdk, m, statefulSetNames, ls, - func() objectList { return &appsv1.StatefulSetList{} }, - func(listObj runtime.Object) []object { - items := listObj.(*appsv1.StatefulSetList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, emitEvents) - sort.Strings(updatedStatus.StatefulSets) - - updatedStatus.Deployments = deleteUnusedResources(ctx, sdk, m, deploymentNames, ls, - func() objectList { return &appsv1.DeploymentList{} }, - func(listObj runtime.Object) []object { - items := listObj.(*appsv1.DeploymentList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, emitEvents) - sort.Strings(updatedStatus.Deployments) - - updatedStatus.HPAutoScalers = deleteUnusedResources(ctx, sdk, m, hpaNames, ls, - func() objectList { return &autoscalev2.HorizontalPodAutoscalerList{} }, - func(listObj runtime.Object) []object { - items := listObj.(*autoscalev2.HorizontalPodAutoscalerList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, emitEvents) - sort.Strings(updatedStatus.HPAutoScalers) - - updatedStatus.Ingress = deleteUnusedResources(ctx, sdk, m, ingressNames, ls, - func() objectList { return &networkingv1.IngressList{} }, - func(listObj runtime.Object) []object { - items := listObj.(*networkingv1.IngressList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, emitEvents) - sort.Strings(updatedStatus.Ingress) - - updatedStatus.PodDisruptionBudgets = deleteUnusedResources(ctx, sdk, m, podDisruptionBudgetNames, ls, - func() objectList { return &policyv1.PodDisruptionBudgetList{} }, - func(listObj runtime.Object) []object { - items := listObj.(*policyv1.PodDisruptionBudgetList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, emitEvents) - sort.Strings(updatedStatus.PodDisruptionBudgets) - - updatedStatus.Services = deleteUnusedResources(ctx, sdk, m, serviceNames, ls, - func() objectList { return &v1.ServiceList{} }, - func(listObj runtime.Object) []object { - items := listObj.(*v1.ServiceList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, emitEvents) - sort.Strings(updatedStatus.Services) - - updatedStatus.ConfigMaps = deleteUnusedResources(ctx, sdk, m, configMapNames, ls, - func() objectList { return &v1.ConfigMapList{} }, - func(listObj runtime.Object) []object { - items := listObj.(*v1.ConfigMapList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, emitEvents) - sort.Strings(updatedStatus.ConfigMaps) + // update status and delete unwanted resources - CONSOLIDATED APPROACH + cleanupResult, err := deleteAllUnusedResources( + ctx, sdk, m, ls, + statefulSetNames, deploymentNames, serviceNames, configMapNames, + podDisruptionBudgetNames, hpaNames, ingressNames, emitEvents, + ) + if err != nil { + return err + } + + // Log any individual cleanup errors but continue processing + for _, cleanupErr := range cleanupResult.Errors { + logger.Error(cleanupErr, "Resource cleanup error", "name", m.Name, "namespace", m.Namespace) + emitEvents.EmitEventGeneric(m, "ResourceCleanupError", "", cleanupErr) + } + + // Use the consolidated status + updatedStatus := *cleanupResult.Status podList, _ := readers.List(ctx, sdk, m, makeLabelsForDruid(m), emitEvents, func() objectList { return &v1.PodList{} }, func(listObj runtime.Object) []object { items := listObj.(*v1.PodList).Items diff --git a/controllers/druid/resource_cleanup.go b/controllers/druid/resource_cleanup.go new file mode 100644 index 00000000..9376d06c --- /dev/null +++ b/controllers/druid/resource_cleanup.go @@ -0,0 +1,222 @@ +package druid + +import ( + "context" + "fmt" + "sort" + "sync" + + "github.com/datainfrahq/druid-operator/apis/druid/v1alpha1" + appsv1 "k8s.io/api/apps/v1" + autoscalev2 "k8s.io/api/autoscaling/v2" + v1 "k8s.io/api/core/v1" + networkingv1 "k8s.io/api/networking/v1" + policyv1 "k8s.io/api/policy/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// ResourceConfig defines configuration for each resource type cleanup +type ResourceConfig struct { + Name string + ExpectedNames map[string]bool + EmptyListObjFn func() objectList + ItemsExtractorFn func(obj runtime.Object) []object +} + +// ResourceCleanupResult holds the result of cleaning up a specific resource type +type ResourceCleanupResult struct { + ResourceType string + SurvivorNames []string + Error error +} + +// ConsolidatedResourceCleanupResult holds all cleanup results +type ConsolidatedResourceCleanupResult struct { + Status *v1alpha1.DruidClusterStatus + Errors []error +} + +// deleteAllUnusedResources consolidates all resource cleanup operations into parallel execution +func deleteAllUnusedResources( + ctx context.Context, + sdk client.Client, + drd *v1alpha1.Druid, + selectorLabels map[string]string, + statefulSetNames map[string]bool, + deploymentNames map[string]bool, + serviceNames map[string]bool, + configMapNames map[string]bool, + podDisruptionBudgetNames map[string]bool, + hpaNames map[string]bool, + ingressNames map[string]bool, + emitEvents EventEmitter, +) (*ConsolidatedResourceCleanupResult, error) { + + // Define all resource types to clean up + resourceConfigs := []ResourceConfig{ + { + Name: "StatefulSets", + ExpectedNames: statefulSetNames, + EmptyListObjFn: func() objectList { return &appsv1.StatefulSetList{} }, + ItemsExtractorFn: func(listObj runtime.Object) []object { + items := listObj.(*appsv1.StatefulSetList).Items + result := make([]object, len(items)) + for i := 0; i < len(items); i++ { + result[i] = &items[i] + } + return result + }, + }, + { + Name: "Deployments", + ExpectedNames: deploymentNames, + EmptyListObjFn: func() objectList { return &appsv1.DeploymentList{} }, + ItemsExtractorFn: func(listObj runtime.Object) []object { + items := listObj.(*appsv1.DeploymentList).Items + result := make([]object, len(items)) + for i := 0; i < len(items); i++ { + result[i] = &items[i] + } + return result + }, + }, + { + Name: "Services", + ExpectedNames: serviceNames, + EmptyListObjFn: func() objectList { return &v1.ServiceList{} }, + ItemsExtractorFn: func(listObj runtime.Object) []object { + items := listObj.(*v1.ServiceList).Items + result := make([]object, len(items)) + for i := 0; i < len(items); i++ { + result[i] = &items[i] + } + return result + }, + }, + { + Name: "ConfigMaps", + ExpectedNames: configMapNames, + EmptyListObjFn: func() objectList { return &v1.ConfigMapList{} }, + ItemsExtractorFn: func(listObj runtime.Object) []object { + items := listObj.(*v1.ConfigMapList).Items + result := make([]object, len(items)) + for i := 0; i < len(items); i++ { + result[i] = &items[i] + } + return result + }, + }, + { + Name: "HPAutoScalers", + ExpectedNames: hpaNames, + EmptyListObjFn: func() objectList { return &autoscalev2.HorizontalPodAutoscalerList{} }, + ItemsExtractorFn: func(listObj runtime.Object) []object { + items := listObj.(*autoscalev2.HorizontalPodAutoscalerList).Items + result := make([]object, len(items)) + for i := 0; i < len(items); i++ { + result[i] = &items[i] + } + return result + }, + }, + { + Name: "Ingress", + ExpectedNames: ingressNames, + EmptyListObjFn: func() objectList { return &networkingv1.IngressList{} }, + ItemsExtractorFn: func(listObj runtime.Object) []object { + items := listObj.(*networkingv1.IngressList).Items + result := make([]object, len(items)) + for i := 0; i < len(items); i++ { + result[i] = &items[i] + } + return result + }, + }, + { + Name: "PodDisruptionBudgets", + ExpectedNames: podDisruptionBudgetNames, + EmptyListObjFn: func() objectList { return &policyv1.PodDisruptionBudgetList{} }, + ItemsExtractorFn: func(listObj runtime.Object) []object { + items := listObj.(*policyv1.PodDisruptionBudgetList).Items + result := make([]object, len(items)) + for i := 0; i < len(items); i++ { + result[i] = &items[i] + } + return result + }, + }, + } + + // Channel to collect results from parallel goroutines + resultChan := make(chan ResourceCleanupResult, len(resourceConfigs)) + var wg sync.WaitGroup + + // Launch parallel cleanup operations + for _, config := range resourceConfigs { + wg.Add(1) + go func(cfg ResourceConfig) { + defer wg.Done() + + // Call the existing deleteUnusedResources function for this resource type + survivors := deleteUnusedResources( + ctx, sdk, drd, cfg.ExpectedNames, selectorLabels, + cfg.EmptyListObjFn, cfg.ItemsExtractorFn, emitEvents, + ) + + // Send result to channel + resultChan <- ResourceCleanupResult{ + ResourceType: cfg.Name, + SurvivorNames: survivors, + Error: nil, // deleteUnusedResources doesn't return errors currently + } + }(config) + } + + // Wait for all goroutines to complete + wg.Wait() + close(resultChan) + + // Collect all results + status := &v1alpha1.DruidClusterStatus{} + var errors []error + + for result := range resultChan { + if result.Error != nil { + errors = append(errors, fmt.Errorf("failed to cleanup %s: %w", result.ResourceType, result.Error)) + continue + } + + // Assign results to appropriate status fields + switch result.ResourceType { + case "StatefulSets": + status.StatefulSets = result.SurvivorNames + case "Deployments": + status.Deployments = result.SurvivorNames + case "Services": + status.Services = result.SurvivorNames + case "ConfigMaps": + status.ConfigMaps = result.SurvivorNames + case "HPAutoScalers": + status.HPAutoScalers = result.SurvivorNames + case "Ingress": + status.Ingress = result.SurvivorNames + case "PodDisruptionBudgets": + status.PodDisruptionBudgets = result.SurvivorNames + } + } + + // Sort all result slices for consistency (matching original behavior) + sort.Strings(status.StatefulSets) + sort.Strings(status.Deployments) + sort.Strings(status.Services) + sort.Strings(status.ConfigMaps) + sort.Strings(status.HPAutoScalers) + sort.Strings(status.Ingress) + sort.Strings(status.PodDisruptionBudgets) + + return &ConsolidatedResourceCleanupResult{ + Status: status, + Errors: errors, + }, nil +} From a60fb84f58c95c670601809f80e37b3f4ee73fe9 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Mon, 19 May 2025 16:10:46 +0530 Subject: [PATCH 2/2] refactor and clean up --- controllers/druid/handler.go | 16 +- controllers/druid/resource_cleanup.go | 318 +++++++++++++------------- 2 files changed, 162 insertions(+), 172 deletions(-) diff --git a/controllers/druid/handler.go b/controllers/druid/handler.go index e2c33d96..afb34acf 100644 --- a/controllers/druid/handler.go +++ b/controllers/druid/handler.go @@ -253,22 +253,18 @@ func deployDruidCluster(ctx context.Context, sdk client.Client, m *v1alpha1.Drui } } - // update status and delete unwanted resources - CONSOLIDATED APPROACH - cleanupResult, err := deleteAllUnusedResources( - ctx, sdk, m, ls, + expectedResources := BuildResourceExpectations( statefulSetNames, deploymentNames, serviceNames, configMapNames, - podDisruptionBudgetNames, hpaNames, ingressNames, emitEvents, + podDisruptionBudgetNames, hpaNames, ingressNames, + ) + + cleanupResult, err := deleteAllUnusedResources( + ctx, sdk, m, ls, expectedResources, emitEvents, ) if err != nil { return err } - // Log any individual cleanup errors but continue processing - for _, cleanupErr := range cleanupResult.Errors { - logger.Error(cleanupErr, "Resource cleanup error", "name", m.Name, "namespace", m.Namespace) - emitEvents.EmitEventGeneric(m, "ResourceCleanupError", "", cleanupErr) - } - // Use the consolidated status updatedStatus := *cleanupResult.Status diff --git a/controllers/druid/resource_cleanup.go b/controllers/druid/resource_cleanup.go index 9376d06c..46495f58 100644 --- a/controllers/druid/resource_cleanup.go +++ b/controllers/druid/resource_cleanup.go @@ -3,6 +3,7 @@ package druid import ( "context" "fmt" + "reflect" "sort" "sync" @@ -12,24 +13,22 @@ import ( v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" policyv1 "k8s.io/api/policy/v1" - "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) -// ResourceConfig defines configuration for each resource type cleanup -type ResourceConfig struct { - Name string - ExpectedNames map[string]bool - EmptyListObjFn func() objectList - ItemsExtractorFn func(obj runtime.Object) []object -} +// Constants for resource type names to avoid repetition +const ( + ResourceTypeStatefulSets = "StatefulSets" + ResourceTypeDeployments = "Deployments" + ResourceTypeServices = "Services" + ResourceTypeConfigMaps = "ConfigMaps" + ResourceTypeHPAutoScalers = "HPAutoScalers" + ResourceTypeIngress = "Ingress" + ResourceTypePodDisruptionBudgets = "PodDisruptionBudgets" +) -// ResourceCleanupResult holds the result of cleaning up a specific resource type -type ResourceCleanupResult struct { - ResourceType string - SurvivorNames []string - Error error -} +// ResourceExpectations - Simple map of resource types to expected names +type ResourceExpectations map[string]map[string]bool // ConsolidatedResourceCleanupResult holds all cleanup results type ConsolidatedResourceCleanupResult struct { @@ -37,186 +36,181 @@ type ConsolidatedResourceCleanupResult struct { Errors []error } -// deleteAllUnusedResources consolidates all resource cleanup operations into parallel execution +// ResourceTypeConfig holds the configuration for each resource type +type ResourceTypeConfig struct { + Name string + CreateList func() client.ObjectList +} + +// deleteAllUnusedResources - THE SIMPLEST, CLEANEST VERSION func deleteAllUnusedResources( ctx context.Context, sdk client.Client, drd *v1alpha1.Druid, selectorLabels map[string]string, - statefulSetNames map[string]bool, - deploymentNames map[string]bool, - serviceNames map[string]bool, - configMapNames map[string]bool, - podDisruptionBudgetNames map[string]bool, - hpaNames map[string]bool, - ingressNames map[string]bool, + expectedResources ResourceExpectations, emitEvents EventEmitter, ) (*ConsolidatedResourceCleanupResult, error) { - // Define all resource types to clean up - resourceConfigs := []ResourceConfig{ - { - Name: "StatefulSets", - ExpectedNames: statefulSetNames, - EmptyListObjFn: func() objectList { return &appsv1.StatefulSetList{} }, - ItemsExtractorFn: func(listObj runtime.Object) []object { - items := listObj.(*appsv1.StatefulSetList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, - }, - { - Name: "Deployments", - ExpectedNames: deploymentNames, - EmptyListObjFn: func() objectList { return &appsv1.DeploymentList{} }, - ItemsExtractorFn: func(listObj runtime.Object) []object { - items := listObj.(*appsv1.DeploymentList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, - }, - { - Name: "Services", - ExpectedNames: serviceNames, - EmptyListObjFn: func() objectList { return &v1.ServiceList{} }, - ItemsExtractorFn: func(listObj runtime.Object) []object { - items := listObj.(*v1.ServiceList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, - }, - { - Name: "ConfigMaps", - ExpectedNames: configMapNames, - EmptyListObjFn: func() objectList { return &v1.ConfigMapList{} }, - ItemsExtractorFn: func(listObj runtime.Object) []object { - items := listObj.(*v1.ConfigMapList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, - }, - { - Name: "HPAutoScalers", - ExpectedNames: hpaNames, - EmptyListObjFn: func() objectList { return &autoscalev2.HorizontalPodAutoscalerList{} }, - ItemsExtractorFn: func(listObj runtime.Object) []object { - items := listObj.(*autoscalev2.HorizontalPodAutoscalerList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, - }, - { - Name: "Ingress", - ExpectedNames: ingressNames, - EmptyListObjFn: func() objectList { return &networkingv1.IngressList{} }, - ItemsExtractorFn: func(listObj runtime.Object) []object { - items := listObj.(*networkingv1.IngressList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, - }, - { - Name: "PodDisruptionBudgets", - ExpectedNames: podDisruptionBudgetNames, - EmptyListObjFn: func() objectList { return &policyv1.PodDisruptionBudgetList{} }, - ItemsExtractorFn: func(listObj runtime.Object) []object { - items := listObj.(*policyv1.PodDisruptionBudgetList).Items - result := make([]object, len(items)) - for i := 0; i < len(items); i++ { - result[i] = &items[i] - } - return result - }, - }, + // Define all resource types with proper type safety - NO MORE REPETITION! + resourceTypes := map[string]ResourceTypeConfig{ + ResourceTypeStatefulSets: {ResourceTypeStatefulSets, func() client.ObjectList { return &appsv1.StatefulSetList{} }}, + ResourceTypeDeployments: {ResourceTypeDeployments, func() client.ObjectList { return &appsv1.DeploymentList{} }}, + ResourceTypeServices: {ResourceTypeServices, func() client.ObjectList { return &v1.ServiceList{} }}, + ResourceTypeConfigMaps: {ResourceTypeConfigMaps, func() client.ObjectList { return &v1.ConfigMapList{} }}, + ResourceTypeHPAutoScalers: {ResourceTypeHPAutoScalers, func() client.ObjectList { return &autoscalev2.HorizontalPodAutoscalerList{} }}, + ResourceTypeIngress: {ResourceTypeIngress, func() client.ObjectList { return &networkingv1.IngressList{} }}, + ResourceTypePodDisruptionBudgets: {ResourceTypePodDisruptionBudgets, func() client.ObjectList { return &policyv1.PodDisruptionBudgetList{} }}, } - // Channel to collect results from parallel goroutines - resultChan := make(chan ResourceCleanupResult, len(resourceConfigs)) + status := &v1alpha1.DruidClusterStatus{} + resultChan := make(chan struct { + resourceType string + survivors []string + err error + }, len(resourceTypes)) var wg sync.WaitGroup - // Launch parallel cleanup operations - for _, config := range resourceConfigs { + // Process all resource types in parallel + for resourceType, config := range resourceTypes { wg.Add(1) - go func(cfg ResourceConfig) { + go func(resType string, cfg ResourceTypeConfig) { defer wg.Done() - // Call the existing deleteUnusedResources function for this resource type - survivors := deleteUnusedResources( - ctx, sdk, drd, cfg.ExpectedNames, selectorLabels, - cfg.EmptyListObjFn, cfg.ItemsExtractorFn, emitEvents, + // Get expected names, default to empty if not provided + expectedNames := expectedResources[resType] + if expectedNames == nil { + expectedNames = make(map[string]bool) + } + + // Generic cleanup + survivors, err := cleanupSingleResourceType( + ctx, sdk, drd, cfg, expectedNames, selectorLabels, emitEvents, ) - // Send result to channel - resultChan <- ResourceCleanupResult{ - ResourceType: cfg.Name, - SurvivorNames: survivors, - Error: nil, // deleteUnusedResources doesn't return errors currently - } - }(config) + resultChan <- struct { + resourceType string + survivors []string + err error + }{resType, survivors, err} + }(resourceType, config) } - // Wait for all goroutines to complete + // Wait and collect results wg.Wait() close(resultChan) - // Collect all results - status := &v1alpha1.DruidClusterStatus{} var errors []error - for result := range resultChan { - if result.Error != nil { - errors = append(errors, fmt.Errorf("failed to cleanup %s: %w", result.ResourceType, result.Error)) + if result.err != nil { + errors = append(errors, result.err) continue } - // Assign results to appropriate status fields - switch result.ResourceType { - case "StatefulSets": - status.StatefulSets = result.SurvivorNames - case "Deployments": - status.Deployments = result.SurvivorNames - case "Services": - status.Services = result.SurvivorNames - case "ConfigMaps": - status.ConfigMaps = result.SurvivorNames - case "HPAutoScalers": - status.HPAutoScalers = result.SurvivorNames - case "Ingress": - status.Ingress = result.SurvivorNames - case "PodDisruptionBudgets": - status.PodDisruptionBudgets = result.SurvivorNames + sort.Strings(result.survivors) + + // Update status fields + switch result.resourceType { + case ResourceTypeStatefulSets: + status.StatefulSets = result.survivors + case ResourceTypeDeployments: + status.Deployments = result.survivors + case ResourceTypeServices: + status.Services = result.survivors + case ResourceTypeConfigMaps: + status.ConfigMaps = result.survivors + case ResourceTypeHPAutoScalers: + status.HPAutoScalers = result.survivors + case ResourceTypeIngress: + status.Ingress = result.survivors + case ResourceTypePodDisruptionBudgets: + status.PodDisruptionBudgets = result.survivors } } - // Sort all result slices for consistency (matching original behavior) - sort.Strings(status.StatefulSets) - sort.Strings(status.Deployments) - sort.Strings(status.Services) - sort.Strings(status.ConfigMaps) - sort.Strings(status.HPAutoScalers) - sort.Strings(status.Ingress) - sort.Strings(status.PodDisruptionBudgets) - return &ConsolidatedResourceCleanupResult{ Status: status, Errors: errors, }, nil } + +// Generic cleanup for any resource type +func cleanupSingleResourceType( + ctx context.Context, + sdk client.Client, + drd *v1alpha1.Druid, + config ResourceTypeConfig, + expectedNames map[string]bool, + selectorLabels map[string]string, + emitEvents EventEmitter, +) ([]string, error) { + + // Create list object with proper type safety + listObj := config.CreateList() + + // List resources + listOpts := []client.ListOption{ + client.InNamespace(drd.Namespace), + client.MatchingLabels(selectorLabels), + } + + if err := sdk.List(ctx, listObj, listOpts...); err != nil { + return nil, fmt.Errorf("failed to list %s: %w", config.Name, err) + } + + // Extract items using reflection (still needed to be generic across types) + items := extractItemsFromList(listObj) + survivorNames := make([]string, 0, len(expectedNames)) + + for _, item := range items { + itemMeta := item.(client.Object) + name := itemMeta.GetName() + + if !expectedNames[name] { + // Delete unexpected resource + if err := writers.Delete(ctx, sdk, drd, item.(object), emitEvents, &client.DeleteOptions{}); err != nil { + survivorNames = append(survivorNames, name) // Failed to delete, so it's a survivor + } + } else { + // Keep expected resource + survivorNames = append(survivorNames, name) + } + } + + return survivorNames, nil +} + +// Extract items from any Kubernetes list object using reflection +func extractItemsFromList(listObj client.ObjectList) []interface{} { + // Use reflection to get the Items field from any list type + listValue := reflect.ValueOf(listObj).Elem() + itemsField := listValue.FieldByName("Items") + + if !itemsField.IsValid() { + return nil + } + + items := make([]interface{}, itemsField.Len()) + for i := 0; i < itemsField.Len(); i++ { + // Get pointer to the item + itemValue := itemsField.Index(i) + items[i] = itemValue.Addr().Interface() + } + return items +} + +// Helper to build ResourceExpectations from existing variables +func BuildResourceExpectations( + statefulSetNames, deploymentNames, serviceNames, configMapNames, + podDisruptionBudgetNames, hpaNames, ingressNames map[string]bool, +) ResourceExpectations { + return ResourceExpectations{ + ResourceTypeStatefulSets: statefulSetNames, + ResourceTypeDeployments: deploymentNames, + ResourceTypeServices: serviceNames, + ResourceTypeConfigMaps: configMapNames, + ResourceTypePodDisruptionBudgets: podDisruptionBudgetNames, + ResourceTypeHPAutoScalers: hpaNames, + ResourceTypeIngress: ingressNames, + } +}