diff --git a/pom.xml b/pom.xml
index cca8cbd2..b138a1ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,7 @@
6.7.2
2.15.2
2.2.0
+    <slf4j.version>2.0.7</slf4j.version>
-XX:+IgnoreUnrecognizedVMOptions
@@ -190,6 +191,11 @@
jackson-module-scala_${scala.binary.version}
${jackson.version}
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
diff --git a/src/main/scala/org/apache/spark/deploy/armada/submit/ArmadaClientApplication.scala b/src/main/scala/org/apache/spark/deploy/armada/submit/ArmadaClientApplication.scala
index 5376495b..7a6f7f52 100644
--- a/src/main/scala/org/apache/spark/deploy/armada/submit/ArmadaClientApplication.scala
+++ b/src/main/scala/org/apache/spark/deploy/armada/submit/ArmadaClientApplication.scala
@@ -76,7 +76,9 @@ import org.apache.spark.deploy.k8s.features.{
KubernetesFeatureConfigStep
}
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils
+import org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBuilder
import org.apache.spark.util.Utils
+import org.slf4j.LoggerFactory
import scala.collection.mutable
import scala.jdk.CollectionConverters._
@@ -145,18 +147,12 @@ private[spark] object ArmadaClientApplication {
private val DEFAULT_NAMESPACE = "default"
private val DEFAULT_ARMADA_APP_ID = "armada-spark-app-id"
private val DEFAULT_RUN_AS_USER = 185
-
}
/** Main class and entry point of application submission in KUBERNETES mode.
*/
private[spark] class ArmadaClientApplication extends SparkApplication {
- // FIXME: Find the real way to log properly.
- private def log(msg: String): Unit = {
- // scalastyle:off println
- System.err.println(msg)
- // scalastyle:on println
- }
+ private val logger = LoggerFactory.getLogger(getClass)
override def start(args: Array[String], conf: SparkConf): Unit = {
val parsedArguments = ClientArguments.fromCommandLineArgs(args)
@@ -170,17 +166,17 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
val armadaJobConfig = validateArmadaJobConfig(sparkConf, clientArguments)
val (host, port) = ArmadaUtils.parseMasterUrl(sparkConf.get("spark.master"))
- log(s"Connecting to Armada Server - host: $host, port: $port")
+ logger.info(s"Connecting to Armada Server - host: $host, port: $port")
val armadaClient = ArmadaClient(host, port, useSsl = false, sparkConf.get(ARMADA_AUTH_TOKEN))
val healthTimeout =
Duration(sparkConf.get(ARMADA_HEALTH_CHECK_TIMEOUT), SECONDS)
- log(s"Checking Armada Server health (timeout: $healthTimeout)")
+ logger.info(s"Checking Armada Server health (timeout: $healthTimeout)")
val healthResp = Await.result(armadaClient.submitHealth(), healthTimeout)
if (healthResp.status.isServing) {
- log("Armada Server is serving requests!")
+ logger.info("Armada Server is serving requests!")
} else {
throw new RuntimeException(
"Armada health check failed - Armada Server is not serving requests!"
@@ -196,7 +192,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
val lookoutURL =
s"$lookoutBaseURL/?page=0&sort[id]=jobId&sort[desc]=true&" +
s"ps=50&sb=$driverJobId&active=false&refresh=true"
- log(s"Lookout URL for the driver job is $lookoutURL")
+ logger.info(s"Lookout URL for the driver job is $lookoutURL")
()
}
@@ -745,7 +741,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
val error = Some(driverResponse.jobResponseItems.head.error)
.filter(_.nonEmpty)
.getOrElse("none")
- log(
+ logger.info(
s"Submitted driver job with ID: $driverJobId, Error: $error"
)
driverJobId
@@ -760,7 +756,7 @@ private[spark] class ArmadaClientApplication extends SparkApplication {
val executorsResponse = armadaClient.submitJobs(queue, jobSetId, executors)
executorsResponse.jobResponseItems.map { item =>
val error = Some(item.error).filter(_.nonEmpty).getOrElse("none")
- log(s"Submitted executor job with ID: ${item.jobId}, Error: $error")
+ logger.info(s"Submitted executor job with ID: ${item.jobId}, Error: $error")
item.jobId
}
}
diff --git a/src/test/resources/log4j2-test.properties b/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000..406accfd
--- /dev/null
+++ b/src/test/resources/log4j2-test.properties
@@ -0,0 +1,7 @@
+rootLogger.level = debug
+rootLogger.appenderRef.console.ref = console
+
+appender.console.type = Console
+appender.console.name = console
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{HH:mm:ss.SSS} %-5level %logger{20} - %msg%n
diff --git a/src/test/scala/org/apache/spark/deploy/armada/e2e/ArmadaOperations.scala b/src/test/scala/org/apache/spark/deploy/armada/e2e/ArmadaOperations.scala
index 5d0c1e0e..31cf82ce 100644
--- a/src/test/scala/org/apache/spark/deploy/armada/e2e/ArmadaOperations.scala
+++ b/src/test/scala/org/apache/spark/deploy/armada/e2e/ArmadaOperations.scala
@@ -21,6 +21,7 @@ import api.submit.Queue
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
@@ -46,6 +47,7 @@ object JobSetStatus {
* Armada server URL (default: "localhost:30002")
*/
class ArmadaClient(armadaUrl: String = "localhost:30002") {
+ private val logger = LoggerFactory.getLogger(getClass)
private val processTimeout = DefaultProcessTimeout
private val yamlMapper = {
@@ -94,12 +96,12 @@ class ArmadaClient(armadaUrl: String = "localhost:30002") {
def ensureQueueExists(name: String)(implicit ec: ExecutionContext): Future[Unit] = {
getQueue(name).flatMap {
case Some(_) =>
- println(s"[QUEUE] Queue $name already exists")
+ logger.info(s"[QUEUE] Queue $name already exists")
Future.successful(())
case None =>
- println(s"[QUEUE] Creating queue $name...")
+ logger.info(s"[QUEUE] Creating queue $name...")
createQueue(name).flatMap { _ =>
- println(s"[QUEUE] Waiting for queue $name to become available...")
+ logger.info(s"[QUEUE] Waiting for queue $name to become available...")
Future {
var attempts = 0
val maxAttempts = 30
@@ -110,12 +112,12 @@ class ArmadaClient(armadaUrl: String = "localhost:30002") {
queueFound = getQueueSync(name).isDefined
attempts += 1
if (!queueFound && attempts % 5 == 0) {
- println(s"[QUEUE] Still waiting for queue $name... (${attempts}s)")
+ logger.info(s"[QUEUE] Still waiting for queue $name... (${attempts}s)")
}
}
if (queueFound) {
- println(s"[QUEUE] Queue $name is ready")
+ logger.info(s"[QUEUE] Queue $name is ready")
} else {
throw new RuntimeException(s"Queue $name not available after $attempts seconds")
}
@@ -164,7 +166,7 @@ class ArmadaClient(armadaUrl: String = "localhost:30002") {
val elapsed = (System.currentTimeMillis() - startTime) / 1000
// Log progress periodically for long-running jobs
if (elapsed % ProgressLogInterval.toSeconds == 0 && elapsed > 0) {
- println(s"Still monitoring job - elapsed: ${elapsed}s")
+ logger.info(s"Still monitoring job - elapsed: ${elapsed}s")
}
}
}
diff --git a/src/test/scala/org/apache/spark/deploy/armada/e2e/ArmadaSparkE2E.scala b/src/test/scala/org/apache/spark/deploy/armada/e2e/ArmadaSparkE2E.scala
index 50281900..88c854de 100644
--- a/src/test/scala/org/apache/spark/deploy/armada/e2e/ArmadaSparkE2E.scala
+++ b/src/test/scala/org/apache/spark/deploy/armada/e2e/ArmadaSparkE2E.scala
@@ -22,6 +22,7 @@ import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.time.{Seconds, Span}
+import org.slf4j.LoggerFactory
import java.util.Properties
import scala.concurrent.ExecutionContext.Implicits.global
@@ -34,6 +35,8 @@ class ArmadaSparkE2E
with ScalaFutures
with Eventually {
+ private val logger = LoggerFactory.getLogger(getClass)
+
implicit override val patienceConfig: PatienceConfig = PatienceConfig(
timeout = Span(300, Seconds),
interval = Span(2, Seconds)
@@ -72,14 +75,14 @@ class ArmadaSparkE2E
sparkVersion = finalSparkVersion
)
- println(s"Test configuration loaded: $baseConfig")
+ logger.info(s"Test configuration loaded: $baseConfig")
// Verify Armada cluster is ready before running tests
val clusterReadyTimeout = ClusterReadyTimeout.toSeconds.toInt
val testQueueName = s"${baseConfig.baseQueueName}-cluster-check-${System.currentTimeMillis()}"
- println(s"[CLUSTER-CHECK] Verifying Armada cluster readiness...")
- println(s"[CLUSTER-CHECK] Will retry for up to $clusterReadyTimeout seconds")
+ logger.info(s"[CLUSTER-CHECK] Verifying Armada cluster readiness...")
+ logger.info(s"[CLUSTER-CHECK] Will retry for up to $clusterReadyTimeout seconds")
val startTime = System.currentTimeMillis()
var clusterReady = false
@@ -88,18 +91,20 @@ class ArmadaSparkE2E
while (!clusterReady && (System.currentTimeMillis() - startTime) < clusterReadyTimeout * 1000) {
attempts += 1
- println(
+ logger.info(
s"[CLUSTER-CHECK] Attempt #$attempts - Creating and verifying test queue: $testQueueName"
)
try {
armadaClient.ensureQueueExists(testQueueName).futureValue
- println(s"[CLUSTER-CHECK] Queue creation and verification succeeded - cluster is ready!")
+ logger.info(
+ s"[CLUSTER-CHECK] Queue creation and verification succeeded - cluster is ready!"
+ )
clusterReady = true
try {
armadaClient.deleteQueue(testQueueName).futureValue
- println(s"[CLUSTER-CHECK] Test queue cleaned up")
+ logger.info(s"[CLUSTER-CHECK] Test queue cleaned up")
} catch {
case _: Exception => // Ignore cleanup failures
}
@@ -107,10 +112,12 @@ class ArmadaSparkE2E
case ex: Exception =>
lastError = Some(ex)
val elapsed = (System.currentTimeMillis() - startTime) / 1000
- println(s"[CLUSTER-CHECK] Attempt #$attempts failed after ${elapsed}s: ${ex.getMessage}")
+ logger.info(
+ s"[CLUSTER-CHECK] Attempt #$attempts failed after ${elapsed}s: ${ex.getMessage}"
+ )
if ((System.currentTimeMillis() - startTime) < clusterReadyTimeout * 1000) {
- println(
+ logger.info(
s"[CLUSTER-CHECK] Waiting ${ClusterCheckRetryDelay.toSeconds} seconds before retry..."
)
Thread.sleep(ClusterCheckRetryDelay.toMillis)
@@ -126,7 +133,7 @@ class ArmadaSparkE2E
)
}
- println(
+ logger.info(
s"[CLUSTER-CHECK] Cluster verified ready after $totalTime seconds ($attempts attempts)"
)
}
diff --git a/src/test/scala/org/apache/spark/deploy/armada/e2e/ProcessExecutor.scala b/src/test/scala/org/apache/spark/deploy/armada/e2e/ProcessExecutor.scala
index 2459ad5b..b400c95b 100644
--- a/src/test/scala/org/apache/spark/deploy/armada/e2e/ProcessExecutor.scala
+++ b/src/test/scala/org/apache/spark/deploy/armada/e2e/ProcessExecutor.scala
@@ -22,6 +22,7 @@ import scala.concurrent.{Future, ExecutionContext, blocking}
import scala.concurrent.duration._
import scala.sys.process._
import scala.util.{Failure, Success, Try}
+import org.slf4j.LoggerFactory
case class ProcessResult(
exitCode: Int,
@@ -31,6 +32,7 @@ case class ProcessResult(
)
object ProcessExecutor {
+ private val logger = LoggerFactory.getLogger(getClass)
/** Execute command and always return ProcessResult, even on failure */
def executeWithResult(command: Seq[String], timeout: Duration): ProcessResult = {
@@ -42,14 +44,14 @@ object ProcessExecutor {
stdout.append(line).append("\n")
// Print docker/spark-submit output in real-time for debugging
if (command.headOption.contains("docker") && line.nonEmpty) {
- println(s"[SPARK-SUBMIT] $line")
+ logger.info(s"[SPARK-SUBMIT] $line")
}
},
line => {
stderr.append(line).append("\n")
// Print docker/spark-submit errors in real-time for debugging
if (command.headOption.contains("docker") && line.nonEmpty) {
- println(s"[SPARK-SUBMIT] $line")
+ logger.info(s"[SPARK-SUBMIT] $line")
}
}
)
diff --git a/src/test/scala/org/apache/spark/deploy/armada/e2e/SimplePodMonitor.scala b/src/test/scala/org/apache/spark/deploy/armada/e2e/SimplePodMonitor.scala
index 66539ad4..dbb80da6 100644
--- a/src/test/scala/org/apache/spark/deploy/armada/e2e/SimplePodMonitor.scala
+++ b/src/test/scala/org/apache/spark/deploy/armada/e2e/SimplePodMonitor.scala
@@ -17,11 +17,13 @@
package org.apache.spark.deploy.armada.e2e
+import org.slf4j.LoggerFactory
import scala.collection.mutable
import scala.concurrent.duration._
/** Simple pod monitoring that captures logs and events on failure */
class SimplePodMonitor(namespace: String) {
+ private val logger = LoggerFactory.getLogger(getClass)
private val capturedLogs = mutable.ArrayBuffer[String]()
/** Check if any pods have failed and capture their logs if so */
@@ -45,7 +47,7 @@ class SimplePodMonitor(namespace: String) {
val failedPodName = failedPods.head.split("\\s+").head
// Immediately capture logs for the failed pod
- println(s"[MONITOR] Pod $failedPodName failed, capturing logs...")
+ logger.info(s"[MONITOR] Pod $failedPodName failed, capturing logs...")
try {
val logsCmd = Seq(
"kubectl",
@@ -58,8 +60,8 @@ class SimplePodMonitor(namespace: String) {
)
val logsResult = ProcessExecutor.executeWithResult(logsCmd, 10.seconds)
if (logsResult.exitCode == 0 && logsResult.stdout.nonEmpty) {
- println(s"[MONITOR] Pod $failedPodName logs:")
- println(logsResult.stdout)
+ logger.info(s"[MONITOR] Pod $failedPodName logs:")
+ logger.info(logsResult.stdout)
}
// Also try to describe the pod
@@ -69,13 +71,13 @@ class SimplePodMonitor(namespace: String) {
val lines = describeResult.stdout.split("\n")
val eventsIndex = lines.indexWhere(_.contains("Events:"))
if (eventsIndex >= 0) {
- println(s"[MONITOR] Pod $failedPodName events:")
- println(lines.slice(eventsIndex, eventsIndex + 20).mkString("\n"))
+ logger.info(s"[MONITOR] Pod $failedPodName events:")
+ logger.info(lines.slice(eventsIndex, eventsIndex + 20).mkString("\n"))
}
}
} catch {
case e: Exception =>
- println(s"[MONITOR] Failed to capture logs for $failedPodName: ${e.getMessage}")
+ logger.info(s"[MONITOR] Failed to capture logs for $failedPodName: ${e.getMessage}")
}
Some(s"Pod $failedPodName failed in namespace $namespace")
@@ -83,19 +85,19 @@ class SimplePodMonitor(namespace: String) {
None
}
} else {
- println(s"[MONITOR] Failed to get pods: ${podsResult.stderr}")
+ logger.info(s"[MONITOR] Failed to get pods: ${podsResult.stderr}")
None
}
} catch {
case e: Exception =>
- println(s"[MONITOR] Error checking pods: ${e.getMessage}")
+ logger.info(s"[MONITOR] Error checking pods: ${e.getMessage}")
None
}
}
/** Capture all logs and events for debugging */
def captureDebugInfo(): Unit = {
- println(s"\n[DEBUG] Capturing debug info for namespace $namespace")
+ logger.debug(s"Capturing debug info for namespace $namespace")
try {
val podsCmd = Seq("kubectl", "get", "pods", "-n", namespace, "-o", "name")
@@ -107,7 +109,7 @@ class SimplePodMonitor(namespace: String) {
podNames.foreach { podName =>
try {
- println(s"[DEBUG] Capturing logs for pod $podName")
+ logger.debug(s"Capturing logs for pod $podName")
val logsCmd = Seq(
"kubectl",
@@ -150,7 +152,7 @@ class SimplePodMonitor(namespace: String) {
}
} catch {
case e: Exception =>
- println(s"[DEBUG] Failed to capture info for pod $podName: ${e.getMessage}")
+ logger.error(s"Failed to capture info for pod $podName: ${e.getMessage}")
}
}
}
@@ -163,16 +165,16 @@ class SimplePodMonitor(namespace: String) {
} catch {
case e: Exception =>
- println(s"[DEBUG] Failed to capture debug info: ${e.getMessage}")
+ logger.error(s"Failed to capture debug info: ${e.getMessage}")
}
}
/** Print all captured logs */
def printCapturedLogs(): Unit = {
if (capturedLogs.nonEmpty) {
- println(s"\n========== DEBUG INFO FOR NAMESPACE: $namespace ==========")
- capturedLogs.foreach(println)
- println(s"========== END DEBUG INFO ==========\n")
+ logger.info(s"\n========== DEBUG INFO FOR NAMESPACE: $namespace ==========")
+ capturedLogs.foreach(logger.info)
+ logger.info(s"========== END DEBUG INFO ==========\n")
}
}
}
diff --git a/src/test/scala/org/apache/spark/deploy/armada/e2e/TestOrchestrator.scala b/src/test/scala/org/apache/spark/deploy/armada/e2e/TestOrchestrator.scala
index ba2bc5de..93eedcc4 100644
--- a/src/test/scala/org/apache/spark/deploy/armada/e2e/TestOrchestrator.scala
+++ b/src/test/scala/org/apache/spark/deploy/armada/e2e/TestOrchestrator.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeoutException
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import TestConstants._
+import org.slf4j.LoggerFactory
import scala.annotation.tailrec
@@ -70,6 +71,7 @@ class TestOrchestrator(
k8sClient: K8sClient
)(implicit ec: ExecutionContext) {
+ private val logger = LoggerFactory.getLogger(getClass)
private val jobSubmitTimeout = JobSubmitTimeout
private val jobWatchTimeout = JobWatchTimeout
@@ -187,10 +189,10 @@ class TestOrchestrator(
s"e2e-${name.toLowerCase.replaceAll("[^a-z0-9]", "-")}-${System.currentTimeMillis()}"
val queueName = s"${config.baseQueueName}-${context.queueSuffix}"
- println(s"\n========== Starting E2E Test: $name ==========")
- println(s"Test ID: ${context.testId}")
- println(s"Namespace: ${context.namespace}")
- println(s"Queue: $queueName")
+ logger.info(s"========== Starting E2E Test: $name ==========")
+ logger.info(s"Test ID: ${context.testId}")
+ logger.info(s"Namespace: ${context.namespace}")
+ logger.info(s"Queue: $queueName")
val resultFuture = for {
_ <- k8sClient.createNamespace(context.namespace)
@@ -205,7 +207,7 @@ class TestOrchestrator(
.andThen {
case scala.util.Failure(ex) =>
// Capture debug info on any failure
- println(s"\n[FAILURE] Test failed with exception: ${ex.getMessage}")
+ logger.error(s"Test failed with exception: ${ex.getMessage}")
val podMonitor = new SimplePodMonitor(context.namespace)
podMonitor.captureDebugInfo()
podMonitor.printCapturedLogs()
@@ -215,9 +217,9 @@ class TestOrchestrator(
cleanupTest(context, queueName)
}
.map { result =>
- println(s"\n========== Test Finished: $name ==========")
- println(s"Job Status: ${result.status}")
- println(s"Duration: ${(System.currentTimeMillis() - context.startTime) / 1000}s")
+ logger.info(s"\n========== Test Finished: $name ==========")
+ logger.info(s"Job Status: ${result.status}")
+ logger.info(s"Duration: ${(System.currentTimeMillis() - context.startTime) / 1000}s")
result
}
}
@@ -225,13 +227,13 @@ class TestOrchestrator(
private def cleanupTest(context: TestContext, queueName: String): Future[Unit] = {
for {
_ <- k8sClient.deleteNamespace(context.namespace).recover { case ex =>
- println(
+ logger.info(
s"[CLEANUP] Warning: Failed to delete namespace ${context.namespace}: ${ex.getMessage}"
)
()
}
_ <- armadaClient.deleteQueue(queueName).recover { case ex =>
- println(s"[CLEANUP] Warning: Failed to delete queue $queueName: ${ex.getMessage}")
+ logger.info(s"[CLEANUP] Warning: Failed to delete queue $queueName: ${ex.getMessage}")
()
}
} yield ()
@@ -275,17 +277,17 @@ class TestOrchestrator(
config.pythonScript
)
- println(s"\n[SUBMIT] Submitting Spark job via Docker:")
- println(s"[SUBMIT] Queue: $queueName")
- println(s"[SUBMIT] JobSetId: $jobSetId")
- println(s"[SUBMIT] Namespace: ${context.namespace}")
- println(s"[SUBMIT] Image: ${config.imageName}")
- println(s"[SUBMIT] Master URL: ${config.masterUrl}")
- println(s"[SUBMIT] Application Resource: $appResource")
- println(s"[SUBMIT] Spark config:")
+ logger.info(s"[SUBMIT] Submitting Spark job via Docker:")
+ logger.info(s"[SUBMIT] Queue: $queueName")
+ logger.info(s"[SUBMIT] JobSetId: $jobSetId")
+ logger.info(s"[SUBMIT] Namespace: ${context.namespace}")
+ logger.info(s"[SUBMIT] Image: ${config.imageName}")
+ logger.info(s"[SUBMIT] Master URL: ${config.masterUrl}")
+ logger.info(s"[SUBMIT] Application Resource: $appResource")
+ logger.info(s"[SUBMIT] Spark config:")
enhancedSparkConfs.toSeq.sortBy(_._1).foreach { case (key, value) =>
val displayValue = if (value.length > 100) value.take(100) + "..." else value
- println(s"[SUBMIT] $key = $displayValue")
+ logger.info(s"[SUBMIT] $key = $displayValue")
}
// Properly escape command for shell reproduction
val escapedCommand = dockerCommand.map { arg =>
@@ -293,7 +295,7 @@ class TestOrchestrator(
"'" + arg.replace("'", "'\\''") + "'"
} else arg
}
- println(s"[SUBMIT] Full command: ${escapedCommand.mkString(" ")}\n")
+ logger.info(s"[SUBMIT] Full command: ${escapedCommand.mkString(" ")}\n")
@tailrec
def attemptSubmit(attempt: Int = 1): ProcessResult = {
@@ -305,7 +307,7 @@ class TestOrchestrator(
if (isQueueNotFoundError && attempt <= 3) {
val waitTime = attempt * 2 // 2, 4, 6 seconds
- println(
+ logger.info(
s"[SUBMIT] Queue not found error on attempt $attempt, retrying in ${waitTime}s..."
)
Thread.sleep(waitTime * 1000)
@@ -318,21 +320,21 @@ class TestOrchestrator(
.filter(line => errorPattern.findFirstIn(line).isDefined)
.take(10)
- println(
+ logger.info(
s"[SUBMIT] ERROR Submit failed with exit code ${result.exitCode} after $attempt attempts"
)
if (relevantLines.nonEmpty) {
- println("[SUBMIT] Relevant error lines:")
- relevantLines.foreach(line => println(s"[SUBMIT] $line"))
+ logger.info("[SUBMIT] Relevant error lines:")
+ relevantLines.foreach(line => logger.info(s"[SUBMIT] $line"))
}
throw new RuntimeException(s"Spark submit failed with exit code ${result.exitCode}")
}
} else {
if (attempt > 1) {
- println(s"[SUBMIT] Job submitted successfully on attempt $attempt")
+ logger.info(s"[SUBMIT] Job submitted successfully on attempt $attempt")
} else {
- println(s"[SUBMIT] Job submitted successfully")
+ logger.info(s"[SUBMIT] Job submitted successfully")
}
result
}
@@ -349,7 +351,7 @@ class TestOrchestrator(
): Future[TestResult] = Future {
val podMonitor = new SimplePodMonitor(context.namespace)
- println(s"[WATCH] Starting job watch for jobSetId: $jobSetId")
+ logger.info(s"[WATCH] Starting job watch for jobSetId: $jobSetId")
val jobFuture = armadaClient.watchJobSet(queueName, jobSetId, jobWatchTimeout)
var jobCompleted = false
@@ -357,7 +359,7 @@ class TestOrchestrator(
var assertionResults = Map.empty[String, AssertionResult]
if (config.failFastOnPodFailure) {
- println(s"[MONITOR] Starting pod monitoring for namespace: ${context.namespace}")
+ logger.info(s"[MONITOR] Starting pod monitoring for namespace: ${context.namespace}")
val monitorThread = new Thread(() => {
while (!jobCompleted && podFailure.isEmpty) {
podFailure = podMonitor.checkForFailures()
@@ -395,10 +397,10 @@ class TestOrchestrator(
Await.result(jobFuture, jobWatchTimeout + 10.seconds)
} catch {
case _: TimeoutException =>
- println(s"[TIMEOUT] Job watch timed out after ${jobWatchTimeout.toSeconds}s")
+ logger.warn(s"Job watch timed out after ${jobWatchTimeout.toSeconds}s")
JobSetStatus.Timeout
case ex: Exception =>
- println(s"[ERROR] Job watch failed: ${ex.getMessage}")
+ logger.error(s"Job watch failed: ${ex.getMessage}")
JobSetStatus.Failed
} finally {
jobCompleted = true
@@ -412,7 +414,7 @@ class TestOrchestrator(
!thread.isAlive // Returns true if thread completed, false if still running
} catch {
case _: InterruptedException =>
- println("[WARNING] Assertion thread was interrupted while waiting for completion")
+ logger.warn("Assertion thread was interrupted while waiting for completion")
false
}
case None => true // No assertions to run, so consider them "completed"
@@ -432,7 +434,7 @@ class TestOrchestrator(
val finalStatus = podFailure match {
case Some(failureMsg) =>
- println(s"[FAILED] Pod failure detected: $failureMsg")
+ logger.info(s"[FAILED] Pod failure detected: $failureMsg")
JobSetStatus.Failed
case None =>
jobStatus
@@ -442,15 +444,15 @@ class TestOrchestrator(
val hasAssertionFailures =
assertionResults.values.exists(_.isInstanceOf[AssertionResult.Failure])
if (finalStatus != JobSetStatus.Success || hasAssertionFailures) {
- println("[DEBUG] Test or assertions failed, capturing debug information...")
+ logger.debug("Test or assertions failed, capturing debug information...")
// Print which assertions failed for clarity
if (hasAssertionFailures) {
- println("[DEBUG] Failed assertions:")
+ logger.error("Failed assertions:")
assertionResults.foreach { case (name, result) =>
result match {
case AssertionResult.Failure(msg) =>
- println(s" - $name: $msg")
+ logger.error(s" - $name: $msg")
case _ =>
}
}