From c7260db889f4b4ea2a1c4783d020c2e098fd567c Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 5 Feb 2026 18:23:33 -0800 Subject: [PATCH 1/3] Attach worker_instance_key to all poll calls --- contrib/datadog/go.mod | 2 +- contrib/datadog/go.sum | 4 +- contrib/envconfig/go.mod | 2 +- contrib/envconfig/go.sum | 4 +- contrib/opentelemetry/go.mod | 2 +- contrib/opentelemetry/go.sum | 4 +- contrib/opentracing/go.mod | 2 +- contrib/opentracing/go.sum | 4 +- contrib/resourcetuner/go.mod | 2 +- contrib/resourcetuner/go.sum | 4 +- contrib/tally/go.mod | 2 +- contrib/tally/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- internal/cmd/build/go.mod | 2 +- internal/cmd/build/go.sum | 4 +- internal/internal_nexus_task_poller.go | 2 + internal/internal_task_pollers.go | 6 +++ internal/internal_worker.go | 5 +- internal/internal_worker_test.go | 72 ++++++++++++++++++++++++++ test/go.mod | 2 +- test/go.sum | 4 +- 22 files changed, 111 insertions(+), 28 deletions(-) diff --git a/contrib/datadog/go.mod b/contrib/datadog/go.mod index cdc784f2b..a7436938c 100644 --- a/contrib/datadog/go.mod +++ b/contrib/datadog/go.mod @@ -81,7 +81,7 @@ require ( go.opentelemetry.io/otel/metric v1.37.0 // indirect go.opentelemetry.io/otel/sdk v1.37.0 // indirect go.opentelemetry.io/otel/trace v1.37.0 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect diff --git a/contrib/datadog/go.sum b/contrib/datadog/go.sum index a4ce61df0..dc44e9f8c 100644 --- a/contrib/datadog/go.sum +++ b/contrib/datadog/go.sum @@ -233,8 +233,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1 h1:T go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1/go.mod h1:riqUmAOJFDFuIAzZu/3V6cOrTyfWzpgNJnG5UwrapCk= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1 h1:z/oMlrCv3Kopwh/dtdRagJy+qsRRPA86/Ux3g7+zFXM= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1/go.mod h1:C7EHYSIiaALi9RnNORCVaPCQDuJgJEn/XxkctaTez1E= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= diff --git a/contrib/envconfig/go.mod b/contrib/envconfig/go.mod index 8047ede83..08c42518b 100644 --- a/contrib/envconfig/go.mod +++ b/contrib/envconfig/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect diff --git a/contrib/envconfig/go.sum b/contrib/envconfig/go.sum index fdcf67b9c..2a2b2ed09 100644 --- a/contrib/envconfig/go.sum +++ b/contrib/envconfig/go.sum @@ -37,8 +37,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/opentelemetry/go.mod b/contrib/opentelemetry/go.mod index ed716c99c..7ebe95f00 100644 --- a/contrib/opentelemetry/go.mod +++ b/contrib/opentelemetry/go.mod @@ -32,7 +32,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/otel/metric v1.27.0 go.opentelemetry.io/otel/sdk/metric v1.27.0 - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sys v0.32.0 // indirect golang.org/x/text v0.24.0 // indirect diff --git a/contrib/opentelemetry/go.sum b/contrib/opentelemetry/go.sum index acbeae855..4c91adb26 100644 --- a/contrib/opentelemetry/go.sum +++ b/contrib/opentelemetry/go.sum @@ -50,8 +50,8 @@ go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2N go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/opentracing/go.mod b/contrib/opentracing/go.mod index 8018df582..70a5baac2 100644 --- a/contrib/opentracing/go.mod +++ b/contrib/opentracing/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect diff --git a/contrib/opentracing/go.sum b/contrib/opentracing/go.sum index 097dfa2e9..82ba4f53c 100644 --- a/contrib/opentracing/go.sum +++ b/contrib/opentracing/go.sum @@ -40,8 +40,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/resourcetuner/go.mod b/contrib/resourcetuner/go.mod index dde1cc877..0199430ae 100644 --- a/contrib/resourcetuner/go.mod +++ b/contrib/resourcetuner/go.mod @@ -36,7 +36,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect diff --git a/contrib/resourcetuner/go.sum b/contrib/resourcetuner/go.sum index 00877786f..336048dc6 100644 --- a/contrib/resourcetuner/go.sum +++ b/contrib/resourcetuner/go.sum @@ -73,8 +73,8 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.einride.tech/pid v0.1.3 h1:yWAKSmD2Z10jxd4gYFhOjbBNqXeIQwAtnCO/XKCT7sQ= go.einride.tech/pid v0.1.3/go.mod h1:33JSUbKrH/4v8DZf/0K8IC8Enjd92wB2birp+bCYQso= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/tally/go.mod b/contrib/tally/go.mod index b0c3b20a1..7f8af2fbc 100644 --- a/contrib/tally/go.mod +++ b/contrib/tally/go.mod @@ -23,7 +23,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/twmb/murmur3 v1.1.5 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect diff --git a/contrib/tally/go.sum b/contrib/tally/go.sum index 596262d90..84516cec0 100644 --- a/contrib/tally/go.sum +++ b/contrib/tally/go.sum @@ -119,8 +119,8 @@ github.com/uber-go/tally/v4 v4.1.1/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/go.mod b/go.mod index 46648f756..476864e28 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/nexus-rpc/sdk-go v0.5.1 github.com/robfig/cron v1.2.0 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.59.0 + go.temporal.io/api v1.62.0 golang.org/x/sync v0.13.0 golang.org/x/sys v0.32.0 golang.org/x/time v0.3.0 diff --git a/go.sum b/go.sum index 2f5906d58..040d935eb 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/cmd/build/go.mod b/internal/cmd/build/go.mod index a47bb04f5..d7d4ee508 100644 --- a/internal/cmd/build/go.mod +++ b/internal/cmd/build/go.mod @@ -24,7 +24,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.10.0 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect golang.org/x/mod v0.23.0 // indirect golang.org/x/net v0.39.0 // indirect diff --git a/internal/cmd/build/go.sum b/internal/cmd/build/go.sum index a4b3569f1..7a7ba2661 100644 --- a/internal/cmd/build/go.sum +++ b/internal/cmd/build/go.sum @@ -55,8 +55,8 @@ go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiy go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/internal_nexus_task_poller.go b/internal/internal_nexus_task_poller.go index d7e17b544..1df3de492 100644 --- a/internal/internal_nexus_task_poller.go +++ b/internal/internal_nexus_task_poller.go @@ -42,6 +42,7 @@ func newNexusTaskPoller( useBuildIDVersioning: params.UseBuildIDForVersioning, workerDeploymentVersion: params.DeploymentOptions.Version, capabilities: params.capabilities, + workerInstanceKey: params.workerInstanceKey, }, taskHandler: taskHandler, service: service, @@ -78,6 +79,7 @@ func (ntp *nexusTaskPoller) poll(ctx context.Context) (taskForWorker, error) { ntp.useBuildIDVersioning, ntp.workerDeploymentVersion, ), + WorkerInstanceKey: ntp.basePoller.workerInstanceKey, } response, err := ntp.pollNexusTaskQueue(ctx, request) diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index dba946879..e3ac1ca10 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -85,6 +85,8 @@ type ( workerDeploymentVersion WorkerDeploymentVersion // Server's capabilities capabilities *workflowservice.GetSystemInfoResponse_Capabilities + // UUID identifier for the worker + workerInstanceKey string } // numPollerMetric tracks the number of active pollers and publishes a metric on it. @@ -324,6 +326,7 @@ func newWorkflowTaskProcessor( useBuildIDVersioning: params.UseBuildIDForVersioning, workerDeploymentVersion: params.DeploymentOptions.Version, capabilities: params.capabilities, + workerInstanceKey: params.workerInstanceKey, }, service: service, namespace: params.Namespace, @@ -965,6 +968,7 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po wtp.useBuildIDVersioning, wtp.workerDeploymentVersion, ), + WorkerInstanceKey: wtp.workerInstanceKey, } if wtp.getCapabilities().BuildIdBasedVersioning { //lint:ignore SA1019 ignore deprecated versioning APIs @@ -1155,6 +1159,7 @@ func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowserv useBuildIDVersioning: params.UseBuildIDForVersioning, workerDeploymentVersion: params.DeploymentOptions.Version, capabilities: params.capabilities, + workerInstanceKey: params.workerInstanceKey, }, taskHandler: taskHandler, service: service, @@ -1194,6 +1199,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (taskForWorker, error) atp.useBuildIDVersioning, atp.workerDeploymentVersion, ), + WorkerInstanceKey: atp.workerInstanceKey, } response, err := atp.pollActivityTaskQueue(ctx, request) diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 7c15c2d4c..535ff1ffb 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -208,6 +208,8 @@ type ( eagerActivityExecutor *eagerActivityExecutor capabilities *workflowservice.GetSystemInfoResponse_Capabilities + + workerInstanceKey string } // HistoryJSONOptions are options for HistoryFromJSON. @@ -2061,7 +2063,8 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke taskQueue: taskQueue, maxConcurrent: options.MaxConcurrentEagerActivityExecutionSize, }), - capabilities: &capabilities, + capabilities: &capabilities, + workerInstanceKey: workerInstanceKey, } if options.MaxConcurrentWorkflowTaskPollers != 0 { diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index b63b46274..fb1c975b5 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -15,6 +15,7 @@ import ( "go.temporal.io/sdk/internal/common/metrics" "github.com/golang/mock/gomock" + "github.com/nexus-rpc/sdk-go/nexus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -1817,6 +1818,77 @@ func (s *internalWorkerTestSuite) TestStartWorkerAfterStopped() { _ = worker.Start() // must panic } +func (s *internalWorkerTestSuite) TestWorkerInstanceKeyPropagation() { + namespace := "testNamespace" + mockCtrl := gomock.NewController(s.T()) + service := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl) + service.EXPECT().GetSystemInfo(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&workflowservice.GetSystemInfoResponse{}, nil).AnyTimes() + + namespaceDesc := &workflowservice.DescribeNamespaceResponse{ + NamespaceInfo: &namespacepb.NamespaceInfo{ + Name: namespace, + State: enumspb.NAMESPACE_STATE_REGISTERED, + }, + } + service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any(), gomock.Any()). + Return(namespaceDesc, nil).AnyTimes() + + var capturedWfReq atomic.Pointer[workflowservice.PollWorkflowTaskQueueRequest] + var capturedActReq atomic.Pointer[workflowservice.PollActivityTaskQueueRequest] + var capturedNexusReq atomic.Pointer[workflowservice.PollNexusTaskQueueRequest] + + service.EXPECT().PollWorkflowTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(_ context.Context, req *workflowservice.PollWorkflowTaskQueueRequest, _ ...grpc.CallOption) { + capturedWfReq.CompareAndSwap(nil, req) + }). + Return(&workflowservice.PollWorkflowTaskQueueResponse{}, nil).AnyTimes() + + service.EXPECT().PollActivityTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(_ context.Context, req *workflowservice.PollActivityTaskQueueRequest, _ ...grpc.CallOption) { + capturedActReq.CompareAndSwap(nil, req) + }). + Return(&workflowservice.PollActivityTaskQueueResponse{}, nil).AnyTimes() + + service.EXPECT().PollNexusTaskQueue(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(_ context.Context, req *workflowservice.PollNexusTaskQueueRequest, _ ...grpc.CallOption) { + capturedNexusReq.CompareAndSwap(nil, req) + }). + Return(&workflowservice.PollNexusTaskQueueResponse{}, nil).AnyTimes() + + service.EXPECT().RespondWorkflowTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, nil).AnyTimes() + service.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&workflowservice.RespondActivityTaskCompletedResponse{}, nil).AnyTimes() + service.EXPECT().ShutdownWorker(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&workflowservice.ShutdownWorkerResponse{}, nil).Times(1) + + client := NewServiceClient(service, nil, ClientOptions{Namespace: namespace}) + worker := NewAggregatedWorker(client, "testTaskQueue", WorkerOptions{}) + + // Register a nexus service so the nexus worker is started + nexusSvc := nexus.NewService("test") + nexusSvc.MustRegister(nexus.NewSyncOperation("op", func(ctx context.Context, s string, opts nexus.StartOperationOptions) (string, error) { + return s, nil + })) + worker.RegisterNexusService(nexusSvc) + + require.NotEmpty(s.T(), worker.workerInstanceKey) + + err := worker.Start() + require.NoError(s.T(), err) + + require.Eventually(s.T(), func() bool { + return capturedWfReq.Load() != nil && capturedActReq.Load() != nil && capturedNexusReq.Load() != nil + }, 5*time.Second, 10*time.Millisecond) + + worker.Stop() + + assert.Equal(s.T(), worker.workerInstanceKey, capturedWfReq.Load().WorkerInstanceKey) + assert.Equal(s.T(), worker.workerInstanceKey, capturedActReq.Load().WorkerInstanceKey) + assert.Equal(s.T(), worker.workerInstanceKey, capturedNexusReq.Load().WorkerInstanceKey) +} + func ofPollActivityTaskQueueRequest(tps float64) gomock.Matcher { return &mockPollActivityTaskQueueRequest{tps: tps} } diff --git a/test/go.mod b/test/go.mod index 25fd4a34b..f58dec949 100644 --- a/test/go.mod +++ b/test/go.mod @@ -15,7 +15,7 @@ require ( go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 - go.temporal.io/api v1.59.0 + go.temporal.io/api v1.62.0 go.temporal.io/sdk v1.29.1 go.temporal.io/sdk/contrib/opentelemetry v0.0.0-00010101000000-000000000000 go.temporal.io/sdk/contrib/opentracing v0.0.0-00010101000000-000000000000 diff --git a/test/go.sum b/test/go.sum index 252849014..7e3e564c3 100644 --- a/test/go.sum +++ b/test/go.sum @@ -172,8 +172,8 @@ go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2N go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= From 4fbc5f98e9e0685e1636580801364fa113747674 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 5 Feb 2026 19:32:03 -0800 Subject: [PATCH 2/3] Bump dev server version --- internal/cmd/build/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cmd/build/main.go b/internal/cmd/build/main.go index ad17d4977..5ac236a36 100644 --- a/internal/cmd/build/main.go +++ b/internal/cmd/build/main.go @@ -121,7 +121,7 @@ func (b *builder) integrationTest() error { if *devServerFlag { devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{ CachedDownload: testsuite.CachedDownload{ - Version: "v1.5.0-rc", + Version: "v1.6.0", }, ClientOptions: &client.Options{ HostPort: "127.0.0.1:7233", From d064b964dfe80862231614a7b0d26c5661108b70 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Thu, 5 Feb 2026 20:44:25 -0800 Subject: [PATCH 3/3] Fix tests due to server version bump --- internal/cmd/build/main.go | 2 ++ test/integration_test.go | 1 + 2 files changed, 3 insertions(+) diff --git a/internal/cmd/build/main.go b/internal/cmd/build/main.go index 5ac236a36..35776b199 100644 --- a/internal/cmd/build/main.go +++ b/internal/cmd/build/main.go @@ -155,6 +155,8 @@ func (b *builder) integrationTest() error { "--dynamic-config-value", `system.refreshNexusEndpointsMinWait="0s"`, // Make Nexus tests faster "--dynamic-config-value", `component.nexusoperations.recordCancelRequestCompletionEvents=true`, // Defaults to false until after OSS 1.28 is released "--dynamic-config-value", `history.enableRequestIdRefLinks=true`, + "--dynamic-config-value", `component.nexusoperations.useSystemCallbackURL=false`, + "--dynamic-config-value", `component.nexusoperations.callback.endpoint.template="http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback"`, }, }) if err != nil { diff --git a/test/integration_test.go b/test/integration_test.go index 8df934b2f..9392f17c2 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -7983,6 +7983,7 @@ func (ts *IntegrationTestSuite) TestMutableSideEffectSummary() { } func (ts *IntegrationTestSuite) TestGrpcMessageTooLarge() { + ts.T().Skip("temporal server 1.30 has different behavior") // see https://github.com/temporalio/temporal/pull/8610 assertGrpcErrorInHistory := func(ctx context.Context, run client.WorkflowRun) { iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), true, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) for iter.HasNext() {