Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions pkg/ctl/topic/get_replication_clusters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"

"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

func GetReplicationClustersCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Get the replication clusters for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
getClusters := cmdutils.Example{
Desc: "Get the replication clusters for a topic",
Command: "pulsarctl topics get-replication-clusters persistent://tenant/namespace/topic",
}
examples = append(examples, getClusters)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "[\"cluster1\", \"cluster2\"]",
}

out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)
desc.CommandOutput = out

vc.SetDescription(
"get-replication-clusters",
"Get the replication clusters for a topic",
desc.ToString(),
desc.ExampleToString(),
"get-replication-clusters",
)

vc.SetRunFuncWithNameArg(func() error {
return doGetReplicationClusters(vc)
}, "the topic name is not specified or the topic name is specified more than one")

vc.EnableOutputFlagSet()
}

func doGetReplicationClusters(vc *cmdutils.VerbCmd) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
clusters, err := admin.Topics().GetReplicationClusters(*topic)
if err == nil {
cmdutils.PrintJSON(vc.Command.OutOrStdout(), &clusters)
}
return err
}
107 changes: 107 additions & 0 deletions pkg/ctl/topic/replication_clusters_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"encoding/json"
"fmt"
"strings"
"testing"

"github.com/streamnative/pulsarctl/pkg/ctl/cluster"
"github.com/streamnative/pulsarctl/pkg/ctl/tenant"
"github.com/streamnative/pulsarctl/pkg/test"

"github.com/stretchr/testify/assert"
)

func TestReplicationClustersCmd(t *testing.T) {
topicName := fmt.Sprintf("persistent://public/default/test-replication-clusters-topic-%s", test.RandomSuffix())

// Create the topic first
args := []string{"create", topicName, "0"}
_, execErr, _, err := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, err)
assert.Nil(t, execErr)

// Create a test cluster for replication
clusterArgs := []string{"create", "test-replication-cluster", "--url", "http://192.168.12.11:8080"}
_, execErr, _, err = cluster.TestClusterCommands(cluster.CreateClusterCmd, clusterArgs)
assert.Nil(t, err)
assert.Nil(t, execErr)

// Update tenant to allow the new cluster
updateTenantArgs := []string{"update", "--allowed-clusters", "test-replication-cluster",
"--allowed-clusters", "standalone", "public"}
_, execErr, _, err = tenant.TestTenantCommands(tenant.UpdateTenantCmd, updateTenantArgs)
assert.Nil(t, err)
assert.Nil(t, execErr)

// Set replication clusters for the topic
setArgs := []string{"set-replication-clusters", topicName, "--clusters", "test-replication-cluster"}
setOut, execErr, _, _ := TestTopicCommands(SetReplicationClustersCmd, setArgs)
assert.Nil(t, execErr)
assert.True(t, strings.Contains(setOut.String(), "Set replication clusters successfully"))

// Get replication clusters for the topic
getArgs := []string{"get-replication-clusters", topicName}
getOut, execErr, _, _ := TestTopicCommands(GetReplicationClustersCmd, getArgs)
assert.Nil(t, execErr)

var clusters []string
err = json.Unmarshal(getOut.Bytes(), &clusters)
assert.Nil(t, err)
assert.Contains(t, clusters, "test-replication-cluster")

// Reset tenant clusters for other test cases
updateTenantArgs = []string{"update", "--allowed-clusters", "standalone", "public"}
_, execErr, _, err = tenant.TestTenantCommands(tenant.UpdateTenantCmd, updateTenantArgs)
assert.Nil(t, err)
assert.Nil(t, execErr)
}

func TestSetReplicationClustersArgError(t *testing.T) {
// Test with no topic name
args := []string{"set-replication-clusters", "--clusters", "standalone"}
_, _, nameErr, _ := TestTopicCommands(SetReplicationClustersCmd, args)
assert.NotNil(t, nameErr)
assert.Equal(t, "the topic name is not specified or the topic name is specified more than one", nameErr.Error())
}

func TestGetReplicationClustersArgError(t *testing.T) {
// Test with no topic name
args := []string{"get-replication-clusters"}
_, _, nameErr, _ := TestTopicCommands(GetReplicationClustersCmd, args)
assert.NotNil(t, nameErr)
assert.Equal(t, "the topic name is not specified or the topic name is specified more than one", nameErr.Error())
}

func TestSetReplicationClustersInvalidCluster(t *testing.T) {
topicName := fmt.Sprintf("persistent://public/default/test-invalid-cluster-topic-%s", test.RandomSuffix())

// Create the topic first
args := []string{"create", topicName, "0"}
_, execErr, _, err := TestTopicCommands(CreateTopicCmd, args)
assert.Nil(t, err)
assert.Nil(t, execErr)

// Try to set an invalid cluster
setArgs := []string{"set-replication-clusters", topicName, "--clusters", "invalid-cluster"}
_, execErr, _, _ = TestTopicCommands(SetReplicationClustersCmd, setArgs)
assert.NotNil(t, execErr)
}
108 changes: 108 additions & 0 deletions pkg/ctl/topic/set_replication_clusters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package topic

import (
"strings"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

"github.com/streamnative/pulsarctl/pkg/cmdutils"
)

func SetReplicationClustersCmd(vc *cmdutils.VerbCmd) {
desc := cmdutils.LongDescription{}
desc.CommandUsedFor = "Set the replication clusters for a topic"
desc.CommandPermission = "This command requires tenant admin permissions."

var examples []cmdutils.Example
setClusters := cmdutils.Example{
Desc: "Set the replication clusters for a topic",
Command: "pulsarctl topics set-replication-clusters persistent://tenant/namespace/topic --clusters cluster1,cluster2",
}

examples = append(examples, setClusters)
desc.CommandExamples = examples

var out []cmdutils.Output
successOut := cmdutils.Output{
Desc: "normal output",
Out: "Set replication clusters successfully for [topic]",
}

out = append(out, successOut, ArgError)
out = append(out, TopicNameErrors...)
out = append(out, NamespaceErrors...)

invalidClustersName := cmdutils.Output{
Desc: "Invalid cluster name, please check if your cluster name has the appropriate " +
"permissions under the current tenant",
Out: "[✖] code: 403 reason: Invalid cluster id: <cluster-name>",
}

out = append(out, invalidClustersName, TopicLevelPolicyNotEnabledError)
desc.CommandOutput = out

vc.SetDescription(
"set-replication-clusters",
"Set the replication clusters for a topic",
desc.ToString(),
desc.ExampleToString(),
"set-replication-clusters",
)

var clusterIDs string

vc.SetRunFuncWithNameArg(func() error {
return doSetReplicationClusters(vc, clusterIDs)
}, "the topic name is not specified or the topic name is specified more than one")

vc.FlagSetGroup.InFlagSet("ReplicationClusters", func(flagSet *pflag.FlagSet) {
flagSet.StringVarP(
&clusterIDs,
"clusters",
"c",
"",
"Replication Cluster Ids list (comma separated values)")

_ = cobra.MarkFlagRequired(flagSet, "clusters")
})
vc.EnableOutputFlagSet()
}

func doSetReplicationClusters(vc *cmdutils.VerbCmd, clusterIDs string) error {
// for testing
if vc.NameError != nil {
return vc.NameError
}

topic, err := utils.GetTopicName(vc.NameArg)
if err != nil {
return err
}

admin := cmdutils.NewPulsarClient()
clusters := strings.Split(clusterIDs, ",")
err = admin.Topics().SetReplicationClusters(*topic, clusters)
if err == nil {
vc.Command.Printf("Set replication clusters successfully for [%s]\n", topic.String())
}
return err
}
2 changes: 2 additions & 0 deletions pkg/ctl/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func Command(flagGrouping *cmdutils.FlagGrouping) *cobra.Command {
RemoveInactiveTopicCmd,
SetDispatchRateCmd,
RemoveDispatchRateCmd,
GetReplicationClustersCmd,
SetReplicationClustersCmd,
}

cmdutils.AddVerbCmds(flagGrouping, resourceCmd, commands...)
Expand Down
Loading