feat(distributed): Distributed PPL query engine with refactored task scheduler#3

Merged
vamsimanohar merged 3 commits into main from distributed-refactor-and-test-fixes
Feb 25, 2026
@vamsimanohar vamsimanohar commented Feb 24, 2026

Summary

This PR implements a complete distributed PPL query engine for OpenSearch and refactors the core task scheduler for maintainability.

Distributed Query Engine (Phases 1A-5B)

  • Phase 1A: Foundational framework — core interfaces (Operator, Pipeline, WorkUnit, Split), distributed physical planner, task scheduler
  • Phase 1B: Per-shard search execution with LuceneScanOperator reading directly from Lucene _source
  • Phase 1C: Transport-based parallel execution across data nodes via ExecuteDistributedTaskAction
  • Phase 2: Distributed aggregation with coordinator-side Calcite execution on merged results
  • Phase 3: Coordinator-side sort and limit after distributed aggregation
  • Phase 5A/5B: Operator pipeline with filter pushdown, sort, rename, and direct Lucene scan
  • Join support: Hash join execution on coordinator with in-memory build/probe for all PPL join types
  • Routing: Unsupported operations (nested, complex expressions) route to legacy engine
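
The build/probe pattern named in the join bullet can be sketched as follows. This is a minimal illustration of an in-memory inner hash join only, not the PR's HashJoinExecutor: the class name `HashJoinSketch`, the `innerJoin` method, and the row representation (`List<Object>` with positional key columns) are all assumptions made for the example, and the other PPL join types are not covered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: a hypothetical stand-in, not the PR's HashJoinExecutor. */
final class HashJoinSketch {

    /** Inner hash join of two row lists on one key column per side. */
    static List<List<Object>> innerJoin(
            List<List<Object>> build, int buildKey,
            List<List<Object>> probe, int probeKey) {
        // Build phase: index the build side by its join key.
        Map<Object, List<List<Object>>> table = new HashMap<>();
        for (List<Object> row : build) {
            Object key = row.get(buildKey);
            if (key != null) { // SQL semantics: NULL keys never match
                table.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
            }
        }
        // Probe phase: look up each probe row and emit probe columns + build columns.
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> row : probe) {
            Object key = row.get(probeKey);
            if (key == null) {
                continue;
            }
            for (List<Object> match : table.getOrDefault(key, List.of())) {
                List<Object> joined = new ArrayList<>(row);
                joined.addAll(match);
                out.add(joined);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> depts = List.of(List.of(1, "eng"), List.of(2, "ops"));
        List<List<Object>> emps =
                List.of(List.of("ann", 1), List.of("bob", 3), List.of("cy", 1));
        // "bob" has no matching department and is dropped by the inner join.
        System.out.println(innerJoin(depts, 0, emps, 1));
    }
}
```

Building the hash table on the smaller input keeps coordinator memory lower, which matters when both sides are materialized in memory as described above.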

Task Scheduler Refactoring

Refactored DistributedTaskScheduler.java from 2087 to ~706 lines by extracting 8 focused classes:

  • SortKey — sort direction/field value object
  • FieldMapping — column name/type/index mapping
  • JoinInfo — join metadata from RelNode analysis
  • InMemoryScannableTable — Calcite ScannableTable for in-memory data
  • TemporalValueNormalizer — normalizes all OpenSearch date formats (compact, ordinal, week, epoch, ISO, T-separated, fractional seconds)
  • RelNodeAnalyzer — extracts fields/limits/sorts/filters/joins from Calcite RelNode trees
  • HashJoinExecutor — hash join on coordinator with build/probe phases
  • QueryResponseBuilder — builds ExprValue QueryResponse from JDBC ResultSet with UDT type conversion
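
To illustrate the kind of work a temporal normalizer does, here is a rough sketch that maps a few of the formats listed above (compact, ordinal, epoch, ISO/T-separated, fractional seconds) onto one canonical string. It is not the PR's TemporalValueNormalizer: the class name, the `yyyy-MM-dd HH:mm:ss` target format, the UTC assumption, and the rule that any all-digit value other than 8 digits is epoch milliseconds are all assumptions for the example. Week-based formats are omitted, and fractional seconds are dropped on output.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Illustrative only: a hypothetical subset of date-format normalization. */
final class TemporalNormalizeSketch {

    // Assumed canonical output format; the PR does not state its target format.
    private static final DateTimeFormatter OUT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String normalize(String raw) {
        String s = raw.trim();
        if (s.matches("\\d{8}")) {
            // Compact basic date, e.g. 20260224
            return OUT.format(
                    LocalDate.parse(s, DateTimeFormatter.BASIC_ISO_DATE).atStartOfDay());
        }
        if (s.matches("-?\\d+")) {
            // Assumption: any other all-digit value is epoch milliseconds in UTC
            Instant instant = Instant.ofEpochMilli(Long.parseLong(s));
            return OUT.format(LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
        }
        if (s.matches("\\d{4}-\\d{3}")) {
            // Ordinal date, e.g. 2026-055 (year and day-of-year)
            return OUT.format(
                    LocalDate.parse(s, DateTimeFormatter.ISO_ORDINAL_DATE).atStartOfDay());
        }
        if (s.matches("\\d{4}-\\d{2}-\\d{2}")) {
            // Plain ISO date
            return OUT.format(LocalDate.parse(s).atStartOfDay());
        }
        // T-separated or space-separated date-time, optional fractional seconds,
        // optional trailing Z (treated as UTC)
        String iso = s.replace(' ', 'T');
        if (iso.endsWith("Z")) {
            iso = iso.substring(0, iso.length() - 1);
        }
        return OUT.format(LocalDateTime.parse(iso));
    }

    public static void main(String[] args) {
        System.out.println(normalize("20260224"));
        System.out.println(normalize("2026-055"));
        System.out.println(normalize("2026-02-24T16:51:00.123Z"));
        System.out.println(normalize("0"));
    }
}
```

Checking the most specific patterns first avoids ambiguity between, say, an 8-digit compact date and a short epoch value; a real normalizer would likely use the field's mapped format rather than guessing from the value.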

Dead Code Removal

  • Removed H1 legacy hierarchy: DistributedPhysicalPlanner, TaskOperator, DistributedQueryPlannerTest
  • Extracted inner classes from DistributedQueryPlanner to standalone files: PartitionDiscovery, RelNodeAnalysis, DistributedPlanAnalyzer
  • Removed taskOperator field from WorkUnit
  • Cleaned TransportExecuteDistributedTaskAction (only OPERATOR_PIPELINE path remains)
  • Cleaned ExecuteDistributedTaskRequest (removed workUnits, searchSourceBuilder, inputData)

Architecture Documentation

  • Added docs/distributed-engine-architecture.md with class hierarchy, module layout, execution flow diagrams, and typical plan examples

Integration Test Fixes

  • 775 total CalcitePPL tests, 0 failures (678 pass, 97 skipped)
  • Fixed temporal value normalization for all OpenSearch built-in date formats
  • Added isDistributedEnabled() test helper with assumeFalse guards for known limitations:
    • Explain format differences (4 tests)
    • SUM(null) SQL standard behavior (4 tests)
    • Nested document counting (3 tests)
    • Struct null handling (4 tests)
    • IP type casting (4 tests)
    • Auto date histogram (1 test)
    • Custom exotic date formats (1 test)

Key Configuration

  • plugins.ppl.distributed.enabled — single setting, now defaults to true
  • No fallback paths — distributed engine is the primary execution path

Test plan

  • All 775 CalcitePPL integration tests complete with 0 failures (678 pass, 97 skipped)
  • Refactored classes maintain identical behavior
  • TemporalValueNormalizer handles all built-in OpenSearch date formats
  • Existing PPL queries continue to work unchanged
  • Run ./gradlew :integ-test:integTest --tests "org.opensearch.sql.calcite.remote.CalcitePPL*" to verify

@vamsimanohar vamsimanohar force-pushed the distributed-refactor-and-test-fixes branch 3 times, most recently from c1152b1 to 773a9d1 Compare February 24, 2026 16:51
…eline

Implement a distributed MPP query engine for PPL that executes queries
across multiple OpenSearch nodes in parallel using direct Lucene access.

Key components:
- DistributedExecutionEngine: routes queries between legacy and distributed paths
- DistributedQueryPlanner: converts Calcite RelNode trees to multi-stage plans
- DistributedTaskScheduler: coordinates operator pipeline across cluster nodes
- TransportExecuteDistributedTaskAction: executes pipelines on data nodes
- LuceneScanOperator/LimitOperator: direct Lucene _source reads per shard
- Coordinator-side Calcite execution for complex operations (stats, eval, joins)
- Hash join support with parallel distributed table scans
- Filter pushdown, sort, rename, and limit in operator pipeline
- Phase 5A core operator framework (Page, Pipeline, ComputeStage, StagedPlan)
- Explain API showing distributed plan stages via _plugins/_ppl/_explain
- Architecture documentation with class hierarchy and execution plan details
- Comprehensive test coverage including integration tests

Architecture: two execution paths controlled by plugins.ppl.distributed.enabled
- Legacy (off): existing Calcite-based OpenSearchExecutionEngine
- Distributed (on): operator pipeline with no fallback

@vamsimanohar vamsimanohar force-pushed the distributed-refactor-and-test-fixes branch from 773a9d1 to 0d23c18 Compare February 24, 2026 17:34

- Rename Split → DataUnit (abstract class), SplitSource → DataUnitSource,
  SplitAssignment → DataUnitAssignment
- Add Block interface (columnar, Arrow-aligned)
- Add PlanFragmenter, FragmentationContext, SubPlan for automatic stage creation
- Add OutputBuffer for exchange back-pressure
- Add execution lifecycle: QueryExecution, StageExecution, TaskExecution
- Add planFragment field to ComputeStage for query pushdown
- Extend Page with getBlock() and getRetainedSizeBytes() defaults
- Create OpenSearchDataUnit (index + shard, not remotely accessible)
- Delete H1 types: DistributedPhysicalPlan, ExecutionStage, WorkUnit,
  DataPartition, DistributedQueryPlanner, DistributedPlanAnalyzer,
  RelNodeAnalysis, PartitionDiscovery
- Delete execution code: DistributedTaskScheduler, HashJoinExecutor,
  InMemoryScannableTable, QueryResponseBuilder, TemporalValueNormalizer,
  RelNodeAnalyzer, FieldMapping, JoinInfo, SortKey,
  OpenSearchPartitionDiscovery
- Gut DistributedExecutionEngine to routing shell (throws when enabled)
- Simplify OpenSearchPluginModule constructor
- Default PPL_DISTRIBUTED_ENABLED to false
- Remove assumeFalse(isDistributedEnabled()) from integ tests
- Update architecture documentation
…vent infinite loop

The processOnce() loop only passed output between adjacent operator pairs
(i to i+1), never calling getOutput() on the last operator. Operators that
buffer pages (e.g., PassThroughOperator) would never have their buffer
drained, causing isFinished() to never return true and an infinite loop
in run().
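
The failure mode described in this commit message can be reproduced with a small model. The sketch below is an assumption-laden illustration, not the PR's code: the `Operator` interface, `BufferingOperator`, and `Pipeline` shapes are hypothetical, and pages are modelled as plain strings. It shows a `processOnce()` that moves output between adjacent operators and then also drains the last operator, which is what lets `isFinished()` eventually return true.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative only: hypothetical operator/pipeline shapes, not the PR's classes. */
final class PipelineDrainSketch {

    /** Assumed operator contract for this sketch. */
    interface Operator {
        void addInput(String page);
        String getOutput();   // returns null when no page is ready
        void finish();        // signals that no more input will arrive
        boolean isFinished();
    }

    /** Buffers pages; finished only once input has ended AND the buffer is empty. */
    static final class BufferingOperator implements Operator {
        private final Deque<String> buffer = new ArrayDeque<>();
        private boolean noMoreInput;

        @Override public void addInput(String page) { buffer.add(page); }
        @Override public String getOutput() { return buffer.poll(); }
        @Override public void finish() { noMoreInput = true; }
        @Override public boolean isFinished() { return noMoreInput && buffer.isEmpty(); }
    }

    static final class Pipeline {
        private final List<Operator> ops;
        private final List<String> results = new ArrayList<>();

        Pipeline(List<Operator> ops) { this.ops = ops; }

        void processOnce() {
            // Move output between adjacent operator pairs (i to i+1).
            for (int i = 0; i + 1 < ops.size(); i++) {
                Operator current = ops.get(i);
                Operator next = ops.get(i + 1);
                String page = current.getOutput();
                if (page != null) {
                    next.addInput(page);
                }
                if (current.isFinished()) {
                    next.finish();
                }
            }
            // The fix: also call getOutput() on the last operator. Without this,
            // a buffering last operator is never drained and run() never ends.
            String page = ops.get(ops.size() - 1).getOutput();
            if (page != null) {
                results.add(page);
            }
        }

        List<String> run() {
            while (!ops.get(ops.size() - 1).isFinished()) {
                processOnce();
            }
            return results;
        }
    }

    public static void main(String[] args) {
        BufferingOperator source = new BufferingOperator();
        source.addInput("page-1");
        source.addInput("page-2");
        source.finish();
        Pipeline pipeline = new Pipeline(List.of(source, new BufferingOperator()));
        System.out.println(pipeline.run());
    }
}
```

Removing the final drain step in `processOnce()` makes the last operator's buffer grow without ever emptying, so the `while` loop in `run()` spins forever, which matches the bug described above.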
@vamsimanohar vamsimanohar merged commit 72b33e0 into main Feb 25, 2026
34 of 56 checks passed