Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,9 @@ pramen.operations = [
# Thus, the task will take up multiple "slots" in the 'pramen.parallel.tasks' setting.
# This is useful if some tasks consume a lot of memory and CPU and should not run in parallel with other tasks.
consume.threads = 2

# When true, schema changes in the source are ignored and are not displayed in notifications.
ignore.schema.change = false

tables = [
{
Expand Down Expand Up @@ -2277,7 +2280,7 @@ Here is a example:
name = "My Scala Transformation"
type = "transformer"
class = "com.example.MyTransformer"

schedule.type = "daily"

output.table = "my_output_table"
Expand Down Expand Up @@ -2319,6 +2322,25 @@ Here is a example:

# Optional column selection
columns = [ "A", "B", "C" ]

# [Optional] If true (default), jobs in this operation are allowed to run in parallel.
# It makes sense to set it to false for jobs that take a lot of cluster resources.
parallel = true

# [Optional] If this is true, the operation will run regardless of whether the jobs it depends on have failed.
# This puts more responsibility on validation to ensure that the job can run.
# Useful for transformations that should still run even if they do not strictly need the latest
# data from previous jobs.
always.attempt = false

# [Optional] You can set the number of tasks running in parallel with the 'pramen.parallel.tasks' setting.
# By setting 'consume.threads' to a value greater than 1, the task will appear to require more than 1 thread to run.
# Thus, the task will take up multiple "slots" in the 'pramen.parallel.tasks' setting.
# This is useful if some tasks consume a lot of memory and CPU and should not run in parallel with other tasks.
consume.threads = 2

# [Optional] When true, schema changes in the source are ignored and are not displayed in notifications.
ignore.schema.change = false
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ case class OperationDef(
expectedDelayDays: Int,
allowParallel: Boolean,
alwaysAttempt: Boolean,
ignoreSchemaChange: Boolean,
consumeThreads: Int,
dependencies: Seq[MetastoreDependency],
outputInfoDateExpression: String,
Expand All @@ -63,6 +64,7 @@ object OperationDef {
val EXPECTED_DELAY_DAYS_KEY = "expected.delay.days"
val ALLOW_PARALLEL_KEY = "parallel"
val ALWAYS_ATTEMPT_KEY = "always.attempt"
val IGNORE_SCHEMA_CHANGE_KEY = "ignore.schema.change"
val CONSUME_THREADS_KEY = "consume.threads"
val DEPENDENCIES_KEY = "dependencies"
val STRICT_DEPENDENCY_MANAGEMENT_KEY = "pramen.strict.dependency.management"
Expand Down Expand Up @@ -100,6 +102,7 @@ object OperationDef {
val expectedDelayDays = ConfigUtils.getOptionInt(conf, EXPECTED_DELAY_DAYS_KEY).getOrElse(defaultDelayDays)
val consumeThreads = getThreadsToConsume(name, conf, appConfig)
val allowParallel = ConfigUtils.getOptionBoolean(conf, ALLOW_PARALLEL_KEY).getOrElse(true)
val ignoreSchemaChange = ConfigUtils.getOptionBoolean(conf, IGNORE_SCHEMA_CHANGE_KEY).getOrElse(false)
val alwaysAttempt = ConfigUtils.getOptionBoolean(conf, ALWAYS_ATTEMPT_KEY).getOrElse(false)
val dependencies = getDependencies(conf, parent, strictDependencyManagement)
val outputInfoDateExpressionOpt = ConfigUtils.getOptionString(conf, OUTPUT_INFO_DATE_EXPRESSION_KEY)
Expand Down Expand Up @@ -146,6 +149,7 @@ object OperationDef {
expectedDelayDays,
allowParallel,
alwaysAttempt,
ignoreSchemaChange,
consumeThreads,
dependencies,
outputInfoDateExpression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ abstract class TaskRunnerBase(conf: Config,

val runResult = task.job.run(task.infoDate, task.reason, conf)

val (newSchemaRegistered, schemaChangesBeforeTransform) = handleSchemaChange(runResult.data, task.job.outputTable, task.infoDate)
val (newSchemaRegistered, schemaChangesBeforeTransform) = handleSchemaChange(runResult.data, task.job.outputTable, task.job.operation, task.infoDate)

val dfWithTimestamp = task.job.operation.processingTimestampColumn match {
case Some(timestampCol) => addProcessingTimestamp(runResult.data, timestampCol)
Expand Down Expand Up @@ -393,7 +393,7 @@ abstract class TaskRunnerBase(conf: Config,

val (newSchemaRegisteredAfterTransform, schemaChangesAfterTransform) = if (task.job.operation.schemaTransformations.nonEmpty) {
val transformedTable = task.job.outputTable.copy(name = s"${task.job.outputTable.name}_transformed")
handleSchemaChange(dfTransformed, transformedTable, task.infoDate)
handleSchemaChange(dfTransformed, transformedTable, task.job.operation, task.infoDate)
} else {
(false, Nil)
}
Expand Down Expand Up @@ -570,9 +570,10 @@ abstract class TaskRunnerBase(conf: Config,
}
}

private[core] def handleSchemaChange(df: DataFrame, table: MetaTable, infoDate: LocalDate): (Boolean, List[SchemaDifference]) = {
if (table.format.isRaw) {
// Raw tables do need schema check
private[core] def handleSchemaChange(df: DataFrame, table: MetaTable, operationDef: OperationDef, infoDate: LocalDate): (Boolean, List[SchemaDifference]) = {
if (table.format.isRaw || operationDef.ignoreSchemaChange) {
// Raw tables do not need schema check
// When schema changes are explicitly ignored - return no changes
return (false, List.empty[SchemaDifference])
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ object OperationDefFactory {
expectedDelayDays: Int = 0,
allowParallel: Boolean = true,
alwaysAttempt: Boolean = false,
ignoreSchemaChange: Boolean = false,
consumeThreads: Int = 1,
dependencies: Seq[MetastoreDependency] = Nil,
outputInfoDateExpression: String = "@date",
Expand All @@ -50,6 +51,7 @@ object OperationDefFactory {
expectedDelayDays,
allowParallel,
alwaysAttempt,
ignoreSchemaChange,
consumeThreads,
dependencies,
outputInfoDateExpression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class OperationDefSuite extends AnyWordSpec with TempDirFixture {
assert(op.operationType.asInstanceOf[Ingestion].sourceTables.head.metaTableName == "table1_sync")
assert(op.allowParallel)
assert(!op.alwaysAttempt)
assert(!op.ignoreSchemaChange)
assert(op.warnMaxExecutionTimeSeconds.isEmpty)
assert(op.killMaxExecutionTimeSeconds.isEmpty)
assert(op.notificationTargets.size == 2)
Expand All @@ -127,6 +128,7 @@ class OperationDefSuite extends AnyWordSpec with TempDirFixture {
|class = "myclass"
|output.table = "dummy_table"
|always.attempt = "true"
|ignore.schema.change = "true"
|warn.maximum.execution.time.seconds = 50
|kill.maximum.execution.time.seconds = 100
|
Expand Down Expand Up @@ -162,6 +164,7 @@ class OperationDefSuite extends AnyWordSpec with TempDirFixture {
assert(op.operationType.asInstanceOf[Transformation].clazz == "myclass")
assert(op.allowParallel)
assert(op.alwaysAttempt)
assert(op.ignoreSchemaChange)
assert(op.dependencies.length == 2)
assert(op.dependencies.head.tables.contains("table1"))
assert(op.dependencies.head.dateFromExpr.contains("@infoDate - 1"))
Expand Down Expand Up @@ -226,6 +229,7 @@ class OperationDefSuite extends AnyWordSpec with TempDirFixture {
assert(op.schedule.isInstanceOf[Schedule.EveryDay])
assert(op.outputInfoDateExpression == "@date")
assert(op.operationType.asInstanceOf[Transformation].clazz == "myclass")
assert(!op.ignoreSchemaChange)
assert(op.dependencies.length == 2)
assert(op.dependencies.head.tables.contains("table1"))
assert(op.dependencies.head.dateFromExpr.contains("@infoDate - 1"))
Expand Down Expand Up @@ -265,6 +269,7 @@ class OperationDefSuite extends AnyWordSpec with TempDirFixture {
assert(op.operationType.asInstanceOf[OperationType.Sink].sinkTables.head.metaTableName == "table1_sync")
assert(op.operationType.asInstanceOf[OperationType.Sink].sinkTables.head.options("topic") == "table1_topic")
assert(!op.allowParallel)
assert(!op.ignoreSchemaChange)
}

"set a correct number of threads to consume by an operation and handles edge cases" in {
Expand Down
Loading