diff --git a/common/model/consts.go b/common/model/consts.go index 3aa625e..911b69d 100644 --- a/common/model/consts.go +++ b/common/model/consts.go @@ -49,6 +49,8 @@ const ( CommandInstallBiz = "installBiz" // CommandUnInstallBiz uninstalls a business module CommandUnInstallBiz = "uninstallBiz" + // CommandBatchInstallBiz batch install biz, since koupleless-runtime 1.4.1 + CommandBatchInstallBiz = "batchInstallBiz" ) // MQTT topic patterns for base communication @@ -65,6 +67,8 @@ const ( BaseAllBizTopic = "koupleless_%s/%s/base/biz" // BaseBizOperationResponseTopic for business operation responses, p2p mode BaseBizOperationResponseTopic = "koupleless_%s/%s/base/bizOperation" + // BaseBatchInstallBizResponseTopic for response of batch install biz, p2p mode, since koupleless-runtime 1.4.1 + BaseBatchInstallBizResponseTopic = "koupleless_%s/%s/base/batchInstallBizResponse" // BaseBaselineResponseTopic for baseline responses, p2p mode BaseBaselineResponseTopic = "koupleless_%s/%s/base/baseline" ) diff --git a/common/model/model.go b/common/model/model.go index f4d46a6..ad72b17 100644 --- a/common/model/model.go +++ b/common/model/model.go @@ -31,10 +31,15 @@ type BaseStatus struct { // BizOperationResponse represents the response from a business operation type BizOperationResponse struct { - Command string `json:"command"` // Operation command executed - BizName string `json:"bizName"` // ClusterName of the business - BizVersion string `json:"bizVersion"` // Version of the business - Response ark_service.ArkResponse `json:"response"` // Response from ark service + Command string `json:"command"` // Operation command executed + BizName string `json:"bizName"` // ClusterName of the business + BizVersion string `json:"bizVersion"` // Version of the business + Response ark_service.ArkResponse[ark.ArkResponseData] `json:"response"` // Response from ark service +} + +type BatchInstallBizResponse struct { + Command string `json:"command"` // Operation command executed + Response ark_service.ArkResponse[ark.ArkBatchInstallResponse] `json:"response"` // Response from ark service } // QueryBaselineRequest is the request parameters of query baseline func diff --git a/controller/module_deployment_controller/module_deployment_controller.go b/controller/module_deployment_controller/module_deployment_controller.go index 4fe3ff8..b03efeb 100644 --- a/controller/module_deployment_controller/module_deployment_controller.go +++ b/controller/module_deployment_controller/module_deployment_controller.go @@ -36,8 +36,6 @@ type ModuleDeploymentController struct { cache cache.Cache // The cache for storing and retrieving Kubernetes objects. - runtimeStorage *RuntimeInfoStore // Storage for runtime information about deployments and nodes. - updateToken chan interface{} // A channel for signaling updates. } @@ -51,9 +49,8 @@ func (moduleDeploymentController *ModuleDeploymentController) Reconcile(ctx cont // NewModuleDeploymentController creates a new instance of the controller. func NewModuleDeploymentController(env string) (*ModuleDeploymentController, error) { return &ModuleDeploymentController{ - env: env, - runtimeStorage: NewRuntimeInfoStore(), - updateToken: make(chan interface{}, 1), + env: env, + updateToken: make(chan interface{}, 1), }, nil } @@ -100,25 +97,17 @@ func (moduleDeploymentController *ModuleDeploymentController) SetupWithManager(c return } - for _, vnode := range vnodeList.Items { - // no deployment, just add - moduleDeploymentController.runtimeStorage.PutNode(vnode.DeepCopy()) - } - // init deployments depList := &appsv1.DeploymentList{} err = moduleDeploymentController.cache.List(ctx, depList, &client.ListOptions{ LabelSelector: deploymentSelector, }) + if err != nil { err = errors.New("failed to list deployments") return } - for _, deployment := range depList.Items { - moduleDeploymentController.runtimeStorage.PutDeployment(deployment) - } - moduleDeploymentController.updateDeploymentReplicas(ctx, depList.Items) }() diff --git a/controller/module_deployment_controller/module_deployment_controller_test.go b/controller/module_deployment_controller/module_deployment_controller_test.go index 6c7fb2f..7b1a70d 100644 --- a/controller/module_deployment_controller/module_deployment_controller_test.go +++ b/controller/module_deployment_controller/module_deployment_controller_test.go @@ -2,22 +2,15 @@ package module_deployment_controller import ( "context" - "github.com/stretchr/testify/assert" "testing" ) func TestDeploymentHandler(t *testing.T) { c := &ModuleDeploymentController{ - runtimeStorage: NewRuntimeInfoStore(), - updateToken: make(chan interface{}), + updateToken: make(chan interface{}), } ctx := context.Background() c.deploymentAddHandler(ctx, nil) - - assert.Equal(t, 0, len(c.runtimeStorage.peerDeploymentMap)) - c.deploymentUpdateHandler(ctx, nil, nil) - - assert.Equal(t, 0, len(c.runtimeStorage.peerDeploymentMap)) } diff --git a/controller/module_deployment_controller/runtime_info_store.go b/controller/module_deployment_controller/runtime_info_store.go deleted file mode 100644 index 5e6e166..0000000 --- a/controller/module_deployment_controller/runtime_info_store.go +++ /dev/null @@ -1,413 +0,0 @@ -package module_deployment_controller - -import ( - "fmt" - "sync" - - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/labels" -) - -// RuntimeInfoStore provides in-memory runtime information storage with thread-safe access. -// Key components: -// - Deployment maps: Stores peer and non-peer deployments -// - Node map: Stores node information -// - Label maps: Tracks relationships between deployments, nodes and their labels -// - Thread safety: Uses RWMutex for concurrent access - -// Main operations: -// 1. Deployment Management -// - PutDeployment: Adds/updates deployment and its label selectors -// - DeleteDeployment: Removes deployment and its label mappings -// -// 2. Node Management -// - PutNode: Adds/updates node and its labels, tracks label changes -// - DeleteNode: Removes node and updates label mappings -// -// 3. Label Management -// - updateDeploymentSelectorLabelMap: Maintains deployment->label mappings -// - updateNodeLabelMap: Maintains node->label mappings -// - Tracks both equality and non-equality label requirements -// -// 4. Matching Logic -// - GetRelatedDeploymentsByNode: Finds deployments matching a node's labels -// - GetMatchedNodeNum: Counts nodes matching a deployment's label selectors -// - isNodeFitDep: Checks if node labels satisfy deployment requirements -// -// 5. Helper Functions -// - Label set operations: union, intersection, subtraction -// - Label diff calculation -// - Deployment label extraction - -// RuntimeInfoStore provide the in memory runtime information. -type RuntimeInfoStore struct { - sync.RWMutex - peerDeploymentMap map[string]appsv1.Deployment - nonPeerDeploymentMap map[string]appsv1.Deployment - nodeMap map[string]*corev1.Node - depKeyToEqLabels map[string]map[string]map[string]interface{} - depKeyToNeLabels map[string]map[string]map[string]interface{} - labelKeyToValueToNodeKeysMap map[string]map[string]map[string]interface{} -} - -func NewRuntimeInfoStore() *RuntimeInfoStore { - return &RuntimeInfoStore{ - RWMutex: sync.RWMutex{}, - peerDeploymentMap: make(map[string]appsv1.Deployment), - nonPeerDeploymentMap: make(map[string]appsv1.Deployment), - nodeMap: make(map[string]*corev1.Node), - depKeyToEqLabels: make(map[string]map[string]map[string]interface{}), - depKeyToNeLabels: make(map[string]map[string]map[string]interface{}), - labelKeyToValueToNodeKeysMap: make(map[string]map[string]map[string]interface{}), - } -} - -// getResourceKey generates a unique key for a k8s resource by combining namespace and name -func (r *RuntimeInfoStore) getResourceKey(namespace, name string) string { - return fmt.Sprintf("%s/%s", namespace, name) -} - -// PutDeployment adds or updates a deployment in the store -// - Stores deployment in peerDeploymentMap -// - Updates label selector mappings -func (r *RuntimeInfoStore) PutDeployment(deployment appsv1.Deployment) { - r.Lock() - defer r.Unlock() - depKey := r.getResourceKey(deployment.Namespace, deployment.Name) - r.peerDeploymentMap[depKey] = deployment - r.updateDeploymentSelectorLabelMap(depKey, deployment.DeepCopy()) -} - -// DeleteDeployment removes a deployment and its label mappings from the store -// - Removes from peerDeploymentMap -// - Cleans up label selector mappings -func (r *RuntimeInfoStore) DeleteDeployment(deployment appsv1.Deployment) { - r.Lock() - defer r.Unlock() - - depKey := r.getResourceKey(deployment.Namespace, deployment.Name) - // check has old version - _, has := r.peerDeploymentMap[depKey] - if has { - delete(r.peerDeploymentMap, depKey) - delete(r.depKeyToEqLabels, depKey) - delete(r.depKeyToNeLabels, depKey) - } -} - -// PutNode adds or updates a node in the store -// - Stores node in nodeMap -// - Tracks label changes between old and new node -// - Updates label mappings -// Returns true if labels changed -func (r *RuntimeInfoStore) PutNode(node *corev1.Node) (labelChanged bool) { - r.Lock() - defer r.Unlock() - nodeKey := r.getResourceKey(node.Namespace, node.Name) - // check has old version - var sub, plus labels.Set - - var oldLabels labels.Set - - oldNode, has := r.nodeMap[nodeKey] - if has && oldNode != nil { - oldLabels = oldNode.DeepCopy().Labels - } - r.nodeMap[nodeKey] = node - sub, plus = getLabelDiff(oldLabels, node.DeepCopy().Labels) - r.updateNodeLabelMap(nodeKey, sub, plus) - return len(sub) != 0 || len(plus) != 0 -} - -// DeleteNode removes a node and its label mappings from the store -// - Removes from nodeMap -// - Updates label mappings -func (r *RuntimeInfoStore) DeleteNode(node *corev1.Node) { - r.Lock() - defer r.Unlock() - - nodeKey := r.getResourceKey(node.Namespace, node.Name) - // check has old version - var oldLabels labels.Set - - oldNode, has := r.nodeMap[nodeKey] - if has && oldNode != nil { - oldLabels = oldNode.DeepCopy().Labels - } - delete(r.nodeMap, nodeKey) - r.updateNodeLabelMap(nodeKey, oldLabels, nil) -} - -// updateDeploymentSelectorLabelMap maintains deployment->label mappings -// - Extracts equality and non-equality label requirements -// - Updates internal maps -func (r *RuntimeInfoStore) updateDeploymentSelectorLabelMap(depKey string, newDep *appsv1.Deployment) { - newEqLabels, newNeLabels := getDeploymentMatchLabels(*newDep) - - r.depKeyToEqLabels[depKey] = newEqLabels - r.depKeyToNeLabels[depKey] = newNeLabels -} - -// updateNodeLabelMap maintains node->label mappings -// - Removes old label mappings (sub) -// - Adds new label mappings (plus) -func (r *RuntimeInfoStore) updateNodeLabelMap(nodeKey string, sub, plus labels.Set) { - // delete map item - for key, value := range sub { - valueMap, has := r.labelKeyToValueToNodeKeysMap[key] - if !has { - continue - } - keyMap, has := valueMap[value] - if !has { - continue - } - delete(keyMap, nodeKey) - valueMap[value] = keyMap - r.labelKeyToValueToNodeKeysMap[key] = valueMap - } - - // add map item - for key, value := range plus { - valueMap, has := r.labelKeyToValueToNodeKeysMap[key] - if !has { - valueMap = make(map[string]map[string]interface{}) - } - keyMap, has := valueMap[value] - if !has { - keyMap = make(map[string]interface{}) - } - keyMap[nodeKey] = nil - valueMap[value] = keyMap - r.labelKeyToValueToNodeKeysMap[key] = valueMap - } -} - -// GetRelatedDeploymentsByNode finds deployments matching a node's labels -// - Checks each deployment against node labels -// - Returns matching deployments -func (r *RuntimeInfoStore) GetRelatedDeploymentsByNode(node *corev1.Node) []appsv1.Deployment { - r.Lock() - defer r.Unlock() - - matchedDeployments := make([]appsv1.Deployment, 0) - - for depKey := range r.peerDeploymentMap { - if r.isNodeFitDep(node.Labels, depKey) { - matchedDeployments = append(matchedDeployments, r.peerDeploymentMap[depKey]) - } - } - - return matchedDeployments -} - -// isNodeFitDep checks if node labels satisfy deployment requirements -// - Verifies equality label matches -// - Verifies non-equality label matches -func (r *RuntimeInfoStore) isNodeFitDep(nodeLabels labels.Set, depKey string) bool { - eqLabels := r.depKeyToEqLabels[depKey] - neLabels := r.depKeyToNeLabels[depKey] - for key, labelValues := range eqLabels { - value, has := nodeLabels[key] - if !has { - return false - } - _, has = labelValues[value] - if !has { - return false - } - } - for key, labelValues := range neLabels { - value, has := nodeLabels[key] - if !has { - continue - } - _, has = labelValues[value] - if has { - return false - } - } - return true -} - -// GetMatchedNodeNum counts nodes matching a deployment's label selectors -// - Checks equality and non-equality label requirements -// - Returns count of matching nodes -func (r *RuntimeInfoStore) GetMatchedNodeNum(deployment appsv1.Deployment) int { - r.Lock() - defer r.Unlock() - - matchedNodeKeys := make(map[string]interface{}) - - eqLabels, neLabels := getDeploymentMatchLabels(deployment) - - for key, labelValues := range eqLabels { - valueMap, has := r.labelKeyToValueToNodeKeysMap[key] - if !has { - // no label matched - return 0 - } - validNodeKeys := make(map[string]interface{}) - for value := range labelValues { - validNodeKeys = union(validNodeKeys, valueMap[value]) - } - if len(matchedNodeKeys) == 0 { - matchedNodeKeys = validNodeKeys - } else { - matchedNodeKeys = intersection(matchedNodeKeys, validNodeKeys) - } - if len(matchedNodeKeys) == 0 { - // no matched deployments, return directly - return 0 - } - } - - for key, labelValues := range neLabels { - valueMap := r.labelKeyToValueToNodeKeysMap[key] - if valueMap == nil { - valueMap = make(map[string]map[string]interface{}) - } - invalidNodeKeys := make(map[string]interface{}) - for value := range labelValues { - invalidNodeKeys = union(invalidNodeKeys, valueMap[value]) - } - matchedNodeKeys = subKeys(matchedNodeKeys, invalidNodeKeys) - if len(matchedNodeKeys) == 0 { - // no matched deployments, return directly - return 0 - } - } - - nodeNum := 0 - for key := range matchedNodeKeys { - node := r.nodeMap[key] - if node == nil { - continue - } - nodeNum++ - } - return nodeNum -} - -// getLabelDiff calculates differences between old and new label sets -// Returns: -// - sub: labels removed -// - plus: labels added -func getLabelDiff(oldLabels, newLabels labels.Set) (sub labels.Set, plus labels.Set) { - sub = labels.Set{} - plus = labels.Set{} - - for key, value := range oldLabels { - newValue, has := newLabels[key] - if !has || newValue != value { - sub[key] = value - } else { - delete(newLabels, key) - } - } - - for key, value := range newLabels { - plus[key] = value - } - - return -} - -// subKeys returns keys in keyList1 that are not in keyList2 -func subKeys(keyList1, keyList2 map[string]interface{}) map[string]interface{} { - sub := make(map[string]interface{}) - for key := range keyList1 { - _, has := keyList2[key] - if has { - continue - } - sub[key] = keyList1[key] - } - return sub -} - -// unionLabels adds labels2 entries to srcMap -func unionLabels(srcMap map[string]map[string]interface{}, labels2 labels.Set) { - for key, value := range labels2 { - valueMap := srcMap[key] - if valueMap == nil { - valueMap = make(map[string]interface{}) - } - valueMap[value] = nil - srcMap[key] = valueMap - } -} - -// intersection returns keys present in both maps -func intersection(src, target map[string]interface{}) map[string]interface{} { - ret := map[string]interface{}{} - - for key := range target { - _, has := src[key] - if !has { - continue - } - ret[key] = nil - } - return ret -} - -// union returns keys present in either map -func union(src, target map[string]interface{}) map[string]interface{} { - ret := map[string]interface{}{} - - for key := range target { - ret[key] = nil - } - for key := range src { - ret[key] = nil - } - return ret -} - -// getDeploymentMatchLabels extracts label requirements from deployment spec -// Returns: -// - eqLabels: equality requirements (In/Exists) -// - neLabels: non-equality requirements (NotIn/DoesNotExist) -func getDeploymentMatchLabels(dep appsv1.Deployment) (eqLabels, neLabels map[string]map[string]interface{}) { - - eqLabels = make(map[string]map[string]interface{}) - neLabels = make(map[string]map[string]interface{}) - - affinity := dep.Spec.Template.Spec.Affinity - if affinity != nil && affinity.NodeAffinity != nil { - nodeAffinity := affinity.NodeAffinity - for _, term := range nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { - for _, expressions := range term.MatchExpressions { - if expressions.Operator == corev1.NodeSelectorOpIn || expressions.Operator == corev1.NodeSelectorOpExists { - eqValues := make(map[string]interface{}) - for _, value := range expressions.Values { - eqValues[value] = nil - } - oldValues, has := eqLabels[expressions.Key] - if !has { - eqLabels[expressions.Key] = eqValues - } else { - eqLabels[expressions.Key] = intersection(oldValues, eqValues) - } - } else if expressions.Operator == corev1.NodeSelectorOpNotIn || expressions.Operator == corev1.NodeSelectorOpDoesNotExist { - neValues := make(map[string]interface{}) - for _, value := range expressions.Values { - neValues[value] = nil - } - oldValues, has := neLabels[expressions.Key] - if !has { - neLabels[expressions.Key] = neValues - } else { - neLabels[expressions.Key] = union(oldValues, neValues) - } - } - } - } - } - - unionLabels(eqLabels, dep.Spec.Template.Spec.NodeSelector) - - return -} diff --git a/controller/module_deployment_controller/runtime_info_store_test.go b/controller/module_deployment_controller/runtime_info_store_test.go deleted file mode 100644 index 895fe28..0000000 --- a/controller/module_deployment_controller/runtime_info_store_test.go +++ /dev/null @@ -1,428 +0,0 @@ -package module_deployment_controller - -import ( - "github.com/stretchr/testify/assert" - appsv1 "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - v12 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "testing" -) - -func TestUnionLabels(t *testing.T) { - srcMap := map[string]map[string]interface{}{} - unionLabels(srcMap, labels.Set{ - "foo": "bar", - }) - _, exist := srcMap["foo"]["bar"] - assert.True(t, exist) -} - -func TestGetDeploymentMatchLabels(t *testing.T) { - eqLabels, neLabels := getDeploymentMatchLabels(appsv1.Deployment{ - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - NodeSelector: map[string]string{"foo": "bar1"}, - Affinity: &v1.Affinity{ - NodeAffinity: &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "foo", - Operator: v1.NodeSelectorOpIn, - Values: []string{"bar2"}, - }, - { - Key: "foo", - Operator: v1.NodeSelectorOpExists, - Values: []string{"bar3"}, - }, - { - Key: "baz", - Operator: v1.NodeSelectorOpNotIn, - Values: []string{"bar"}, - }, - { - Key: "baz", - Operator: v1.NodeSelectorOpDoesNotExist, - Values: []string{"bar2"}, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }) - - assert.Equal(t, len(eqLabels), 1) - assert.Equal(t, len(eqLabels["foo"]), 1) - assert.Equal(t, len(neLabels), 1) - assert.Equal(t, len(neLabels["baz"]), 2) -} - -func TestSubKeys(t *testing.T) { - ret := subKeys(map[string]interface{}{ - "key1": nil, - "key2": nil, - }, map[string]interface{}{ - "key2": nil, - "key3": nil, - }) - assert.Equal(t, len(ret), 1) -} - -func TestGetLabelDiff(t *testing.T) { - sub, plus := getLabelDiff(labels.Set{ - "foo": "bar", - "bar": "baz", - "baz": "qux", - }, labels.Set{ - "foo": "bar1", - "bar": "baz", - }) - assert.Equal(t, len(sub), 2) - assert.Equal(t, len(plus), 1) -} - -func TestRuntimeInfoStore_Deployment(t *testing.T) { - runtimeInfoStore := NewRuntimeInfoStore() - deployment := appsv1.Deployment{ - ObjectMeta: v12.ObjectMeta{ - Name: "test", - Namespace: "test", - }, - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - NodeSelector: map[string]string{"foo": "bar1"}, - Affinity: &v1.Affinity{ - NodeAffinity: &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "foo", - Operator: v1.NodeSelectorOpIn, - Values: []string{"bar2"}, - }, - { - Key: "foo", - Operator: v1.NodeSelectorOpExists, - Values: []string{"bar3"}, - }, - { - Key: "baz", - Operator: v1.NodeSelectorOpNotIn, - Values: []string{"bar"}, - }, - { - Key: "baz", - Operator: v1.NodeSelectorOpDoesNotExist, - Values: []string{"bar2"}, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - } - runtimeInfoStore.PutDeployment(deployment) - - assert.Equal(t, len(runtimeInfoStore.peerDeploymentMap), 1) - assert.Equal(t, len(runtimeInfoStore.depKeyToEqLabels["test/test"]["foo"]), 1) - assert.Equal(t, len(runtimeInfoStore.depKeyToNeLabels["test/test"]["baz"]), 2) - - // Delete exist - runtimeInfoStore.DeleteDeployment(deployment) - assert.Equal(t, len(runtimeInfoStore.peerDeploymentMap), 0) - assert.Equal(t, len(runtimeInfoStore.depKeyToEqLabels), 0) - assert.Equal(t, len(runtimeInfoStore.depKeyToNeLabels), 0) -} - -func TestRuntimeInfoStore_Node(t *testing.T) { - runtimeInfoStore := NewRuntimeInfoStore() - runtimeInfoStore.PutNode(&v1.Node{ - ObjectMeta: v12.ObjectMeta{ - Name: "test", - Labels: map[string]string{ - "foo": "bar", - }, - }, - }) - assert.Equal(t, len(runtimeInfoStore.nodeMap), 1) - assert.Equal(t, len(runtimeInfoStore.labelKeyToValueToNodeKeysMap), 1) - // update node - runtimeInfoStore.PutNode(&v1.Node{ - ObjectMeta: v12.ObjectMeta{ - Name: "test", - Labels: map[string]string{ - "foo": "bar", - "foo2": "bar2", - }, - }, - }) - assert.Equal(t, len(runtimeInfoStore.nodeMap), 1) - assert.Equal(t, len(runtimeInfoStore.labelKeyToValueToNodeKeysMap), 2) - - // update node - runtimeInfoStore.PutNode(&v1.Node{ - ObjectMeta: v12.ObjectMeta{ - Name: "test", - Labels: map[string]string{ - "foo": "bar", - "foo2": "bar3", - }, - }, - }) - assert.Equal(t, len(runtimeInfoStore.nodeMap), 1) - assert.Equal(t, len(runtimeInfoStore.labelKeyToValueToNodeKeysMap), 2) - - runtimeInfoStore.DeleteNode(&v1.Node{ - ObjectMeta: v12.ObjectMeta{ - Name: "test", - Labels: map[string]string{ - "foo": "bar", - "foo2": "bar3", - }, - }, - }) -} - -func TestRuntimeInfoStore_NodeMatchDeployment(t *testing.T) { - runtimeInfoStore := NewRuntimeInfoStore() - node := &v1.Node{ - ObjectMeta: v12.ObjectMeta{ - Name: "test", - Labels: map[string]string{ - "foo": "bar", - }, - }, - } - runtimeInfoStore.PutNode(node) - assert.Equal(t, len(runtimeInfoStore.nodeMap), 1) - assert.Equal(t, len(runtimeInfoStore.labelKeyToValueToNodeKeysMap), 1) - runtimeInfoStore.PutDeployment(appsv1.Deployment{ - ObjectMeta: v12.ObjectMeta{ - Name: "test1", - Namespace: "test", - }, - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - NodeSelector: map[string]string{"foo": "bar"}, - Affinity: &v1.Affinity{ - NodeAffinity: &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "baz", - Operator: v1.NodeSelectorOpNotIn, - Values: []string{"bar"}, - }, - { - Key: "baz", - Operator: v1.NodeSelectorOpDoesNotExist, - Values: []string{"bar2"}, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }) - runtimeInfoStore.PutDeployment(appsv1.Deployment{ - ObjectMeta: v12.ObjectMeta{ - Name: "test2", - Namespace: "test", - }, - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - NodeSelector: map[string]string{"foo1": "bar1"}, - }, - }, - }, - }) - - runtimeInfoStore.PutDeployment(appsv1.Deployment{ - ObjectMeta: v12.ObjectMeta{ - Name: "test3", - Namespace: "test", - }, - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - NodeSelector: map[string]string{"foo": "bar1"}, - }, - }, - }, - }) - - runtimeInfoStore.PutDeployment(appsv1.Deployment{ - ObjectMeta: v12.ObjectMeta{ - Name: "test4", - Namespace: "test", - }, - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Affinity: &v1.Affinity{ - NodeAffinity: &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "foo", - Operator: v1.NodeSelectorOpNotIn, - Values: []string{"bar"}, - }, - { - Key: "baz", - Operator: v1.NodeSelectorOpDoesNotExist, - Values: []string{"bar2"}, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }) - - runtimeInfoStore.GetRelatedDeploymentsByNode(node) - assert.Equal(t, len(runtimeInfoStore.nodeMap), 1) -} - -func TestRuntimeInfoStore_DeploymentMatchNode(t *testing.T) { - runtimeInfoStore := NewRuntimeInfoStore() - num := runtimeInfoStore.GetMatchedNodeNum(appsv1.Deployment{ - ObjectMeta: v12.ObjectMeta{ - Name: "test1", - Namespace: "test", - }, - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - NodeSelector: map[string]string{"foo": "bar"}, - }, - }, - }, - }) - assert.Equal(t, num, 0) - runtimeInfoStore.PutNode(&v1.Node{ - ObjectMeta: v12.ObjectMeta{ - Name: "test", - Labels: map[string]string{ - "foo": "bar", - }, - }, - }) - runtimeInfoStore.PutNode(&v1.Node{ - ObjectMeta: v12.ObjectMeta{ - Name: "test0", - Labels: map[string]string{ - "baz": "bar", - }, - }, - }) - num = runtimeInfoStore.GetMatchedNodeNum(appsv1.Deployment{ - ObjectMeta: v12.ObjectMeta{ - Name: "test1", - Namespace: "test", - }, - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - NodeSelector: map[string]string{ - "foo": "bar", - "baz": "bar", - }, - }, - }, - }, - }) - assert.Equal(t, num, 0) - runtimeInfoStore.PutNode(&v1.Node{ - ObjectMeta: v12.ObjectMeta{ - Name: "test1", - Labels: map[string]string{ - "foo": "bar1", - }, - }, - }) - runtimeInfoStore.PutNode(&v1.Node{ - ObjectMeta: v12.ObjectMeta{ - Name: "test2", - Labels: map[string]string{ - "foo1": "bar1", - }, - }, - }) - runtimeInfoStore.PutNode(&v1.Node{ - ObjectMeta: v12.ObjectMeta{ - Name: "test3", - Labels: map[string]string{ - "foo": "bar", - "baz": "bar", - }, - }, - }) - num = runtimeInfoStore.GetMatchedNodeNum(appsv1.Deployment{ - ObjectMeta: v12.ObjectMeta{ - Name: "test1", - Namespace: "test", - }, - Spec: appsv1.DeploymentSpec{ - Template: v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - NodeSelector: map[string]string{"foo": "bar"}, - Affinity: &v1.Affinity{ - NodeAffinity: &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "baz", - Operator: v1.NodeSelectorOpNotIn, - Values: []string{"bar"}, - }, - { - Key: "baz", - Operator: v1.NodeSelectorOpDoesNotExist, - Values: []string{"bar2"}, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }, - }) - assert.Equal(t, num, 1) -} diff --git a/go.mod b/go.mod index 0c3dad9..8a0574a 100644 --- a/go.mod +++ b/go.mod @@ -9,8 +9,8 @@ require ( github.com/go-logr/logr v1.4.2 github.com/go-resty/resty/v2 v2.11.0 github.com/google/uuid v1.6.0 - github.com/koupleless/arkctl v0.2.3 - github.com/koupleless/virtual-kubelet v0.3.6-0.20241219063827-66baf533a4ab + github.com/koupleless/arkctl v0.2.4-0.20250106035535-5ed5cb871995 + github.com/koupleless/virtual-kubelet v0.3.6-0.20250106054929-8ac8664e1858 github.com/onsi/ginkgo/v2 v2.19.0 github.com/onsi/gomega v1.33.1 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index d1a24e9..78ae9ad 100644 --- a/go.sum +++ b/go.sum @@ -92,10 +92,10 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/koupleless/arkctl v0.2.3 h1:gjKvxvh2WT9dOGrFCWFFIAvnwN7uy1UtI7DA+npnJuY= -github.com/koupleless/arkctl v0.2.3/go.mod h1:nbnAiPEv7x/ZDQ+QsjFWkqwxMDofGmqnFPHa3XpXHyE= -github.com/koupleless/virtual-kubelet v0.3.6-0.20241219063827-66baf533a4ab h1:EkozcBGAFwB1H03he6jHJXSB2gAXMvv7+IXRC18LwsM= -github.com/koupleless/virtual-kubelet v0.3.6-0.20241219063827-66baf533a4ab/go.mod h1:5fQ0uI9egbYDMxgUE0niS4agDTssIOCjq5wyO0ALNaU= +github.com/koupleless/arkctl v0.2.4-0.20250106035535-5ed5cb871995 h1:dkBdI/WczkOJ4LaoZteX3uz8r+WKxBqYQjpzwyVDvyw= +github.com/koupleless/arkctl v0.2.4-0.20250106035535-5ed5cb871995/go.mod h1:nbnAiPEv7x/ZDQ+QsjFWkqwxMDofGmqnFPHa3XpXHyE= +github.com/koupleless/virtual-kubelet v0.3.6-0.20250106054929-8ac8664e1858 h1:yMpgkyoWcr3XNj6cHqt/HwzE9LO6/HHqcRn4vvd3PBk= +github.com/koupleless/virtual-kubelet v0.3.6-0.20250106054929-8ac8664e1858/go.mod h1:V/RjXRvoSNr55I9KMV+tgtOp6duxxBMcwyDTH04XiX0= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= diff --git a/module_tunnels/koupleless_http_tunnel/ark_service/ark_service.go b/module_tunnels/koupleless_http_tunnel/ark_service/ark_service.go index 3dca65b..1f8fe30 100644 --- a/module_tunnels/koupleless_http_tunnel/ark_service/ark_service.go +++ b/module_tunnels/koupleless_http_tunnel/ark_service/ark_service.go @@ -5,7 +5,9 @@ import ( "encoding/json" "errors" "fmt" + "github.com/koupleless/arkctl/v1/service/ark" "github.com/koupleless/module_controller/common/zaplogger" + "net/http" "github.com/go-resty/resty/v2" ) @@ -18,7 +20,7 @@ func NewService() *Service { return &Service{client: resty.New()} } -func (h *Service) InstallBiz(ctx context.Context, req InstallBizRequest, baseIP string, arkletPort int) (response ArkResponse, err error) { +func (h *Service) InstallBiz(ctx context.Context, req InstallBizRequest, baseIP string, arkletPort int) (response ArkResponse[ark.ArkResponseData], err error) { logger := zaplogger.FromContext(ctx) resp, err := h.client.R(). @@ -37,6 +39,12 @@ func (h *Service) InstallBiz(ctx context.Context, req InstallBizRequest, baseIP return } + if resp.StatusCode() != http.StatusOK { + err = errors.New(fmt.Sprintf("response status: %d", resp.StatusCode())) + logger.Error(err, "installBiz request failed") + return + } + if err = json.Unmarshal(resp.Body(), &response); err != nil { logger.Error(err, fmt.Sprintf("Unmarshal InstallBiz response: %s", resp.Body())) return @@ -45,7 +53,7 @@ func (h *Service) InstallBiz(ctx context.Context, req InstallBizRequest, baseIP return response, nil } -func (h *Service) UninstallBiz(ctx context.Context, req UninstallBizRequest, baseIP string, arkletPort int) (response ArkResponse, err error) { +func (h *Service) UninstallBiz(ctx context.Context, req UninstallBizRequest, baseIP string, arkletPort int) (response ArkResponse[ark.ArkResponseData], err error) { logger := zaplogger.FromContext(ctx) resp, err := h.client.R(). @@ -64,6 +72,12 @@ func (h *Service) UninstallBiz(ctx context.Context, req UninstallBizRequest, bas return } + if resp.StatusCode() != http.StatusOK { + err = errors.New(fmt.Sprintf("response status: %d", resp.StatusCode())) + logger.Error(err, "uninstall request failed") + return + } + if err = json.Unmarshal(resp.Body(), &response); err != nil { logger.Error(err, fmt.Sprintf("Unmarshal UnInstallBiz response: %s", resp.Body())) return @@ -91,6 +105,12 @@ func (h *Service) QueryAllBiz(ctx context.Context, baseIP string, port int) (res return } + if resp.StatusCode() != http.StatusOK { + err = errors.New(fmt.Sprintf("response status: %d", resp.StatusCode())) + logger.Error(err, "queryAllBiz request failed") + return + } + if err = json.Unmarshal(resp.Body(), &response); err != nil { logger.Error(err, fmt.Sprintf("Unmarshal QueryAllBiz response: %s", resp.Body())) return @@ -118,6 +138,12 @@ func (h *Service) Health(ctx context.Context, baseIP string, arkletPort int) (re return } + if resp.StatusCode() != http.StatusOK { + err = errors.New(fmt.Sprintf("response status: %d", resp.StatusCode())) + logger.Error(err, "health request failed") + return + } + if err = json.Unmarshal(resp.Body(), &response); err != nil { logger.Error(err, fmt.Sprintf("Unmarshal Health response: %s", resp.Body())) return diff --git a/module_tunnels/koupleless_http_tunnel/ark_service/model.go b/module_tunnels/koupleless_http_tunnel/ark_service/model.go index bceb985..7aa780b 100644 --- a/module_tunnels/koupleless_http_tunnel/ark_service/model.go +++ b/module_tunnels/koupleless_http_tunnel/ark_service/model.go @@ -13,12 +13,12 @@ type UninstallBizRequest struct { } // ArkResponse is a generic response structure for Ark service operations -type ArkResponse struct { +type ArkResponse[T any] struct { // Code is the response code indicating the outcome of the operation Code string `json:"code"` // Data is the response data, which can vary depending on the operation - Data ark.ArkResponseData `json:"data"` + Data T `json:"data"` // Message is the error message in case of an error Message string `json:"message"` diff --git a/module_tunnels/koupleless_http_tunnel/http_tunnel.go b/module_tunnels/koupleless_http_tunnel/http_tunnel.go index 34ee357..1a07e0f 100644 --- a/module_tunnels/koupleless_http_tunnel/http_tunnel.go +++ b/module_tunnels/koupleless_http_tunnel/http_tunnel.go @@ -73,7 +73,7 @@ func (h *HttpTunnel) GetBizUniqueKey(container *corev1.Container) string { } // RegisterNode is called when a new node starts -func (h *HttpTunnel) RegisterNode(initData vkModel.NodeInfo) { +func (h *HttpTunnel) RegisterNode(initData vkModel.NodeInfo) error { h.Lock() defer h.Unlock() @@ -83,6 +83,7 @@ func (h *HttpTunnel) RegisterNode(initData vkModel.NodeInfo) { if !has { h.nodeIdToBaseStatusMap[nodeID] = utils.ConvertBaseStatusFromNodeInfo(initData) } + return nil } // UnRegisterNode is called when a node stops @@ -226,24 +227,32 @@ func (h *HttpTunnel) allBizMsgCallback(nodeID string, data ark_service.QueryAllB return } if h.onQueryAllBizDataArrived != nil { + fmt.Println("allBizMsgCallback!") h.onQueryAllBizDataArrived(utils2.FormatNodeName(nodeID, h.env), utils.TranslateBizInfosToContainerStatuses(data.GenericArkResponseBase.Data, time.Now().UnixMilli())) } } // bizOperationResponseCallback is the callback function for business operation responses -func (h *HttpTunnel) bizOperationResponseCallback(nodeID string, data model.BizOperationResponse) { - logger := zaplogger.FromContext(h.ctx) +func (httpTunnel *HttpTunnel) bizOperationResponseCallback(nodeID string, data model.BizOperationResponse) { + logger := zaplogger.FromContext(httpTunnel.ctx) + nodeName := utils2.FormatNodeName(nodeID, httpTunnel.env) if data.Response.Code == "SUCCESS" { if data.Command == model.CommandInstallBiz { - // not update here, update in all biz response callback + logger.Info("install biz success: ", data.BizName, data.BizVersion) + httpTunnel.onOneBizDataArrived(nodeName, vkModel.BizStatusData{ + Key: utils.GetBizIdentity(data.BizName, data.BizVersion), + Name: data.BizName, + State: string(vkModel.BizStateActivated), + ChangeTime: time.Now(), + Reason: fmt.Sprintf("%s:%s %s succeed", data.BizName, data.BizVersion, data.Command), + Message: data.Response.Data.Message, + }) return + } else { + logger.Error(nil, fmt.Sprintf("biz operation failed: %s:%s %s\n%s\n%s\n%s", data.BizName, data.BizVersion, data.Command, data.Response.Message, data.Response.ErrorStackTrace, data.Response.Data.Message)) } - } else { - // operation failed, log - logger.Error(nil, "biz operation failed: %s\n%s\n%s", data.Response.Message, data.Response.ErrorStackTrace, data.Response.Data.Message) } - - h.onOneBizDataArrived(utils2.FormatNodeName(nodeID, h.env), vkModel.BizStatusData{ + httpTunnel.onOneBizDataArrived(nodeName, vkModel.BizStatusData{ Key: utils.GetBizIdentity(data.BizName, data.BizVersion), Name: data.BizName, // fille PodKey when using diff --git a/module_tunnels/koupleless_mqtt_tunnel/mqtt_tunnel.go b/module_tunnels/koupleless_mqtt_tunnel/mqtt_tunnel.go index 0f71e0d..979775d 100644 --- a/module_tunnels/koupleless_mqtt_tunnel/mqtt_tunnel.go +++ b/module_tunnels/koupleless_mqtt_tunnel/mqtt_tunnel.go @@ -15,8 +15,10 @@ import ( utils2 "github.com/koupleless/virtual-kubelet/common/utils" vkModel "github.com/koupleless/virtual-kubelet/model" "github.com/koupleless/virtual-kubelet/tunnel" + "github.com/virtual-kubelet/virtual-kubelet/log" corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" + "strings" "sync" "time" ) @@ -58,17 +60,50 @@ func (mqttTunnel *MqttTunnel) GetBizUniqueKey(container *corev1.Container) strin return utils.GetBizIdentity(container.Name, utils.GetBizVersionFromContainer(container)) } -func (mqttTunnel *MqttTunnel) RegisterNode(nodeInfo vkModel.NodeInfo) { +func (mqttTunnel *MqttTunnel) RegisterNode(nodeInfo vkModel.NodeInfo) error { + zlogger := zaplogger.FromContext(mqttTunnel.ctx) + nodeID := utils2.ExtractNodeIDFromNodeName(nodeInfo.Metadata.Name) - mqttTunnel.mqttClient.Sub(fmt.Sprintf(model.BaseHealthTopic, mqttTunnel.env, nodeID), mqtt.Qos1, mqttTunnel.healthMsgCallback) - mqttTunnel.mqttClient.Sub(fmt.Sprintf(model.BaseSimpleBizTopic, mqttTunnel.env, nodeID), mqtt.Qos1, mqttTunnel.bizMsgCallback) + var err error + topic := fmt.Sprintf(model.BaseHealthTopic, mqttTunnel.env, nodeID) + err = mqttTunnel.mqttClient.Sub(topic, mqtt.Qos1, mqttTunnel.healthMsgCallback) + if err != nil { + zlogger.Error(err, fmt.Sprintf("Error when registering node: %s for topic: %s", nodeID, topic)) + return err + } + + topic = fmt.Sprintf(model.BaseSimpleBizTopic, mqttTunnel.env, nodeID) + err = mqttTunnel.mqttClient.Sub(topic, mqtt.Qos1, mqttTunnel.bizMsgCallback) + if err != nil { + zlogger.Error(err, fmt.Sprintf("Error when registering node: %s for topic: %s", nodeID, topic)) + return err + } + + topic = fmt.Sprintf(model.BaseAllBizTopic, mqttTunnel.env, nodeID) + err = mqttTunnel.mqttClient.Sub(topic, mqtt.Qos1, mqttTunnel.allBizMsgCallback) + if err != nil { + zlogger.Error(err, fmt.Sprintf("Error when registering node: %s for topic: %s", nodeID, topic)) + return err + } + + topic = fmt.Sprintf(model.BaseBizOperationResponseTopic, mqttTunnel.env, nodeID) + err = mqttTunnel.mqttClient.Sub(topic, mqtt.Qos1, mqttTunnel.bizOperationResponseCallback) + if err != nil { + zlogger.Error(err, fmt.Sprintf("Error when registering node: %s for topic: %s", nodeID, topic)) + return err + } - mqttTunnel.mqttClient.Sub(fmt.Sprintf(model.BaseAllBizTopic, mqttTunnel.env, nodeID), mqtt.Qos1, mqttTunnel.allBizMsgCallback) + topic = fmt.Sprintf(model.BaseBatchInstallBizResponseTopic, mqttTunnel.env, nodeID) + err = mqttTunnel.mqttClient.Sub(topic, mqtt.Qos1, mqttTunnel.batchInstallBizResponseCallback) + if err != nil { + zlogger.Error(err, fmt.Sprintf("Error when registering node: %s for topic: %s", nodeID, topic)) + return err + } - mqttTunnel.mqttClient.Sub(fmt.Sprintf(model.BaseBizOperationResponseTopic, mqttTunnel.env, nodeID), mqtt.Qos1, mqttTunnel.bizOperationResponseCallback) mqttTunnel.Lock() defer mqttTunnel.Unlock() + return nil } func (mqttTunnel *MqttTunnel) UnRegisterNode(nodeName string) { @@ -183,6 +218,7 @@ func (mqttTunnel *MqttTunnel) queryBaselineMsgCallback(_ paho.Client, msg paho.M for _, container := range baselineContainers { baselineBizs = append(baselineBizs, utils.TranslateCoreV1ContainerToBizModel(&container)) } + log.G(mqttTunnel.ctx).Info("queried baseline {} ", baselineBizs) baselineBytes, _ := json.Marshal(baselineBizs) err = mqttTunnel.mqttClient.Pub(utils.FormatBaselineResponseTopic(mqttTunnel.env, nodeID), mqtt.Qos1, baselineBytes) if err != nil { @@ -325,6 +361,64 @@ func (mqttTunnel *MqttTunnel) bizOperationResponseCallback(_ paho.Client, msg pa } } +func (mqttTunnel *MqttTunnel) batchInstallBizResponseCallback(_ paho.Client, msg paho.Message) { + defer msg.Ack() + zlogger := zaplogger.FromContext(mqttTunnel.ctx) + + zlogger.Info(fmt.Sprintf("query batch biz operation status callback for %s: %s", msg.Topic(), msg.Payload())) + var data model.ArkMqttMsg[model.BatchInstallBizResponse] + err := json.Unmarshal(msg.Payload(), &data) + if err != nil { + zlogger.Error(err, "Error unmarshalling biz response") + return + } + + if data.Data.Command != model.CommandBatchInstallBiz { + unsupported := fmt.Sprintf("Error command: %s, should be batchInstallBiz", data.Data.Command) + zlogger.Error(errors.New(unsupported), unsupported) + return + } + + bizStatusData := mqttTunnel.convertToBizStatusData(data) + if bizStatusData != nil && len(bizStatusData) > 0 { + nodeID := utils.GetBaseIdentityFromTopic(msg.Topic()) + nodeName := utils2.FormatNodeName(nodeID, mqttTunnel.env) + mqttTunnel.onAllBizStatusArrived(nodeName, bizStatusData) + } +} + +func (mqttTunnel *MqttTunnel) convertToBizStatusData(msg model.ArkMqttMsg[model.BatchInstallBizResponse]) []vkModel.BizStatusData { + var result []vkModel.BizStatusData + arkBatchInstallResponse := msg.Data.Response.Data + for _, bizResponse := range arkBatchInstallResponse.BizUrlToResponse { + bizInfo := bizResponse.BizInfos[0] + bizName := bizInfo.BizName + bizVersion := bizInfo.BizVersion + bizState := strings.ToUpper(bizInfo.BizState) + changeTime := time.UnixMilli(msg.PublishTimestamp) + message := bizResponse.Message + var reason string + if mqttTunnel.isActivatedBiz(bizInfo.BizState) { + reason = fmt.Sprintf("%s:%s %s succeed", bizName, bizVersion, bizState) + } else { + reason = fmt.Sprintf("%s:%s %s failed", bizName, bizVersion, bizState) + } + result = append(result, vkModel.BizStatusData{ + Key: utils.GetBizIdentity(bizName, bizVersion), + Name: bizName, + State: bizState, + ChangeTime: changeTime, + Reason: reason, + Message: message, + }) + } + return result +} + +func (mqttTunnel *MqttTunnel) isActivatedBiz(bizState string) bool { + return strings.ToUpper(bizState) == "ACTIVATED" +} + func (mqttTunnel *MqttTunnel) FetchHealthData(nodeName string) error { nodeID := utils2.ExtractNodeIDFromNodeName(nodeName) return mqttTunnel.mqttClient.Pub(utils.FormatArkletCommandTopic(mqttTunnel.env, nodeID, model.CommandHealth), mqtt.Qos0, []byte("{}")) diff --git a/suite/http/base_http_lifecycle_test.go b/suite/http/base_http_lifecycle_test.go index 1d56398..4aba9a8 100644 --- a/suite/http/base_http_lifecycle_test.go +++ b/suite/http/base_http_lifecycle_test.go @@ -17,12 +17,15 @@ var _ = Describe("Base Lifecycle Test", func() { ctx := context.Background() + // NOTICE: nodeId should be unique in suite test to avoid incorrect vnode handling pod or deployment. + nodeId := "http-base-for-base-test" clusterName := "test-cluster-name" - httpNodeID := "test-http-base" - mockHttpBase := NewMockHttpBase(httpNodeID, clusterName, "1.0.0", env, 1238) + // NOTICE: port should be unique in suite test to avoid incorrect base handling request. + mockHttpBase := NewMockHttpBase(nodeId, clusterName, "1.0.0", env, 1237) + // NOTICE: if test cases will contaminate each other, the cases should add `Serial` keyword in ginkgo Context("http base online and deactive finally", func() { - It("base should become a ready vnode eventually", func() { + It("base should become a ready vnode eventually", Serial, func() { time.Sleep(time.Second) go mockHttpBase.Start(ctx, clientID) @@ -30,7 +33,7 @@ var _ = Describe("Base Lifecycle Test", func() { Eventually(func() bool { lease := &v12.Lease{} err := k8sClient.Get(ctx, types.NamespacedName{ - Name: utils.FormatNodeName(httpNodeID, env), + Name: utils.FormatNodeName(nodeId, env), Namespace: v1.NamespaceNodeLease, }, lease) @@ -44,7 +47,7 @@ var _ = Describe("Base Lifecycle Test", func() { Eventually(func() bool { vnode := &v1.Node{} err := k8sClient.Get(ctx, types.NamespacedName{ - Name: utils.FormatNodeName(httpNodeID, env), + Name: utils.FormatNodeName(nodeId, env), }, vnode) vnodeReady := false for _, cond := range vnode.Status.Conditions { @@ -57,25 +60,25 @@ var _ = Describe("Base Lifecycle Test", func() { }, time.Second*50, time.Second).Should(BeTrue()) }) - It("base offline with deactive message and finally exit", func() { + It("base offline with deactive message and finally exit", Serial, func() { mockHttpBase.Exit() Eventually(func() bool { vnode := &v1.Node{} err := k8sClient.Get(ctx, types.NamespacedName{ - Name: utils.FormatNodeName(httpNodeID, env), + Name: utils.FormatNodeName(nodeId, env), }, vnode) return errors.IsNotFound(err) }, time.Second*50, time.Second).Should(BeTrue()) }) - It("base should become a ready vnode eventually", func() { + It("base should become a ready vnode eventually", Serial, func() { time.Sleep(time.Second) go mockHttpBase.Start(ctx, clientID) vnode := &v1.Node{} Eventually(func() bool { err := k8sClient.Get(ctx, types.NamespacedName{ - Name: utils.FormatNodeName(httpNodeID, env), + Name: utils.FormatNodeName(nodeId, env), }, vnode) vnodeReady := false for _, cond := range vnode.Status.Conditions { @@ -88,25 +91,25 @@ var _ = Describe("Base Lifecycle Test", func() { }, time.Second*50, time.Second).Should(BeTrue()) }) - It("base unreachable finally exit", func() { + It("base unreachable finally exit", Serial, func() { mockHttpBase.reachable = false Eventually(func() bool { vnode := &v1.Node{} err := k8sClient.Get(ctx, types.NamespacedName{ - Name: utils.FormatNodeName(httpNodeID, env), + Name: utils.FormatNodeName(nodeId, env), }, vnode) return errors.IsNotFound(err) }, time.Minute*2, time.Second).Should(BeTrue()) }) - It("reachable base should become a ready vnode eventually", func() { + It("reachable base should become a ready vnode eventually", Serial, func() { time.Sleep(time.Second) mockHttpBase.reachable = true go mockHttpBase.Start(ctx, clientID) vnode := &v1.Node{} Eventually(func() bool { err := k8sClient.Get(ctx, types.NamespacedName{ - Name: utils.FormatNodeName(httpNodeID, env), + Name: utils.FormatNodeName(nodeId, env), }, vnode) vnodeReady := false for _, cond := range vnode.Status.Conditions { @@ -119,13 +122,13 @@ var _ = Describe("Base Lifecycle Test", func() { }, time.Second*50, time.Second).Should(BeTrue()) }) - It("base finally exit", func() { + It("base finally exit", Serial, func() { mockHttpBase.Exit() Eventually(func() bool { vnode := &v1.Node{} err := k8sClient.Get(ctx, types.NamespacedName{ - Name: utils.FormatNodeName(httpNodeID, env), + Name: utils.FormatNodeName(nodeId, env), }, vnode) return errors.IsNotFound(err) }, time.Second*50, time.Second).Should(BeTrue()) diff --git a/suite/http/mock_http_base.go b/suite/http/mock_http_base.go index ec65126..bab0be1 100644 --- a/suite/http/mock_http_base.go +++ b/suite/http/mock_http_base.go @@ -54,39 +54,39 @@ func (b *MockHttpBase) Exit() { } } -func (b *MockHttpBase) Start(ctx context.Context, clientID string) error { - b.exit = make(chan struct{}) - b.CurrState = "ACTIVATED" +func (base *MockHttpBase) Start(ctx context.Context, clientID string) error { + base.exit = make(chan struct{}) + base.CurrState = "ACTIVATED" // start a http server to mock base mux := http.NewServeMux() server := http.Server{ - Addr: fmt.Sprintf(":%d", b.port), + Addr: fmt.Sprintf(":%d", base.port), Handler: mux, } mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { - if b.reachable { + if base.reachable { w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") - w.Write(b.getHealthMsg()) + w.Write(base.getHealthMsg()) } else { w.WriteHeader(http.StatusBadGateway) } }) mux.HandleFunc("/queryAllBiz", func(w http.ResponseWriter, r *http.Request) { - if b.reachable { + if base.reachable { w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") - w.Write(b.getQueryAllBizMsg()) + w.Write(base.getQueryAllBizMsg()) } else { w.WriteHeader(http.StatusBadGateway) } }) mux.HandleFunc("/installBiz", func(w http.ResponseWriter, r *http.Request) { - if b.reachable { + if base.reachable { w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") @@ -97,7 +97,7 @@ func (b *MockHttpBase) Start(ctx context.Context, clientID string) error { return } - w.Write(b.processInstallBiz(body)) + w.Write(base.processInstallBiz(body)) } else { w.WriteHeader(http.StatusBadGateway) } @@ -105,7 +105,7 @@ func (b *MockHttpBase) Start(ctx context.Context, clientID string) error { }) mux.HandleFunc("/uninstallBiz", func(w http.ResponseWriter, r *http.Request) { - if b.reachable { + if base.reachable { w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") @@ -116,7 +116,7 @@ func (b *MockHttpBase) Start(ctx context.Context, clientID string) error { return } - w.Write(b.processUnInstallBiz(body)) + w.Write(base.processUnInstallBiz(body)) } else { w.WriteHeader(http.StatusBadGateway) } @@ -126,16 +126,16 @@ func (b *MockHttpBase) Start(ctx context.Context, clientID string) error { // Start a new goroutine to upload node heart beat data every 10 seconds go utils.TimedTaskWithInterval(ctx, time.Second*10, func(ctx context.Context) { - if b.reachable { - log.G(ctx).Info("upload node heart beat data from node ", b.Metadata.Identity) - _, err := http.Post("http://127.0.0.1:7777/heartbeat", "application/json", bytes.NewBuffer(b.getHeartBeatMsg())) + if base.reachable { + log.G(ctx).Info("upload node heart beat data from node ", base.Metadata.Identity) + _, err := http.Post("http://127.0.0.1:7777/heartbeat", "application/json", bytes.NewBuffer(base.getHeartBeatMsg())) if err != nil { logrus.Errorf("error calling heartbeat: %s", err) } } }) - _, err := http.Post("http://127.0.0.1:7777/heartbeat", "application/json", bytes.NewBuffer(b.getHeartBeatMsg())) + _, err := http.Post("http://127.0.0.1:7777/heartbeat", "application/json", bytes.NewBuffer(base.getHeartBeatMsg())) if err != nil { logrus.Errorf("error calling heartbeat: %s", err) return err @@ -144,10 +144,10 @@ func (b *MockHttpBase) Start(ctx context.Context, clientID string) error { go func() { select { case <-ctx.Done(): - case <-b.exit: + case <-base.exit: } - b.CurrState = "DEACTIVATED" - _, err = http.Post("http://127.0.0.1:7777/heartbeat", "application/json", bytes.NewBuffer(b.getHeartBeatMsg())) + base.CurrState = "DEACTIVATED" + _, err = http.Post("http://127.0.0.1:7777/heartbeat", "application/json", bytes.NewBuffer(base.getHeartBeatMsg())) time.Sleep(2 * time.Second) server.Shutdown(ctx) if err != nil { @@ -186,6 +186,14 @@ func (b *MockHttpBase) getHealthMsg() []byte { BizState: b.CurrState, BizVersion: b.Metadata.Version, }, + Cpu: ark.CpuInfo{ + Count: 1, + TotalUsed: 20, + Type: "intel", + UserUsed: 2, + Free: 80, + SystemUsed: 13, + }, }, }, Message: "", @@ -235,13 +243,15 @@ func (b *MockHttpBase) processInstallBiz(msg []byte) []byte { }, } } - response := ark_service.ArkResponse{ + response := ark_service.ArkResponse[ark.ArkResponseData]{ Code: "SUCCESS", Data: ark.ArkResponseData{ - Code: "SUCCESS", - Message: "", + ArkClientResponse: ark.ArkClientResponse{ + Code: "SUCCESS", + Message: "", + BizInfos: nil, + }, ElapsedSpace: 0, - BizInfos: nil, }, Message: "", ErrorStackTrace: "", @@ -260,13 +270,15 @@ func (b *MockHttpBase) processUnInstallBiz(msg []byte) []byte { logrus.Infof("uninstall biz %s from http base", identity) delete(b.BizInfos, identity) // send to response - response := ark_service.ArkResponse{ + response := ark_service.ArkResponse[ark.ArkResponseData]{ Code: "SUCCESS", Data: ark.ArkResponseData{ - Code: "SUCCESS", - Message: "", + ArkClientResponse: ark.ArkClientResponse{ + Code: "SUCCESS", + Message: "", + BizInfos: nil, + }, ElapsedSpace: 0, - BizInfos: nil, }, Message: "", ErrorStackTrace: "", diff --git a/suite/http/module_http_lifecycle_test.go b/suite/http/module_http_lifecycle_test.go index c2ae966..8ac8e25 100644 --- a/suite/http/module_http_lifecycle_test.go +++ b/suite/http/module_http_lifecycle_test.go @@ -15,15 +15,17 @@ var _ = Describe("Module Lifecycle Test", func() { ctx := context.Background() - nodeID := "test-http-base" + // NOTICE: nodeId should be unique in suite test to avoid incorrect vnode handling pod or deployment. + nodeID := "http-base-for-module-test" clusterName := "test-cluster-name" - + // NOTICE: port should be unique in suite test to avoid incorrect base handling request. mockBase := NewMockHttpBase(nodeID, clusterName, "1.0.0", env, 1238) mockModulePod := prepareModulePod("test-module", "default", utils.FormatNodeName(nodeID, env)) - Context("pod install", func() { - It("base should become a ready vnode eventually", func() { + // NOTICE: if test cases will contaminate each other, the cases should add `Serial` keyword in ginkgo + Context("pod install", Serial, func() { + It("base should become a ready vnode eventually", Serial, func() { go mockBase.Start(ctx, clientID) Eventually(func() bool { vnode := &v1.Node{} @@ -38,10 +40,10 @@ var _ = Describe("Module Lifecycle Test", func() { } } return err == nil && vnodeReady - }, time.Second*50, time.Second).Should(BeTrue()) + }, time.Second*20, time.Second).Should(BeTrue()) }) - It("publish a module pod and it should be running", func() { + It("publish a module pod and it should be pending", Serial, func() { err := k8sClient.Create(ctx, &mockModulePod) Expect(err).To(BeNil()) Eventually(func() bool { @@ -50,14 +52,14 @@ var _ = Describe("Module Lifecycle Test", func() { Namespace: mockModulePod.Namespace, Name: mockModulePod.Name, }, podFromKubernetes) - return err == nil && podFromKubernetes.Status.Phase == v1.PodRunning - }, time.Second*50, time.Second).Should(BeTrue()) + return err == nil && podFromKubernetes.Status.Phase == v1.PodPending && podFromKubernetes.Spec.NodeName == utils.FormatNodeName(nodeID, env) + }, time.Second*20, time.Second).Should(BeTrue()) Eventually(func() bool { return len(mockBase.BizInfos) == 1 }, time.Second*20, time.Second).Should(BeTrue()) }) - It("delete pod, all modules should deactivated, pod should finally deleted from k8s", func() { + It("delete pod, all modules should deactivated, pod should finally deleted from k8s", Serial, func() { err := k8sClient.Delete(ctx, &mockModulePod) Expect(err).To(BeNil()) Eventually(func() bool { @@ -70,7 +72,7 @@ var _ = Describe("Module Lifecycle Test", func() { }, time.Second*40, time.Second).Should(BeTrue()) }) - It("base offline with deactive message and finally exit", func() { + It("base offline with deactive message and finally exit", Serial, func() { mockBase.Exit() Eventually(func() bool { vnode := &v1.Node{} diff --git a/suite/mqtt/mock_mqtt_base.go b/suite/mqtt/mock_mqtt_base.go index 0847f51..a647238 100644 --- a/suite/mqtt/mock_mqtt_base.go +++ b/suite/mqtt/mock_mqtt_base.go @@ -131,6 +131,14 @@ func (b *MockMQTTBase) getHealthMsg() []byte { BizState: b.CurrState, BizVersion: b.BaseMetadata.Version, }, + Cpu: ark.CpuInfo{ + Count: 1, + TotalUsed: 20, + Type: "intel", + UserUsed: 2, + Free: 80, + SystemUsed: 13, + }, }, }, Message: "", @@ -231,11 +239,15 @@ func (b *MockMQTTBase) processUnInstallBiz(msg []byte) { Command: model.CommandUnInstallBiz, BizName: request.BizName, BizVersion: request.BizVersion, - Response: ark_service.ArkResponse{ + Response: ark_service.ArkResponse[ark.ArkResponseData]{ Code: "SUCCESS", Data: ark.ArkResponseData{ - Code: "SUCCESS", - Message: "", + ArkClientResponse: ark.ArkClientResponse{ + Code: "SUCCESS", + Message: "", + BizInfos: nil, + }, + ElapsedSpace: 0, }, Message: "", ErrorStackTrace: "", @@ -293,11 +305,15 @@ func (b *MockMQTTBase) SetBizState(bizIdentity, state, reason, message string) { Command: model.CommandInstallBiz, BizName: info.BizName, BizVersion: info.BizVersion, - Response: ark_service.ArkResponse{ + Response: ark_service.ArkResponse[ark.ArkResponseData]{ Code: "SUCCESS", Data: ark.ArkResponseData{ - Code: "SUCCESS", - Message: "", + ArkClientResponse: ark.ArkClientResponse{ + Code: "SUCCESS", + Message: "", + BizInfos: nil, + }, + ElapsedSpace: 0, }, Message: "", ErrorStackTrace: "",