Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public enum Key {
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),
PPL_SUBSEARCH_MAXOUT("plugins.ppl.subsearch.maxout"),
PPL_JOIN_SUBSEARCH_MAXOUT("plugins.ppl.join.subsearch_maxout"),
PPL_DISTRIBUTED_ENABLED("plugins.ppl.distributed.enabled"),

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.dataunit;

import java.util.List;
import java.util.Map;

/**
* A unit of data assigned to a SourceOperator. Each DataUnit represents a portion of data to read —
* typically one OpenSearch shard. Includes preferred nodes for data locality and estimated size for
* load balancing.
*
* <p>Subclasses provide storage-specific details (e.g., {@code OpenSearchDataUnit} adds index name
* and shard ID).
*/
/**
 * One schedulable slice of input data for a SourceOperator — most commonly a single OpenSearch
 * shard. Carries locality hints (preferred nodes) plus row/byte estimates so the scheduler can
 * place work close to the data and keep nodes evenly loaded.
 *
 * <p>Concrete subclasses contribute engine-specific details (for example, {@code
 * OpenSearchDataUnit} adds the index name and shard ID).
 */
public abstract class DataUnit {

  /** Unique identifier of this data unit. */
  public abstract String getDataUnitId();

  /** Nodes holding a local copy of this data (primary plus replicas). */
  public abstract List<String> getPreferredNodes();

  /** Estimated row count, used by the scheduler for load balancing. */
  public abstract long getEstimatedRows();

  /** Estimated size in bytes, used by the scheduler for load balancing. */
  public abstract long getEstimatedSizeBytes();

  /** Storage-specific key/value properties of this data unit. */
  public abstract Map<String, String> getProperties();

  /**
   * Whether this unit may be read from any node ({@code true}) or must execute on one of its
   * preferred nodes ({@code false}). Defaults to {@code true}; OpenSearch shard units return
   * {@code false} because Lucene data is only readable locally.
   */
  public boolean isRemotelyAccessible() {
    return true;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.dataunit;

import java.util.List;
import java.util.Map;

/**
* Assigns data units to nodes, respecting data locality and load balance. Implementations decide
* which node should process each data unit based on preferred nodes, current load, and cluster
* topology.
*/
/**
 * Strategy that maps data units onto cluster nodes while honoring data locality and load balance.
 * Implementations weigh each unit's preferred nodes, current node load, and cluster topology when
 * deciding placement.
 *
 * <p>Marked {@link FunctionalInterface} (single abstract method) so simple strategies can be
 * expressed as lambdas, consistent with the other functional contracts in this package.
 */
@FunctionalInterface
public interface DataUnitAssignment {

  /**
   * Assigns data units to nodes.
   *
   * @param dataUnits the data units to assign
   * @param availableNodes the nodes available for execution
   * @return a mapping from node ID to the list of data units assigned to that node
   */
  Map<String, List<DataUnit>> assign(List<DataUnit> dataUnits, List<String> availableNodes);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.dataunit;

import java.util.List;

/**
* Generates {@link DataUnit}s for a source operator. Implementations discover available data units
* (e.g., shards) from cluster state and create them with preferred node information.
*/
/**
 * Produces {@link DataUnit}s for a source operator. Implementations discover the available units
 * of work (e.g., shards) from cluster state and emit them with preferred-node information for
 * locality-aware scheduling.
 */
public interface DataUnitSource extends AutoCloseable {

  /** Default cap on the number of data units returned by {@link #getNextBatch()}. */
  int DEFAULT_BATCH_SIZE = 1000;

  /**
   * Returns the next batch of data units, up to the specified maximum batch size. Returns an
   * empty list if no more data units are available.
   *
   * @param maxBatchSize maximum number of data units to return
   * @return list of data units, possibly empty
   */
  List<DataUnit> getNextBatch(int maxBatchSize);

  /**
   * Returns the next batch of data units, capped at {@link #DEFAULT_BATCH_SIZE}.
   *
   * @return list of data units, possibly empty
   */
  default List<DataUnit> getNextBatch() {
    return getNextBatch(DEFAULT_BATCH_SIZE);
  }

  /** Returns true if all data units have been generated. */
  boolean isFinished();

  /** Releases resources held by this source. Narrowed to throw no checked exception. */
  @Override
  void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.exchange;

import org.opensearch.sql.planner.distributed.operator.OperatorContext;
import org.opensearch.sql.planner.distributed.stage.PartitioningScheme;

/**
* Manages the lifecycle of exchanges between compute stages. Creates exchange sink and source
* operators for inter-stage data transfer.
*/
/**
 * Factory for the operators that move data between compute stages. One exchange connects a
 * producing stage's sink to a consuming stage's source and owns that transfer's lifecycle.
 */
public interface ExchangeManager {

  /**
   * Creates the sink side of an exchange, used by a stage to push its output downstream.
   *
   * @param context the operator context
   * @param targetStageId identifier of the downstream stage that consumes the data
   * @param partitioning scheme describing how the output is split across consumers
   * @return the exchange sink operator
   */
  ExchangeSinkOperator createSink(
      OperatorContext context, String targetStageId, PartitioningScheme partitioning);

  /**
   * Creates the source side of an exchange, used by a stage to pull data from upstream.
   *
   * @param context the operator context
   * @param sourceStageId identifier of the upstream stage that produces the data
   * @return the exchange source operator
   */
  ExchangeSourceOperator createSource(OperatorContext context, String sourceStageId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.exchange;

import org.opensearch.sql.planner.distributed.operator.SinkOperator;

/**
* A sink operator that sends pages to a downstream compute stage. Implementations handle the
* serialization and transport of data between stages (e.g., via OpenSearch transport, Arrow Flight,
* or in-memory buffers for local exchanges).
*/
/**
 * Sink end of an inter-stage exchange: pushes pages toward a downstream compute stage. Concrete
 * implementations choose the serialization and transport mechanism (OpenSearch transport, Arrow
 * Flight, or an in-memory buffer for node-local exchanges).
 */
public interface ExchangeSinkOperator extends SinkOperator {

  /** Identifier of the downstream stage that consumes this sink's output. */
  String getTargetStageId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.exchange;

import org.opensearch.sql.planner.distributed.operator.SourceOperator;

/**
* A source operator that receives pages from an upstream compute stage. Implementations handle
* deserialization and buffering of data received from upstream stages.
*/
/**
 * Source end of an inter-stage exchange: receives pages produced by an upstream compute stage.
 * Concrete implementations take care of deserializing and buffering the incoming data.
 */
public interface ExchangeSourceOperator extends SourceOperator {

  /** Identifier of the upstream stage that produces this source's input. */
  String getSourceStageId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.exchange;

import org.opensearch.sql.planner.distributed.page.Page;
import org.opensearch.sql.planner.distributed.stage.PartitioningScheme;

/**
* Buffers output pages from a stage before sending them to downstream consumers via the exchange
* layer. Provides back-pressure to prevent producers from overwhelming consumers.
*
* <p>Serialization format is an implementation detail. The default implementation uses OpenSearch
* transport ({@code StreamOutput}). A future implementation can use Arrow IPC ({@code
* ArrowRecordBatch}) for zero-copy columnar exchange.
*/
/**
 * Buffers output pages from a stage before sending them to downstream consumers via the exchange
 * layer. Provides back-pressure to prevent producers from overwhelming consumers.
 *
 * <p>Serialization format is an implementation detail. The default implementation uses OpenSearch
 * transport ({@code StreamOutput}). A future implementation can use Arrow IPC ({@code
 * ArrowRecordBatch}) for zero-copy columnar exchange.
 */
public interface OutputBuffer extends AutoCloseable {

  /**
   * Enqueues a page for delivery to downstream consumers.
   *
   * @param page the page to send
   */
  void enqueue(Page page);

  /** Signals that no more pages will be enqueued. */
  void setNoMorePages();

  /** Returns true if the buffer is full and the producer should wait (back-pressure). */
  boolean isFull();

  /** Returns the total size of buffered data in bytes. */
  long getBufferedBytes();

  /** Aborts the buffer, discarding any buffered pages. */
  void abort();

  /** Returns true if all pages have been consumed and no more will be produced. */
  boolean isFinished();

  /** Returns the partitioning scheme for this buffer's output. */
  PartitioningScheme getPartitioningScheme();

  /**
   * Releases the buffer's resources. Narrowed from {@link AutoCloseable#close()} to throw no
   * checked exception, consistent with the other {@code AutoCloseable} contracts in this package
   * (e.g., {@code DataUnitSource}).
   */
  @Override
  void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.execution;

import java.util.List;
import org.opensearch.sql.planner.distributed.stage.StagedPlan;

/**
* Represents the execution of a complete distributed query. Manages the lifecycle of all stage
* executions and provides query-level statistics.
*/
/**
 * Tracks a distributed query end to end: owns the lifecycle of every stage execution and exposes
 * query-level statistics.
 */
public interface QueryExecution {

  /** Lifecycle states of a query execution. */
  enum State {
    PLANNING,
    STARTING,
    RUNNING,
    FINISHING,
    FINISHED,
    FAILED
  }

  /** Unique identifier of this query. */
  String getQueryId();

  /** The staged plan being executed. */
  StagedPlan getPlan();

  /** Current lifecycle state of the query. */
  State getState();

  /** Executions of every stage belonging to this query. */
  List<StageExecution> getStageExecutions();

  /** Aggregated runtime statistics for this query. */
  QueryStats getStats();

  /** Cancels this query together with all of its stage executions. */
  void cancel();

  /** Runtime statistics of a query execution. */
  interface QueryStats {

    /** Total number of rows produced as query output. */
    long getTotalRows();

    /** Wall-clock execution time, in milliseconds. */
    long getElapsedTimeMillis();

    /** Time spent in the planning phase, in milliseconds. */
    long getPlanningTimeMillis();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.execution;

import java.util.List;
import java.util.Map;
import org.opensearch.sql.planner.distributed.dataunit.DataUnit;
import org.opensearch.sql.planner.distributed.stage.ComputeStage;

/**
* Manages the execution of a single compute stage across multiple nodes. Tracks task executions,
* handles data unit assignment, and monitors stage completion.
*/
/**
 * Drives one compute stage across the cluster: accepts data units as they are discovered, tracks
 * the per-node task executions, and reports progress until the stage completes.
 */
public interface StageExecution {

  /** Lifecycle states of a stage execution. */
  enum State {
    PLANNED,
    SCHEDULING,
    RUNNING,
    FINISHED,
    FAILED,
    CANCELLED
  }

  /** The compute stage being executed. */
  ComputeStage getStage();

  /** Current lifecycle state of the stage. */
  State getState();

  /**
   * Adds data units to be processed by this stage.
   *
   * @param dataUnits the data units to add
   */
  void addDataUnits(List<DataUnit> dataUnits);

  /** Signals that no more data units will be added to this stage. */
  void noMoreDataUnits();

  /**
   * Returns task executions grouped by node ID.
   *
   * @return map from node ID to the task executions running on that node
   */
  Map<String, List<TaskExecution>> getTaskExecutions();

  /** Aggregated runtime statistics for this stage. */
  StageStats getStats();

  /** Cancels every task belonging to this stage. */
  void cancel();

  /**
   * Registers a listener that is invoked whenever the stage changes state.
   *
   * @param listener the state change listener
   */
  void addStateChangeListener(StateChangeListener listener);

  /** Callback for stage state transitions. */
  @FunctionalInterface
  interface StateChangeListener {

    /**
     * Invoked after the stage transitions to a new state.
     *
     * @param newState the state just entered
     */
    void onStateChange(State newState);
  }

  /** Runtime statistics of a stage execution. */
  interface StageStats {

    /** Total rows processed across all tasks of this stage. */
    long getTotalRows();

    /** Total bytes processed across all tasks of this stage. */
    long getTotalBytes();

    /** Number of tasks that have completed. */
    int getCompletedTasks();

    /** Total number of tasks in this stage. */
    int getTotalTasks();
  }
}
Loading
Loading