From 1441aa851895f5c196fbc5d1138cf1e07f71cb77 Mon Sep 17 00:00:00 2001 From: Logan Seeley Date: Thu, 22 Jan 2026 11:57:15 -0300 Subject: [PATCH] Add topic get/set replication clusters commands --- pkg/ctl/topic/get_replication_clusters.go | 82 ++++++++++++++++ pkg/ctl/topic/replication_clusters_test.go | 107 ++++++++++++++++++++ pkg/ctl/topic/set_replication_clusters.go | 108 +++++++++++++++++++++ pkg/ctl/topic/topic.go | 2 + 4 files changed, 299 insertions(+) create mode 100644 pkg/ctl/topic/get_replication_clusters.go create mode 100644 pkg/ctl/topic/replication_clusters_test.go create mode 100644 pkg/ctl/topic/set_replication_clusters.go diff --git a/pkg/ctl/topic/get_replication_clusters.go b/pkg/ctl/topic/get_replication_clusters.go new file mode 100644 index 00000000..87229afc --- /dev/null +++ b/pkg/ctl/topic/get_replication_clusters.go @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + + "github.com/streamnative/pulsarctl/pkg/cmdutils" +) + +func GetReplicationClustersCmd(vc *cmdutils.VerbCmd) { + desc := cmdutils.LongDescription{} + desc.CommandUsedFor = "Get the replication clusters for a topic" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + getClusters := cmdutils.Example{ + Desc: "Get the replication clusters for a topic", + Command: "pulsarctl topics get-replication-clusters persistent://tenant/namespace/topic", + } + examples = append(examples, getClusters) + desc.CommandExamples = examples + + var out []cmdutils.Output + successOut := cmdutils.Output{ + Desc: "normal output", + Out: "[\"cluster1\", \"cluster2\"]", + } + + out = append(out, successOut, ArgError) + out = append(out, TopicNameErrors...) + out = append(out, NamespaceErrors...) + desc.CommandOutput = out + + vc.SetDescription( + "get-replication-clusters", + "Get the replication clusters for a topic", + desc.ToString(), + desc.ExampleToString(), + "get-replication-clusters", + ) + + vc.SetRunFuncWithNameArg(func() error { + return doGetReplicationClusters(vc) + }, "the topic name is not specified or the topic name is specified more than one") + + vc.EnableOutputFlagSet() +} + +func doGetReplicationClusters(vc *cmdutils.VerbCmd) error { + // for testing + if vc.NameError != nil { + return vc.NameError + } + + topic, err := utils.GetTopicName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + clusters, err := admin.Topics().GetReplicationClusters(*topic) + if err == nil { + cmdutils.PrintJSON(vc.Command.OutOrStdout(), &clusters) + } + return err +} diff --git a/pkg/ctl/topic/replication_clusters_test.go b/pkg/ctl/topic/replication_clusters_test.go new file mode 100644 index 00000000..ed6356c4 --- /dev/null +++ b/pkg/ctl/topic/replication_clusters_test.go @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/streamnative/pulsarctl/pkg/ctl/cluster" + "github.com/streamnative/pulsarctl/pkg/ctl/tenant" + "github.com/streamnative/pulsarctl/pkg/test" + + "github.com/stretchr/testify/assert" +) + +func TestReplicationClustersCmd(t *testing.T) { + topicName := fmt.Sprintf("persistent://public/default/test-replication-clusters-topic-%s", test.RandomSuffix()) + + // Create the topic first + args := []string{"create", topicName, "0"} + _, execErr, _, err := TestTopicCommands(CreateTopicCmd, args) + assert.Nil(t, err) + assert.Nil(t, execErr) + + // Create a test cluster for replication + clusterArgs := []string{"create", "test-replication-cluster", "--url", "http://192.168.12.11:8080"} + _, execErr, _, err = cluster.TestClusterCommands(cluster.CreateClusterCmd, clusterArgs) + assert.Nil(t, err) + assert.Nil(t, execErr) + + // Update tenant to allow the new cluster + updateTenantArgs := []string{"update", "--allowed-clusters", "test-replication-cluster", + "--allowed-clusters", "standalone", "public"} + _, execErr, _, err = tenant.TestTenantCommands(tenant.UpdateTenantCmd, updateTenantArgs) + assert.Nil(t, err) + assert.Nil(t, execErr) + + // Set replication clusters for the topic + setArgs := []string{"set-replication-clusters", topicName, "--clusters", "test-replication-cluster"} + setOut, execErr, _, _ := TestTopicCommands(SetReplicationClustersCmd, setArgs) + assert.Nil(t, execErr) + assert.True(t, strings.Contains(setOut.String(), "Set replication clusters successfully")) + + // Get replication clusters for the topic + getArgs := []string{"get-replication-clusters", topicName} + getOut, execErr, _, _ := TestTopicCommands(GetReplicationClustersCmd, getArgs) + assert.Nil(t, execErr) + + var clusters []string + err = json.Unmarshal(getOut.Bytes(), &clusters) + assert.Nil(t, err) + assert.Contains(t, clusters, "test-replication-cluster") + + // Reset tenant clusters for other test cases + updateTenantArgs = []string{"update", "--allowed-clusters", "standalone", "public"} + _, execErr, _, err = tenant.TestTenantCommands(tenant.UpdateTenantCmd, updateTenantArgs) + assert.Nil(t, err) + assert.Nil(t, execErr) +} + +func TestSetReplicationClustersArgError(t *testing.T) { + // Test with no topic name + args := []string{"set-replication-clusters", "--clusters", "standalone"} + _, _, nameErr, _ := TestTopicCommands(SetReplicationClustersCmd, args) + assert.NotNil(t, nameErr) + assert.Equal(t, "the topic name is not specified or the topic name is specified more than one", nameErr.Error()) +} + +func TestGetReplicationClustersArgError(t *testing.T) { + // Test with no topic name + args := []string{"get-replication-clusters"} + _, _, nameErr, _ := TestTopicCommands(GetReplicationClustersCmd, args) + assert.NotNil(t, nameErr) + assert.Equal(t, "the topic name is not specified or the topic name is specified more than one", nameErr.Error()) +} + +func TestSetReplicationClustersInvalidCluster(t *testing.T) { + topicName := fmt.Sprintf("persistent://public/default/test-invalid-cluster-topic-%s", test.RandomSuffix()) + + // Create the topic first + args := []string{"create", topicName, "0"} + _, execErr, _, err := TestTopicCommands(CreateTopicCmd, args) + assert.Nil(t, err) + assert.Nil(t, execErr) + + // Try to set an invalid cluster + setArgs := []string{"set-replication-clusters", topicName, "--clusters", "invalid-cluster"} + _, execErr, _, _ = TestTopicCommands(SetReplicationClustersCmd, setArgs) + assert.NotNil(t, execErr) +} diff --git a/pkg/ctl/topic/set_replication_clusters.go b/pkg/ctl/topic/set_replication_clusters.go new file mode 100644 index 00000000..f297cec4 --- /dev/null +++ b/pkg/ctl/topic/set_replication_clusters.go @@ -0,0 +1,108 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package topic + +import ( + "strings" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + + "github.com/streamnative/pulsarctl/pkg/cmdutils" +) + +func SetReplicationClustersCmd(vc *cmdutils.VerbCmd) { + desc := cmdutils.LongDescription{} + desc.CommandUsedFor = "Set the replication clusters for a topic" + desc.CommandPermission = "This command requires tenant admin permissions." + + var examples []cmdutils.Example + setClusters := cmdutils.Example{ + Desc: "Set the replication clusters for a topic", + Command: "pulsarctl topics set-replication-clusters persistent://tenant/namespace/topic --clusters cluster1,cluster2", + } + + examples = append(examples, setClusters) + desc.CommandExamples = examples + + var out []cmdutils.Output + successOut := cmdutils.Output{ + Desc: "normal output", + Out: "Set replication clusters successfully for [topic]", + } + + out = append(out, successOut, ArgError) + out = append(out, TopicNameErrors...) + out = append(out, NamespaceErrors...) + + invalidClustersName := cmdutils.Output{ + Desc: "Invalid cluster name, please check if your cluster name has the appropriate " + + "permissions under the current tenant", + Out: "[✖] code: 403 reason: Invalid cluster id: ", + } + + out = append(out, invalidClustersName, TopicLevelPolicyNotEnabledError) + desc.CommandOutput = out + + vc.SetDescription( + "set-replication-clusters", + "Set the replication clusters for a topic", + desc.ToString(), + desc.ExampleToString(), + "set-replication-clusters", + ) + + var clusterIDs string + + vc.SetRunFuncWithNameArg(func() error { + return doSetReplicationClusters(vc, clusterIDs) + }, "the topic name is not specified or the topic name is specified more than one") + + vc.FlagSetGroup.InFlagSet("ReplicationClusters", func(flagSet *pflag.FlagSet) { + flagSet.StringVarP( + &clusterIDs, + "clusters", + "c", + "", + "Replication Cluster Ids list (comma separated values)") + + _ = cobra.MarkFlagRequired(flagSet, "clusters") + }) + vc.EnableOutputFlagSet() +} + +func doSetReplicationClusters(vc *cmdutils.VerbCmd, clusterIDs string) error { + // for testing + if vc.NameError != nil { + return vc.NameError + } + + topic, err := utils.GetTopicName(vc.NameArg) + if err != nil { + return err + } + + admin := cmdutils.NewPulsarClient() + clusters := strings.Split(clusterIDs, ",") + err = admin.Topics().SetReplicationClusters(*topic, clusters) + if err == nil { + vc.Command.Printf("Set replication clusters successfully for [%s]\n", topic.String()) + } + return err +} diff --git a/pkg/ctl/topic/topic.go b/pkg/ctl/topic/topic.go index 7d88b159..02318998 100644 --- a/pkg/ctl/topic/topic.go +++ b/pkg/ctl/topic/topic.go @@ -95,6 +95,8 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command { RemoveInactiveTopicCmd, SetDispatchRateCmd, RemoveDispatchRateCmd, + GetReplicationClustersCmd, + SetReplicationClustersCmd, } cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)