From 2312240cc66e29937d1d5009e66078ec612ccbfa Mon Sep 17 00:00:00 2001 From: AlessandroCarbonelli Date: Mon, 28 Apr 2025 17:45:59 +0200 Subject: [PATCH] NetworkManager updated to Liqo v1.0.0 and broker logic updated --- apis/network/v1alpha1/broker_types.go | 5 + .../node/crds/network.fluidos.eu_brokers.yaml | 6 + deployments/node/samples/broker.yaml | 13 +- deployments/node/samples/metric.json | 6 + deployments/node/samples/routing-rule.json | 6 + pkg/network-manager/broker_client.go | 115 ++++++++++++------ tools/scripts/broker-creation.sh | 9 ++ 7 files changed, 121 insertions(+), 39 deletions(-) create mode 100644 deployments/node/samples/metric.json create mode 100644 deployments/node/samples/routing-rule.json diff --git a/apis/network/v1alpha1/broker_types.go b/apis/network/v1alpha1/broker_types.go index 5c15f10..dcb941b 100644 --- a/apis/network/v1alpha1/broker_types.go +++ b/apis/network/v1alpha1/broker_types.go @@ -27,8 +27,13 @@ type BrokerSpec struct { ClCert string `json:"clcert"` CaCert string `json:"cacert"` Role string `json:"role"` + Rule string `json:"rule"` + Metric string `json:"metric"` } +// ClCert *corev1.Secret `json:"clcert"` +// CaCert *corev1.Secret `json:"cacert"` + // BrokerStatus defines the observed state of Broker. type BrokerStatus struct { diff --git a/deployments/node/crds/network.fluidos.eu_brokers.yaml b/deployments/node/crds/network.fluidos.eu_brokers.yaml index 17a3dae..8f1f91b 100644 --- a/deployments/node/crds/network.fluidos.eu_brokers.yaml +++ b/deployments/node/crds/network.fluidos.eu_brokers.yaml @@ -49,16 +49,22 @@ spec: type: string clcert: type: string + metric: + type: string name: type: string role: type: string + rule: + type: string required: - address - cacert - clcert + - metric - name - role + - rule type: object status: description: BrokerStatus defines the observed state of Broker. diff --git a/deployments/node/samples/broker.yaml b/deployments/node/samples/broker.yaml index d03f09a..810990a 100644 --- a/deployments/node/samples/broker.yaml +++ b/deployments/node/samples/broker.yaml @@ -5,12 +5,15 @@ metadata: name: broker-sample namespace: fluidos spec: - name: fluidos.top-ix.org + name: broker-sample address: fluidos.top-ix.org - # "publisher" only publisher - # "subscriber" only subscriber - # anything else both publisher AND subscriber + # role + # "publisher" -> publisher only, "subscriber" -> subscriber only + # anything else -> both publisher AND subscriber role: both - #secrets must be created from certificate and key provided by broker server + rule: "{\"MAXlatency\":1000,\"MINbandwidth\":1,\"locations\":[\"K\",\"J\",\"Z\"],\"example\":\"aAbB12\"}" + metric: "{\"bandwidth\":1,\"latency\":10,\"location\":\"K\",\"foo\":\"bar\"}" + #secrets must be created from certificates and key provided by broker server's administrator cacert: brokera-ca-xxxxx clcert: brokera-cl-yyyyy + diff --git a/deployments/node/samples/metric.json b/deployments/node/samples/metric.json new file mode 100644 index 0000000..d26fe64 --- /dev/null +++ b/deployments/node/samples/metric.json @@ -0,0 +1,6 @@ +{ + "bandwidth": 1, + "latency": 10, + "location": "K", + "foo": "bar" +} \ No newline at end of file diff --git a/deployments/node/samples/routing-rule.json b/deployments/node/samples/routing-rule.json new file mode 100644 index 0000000..cd63578 --- /dev/null +++ b/deployments/node/samples/routing-rule.json @@ -0,0 +1,6 @@ +{ + "MAXlatency": 1000, + "MINbandwidth": 1, + "locations": ["K","J","Z"], + "example": "aAbB12" +} \ No newline at end of file diff --git a/pkg/network-manager/broker_client.go b/pkg/network-manager/broker_client.go index dec77cb..dfe0233 100644 --- a/pkg/network-manager/broker_client.go +++ b/pkg/network-manager/broker_client.go @@ -56,20 +56,22 @@ type BrokerClient struct { serverAddr string clientCert *corev1.Secret rootCert *corev1.Secret - + metric string + clientName string brokerConn *brokerConnection } // BrokerConnection keeps all the broker connection data. type brokerConnection struct { - amqpConn *amqp.Connection - amqpChan *amqp.Channel - exchangeName string - routingKey string - queueName string - inboundMsgs <-chan amqp.Delivery - outboundMsg []byte - confirms chan amqp.Confirmation + amqpConn *amqp.Connection + amqpChan *amqp.Channel + announceExchangeName string + ruleExchangeName string + queueName string + inboundMsgs <-chan amqp.Delivery + outboundAnnounceMsg []byte + outboundRuleMsg []byte + confirms chan amqp.Confirmation } // SetupBrokerClient sets the Broker Client from NM reconcile. @@ -90,9 +92,10 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp bc.serverAddr = broker.Spec.Address bc.brokerConn = &brokerConnection{} - bc.brokerConn.exchangeName = "DefaultPeerRequest" + bc.brokerConn.announceExchangeName = "announcements_exchange" + bc.brokerConn.ruleExchangeName = "rules_exchange" - bc.brokerConn.outboundMsg, err = json.Marshal(bc.ID) + bc.brokerConn.outboundAnnounceMsg, err = json.Marshal(bc.ID) if err != nil { return err } @@ -113,13 +116,14 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp } // Certificates. - bc.clientCert = &corev1.Secret{} - bc.rootCert = &corev1.Secret{} klog.Infof("Root Secret Name: %s\n", broker.Spec.CaCert) klog.Infof("Client Secret Name: %s\n", broker.Spec.ClCert) secretNamespace := "fluidos" + bc.clientCert = &corev1.Secret{} + bc.rootCert = &corev1.Secret{} + err = bc.extractSecret(cl, broker.Spec.ClCert, secretNamespace, bc.clientCert) if err != nil { return err @@ -160,12 +164,25 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp } // Routing key for topic. - bc.brokerConn.routingKey, err = extractCNfromCert(&clientCert) + bc.brokerConn.queueName, err = extractCNfromCert(&clientCert) if err != nil { klog.Errorf("Common Name extraction error: %v", err) } - bc.brokerConn.queueName = bc.brokerConn.routingKey + bc.brokerConn.outboundRuleMsg, err = json.Marshal(broker.Spec.Rule) + if err != nil { + klog.Errorf("Error reading rules JSON: %s", err) + } + + // Set the metric to be sent to the broker. + bc.metric = broker.Spec.Metric + bc.brokerConn.outboundAnnounceMsg = nil + err = bc.buildOutboundMessage() + if err != nil { + klog.Errorf("Message building error: %v", err) + } + + klog.Infof("outbound msg: %s\n", bc.brokerConn.outboundRuleMsg) // TLS config. tlsConfig := &tls.Config{ Certificates: []tls.Certificate{cert}, @@ -174,6 +191,8 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp MinVersion: tls.VersionTLS12, } + bc.clientName = bc.brokerConn.queueName + err = bc.brokerConnectionConfig(tlsConfig) return err @@ -181,16 +200,16 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp // ExecuteBrokerClient executes the Network Manager Broker routines. func (bc *BrokerClient) ExecuteBrokerClient(cl client.Client) error { - // Start sending messages + // Start sending announcement messages klog.Info("executing broker client routines") var err error if bc.pubFlag { go func() { - bc.publishOnBroker() + bc.publishOnBroker(bc.brokerConn.announceExchangeName, bc.brokerConn.outboundAnnounceMsg) }() } - // Start receiving messages + // Start receiving announcement messages if bc.subFlag { go func() { if err = bc.readMsgOnBroker(bc.ctx, cl); err != nil { @@ -198,10 +217,16 @@ func (bc *BrokerClient) ExecuteBrokerClient(cl client.Client) error { } }() } + + // Start sending rule messages + go func() { + bc.publishOnBroker(bc.brokerConn.ruleExchangeName, bc.brokerConn.outboundRuleMsg) + }() + return err } -func (bc *BrokerClient) publishOnBroker() { +func (bc *BrokerClient) publishOnBroker(exchangeName string, message []byte) { ticker := time.NewTicker(10 * time.Second) for { select { @@ -209,13 +234,14 @@ func (bc *BrokerClient) publishOnBroker() { // Pub on exchange err := bc.brokerConn.amqpChan.Publish( - bc.brokerConn.exchangeName, - bc.brokerConn.routingKey, - true, // Mandatory: if not routable -> error + exchangeName, + "", // routingKey + false, // Mandatory: if not routable -> error false, // Immediate amqp.Publishing{ ContentType: "application/json", - Body: bc.brokerConn.outboundMsg, + UserId: bc.clientName, + Body: message, Expiration: "30000", // TTL ms }) if err != nil { @@ -225,12 +251,12 @@ func (bc *BrokerClient) publishOnBroker() { select { case confirm := <-bc.brokerConn.confirms: if confirm.Ack { - klog.Info("Message successfully published!") + klog.InfoS("Message successfully published on ", exchangeName) } else { - klog.Info("Message failed to publish!") + klog.InfoS("Message failed to publish on ", exchangeName) } - case <-time.After(5 * time.Second): // Timeout - klog.Info("No confirmation received, message status unknown.") + case <-time.After(15 * time.Second): // Timeout + klog.InfoS("No confirmation received, message status unknown from ", exchangeName) } case <-bc.ctx.Done(): @@ -309,7 +335,7 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error { SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, // auth EXTERNAL TLSClientConfig: tlsConfig, // config TLS Vhost: "/", // vhost - Heartbeat: 10 * time.Second, // heartbeat + Heartbeat: 5 * time.Second, // heartbeat } // Config connection @@ -343,17 +369,16 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error { return err } - // Write confirm broker + // Write confirmations if err := bc.brokerConn.amqpChan.Confirm(false); err != nil { - klog.Errorf("Failed to enable publisher confirms: %v", err) + klog.Errorf("Failed to enable publisher confirms for Announcements: %v", err) return err } - // Channels for write confirm - bc.brokerConn.confirms = bc.brokerConn.amqpChan.NotifyPublish(make(chan amqp.Confirmation, 1)) - - klog.InfoS("Node", "ID", bc.ID.NodeID, "Client Address", bc.ID.IP, "Server Address", bc.serverAddr, "RoutingKey", bc.brokerConn.routingKey) + // Channels for write confirmations + bc.brokerConn.confirms = bc.brokerConn.amqpChan.NotifyPublish(make(chan amqp.Confirmation, 5)) + klog.InfoS("Node", "ID", bc.ID.NodeID, "Client Address", bc.ID.IP, "Server Address", bc.serverAddr /*, "RoutingKey" , bc.brokerConn.routingKey*/) return nil } @@ -368,3 +393,25 @@ func (bc *BrokerClient) extractSecret(cl client.Client, secretName, secretNamesp } return nil } + +func (bc *BrokerClient) buildOutboundMessage() error { + var err error + + var data map[string]interface{} + err = json.Unmarshal([]byte(bc.metric), &data) + if err != nil { + klog.Error("Error parsing JSON in builOutboundMessage:", err) + return err + } + + data["ip"] = bc.ID.IP + data["domain"] = bc.ID.Domain + data["nodeID"] = bc.ID.NodeID + + bc.brokerConn.outboundAnnounceMsg, err = json.Marshal(data) + if err != nil { + klog.Errorf("Error reading metric JSON: %s", err) + return err + } + return nil +} diff --git a/tools/scripts/broker-creation.sh b/tools/scripts/broker-creation.sh index 29e6fd4..e951b93 100755 --- a/tools/scripts/broker-creation.sh +++ b/tools/scripts/broker-creation.sh @@ -19,6 +19,8 @@ broker_ca_cert="null" broker_client_cert="null" broker_priv_key="null" role="null" +rule_file="null" +metric_file="null" read_input "Broker's name of your choice" "broker_name" read_input "Broker server address (must match certificate CN)" "address" @@ -26,6 +28,11 @@ read_input ".pem ROOT certificate" "broker_ca_cert" read_input ".pem client certificate" "broker_client_cert" read_input ".pem private key" "broker_priv_key" read_input "Type the role: publisher | subscriber | both" "role" +read_input ".json file for RULE" "rule_file" +read_input ".json file for metrics" "metric_file" + +rule_json=$(jq -c . "$rule_file" | sed 's/"/\\"/g') +metric_json=$(jq -c . "$metric_file" | sed 's/"/\\"/g') broker_ca_secret="$broker_name"-ca-"$RANDOM" broker_client_secret="$broker_name"-cl-"$RANDOM" @@ -55,6 +62,8 @@ spec: name: $broker_name address: $address role: $role + rule: "$rule_json" + metric: "$metric_json" cacert: $broker_ca_secret clcert: $broker_client_secret