From 07def54c56f6516503baa4303216e3e90ffd740e Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Wed, 4 Feb 2026 11:48:30 -0800 Subject: [PATCH 1/3] Fix WorkerInstanceKey not forwarded between task queue partitions When workflow, activity, or Nexus poll requests are forwarded from one partition to another, the WorkerInstanceKey field was not being copied to the forwarded request. This caused the worker's instance key to be lost. Changes: - Add workerInstanceKey field to pollMetadata struct - Populate workerInstanceKey from request in PollWorkflowTaskQueue - Populate workerInstanceKey from request in PollActivityTaskQueue - Forward workerInstanceKey in Forwarder.ForwardPoll for all three task types (workflow, activity, Nexus) - Forward workerInstanceKey in ForwardPollWithTarget (pri_forwarder) - Add tests to verify WorkerInstanceKey is preserved during forwarding --- service/matching/forwarder.go | 3 + service/matching/forwarder_test.go | 87 +++++++++++++++++++++++++++++ service/matching/matching_engine.go | 4 ++ service/matching/pri_forwarder.go | 3 + 4 files changed, 97 insertions(+) diff --git a/service/matching/forwarder.go b/service/matching/forwarder.go index cfdf18757f..018aeffca1 100644 --- a/service/matching/forwarder.go +++ b/service/matching/forwarder.go @@ -248,6 +248,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada Identity: identity, WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, + WorkerInstanceKey: pollMetadata.workerInstanceKey, }, ForwardedSource: fwdr.partition.RpcName(), Conditions: pollMetadata.conditions, @@ -271,6 +272,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada TaskQueueMetadata: pollMetadata.taskQueueMetadata, WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, + WorkerInstanceKey: pollMetadata.workerInstanceKey, }, ForwardedSource: fwdr.partition.RpcName(), Conditions: 
pollMetadata.conditions, @@ -293,6 +295,7 @@ func (fwdr *Forwarder) ForwardPoll(ctx context.Context, pollMetadata *pollMetada Identity: identity, WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, + WorkerInstanceKey: pollMetadata.workerInstanceKey, // Namespace is ignored here. }, ForwardedSource: fwdr.partition.RpcName(), diff --git a/service/matching/forwarder_test.go b/service/matching/forwarder_test.go index 8298d23160..32957ad0df 100644 --- a/service/matching/forwarder_test.go +++ b/service/matching/forwarder_test.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/suite" enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/workflowservice/v1" enumsspb "go.temporal.io/server/api/enums/v1" "go.temporal.io/server/api/matchingservice/v1" "go.temporal.io/server/api/matchingservicemock/v1" @@ -282,6 +283,34 @@ func (t *ForwarderTestSuite) TestForwardPollWorkflowTaskQueue() { t.Nil(task.pollActivityTaskQueueResponse()) } +func (t *ForwarderTestSuite) TestForwardPollWorkflowTaskQueuePreservesWorkerInstanceKey() { + t.usingTaskqueuePartition(enumspb.TASK_QUEUE_TYPE_WORKFLOW) + + pollerID := uuid.NewString() + workerInstanceKey := "test-worker-instance-" + uuid.NewString() + ctx := context.WithValue(context.Background(), pollerIDKey, pollerID) + ctx = context.WithValue(ctx, identityKey, "id1") + resp := &matchingservice.PollWorkflowTaskQueueResponse{ + TaskToken: []byte("token1"), + } + + var request *matchingservice.PollWorkflowTaskQueueRequest + t.client.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(arg0 context.Context, arg1 *matchingservice.PollWorkflowTaskQueueRequest, arg2 ...interface{}) { + request = arg1 + }, + ).Return(resp, nil) + + task, err := t.fwdr.ForwardPoll(ctx, &pollMetadata{ + workerInstanceKey: workerInstanceKey, + }) + t.NoError(err) + t.NotNil(task) + t.NotNil(request) + t.Equal(workerInstanceKey, 
request.GetPollRequest().GetWorkerInstanceKey(), + "WorkerInstanceKey should be preserved when forwarding workflow poll") +} + func (t *ForwarderTestSuite) TestForwardPollForActivity() { t.usingTaskqueuePartition(enumspb.TASK_QUEUE_TYPE_ACTIVITY) @@ -312,6 +341,64 @@ func (t *ForwarderTestSuite) TestForwardPollForActivity() { t.Nil(task.pollWorkflowTaskQueueResponse()) } +func (t *ForwarderTestSuite) TestForwardPollForActivityPreservesWorkerInstanceKey() { + t.usingTaskqueuePartition(enumspb.TASK_QUEUE_TYPE_ACTIVITY) + + pollerID := uuid.NewString() + workerInstanceKey := "test-worker-instance-" + uuid.NewString() + ctx := context.WithValue(context.Background(), pollerIDKey, pollerID) + ctx = context.WithValue(ctx, identityKey, "id1") + resp := &matchingservice.PollActivityTaskQueueResponse{ + TaskToken: []byte("token1"), + } + + var request *matchingservice.PollActivityTaskQueueRequest + t.client.EXPECT().PollActivityTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(arg0 context.Context, arg1 *matchingservice.PollActivityTaskQueueRequest, arg2 ...interface{}) { + request = arg1 + }, + ).Return(resp, nil) + + task, err := t.fwdr.ForwardPoll(ctx, &pollMetadata{ + workerInstanceKey: workerInstanceKey, + }) + t.NoError(err) + t.NotNil(task) + t.NotNil(request) + t.Equal(workerInstanceKey, request.GetPollRequest().GetWorkerInstanceKey(), + "WorkerInstanceKey should be preserved when forwarding activity poll") +} + +func (t *ForwarderTestSuite) TestForwardPollForNexusPreservesWorkerInstanceKey() { + t.usingTaskqueuePartition(enumspb.TASK_QUEUE_TYPE_NEXUS) + + pollerID := uuid.NewString() + workerInstanceKey := "test-worker-instance-" + uuid.NewString() + ctx := context.WithValue(context.Background(), pollerIDKey, pollerID) + ctx = context.WithValue(ctx, identityKey, "id1") + resp := &matchingservice.PollNexusTaskQueueResponse{ + Response: &workflowservice.PollNexusTaskQueueResponse{ + TaskToken: []byte("token1"), + }, + } + + var request 
*matchingservice.PollNexusTaskQueueRequest + t.client.EXPECT().PollNexusTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(arg0 context.Context, arg1 *matchingservice.PollNexusTaskQueueRequest, arg2 ...interface{}) { + request = arg1 + }, + ).Return(resp, nil) + + task, err := t.fwdr.ForwardPoll(ctx, &pollMetadata{ + workerInstanceKey: workerInstanceKey, + }) + t.NoError(err) + t.NotNil(task) + t.NotNil(request) + t.Equal(workerInstanceKey, request.GetRequest().GetWorkerInstanceKey(), + "WorkerInstanceKey should be preserved when forwarding Nexus poll") +} + // TODO(pri): old matcher cleanup func (t *ForwarderTestSuite) TestMaxOutstandingConcurrency() { if t.newFwdr { diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index e0f1271bda..1122bbc829 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -94,6 +94,7 @@ type ( conditions *matchingservice.PollConditions forwardedFrom string localPollStartTime time.Time + workerInstanceKey string } userDataUpdate struct { @@ -638,6 +639,7 @@ pollLoop: deploymentOptions: request.DeploymentOptions, forwardedFrom: req.ForwardedSource, conditions: req.Conditions, + workerInstanceKey: request.WorkerInstanceKey, } task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata) if err != nil { @@ -942,6 +944,7 @@ pollLoop: deploymentOptions: request.DeploymentOptions, forwardedFrom: req.ForwardedSource, conditions: req.Conditions, + workerInstanceKey: request.WorkerInstanceKey, } task, versionSetUsed, err := e.pollTask(pollerCtx, partition, pollMetadata) if err != nil { @@ -2495,6 +2498,7 @@ pollLoop: deploymentOptions: request.DeploymentOptions, forwardedFrom: req.ForwardedSource, conditions: req.Conditions, + workerInstanceKey: request.WorkerInstanceKey, } task, _, err := e.pollTask(pollerCtx, partition, pollMetadata) if err != nil { diff --git a/service/matching/pri_forwarder.go b/service/matching/pri_forwarder.go index 
0f68bcf245..ca1e1f6f5c 100644 --- a/service/matching/pri_forwarder.go +++ b/service/matching/pri_forwarder.go @@ -218,6 +218,7 @@ func ForwardPollWithTarget( Identity: identity, WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, + WorkerInstanceKey: pollMetadata.workerInstanceKey, }, ForwardedSource: source.RpcName(), Conditions: pollMetadata.conditions, @@ -241,6 +242,7 @@ func ForwardPollWithTarget( TaskQueueMetadata: pollMetadata.taskQueueMetadata, WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, + WorkerInstanceKey: pollMetadata.workerInstanceKey, }, ForwardedSource: source.RpcName(), Conditions: pollMetadata.conditions, @@ -263,6 +265,7 @@ func ForwardPollWithTarget( Identity: identity, WorkerVersionCapabilities: pollMetadata.workerVersionCapabilities, DeploymentOptions: pollMetadata.deploymentOptions, + WorkerInstanceKey: pollMetadata.workerInstanceKey, // Namespace is ignored here. 
}, ForwardedSource: source.RpcName(), From 38d35ebaa678ac5de2c47b8e0f236f87da999614 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Thu, 5 Feb 2026 19:49:45 -0800 Subject: [PATCH 2/3] Fix lint --- service/matching/forwarder_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/service/matching/forwarder_test.go b/service/matching/forwarder_test.go index 32957ad0df..876b3a0ba2 100644 --- a/service/matching/forwarder_test.go +++ b/service/matching/forwarder_test.go @@ -304,7 +304,7 @@ func (t *ForwarderTestSuite) TestForwardPollWorkflowTaskQueuePreservesWorkerInst task, err := t.fwdr.ForwardPoll(ctx, &pollMetadata{ workerInstanceKey: workerInstanceKey, }) - t.NoError(err) + t.Require().NoError(err) t.NotNil(task) t.NotNil(request) t.Equal(workerInstanceKey, request.GetPollRequest().GetWorkerInstanceKey(), @@ -329,7 +329,7 @@ func (t *ForwarderTestSuite) TestForwardPollForActivity() { ).Return(resp, nil) task, err := t.fwdr.ForwardPoll(ctx, &pollMetadata{}) - t.NoError(err) + t.Require().NoError(err) t.NotNil(task) t.NotNil(request) t.Equal(pollerID, request.GetPollerId()) @@ -392,7 +392,7 @@ func (t *ForwarderTestSuite) TestForwardPollForNexusPreservesWorkerInstanceKey() task, err := t.fwdr.ForwardPoll(ctx, &pollMetadata{ workerInstanceKey: workerInstanceKey, }) - t.NoError(err) + t.Require().NoError(err) t.NotNil(task) t.NotNil(request) t.Equal(workerInstanceKey, request.GetRequest().GetWorkerInstanceKey(), From a7c0437781b33b2ac2873d9e7140da1c684ed630 Mon Sep 17 00:00:00 2001 From: Kannan Rajah Date: Fri, 6 Feb 2026 15:19:18 -0800 Subject: [PATCH 3/3] fix lint --- service/matching/forwarder_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/matching/forwarder_test.go b/service/matching/forwarder_test.go index 876b3a0ba2..3d545431a1 100644 --- a/service/matching/forwarder_test.go +++ b/service/matching/forwarder_test.go @@ -362,7 +362,7 @@ func (t *ForwarderTestSuite) 
TestForwardPollForActivityPreservesWorkerInstanceKe task, err := t.fwdr.ForwardPoll(ctx, &pollMetadata{ workerInstanceKey: workerInstanceKey, }) - t.NoError(err) + t.Require().NoError(err) t.NotNil(task) t.NotNil(request) t.Equal(workerInstanceKey, request.GetPollRequest().GetWorkerInstanceKey(),