Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public enum Key {
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),
PPL_SUBSEARCH_MAXOUT("plugins.ppl.subsearch.maxout"),
PPL_JOIN_SUBSEARCH_MAXOUT("plugins.ppl.join.subsearch_maxout"),
PPL_DISTRIBUTED_ENABLED("plugins.ppl.distributed.enabled"),

/** Enable Calcite as execution engine */
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.exchange;

import org.opensearch.sql.planner.distributed.operator.OperatorContext;
import org.opensearch.sql.planner.distributed.stage.PartitioningScheme;

/**
 * Manages the lifecycle of exchanges between compute stages. Creates exchange sink and source
 * operators for inter-stage data transfer.
 *
 * <p>NOTE(review): a sink created with {@code targetStageId = S} is presumably paired with the
 * source(s) created with {@code sourceStageId = S} on the downstream side — confirm the matching
 * contract with implementations. Transport and buffering are implementation details.
 */
public interface ExchangeManager {

  /**
   * Creates an exchange sink operator for sending data from one stage to another.
   *
   * @param context the operator context
   * @param targetStageId the downstream stage receiving the data
   * @param partitioning how the output should be partitioned
   * @return the exchange sink operator
   */
  ExchangeSinkOperator createSink(
      OperatorContext context, String targetStageId, PartitioningScheme partitioning);

  /**
   * Creates an exchange source operator for receiving data from an upstream stage.
   *
   * @param context the operator context
   * @param sourceStageId the upstream stage sending the data
   * @return the exchange source operator
   */
  ExchangeSourceOperator createSource(OperatorContext context, String sourceStageId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.exchange;

import org.opensearch.sql.planner.distributed.operator.SinkOperator;

/**
 * A sink operator that sends pages to a downstream compute stage. Implementations handle the
 * serialization and transport of data between stages (e.g., via OpenSearch transport, Arrow Flight,
 * or in-memory buffers for local exchanges).
 *
 * <p>The push/pull lifecycle (input acceptance, finishing, closing) is inherited from {@code
 * SinkOperator}; this interface only adds exchange routing information.
 */
public interface ExchangeSinkOperator extends SinkOperator {

  /** Returns the ID of the downstream stage this sink sends data to. */
  String getTargetStageId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.exchange;

import org.opensearch.sql.planner.distributed.operator.SourceOperator;

/**
 * A source operator that receives pages from an upstream compute stage. Implementations handle
 * deserialization and buffering of data received from upstream stages.
 *
 * <p>The output lifecycle (producing pages, finishing, closing) is inherited from {@code
 * SourceOperator}; this interface only adds exchange routing information.
 */
public interface ExchangeSourceOperator extends SourceOperator {

  /** Returns the ID of the upstream stage this source receives data from. */
  String getSourceStageId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.exchange;

import org.opensearch.sql.planner.distributed.page.Page;
import org.opensearch.sql.planner.distributed.stage.PartitioningScheme;

/**
 * Buffers output pages from a stage before sending them to downstream consumers via the exchange
 * layer. Provides back-pressure to prevent producers from overwhelming consumers.
 *
 * <p>Serialization format is an implementation detail. The default implementation uses OpenSearch
 * transport ({@code StreamOutput}). A future implementation can use Arrow IPC ({@code
 * ArrowRecordBatch}) for zero-copy columnar exchange.
 *
 * <p>NOTE(review): thread-safety is not specified here; a buffer shared between a producing driver
 * and a transport thread presumably must be thread-safe — confirm and document in implementations.
 */
public interface OutputBuffer extends AutoCloseable {

  /**
   * Enqueues a page for delivery to downstream consumers.
   *
   * <p>Producers should consult {@link #isFull()} and back off when the buffer is full; whether
   * this method blocks or rejects when full is left to implementations.
   *
   * @param page the page to send
   */
  void enqueue(Page page);

  /** Signals that no more pages will be enqueued. */
  void setNoMorePages();

  /** Returns true if the buffer is full and the producer should wait (back-pressure). */
  boolean isFull();

  /** Returns the total size of buffered data in bytes. */
  long getBufferedBytes();

  /** Aborts the buffer, discarding any buffered pages. */
  void abort();

  /** Returns true if all pages have been consumed and no more will be produced. */
  boolean isFinished();

  /** Returns the partitioning scheme for this buffer's output. */
  PartitioningScheme getPartitioningScheme();

  /**
   * Releases resources held by this buffer.
   *
   * <p>Overridden without a {@code throws} clause: the inherited {@link AutoCloseable#close()}
   * declares {@code throws Exception}, which would force every try-with-resources caller to handle
   * a checked exception. The {@code AutoCloseable} documentation strongly encourages narrowing or
   * removing the throws clause in subtypes.
   */
  @Override
  void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.execution;

import java.util.List;
import org.opensearch.sql.planner.distributed.stage.StagedPlan;

/**
 * Represents the execution of a complete distributed query. Manages the lifecycle of all stage
 * executions and provides query-level statistics.
 */
public interface QueryExecution {

  /**
   * Query execution states.
   *
   * <p>NOTE(review): the legal transition graph is not specified here; declaration order suggests
   * PLANNING → STARTING → RUNNING → FINISHING → FINISHED, with FAILED reachable from any
   * non-terminal state — confirm against the implementation.
   */
  enum State {
    PLANNING,
    STARTING,
    RUNNING,
    FINISHING,
    FINISHED,
    FAILED
  }

  /** Returns the unique query identifier. */
  String getQueryId();

  /** Returns the staged execution plan. */
  StagedPlan getPlan();

  /** Returns the current execution state. */
  State getState();

  /** Returns all stage executions for this query. */
  List<StageExecution> getStageExecutions();

  /** Returns execution statistics for this query. */
  QueryStats getStats();

  /** Cancels the query and all its stage executions. */
  void cancel();

  /** Statistics for a query execution. */
  interface QueryStats {

    /** Returns the total number of output rows. */
    long getTotalRows();

    /** Returns the total elapsed execution time in milliseconds. */
    long getElapsedTimeMillis();

    /** Returns the time spent planning in milliseconds. */
    long getPlanningTimeMillis();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.execution;

import java.util.List;
import java.util.Map;
import org.opensearch.sql.planner.distributed.split.DataUnit;
import org.opensearch.sql.planner.distributed.stage.ComputeStage;

/**
 * Manages the execution of a single compute stage across multiple nodes. Tracks task executions,
 * handles data unit assignment, and monitors stage completion.
 */
public interface StageExecution {

  /**
   * Stage execution states.
   *
   * <p>FINISHED, FAILED, and CANCELLED are presumably terminal — confirm with the implementation.
   */
  enum State {
    PLANNED,
    SCHEDULING,
    RUNNING,
    FINISHED,
    FAILED,
    CANCELLED
  }

  /** Returns the compute stage being executed. */
  ComputeStage getStage();

  /** Returns the current execution state. */
  State getState();

  /**
   * Adds data units to be processed by this stage.
   *
   * <p>May be called repeatedly until {@link #noMoreDataUnits()} is invoked.
   *
   * @param dataUnits the data units to add
   */
  void addDataUnits(List<DataUnit> dataUnits);

  /** Signals that no more data units will be added to this stage. */
  void noMoreDataUnits();

  /**
   * Returns task executions grouped by node ID.
   *
   * @return map from node ID to list of task executions on that node
   */
  Map<String, List<TaskExecution>> getTaskExecutions();

  /** Returns execution statistics for this stage. */
  StageStats getStats();

  /** Cancels all tasks in this stage. */
  void cancel();

  /**
   * Adds a listener to be notified when the stage state changes.
   *
   * <p>NOTE(review): the thread on which listeners are invoked, and whether a listener added after
   * a transition sees the current state, are unspecified — confirm and document.
   *
   * @param listener the state change listener
   */
  void addStateChangeListener(StateChangeListener listener);

  /** Listener for stage state changes. */
  @FunctionalInterface
  interface StateChangeListener {

    /**
     * Called when the stage transitions to a new state.
     *
     * @param newState the new state
     */
    void onStateChange(State newState);
  }

  /** Statistics for a stage execution. */
  interface StageStats {

    /** Returns the total number of rows processed across all tasks. */
    long getTotalRows();

    /** Returns the total number of bytes processed across all tasks. */
    long getTotalBytes();

    /** Returns the number of completed tasks. */
    int getCompletedTasks();

    /** Returns the total number of tasks. */
    int getTotalTasks();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.execution;

import java.util.List;
import org.opensearch.sql.planner.distributed.split.DataUnit;

/**
 * Represents the execution of a single task within a stage. Each task processes a subset of data
 * units on a specific node.
 */
public interface TaskExecution {

  /**
   * Task execution states.
   *
   * <p>FLUSHING presumably means all input has been consumed and remaining buffered output is being
   * drained — confirm with the implementation. FINISHED, FAILED, and CANCELLED are terminal.
   */
  enum State {
    PLANNED,
    RUNNING,
    FLUSHING,
    FINISHED,
    FAILED,
    CANCELLED
  }

  /** Returns the unique identifier for this task. */
  String getTaskId();

  /** Returns the node ID where this task is executing. */
  String getNodeId();

  /** Returns the current execution state. */
  State getState();

  /** Returns the data units assigned to this task. */
  List<DataUnit> getAssignedDataUnits();

  /** Returns execution statistics for this task. */
  TaskStats getStats();

  /** Cancels this task. */
  void cancel();

  /** Statistics for a task execution. */
  interface TaskStats {

    /** Returns the number of rows processed by this task. */
    long getProcessedRows();

    /** Returns the number of bytes processed by this task. */
    long getProcessedBytes();

    /** Returns the number of output rows produced by this task. */
    long getOutputRows();

    /** Returns the elapsed execution time in milliseconds. */
    long getElapsedTimeMillis();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.distributed.operator;

import org.opensearch.sql.planner.distributed.page.Page;

/**
 * Core operator interface using a push/pull model. Operators form a pipeline where data flows as
 * {@link Page} batches. Each operator declares whether it needs input ({@link #needsInput()}),
 * accepts input ({@link #addInput(Page)}), and produces output ({@link #getOutput()}).
 *
 * <p>Lifecycle:
 *
 * <ol>
 *   <li>Pipeline driver calls {@link #needsInput()} to check readiness
 *   <li>If ready, driver calls {@link #addInput(Page)} with upstream output
 *   <li>Driver calls {@link #getOutput()} to pull processed results
 *   <li>When upstream is done, driver calls {@link #finish()} to signal no more input
 *   <li>Operator produces remaining buffered output via {@link #getOutput()}
 *   <li>When {@link #isFinished()} returns true, operator is done
 *   <li>Driver calls {@link #close()} to release resources
 * </ol>
 */
public interface Operator extends AutoCloseable {

  /** Returns true if this operator is ready to accept input via {@link #addInput(Page)}. */
  boolean needsInput();

  /**
   * Provides a page of input data to this operator.
   *
   * @param page the input page (must not be null)
   * @throws IllegalStateException if {@link #needsInput()} returns false
   */
  void addInput(Page page);

  /**
   * Returns the next page of output, or null if no output is available yet. A null return does not
   * mean the operator is finished — call {@link #isFinished()} to check.
   */
  Page getOutput();

  /** Returns true if this operator has completed all processing and will produce no more output. */
  boolean isFinished();

  /** Signals that no more input will be provided. The operator should flush buffered results. */
  void finish();

  /** Returns the runtime context for this operator. */
  OperatorContext getContext();

  /**
   * Releases resources held by this operator; called once by the driver at the end of the
   * lifecycle.
   *
   * <p>Overridden without a {@code throws} clause: the inherited {@link AutoCloseable#close()}
   * declares {@code throws Exception}, which would force every driver (and try-with-resources use)
   * to handle a checked exception. The {@code AutoCloseable} documentation strongly encourages
   * narrowing or removing the throws clause in subtypes.
   */
  @Override
  void close();
}
Loading
Loading