Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apis/network/v1alpha1/broker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ type BrokerSpec struct {
ClCert string `json:"clcert"`
CaCert string `json:"cacert"`
Role string `json:"role"`
Rule string `json:"rule"`
Metric string `json:"metric"`
}

// ClCert *corev1.Secret `json:"clcert"`
// CaCert *corev1.Secret `json:"cacert"`

// BrokerStatus defines the observed state of Broker.
type BrokerStatus struct {

Expand Down
6 changes: 6 additions & 0 deletions deployments/node/crds/network.fluidos.eu_brokers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,22 @@ spec:
type: string
clcert:
type: string
metric:
type: string
name:
type: string
role:
type: string
rule:
type: string
required:
- address
- cacert
- clcert
- metric
- name
- role
- rule
type: object
status:
description: BrokerStatus defines the observed state of Broker.
Expand Down
13 changes: 8 additions & 5 deletions deployments/node/samples/broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ metadata:
name: broker-sample
namespace: fluidos
spec:
name: fluidos.top-ix.org
name: broker-sample
address: fluidos.top-ix.org
# "publisher" only publisher
# "subscriber" only subscriber
# anything else both publisher AND subscriber
# role
# "publisher" -> publisher only, "subscriber" -> subscriber only
# anything else -> both publisher AND subscriber
role: both
#secrets must be created from certificate and key provided by broker server
rule: "{\"MAXlatency\":1000,\"MINbandwidth\":1,\"locations\":[\"K\",\"J\",\"Z\"],\"example\":\"aAbB12\"}"
metric: "{\"bandwidth\":1,\"latency\":10,\"location\":\"K\",\"foo\":\"bar\"}"
#secrets must be created from certificates and key provided by broker server's administrator
cacert: brokera-ca-xxxxx
clcert: brokera-cl-yyyyy

6 changes: 6 additions & 0 deletions deployments/node/samples/metric.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"bandwidth": 1,
"latency": 10,
"location": "K",
"foo": "bar"
}
6 changes: 6 additions & 0 deletions deployments/node/samples/routing-rule.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"MAXlatency": 1000,
"MINbandwidth": 1,
"locations": ["K","J","Z"],
"example": "aAbB12"
}
115 changes: 81 additions & 34 deletions pkg/network-manager/broker_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,22 @@ type BrokerClient struct {
serverAddr string
clientCert *corev1.Secret
rootCert *corev1.Secret

metric string
clientName string
brokerConn *brokerConnection
}

// BrokerConnection keeps all the broker connection data.
type brokerConnection struct {
amqpConn *amqp.Connection
amqpChan *amqp.Channel
exchangeName string
routingKey string
queueName string
inboundMsgs <-chan amqp.Delivery
outboundMsg []byte
confirms chan amqp.Confirmation
amqpConn *amqp.Connection
amqpChan *amqp.Channel
announceExchangeName string
ruleExchangeName string
queueName string
inboundMsgs <-chan amqp.Delivery
outboundAnnounceMsg []byte
outboundRuleMsg []byte
confirms chan amqp.Confirmation
}

// SetupBrokerClient sets the Broker Client from NM reconcile.
Expand All @@ -90,9 +92,10 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
bc.serverAddr = broker.Spec.Address

bc.brokerConn = &brokerConnection{}
bc.brokerConn.exchangeName = "DefaultPeerRequest"
bc.brokerConn.announceExchangeName = "announcements_exchange"
bc.brokerConn.ruleExchangeName = "rules_exchange"

bc.brokerConn.outboundMsg, err = json.Marshal(bc.ID)
bc.brokerConn.outboundAnnounceMsg, err = json.Marshal(bc.ID)
if err != nil {
return err
}
Expand All @@ -113,13 +116,14 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
}

// Certificates.
bc.clientCert = &corev1.Secret{}
bc.rootCert = &corev1.Secret{}

klog.Infof("Root Secret Name: %s\n", broker.Spec.CaCert)
klog.Infof("Client Secret Name: %s\n", broker.Spec.ClCert)
secretNamespace := "fluidos"

bc.clientCert = &corev1.Secret{}
bc.rootCert = &corev1.Secret{}

err = bc.extractSecret(cl, broker.Spec.ClCert, secretNamespace, bc.clientCert)
if err != nil {
return err
Expand Down Expand Up @@ -160,12 +164,25 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
}

// Routing key for topic.
bc.brokerConn.routingKey, err = extractCNfromCert(&clientCert)
bc.brokerConn.queueName, err = extractCNfromCert(&clientCert)
if err != nil {
klog.Errorf("Common Name extraction error: %v", err)
}
bc.brokerConn.queueName = bc.brokerConn.routingKey

bc.brokerConn.outboundRuleMsg, err = json.Marshal(broker.Spec.Rule)
if err != nil {
klog.Errorf("Error reading rules JSON: %s", err)
}

// Set the metric to be sent to the broker.
bc.metric = broker.Spec.Metric
bc.brokerConn.outboundAnnounceMsg = nil
err = bc.buildOutboundMessage()
if err != nil {
klog.Errorf("Message building error: %v", err)
}

klog.Infof("outbound msg: %s\n", bc.brokerConn.outboundRuleMsg)
// TLS config.
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
Expand All @@ -174,48 +191,57 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
MinVersion: tls.VersionTLS12,
}

bc.clientName = bc.brokerConn.queueName

err = bc.brokerConnectionConfig(tlsConfig)

return err
}

// ExecuteBrokerClient executes the Network Manager Broker routines.
func (bc *BrokerClient) ExecuteBrokerClient(cl client.Client) error {
// Start sending messages
// Start sending announcement messages
klog.Info("executing broker client routines")
var err error
if bc.pubFlag {
go func() {
bc.publishOnBroker()
bc.publishOnBroker(bc.brokerConn.announceExchangeName, bc.brokerConn.outboundAnnounceMsg)
}()
}

// Start receiving messages
// Start receiving announcement messages
if bc.subFlag {
go func() {
if err = bc.readMsgOnBroker(bc.ctx, cl); err != nil {
klog.ErrorS(err, "error receiving advertisement")
}
}()
}

// Start sending rule messages
go func() {
bc.publishOnBroker(bc.brokerConn.ruleExchangeName, bc.brokerConn.outboundRuleMsg)
}()

return err
}

func (bc *BrokerClient) publishOnBroker() {
func (bc *BrokerClient) publishOnBroker(exchangeName string, message []byte) {
ticker := time.NewTicker(10 * time.Second)
for {
select {
case <-ticker.C:

// Pub on exchange
err := bc.brokerConn.amqpChan.Publish(
bc.brokerConn.exchangeName,
bc.brokerConn.routingKey,
true, // Mandatory: if not routable -> error
exchangeName,
"", // routingKey
false, // Mandatory: if not routable -> error
false, // Immediate
amqp.Publishing{
ContentType: "application/json",
Body: bc.brokerConn.outboundMsg,
UserId: bc.clientName,
Body: message,
Expiration: "30000", // TTL ms
})
if err != nil {
Expand All @@ -225,12 +251,12 @@ func (bc *BrokerClient) publishOnBroker() {
select {
case confirm := <-bc.brokerConn.confirms:
if confirm.Ack {
klog.Info("Message successfully published!")
klog.InfoS("Message successfully published on ", exchangeName)
} else {
klog.Info("Message failed to publish!")
klog.InfoS("Message failed to publish on ", exchangeName)
}
case <-time.After(5 * time.Second): // Timeout
klog.Info("No confirmation received, message status unknown.")
case <-time.After(15 * time.Second): // Timeout
klog.InfoS("No confirmation received, message status unknown from ", exchangeName)
}

case <-bc.ctx.Done():
Expand Down Expand Up @@ -309,7 +335,7 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error {
SASL: []amqp.Authentication{&amqp.ExternalAuth{}}, // auth EXTERNAL
TLSClientConfig: tlsConfig, // config TLS
Vhost: "/", // vhost
Heartbeat: 10 * time.Second, // heartbeat
Heartbeat: 5 * time.Second, // heartbeat
}

// Config connection
Expand Down Expand Up @@ -343,17 +369,16 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error {
return err
}

// Write confirm broker
// Write confirmations
if err := bc.brokerConn.amqpChan.Confirm(false); err != nil {
klog.Errorf("Failed to enable publisher confirms: %v", err)
klog.Errorf("Failed to enable publisher confirms for Announcements: %v", err)
return err
}

// Channels for write confirm
bc.brokerConn.confirms = bc.brokerConn.amqpChan.NotifyPublish(make(chan amqp.Confirmation, 1))

klog.InfoS("Node", "ID", bc.ID.NodeID, "Client Address", bc.ID.IP, "Server Address", bc.serverAddr, "RoutingKey", bc.brokerConn.routingKey)
// Channels for write confirmations
bc.brokerConn.confirms = bc.brokerConn.amqpChan.NotifyPublish(make(chan amqp.Confirmation, 5))

klog.InfoS("Node", "ID", bc.ID.NodeID, "Client Address", bc.ID.IP, "Server Address", bc.serverAddr /*, "RoutingKey" , bc.brokerConn.routingKey*/)
return nil
}

Expand All @@ -368,3 +393,25 @@ func (bc *BrokerClient) extractSecret(cl client.Client, secretName, secretNamesp
}
return nil
}

func (bc *BrokerClient) buildOutboundMessage() error {
var err error

var data map[string]interface{}
err = json.Unmarshal([]byte(bc.metric), &data)
if err != nil {
klog.Error("Error parsing JSON in builOutboundMessage:", err)
return err
}

data["ip"] = bc.ID.IP
data["domain"] = bc.ID.Domain
data["nodeID"] = bc.ID.NodeID

bc.brokerConn.outboundAnnounceMsg, err = json.Marshal(data)
if err != nil {
klog.Errorf("Error reading metric JSON: %s", err)
return err
}
return nil
}
9 changes: 9 additions & 0 deletions tools/scripts/broker-creation.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@ broker_ca_cert="null"
broker_client_cert="null"
broker_priv_key="null"
role="null"
rule_file="null"
metric_file="null"

read_input "Broker's name of your choice" "broker_name"
read_input "Broker server address (must match certificate CN)" "address"
read_input ".pem ROOT certificate" "broker_ca_cert"
read_input ".pem client certificate" "broker_client_cert"
read_input ".pem private key" "broker_priv_key"
read_input "Type the role: publisher | subscriber | both" "role"
read_input ".json file for RULE" "rule_file"
read_input ".json file for metrics" "metric_file"

rule_json=$(jq -c . "$rule_file" | sed 's/"/\\"/g')
metric_json=$(jq -c . "$metric_file" | sed 's/"/\\"/g')

broker_ca_secret="$broker_name"-ca-"$RANDOM"
broker_client_secret="$broker_name"-cl-"$RANDOM"
Expand Down Expand Up @@ -55,6 +62,8 @@ spec:
name: $broker_name
address: $address
role: $role
rule: "$rule_json"
metric: "$metric_json"
cacert: $broker_ca_secret
clcert: $broker_client_secret

Expand Down
Loading