diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0008a2e18..241d8f8bc 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -99,6 +99,7 @@ jobs: # TODO(antlai-temporal): Remove this flag once server 1.27 released. DISABLE_SERVER_1_27_TESTS: "1" DISABLE_PRIORITY_TESTS: "1" + DISABLE_STANDALONE_ACTIVITY_TESTS: "1" working-directory: ./internal/cmd/build cloud-test: diff --git a/.github/workflows/docker/dynamic-config-custom.yaml b/.github/workflows/docker/dynamic-config-custom.yaml index 4b638db67..49dddbb77 100644 --- a/.github/workflows/docker/dynamic-config-custom.yaml +++ b/.github/workflows/docker/dynamic-config-custom.yaml @@ -41,4 +41,12 @@ component.callbacks.allowedAddresses: component.nexusoperations.recordCancelRequestCompletionEvents: - value: true frontend.activityAPIsEnabled: - - value: true \ No newline at end of file + - value: true +activity.enableStandalone: + - value: true +history.enableChasm: + - value: true +history.enableTransitionHistory: + - value: true +component.nexusoperations.useSystemCallbackURL: + - value: false \ No newline at end of file diff --git a/client/client.go b/client/client.go index ed149e647..a04461139 100644 --- a/client/client.go +++ b/client/client.go @@ -8,13 +8,12 @@ package client import ( "context" "crypto/tls" - "io" - commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" historypb "go.temporal.io/api/history/v1" "go.temporal.io/api/operatorservice/v1" "go.temporal.io/api/workflowservice/v1" + "io" "go.temporal.io/sdk/converter" "go.temporal.io/sdk/internal" @@ -896,6 +895,80 @@ type ( // Note, this is not related to any general concept of timing out or cancelling a running update, this is only related to the client call itself. WorkflowUpdateServiceTimeoutOrCanceledError = internal.WorkflowUpdateServiceTimeoutOrCanceledError + // StartActivityOptions contains configuration parameters for starting an activity execution from the client. + // ID and TaskQueue are required. At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required. + // Other parameters are optional. + // + // NOTE: Experimental + StartActivityOptions = internal.ClientStartActivityOptions + + // GetActivityHandleOptions contains input for GetActivityHandle call. + // ActivityID and RunID are required. + // + // NOTE: Experimental + GetActivityHandleOptions = internal.ClientGetActivityHandleOptions + + // ListActivitiesOptions contains input for ListActivities call. + // + // NOTE: Experimental + ListActivitiesOptions = internal.ClientListActivitiesOptions + + // ListActivitiesResult contains the result of the ListActivities call. + // + // NOTE: Experimental + ListActivitiesResult = internal.ClientListActivitiesResult + + // CountActivitiesOptions contains input for CountActivities call. + // + // NOTE: Experimental + CountActivitiesOptions = internal.ClientCountActivitiesOptions + + // CountActivitiesResult contains the result of the CountActivities call. + // + // NOTE: Experimental + CountActivitiesResult = internal.ClientCountActivitiesResult + + // CountActivitiesAggregationGroup contains groups of activities if + // CountActivityExecutions is grouped by a field. + // The list might not be complete, and the counts of each group is approximate. + // + // NOTE: Experimental + CountActivitiesAggregationGroup = internal.ClientCountActivitiesAggregationGroup + + // ActivityHandle represents a running or completed standalone activity execution. + // It can be used to get the result, describe, cancel, or terminate the activity. + // + // NOTE: Experimental + ActivityHandle = internal.ClientActivityHandle + + // ActivityExecutionInfo contains information about an activity execution. + // This is returned by ListActivities and embedded in ClientActivityExecutionDescription. + // + // NOTE: Experimental + ActivityExecutionInfo = internal.ClientActivityExecutionInfo + + // ActivityExecutionDescription contains detailed information about an activity execution. + // This is returned by ClientActivityHandle.Describe. + // + // NOTE: Experimental + ActivityExecutionDescription = internal.ClientActivityExecutionDescription + + // DescribeActivityOptions contains options for ClientActivityHandle.Describe call. + // For future compatibility, currently unused. + // + // NOTE: Experimental + DescribeActivityOptions = internal.ClientDescribeActivityOptions + + // CancelActivityOptions contains options for ClientActivityHandle.Cancel call. + // + // NOTE: Experimental + CancelActivityOptions = internal.ClientCancelActivityOptions + + // TerminateActivityOptions contains options for ClientActivityHandle.Terminate call. + // + // NOTE: Experimental + TerminateActivityOptions = internal.ClientTerminateActivityOptions + // Client is the client for starting and getting information about a workflow executions as well as // completing activities asynchronously. Client interface { @@ -1022,11 +1095,10 @@ type ( GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType enumspb.HistoryEventFilterType) HistoryEventIterator // CompleteActivity reports activity completed. - // activity Execute method can return activity.ErrResultPending to - // indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method - // should be called when that activity is completed with the actual result and error. If err is nil, activity task - // completed event will be reported; if err is CanceledError, activity task canceled event will be reported; otherwise, - // activity task failed event will be reported. + // An activity's implementation can return activity.ErrResultPending to indicate it will be completed asynchronously. + // In that case, this CompleteActivity() method should be called when the activity is completed with the + // actual result and error. If err is nil, activity task completed event will be reported; if err is CanceledError, + // activity task canceled event will be reported; otherwise, activity task failed event will be reported. // An activity implementation should use GetActivityInfo(ctx).TaskToken function to get task token to use for completion. // Example:- // To complete with a result. @@ -1037,20 +1109,39 @@ type ( CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error // CompleteActivityByID reports activity completed. - // Similar to CompleteActivity, but may save user from keeping taskToken info. - // activity Execute method can return activity.ErrResultPending to - // indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivityById() method - // should be called when that activity is completed with the actual result and error. If err is nil, activity task - // completed event will be reported; if err is CanceledError, activity task canceled event will be reported; otherwise, - // activity task failed event will be reported. + // Similar to CompleteActivity, but may save the user from keeping taskToken info. + // This method works only for workflow activities. workflowID and runID must be set to the workflow ID and workflow run ID + // of the workflow that started the activity. To complete a standalone activity (not started by workflow), + // use CompleteActivityByActivityID. + // + // An activity's implementation can return activity.ErrResultPending to indicate it will be completed asynchronously. + // In that case, this CompleteActivityByID() method should be called when the activity is completed with the + // actual result and error. If err is nil, activity task completed event will be reported; if err is CanceledError, + // activity task canceled event will be reported; otherwise, activity task failed event will be reported. // An activity implementation should use activityID provided in ActivityOption to use for completion. - // namespace name, workflowID, activityID are required, runID is optional. + // namespace, workflowID and activityID are required, runID is optional. // The errors it can return: // - ApplicationError // - TimeoutError // - CanceledError CompleteActivityByID(ctx context.Context, namespace, workflowID, runID, activityID string, result interface{}, err error) error + // CompleteActivityByActivityID reports activity completed. + // Similar to CompleteActivity, but may save the user from keeping taskToken info. + // This method works only for standalone activities. To complete a workflow activity, use CompleteActivityByID. + // + // An activity's implementation can return activity.ErrResultPending to indicate it will be completed asynchronously. + // In that case, this CompleteActivityByActivityID() method should be called when the activity is completed with the + // actual result and error. If err is nil, activity task completed event will be reported; if err is CanceledError, + // activity task canceled event will be reported; otherwise, activity task failed event will be reported. + // An activity implementation should use activityID provided in ActivityOption to use for completion. + // namespace and activityID are required, activityRunID is optional. + // The errors it can return: + // - ApplicationError + // - TimeoutError + // - CanceledError + CompleteActivityByActivityID(ctx context.Context, namespace, activityID, activityRunID string, result interface{}, err error) error + // RecordActivityHeartbeat records heartbeat for an activity. // taskToken - is the value of the binary "TaskToken" field of the "ActivityInfo" struct retrieved inside the activity. // details - is the progress you want to record along with heart beat for this activity. If the activity is canceled, @@ -1306,6 +1397,41 @@ type ( // if not specified the most recent runID will be used. GetWorkflowUpdateHandle(ref GetWorkflowUpdateHandleOptions) WorkflowUpdateHandle + // ExecuteActivity starts a standalone activity execution and returns an ActivityHandle. + // The user can use this to start using a function or activity type name. + // Either by + // ExecuteActivity(ctx, options, "activityTypeName", arg1, arg2, arg3) + // or + // ExecuteActivity(ctx, options, activityFn, arg1, arg2, arg3) + // + // Returns an ActivityExecutionAlreadyStarted error if an activity with the same ID already exists + // in this namespace, unless permitted by the specified ID conflict policy. + // + // NOTE: Standalone activities are not associated with a workflow execution. + // They are scheduled directly on a task queue and executed by a worker. + // + // NOTE: Experimental + ExecuteActivity(ctx context.Context, options StartActivityOptions, activity any, args ...any) (ActivityHandle, error) + + // GetActivityHandle creates a handle to the referenced activity. + // + // NOTE: Experimental + GetActivityHandle(options GetActivityHandleOptions) ActivityHandle + + // ListActivities lists activity executions based on query. + // + // Currently, all errors are returned in the iterator and not the base level error. + // + // NOTE: Experimental + ListActivities(ctx context.Context, options ListActivitiesOptions) (ListActivitiesResult, error) + + // CountActivities counts activity executions based on query. The result + // includes the total count and optionally grouped counts if the query includes + // a GROUP BY clause. + // + // NOTE: Experimental + CountActivities(ctx context.Context, options CountActivitiesOptions) (*CountActivitiesResult, error) + // WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases // that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the // service are not configured with internal semantics such as automatic retries. diff --git a/contrib/datadog/go.mod b/contrib/datadog/go.mod index cdc784f2b..a7436938c 100644 --- a/contrib/datadog/go.mod +++ b/contrib/datadog/go.mod @@ -81,7 +81,7 @@ require ( go.opentelemetry.io/otel/metric v1.37.0 // indirect go.opentelemetry.io/otel/sdk v1.37.0 // indirect go.opentelemetry.io/otel/trace v1.37.0 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect diff --git a/contrib/datadog/go.sum b/contrib/datadog/go.sum index a4ce61df0..dc44e9f8c 100644 --- a/contrib/datadog/go.sum +++ b/contrib/datadog/go.sum @@ -233,8 +233,8 @@ go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1 h1:T go.opentelemetry.io/proto/slim/otlp/collector/profiles/v1development v0.0.1/go.mod h1:riqUmAOJFDFuIAzZu/3V6cOrTyfWzpgNJnG5UwrapCk= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1 h1:z/oMlrCv3Kopwh/dtdRagJy+qsRRPA86/Ux3g7+zFXM= go.opentelemetry.io/proto/slim/otlp/profiles/v1development v0.0.1/go.mod h1:C7EHYSIiaALi9RnNORCVaPCQDuJgJEn/XxkctaTez1E= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= diff --git a/contrib/envconfig/go.mod b/contrib/envconfig/go.mod index 8047ede83..08c42518b 100644 --- a/contrib/envconfig/go.mod +++ b/contrib/envconfig/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect diff --git a/contrib/envconfig/go.sum b/contrib/envconfig/go.sum index fdcf67b9c..2a2b2ed09 100644 --- a/contrib/envconfig/go.sum +++ b/contrib/envconfig/go.sum @@ -37,8 +37,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/opentelemetry/go.mod b/contrib/opentelemetry/go.mod index ed716c99c..7ebe95f00 100644 --- a/contrib/opentelemetry/go.mod +++ b/contrib/opentelemetry/go.mod @@ -32,7 +32,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect go.opentelemetry.io/otel/metric v1.27.0 go.opentelemetry.io/otel/sdk/metric v1.27.0 - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sys v0.32.0 // indirect golang.org/x/text v0.24.0 // indirect diff --git a/contrib/opentelemetry/go.sum b/contrib/opentelemetry/go.sum index acbeae855..4c91adb26 100644 --- a/contrib/opentelemetry/go.sum +++ b/contrib/opentelemetry/go.sum @@ -50,8 +50,8 @@ go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2N go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/opentracing/go.mod b/contrib/opentracing/go.mod index 8018df582..70a5baac2 100644 --- a/contrib/opentracing/go.mod +++ b/contrib/opentracing/go.mod @@ -22,7 +22,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect golang.org/x/sys v0.32.0 // indirect diff --git a/contrib/opentracing/go.sum b/contrib/opentracing/go.sum index 097dfa2e9..82ba4f53c 100644 --- a/contrib/opentracing/go.sum +++ b/contrib/opentracing/go.sum @@ -40,8 +40,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/contrib/resourcetuner/go.mod b/contrib/resourcetuner/go.mod index dde1cc877..0199430ae 100644 --- a/contrib/resourcetuner/go.mod +++ b/contrib/resourcetuner/go.mod @@ -36,7 +36,7 @@ require ( github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect diff --git a/contrib/resourcetuner/go.sum b/contrib/resourcetuner/go.sum index 00877786f..336048dc6 100644 --- a/contrib/resourcetuner/go.sum +++ b/contrib/resourcetuner/go.sum @@ -73,8 +73,8 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.einride.tech/pid v0.1.3 h1:yWAKSmD2Z10jxd4gYFhOjbBNqXeIQwAtnCO/XKCT7sQ= go.einride.tech/pid v0.1.3/go.mod h1:33JSUbKrH/4v8DZf/0K8IC8Enjd92wB2birp+bCYQso= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/contrib/tally/go.mod b/contrib/tally/go.mod index b0c3b20a1..7f8af2fbc 100644 --- a/contrib/tally/go.mod +++ b/contrib/tally/go.mod @@ -23,7 +23,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/twmb/murmur3 v1.1.5 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/net v0.39.0 // indirect golang.org/x/sync v0.13.0 // indirect diff --git a/contrib/tally/go.sum b/contrib/tally/go.sum index 596262d90..84516cec0 100644 --- a/contrib/tally/go.sum +++ b/contrib/tally/go.sum @@ -119,8 +119,8 @@ github.com/uber-go/tally/v4 v4.1.1/go.mod h1:aXeSTDMl4tNosyf6rdU8jlgScHyjEGGtfJ/ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/go.mod b/go.mod index 46648f756..476864e28 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/nexus-rpc/sdk-go v0.5.1 github.com/robfig/cron v1.2.0 github.com/stretchr/testify v1.10.0 - go.temporal.io/api v1.59.0 + go.temporal.io/api v1.62.0 golang.org/x/sync v0.13.0 golang.org/x/sys v0.32.0 golang.org/x/time v0.3.0 diff --git a/go.sum b/go.sum index 2f5906d58..040d935eb 100644 --- a/go.sum +++ b/go.sum @@ -35,8 +35,8 @@ github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/interceptor/interceptor.go b/interceptor/interceptor.go index fcc2f314c..55f497f67 100644 --- a/interceptor/interceptor.go +++ b/interceptor/interceptor.go @@ -196,6 +196,54 @@ type ClientDescribeWorkflowInput = internal.ClientDescribeWorkflowInput // ClientOutboundInterceptor.DescribeWorkflow. type ClientDescribeWorkflowOutput = internal.ClientDescribeWorkflowOutput +// ClientExecuteActivityInput is the input to +// ClientOutboundInterceptor.ExecuteActivity. +// +// NOTE: Experimental +type ClientExecuteActivityInput = internal.ClientExecuteActivityInput + +// ClientGetActivityHandleInput is the input to +// ClientOutboundInterceptor.GetActivityHandle. +// +// NOTE: Experimental +type ClientGetActivityHandleInput = internal.ClientGetActivityHandleInput + +// ClientCancelActivityInput is the input to +// ClientOutboundInterceptor.CancelActivity. +// +// NOTE: Experimental +type ClientCancelActivityInput = internal.ClientCancelActivityInput + +// ClientTerminateActivityInput is the input to +// ClientOutboundInterceptor.TerminateActivity. +// +// NOTE: Experimental +type ClientTerminateActivityInput = internal.ClientTerminateActivityInput + +// ClientDescribeActivityInput is the input to +// ClientOutboundInterceptor.DescribeActivity. +// +// NOTE: Experimental +type ClientDescribeActivityInput = internal.ClientDescribeActivityInput + +// ClientDescribeActivityOutput is the output of +// ClientOutboundInterceptor.DescribeActivity. +// +// NOTE: Experimental +type ClientDescribeActivityOutput = internal.ClientDescribeActivityOutput + +// ClientPollActivityResultInput is the input to +// ClientOutboundInterceptor.PollActivityResult. +// +// NOTE: Experimental +type ClientPollActivityResultInput = internal.ClientPollActivityResultInput + +// ClientPollActivityResultOutput is the output of +// ClientOutboundInterceptor.PollActivityResult. +// +// NOTE: Experimental +type ClientPollActivityResultOutput = internal.ClientPollActivityResultOutput + // ScheduleClientCreateInput is input for // ScheduleClientInterceptor.CreateSchedule. type ScheduleClientCreateInput = internal.ScheduleClientCreateInput diff --git a/internal/activity.go b/internal/activity.go index 13e74cd6b..7891138ca 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -25,13 +25,22 @@ type ( // // Exposed as: [go.temporal.io/sdk/activity.Info] ActivityInfo struct { - TaskToken []byte - WorkflowType *WorkflowType - WorkflowNamespace string + TaskToken []byte + WorkflowType *WorkflowType + // Namespace of the workflow that started this activity. Empty if this activity was not started by a workflow. + // If present, the value is always the same as Namespace since workflows can only run activities in their own + // namespace. + // + // Deprecated: use Namespace instead. + WorkflowNamespace string + // Execution details of the workflow that started this activity. All fields are empty if this activity was not + // started by a workflow. WorkflowExecution WorkflowExecution ActivityID string + ActivityRunID string // Run ID of the activity. Empty if the activity was started by a workflow. ActivityType ActivityType TaskQueue string + Namespace string // Namespace of this activity. HeartbeatTimeout time.Duration // Maximum time between heartbeats. 0 means no heartbeat needed. ScheduleToCloseTimeout time.Duration // Schedule to close timeout set by the activity options. StartToCloseTimeout time.Duration // Start to close timeout set by the activity options. @@ -195,6 +204,11 @@ type ( } ) +// IsWorkflowActivity returns true if this activity was started by a workflow. +func (i *ActivityInfo) IsWorkflowActivity() bool { + return i.WorkflowExecution.ID != "" +} + // GetActivityInfo returns information about the currently executing activity. // // Exposed as: [go.temporal.io/sdk/activity.GetInfo] @@ -308,24 +322,11 @@ func WithActivityTask( heartbeatTimeout := task.GetHeartbeatTimeout().AsDuration() deadline := calculateActivityDeadline(scheduled, scheduleToCloseTimeout, startToCloseTimeout) - logger = log.With(logger, - tagActivityID, task.ActivityId, - tagActivityType, task.ActivityType.GetName(), - tagAttempt, task.Attempt, - tagWorkflowType, task.WorkflowType.GetName(), - tagWorkflowID, task.WorkflowExecution.WorkflowId, - tagRunID, task.WorkflowExecution.RunId, - ) - - return newActivityContext(ctx, interceptors, &activityEnvironment{ - taskToken: task.TaskToken, - serviceInvoker: invoker, - activityType: ActivityType{Name: task.ActivityType.GetName()}, - activityID: task.ActivityId, - workflowExecution: WorkflowExecution{ - RunID: task.WorkflowExecution.RunId, - ID: task.WorkflowExecution.WorkflowId}, - logger: logger, + env := &activityEnvironment{ + taskToken: task.TaskToken, + serviceInvoker: invoker, + activityType: ActivityType{Name: task.ActivityType.GetName()}, + activityID: task.ActivityId, metricsHandler: metricsHandler, deadline: deadline, heartbeatTimeout: heartbeatTimeout, @@ -338,15 +339,40 @@ func WithActivityTask( attempt: task.GetAttempt(), priority: task.GetPriority(), heartbeatDetails: task.HeartbeatDetails, - workflowType: &WorkflowType{ + namespace: task.WorkflowNamespace, + retryPolicy: convertFromPBRetryPolicy(task.RetryPolicy), + workerStopChannel: workerStopChannel, + contextPropagators: contextPropagators, + client: client, + } + + if task.WorkflowExecution.GetWorkflowId() == "" { + env.activityRunID = task.ActivityRunId + env.logger = log.With(logger, + tagActivityID, task.ActivityId, + tagActivityRunID, task.ActivityRunId, + tagActivityType, task.ActivityType.GetName(), + tagAttempt, task.Attempt, + ) + } else { + env.workflowExecution = WorkflowExecution{ + ID: task.WorkflowExecution.GetWorkflowId(), + RunID: task.WorkflowExecution.GetRunId(), + } + env.workflowType = &WorkflowType{ Name: task.WorkflowType.GetName(), - }, - workflowNamespace: task.WorkflowNamespace, - retryPolicy: convertFromPBRetryPolicy(task.RetryPolicy), - workerStopChannel: workerStopChannel, - contextPropagators: contextPropagators, - client: client, - }) + } + env.logger = log.With(logger, + tagActivityID, task.ActivityId, + tagActivityType, task.ActivityType.GetName(), + tagAttempt, task.Attempt, + tagWorkflowType, task.WorkflowType.GetName(), + tagWorkflowID, task.WorkflowExecution.GetWorkflowId(), + tagRunID, task.WorkflowExecution.GetRunId(), + ) + } + + return newActivityContext(ctx, interceptors, env) } // WithLocalActivityTask adds local activity specific information into context. @@ -391,7 +417,7 @@ func WithLocalActivityTask( } return newActivityContext(ctx, interceptors, &activityEnvironment{ workflowType: &workflowTypeLocal, - workflowNamespace: task.params.WorkflowInfo.Namespace, + namespace: task.params.WorkflowInfo.Namespace, taskQueue: task.params.WorkflowInfo.TaskQueueName, activityType: ActivityType{Name: activityType}, activityID: fmt.Sprintf("%v", task.activityID), diff --git a/internal/client.go b/internal/client.go index 24c7f6737..cc779ced8 100644 --- a/internal/client.go +++ b/internal/client.go @@ -164,11 +164,10 @@ type ( GetWorkflowHistory(ctx context.Context, workflowID string, runID string, isLongPoll bool, filterType enumspb.HistoryEventFilterType) HistoryEventIterator // CompleteActivity reports activity completed. - // activity Execute method can return activity.ErrResultPending to - // indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivity() method - // should be called when that activity is completed with the actual result and error. If err is nil, activity task - // completed event will be reported; if err is CanceledError, activity task canceled event will be reported; otherwise, - // activity task failed event will be reported. + // An activity's implementation can return activity.ErrResultPending to indicate it will be completed asynchronously. + // In that case, this CompleteActivity() method should be called when the activity is completed with the + // actual result and error. If err is nil, activity task completed event will be reported; if err is CanceledError, + // activity task canceled event will be reported; otherwise, activity task failed event will be reported. // An activity implementation should use GetActivityInfo(ctx).TaskToken function to get task token to use for completion. // Example:- // To complete with a result. @@ -179,20 +178,39 @@ type ( CompleteActivity(ctx context.Context, taskToken []byte, result interface{}, err error) error // CompleteActivityByID reports activity completed. - // Similar to CompleteActivity, but may save user from keeping taskToken info. - // activity Execute method can return activity.ErrResultPending to - // indicate the activity is not completed when it's Execute method returns. In that case, this CompleteActivityById() method - // should be called when that activity is completed with the actual result and error. If err is nil, activity task - // completed event will be reported; if err is CanceledError, activity task canceled event will be reported; otherwise, - // activity task failed event will be reported. + // Similar to CompleteActivity, but may save the user from keeping taskToken info. + // This method works only for workflow activities. workflowID and runID must be set to the workflow ID and workflow run ID + // of the workflow that started the activity. To complete a standalone activity (not started by workflow), + // use CompleteActivityByActivityID. + // + // An activity's implementation can return activity.ErrResultPending to indicate it will be completed asynchronously. + // In that case, this CompleteActivityByID() method should be called when the activity is completed with the + // actual result and error. If err is nil, activity task completed event will be reported; if err is CanceledError, + // activity task canceled event will be reported; otherwise, activity task failed event will be reported. // An activity implementation should use activityID provided in ActivityOption to use for completion. - // namespace name, workflowID, activityID are required, runID is optional. + // namespace, workflowID and activityID are required, runID is optional. // The errors it can return: // - ApplicationError // - TimeoutError // - CanceledError CompleteActivityByID(ctx context.Context, namespace, workflowID, runID, activityID string, result interface{}, err error) error + // CompleteActivityByActivityID reports activity completed. + // Similar to CompleteActivity, but may save the user from keeping taskToken info. + // This method works only for standalone activities. To complete a workflow activity, use CompleteActivityByID. + // + // An activity's implementation can return activity.ErrResultPending to indicate it will be completed asynchronously. + // In that case, this CompleteActivityByActivityID() method should be called when the activity is completed with + // the actual result and error. If err is nil, activity task completed event will be reported; if err is CanceledError, + // activity task canceled event will be reported; otherwise, activity task failed event will be reported. + // An activity implementation should use activityID provided in ActivityOption to use for completion. + // namespace and activityID are required, activityRunID is optional. + // The errors it can return: + // - ApplicationError + // - TimeoutError + // - CanceledError + CompleteActivityByActivityID(ctx context.Context, namespace, activityID, activityRunID string, result interface{}, err error) error + // RecordActivityHeartbeat records heartbeat for an activity. // details - is the progress you want to record along with heart beat for this activity. // The errors it can return: @@ -411,6 +429,41 @@ type ( // if not specified the most recent runID will be used. GetWorkflowUpdateHandle(GetWorkflowUpdateHandleOptions) WorkflowUpdateHandle + // ExecuteActivity starts a standalone activity execution and returns an ActivityHandle. + // The user can use this to start using a function or activity type name. + // Either by + // ExecuteActivity(ctx, options, "activityTypeName", arg1, arg2, arg3) + // or + // ExecuteActivity(ctx, options, activityFn, arg1, arg2, arg3) + // + // Returns an ActivityExecutionAlreadyStarted error if an activity with the same ID already exists + // in this namespace, unless permitted by the specified ID conflict policy. + // + // NOTE: Standalone activities are not associated with a workflow execution. + // They are scheduled directly on a task queue and executed by a worker. + // + // NOTE: Experimental + ExecuteActivity(ctx context.Context, options ClientStartActivityOptions, activity any, args ...any) (ClientActivityHandle, error) + + // GetActivityHandle creates a handle to the referenced activity. + // + // NOTE: Experimental + GetActivityHandle(options ClientGetActivityHandleOptions) ClientActivityHandle + + // ListActivities lists activity executions based on query. + // + // Currently, all errors are returned in the iterator and not the base level error. + // + // NOTE: Experimental + ListActivities(ctx context.Context, options ClientListActivitiesOptions) (ClientListActivitiesResult, error) + + // CountActivities counts activity executions based on query. The result + // includes the total count and optionally grouped counts if the query includes + // a GROUP BY clause. + // + // NOTE: Experimental + CountActivities(ctx context.Context, options ClientCountActivitiesOptions) (*ClientCountActivitiesResult, error) + // WorkflowService provides access to the underlying gRPC service. This should only be used for advanced use cases // that cannot be accomplished via other Client methods. Unlike calls to other Client methods, calls directly to the // service are not configured with internal semantics such as automatic retries. diff --git a/internal/cmd/build/go.mod b/internal/cmd/build/go.mod index a47bb04f5..d7d4ee508 100644 --- a/internal/cmd/build/go.mod +++ b/internal/cmd/build/go.mod @@ -24,7 +24,7 @@ require ( github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.10.0 // indirect - go.temporal.io/api v1.59.0 // indirect + go.temporal.io/api v1.62.0 // indirect golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect golang.org/x/mod v0.23.0 // indirect golang.org/x/net v0.39.0 // indirect diff --git a/internal/cmd/build/go.sum b/internal/cmd/build/go.sum index a4b3569f1..7a7ba2661 100644 --- a/internal/cmd/build/go.sum +++ b/internal/cmd/build/go.sum @@ -55,8 +55,8 @@ go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiy go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/internal/cmd/build/main.go b/internal/cmd/build/main.go index ad17d4977..c949c056b 100644 --- a/internal/cmd/build/main.go +++ b/internal/cmd/build/main.go @@ -121,7 +121,7 @@ func (b *builder) integrationTest() error { if *devServerFlag { devServer, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{ CachedDownload: testsuite.CachedDownload{ - Version: "v1.5.0-rc", + Version: "v1.6.1-server-1.31.0-150.0", }, ClientOptions: &client.Options{ HostPort: "127.0.0.1:7233", @@ -155,7 +155,11 @@ func (b *builder) integrationTest() error { "--dynamic-config-value", `system.refreshNexusEndpointsMinWait="0s"`, // Make Nexus tests faster "--dynamic-config-value", `component.nexusoperations.recordCancelRequestCompletionEvents=true`, // Defaults to false until after OSS 1.28 is released "--dynamic-config-value", `history.enableRequestIdRefLinks=true`, - }, + "--dynamic-config-value", "activity.enableStandalone=true", + "--dynamic-config-value", "history.enableChasm=true", + "--dynamic-config-value", "history.enableTransitionHistory=true", + "--dynamic-config-value", `component.nexusoperations.useSystemCallbackURL=false`, + "--dynamic-config-value", `component.nexusoperations.callback.endpoint.template="http://localhost:7243/namespaces/{{.NamespaceName}}/nexus/callback"`}, }) if err != nil { return fmt.Errorf("failed starting dev server: %w", err) diff --git a/internal/interceptor.go b/internal/interceptor.go index 878d403db..dbccfb945 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -418,6 +418,38 @@ type ClientOutboundInterceptor interface { // DescribeWorkflow intercepts client.Client.DescribeWorkflow. DescribeWorkflow(context.Context, *ClientDescribeWorkflowInput) (*ClientDescribeWorkflowOutput, error) + // ExecuteActivity intercepts client.Client.ExecuteActivity. + // + // NOTE: Experimental + ExecuteActivity(context.Context, *ClientExecuteActivityInput) (ClientActivityHandle, error) + + // GetActivityHandle intercepts client.Client.GetActivityHandle. + // While the interceptor is allowed to make network calls here, note that the base implementation does not - it only constructs + // the handle which is then used to make network calls. There is no context object provided and errors cannot be returned. + // + // NOTE: Experimental + GetActivityHandle(*ClientGetActivityHandleInput) ClientActivityHandle + + // CancelActivity intercepts client.ActivityHandle.Cancel. + // + // NOTE: Experimental + CancelActivity(context.Context, *ClientCancelActivityInput) error + + // TerminateActivity intercepts client.ActivityHandle.Terminate. + // + // NOTE: Experimental + TerminateActivity(context.Context, *ClientTerminateActivityInput) error + + // DescribeActivity intercepts client.ActivityHandle.Describe. + // + // NOTE: Experimental + DescribeActivity(context.Context, *ClientDescribeActivityInput) (*ClientDescribeActivityOutput, error) + + // PollActivityResult intercepts client.ActivityHandle.Get. + // + // NOTE: Experimental + PollActivityResult(context.Context, *ClientPollActivityResultInput) (*ClientPollActivityResultOutput, error) + mustEmbedClientOutboundInterceptorBase() } @@ -538,7 +570,7 @@ type ClientDescribeWorkflowInput struct { RunID string } -// ClientDescribeWorkflowInput is the output to +// ClientDescribeWorkflowOutput is the output to // ClientOutboundInterceptor.DescribeWorkflow. // // Exposed as: [go.temporal.io/sdk/interceptor.ClientDescribeWorkflowOutput] @@ -546,6 +578,98 @@ type ClientDescribeWorkflowOutput struct { Response *WorkflowExecutionDescription } +// ClientExecuteActivityInput is the input to +// ClientOutboundInterceptor.ExecuteActivity. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientExecuteActivityInput] +type ClientExecuteActivityInput struct { + Options *ClientStartActivityOptions + ActivityType string + Args []interface{} +} + +// ClientGetActivityHandleInput is the input to +// ClientOutboundInterceptor.GetActivityHandle. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientGetActivityHandleInput] +type ClientGetActivityHandleInput struct { + ActivityID string + RunID string +} + +// ClientCancelActivityInput is the input to +// ClientOutboundInterceptor.CancelActivity. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientCancelActivityInput] +type ClientCancelActivityInput struct { + ActivityID string + RunID string + Reason string +} + +// ClientTerminateActivityInput is the input to +// ClientOutboundInterceptor.TerminateActivity. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientTerminateActivityInput] +type ClientTerminateActivityInput struct { + ActivityID string + RunID string + Reason string +} + +// ClientDescribeActivityInput is the input to +// ClientOutboundInterceptor.DescribeActivity. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientDescribeActivityInput] +type ClientDescribeActivityInput struct { + ActivityID string + RunID string +} + +// ClientDescribeActivityOutput is the output of +// ClientOutboundInterceptor.DescribeActivity. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientDescribeActivityOutput] +type ClientDescribeActivityOutput struct { + Description *ClientActivityExecutionDescription +} + +// ClientPollActivityResultInput is the input to +// ClientOutboundInterceptor.PollActivityResult. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientPollActivityResultInput] +type ClientPollActivityResultInput struct { + ActivityID string + RunID string +} + +// ClientPollActivityResultOutput is the output of +// ClientOutboundInterceptor.PollActivityResult. +// +// NOTE: Experimental +// +// Exposed as: [go.temporal.io/sdk/interceptor.ClientPollActivityResultOutput] +type ClientPollActivityResultOutput struct { + // Result is the result of the update, if it has completed successfully. + Result converter.EncodedValue + // Error is the result of a failed update. + Error error +} + // NexusOutboundInterceptor intercepts Nexus operation method invocations. See documentation in the interceptor package // for more details. // diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index 170c8149a..c1ae31044 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -573,11 +573,68 @@ func (c *ClientOutboundInterceptorBase) DescribeWorkflow( return c.Next.DescribeWorkflow(ctx, in) } -// ExecuteWorkflow implements ClientOutboundInterceptor.CreateSchedule. +// CreateSchedule implements ClientOutboundInterceptor.CreateSchedule. func (c *ClientOutboundInterceptorBase) CreateSchedule(ctx context.Context, in *ScheduleClientCreateInput) (ScheduleHandle, error) { return c.Next.CreateSchedule(ctx, in) } +// ExecuteActivity implements ClientOutboundInterceptor.ExecuteActivity. +func (c *ClientOutboundInterceptorBase) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (ClientActivityHandle, error) { + return c.Next.ExecuteActivity(ctx, in) +} + +// GetActivityHandle implements ClientOutboundInterceptor.GetActivityHandle. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) GetActivityHandle( + in *ClientGetActivityHandleInput, +) ClientActivityHandle { + return c.Next.GetActivityHandle(in) +} + +// CancelActivity implements ClientOutboundInterceptor.CancelActivity. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) CancelActivity( + ctx context.Context, + in *ClientCancelActivityInput, +) error { + return c.Next.CancelActivity(ctx, in) +} + +// TerminateActivity implements ClientOutboundInterceptor.TerminateActivity. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) TerminateActivity( + ctx context.Context, + in *ClientTerminateActivityInput, +) error { + return c.Next.TerminateActivity(ctx, in) +} + +// DescribeActivity implements ClientOutboundInterceptor.DescribeActivity. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) DescribeActivity( + ctx context.Context, + in *ClientDescribeActivityInput, +) (*ClientDescribeActivityOutput, error) { + return c.Next.DescribeActivity(ctx, in) +} + +// PollActivityResult implements ClientOutboundInterceptor.PollActivityResult. +// +// NOTE: Experimental +func (c *ClientOutboundInterceptorBase) PollActivityResult( + ctx context.Context, + in *ClientPollActivityResultInput, +) (*ClientPollActivityResultOutput, error) { + return c.Next.PollActivityResult(ctx, in) +} + func (*ClientOutboundInterceptorBase) mustEmbedClientOutboundInterceptorBase() {} // NexusOperationInboundInterceptorBase is a default implementation of [NexusOperationInboundInterceptor] that diff --git a/internal/interceptortest/proxy.go b/internal/interceptortest/proxy.go index 584f9fc27..125c86fcb 100644 --- a/internal/interceptortest/proxy.go +++ b/internal/interceptortest/proxy.go @@ -536,3 +536,56 @@ func (p *proxyClientOutbound) QueryWorkflow( err, _ = vals[1].Interface().(error) return } + +func (p *proxyClientOutbound) ExecuteActivity( + ctx context.Context, + in *interceptor.ClientExecuteActivityInput, +) (ret client.ActivityHandle, err error) { + vals := p.invoke(ctx, in) + ret, _ = vals[0].Interface().(client.ActivityHandle) + err, _ = vals[1].Interface().(error) + return +} + +func (p *proxyClientOutbound) GetActivityHandle( + in *interceptor.ClientGetActivityHandleInput, +) (ret client.ActivityHandle) { + ret, _ = p.invoke(in)[0].Interface().(client.ActivityHandle) + return +} + +func (p *proxyClientOutbound) CancelActivity( + ctx context.Context, + in *interceptor.ClientCancelActivityInput, +) (err error) { + err, _ = p.invoke(ctx, in)[0].Interface().(error) + return +} + +func (p *proxyClientOutbound) TerminateActivity( + ctx context.Context, + in *interceptor.ClientTerminateActivityInput, +) (err error) { + err, _ = p.invoke(ctx, in)[0].Interface().(error) + return +} + +func (p *proxyClientOutbound) DescribeActivity( + ctx context.Context, + in *interceptor.ClientDescribeActivityInput, +) (ret *interceptor.ClientDescribeActivityOutput, err error) { + vals := p.invoke(ctx, in) + ret, _ = vals[0].Interface().(*interceptor.ClientDescribeActivityOutput) + err, _ = vals[1].Interface().(error) + return +} + +func (p *proxyClientOutbound) PollActivityResult( + ctx context.Context, + in *interceptor.ClientPollActivityResultInput, +) (ret *interceptor.ClientPollActivityResultOutput, err error) { + vals := p.invoke(ctx, in) + ret, _ = vals[0].Interface().(*interceptor.ClientPollActivityResultOutput) + err, _ = vals[1].Interface().(error) + return +} diff --git a/internal/internal_activity.go b/internal/internal_activity.go index 825ef1679..70f1841cf 100644 --- a/internal/internal_activity.go +++ b/internal/internal_activity.go @@ -123,12 +123,13 @@ type ( attempt int32 // starts from 1. heartbeatDetails *commonpb.Payloads workflowType *WorkflowType - workflowNamespace string + namespace string workerStopChannel <-chan struct{} contextPropagators []ContextPropagator client *WorkflowClient priority *commonpb.Priority retryPolicy *RetryPolicy + activityRunID string } // context.WithValue need this type instead of basic type string to avoid lint error @@ -351,6 +352,11 @@ func (a *activityEnvironmentInterceptor) ExecuteActivity( } func (a *activityEnvironmentInterceptor) GetInfo(ctx context.Context) ActivityInfo { + workflowNamespace := "" + if a.env.workflowExecution.ID != "" { + workflowNamespace = a.env.namespace + } + return ActivityInfo{ ActivityID: a.env.activityID, ActivityType: a.env.activityType, @@ -363,12 +369,14 @@ func (a *activityEnvironmentInterceptor) GetInfo(ctx context.Context) ActivityIn ScheduledTime: a.env.scheduledTime, StartedTime: a.env.startedTime, TaskQueue: a.env.taskQueue, + Namespace: a.env.namespace, Attempt: a.env.attempt, WorkflowType: a.env.workflowType, - WorkflowNamespace: a.env.workflowNamespace, + WorkflowNamespace: workflowNamespace, IsLocalActivity: a.env.isLocalActivity, Priority: convertFromPBPriority(a.env.priority), RetryPolicy: a.env.retryPolicy, + ActivityRunID: a.env.activityRunID, } } diff --git a/internal/internal_activity_client.go b/internal/internal_activity_client.go new file mode 100644 index 000000000..7c540cb80 --- /dev/null +++ b/internal/internal_activity_client.go @@ -0,0 +1,769 @@ +package internal + +import ( + "context" + "errors" + "fmt" + "iter" + "time" + + "github.com/google/uuid" + activitypb "go.temporal.io/api/activity/v1" + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/converter" + "google.golang.org/protobuf/types/known/durationpb" +) + +type ( + // ClientStartActivityOptions contains configuration parameters for starting an activity execution. + // ID and TaskQueue are required. At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required. + // Other parameters are optional. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.StartActivityOptions] + ClientStartActivityOptions struct { + // ID - The business identifier of the activity. + // + // Mandatory: No default. + ID string + // TaskQueue - The task queue to schedule the activity on. + // + // Mandatory: No default. + TaskQueue string + // ScheduleToCloseTimeout - Total time that a workflow is willing to wait for an Activity to complete. + // ScheduleToCloseTimeout limits the total time of an Activity's execution including retries + // (use StartToCloseTimeout to limit the time of a single attempt). + // The zero value of this uses default value. + // Either this option or StartToCloseTimeout is required: Defaults to unlimited. + ScheduleToCloseTimeout time.Duration + // ScheduleToStartTimeout - Time that the Activity Task can stay in the Task Queue before it is picked up by + // a Worker. Do not specify this timeout unless using host specific Task Queues for Activity Tasks are being + // used for routing. In almost all situations that don't involve routing activities to specific hosts, it is + // better to rely on the default value. + // ScheduleToStartTimeout is always non-retryable. Retrying after this timeout doesn't make sense, as it would + // just put the Activity Task back into the same Task Queue. + // + // Optional: Defaults to unlimited. + ScheduleToStartTimeout time.Duration + // StartToCloseTimeout - Maximum time of a single Activity execution attempt. + // Note that the Temporal Server doesn't detect Worker process failures directly. It relies on this timeout + // to detect that an Activity that didn't complete on time. So this timeout should be as short as the longest + // possible execution of the Activity body. Potentially long running Activities must specify HeartbeatTimeout + // and call Activity.RecordHeartbeat(ctx, "my-heartbeat") periodically for timely failure detection. + // Either this option or ScheduleToCloseTimeout is required: Defaults to the ScheduleToCloseTimeout value. + StartToCloseTimeout time.Duration + // HeartbeatTimeout - Heartbeat interval. Activity must call Activity.RecordHeartbeat(ctx, "my-heartbeat") + // before this interval passes after the last heartbeat or the Activity starts. + HeartbeatTimeout time.Duration + // ActivityIDConflictPolicy - Defines what to do when trying to start an activity with the same ID as a + // running activity. Note that it is never valid to have two running instances of the same activity ID. + // See ActivityIDReusePolicy for handling activity ID duplication with a *closed* activity. + ActivityIDConflictPolicy enumspb.ActivityIdConflictPolicy + // ActivityIDReusePolicy - Defines whether to allow re-using an activity ID from a previously closed activity. + // If the request is denied, the server returns an ActivityExecutionAlreadyStarted error. + // See ActivityIDConflictPolicy for handling ID duplication with a *running* activity. + ActivityIDReusePolicy enumspb.ActivityIdReusePolicy + // RetryPolicy - Specifies how to retry an Activity if an error occurs. + // More details are available at docs.temporal.io. + // RetryPolicy is optional. If one is not specified, a default RetryPolicy is provided by the server. + // The default RetryPolicy provided by the server specifies: + // - InitialInterval of 1 second + // - BackoffCoefficient of 2.0 + // - MaximumInterval of 100 x InitialInterval + // - MaximumAttempts of 0 (unlimited) + // To disable retries, set MaximumAttempts to 1. + // The default RetryPolicy provided by the server can be overridden by the dynamic config. + RetryPolicy *RetryPolicy + // TypedSearchAttributes - Specifies Search Attributes that will be attached to the Workflow. Search Attributes are + // additional indexed information attributed to workflow and used for search and visibility. The search attributes + // can be used in query of List/Scan/Count workflow APIs. The key and its value type must be registered on Temporal + // server side. For supported operations on different server versions see [Visibility]. + // + // Optional: default to none. + // + // [Visibility]: https://docs.temporal.io/visibility + TypedSearchAttributes SearchAttributes + // Summary is a single-line summary for this activity that will appear in UI/CLI. This can be + // in single-line Temporal Markdown format. + // + // Optional: defaults to none/empty. + // + // NOTE: Experimental + Summary string + // Details - General fixed details for this workflow execution that will appear in UI/CLI. This can be in + // Temporal markdown format and can span multiple lines. This is a fixed value on the workflow that cannot be + // updated. For details that can be updated, use SetCurrentDetails within the workflow. + // + // Optional: defaults to none/empty. + // + // NOTE: Experimental + Details string + // Priority - Optional priority settings that control relative ordering of + // task processing when tasks are backed up in a queue. + // + // WARNING: Task queue priority is currently experimental. + Priority Priority + } + + // ClientGetActivityHandleOptions contains input for GetActivityHandle call. + // ActivityID and RunID are required. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.GetActivityHandleOptions] + ClientGetActivityHandleOptions struct { + ActivityID string + RunID string + } + + // ClientListActivitiesOptions contains input for ListActivities call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.ListActivitiesOptions] + ClientListActivitiesOptions struct { + Query string + } + + // ClientListActivitiesResult contains the result of the ListActivities call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.ListActivitiesResult] + ClientListActivitiesResult struct { + Results iter.Seq2[*ClientActivityExecutionInfo, error] + } + + // ClientCountActivitiesOptions contains input for CountActivities call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.CountActivitiesOptions] + ClientCountActivitiesOptions struct { + Query string + } + + // ClientCountActivitiesResult contains the result of the CountActivities call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.CountActivitiesResult] + ClientCountActivitiesResult struct { + Count int64 + Groups []ClientCountActivitiesAggregationGroup + } + + // ClientCountActivitiesAggregationGroup contains groups of activities if + // CountActivityExecutions is grouped by a field. + // The list might not be complete, and the counts of each group is approximate. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.CountActivitiesAggregationGroup] + ClientCountActivitiesAggregationGroup struct { + GroupValues []any + Count int64 + } + + // ClientActivityHandle represents a running or completed standalone activity execution. + // It can be used to get the result, describe, cancel, or terminate the activity. + // + // Methods may be added to this interface; implementing it directly is discouraged. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.ActivityHandle] + ClientActivityHandle interface { + // GetID returns the ID of the activity this handle points to. + GetID() string + // GetRunID returns the run ID that this handle was created with. + // + // Handle returned by [client.Client] has it set to run ID of the started execution. + // + // Handle returned by client.Client.GetActivityHandle has it set to the provided run ID. + // If empty run ID was provided, then this function returns empty string and the handle points to the most + // recent execution with matching activity ID. The run ID of this execution can be retrieved by calling Describe. + GetRunID() string + // Get waits until the activity finishes and gets its result. If the activity completes successfully, the result + // is written to valuePtr and nil is returned. If the activity failed, the failure is returned as an error. + // If an error is encountered trying to get the activity result, that error is returned. + Get(ctx context.Context, valuePtr any) error + // Describe returns detailed information about current state of the activity execution. + Describe(ctx context.Context, options ClientDescribeActivityOptions) (*ClientActivityExecutionDescription, error) + // Cancel requests cancellation of the activity. + Cancel(ctx context.Context, options ClientCancelActivityOptions) error + // Terminate terminates the activity. + Terminate(ctx context.Context, options ClientTerminateActivityOptions) error + } + + // ClientDescribeActivityOptions contains options for ClientActivityHandle.Describe call. + // For future compatibility, currently unused. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.DescribeActivityOptions] + ClientDescribeActivityOptions struct{} + + // ClientCancelActivityOptions contains options for ClientActivityHandle.Cancel call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.CancelActivityOptions] + ClientCancelActivityOptions struct { + // Reason is optional description of the reason for cancellation. + Reason string + } + + // ClientTerminateActivityOptions contains options for ClientActivityHandle.Terminate call. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.TerminateActivityOptions] + ClientTerminateActivityOptions struct { + // Reason is optional description of the reason for cancellation. + Reason string + } + + // ClientActivityExecutionInfo contains information about an activity execution. + // This is returned by ListActivities and embedded in ClientActivityExecutionDescription. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.ActivityExecutionInfo] + ClientActivityExecutionInfo struct { + // Raw PB message this struct was built from. This field is nil in the result of ClientActivityHandle.Describe call - use + // ClientActivityExecutionDescription.RawExecutionInfo instead. + RawExecutionListInfo *activitypb.ActivityExecutionListInfo + ActivityID string + ActivityRunID string + ActivityType string + ScheduleTime time.Time + CloseTime time.Time + Status enumspb.ActivityExecutionStatus + TypedSearchAttributes SearchAttributes + TaskQueue string + ExecutionDuration time.Duration + } + + // ClientActivityExecutionDescription contains detailed information about an activity execution. + // This is returned by ClientActivityHandle.Describe. + // + // NOTE: Experimental + // + // Exposed as: [go.temporal.io/sdk/client.ActivityExecutionDescription] + ClientActivityExecutionDescription struct { + ClientActivityExecutionInfo + // Raw PB message this struct was built from. + RawExecutionInfo *activitypb.ActivityExecutionInfo + RunState enumspb.PendingActivityState + LastHeartbeatTime time.Time + LastStartedTime time.Time + Attempt int32 + RetryPolicy *RetryPolicy + ExpirationTime time.Time + LastWorkerIdentity string + CurrentRetryInterval time.Duration + LastAttemptCompleteTime time.Time + NextAttemptScheduleTime time.Time + LastDeploymentVersion *WorkerDeploymentVersion + Priority Priority + CanceledReason string + dataConverter converter.DataConverter + failureConverter converter.FailureConverter + summary string + details string + } + + // clientActivityHandleImpl is the default implementation of ClientActivityHandle. + clientActivityHandleImpl struct { + client *WorkflowClient + id string + runID string + result *ClientPollActivityResultOutput + } +) + +// HasHeartbeatDetails returns whether heartbeat details are present. Use GetHeartbeatDetails to retrieve them. +func (d *ClientActivityExecutionDescription) HasHeartbeatDetails() bool { + return len(d.RawExecutionInfo.GetHeartbeatDetails().GetPayloads()) > 0 +} + +// GetHeartbeatDetails retrieves heartbeat details. Returns ErrNoData if heartbeat details are not present. +// The details are deserialized into provided pointers using the data converter of the client used to make the Describe call. +// Returns error if data conversion fails. +func (d *ClientActivityExecutionDescription) GetHeartbeatDetails(valuePtrs ...any) error { + details := d.RawExecutionInfo.GetHeartbeatDetails() + if details == nil { + return ErrNoData + } + return d.dataConverter.FromPayloads(details, valuePtrs...) +} + +// GetLastFailure returns the last failure of the activity execution, using the failure converter of the client used to +// make the Describe call. Returns nil if there was no failure. +func (d *ClientActivityExecutionDescription) GetLastFailure() error { + failure := d.RawExecutionInfo.GetLastFailure() + if failure == nil { + return nil + } + return d.failureConverter.FailureToError(failure) +} + +// GetSummary returns summary of the activity. See ClientStartActivityOptions.Summary. Returns empty string if there is no summary. +// Uses the data converter of the client used to make the Describe call. Returns error if data conversion fails. +func (d *ClientActivityExecutionDescription) GetSummary() (string, error) { + if d.summary != "" { + return d.summary, nil + } + payload := d.RawExecutionInfo.GetUserMetadata().GetSummary() + if payload == nil { + return "", nil + } + var summary string + err := d.dataConverter.FromPayload(payload, &summary) + if err != nil { + return "", err + } + d.summary = summary + return summary, nil +} + +// GetDetails returns details of the activity. See ClientStartActivityOptions.Details. Returns empty string if there are no details. +// Uses the data converter of the client used to make the Describe call. Returns error if data conversion fails. +func (d *ClientActivityExecutionDescription) GetDetails() (string, error) { + if d.details != "" { + return d.details, nil + } + payload := d.RawExecutionInfo.GetUserMetadata().GetDetails() + if payload == nil { + return "", nil + } + var details string + err := d.dataConverter.FromPayload(payload, &details) + if err != nil { + return "", err + } + d.details = details + return details, nil +} + +func (h *clientActivityHandleImpl) GetID() string { + return h.id +} + +func (h *clientActivityHandleImpl) GetRunID() string { + return h.runID +} + +func (h *clientActivityHandleImpl) Get(ctx context.Context, valuePtr any) error { + if h.result != nil { + if h.result.Error != nil { + return h.result.Error + } + if h.result.Result != nil { + if valuePtr == nil { + return nil + } + return h.result.Result.Get(valuePtr) + } + } + if err := h.client.ensureInitialized(ctx); err != nil { + return err + } + + // repeatedly poll, the loop repeats until there's an outcome + for { + resp, err := h.client.interceptor.PollActivityResult(ctx, &ClientPollActivityResultInput{ + ActivityID: h.id, + RunID: h.runID, + }) + if err != nil { + return err + } + if resp.Error != nil { + h.result = &ClientPollActivityResultOutput{Error: resp.Error} + return resp.Error + } + if resp.Result != nil { + if valuePtr == nil { + return nil + } + h.result = &ClientPollActivityResultOutput{Result: resp.Result} + return resp.Result.Get(valuePtr) + } + } +} + +func (h *clientActivityHandleImpl) Describe(ctx context.Context, options ClientDescribeActivityOptions) (*ClientActivityExecutionDescription, error) { + if err := h.client.ensureInitialized(ctx); err != nil { + return nil, err + } + out, err := h.client.interceptor.DescribeActivity(ctx, &ClientDescribeActivityInput{ + ActivityID: h.id, + RunID: h.runID, + }) + if err != nil { + return nil, err + } + return out.Description, nil +} + +func (h *clientActivityHandleImpl) Cancel(ctx context.Context, options ClientCancelActivityOptions) error { + if err := h.client.ensureInitialized(ctx); err != nil { + return err + } + return h.client.interceptor.CancelActivity(ctx, &ClientCancelActivityInput{ + ActivityID: h.id, + RunID: h.runID, + Reason: options.Reason, + }) +} + +func (h *clientActivityHandleImpl) Terminate(ctx context.Context, options ClientTerminateActivityOptions) error { + if err := h.client.ensureInitialized(ctx); err != nil { + return err + } + return h.client.interceptor.TerminateActivity(ctx, &ClientTerminateActivityInput{ + ActivityID: h.id, + RunID: h.runID, + Reason: options.Reason, + }) +} + +func (wc *WorkflowClient) ExecuteActivity(ctx context.Context, options ClientStartActivityOptions, activity any, args ...any) (ClientActivityHandle, error) { + if err := wc.ensureInitialized(ctx); err != nil { + return nil, err + } + + activityType, err := getValidatedActivityFunction(activity, args, wc.registry) + if err != nil { + return nil, err + } + + return wc.interceptor.ExecuteActivity(ctx, &ClientExecuteActivityInput{ + Options: &options, + ActivityType: activityType.Name, + Args: args, + }) +} + +func (wc *WorkflowClient) GetActivityHandle(options ClientGetActivityHandleOptions) ClientActivityHandle { + return wc.interceptor.GetActivityHandle((*ClientGetActivityHandleInput)(&options)) +} + +func (wc *WorkflowClient) ListActivities(ctx context.Context, options ClientListActivitiesOptions) (ClientListActivitiesResult, error) { + return ClientListActivitiesResult{ + Results: func(yield func(*ClientActivityExecutionInfo, error) bool) { + if err := wc.ensureInitialized(ctx); err != nil { + yield(nil, err) + return + } + + request := &workflowservice.ListActivityExecutionsRequest{ + Namespace: wc.namespace, + Query: options.Query, + } + + for { + resp, err := wc.getListActivitiesPage(ctx, request) + if err != nil { + yield(nil, err) + return + } + + for _, ex := range resp.Executions { + if !yield(&ClientActivityExecutionInfo{ + RawExecutionListInfo: ex, + ActivityID: ex.ActivityId, + ActivityRunID: ex.RunId, + ActivityType: ex.ActivityType.GetName(), + ScheduleTime: ex.ScheduleTime.AsTime(), + CloseTime: ex.CloseTime.AsTime(), + Status: ex.Status, + TypedSearchAttributes: convertToTypedSearchAttributes(wc.logger, ex.SearchAttributes.IndexedFields), + TaskQueue: ex.TaskQueue, + ExecutionDuration: ex.ExecutionDuration.AsDuration(), + }, nil) { + return + } + } + + if resp.NextPageToken != nil { + request.NextPageToken = resp.NextPageToken + } else { + return + } + } + }, + }, nil +} + +func (wc *WorkflowClient) getListActivitiesPage(ctx context.Context, request *workflowservice.ListActivityExecutionsRequest) (*workflowservice.ListActivityExecutionsResponse, error) { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + return wc.WorkflowService().ListActivityExecutions(grpcCtx, request) +} + +func (wc *WorkflowClient) CountActivities(ctx context.Context, options ClientCountActivitiesOptions) (*ClientCountActivitiesResult, error) { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + request := &workflowservice.CountActivityExecutionsRequest{ + Namespace: wc.namespace, + Query: options.Query, + } + resp, err := wc.WorkflowService().CountActivityExecutions(grpcCtx, request) + if err != nil { + return nil, err + } + + groups := make([]ClientCountActivitiesAggregationGroup, len(resp.Groups)) + for i, group := range resp.Groups { + groupValues := make([]any, len(group.GroupValues)) + for j, groupValue := range group.GroupValues { + // should never fail, and if it does, leaving nil behind + _ = converter.GetDefaultDataConverter().FromPayload(groupValue, &groupValues[j]) + } + groups[i] = ClientCountActivitiesAggregationGroup{ + GroupValues: groupValues, + Count: group.Count, + } + } + + return &ClientCountActivitiesResult{ + Count: resp.Count, + Groups: groups, + }, nil +} + +func (w *workflowClientInterceptor) ExecuteActivity( + ctx context.Context, + in *ClientExecuteActivityInput, +) (ClientActivityHandle, error) { + ctx = contextWithNewHeader(ctx) + dataConverter := WithContext(ctx, w.client.dataConverter) + if dataConverter == nil { + dataConverter = converter.GetDefaultDataConverter() + } + + request := &workflowservice.StartActivityExecutionRequest{ + Namespace: w.client.namespace, + Identity: w.client.identity, + RequestId: uuid.NewString(), + ActivityType: &commonpb.ActivityType{Name: in.ActivityType}, + } + var err error + if err = in.Options.validateAndSetInRequest(request, dataConverter); err != nil { + return nil, err + } + if request.Input, err = encodeArgs(dataConverter, in.Args); err != nil { + return nil, err + } + if request.Header, err = headerPropagated(ctx, w.client.contextPropagators); err != nil { + return nil, err + } + + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + resp, err := w.client.WorkflowService().StartActivityExecution(grpcCtx, request) + + var runID string + if err != nil { + return nil, err + } else { + runID = resp.RunId + } + + return &clientActivityHandleImpl{ + client: w.client, + id: in.Options.ID, + runID: runID, + }, nil +} + +func (options *ClientStartActivityOptions) validateAndSetInRequest(request *workflowservice.StartActivityExecutionRequest, dataConverter converter.DataConverter) error { + if options.ID == "" { + return errors.New("activity ID is required") + } + if options.TaskQueue == "" { + return errors.New("task queue is required") + } + if options.ScheduleToCloseTimeout < 0 { + return errors.New("negative ScheduleToCloseTimeout") + } + if options.StartToCloseTimeout < 0 { + return errors.New("negative StartToCloseTimeout") + } + if options.StartToCloseTimeout == 0 && options.ScheduleToCloseTimeout == 0 { + return errors.New("at least one of ScheduleToCloseTimeout and StartToCloseTimeout is required") + } + searchAttrs, err := serializeTypedSearchAttributes(options.TypedSearchAttributes.GetUntypedValues()) + if err != nil { + return err + } + userMetadata, err := buildUserMetadata(options.Summary, options.Details, dataConverter) + if err != nil { + return err + } + + request.ActivityId = options.ID + request.TaskQueue = &taskqueuepb.TaskQueue{Name: options.TaskQueue} + request.ScheduleToCloseTimeout = durationpb.New(options.ScheduleToCloseTimeout) + request.ScheduleToStartTimeout = durationpb.New(options.ScheduleToStartTimeout) + request.StartToCloseTimeout = durationpb.New(options.StartToCloseTimeout) + request.HeartbeatTimeout = durationpb.New(options.HeartbeatTimeout) + request.RetryPolicy = convertToPBRetryPolicy(options.RetryPolicy) + request.IdReusePolicy = options.ActivityIDReusePolicy + request.IdConflictPolicy = options.ActivityIDConflictPolicy + request.SearchAttributes = searchAttrs + request.UserMetadata = userMetadata + request.Priority = convertToPBPriority(options.Priority) + return nil +} + +func (w *workflowClientInterceptor) GetActivityHandle( + in *ClientGetActivityHandleInput, +) ClientActivityHandle { + return &clientActivityHandleImpl{ + client: w.client, + id: in.ActivityID, + runID: in.RunID, + } +} + +func (w *workflowClientInterceptor) PollActivityResult( + ctx context.Context, + in *ClientPollActivityResultInput, +) (*ClientPollActivityResultOutput, error) { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx), grpcLongPoll(true)) + defer cancel() + + request := &workflowservice.PollActivityExecutionRequest{ + Namespace: w.client.namespace, + ActivityId: in.ActivityID, + RunId: in.RunID, + } + + var resp *workflowservice.PollActivityExecutionResponse + for resp.GetOutcome() == nil { + var err error + resp, err = w.client.WorkflowService().PollActivityExecution(grpcCtx, request) + if err != nil { + return nil, err + } + } + + switch v := resp.GetOutcome().GetValue().(type) { + case *activitypb.ActivityExecutionOutcome_Result: + return &ClientPollActivityResultOutput{Result: newEncodedValue(v.Result, w.client.dataConverter)}, nil + case *activitypb.ActivityExecutionOutcome_Failure: + return &ClientPollActivityResultOutput{Error: w.client.failureConverter.FailureToError(v.Failure)}, nil + default: + return nil, fmt.Errorf("unexpected activity outcome type: %T", v) + } +} + +func (w *workflowClientInterceptor) DescribeActivity( + ctx context.Context, + in *ClientDescribeActivityInput, +) (*ClientDescribeActivityOutput, error) { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + request := &workflowservice.DescribeActivityExecutionRequest{ + Namespace: w.client.namespace, + ActivityId: in.ActivityID, + RunId: in.RunID, + } + resp, err := w.client.WorkflowService().DescribeActivityExecution(grpcCtx, request) + if err != nil { + return nil, err + } + info := resp.GetInfo() + if info == nil { + return nil, errors.New("DescribeActivityExecution response doesn't contain info") + } + + var lastDeploymentVersion *WorkerDeploymentVersion + if info.LastDeploymentVersion != nil { + v := workerDeploymentVersionFromProto(info.LastDeploymentVersion) + lastDeploymentVersion = &v + } + + return &ClientDescribeActivityOutput{ + Description: &ClientActivityExecutionDescription{ + ClientActivityExecutionInfo: ClientActivityExecutionInfo{ + RawExecutionListInfo: nil, + ActivityID: info.ActivityId, + ActivityRunID: info.RunId, + ActivityType: info.ActivityType.GetName(), + ScheduleTime: info.ScheduleTime.AsTime(), + CloseTime: info.CloseTime.AsTime(), + Status: info.Status, + TypedSearchAttributes: convertToTypedSearchAttributes(w.client.logger, info.SearchAttributes.IndexedFields), + TaskQueue: info.TaskQueue, + ExecutionDuration: info.ExecutionDuration.AsDuration(), + }, + RawExecutionInfo: info, + RunState: info.RunState, + LastHeartbeatTime: info.LastHeartbeatTime.AsTime(), + LastStartedTime: info.LastStartedTime.AsTime(), + Attempt: info.Attempt, + RetryPolicy: convertFromPBRetryPolicy(info.RetryPolicy), + ExpirationTime: info.ExpirationTime.AsTime(), + LastWorkerIdentity: info.LastWorkerIdentity, + CurrentRetryInterval: info.CurrentRetryInterval.AsDuration(), + LastAttemptCompleteTime: info.LastAttemptCompleteTime.AsTime(), + NextAttemptScheduleTime: info.NextAttemptScheduleTime.AsTime(), + LastDeploymentVersion: lastDeploymentVersion, + Priority: convertFromPBPriority(info.Priority), + CanceledReason: info.CanceledReason, + dataConverter: WithContext(ctx, w.client.dataConverter), + failureConverter: w.client.failureConverter, + }, + }, nil +} + +func (w *workflowClientInterceptor) CancelActivity( + ctx context.Context, + in *ClientCancelActivityInput, +) error { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + request := &workflowservice.RequestCancelActivityExecutionRequest{ + Namespace: w.client.namespace, + ActivityId: in.ActivityID, + RunId: in.RunID, + Identity: w.client.identity, + RequestId: uuid.NewString(), + Reason: in.Reason, + } + _, err := w.client.WorkflowService().RequestCancelActivityExecution(grpcCtx, request) + return err +} + +func (w *workflowClientInterceptor) TerminateActivity( + ctx context.Context, + in *ClientTerminateActivityInput, +) error { + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + request := &workflowservice.TerminateActivityExecutionRequest{ + Namespace: w.client.namespace, + ActivityId: in.ActivityID, + RunId: in.RunID, + Identity: w.client.identity, + RequestId: uuid.NewString(), + Reason: in.Reason, + } + _, err := w.client.WorkflowService().TerminateActivityExecution(grpcCtx, request) + return err +} diff --git a/internal/internal_logging_tags.go b/internal/internal_logging_tags.go index 8a2ccc3f3..2e009b17a 100644 --- a/internal/internal_logging_tags.go +++ b/internal/internal_logging_tags.go @@ -2,6 +2,7 @@ package internal const ( tagActivityID = "ActivityID" + tagActivityRunID = "ActivityRunID" tagActivityType = "ActivityType" tagNamespace = "Namespace" tagEventID = "EventID" diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 5c1759c48..c23bfda86 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -2246,12 +2246,22 @@ func newServiceInvoker( // Execute executes an implementation of the activity. func (ath *activityTaskHandlerImpl) Execute(taskQueue string, t *workflowservice.PollActivityTaskQueueResponse) (result interface{}, err error) { traceLog(func() { - ath.logger.Debug("Processing new activity task", - tagWorkflowID, t.WorkflowExecution.GetWorkflowId(), - tagRunID, t.WorkflowExecution.GetRunId(), - tagActivityType, t.ActivityType.GetName(), - tagAttempt, t.Attempt, - ) + if t.WorkflowExecution.GetWorkflowId() == "" { + ath.logger.Debug("Processing new standalone activity task", + tagActivityID, t.ActivityId, + tagActivityRunID, t.ActivityRunId, + tagActivityType, t.ActivityType.GetName(), + tagAttempt, t.Attempt, + ) + } else { + ath.logger.Debug("Processing new workflow activity task", + tagWorkflowID, t.WorkflowExecution.GetWorkflowId(), + tagRunID, t.WorkflowExecution.GetRunId(), + tagActivityID, t.ActivityId, + tagActivityType, t.ActivityType.GetName(), + tagAttempt, t.Attempt, + ) + } }) // The root context is only cancelled when the worker is finished shutting down. rootCtx := ath.backgroundContext diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 7c15c2d4c..6298c394f 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1145,7 +1145,7 @@ func getNamespaceFromActivityCtx(ctx context.Context) string { if env == nil { return "" } - return env.workflowNamespace + return env.namespace } func getActivityEnvironmentFromCtx(ctx context.Context) *activityEnvironment { diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 357046934..4f2ae1493 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -53,7 +53,7 @@ type ( // LocalActivityResultHandler that returns local activity result LocalActivityResultHandler func(lar *LocalActivityResultWrapper) - // LocalActivityResultWrapper contains result of a local activity + // LocalActivityResultWrapper contains the result of a local activity LocalActivityResultWrapper struct { Err error Result *commonpb.Payloads diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 3c0db0c9a..02728da68 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -481,7 +481,7 @@ func (wc *WorkflowClient) CompleteActivity(ctx context.Context, taskToken []byte return reportActivityComplete(ctx, wc.workflowService, request, wc.metricsHandler) } -// CompleteActivityByID reports activity completed. Similar to CompleteActivity +// CompleteActivityByID reports workflow activity completed. Similar to CompleteActivity // It takes namespace name, workflowID, runID, activityID as arguments. func (wc *WorkflowClient) CompleteActivityByID(ctx context.Context, namespace, workflowID, runID, activityID string, result interface{}, err error, @@ -507,6 +507,31 @@ func (wc *WorkflowClient) CompleteActivityByID(ctx context.Context, namespace, w return reportActivityCompleteByID(ctx, wc.workflowService, request, wc.metricsHandler) } +// CompleteActivityByActivityID reports standalone activity completed. Similar to CompleteActivity +func (wc *WorkflowClient) CompleteActivityByActivityID(ctx context.Context, namespace, activityID, activityRunID string, + result interface{}, err error, +) error { + if activityID == "" || namespace == "" { + return errors.New("empty activity id or namespace") + } + + dataConverter := WithContext(ctx, wc.dataConverter) + var data *commonpb.Payloads + if result != nil { + var err0 error + data, err0 = encodeArg(dataConverter, result) + if err0 != nil { + return err0 + } + } + + // We do allow canceled error to be passed here + cancelAllowed := true + request := convertActivityResultToRespondRequestByID(wc.identity, namespace, "", activityRunID, activityID, + data, err, wc.dataConverter, wc.failureConverter, cancelAllowed) + return reportActivityCompleteByID(ctx, wc.workflowService, request, wc.metricsHandler) +} + // RecordActivityHeartbeat records heartbeat for an activity. func (wc *WorkflowClient) RecordActivityHeartbeat(ctx context.Context, taskToken []byte, details ...interface{}) error { if err := wc.ensureInitialized(ctx); err != nil { diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 833d67911..fff6bd754 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -66,11 +66,9 @@ type ( testActivityHandle struct { callback ResultHandler - activityType string heartbeatDetails *commonpb.Payloads - // Timeout configuration - heartbeatTimeout time.Duration - startToCloseTimeout time.Duration + token testActivityToken + task *workflowservice.PollActivityTaskQueueResponse // Timeout tracking startTime time.Time // when activity started executing lastHeartbeatTime time.Time @@ -184,7 +182,7 @@ type ( header *commonpb.Header counterID int64 - activities map[string]*testActivityHandle + activities map[testActivityToken]*testActivityHandle localActivities map[string]*localActivityTask timers map[string]*testTimerHandle runningWorkflows map[string]*testWorkflowHandle @@ -249,7 +247,8 @@ type ( sessionEnvironment *testSessionEnvironmentImpl // True if this was created only for testing activities not workflows. - activityEnvOnly bool + activityEnvOnly bool + executeActivitiesInWorkflow bool workflowFunctionExecuting bool bufferedUpdateRequests map[string][]func() @@ -268,6 +267,11 @@ type ( env *testWorkflowEnvironmentImpl updateID string } + + testActivityToken struct { + activityID string + runID string + } ) func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *registry) *testWorkflowEnvironmentImpl { @@ -288,7 +292,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist mockClock: clock.NewMock(), wallClock: clock.New(), timers: make(map[string]*testTimerHandle), - activities: make(map[string]*testActivityHandle), + activities: make(map[testActivityToken]*testActivityHandle), localActivities: make(map[string]*localActivityTask), runningWorkflows: make(map[string]*testWorkflowHandle), runningNexusOperations: make(map[int64]*testNexusOperationHandle), @@ -319,13 +323,14 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist changeVersions: make(map[string]Version), openSessions: make(map[string]*SessionInfo), - doneChannel: make(chan struct{}), - workerStopChannel: make(chan struct{}), - dataConverter: converter.GetDefaultDataConverter(), - failureConverter: GetDefaultFailureConverter(), - runTimeout: maxWorkflowTimeout, - bufferedUpdateRequests: make(map[string][]func()), - sdkFlags: newSDKFlagSet(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), + doneChannel: make(chan struct{}), + workerStopChannel: make(chan struct{}), + dataConverter: converter.GetDefaultDataConverter(), + failureConverter: GetDefaultFailureConverter(), + runTimeout: maxWorkflowTimeout, + bufferedUpdateRequests: make(map[string][]func()), + sdkFlags: newSDKFlagSet(&workflowservice.GetSystemInfoResponse_Capabilities{SdkMetadata: true}), + executeActivitiesInWorkflow: true, } if debugMode { @@ -353,19 +358,23 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist mockService := workflowservicemock.NewMockWorkflowServiceClient(mockCtrl) mockHeartbeatFn := func(c context.Context, r *workflowservice.RecordActivityTaskHeartbeatRequest, opts ...grpc.CallOption) error { - activityID := ActivityID{id: string(r.TaskToken)} + token, ok := activityTokenFromBytes(r.TaskToken) + if !ok { + env.logger.Debug("RecordActivityTaskHeartbeat: Invalid activity token.") + return serviceerror.NewNotFound("") + } env.locker.Lock() // need lock as this is running in activity worker's goroutinue - activityHandle, ok := env.getActivityHandle(activityID.id, GetActivityInfo(c).WorkflowExecution.RunID) + activityHandle, ok := env.getActivityHandle(token) if !ok { env.locker.Unlock() - env.logger.Debug("RecordActivityTaskHeartbeat: ActivityID not found, could be already completed or canceled.", - tagActivityID, activityID) + env.logger.Debug("RecordActivityTaskHeartbeat: Activity token not found, could be already completed or canceled.", + tagActivityID, token.activityID) return serviceerror.NewNotFound("") } activityHandle.heartbeatDetails = r.Details activityHandle.lastHeartbeatTime = time.Now() env.locker.Unlock() - activityInfo := env.getActivityInfo(activityID, activityHandle.activityType) + activityInfo := activityHandle.getActivityInfo() if env.onActivityHeartbeatListener != nil { // If we're only in an activity environment, posted callbacks are not // invoked @@ -378,7 +387,7 @@ func newTestWorkflowEnvironmentImpl(s *WorkflowTestSuite, parentRegistry *regist } } - env.logger.Debug("RecordActivityTaskHeartbeat", tagActivityID, activityID) + env.logger.Debug("RecordActivityTaskHeartbeat", tagActivityID, token.activityID) return nil } @@ -727,21 +736,22 @@ func (env *testWorkflowEnvironmentImpl) executeActivity( if workflowType == workflowTypeNotSpecified { workflowType = "0" } - task := newTestActivityTask( - env.workflowInfo.WorkflowExecution.ID, - env.workflowInfo.WorkflowExecution.RunID, - workflowType, - env.workflowInfo.Namespace, - scheduleTaskAttr, - ) - + task := newTestActivityTask(env.workflowInfo.Namespace, scheduleTaskAttr) + if env.executeActivitiesInWorkflow { + task.WorkflowExecution = &commonpb.WorkflowExecution{ + WorkflowId: env.workflowInfo.WorkflowExecution.ID, + RunId: env.workflowInfo.WorkflowExecution.RunID, + } + task.WorkflowType = &commonpb.WorkflowType{Name: workflowType} + } else { + task.ActivityRunId = getStringID(env.nextID()) + } task.HeartbeatDetails = env.heartbeatDetails // ensure activityFn is registered to defaultTestTaskQueue taskHandler := env.newTestActivityTaskHandler(defaultTestTaskQueue, env.GetDataConverter()) - activityHandle := &testActivityHandle{callback: func(result *commonpb.Payloads, err error) {}, activityType: parameters.ActivityType.Name} - activityID := ActivityID{id: scheduleTaskAttr.GetActivityId()} - env.setActivityHandle(activityID.id, env.workflowInfo.WorkflowExecution.RunID, activityHandle) + env.addNewActivityHandle(task, func(result *commonpb.Payloads, err error) {}) + activityID := ActivityID{id: task.ActivityId} result, err := taskHandler.Execute(defaultTestTaskQueue, task) if err != nil { @@ -978,14 +988,15 @@ func (env *testWorkflowEnvironmentImpl) postCallback(cb func(), startWorkflowTas } func (env *testWorkflowEnvironmentImpl) RequestCancelActivity(activityID ActivityID) { - handle, ok := env.getActivityHandle(activityID.id, env.workflowInfo.WorkflowExecution.RunID) + token := testActivityToken{activityID: activityID.id, runID: env.workflowInfo.WorkflowExecution.RunID} + handle, ok := env.getActivityHandle(token) if !ok { env.logger.Debug("RequestCancelActivity failed, Activity not exists or already completed.", tagActivityID, activityID) return } - activityInfo := env.getActivityInfo(activityID, handle.activityType) + activityInfo := handle.getActivityInfo() env.logger.Debug("RequestCancelActivity", tagActivityID, activityID) - env.deleteHandle(activityID.id, env.workflowInfo.WorkflowExecution.RunID) + env.deleteHandle(token) env.postCallback(func() { handle.callback(nil, NewCanceledError()) if env.onActivityCanceledListener != nil { @@ -1182,6 +1193,11 @@ func (env *testWorkflowEnvironmentImpl) CompleteActivity(taskToken []byte, resul if taskToken == nil { return errors.New("nil task token provided") } + activityToken, ok := activityTokenFromBytes(taskToken) + if !ok { + return errors.New("invalid task token provided") + } + var data *commonpb.Payloads if result != nil { var encodeErr error @@ -1191,19 +1207,18 @@ func (env *testWorkflowEnvironmentImpl) CompleteActivity(taskToken []byte, resul } } - activityID := ActivityID{id: string(taskToken)} env.postCallback(func() { - activityHandle, ok := env.getActivityHandle(activityID.id, env.workflowInfo.WorkflowExecution.RunID) + activityHandle, ok := env.getActivityHandle(activityToken) if !ok { env.logger.Debug("CompleteActivity: ActivityID not found, could be already completed or canceled.", - tagActivityID, activityID) + tagActivityID, activityToken.activityID) return } // We do allow canceled error to be passed here cancelAllowed := true request := convertActivityResultToRespondRequest("test-identity", taskToken, data, err, env.GetDataConverter(), env.GetFailureConverter(), defaultTestNamespace, cancelAllowed, nil, nil, nil) - env.handleActivityResult(activityID, request, activityHandle.activityType, env.GetDataConverter()) + env.handleActivityResult(activityHandle, request, env.GetDataConverter()) }, false /* do not auto schedule workflow task, because activity might be still pending */) return nil @@ -1232,9 +1247,8 @@ func (env *testWorkflowEnvironmentImpl) GetContextPropagators() []ContextPropaga func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivityParams, callback ResultHandler) ActivityID { ensureDefaultRetryPolicy(¶meters) scheduleTaskAttr := &commandpb.ScheduleActivityTaskCommandAttributes{} - scheduleID := env.nextID() if parameters.ActivityID == "" { - scheduleTaskAttr.ActivityId = getStringID(scheduleID) + scheduleTaskAttr.ActivityId = getStringID(env.nextID()) } else { scheduleTaskAttr.ActivityId = parameters.ActivityID } @@ -1253,27 +1267,18 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivi callback(nil, err) return activityID } - task := newTestActivityTask( - env.workflowInfo.WorkflowExecution.ID, - env.workflowInfo.WorkflowExecution.RunID, - env.workflowInfo.WorkflowType.Name, - env.workflowInfo.Namespace, - scheduleTaskAttr, - ) - taskHandler := env.newTestActivityTaskHandler(parameters.TaskQueueName, parameters.DataConverter) - now := time.Now() - activityHandle := &testActivityHandle{ - callback: callback, - activityType: parameters.ActivityType.Name, - heartbeatTimeout: parameters.HeartbeatTimeout, - startToCloseTimeout: parameters.StartToCloseTimeout, - startTime: now, - lastHeartbeatTime: now, + task := newTestActivityTask(env.workflowInfo.Namespace, scheduleTaskAttr) + task.WorkflowExecution = &commonpb.WorkflowExecution{ + WorkflowId: env.workflowInfo.WorkflowExecution.ID, + RunId: env.workflowInfo.WorkflowExecution.RunID, } + task.WorkflowType = &commonpb.WorkflowType{Name: env.workflowInfo.WorkflowType.Name} - env.setActivityHandle(activityID.id, env.workflowInfo.WorkflowExecution.RunID, activityHandle) + taskHandler := env.newTestActivityTaskHandler(parameters.TaskQueueName, parameters.DataConverter) + activityHandle := env.addNewActivityHandle(task, callback) env.runningCount++ + activityToken := activityHandle.token // Start timeout monitoring if any timeout is configured var timeoutWatchDone chan struct{} @@ -1301,7 +1306,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivi return case <-ticker.C: env.locker.Lock() - handle, ok := env.getActivityHandle(activityID.id, env.workflowInfo.WorkflowExecution.RunID) + handle, ok := env.getActivityHandle(activityToken) if !ok { env.locker.Unlock() return // activity already completed @@ -1376,7 +1381,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivi // Check if any timeout occurred env.locker.Lock() - handle, ok := env.getActivityHandle(activityID.id, env.workflowInfo.WorkflowExecution.RunID) + handle, ok := env.getActivityHandle(activityToken) activityTimedOut := ok && handle.timedOut var timeoutType enumspb.TimeoutType var heartbeatDetails *commonpb.Payloads @@ -1393,7 +1398,7 @@ func (env *testWorkflowEnvironmentImpl) ExecuteActivity(parameters ExecuteActivi // post activity result to workflow dispatcher env.postCallback(func() { - env.handleActivityResult(activityID, result, parameters.ActivityType.Name, parameters.DataConverter) + env.handleActivityResult(activityHandle, result, parameters.DataConverter) env.runningCount-- }, false /* do not auto schedule workflow task, because activity might be still pending */) }() @@ -1556,23 +1561,49 @@ func (env *testWorkflowEnvironmentImpl) validateRetryPolicy(policy *commonpb.Ret return nil } -func (env *testWorkflowEnvironmentImpl) getActivityHandle(activityID, runID string) (*testActivityHandle, bool) { - handle, ok := env.activities[env.makeUniqueActivityID(activityID, runID)] +func (env *testWorkflowEnvironmentImpl) addNewActivityHandle(task *workflowservice.PollActivityTaskQueueResponse, callback func(result *commonpb.Payloads, err error)) *testActivityHandle { + token := testActivityToken{activityID: task.ActivityId} + if task.WorkflowExecution.GetWorkflowId() == "" { + token.runID = task.ActivityRunId + } else { + token.runID = task.WorkflowExecution.GetRunId() + } + task.TaskToken = token.toBytes() + + now := time.Now() + handle := &testActivityHandle{ + callback: callback, + heartbeatDetails: nil, + token: token, + task: task, + startTime: now, + lastHeartbeatTime: now, + } + + env.activities[token] = handle + return handle +} + +func (env *testWorkflowEnvironmentImpl) getActivityHandle(token testActivityToken) (*testActivityHandle, bool) { + handle, ok := env.activities[token] return handle, ok } -func (env *testWorkflowEnvironmentImpl) setActivityHandle(activityID, runID string, handle *testActivityHandle) { - env.activities[env.makeUniqueActivityID(activityID, runID)] = handle +func (env *testWorkflowEnvironmentImpl) deleteHandle(token testActivityToken) { + delete(env.activities, token) } -func (env *testWorkflowEnvironmentImpl) deleteHandle(activityID, runID string) { - delete(env.activities, env.makeUniqueActivityID(activityID, runID)) +func (t *testActivityToken) toBytes() []byte { + // we don't entirely control activity ID, so runID goes first to make reconstructing from bytes easier + return []byte(fmt.Sprintf("%v#%v", t.runID, t.activityID)) } -func (env *testWorkflowEnvironmentImpl) makeUniqueActivityID(activityID, runID string) string { - // ActivityID is unique per workflow, but different workflow could have same activityID. - // Make the key unique globally as we share the same collection for all running workflows in test. - return fmt.Sprintf("%v_%v", runID, activityID) +func activityTokenFromBytes(token []byte) (testActivityToken, bool) { + split := strings.SplitN(string(token), "#", 2) + if len(split) != 2 { + return testActivityToken{}, false + } + return testActivityToken{activityID: split[1], runID: split[0]}, true } func (env *testWorkflowEnvironmentImpl) executeActivityWithRetryForTest( @@ -1614,9 +1645,10 @@ func (env *testWorkflowEnvironmentImpl) executeActivityWithRetryForTest( env.registerDelayedCallback(func() { env.runningCount++ task.Attempt = task.GetAttempt() + 1 - activityID := ActivityID{id: string(task.TaskToken)} - if ah, ok := env.getActivityHandle(activityID.id, task.WorkflowExecution.RunId); ok { - task.HeartbeatDetails = ah.heartbeatDetails + if token, ok := activityTokenFromBytes(task.TaskToken); ok { + if ah, ok := env.getActivityHandle(token); ok { + task.HeartbeatDetails = ah.heartbeatDetails + } } close(waitCh) }, backoff) @@ -1730,11 +1762,13 @@ func (env *testWorkflowEnvironmentImpl) RequestCancelLocalActivity(activityID Lo task.cancel() } -func (env *testWorkflowEnvironmentImpl) handleActivityResult(activityID ActivityID, result interface{}, activityType string, +func (env *testWorkflowEnvironmentImpl) handleActivityResult(activityHandle *testActivityHandle, result interface{}, dataConverter converter.DataConverter) { + activityID := ActivityID{id: activityHandle.task.ActivityId} + activityType := activityHandle.task.ActivityType.Name env.logger.Debug(fmt.Sprintf("handleActivityResult: %T.", result), - tagActivityID, activityID, tagActivityType, activityType) - activityInfo := env.getActivityInfo(activityID, activityType) + tagActivityID, activityID.id, tagActivityType, activityType) + activityInfo := activityHandle.getActivityInfo() if result == ErrActivityResultPending { // In case activity returns ErrActivityResultPending, the respond will be nil, and we don't need to do anything. // Activity will need to complete asynchronously using CompleteActivity(). @@ -1745,14 +1779,12 @@ func (env *testWorkflowEnvironmentImpl) handleActivityResult(activityID Activity } // this is running in dispatcher - activityHandle, ok := env.getActivityHandle(activityID.id, activityInfo.WorkflowExecution.RunID) - if !ok { - env.logger.Debug("handleActivityResult: ActivityID not exists, could be already completed or canceled.", + if _, ok := env.getActivityHandle(activityHandle.token); !ok { + env.logger.Debug("handleActivityResult: Activity not found, could be already completed or canceled.", tagActivityID, activityID) return } - - env.deleteHandle(activityID.id, activityInfo.WorkflowExecution.RunID) + env.deleteHandle(activityHandle.token) var blob *commonpb.Payloads var err error @@ -1842,7 +1874,14 @@ func (env *testWorkflowEnvironmentImpl) handleLocalActivityResult(result *localA env.logger.Debug(fmt.Sprintf("handleLocalActivityResult: Err: %v, Result: %v.", result.err, result.result), tagActivityID, activityID, tagActivityType, activityType) - activityInfo := env.getActivityInfo(activityID, activityType) + activityInfo := &ActivityInfo{ + ActivityID: activityID.id, + ActivityType: ActivityType{Name: activityType}, + TaskToken: []byte(activityID.id), + WorkflowExecution: env.workflowInfo.WorkflowExecution, + Attempt: 1, + } + task, ok := env.localActivities[activityID.id] if !ok { env.logger.Debug("handleLocalActivityResult: ActivityID not exists, could be already completed or canceled.", @@ -1915,12 +1954,14 @@ func (env *testWorkflowEnvironmentImpl) runBeforeMockCallReturns(call *MockCallW // Execute executes the activity code. func (a *activityExecutorWrapper) Execute(ctx context.Context, input *commonpb.Payloads) (*commonpb.Payloads, error) { activityInfo := GetActivityInfo(ctx) - // If the activity was cancelled before it starts here, we do not execute and - // instead return cancelled - a.env.locker.Lock() - _, handleExists := a.env.getActivityHandle(activityInfo.ActivityID, activityInfo.WorkflowExecution.RunID) - a.env.locker.Unlock() - if !handleExists { + token, ok := activityTokenFromBytes(activityInfo.TaskToken) + // If activity handle cannot be found, we assume it was cancelled + if ok { + a.env.locker.Lock() + _, ok = a.env.getActivityHandle(token) + a.env.locker.Unlock() + } + if !ok { return nil, NewCanceledError() } @@ -2279,18 +2320,11 @@ func (env *testWorkflowEnvironmentImpl) newTestActivityTaskHandler(taskQueue str return taskHandler } -func newTestActivityTask(workflowID, runID, workflowTypeName, namespace string, - attr *commandpb.ScheduleActivityTaskCommandAttributes) *workflowservice.PollActivityTaskQueueResponse { - activityID := attr.GetActivityId() +func newTestActivityTask(namespace string, attr *commandpb.ScheduleActivityTaskCommandAttributes) *workflowservice.PollActivityTaskQueueResponse { now := time.Now() - task := &workflowservice.PollActivityTaskQueueResponse{ - Attempt: 1, - WorkflowExecution: &commonpb.WorkflowExecution{ - WorkflowId: workflowID, - RunId: runID, - }, - ActivityId: activityID, - TaskToken: []byte(activityID), // use activityID as TaskToken so we can map TaskToken in heartbeat calls. + return &workflowservice.PollActivityTaskQueueResponse{ + Attempt: 1, + ActivityId: attr.GetActivityId(), ActivityType: &commonpb.ActivityType{Name: attr.GetActivityType().GetName()}, Input: attr.GetInput(), ScheduledTime: timestamppb.New(now), @@ -2298,14 +2332,10 @@ func newTestActivityTask(workflowID, runID, workflowTypeName, namespace string, StartedTime: timestamppb.New(now), StartToCloseTimeout: attr.GetStartToCloseTimeout(), HeartbeatTimeout: attr.GetHeartbeatTimeout(), - WorkflowType: &commonpb.WorkflowType{ - Name: workflowTypeName, - }, - WorkflowNamespace: namespace, - Header: attr.GetHeader(), - Priority: attr.Priority, + WorkflowNamespace: namespace, + Header: attr.GetHeader(), + Priority: attr.Priority, } - return task } func (env *testWorkflowEnvironmentImpl) newTimer( @@ -3091,13 +3121,17 @@ func (env *testWorkflowEnvironmentImpl) nextID() int64 { return activityID } -func (env *testWorkflowEnvironmentImpl) getActivityInfo(activityID ActivityID, activityType string) *ActivityInfo { +func (a *testActivityHandle) getActivityInfo() *ActivityInfo { return &ActivityInfo{ - ActivityID: activityID.id, - ActivityType: ActivityType{Name: activityType}, - TaskToken: []byte(activityID.id), - WorkflowExecution: env.workflowInfo.WorkflowExecution, - Attempt: 1, + ActivityID: a.task.ActivityId, + ActivityType: ActivityType{Name: a.task.ActivityType.GetName()}, + TaskToken: a.token.toBytes(), + WorkflowExecution: WorkflowExecution{ + ID: a.task.WorkflowExecution.GetWorkflowId(), + RunID: a.task.WorkflowExecution.GetRunId(), + }, + Attempt: a.task.Attempt, + ActivityRunID: a.task.ActivityRunId, } } diff --git a/internal/nexus_operations.go b/internal/nexus_operations.go index da3db97ca..36d8e40db 100644 --- a/internal/nexus_operations.go +++ b/internal/nexus_operations.go @@ -445,6 +445,11 @@ func (t *testSuiteClientForNexusOperations) CompleteActivityByID(ctx context.Con panic("not implemented in the test environment") } +// CompleteActivityByID implements Client. +func (t *testSuiteClientForNexusOperations) CompleteActivityByActivityID(ctx context.Context, namespace string, activityID string, activityRunID string, result interface{}, err error) error { + panic("not implemented in the test environment") +} + // CountWorkflow implements Client. func (t *testSuiteClientForNexusOperations) CountWorkflow(ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest) (*workflowservice.CountWorkflowExecutionsResponse, error) { panic("not implemented in the test environment") @@ -702,6 +707,22 @@ func (t *testSuiteClientForNexusOperations) UpdateWorkerVersioningRules(ctx cont panic("unimplemented in the test environment") } +func (t *testSuiteClientForNexusOperations) ExecuteActivity(ctx context.Context, options ClientStartActivityOptions, activity any, args ...any) (ClientActivityHandle, error) { + panic("unimplemented in the test environment") +} + +func (t *testSuiteClientForNexusOperations) GetActivityHandle(options ClientGetActivityHandleOptions) ClientActivityHandle { + panic("unimplemented in the test environment") +} + +func (t *testSuiteClientForNexusOperations) ListActivities(ctx context.Context, options ClientListActivitiesOptions) (ClientListActivitiesResult, error) { + panic("unimplemented in the test environment") +} + +func (t *testSuiteClientForNexusOperations) CountActivities(ctx context.Context, options ClientCountActivitiesOptions) (*ClientCountActivitiesResult, error) { + panic("unimplemented in the test environment") +} + // WorkflowService implements Client. func (t *testSuiteClientForNexusOperations) WorkflowService() workflowservice.WorkflowServiceClient { panic("not implemented in the test environment") diff --git a/internal/session.go b/internal/session.go index 0b08413b9..264fc858d 100644 --- a/internal/session.go +++ b/internal/session.go @@ -549,7 +549,7 @@ func (env *sessionEnvironmentImpl) AddSessionToken() { func (env *sessionEnvironmentImpl) SignalCreationResponse(ctx context.Context, sessionID string) error { activityEnv := getActivityEnv(ctx) - client := activityEnv.serviceInvoker.GetClient(ClientOptions{Namespace: activityEnv.workflowNamespace}) + client := activityEnv.serviceInvoker.GetClient(ClientOptions{Namespace: activityEnv.namespace}) return client.SignalWorkflow(ctx, activityEnv.workflowExecution.ID, activityEnv.workflowExecution.RunID, sessionID, env.getCreationResponse()) } diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 716bc9d6f..59f0d4db4 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -273,6 +273,17 @@ func (t *TestActivityEnvironment) SetOnActivityHeartbeatListener( return t } +// SetExecuteActivitiesInWorkflow controls the simulated environment in which the tested activity is being executed. +// This affects which fields are available in ActivityInfo (e.g. WorkflowExecution or ActivityRunID). +// Most activities run identically in both situations, so this setting is rarely needed. +// If set to true, the activity will be executed as if it was started by a workflow. +// If set to false, the activity will be executed as if it was started directly by a client. +// Defaults to true. +func (t *TestActivityEnvironment) SetExecuteActivitiesInWorkflow(executeActivitiesInWorkflow bool) *TestActivityEnvironment { + t.impl.executeActivitiesInWorkflow = executeActivitiesInWorkflow + return t +} + // RegisterWorkflow registers workflow implementation with the TestWorkflowEnvironment func (e *TestWorkflowEnvironment) RegisterWorkflow(w interface{}) { e.impl.RegisterWorkflow(w) diff --git a/internal/workflow_testsuite_test.go b/internal/workflow_testsuite_test.go index 8f02fd606..2f8af03bc 100644 --- a/internal/workflow_testsuite_test.go +++ b/internal/workflow_testsuite_test.go @@ -1233,3 +1233,49 @@ func TestDynamicWorkflows(t *testing.T) { require.NoError(t, err) require.Equal(t, "dynamic-activity - grape - cherry", result) } + +func checkActivityInfo(ctx context.Context, isWorkflowActivity bool) error { + info := GetActivityInfo(ctx) + if isWorkflowActivity { + if !info.IsWorkflowActivity() { + return fmt.Errorf("expected IsWorkflowActivity to be true") + } + if info.ActivityRunID != "" { + return fmt.Errorf("expected ActivityRunID to be empty") + } + } else { + if info.IsWorkflowActivity() { + return fmt.Errorf("expected IsWorkflowActivity to be false") + } + if info.ActivityRunID == "" { + return fmt.Errorf("expected ActivityRunID to be non-empty") + } + } + return nil +} + +func TestExecuteActivitiesInWorkflow(t *testing.T) { + testSuite := &WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + env.RegisterActivity(checkActivityInfo) + + // Default true + _, err := env.ExecuteActivity(checkActivityInfo, true) + require.NoError(t, err) + _, err = env.ExecuteActivity(checkActivityInfo, false) + require.Error(t, err) + + // Setting to false + env.SetExecuteActivitiesInWorkflow(false) + _, err = env.ExecuteActivity(checkActivityInfo, true) + require.Error(t, err) + _, err = env.ExecuteActivity(checkActivityInfo, false) + require.NoError(t, err) + + // Explicitly setting to true + env.SetExecuteActivitiesInWorkflow(true) + _, err = env.ExecuteActivity(checkActivityInfo, false) + require.Error(t, err) + _, err = env.ExecuteActivity(checkActivityInfo, true) + require.NoError(t, err) +} diff --git a/internalbindings/internalbindings.go b/internalbindings/internalbindings.go index b05398985..abd613419 100644 --- a/internalbindings/internalbindings.go +++ b/internalbindings/internalbindings.go @@ -41,7 +41,7 @@ type ( ExecuteLocalActivityOptions = internal.ExecuteLocalActivityOptions // LocalActivityResultHandler that returns local activity result LocalActivityResultHandler = internal.LocalActivityResultHandler - // LocalActivityResultWrapper contains result of a local activity + // LocalActivityResultWrapper contains the result of a local activity LocalActivityResultWrapper = internal.LocalActivityResultWrapper // ActivityType type of activity ActivityType = internal.ActivityType diff --git a/mocks/Client.go b/mocks/Client.go index d70ccd2ca..294bb020b 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -6,7 +6,6 @@ package mocks import ( "context" - "go.temporal.io/sdk/client" "go.temporal.io/api/enums/v1" @@ -113,6 +112,24 @@ func (_m *Client) CompleteActivityByID(ctx context.Context, namespace string, wo return r0 } +// CompleteActivityByID provides a mock function with given fields: ctx, namespace, activityID, activityRunID, result, err +func (_m *Client) CompleteActivityByActivityID(ctx context.Context, namespace string, activityID string, activityRunID string, result interface{}, err error) error { + ret := _m.Called(ctx, namespace, activityID, activityRunID, result, err) + + if len(ret) == 0 { + panic("no return value specified for CompleteActivityByActivityID") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, interface{}, error) error); ok { + r0 = rf(ctx, namespace, activityID, activityRunID, result, err) + } else { + r0 = ret.Error(0) + } + + return r0 +} + // CountWorkflow provides a mock function with given fields: ctx, request func (_m *Client) CountWorkflow(ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest) (*workflowservice.CountWorkflowExecutionsResponse, error) { ret := _m.Called(ctx, request) @@ -1092,6 +1109,101 @@ func (_m *Client) DescribeWorkflow(ctx context.Context, workflowID string, runID return r0, r1 } +func (_m *Client) ExecuteActivity(ctx context.Context, options client.StartActivityOptions, activity any, args ...any) (client.ActivityHandle, error) { + var _ca []interface{} + _ca = append(_ca, ctx, options, activity) + _ca = append(_ca, args...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for ExecuteActivity") + } + + var r0 client.ActivityHandle + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.StartActivityOptions, interface{}, ...interface{}) (client.ActivityHandle, error)); ok { + return rf(ctx, options, activity, args...) + } + if rf, ok := ret.Get(0).(func(context.Context, client.StartActivityOptions, interface{}, ...interface{}) client.ActivityHandle); ok { + r0 = rf(ctx, options, activity, args...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(client.ActivityHandle) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, client.StartActivityOptions, interface{}, ...interface{}) error); ok { + r1 = rf(ctx, options, activity, args...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +func (_m *Client) GetActivityHandle(options client.GetActivityHandleOptions) client.ActivityHandle { + ret := _m.Called(options) + + if len(ret) == 0 { + panic("no return value specified for GetActivityHandle") + } + + var r0 client.ActivityHandle + if rf, ok := ret.Get(0).(func(client.GetActivityHandleOptions) client.ActivityHandle); ok { + r0 = rf(options) + } else { + r0 = ret.Get(0).(client.ActivityHandle) + } + + return r0 +} + +func (_m *Client) ListActivities(ctx context.Context, options client.ListActivitiesOptions) (client.ListActivitiesResult, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for ListActivities") + } + + var r0 client.ListActivitiesResult + if rf, ok := ret.Get(0).(func(context.Context, client.ListActivitiesOptions) client.ListActivitiesResult); ok { + r0 = rf(ctx, options) + } else { + r0 = ret.Get(0).(client.ListActivitiesResult) + } + + return r0, nil +} + +func (_m *Client) CountActivities(ctx context.Context, options client.CountActivitiesOptions) (*client.CountActivitiesResult, error) { + ret := _m.Called(ctx, options) + + if len(ret) == 0 { + panic("no return value specified for CountActivities") + } + + var r0 *client.CountActivitiesResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, client.CountActivitiesOptions) (*client.CountActivitiesResult, error)); ok { + return rf(ctx, options) + } + if rf, ok := ret.Get(0).(func(context.Context, client.CountActivitiesOptions) *client.CountActivitiesResult); ok { + r0 = rf(ctx, options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*client.CountActivitiesResult) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, client.CountActivitiesOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // WorkerDeploymentClient provides a mock function with given fields: func (_m *Client) WorkerDeploymentClient() client.WorkerDeploymentClient { ret := _m.Called() diff --git a/test/activity_test.go b/test/activity_test.go index 7c71cff1c..f1fc243a5 100644 --- a/test/activity_test.go +++ b/test/activity_test.go @@ -79,7 +79,7 @@ func (a *Activities) ActivityToBePaused(ctx context.Context, completeOnPause boo go func() { // Pause the activity activity.GetClient(ctx).WorkflowService().PauseActivity(context.Background(), &workflowservice.PauseActivityRequest{ - Namespace: info.WorkflowNamespace, + Namespace: info.Namespace, Execution: &commonpb.WorkflowExecution{ WorkflowId: info.WorkflowExecution.ID, RunId: info.WorkflowExecution.RunID, @@ -123,7 +123,7 @@ func (a *Activities) ActivityToBeReset(ctx context.Context, completeOnReset bool go func() { // Reset the activity activity.GetClient(ctx).WorkflowService().ResetActivity(context.Background(), &workflowservice.ResetActivityRequest{ - Namespace: info.WorkflowNamespace, + Namespace: info.Namespace, Execution: &commonpb.WorkflowExecution{ WorkflowId: info.WorkflowExecution.ID, RunId: info.WorkflowExecution.RunID, @@ -239,16 +239,22 @@ func (a *Activities) failNTimes(_ context.Context, times int, id int) error { func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQueue, wfType string, isLocalActivity bool, scheduleToCloseTimeout, startToCloseTimeout time.Duration, retryPolicy *temporal.RetryPolicy) error { a.append("inspectActivityInfo") if !activity.IsActivity(ctx) { - return fmt.Errorf("expected InActivity to return %v but got %v", true, activity.IsActivity(ctx)) + return fmt.Errorf("expected IsActivity to be true") } info := activity.GetInfo(ctx) + if info.Namespace != namespace { + return fmt.Errorf("expected namespace %v but got %v", namespace, info.Namespace) + } if info.WorkflowNamespace != namespace { - return fmt.Errorf("expected namespace %v but got %v", namespace, info.WorkflowNamespace) + return fmt.Errorf("expected WorkflowNamespace %v but got %v", namespace, info.WorkflowNamespace) } if info.WorkflowType == nil || info.WorkflowType.Name != wfType { return fmt.Errorf("expected workflowType %v but got %v", wfType, info.WorkflowType) } + if info.ActivityID == "" { + return fmt.Errorf("expected ActivityID to be non-empty") + } if info.TaskQueue != taskQueue { return fmt.Errorf("expected taskQueue %v but got %v", taskQueue, info.TaskQueue) } @@ -273,6 +279,80 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue if !cmp.Equal(info.RetryPolicy, retryPolicy) { return fmt.Errorf("expected RetryPolicy %v but got %v", retryPolicy, info.RetryPolicy) } + + if !info.IsWorkflowActivity() { + return fmt.Errorf("expected IsWorkflowActivity to return true") + } + if info.WorkflowExecution.ID == "" { + return fmt.Errorf("expected WorkflowExecution.ID to be non-empty") + } + if info.WorkflowExecution.RunID == "" { + return fmt.Errorf("expected WorkflowExecution.RunID to be non-empty") + } + if info.ActivityRunID != "" { + return fmt.Errorf("expected ActivityRunID to be empty but got %v", info.ActivityRunID) + } + + return nil +} + +func (a *Activities) InspectActivityInfoNoWorkflow(ctx context.Context, namespace, activityID, taskQueue string, scheduleToCloseTimeout, startToCloseTimeout time.Duration, retryPolicy *temporal.RetryPolicy) error { + a.append("inspectActivityInfoNoWorkflow") + if !activity.IsActivity(ctx) { + return fmt.Errorf("expected IsActivity to be true") + } + + info := activity.GetInfo(ctx) + if info.Namespace != namespace { + return fmt.Errorf("expected namespace %v but got %v", namespace, info.Namespace) + } + if info.ActivityID != activityID { + return fmt.Errorf("expected ActivityID %v but got %v", activityID, info.ActivityID) + } + if info.ActivityRunID == "" { + return fmt.Errorf("expected ActivityRunID to be non-empty") + } + if info.TaskQueue != taskQueue { + return fmt.Errorf("expected taskQueue %v but got %v", taskQueue, info.TaskQueue) + } + if info.Deadline.IsZero() { + return errors.New("expected non zero deadline") + } + if info.StartedTime.IsZero() { + return errors.New("expected non zero started time") + } + if info.ScheduledTime.IsZero() { + return errors.New("expected non zero scheduled time") + } + if info.ScheduleToCloseTimeout != scheduleToCloseTimeout { + return fmt.Errorf("expected ScheduleToCloseTimeout %v but got %v", scheduleToCloseTimeout, info.ScheduleToCloseTimeout) + } + if info.StartToCloseTimeout != startToCloseTimeout { + return fmt.Errorf("expected StartToCloseTimeout %v but got %v", startToCloseTimeout, info.StartToCloseTimeout) + } + if !cmp.Equal(info.RetryPolicy, retryPolicy) { + return fmt.Errorf("expected RetryPolicy %v but got %v", retryPolicy, info.RetryPolicy) + } + + if info.IsWorkflowActivity() { + return fmt.Errorf("expected IsWorkflowActivity to return false") + } + if info.WorkflowNamespace != "" { + return fmt.Errorf("expected WorkflowNamespace to be empty but got %v", info.WorkflowNamespace) + } + if info.WorkflowType != nil { + return fmt.Errorf("expected workflowType to be nil but got %v", info.WorkflowType) + } + if info.WorkflowExecution.ID != "" { + return fmt.Errorf("expected WorkflowExecution.ID to be empty but got %v", info.WorkflowExecution.ID) + } + if info.WorkflowExecution.RunID != "" { + return fmt.Errorf("expected WorkflowExecution.RunID to be empty but got %v", info.WorkflowExecution.RunID) + } + if info.IsLocalActivity { + return fmt.Errorf("expected IsLocalActivity to be false") + } + return nil } @@ -498,7 +578,7 @@ func (a *Activities) register(worker worker.Worker) { func (a *Activities) ClientFromActivity(ctx context.Context) error { activityClient := activity.GetClient(ctx) info := activity.GetInfo(ctx) - request := workflowservice.ListWorkflowExecutionsRequest{Namespace: info.WorkflowNamespace} + request := workflowservice.ListWorkflowExecutionsRequest{Namespace: info.Namespace} resp, err := activityClient.ListWorkflow(ctx, &request) if err != nil { return err diff --git a/test/go.mod b/test/go.mod index 25fd4a34b..f58dec949 100644 --- a/test/go.mod +++ b/test/go.mod @@ -15,7 +15,7 @@ require ( go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 - go.temporal.io/api v1.59.0 + go.temporal.io/api v1.62.0 go.temporal.io/sdk v1.29.1 go.temporal.io/sdk/contrib/opentelemetry v0.0.0-00010101000000-000000000000 go.temporal.io/sdk/contrib/opentracing v0.0.0-00010101000000-000000000000 diff --git a/test/go.sum b/test/go.sum index 252849014..7e3e564c3 100644 --- a/test/go.sum +++ b/test/go.sum @@ -172,8 +172,8 @@ go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2N go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= -go.temporal.io/api v1.59.0 h1:QUpAju1KKs9xBfGSI0Uwdyg06k6dRCJH+Zm3G1Jc9Vk= -go.temporal.io/api v1.59.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.0 h1:rh7LqqV+pxaLNwPLsFRZgYoDJ/NvCNDv0EnWe6oS7A4= +go.temporal.io/api v1.62.0/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= diff --git a/test/integration_test.go b/test/integration_test.go index 8df934b2f..9ce4821fc 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -2894,6 +2894,84 @@ func (ts *IntegrationTestSuite) TestInterceptorStartWithSignal() { ts.True(foundHandleSignal) } +func (ts *IntegrationTestSuite) TestInterceptorStandaloneActivity() { + if os.Getenv("DISABLE_STANDALONE_ACTIVITY_TESTS") != "" { + ts.T().SkipNow() + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + immediateActivity := func() (string, error) { return "result", nil } + ts.worker.RegisterActivityWithOptions(immediateActivity, activity.RegisterOptions{Name: "interceptorTestActivityImmediate"}) + + activityStarted := make(chan struct{}, 2) + waitActivity := func(ctx context.Context) error { + activityStarted <- struct{}{} + <-ctx.Done() + return ctx.Err() + } + ts.worker.RegisterActivityWithOptions(waitActivity, activity.RegisterOptions{Name: "interceptorTestActivityWait"}) + + makeOptions := func() client.StartActivityOptions { + return client.StartActivityOptions{ + ID: "interceptor-test-" + uuid.NewString(), + TaskQueue: ts.taskQueueName, + ScheduleToCloseTimeout: 30 * time.Second, + } + } + + // ExecuteActivity + Describe + handle1, err := ts.client.ExecuteActivity(ctx, makeOptions(), "interceptorTestActivityImmediate") + ts.NoError(err) + _, err = handle1.Describe(ctx, client.DescribeActivityOptions{}) + ts.NoError(err) + + // GetActivityHandle + _ = ts.client.GetActivityHandle(client.GetActivityHandleOptions{ + ActivityID: handle1.GetID(), + RunID: handle1.GetRunID(), + }) + + // Get (PollActivityResult) + var result string + err = handle1.Get(ctx, &result) + ts.NoError(err) + ts.Equal("result", result) + + // Cancel + handle2, err := ts.client.ExecuteActivity(ctx, makeOptions(), "interceptorTestActivityWait") + ts.NoError(err) + <-activityStarted + err = handle2.Cancel(ctx, client.CancelActivityOptions{Reason: "test cancel"}) + ts.NoError(err) + + // Terminate + handle3, err := ts.client.ExecuteActivity(ctx, makeOptions(), "interceptorTestActivityWait") + ts.NoError(err) + <-activityStarted + err = handle3.Terminate(ctx, client.TerminateActivityOptions{Reason: "test terminate"}) + ts.NoError(err) + + // Verify all 6 interceptor methods were called + expectedCalls := []string{ + "ClientOutboundInterceptor.ExecuteActivity", + "ClientOutboundInterceptor.GetActivityHandle", + "ClientOutboundInterceptor.DescribeActivity", + "ClientOutboundInterceptor.CancelActivity", + "ClientOutboundInterceptor.TerminateActivity", + "ClientOutboundInterceptor.PollActivityResult", + } + + recordedCalls := make(map[string]bool) + for _, call := range ts.interceptorCallRecorder.Calls() { + recordedCalls[call.Interface.Name()+"."+call.Method.Name] = true + } + + for _, expectedCall := range expectedCalls { + ts.True(recordedCalls[expectedCall], "Expected interceptor call %s was not recorded", expectedCall) + } +} + func (ts *IntegrationTestSuite) TestOpenTelemetryTracing() { ts.T().Skip("issue-1650: Otel Tracing intergation tests are flaky") ts.testOpenTelemetryTracing(true, false) @@ -7983,6 +8061,8 @@ func (ts *IntegrationTestSuite) TestMutableSideEffectSummary() { } func (ts *IntegrationTestSuite) TestGrpcMessageTooLarge() { + ts.T().Skip("temporal server 1.30 has different behavior") // see https://github.com/temporalio/temporal/pull/8610 + assertGrpcErrorInHistory := func(ctx context.Context, run client.WorkflowRun) { iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), true, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) for iter.HasNext() { @@ -8582,3 +8662,256 @@ func (ts *IntegrationTestSuite) TestSimplePluginDoNothing() { ts.NoError(run.Get(context.Background(), &res)) ts.Equal("workflow-success", res) } + +func (ts *IntegrationTestSuite) TestExecuteActivitySuite() { + if os.Getenv("DISABLE_STANDALONE_ACTIVITY_TESTS") != "" { + ts.T().SkipNow() + } + makeOptions := func() client.StartActivityOptions { + return client.StartActivityOptions{ + ID: uuid.NewString(), + TaskQueue: ts.taskQueueName, + ScheduleToCloseTimeout: 10 * time.Second, + } + } + + var activities Activities + + activityResultChan := make(chan string) + readFromChannelActivity := func() (string, error) { + return <-activityResultChan, nil + } + ts.worker.RegisterActivityWithOptions(readFromChannelActivity, activity.RegisterOptions{Name: "readFromChannelActivity"}) + ts.Run("Describe activity", func() { + timeBeforeStart := time.Now().Add(-time.Millisecond) + + options := makeOptions() + searchAttrKey := temporal.NewSearchAttributeKeyString("CustomStringField") + searchAttrValue := "CustomValue" + options.TypedSearchAttributes = temporal.NewSearchAttributes(searchAttrKey.ValueSet(searchAttrValue)) + options.Summary = "activity summary" + options.Details = "activity description" + + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + handle, err := ts.client.ExecuteActivity(ctx, options, "readFromChannelActivity") + ts.NoError(err) + ts.Equal(options.ID, handle.GetID()) + ts.NotEmpty(handle.GetRunID()) + + description, err := handle.Describe(ctx, client.DescribeActivityOptions{}) + ts.NoError(err) + ts.Equal(enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, description.Status) + ts.Nil(description.RawExecutionListInfo) + ts.NotNil(description.RawExecutionInfo) + ts.Equal(options.ID, description.ActivityID) + ts.Equal(handle.GetRunID(), description.ActivityRunID) + ts.Equal("readFromChannelActivity", description.ActivityType) + ts.Equal(ts.taskQueueName, description.TaskQueue) + ts.EqualValues(1, description.Attempt) + ts.EqualValues(1, description.TypedSearchAttributes.Size()) + attr, hasAttr := description.TypedSearchAttributes.GetString(searchAttrKey) + ts.True(hasAttr) + ts.Equal(searchAttrValue, attr) + summary, err := description.GetSummary() + ts.NoError(err) + ts.Equal(options.Summary, summary) + details, err := description.GetDetails() + ts.NoError(err) + ts.Equal(options.Details, details) + + // ensure measurable amount of time passes, then complete activity + time.Sleep(100 * time.Millisecond) + activityResultChan <- "" + err = handle.Get(ctx, nil) + ts.NoError(err) + + description, err = handle.Describe(ctx, client.DescribeActivityOptions{}) + ts.NoError(err) + ts.Equal(enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED, description.Status) + ts.Equal(options.ID, description.ActivityID) + ts.Equal(handle.GetRunID(), description.ActivityRunID) + ts.True(description.ScheduleTime.After(timeBeforeStart)) + ts.True(description.CloseTime.After(description.ScheduleTime)) + ts.Greater(description.ExecutionDuration, int64(0)) + ts.True(description.LastStartedTime.After(timeBeforeStart)) + ts.True(description.LastAttemptCompleteTime.After(description.ScheduleTime)) + }) + + ts.Run("Wait for activity result", func() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + handle, err := ts.client.ExecuteActivity(ctx, makeOptions(), "readFromChannelActivity") + ts.NoError(err) + + receivedResultChan := make(chan string) + go func() { + var result string + err := handle.Get(ctx, &result) + ts.NoError(err) + receivedResultChan <- result + }() + + select { + case <-receivedResultChan: + ts.Fail("received result too soon") + case <-time.After(time.Second): + // OK + } + + result := "result" + activityResultChan <- result + ts.Equal(result, <-receivedResultChan) + }) + + ts.Run("Execute activity with argument", func() { + argument := "argument" + + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + handle, err := ts.client.ExecuteActivity(ctx, makeOptions(), activities.EchoString, argument) + ts.NoError(err) + + var result string + err = handle.Get(ctx, &result) + ts.NoError(err) + ts.Equal(argument, result) + }) + + activityStarted := make(chan struct{}, 2) + heartbeatUntilStopped := func(ctx context.Context, heartbeatFreq time.Duration) error { + activityStarted <- struct{}{} + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(heartbeatFreq): + activity.RecordHeartbeat(ctx) + } + } + } + ts.worker.RegisterActivityWithOptions(heartbeatUntilStopped, activity.RegisterOptions{Name: "heartbeatUntilStopped"}) + + ts.Run("Cancel activity", func() { + reason := "cancellation reason" + + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + handle, err := ts.client.ExecuteActivity(ctx, makeOptions(), "heartbeatUntilStopped", 50*time.Millisecond) + ts.NoError(err) + + <-activityStarted + + err = handle.Cancel(ctx, client.CancelActivityOptions{Reason: reason}) + ts.NoError(err) + + var canceledErr *temporal.CanceledError + err = handle.Get(ctx, nil) + ts.ErrorAs(err, &canceledErr) + + description, err := handle.Describe(ctx, client.DescribeActivityOptions{}) + ts.NoError(err) + ts.Equal(enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, description.Status) + ts.Equal(reason, description.CanceledReason) + }) + + ts.Run("Terminate activity", func() { + reason := "termination reason" + + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + handle, err := ts.client.ExecuteActivity(ctx, makeOptions(), "heartbeatUntilStopped", 50*time.Millisecond) + ts.NoError(err) + + <-activityStarted + + err = handle.Terminate(ctx, client.TerminateActivityOptions{Reason: reason}) + ts.NoError(err) + + var terminatedErr *temporal.TerminatedError + err = handle.Get(ctx, nil) + ts.ErrorAs(err, &terminatedErr) + + description, err := handle.Describe(ctx, client.DescribeActivityOptions{}) + ts.NoError(err) + ts.Equal(enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED, description.Status) + ts.Empty(description.CanceledReason) + }) + + ts.Run("Inspect activity info", func() { + var w Workflows + + options := makeOptions() + options.StartToCloseTimeout = 5 * time.Second + options.RetryPolicy = w.defaultRetryPolicy() + + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + handle, err := ts.client.ExecuteActivity(ctx, options, activities.InspectActivityInfoNoWorkflow, ts.config.Namespace, + options.ID, ts.taskQueueName, options.ScheduleToCloseTimeout, options.StartToCloseTimeout, options.RetryPolicy) + ts.NoError(err) + ts.NoError(handle.Get(ctx, nil)) + }) + + ts.Run("GetActivityHandle", func() { + options := makeOptions() + options.ActivityIDReusePolicy = enumspb.ACTIVITY_ID_REUSE_POLICY_ALLOW_DUPLICATE + + executeActivity := func(ctx context.Context, argument string) client.ActivityHandle { + handle, err := ts.client.ExecuteActivity(ctx, options, activities.EchoString, argument) + ts.NoError(err) + var result string + err = handle.Get(ctx, &result) + ts.NoError(err) + ts.Equal(argument, result) + return handle + } + + testGetHandle := func(ctx context.Context, runId string, expectedResult string) { + handleOptions := client.GetActivityHandleOptions{ + ActivityID: options.ID, + RunID: runId, + } + handle := ts.client.GetActivityHandle(handleOptions) + var result string + err := handle.Get(ctx, &result) + ts.NoError(err) + ts.Equal(expectedResult, result) + + description, err := handle.Describe(ctx, client.DescribeActivityOptions{}) + ts.NoError(err) + ts.Equal(options.ID, description.ActivityID) + if runId != "" { + ts.Equal(runId, description.ActivityRunID) + } + } + + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + argument1 := "argument1" + act1Handle := executeActivity(ctx, argument1) + testGetHandle(ctx, "", argument1) + + argument2 := "argument2" + act2Handle := executeActivity(ctx, argument2) + testGetHandle(ctx, "", argument2) + + testGetHandle(ctx, act1Handle.GetRunID(), argument1) + testGetHandle(ctx, act2Handle.GetRunID(), argument2) + }) + + ts.Run("Activity result timeout", func() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + handle, err := ts.client.ExecuteActivity(ctx, makeOptions(), "readFromChannelActivity") + ts.NoError(err) + + getCtx, getCancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer getCancel() + err = handle.Get(getCtx, nil) + var serviceErr *serviceerror.DeadlineExceeded + ts.True(errors.Is(err, context.DeadlineExceeded) || errors.As(err, &serviceErr)) + activityResultChan <- "" // allow activity to complete + }) +}