From 0dad63668e40fdc8a04e8c765bb5432e2ae51142 Mon Sep 17 00:00:00 2001 From: Hoang Ngo Date: Sun, 15 Mar 2026 16:36:37 +0700 Subject: [PATCH 1/2] feat: implement necessary methods for interacting with elb resources Signed-off-by: Hoang Ngo --- pkg/app/pipedv1/plugin/ecs/go.mod | 1 + pkg/app/pipedv1/plugin/ecs/go.sum | 2 + pkg/app/pipedv1/plugin/ecs/provider/client.go | 111 ++++++++++++++ pkg/app/pipedv1/plugin/ecs/provider/ecs.go | 7 + .../plugin/ecs/provider/routing_traffic.go | 48 +++++++ .../ecs/provider/routing_traffic_test.go | 135 ++++++++++++++++++ 6 files changed, 304 insertions(+) create mode 100644 pkg/app/pipedv1/plugin/ecs/provider/routing_traffic.go create mode 100644 pkg/app/pipedv1/plugin/ecs/provider/routing_traffic_test.go diff --git a/pkg/app/pipedv1/plugin/ecs/go.mod b/pkg/app/pipedv1/plugin/ecs/go.mod index e31eac37e4..544793c010 100644 --- a/pkg/app/pipedv1/plugin/ecs/go.mod +++ b/pkg/app/pipedv1/plugin/ecs/go.mod @@ -7,6 +7,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.27.38 github.com/aws/aws-sdk-go-v2/credentials v1.17.36 github.com/aws/aws-sdk-go-v2/service/ecs v1.46.2 + github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.38.2 github.com/creasty/defaults v1.6.0 github.com/go-playground/assert/v2 v2.2.0 github.com/pipe-cd/pipecd v0.54.0-rc1.0.20250912082650-0b949bb7aac9 diff --git a/pkg/app/pipedv1/plugin/ecs/go.sum b/pkg/app/pipedv1/plugin/ecs/go.sum index 34c9de9ba6..75ed8f3ff7 100644 --- a/pkg/app/pipedv1/plugin/ecs/go.sum +++ b/pkg/app/pipedv1/plugin/ecs/go.sum @@ -64,6 +64,8 @@ github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvK github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= github.com/aws/aws-sdk-go-v2/service/ecs v1.46.2 h1:mC8vCpzGYi87z5Ot+LcIU7rpabkX88os9ZvtelIhHu0= github.com/aws/aws-sdk-go-v2/service/ecs v1.46.2/go.mod h1:/IMvyX4u5s4Ed0kzD+vWdPK92zm/q4CN1afJeDCsdhE= +github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.38.2 h1:0pVeGkp7MqM3k3Il75hA6xI2USdkjaUv58SXJwvFIGY= +github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.38.2/go.mod h1:V/sx2Ja18AlrvTGQsilx8CAH0CPm+hpKdT9RbSpceik= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 h1:QFASJGfT8wMXtuP3D5CRmMjARHv9ZmzFUMJznHDOY3w= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5/go.mod h1:QdZ3OmoIjSX+8D1OPAzPxDfjXASbBMDsz9qvtyIhtik= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 h1:Xbwbmk44URTiHNx6PNo0ujDE6ERlsCKJD3u1zfnzAPg= diff --git a/pkg/app/pipedv1/plugin/ecs/provider/client.go b/pkg/app/pipedv1/plugin/ecs/provider/client.go index 42a7b2f09a..5b74816294 100644 --- a/pkg/app/pipedv1/plugin/ecs/provider/client.go +++ b/pkg/app/pipedv1/plugin/ecs/provider/client.go @@ -24,6 +24,8 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials/stscreds" "github.com/aws/aws-sdk-go-v2/service/ecs" "github.com/aws/aws-sdk-go-v2/service/ecs/types" + "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2" + elbtypes "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types" "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider" "github.com/pipe-cd/pipecd/pkg/backoff" @@ -43,6 +45,7 @@ const ( type client struct { ecsClient *ecs.Client + elbClient *elasticloadbalancingv2.Client } func newClient(region, profile, credentialsFile, roleARN, tokenPath string) (Client, error) { @@ -71,10 +74,118 @@ func newClient(region, profile, credentialsFile, roleARN, tokenPath string) (Cli return nil, fmt.Errorf("failed to load config to create ecs client: %w", err) } c.ecsClient = ecs.NewFromConfig(cfg) + c.elbClient = elasticloadbalancingv2.NewFromConfig(cfg) return c, nil } +func (c *client) GetListenerArns(ctx context.Context, targetGroup types.LoadBalancer) ([]string, error) { + loadBalancerArn, err := c.getLoadBalancerArn(ctx, *targetGroup.TargetGroupArn) + if err != nil { + return nil, err + } + + output, err := c.elbClient.DescribeListeners(ctx, &elasticloadbalancingv2.DescribeListenersInput{ + LoadBalancerArn: aws.String(loadBalancerArn), + }) + if err != nil { + return nil, fmt.Errorf("failed to describe listeners for load balancer %s: %w", loadBalancerArn, err) + } + if len(output.Listeners) == 0 { + return nil, platformprovider.ErrNotFound + } + + arns := make([]string, len(output.Listeners)) + for i := range output.Listeners { + arns[i] = *output.Listeners[i].ListenerArn + } + return arns, nil +} + +func (c *client) getLoadBalancerArn(ctx context.Context, targetGroupArn string) (string, error) { + output, err := c.elbClient.DescribeTargetGroups(ctx, &elasticloadbalancingv2.DescribeTargetGroupsInput{ + TargetGroupArns: []string{targetGroupArn}, + }) + if err != nil { + return "", fmt.Errorf("failed to describe target group %s: %w", targetGroupArn, err) + } + if len(output.TargetGroups) == 0 { + return "", platformprovider.ErrNotFound + } + if len(output.TargetGroups[0].LoadBalancerArns) == 0 { + return "", fmt.Errorf("target group %s is not attached to any load balancer", targetGroupArn) + } + // Only support target groups that serve traffic from one load balancer. + return output.TargetGroups[0].LoadBalancerArns[0], nil +} + +func (c *client) ModifyListeners(ctx context.Context, listenerArns []string, routingTrafficCfg RoutingTrafficConfig) ([]string, error) { + if len(routingTrafficCfg) != 2 { + return nil, fmt.Errorf("invalid routing config: requires exactly 2 target groups") + } + + modifiedRuleArns := make([]string, 0) + + for _, listenerArn := range listenerArns { + describeRulesOutput, err := c.elbClient.DescribeRules(ctx, &elasticloadbalancingv2.DescribeRulesInput{ + ListenerArn: aws.String(listenerArn), + }) + if err != nil { + return modifiedRuleArns, fmt.Errorf("failed to describe rules of listener %s: %w", listenerArn, err) + } + + for _, rule := range describeRulesOutput.Rules { + modifiedActions := make([]elbtypes.Action, 0, len(rule.Actions)) + for _, action := range rule.Actions { + if action.Type == elbtypes.ActionTypeEnumForward && + action.ForwardConfig != nil && + routingTrafficCfg.hasSameTargets(action.ForwardConfig.TargetGroups) { + modifiedActions = append(modifiedActions, elbtypes.Action{ + Type: elbtypes.ActionTypeEnumForward, + Order: action.Order, + ForwardConfig: &elbtypes.ForwardActionConfig{ + TargetGroups: []elbtypes.TargetGroupTuple{ + { + TargetGroupArn: aws.String(routingTrafficCfg[0].TargetGroupArn), + Weight: aws.Int32(int32(routingTrafficCfg[0].Weight)), + }, + { + TargetGroupArn: aws.String(routingTrafficCfg[1].TargetGroupArn), + Weight: aws.Int32(int32(routingTrafficCfg[1].Weight)), + }, + }, + }, + }) + } else { + modifiedActions = append(modifiedActions, action) + } + } + + if aws.ToBool(rule.IsDefault) { + _, err := c.elbClient.ModifyListener(ctx, &elasticloadbalancingv2.ModifyListenerInput{ + ListenerArn: &listenerArn, + DefaultActions: modifiedActions, + }) + if err != nil { + return modifiedRuleArns, fmt.Errorf("failed to modify default rule of listener %s: %w", listenerArn, err) + } + modifiedRuleArns = append(modifiedRuleArns, fmt.Sprintf("default rule of listener %s", listenerArn)) + } else { + _, err := c.elbClient.ModifyRule(ctx, &elasticloadbalancingv2.ModifyRuleInput{ + RuleArn: rule.RuleArn, + Actions: modifiedActions, + }) + if err != nil { + return modifiedRuleArns, fmt.Errorf("failed to modify rule %s: %w", *rule.RuleArn, err) + } + modifiedRuleArns = append(modifiedRuleArns, *rule.RuleArn) + } + } + } + + return modifiedRuleArns, nil +} + func (c *client) CreateService(ctx context.Context, service types.Service) (*types.Service, error) { if service.DeploymentController == nil || service.DeploymentController.Type != types.DeploymentControllerTypeExternal { return nil, fmt.Errorf("failed to create ECS service %s: deployment controller of type EXTERNAL is required", *service.ServiceName) diff --git a/pkg/app/pipedv1/plugin/ecs/provider/ecs.go b/pkg/app/pipedv1/plugin/ecs/provider/ecs.go index a693dd54b8..3045fa59ae 100644 --- a/pkg/app/pipedv1/plugin/ecs/provider/ecs.go +++ b/pkg/app/pipedv1/plugin/ecs/provider/ecs.go @@ -37,6 +37,7 @@ const ( type Client interface { ECS + ELB } // ECS defines methods for interacting with ECS resources. @@ -59,6 +60,12 @@ type ECS interface { UntagResource(ctx context.Context, resourceArn string, tagKeys []string) error } +// ELB defines methods for interacting with ELB resources. +type ELB interface { + GetListenerArns(ctx context.Context, targetGroup types.LoadBalancer) ([]string, error) + ModifyListeners(ctx context.Context, listenerArns []string, routingTrafficCfg RoutingTrafficConfig) ([]string, error) +} + // LoadTaskDefinition returns TaskDefinition object from a given task definition file. func LoadTaskDefinition(appDir, taskDefinition string) (types.TaskDefinition, error) { path := filepath.Join(appDir, taskDefinition) diff --git a/pkg/app/pipedv1/plugin/ecs/provider/routing_traffic.go b/pkg/app/pipedv1/plugin/ecs/provider/routing_traffic.go new file mode 100644 index 0000000000..b1872a2344 --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/provider/routing_traffic.go @@ -0,0 +1,48 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package provider + +import ( + "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types" +) + +type RoutingTrafficConfig []targetGroupWeight + +type targetGroupWeight struct { + TargetGroupArn string + Weight int +} + +func (c RoutingTrafficConfig) hasSameTargets(forwardActionTargets []types.TargetGroupTuple) bool { + if len(c) != len(forwardActionTargets) { + return false + } + + cMap := make(map[string]struct{}) + for _, item := range c { + cMap[item.TargetGroupArn] = struct{}{} + } + + for _, target := range forwardActionTargets { + if target.TargetGroupArn == nil { + return false + } + if _, ok := cMap[*target.TargetGroupArn]; !ok { + return false + } + } + + return true +} diff --git a/pkg/app/pipedv1/plugin/ecs/provider/routing_traffic_test.go b/pkg/app/pipedv1/plugin/ecs/provider/routing_traffic_test.go new file mode 100644 index 0000000000..4522924b6a --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/provider/routing_traffic_test.go @@ -0,0 +1,135 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package provider + +import ( + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2/types" + "github.com/stretchr/testify/assert" +) + +func TestHasSameTargets(t *testing.T) { + t.Parallel() + + testcases := []struct { + name string + cfg RoutingTrafficConfig + actionTargets []types.TargetGroupTuple + expected bool + }{ + { + name: "has the same target groups in the same order", + cfg: RoutingTrafficConfig{ + { + TargetGroupArn: "arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy1", + Weight: 100, + }, + { + TargetGroupArn: "arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy2", + Weight: 0, + }, + }, + actionTargets: []types.TargetGroupTuple{ + { + TargetGroupArn: aws.String("arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy1"), + Weight: aws.Int32(100), + }, + { + TargetGroupArn: aws.String("arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy2"), + Weight: aws.Int32(0), + }, + }, + expected: true, + }, + { + name: "has the same target groups in the different order", + cfg: RoutingTrafficConfig{ + { + TargetGroupArn: "arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy1", + Weight: 100, + }, + { + TargetGroupArn: "arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy2", + Weight: 0, + }, + }, + actionTargets: []types.TargetGroupTuple{ + { + TargetGroupArn: aws.String("arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy2"), + Weight: aws.Int32(0), + }, + { + TargetGroupArn: aws.String("arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy1"), + Weight: aws.Int32(100), + }, + }, + expected: true, + }, + { + name: "the number of target groups are different", + cfg: RoutingTrafficConfig{ + { + TargetGroupArn: "arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy1", + Weight: 100, + }, + }, + actionTargets: []types.TargetGroupTuple{ + { + TargetGroupArn: aws.String("arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy1"), + Weight: aws.Int32(0), + }, + { + TargetGroupArn: aws.String("arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy2"), + Weight: aws.Int32(100), + }, + }, + expected: false, + }, + { + name: "has a different target group", + cfg: RoutingTrafficConfig{ + { + TargetGroupArn: "arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy1", + Weight: 100, + }, + { + TargetGroupArn: "arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy2", + Weight: 0, + }, + }, + actionTargets: []types.TargetGroupTuple{ + { + TargetGroupArn: aws.String("arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy1"), + Weight: aws.Int32(0), + }, + { + TargetGroupArn: aws.String("arn:aws:elasticloadbalancing:::targetgroup/xxx/yyy3"), + Weight: aws.Int32(100), + }, + }, + expected: false, + }, + } + + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + hasSame := tc.cfg.hasSameTargets(tc.actionTargets) + assert.Equal(t, tc.expected, hasSame) + }) + } +} From c8937ccd3046f97249bea51a3b1fea38050b9501 Mon Sep 17 00:00:00 2001 From: Hoang Ngo Date: Sun, 15 Mar 2026 16:37:03 +0700 Subject: [PATCH 2/2] feat: implement traffic routing stage Signed-off-by: Hoang Ngo --- pkg/app/pipedv1/plugin/ecs/config/traffic.go | 37 +++ .../pipedv1/plugin/ecs/deployment/plugin.go | 4 + .../plugin/ecs/deployment/test_helper.go | 8 + .../pipedv1/plugin/ecs/deployment/traffic.go | 161 +++++++++++ .../plugin/ecs/deployment/traffic_test.go | 256 ++++++++++++++++++ 5 files changed, 466 insertions(+) create mode 100644 pkg/app/pipedv1/plugin/ecs/config/traffic.go create mode 100644 pkg/app/pipedv1/plugin/ecs/deployment/traffic.go create mode 100644 pkg/app/pipedv1/plugin/ecs/deployment/traffic_test.go diff --git a/pkg/app/pipedv1/plugin/ecs/config/traffic.go b/pkg/app/pipedv1/plugin/ecs/config/traffic.go new file mode 100644 index 0000000000..1e9714ce8b --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/config/traffic.go @@ -0,0 +1,37 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +// ECSTrafficRoutingStageOptions contains all configurable values for an ECS_TRAFFIC_ROUTING stage. +type ECSTrafficRoutingStageOptions struct { + // Canary represents the percentage of traffic to route to the canary variant. + // If set, primary will be 100 - canary. + Canary int `json:"canary,omitempty"` + // Primary represents the percentage of traffic to route to the primary variant. + // If set, canary will be 100 - primary. + Primary int `json:"primary,omitempty"` +} + +// Percentages returns the traffic split between primary and canary. +// If neither is set, primary gets 100% by default. +func (opts ECSTrafficRoutingStageOptions) Percentages() (primary, canary int) { + if opts.Primary > 0 && opts.Primary <= 100 { + return opts.Primary, 100 - opts.Primary + } + if opts.Canary > 0 && opts.Canary <= 100 { + return 100 - opts.Canary, opts.Canary + } + return 100, 0 +} diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/plugin.go b/pkg/app/pipedv1/plugin/ecs/deployment/plugin.go index aaa819da44..4f3b04ba11 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/plugin.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/plugin.go @@ -81,6 +81,10 @@ func (p *ECSPlugin) ExecuteStage( return &sdk.ExecuteStageResponse{ Status: p.executeECSCanaryRolloutStage(ctx, input, deployTargets[0]), }, nil + case StageECSTrafficRouting: + return &sdk.ExecuteStageResponse{ + Status: p.executeECSTrafficRouting(ctx, input, deployTargets[0]), + }, nil default: return nil, ErrUnsupportedStage } diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go b/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go index 1737950a57..acdcbc641e 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go @@ -55,6 +55,8 @@ type mockECSClient struct { ListTagsFunc func(ctx context.Context, resourceArn string) ([]types.Tag, error) TagResourceFunc func(ctx context.Context, resourceArn string, tags []types.Tag) error UntagResourceFunc func(ctx context.Context, resourceArn string, tagKeys []string) error + GetListenerArnsFunc func(ctx context.Context, targetGroup types.LoadBalancer) ([]string, error) + ModifyListenersFunc func(ctx context.Context, listenerArns []string, routingTrafficCfg provider.RoutingTrafficConfig) ([]string, error) } var _ provider.Client = (*mockECSClient)(nil) @@ -107,6 +109,12 @@ func (m *mockECSClient) TagResource(ctx context.Context, resourceArn string, tag func (m *mockECSClient) UntagResource(ctx context.Context, resourceArn string, tagKeys []string) error { return m.UntagResourceFunc(ctx, resourceArn, tagKeys) } +func (m *mockECSClient) GetListenerArns(ctx context.Context, targetGroup types.LoadBalancer) ([]string, error) { + return m.GetListenerArnsFunc(ctx, targetGroup) +} +func (m *mockECSClient) ModifyListeners(ctx context.Context, listenerArns []string, routingTrafficCfg provider.RoutingTrafficConfig) ([]string, error) { + return m.ModifyListenersFunc(ctx, listenerArns, routingTrafficCfg) +} func happyPathClient(registeredTD *types.TaskDefinition, updatedSvc *types.Service, newTS *types.TaskSet, prevTaskSets []types.TaskSet) *mockECSClient { return &mockECSClient{ diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/traffic.go b/pkg/app/pipedv1/plugin/ecs/deployment/traffic.go new file mode 100644 index 0000000000..17b255481a --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/deployment/traffic.go @@ -0,0 +1,161 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deployment + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "strings" + + "github.com/aws/aws-sdk-go-v2/service/ecs/types" + sdk "github.com/pipe-cd/piped-plugin-sdk-go" + + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/config" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/provider" +) + +const ( + canaryTargetGroupArnKey = "canary-target-group-arn" + trafficRoutePrimaryMetadataKey = "primary-percentage" + trafficRouteCanaryMetadataKey = "canary-percentage" + currentListenersKey = "current-listeners" +) + +func (p *ECSPlugin) executeECSTrafficRouting( + ctx context.Context, + input *sdk.ExecuteStageInput[config.ECSApplicationSpec], + deployTarget *sdk.DeployTarget[config.ECSDeployTargetConfig], +) sdk.StageStatus { + lp := input.Client.LogPersister() + + cfg, err := input.Request.TargetDeploymentSource.AppConfig() + if err != nil { + lp.Errorf("Failed to load app config: %v", err) + return sdk.StageStatusFailure + } + + accessType := cfg.Spec.Input.AccessType + if accessType != "ELB" { + lp.Errorf("Unsupported access type %s in stage Traffic Routing for ECS application", accessType) + return sdk.StageStatusFailure + } + + primary, canary, err := provider.LoadTargetGroups(cfg.Spec.Input.TargetGroups) + if err != nil { + lp.Errorf("Failed to load target groups: %v", err) + return sdk.StageStatusFailure + } + + if primary == nil || canary == nil { + lp.Errorf("Required both primary and canary target groups for traffic routing") + return sdk.StageStatusFailure + } + + if err = input.Client.PutDeploymentPluginMetadata( + ctx, + canaryTargetGroupArnKey, + *canary.TargetGroupArn, + ); err != nil { + lp.Errorf("Failed to save canary target group ARN for rollback: %v", err) + return sdk.StageStatusFailure + } + + client, err := provider.DefaultRegistry().Client(deployTarget.Name, deployTarget.Config) + if err != nil { + lp.Errorf("Failed to get ECS client for deploy target %s: %v", deployTarget.Name, err) + return sdk.StageStatusFailure + } + + var options config.ECSTrafficRoutingStageOptions + if err := json.Unmarshal(input.Request.StageConfig, &options); err != nil { + lp.Errorf("Failed to parse stage option: %v", err) + return sdk.StageStatusFailure + } + + lp.Infof("Start performing routing traffic") + if err = routing(ctx, lp, input.Client, client, *primary, *canary, options); err != nil { + lp.Errorf("Failed to route traffic: %v", err) + return sdk.StageStatusFailure + } + + return sdk.StageStatusSuccess +} + +// metadataStore abstracts the deployment plugin metadata operations for testability +type metadataStore interface { + PutDeploymentPluginMetadataMulti(ctx context.Context, metadata map[string]string) error + GetDeploymentPluginMetadata(ctx context.Context, key string) (string, bool, error) + PutDeploymentPluginMetadata(ctx context.Context, key string, value string) error +} + +func routing( + ctx context.Context, + lp sdk.StageLogPersister, + mdStore metadataStore, + providerClient provider.Client, + primaryTargetGroup types.LoadBalancer, + canaryTargetGroup types.LoadBalancer, + options config.ECSTrafficRoutingStageOptions, +) error { + // Retrieve traffic split of primary and canary + primaryWeight, canaryWeight := options.Percentages() + routeTrafficCfg := provider.RoutingTrafficConfig{ + { + TargetGroupArn: *primaryTargetGroup.TargetGroupArn, + Weight: primaryWeight, + }, + { + TargetGroupArn: *canaryTargetGroup.TargetGroupArn, + Weight: canaryWeight, + }, + } + + percentageMetadata := map[string]string{ + trafficRoutePrimaryMetadataKey: strconv.FormatInt(int64(primaryWeight), 10), + trafficRouteCanaryMetadataKey: strconv.FormatInt(int64(canaryWeight), 10), + } + if err := mdStore.PutDeploymentPluginMetadataMulti(ctx, percentageMetadata); err != nil { + return fmt.Errorf("Failed to store percentage metadata: %v", err) + } + + var currListenerArns []string + value, ok, err := mdStore.GetDeploymentPluginMetadata(ctx, currentListenersKey) + if err != nil { + return fmt.Errorf("Failed to get current listener arns: %v", err) + } + if ok { + currListenerArns = strings.Split(value, ",") + } else { + currListenerArns, err = providerClient.GetListenerArns(ctx, primaryTargetGroup) + if err != nil { + return fmt.Errorf("Failed to get current active listeners: %v", err) + } + } + + metadataCurrListener := strings.Join(currListenerArns, ",") + if err := mdStore.PutDeploymentPluginMetadata(ctx, currentListenersKey, metadataCurrListener); err != nil { + return fmt.Errorf("Failed to store listeners to metadata store: %v", err) + } + + modifiedRules, err := providerClient.ModifyListeners(ctx, currListenerArns, routeTrafficCfg) + if err != nil { + return fmt.Errorf("Failed to routing traffic to primary and canary variants: %v", err) + } + lp.Infof("Modified %d listener rules: %v", len(modifiedRules), modifiedRules) + + return nil +} diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/traffic_test.go b/pkg/app/pipedv1/plugin/ecs/deployment/traffic_test.go new file mode 100644 index 0000000000..d6a1e744fc --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/deployment/traffic_test.go @@ -0,0 +1,256 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deployment + +import ( + "context" + "errors" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ecs/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/config" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/provider" +) + +// mockMetadataStore implements metadataStore for testing. +type mockMetadataStore struct { + PutMultiFunc func(ctx context.Context, metadata map[string]string) error + GetFunc func(ctx context.Context, key string) (string, bool, error) + PutFunc func(ctx context.Context, key string, value string) error +} + +func (m *mockMetadataStore) PutDeploymentPluginMetadataMulti(ctx context.Context, metadata map[string]string) error { + return m.PutMultiFunc(ctx, metadata) +} +func (m *mockMetadataStore) GetDeploymentPluginMetadata(ctx context.Context, key string) (string, bool, error) { + return m.GetFunc(ctx, key) +} +func (m *mockMetadataStore) PutDeploymentPluginMetadata(ctx context.Context, key string, value string) error { + return m.PutFunc(ctx, key, value) +} + +var _ metadataStore = (*mockMetadataStore)(nil) + +func happyMetadataStore() *mockMetadataStore { + return &mockMetadataStore{ + PutMultiFunc: func(_ context.Context, _ map[string]string) error { return nil }, + GetFunc: func(_ context.Context, _ string) (string, bool, error) { return "", false, nil }, + PutFunc: func(_ context.Context, _, _ string) error { return nil }, + } +} + +func TestRouting(t *testing.T) { + t.Parallel() + + var ( + primaryARN = "arn:aws:elasticloadbalancing:us-east-1:123:targetgroup/primary/aaa" + canaryARN = "arn:aws:elasticloadbalancing:us-east-1:123:targetgroup/canary/bbb" + listenerARN1 = "arn:aws:elasticloadbalancing:us-east-1:123:listener/app/my-alb/aaa/bbb" + listenerARN2 = "arn:aws:elasticloadbalancing:us-east-1:123:listener/app/my-alb/aaa/ccc" + primaryTG = types.LoadBalancer{TargetGroupArn: aws.String(primaryARN)} + canaryTG = types.LoadBalancer{TargetGroupArn: aws.String(canaryARN)} + ) + + testcases := []struct { + name string + options config.ECSTrafficRoutingStageOptions + metadata *mockMetadataStore + client *mockECSClient + wantErr bool + wantErrMsg string + }{ + { + name: "success: listener ARNs not cached, fetched from AWS", + options: config.ECSTrafficRoutingStageOptions{Canary: 20}, + metadata: happyMetadataStore(), + client: func() *mockECSClient { + m := &mockECSClient{} + m.GetListenerArnsFunc = func(_ context.Context, _ types.LoadBalancer) ([]string, error) { + return []string{listenerARN1}, nil + } + m.ModifyListenersFunc = func(_ context.Context, listenerArns []string, cfg provider.RoutingTrafficConfig) ([]string, error) { + assert.Equal(t, []string{listenerARN1}, listenerArns) + assert.Equal(t, provider.RoutingTrafficConfig{ + {TargetGroupArn: primaryARN, Weight: 80}, + {TargetGroupArn: canaryARN, Weight: 20}, + }, cfg) + return []string{"rule-1"}, nil + } + return m + }(), + }, + { + name: "success: listener ARNs cached in metadata, skip GetListenerArns", + options: config.ECSTrafficRoutingStageOptions{Canary: 20}, + metadata: func() *mockMetadataStore { + m := happyMetadataStore() + m.GetFunc = func(_ context.Context, key string) (string, bool, error) { + if key == currentListenersKey { + return listenerARN1 + "," + listenerARN2, true, nil + } + return "", false, nil + } + return m + }(), + client: func() *mockECSClient { + m := &mockECSClient{} + m.GetListenerArnsFunc = func(_ context.Context, _ types.LoadBalancer) ([]string, error) { + t.Error("GetListenerArns should not be called when cached") + return nil, nil + } + m.ModifyListenersFunc = func(_ context.Context, listenerArns []string, _ provider.RoutingTrafficConfig) ([]string, error) { + assert.Equal(t, []string{listenerARN1, listenerARN2}, listenerArns) + return []string{"rule-1", "rule-2"}, nil + } + return m + }(), + }, + { + name: "success: primary=100 routes all traffic to primary", + options: config.ECSTrafficRoutingStageOptions{Primary: 100}, + metadata: happyMetadataStore(), + client: func() *mockECSClient { + m := &mockECSClient{} + m.GetListenerArnsFunc = func(_ context.Context, _ types.LoadBalancer) ([]string, error) { + return []string{listenerARN1}, nil + } + m.ModifyListenersFunc = func(_ context.Context, _ []string, cfg provider.RoutingTrafficConfig) ([]string, error) { + assert.Equal(t, 100, cfg[0].Weight) + assert.Equal(t, 0, cfg[1].Weight) + return []string{"rule-1"}, nil + } + return m + }(), + }, + { + name: "success: no options set defaults to primary=100", + options: config.ECSTrafficRoutingStageOptions{}, + metadata: happyMetadataStore(), + client: func() *mockECSClient { + m := &mockECSClient{} + m.GetListenerArnsFunc = func(_ context.Context, _ types.LoadBalancer) ([]string, error) { + return []string{listenerARN1}, nil + } + m.ModifyListenersFunc = func(_ context.Context, _ []string, cfg provider.RoutingTrafficConfig) ([]string, error) { + assert.Equal(t, 100, cfg[0].Weight) + assert.Equal(t, 0, cfg[1].Weight) + return []string{"rule-1"}, nil + } + return m + }(), + }, + { + name: "fail: PutDeploymentPluginMetadataMulti error", + options: config.ECSTrafficRoutingStageOptions{Canary: 20}, + metadata: func() *mockMetadataStore { + m := happyMetadataStore() + m.PutMultiFunc = func(_ context.Context, _ map[string]string) error { + return errors.New("put multi error") + } + return m + }(), + client: &mockECSClient{}, + wantErr: true, + wantErrMsg: "Failed to store percentage metadata", + }, + { + name: "fail: GetDeploymentPluginMetadata error", + options: config.ECSTrafficRoutingStageOptions{Canary: 20}, + metadata: func() *mockMetadataStore { + m := happyMetadataStore() + m.GetFunc = func(_ context.Context, _ string) (string, bool, error) { + return "", false, errors.New("get error") + } + return m + }(), + client: &mockECSClient{}, + wantErr: true, + wantErrMsg: "Failed to get current listener arns", + }, + { + name: "fail: GetListenerArns error when not cached", + options: config.ECSTrafficRoutingStageOptions{Canary: 20}, + metadata: happyMetadataStore(), + client: func() *mockECSClient { + m := &mockECSClient{} + m.GetListenerArnsFunc = func(_ context.Context, _ types.LoadBalancer) ([]string, error) { + return nil, errors.New("describe listeners error") + } + return m + }(), + wantErr: true, + wantErrMsg: "Failed to get current active listeners", + }, + { + name: "fail: PutDeploymentPluginMetadata error when saving listeners", + options: config.ECSTrafficRoutingStageOptions{Canary: 20}, + metadata: func() *mockMetadataStore { + m := happyMetadataStore() + m.PutFunc = func(_ context.Context, key, _ string) error { + if key == currentListenersKey { + return errors.New("put listeners error") + } + return nil + } + return m + }(), + client: func() *mockECSClient { + m := &mockECSClient{} + m.GetListenerArnsFunc = func(_ context.Context, _ types.LoadBalancer) ([]string, error) { + return []string{listenerARN1}, nil + } + return m + }(), + wantErr: true, + wantErrMsg: "Failed to store listeners to metadata store", + }, + { + name: "fail: ModifyListeners error", + options: config.ECSTrafficRoutingStageOptions{Canary: 20}, + metadata: happyMetadataStore(), + client: func() *mockECSClient { + m := &mockECSClient{} + m.GetListenerArnsFunc = func(_ context.Context, _ types.LoadBalancer) ([]string, error) { + return []string{listenerARN1}, nil + } + m.ModifyListenersFunc = func(_ context.Context, _ []string, _ provider.RoutingTrafficConfig) ([]string, error) { + return nil, errors.New("modify listeners error") + } + return m + }(), + wantErr: true, + wantErrMsg: "Failed to routing traffic", + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + err := routing(context.Background(), &fakeLogPersister{}, tc.metadata, tc.client, primaryTG, canaryTG, tc.options) + + if tc.wantErr { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.wantErrMsg) + return + } + require.NoError(t, err) + }) + } +}