diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
index 97a6175bdb63..9e0105341c53 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenDriverEndpoint.scala
@@ -57,6 +57,12 @@ class GlutenDriverEndpoint extends IsolatedRpcEndpoint with Logging {
totalRegisteredExecutors.addAndGet(-1)
logTrace(s"Executor endpoint ref $executorId is removed.")
+ case GlutenNativeStackAsyncResult(requestId, success, message) =>
+ GlutenDriverEndpoint.handleAsyncResult(requestId, success, message)
+
+ case GlutenNativeStackAsyncChunk(requestId, chunk) =>
+ GlutenDriverEndpoint.appendStackAsyncMessage(requestId, chunk)
+
case e =>
logError(s"Received unexpected message. $e")
}
@@ -88,6 +94,62 @@ class GlutenDriverEndpoint extends IsolatedRpcEndpoint with Logging {
context.reply(true)
}
+ case GlutenStartNativeStackAsync(executorId) =>
+ val data = GlutenDriverEndpoint.executorDataMap.get(executorId)
+ if (data == null) {
+ context.sendFailure(
+ new IllegalArgumentException(s"Executor $executorId not registered or unavailable"))
+ } else {
+ try {
+ val reqId = java.util.UUID.randomUUID().toString
+ GlutenDriverEndpoint.putStackAsyncStatus(reqId, "running", "")
+ data.executorEndpointRef.send(GlutenDumpNativeStackAsyncRequest(reqId))
+ context.reply(reqId)
+ } catch {
+ case t: Throwable => context.sendFailure(t)
+ }
+ }
+
+ case GlutenQueryNativeStackStatus(requestId) =>
+ try {
+ def jsonEscape(s: String): String = {
+ if (s == null) {
+ ""
+ } else {
+ s
+ .replace("\\", "\\\\")
+ .replace("\"", "\\\"")
+ .replace("\n", "\\n")
+ .replace("\r", "\\r")
+ .replace("\t", "\\t")
+ }
+ }
+ val json = GlutenDriverEndpoint
+ .getStackAsyncStatus(requestId)
+ .map {
+ case (st, msg) =>
+ val safeMsg = jsonEscape(Option(msg).getOrElse(""))
+                s"""{"requestId":"${jsonEscape(requestId)}", "status":"$st", "message":"$safeMsg"}"""
+ }
+ .getOrElse("""{"error":"invalid requestId"}""")
+ context.reply(json)
+ } catch {
+ case t: Throwable => context.sendFailure(t)
+ }
+
+ case GlutenQueryNativeStackSync(executorId) =>
+ val data = GlutenDriverEndpoint.executorDataMap.get(executorId)
+ if (data == null) {
+ context.sendFailure(
+ new IllegalArgumentException(s"Executor $executorId not registered or unavailable"))
+ } else {
+ try {
+ val text = data.executorEndpointRef.askSync[String](GlutenDumpNativeStackSyncRequest)
+ context.reply(text)
+ } catch {
+ case t: Throwable => context.sendFailure(t)
+ }
+ }
}
override def onStart(): Unit = {
@@ -116,6 +178,41 @@ object GlutenDriverEndpoint extends Logging with RemovalListener[String, util.Se
.removalListener(this)
.build[String, util.Set[String]]()
+ // Async C++ stack collection status: requestId -> (status, message)
+ private val stackAsyncStatus = new ConcurrentHashMap[String, (String, String)]()
+
+ def putStackAsyncStatus(requestId: String, status: String, message: String): Unit = {
+ stackAsyncStatus.put(requestId, (status, message))
+ }
+
+ def getStackAsyncStatus(requestId: String): Option[(String, String)] = {
+ Option(stackAsyncStatus.get(requestId))
+ }
+
+ def appendStackAsyncMessage(requestId: String, chunk: String): Unit = {
+ val prev = stackAsyncStatus.get(requestId)
+ val prevMsg = if (prev == null) "" else Option(prev._2).getOrElse("")
+ val nextMsg = if (chunk == null || chunk.isEmpty) prevMsg else prevMsg + chunk
+ val status = if (prev == null) "running" else prev._1
+ stackAsyncStatus.put(requestId, (status, nextMsg))
+ logInfo(
+ s"Async stack chunk appended: requestId=$requestId, " +
+ s"chunkLen=${Option(chunk).map(_.length).getOrElse(0)}, " +
+ s"totalLen=${nextMsg.length}")
+ }
+
+ def handleAsyncResult(requestId: String, success: Boolean, message: String): Unit = {
+ val st = if (success) "done" else "error"
+ val prev = stackAsyncStatus.get(requestId)
+ val prevMsg = if (prev == null) "" else Option(prev._2).getOrElse("")
+ val finalMsg = if (message == null || message.isEmpty) prevMsg else prevMsg + message
+ stackAsyncStatus.put(requestId, (st, finalMsg))
+ logInfo(
+ s"Async stack complete: requestId=$requestId, success=$success, totalLen=${Option(finalMsg)
+ .map(_.length)
+ .getOrElse(0)}")
+ }
+
def collectResources(executionId: String, resourceId: String): Unit = {
val resources = executionResourceRelation
.get(executionId, (_: String) => new util.HashSet[String]())
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 47a1fcb88462..ebba001d1114 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -21,7 +21,13 @@ import org.apache.gluten.execution.{CHBroadcastBuildSideCache, CHNativeCacheMana
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.GlutenRpcMessages._
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.rpc.RpcCallContext
+import org.apache.spark.util.{ExecutorManager, ThreadUtils, Utils}
+
+import java.io.RandomAccessFile
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.util
import scala.util.{Failure, Success}
@@ -66,6 +72,130 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
resource_id => CHBroadcastBuildSideCache.invalidateBroadcastHashtable(resource_id))
}
+ case GlutenDumpNativeStackAsyncRequest(requestId) =>
+ // Async collection: run in a separate thread and report result to driverEndpointRef
+ val runnable = new Runnable {
+ override def run(): Unit = {
+ def sendResult(msg: GlutenRpcMessage): Unit = {
+ var ref = driverEndpointRef
+ if (ref == null) {
+ try {
+ ref = rpcEnv.setupEndpointRefByURI(driverUrl)
+ driverEndpointRef = ref
+ } catch {
+ case t: Throwable =>
+ logWarning("Resolve driverEndpointRef failed when reporting async result", t)
+ }
+ }
+ if (ref != null) {
+ try {
+ ref.send(msg)
+ } catch {
+ case t: Throwable => logWarning("Send async result to driver failed", t)
+ }
+ } else {
+ logError(
+ s"DriverEndpointRef unavailable; " +
+ s"async stack result lost for requestId=$requestId")
+ }
+ }
+ try {
+ val pid = ExecutorManager.getProcessId()
+ ensureGdbInstalled()
+ val gdbCmdPrefix = gdbPrefix()
+ val tmpLog = Files.createTempFile(s"gluten-bt-$requestId", ".log")
+ val logPath = tmpLog.toAbsolutePath.toString
+ val charset = StandardCharsets.UTF_8
+ val gdbWithLog = buildGdbWithLogging(gdbCmdPrefix, logPath, pid)
+ logInfo(
+ s"Starting async native stack collection: " +
+ s"requestId=$requestId, pid=$pid, cmd=$gdbWithLog")
+ val proc = new ProcessBuilder("bash", "-c", gdbWithLog).start()
+ // Incrementally tail the log file and stream to driver while gdb runs
+ val maxSegmentBytes = 64 * 1024
+ var pos: Long = 0L
+ var segments = 0
+ var totalBytes = 0L
+ def streamBytes(bytes: Array[Byte]): Unit = {
+ var offset = 0
+ while (offset < bytes.length) {
+ val end = Math.min(bytes.length, offset + maxSegmentBytes)
+                val segStr = new String(util.Arrays.copyOfRange(bytes, offset, end), charset)
+ sendResult(GlutenRpcMessages.GlutenNativeStackAsyncChunk(requestId, segStr))
+ segments += 1
+ offset = end
+ }
+ }
+ // Tail loop: read any appended bytes every ~100ms
+ while (proc.isAlive) {
+ try {
+ val raf = new RandomAccessFile(logPath, "r")
+ val len = raf.length()
+ if (len > pos) {
+ val toRead = (len - pos).toInt
+ val buf = new Array[Byte](toRead)
+ raf.seek(pos)
+ raf.readFully(buf)
+ pos = len
+ totalBytes += toRead
+ streamBytes(buf)
+ }
+ raf.close()
+ } catch { case _: Throwable => () }
+ try Thread.sleep(100)
+ catch { case _: Throwable => () }
+ }
+ // After gdb exits, flush any remaining content
+ try {
+ val raf = new RandomAccessFile(logPath, "r")
+ val len = raf.length()
+ if (len > pos) {
+ val toRead = (len - pos).toInt
+ val buf = new Array[Byte](toRead)
+ raf.seek(pos)
+ raf.readFully(buf)
+ pos = len
+ totalBytes += toRead
+ streamBytes(buf)
+ }
+ raf.close()
+ } catch { case _: Throwable => () }
+ val exitCode = proc.waitFor()
+ logInfo(
+ s"gdb process exit code: $exitCode for requestId=$requestId; " +
+ s"streamedBytes=$totalBytes, segments=$segments")
+ // Cleanup temp file
+ try Files.deleteIfExists(tmpLog)
+ catch { case _: Throwable => () }
+
+ if (exitCode != 0) {
+ sendResult(
+ GlutenNativeStackAsyncResult(
+ requestId,
+ success = false,
+ message =
+                    s"Check executor logs for native C++ stack. gdb exit code: $exitCode"))
+ } else {
+ sendResult(
+ GlutenNativeStackAsyncResult(
+ requestId,
+ success = true,
+ message = ""
+ ))
+ }
+ } catch {
+ case t: Throwable =>
+ sendResult(
+ GlutenNativeStackAsyncResult(
+ requestId,
+ success = false,
+ message = s"Async stack collection failed: ${t.getMessage}"))
+ }
+ }
+ }
+ // Use executor RPC env to run asynchronously
+ new Thread(runnable, s"gluten-stack-async-$requestId").start()
+
case e =>
logError(s"Received unexpected message. $e")
}
@@ -100,6 +230,81 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
case e =>
logError(s"Received unexpected message. $e")
}
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case GlutenDumpNativeStackSyncRequest =>
+ try {
+ val pid = ExecutorManager.getProcessId()
+ ensureGdbInstalled()
+ val gdbCmdPrefix = gdbPrefix()
+ val tmpLog = Files.createTempFile("gluten-bt-sync-", ".log")
+ val logPath = tmpLog.toAbsolutePath.toString
+ val charset = StandardCharsets.UTF_8
+ val gdbWithLog = buildGdbWithLogging(gdbCmdPrefix, logPath, pid)
+ logInfo(s"Starting sync native stack collection: pid=$pid, cmd=$gdbWithLog")
+ val proc = new ProcessBuilder("bash", "-c", gdbWithLog).start()
+ val exitCode = proc.waitFor()
+ val output =
+ try {
+ new String(Files.readAllBytes(tmpLog), charset)
+ } catch { case _: Throwable => "" }
+ try Files.deleteIfExists(tmpLog)
+ catch { case _: Throwable => () }
+ if (exitCode != 0 && (output == null || output.isEmpty)) {
+ context.reply(s"gdb exit code: $exitCode. Please check executor logs.")
+ } else {
+ context.reply(Option(output).getOrElse(""))
+ }
+ } catch {
+ case t: Throwable =>
+ logWarning("Sync native stack collection failed", t)
+ context.sendFailure(t)
+ }
+ }
+
+ private def has(cmd: String): Boolean = {
+ try {
+ val out = Utils.executeAndGetOutput(
+ Seq("bash", "-c", s"command -v $cmd >/dev/null 2>&1 && echo yes || echo no"))
+ out != null && out.trim == "yes"
+ } catch { case _: Throwable => false }
+ }
+
+ private def ensureGdbInstalled(): Unit = {
+ if (!has("gdb")) {
+ val sudo = if (has("sudo")) "sudo " else ""
+ val cmdOpt =
+ if (has("apt-get")) {
+ val base = s"${sudo}apt-get update -y && ${sudo}apt-get install -y "
+ Some(base + "gdb elfutils")
+ } else if (has("yum")) {
+ Some(s"${sudo}yum install -y gdb elfutils")
+ } else if (has("dnf")) {
+ Some(s"${sudo}dnf install -y gdb elfutils")
+ } else if (has("zypper")) {
+ Some(s"${sudo}zypper -n install gdb elfutils")
+ } else {
+ None
+ }
+ cmdOpt.foreach {
+ c =>
+ try { Utils.executeCommand(Seq("bash", "-c", c)) }
+ catch { case _: Throwable => () }
+ }
+ }
+ }
+
+ private def gdbPrefix(): String = {
+ if (has("stdbuf")) "stdbuf -o0 -e0 gdb --batch-silent -q" else "gdb --batch-silent -q"
+ }
+
+ private def buildGdbWithLogging(gdbCmdPrefix: String, logPath: String, pid: Long): String = {
+ s"$gdbCmdPrefix -ex 'set pagination off' " +
+ s"-ex 'set print thread-events off' -ex 'set logging file $logPath' " +
+ s"-ex 'set logging overwrite on' -ex 'set logging on' " +
+ s"-ex 'thread apply all bt full' " +
+ s"-ex 'set logging off' -p $pid"
+ }
}
object GlutenExecutorEndpoint {
var executorEndpoint: GlutenExecutorEndpoint = _
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
index 8127c324b79c..22d7f2cf351f 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
@@ -50,4 +50,26 @@ object GlutenRpcMessages {
case class GlutenFilesCacheLoad(files: Array[Byte]) extends GlutenRpcMessage
case class GlutenFilesCacheLoadStatus(jobId: String)
+
+ /** Start async native stack collection; driver returns a `requestId` immediately */
+ case class GlutenStartNativeStackAsync(executorId: String) extends GlutenRpcMessage
+
+ /** Internal async dump request delivered to executor (no options) */
+ case class GlutenDumpNativeStackAsyncRequest(requestId: String) extends GlutenRpcMessage
+
+ /** Executor reports async dump result back to driver */
+ case class GlutenNativeStackAsyncResult(requestId: String, success: Boolean, message: String)
+ extends GlutenRpcMessage
+
+ /** Executor reports async dump partial chunk to driver */
+ case class GlutenNativeStackAsyncChunk(requestId: String, chunk: String) extends GlutenRpcMessage
+
+ /** Query async native stack status by requestId, driver returns JSON string */
+ case class GlutenQueryNativeStackStatus(requestId: String) extends GlutenRpcMessage
+
+ /** Synchronous: query native C++ stack of an executor; driver will block and return plain text */
+ case class GlutenQueryNativeStackSync(executorId: String) extends GlutenRpcMessage
+
+ /** Internal sync dump request delivered to executor; executor replies with full stack text */
+ case object GlutenDumpNativeStackSyncRequest extends GlutenRpcMessage
}
diff --git a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenAllExecutionsPage.scala b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenAllExecutionsPage.scala
index af153e28c416..5eb80443a3d2 100644
--- a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenAllExecutionsPage.scala
+++ b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenAllExecutionsPage.scala
@@ -16,10 +16,14 @@
*/
package org.apache.spark.sql.execution.ui
+import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.ui.TypeAlias._
+import org.apache.spark.status.{ElementTrackingStore, ExecutorSummaryWrapper}
+import org.apache.spark.status.api.v1.ExecutorSummary
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
@@ -58,6 +62,103 @@ private[ui] class GlutenAllExecutionsPage(parent: GlutenSQLTab) extends WebUIPag
{glutenPageTable}
+ // Build Executors overview and native stack collection controls
+ val execSection: Seq[Node] =
+ try {
+ val ui = parent.parent
+ val kvStore = ui.store.store.asInstanceOf[ElementTrackingStore]
+ val wrappers = Utils
+ .tryWithResource(kvStore.view(classOf[ExecutorSummaryWrapper]).closeableIterator()) {
+ iter =>
+ import scala.collection.JavaConverters.asScalaIteratorConverter
+ iter.asScala.toList
+ }
+ val summaries: Seq[ExecutorSummary] = wrappers.map(_.info)
+
+ val maybeExec = Option(request.getParameter("executorId"))
+ val dumpResult: Seq[Node] = maybeExec
+ .map {
+ id =>
+ try {
+ val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
+ val driverRef: RpcEndpointRef = parent.glutenDriverEndpointRef(rpcEnv)
+ val startMsgClass = Class.forName(
+ "org.apache.spark.rpc.GlutenRpcMessages$GlutenStartNativeStackAsync")
+ val startMsg = startMsgClass.getConstructors.head
+ .newInstance(id)
+ .asInstanceOf[AnyRef]
+ val requestId = driverRef.askSync[String](startMsg)
+ val base = UIUtils.prependBaseUri(request, parent.basePath)
+ val statusUrl = s"$base/gluten/stackStatus?requestId=$requestId"
+ Seq()
+ } catch {
+ case t: Throwable =>
+ logWarning("Dump native stack failed", t)
+ Seq(
)
+ }
+ }
+ .getOrElse(Nil)
+
+ val search =
+ Option(request.getParameter("glutenExec.search")).map(_.trim).filter(_.nonEmpty)
+ val execPage = Option(request.getParameter("glutenExec.page")).map(_.toInt).getOrElse(1)
+ val parameterPath = s"${UIUtils.prependBaseUri(request, parent.basePath)}/gluten/"
+
+ val execTable = new GlutenExecutorsPagedTable(
+ request,
+ parent,
+ summaries,
+ tableHeaderId = "gluten-executors",
+ executionTag = "glutenExec",
+ basePath = UIUtils.prependBaseUri(request, parent.basePath),
+ subPath = "gluten",
+ searchText = search
+ ).table(execPage)
+
+ Seq(
+
+
+
+ Executors {summaries.size}
+
+ ,
+
+
+
+
+
+ {dumpResult ++ execTable}
+
+ )
+ } catch {
+ case t: Throwable =>
+ logWarning("Render Gluten Executors section failed", t)
+ Seq(Failed to render Executors overview: {
+ t.getMessage
+ }
)
+ }
+
+ _content ++= execSection
+
_content
}
content ++=
@@ -149,8 +250,32 @@ private[ui] class GlutenExecutionPagedTable(
override val dataSource = new GlutenExecutionDataSource(data, pageSize, sortColumn, desc)
- private val parameterPath =
- s"$basePath/$subPath/?${getParameterOtherTable(request, executionTag)}"
+ private val parameterPath = {
+ // Encode existing query parameters to avoid invalid URIs (e.g., spaces in values)
+ def encodeQuery(q: String): String = {
+ if (q == null || q.isEmpty) ""
+ else {
+ q.split("&")
+ .toSeq
+ .map {
+ kv =>
+ val idx = kv.indexOf('=')
+ if (idx >= 0) {
+ val k = kv.substring(0, idx)
+ val v = kv.substring(idx + 1)
+ s"${URLEncoder.encode(k, UTF_8.name())}=${URLEncoder.encode(v, UTF_8.name())}"
+ } else {
+ URLEncoder.encode(kv, UTF_8.name())
+ }
+ }
+ .mkString("&")
+ }
+ }
+ val otherRaw = getParameterOtherTable(request, executionTag)
+ val other = encodeQuery(otherRaw)
+ val base = s"$basePath/$subPath/"
+ if (other.nonEmpty) s"$base?$other" else s"$base?"
+ }
override def tableId: String = s"$executionTag-table"
@@ -290,3 +415,223 @@ private[ui] class GlutenExecutionDataSource(
}
}
}
+
+private[ui] class GlutenExecutorsPagedTable(
+ request: HttpServletRequest,
+ parent: GlutenSQLTab,
+ data: Seq[ExecutorSummary],
+ tableHeaderId: String,
+ executionTag: String,
+ basePath: String,
+ subPath: String,
+ searchText: Option[String])
+ extends PagedTable[ExecutorSummary] {
+
+ private val (sortColumn, desc, pageSize) = getTableParameters(request, executionTag, "ID")
+ // Use space-free internal sort keys to avoid invalid URI query parameters
+ private def sortKeyFor(name: String): String = name match {
+ case "ID" => "ID"
+ case "Address" => "Address"
+ case "Status" => "Status"
+ case "Active Tasks" => "Active_Tasks"
+ case "Failed Tasks" => "Failed_Tasks"
+ case "Completed Tasks" => "Completed_Tasks"
+ case "Total Tasks" => "Total_Tasks"
+ case "Total GC Time" => "Total_GC_Time"
+ case "Total Input Bytes" => "Total_Input_Bytes"
+ case "Total Shuffle Read" => "Total_Shuffle_Read"
+ case "Total Shuffle Write" => "Total_Shuffle_Write"
+ case other => other.replace(' ', '_')
+ }
+ private def displayNameForKey(key: String): String = key match {
+ case "Active_Tasks" => "Active Tasks"
+ case "Failed_Tasks" => "Failed Tasks"
+ case "Completed_Tasks" => "Completed Tasks"
+ case "Total_Tasks" => "Total Tasks"
+ case "Total_GC_Time" => "Total GC Time"
+ case "Total_Input_Bytes" => "Total Input Bytes"
+ case "Total_Shuffle_Read" => "Total Shuffle Read"
+ case "Total_Shuffle_Write" => "Total Shuffle Write"
+ case other => other
+ }
+ private val encodedSortColumn = URLEncoder.encode(sortKeyFor(sortColumn), UTF_8.name())
+
+ private val filtered: Seq[ExecutorSummary] = searchText match {
+ case Some(q) =>
+ val qq = q.toLowerCase
+ data.filter {
+ s =>
+ s.id.toLowerCase.contains(qq) ||
+ Option(s.hostPort).exists(_.toLowerCase.contains(qq)) ||
+ (if (s.isActive) "active" else "dead").contains(qq)
+ }
+ case None => data
+ }
+
+ override val dataSource = new PagedDataSource[ExecutorSummary](pageSize) {
+ private val sorted = filtered.sorted(ordering(sortColumn, desc))
+ override def dataSize: Int = sorted.size
+ override def sliceData(from: Int, to: Int): Seq[ExecutorSummary] = sorted.slice(from, to)
+ }
+
+ private val parameterPath = {
+ // Encode existing query parameters to avoid invalid URIs (e.g., spaces in values)
+ def encodeQuery(q: String): String = {
+ if (q == null || q.isEmpty) ""
+ else {
+ q.split("&")
+ .toSeq
+ .map {
+ kv =>
+ val idx = kv.indexOf('=')
+ if (idx >= 0) {
+ val k = kv.substring(0, idx)
+ val v = kv.substring(idx + 1)
+ s"${URLEncoder.encode(k, UTF_8.name())}=${URLEncoder.encode(v, UTF_8.name())}"
+ } else {
+ URLEncoder.encode(kv, UTF_8.name())
+ }
+ }
+ .mkString("&")
+ }
+ }
+ val other = encodeQuery(getParameterOtherTable(request, executionTag))
+ val search = searchText
+ .map(s => s"$executionTag.search=${URLEncoder.encode(s, UTF_8.name())}")
+ .getOrElse("")
+ val base = s"$basePath/$subPath/"
+ val query =
+ if (other.nonEmpty && search.nonEmpty) s"$other&$search"
+ else if (other.nonEmpty) other
+ else search
+ if (query.nonEmpty) s"$base?$query" else s"$base?"
+ }
+
+ override def tableId: String = s"$executionTag-table"
+ override def tableCssClass: String =
+ "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
+
+ override def pageLink(page: Int): String = {
+ val sep = if (parameterPath.endsWith("?")) "" else "&"
+ parameterPath +
+ s"$sep$pageNumberFormField=$page" +
+ s"&$executionTag.sort=$encodedSortColumn" +
+ s"&$executionTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize" +
+ s"#$tableHeaderId"
+ }
+
+ override def pageSizeFormField: String = s"$executionTag.pageSize"
+ override def pageNumberFormField: String = s"$executionTag.page"
+ override def goButtonFormPath: String = {
+ val sep = if (parameterPath.endsWith("?")) "" else "&"
+ s"$parameterPath$sep$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId"
+ }
+
+ private val headerInfo: Seq[(String, Boolean, Option[String])] = Seq(
+ ("ID", true, None),
+ ("Address", true, None),
+ ("Status", true, None),
+ ("Active Tasks", true, None),
+ ("Failed Tasks", true, None),
+ ("Completed Tasks", true, None),
+ ("Total Tasks", true, None),
+ ("Total GC Time", true, None),
+ ("Total Input Bytes", true, None),
+ ("Total Shuffle Read", true, None),
+ ("Total Shuffle Write", true, None),
+ ("C++ Thread Dump", false, None)
+ )
+
+ override def headers: Seq[Node] = {
+ isSortColumnValid(headerInfo, displayNameForKey(sortColumn))
+ val ths: Seq[Node] = headerInfo.map {
+ case (name, sortable, _) =>
+ if (sortable) {
+ val encodedName = URLEncoder.encode(sortKeyFor(name), UTF_8.name())
+ val newDesc = if (sortKeyFor(sortColumn) == sortKeyFor(name)) !desc else false
+ val sep = if (parameterPath.endsWith("?")) "" else "&"
+ val href = parameterPath +
+ s"$sep$pageNumberFormField=1" +
+ s"&$executionTag.sort=$encodedName" +
+ s"&$executionTag.desc=$newDesc" +
+ s"&$pageSizeFormField=$pageSize" +
+ s"#$tableHeaderId"
+ {name} |
+ } else {
+ {name} |
+ }
+ }
+ Seq({ths}
)
+ }
+
+ override def row(s: ExecutorSummary): Seq[Node] = {
+ val link = UIUtils.prependBaseUri(request, parent.basePath) + s"/gluten/?executorId=${s.id}"
+ val syncLink =
+ UIUtils.prependBaseUri(request, parent.basePath) + s"/gluten/stackSync?executorId=${s.id}"
+ val statusText = if (s.isActive) "ACTIVE" else "DEAD"
+ def fmtBytes(v: Long): String = {
+ val abs = math.abs(v)
+ if (abs < 1024L) s"${v}B"
+ else if (abs < 1024L * 1024L) f"${v / 1024.0}%.1fK"
+ else if (abs < 1024L * 1024L * 1024L) f"${v / (1024.0 * 1024)}%.1fM"
+ else f"${v / (1024.0 * 1024 * 1024)}%.1fG"
+ }
+
+ | {s.id} |
+ {s.hostPort} |
+ {statusText} |
+ {s.activeTasks.toString} |
+ {s.failedTasks.toString} |
+ {s.completedTasks.toString} |
+ {s.totalTasks.toString} |
+ {s.totalGCTime.toString} |
+ {fmtBytes(s.totalInputBytes)} |
+ {fmtBytes(s.totalShuffleRead)} |
+ {fmtBytes(s.totalShuffleWrite)} |
+ {
+ // Hide C++ stack link for driver executor
+ if (s.id == "driver") {
+ |
+ } else {
+
+ C++ Stack (Sync)
+ /
+ C++ Stack (Async)
+ |
+ }
+ }
+
+ }
+
+ private def ordering(sortColumn: String, desc: Boolean): Ordering[ExecutorSummary] = {
+ val key = sortColumn match {
+ case "Num_Gluten_Nodes" => "Num Gluten Nodes"
+ case "Num_Fallback_Nodes" => "Num Fallback Nodes"
+ case "Active_Tasks" => "Active Tasks"
+ case "Failed_Tasks" => "Failed Tasks"
+ case "Completed_Tasks" => "Completed Tasks"
+ case "Total_Tasks" => "Total Tasks"
+ case "Total_GC_Time" => "Total GC Time"
+ case "Total_Input_Bytes" => "Total Input Bytes"
+ case "Total_Shuffle_Read" => "Total Shuffle Read"
+ case "Total_Shuffle_Write" => "Total Shuffle Write"
+ case other => other
+ }
+ val ord: Ordering[ExecutorSummary] = key match {
+ case "ID" => Ordering.by(_.id)
+ case "Address" => Ordering.by(_.hostPort)
+ case "Status" => Ordering.by(s => if (s.isActive) 1 else 0)
+ case "Active Tasks" => Ordering.by(_.activeTasks)
+ case "Failed Tasks" => Ordering.by(_.failedTasks)
+ case "Completed Tasks" => Ordering.by(_.completedTasks)
+ case "Total Tasks" => Ordering.by(_.totalTasks)
+ case "Total GC Time" => Ordering.by(_.totalGCTime)
+ case "Total Input Bytes" => Ordering.by(_.totalInputBytes)
+ case "Total Shuffle Read" => Ordering.by(_.totalShuffleRead)
+ case "Total Shuffle Write" => Ordering.by(_.totalShuffleWrite)
+ case unknown => throw QueryExecutionErrors.unknownColumnError(unknown)
+ }
+ if (desc) ord.reverse else ord
+ }
+}
diff --git a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLTab.scala b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLTab.scala
index 6be1b0c95e3d..caef843a407c 100644
--- a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLTab.scala
+++ b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLTab.scala
@@ -18,6 +18,8 @@ package org.apache.spark.sql.execution.ui
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointAddress, RpcEndpointRef, RpcEnv}
import org.apache.spark.ui.{SparkUI, SparkUITab}
class GlutenSQLTab(val sqlStore: GlutenSQLAppStatusStore, sparkUI: SparkUI)
@@ -31,5 +33,16 @@ class GlutenSQLTab(val sqlStore: GlutenSQLAppStatusStore, sparkUI: SparkUI)
val parent = sparkUI
attachPage(new GlutenAllExecutionsPage(this))
+ attachPage(new GlutenStackStatusPage(this))
+ attachPage(new GlutenStackSyncPage(this))
parent.attachTab(this)
+
+ /** Get the RPC endpoint reference of the Gluten driver endpoint. */
+ def glutenDriverEndpointRef(rpcEnv: RpcEnv): RpcEndpointRef = {
+ val driverHost = conf.get(config.DRIVER_HOST_ADDRESS.key, "localhost")
+ val driverPort = conf.getInt(config.DRIVER_PORT.key, 7077)
+ val rpcAddress = RpcAddress(driverHost, driverPort)
+ val driverUrl = RpcEndpointAddress(rpcAddress, "GlutenDriverEndpoint").toString
+ rpcEnv.setupEndpointRefByURI(driverUrl)
+ }
}
diff --git a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenStackStatusPage.scala b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenStackStatusPage.scala
new file mode 100644
index 000000000000..c7928e8e337b
--- /dev/null
+++ b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenStackStatusPage.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+import org.json4s.JValue
+import org.json4s.jackson.JsonMethods
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+/** Status API for async C++ stack collection. Path: /gluten/stackStatus?requestId=xxxx */
+private[ui] class GlutenStackStatusPage(parent: GlutenSQLTab)
+ extends WebUIPage("stackStatus")
+ with Logging {
+
+ override def renderJson(request: HttpServletRequest): JValue = {
+ val reqIdOpt = Option(request.getParameter("requestId")).filter(_.nonEmpty)
+ val resultStr = reqIdOpt match {
+ case Some(id) =>
+ try {
+ val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
+ val driverRef: RpcEndpointRef = parent.glutenDriverEndpointRef(rpcEnv)
+ val msgClass =
+ Class.forName("org.apache.spark.rpc.GlutenRpcMessages$GlutenQueryNativeStackStatus")
+ val msg = msgClass.getConstructors.head.newInstance(id).asInstanceOf[AnyRef]
+ driverRef.askSync[String](msg)
+ } catch {
+ case t: Throwable =>
+ s"""{"status": "error", "message":"query failed: ${t.getMessage}"}"""
+ }
+ case None =>
+        s"""{"status": "error", "message":"Missing parameter: requestId"}"""
+
+ }
+ parseJsonString(resultStr)
+ }
+
+ override def render(request: HttpServletRequest): Seq[Node] = {
+ val reqIdOpt = Option(request.getParameter("requestId")).filter(_.nonEmpty)
+ reqIdOpt match {
+ case Some(id) =>
+ val base = UIUtils.prependBaseUri(request, parent.basePath)
+ val statusAPI = s"$base/gluten/stackStatus/json?requestId=$id"
+ val content = Seq(
+
+
+ Request ID: {id}
+
+
+ Collecting...
+
+
+
+
+
+ )
+ UIUtils.headerSparkPage(request, "Gluten C++ Stack", content, parent)
+ case None =>
+ Seq(Missing parameter: requestId
)
+ }
+ }
+
+ private def parseJsonString(str: String): JValue = {
+ JsonMethods.parse(str)
+ }
+}
diff --git a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenStackSyncPage.scala b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenStackSyncPage.scala
new file mode 100644
index 000000000000..8e9edc9ac8d1
--- /dev/null
+++ b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenStackSyncPage.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.ui
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+/** Synchronous page to fetch the native C++ stack: /gluten/stackSync?executorId=xxxx */
+private[ui] class GlutenStackSyncPage(parent: GlutenSQLTab)
+ extends WebUIPage("stackSync")
+ with Logging {
+
+ override def render(request: HttpServletRequest): Seq[Node] = {
+ val execIdOpt = Option(request.getParameter("executorId")).filter(_.nonEmpty)
+ execIdOpt match {
+ case Some(executorId) =>
+ try {
+ val rpcEnv: RpcEnv = SparkEnv.get.rpcEnv
+ val driverRef: RpcEndpointRef = parent.glutenDriverEndpointRef(rpcEnv)
+ val msgClass =
+ Class.forName("org.apache.spark.rpc.GlutenRpcMessages$GlutenQueryNativeStackSync")
+ val msg = msgClass.getConstructors.head.newInstance(executorId).asInstanceOf[AnyRef]
+ val resultStr = driverRef.askSync[String](msg)
+ val content = Seq(
+
+
Executor: {executorId}
+
+ Sync fetch complete (blocking).
+
+
+ {resultStr}
+
+
+ )
+ UIUtils.headerSparkPage(request, "Gluten C++ Stack (Sync)", content, parent)
+ } catch {
+ case t: Throwable =>
+ val content = Seq(
+
+
Sync fetch failed:
+
{Option(t.getMessage).getOrElse(t.toString)}
+
+ )
+ UIUtils.headerSparkPage(request, "Gluten C++ Stack (Sync)", content, parent)
+ }
+ case None =>
+ Seq(Missing parameter: executorId
)
+ }
+ }
+}