Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ In CI‑node mode, DDTest also fans out across local CPUs on that node and furth
| `--min-parallelism` | `DD_TEST_OPTIMIZATION_RUNNER_MIN_PARALLELISM` | vCPU count | Minimum workers to use for the split. |
| `--max-parallelism` | `DD_TEST_OPTIMIZATION_RUNNER_MAX_PARALLELISM` | vCPU count | Maximum workers to use for the split. |
| | `DD_TEST_OPTIMIZATION_RUNNER_CI_NODE` | `-1` (off) | Restrict this run to the slice assigned to node **N** (0‑indexed). Also parallelizes within the node across its CPUs. |
| `--worker-env` | `DD_TEST_OPTIMIZATION_RUNNER_WORKER_ENV` | `""` | Template env vars per local worker (e.g., isolate DBs): `--worker-env "DATABASE_NAME_TEST=app_test{{nodeIndex}}"`. |
| `--ci-node-workers` | `DD_TEST_OPTIMIZATION_RUNNER_CI_NODE_WORKERS` | vCPU count | Number of parallel workers per CI node. Tests assigned to a CI node are further split among this many local workers. |
| `--worker-env` | `DD_TEST_OPTIMIZATION_RUNNER_WORKER_ENV` | `""` | Template env vars per worker: `--worker-env "DATABASE_NAME_TEST=app_test{{nodeIndex}}"`. In CI-node mode, `{{nodeIndex}}` is `ciNode * 10000 + localWorkerIndex`, ensuring uniqueness across heterogeneous CI pools. |
| `--command` | `DD_TEST_OPTIMIZATION_RUNNER_COMMAND` | `""` | Override the default test command used by the framework. When provided, takes precedence over auto-detection (e.g., `--command "bundle exec custom-rspec"`). |
| `--tests-location` | `DD_TEST_OPTIMIZATION_RUNNER_TESTS_LOCATION` | `""` | Custom glob pattern to discover test files (e.g., `--tests-location "custom/spec/**/*_spec.rb"`). Defaults to `spec/**/*_spec.rb` for RSpec, `test/**/*_test.rb` for Minitest. |
| `--runtime-tags` | `DD_TEST_OPTIMIZATION_RUNNER_RUNTIME_TAGS` | `""` | JSON string to override runtime tags used to fetch skippable tests. Useful for local development on a different OS than CI (e.g., `--runtime-tags '{"os.platform":"linux","runtime.version":"3.2.0"}'`). |
Expand Down Expand Up @@ -479,7 +480,6 @@ The `--runtime-tags` option lets you override your local runtime tags to match y
![Runtime tags in Datadog](docs/images/runtime-tags-datadog.png)

Note the following tags:

- `os.architecture` (e.g., `x86_64`)
- `os.platform` (e.g., `linux`)
- `os.version` (e.g., `6.8.0-aws`)
Expand Down
5 changes: 5 additions & 0 deletions internal/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func init() {
rootCmd.PersistentFlags().Int("min-parallelism", defaultParallelism, "Minimum number of parallel test processes (default: number of CPUs)")
rootCmd.PersistentFlags().Int("max-parallelism", defaultParallelism, "Maximum number of parallel test processes (default: number of CPUs)")
rootCmd.PersistentFlags().String("worker-env", "", "Worker environment configuration")
rootCmd.PersistentFlags().Int("ci-node-workers", defaultParallelism, "Number of parallel workers per CI node (default: number of CPUs)")
rootCmd.PersistentFlags().String("command", "", "Test command that ddtest should wrap")
rootCmd.PersistentFlags().String("tests-location", "", "Glob pattern used to discover test files")
rootCmd.PersistentFlags().String("runtime-tags", "", "JSON string to override runtime tags (e.g. '{\"os.platform\":\"linux\",\"runtime.version\":\"3.2.0\"}')")
Expand All @@ -83,6 +84,10 @@ func init() {
fmt.Fprintf(os.Stderr, "Error binding worker-env flag: %v\n", err)
os.Exit(1)
}
if err := viper.BindPFlag("ci_node_workers", rootCmd.PersistentFlags().Lookup("ci-node-workers")); err != nil {
fmt.Fprintf(os.Stderr, "Error binding ci-node-workers flag: %v\n", err)
os.Exit(1)
}
if err := viper.BindPFlag("command", rootCmd.PersistentFlags().Lookup("command")); err != nil {
fmt.Fprintf(os.Stderr, "Error binding command flag: %v\n", err)
os.Exit(1)
Expand Down
30 changes: 10 additions & 20 deletions internal/platform/ruby.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"log/slog"
"maps"
"os"
"regexp"
"strings"

"github.com/DataDog/ddtest/internal/constants"
Expand Down Expand Up @@ -139,34 +140,23 @@ func (r *Ruby) SanityCheck() error {
return nil
}

// bundlerInfoRegex matches bundler info output format: " * gem-name (version [hash])"
// Captures: 1=gem-name, 2=version
var bundlerInfoRegex = regexp.MustCompile(`^\s*\*\s+(\S+)\s+\((\d+\.\d+\.\d+)`)

func parseBundlerInfoVersion(output, gemName string) (version.Version, error) {
for line := range strings.SplitSeq(output, "\n") {
trimmed := strings.TrimSpace(line)
if trimmed == "" {
matches := bundlerInfoRegex.FindStringSubmatch(line)
if matches == nil {
continue
}

if !strings.Contains(trimmed, gemName) {
matchedGem := matches[1]
if matchedGem != gemName {
continue
}

start := strings.Index(trimmed, "(")
end := strings.Index(trimmed, ")")
if start == -1 || end == -1 || end <= start+1 {
continue
}

versionToken := strings.TrimSpace(trimmed[start+1 : end])
if versionToken == "" {
continue
}

fields := strings.Fields(versionToken)
versionString := fields[0]
if !version.IsValid(versionString) {
return version.Version{}, fmt.Errorf("unexpected version format in bundle info output: %q", versionToken)
}

versionString := matches[2]
parsed, err := version.Parse(versionString)
if err != nil {
return version.Version{}, fmt.Errorf("failed to parse version from bundle info output: %w", err)
Expand Down
20 changes: 20 additions & 0 deletions internal/platform/ruby_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,26 @@ func TestRuby_SanityCheck_FailsWhenVersionNotFound(t *testing.T) {
}
}

func TestRuby_SanityCheck_SucceedsWithDebugLogs(t *testing.T) {
// Debug logs from datadog tracing contain paths with gem name and parentheses
// that could confuse the parser if not handled correctly
output := `D, [2026-02-02T16:50:26.016240 #9457] DEBUG -- datadog: [datadog] (/path/to/datadog-ci-rb/lib/datadog/ci/contrib/instrumentation.rb:27:in 'auto_instrument') Auto instrumenting all integrations...
* datadog-ci (1.27.0 e11ecfb)
Summary: Datadog Test Optimization for your ruby application
Homepage: https://github.com/DataDog/datadog-ci-rb
Path: /Users/user/.rbenv/versions/3.3.5/lib/ruby/gems/3.3.0/bundler/gems/datadog-ci-rb-e11ecfbf06ad
`
mockExecutor := &mockCommandExecutor{
combinedOutput: []byte(output),
}

ruby := NewRuby()
ruby.executor = mockExecutor
if err := ruby.SanityCheck(); err != nil {
t.Fatalf("SanityCheck() unexpected error: %v", err)
}
}

func TestRuby_DetectFramework_RSpec(t *testing.T) {
viper.Reset()
viper.Set("framework", "rspec")
Expand Down
84 changes: 82 additions & 2 deletions internal/runner/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,103 @@ import (

"github.com/DataDog/ddtest/internal/constants"
"github.com/DataDog/ddtest/internal/framework"
"github.com/DataDog/ddtest/internal/settings"
"golang.org/x/sync/errgroup"
)

// ciNodeIndexMultiplier is used to calculate global worker indices in CI-node mode.
// Each CI node gets a range of 10000 indices (node 0: 0-9999, node 1: 10000-19999, etc.)
// This ensures uniqueness even in heterogeneous CI pools with different CPU counts per node.
const ciNodeIndexMultiplier = 10000

// splitTestFilesIntoGroups splits a slice of test files into n groups
// using simple round-robin distribution
func splitTestFilesIntoGroups(testFiles []string, n int) [][]string {
if n <= 0 {
n = 1
}

result := make([][]string, n)
for i := range result {
result[i] = []string{}
}

for i, file := range testFiles {
groupIndex := i % n
result[groupIndex] = append(result[groupIndex], file)
}

return result
}

// runCINodeTests executes tests for a specific CI node (one split, not the whole tests set)
// It further splits the node's tests among local workers based on ci_node_workers setting.
func runCINodeTests(ctx context.Context, framework framework.Framework, workerEnvMap map[string]string, ciNode int) error {
return runCINodeTestsWithWorkers(ctx, framework, workerEnvMap, ciNode, settings.GetCiNodeWorkers())
}

// runCINodeTestsWithWorkers is the internal implementation that accepts ciNodeWorkers as a parameter
// for easier testing.
func runCINodeTestsWithWorkers(ctx context.Context, framework framework.Framework, workerEnvMap map[string]string, ciNode int, ciNodeWorkers int) error {
runnerFilePath := fmt.Sprintf("%s/runner-%d", constants.TestsSplitDir, ciNode)
if _, err := os.Stat(runnerFilePath); os.IsNotExist(err) {
return fmt.Errorf("runner file for ci-node %d does not exist: %s", ciNode, runnerFilePath)
}

slog.Info("Running tests for specific CI node", "ciNode", ciNode, "filePath", runnerFilePath)
if err := runTestsFromFile(ctx, framework, runnerFilePath, workerEnvMap, ciNode); err != nil {
testFiles, err := readTestFilesFromFile(runnerFilePath)
if err != nil {
return fmt.Errorf("failed to read test files for ci-node %d from %s: %w", ciNode, runnerFilePath, err)
}

if len(testFiles) == 0 {
slog.Info("No tests to run for CI node", "ciNode", ciNode)
return nil
}

// Single worker mode: run all tests with global index based on ciNode
if ciNodeWorkers <= 1 {
globalIndex := ciNode * ciNodeIndexMultiplier
slog.Info("Running tests for CI node in single-worker mode", "ciNode", ciNode, "globalIndex", globalIndex)
return runTestsWithGlobalIndex(ctx, framework, testFiles, workerEnvMap, globalIndex)
}

// Multi-worker mode: split tests among local workers
slog.Info("Running tests for CI node in parallel mode",
"ciNode", ciNode, "ciNodeWorkers", ciNodeWorkers, "testFilesCount", len(testFiles))

groups := splitTestFilesIntoGroups(testFiles, ciNodeWorkers)

var g errgroup.Group
for localIndex, groupFiles := range groups {
if len(groupFiles) == 0 {
continue
}

// Global index = ciNode * 10000 + localIndex (ensures uniqueness across heterogeneous CI pools)
globalIndex := ciNode*ciNodeIndexMultiplier + localIndex
g.Go(func() error {
return runTestsWithGlobalIndex(ctx, framework, groupFiles, workerEnvMap, globalIndex)
})
}

if err := g.Wait(); err != nil {
return fmt.Errorf("failed to run tests for ci-node %d: %w", ciNode, err)
}
return nil
}

// runTestsWithGlobalIndex runs a set of test files with the given global worker index for env templating
func runTestsWithGlobalIndex(ctx context.Context, framework framework.Framework, testFiles []string, workerEnvMap map[string]string, globalIndex int) error {
// Create a copy of the worker env map and replace nodeIndex placeholder with global index
workerEnv := make(map[string]string)
for key, value := range workerEnvMap {
workerEnv[key] = strings.ReplaceAll(value, constants.NodeIndexPlaceholder, fmt.Sprintf("%d", globalIndex))
}

slog.Info("Running tests in worker", "globalIndex", globalIndex, "testFilesCount", len(testFiles), "workerEnv", workerEnv)
return framework.RunTests(ctx, testFiles, workerEnv)
}

// runParallelTests executes tests across multiple parallel runners on a single node
func runParallelTests(ctx context.Context, framework framework.Framework, workerEnvMap map[string]string) error {
slog.Info("Running tests in parallel mode")
Expand Down
Loading
Loading