diff --git a/docs/IndexIngestionSeparation.md b/docs/IndexIngestionSeparation.md
index dd53922ff..63da30333 100644
--- a/docs/IndexIngestionSeparation.md
+++ b/docs/IndexIngestionSeparation.md
@@ -1,3 +1,9 @@
+---
+title: Index and Ingestion Separation
+parent: Deploy & Configure
+nav_order: 6
+---
+
 # Background
 
 Separation between ingestion and indexing services within Splunk Operator for Kubernetes enables the operator to independently manage the ingestion service while maintaining seamless integration with the indexing service.
@@ -10,7 +16,7 @@ This separation enables:
 # Important Note
 
 > [!WARNING]
-> **As of now, only brand new deployments are supported for Index and Ingestion Separation. No migration path is implemented, described or tested for existing deployments to move from a standard model to Index & Ingestion separation model.**
+> **For customers deploying SmartBus on CMP, the Splunk Operator for Kubernetes (SOK) manages the configuration and lifecycle of the ingestor tier. The following SOK guide provides implementation details for setting up ingestion separation and integrating with existing indexers. This reference is primarily intended for CMP users leveraging SOK-managed ingestors.**
 
 # Document Variables
 
@@ -40,7 +46,7 @@ SQS message bus inputs can be found in the table below.
 | largeMessageStorePath | string | S3 path for Large Message Store |
 | deadLetterQueueName | string | Name of the SQS dead letter queue |
 
-Change of any of the bus inputs does not restart Splunk. It just updates the config values with no disruptions.
+**The first provisioning of, and any subsequent update to, the bus inputs requires a Splunkd restart on both the Ingestor Cluster and the Indexer Cluster; SOK performs this restart automatically.**
 
 ## Example
 ```
@@ -376,6 +382,14 @@ In the following example, the dashboard presents ingestion and indexing data in
 
 - [kube-prometheus-stack](https://github.com/prometheus-community/helm-charts/tree/main/charts/kube-prometheus-stack)
 
+# App Installation for Ingestor Cluster Instances
+
+Application installation is supported for Ingestor Cluster instances. However, applications are currently installed with local scope, and if an application requires a Splunk restart, the Splunk Operator has no automated way to detect that and trigger the restart.
+
+Therefore, to force a Splunk restart on every Ingestor Cluster pod, add or update annotations or labels on the IngestorCluster CR and apply the new configuration; this triggers a rolling restart of the Ingestor Cluster's Splunk pods.
+
+We are investigating how to make this fully automated. Ideally, an update of annotations or labels should not trigger a pod restart at all, and we are also investigating how to eventually fix this behaviour.
+
 # Example
 
 1. Install CRDs and Splunk Operator for Kubernetes.
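The App Installation hunk above recommends bumping an annotation on the IngestorCluster CR to force a rolling restart. A minimal sketch of what that could look like follows; the `enterprise.splunk.com/v4` apiVersion matches the `api/v4` package this PR imports, while the CR name and the annotation key `ingestor.splunk.com/restart-trigger` are hypothetical, since per the doc text any added or changed annotation works.

```yaml
apiVersion: enterprise.splunk.com/v4
kind: IngestorCluster
metadata:
  name: ingestor                 # hypothetical CR name
  annotations:
    # Hypothetical key: changing this value is what triggers the rolling
    # restart of the Ingestor Cluster pods, per the section above.
    ingestor.splunk.com/restart-trigger: "2"
spec:
  replicas: 3
```

The same bump can be done in place without editing YAML, e.g. `kubectl annotate ingestorcluster ingestor ingestor.splunk.com/restart-trigger=$(date +%s) --overwrite`, assuming the CRD registers `ingestorcluster` as its resource name.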
diff --git a/pkg/splunk/enterprise/indexercluster.go b/pkg/splunk/enterprise/indexercluster.go
index 74b1b0a91..7800bd236 100644
--- a/pkg/splunk/enterprise/indexercluster.go
+++ b/pkg/splunk/enterprise/indexercluster.go
@@ -274,6 +274,15 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
         }
 
         cr.Status.BusConfiguration = busConfig.Spec
+
+        for i := int32(0); i < cr.Spec.Replicas; i++ {
+            idxcClient := mgr.getClient(ctx, i)
+            err = idxcClient.RestartSplunk()
+            if err != nil {
+                return result, err
+            }
+            scopedLog.Info("Restarted splunk", "indexer", i)
+        }
     }
 }
 
@@ -565,6 +574,15 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient,
         }
 
         cr.Status.BusConfiguration = busConfig.Spec
+
+        for i := int32(0); i < cr.Spec.Replicas; i++ {
+            idxcClient := mgr.getClient(ctx, i)
+            err = idxcClient.RestartSplunk()
+            if err != nil {
+                return result, err
+            }
+            scopedLog.Info("Restarted splunk", "indexer", i)
+        }
     }
 }
 
diff --git a/pkg/splunk/enterprise/ingestorcluster.go b/pkg/splunk/enterprise/ingestorcluster.go
index 4f96f05bc..835240996 100644
--- a/pkg/splunk/enterprise/ingestorcluster.go
+++ b/pkg/splunk/enterprise/ingestorcluster.go
@@ -228,7 +228,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
 
     // If bus config is updated
     if !reflect.DeepEqual(cr.Status.BusConfiguration, busConfig.Spec) {
-        mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient)
+        mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client)
 
         err = mgr.handlePushBusChange(ctx, cr, busConfig, client)
         if err != nil {
@@ -238,6 +238,15 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
         }
 
         cr.Status.BusConfiguration = busConfig.Spec
+
+        for i := int32(0); i < cr.Spec.Replicas; i++ {
+            ingClient := mgr.getClient(ctx, i)
+            err = ingClient.RestartSplunk()
+            if err != nil {
+                return result, err
+            }
+            scopedLog.Info("Restarted splunk", "ingestor", i)
+        }
     }
 
     // Upgrade fron automated MC to MC CRD
@@ -280,6 +289,27 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
     return result, nil
 }
 
+// getClient for ingestorClusterPodManager returns a SplunkClient for the member n
+func (mgr *ingestorClusterPodManager) getClient(ctx context.Context, n int32) *splclient.SplunkClient {
+    reqLogger := log.FromContext(ctx)
+    scopedLog := reqLogger.WithName("ingestorClusterPodManager.getClient").WithValues("name", mgr.cr.GetName(), "namespace", mgr.cr.GetNamespace())
+
+    // Get Pod Name
+    memberName := GetSplunkStatefulsetPodName(SplunkIngestor, mgr.cr.GetName(), n)
+
+    // Get Fully Qualified Domain Name
+    fqdnName := splcommon.GetServiceFQDN(mgr.cr.GetNamespace(),
+        fmt.Sprintf("%s.%s", memberName, GetSplunkServiceName(SplunkIngestor, mgr.cr.GetName(), true)))
+
+    // Retrieve admin password from Pod
+    adminPwd, err := splutil.GetSpecificSecretTokenFromPod(ctx, mgr.c, memberName, mgr.cr.GetNamespace(), "password")
+    if err != nil {
+        scopedLog.Error(err, "Couldn't retrieve the admin password from pod")
+    }
+
+    return mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", adminPwd)
+}
+
 // validateIngestorClusterSpec checks validity and makes default updates to a IngestorClusterSpec and returns error if something is wrong
 func validateIngestorClusterSpec(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IngestorCluster) error {
     // We cannot have 0 replicas in IngestorCluster spec since this refers to number of ingestion pods in an ingestor cluster
@@ -372,6 +402,7 @@ func getChangedBusFieldsForIngestor(busConfig *enterpriseApi.BusConfiguration, b
 }
 
 type ingestorClusterPodManager struct {
+    c               splcommon.ControllerClient
     log             logr.Logger
     cr              *enterpriseApi.IngestorCluster
     secrets         *corev1.Secret
@@ -379,12 +410,13 @@
 }
 
 // newIngestorClusterPodManager function to create pod manager this is added to write unit test case
-var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) ingestorClusterPodManager {
+var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager {
     return ingestorClusterPodManager{
         log:             log,
         cr:              cr,
         secrets:         secret,
         newSplunkClient: newSplunkClient,
+        c:               c,
     }
 }
 
diff --git a/pkg/splunk/enterprise/ingestorcluster_test.go b/pkg/splunk/enterprise/ingestorcluster_test.go
index bee3df4d6..8bd0ee20c 100644
--- a/pkg/splunk/enterprise/ingestorcluster_test.go
+++ b/pkg/splunk/enterprise/ingestorcluster_test.go
@@ -25,15 +25,14 @@
     "github.com/go-logr/logr"
     enterpriseApi "github.com/splunk/splunk-operator/api/v4"
     splclient "github.com/splunk/splunk-operator/pkg/splunk/client"
+    splcommon "github.com/splunk/splunk-operator/pkg/splunk/common"
     spltest "github.com/splunk/splunk-operator/pkg/splunk/test"
     splutil "github.com/splunk/splunk-operator/pkg/splunk/util"
     "github.com/stretchr/testify/assert"
     appsv1 "k8s.io/api/apps/v1"
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/apimachinery/pkg/types"
-    "sigs.k8s.io/controller-runtime/pkg/client/fake"
 )
 
 func init() {
@@ -56,11 +55,7 @@ func TestApplyIngestorCluster(t *testing.T) {
 
     ctx := context.TODO()
 
-    scheme := runtime.NewScheme()
-    _ = enterpriseApi.AddToScheme(scheme)
-    _ = corev1.AddToScheme(scheme)
-    _ = appsv1.AddToScheme(scheme)
-    c := fake.NewClientBuilder().WithScheme(scheme).Build()
+    c := spltest.NewMockClient()
 
     // Object definitions
     busConfig := &enterpriseApi.BusConfiguration{
@@ -250,8 +245,9 @@
     // outputs.conf
     origNew := newIngestorClusterPodManager
     mockHTTPClient := &spltest.MockHTTPClient{}
-    newIngestorClusterPodManager = func(l logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc) ingestorClusterPodManager {
+    newIngestorClusterPodManager = func(l logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager {
         return ingestorClusterPodManager{
+            c: c,
             log: l, cr: cr, secrets: secret,
             newSplunkClient: func(uri, user, pass string) *splclient.SplunkClient {
                 return &splclient.SplunkClient{ManagementURI: uri, Username: user, Password: pass, Client: mockHTTPClient}
diff --git a/pkg/splunk/enterprise/util_test.go b/pkg/splunk/enterprise/util_test.go
index f5405b2cf..32948f15f 100644
--- a/pkg/splunk/enterprise/util_test.go
+++ b/pkg/splunk/enterprise/util_test.go
@@ -2624,6 +2624,8 @@ func TestUpdateCRStatus(t *testing.T) {
         WithStatusSubresource(&enterpriseApi.Standalone{}).
         WithStatusSubresource(&enterpriseApi.MonitoringConsole{}).
         WithStatusSubresource(&enterpriseApi.IndexerCluster{}).
+        WithStatusSubresource(&enterpriseApi.BusConfiguration{}).
+        WithStatusSubresource(&enterpriseApi.IngestorCluster{}).
         WithStatusSubresource(&enterpriseApi.SearchHeadCluster{})
     c := builder.Build()
     ctx := context.TODO()
@@ -3302,9 +3304,11 @@ func TestGetCurrentImage(t *testing.T) {
         WithStatusSubresource(&enterpriseApi.ClusterManager{}).
         WithStatusSubresource(&enterpriseApi.Standalone{}).
         WithStatusSubresource(&enterpriseApi.MonitoringConsole{}).
+        WithStatusSubresource(&enterpriseApi.BusConfiguration{}).
         WithStatusSubresource(&enterpriseApi.IndexerCluster{}).
         WithStatusSubresource(&enterpriseApi.SearchHeadCluster{}).
         WithStatusSubresource(&enterpriseApi.IngestorCluster{})
+
     client := builder.Build()
     client.Create(ctx, &current)
     _, err := ApplyClusterManager(ctx, client, &current)
diff --git a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go
index 8bccddb47..e5d7e1647 100644
--- a/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go
+++ b/test/index_and_ingestion_separation/index_and_ingestion_separation_test.go
@@ -384,14 +384,6 @@ var _ = Describe("indingsep test", func() {
             err = deployment.UpdateCR(ctx, bus)
             Expect(err).To(Succeed(), "Unable to deploy Bus Configuration with updated CR")
 
-            // Ensure that Ingestor Cluster has not been restarted
-            testcaseEnvInst.Log.Info("Ensure that Ingestor Cluster has not been restarted")
-            testenv.IngestorReady(ctx, deployment, testcaseEnvInst)
-
-            // Ensure that Indexer Cluster has not been restarted
-            testcaseEnvInst.Log.Info("Ensure that Indexer Cluster has not been restarted")
-            testenv.SingleSiteIndexersReady(ctx, deployment, testcaseEnvInst)
-
             // Get instance of current Ingestor Cluster CR with latest config
             testcaseEnvInst.Log.Info("Get instance of current Ingestor Cluster CR with latest config")
             ingest := &enterpriseApi.IngestorCluster{}
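A note on the deleted e2e assertions above: they verified that neither tier restarted after a bus update, and with the restart loops added in this PR the opposite now holds. Below is a minimal sketch of drop-in replacement lines for the same spec, reusing verbatim the `testenv` readiness helpers the removed lines called; whether waiting for readiness immediately after `UpdateCR` reliably observes the restart window is an assumption.

```go
// Sketch only: after the bus CR update, the operator restarts splunkd on every
// ingestor and indexer pod, so wait for both tiers to report Ready again
// instead of asserting that no restart happened.
testcaseEnvInst.Log.Info("Ensure that Ingestor Cluster becomes ready again after the bus-driven restart")
testenv.IngestorReady(ctx, deployment, testcaseEnvInst)

testcaseEnvInst.Log.Info("Ensure that Indexer Cluster becomes ready again after the bus-driven restart")
testenv.SingleSiteIndexersReady(ctx, deployment, testcaseEnvInst)
```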