From 0310f0f132b7bbd873bd73eac8152fd25970ae55 Mon Sep 17 00:00:00 2001 From: Hoang Ngo Date: Fri, 27 Mar 2026 22:41:20 +0700 Subject: [PATCH 1/2] feat: provide methods for getting live state from ecs resources Signed-off-by: Hoang Ngo --- .../plugin/ecs/deployment/test_helper.go | 8 +++ pkg/app/pipedv1/plugin/ecs/provider/client.go | 61 +++++++++++++++++++ pkg/app/pipedv1/plugin/ecs/provider/ecs.go | 2 + 3 files changed, 71 insertions(+) diff --git a/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go b/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go index 1737950a57..842e1ed9ca 100644 --- a/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go +++ b/pkg/app/pipedv1/plugin/ecs/deployment/test_helper.go @@ -41,11 +41,13 @@ func (f *fakeLogPersister) Complete(time.Duration) error { return nil } type mockECSClient struct { CreateServiceFunc func(ctx context.Context, service types.Service) (*types.Service, error) UpdateServiceFunc func(ctx context.Context, service types.Service) (*types.Service, error) + DescribeServiceFunc func(ctx context.Context, service types.Service) (*types.Service, error) GetServiceTaskSetsFunc func(ctx context.Context, service types.Service) ([]types.TaskSet, error) GetPrimaryTaskSetFunc func(ctx context.Context, service types.Service) (*types.TaskSet, error) CreateTaskSetFunc func(ctx context.Context, service types.Service, taskDefinition types.TaskDefinition, targetGroup *types.LoadBalancer, scale float64) (*types.TaskSet, error) UpdateServicePrimaryTaskSetFunc func(ctx context.Context, service types.Service, taskSet types.TaskSet) (*types.TaskSet, error) DeleteTaskSetFunc func(ctx context.Context, taskSet types.TaskSet) error + GetTasksFunc func(ctx context.Context, service types.Service) ([]types.Task, error) ServiceExistsFunc func(ctx context.Context, cluster, serviceName string) (bool, error) GetServiceStatusFunc func(ctx context.Context, cluster, serviceName string) (string, error) WaitServiceStableFunc func(ctx context.Context, 
cluster, serviceName string) error @@ -65,6 +67,9 @@ func (m *mockECSClient) CreateService(ctx context.Context, service types.Service func (m *mockECSClient) UpdateService(ctx context.Context, service types.Service) (*types.Service, error) { return m.UpdateServiceFunc(ctx, service) } +func (m *mockECSClient) DescribeService(ctx context.Context, service types.Service) (*types.Service, error) { + return m.DescribeServiceFunc(ctx, service) +} func (m *mockECSClient) GetServiceTaskSets(ctx context.Context, service types.Service) ([]types.TaskSet, error) { return m.GetServiceTaskSetsFunc(ctx, service) } @@ -80,6 +85,9 @@ func (m *mockECSClient) UpdateServicePrimaryTaskSet(ctx context.Context, service func (m *mockECSClient) DeleteTaskSet(ctx context.Context, taskSet types.TaskSet) error { return m.DeleteTaskSetFunc(ctx, taskSet) } +func (m *mockECSClient) GetTasks(ctx context.Context, service types.Service) ([]types.Task, error) { + return m.GetTasksFunc(ctx, service) +} func (m *mockECSClient) ServiceExists(ctx context.Context, cluster, serviceName string) (bool, error) { return m.ServiceExistsFunc(ctx, cluster, serviceName) } diff --git a/pkg/app/pipedv1/plugin/ecs/provider/client.go b/pkg/app/pipedv1/plugin/ecs/provider/client.go index 42a7b2f09a..ac41bfcc7a 100644 --- a/pkg/app/pipedv1/plugin/ecs/provider/client.go +++ b/pkg/app/pipedv1/plugin/ecs/provider/client.go @@ -145,6 +145,25 @@ func (c *client) UpdateService(ctx context.Context, service types.Service) (*typ return output.Service, nil } +func (c *client) DescribeService(ctx context.Context, service types.Service) (*types.Service, error) { + input := &ecs.DescribeServicesInput{ + Cluster: service.ClusterArn, + Services: []string{ + *service.ServiceName, + }, + } + output, err := c.ecsClient.DescribeServices(ctx, input) + if err != nil { + return nil, fmt.Errorf("failed to get service %s description: %w", *service.ServiceName, err) + } + + if len(output.Services) == 0 { + return nil, fmt.Errorf("services %s 
does not exist", *service.ServiceName) + } + + return &output.Services[0], nil +} + func (c *client) GetServiceTaskSets(ctx context.Context, service types.Service) ([]types.TaskSet, error) { input := &ecs.DescribeServicesInput{ Cluster: service.ClusterArn, @@ -311,6 +330,48 @@ func (c *client) DeleteTaskSet(ctx context.Context, taskSet types.TaskSet) error return nil } +func (c *client) GetTasks(ctx context.Context, service types.Service) ([]types.Task, error) { + // List the task ARNs of the given service; pagination is needed because the ListTasks API returns at most 100 tasks per call + var taskArns []string + paginator := ecs.NewListTasksPaginator(c.ecsClient, &ecs.ListTasksInput{ + Cluster: service.ClusterArn, + ServiceName: service.ServiceName, + }) + for paginator.HasMorePages() { + page, err := paginator.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list tasks of service %s: %w", *service.ServiceName, err) + } + taskArns = append(taskArns, page.TaskArns...) + } + + if len(taskArns) == 0 { + return nil, nil + } + + var tasks []types.Task + // The DescribeTasks API accepts at most 100 tasks per call, so describe them in batches + const batchSize = 100 + for i := 0; i < len(taskArns); i += batchSize { + end := i + batchSize + if end > len(taskArns) { + end = len(taskArns) + } + + batch := taskArns[i:end] + out, err := c.ecsClient.DescribeTasks(ctx, &ecs.DescribeTasksInput{ + Cluster: service.ClusterArn, + Tasks: batch, + }) + if err != nil { + return nil, fmt.Errorf("failed to describe tasks: %w", err) + } + + tasks = append(tasks, out.Tasks...)
+ } + return tasks, nil +} + func (c *client) ServiceExists(ctx context.Context, cluster, serviceName string) (bool, error) { input := &ecs.DescribeServicesInput{ Cluster: aws.String(cluster), // cluster field can be ARN or name diff --git a/pkg/app/pipedv1/plugin/ecs/provider/ecs.go b/pkg/app/pipedv1/plugin/ecs/provider/ecs.go index a693dd54b8..389de028fd 100644 --- a/pkg/app/pipedv1/plugin/ecs/provider/ecs.go +++ b/pkg/app/pipedv1/plugin/ecs/provider/ecs.go @@ -43,11 +43,13 @@ type Client interface { type ECS interface { CreateService(ctx context.Context, service types.Service) (*types.Service, error) UpdateService(ctx context.Context, service types.Service) (*types.Service, error) + DescribeService(ctx context.Context, service types.Service) (*types.Service, error) GetServiceTaskSets(ctx context.Context, service types.Service) ([]types.TaskSet, error) GetPrimaryTaskSet(ctx context.Context, service types.Service) (*types.TaskSet, error) CreateTaskSet(ctx context.Context, service types.Service, taskDefinition types.TaskDefinition, targetGroup *types.LoadBalancer, scale float64) (*types.TaskSet, error) UpdateServicePrimaryTaskSet(ctx context.Context, service types.Service, taskSet types.TaskSet) (*types.TaskSet, error) DeleteTaskSet(ctx context.Context, taskSet types.TaskSet) error + GetTasks(ctx context.Context, service types.Service) ([]types.Task, error) ServiceExists(ctx context.Context, cluster, serviceName string) (bool, error) GetServiceStatus(ctx context.Context, cluster, serviceName string) (string, error) WaitServiceStable(ctx context.Context, cluster, serviceName string) error From b564211f6646aa72a13919108de0f1de44db458d Mon Sep 17 00:00:00 2001 From: Hoang Ngo Date: Fri, 27 Mar 2026 23:42:44 +0700 Subject: [PATCH 2/2] feat: implement livestate plugin Signed-off-by: Hoang Ngo --- .../pipedv1/plugin/ecs/livestate/fetcher.go | 65 ++++ .../pipedv1/plugin/ecs/livestate/plugin.go | 307 ++++++++++++++++++ pkg/app/pipedv1/plugin/ecs/livestate/sync.go | 84 
+++++ pkg/app/pipedv1/plugin/ecs/main.go | 2 + pkg/app/pipedv1/plugin/ecs/provider/ecs.go | 8 + 5 files changed, 466 insertions(+) create mode 100644 pkg/app/pipedv1/plugin/ecs/livestate/fetcher.go create mode 100644 pkg/app/pipedv1/plugin/ecs/livestate/plugin.go create mode 100644 pkg/app/pipedv1/plugin/ecs/livestate/sync.go diff --git a/pkg/app/pipedv1/plugin/ecs/livestate/fetcher.go b/pkg/app/pipedv1/plugin/ecs/livestate/fetcher.go new file mode 100644 index 0000000000..ada8fea74e --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/livestate/fetcher.go @@ -0,0 +1,65 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package livestate + +import ( + "context" + + "github.com/aws/aws-sdk-go-v2/service/ecs/types" + + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/provider" +) + +// ECSFetcher fetches the current state of an ECS application's resources +type ECSFetcher struct { + client provider.Client +} + +// FetchResources fetches the live Service, its PipeCD-managed TaskSets, and all Tasks for the given service descriptor +func (w *ECSFetcher) FetchResources(ctx context.Context, service types.Service) (*queryResourcesResult, error) { + liveService, err := w.client.DescribeService(ctx, service) + if err != nil { + return nil, err + } + + // GetServiceTaskSets filters out DRAINING task sets and task sets not created by PipeCD, + // so the result only contains task sets that PipeCD is responsible for + tasksets, err := w.client.GetServiceTaskSets(ctx, *liveService) + if err != nil { + return nil, err + } + + // Tasks are fetched for the whole service rather than per task set. + // Grouping tasks under their parent task set is deferred to the caller. + tasks, err := w.client.GetTasks(ctx, *liveService) + if err != nil { + return nil, err + } + + return &queryResourcesResult{ + Service: liveService, + TaskSets: tasksets, + Tasks: tasks, + }, nil +} + +// queryResourcesResult holds the raw AWS objects for a single ECS application. +// +// Tasks is a flat list; it is not yet grouped by task set ID. +type queryResourcesResult struct { + Service *types.Service + TaskSets []types.TaskSet + Tasks []types.Task +} diff --git a/pkg/app/pipedv1/plugin/ecs/livestate/plugin.go b/pkg/app/pipedv1/plugin/ecs/livestate/plugin.go new file mode 100644 index 0000000000..376c80a486 --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/livestate/plugin.go @@ -0,0 +1,307 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package livestate + +import ( + "context" + "fmt" + "strconv" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ecs/types" + sdk "github.com/pipe-cd/piped-plugin-sdk-go" + "go.uber.org/zap" + + ecsconfig "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/config" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/provider" +) + +const ( + resourceTypeService = "ECS:Service" + resourceTypeTaskSet = "ECS:TaskSet" + resourceTypeTask = "ECS:Task" +) + +var ( + _ sdk.LivestatePlugin[ecsconfig.ECSPluginConfig, ecsconfig.ECSDeployTargetConfig, ecsconfig.ECSApplicationSpec] = (*ECSLivestatePlugin)(nil) + _ sdk.Initializer[ecsconfig.ECSPluginConfig, ecsconfig.ECSDeployTargetConfig] = (*ECSLivestatePlugin)(nil) +) + +type ECSLivestatePlugin struct { + fetcher *ECSFetcher + initialized sync.Once +} + +func (p *ECSLivestatePlugin) Initialize( + ctx context.Context, + input *sdk.InitializeInput[ecsconfig.ECSPluginConfig, ecsconfig.ECSDeployTargetConfig], +) error { + var err error + p.initialized.Do(func() { + if len(input.DeployTargets) != 1 { + err = fmt.Errorf("only 1 deploy target is allowed but got %d", len(input.DeployTargets)) + return + } + var ( + dtName string + dtConfig ecsconfig.ECSDeployTargetConfig + client provider.Client + ) + for name, cfg := range input.DeployTargets { + dtName = name + dtConfig = cfg.Config + } + + client, err = provider.DefaultRegistry().Client(dtName, dtConfig) + if err != nil { + return + } + + p.fetcher = &ECSFetcher{ + client: client, + } + }) + return err +} + +// 
GetLivestate returns the current live state of the ECS application and whether it is in sync with the desired state declared in Git +func (p *ECSLivestatePlugin) GetLivestate( + ctx context.Context, + _ *ecsconfig.ECSPluginConfig, + deployTargets []*sdk.DeployTarget[ecsconfig.ECSDeployTargetConfig], + input *sdk.GetLivestateInput[ecsconfig.ECSApplicationSpec], +) (*sdk.GetLivestateResponse, error) { + appCfg, err := input.Request.DeploymentSource.AppConfig() + if err != nil { + return nil, fmt.Errorf("failed to get app config: %w", err) + } + spec := appCfg.Spec.Input + + input.Logger.Info("GetLivestate called", + zap.String("applicationID", input.Request.ApplicationID), + zap.String("serviceDefinitionFile", spec.ServiceDefinitionFile), + zap.Bool("runStandaloneTask", spec.RunStandaloneTask), + zap.String("appDir", input.Request.DeploymentSource.ApplicationDirectory), + ) + + // Standalone tasks are one-off runs with no persistent service to observe. + // There is no meaningful live state to report, so return UNKNOWN rather than mislead the user with an empty or stale result + if spec.RunStandaloneTask { + return &sdk.GetLivestateResponse{ + SyncState: sdk.ApplicationSyncState{ + Status: sdk.ApplicationSyncStateUnknown, + ShortReason: "Standalone task does not have a live state", + }, + }, nil + } + + // A missing service definition file is a misconfiguration: without it the plugin cannot identify which ECS service to inspect + if spec.ServiceDefinitionFile == "" { + return &sdk.GetLivestateResponse{ + SyncState: sdk.ApplicationSyncState{ + Status: sdk.ApplicationSyncStateInvalidConfig, + ShortReason: "serviceDefinitionFile is required", + }, + }, nil + } + + // Parse the service definition from Git to obtain the cluster and service name + // Deployment tags are intentionally not injected here because only the identifiers are needed to locate the live resource on AWS + appDir := input.Request.DeploymentSource.ApplicationDirectory + desiredService, err :=
provider.ParseServiceDefinition(appDir, spec.ServiceDefinitionFile) + if err != nil { + return nil, fmt.Errorf("failed to parse service definition: %w", err) + } + + // Fetch the live state from AWS, an error here (for example network issue, permission denied, ...) + // is surfaced as OUT_OF_SYNC rather than a hard error + // so the UI can still display something meaningful instead of going blank + result, err := p.fetcher.FetchResources(ctx, desiredService) + if err != nil { + return &sdk.GetLivestateResponse{ + SyncState: sdk.ApplicationSyncState{ + Status: sdk.ApplicationSyncStateOutOfSync, + ShortReason: fmt.Sprintf("Failed to query ECS resources: %v", err), + }, + }, nil + } + + deployTargetName := deployTargets[0].Name + + // Build the response by converting raw AWS objects into SDK resource states and + // computing the sync verdict against the current Git commit + return &sdk.GetLivestateResponse{ + LiveState: sdk.ApplicationLiveState{ + Resources: buildResourceStates(result, deployTargetName), + }, + SyncState: computeSyncState(result, desiredService, input.Request.DeploymentSource.CommitHash), + }, nil +} + +func buildResourceStates(result *queryResourcesResult, deployTarget string) []sdk.ResourceState { + if result.Service == nil { + return nil + } + + svc := result.Service + resources := make([]sdk.ResourceState, 0, 1+len(result.TaskSets)+len(result.Tasks)) + + // Service + svcHealth, svcDesc := serviceHealthStatus(svc) + resources = append(resources, sdk.ResourceState{ + ID: aws.ToString(svc.ServiceArn), + Name: aws.ToString(svc.ServiceName), + ResourceType: resourceTypeService, + ResourceMetadata: map[string]string{ + "status": aws.ToString(svc.Status), + "runningCount": strconv.Itoa(int(svc.RunningCount)), + "desiredCount": strconv.Itoa(int(svc.DesiredCount)), + "pendingCount": strconv.Itoa(int(svc.PendingCount)), + }, + HealthStatus: svcHealth, + HealthDescription: svcDesc, + DeployTarget: deployTarget, + CreatedAt: derefTime(svc.CreatedAt), + }) + 
+ // TaskSets, build a map startedBy(=taskSet.Id) -> taskSetArn for task parent lookup + taskSetArnByID := make(map[string]string, len(result.TaskSets)) + for _, ts := range result.TaskSets { + tsHealth, tsDesc := taskSetHealthStatus(&ts) + meta := map[string]string{ + "status": aws.ToString(ts.Status), + "taskDefinition": aws.ToString(ts.TaskDefinition), + } + if ts.Scale != nil { + meta["scale"] = fmt.Sprintf("%.0f%%", ts.Scale.Value) + } + for _, tag := range ts.Tags { + if aws.ToString(tag.Key) == provider.LabelCommitHash { + meta["commit"] = aws.ToString(tag.Value) + break + } + } + + taskSetArnByID[aws.ToString(ts.Id)] = aws.ToString(ts.TaskSetArn) + resources = append(resources, sdk.ResourceState{ + ID: aws.ToString(ts.TaskSetArn), + ParentIDs: []string{aws.ToString(svc.ServiceArn)}, + Name: aws.ToString(ts.Id), + ResourceType: resourceTypeTaskSet, + ResourceMetadata: meta, + HealthStatus: tsHealth, + HealthDescription: tsDesc, + DeployTarget: deployTarget, + CreatedAt: derefTime(ts.CreatedAt), + }) + } + + // Tasks + for _, task := range result.Tasks { + taskHealth, taskDesc := taskHealthStatus(&task) + meta := map[string]string{ + "lastStatus": aws.ToString(task.LastStatus), + } + if task.StartedAt != nil { + meta["startedAt"] = task.StartedAt.Format(time.RFC3339) + } + + // Link task to its TaskSet via StartedBy field + parentIDs := []string{aws.ToString(svc.ServiceArn)} + if tsArn, ok := taskSetArnByID[aws.ToString(task.StartedBy)]; ok { + parentIDs = []string{tsArn} + } + + resources = append(resources, sdk.ResourceState{ + ID: aws.ToString(task.TaskArn), + ParentIDs: parentIDs, + Name: aws.ToString(task.TaskArn), + ResourceType: resourceTypeTask, + ResourceMetadata: meta, + HealthStatus: taskHealth, + HealthDescription: taskDesc, + DeployTarget: deployTarget, + CreatedAt: derefTime(task.CreatedAt), + }) + } + + return resources +} + +// derefTime safely dereferences a *time.Time returned by the AWS SDK. 
+// +// AWS represents optional timestamps as pointers, so a nil value indicates the field was not populated. +func derefTime(t *time.Time) time.Time { + if t == nil { + return time.Time{} + } + return *t +} + +func serviceHealthStatus(svc *types.Service) (sdk.ResourceHealthStatus, string) { + switch aws.ToString(svc.Status) { + case "ACTIVE": + if svc.PendingCount == 0 && svc.RunningCount >= svc.DesiredCount { + return sdk.ResourceHealthStateHealthy, "" + } + return sdk.ResourceHealthStateUnhealthy, fmt.Sprintf( + "Service has %d running tasks out of %d desired (%d pending)", + svc.RunningCount, svc.DesiredCount, svc.PendingCount, + ) + case "DRAINING", "INACTIVE": + return sdk.ResourceHealthStateUnhealthy, fmt.Sprintf("Service is in %s state", aws.ToString(svc.Status)) + default: + return sdk.ResourceHealthStateUnknown, "" + } +} + +func taskSetHealthStatus(ts *types.TaskSet) (sdk.ResourceHealthStatus, string) { + switch ts.StabilityStatus { + case types.StabilityStatusSteadyState: + return sdk.ResourceHealthStateHealthy, "" + case types.StabilityStatusStabilizing: + return sdk.ResourceHealthStateUnhealthy, fmt.Sprintf( + "TaskSet is stabilizing: %d pending, %d running", + ts.PendingCount, ts.RunningCount, + ) + default: + return sdk.ResourceHealthStateUnknown, "" + } +} + +func taskHealthStatus(task *types.Task) (sdk.ResourceHealthStatus, string) { + if task.LastStatus == nil { + return sdk.ResourceHealthStateUnknown, "" + } + switch aws.ToString(task.LastStatus) { + case "RUNNING": + if task.HealthStatus == types.HealthStatusUnhealthy { + return sdk.ResourceHealthStateUnhealthy, "Task container health checks are failing" + } + return sdk.ResourceHealthStateHealthy, "" + case "PENDING": + return sdk.ResourceHealthStateUnhealthy, "Task is in PENDING state" + case "STOPPED": + reason := "Task stopped" + if task.StoppedReason != nil { + reason = fmt.Sprintf("Task stopped: %s", aws.ToString(task.StoppedReason)) + } + return sdk.ResourceHealthStateUnhealthy, reason + 
default: + return sdk.ResourceHealthStateUnknown, "" + } +} diff --git a/pkg/app/pipedv1/plugin/ecs/livestate/sync.go b/pkg/app/pipedv1/plugin/ecs/livestate/sync.go new file mode 100644 index 0000000000..2f98d4e485 --- /dev/null +++ b/pkg/app/pipedv1/plugin/ecs/livestate/sync.go @@ -0,0 +1,84 @@ +// Copyright 2026 The PipeCD Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package livestate + +import ( + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/ecs/types" + sdk "github.com/pipe-cd/piped-plugin-sdk-go" + + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/provider" +) + +// computeSyncState determines whether the live ECS state matches what was declared in Git. +// +// Rather than diffing configuration fields directly, +// it relies on the commit hash that PipeCD stamps onto the PRIMARY task set during deployment. +// +// This keeps the check resilient to AWS-side mutations (e.g. auto-scaling adjustments) that are intentional +// and should not be treated as drift. +func computeSyncState(result *queryResourcesResult, desiredService types.Service, commitHash string) sdk.ApplicationSyncState { + // A nil service means AWS returned no matching service for the cluster in the desired configuration. 
+ // The app has never been deployed, or was deleted out-of-band + if result.Service == nil { + return sdk.ApplicationSyncState{ + Status: sdk.ApplicationSyncStateOutOfSync, + ShortReason: fmt.Sprintf("Service %s not found in cluster", aws.ToString(desiredService.ServiceName)), + } + } + + // Under the EXTERNAL deployment controller model PipeCD uses, + // exactly one task set carries the PRIMARY status at any point in time. + // Its absence means no successful deployment has ever completed, or the service is in a broken intermediate state. + var primaryTaskSet *types.TaskSet + for i := range result.TaskSets { + if aws.ToString(result.TaskSets[i].Status) == "PRIMARY" { + primaryTaskSet = &result.TaskSets[i] + break + } + } + if primaryTaskSet == nil { + return sdk.ApplicationSyncState{ + Status: sdk.ApplicationSyncStateOutOfSync, + ShortReason: "No PRIMARY task set found", + } + } + + // PipeCD tags every task set it creates with the Git commit hash at the time of deployment. + // If the PRIMARY task set's hash differs from the current HEAD, + // the cluster is running an older revision and needs a new deployment to reconcile. + // + // When commitHash is empty (e.g. the deployment source has no VCS info), + // we skip this check rather than producing a false OUT_OF_SYNC. 
+ if commitHash != "" { + for _, tag := range primaryTaskSet.Tags { + if aws.ToString(tag.Key) == provider.LabelCommitHash { + liveCommit := aws.ToString(tag.Value) + if liveCommit != commitHash { + return sdk.ApplicationSyncState{ + Status: sdk.ApplicationSyncStateOutOfSync, + ShortReason: "Deployed commit does not match current commit", + Reason: fmt.Sprintf("deployed: %s, expected: %s", liveCommit, commitHash), + } + } + break + } + } + } + + return sdk.ApplicationSyncState{Status: sdk.ApplicationSyncStateSynced} +} diff --git a/pkg/app/pipedv1/plugin/ecs/main.go b/pkg/app/pipedv1/plugin/ecs/main.go index a98665d61a..db91f96430 100644 --- a/pkg/app/pipedv1/plugin/ecs/main.go +++ b/pkg/app/pipedv1/plugin/ecs/main.go @@ -20,12 +20,14 @@ import ( sdk "github.com/pipe-cd/piped-plugin-sdk-go" "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/deployment" + "github.com/pipe-cd/pipecd/pkg/app/pipedv1/plugin/ecs/livestate" ) func main() { plugin, err := sdk.NewPlugin( "0.0.1", sdk.WithDeploymentPlugin(&deployment.ECSPlugin{}), + sdk.WithLivestatePlugin(&livestate.ECSLivestatePlugin{}), ) if err != nil { log.Fatalf("failed to create plugin: %v", err) diff --git a/pkg/app/pipedv1/plugin/ecs/provider/ecs.go b/pkg/app/pipedv1/plugin/ecs/provider/ecs.go index 389de028fd..6c3230dbd6 100644 --- a/pkg/app/pipedv1/plugin/ecs/provider/ecs.go +++ b/pkg/app/pipedv1/plugin/ecs/provider/ecs.go @@ -67,6 +67,14 @@ func LoadTaskDefinition(appDir, taskDefinition string) (types.TaskDefinition, er return loadTaskDefinition(path) } +// ParseServiceDefinition returns Service object from a given service definition file without adding deployment tags +// +// Use this for read-only operations like livestate that do not need PipeCD metadata injected +func ParseServiceDefinition(appDir, serviceDefinition string) (types.Service, error) { + path := filepath.Join(appDir, serviceDefinition) + return loadServiceDefinition(path) +} + // LoadServiceDefinition returns Service object from a given 
service definition file. func LoadServiceDefinition(appDir, serviceDefinition string, input *sdk.ExecuteStageInput[config.ECSApplicationSpec]) (types.Service, error) { path := filepath.Join(appDir, serviceDefinition)