From d2c8d7fa2fe554606b44b412a3607aa1060a98eb Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Tue, 2 Sep 2025 12:48:12 -0400 Subject: [PATCH 1/8] feat: implement the processor server feat: add metrics setup for processor feat: remove exit from sigterm and let it shutdown gracefully feat: fix conflicts feat: update logs feat: test pipeline with ProcessorAllocator in alpha feat: test pipeline with ProcessorAllocator in alpha feat: debug e2e failing feat: debug e2e failing feat: fix error handling and feature gate from dep env feat: fix extension and allocator with processor server feat: update unit test feat: add missing status feat: fix missing namespace feat: refactor a bit the error handling feat: update copyright to 2026 feat: minor changes feat: rollback unit test fix (other PR) feat: fix rebase issue feat: fix unit test from rebase Signed-off-by: Thomas Lacroix --- build/Makefile | 4 +- cloudbuild.yaml | 2 +- cmd/allocator/main.go | 4 +- cmd/processor/handler.go | 347 +++++++++++++++++++ cmd/processor/main.go | 283 ++++++++++++--- install/helm/agones/templates/processor.yaml | 39 ++- install/helm/agones/values.yaml | 2 + pkg/gameserverallocations/controller.go | 67 ++-- 8 files changed, 665 insertions(+), 83 deletions(-) create mode 100644 cmd/processor/handler.go diff --git a/build/Makefile b/build/Makefile index 6f72e27212..0860bfec90 100644 --- a/build/Makefile +++ b/build/Makefile @@ -73,7 +73,7 @@ BETA_FEATURE_GATES ?= "CountsAndLists=true&GKEAutopilotExtendedDurationPods=true # Enable all alpha feature gates. 
Keep in sync with `false` (alpha) entries in pkg/util/runtime/features.go:featureDefaults -ALPHA_FEATURE_GATES ?= "PlayerAllocationFilter=true&PlayerTracking=true&WasmAutoscaler=true&Example=true" +ALPHA_FEATURE_GATES ?= "PlayerAllocationFilter=true&PlayerTracking=true&WasmAutoscaler=true&Example=true&ProcessorAllocator=true" # Build with Windows support WITH_WINDOWS=1 @@ -798,7 +798,7 @@ build-processor-image-arm64: $(ensure-build-image) build-processor-binary create # Build the debug image for the processor service build-processor-debug-image: $(ensure-build-image) build-processor-debug-binary build-licenses build-required-src-dist - docker build $(agones_path)/cmd/processor/ --file $(agones_path)/cmd/processor/Dockerfile.debug --tag=$(processor_tag) $(DOCKER_BUILD_ARGS) + docker build $(agones_path)/cmd/processor/ --file $(agones_path)/cmd/processor/Dockerfile.debug --tag=$(processor_amd64_tag) $(DOCKER_BUILD_ARGS) # Pushes up the processor image push-processor-image: push-processor-image-amd64 diff --git a/cloudbuild.yaml b/cloudbuild.yaml index bbba6011cd..0e532d1034 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -322,7 +322,7 @@ steps: # Keep in sync with the inverse of 'alpha' and 'beta' features in # pkg/util/runtime/features.go:featureDefaults - featureWithGate="PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=false&PlayerTracking=true&CountsAndLists=false&RollingUpdateFix=false&PortRanges=false&PortPolicyNone=false&ScheduledAutoscaler=false&GKEAutopilotExtendedDurationPods=false&SidecarContainers=false&WasmAutoscaler=true&Example=true" + featureWithGate="PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=false&PlayerTracking=true&CountsAndLists=false&RollingUpdateFix=false&PortRanges=false&PortPolicyNone=false&ScheduledAutoscaler=false&GKEAutopilotExtendedDurationPods=false&SidecarContainers=false&WasmAutoscaler=true&Example=true&ProcessorAllocator=true" featureWithoutGate="" # Use this if specific feature gates can only be 
supported on specific Kubernetes versions. diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index 4287eee04f..b612c51bde 100644 --- a/cmd/allocator/main.go +++ b/cmd/allocator/main.go @@ -737,9 +737,7 @@ func (h *serviceHandler) Allocate(ctx context.Context, in *pb.AllocationRequest) gsa.ApplyDefaults() if runtime.FeatureEnabled(runtime.FeatureProcessorAllocator) { - req := converters.ConvertGSAToAllocationRequest(gsa) - - resp, err := h.processorClient.Allocate(ctx, req) + resp, err := h.processorClient.Allocate(ctx, in) if err != nil { logger.WithField("gsa", gsa).WithError(err).Error("allocation failed") return nil, err diff --git a/cmd/processor/handler.go b/cmd/processor/handler.go new file mode 100644 index 0000000000..22dcc9e39f --- /dev/null +++ b/cmd/processor/handler.go @@ -0,0 +1,347 @@ +// Copyright 2026 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +// Processor +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "agones.dev/agones/pkg/allocation/converters" + allocationpb "agones.dev/agones/pkg/allocation/go" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" + "agones.dev/agones/pkg/client/clientset/versioned" + "agones.dev/agones/pkg/client/informers/externalversions" + "agones.dev/agones/pkg/gameserverallocations" + "agones.dev/agones/pkg/gameservers" + + "github.com/heptiolabs/healthcheck" + "github.com/sirupsen/logrus" + "go.opencensus.io/plugin/ocgrpc" + rpcstatus "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/status" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" +) + +// allocationResult represents the result of an allocation attempt +type allocationResult struct { + response *allocationpb.AllocationResponse + error *rpcstatus.Status +} + +// processorHandler represents the gRPC server for processing allocation requests +type processorHandler struct { + allocationpb.UnimplementedProcessorServer + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + allocator *gameserverallocations.Allocator + clients map[string]allocationpb.Processor_StreamBatchesServer + grpcUnallocatedStatusCode codes.Code + pullInterval time.Duration +} + +// newServiceHandler creates a new instance of processorHandler +func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, agonesClient versioned.Interface, + health healthcheck.Handler, config processorConfig, grpcUnallocatedStatusCode codes.Code) *processorHandler { + defaultResync := 30 * time.Second + agonesInformerFactory := externalversions.NewSharedInformerFactory(agonesClient, defaultResync) + kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, defaultResync) + gsCounter := 
gameservers.NewPerNodeCounter(kubeInformerFactory, agonesInformerFactory) + + allocator := gameserverallocations.NewAllocator( + agonesInformerFactory.Multicluster().V1().GameServerAllocationPolicies(), + kubeInformerFactory.Core().V1().Secrets(), + agonesClient.AgonesV1(), + kubeClient, + gameserverallocations.NewAllocationCache(agonesInformerFactory.Agones().V1().GameServers(), gsCounter, health), + config.RemoteAllocationTimeout, + config.TotalRemoteAllocationTimeout, + config.AllocationBatchWaitTime) + + batchCtx, cancel := context.WithCancel(ctx) + h := processorHandler{ + allocator: allocator, + ctx: batchCtx, + cancel: cancel, + grpcUnallocatedStatusCode: grpcUnallocatedStatusCode, + pullInterval: config.PullInterval, + } + + kubeInformerFactory.Start(ctx.Done()) + agonesInformerFactory.Start(ctx.Done()) + + if err := allocator.Run(ctx); err != nil { + logger.WithError(err).Fatal("Starting allocator failed.") + } + + return &h +} + +// StreamBatches handles a bidirectional stream for batch allocation requests from a client +// Registers the client, processes incoming batches, and sends responses +func (h *processorHandler) StreamBatches(stream allocationpb.Processor_StreamBatchesServer) error { + var clientID string + + // Wait for first message to get clientID + msg, err := stream.Recv() + if err != nil { + logger.WithError(err).Debug("Stream receive error on connect") + return err + } + + clientID = msg.GetClientId() + if clientID == "" { + logger.Warn("Received empty clientID, closing stream") + return nil + } + + h.addClient(clientID, stream) + defer h.removeClient(clientID) + logger.WithField("clientID", clientID).Debug("Client registered") + + // Main loop: handle incoming messages + for { + msg, err := stream.Recv() + if err != nil { + logger.WithError(err).Debug("Stream receive error") + return err + } + + payload := msg.GetPayload() + if payload == nil { + logger.WithField("clientID", clientID).Warn("Received message with nil payload") + continue + 
} + + batchPayload, ok := payload.(*allocationpb.ProcessorMessage_BatchRequest) + if !ok { + logger.WithField("clientID", clientID).Warn("Received non-batch request payload") + continue + } + + batchRequest := batchPayload.BatchRequest + batchID := batchRequest.GetBatchId() + requestWrappers := batchRequest.GetRequests() + + logger.WithFields(logrus.Fields{ + "clientID": clientID, + "batchID": batchID, + "requestCount": len(requestWrappers), + }).Debug("Received batch request") + + // Extract request IDs for logging + requestIDs := make([]string, len(requestWrappers)) + for i, wrapper := range requestWrappers { + requestIDs[i] = wrapper.GetRequestId() + } + + // Submit batch for processing + response := h.submitBatch(batchID, requestWrappers) + + respMsg := &allocationpb.ProcessorMessage{ + ClientId: clientID, + Payload: &allocationpb.ProcessorMessage_BatchResponse{ + BatchResponse: response, + }, + } + + // TODO: we might want to retry on failure here ? + if err := stream.Send(respMsg); err != nil { + logger.WithFields(logrus.Fields{ + "clientID": clientID, + "batchID": batchID, + "requestCount": len(requestWrappers), + }).WithError(err).Error("Failed to send response") + continue + } + } +} + +// StartPullRequestTicker periodically sends pull requests to all connected clients +func (h *processorHandler) StartPullRequestTicker() { + go func() { + ticker := time.NewTicker(h.pullInterval) + defer ticker.Stop() + + for { + select { + case <-h.ctx.Done(): + return + case <-ticker.C: + h.mu.RLock() + for clientID, stream := range h.clients { + pullMsg := &allocationpb.ProcessorMessage{ + ClientId: clientID, + Payload: &allocationpb.ProcessorMessage_Pull{ + Pull: &allocationpb.PullRequest{Message: "pull"}, + }, + } + go func(id string, s allocationpb.Processor_StreamBatchesServer) { + if err := s.Send(pullMsg); err != nil { + logger.WithFields(logrus.Fields{ + "clientID": id, + "error": err, + }).Warn("Failed to send pull request, removing client") + h.removeClient(id) 
+ } + }(clientID, stream) + } + h.mu.RUnlock() + } + } + }() +} + +// processAllocationsConcurrently processes multiple allocation requests in parallel +func (h *processorHandler) processAllocationsConcurrently(requestWrappers []*allocationpb.RequestWrapper) []allocationResult { + var wg sync.WaitGroup + results := make([]allocationResult, len(requestWrappers)) + + for i, reqWrapper := range requestWrappers { + wg.Add(1) + go func(index int, requestWrapper *allocationpb.RequestWrapper) { + defer wg.Done() + results[index] = h.processAllocation(requestWrapper.Request) + }(i, reqWrapper) + } + + wg.Wait() + + return results +} + +// processAllocation handles a single allocation request by using the allocator +func (h *processorHandler) processAllocation(req *allocationpb.AllocationRequest) allocationResult { + gsa := converters.ConvertAllocationRequestToGSA(req) + gsa.ApplyDefaults() + + makeError := func(err error, fallbackCode codes.Code) allocationResult { + code := fallbackCode + msg := err.Error() + if grpcStatus, ok := status.FromError(err); ok { + code = grpcStatus.Code() + msg = grpcStatus.Message() + } + return allocationResult{ + error: &rpcstatus.Status{Code: int32(code), Message: msg}, + } + } + + resultObj, err := h.allocator.Allocate(h.ctx, gsa) + if err != nil { + return makeError(err, h.grpcUnallocatedStatusCode) + } + + if s, ok := resultObj.(*metav1.Status); ok { + return allocationResult{ + error: &rpcstatus.Status{ + Code: int32(grpcCodeFromHTTPStatus(int(s.Code))), + Message: s.Message, + }, + } + } + + allocatedGsa, ok := resultObj.(*allocationv1.GameServerAllocation) + if !ok { + return allocationResult{ + error: &rpcstatus.Status{ + Code: int32(codes.Internal), + Message: fmt.Sprintf("internal server error - Bad GSA format %v", resultObj), + }, + } + } + + response, err := converters.ConvertGSAToAllocationResponse(allocatedGsa, h.grpcUnallocatedStatusCode) + if err != nil { + return makeError(err, h.grpcUnallocatedStatusCode) + } + + return 
allocationResult{response: response} +} + +// submitBatch accepts a batch of allocation requests, processes them, and assembles a batch response +func (h *processorHandler) submitBatch(batchID string, requestWrappers []*allocationpb.RequestWrapper) *allocationpb.BatchResponse { + results := h.processAllocationsConcurrently(requestWrappers) + responseWrappers := make([]*allocationpb.ResponseWrapper, len(requestWrappers)) + + for i, result := range results { + wrapper := &allocationpb.ResponseWrapper{ + RequestId: requestWrappers[i].RequestId, + } + + if result.error != nil { + wrapper.Result = &allocationpb.ResponseWrapper_Error{ + Error: result.error, + } + } else { + wrapper.Result = &allocationpb.ResponseWrapper_Response{ + Response: result.response, + } + } + responseWrappers[i] = wrapper + } + + return &allocationpb.BatchResponse{ + BatchId: batchID, + Responses: responseWrappers, + } +} + +// getGRPCServerOptions returns a list of GRPC server options to use when only serving gRPC requests. 
+func (h *processorHandler) getGRPCServerOptions() []grpc.ServerOption { + opts := []grpc.ServerOption{ + grpc.StatsHandler(&ocgrpc.ServerHandler{}), + + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 10 * time.Second, + PermitWithoutStream: true, + }), + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: 1 * time.Minute, + Timeout: 30 * time.Second, + Time: 30 * time.Second, + }), + } + + return opts +} + +// addClient registers a new client for streaming allocation responses +func (h *processorHandler) addClient(clientID string, stream allocationpb.Processor_StreamBatchesServer) { + h.mu.Lock() + defer h.mu.Unlock() + + if h.clients == nil { + h.clients = make(map[string]allocationpb.Processor_StreamBatchesServer) + } + + h.clients[clientID] = stream +} + +// removeClient unregisters a client from streaming allocation responses +func (h *processorHandler) removeClient(clientID string) { + h.mu.Lock() + defer h.mu.Unlock() + + delete(h.clients, clientID) +} diff --git a/cmd/processor/main.go b/cmd/processor/main.go index 03c603763c..bd7de4646c 100644 --- a/cmd/processor/main.go +++ b/cmd/processor/main.go @@ -17,11 +17,18 @@ package main import ( "context" + "errors" + "fmt" + "net" + "net/http" "os" "strings" "time" "agones.dev/agones/pkg" + allocationpb "agones.dev/agones/pkg/allocation/go" + "agones.dev/agones/pkg/client/clientset/versioned" + "agones.dev/agones/pkg/metrics" "agones.dev/agones/pkg/util/httpserver" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/signals" @@ -31,6 +38,10 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/pflag" "github.com/spf13/viper" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + grpchealth "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -39,12 +50,24 @@ import ( ) const ( - logLevelFlag = "log-level" - 
leaderElectionFlag = "leader-election" - podNamespace = "pod-namespace" - leaseDurationFlag = "lease-duration" - renewDeadlineFlag = "renew-deadline" - retryPeriodFlag = "retry-period" + allocationBatchWaitTime = "allocation-batch-wait-time" + apiServerBurstQPSFlag = "api-server-qps-burst" + apiServerSustainedQPSFlag = "api-server-qps" + enablePrometheusMetricsFlag = "prometheus-exporter" + enableStackdriverMetricsFlag = "stackdriver-exporter" + grpcPortFlag = "grpc-port" + httpUnallocatedStatusCode = "http-unallocated-status-code" + leaderElectionFlag = "leader-election" + leaseDurationFlag = "lease-duration" + logLevelFlag = "log-level" + podNamespace = "pod-namespace" + projectIDFlag = "gcp-project-id" + pullIntervalFlag = "pull-interval" + remoteAllocationTimeoutFlag = "remote-allocation-timeout" + renewDeadlineFlag = "renew-deadline" + retryPeriodFlag = "retry-period" + stackdriverLabels = "stackdriver-labels" + totalRemoteAllocationTimeoutFlag = "total-remote-allocation-timeout" ) var ( @@ -52,41 +75,108 @@ var ( ) type processorConfig struct { - LogLevel string - LeaderElection bool - PodNamespace string - LeaseDuration time.Duration - RenewDeadline time.Duration - RetryPeriod time.Duration + GCPProjectID string + LogLevel string + PodNamespace string + StackdriverLabels string + APIServerBurstQPS int + APIServerSustainedQPS int + GRPCPort int + HTTPUnallocatedStatusCode int + LeaderElection bool + PrometheusMetrics bool + Stackdriver bool + AllocationBatchWaitTime time.Duration + LeaseDuration time.Duration + PullInterval time.Duration + RenewDeadline time.Duration + RemoteAllocationTimeout time.Duration + RetryPeriod time.Duration + TotalRemoteAllocationTimeout time.Duration } func parseEnvFlags() processorConfig { - viper.SetDefault(logLevelFlag, "Info") + viper.SetDefault(allocationBatchWaitTime, 50*time.Millisecond) + viper.SetDefault(apiServerBurstQPSFlag, 500) + viper.SetDefault(apiServerSustainedQPSFlag, 400) + 
viper.SetDefault(enablePrometheusMetricsFlag, true) + viper.SetDefault(enableStackdriverMetricsFlag, false) + viper.SetDefault(grpcPortFlag, 9090) + viper.SetDefault(httpUnallocatedStatusCode, http.StatusTooManyRequests) viper.SetDefault(leaderElectionFlag, false) - viper.SetDefault(podNamespace, "") viper.SetDefault(leaseDurationFlag, 15*time.Second) + viper.SetDefault(logLevelFlag, "Info") + viper.SetDefault(podNamespace, "") + viper.SetDefault(projectIDFlag, "") + viper.SetDefault(pullIntervalFlag, 200*time.Millisecond) + viper.SetDefault(remoteAllocationTimeoutFlag, 10*time.Second) viper.SetDefault(renewDeadlineFlag, 10*time.Second) viper.SetDefault(retryPeriodFlag, 2*time.Second) + viper.SetDefault(stackdriverLabels, "") + viper.SetDefault(totalRemoteAllocationTimeoutFlag, 30*time.Second) - pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Log level") + pflag.Duration(allocationBatchWaitTime, viper.GetDuration(allocationBatchWaitTime), "Flag to configure the waiting period between allocations batches") + pflag.Int32(apiServerBurstQPSFlag, viper.GetInt32(apiServerBurstQPSFlag), "Maximum burst queries per second to send to the API server") + pflag.Int32(apiServerSustainedQPSFlag, viper.GetInt32(apiServerSustainedQPSFlag), "Maximum sustained queries per second to send to the API server") + pflag.Bool(enablePrometheusMetricsFlag, viper.GetBool(enablePrometheusMetricsFlag), "Flag to activate metrics of Agones. Can also use PROMETHEUS_EXPORTER env variable.") + pflag.Bool(enableStackdriverMetricsFlag, viper.GetBool(enableStackdriverMetricsFlag), "Flag to activate stackdriver monitoring metrics for Agones. 
Can also use STACKDRIVER_EXPORTER env variable.") + pflag.Int32(grpcPortFlag, viper.GetInt32(grpcPortFlag), "Port to listen on for gRPC requests") + pflag.Int32(httpUnallocatedStatusCode, viper.GetInt32(httpUnallocatedStatusCode), "HTTP status code to return when no GameServer is available") pflag.Bool(leaderElectionFlag, viper.GetBool(leaderElectionFlag), "Enable leader election") - pflag.String(podNamespace, viper.GetString(podNamespace), "Pod namespace") pflag.Duration(leaseDurationFlag, viper.GetDuration(leaseDurationFlag), "Leader election lease duration") + pflag.String(logLevelFlag, viper.GetString(logLevelFlag), "Log level") + pflag.String(podNamespace, viper.GetString(podNamespace), "Pod namespace") + pflag.String(projectIDFlag, viper.GetString(projectIDFlag), "GCP ProjectID used for Stackdriver, if not specified ProjectID from Application Default Credentials would be used. Can also use GCP_PROJECT_ID env variable.") + pflag.Duration(pullIntervalFlag, viper.GetDuration(pullIntervalFlag), "Interval between pull requests sent to processor clients") + pflag.Duration(remoteAllocationTimeoutFlag, viper.GetDuration(remoteAllocationTimeoutFlag), "Flag to set remote allocation call timeout.") pflag.Duration(renewDeadlineFlag, viper.GetDuration(renewDeadlineFlag), "Leader election renew deadline") pflag.Duration(retryPeriodFlag, viper.GetDuration(retryPeriodFlag), "Leader election retry period") + pflag.String(stackdriverLabels, viper.GetString(stackdriverLabels), "A set of default labels to add to all stackdriver metrics generated. 
By default metadata are automatically added using Kubernetes API and GCP metadata enpoint.") + pflag.Duration(totalRemoteAllocationTimeoutFlag, viper.GetDuration(totalRemoteAllocationTimeoutFlag), "Flag to set total remote allocation timeout including retries.") pflag.Parse() viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) - viper.AutomaticEnv() - _ = viper.BindPFlags(pflag.CommandLine) + runtime.Must(viper.BindEnv(allocationBatchWaitTime)) + runtime.Must(viper.BindEnv(apiServerBurstQPSFlag)) + runtime.Must(viper.BindEnv(apiServerSustainedQPSFlag)) + runtime.Must(viper.BindEnv(enablePrometheusMetricsFlag)) + runtime.Must(viper.BindEnv(enableStackdriverMetricsFlag)) + runtime.Must(viper.BindEnv(grpcPortFlag)) + runtime.Must(viper.BindEnv(httpUnallocatedStatusCode)) + runtime.Must(viper.BindEnv(leaderElectionFlag)) + runtime.Must(viper.BindEnv(leaseDurationFlag)) + runtime.Must(viper.BindEnv(logLevelFlag)) + runtime.Must(viper.BindEnv(podNamespace)) + runtime.Must(viper.BindEnv(projectIDFlag)) + runtime.Must(viper.BindEnv(pullIntervalFlag)) + runtime.Must(viper.BindEnv(remoteAllocationTimeoutFlag)) + runtime.Must(viper.BindEnv(renewDeadlineFlag)) + runtime.Must(viper.BindEnv(retryPeriodFlag)) + runtime.Must(viper.BindEnv(stackdriverLabels)) + runtime.Must(viper.BindEnv(totalRemoteAllocationTimeoutFlag)) + runtime.Must(runtime.FeaturesBindEnv()) + + runtime.Must(runtime.ParseFeaturesFromEnv()) return processorConfig{ - LogLevel: viper.GetString(logLevelFlag), - LeaderElection: viper.GetBool(leaderElectionFlag), - PodNamespace: viper.GetString(podNamespace), - LeaseDuration: viper.GetDuration(leaseDurationFlag), - RenewDeadline: viper.GetDuration(renewDeadlineFlag), - RetryPeriod: viper.GetDuration(retryPeriodFlag), + AllocationBatchWaitTime: viper.GetDuration(allocationBatchWaitTime), + APIServerBurstQPS: int(viper.GetInt32(apiServerBurstQPSFlag)), + APIServerSustainedQPS: int(viper.GetInt32(apiServerSustainedQPSFlag)), + GCPProjectID: 
viper.GetString(projectIDFlag), + GRPCPort: int(viper.GetInt32(grpcPortFlag)), + HTTPUnallocatedStatusCode: int(viper.GetInt32(httpUnallocatedStatusCode)), + LeaderElection: viper.GetBool(leaderElectionFlag), + LeaseDuration: viper.GetDuration(leaseDurationFlag), + LogLevel: viper.GetString(logLevelFlag), + PodNamespace: viper.GetString(podNamespace), + PrometheusMetrics: viper.GetBool(enablePrometheusMetricsFlag), + PullInterval: viper.GetDuration(pullIntervalFlag), + RenewDeadline: viper.GetDuration(renewDeadlineFlag), + RemoteAllocationTimeout: viper.GetDuration(remoteAllocationTimeoutFlag), + RetryPeriod: viper.GetDuration(retryPeriodFlag), + Stackdriver: viper.GetBool(enableStackdriverMetricsFlag), + StackdriverLabels: viper.GetString(stackdriverLabels), + TotalRemoteAllocationTimeout: viper.GetDuration(totalRemoteAllocationTimeoutFlag), } } @@ -110,47 +200,46 @@ func main() { } healthserver := &httpserver.Server{Logger: logger} - health := healthcheck.NewHandler() + var health healthcheck.Handler - config, err := rest.InClusterConfig() - if err != nil { - logger.WithError(err).Fatal("Failed to create in-cluster config") - panic("Failed to create in-cluster config: " + err.Error()) + metricsConf := metrics.Config{ + Stackdriver: conf.Stackdriver, + PrometheusMetrics: conf.PrometheusMetrics, + GCPProjectID: conf.GCPProjectID, + StackdriverLabels: conf.StackdriverLabels, } - kubeClient, err := kubernetes.NewForConfig(config) + health, closer := metrics.SetupMetrics(metricsConf, healthserver) + defer closer() + + metrics.SetReportingPeriod(conf.PrometheusMetrics, conf.Stackdriver) + + kubeClient, agonesClient, err := getClients(conf) if err != nil { - logger.WithError(err).Fatal("Failed to create Kubernetes client") - panic("Failed to create Kubernetes client: " + err.Error()) + logger.WithError(err).Fatal("Could not create clients") } + grpcUnallocatedStatusCode := grpcCodeFromHTTPStatus(conf.HTTPUnallocatedStatusCode) + processorService := 
newServiceHandler(ctx, kubeClient, agonesClient, health, conf, grpcUnallocatedStatusCode) + + grpcHealth := grpchealth.NewServer() + grpcHealth.SetServingStatus("processor", grpc_health_v1.HealthCheckResponse_NOT_SERVING) + runGRPC(ctx, processorService, grpcHealth, conf.GRPCPort) + go func() { healthserver.Handle("/", health) - _ = healthserver.Run(context.Background(), 0) + _ = healthserver.Run(ctx, 0) }() signals.NewSigTermHandler(func() { logger.Info("Pod shutdown has been requested, failing readiness check") + grpcHealth.Shutdown() cancelCtx() - os.Exit(0) }) - whenLeader(ctx, cancelCtx, logger, conf, kubeClient, func(ctx context.Context) { + whenLeader(ctx, cancelCtx, logger, conf, kubeClient, func(_ context.Context) { logger.Info("Starting processor work as leader") - - // Simulate processor work (to ensure the leader is working) - // TODO: implement processor work - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - logger.Info("Processor work completed") - return - case <-ticker.C: - logger.Info("Processor is active as leader") - } - } + grpcHealth.SetServingStatus("processor", grpc_health_v1.HealthCheckResponse_SERVING) + processorService.StartPullRequestTicker() }) logger.Info("Processor exited gracefully.") @@ -158,13 +247,16 @@ func main() { func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.Entry, conf processorConfig, kubeClient *kubernetes.Clientset, start func(_ context.Context)) { + logger.WithField("leaderElectionEnabled", conf.LeaderElection).Info("Leader election configuration") + if !conf.LeaderElection { start(ctx) + <-ctx.Done() + return } id := uuid.New().String() - lock := &resourcelock.LeaseLock{ LeaseMeta: metav1.ObjectMeta{ Name: "agones-processor-lock", @@ -206,3 +298,92 @@ func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.E }, }) } + +func runGRPC(ctx context.Context, h *processorHandler, grpcHealth *grpchealth.Server, 
grpcPort int) { + logger.WithField("port", grpcPort).Info("Running the grpc handler on port") + listener, err := net.Listen("tcp", fmt.Sprintf(":%d", grpcPort)) + if err != nil { + logger.WithError(err).Fatalf("Failed to listen on TCP port %d", grpcPort) + os.Exit(1) + } + + grpcServer := grpc.NewServer(h.getGRPCServerOptions()...) + allocationpb.RegisterProcessorServer(grpcServer, h) + grpc_health_v1.RegisterHealthServer(grpcServer, grpcHealth) + + go func() { + go func() { + <-ctx.Done() + grpcServer.GracefulStop() + }() + + err := grpcServer.Serve(listener) + if err != nil { + logger.WithError(err).Fatal("Allocation service crashed") + os.Exit(1) + } + logger.Info("Allocation server closed") + os.Exit(0) + + }() +} + +// Set up our client which we will use to call the API +func getClients(ctlConfig processorConfig) (*kubernetes.Clientset, *versioned.Clientset, error) { + // Create the in-cluster config + config, err := rest.InClusterConfig() + if err != nil { + return nil, nil, errors.New("Could not create in cluster config") + } + + config.QPS = float32(ctlConfig.APIServerSustainedQPS) + config.Burst = ctlConfig.APIServerBurstQPS + + // Access to the Kubernetes resources through the Kubernetes Clientset + kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, nil, errors.New("Could not create the kubernetes api clientset") + } + + // Access to the Agones resources through the Agones Clientset + agonesClient, err := versioned.NewForConfig(config) + if err != nil { + return nil, nil, errors.New("Could not create the agones api clientset") + } + return kubeClient, agonesClient, nil +} + +// grpcCodeFromHTTPStatus converts an HTTP status code to the corresponding gRPC status code. 
+func grpcCodeFromHTTPStatus(httpUnallocatedStatusCode int) codes.Code { + switch httpUnallocatedStatusCode { + case http.StatusOK: + return codes.OK + case 499: + return codes.Canceled + case http.StatusInternalServerError: + return codes.Internal + case http.StatusBadRequest: + return codes.InvalidArgument + case http.StatusGatewayTimeout: + return codes.DeadlineExceeded + case http.StatusNotFound: + return codes.NotFound + case http.StatusConflict: + return codes.AlreadyExists + case http.StatusForbidden: + return codes.PermissionDenied + case http.StatusUnauthorized: + return codes.Unauthenticated + case http.StatusTooManyRequests: + return codes.ResourceExhausted + case http.StatusNotImplemented: + return codes.Unimplemented + case http.StatusUnprocessableEntity: + return codes.InvalidArgument + case http.StatusServiceUnavailable: + return codes.Unavailable + default: + logger.WithField("httpStatusCode", httpUnallocatedStatusCode).Warnf("Received unknown http status code, defaulting to codes.ResourceExhausted / 429") + return codes.ResourceExhausted + } +} diff --git a/install/helm/agones/templates/processor.yaml b/install/helm/agones/templates/processor.yaml index 5efe43284a..96673205d5 100644 --- a/install/helm/agones/templates/processor.yaml +++ b/install/helm/agones/templates/processor.yaml @@ -113,13 +113,39 @@ spec: {{- if gt (int .Values.agones.allocator.processor.replicas) 1 }} - name: LEADER_ELECTION value: "true" + {{- end }} - name: LEASE_DURATION value: {{ .Values.agones.allocator.processor.leaderElection.leaseDuration | default "15s" | quote }} - name: RENEW_DEADLINE value: {{ .Values.agones.allocator.processor.leaderElection.renewDeadline | default "10s" | quote }} - name: RETRY_PERIOD value: {{ .Values.agones.allocator.processor.leaderElection.retryPeriod | default "2s" | quote }} - {{- end }} + - name: PULL_INTERVAL + value: {{ .Values.agones.allocator.processor.pullInterval | default "200ms" | quote }} + - name: GRPC_PORT + value: {{ 
.Values.agones.allocator.processor.grpc.port | quote }} + - name: HTTP_UNALLOCATED_STATUS_CODE + value: {{ .Values.agones.allocator.service.http.unallocatedStatusCode | quote }} + - name: API_SERVER_QPS + value: {{ .Values.agones.allocator.apiServerQPS | quote }} + - name: API_SERVER_QPS_BURST + value: {{ .Values.agones.allocator.apiServerQPSBurst | quote }} + - name: FEATURE_GATES + value: {{ .Values.agones.featureGates | quote }} + - name: PROMETHEUS_EXPORTER + value: {{ .Values.agones.metrics.prometheusEnabled | quote }} + - name: STACKDRIVER_EXPORTER + value: {{ .Values.agones.metrics.stackdriverEnabled | quote }} + - name: GCP_PROJECT_ID + value: {{ .Values.agones.metrics.stackdriverProjectID | quote }} + - name: STACKDRIVER_LABELS + value: {{ .Values.agones.metrics.stackdriverLabels | quote }} + - name: REMOTE_ALLOCATION_TIMEOUT + value: {{ .Values.agones.allocator.remoteAllocationTimeout | quote }} + - name: TOTAL_REMOTE_ALLOCATION_TIMEOUT + value: {{ .Values.agones.allocator.totalRemoteAllocationTimeout | quote }} + - name: ALLOCATION_BATCH_WAIT_TIME + value: {{ .Values.agones.allocator.processor.allocationBatchWaitTime | quote }} livenessProbe: httpGet: path: /live @@ -138,16 +164,18 @@ spec: ports: - name: http containerPort: {{ .Values.agones.allocator.processor.http.port }} + - name: grpc + containerPort: {{ .Values.agones.allocator.processor.grpc.port }} resources: {{- if .Values.agones.allocator.processor.resources }} {{ toYaml .Values.agones.allocator.processor.resources | indent 10 }} {{- else}} requests: - cpu: 10m - memory: 32Mi + cpu: 300m + memory: 256Mi limits: cpu: 500m - memory: 256Mi + memory: 512Mi {{- end }} {{- if .Values.agones.image.processor.pullSecret }} imagePullSecrets: @@ -175,4 +203,7 @@ spec: - port: {{ .Values.agones.allocator.processor.http.port }} name: http targetPort: {{ .Values.agones.allocator.processor.http.port }} + - port: {{ .Values.agones.allocator.processor.grpc.port }} + name: grpc + targetPort: {{ 
.Values.agones.allocator.processor.grpc.port }} {{- end }} \ No newline at end of file diff --git a/install/helm/agones/values.yaml b/install/helm/agones/values.yaml index 65ac44f430..053314c38c 100644 --- a/install/helm/agones/values.yaml +++ b/install/helm/agones/values.yaml @@ -292,6 +292,8 @@ agones: processor: replicas: 2 maxBatchSize: 100 + pullInterval: 200ms + allocationBatchWaitTime: 50ms resources: {} nodeSelector: {} annotations: {} diff --git a/pkg/gameserverallocations/controller.go b/pkg/gameserverallocations/controller.go index 24d5848004..83c662e4a5 100644 --- a/pkg/gameserverallocations/controller.go +++ b/pkg/gameserverallocations/controller.go @@ -25,6 +25,7 @@ import ( "github.com/heptiolabs/healthcheck" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -37,6 +38,7 @@ import ( "k8s.io/client-go/tools/record" "agones.dev/agones/pkg/allocation/converters" + pb "agones.dev/agones/pkg/allocation/go" allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" "agones.dev/agones/pkg/client/clientset/versioned" "agones.dev/agones/pkg/client/informers/externalversions" @@ -163,33 +165,15 @@ func (c *Extensions) processAllocationRequest(ctx context.Context, w http.Respon } if runtime.FeatureEnabled(runtime.FeatureProcessorAllocator) { - var result k8sruntime.Object - var code int - req := converters.ConvertGSAToAllocationRequest(gsa) resp, err := c.processorClient.Allocate(ctx, req) if err != nil { - if st, ok := status.FromError(err); ok { - code = gwruntime.HTTPStatusFromCode(st.Code()) - } else { - code = http.StatusInternalServerError - } - - result = &metav1.Status{ - TypeMeta: metav1.TypeMeta{ - Kind: "Status", - APIVersion: "v1", - }, - Status: metav1.StatusFailure, - Message: err.Error(), - Code: int32(code), - } - } else { - result = converters.ConvertAllocationResponseToGSA(resp, resp.Source) - code = 
http.StatusCreated + result, code := c.convertProcessorError(err, gsa) + return c.serialisation(r, w, result, code, scheme.Codecs) } - return c.serialisation(r, w, result, code, scheme.Codecs) + result := c.convertProcessorResponse(resp, gsa) + return c.serialisation(r, w, result, http.StatusCreated, scheme.Codecs) } result, err := c.allocator.Allocate(ctx, gsa) @@ -266,3 +250,42 @@ func (c *Extensions) serialisation(r *http.Request, w http.ResponseWriter, obj k err = info.Serializer.Encode(obj, w) return errors.Wrapf(err, "error encoding %T", obj) } + +// convertProcessorError handles processor client errors and converts them to appropriate responses +func (c *Extensions) convertProcessorError(err error, gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, int) { + if st, ok := status.FromError(err); ok { + switch st.Code() { + case codes.ResourceExhausted: + gsa.Status.State = allocationv1.GameServerAllocationUnAllocated + return gsa, http.StatusCreated + case codes.Aborted: + gsa.Status.State = allocationv1.GameServerAllocationContention + return gsa, http.StatusCreated + default: + code := gwruntime.HTTPStatusFromCode(st.Code()) + return &metav1.Status{ + TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"}, + Status: metav1.StatusFailure, + Message: err.Error(), + Code: int32(code), + }, code + } + } + + return &metav1.Status{ + TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"}, + Status: metav1.StatusFailure, + Message: err.Error(), + Code: int32(http.StatusInternalServerError), + }, http.StatusInternalServerError +} + +// convertProcessorResponse handles successful processor responses +func (c *Extensions) convertProcessorResponse(resp *pb.AllocationResponse, originalGSA *allocationv1.GameServerAllocation) k8sruntime.Object { + resultGSA := converters.ConvertAllocationResponseToGSA(resp, resp.Source) + resultGSA.Spec = originalGSA.Spec + resultGSA.ObjectMeta.Namespace = originalGSA.ObjectMeta.Namespace + resultGSA.ObjectMeta.Name = 
resp.GameServerName + + return resultGSA +} From 058f79c36eedc7857ae9dfbf9f0047834ec14f95 Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Wed, 25 Mar 2026 15:57:39 -0400 Subject: [PATCH 2/8] feat: add interface around allocator (for processor) Signed-off-by: Thomas Lacroix --- cmd/processor/handler.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cmd/processor/handler.go b/cmd/processor/handler.go index 22dcc9e39f..4f630af737 100644 --- a/cmd/processor/handler.go +++ b/cmd/processor/handler.go @@ -29,6 +29,8 @@ import ( "agones.dev/agones/pkg/gameserverallocations" "agones.dev/agones/pkg/gameservers" + + k8sruntime "k8s.io/apimachinery/pkg/runtime" "github.com/heptiolabs/healthcheck" "github.com/sirupsen/logrus" "go.opencensus.io/plugin/ocgrpc" @@ -42,6 +44,11 @@ import ( "k8s.io/client-go/kubernetes" ) +// gameServerAllocator represents the interface used to allocate game servers +type gameServerAllocator interface { + Allocate(ctx context.Context, gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) +} + // allocationResult represents the result of an allocation attempt type allocationResult struct { response *allocationpb.AllocationResponse @@ -54,7 +61,7 @@ type processorHandler struct { ctx context.Context cancel context.CancelFunc mu sync.RWMutex - allocator *gameserverallocations.Allocator + allocator gameServerAllocator clients map[string]allocationpb.Processor_StreamBatchesServer grpcUnallocatedStatusCode codes.Code pullInterval time.Duration From 599d1ba74197b9f4d06d2fa046b1722d0674f8ea Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Thu, 26 Mar 2026 03:11:53 -0400 Subject: [PATCH 3/8] feat: minor fixes around error handling / ctx and unit tests Signed-off-by: Thomas Lacroix --- cmd/allocator/main.go | 16 +- cmd/processor/handler.go | 18 +- cmd/processor/handler_test.go | 751 ++++++++++++++++++++++++ cmd/processor/main.go | 5 +- pkg/gameserverallocations/controller.go | 10 +- pkg/processor/client.go | 57 +- 6
files changed, 837 insertions(+), 20 deletions(-) create mode 100644 cmd/processor/handler_test.go diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index b612c51bde..7bed08a718 100644 --- a/cmd/allocator/main.go +++ b/cmd/allocator/main.go @@ -50,6 +50,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" @@ -301,6 +302,8 @@ func main() { return } logger.WithError(err).Error("Processor client failed, initiating graceful shutdown") + podReady = false + cancelListenCtx() } }() @@ -737,17 +740,20 @@ func (h *serviceHandler) Allocate(ctx context.Context, in *pb.AllocationRequest) gsa.ApplyDefaults() if runtime.FeatureEnabled(runtime.FeatureProcessorAllocator) { + if errs := gsa.Validate(); len(errs) > 0 { + kind := allocationv1.SchemeGroupVersion.WithKind("GameServerAllocation").GroupKind() + statusErr := k8serrors.NewInvalid(kind, gsa.Name, errs) + s := &statusErr.ErrStatus + return nil, status.Error(codes.Code(s.Code), s.Message) + } + resp, err := h.processorClient.Allocate(ctx, in) if err != nil { logger.WithField("gsa", gsa).WithError(err).Error("allocation failed") return nil, err } - allocatedGsa := converters.ConvertAllocationResponseToGSA(resp, resp.Source) - response, err := converters.ConvertGSAToAllocationResponse(allocatedGsa, h.grpcUnallocatedStatusCode) - logger.WithField("response", response).WithError(err).Info("allocation response is being sent") - - return response, err + return resp, nil } resultObj, err := h.allocationCallback(ctx, gsa) diff --git a/cmd/processor/handler.go b/cmd/processor/handler.go index 4f630af737..19c00c96b4 100644 --- a/cmd/processor/handler.go +++ b/cmd/processor/handler.go @@ -29,8 +29,6 @@ import ( "agones.dev/agones/pkg/gameserverallocations"
"agones.dev/agones/pkg/gameservers" - - k8sruntime "k8s.io/apimachinery/pkg/runtime" "github.com/heptiolabs/healthcheck" "github.com/sirupsen/logrus" "go.opencensus.io/plugin/ocgrpc" @@ -40,6 +38,7 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" ) @@ -163,7 +162,7 @@ func (h *processorHandler) StreamBatches(stream allocationpb.Processor_StreamBat } // Submit batch for processing - response := h.submitBatch(batchID, requestWrappers) + response := h.submitBatch(stream.Context(), batchID, requestWrappers) respMsg := &allocationpb.ProcessorMessage{ ClientId: clientID, @@ -172,7 +171,6 @@ func (h *processorHandler) StreamBatches(stream allocationpb.Processor_StreamBat }, } - // TODO: we might want to retry on failure here ? if err := stream.Send(respMsg); err != nil { logger.WithFields(logrus.Fields{ "clientID": clientID, @@ -220,7 +218,7 @@ func (h *processorHandler) StartPullRequestTicker() { } // processAllocationsConcurrently processes multiple allocation requests in parallel -func (h *processorHandler) processAllocationsConcurrently(requestWrappers []*allocationpb.RequestWrapper) []allocationResult { +func (h *processorHandler) processAllocationsConcurrently(ctx context.Context, requestWrappers []*allocationpb.RequestWrapper) []allocationResult { var wg sync.WaitGroup results := make([]allocationResult, len(requestWrappers)) @@ -228,7 +226,7 @@ func (h *processorHandler) processAllocationsConcurrently(requestWrappers []*all wg.Add(1) go func(index int, requestWrapper *allocationpb.RequestWrapper) { defer wg.Done() - results[index] = h.processAllocation(requestWrapper.Request) + results[index] = h.processAllocation(ctx, requestWrapper.Request) }(i, reqWrapper) } @@ -238,7 +236,7 @@ func (h *processorHandler) processAllocationsConcurrently(requestWrappers []*all } // processAllocation 
handles a single allocation request by using the allocator -func (h *processorHandler) processAllocation(req *allocationpb.AllocationRequest) allocationResult { +func (h *processorHandler) processAllocation(ctx context.Context, req *allocationpb.AllocationRequest) allocationResult { gsa := converters.ConvertAllocationRequestToGSA(req) gsa.ApplyDefaults() @@ -254,7 +252,7 @@ func (h *processorHandler) processAllocation(req *allocationpb.AllocationRequest } } - resultObj, err := h.allocator.Allocate(h.ctx, gsa) + resultObj, err := h.allocator.Allocate(ctx, gsa) if err != nil { return makeError(err, h.grpcUnallocatedStatusCode) } @@ -287,8 +285,8 @@ func (h *processorHandler) processAllocation(req *allocationpb.AllocationRequest } // submitBatch accepts a batch of allocation requests, processes them, and assembles a batch response -func (h *processorHandler) submitBatch(batchID string, requestWrappers []*allocationpb.RequestWrapper) *allocationpb.BatchResponse { - results := h.processAllocationsConcurrently(requestWrappers) +func (h *processorHandler) submitBatch(ctx context.Context, batchID string, requestWrappers []*allocationpb.RequestWrapper) *allocationpb.BatchResponse { + results := h.processAllocationsConcurrently(ctx, requestWrappers) responseWrappers := make([]*allocationpb.ResponseWrapper, len(requestWrappers)) for i, result := range results { diff --git a/cmd/processor/handler_test.go b/cmd/processor/handler_test.go new file mode 100644 index 0000000000..e25a6ab8a8 --- /dev/null +++ b/cmd/processor/handler_test.go @@ -0,0 +1,751 @@ +// Copyright 2026 Google LLC All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "testing" + "time" + + allocationpb "agones.dev/agones/pkg/allocation/go" + agonesv1 "agones.dev/agones/pkg/apis/agones/v1" + allocationv1 "agones.dev/agones/pkg/apis/allocation/v1" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" +) + +// newTestHandler creates a processorHandler with a mock allocator +func newTestHandler(ctx context.Context, allocFunc func(context.Context, *allocationv1.GameServerAllocation) (k8sruntime.Object, error)) *processorHandler { + handlerCtx, cancel := context.WithCancel(ctx) + return &processorHandler{ + allocator: &mockAllocator{allocateFunc: allocFunc}, + ctx: handlerCtx, + cancel: cancel, + clients: make(map[string]allocationpb.Processor_StreamBatchesServer), + grpcUnallocatedStatusCode: codes.ResourceExhausted, + pullInterval: 100 * time.Millisecond, + } +} + +func TestAddClient(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + clientIDs []string + wantLen int + }{ + { + name: "single client", + clientIDs: []string{"client-1"}, + wantLen: 1, + }, + { + name: "multiple clients", + clientIDs: []string{"client-1", "client-2", "client-3"}, + wantLen: 3, + }, + { + name: "duplicate client overwrites", + clientIDs: []string{"client-1", "client-1"}, + wantLen: 1, + }, + } + + for _, tc := range 
tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + h := &processorHandler{} + h.clients = make(map[string]allocationpb.Processor_StreamBatchesServer) + stream := newMockServerStream(context.Background()) + + for _, id := range tc.clientIDs { + h.addClient(id, stream) + } + + h.mu.RLock() + defer h.mu.RUnlock() + assert.Len(t, h.clients, tc.wantLen) + }) + } +} + +func TestRemoveClient(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + initialIDs []string + removeID string + wantLen int + }{ + { + name: "remove existing client", + initialIDs: []string{"client-1", "client-2"}, + removeID: "client-1", + wantLen: 1, + }, + { + name: "remove non-existing client is a no-op", + initialIDs: []string{"client-1"}, + removeID: "client-unknown", + wantLen: 1, + }, + { + name: "remove last client leaves empty map", + initialIDs: []string{"client-1"}, + removeID: "client-1", + wantLen: 0, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + h := &processorHandler{ + clients: make(map[string]allocationpb.Processor_StreamBatchesServer), + } + stream := newMockServerStream(context.Background()) + for _, id := range tc.initialIDs { + h.addClient(id, stream) + } + + h.removeClient(tc.removeID) + + h.mu.RLock() + defer h.mu.RUnlock() + assert.Len(t, h.clients, tc.wantLen) + }) + } +} + +func TestProcessAllocation(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + allocateFunc func(ctx context.Context, gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) + wantResponse bool + wantErrorCode codes.Code + wantErrorMsg string + }{ + { + name: "successful allocation", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: "default"}, + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationAllocated, + 
GameServerName: "gs-1", + Address: "192.168.1.1", + NodeName: "node-1", + Ports: []agonesv1.GameServerStatusPort{ + {Name: "game", Port: 7777}, + }, + }, + }, nil + }, + wantResponse: true, + }, + { + name: "allocator returns error", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return nil, errors.New("allocation failed") + }, + wantErrorCode: codes.ResourceExhausted, + wantErrorMsg: "allocation failed", + }, + { + name: "allocator returns grpc status error", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return nil, status.Error(codes.NotFound, "no game servers available") + }, + wantErrorCode: codes.NotFound, + wantErrorMsg: "no game servers available", + }, + { + name: "allocator returns metav1.Status (e.g. validation failure)", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &metav1.Status{ + Status: metav1.StatusFailure, + Message: "GameServerAllocation is invalid", + Code: http.StatusUnprocessableEntity, + }, nil + }, + wantErrorCode: codes.InvalidArgument, + wantErrorMsg: "GameServerAllocation is invalid", + }, + { + name: "allocator returns unexpected type", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &agonesv1.GameServer{}, nil + }, + wantErrorCode: codes.Internal, + wantErrorMsg: "internal server error - Bad GSA format", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + h := newTestHandler(ctx, tc.allocateFunc) + + req := &allocationpb.AllocationRequest{Namespace: "default"} + result := h.processAllocation(ctx, req) + + if tc.wantResponse { + assert.NotNil(t, result.response) + assert.Nil(t, result.error) + } else { + assert.Nil(t, result.response) + require.NotNil(t, result.error) + assert.Equal(t, 
int32(tc.wantErrorCode), result.error.Code) + assert.Contains(t, result.error.Message, tc.wantErrorMsg) + } + }) + } +} + +func TestProcessAllocationsConcurrently(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + allocateFunc func(context.Context, *allocationv1.GameServerAllocation) (k8sruntime.Object, error) + requestCount int + wantErrors []bool + }{ + { + name: "all requests succeed", + allocateFunc: func(_ context.Context, gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: gsa.Namespace}, + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationAllocated, + GameServerName: "gs-concurrent", + }, + }, nil + }, + requestCount: 5, + wantErrors: []bool{false, false, false, false, false}, + }, + { + name: "all requests fail", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return nil, errors.New("no capacity") + }, + requestCount: 3, + wantErrors: []bool{true, true, true}, + }, + { + name: "empty request list", + allocateFunc: nil, + requestCount: 0, + wantErrors: []bool{}, + }, + { + name: "single request succeeds", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &allocationv1.GameServerAllocation{ + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationAllocated, + GameServerName: "gs-single", + }, + }, nil + }, + requestCount: 1, + wantErrors: []bool{false}, + }, + { + name: "mixed results based on namespace", + allocateFunc: func(_ context.Context, gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + if gsa.Namespace == "fail" { + return nil, errors.New("allocation failed") + } + return &allocationv1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: gsa.Namespace}, + Status: 
allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationAllocated, + GameServerName: "gs-ok", + }, + }, nil + }, + requestCount: -1, + wantErrors: []bool{false, true, false}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + allocFunc := tc.allocateFunc + if allocFunc == nil { + allocFunc = func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return nil, nil + } + } + h := newTestHandler(ctx, allocFunc) + + var requestWrappers []*allocationpb.RequestWrapper + if tc.requestCount == -1 { + requestWrappers = []*allocationpb.RequestWrapper{ + {RequestId: "req-0", Request: &allocationpb.AllocationRequest{Namespace: "default"}}, + {RequestId: "req-1", Request: &allocationpb.AllocationRequest{Namespace: "fail"}}, + {RequestId: "req-2", Request: &allocationpb.AllocationRequest{Namespace: "other"}}, + } + } else { + requestWrappers = make([]*allocationpb.RequestWrapper, tc.requestCount) + for i := range requestWrappers { + requestWrappers[i] = &allocationpb.RequestWrapper{ + RequestId: fmt.Sprintf("req-%d", i), + Request: &allocationpb.AllocationRequest{Namespace: "default"}, + } + } + } + + results := h.processAllocationsConcurrently(ctx, requestWrappers) + + require.Len(t, results, len(tc.wantErrors)) + for i, result := range results { + if tc.wantErrors[i] { + assert.NotNil(t, result.error, "result %d should have an error", i) + assert.Nil(t, result.response, "result %d should not have a response", i) + } else { + assert.Nil(t, result.error, "result %d should not have an error", i) + } + } + }) + } +} + +func TestSubmitBatch(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + allocateFunc func(ctx context.Context, gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) + requestCount int + wantBatchID string + wantErrors []bool + }{ + { + name: "all requests succeed", + allocateFunc: func(_ 
context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &allocationv1.GameServerAllocation{ + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationAllocated, + GameServerName: "gs-1", + }, + }, nil + }, + requestCount: 3, + wantBatchID: "batch-1", + wantErrors: []bool{false, false, false}, + }, + { + name: "all requests fail", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return nil, errors.New("no capacity") + }, + requestCount: 2, + wantBatchID: "batch-err", + wantErrors: []bool{true, true}, + }, + { + name: "mixed success and failure based on namespace", + allocateFunc: func(_ context.Context, gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + if gsa.Namespace == "fail" { + return nil, errors.New("no capacity") + } + return &allocationv1.GameServerAllocation{ + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationAllocated, + GameServerName: "gs-mixed", + }, + }, nil + }, + requestCount: -1, + wantBatchID: "batch-mixed", + wantErrors: []bool{false, true, false}, + }, + { + name: "empty batch", + allocateFunc: nil, + requestCount: 0, + wantBatchID: "batch-empty", + wantErrors: []bool{}, + }, + { + name: "single request succeeds", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &allocationv1.GameServerAllocation{ + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationAllocated, + GameServerName: "gs-solo", + }, + }, nil + }, + requestCount: 1, + wantBatchID: "batch-single", + wantErrors: []bool{false}, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + allocFunc := tc.allocateFunc + if allocFunc == nil { + allocFunc = func(_ context.Context, _ *allocationv1.GameServerAllocation) 
(k8sruntime.Object, error) { + return nil, nil + } + } + h := newTestHandler(ctx, allocFunc) + + var requestWrappers []*allocationpb.RequestWrapper + if tc.requestCount == -1 { + requestWrappers = []*allocationpb.RequestWrapper{ + {RequestId: "req-0", Request: &allocationpb.AllocationRequest{Namespace: "default"}}, + {RequestId: "req-1", Request: &allocationpb.AllocationRequest{Namespace: "fail"}}, + {RequestId: "req-2", Request: &allocationpb.AllocationRequest{Namespace: "other"}}, + } + } else { + requestWrappers = make([]*allocationpb.RequestWrapper, tc.requestCount) + for i := range requestWrappers { + requestWrappers[i] = &allocationpb.RequestWrapper{ + RequestId: fmt.Sprintf("req-%d", i), + Request: &allocationpb.AllocationRequest{Namespace: "default"}, + } + } + } + + resp := h.submitBatch(ctx, tc.wantBatchID, requestWrappers) + + assert.Equal(t, tc.wantBatchID, resp.BatchId) + require.Len(t, resp.Responses, len(tc.wantErrors)) + + for i, wrapper := range resp.Responses { + assert.Equal(t, fmt.Sprintf("req-%d", i), wrapper.RequestId) + if tc.wantErrors[i] { + assert.NotNil(t, wrapper.GetError(), "request %d should have error", i) + assert.Nil(t, wrapper.GetResponse(), "request %d should not have response", i) + } else { + assert.Nil(t, wrapper.GetError(), "request %d should not have error", i) + } + } + }) + } +} + +func TestStreamBatches(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + messages []*allocationpb.ProcessorMessage + allocFunc func(context.Context, *allocationv1.GameServerAllocation) (k8sruntime.Object, error) + wantErr bool + wantClients int + validate func(t *testing.T, sent []*allocationpb.ProcessorMessage) + }{ + { + name: "recv error on first message closes stream", + wantErr: true, + wantClients: 0, + }, + { + name: "empty clientID closes stream without error", + messages: []*allocationpb.ProcessorMessage{ + {ClientId: ""}, + }, + wantErr: false, + wantClients: 0, + }, + { + name: "registers client then handles batch 
request", + messages: []*allocationpb.ProcessorMessage{ + {ClientId: "client-1"}, + { + ClientId: "client-1", + Payload: &allocationpb.ProcessorMessage_BatchRequest{ + BatchRequest: &allocationpb.BatchRequest{ + BatchId: "b-1", + Requests: []*allocationpb.RequestWrapper{ + { + RequestId: "r-1", + Request: &allocationpb.AllocationRequest{Namespace: "default"}, + }, + }, + }, + }, + }, + }, + allocFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &allocationv1.GameServerAllocation{ + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationAllocated, + GameServerName: "gs-1", + }, + }, nil + }, + wantErr: true, + wantClients: 0, + validate: func(t *testing.T, sent []*allocationpb.ProcessorMessage) { + require.Len(t, sent, 1) + batchResp := sent[0].GetBatchResponse() + require.NotNil(t, batchResp) + assert.Equal(t, "b-1", batchResp.BatchId) + require.Len(t, batchResp.Responses, 1) + assert.Equal(t, "r-1", batchResp.Responses[0].RequestId) + assert.NotNil(t, batchResp.Responses[0].GetResponse()) + }, + }, + { + name: "nil payload is skipped", + messages: []*allocationpb.ProcessorMessage{ + {ClientId: "client-1"}, + {ClientId: "client-1", Payload: nil}, + }, + wantErr: true, + wantClients: 0, + }, + { + name: "non-batch payload is skipped", + messages: []*allocationpb.ProcessorMessage{ + {ClientId: "client-1"}, + { + ClientId: "client-1", + Payload: &allocationpb.ProcessorMessage_Pull{Pull: &allocationpb.PullRequest{Message: "pull"}}, + }, + }, + wantErr: true, + wantClients: 0, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + allocFunc := tc.allocFunc + if allocFunc == nil { + allocFunc = func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &allocationv1.GameServerAllocation{ + Status: 
allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationAllocated, + GameServerName: "gs-default", + }, + }, nil + } + } + h := newTestHandler(ctx, allocFunc) + + stream := newMockServerStream(ctx) + + go func() { + for _, msg := range tc.messages { + stream.recvChan <- msg + } + close(stream.recvChan) + }() + + err := h.StreamBatches(stream) + + if tc.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + h.mu.RLock() + assert.Len(t, h.clients, tc.wantClients) + h.mu.RUnlock() + + if tc.validate != nil { + var sent []*allocationpb.ProcessorMessage + for { + select { + case msg := <-stream.sendChan: + sent = append(sent, msg) + default: + tc.validate(t, sent) + return + } + } + } + }) + } +} + +func TestStartPullRequestTicker(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + clientCount int + wantPulls bool + }{ + { + name: "sends pull to connected clients", + clientCount: 2, + wantPulls: true, + }, + { + name: "no clients means no pulls sent", + clientCount: 0, + wantPulls: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h := newTestHandler(ctx, nil) + + streams := make([]*mockServerStream, tc.clientCount) + for i := 0; i < tc.clientCount; i++ { + streams[i] = newMockServerStream(ctx) + h.addClient(fmt.Sprintf("client-%d", i), streams[i]) + } + + h.StartPullRequestTicker() + + if tc.wantPulls { + for i, stream := range streams { + assert.Eventually(t, func() bool { + select { + case msg := <-stream.sendChan: + return msg.GetPull() != nil + default: + return false + } + }, 5*time.Second, 10*time.Millisecond, "client-%d should receive a pull", i) + } + } else { + assert.Never(t, func() bool { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) > 0 + }, 200*time.Millisecond, 20*time.Millisecond, "no clients should appear") + } + }) + } +} + +func 
TestStartPullRequestTickerRemovesFailingClient(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + h := newTestHandler(ctx, nil) + h.pullInterval = 20 * time.Millisecond + + failStream := &failingSendStream{ + mockServerStream: *newMockServerStream(ctx), + } + h.addClient("failing-client", failStream) + + h.StartPullRequestTicker() + + assert.Eventually(t, func() bool { + h.mu.RLock() + defer h.mu.RUnlock() + return len(h.clients) == 0 + }, 5*time.Second, 20*time.Millisecond, "failing client should be removed") +} + +type mockAllocator struct { + allocateFunc func(ctx context.Context, gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) +} + +func (m *mockAllocator) Allocate(ctx context.Context, gsa *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return m.allocateFunc(ctx, gsa) +} + +type mockServerStream struct { + recvChan chan *allocationpb.ProcessorMessage + sendChan chan *allocationpb.ProcessorMessage + ctx context.Context +} + +func newMockServerStream(ctx context.Context) *mockServerStream { + return &mockServerStream{ + recvChan: make(chan *allocationpb.ProcessorMessage, 10), + sendChan: make(chan *allocationpb.ProcessorMessage, 10), + ctx: ctx, + } +} + +func (m *mockServerStream) Send(msg *allocationpb.ProcessorMessage) error { + select { + case m.sendChan <- msg: + return nil + case <-m.ctx.Done(): + return m.ctx.Err() + } +} + +func (m *mockServerStream) Recv() (*allocationpb.ProcessorMessage, error) { + select { + case msg, ok := <-m.recvChan: + if !ok { + return nil, io.EOF + } + return msg, nil + case <-m.ctx.Done(): + return nil, m.ctx.Err() + } +} + +func (m *mockServerStream) SetHeader(metadata.MD) error { return nil } +func (m *mockServerStream) SendHeader(metadata.MD) error { return nil } +func (m *mockServerStream) SetTrailer(metadata.MD) {} +func (m *mockServerStream) Context() context.Context { return m.ctx } +func (m *mockServerStream) 
SendMsg(interface{}) error { return nil } +func (m *mockServerStream) RecvMsg(interface{}) error { return nil } + +type failingSendStream struct { + mockServerStream +} + +func (f *failingSendStream) Send(_ *allocationpb.ProcessorMessage) error { + return errors.New("send failed") +} diff --git a/cmd/processor/main.go b/cmd/processor/main.go index bd7de4646c..ac4fe95d11 100644 --- a/cmd/processor/main.go +++ b/cmd/processor/main.go @@ -236,7 +236,7 @@ func main() { cancelCtx() }) - whenLeader(ctx, cancelCtx, logger, conf, kubeClient, func(_ context.Context) { + whenLeader(ctx, cancelCtx, logger, conf, kubeClient, grpcHealth, func(_ context.Context) { logger.Info("Starting processor work as leader") grpcHealth.SetServingStatus("processor", grpc_health_v1.HealthCheckResponse_SERVING) processorService.StartPullRequestTicker() @@ -246,7 +246,7 @@ func main() { } func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.Entry, - conf processorConfig, kubeClient *kubernetes.Clientset, start func(_ context.Context)) { + conf processorConfig, kubeClient *kubernetes.Clientset, grpcHealth *grpchealth.Server, start func(_ context.Context)) { logger.WithField("leaderElectionEnabled", conf.LeaderElection).Info("Leader election configuration") if !conf.LeaderElection { @@ -286,6 +286,7 @@ func whenLeader(ctx context.Context, cancel context.CancelFunc, logger *logrus.E OnStartedLeading: start, OnStoppedLeading: func() { logger.WithField("id", id).Info("Leader Lost") + grpcHealth.SetServingStatus("processor", grpc_health_v1.HealthCheckResponse_NOT_SERVING) cancel() os.Exit(0) }, diff --git a/pkg/gameserverallocations/controller.go b/pkg/gameserverallocations/controller.go index 83c662e4a5..3308aaa112 100644 --- a/pkg/gameserverallocations/controller.go +++ b/pkg/gameserverallocations/controller.go @@ -28,6 +28,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" + k8serrors 
"k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -165,6 +166,12 @@ func (c *Extensions) processAllocationRequest(ctx context.Context, w http.Respon } if runtime.FeatureEnabled(runtime.FeatureProcessorAllocator) { + if errs := gsa.Validate(); len(errs) > 0 { + kind := allocationv1.SchemeGroupVersion.WithKind("GameServerAllocation").GroupKind() + statusErr := k8serrors.NewInvalid(kind, gsa.Name, errs) + return c.serialisation(r, w, &statusErr.ErrStatus, http.StatusUnprocessableEntity, scheme.Codecs) + } + req := converters.ConvertGSAToAllocationRequest(gsa) resp, err := c.processorClient.Allocate(ctx, req) if err != nil { @@ -266,7 +273,7 @@ func (c *Extensions) convertProcessorError(err error, gsa *allocationv1.GameServ return &metav1.Status{ TypeMeta: metav1.TypeMeta{Kind: "Status", APIVersion: "v1"}, Status: metav1.StatusFailure, - Message: err.Error(), + Message: st.Message(), Code: int32(code), }, code } @@ -286,6 +293,7 @@ func (c *Extensions) convertProcessorResponse(resp *pb.AllocationResponse, origi resultGSA.Spec = originalGSA.Spec resultGSA.ObjectMeta.Namespace = originalGSA.ObjectMeta.Namespace resultGSA.ObjectMeta.Name = resp.GameServerName + resultGSA.ObjectMeta.CreationTimestamp = originalGSA.ObjectMeta.CreationTimestamp return resultGSA } diff --git a/pkg/processor/client.go b/pkg/processor/client.go index 015e621a93..0e9673fc24 100644 --- a/pkg/processor/client.go +++ b/pkg/processor/client.go @@ -32,6 +32,9 @@ import ( allocationpb "agones.dev/agones/pkg/allocation/go" ) +// maxSendRetries is the maximum number of times a request can be re-queued after send failures +const maxSendRetries = 3 + // Config holds the processor client configuration type Config struct { // ClientID is a unique identifier for this processor client instance @@ -83,6 +86,9 @@ type pendingRequest struct { // id is the unique identifier for this 
request id string + + // sendRetries tracks how many times this request has been re-queued after a send failure + sendRetries int } // Client interface for allocation operations @@ -117,6 +123,8 @@ func NewClient(config Config, logger logrus.FieldLogger) Client { func (p *client) Run(ctx context.Context) error { p.logger.Info("Starting processor client") + defer p.drainPendingRequests() + // Main connection loop with retry for { select { @@ -260,7 +268,7 @@ func (p *client) pullRequestHandler(ctx context.Context, stream allocationpb.Pro // handlePullRequest responds to pull requests by sending the current batch of allocation requests // It swaps out the hot batch, resets it for new requests, and sends the ready batch to the processor func (p *client) handlePullRequest(stream allocationpb.Processor_StreamBatchesClient) { - // Swap out the hot batch and pending requests + // Swap out the hot batch and filter stale requests p.batchMutex.Lock() readyBatch := p.hotBatch readyRequests := p.pendingRequests @@ -270,8 +278,23 @@ func (p *client) handlePullRequest(stream allocationpb.Processor_StreamBatchesCl Requests: make([]*allocationpb.RequestWrapper, 0, p.config.MaxBatchSize), } p.pendingRequests = make([]*pendingRequest, 0, p.config.MaxBatchSize) + + // Filter out requests whose callers have already timed out or cancelled + filteredRequests := make([]*pendingRequest, 0, len(readyRequests)) + filteredWrappers := make([]*allocationpb.RequestWrapper, 0, len(readyBatch.Requests)) + for i, req := range readyRequests { + if _, exists := p.requestIDMapping[req.id]; exists { + filteredRequests = append(filteredRequests, req) + filteredWrappers = append(filteredWrappers, readyBatch.Requests[i]) + } else { + p.logger.WithField("requestID", req.id).Debug("Dropping stale request from batch") + } + } p.batchMutex.Unlock() + readyBatch.Requests = filteredWrappers + readyRequests = filteredRequests + if len(readyRequests) == 0 { p.logger.Debug("No requests to send in batch") return 
@@ -297,9 +320,19 @@ func (p *client) sendBatch(stream allocationpb.Processor_StreamBatchesClient, ba if err := stream.Send(batchMsg); err != nil { p.logger.WithError(err).Error("Failed to send batch") - // Re-add the request to the hot batch and pendingRequests for the next pull + // Re-add retryable requests to the hot batch p.batchMutex.Lock() for _, req := range requests { + req.sendRetries++ + if req.sendRetries > maxSendRetries { + p.logger.WithField("requestID", req.id).Warn("Request exceeded max send retries, failing") + delete(p.requestIDMapping, req.id) + select { + case req.error <- status.Errorf(codes.Unavailable, "failed to send after %d retries", maxSendRetries): + default: + } + continue + } p.hotBatch.Requests = append(p.hotBatch.Requests, &allocationpb.RequestWrapper{ RequestId: req.id, Request: req.request, @@ -528,3 +561,23 @@ func (p *client) registerClient(stream allocationpb.Processor_StreamBatchesClien return nil } + +// drainPendingRequests clears all pending requests and sends an error to their channels during shutdown +func (p *client) drainPendingRequests() { + p.batchMutex.Lock() + defer p.batchMutex.Unlock() + + drainErr := status.Errorf(codes.Unavailable, "processor client shutting down") + for id, req := range p.requestIDMapping { + select { + case req.error <- drainErr: + default: + } + delete(p.requestIDMapping, id) + } + + p.hotBatch.Requests = p.hotBatch.Requests[:0] + p.pendingRequests = p.pendingRequests[:0] + + p.logger.Info("Drained all pending requests on shutdown") +} From b334e280940ef02e5e66efebaeb3bb5c287b76bf Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Sun, 29 Mar 2026 06:09:14 -0400 Subject: [PATCH 4/8] feat: fix unit tests Signed-off-by: Thomas Lacroix --- cmd/allocator/main.go | 2 +- cmd/processor/handler.go | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/cmd/allocator/main.go b/cmd/allocator/main.go index 7bed08a718..a2b680b0ef 100644 --- a/cmd/allocator/main.go +++ 
b/cmd/allocator/main.go @@ -744,7 +744,7 @@ func (h *serviceHandler) Allocate(ctx context.Context, in *pb.AllocationRequest) kind := allocationv1.SchemeGroupVersion.WithKind("GameServerAllocation").GroupKind() statusErr := k8serrors.NewInvalid(kind, gsa.Name, errs) s := &statusErr.ErrStatus - return nil, status.Errorf(codes.Code(s.Code), s.Message) + return nil, status.Error(codes.Code(s.Code), s.Message) } resp, err := h.processorClient.Allocate(ctx, in) diff --git a/cmd/processor/handler.go b/cmd/processor/handler.go index 19c00c96b4..a1f815ca67 100644 --- a/cmd/processor/handler.go +++ b/cmd/processor/handler.go @@ -18,6 +18,7 @@ package main import ( "context" "fmt" + "io" "sync" "time" @@ -89,6 +90,7 @@ func newServiceHandler(ctx context.Context, kubeClient kubernetes.Interface, ago allocator: allocator, ctx: batchCtx, cancel: cancel, + clients: make(map[string]allocationpb.Processor_StreamBatchesServer), grpcUnallocatedStatusCode: grpcUnallocatedStatusCode, pullInterval: config.PullInterval, } @@ -118,7 +120,7 @@ func (h *processorHandler) StreamBatches(stream allocationpb.Processor_StreamBat clientID = msg.GetClientId() if clientID == "" { logger.Warn("Received empty clientID, closing stream") - return nil + return status.Error(codes.InvalidArgument, "clientID is required") } h.addClient(clientID, stream) @@ -129,7 +131,11 @@ func (h *processorHandler) StreamBatches(stream allocationpb.Processor_StreamBat for { msg, err := stream.Recv() if err != nil { - logger.WithError(err).Debug("Stream receive error") + if err == io.EOF { + logger.WithField("clientID", clientID).Debug("Stream closed by client") + } else { + logger.WithField("clientID", clientID).WithError(err).Warn("Stream receive error") + } return err } @@ -335,11 +341,6 @@ func (h *processorHandler) getGRPCServerOptions() []grpc.ServerOption { func (h *processorHandler) addClient(clientID string, stream allocationpb.Processor_StreamBatchesServer) { h.mu.Lock() defer h.mu.Unlock() - - if h.clients == 
nil { - h.clients = make(map[string]allocationpb.Processor_StreamBatchesServer) - } - h.clients[clientID] = stream } From 84a65842c7bb67f034f27a73b0aeaec0ad5fff89 Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Sun, 29 Mar 2026 06:53:58 -0400 Subject: [PATCH 5/8] feat: fix unit tests Signed-off-by: Thomas Lacroix --- cmd/processor/handler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/processor/handler_test.go b/cmd/processor/handler_test.go index e25a6ab8a8..9e995af2b4 100644 --- a/cmd/processor/handler_test.go +++ b/cmd/processor/handler_test.go @@ -484,11 +484,11 @@ func TestStreamBatches(t *testing.T) { wantClients: 0, }, { - name: "empty clientID closes stream without error", + name: "empty clientID closes stream with InvalidArgument error", messages: []*allocationpb.ProcessorMessage{ {ClientId: ""}, }, - wantErr: false, + wantErr: true, wantClients: 0, }, { From a3fd32b0eac3c1318e0227f727de9dff8c5b0dcd Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Sun, 29 Mar 2026 08:57:03 -0400 Subject: [PATCH 6/8] feat: fix allocation validation error Signed-off-by: Thomas Lacroix --- pkg/gameserverallocations/controller.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/gameserverallocations/controller.go b/pkg/gameserverallocations/controller.go index 3308aaa112..cce5123af4 100644 --- a/pkg/gameserverallocations/controller.go +++ b/pkg/gameserverallocations/controller.go @@ -169,7 +169,11 @@ func (c *Extensions) processAllocationRequest(ctx context.Context, w http.Respon if errs := gsa.Validate(); len(errs) > 0 { kind := allocationv1.SchemeGroupVersion.WithKind("GameServerAllocation").GroupKind() statusErr := k8serrors.NewInvalid(kind, gsa.Name, errs) - return c.serialisation(r, w, &statusErr.ErrStatus, http.StatusUnprocessableEntity, scheme.Codecs) + s := &statusErr.ErrStatus + if gvks, _, err := apiserver.Scheme.ObjectKinds(s); err == nil { + s.TypeMeta = metav1.TypeMeta{Kind: gvks[0].Kind, 
APIVersion: gvks[0].Version} + } + return c.serialisation(r, w, s, http.StatusUnprocessableEntity, scheme.Codecs) } req := converters.ConvertGSAToAllocationRequest(gsa) From 32fcf61f5cb6f5bf9a542bd586a59238e5ca8361 Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Sun, 29 Mar 2026 11:07:28 -0400 Subject: [PATCH 7/8] feat: fix last error handling Signed-off-by: Thomas Lacroix --- cmd/processor/handler.go | 5 +++-- cmd/processor/handler_test.go | 26 ++++++++++++++++++++++++- pkg/gameserverallocations/controller.go | 7 +++---- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/cmd/processor/handler.go b/cmd/processor/handler.go index a1f815ca67..9fe3a5912f 100644 --- a/cmd/processor/handler.go +++ b/cmd/processor/handler.go @@ -260,7 +260,7 @@ func (h *processorHandler) processAllocation(ctx context.Context, req *allocatio resultObj, err := h.allocator.Allocate(ctx, gsa) if err != nil { - return makeError(err, h.grpcUnallocatedStatusCode) + return makeError(err, codes.Internal) } if s, ok := resultObj.(*metav1.Status); ok { @@ -284,7 +284,7 @@ func (h *processorHandler) processAllocation(ctx context.Context, req *allocatio response, err := converters.ConvertGSAToAllocationResponse(allocatedGsa, h.grpcUnallocatedStatusCode) if err != nil { - return makeError(err, h.grpcUnallocatedStatusCode) + return makeError(err, codes.Internal) } return allocationResult{response: response} @@ -341,6 +341,7 @@ func (h *processorHandler) getGRPCServerOptions() []grpc.ServerOption { func (h *processorHandler) addClient(clientID string, stream allocationpb.Processor_StreamBatchesServer) { h.mu.Lock() defer h.mu.Unlock() + h.clients[clientID] = stream } diff --git a/cmd/processor/handler_test.go b/cmd/processor/handler_test.go index 9e995af2b4..6e07bd6d9c 100644 --- a/cmd/processor/handler_test.go +++ b/cmd/processor/handler_test.go @@ -171,12 +171,36 @@ func TestProcessAllocation(t *testing.T) { }, wantResponse: true, }, + { + name: "unallocated state returns 
ResourceExhausted", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &allocationv1.GameServerAllocation{ + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationUnAllocated, + }, + }, nil + }, + wantErrorCode: codes.ResourceExhausted, + wantErrorMsg: "there is no available GameServer to allocate", + }, + { + name: "contention state returns Aborted", + allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { + return &allocationv1.GameServerAllocation{ + Status: allocationv1.GameServerAllocationStatus{ + State: allocationv1.GameServerAllocationContention, + }, + }, nil + }, + wantErrorCode: codes.Aborted, + wantErrorMsg: "too many concurrent requests have overwhelmed the system", + }, { name: "allocator returns error", allocateFunc: func(_ context.Context, _ *allocationv1.GameServerAllocation) (k8sruntime.Object, error) { return nil, errors.New("allocation failed") }, - wantErrorCode: codes.ResourceExhausted, + wantErrorCode: codes.Internal, wantErrorMsg: "allocation failed", }, { diff --git a/pkg/gameserverallocations/controller.go b/pkg/gameserverallocations/controller.go index cce5123af4..9ed4f472da 100644 --- a/pkg/gameserverallocations/controller.go +++ b/pkg/gameserverallocations/controller.go @@ -293,11 +293,10 @@ func (c *Extensions) convertProcessorError(err error, gsa *allocationv1.GameServ // convertProcessorResponse handles successful processor responses func (c *Extensions) convertProcessorResponse(resp *pb.AllocationResponse, originalGSA *allocationv1.GameServerAllocation) k8sruntime.Object { - resultGSA := converters.ConvertAllocationResponseToGSA(resp, resp.Source) - resultGSA.Spec = originalGSA.Spec - resultGSA.ObjectMeta.Namespace = originalGSA.ObjectMeta.Namespace + resultGSA := originalGSA.DeepCopy() + converted := converters.ConvertAllocationResponseToGSA(resp, resp.Source) + resultGSA.Status = 
converted.Status resultGSA.ObjectMeta.Name = resp.GameServerName - resultGSA.ObjectMeta.CreationTimestamp = originalGSA.ObjectMeta.CreationTimestamp return resultGSA } From 0e0739938fb49d5fa88c9e3c891de106eb9fb714 Mon Sep 17 00:00:00 2001 From: Thomas Lacroix Date: Sun, 29 Mar 2026 11:11:03 -0400 Subject: [PATCH 8/8] feat: rollback to dev feature gate Signed-off-by: Thomas Lacroix --- build/Makefile | 2 +- cloudbuild.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/build/Makefile b/build/Makefile index 0860bfec90..90d1e15eb0 100644 --- a/build/Makefile +++ b/build/Makefile @@ -73,7 +73,7 @@ BETA_FEATURE_GATES ?= "CountsAndLists=true&GKEAutopilotExtendedDurationPods=true # Enable all alpha feature gates. Keep in sync with `false` (alpha) entries in pkg/util/runtime/features.go:featureDefaults -ALPHA_FEATURE_GATES ?= "PlayerAllocationFilter=true&PlayerTracking=true&WasmAutoscaler=true&Example=true&ProcessorAllocator=true" +ALPHA_FEATURE_GATES ?= "PlayerAllocationFilter=true&PlayerTracking=true&WasmAutoscaler=true&Example=true" # Build with Windows support WITH_WINDOWS=1 diff --git a/cloudbuild.yaml b/cloudbuild.yaml index 0e532d1034..bbba6011cd 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -322,7 +322,7 @@ steps: # Keep in sync with the inverse of 'alpha' and 'beta' features in # pkg/util/runtime/features.go:featureDefaults - featureWithGate="PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=false&PlayerTracking=true&CountsAndLists=false&RollingUpdateFix=false&PortRanges=false&PortPolicyNone=false&ScheduledAutoscaler=false&GKEAutopilotExtendedDurationPods=false&SidecarContainers=false&WasmAutoscaler=true&Example=true&ProcessorAllocator=true" + 
featureWithGate="PlayerAllocationFilter=true&FleetAutoscaleRequestMetaData=false&PlayerTracking=true&CountsAndLists=false&RollingUpdateFix=false&PortRanges=false&PortPolicyNone=false&ScheduledAutoscaler=false&GKEAutopilotExtendedDurationPods=false&SidecarContainers=false&WasmAutoscaler=true&Example=true" featureWithoutGate="" # Use this if specific feature gates can only be supported on specific Kubernetes versions.