From 0bdfbb244876539bbef8b812327d29d10b61d6db Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 16 Jan 2026 13:45:15 +0100 Subject: [PATCH] #685 Fix when transformers and sinks are marked as not allowed to run in parallel have the behavior of transformations with self-dependencies. --- .../za/co/absa/pramen/core/pipeline/Job.scala | 2 ++ .../absa/pramen/core/pipeline/JobBase.scala | 2 ++ .../core/runner/task/TaskRunnerBase.scala | 9 ++--- .../absa/pramen/core/mocks/job/JobSpy.scala | 5 ++- .../runner/task/TaskRunnerBaseSuite.scala | 35 +++++++++++++++++++ 5 files changed, 48 insertions(+), 5 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala index 69adfe574..bff87e5e1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala @@ -42,6 +42,8 @@ trait Job { def allowRunningTasksInParallel: Boolean + def isSelfDependent: Boolean + def notificationTargets: Seq[JobNotificationTarget] def trackDays: Int diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala index fbb538a46..4b7e45e12 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala @@ -53,6 +53,8 @@ abstract class JobBase(operationDef: OperationDef, override val allowRunningTasksInParallel: Boolean = operationDef.allowParallel && !hasSelfDependencies + override val isSelfDependent: Boolean = hasSelfDependencies + override def notificationTargets: Seq[JobNotificationTarget] = jobNotificationTargets override def trackDays: Int = outputTable.trackDays diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index 83cfe931d..bc80199e1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -107,17 +107,18 @@ abstract class TaskRunnerBase(conf: Config, val sortedTasks = tasks.sortBy(_.infoDate) var failedInfoDate: Option[LocalDate] = None - sortedTasks.map(task => + sortedTasks.map { task => + val selfDependent = task.job.isSelfDependent failedInfoDate match { - case Some(failedDate) => + case Some(failedDate) if selfDependent => skipTask(task, s"Due to failure for $failedDate", isWarning = true) - case None => + case _ => val status = runTask(task) if (status.isFailure) failedInfoDate = Option(task.infoDate) status } - ) + } } /** Runs a task in the single thread. Performs all task logging and notification sending activities. */ diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala index bb27a76da..7d4be1a68 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala @@ -41,6 +41,7 @@ class JobSpy(jobName: String = "Dummy Job", runFunction: () => RunResult = () => null, scheduleStrategyIn: ScheduleStrategy = new ScheduleStrategySourcing(true), allowParallel: Boolean = true, + hasSelfDependencies: Boolean = false, saveStats: MetaTableStats = MetaTableStats(Some(0)), jobNotificationTargets: Seq[JobNotificationTarget] = Seq.empty, jobTrackDays: Int = 0 @@ -66,7 +67,9 @@ class JobSpy(jobName: String = "Dummy Job", override val scheduleStrategy: ScheduleStrategy = scheduleStrategyIn - override def allowRunningTasksInParallel: Boolean = allowParallel + override def allowRunningTasksInParallel: Boolean = allowParallel && !hasSelfDependencies + + override def isSelfDependent: Boolean = hasSelfDependencies override def notificationTargets: Seq[JobNotificationTarget] = jobNotificationTargets diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala index bc3d269ee..498e454fc 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala @@ -226,6 +226,39 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar val job = tasks.head.job.asInstanceOf[JobSpy] + assert(job.validateCount == 2) + assert(job.runCount == 2) + assert(job.postProcessingCount == 0) + assert(job.saveCount == 0) + assert(job.createHiveTableCount == 0) + assert(result.length == 2) + assert(result.head.runStatus.isInstanceOf[Failed]) + assert(result(1).runStatus.isInstanceOf[Failed]) + + val journalEntries = journal.getEntries(now, now.plusSeconds(30)) + + assert(journalEntries.length == 2) + assert(journalEntries.head.status == "Failed") + assert(notificationTarget.notificationsSent.length == 2) + assert(notificationTarget.notificationsSent.head.runStatus.isInstanceOf[RunStatus.Failed]) + } + + "run multiple failure jobs sequential execution and self=dependencies" in { + val now = Instant.now() + val notificationTarget = new NotificationTargetSpy(ConfigFactory.empty(), (action: TaskResult) => ()) + val jobNotificationTarget = JobNotificationTarget("notification1", Map.empty[String, String], notificationTarget) + val (runner, _, journal, state, _, tasks) = getUseCase(hasSelfDependencies = true, runFunction = () => throw new IllegalStateException("Test exception"), jobNotificationTargets = Seq(jobNotificationTarget)) + + val taskPreDefs = (infoDate :: infoDate.plusDays(1) :: Nil).map(d => core.pipeline.TaskPreDef(d, TaskRunReason.New)) + + val fut = runner.runJobTasks(tasks.head.job, taskPreDefs) + + Await.result(fut, Duration.Inf) + + val result = state.completedStatuses + + val job = tasks.head.job.asInstanceOf[JobSpy] + assert(job.validateCount == 1) assert(job.runCount == 1) assert(job.postProcessingCount == 0) @@ -648,6 +681,7 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar isRerun: Boolean = false, bookkeeperIn: Bookkeeper = null, allowParallel: Boolean = true, + hasSelfDependencies: Boolean = false, hiveTable: Option[String] = None, jobNotificationTargets: Seq[JobNotificationTarget] = Nil, timeoutTask: Boolean = false @@ -675,6 +709,7 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar runFunction = runFunction, operationDef = operationDef, allowParallel = allowParallel, + hasSelfDependencies = hasSelfDependencies, saveStats = stats, hiveTable = hiveTable, jobNotificationTargets = jobNotificationTargets)