Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Or to run a specific test in the suite:
go test ./... -run SomeTestFunction_name
```

**Note:** To run the sync integration tests, you must specify `INTTEST=1` in your environment and [AWS credentials](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials).
**Note:** To run the sync integration tests, you must specify `INTTEST=1` in your environment and [AWS credentials](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials). To run enterprise based integration tests, you must specify `INTTEST_ENTERPRISE=1` in your environment.

## Compatibility with Consul

Expand Down
2 changes: 1 addition & 1 deletion catalog/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (a *aws) remove(services map[string]service) int {
return count
}

func (a *aws) fetchIndefinetely(stop, stopped chan struct{}) {
func (a *aws) fetchIndefinitely(stop, stopped chan struct{}) {
defer close(stopped)
for {
err := a.fetch()
Expand Down
62 changes: 44 additions & 18 deletions catalog/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ const (
)

type consul struct {
client *api.Client
log hclog.Logger
consulPrefix string
awsPrefix string
services map[string]service
trigger chan bool
lock sync.RWMutex
toAWS bool
stale bool
client *api.Client
log hclog.Logger
consulPrefix string
awsPrefix string
services map[string]service
trigger chan bool
lock sync.RWMutex
toAWS bool
stale bool
namespace string
adminPartition string
}

func (c *consul) getServices() map[string]service {
Expand Down Expand Up @@ -153,7 +155,11 @@ func (c *consul) transformNodes(cnodes []*api.CatalogService) map[string]map[int
}

func (c *consul) fetchNodes(service string) ([]*api.CatalogService, error) {
opts := &api.QueryOptions{AllowStale: c.stale}
opts := &api.QueryOptions{
AllowStale: c.stale,
Namespace: c.namespace,
Partition: c.adminPartition,
}
nodes, _, err := c.client.Catalog().Service(service, "", opts)
if err != nil {
return nil, fmt.Errorf("error querying services, will retry: %s", err)
Expand All @@ -177,7 +183,11 @@ func (c *consul) transformHealth(chealths api.HealthChecks) map[string]health {
}

func (c *consul) fetchHealth(name string) (api.HealthChecks, error) {
opts := &api.QueryOptions{AllowStale: c.stale}
opts := &api.QueryOptions{
AllowStale: c.stale,
Namespace: c.namespace,
Partition: c.adminPartition,
}
status, _, err := c.client.Health().Checks(name, opts)
if err != nil {
return nil, fmt.Errorf("error querying health, will retry: %s", err)
Expand All @@ -187,6 +197,8 @@ func (c *consul) fetchHealth(name string) (api.HealthChecks, error) {

func (c *consul) fetchServices(waitIndex uint64) (map[string][]string, uint64, error) {
opts := &api.QueryOptions{
Namespace: c.namespace,
Partition: c.adminPartition,
AllowStale: c.stale,
WaitIndex: waitIndex,
WaitTime: WaitTime * time.Second,
Expand Down Expand Up @@ -222,6 +234,7 @@ func (c *consul) fetch(waitIndex uint64) (uint64, error) {
}
if s.fromAWS {
s.healths = c.rekeyHealths(s.name, s.healths)
id = strings.TrimPrefix(id, c.awsPrefix)
}
services[id] = s
}
Expand Down Expand Up @@ -258,7 +271,7 @@ func (c *consul) rekeyHealths(name string, healths map[string]health) map[string
return rekeyed
}

func (c *consul) fetchIndefinetely(stop, stopped chan struct{}) {
func (c *consul) fetchIndefinitely(stop, stopped chan struct{}) {
defer close(stopped)
waitIndex := uint64(1)
subsequentErrors := 0
Expand Down Expand Up @@ -306,11 +319,13 @@ func (c *consul) create(services map[string]service) int {
meta[ConsulAWSNS] = ns
meta[ConsulAWSID] = n.awsID
service := api.AgentService{
ID: id,
Service: name,
Tags: []string{ConsulAWSTag},
Address: h,
Meta: meta,
ID: id,
Service: name,
Tags: []string{ConsulAWSTag},
Address: h,
Meta: meta,
Namespace: c.namespace,
Partition: c.adminPartition,
}
if n.port != 0 {
service.Port = n.port
Expand All @@ -321,6 +336,7 @@ func (c *consul) create(services map[string]service) int {
NodeMeta: map[string]string{ConsulSourceKey: ConsulAWSTag},
SkipNodeUpdate: true,
Service: &service,
Partition: c.adminPartition,
}
_, err := c.client.Catalog().Register(&reg, nil)
if err != nil {
Expand Down Expand Up @@ -349,7 +365,10 @@ func (c *consul) create(services map[string]service) int {
Node: "consul-aws",
Name: "AWS Route53 Health Check",
Status: string(h),
Namespace: c.namespace,
Partition: c.adminPartition,
},
Partition: c.adminPartition,
}
_, err := c.client.Catalog().Register(&reg, nil)
if err != nil {
Expand All @@ -376,7 +395,14 @@ func (c *consul) remove(services map[string]service) int {
wg.Add(1)
go func(id string) {
defer wg.Done()
_, err := c.client.Catalog().Deregister(&api.CatalogDeregistration{Node: ConsulAWSNodeName, ServiceID: id}, nil)
deregistrationInput := &api.CatalogDeregistration{
Node: ConsulAWSNodeName,
ServiceID: id,
Namespace: c.namespace,
Partition: c.adminPartition,
}

_, err := c.client.Catalog().Deregister(deregistrationInput, nil)
if err != nil {
c.log.Error("cannot remove service", "error", err.Error())
} else {
Expand Down
55 changes: 38 additions & 17 deletions catalog/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,68 @@ import (
"github.com/hashicorp/go-hclog"
)

type SyncInput struct {
ToAWS bool
ToConsul bool

ConsulPrefix string
ConsulNamespace string
ConsulAdminPartition string

AWSNamespaceID string
AWSPrefix string
AWSPullInterval string
AWSDNSTTL int64

Stale bool

AWSClient *sd.ServiceDiscovery
ConsulClient *api.Client
}

// Sync aws->consul and vice versa.
func Sync(toAWS, toConsul bool, namespaceID, consulPrefix, awsPrefix, awsPullInterval string, awsDNSTTL int64, stale bool, awsClient *sd.ServiceDiscovery, consulClient *api.Client, stop, stopped chan struct{}) {
func Sync(input *SyncInput, stop, stopped chan struct{}) {
defer close(stopped)
log := hclog.Default().Named("sync")
consul := consul{
client: consulClient,
log: hclog.Default().Named("consul"),
trigger: make(chan bool, 1),
consulPrefix: consulPrefix,
awsPrefix: awsPrefix,
toAWS: toAWS,
stale: stale,
client: input.ConsulClient,
log: hclog.Default().Named("consul"),
trigger: make(chan bool, 1),
consulPrefix: input.ConsulPrefix,
awsPrefix: input.AWSPrefix,
toAWS: input.ToAWS,
stale: input.Stale,
namespace: input.ConsulNamespace,
adminPartition: input.ConsulAdminPartition,
}
pullInterval, err := time.ParseDuration(awsPullInterval)
pullInterval, err := time.ParseDuration(input.AWSPullInterval)
if err != nil {
log.Error("cannot parse aws pull interval", "error", err)
return
}
aws := aws{
client: awsClient,
client: input.AWSClient,
log: hclog.Default().Named("aws"),
trigger: make(chan bool, 1),
consulPrefix: consulPrefix,
awsPrefix: awsPrefix,
toConsul: toConsul,
consulPrefix: input.ConsulPrefix,
awsPrefix: input.AWSPrefix,
toConsul: input.ToConsul,
pullInterval: pullInterval,
dnsTTL: awsDNSTTL,
dnsTTL: input.AWSDNSTTL,
}

err = aws.setupNamespace(namespaceID)
err = aws.setupNamespace(input.AWSNamespaceID)
if err != nil {
log.Error("cannot setup namespace", "error", err)
return
}

fetchConsulStop := make(chan struct{})
fetchConsulStopped := make(chan struct{})
go consul.fetchIndefinetely(fetchConsulStop, fetchConsulStopped)
go consul.fetchIndefinitely(fetchConsulStop, fetchConsulStopped)
fetchAWSStop := make(chan struct{})
fetchAWSStopped := make(chan struct{})
go aws.fetchIndefinetely(fetchAWSStop, fetchAWSStopped)
go aws.fetchIndefinitely(fetchAWSStop, fetchAWSStopped)

toConsulStop := make(chan struct{})
toConsulStopped := make(chan struct{})
Expand Down
18 changes: 18 additions & 0 deletions catalog/sync_enterprise_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package catalog

import (
"os"
"testing"
)

func TestSyncEnterprise(t *testing.T) {
if len(os.Getenv("INTTEST_ENTERPRISE")) == 0 {
t.Skip("Set INTTEST_ENTERPRISE=1 to enable integration tests targetting consul enterprise")
}
awsNamespaceID := os.Getenv("NAMESPACEID")
if len(awsNamespaceID) == 0 {
awsNamespaceID = "ns-n5qqli2346hqood4"
}

runSyncTest(t, awsNamespaceID, "test-partition", "test-namespace")
}
Loading