Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ class GlutenDriverEndpoint extends IsolatedRpcEndpoint with Logging {
totalRegisteredExecutors.addAndGet(-1)
logTrace(s"Executor endpoint ref $executorId is removed.")

case GlutenNativeStackAsyncResult(requestId, success, message) =>
GlutenDriverEndpoint.handleAsyncResult(requestId, success, message)

case GlutenNativeStackAsyncChunk(requestId, chunk) =>
GlutenDriverEndpoint.appendStackAsyncMessage(requestId, chunk)

case e =>
logError(s"Received unexpected message. $e")
}
Expand Down Expand Up @@ -88,6 +94,62 @@ class GlutenDriverEndpoint extends IsolatedRpcEndpoint with Logging {
context.reply(true)
}

case GlutenStartNativeStackAsync(executorId) =>
val data = GlutenDriverEndpoint.executorDataMap.get(executorId)
if (data == null) {
context.sendFailure(
new IllegalArgumentException(s"Executor $executorId not registered or unavailable"))
} else {
try {
val reqId = java.util.UUID.randomUUID().toString
GlutenDriverEndpoint.putStackAsyncStatus(reqId, "running", "")
data.executorEndpointRef.send(GlutenDumpNativeStackAsyncRequest(reqId))
context.reply(reqId)
} catch {
case t: Throwable => context.sendFailure(t)
}
}

case GlutenQueryNativeStackStatus(requestId) =>
try {
def jsonEscape(s: String): String = {
if (s == null) {
""
} else {
s
.replace("\\", "\\\\")
.replace("\"", "\\\"")
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t")
}
}
val json = GlutenDriverEndpoint
.getStackAsyncStatus(requestId)
.map {
case (st, msg) =>
val safeMsg = jsonEscape(Option(msg).getOrElse(""))
s"""{"requestId":"$requestId", "status":"$st", "message":"$safeMsg"}"""
}
.getOrElse("""{"error":"invalid requestId"}""")
context.reply(json)
} catch {
case t: Throwable => context.sendFailure(t)
}

case GlutenQueryNativeStackSync(executorId) =>
val data = GlutenDriverEndpoint.executorDataMap.get(executorId)
if (data == null) {
context.sendFailure(
new IllegalArgumentException(s"Executor $executorId not registered or unavailable"))
} else {
try {
val text = data.executorEndpointRef.askSync[String](GlutenDumpNativeStackSyncRequest)
context.reply(text)
} catch {
case t: Throwable => context.sendFailure(t)
}
}
}

override def onStart(): Unit = {
Expand Down Expand Up @@ -116,6 +178,41 @@ object GlutenDriverEndpoint extends Logging with RemovalListener[String, util.Se
.removalListener(this)
.build[String, util.Set[String]]()

// Async C++ stack collection status: requestId -> (status, message)
private val stackAsyncStatus = new ConcurrentHashMap[String, (String, String)]()

def putStackAsyncStatus(requestId: String, status: String, message: String): Unit = {
stackAsyncStatus.put(requestId, (status, message))
}

def getStackAsyncStatus(requestId: String): Option[(String, String)] = {
Option(stackAsyncStatus.get(requestId))
}

def appendStackAsyncMessage(requestId: String, chunk: String): Unit = {
val prev = stackAsyncStatus.get(requestId)
val prevMsg = if (prev == null) "" else Option(prev._2).getOrElse("")
val nextMsg = if (chunk == null || chunk.isEmpty) prevMsg else prevMsg + chunk
val status = if (prev == null) "running" else prev._1
stackAsyncStatus.put(requestId, (status, nextMsg))
logInfo(
s"Async stack chunk appended: requestId=$requestId, " +
s"chunkLen=${Option(chunk).map(_.length).getOrElse(0)}, " +
s"totalLen=${nextMsg.length}")
}

def handleAsyncResult(requestId: String, success: Boolean, message: String): Unit = {
val st = if (success) "done" else "error"
val prev = stackAsyncStatus.get(requestId)
val prevMsg = if (prev == null) "" else Option(prev._2).getOrElse("")
val finalMsg = if (message == null || message.isEmpty) prevMsg else prevMsg + message
stackAsyncStatus.put(requestId, (st, finalMsg))
logInfo(
s"Async stack complete: requestId=$requestId, success=$success, totalLen=${Option(finalMsg)
.map(_.length)
.getOrElse(0)}")
}

def collectResources(executionId: String, resourceId: String): Unit = {
val resources = executionResourceRelation
.get(executionId, (_: String) => new util.HashSet[String]())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ import org.apache.gluten.execution.{CHBroadcastBuildSideCache, CHNativeCacheMana
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.GlutenRpcMessages._
import org.apache.spark.util.ThreadUtils
import org.apache.spark.rpc.RpcCallContext
import org.apache.spark.util.{ExecutorManager, ThreadUtils, Utils}

import java.io.RandomAccessFile
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util

import scala.util.{Failure, Success}

Expand Down Expand Up @@ -66,6 +72,130 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
resource_id => CHBroadcastBuildSideCache.invalidateBroadcastHashtable(resource_id))
}

case GlutenDumpNativeStackAsyncRequest(requestId) =>
// Async collection: run in a separate thread and report result to driverEndpointRef
val runnable = new Runnable {
override def run(): Unit = {
def sendResult(msg: GlutenRpcMessage): Unit = {
var ref = driverEndpointRef
if (ref == null) {
try {
ref = rpcEnv.setupEndpointRefByURI(driverUrl)
driverEndpointRef = ref
} catch {
case t: Throwable =>
logWarning("Resolve driverEndpointRef failed when reporting async result", t)
}
}
if (ref != null) {
try {
ref.send(msg)
} catch {
case t: Throwable => logWarning("Send async result to driver failed", t)
}
} else {
logError(
s"DriverEndpointRef unavailable; " +
s"async stack result lost for requestId=$requestId")
}
}
try {
val pid = ExecutorManager.getProcessId()
ensureGdbInstalled()
val gdbCmdPrefix = gdbPrefix()
val tmpLog = Files.createTempFile(s"gluten-bt-$requestId", ".log")
val logPath = tmpLog.toAbsolutePath.toString
val charset = StandardCharsets.UTF_8
val gdbWithLog = buildGdbWithLogging(gdbCmdPrefix, logPath, pid)
logInfo(
s"Starting async native stack collection: " +
s"requestId=$requestId, pid=$pid, cmd=$gdbWithLog")
val proc = new ProcessBuilder("bash", "-c", gdbWithLog).start()
// Incrementally tail the log file and stream to driver while gdb runs
val maxSegmentBytes = 64 * 1024
var pos: Long = 0L
var segments = 0
var totalBytes = 0L
def streamBytes(bytes: Array[Byte]): Unit = {
var offset = 0
while (offset < bytes.length) {
val end = Math.min(bytes.length, offset + maxSegmentBytes)
val segStr = new String(Arrays.copyOfRange(bytes, offset, end), charset)
sendResult(GlutenRpcMessages.GlutenNativeStackAsyncChunk(requestId, segStr))
segments += 1
offset = end
}
}
// Tail loop: read any appended bytes every ~100ms
while (proc.isAlive) {
try {
val raf = new RandomAccessFile(logPath, "r")
val len = raf.length()
if (len > pos) {
val toRead = (len - pos).toInt
val buf = new Array[Byte](toRead)
raf.seek(pos)
raf.readFully(buf)
pos = len
totalBytes += toRead
streamBytes(buf)
}
raf.close()
} catch { case _: Throwable => () }
try Thread.sleep(100)
catch { case _: Throwable => () }
}
// After gdb exits, flush any remaining content
try {
val raf = new RandomAccessFile(logPath, "r")
val len = raf.length()
if (len > pos) {
val toRead = (len - pos).toInt
val buf = new Array[Byte](toRead)
raf.seek(pos)
raf.readFully(buf)
pos = len
totalBytes += toRead
streamBytes(buf)
}
raf.close()
} catch { case _: Throwable => () }
val exitCode = proc.waitFor()
logInfo(
s"gdb process exit code: $exitCode for requestId=$requestId; " +
s"streamedBytes=$totalBytes, segments=$segments")
// Cleanup temp file
try Files.deleteIfExists(tmpLog)
catch { case _: Throwable => () }

if (exitCode != 0) {
sendResult(
GlutenNativeStackAsyncResult(
requestId,
success = false,
message =
s"Check executor logs for native C++ stack. gdb exit code: " + exitCode))
} else {
sendResult(
GlutenNativeStackAsyncResult(
requestId,
success = true,
message = ""
))
}
} catch {
case t: Throwable =>
sendResult(
GlutenNativeStackAsyncResult(
requestId,
success = false,
message = s"Async stack collection failed: ${t.getMessage}"))
}
}
}
// Use executor RPC env to run asynchronously
new Thread(runnable, s"gluten-stack-async-$requestId").start()

case e =>
logError(s"Received unexpected message. $e")
}
Expand Down Expand Up @@ -100,6 +230,81 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
case e =>
logError(s"Received unexpected message. $e")
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case GlutenDumpNativeStackSyncRequest =>
try {
val pid = ExecutorManager.getProcessId()
ensureGdbInstalled()
val gdbCmdPrefix = gdbPrefix()
val tmpLog = Files.createTempFile("gluten-bt-sync-", ".log")
val logPath = tmpLog.toAbsolutePath.toString
val charset = StandardCharsets.UTF_8
val gdbWithLog = buildGdbWithLogging(gdbCmdPrefix, logPath, pid)
logInfo(s"Starting sync native stack collection: pid=$pid, cmd=$gdbWithLog")
val proc = new ProcessBuilder("bash", "-c", gdbWithLog).start()
val exitCode = proc.waitFor()
val output =
try {
new String(Files.readAllBytes(tmpLog), charset)
} catch { case _: Throwable => "" }
try Files.deleteIfExists(tmpLog)
catch { case _: Throwable => () }
if (exitCode != 0 && (output == null || output.isEmpty)) {
context.reply(s"gdb exit code: $exitCode. Please check executor logs.")
} else {
context.reply(Option(output).getOrElse(""))
}
} catch {
case t: Throwable =>
logWarning("Sync native stack collection failed", t)
context.sendFailure(t)
}
}

private def has(cmd: String): Boolean = {
try {
val out = Utils.executeAndGetOutput(
Seq("bash", "-c", s"command -v $cmd >/dev/null 2>&1 && echo yes || echo no"))
out != null && out.trim == "yes"
} catch { case _: Throwable => false }
}

private def ensureGdbInstalled(): Unit = {
if (!has("gdb")) {
val sudo = if (has("sudo")) "sudo " else ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some env it may not allow to install other software

val cmdOpt =
if (has("apt-get")) {
val base = s"${sudo}apt-get update -y && ${sudo}apt-get install -y "
Some(base + "gdb elfutils")
} else if (has("yum")) {
Some(s"${sudo}yum install -y gdb elfutils")
} else if (has("dnf")) {
Some(s"${sudo}dnf install -y gdb elfutils")
} else if (has("zypper")) {
Some(s"${sudo}zypper -n install gdb elfutils")
} else {
None
}
cmdOpt.foreach {
c =>
try { Utils.executeCommand(Seq("bash", "-c", c)) }
catch { case _: Throwable => () }
}
}
}

private def gdbPrefix(): String = {
if (has("stdbuf")) "stdbuf -o0 -e0 gdb --batch-silent -q" else "gdb --batch-silent -q"
}

private def buildGdbWithLogging(gdbCmdPrefix: String, logPath: String, pid: Long): String = {
s"$gdbCmdPrefix -ex 'set pagination off' " +
s"-ex 'set print thread-events off' -ex 'set logging file $logPath' " +
s"-ex 'set logging overwrite on' -ex 'set logging on' " +
s"-ex 'thread apply all bt full' " +
s"-ex 'set logging off' -p $pid"
}
}
object GlutenExecutorEndpoint {
var executorEndpoint: GlutenExecutorEndpoint = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,26 @@ object GlutenRpcMessages {
case class GlutenFilesCacheLoad(files: Array[Byte]) extends GlutenRpcMessage

case class GlutenFilesCacheLoadStatus(jobId: String)

/** Start async native stack collection; driver returns a `requestId` immediately */
case class GlutenStartNativeStackAsync(executorId: String) extends GlutenRpcMessage

/** Internal async dump request delivered to executor (no options) */
case class GlutenDumpNativeStackAsyncRequest(requestId: String) extends GlutenRpcMessage

/** Executor reports async dump result back to driver */
case class GlutenNativeStackAsyncResult(requestId: String, success: Boolean, message: String)
extends GlutenRpcMessage

/** Executor reports async dump partial chunk to driver */
case class GlutenNativeStackAsyncChunk(requestId: String, chunk: String) extends GlutenRpcMessage

/** Query async native stack status by requestId, driver returns JSON string */
case class GlutenQueryNativeStackStatus(requestId: String) extends GlutenRpcMessage

/** Synchronous: query native C++ stack of an executor; driver will block and return plain text */
case class GlutenQueryNativeStackSync(executorId: String) extends GlutenRpcMessage

/** Internal sync dump request delivered to executor; executor replies with full stack text */
case object GlutenDumpNativeStackSyncRequest extends GlutenRpcMessage
}
Loading