From 34430536e68384c998ee352d04eb65a3526c4c50 Mon Sep 17 00:00:00 2001 From: clarkdave Date: Tue, 28 Nov 2023 13:46:53 +0000 Subject: [PATCH 1/3] use node Address instead of ServiceAddress our rabbitmq consul setup likes to set the ServiceAddress to be its own node name, which is the internal dns record. this breaks the expectations of cloud map, which wants IP addresses we can use Node.Address here instead which will be the IP address of the node, and therefore the service address too (we don't have multiple network interfaces for our nodes) this also changes the service instance ID to the node name, which we know is unique and is easier to identify --- catalog/aws.go | 2 +- catalog/consul.go | 9 ++++----- catalog/service.go | 1 + 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/catalog/aws.go b/catalog/aws.go index cde2fc2..d2fe11f 100644 --- a/catalog/aws.go +++ b/catalog/aws.go @@ -363,7 +363,7 @@ func (a *aws) create(services map[string]service) int { wg.Add(1) go func(serviceID, name, h string, n node) { wg.Done() - instanceID := id(serviceID, h, n.port) + instanceID := n.name attributes := n.attributes attributes["AWS_INSTANCE_IPV4"] = h attributes["AWS_INSTANCE_PORT"] = fmt.Sprintf("%d", n.port) diff --git a/catalog/consul.go b/catalog/consul.go index c010f1b..1968dce 100644 --- a/catalog/consul.go +++ b/catalog/consul.go @@ -138,15 +138,14 @@ func (c *consul) sync(aws *aws, stop, stopped chan struct{}) { func (c *consul) transformNodes(cnodes []*api.CatalogService) map[string]map[int]node { nodes := map[string]map[int]node{} for _, n := range cnodes { - address := n.ServiceAddress - if len(address) == 0 { - address = n.Address - } + // use Address instead of ServiceAddress; RabbitMQ updates the service + // address to be its internal DNS instead which breaks stuff + address := n.Address if nodes[address] == nil { nodes[address] = map[int]node{} } ports := nodes[address] - ports[n.ServicePort] = node{port: n.ServicePort, host: address, consulID: n.ServiceID, awsID: n.ServiceMeta[ConsulAWSID], attributes: n.ServiceMeta} + ports[n.ServicePort] = node{name: n.Node, port: n.ServicePort, host: address, consulID: n.ServiceID, awsID: n.ServiceMeta[ConsulAWSID], attributes: n.ServiceMeta} nodes[address] = ports } return nodes diff --git a/catalog/service.go b/catalog/service.go index b0bc966..315cfb2 100644 --- a/catalog/service.go +++ b/catalog/service.go @@ -30,6 +30,7 @@ type service struct { } type node struct { + name string port int host string awsID string From 9eed2957531e335d15589f00aac342cb7ea53c0e Mon Sep 17 00:00:00 2001 From: clarkdave Date: Tue, 28 Nov 2023 14:13:11 +0000 Subject: [PATCH 2/3] include tags in service names by default, consul-aws only creates 1:1 services with those registered in consul, but we leverage tags to distinguish between difference kinds of service instances (e.g. primary and replica database nodes) this change takes tags into account when creating the list of services, resulting in one service per tag+service combination --- catalog/consul.go | 56 +++++++++++++++++++++++++++++----------------- catalog/service.go | 1 + 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/catalog/consul.go b/catalog/consul.go index 1968dce..5174f64 100644 --- a/catalog/consul.go +++ b/catalog/consul.go @@ -151,9 +151,9 @@ func (c *consul) transformNodes(cnodes []*api.CatalogService) map[string]map[int return nodes } -func (c *consul) fetchNodes(service string) ([]*api.CatalogService, error) { +func (c *consul) fetchNodes(service string, tag string) ([]*api.CatalogService, error) { opts := &api.QueryOptions{AllowStale: c.stale} - nodes, _, err := c.client.Catalog().Service(service, "", opts) + nodes, _, err := c.client.Catalog().Service(service, tag, opts) if err != nil { return nil, fmt.Errorf("error querying services, will retry: %s", err) } @@ -202,36 +202,50 @@ func (c *consul) fetch(waitIndex uint64) (uint64, error) { if err != nil { return waitIndex, fmt.Errorf("error fetching services: %s", err) } - services := c.transformServices(cservices) + services := map[string]service{} for id, s := range c.transformServices(cservices) { - if s.fromAWS { - id = c.awsPrefix + id - } - if cnodes, err := c.fetchNodes(id); err == nil { - s.nodes = c.transformNodes(cnodes) - } else { - c.log.Error("error fetching nodes", "error", err) - continue - } - if chealths, err := c.fetchHealth(id); err == nil { - s.healths = c.transformHealth(chealths) + if len(s.tags) == 0 { + services[id] = c.mapServiceWithTag(id, s, "") } else { - // TODO (hans): decide what to do when health errors - c.log.Error("error fetching health", "error", err) - } - if s.fromAWS { - s.healths = c.rekeyHealths(s.name, s.healths) + for _, tag := range s.tags { + services[tag+"."+id] = c.mapServiceWithTag(id, s, tag) + } } - services[id] = s } c.setServices(services) return waitIndex, nil } +func (c *consul) mapServiceWithTag(id string, s service, tag string) service { + if s.fromAWS { + id = c.awsPrefix + id + } + + if cnodes, err := c.fetchNodes(id, tag); err == nil { + s.nodes = c.transformNodes(cnodes) + } else { + c.log.Error("error fetching nodes", "error", err) + return s + } + + if chealths, err := c.fetchHealth(id); err == nil { + s.healths = c.transformHealth(chealths) + } else { + // TODO (hans): decide what to do when health errors + c.log.Error("error fetching health", "error", err) + } + + if s.fromAWS { + s.healths = c.rekeyHealths(s.name, s.healths) + } + + return s +} + func (c *consul) transformServices(cservices map[string][]string) map[string]service { services := make(map[string]service, len(cservices)) for k, tags := range cservices { - s := service{id: k, name: k, consulID: k} + s := service{id: k, name: k, consulID: k, tags: tags} for _, t := range tags { if t == ConsulAWSTag { s.fromAWS = true diff --git a/catalog/service.go b/catalog/service.go index 315cfb2..1cbb672 100644 --- a/catalog/service.go +++ b/catalog/service.go @@ -27,6 +27,7 @@ type service struct { awsID string consulID string awsNamespace string + tags []string } type node struct { From 9559df6184fdc78a8261db5bd9e30619840f48f0 Mon Sep 17 00:00:00 2001 From: clarkdave Date: Tue, 28 Nov 2023 14:40:16 +0000 Subject: [PATCH 3/3] create a records in cloud map in addition to srv records --- catalog/aws.go | 1 + 1 file changed, 1 insertion(+) diff --git a/catalog/aws.go b/catalog/aws.go index d2fe11f..5e53335 100644 --- a/catalog/aws.go +++ b/catalog/aws.go @@ -339,6 +339,7 @@ func (a *aws) create(services map[string]service) int { input.DnsConfig = &sd.DnsConfig{ DnsRecords: []sd.DnsRecord{ {TTL: &a.dnsTTL, Type: sd.RecordTypeSrv}, + {TTL: &a.dnsTTL, Type: sd.RecordTypeA}, }, } }