Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion catalog/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func (a *aws) create(services map[string]service) int {
input.DnsConfig = &sd.DnsConfig{
DnsRecords: []sd.DnsRecord{
{TTL: &a.dnsTTL, Type: sd.RecordTypeSrv},
{TTL: &a.dnsTTL, Type: sd.RecordTypeA},
},
}
}
Expand All @@ -363,7 +364,7 @@ func (a *aws) create(services map[string]service) int {
wg.Add(1)
go func(serviceID, name, h string, n node) {
wg.Done()
instanceID := id(serviceID, h, n.port)
instanceID := n.name
attributes := n.attributes
attributes["AWS_INSTANCE_IPV4"] = h
attributes["AWS_INSTANCE_PORT"] = fmt.Sprintf("%d", n.port)
Expand Down
65 changes: 39 additions & 26 deletions catalog/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,23 +138,22 @@ func (c *consul) sync(aws *aws, stop, stopped chan struct{}) {
func (c *consul) transformNodes(cnodes []*api.CatalogService) map[string]map[int]node {
nodes := map[string]map[int]node{}
for _, n := range cnodes {
address := n.ServiceAddress
if len(address) == 0 {
address = n.Address
}
// use Address instead of ServiceAddress; RabbitMQ updates the service
// address to be its internal DNS instead which breaks stuff
address := n.Address
if nodes[address] == nil {
nodes[address] = map[int]node{}
}
ports := nodes[address]
ports[n.ServicePort] = node{port: n.ServicePort, host: address, consulID: n.ServiceID, awsID: n.ServiceMeta[ConsulAWSID], attributes: n.ServiceMeta}
ports[n.ServicePort] = node{name: n.Node, port: n.ServicePort, host: address, consulID: n.ServiceID, awsID: n.ServiceMeta[ConsulAWSID], attributes: n.ServiceMeta}
nodes[address] = ports
}
return nodes
}

func (c *consul) fetchNodes(service string) ([]*api.CatalogService, error) {
func (c *consul) fetchNodes(service string, tag string) ([]*api.CatalogService, error) {
opts := &api.QueryOptions{AllowStale: c.stale}
nodes, _, err := c.client.Catalog().Service(service, "", opts)
nodes, _, err := c.client.Catalog().Service(service, tag, opts)
if err != nil {
return nil, fmt.Errorf("error querying services, will retry: %s", err)
}
Expand Down Expand Up @@ -203,36 +202,50 @@ func (c *consul) fetch(waitIndex uint64) (uint64, error) {
if err != nil {
return waitIndex, fmt.Errorf("error fetching services: %s", err)
}
services := c.transformServices(cservices)
services := map[string]service{}
for id, s := range c.transformServices(cservices) {
if s.fromAWS {
id = c.awsPrefix + id
}
if cnodes, err := c.fetchNodes(id); err == nil {
s.nodes = c.transformNodes(cnodes)
} else {
c.log.Error("error fetching nodes", "error", err)
continue
}
if chealths, err := c.fetchHealth(id); err == nil {
s.healths = c.transformHealth(chealths)
if len(s.tags) == 0 {
services[id] = c.mapServiceWithTag(id, s, "")
} else {
// TODO (hans): decide what to do when health errors
c.log.Error("error fetching health", "error", err)
}
if s.fromAWS {
s.healths = c.rekeyHealths(s.name, s.healths)
for _, tag := range s.tags {
services[tag+"."+id] = c.mapServiceWithTag(id, s, tag)
}
}
services[id] = s
}
c.setServices(services)
return waitIndex, nil
}

func (c *consul) mapServiceWithTag(id string, s service, tag string) service {
if s.fromAWS {
id = c.awsPrefix + id
}

if cnodes, err := c.fetchNodes(id, tag); err == nil {
s.nodes = c.transformNodes(cnodes)
} else {
c.log.Error("error fetching nodes", "error", err)
return s
}

if chealths, err := c.fetchHealth(id); err == nil {
s.healths = c.transformHealth(chealths)
} else {
// TODO (hans): decide what to do when health errors
c.log.Error("error fetching health", "error", err)
}

if s.fromAWS {
s.healths = c.rekeyHealths(s.name, s.healths)
}

return s
}

func (c *consul) transformServices(cservices map[string][]string) map[string]service {
services := make(map[string]service, len(cservices))
for k, tags := range cservices {
s := service{id: k, name: k, consulID: k}
s := service{id: k, name: k, consulID: k, tags: tags}
for _, t := range tags {
if t == ConsulAWSTag {
s.fromAWS = true
Expand Down
2 changes: 2 additions & 0 deletions catalog/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ type service struct {
awsID string
consulID string
awsNamespace string
tags []string
}

type node struct {
name string
port int
host string
awsID string
Expand Down