Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/confidential-module-plumbing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add ConfidentialModule and attributes plumbing for confidential CRE workflows #added #db_update #wip
1 change: 1 addition & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,7 @@ type WorkflowSpec struct {
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
Attributes []byte `db:"attributes"`
sdkWorkflow *sdk.WorkflowSpec
rawSpec []byte
config []byte
Expand Down
9 changes: 6 additions & 3 deletions core/services/workflows/artifacts/v2/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
config_url,
created_at,
updated_at,
spec_type
spec_type,
attributes
) VALUES (
:workflow,
:config,
Expand All @@ -71,7 +72,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
:config_url,
:created_at,
:updated_at,
:spec_type
:spec_type,
:attributes
) ON CONFLICT (workflow_id) DO UPDATE
SET
workflow = EXCLUDED.workflow,
Expand All @@ -84,7 +86,8 @@ func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec)
config_url = EXCLUDED.config_url,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
spec_type = EXCLUDED.spec_type,
attributes = EXCLUDED.attributes
RETURNING id
`

Expand Down
115 changes: 115 additions & 0 deletions core/services/workflows/syncer/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@ func (h *eventHandler) createWorkflowSpec(ctx context.Context, payload WorkflowR
SpecType: job.WASMFile,
BinaryURL: payload.BinaryURL,
ConfigURL: payload.ConfigURL,
Attributes: payload.Attributes,
}

if _, err = h.workflowArtifactsStore.UpsertWorkflowSpec(ctx, entry); err != nil {
Expand Down Expand Up @@ -697,6 +698,15 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
return fmt.Errorf("invalid workflow name: %w", err)
}

confidential, err := v2.IsConfidential(spec.Attributes)
if err != nil {
return fmt.Errorf("failed to parse workflow attributes: %w", err)
}
if confidential {
h.lggr.Infow("routing workflow to confidential execution", "workflowID", spec.WorkflowID)
return h.tryConfidentialEngineCreate(ctx, spec, wid, workflowName, decodedBinary, source)
}

// Create a channel to receive the initialization result.
// This allows us to wait for the engine to complete initialization (including trigger subscriptions)
// before emitting the workflowActivated event, ensuring the event accurately reflects deployment status.
Expand Down Expand Up @@ -768,6 +778,111 @@ func (h *eventHandler) tryEngineCreate(ctx context.Context, spec *job.WorkflowSp
return nil
}

// tryConfidentialEngineCreate creates a V2 engine backed by a ConfidentialModule
// instead of a local WASM module. The ConfidentialModule delegates execution to
// the confidential-workflows capability which runs the WASM inside a TEE.
func (h *eventHandler) tryConfidentialEngineCreate(
ctx context.Context,
spec *job.WorkflowSpec,
wid types.WorkflowID,
workflowName types.WorkflowName,
decodedBinary []byte,
source string,
) error {
attrs, err := v2.ParseWorkflowAttributes(spec.Attributes)
if err != nil {
return fmt.Errorf("failed to parse workflow attributes: %w", err)
}

binaryHash := v2.ComputeBinaryHash(decodedBinary)

lggr := logger.Named(h.lggr, "WorkflowEngine.ConfidentialModule")
lggr = logger.With(lggr, "workflowID", spec.WorkflowID, "workflowName", spec.WorkflowName, "workflowOwner", spec.WorkflowOwner)

module := v2.NewConfidentialModule(
h.capRegistry,
spec.BinaryURL,
binaryHash,
spec.WorkflowID,
spec.WorkflowOwner,
workflowName.String(),
spec.WorkflowTag,
attrs.VaultDonSecrets,
lggr,
)

initDone := make(chan error, 1)

cfg := &v2.EngineConfig{
Lggr: h.lggr,
Module: module,
WorkflowConfig: []byte(spec.Config),
CapRegistry: h.capRegistry,
DonSubscriber: h.workflowDonSubscriber,
UseLocalTimeProvider: h.useLocalTimeProvider,
DonTimeStore: h.donTimeStore,
ExecutionsStore: h.workflowStore,
WorkflowID: spec.WorkflowID,
WorkflowOwner: spec.WorkflowOwner,
WorkflowName: workflowName,
WorkflowTag: spec.WorkflowTag,
WorkflowEncryptionKey: h.workflowEncryptionKey,

LocalLimits: v2.EngineLimits{},
LocalLimiters: h.engineLimiters,
GlobalExecutionConcurrencyLimiter: h.workflowLimits,

BeholderEmitter: h.emitter,
BillingClient: h.billingClient,

WorkflowRegistryAddress: h.workflowRegistryAddress,
WorkflowRegistryChainSelector: h.workflowRegistryChainSelector,
OrgResolver: h.orgResolver,
SecretsFetcher: h.secretsFetcher,
}

existingHook := cfg.Hooks.OnInitialized
cfg.Hooks.OnInitialized = func(err error) {
initDone <- err
if existingHook != nil {
existingHook(err)
}
}

engine, err := v2.NewEngine(cfg)
if err != nil {
return fmt.Errorf("failed to create confidential workflow engine: %w", err)
}

if err = engine.Start(ctx); err != nil {
return fmt.Errorf("failed to start confidential workflow engine: %w", err)
}

select {
case <-ctx.Done():
if closeErr := engine.Close(); closeErr != nil {
h.lggr.Errorw("failed to close engine after context cancellation", "error", closeErr, "workflowID", spec.WorkflowID)
}
return fmt.Errorf("context cancelled while waiting for engine initialization: %w", ctx.Err())
case initErr := <-initDone:
if initErr != nil {
if closeErr := engine.Close(); closeErr != nil {
h.lggr.Errorw("failed to close engine after initialization failure", "error", closeErr, "workflowID", spec.WorkflowID)
}
return fmt.Errorf("engine initialization failed: %w", initErr)
}
}

if err := h.engineRegistry.Add(wid, source, engine); err != nil {
if closeErr := engine.Close(); closeErr != nil {
return fmt.Errorf("failed to close workflow engine: %w during invariant violation: %w", closeErr, err)
}
return fmt.Errorf("invariant violation: %w", err)
}

return nil
}
Comment on lines +781 to +884
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tryConfidentialEngineCreate function lacks test coverage. The codebase shows comprehensive testing for tryEngineCreate and other handler flows. Consider adding tests that verify: (1) confidential engine creation when IsConfidential returns true, (2) proper initialization and lifecycle hooks, (3) error handling during engine creation and startup, and (4) integration with the engine registry.

Copilot uses AI. Check for mistakes.

// logCustMsg emits a custom message to the external sink and logs an error if that fails.
func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) {
err := cma.Emit(ctx, msg)
Expand Down
184 changes: 184 additions & 0 deletions core/services/workflows/v2/confidential_module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package v2

import (
"context"
"crypto/sha256"
"encoding/json"
"fmt"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"

confworkflowtypes "github.com/smartcontractkit/chainlink-common/pkg/capabilities/v2/actions/confidentialworkflow"
sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk"
)

const confidentialWorkflowsCapabilityID = "confidential-workflows@1.0.0-alpha"

// WorkflowAttributes is the JSON structure stored in WorkflowSpec.Attributes.
type WorkflowAttributes struct {
Confidential bool `json:"confidential"`
VaultDonSecrets []SecretIdentifier `json:"vault_don_secrets"`
}

// SecretIdentifier identifies a secret in VaultDON.
type SecretIdentifier struct {
Key string `json:"key"`
Namespace string `json:"namespace,omitempty"`
}

// ParseWorkflowAttributes parses the Attributes JSON from a WorkflowSpec.
// Returns a zero-value struct if data is nil or empty.
func ParseWorkflowAttributes(data []byte) (WorkflowAttributes, error) {
var attrs WorkflowAttributes
if len(data) == 0 {
return attrs, nil
}
if err := json.Unmarshal(data, &attrs); err != nil {
return attrs, fmt.Errorf("failed to parse workflow attributes: %w", err)
}
return attrs, nil
}

// IsConfidential returns true if the Attributes JSON has "confidential": true.
// Returns an error if the attributes contain malformed JSON, so callers can
// fail loudly rather than silently falling through to non-confidential execution.
func IsConfidential(data []byte) (bool, error) {
attrs, err := ParseWorkflowAttributes(data)
if err != nil {
return false, err
}
return attrs.Confidential, nil
}
Comment on lines 48 to 57
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IsConfidential function silently returns false when ParseWorkflowAttributes fails. This could hide configuration errors and cause workflows intended to be confidential to execute non-confidentially. Consider logging the parsing error or exposing it to callers so that malformed attributes are caught early rather than silently ignored.

Copilot uses AI. Check for mistakes.

// ConfidentialModule implements host.ModuleV2 for confidential workflows.
// Instead of running WASM locally, it delegates execution to the
// confidential-workflows capability via the CapabilitiesRegistry.
type ConfidentialModule struct {
capRegistry core.CapabilitiesRegistry
binaryURL string
binaryHash []byte
workflowID string
workflowOwner string
workflowName string
workflowTag string
vaultDonSecrets []SecretIdentifier
lggr logger.Logger
}

var _ host.ModuleV2 = (*ConfidentialModule)(nil)

func NewConfidentialModule(
capRegistry core.CapabilitiesRegistry,
binaryURL string,
binaryHash []byte,
workflowID string,
workflowOwner string,
workflowName string,
workflowTag string,
vaultDonSecrets []SecretIdentifier,
lggr logger.Logger,
) *ConfidentialModule {
return &ConfidentialModule{
capRegistry: capRegistry,
binaryURL: binaryURL,
binaryHash: binaryHash,
workflowID: workflowID,
workflowOwner: workflowOwner,
workflowName: workflowName,
workflowTag: workflowTag,
vaultDonSecrets: vaultDonSecrets,
lggr: lggr,
}
}

func (m *ConfidentialModule) Start() {}
func (m *ConfidentialModule) Close() {}
func (m *ConfidentialModule) IsLegacyDAG() bool { return false }

func (m *ConfidentialModule) Execute(
ctx context.Context,
request *sdkpb.ExecuteRequest,
_ host.ExecutionHelper,
) (*sdkpb.ExecutionResult, error) {
execReqBytes, err := proto.Marshal(request)
if err != nil {
return nil, fmt.Errorf("failed to marshal ExecuteRequest: %w", err)
}

protoSecrets := make([]*confworkflowtypes.SecretIdentifier, len(m.vaultDonSecrets))
for i, s := range m.vaultDonSecrets {
ns := s.Namespace
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code defaults an empty namespace to "main" on line 116. This default appears to be a business logic decision, but it's not documented. Consider adding a comment explaining why "main" is the default namespace and whether this aligns with the VaultDON's expected behavior. This will help future maintainers understand the implicit contract.

Suggested change
ns := s.Namespace
ns := s.Namespace
// Default to the "main" namespace when none is provided. VaultDON and the
// confidential workflows capability treat "main" as the canonical default
// namespace for secrets, so leaving this empty would not match the expected
// behavior of downstream components.

Copilot uses AI. Check for mistakes.
if ns == "" {
ns = "main"
}
protoSecrets[i] = &confworkflowtypes.SecretIdentifier{
Key: s.Key,
Namespace: proto.String(ns),
}
}

capInput := &confworkflowtypes.ConfidentialWorkflowRequest{
VaultDonSecrets: protoSecrets,
Execution: &confworkflowtypes.WorkflowExecution{
WorkflowId: m.workflowID,
BinaryUrl: m.binaryURL,
BinaryHash: m.binaryHash,
ExecuteRequest: execReqBytes,
},
}

payload, err := anypb.New(capInput)
if err != nil {
return nil, fmt.Errorf("failed to marshal capability payload: %w", err)
}

cap, err := m.capRegistry.GetExecutable(ctx, confidentialWorkflowsCapabilityID)
if err != nil {
return nil, fmt.Errorf("failed to get confidential-workflows capability: %w", err)
}

capReq := capabilities.CapabilityRequest{
Payload: payload,
Method: "Execute",
CapabilityId: confidentialWorkflowsCapabilityID,
Metadata: capabilities.RequestMetadata{
WorkflowID: m.workflowID,
WorkflowOwner: m.workflowOwner,
WorkflowName: m.workflowName,
WorkflowTag: m.workflowTag,
},
}

capResp, err := cap.Execute(ctx, capReq)
if err != nil {
return nil, fmt.Errorf("confidential-workflows capability execution failed: %w", err)
}

if capResp.Payload == nil {
return nil, fmt.Errorf("confidential-workflows capability returned nil payload")
}

var confResp confworkflowtypes.ConfidentialWorkflowResponse
if err := capResp.Payload.UnmarshalTo(&confResp); err != nil {
return nil, fmt.Errorf("failed to unmarshal capability response: %w", err)
}

var result sdkpb.ExecutionResult
if err := proto.Unmarshal(confResp.ExecutionResult, &result); err != nil {
return nil, fmt.Errorf("failed to unmarshal ExecutionResult: %w", err)
}

return &result, nil
}

// ComputeBinaryHash returns the SHA-256 hash of the given binary.
func ComputeBinaryHash(binary []byte) []byte {
h := sha256.Sum256(binary)
return h[:]
}
Comment on lines 1 to 184
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new ConfidentialModule and tryConfidentialEngineCreate function lack test coverage. The codebase shows comprehensive testing for other modules and engine creation flows. Consider adding tests that verify: (1) confidential module creation and configuration, (2) routing logic for workflows with confidential attributes, (3) error handling when the capability is not available, and (4) proper passing of vault secrets and binary hash.

Copilot uses AI. Check for mistakes.
Loading
Loading