Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
48c9392
Update Nexus error model to use Temporal failures
pdoerner Dec 11, 2025
bc39bbc
Upgrade SDK dep to 1.38
pdoerner Dec 16, 2025
e0ebac6
Set capabilities
pdoerner Dec 16, 2025
6a6ea6e
fix writeFailure
pdoerner Dec 16, 2025
273c5eb
Bug fixes and test updates
pdoerner Dec 17, 2025
6dbbfed
lint
pdoerner Jan 7, 2026
781bc00
Add handling for StartOperationResponse_Failure
pdoerner Jan 16, 2026
699b0ff
Upgrade api
bergundy Jan 23, 2026
2c9cb58
Fix conversion from nexus failure to temporal failure
bergundy Jan 23, 2026
7671e01
Allow preserving more information for operation errors
bergundy Jan 24, 2026
c0eb657
Fix retryable flag
bergundy Jan 24, 2026
d642560
Various edge cases and better handling of operation errors
bergundy Jan 26, 2026
7d11cc8
Revert back to original error message
bergundy Jan 26, 2026
64da4df
Lint and some self-review
bergundy Jan 26, 2026
d6e1b1f
Test fixes
bergundy Jan 26, 2026
9083b1d
Minor stuff
bergundy Jan 27, 2026
a5bec1e
Merge branch 'main' into nexus-error-model
bergundy Jan 30, 2026
6a65c45
Merge remote-tracking branch 'origin/main' into nexus-error-model
bergundy Feb 2, 2026
53d941d
Bump API
bergundy Feb 3, 2026
4b11275
wip
bergundy Feb 3, 2026
d76ef5a
Fix tests and compat with generic Nexus
bergundy Feb 4, 2026
033df6d
Self review
bergundy Feb 4, 2026
f6b7aa8
Inline the failure converter from the Nexus SDK and upgrade the Nexus…
bergundy Feb 10, 2026
eaac970
Refactor
bergundy Feb 11, 2026
ec8e7ae
Replace OperationCompletion with CompleteOperationOptions
bergundy Feb 11, 2026
1a0ce0a
Separate client instantiation and function invocation
bergundy Feb 11, 2026
1709d69
Self review
bergundy Feb 11, 2026
1b58f0f
Merge remote-tracking branch 'origin/main' into nexus-error-model
bergundy Feb 11, 2026
f7a6fda
Bump Nexus SDK
bergundy Feb 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 95 additions & 70 deletions api/matchingservice/v1/request_response.pb.go

Large diffs are not rendered by default.

44 changes: 20 additions & 24 deletions chasm/lib/callback/chasm_invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/base64"
"fmt"
"io"

"github.com/google/uuid"
"github.com/nexus-rpc/sdk-go/nexus"
Expand All @@ -28,7 +27,7 @@ import (
type chasmInvocation struct {
nexus *callbackspb.Callback_Nexus
attempt int32
completion nexusrpc.OperationCompletion
completion nexusrpc.CompleteOperationOptions
requestID string
}

Expand Down Expand Up @@ -135,47 +134,44 @@ func (c chasmInvocation) getHistoryRequest(
RequestId: c.requestID,
}

switch op := c.completion.(type) {
case *nexusrpc.OperationCompletionSuccessful:
payloadBody, err := io.ReadAll(op.Reader)
if err != nil {
return nil, fmt.Errorf("failed to read payload: %v", err)
}

if c.completion.Error == nil {
var payload *commonpb.Payload
if payloadBody != nil {
content := &nexus.Content{
Header: op.Reader.Header,
Data: payloadBody,
}
err := commonnexus.PayloadSerializer.Deserialize(content, &payload)
if err != nil {
return nil, fmt.Errorf("failed to deserialize payload: %v", err)
if c.completion.Result != nil {
var ok bool
payload, ok = c.completion.Result.(*commonpb.Payload)
if !ok {
return nil, fmt.Errorf("invalid result, expected a payload, got: %T", c.completion.Result)
}
}

req = &historyservice.CompleteNexusOperationChasmRequest{
Outcome: &historyservice.CompleteNexusOperationChasmRequest_Success{
Success: payload,
},
CloseTime: timestamppb.New(op.CloseTime),
CloseTime: timestamppb.New(c.completion.CloseTime),
Completion: completion,
}
case *nexusrpc.OperationCompletionUnsuccessful:
apiFailure, err := commonnexus.NexusFailureToAPIFailure(op.Failure, true)
} else {
failure, err := nexusrpc.DefaultFailureConverter().ErrorToFailure(c.completion.Error)
if err != nil {
return nil, fmt.Errorf("failed to convert failure type: %v", err)
return nil, fmt.Errorf("failed to convert error to failure: %w", err)
}
// Unwrap the operation error since it's not meant to be sent for Temporal->Temporal completions.
if failure.Cause != nil {
failure = *failure.Cause
}
apiFailure, err := commonnexus.NexusFailureToTemporalFailure(failure)
if err != nil {
return nil, fmt.Errorf("failed to convert failure type: %w", err)
}

req = &historyservice.CompleteNexusOperationChasmRequest{
Completion: completion,
Outcome: &historyservice.CompleteNexusOperationChasmRequest_Failure{
Failure: apiFailure,
},
CloseTime: timestamppb.New(op.CloseTime),
CloseTime: timestamppb.New(c.completion.CloseTime),
}
default:
return nil, fmt.Errorf("unexpected nexus.OperationCompletion: %v", completion)
}

return req, nil
Expand Down
2 changes: 1 addition & 1 deletion chasm/lib/callback/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

type CompletionSource interface {
GetNexusCompletion(ctx chasm.Context, requestID string) (nexusrpc.OperationCompletion, error)
GetNexusCompletion(ctx chasm.Context, requestID string) (nexusrpc.CompleteOperationOptions, error)
}

var _ chasm.Component = (*Callback)(nil)
Expand Down
96 changes: 33 additions & 63 deletions chasm/lib/callback/executors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ type mockNexusCompletionGetterComponent struct {

Empty *emptypb.Empty

completion nexusrpc.OperationCompletion
completion nexusrpc.CompleteOperationOptions
err error

Callback chasm.Field[*Callback]
}

func (m *mockNexusCompletionGetterComponent) GetNexusCompletion(_ chasm.Context, requestID string) (nexusrpc.OperationCompletion, error) {
func (m *mockNexusCompletionGetterComponent) GetNexusCompletion(_ chasm.Context, requestID string) (nexusrpc.CompleteOperationOptions, error) {
return m.completion, m.err
}

Expand Down Expand Up @@ -81,7 +81,7 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
caller: func(r *http.Request) (*http.Response, error) {
return &http.Response{StatusCode: 200, Body: http.NoBody}, nil
},
expectedMetricOutcome: "status:200",
expectedMetricOutcome: "success",
assertOutcome: func(t *testing.T, cb *Callback, err error) {
require.NoError(t, err)
require.Equal(t, callbackspb.CALLBACK_STATUS_SUCCEEDED, cb.Status)
Expand All @@ -104,7 +104,7 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
caller: func(r *http.Request) (*http.Response, error) {
return &http.Response{StatusCode: 500, Body: http.NoBody}, nil
},
expectedMetricOutcome: "status:500",
expectedMetricOutcome: "handler-error:INTERNAL",
assertOutcome: func(t *testing.T, cb *Callback, err error) {
var destDownErr *queueserrors.DestinationDownError
require.ErrorAs(t, err, &destDownErr)
Expand All @@ -116,7 +116,7 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
caller: func(r *http.Request) (*http.Response, error) {
return &http.Response{StatusCode: 400, Body: http.NoBody}, nil
},
expectedMetricOutcome: "status:400",
expectedMetricOutcome: "handler-error:BAD_REQUEST",
assertOutcome: func(t *testing.T, cb *Callback, err error) {
require.NoError(t, err)
require.Equal(t, callbackspb.CALLBACK_STATUS_FAILED, cb.Status)
Expand Down Expand Up @@ -210,8 +210,7 @@ func TestExecuteInvocationTaskNexus_Outcomes(t *testing.T) {
}

// Create completion
completion, err := nexusrpc.NewOperationCompletionSuccessful(nil, nexusrpc.OperationCompletionSuccessfulOptions{})
require.NoError(t, err)
completion := nexusrpc.CompleteOperationOptions{}

// Set up the CompletionSource field to return our mock completion
root.SetRootComponent(&mockNexusCompletionGetterComponent{
Expand Down Expand Up @@ -371,7 +370,7 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
cases := []struct {
name string
setupHistoryClient func(*testing.T, *gomock.Controller) resource.HistoryClient
completion nexusrpc.OperationCompletion
completion nexusrpc.CompleteOperationOptions
headerValue string
assertOutcome func(*testing.T, *Callback, error)
}{
Expand All @@ -397,16 +396,11 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
})
return client
},
completion: func() nexusrpc.OperationCompletion {
comp, err := nexusrpc.NewOperationCompletionSuccessful(
createPayloadBytes([]byte("result-data")),
nexusrpc.OperationCompletionSuccessfulOptions{
Serializer: commonnexus.PayloadSerializer,
CloseTime: dummyTime,
},
)
require.NoError(t, err)
return comp
completion: func() nexusrpc.CompleteOperationOptions {
return nexusrpc.CompleteOperationOptions{
Result: createPayloadBytes([]byte("result-data")),
CloseTime: dummyTime,
}
}(),
headerValue: encodedRef,
assertOutcome: func(t *testing.T, cb *Callback, err error) {
Expand All @@ -430,18 +424,14 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
})
return client
},
completion: func() nexusrpc.OperationCompletion {
comp, err := nexusrpc.NewOperationCompletionUnsuccessful(
&nexus.OperationError{
completion: func() nexusrpc.CompleteOperationOptions {
return nexusrpc.CompleteOperationOptions{
Error: &nexus.OperationError{
State: nexus.OperationStateFailed,
Cause: &nexus.FailureError{Failure: nexus.Failure{Message: "operation failed"}},
},
nexusrpc.OperationCompletionUnsuccessfulOptions{
CloseTime: dummyTime,
},
)
require.NoError(t, err)
return comp
CloseTime: dummyTime,
}
}(),
headerValue: encodedRef,
assertOutcome: func(t *testing.T, cb *Callback, err error) {
Expand All @@ -459,15 +449,10 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
).Return(nil, status.Error(codes.Unavailable, "service unavailable"))
return client
},
completion: func() nexusrpc.OperationCompletion {
comp, err := nexusrpc.NewOperationCompletionSuccessful(
createPayloadBytes([]byte("result-data")),
nexusrpc.OperationCompletionSuccessfulOptions{
Serializer: commonnexus.PayloadSerializer,
},
)
require.NoError(t, err)
return comp
completion: func() nexusrpc.CompleteOperationOptions {
return nexusrpc.CompleteOperationOptions{
Result: createPayloadBytes([]byte("result-data")),
}
}(),
headerValue: encodedRef,
assertOutcome: func(t *testing.T, cb *Callback, err error) {
Expand All @@ -485,15 +470,10 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
).Return(nil, status.Error(codes.InvalidArgument, "invalid request"))
return client
},
completion: func() nexusrpc.OperationCompletion {
comp, err := nexusrpc.NewOperationCompletionSuccessful(
createPayloadBytes([]byte("result-data")),
nexusrpc.OperationCompletionSuccessfulOptions{
Serializer: commonnexus.PayloadSerializer,
},
)
require.NoError(t, err)
return comp
completion: func() nexusrpc.CompleteOperationOptions {
return nexusrpc.CompleteOperationOptions{
Result: createPayloadBytes([]byte("result-data")),
}
}(),
headerValue: encodedRef,
assertOutcome: func(t *testing.T, cb *Callback, err error) {
Expand All @@ -507,15 +487,10 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
// No RPC call expected
return historyservicemock.NewMockHistoryServiceClient(ctrl)
},
completion: func() nexusrpc.OperationCompletion {
comp, err := nexusrpc.NewOperationCompletionSuccessful(
createPayloadBytes([]byte("result-data")),
nexusrpc.OperationCompletionSuccessfulOptions{
Serializer: commonnexus.PayloadSerializer,
},
)
require.NoError(t, err)
return comp
completion: func() nexusrpc.CompleteOperationOptions {
return nexusrpc.CompleteOperationOptions{
Result: createPayloadBytes([]byte("result-data")),
}
}(),
headerValue: "invalid-base64!!!",
assertOutcome: func(t *testing.T, cb *Callback, err error) {
Expand All @@ -529,15 +504,10 @@ func TestExecuteInvocationTaskChasm_Outcomes(t *testing.T) {
// No RPC call expected
return historyservicemock.NewMockHistoryServiceClient(ctrl)
},
completion: func() nexusrpc.OperationCompletion {
comp, err := nexusrpc.NewOperationCompletionSuccessful(
createPayloadBytes([]byte("result-data")),
nexusrpc.OperationCompletionSuccessfulOptions{
Serializer: commonnexus.PayloadSerializer,
},
)
require.NoError(t, err)
return comp
completion: func() nexusrpc.CompleteOperationOptions {
return nexusrpc.CompleteOperationOptions{
Result: createPayloadBytes([]byte("result-data")),
}
}(),
headerValue: base64.RawURLEncoding.EncodeToString([]byte("not-valid-protobuf")),
assertOutcome: func(t *testing.T, cb *Callback, err error) {
Expand Down
Loading
Loading