From 4e67dfd2cc3f9c0034d48251175ecdf55665e518 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 19 Nov 2025 15:05:10 +0000 Subject: [PATCH 1/6] Workflow: no longer require DB connection string PR updates the workflow clients to use the Dapr RPCs to list Workflow instance IDs instead of connecting directly to the database. This removes the requirement that the Dapr CLI need both connectivity to the database and a connection string, greatly improving the user experience. `$ dapr workflow list` now works out-of-the-box without any additional configuration. Connection strings are still supported via the `--connection-string` flag for users using Dapr pre v1.17. Also exposes the `--force` flag on `$ dapr workflow purge` to allow purging without a worker connected. Signed-off-by: joshvanl --- .github/workflows/kind_e2e.yaml | 2 +- .github/workflows/self_hosted_e2e.yaml | 2 +- cmd/workflow/purge.go | 6 +- cmd/workflow/raiseevent.go | 3 +- cmd/workflow/rerun.go | 3 +- cmd/workflow/run.go | 3 +- cmd/workflow/workflow.go | 2 +- go.mod | 38 +-- go.sum | 68 ++--- pkg/workflow/db/sql.go | 5 +- pkg/workflow/dclient/dclient.go | 326 +++++++++++++++++++--- pkg/workflow/events.go | 4 +- pkg/workflow/history.go | 96 +------ pkg/workflow/list.go | 46 +-- pkg/workflow/purge.go | 50 +--- pkg/workflow/rerun.go | 4 +- pkg/workflow/workflow.go | 85 ------ tests/apps/emit-metrics/app.go | 13 +- tests/apps/emit-metrics/go.mod | 7 + tests/apps/emit-metrics/go.sum | 20 ++ tests/apps/workflow/app.go | 2 +- tests/e2e/spawn/spawn.go | 38 +-- tests/e2e/standalone/run_template_test.go | 137 +++++++-- tests/e2e/standalone/scheduler_test.go | 92 +++--- tests/e2e/standalone/workflow_test.go | 9 - tests/e2e/upgrade/upgrade_test.go | 6 +- 26 files changed, 597 insertions(+), 470 deletions(-) delete mode 100644 pkg/workflow/workflow.go create mode 100644 tests/apps/emit-metrics/go.sum diff --git a/.github/workflows/kind_e2e.yaml b/.github/workflows/kind_e2e.yaml index 7cbcbdbf2..8a02e415c 100644 --- a/.github/workflows/kind_e2e.yaml +++ b/.github/workflows/kind_e2e.yaml @@ -50,7 +50,7 @@ jobs: name: E2E tests for K8s (KinD) runs-on: ubuntu-latest env: - DAPR_RUNTIME_PINNED_VERSION: 1.16.1 + DAPR_RUNTIME_PINNED_VERSION: 1.17.0-rc.1 DAPR_DASHBOARD_PINNED_VERSION: 0.15.0 DAPR_RUNTIME_LATEST_STABLE_VERSION: DAPR_DASHBOARD_LATEST_STABLE_VERSION: diff --git a/.github/workflows/self_hosted_e2e.yaml b/.github/workflows/self_hosted_e2e.yaml index 1aca5bf41..14d35b188 100644 --- a/.github/workflows/self_hosted_e2e.yaml +++ b/.github/workflows/self_hosted_e2e.yaml @@ -38,7 +38,7 @@ jobs: GOARCH: ${{ matrix.target_arch }} GOPROXY: https://proxy.golang.org ARCHIVE_OUTDIR: dist/archives - DAPR_RUNTIME_PINNED_VERSION: "1.16.1" + DAPR_RUNTIME_PINNED_VERSION: "1.17.0-rc.1" DAPR_DASHBOARD_PINNED_VERSION: 0.15.0 DAPR_RUNTIME_LATEST_STABLE_VERSION: "" DAPR_DASHBOARD_LATEST_STABLE_VERSION: "" diff --git a/cmd/workflow/purge.go b/cmd/workflow/purge.go index 5ddac021e..3197fafb4 100644 --- a/cmd/workflow/purge.go +++ b/cmd/workflow/purge.go @@ -25,12 +25,14 @@ var ( flagPurgeOlderThan string flagPurgeAll bool flagPurgeConn *connFlag + flagPurgeForce bool schedulerNamespace string ) var PurgeCmd = &cobra.Command{ Use: "purge", - Short: "Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. Also deletes all associated scheduler jobs.", + Short: "Purge workflow instances with a terminal state.", + Long: "Purge one or more workflow instances with a terminal state. Accepts a workflow instance ID argument or flags to purge multiple/all terminal instances. Also deletes all associated scheduler jobs.", Args: func(cmd *cobra.Command, args []string) error { switch { case cmd.Flags().Changed("all-older-than"), @@ -63,6 +65,7 @@ var PurgeCmd = &cobra.Command{ All: flagPurgeAll, ConnectionString: flagPurgeConn.connectionString, TableName: flagPurgeConn.tableName, + Force: flagPurgeForce, } if cmd.Flags().Changed("all-older-than") { @@ -80,6 +83,7 @@ func init() { PurgeCmd.Flags().StringVar(&flagPurgeOlderThan, "all-older-than", "", "Purge workflow instances older than the specified Go duration or timestamp, e.g., '24h' or '2023-01-02T15:04:05Z'.") PurgeCmd.Flags().BoolVar(&flagPurgeAll, "all", false, "Purge all workflow instances in a terminal state. Use with caution.") PurgeCmd.MarkFlagsMutuallyExclusive("all-older-than", "all") + PurgeCmd.Flags().BoolVar(&flagPurgeForce, "force", false, "force will force a purge of a workflow, regardless of its current runtime state, or whether an active worker can process it, the backend will attempt to delete it anyway. This necessarily means the purging is executed out side of the workflow state machine, and therefore, can lead to corrupt state or broken workflow execution. Usage of this should _only_ be used when you know the workflow is not being currently processed. It is highly recommended to avoid using this flag unless absolutely necessary.") PurgeCmd.Flags().StringVar(&schedulerNamespace, "scheduler-namespace", "dapr-system", "Kubernetes namespace where the scheduler is deployed, only relevant if --kubernetes is set") diff --git a/cmd/workflow/raiseevent.go b/cmd/workflow/raiseevent.go index cc533a0a1..6a40cc9fc 100644 --- a/cmd/workflow/raiseevent.go +++ b/cmd/workflow/raiseevent.go @@ -30,7 +30,8 @@ var ( var RaiseEventCmd = &cobra.Command{ Use: "raise-event", - Short: "Raise an event for a workflow waiting for an external event. Expects a single argument '/'.", + Short: "Raise an event for a workflow waiting for an external event.", + Long: "Raise an event for a workflow waiting for an external event. Expects a single argument '/'.", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { ctx := signals.Context() diff --git a/cmd/workflow/rerun.go b/cmd/workflow/rerun.go index 29b85ba7a..2d45290ba 100644 --- a/cmd/workflow/rerun.go +++ b/cmd/workflow/rerun.go @@ -32,7 +32,8 @@ var ( var ReRunCmd = &cobra.Command{ Use: "rerun [instance ID]", - Short: "ReRun a workflow instance from the beginning or a specific event. Optionally, a new instance ID and input to the starting event can be provided.", + Short: "Re-run a workflow instance.", + Long: "ReRun a workflow instance from the beginning or a specific event. Optionally, a new instance ID and input to the starting event can be provided.", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { ctx := signals.Context() diff --git a/cmd/workflow/run.go b/cmd/workflow/run.go index 2536def0f..6a26b6ed3 100644 --- a/cmd/workflow/run.go +++ b/cmd/workflow/run.go @@ -31,7 +31,8 @@ var ( var RunCmd = &cobra.Command{ Use: "run", - Short: "Run a workflow instance based on a given workflow name. Accepts a single argument, the workflow name.", + Short: "Run a workflow instance.", + Long: "Run a workflow instance based on a given workflow name. Accepts a single argument, the workflow name.", Args: cobra.ExactArgs(1), RunE: func(cmd *cobra.Command, args []string) error { ctx := signals.Context() diff --git a/cmd/workflow/workflow.go b/cmd/workflow/workflow.go index 3174792ed..bbfb91ff5 100644 --- a/cmd/workflow/workflow.go +++ b/cmd/workflow/workflow.go @@ -205,7 +205,7 @@ func connectionCmd(cmd *cobra.Command) *connFlag { flagTableName string ) - cmd.Flags().StringVarP(&flagConnectionString, "connection-string", "c", "", "The connection string used to connect and authenticate to the actor state store") + cmd.Flags().StringVarP(&flagConnectionString, "connection-string", "c", "", "Only used for Dapr runtime versions 1.16. The connection string used to connect and authenticate to the actor state store") cmd.Flags().StringVarP(&flagTableName, "table-name", "t", "", "The name of the table or collection which is used as the actor state store") var cflag connFlag diff --git a/go.mod b/go.mod index 3b3bef80a..cb04d3ed8 100644 --- a/go.mod +++ b/go.mod @@ -1,20 +1,21 @@ module github.com/dapr/cli -go 1.24.7 +go 1.24.11 require ( github.com/Masterminds/semver v1.5.0 github.com/Masterminds/semver/v3 v3.3.0 github.com/Pallinder/sillyname-go v0.0.0-20130730142914-97aeae9e6ba1 github.com/briandowns/spinner v1.19.0 - github.com/dapr/dapr v1.16.0 - github.com/dapr/durabletask-go v0.10.0 + github.com/dapr/dapr v1.16.1-rc.3.0.20260109125959-3e6d229306c2 + github.com/dapr/durabletask-go v0.10.2-0.20260109105925-0094a750e8b7 github.com/dapr/go-sdk v1.13.0 - github.com/dapr/kit v0.16.1 - github.com/diagridio/go-etcd-cron v0.9.1 + github.com/dapr/kit v0.16.2-0.20251124175541-3ac186dff64d + github.com/diagridio/go-etcd-cron v0.10.1-0.20260105221246-ee8c118dd834 github.com/docker/docker v25.0.6+incompatible github.com/evanphx/json-patch/v5 v5.9.0 github.com/fatih/color v1.17.0 + github.com/go-sql-driver/mysql v1.8.1 github.com/gocarina/gocsv v0.0.0-20220927221512-ad3251f9fa25 github.com/hashicorp/go-retryablehttp v0.7.7 github.com/hashicorp/go-version v1.6.0 @@ -36,16 +37,17 @@ require ( github.com/stretchr/testify v1.10.0 go.etcd.io/etcd/client/v3 v3.5.21 go.mongodb.org/mongo-driver v1.14.0 - golang.org/x/mod v0.25.0 - golang.org/x/sys v0.33.0 - google.golang.org/protobuf v1.36.6 + golang.org/x/mod v0.29.0 + golang.org/x/sys v0.38.0 + google.golang.org/grpc v1.73.0 + google.golang.org/protobuf v1.36.9 gopkg.in/yaml.v2 v2.4.0 helm.sh/helm/v3 v3.17.1 - k8s.io/api v0.32.1 + k8s.io/api v0.32.3 k8s.io/apiextensions-apiserver v0.32.1 k8s.io/apimachinery v0.33.0 k8s.io/cli-runtime v0.32.1 - k8s.io/client-go v0.32.1 + k8s.io/client-go v0.32.3 k8s.io/helm v2.16.10+incompatible sigs.k8s.io/yaml v1.4.0 ) @@ -54,6 +56,7 @@ require ( cel.dev/expr v0.23.0 // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect dario.cat/mergo v1.0.1 // indirect + filippo.io/edwards25519 v1.1.0 // indirect github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 // indirect github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/BurntSushi/toml v1.4.0 // indirect @@ -76,7 +79,7 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/chai2010/gettext-go v1.0.2 // indirect github.com/chebyrash/promise v0.0.0-20230709133807-42ec49ba1459 // indirect - github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 // indirect + github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 // indirect github.com/cloudevents/sdk-go/v2 v2.15.2 // indirect github.com/containerd/containerd v1.7.24 // indirect github.com/containerd/errdefs v0.3.0 // indirect @@ -85,7 +88,7 @@ require ( github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cyphar/filepath-securejoin v0.3.6 // indirect - github.com/dapr/components-contrib v1.16.0 // indirect + github.com/dapr/components-contrib v1.16.2-0.20260105164851-3e22d45d5cae // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect @@ -240,17 +243,16 @@ require ( go.opentelemetry.io/proto/otlp v1.6.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.39.0 // indirect + golang.org/x/crypto v0.45.0 // indirect golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 // indirect - golang.org/x/net v0.41.0 // indirect + golang.org/x/net v0.47.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect - golang.org/x/sync v0.15.0 // indirect - golang.org/x/term v0.32.0 // indirect - golang.org/x/text v0.26.0 // indirect + golang.org/x/sync v0.18.0 // indirect + golang.org/x/term v0.37.0 // indirect + golang.org/x/text v0.31.0 // indirect golang.org/x/time v0.11.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250512202823-5a2f75b736a9 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect - google.golang.org/grpc v1.73.0 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect diff --git a/go.sum b/go.sum index b9fec0821..6c58ab82f 100644 --- a/go.sum +++ b/go.sum @@ -142,8 +142,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 h1:dEopBSOSjB5fM9r76ufM44AVj9Dnz2IOM0Xs6FVxZRM= -github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0/go.mod h1:qDSbb0fgIfFNjZrNTPtS5MOMScAGyQtn1KlSvoOdqYw= +github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 h1:FIvfKlS2mcuP0qYY6yzdIU9xdrRd/YMP0bNwFjXd0u8= +github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2/go.mod h1:POsdVp/08Mki0WD9QvvgRRpg9CQ6zhjfRrBoEY8JFS8= github.com/cloudevents/sdk-go/v2 v2.15.2 h1:54+I5xQEnI73RBhWHxbI1XJcqOFOVJN85vb41+8mHUc= github.com/cloudevents/sdk-go/v2 v2.15.2/go.mod h1:lL7kSWAE/V8VI4Wh0jbL2v/jvqsm6tjmaQBSvxcv4uE= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= @@ -169,16 +169,16 @@ github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/cyphar/filepath-securejoin v0.3.6 h1:4d9N5ykBnSp5Xn2JkhocYDkOpURL/18CYMpo6xB9uWM= github.com/cyphar/filepath-securejoin v0.3.6/go.mod h1:Sdj7gXlvMcPZsbhwhQ33GguGLDGQL7h7bg04C/+u9jI= -github.com/dapr/components-contrib v1.16.0 h1:kUif6UyxtRz6tXnkuIjbx6z+VLMfc6y+SIYa9T7J3eA= -github.com/dapr/components-contrib v1.16.0/go.mod h1:1AufCWqZwBj//UkyS7FesOEmp5/E6Xgy1tyCn8peiR4= -github.com/dapr/dapr v1.16.0 h1:la2WLZM8Myr2Pq3cyrFjHKWDSPYLzGZCs3p502TwBjI= -github.com/dapr/dapr v1.16.0/go.mod h1:ln/mxvNOeqklaDmic4ppsxmnjl2D/oZGKaJy24IwaEY= -github.com/dapr/durabletask-go v0.10.0 h1:vfIivPl4JYd55xZTslDwhA6p6F8ipcNxBtMaupxArr8= -github.com/dapr/durabletask-go v0.10.0/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= +github.com/dapr/components-contrib v1.16.2-0.20260105164851-3e22d45d5cae h1:3/LfsQisuENndaE59E41FTDqDYBmkf88SMorJsInA2g= +github.com/dapr/components-contrib v1.16.2-0.20260105164851-3e22d45d5cae/go.mod h1:NU9TRkHl1pi0mkoIOOd2msuYFVFBRsIO+frTpVMAlfs= +github.com/dapr/dapr v1.16.1-rc.3.0.20260109125959-3e6d229306c2 h1:sB9423NEsNhwSZYmaEClPy7KZsTUFRjjxU1X/CngvTw= +github.com/dapr/dapr v1.16.1-rc.3.0.20260109125959-3e6d229306c2/go.mod h1:w2z96ul2huSoE5NreRO+5O3UAdPXKfc0sxbHEE/JBuQ= +github.com/dapr/durabletask-go v0.10.2-0.20260109105925-0094a750e8b7 h1:8qsDN/X7idExhHwxchwcI5Hk0VJkFTVu5VebJXzk5zE= +github.com/dapr/durabletask-go v0.10.2-0.20260109105925-0094a750e8b7/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= github.com/dapr/go-sdk v1.13.0 h1:Qw2BmUonClQ9yK/rrEEaFL1PyDgq616RrvYj0CT67Lk= github.com/dapr/go-sdk v1.13.0/go.mod h1:RsffVNZitDApmQqoS68tNKGMXDZUjTviAbKZupJSzts= -github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4= -github.com/dapr/kit v0.16.1/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM= +github.com/dapr/kit v0.16.2-0.20251124175541-3ac186dff64d h1:csljij9d1IO6u9nqbg+TuSRmTZ+OXT8G49yh6zie1yI= +github.com/dapr/kit v0.16.2-0.20251124175541-3ac186dff64d/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -187,8 +187,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etly github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/diagridio/go-etcd-cron v0.9.1 h1:KUfcceDtypL8s3hL0jD2ZoiIzjjXY6xDQ4kT1DJF4Ws= -github.com/diagridio/go-etcd-cron v0.9.1/go.mod h1:CSzuxoCDFu+Gbds0RO73GE8CnmL5t85axiPLptsej3I= +github.com/diagridio/go-etcd-cron v0.10.1-0.20260105221246-ee8c118dd834 h1:orXUDAMBpAqp9HGc7Jz7WRl+6iCieVcxRIT+AYeQZnc= +github.com/diagridio/go-etcd-cron v0.10.1-0.20260105221246-ee8c118dd834/go.mod h1:XpjpGLT4WzS/eE+20h4aUl2yFtudShbrKK7cPQMtMJ0= github.com/distribution/distribution/v3 v3.0.0-20221208165359-362910506bc2 h1:aBfCb7iqHmDEIp6fBvC/hQUddQfg+3qdYjwzaiP9Hnc= github.com/distribution/distribution/v3 v3.0.0-20221208165359-362910506bc2/go.mod h1:WHNsWjnIn2V1LYOrME7e8KxSeKunYHsxEm4am0BUtcI= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= @@ -786,8 +786,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= -golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= +golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -821,8 +821,8 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= -golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -858,8 +858,8 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= -golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -882,8 +882,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -937,12 +937,12 @@ golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= -golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg= -golang.org/x/term v0.32.0/go.mod h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ= +golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= +golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -951,8 +951,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= -golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1002,8 +1002,8 @@ golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc= -golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1097,8 +1097,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= -google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +google.golang.org/protobuf v1.36.9 h1:w2gp2mA27hUeUzj9Ex9FBjsBm40zfaDtEWow293U7Iw= +google.golang.org/protobuf v1.36.9/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -1135,8 +1135,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= -k8s.io/api v0.32.1 h1:f562zw9cy+GvXzXf0CKlVQ7yHJVYzLfL6JAS4kOAaOc= -k8s.io/api v0.32.1/go.mod h1:/Yi/BqkuueW1BgpoePYBRdDYfjPF5sgTr5+YqDZra5k= +k8s.io/api v0.32.3 h1:Hw7KqxRusq+6QSplE3NYG4MBxZw1BZnq4aP4cJVINls= +k8s.io/api v0.32.3/go.mod h1:2wEDTXADtm/HA7CCMD8D8bK4yuBUptzaRhYcYEEYA3k= k8s.io/apiextensions-apiserver v0.32.1 h1:hjkALhRUeCariC8DiVmb5jj0VjIc1N0DREP32+6UXZw= k8s.io/apiextensions-apiserver v0.32.1/go.mod h1:sxWIGuGiYov7Io1fAS2X06NjMIk5CbRHc2StSmbaQto= k8s.io/apimachinery v0.33.0 h1:1a6kHrJxb2hs4t8EE5wuR/WxKDwGN1FKH3JvDtA0CIQ= @@ -1145,8 +1145,8 @@ k8s.io/apiserver v0.32.1 h1:oo0OozRos66WFq87Zc5tclUX2r0mymoVHRq8JmR7Aak= k8s.io/apiserver v0.32.1/go.mod h1:UcB9tWjBY7aryeI5zAgzVJB/6k7E97bkr1RgqDz0jPw= k8s.io/cli-runtime v0.32.1 h1:19nwZPlYGJPUDbhAxDIS2/oydCikvKMHsxroKNGA2mM= k8s.io/cli-runtime v0.32.1/go.mod h1:NJPbeadVFnV2E7B7vF+FvU09mpwYlZCu8PqjzfuOnkY= -k8s.io/client-go v0.32.1 h1:otM0AxdhdBIaQh7l1Q0jQpmo7WOFIk5FFa4bg6YMdUU= -k8s.io/client-go v0.32.1/go.mod h1:aTTKZY7MdxUaJ/KiUs8D+GssR9zJZi77ZqtzcGXIiDg= +k8s.io/client-go v0.32.3 h1:RKPVltzopkSgHS7aS98QdscAgtgah/+zmpAogooIqVU= +k8s.io/client-go v0.32.3/go.mod h1:3v0+3k4IcT9bXTc4V2rt+d2ZPPG700Xy6Oi0Gdl2PaY= k8s.io/component-base v0.32.1 h1:/5IfJ0dHIKBWysGV0yKTFfacZ5yNV1sulPh3ilJjRZk= k8s.io/component-base v0.32.1/go.mod h1:j1iMMHi/sqAHeG5z+O9BFNCF698a1u0186zkjMZQ28w= k8s.io/helm v2.16.10+incompatible h1:eFksERw3joHEL62TrcDX8I5fgEQJvit4qxxPXAkYTyQ= diff --git a/pkg/workflow/db/sql.go b/pkg/workflow/db/sql.go index fd1b30dcc..0c0d2d4af 100644 --- a/pkg/workflow/db/sql.go +++ b/pkg/workflow/db/sql.go @@ -18,6 +18,7 @@ import ( "database/sql" "fmt" + _ "github.com/go-sql-driver/mysql" _ "github.com/jackc/pgx/v5/stdlib" _ "github.com/mattn/go-sqlite3" _ "github.com/microsoft/go-mssqldb" @@ -35,8 +36,8 @@ func SQL(ctx context.Context, driver, connString string) (*sql.DB, error) { return db, nil } -func ListSQL(ctx context.Context, db *sql.DB, table string, opts ListOptions) ([]string, error) { - query := fmt.Sprintf(`SELECT key FROM "%s" WHERE key LIKE ?;`, table) +func ListSQL(ctx context.Context, db *sql.DB, table, key string, opts ListOptions) ([]string, error) { + query := fmt.Sprintf(`SELECT "%s" FROM "%s" WHERE key LIKE ?;`, key, table) like := opts.AppID + "||dapr.internal." + opts.Namespace + "." + opts.AppID + ".workflow||%||metadata" rows, err := db.QueryContext(ctx, query, like) diff --git a/pkg/workflow/dclient/dclient.go b/pkg/workflow/dclient/dclient.go index f345fad4f..318a9ca14 100644 --- a/pkg/workflow/dclient/dclient.go +++ b/pkg/workflow/dclient/dclient.go @@ -15,31 +15,52 @@ package dclient import ( "context" + "encoding/json" + "errors" "fmt" "slices" + "sort" "strconv" + "strings" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" "github.com/dapr/cli/pkg/kubernetes" "github.com/dapr/cli/pkg/standalone" + "github.com/dapr/cli/pkg/workflow/db" "github.com/dapr/dapr/pkg/apis/components/v1alpha1" "github.com/dapr/dapr/pkg/components/loader" + "github.com/dapr/durabletask-go/api/protos" + "github.com/dapr/durabletask-go/workflow" "github.com/dapr/go-sdk/client" "github.com/dapr/kit/ptr" ) +const maxHistoryEntries = 1000 + type Options struct { - KubernetesMode bool - Namespace string - AppID string - RuntimePath string + KubernetesMode bool + Namespace string + AppID string + RuntimePath string + DBConnectionString *string } type Client struct { - Dapr client.Client - Cancel context.CancelFunc - StateStoreDriver string - ConnectionString *string - TableName *string + Dapr client.Client + WF *workflow.Client + Cancel context.CancelFunc + + kubernetesMode bool + resourcePaths []string + appID string + ns string + dbConnString *string } func DaprClient(ctx context.Context, opts Options) (*Client, error) { @@ -48,7 +69,7 @@ func DaprClient(ctx context.Context, opts Options) (*Client, error) { var client *Client var err error if opts.KubernetesMode { - client, err = kube(opts) + client, err = kube(ctx, opts) } else { client, err = stand(ctx, opts) } @@ -74,31 +95,44 @@ func stand(ctx context.Context, opts Options) (*Client, error) { return nil, fmt.Errorf("Dapr app with id '%s' not found", opts.AppID) } - if len(proc.ResourcePaths) == 0 { + resourcePaths := proc.ResourcePaths + if len(resourcePaths) == 0 { var daprDirPath string daprDirPath, err = standalone.GetDaprRuntimePath(opts.RuntimePath) if err != nil { return nil, err } - proc.ResourcePaths = []string{standalone.GetDaprComponentsPath(daprDirPath)} + resourcePaths = []string{standalone.GetDaprComponentsPath(daprDirPath)} } - comps, err := loader.NewLocalLoader(opts.AppID, proc.ResourcePaths).Load(ctx) + client, err := client.NewClientWithAddress("localhost:" + strconv.Itoa(proc.GRPCPort)) if err != nil { return nil, err } - c, err := clientFromComponents(comps, opts.AppID, strconv.Itoa(proc.GRPCPort)) + //nolint:staticcheck + conn, err := grpc.DialContext(ctx, "localhost:"+strconv.Itoa(proc.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) if err != nil { return nil, err } - c.Cancel = func() {} - return c, nil + return &Client{ + Dapr: client, + WF: workflow.NewClient(conn), + Cancel: func() { conn.Close() }, + kubernetesMode: false, + resourcePaths: resourcePaths, + appID: opts.AppID, + ns: opts.Namespace, + dbConnString: opts.DBConnectionString, + }, nil } -func kube(opts Options) (*Client, error) { +func kube(ctx context.Context, opts Options) (*Client, error) { list, err := kubernetes.List(opts.Namespace) if err != nil { return nil, err @@ -143,27 +177,134 @@ func kube(opts Options) (*Client, error) { return nil, err } - kclient, err := kubernetes.DaprClient() + client, err := client.NewClientWithAddress("localhost:" + strconv.Itoa(port)) if err != nil { return nil, err } - comps, err := kubernetes.ListComponents(kclient, pod.Namespace) + //nolint:staticcheck + conn, err := grpc.DialContext(ctx, "localhost:"+strconv.Itoa(port), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithBlock(), + ) if err != nil { return nil, err } - c, err := clientFromComponents(comps.Items, opts.AppID, pod.DaprGRPCPort) + return &Client{ + WF: workflow.NewClient(conn), + Dapr: client, + Cancel: func() { conn.Close(); portForward.Stop() }, + kubernetesMode: true, + appID: opts.AppID, + ns: opts.Namespace, + dbConnString: opts.DBConnectionString, + }, nil +} + +func (c *Client) InstanceIDs(ctx context.Context) ([]string, error) { + resp, err := c.WF.ListInstanceIDs(ctx) if err != nil { - portForward.Stop() + code, ok := status.FromError(err) + if !ok || (code.Code() != codes.Unimplemented && code.Code() != codes.Unknown) { + return nil, err + } + + // Dapr is pre v1.17, so fall back to reading from the state store + // directly. + var metaKeys []string + metaKeys, err = c.metaKeysFromDB(ctx) + if err != nil { + return nil, err + } + + instanceIDs := make([]string, 0, len(metaKeys)) + for _, key := range metaKeys { + split := strings.Split(key, "||") + if len(split) != 4 { + continue + } + + instanceIDs = append(instanceIDs, split[2]) + } + + return instanceIDs, err } - c.Cancel = portForward.Stop + ids := resp.InstanceIds - return c, nil + for resp.ContinuationToken != nil { + resp, err = c.WF.ListInstanceIDs(ctx, workflow.WithListInstanceIDsContinuationToken(*resp.ContinuationToken)) + if err != nil { + return nil, err + } + + ids = append(ids, resp.InstanceIds...) + } + + return ids, nil } -func clientFromComponents(comps []v1alpha1.Component, appID string, port string) (*Client, error) { +func (c *Client) InstanceHistory(ctx context.Context, instanceID string) ([]*protos.HistoryEvent, error) { + var history []*protos.HistoryEvent + resp, err := c.WF.GetInstanceHistory(ctx, instanceID) + if err != nil { + code, ok := status.FromError(err) + if !ok || (code.Code() != codes.Unimplemented && code.Code() != codes.Unknown) { + return nil, err + } + + // Dapr is pre v1.17, so fall back to reading from the state store + // directly. + history, err = c.fetchHistory(ctx, instanceID) + if err != nil { + return nil, err + } + } else { + history = resp.Events + } + + // Sort: EventId if both present, else Timestamp + sort.SliceStable(history, func(i, j int) bool { + ei, ej := history[i], history[j] + if ei.EventId > 0 && ej.EventId > 0 { + return ei.EventId < ej.EventId + } + ti, tj := ei.GetTimestamp().AsTime(), ej.GetTimestamp().AsTime() + if !ti.Equal(tj) { + return ti.Before(tj) + } + return ei.EventId < ej.EventId + }) + + return history, nil +} + +func (c *Client) metaKeysFromDB(ctx context.Context) ([]string, error) { + if c.dbConnString == nil { + return nil, fmt.Errorf("connection string is required for all database drivers for Dapr pre v1.17") + } + + var comps []v1alpha1.Component + if c.kubernetesMode { + kclient, err := kubernetes.DaprClient() + if err != nil { + return nil, err + } + + kcomps, err := kubernetes.ListComponents(kclient, c.ns) + if err != nil { + return nil, err + } + comps = kcomps.Items + } else { + var err error + comps, err = loader.NewLocalLoader(c.appID, c.resourcePaths).Load(ctx) + if err != nil { + return nil, err + } + } + var comp *v1alpha1.Component for _, c := range comps { for _, meta := range c.Spec.Metadata { @@ -175,7 +316,7 @@ func clientFromComponents(comps []v1alpha1.Component, appID string, port string) } if comp == nil { - return nil, fmt.Errorf("no state store configured for app id %s", appID) + return nil, fmt.Errorf("no actor state store configured for app id %s", c.appID) } driver, err := driverFromType(comp.Spec.Type) @@ -183,11 +324,6 @@ func clientFromComponents(comps []v1alpha1.Component, appID string, port string) return nil, err } - client, err := client.NewClientWithAddress("localhost:" + port) - if err != nil { - return nil, err - } - var tableName *string for _, meta := range comp.Spec.Metadata { switch meta.Name { @@ -196,11 +332,58 @@ func clientFromComponents(comps []v1alpha1.Component, appID string, port string) } } - return &Client{ - Dapr: client, - StateStoreDriver: driver, - TableName: tableName, - }, nil + switch { + case isSQLDriver(driver): + if tableName == nil { + tableName = ptr.Of("state") + } + + sqldb, err := db.SQL(ctx, driver, *c.dbConnString) + if err != nil { + return nil, err + } + defer sqldb.Close() + + key := "key" + if driver == "mysql" { + key = "id" + } + + return db.ListSQL(ctx, sqldb, *tableName, key, db.ListOptions{ + Namespace: c.ns, + AppID: c.appID, + }) + + case driver == "redis": + client, err := db.Redis(ctx, *c.dbConnString) + if err != nil { + return nil, err + } + + return db.ListRedis(ctx, client, db.ListOptions{ + Namespace: c.ns, + AppID: c.appID, + }) + + case driver == "mongodb": + client, err := db.Mongo(ctx, *c.dbConnString) + if err != nil { + return nil, err + } + + collectionName := "daprCollection" + if tableName != nil { + collectionName = *tableName + } + + return db.ListMongo(ctx, client.Database("daprStore"), collectionName, db.ListOptions{ + Namespace: c.ns, + AppID: c.appID, + }) + + default: + return nil, fmt.Errorf("unsupported driver: %s", driver) + } } func driverFromType(v string) (string, error) { @@ -226,7 +409,7 @@ func driverFromType(v string) (string, error) { } } -func IsSQLDriver(driver string) bool { +func isSQLDriver(driver string) bool { return slices.Contains([]string{ "mysql", "pgx", @@ -235,3 +418,74 @@ func IsSQLDriver(driver string) bool { "oracle", }, driver) } + +func (c *Client) fetchHistory(ctx context.Context, instanceID string) ([]*protos.HistoryEvent, error) { + + actorType := "dapr.internal." + c.ns + "." + c.appID + ".workflow" + + var events []*protos.HistoryEvent + for startIndex := 0; startIndex <= 1; startIndex++ { + if len(events) > 0 { + break + } + + for i := startIndex; i < maxHistoryEntries; i++ { + key := fmt.Sprintf("history-%06d", i) + + resp, err := c.Dapr.GetActorState(ctx, &client.GetActorStateRequest{ + ActorType: actorType, + ActorID: instanceID, + KeyName: key, + }) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return nil, err + } + break + } + + if resp == nil || len(resp.Data) == 0 { + break + } + + var event protos.HistoryEvent + if err = decodeKey(resp.Data, &event); err != nil { + return nil, fmt.Errorf("failed to decode history event %s: %w", key, err) + } + + events = append(events, &event) + } + } + + return events, nil +} + +func decodeKey(data []byte, item proto.Message) error { + if len(data) == 0 { + return fmt.Errorf("empty value") + } + + if err := protojson.Unmarshal(data, item); err == nil { + return nil + } + + if unquoted, err := UnquoteJSON(data); err == nil { + if err := protojson.Unmarshal([]byte(unquoted), item); err == nil { + return nil + } + } + + if err := proto.Unmarshal(data, item); err == nil { + return nil + } + + return fmt.Errorf("unable to decode history event (len=%d)", len(data)) +} + +func UnquoteJSON(data []byte) (string, error) { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return "", err + } + return s, nil +} diff --git a/pkg/workflow/events.go b/pkg/workflow/events.go index 4080f7fcc..a21eae847 100644 --- a/pkg/workflow/events.go +++ b/pkg/workflow/events.go @@ -42,14 +42,12 @@ func RaiseEvent(ctx context.Context, opts RaiseEventOptions) error { } defer cli.Cancel() - wf := workflow.NewClient(cli.Dapr.GrpcClientConn()) - var wopts []workflow.RaiseEventOptions if opts.Input != nil { wopts = append(wopts, workflow.WithEventPayload(*opts.Input)) } - return wf.RaiseEvent(ctx, opts.InstanceID, opts.Name, wopts...) + return cli.WF.RaiseEvent(ctx, opts.InstanceID, opts.Name, wopts...) } type SuspendOptions struct { diff --git a/pkg/workflow/history.go b/pkg/workflow/history.go index b8a4f8f2b..348650e90 100644 --- a/pkg/workflow/history.go +++ b/pkg/workflow/history.go @@ -15,15 +15,11 @@ package workflow import ( "context" - "encoding/json" - "errors" "fmt" "sort" "strings" "time" - "google.golang.org/protobuf/encoding/protojson" - "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/wrapperspb" "github.com/dapr/cli/cmd/runtime" @@ -31,12 +27,9 @@ import ( "github.com/dapr/cli/utils" "github.com/dapr/durabletask-go/api/protos" "github.com/dapr/durabletask-go/workflow" - "github.com/dapr/go-sdk/client" "github.com/dapr/kit/ptr" ) -const maxHistoryEntries = 100 - type HistoryOptions struct { KubernetesMode bool Namespace string @@ -116,28 +109,11 @@ func HistoryWide(ctx context.Context, opts HistoryOptions) ([]*HistoryOutputWide } defer cli.Cancel() - history, err := fetchHistory(ctx, - cli.Dapr, - "dapr.internal."+opts.Namespace+"."+opts.AppID+".workflow", - opts.InstanceID, - ) + history, err := cli.InstanceHistory(ctx, opts.InstanceID) if err != nil { return nil, err } - // Sort: EventId if both present, else Timestamp - sort.SliceStable(history, func(i, j int) bool { - ei, ej := history[i], history[j] - if ei.EventId > 0 && ej.EventId > 0 { - return ei.EventId < ej.EventId - } - ti, tj := ei.GetTimestamp().AsTime(), ej.GetTimestamp().AsTime() - if !ti.Equal(tj) { - return ti.Before(tj) - } - return ei.EventId < ej.EventId - }) - var rows []*HistoryOutputWide var prevTs time.Time replay := 0 @@ -265,74 +241,6 @@ func HistoryWide(ctx context.Context, opts HistoryOptions) ([]*HistoryOutputWide return rows, nil } -func fetchHistory(ctx context.Context, cl client.Client, actorType, instanceID string) ([]*protos.HistoryEvent, error) { - var events []*protos.HistoryEvent - for startIndex := 0; startIndex <= 1; startIndex++ { - if len(events) > 0 { - break - } - - for i := startIndex; i < maxHistoryEntries; i++ { - key := fmt.Sprintf("history-%06d", i) - - resp, err := cl.GetActorState(ctx, &client.GetActorStateRequest{ - ActorType: actorType, - ActorID: instanceID, - KeyName: key, - }) - if err != nil { - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - return nil, err - } - break - } - - if resp == nil || len(resp.Data) == 0 { - break - } - - var event protos.HistoryEvent - if err = decodeKey(resp.Data, &event); err != nil { - return nil, fmt.Errorf("failed to decode history event %s: %w", key, err) - } - - events = append(events, &event) - } - } - - return events, nil -} - -func decodeKey(data []byte, item proto.Message) error { - if len(data) == 0 { - return fmt.Errorf("empty value") - } - - if err := protojson.Unmarshal(data, item); err == nil { - return nil - } - - if unquoted, err := unquoteJSON(data); err == nil { - if err := protojson.Unmarshal([]byte(unquoted), item); err == nil { - return nil - } - } - - if err := proto.Unmarshal(data, item); err == nil { - return nil - } - - return fmt.Errorf("unable to decode history event (len=%d)", len(data)) -} - -func unquoteJSON(data []byte) (string, error) { - var s string - if err := json.Unmarshal(data, &s); err != nil { - return "", err - } - return s, nil -} - func eventTypeName(h *protos.HistoryEvent) string { switch h.GetEventType().(type) { case *protos.HistoryEvent_ExecutionStarted: @@ -506,7 +414,7 @@ func trim(ww *wrapperspb.StringValue, limit int) string { return "" } - s, err := unquoteJSON([]byte(ww.Value)) + s, err := dclient.UnquoteJSON([]byte(ww.Value)) if err != nil { s = ww.Value } diff --git a/pkg/workflow/list.go b/pkg/workflow/list.go index 44f40fec0..477512e3c 100644 --- a/pkg/workflow/list.go +++ b/pkg/workflow/list.go @@ -24,8 +24,6 @@ import ( "github.com/dapr/cli/pkg/workflow/dclient" "github.com/dapr/durabletask-go/api" "github.com/dapr/durabletask-go/api/protos" - "github.com/dapr/durabletask-go/workflow" - "github.com/dapr/go-sdk/client" "github.com/dapr/kit/ptr" "k8s.io/apimachinery/pkg/util/duration" ) @@ -94,52 +92,29 @@ func ListShort(ctx context.Context, opts ListOptions) ([]*ListOutputShort, error func ListWide(ctx context.Context, opts ListOptions) ([]*ListOutputWide, error) { dclient, err := dclient.DaprClient(ctx, dclient.Options{ - KubernetesMode: opts.KubernetesMode, - Namespace: opts.Namespace, - AppID: opts.AppID, - RuntimePath: runtime.GetDaprRuntimePath(), + KubernetesMode: opts.KubernetesMode, + Namespace: opts.Namespace, + AppID: opts.AppID, + RuntimePath: runtime.GetDaprRuntimePath(), + DBConnectionString: opts.ConnectionString, }) if err != nil { return nil, fmt.Errorf("failed to create Dapr client: %w", err) } defer dclient.Cancel() - connString := opts.ConnectionString - if connString == nil { - connString = dclient.ConnectionString - } - tableName := opts.TableName - if tableName == nil { - tableName = dclient.TableName - } - - metaKeys, err := metakeys(ctx, DBOptions{ - Namespace: opts.Namespace, - AppID: opts.AppID, - Driver: dclient.StateStoreDriver, - ConnectionString: connString, - TableName: tableName, - }) + instanceIDs, err := dclient.InstanceIDs(ctx) if err != nil { return nil, err } - return list(ctx, metaKeys, dclient.Dapr, opts) + return list(ctx, instanceIDs, dclient, opts) } -func list(ctx context.Context, metaKeys []string, cl client.Client, opts ListOptions) ([]*ListOutputWide, error) { - wf := workflow.NewClient(cl.GrpcClientConn()) - +func list(ctx context.Context, instanceIDs []string, cl *dclient.Client, opts ListOptions) ([]*ListOutputWide, error) { var listOutput []*ListOutputWide - for _, key := range metaKeys { - split := strings.Split(key, "||") - if len(split) != 4 { - continue - } - - instanceID := split[2] - - resp, err := wf.FetchWorkflowMetadata(ctx, instanceID) + for _, instanceID := range instanceIDs { + resp, err := cl.WF.FetchWorkflowMetadata(ctx, instanceID) if err != nil { return nil, err } @@ -153,6 +128,7 @@ func list(ctx context.Context, metaKeys []string, cl client.Client, opts ListOpt if opts.Filter.MaxAge != nil && resp.CreatedAt.AsTime().Before(*opts.Filter.MaxAge) { continue } + // TODO: @joshvanl: add `WorkflowIsCompleted` func to workflow package. //nolint:govet if opts.Filter.Terminal && !api.OrchestrationMetadataIsComplete(ptr.Of(protos.OrchestrationMetadata(*resp))) { diff --git a/pkg/workflow/purge.go b/pkg/workflow/purge.go index 9f84484d3..711473009 100644 --- a/pkg/workflow/purge.go +++ b/pkg/workflow/purge.go @@ -19,11 +19,8 @@ import ( "os" "time" - clientv3 "go.etcd.io/etcd/client/v3" - "github.com/dapr/cli/cmd/runtime" "github.com/dapr/cli/pkg/print" - "github.com/dapr/cli/pkg/scheduler" "github.com/dapr/cli/pkg/workflow/dclient" "github.com/dapr/durabletask-go/workflow" ) @@ -36,29 +33,20 @@ type PurgeOptions struct { InstanceIDs []string AllOlderThan *time.Time All bool + Force bool ConnectionString *string TableName *string } func Purge(ctx context.Context, opts PurgeOptions) error { - cli, err := dclient.DaprClient(ctx, dclient.Options{ - KubernetesMode: opts.KubernetesMode, - Namespace: opts.Namespace, - AppID: opts.AppID, - RuntimePath: runtime.GetDaprRuntimePath(), - }) - if err != nil { - return err - } - defer cli.Cancel() - var toPurge []string if len(opts.InstanceIDs) > 0 { toPurge = opts.InstanceIDs } else { var list []*ListOutputWide + var err error list, err = ListWide(ctx, ListOptions{ KubernetesMode: opts.KubernetesMode, Namespace: opts.Namespace, @@ -88,41 +76,25 @@ func Purge(ctx context.Context, opts PurgeOptions) error { } } - wf := workflow.NewClient(cli.Dapr.GrpcClientConn()) - - etcdClient, cancel, err := scheduler.EtcdClient(opts.KubernetesMode, opts.SchedulerNamespace) + cli, err := dclient.DaprClient(ctx, dclient.Options{ + KubernetesMode: opts.KubernetesMode, + Namespace: opts.Namespace, + AppID: opts.AppID, + RuntimePath: runtime.GetDaprRuntimePath(), + DBConnectionString: opts.ConnectionString, + }) if err != nil { return err } - defer cancel() + defer cli.Cancel() print.InfoStatusEvent(os.Stdout, "Purging %d workflow instance(s)", len(toPurge)) for _, id := range toPurge { - if err = wf.PurgeWorkflowState(ctx, id); err != nil { + if err = cli.WF.PurgeWorkflowState(ctx, id, workflow.WithForcePurge(opts.Force)); err != nil { return fmt.Errorf("%s: %w", id, err) } - paths := []string{ - fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.%s.workflow||%s||", opts.Namespace, opts.Namespace, opts.AppID, id), - fmt.Sprintf("dapr/jobs/actorreminder||%s||dapr.internal.%s.%s.activity||%s::", opts.Namespace, opts.Namespace, opts.AppID, id), - fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.%s.workflow||%s||", opts.Namespace, opts.Namespace, opts.AppID, id), - fmt.Sprintf("dapr/counters/actorreminder||%s||dapr.internal.%s.%s.activity||%s::", opts.Namespace, opts.Namespace, opts.AppID, id), - } - - oopts := make([]clientv3.Op, 0, len(paths)) - for _, path := range paths { - oopts = append(oopts, clientv3.OpDelete(path, - clientv3.WithPrefix(), - clientv3.WithPrevKV(), - clientv3.WithKeysOnly(), - )) - } - - if _, err = etcdClient.Txn(ctx).Then(oopts...).Commit(); err != nil { - return err - } - print.SuccessStatusEvent(os.Stdout, "Purged workflow instance %q", id) } diff --git a/pkg/workflow/rerun.go b/pkg/workflow/rerun.go index 02abc2555..b262d0383 100644 --- a/pkg/workflow/rerun.go +++ b/pkg/workflow/rerun.go @@ -43,8 +43,6 @@ func ReRun(ctx context.Context, opts ReRunOptions) (string, error) { } defer cli.Cancel() - wf := workflow.NewClient(cli.Dapr.GrpcClientConn()) - var wopts []workflow.RerunOptions if opts.NewInstanceID != nil { wopts = append(wopts, workflow.WithRerunNewInstanceID(*opts.NewInstanceID)) @@ -53,5 +51,5 @@ func ReRun(ctx context.Context, opts ReRunOptions) (string, error) { wopts = append(wopts, workflow.WithRerunInput(*opts.Input)) } - return wf.RerunWorkflowFromEvent(ctx, opts.InstanceID, opts.EventID, wopts...) + return cli.WF.RerunWorkflowFromEvent(ctx, opts.InstanceID, opts.EventID, wopts...) } diff --git a/pkg/workflow/workflow.go b/pkg/workflow/workflow.go deleted file mode 100644 index 033b89d58..000000000 --- a/pkg/workflow/workflow.go +++ /dev/null @@ -1,85 +0,0 @@ -/* -Copyright 2025 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package workflow - -import ( - "context" - "fmt" - - "github.com/dapr/cli/pkg/workflow/db" - "github.com/dapr/cli/pkg/workflow/dclient" -) - -type DBOptions struct { - Namespace string - AppID string - Driver string - ConnectionString *string - TableName *string -} - -func metakeys(ctx context.Context, opts DBOptions) ([]string, error) { - if opts.ConnectionString == nil { - return nil, fmt.Errorf("connection string is required for all drivers") - } - - switch { - case dclient.IsSQLDriver(opts.Driver): - tableName := "state" - if opts.TableName != nil { - tableName = *opts.TableName - } - - sqldb, err := db.SQL(ctx, opts.Driver, *opts.ConnectionString) - if err != nil { - return nil, err - } - defer sqldb.Close() - - return db.ListSQL(ctx, sqldb, tableName, db.ListOptions{ - Namespace: opts.Namespace, - AppID: opts.AppID, - }) - - case opts.Driver == "redis": - client, err := db.Redis(ctx, *opts.ConnectionString) - if err != nil { - return nil, err - } - - return db.ListRedis(ctx, client, db.ListOptions{ - Namespace: opts.Namespace, - AppID: opts.AppID, - }) - - case opts.Driver == "mongodb": - client, err := db.Mongo(ctx, *opts.ConnectionString) - if err != nil { - return nil, err - } - - collectionName := "daprCollection" - if opts.TableName != nil { - collectionName = *opts.TableName - } - - return db.ListMongo(ctx, client.Database("daprStore"), collectionName, db.ListOptions{ - Namespace: opts.Namespace, - AppID: opts.AppID, - }) - - default: - return nil, fmt.Errorf("unsupported driver: %s", opts.Driver) - } -} diff --git a/tests/apps/emit-metrics/app.go b/tests/apps/emit-metrics/app.go index 7b1a1662a..bbdfd8f2d 100644 --- a/tests/apps/emit-metrics/app.go +++ b/tests/apps/emit-metrics/app.go @@ -23,6 +23,8 @@ import ( "net/http" "os" "time" + + "github.com/dapr/kit/signals" ) type Metrics struct { @@ -48,8 +50,17 @@ func main() { } finalURL := "http://" + host + ":" + port + "/metrics" log.Println("Sending metrics to ", finalURL) + + ctx := signals.Context() + for i := 0; i < 2000; i++ { - time.Sleep(1 * time.Second) + select { + case <-ctx.Done(): + log.Println("Shutting down metrics sender app") + return + case <-time.After(time.Second): + } + metrics := Metrics{ MetricsID: i, } diff --git a/tests/apps/emit-metrics/go.mod b/tests/apps/emit-metrics/go.mod index 67248acda..755259774 100644 --- a/tests/apps/emit-metrics/go.mod +++ b/tests/apps/emit-metrics/go.mod @@ -1,3 +1,10 @@ module emit-metrics go 1.24.7 + +require github.com/dapr/kit v0.16.1 + +require ( + github.com/sirupsen/logrus v1.9.3 // indirect + golang.org/x/sys v0.33.0 // indirect +) diff --git a/tests/apps/emit-metrics/go.sum b/tests/apps/emit-metrics/go.sum new file mode 100644 index 000000000..c32d34425 --- /dev/null +++ b/tests/apps/emit-metrics/go.sum @@ -0,0 +1,20 @@ +github.com/dapr/kit v0.16.1 h1:MqLAhHVg8trPy2WJChMZFU7ToeondvxcNHYVvMDiVf4= +github.com/dapr/kit v0.16.1/go.mod h1:40ZWs5P6xfYf7O59XgwqZkIyDldTIXlhTQhGop8QoSM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tests/apps/workflow/app.go b/tests/apps/workflow/app.go index c5d7e335f..a070c3fad 100644 --- a/tests/apps/workflow/app.go +++ b/tests/apps/workflow/app.go @@ -184,7 +184,7 @@ func LongRunningActivity(ctx workflow.ActivityContext) (any, error) { stage = "unknown" } - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 6) return fmt.Sprintf("Completed %s at %s", stage, time.Now().Format(time.RFC3339)), nil } diff --git a/tests/e2e/spawn/spawn.go b/tests/e2e/spawn/spawn.go index e8ffef6c7..68af6e68a 100644 --- a/tests/e2e/spawn/spawn.go +++ b/tests/e2e/spawn/spawn.go @@ -15,12 +15,8 @@ package spawn import ( "bufio" - "bytes" "context" - "fmt" "os/exec" - "syscall" - "time" ) // CommandWithContext runs a command with its arguments in background. @@ -81,36 +77,6 @@ func Command(command string, arguments ...string) (string, error) { // and returns the combined stdout, stderr or the error. func CommandExecWithContext(ctx context.Context, command string, arguments ...string) (string, error) { cmd := exec.CommandContext(ctx, command, arguments...) - var b bytes.Buffer - cmd.Stdout = &b - cmd.Stderr = &b - - err := cmd.Start() - if err != nil { - return "", fmt.Errorf("error starting command : %w", err) - } - - waitErrChan := make(chan error, 1) - go func(errChan chan error) { - waitErr := cmd.Wait() - if waitErr != nil { - fmt.Printf("error waiting for command : %s\n", waitErr) - } - waitErrChan <- waitErr - }(waitErrChan) - ctx, cancel := context.WithTimeout(ctx, time.Second*20) - defer cancel() - <-ctx.Done() - if cmd.ProcessState == nil || !cmd.ProcessState.Exited() { - cmd.Process.Signal(syscall.SIGTERM) - time.Sleep(10 * time.Second) - if cmd.ProcessState == nil || !cmd.ProcessState.Exited() { - err = cmd.Process.Kill() - if err != nil { - return b.String(), fmt.Errorf("error killing command : %w", err) - } - } - } - - return b.String(), <-waitErrChan + b, err := cmd.CombinedOutput() + return string(b), err } diff --git a/tests/e2e/standalone/run_template_test.go b/tests/e2e/standalone/run_template_test.go index b3f4c387b..a60e03089 100644 --- a/tests/e2e/standalone/run_template_test.go +++ b/tests/e2e/standalone/run_template_test.go @@ -39,6 +39,7 @@ type AppTestOutput struct { } func TestRunWithTemplateFile(t *testing.T) { + cmdUninstall() cleanUpLogs() ensureDaprInstallation(t) t.Cleanup(func() { @@ -52,14 +53,26 @@ func TestRunWithTemplateFile(t *testing.T) { t.Cleanup(func() { // assumption in the test is that there is only one set of app and daprd logs in the logs directory. cleanUpLogs() - waitAppsToBeStopped() }) args := []string{ "-f", runFilePath, } - output, err := cmdRunWithContext(t.Context(), "", args...) - t.Logf("%s", output) - require.NoError(t, err, "run failed") + + outputCh := make(chan string) + go func() { + output, _ := cmdRun("", args...) + t.Logf("%s", output) + outputCh <- output + }() + time.Sleep(time.Second * 10) + cmdStopWithRunTemplate(runFilePath) + var output string + select { + case output = <-outputCh: + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for run command to finish") + } + // Deterministic output for template file, so we can assert line by line lines := strings.Split(output, "\n") assert.GreaterOrEqual(t, len(lines), 4, "expected at least 4 lines in output of starting two apps") @@ -95,18 +108,33 @@ func TestRunWithTemplateFile(t *testing.T) { }) t.Run("valid template file", func(t *testing.T) { + cmdUninstall() + ensureDaprInstallation(t) + runFilePath := "../testdata/run-template-files/dapr.yaml" t.Cleanup(func() { // assumption in the test is that there is only one set of app and daprd logs in the logs directory. cleanUpLogs() - waitAppsToBeStopped() }) args := []string{ "-f", runFilePath, } - output, err := cmdRunWithContext(t.Context(), "", args...) - t.Logf("%s", output) - require.NoError(t, err, "run failed") + + outputCh := make(chan string) + go func() { + output, _ := cmdRun("", args...) + t.Logf("%s", output) + outputCh <- output + }() + time.Sleep(time.Second * 10) + cmdStopWithRunTemplate(runFilePath) + var output string + select { + case output = <-outputCh: + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for run command to finish") + } + // Deterministic output for template file, so we can assert line by line lines := strings.Split(output, "\n") assert.GreaterOrEqual(t, len(lines), 6, "expected at least 6 lines in output of starting two apps") @@ -150,17 +178,31 @@ func TestRunWithTemplateFile(t *testing.T) { t.Run("invalid template file env var not set", func(t *testing.T) { runFilePath := "../testdata/run-template-files/env_var_not_set_dapr.yaml" + cmdUninstall() + ensureDaprInstallation(t) + t.Cleanup(func() { // assumption in the test is that there is only one set of app and daprd logs in the logs directory. cleanUpLogs() - waitAppsToBeStopped() }) args := []string{ "-f", runFilePath, } - output, err := cmdRunWithContext(t.Context(), "", args...) - t.Logf("%s", output) - require.NoError(t, err, "run failed") + outputCh := make(chan string) + go func() { + output, _ := cmdRun("", args...) + t.Logf("%s", output) + outputCh <- output + }() + time.Sleep(time.Second * 10) + cmdStopWithRunTemplate(runFilePath) + var output string + select { + case output = <-outputCh: + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for run command to finish") + } + // Deterministic output for template file, so we can assert line by line lines := strings.Split(output, "\n") assert.GreaterOrEqual(t, len(lines), 6, "expected at least 6 lines in output of starting two apps") @@ -197,18 +239,32 @@ func TestRunWithTemplateFile(t *testing.T) { }) t.Run("valid template file no app command", func(t *testing.T) { + cmdUninstall() + ensureDaprInstallation(t) + runFilePath := "../testdata/run-template-files/no_app_command.yaml" t.Cleanup(func() { // assumption in the test is that there is only one set of app and daprd logs in the logs directory. cleanUpLogs() - waitAppsToBeStopped() }) args := []string{ "-f", runFilePath, } - output, err := cmdRunWithContext(t.Context(), "", args...) - t.Logf("%s", output) - require.NoError(t, err, "run failed") + outputCh := make(chan string) + go func() { + output, _ := cmdRun("", args...) + t.Logf("%s", output) + outputCh <- output + }() + time.Sleep(time.Second * 10) + cmdStopWithRunTemplate(runFilePath) + var output string + select { + case output = <-outputCh: + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for run command to finish") + } + // Deterministic output for template file, so we can assert line by line lines := strings.Split(output, "\n") assert.GreaterOrEqual(t, len(lines), 7, "expected at least 7 lines in output of starting two apps with one app not having a command") @@ -246,18 +302,32 @@ func TestRunWithTemplateFile(t *testing.T) { }) t.Run("valid template file empty app command", func(t *testing.T) { + cmdUninstall() + ensureDaprInstallation(t) + runFilePath := "../testdata/run-template-files/empty_app_command.yaml" t.Cleanup(func() { // assumption in the test is that there is only one set of app and daprd logs in the logs directory. cleanUpLogs() - waitAppsToBeStopped() }) args := []string{ "-f", runFilePath, } - output, err := cmdRunWithContext(t.Context(), "", args...) - t.Logf("%s", output) - require.Error(t, err, "run must fail") + outputCh := make(chan string) + go func() { + output, _ := cmdRun("", args...) + t.Logf("%s", output) + outputCh <- output + }() + time.Sleep(time.Second * 10) + cmdStopWithRunTemplate(runFilePath) + var output string + select { + case output = <-outputCh: + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for run command to finish") + } + // Deterministic output for template file, so we can assert line by line lines := strings.Split(output, "\n") assert.GreaterOrEqual(t, len(lines), 5, "expected at least 5 lines in output of starting two apps with last app having an empty command") @@ -292,18 +362,31 @@ func TestRunWithTemplateFile(t *testing.T) { }) t.Run("valid template file with app/daprd log destinations", func(t *testing.T) { + cmdUninstall() + ensureDaprInstallation(t) + runFilePath := "../testdata/run-template-files/app_output_to_file_and_console.yaml" t.Cleanup(func() { // assumption in the test is that there is only one set of app and daprd logs in the logs directory. cleanUpLogs() - waitAppsToBeStopped() }) args := []string{ "-f", runFilePath, } - output, err := cmdRunWithContext(t.Context(), "", args...) - t.Logf("%s", output) - require.NoError(t, err, "run failed") + outputCh := make(chan string) + go func() { + output, _ := cmdRun("", args...) + t.Logf("%s", output) + outputCh <- output + }() + time.Sleep(time.Second * 10) + cmdStopWithRunTemplate(runFilePath) + var output string + select { + case output = <-outputCh: + case <-time.After(time.Second * 10): + t.Fatal("timed out waiting for run command to finish") + } // App logs for processor app should not be printed to console and only written to file. assert.NotContains(t, output, "== APP - processor") @@ -358,7 +441,7 @@ func TestRunTemplateFileWithoutDaprInit(t *testing.T) { args := []string{ "-f", "../testdata/run-template-files/no_app_command.yaml", } - output, err := cmdRunWithContext(t.Context(), "", args...) + output, err := cmdRun("", args...) t.Logf("%s", output) require.Error(t, err, "run must fail") assert.Contains(t, output, "Error starting Dapr and app (\"processor\"): fork/exec") @@ -408,7 +491,3 @@ func lookUpFileFullName(dirPath, partialFilename string) (string, error) { } return "", fmt.Errorf("failed to find file with partial name %s in directory %s", partialFilename, dirPath) } - -func waitAppsToBeStopped() { - time.Sleep(15 * time.Second) -} diff --git a/tests/e2e/standalone/scheduler_test.go b/tests/e2e/standalone/scheduler_test.go index b95d6c086..eb311bc87 100644 --- a/tests/e2e/standalone/scheduler_test.go +++ b/tests/e2e/standalone/scheduler_test.go @@ -38,17 +38,14 @@ func TestSchedulerList(t *testing.T) { cmdUninstall() ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) runFilePath := "../testdata/run-template-files/test-scheduler.yaml" t.Cleanup(func() { - cmdStopWithAppID("test-scheduler") - waitAppsToBeStopped() + cmdStopWithRunTemplate(runFilePath) + must(t, cmdUninstall, "failed to uninstall Dapr") }) - args := []string{"-f", runFilePath} + args := []string{"-f", runFilePath} go func() { o, err := cmdRun("", args...) t.Log(o) @@ -193,15 +190,13 @@ func TestSchedulerGet(t *testing.T) { cmdUninstall() ensureDaprInstallation(t) - t.Cleanup(func() { - must(t, cmdUninstall, "failed to uninstall Dapr") - }) runFilePath := "../testdata/run-template-files/test-scheduler.yaml" t.Cleanup(func() { - cmdStopWithAppID("test-scheduler") - waitAppsToBeStopped() + cmdStopWithRunTemplate(runFilePath) + must(t, cmdUninstall, "failed to uninstall Dapr") }) + args := []string{"-f", runFilePath} go func() { @@ -337,15 +332,21 @@ func TestSchedulerDelete(t *testing.T) { runFilePath := "../testdata/run-template-files/test-scheduler.yaml" t.Cleanup(func() { - cmdStopWithAppID("test-scheduler") - waitAppsToBeStopped() + cmdStopWithRunTemplate(runFilePath) + must(t, cmdUninstall, "failed to uninstall Dapr") }) args := []string{"-f", runFilePath} go func() { - o, err := cmdRun("", args...) - t.Log(o) - t.Log(err) + for range 10 { + o, err := cmdRun("", args...) + t.Log(o) + t.Log(err) + if err == nil { + break + } + time.Sleep(time.Second * 2) + } }() require.EventuallyWithT(t, func(c *assert.CollectT) { @@ -409,15 +410,21 @@ func TestSchedulerDeleteAllAll(t *testing.T) { runFilePath := "../testdata/run-template-files/test-scheduler.yaml" t.Cleanup(func() { - cmdStopWithAppID("test-scheduler") - waitAppsToBeStopped() + cmdStopWithRunTemplate(runFilePath) + must(t, cmdUninstall, "failed to uninstall Dapr") }) args := []string{"-f", runFilePath} go func() { - o, err := cmdRun("", args...) - t.Log(o) - t.Log(err) + for range 10 { + o, err := cmdRun("", args...) + t.Log(o) + t.Log(err) + if err == nil { + break + } + time.Sleep(time.Second * 2) + } }() require.EventuallyWithT(t, func(c *assert.CollectT) { @@ -447,21 +454,27 @@ func TestSchedulerDeleteAll(t *testing.T) { runFilePath := "../testdata/run-template-files/test-scheduler.yaml" t.Cleanup(func() { - cmdStopWithAppID("test-scheduler") - waitAppsToBeStopped() + cmdStopWithRunTemplate(runFilePath) + must(t, cmdUninstall, "failed to uninstall Dapr") }) args := []string{"-f", runFilePath} go func() { - o, err := cmdRun("", args...) - t.Log(o) - t.Log(err) + for range 10 { + o, err := cmdRun("", args...) + t.Log(o) + t.Log(err) + if err == nil { + break + } + time.Sleep(time.Second * 2) + } }() require.EventuallyWithT(t, func(c *assert.CollectT) { output, err := cmdSchedulerList() require.NoError(t, err) - assert.Len(c, strings.Split(output, "\n"), 10) + assert.GreaterOrEqual(c, len(strings.Split(output, "\n")), 7) }, time.Second*30, time.Millisecond*10) _, err := cmdSchedulerDeleteAll("app/test-scheduler") @@ -508,15 +521,21 @@ func TestSchedulerExportImport(t *testing.T) { runFilePath := "../testdata/run-template-files/test-scheduler.yaml" t.Cleanup(func() { - cmdStopWithAppID("test-scheduler") - waitAppsToBeStopped() + cmdStopWithRunTemplate(runFilePath) + must(t, cmdUninstall, "failed to uninstall Dapr") }) args := []string{"-f", runFilePath} go func() { - o, err := cmdRun("", args...) - t.Log(o) - t.Log(err) + for range 10 { + o, err := cmdRun("", args...) + t.Log(o) + t.Log(err) + if err == nil { + break + } + time.Sleep(time.Second * 2) + } }() require.EventuallyWithT(t, func(c *assert.CollectT) { @@ -537,7 +556,10 @@ func TestSchedulerExportImport(t *testing.T) { _, err = cmdSchedulerImport("-f", f) require.NoError(t, err) - output, err = cmdSchedulerList() - require.NoError(t, err) - assert.Len(t, strings.Split(output, "\n"), 10) + + require.EventuallyWithT(t, func(c *assert.CollectT) { + output, err := cmdSchedulerList() + require.NoError(t, err) + assert.GreaterOrEqual(c, len(strings.Split(output, "\n")), 9) + }, time.Second*30, time.Millisecond*10) } diff --git a/tests/e2e/standalone/workflow_test.go b/tests/e2e/standalone/workflow_test.go index 471a18a60..a4698e7d1 100644 --- a/tests/e2e/standalone/workflow_test.go +++ b/tests/e2e/standalone/workflow_test.go @@ -48,7 +48,6 @@ func TestWorkflowList(t *testing.T) { appID := "test-workflow" t.Cleanup(func() { cmdStopWithAppID(appID) - waitAppsToBeStopped() }) args := []string{"-f", runFilePath} @@ -106,7 +105,6 @@ func TestWorkflowRaiseEvent(t *testing.T) { appID := "test-workflow" t.Cleanup(func() { cmdStopWithAppID(appID) - waitAppsToBeStopped() }) args := []string{"-f", runFilePath} @@ -188,7 +186,6 @@ func TestWorkflowReRun(t *testing.T) { appID := "test-workflow" t.Cleanup(func() { cmdStopWithAppID(appID) - waitAppsToBeStopped() }) args := []string{"-f", runFilePath} @@ -245,7 +242,6 @@ func TestWorkflowPurge(t *testing.T) { appID := "test-workflow" t.Cleanup(func() { cmdStopWithAppID(appID) - waitAppsToBeStopped() }) args := []string{"-f", runFilePath} @@ -359,7 +355,6 @@ func TestWorkflowFilters(t *testing.T) { appID := "test-workflow" t.Cleanup(func() { cmdStopWithAppID(appID) - waitAppsToBeStopped() }) args := []string{"-f", runFilePath} @@ -426,7 +421,6 @@ func TestWorkflowChildCalls(t *testing.T) { appID := "test-workflow" t.Cleanup(func() { cmdStopWithAppID(appID) - waitAppsToBeStopped() }) args := []string{"-f", runFilePath} @@ -546,7 +540,6 @@ func TestWorkflowHistory(t *testing.T) { appID := "test-workflow" t.Cleanup(func() { cmdStopWithAppID(appID) - waitAppsToBeStopped() }) args := []string{"-f", runFilePath} @@ -601,7 +594,6 @@ func TestWorkflowSuspendResume(t *testing.T) { appID := "test-workflow" t.Cleanup(func() { cmdStopWithAppID(appID) - waitAppsToBeStopped() }) args := []string{"-f", runFilePath} @@ -680,7 +672,6 @@ func TestWorkflowTerminate(t *testing.T) { appID := "test-workflow" t.Cleanup(func() { cmdStopWithAppID(appID) - waitAppsToBeStopped() }) args := []string{"-f", runFilePath} diff --git a/tests/e2e/upgrade/upgrade_test.go b/tests/e2e/upgrade/upgrade_test.go index 732da491c..b003c12e7 100644 --- a/tests/e2e/upgrade/upgrade_test.go +++ b/tests/e2e/upgrade/upgrade_test.go @@ -30,9 +30,9 @@ type upgradePath struct { } const ( - latestRuntimeVersion = "1.16.1" - latestRuntimeVersionMinusOne = "1.15.11" - latestRuntimeVersionMinusTwo = "1.14.5" + latestRuntimeVersion = "1.17.0-rc.1" + latestRuntimeVersionMinusOne = "1.16.6" + latestRuntimeVersionMinusTwo = "1.15.11" dashboardVersion = "0.15.0" ) From 0ad7eaba6c94866d40e623321f9b68ca5657f020 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 14 Jan 2026 11:12:56 +0000 Subject: [PATCH 2/6] Cleanup workflow history output Signed-off-by: joshvanl --- pkg/workflow/history.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/workflow/history.go b/pkg/workflow/history.go index 348650e90..ee9c4cea5 100644 --- a/pkg/workflow/history.go +++ b/pkg/workflow/history.go @@ -346,25 +346,33 @@ func deriveStatus(h *protos.HistoryEvent) string { func deriveDetails(first *protos.HistoryEvent, h *protos.HistoryEvent) *string { switch t := h.GetEventType().(type) { case *protos.HistoryEvent_TaskScheduled: - ver := "" - if t.TaskScheduled.Version != nil && t.TaskScheduled.Version.Value != "" { - ver = " v" + t.TaskScheduled.Version.Value + if in := t.TaskScheduled.RerunParentInstanceInfo; in != nil { + return ptr.Of(fmt.Sprintf("rerun-parent=%s", in.InstanceID)) } - return ptr.Of(fmt.Sprintf("activity=%s%s", t.TaskScheduled.Name, ver)) + return nil case *protos.HistoryEvent_TimerCreated: - return ptr.Of(fmt.Sprintf("fireAt=%s", t.TimerCreated.FireAt.AsTime().Format(time.RFC3339))) + det := fmt.Sprintf("fireAt=%s", t.TimerCreated.FireAt.AsTime().Format(time.RFC3339)) + if in := t.TimerCreated.RerunParentInstanceInfo; in != nil { + det += fmt.Sprintf(",rerun-parent=%s", in.InstanceID) + } + return ptr.Of(det) case *protos.HistoryEvent_EventRaised: return ptr.Of(fmt.Sprintf("event=%s", t.EventRaised.Name)) case *protos.HistoryEvent_EventSent: - return ptr.Of(fmt.Sprintf("event=%s -> %s", t.EventSent.Name, t.EventSent.InstanceId)) + return ptr.Of(fmt.Sprintf("event=%s->%s", t.EventSent.Name, t.EventSent.InstanceId)) case *protos.HistoryEvent_ExecutionStarted: - return ptr.Of("orchestration start") + return ptr.Of("workflow_start") case *protos.HistoryEvent_OrchestratorStarted: - return ptr.Of("replay cycle start") + return ptr.Of("replay") case *protos.HistoryEvent_TaskCompleted: return ptr.Of(fmt.Sprintf("eventId=%d", t.TaskCompleted.TaskScheduledId)) case *protos.HistoryEvent_ExecutionCompleted: return ptr.Of(fmt.Sprintf("execDuration=%s", utils.HumanizeDuration(h.GetTimestamp().AsTime().Sub(first.GetTimestamp().AsTime())))) + case *protos.HistoryEvent_SubOrchestrationInstanceCreated: + if in := t.SubOrchestrationInstanceCreated.RerunParentInstanceInfo; in != nil { + return ptr.Of(fmt.Sprintf("rerun-parent=%s", in.InstanceID)) + } + return nil default: return nil } From f205947b7b254ed6dc80a2c3cf0f99abca227008 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 14 Jan 2026 12:05:45 +0000 Subject: [PATCH 3/6] Improve parent details Signed-off-by: joshvanl --- pkg/workflow/history.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/workflow/history.go b/pkg/workflow/history.go index ee9c4cea5..ec58d6abb 100644 --- a/pkg/workflow/history.go +++ b/pkg/workflow/history.go @@ -347,13 +347,13 @@ func deriveDetails(first *protos.HistoryEvent, h *protos.HistoryEvent) *string { switch t := h.GetEventType().(type) { case *protos.HistoryEvent_TaskScheduled: if in := t.TaskScheduled.RerunParentInstanceInfo; in != nil { - return ptr.Of(fmt.Sprintf("rerun-parent=%s", in.InstanceID)) + return ptr.Of(fmt.Sprintf("rerunParent=%s", in.InstanceID)) } return nil case *protos.HistoryEvent_TimerCreated: det := fmt.Sprintf("fireAt=%s", t.TimerCreated.FireAt.AsTime().Format(time.RFC3339)) if in := t.TimerCreated.RerunParentInstanceInfo; in != nil { - det += fmt.Sprintf(",rerun-parent=%s", in.InstanceID) + det += fmt.Sprintf(",rerunParent=%s", in.InstanceID) } return ptr.Of(det) case *protos.HistoryEvent_EventRaised: @@ -361,7 +361,11 @@ func deriveDetails(first *protos.HistoryEvent, h *protos.HistoryEvent) *string { case *protos.HistoryEvent_EventSent: return ptr.Of(fmt.Sprintf("event=%s->%s", t.EventSent.Name, t.EventSent.InstanceId)) case *protos.HistoryEvent_ExecutionStarted: - return ptr.Of("workflow_start") + d := ptr.Of("workflowStart") + if p := h.GetExecutionStarted().GetParentInstance(); p != nil { + *d += fmt.Sprintf(",parent=%s", p.GetOrchestrationInstance().GetInstanceId()) + } + return d case *protos.HistoryEvent_OrchestratorStarted: return ptr.Of("replay") case *protos.HistoryEvent_TaskCompleted: @@ -370,7 +374,7 @@ func deriveDetails(first *protos.HistoryEvent, h *protos.HistoryEvent) *string { return ptr.Of(fmt.Sprintf("execDuration=%s", utils.HumanizeDuration(h.GetTimestamp().AsTime().Sub(first.GetTimestamp().AsTime())))) case *protos.HistoryEvent_SubOrchestrationInstanceCreated: if in := t.SubOrchestrationInstanceCreated.RerunParentInstanceInfo; in != nil { - return ptr.Of(fmt.Sprintf("rerun-parent=%s", in.InstanceID)) + return ptr.Of(fmt.Sprintf("rerunParent=%s", in.InstanceID)) } return nil default: From fa1168cd2348fb52e777d1cc6332f7cfc124b2bc Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 14 Jan 2026 12:38:32 +0000 Subject: [PATCH 4/6] result -> output Signed-off-by: joshvanl --- pkg/workflow/history.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/workflow/history.go b/pkg/workflow/history.go index ec58d6abb..294e7b335 100644 --- a/pkg/workflow/history.go +++ b/pkg/workflow/history.go @@ -181,7 +181,7 @@ func HistoryWide(ctx context.Context, opts HistoryOptions) ([]*HistoryOutputWide row.ExecutionID = ptr.Of(t.TaskCompleted.TaskExecutionId) } if t.TaskCompleted.Result != nil { - row.addAttr("result", trim(t.TaskCompleted.Result, 120)) + row.addAttr("output", trim(t.TaskCompleted.Result, 120)) } case *protos.HistoryEvent_TaskFailed: row.addAttr("scheduledId", fmt.Sprintf("%d", t.TaskFailed.TaskScheduledId)) From e5b82b3133e646a994c00bda44d7ae341bffeb9a Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 14 Jan 2026 13:11:24 +0000 Subject: [PATCH 5/6] Adds input/output to child workflow history Signed-off-by: joshvanl --- pkg/workflow/history.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/workflow/history.go b/pkg/workflow/history.go index 294e7b335..224d1aafa 100644 --- a/pkg/workflow/history.go +++ b/pkg/workflow/history.go @@ -183,6 +183,14 @@ func HistoryWide(ctx context.Context, opts HistoryOptions) ([]*HistoryOutputWide if t.TaskCompleted.Result != nil { row.addAttr("output", trim(t.TaskCompleted.Result, 120)) } + case *protos.HistoryEvent_SubOrchestrationInstanceCreated: + if t.SubOrchestrationInstanceCreated.Input != nil { + row.addAttr("input", trim(t.SubOrchestrationInstanceCreated.Input, 120)) + } + case *protos.HistoryEvent_SubOrchestrationInstanceCompleted: + if t.SubOrchestrationInstanceCompleted.Result != nil { + row.addAttr("output", trim(t.SubOrchestrationInstanceCompleted.Result, 120)) + } case *protos.HistoryEvent_TaskFailed: row.addAttr("scheduledId", fmt.Sprintf("%d", t.TaskFailed.TaskScheduledId)) if t.TaskFailed.TaskExecutionId != "" { From 2cbe1765968c86c4ed665b24106d94c3bc64429a Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 14 Jan 2026 17:10:31 +0000 Subject: [PATCH 6/6] Adds eventId to SubOrchestrationCompleted details ouput Signed-off-by: joshvanl --- pkg/workflow/history.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/workflow/history.go b/pkg/workflow/history.go index 224d1aafa..214a5d3a5 100644 --- a/pkg/workflow/history.go +++ b/pkg/workflow/history.go @@ -385,6 +385,8 @@ func deriveDetails(first *protos.HistoryEvent, h *protos.HistoryEvent) *string { return ptr.Of(fmt.Sprintf("rerunParent=%s", in.InstanceID)) } return nil + case *protos.HistoryEvent_SubOrchestrationInstanceCompleted: + return ptr.Of(fmt.Sprintf("eventId=%d", t.SubOrchestrationInstanceCompleted.GetTaskScheduledId())) default: return nil }