diff --git a/README.md b/README.md
index 665a3a9a..29d6930f 100644
--- a/README.md
+++ b/README.md
@@ -1771,6 +1771,9 @@ pramen.operations = [
     # Thus, the task will take up multiple "slots" in 'pramen.parallel.tasks' setting.
     # This is useful if some tasks consume lot of memory and CPU and should not be running with other tasks in parallel.
     consume.threads = 2
+
+    # When true, schema changes in the source are ignored and are not displayed in notifications.
+    ignore.schema.change = false
 
     tables = [
       {
@@ -2277,7 +2280,7 @@ Here is a example:
     name = "My Scala Transformation"
     type = "transformer"
     class = "com.example.MyTransformer"
-    
+
     schedule.type = "daily"
 
     output.table = "my_output_table"
@@ -2319,6 +2322,25 @@ Here is a example:
 
     # Optional column selection
     columns = [ "A", "B", "C" ]
+
+    # [Optional] If true (default), jobs in this operation are allowed to run in parallel.
+    # It makes sense to set it to false for jobs that take a lot of cluster resources.
+    allow.parallel = true
+
+    # [Optional] If this is true, the operation will run regardless of whether dependent jobs have failed.
+    # This places more responsibility on validation to ensure that the job can run.
+    # Useful for transformations that should still run even if they do not strongly need the latest
+    # data from previous jobs.
+    always.attempt = false
+
+    # [Optional] You can determine the number of tasks running in parallel with the 'pramen.parallel.tasks' setting.
+    # By setting 'consume.threads' to a value greater than 1, the task will appear to require more than 1 thread to run.
+    # Thus, the task will take up multiple "slots" in the 'pramen.parallel.tasks' setting.
+    # This is useful if some tasks consume a lot of memory and CPU and should not be running with other tasks in parallel.
+    consume.threads = 2
+
+    # [Optional] When true, schema changes in the source are ignored and are not displayed in notifications.
+    ignore.schema.change = false
   }
 ```
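The README addition above documents `ignore.schema.change` with a default of `false`. Not part of the patch, but for readers who want to see the default in action, here is a minimal sketch using Typesafe Config directly (the object name and the operation body are illustrative; it assumes the `com.typesafe:config` library on the classpath):

```scala
import com.typesafe.config.ConfigFactory

object IgnoreSchemaChangeDefaultSketch extends App {
  // An operation config that omits the new key entirely.
  val conf = ConfigFactory.parseString(
    """name = "My Scala Transformation"
      |schedule.type = "daily"
      |output.table = "my_output_table"
      |""".stripMargin)

  // Mirrors the documented default: an absent key resolves to 'false',
  // i.e. schema changes are still reported in notifications.
  val ignoreSchemaChange =
    if (conf.hasPath("ignore.schema.change")) conf.getBoolean("ignore.schema.change")
    else false

  println(s"ignore.schema.change resolved to $ignoreSchemaChange") // prints: false
}
```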
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala
index 1db1af07..dd05dee8 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala
@@ -38,6 +38,7 @@ case class OperationDef(
                          expectedDelayDays: Int,
                          allowParallel: Boolean,
                          alwaysAttempt: Boolean,
+                         ignoreSchemaChange: Boolean,
                          consumeThreads: Int,
                          dependencies: Seq[MetastoreDependency],
                          outputInfoDateExpression: String,
@@ -63,6 +64,7 @@ object OperationDef {
   val EXPECTED_DELAY_DAYS_KEY = "expected.delay.days"
   val ALLOW_PARALLEL_KEY = "parallel"
   val ALWAYS_ATTEMPT_KEY = "always.attempt"
+  val IGNORE_SCHEMA_CHANGE_KEY = "ignore.schema.change"
   val CONSUME_THREADS_KEY = "consume.threads"
   val DEPENDENCIES_KEY = "dependencies"
   val STRICT_DEPENDENCY_MANAGEMENT_KEY = "pramen.strict.dependency.management"
@@ -100,6 +102,7 @@ object OperationDef {
     val expectedDelayDays = ConfigUtils.getOptionInt(conf, EXPECTED_DELAY_DAYS_KEY).getOrElse(defaultDelayDays)
     val consumeThreads = getThreadsToConsume(name, conf, appConfig)
     val allowParallel = ConfigUtils.getOptionBoolean(conf, ALLOW_PARALLEL_KEY).getOrElse(true)
+    val ignoreSchemaChange = ConfigUtils.getOptionBoolean(conf, IGNORE_SCHEMA_CHANGE_KEY).getOrElse(false)
     val alwaysAttempt = ConfigUtils.getOptionBoolean(conf, ALWAYS_ATTEMPT_KEY).getOrElse(false)
     val dependencies = getDependencies(conf, parent, strictDependencyManagement)
     val outputInfoDateExpressionOpt = ConfigUtils.getOptionString(conf, OUTPUT_INFO_DATE_EXPRESSION_KEY)
@@ -146,6 +149,7 @@ object OperationDef {
       expectedDelayDays,
       allowParallel,
       alwaysAttempt,
+      ignoreSchemaChange,
       consumeThreads,
       dependencies,
       outputInfoDateExpression,
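For readers unfamiliar with pramen's `ConfigUtils` helper, the parsing the patch relies on is equivalent to the following stand-in (illustrative only, not the real helper):

```scala
import com.typesafe.config.Config

object ConfigUtilsSketch {
  // Stand-in for ConfigUtils.getOptionBoolean: a missing path yields None,
  // so .getOrElse(false) preserves the pre-patch behavior for operations
  // that never set 'ignore.schema.change'.
  def getOptionBoolean(conf: Config, path: String): Option[Boolean] =
    if (conf.hasPath(path)) Some(conf.getBoolean(path)) else None
}
```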
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
index 3a79b39e..83cfe931 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
@@ -359,7 +359,7 @@ abstract class TaskRunnerBase(conf: Config,
 
           val runResult = task.job.run(task.infoDate, task.reason, conf)
 
-          val (newSchemaRegistered, schemaChangesBeforeTransform) = handleSchemaChange(runResult.data, task.job.outputTable, task.infoDate)
+          val (newSchemaRegistered, schemaChangesBeforeTransform) = handleSchemaChange(runResult.data, task.job.outputTable, task.job.operation, task.infoDate)
 
           val dfWithTimestamp = task.job.operation.processingTimestampColumn match {
             case Some(timestampCol) => addProcessingTimestamp(runResult.data, timestampCol)
@@ -393,7 +393,7 @@
 
           val (newSchemaRegisteredAfterTransform, schemaChangesAfterTransform) = if (task.job.operation.schemaTransformations.nonEmpty) {
             val transformedTable = task.job.outputTable.copy(name = s"${task.job.outputTable.name}_transformed")
-            handleSchemaChange(dfTransformed, transformedTable, task.infoDate)
+            handleSchemaChange(dfTransformed, transformedTable, task.job.operation, task.infoDate)
           } else {
             (false, Nil)
           }
@@ -570,9 +570,10 @@
     }
   }
 
-  private[core] def handleSchemaChange(df: DataFrame, table: MetaTable, infoDate: LocalDate): (Boolean, List[SchemaDifference]) = {
-    if (table.format.isRaw) {
-      // Raw tables do need schema check
+  private[core] def handleSchemaChange(df: DataFrame, table: MetaTable, operationDef: OperationDef, infoDate: LocalDate): (Boolean, List[SchemaDifference]) = {
+    if (table.format.isRaw || operationDef.ignoreSchemaChange) {
+      // Raw tables do not need a schema check.
+      // When schema changes are explicitly ignored, return no changes.
       return (false, List.empty[SchemaDifference])
     }
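The behavioral change is concentrated in the guard at the top of `handleSchemaChange`. A minimal sketch (not pramen code) of the decision it now makes, with either condition short-circuiting the schema comparison:

```scala
object SchemaCheckGuardSketch extends App {
  def shouldSkipSchemaCheck(isRaw: Boolean, ignoreSchemaChange: Boolean): Boolean =
    isRaw || ignoreSchemaChange

  assert(shouldSkipSchemaCheck(isRaw = true, ignoreSchemaChange = false))   // raw tables carry no schema
  assert(shouldSkipSchemaCheck(isRaw = false, ignoreSchemaChange = true))   // operation opted out via config
  assert(!shouldSkipSchemaCheck(isRaw = false, ignoreSchemaChange = false)) // normal case: compare and report
}
```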
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/OperationDefFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/OperationDefFactory.scala
index e3ac4a5f..72c0e1c9 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/OperationDefFactory.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/OperationDefFactory.scala
@@ -30,6 +30,7 @@ object OperationDefFactory {
                        expectedDelayDays: Int = 0,
                        allowParallel: Boolean = true,
                        alwaysAttempt: Boolean = false,
+                       ignoreSchemaChange: Boolean = false,
                        consumeThreads: Int = 1,
                        dependencies: Seq[MetastoreDependency] = Nil,
                        outputInfoDateExpression: String = "@date",
@@ -50,6 +51,7 @@ object OperationDefFactory {
       expectedDelayDays,
       allowParallel,
       alwaysAttempt,
+      ignoreSchemaChange,
       consumeThreads,
       dependencies,
       outputInfoDateExpression,
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationDefSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationDefSuite.scala
index 0212ea53..3e27f1cc 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationDefSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationDefSuite.scala
@@ -109,6 +109,7 @@ class OperationDefSuite extends AnyWordSpec with TempDirFixture {
       assert(op.operationType.asInstanceOf[Ingestion].sourceTables.head.metaTableName == "table1_sync")
       assert(op.allowParallel)
       assert(!op.alwaysAttempt)
+      assert(!op.ignoreSchemaChange)
       assert(op.warnMaxExecutionTimeSeconds.isEmpty)
       assert(op.killMaxExecutionTimeSeconds.isEmpty)
       assert(op.notificationTargets.size == 2)
@@ -127,6 +128,7 @@
          |class = "myclass"
          |output.table = "dummy_table"
          |always.attempt = "true"
+         |ignore.schema.change = "true"
          |warn.maximum.execution.time.seconds = 50
          |kill.maximum.execution.time.seconds = 100
          |
@@ -162,6 +164,7 @@
       assert(op.operationType.asInstanceOf[Transformation].clazz == "myclass")
       assert(op.allowParallel)
       assert(op.alwaysAttempt)
+      assert(op.ignoreSchemaChange)
       assert(op.dependencies.length == 2)
       assert(op.dependencies.head.tables.contains("table1"))
       assert(op.dependencies.head.dateFromExpr.contains("@infoDate - 1"))
@@ -226,6 +229,7 @@
       assert(op.schedule.isInstanceOf[Schedule.EveryDay])
       assert(op.outputInfoDateExpression == "@date")
       assert(op.operationType.asInstanceOf[Transformation].clazz == "myclass")
+      assert(!op.ignoreSchemaChange)
       assert(op.dependencies.length == 2)
       assert(op.dependencies.head.tables.contains("table1"))
       assert(op.dependencies.head.dateFromExpr.contains("@infoDate - 1"))
@@ -265,6 +269,7 @@
       assert(op.operationType.asInstanceOf[OperationType.Sink].sinkTables.head.metaTableName == "table1_sync")
       assert(op.operationType.asInstanceOf[OperationType.Sink].sinkTables.head.options("topic") == "table1_topic")
       assert(!op.allowParallel)
+      assert(!op.ignoreSchemaChange)
     }
 
     "set a correct number of threads to consume by an operation and handles edge cases" in {
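A side note on the suite above: it sets the flag as a quoted string (`ignore.schema.change = "true"`). That still parses to a boolean because Typesafe Config coerces string values read via `getBoolean`, as this small check (an illustration, not part of the patch) shows:

```scala
import com.typesafe.config.ConfigFactory

object QuotedBooleanSketch extends App {
  val conf = ConfigFactory.parseString("ignore.schema.change = \"true\"")
  assert(conf.getBoolean("ignore.schema.change")) // the string "true" coerces to a boolean
}
```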
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala
index b867778b..bc3d269e 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala
@@ -60,7 +60,7 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar
      val now = Instant.now()
      val notificationTarget = new NotificationTargetSpy(ConfigFactory.empty(), (action: TaskResult) => ())
      val jobNotificationTarget = JobNotificationTarget("notification1", Map.empty[String, String], notificationTarget)
-      val (runner, _, journal, state, tasks) = getUseCase(runFunction = () => RunResult(exampleDf), jobNotificationTargets = Seq(jobNotificationTarget))
+      val (runner, _, journal, state, _, tasks) = getUseCase(runFunction = () => RunResult(exampleDf), jobNotificationTargets = Seq(jobNotificationTarget))
 
      val taskPreDefs = (infoDate :: infoDate.plusDays(1) :: Nil).map(d => core.pipeline.TaskPreDef(d, TaskRunReason.New))
 
@@ -91,7 +91,7 @@
    "run multiple successful jobs sequential execution" in {
      val now = Instant.now()
 
-      val (runner, _, journal, state, tasks) = getUseCase(allowParallel = false, runFunction = () => RunResult(exampleDf))
+      val (runner, _, journal, state, _, tasks) = getUseCase(allowParallel = false, runFunction = () => RunResult(exampleDf))
 
      val taskPreDefs = (infoDate :: infoDate.plusDays(1) :: Nil).map(d => core.pipeline.TaskPreDef(d, TaskRunReason.New))
 
@@ -120,7 +120,7 @@
    "run multiple failure jobs parallel execution" in {
      val now = Instant.now()
 
-      val (runner, _, journal, state, tasks) = getUseCase(runFunction = () => throw new IllegalStateException("Test exception"))
+      val (runner, _, journal, state, _, tasks) = getUseCase(runFunction = () => throw new IllegalStateException("Test exception"))
 
      val taskPreDefs = (infoDate :: infoDate.plusDays(1) :: Nil).map(d => core.pipeline.TaskPreDef(d, TaskRunReason.New))
 
@@ -155,7 +155,7 @@
        null
      }
 
-      val (runner, _, journal, state, tasks) = getUseCase(runFunction = runFunction,
+      val (runner, _, journal, state, _, tasks) = getUseCase(runFunction = runFunction,
        isRerun = true,
        allowParallel = false,
        timeoutTask = true)
 
@@ -188,7 +188,7 @@
      val now = Instant.now()
      val notificationTarget = new NotificationTargetSpy(ConfigFactory.empty(), (action: TaskResult) => ())
      val jobNotificationTarget = JobNotificationTarget("notification1", Map.empty[String, String], notificationTarget)
-      val (runner, _, journal, state, tasks) = getUseCase(runFunction = () => RunResult(exampleDf), jobNotificationTargets = Seq(jobNotificationTarget))
+      val (runner, _, journal, state, _, tasks) = getUseCase(runFunction = () => RunResult(exampleDf), jobNotificationTargets = Seq(jobNotificationTarget))
 
      val result = runner.runLazyTask(tasks.head.job, infoDate)
 
@@ -214,7 +214,7 @@
      val now = Instant.now()
      val notificationTarget = new NotificationTargetSpy(ConfigFactory.empty(), (action: TaskResult) => ())
      val jobNotificationTarget = JobNotificationTarget("notification1", Map.empty[String, String], notificationTarget)
-      val (runner, _, journal, state, tasks) = getUseCase(allowParallel = false, runFunction = () => throw new IllegalStateException("Test exception"), jobNotificationTargets = Seq(jobNotificationTarget))
+      val (runner, _, journal, state, _, tasks) = getUseCase(allowParallel = false, runFunction = () => throw new IllegalStateException("Test exception"), jobNotificationTargets = Seq(jobNotificationTarget))
 
      val taskPreDefs = (infoDate :: infoDate.plusDays(1) :: Nil).map(d => core.pipeline.TaskPreDef(d, TaskRunReason.New))
 
@@ -247,7 +247,7 @@
    val started = Instant.now()
 
    "job is ready" in {
-      val (runner, _, _, state, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.Ready, Some(100), Nil, Nil))
+      val (runner, _, _, state, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.Ready, Some(100), Nil, Nil))
 
      val result = runner.preRunCheck(task.head, started)
 
@@ -255,7 +255,7 @@
 
    "job is ready with warnings" in {
-      val (runner, _, _, state, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.Ready, Some(100), Seq(DependencyWarning("table1")), Nil))
+      val (runner, _, _, state, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.Ready, Some(100), Seq(DependencyWarning("table1")), Nil))
 
      val result = runner.preRunCheck(task.head, started)
 
@@ -265,7 +265,7 @@
 
    "job needs update" in {
-      val (runner, _, _, state, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.NeedsUpdate, Some(100), Nil, Nil))
+      val (runner, _, _, state, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.NeedsUpdate, Some(100), Nil, Nil))
 
      val result = runner.preRunCheck(task.head, started)
 
@@ -273,7 +273,7 @@
 
    "job needs update with warnings" in {
-      val (runner, _, _, state, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.NeedsUpdate, Some(100), Seq(DependencyWarning("table1")), Nil))
+      val (runner, _, _, state, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.NeedsUpdate, Some(100), Seq(DependencyWarning("table1")), Nil))
 
      val result = runner.preRunCheck(task.head, started)
 
@@ -284,7 +284,7 @@
 
    "no data for the job" in {
-      val (runner, _, _, state, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.NoData(false), None, Nil, Nil))
+      val (runner, _, _, state, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.NoData(false), None, Nil, Nil))
 
      val result = runner.preRunCheck(task.head, started)
 
@@ -293,7 +293,7 @@
 
    "no data as a failure for the job" in {
-      val (runner, _, _, state, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.NoData(true), None, Nil, Nil))
+      val (runner, _, _, state, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.NoData(true), None, Nil, Nil))
 
      val result = runner.preRunCheck(task.head, started)
 
@@ -303,7 +303,7 @@
 
    "no data for the job with warnings" in {
-      val (runner, _, _, state, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.NoData(false), None, Seq(DependencyWarning("table1")), Nil))
+      val (runner, _, _, state, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.NoData(false), None, Seq(DependencyWarning("table1")), Nil))
 
      val result = runner.preRunCheck(task.head, started)
 
@@ -314,7 +314,7 @@
 
    "insufficient data" in {
-      val (runner, _, _, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.InsufficientData(100, 200, None), None, Nil, Nil))
+      val (runner, _, _, _, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.InsufficientData(100, 200, None), None, Nil, Nil))
 
      val result = runner.preRunCheck(task.head, started)
 
@@ -325,7 +325,7 @@
    "job already ran" when {
      "normal run" in {
-        val (runner, _, _, state, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.AlreadyRan, Some(100), Nil, Nil))
+        val (runner, _, _, state, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.AlreadyRan, Some(100), Nil, Nil))
 
        val result = runner.preRunCheck(task.head, started)
 
@@ -334,7 +334,7 @@
 
      "rerun" in {
-        val (runner, _, _, state, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.AlreadyRan, Some(100), Nil, Nil),
+        val (runner, _, _, state, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.AlreadyRan, Some(100), Nil, Nil),
          isRerun = true)
 
        val result = runner.preRunCheck(task.head, started)
 
@@ -343,7 +343,7 @@
 
      "historical" in {
-        val (runner, _, _, state, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.AlreadyRan, Some(100), Nil, Nil))
+        val (runner, _, _, state, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.AlreadyRan, Some(100), Nil, Nil))
 
        val result = runner.preRunCheck(task.head, started)
 
@@ -353,7 +353,7 @@
 
    "skipped" in {
-      val (runner, _, _, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.Skip("test"), None, Nil, Nil))
+      val (runner, _, _, _, _, task) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.Skip("test"), None, Nil, Nil))
 
      val result = runner.preRunCheck(task.head, started)
 
@@ -363,7 +363,7 @@
    "job has failed dependencies" in {
      val depFailure = DependencyFailure(MetastoreDependency("table1" :: Nil, "@infoDate", None, triggerUpdates = true, isOptional = false, isPassive = false), Nil, Nil, "table1" :: Nil, "2022-02-18 - 2022-02-19" :: Nil)
-      val (runner, _, _, state, tasks) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.FailedDependencies(isFailure = true, depFailure :: Nil), None, Nil, Nil))
+      val (runner, _, _, state, _, tasks) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.FailedDependencies(isFailure = true, depFailure :: Nil), None, Nil, Nil))
 
      val result = runner.preRunCheck(tasks.head, started)
 
@@ -373,7 +373,7 @@
    "job has empty tables" in {
      val depFailure = DependencyFailure(MetastoreDependency("table2" :: Nil, "@infoDate", None, triggerUpdates = true, isOptional = false, isPassive = false), "table1" :: Nil, Nil, Nil, Nil)
-      val (runner, _, _, state, tasks) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.FailedDependencies(isFailure = false, depFailure :: Nil), None, Nil, Nil))
+      val (runner, _, _, state, _, tasks) = getUseCase(preRunCheckFunction = () => JobPreRunResult(JobPreRunStatus.FailedDependencies(isFailure = false, depFailure :: Nil), None, Nil, Nil))
 
      val result = runner.preRunCheck(tasks.head, started)
 
@@ -382,7 +382,7 @@
 
    "job had failed" in {
-      val (runner, _, _, _, tasks) = getUseCase(preRunCheckFunction = () => throw new IllegalStateException("test exception"))
+      val (runner, _, _, _, _, tasks) = getUseCase(preRunCheckFunction = () => throw new IllegalStateException("test exception"))
 
      val result = runner.preRunCheck(tasks.head, started)
 
@@ -395,7 +395,7 @@
    val started = Instant.now()
 
    "job is ready" in {
-      val (runner, _, _, state, task) = getUseCase(validationFunction = () => Reason.Ready)
+      val (runner, _, _, state, _, task) = getUseCase(validationFunction = () => Reason.Ready)
 
      val result = runner.validate(task.head, started)
 
@@ -403,7 +403,7 @@
 
    "job is ready with warnings ready" in {
-      val (runner, _, _, _, task) = getUseCase(validationFunction = () => Reason.Warning(Seq("dummy warning")))
+      val (runner, _, _, _, _, task) = getUseCase(validationFunction = () => Reason.Warning(Seq("dummy warning")))
 
      val result = runner.validate(task.head, started)
 
@@ -413,7 +413,7 @@
 
    "job not ready" in {
-      val (runner, _, _, state, task) = getUseCase(validationFunction = () => Reason.NotReady("dummy reason"))
+      val (runner, _, _, state, _, task) = getUseCase(validationFunction = () => Reason.NotReady("dummy reason"))
 
      val result = runner.validate(task.head, started)
 
@@ -423,7 +423,7 @@
 
    "job is skipped ready" in {
-      val (runner, _, _, state, task) = getUseCase(validationFunction = () => Reason.Skip("dummy reason"))
+      val (runner, _, _, state, _, task) = getUseCase(validationFunction = () => Reason.Skip("dummy reason"))
 
      val result = runner.validate(task.head, started)
 
@@ -433,7 +433,7 @@
    "validate threw an exception" in {
      val ex = new IllegalStateException("TestException")
 
-      val (runner, _, _, state, tasks) = getUseCase(validationFunction = () => throw ex)
+      val (runner, _, _, state, _, tasks) = getUseCase(validationFunction = () => throw ex)
 
      val result = runner.validate(tasks.head, started)
 
@@ -442,7 +442,7 @@
 
    "pass the failure from pre-run check" in {
-      val (runner, _, _, _, tasks) = getUseCase(preRunCheckFunction = () => throw new IllegalStateException("test exception"))
+      val (runner, _, _, _, _, tasks) = getUseCase(preRunCheckFunction = () => throw new IllegalStateException("test exception"))
 
      val result = runner.validate(tasks.head, started)
 
@@ -466,7 +466,7 @@
          | "c" : "3"
          |} ]""".stripMargin
 
-      val (runner, _, _, state, tasks) = getUseCase(runFunction = () => RunResult(exampleDf))
+      val (runner, _, _, state, _, tasks) = getUseCase(runFunction = () => RunResult(exampleDf))
 
      val job = tasks.head.job.asInstanceOf[JobSpy]
      val started = Instant.now()
 
@@ -487,7 +487,7 @@
 
    "handle a failed task" in {
-      val (runner, bk, _, state, tasks) = getUseCase(runFunction = () => throw new IllegalStateException("TestException"))
+      val (runner, bk, _, state, _, tasks) = getUseCase(runFunction = () => throw new IllegalStateException("TestException"))
 
      val started = Instant.now()
 
@@ -502,7 +502,7 @@
 
    "handle a dry run" in {
-      val (runner, bk, _, state, tasks) = getUseCase(runFunction = () => RunResult(exampleDf), isDryRun = true)
+      val (runner, bk, _, state, _, tasks) = getUseCase(runFunction = () => RunResult(exampleDf), isDryRun = true)
 
      val started = Instant.now()
 
@@ -518,7 +518,7 @@
 
    "expose Hive table" in {
-      val (runner, bk, _, state, tasks) = getUseCase(runFunction = () => RunResult(exampleDf), hiveTable = Some("table_hive"))
+      val (runner, bk, _, state, _, tasks) = getUseCase(runFunction = () => RunResult(exampleDf), hiveTable = Some("table_hive"))
 
      val task = tasks.head
      val job = task.job.asInstanceOf[JobSpy]
 
@@ -539,11 +539,11 @@
  "handleSchemaChange" should {
    "register a new schema" in {
-      val (runner, bk, _, state, _) = getUseCase(runFunction = () => RunResult(exampleDf))
+      val (runner, bk, _, state, operation, _) = getUseCase(runFunction = () => RunResult(exampleDf))
 
      val metaTable = MetaTableFactory.getDummyMetaTable("table")
 
-      runner.handleSchemaChange(exampleDf, metaTable, infoDate)
+      runner.handleSchemaChange(exampleDf, metaTable, operation, infoDate)
 
      val schemaOpt1 = bk.getLatestSchema("table", infoDate.minusDays(1))
      val schemaOpt2 = bk.getLatestSchema("table", infoDate)
 
@@ -553,13 +553,13 @@
    "do nothing if schemas are the same" in {
-      val (runner, bk, _, state, _) = getUseCase(runFunction = () => RunResult(exampleDf))
+      val (runner, bk, _, state, operation, _) = getUseCase(runFunction = () => RunResult(exampleDf))
 
      bk.saveSchema("table", infoDate.minusDays(10), exampleDf.schema)
 
      val metaTable = MetaTableFactory.getDummyMetaTable("table")
 
-      runner.handleSchemaChange(exampleDf, metaTable, infoDate)
+      runner.handleSchemaChange(exampleDf, metaTable, operation, infoDate)
 
      val schemaOpt1 = bk.getLatestSchema("table", infoDate)
      val schemaOpt2 = bk.getLatestSchema("table", infoDate.minusDays(11))
 
@@ -571,11 +571,11 @@
    "do nothing if the table format is 'raw'" in {
-      val (runner, bk, _, state, _) = getUseCase(runFunction = () => RunResult(exampleDf))
+      val (runner, bk, _, state, operation, _) = getUseCase(runFunction = () => RunResult(exampleDf))
 
      val metaTable = MetaTableFactory.getDummyMetaTable("table", format = DataFormat.Raw("/dummy/path"))
 
-      runner.handleSchemaChange(exampleDf, metaTable, infoDate)
+      runner.handleSchemaChange(exampleDf, metaTable, operation, infoDate)
 
      val schemaOpt1 = bk.getLatestSchema("table", infoDate.minusDays(1))
      val schemaOpt2 = bk.getLatestSchema("table", infoDate)
 
@@ -587,7 +587,7 @@
 
    "register schema update" in {
-      val (runner, bk, _, state, _) = getUseCase(runFunction = () => RunResult(exampleDf))
+      val (runner, bk, _, state, operation, _) = getUseCase(runFunction = () => RunResult(exampleDf))
 
      bk.saveSchema("table", infoDate.minusDays(10), exampleDf.schema)
 
@@ -595,7 +595,7 @@
      val metaTable = MetaTableFactory.getDummyMetaTable("table")
 
-      runner.handleSchemaChange(df2, metaTable, infoDate)
+      runner.handleSchemaChange(df2, metaTable, operation, infoDate)
 
      val schemaOpt1 = bk.getLatestSchema("table", infoDate.minusDays(1))
      val schemaOpt2 = bk.getLatestSchema("table", infoDate)
 
@@ -609,6 +609,35 @@
      assert(schemaOpt2.get._2 == infoDate)
      assert(schemaOpt3.get._2 == infoDate)
    }
+
+    "ignore schema change if explicitly specified" in {
+      val (runner, bk, _, state, operation, _) = getUseCase(runFunction = () => RunResult(exampleDf))
+
+      bk.saveSchema("table", infoDate.minusDays(10), exampleDf.schema)
+
+      val df2 = exampleDf.withColumn("c", lit(3))
+
+      val metaTable = MetaTableFactory.getDummyMetaTable("table")
+
+      val operationWithIgnoredSchemaChanges = operation.copy(ignoreSchemaChange = true)
+
+      val (hasSchemaChanged, changes) = runner.handleSchemaChange(df2, metaTable, operationWithIgnoredSchemaChanges, infoDate)
+
+      val schemaOpt1 = bk.getLatestSchema("table", infoDate.minusDays(1))
+      val schemaOpt2 = bk.getLatestSchema("table", infoDate)
+      val schemaOpt3 = bk.getLatestSchema("table", infoDate.plusDays(1))
+
+      assert(!hasSchemaChanged)
+      assert(changes.isEmpty)
+
+      assert(schemaOpt1.nonEmpty)
+      assert(schemaOpt2.nonEmpty)
+      assert(schemaOpt3.nonEmpty)
+
+      assert(schemaOpt1.get._2 == infoDate.minusDays(10))
+      assert(schemaOpt2.get._2 == infoDate.minusDays(10))
+      assert(schemaOpt3.get._2 == infoDate.minusDays(10))
+    }
  }
 
  def getUseCase(infoDates: Seq[LocalDate] = infoDate :: Nil,
@@ -622,7 +651,7 @@
                 hiveTable: Option[String] = None,
                 jobNotificationTargets: Seq[JobNotificationTarget] = Nil,
                 timeoutTask: Boolean = false
-                ): (TaskRunnerBase, Bookkeeper, Journal, PipelineStateSpy, Seq[Task]) = {
+                ): (TaskRunnerBase, Bookkeeper, Journal, PipelineStateSpy, OperationDef, Seq[Task]) = {
    val conf = ConfigFactory.empty()
 
    val runtimeConfig = RuntimeConfigFactory.getDummyRuntimeConfig(isRerun = isRerun, isDryRun = isDryRun)
 
@@ -654,7 +683,7 @@
 
    val runner = new TaskRunnerMultithreaded(conf, bookkeeper, journal, tokenLockFactory, state, runtimeConfig, "app_123")
 
-    (runner, bookkeeper, journal, state, tasks)
+    (runner, bookkeeper, journal, state, operationDef, tasks)
  }
}
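Finally, a note on the testing approach: since `OperationDef` is a case class, the new suite flips the flag with `copy` rather than a builder. A minimal stand-in (field names from the patch, everything else elided) demonstrating the pattern:

```scala
// Minimal stand-in mirroring the case class shape from the patch; the real
// OperationDef has many more fields, all elided here.
case class OperationDefSketch(name: String, ignoreSchemaChange: Boolean = false)

object CopyFlagSketch extends App {
  val base     = OperationDefSketch("op1")
  val ignoring = base.copy(ignoreSchemaChange = true) // mirrors the new test's operation.copy(...)

  assert(!base.ignoreSchemaChange)
  assert(ignoring.ignoreSchemaChange)
}
```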