diff --git a/catalog/aws.go b/catalog/aws.go index cde2fc2..5e53335 100644 --- a/catalog/aws.go +++ b/catalog/aws.go @@ -339,6 +339,7 @@ func (a *aws) create(services map[string]service) int { input.DnsConfig = &sd.DnsConfig{ DnsRecords: []sd.DnsRecord{ {TTL: &a.dnsTTL, Type: sd.RecordTypeSrv}, + {TTL: &a.dnsTTL, Type: sd.RecordTypeA}, }, } } @@ -363,7 +364,7 @@ func (a *aws) create(services map[string]service) int { wg.Add(1) go func(serviceID, name, h string, n node) { wg.Done() - instanceID := id(serviceID, h, n.port) + instanceID := n.name attributes := n.attributes attributes["AWS_INSTANCE_IPV4"] = h attributes["AWS_INSTANCE_PORT"] = fmt.Sprintf("%d", n.port) diff --git a/catalog/consul.go b/catalog/consul.go index c010f1b..5174f64 100644 --- a/catalog/consul.go +++ b/catalog/consul.go @@ -138,23 +138,22 @@ func (c *consul) sync(aws *aws, stop, stopped chan struct{}) { func (c *consul) transformNodes(cnodes []*api.CatalogService) map[string]map[int]node { nodes := map[string]map[int]node{} for _, n := range cnodes { - address := n.ServiceAddress - if len(address) == 0 { - address = n.Address - } + // use Address instead of ServiceAddress; RabbitMQ updates the service + // address to be its internal DNS instead which breaks stuff + address := n.Address if nodes[address] == nil { nodes[address] = map[int]node{} } ports := nodes[address] - ports[n.ServicePort] = node{port: n.ServicePort, host: address, consulID: n.ServiceID, awsID: n.ServiceMeta[ConsulAWSID], attributes: n.ServiceMeta} + ports[n.ServicePort] = node{name: n.Node, port: n.ServicePort, host: address, consulID: n.ServiceID, awsID: n.ServiceMeta[ConsulAWSID], attributes: n.ServiceMeta} nodes[address] = ports } return nodes } -func (c *consul) fetchNodes(service string) ([]*api.CatalogService, error) { +func (c *consul) fetchNodes(service string, tag string) ([]*api.CatalogService, error) { opts := &api.QueryOptions{AllowStale: c.stale} - nodes, _, err := c.client.Catalog().Service(service, "", opts) + nodes, _, err := c.client.Catalog().Service(service, tag, opts) if err != nil { return nil, fmt.Errorf("error querying services, will retry: %s", err) } @@ -203,36 +202,50 @@ func (c *consul) fetch(waitIndex uint64) (uint64, error) { if err != nil { return waitIndex, fmt.Errorf("error fetching services: %s", err) } - services := c.transformServices(cservices) + services := map[string]service{} for id, s := range c.transformServices(cservices) { - if s.fromAWS { - id = c.awsPrefix + id - } - if cnodes, err := c.fetchNodes(id); err == nil { - s.nodes = c.transformNodes(cnodes) - } else { - c.log.Error("error fetching nodes", "error", err) - continue - } - if chealths, err := c.fetchHealth(id); err == nil { - s.healths = c.transformHealth(chealths) + if len(s.tags) == 0 { + services[id] = c.mapServiceWithTag(id, s, "") } else { - // TODO (hans): decide what to do when health errors - c.log.Error("error fetching health", "error", err) - } - if s.fromAWS { - s.healths = c.rekeyHealths(s.name, s.healths) + for _, tag := range s.tags { + services[tag+"."+id] = c.mapServiceWithTag(id, s, tag) + } } - services[id] = s } c.setServices(services) return waitIndex, nil } +func (c *consul) mapServiceWithTag(id string, s service, tag string) service { + if s.fromAWS { + id = c.awsPrefix + id + } + + if cnodes, err := c.fetchNodes(id, tag); err == nil { + s.nodes = c.transformNodes(cnodes) + } else { + c.log.Error("error fetching nodes", "error", err) + return s + } + + if chealths, err := c.fetchHealth(id); err == nil { + s.healths = c.transformHealth(chealths) + } else { + // TODO (hans): decide what to do when health errors + c.log.Error("error fetching health", "error", err) + } + + if s.fromAWS { + s.healths = c.rekeyHealths(s.name, s.healths) + } + + return s +} + func (c *consul) transformServices(cservices map[string][]string) map[string]service { services := make(map[string]service, len(cservices)) for k, tags := range cservices { - s := service{id: k, name: k, consulID: k} + s := service{id: k, name: k, consulID: k, tags: tags} for _, t := range tags { if t == ConsulAWSTag { s.fromAWS = true diff --git a/catalog/service.go b/catalog/service.go index b0bc966..1cbb672 100644 --- a/catalog/service.go +++ b/catalog/service.go @@ -27,9 +27,11 @@ type service struct { awsID string consulID string awsNamespace string + tags []string } type node struct { + name string port int host string awsID string