diff --git a/pramen/api/pom.xml b/pramen/api/pom.xml index 3d6dd1efc..0c9d76deb 100644 --- a/pramen/api/pom.xml +++ b/pramen/api/pom.xml @@ -27,7 +27,7 @@ za.co.absa.pramen pramen - 1.12.14-SNAPSHOT + 1.13.0-SNAPSHOT diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala index addd969f2..c3b7d35d0 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala @@ -20,7 +20,9 @@ import java.time.{Instant, LocalDate} case class MetaTableRunInfo(tableName: String, infoDate: LocalDate, + batchId: Long, inputRecordCount: Long, outputRecordCount: Long, + appendedRecordCount: Option[Long], jobStarted: Instant, jobFinished: Instant) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala index 1216bdeab..6957ad2c9 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/MetastoreReader.scala @@ -128,9 +128,10 @@ trait MetastoreReader { * * @param tableName The name of the table in the metastore. * @param infoDate The information date of the data. + * @param batchId An optional batch ID to filter by. - * @return The run info of the table if available. + * @return All run infos of the table for the given information date, optionally filtered by batch ID. */ - def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo] + def getTableRunInfo(tableName: String, infoDate: LocalDate, batchId: Option[Long]): Seq[MetaTableRunInfo] /** * Returns the reason of running the task. This helps transformers and sinks to determine logic based on whether diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala index 0bd40f305..c98d9e1cd 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala @@ -33,5 +33,6 @@ case class PipelineInfo( pipelineNotificationFailures: Seq[PipelineNotificationFailure], pipelineId: String, tenant: Option[String], - country: Option[String] + country: Option[String], + batchId: Long ) diff --git a/pramen/build.sbt b/pramen/build.sbt index a87fed719..7c69214b4 100644 --- a/pramen/build.sbt +++ b/pramen/build.sbt @@ -28,7 +28,8 @@ ThisBuild / organization := "za.co.absa.pramen" ThisBuild / scalaVersion := scala212 ThisBuild / crossScalaVersions := Seq(scala211, scala212, scala213) -ThisBuild / scalacOptions := Seq("-unchecked", "-deprecation") +ThisBuild / scalacOptions := Seq("-unchecked", "-deprecation", "-target:jvm-1.8") +ThisBuild / javacOptions := Seq("-source", "1.8", "-target", "1.8") ThisBuild / versionScheme := Some("early-semver") diff --git a/pramen/core/pom.xml b/pramen/core/pom.xml index c70260f79..1552b62e7 100644 --- a/pramen/core/pom.xml +++ b/pramen/core/pom.xml @@ -27,7 +27,7 @@ za.co.absa.pramen pramen - 1.12.14-SNAPSHOT + 1.13.0-SNAPSHOT diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala index b5dfee689..a7ef79e9b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala @@ -42,6 +42,8 @@ trait Bookkeeper { def getLatestDataChunk(table: String, infoDate:
LocalDate): Option[DataChunk] + def getDataChunks(table: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] + def getDataChunksCount(table: String, dateBeginOpt: Option[LocalDate], dateEndOpt: Option[LocalDate]): Long def getLatestSchema(table: String, until: LocalDate): Option[(StructType, LocalDate)] @@ -50,6 +52,7 @@ trait Bookkeeper { infoDate: LocalDate, inputRecordCount: Long, outputRecordCount: Long, + appendedRecordCount: Option[Long], jobStarted: Long, jobFinished: Long, isTableTransient: Boolean): Unit @@ -115,23 +118,23 @@ object Bookkeeper { mongoDbConnection match { case Some(connection) => log.info(s"Using MongoDB for bookkeeping.") - new BookkeeperMongoDb(connection) + new BookkeeperMongoDb(connection, batchId) case None => bookkeepingConfig.bookkeepingHadoopFormat match { case HadoopFormat.Text => val path = bookkeepingConfig.bookkeepingLocation.get log.info(s"Using Hadoop (CSV for records, JSON for schemas) for bookkeeping at $path") - new BookkeeperText(path) + new BookkeeperText(path, batchId) case HadoopFormat.Delta => bookkeepingConfig.deltaTablePrefix match { case Some(tablePrefix) => val fullTableName = BookkeeperDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix, "*") log.info(s"Using Delta Lake managed table '$fullTableName' for bookkeeping.") - new BookkeeperDeltaTable(bookkeepingConfig.deltaDatabase, tablePrefix) + new BookkeeperDeltaTable(bookkeepingConfig.deltaDatabase, tablePrefix, batchId) case None => val path = bookkeepingConfig.bookkeepingLocation.get log.info(s"Using Delta Lake for bookkeeping at $path") - new BookkeeperDeltaPath(path) + new BookkeeperDeltaPath(path, batchId) } } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala index c1afed024..a199de637 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperBase.scala @@ -21,19 +21,24 @@ import za.co.absa.pramen.core.model.DataChunk import java.time.LocalDate import scala.collection.mutable -abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper { +abstract class BookkeeperBase(isBookkeepingEnabled: Boolean, batchId: Long) extends Bookkeeper { private val transientDataChunks = new mutable.HashMap[String, Array[DataChunk]]() def getLatestProcessedDateFromStorage(table: String, until: Option[LocalDate] = None): Option[LocalDate] def getLatestDataChunkFromStorage(table: String, infoDate: LocalDate): Option[DataChunk] + def getDataChunksFromStorage(table: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] + def getDataChunksCountFromStorage(table: String, dateBeginOpt: Option[LocalDate], dateEndOpt: Option[LocalDate]): Long + def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit + private[pramen] def saveRecordCountToStorage(table: String, infoDate: LocalDate, inputRecordCount: Long, outputRecordCount: Long, + recordsAppended: Option[Long], jobStarted: Long, jobFinished: Long): Unit @@ -41,19 +46,23 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper infoDate: LocalDate, inputRecordCount: Long, outputRecordCount: Long, + recordsAppended: Option[Long], jobStarted: Long, jobFinished: Long, isTableTransient: Boolean): Unit = { if (isTableTransient || !isBookkeepingEnabled) { val tableLowerCase = table.toLowerCase - val dataChunk = 
DataChunk(table, infoDate.toString, infoDate.toString, infoDate.toString, inputRecordCount, outputRecordCount, jobStarted, jobFinished) + val dataChunk = DataChunk(table, infoDate.toString, infoDate.toString, infoDate.toString, inputRecordCount, outputRecordCount, jobStarted, jobFinished, Option(batchId), recordsAppended) this.synchronized { val dataChunks = transientDataChunks.getOrElse(tableLowerCase, Array.empty[DataChunk]) val newDataChunks = (dataChunks :+ dataChunk).sortBy(_.jobFinished) transientDataChunks += tableLowerCase -> newDataChunks } } else { - saveRecordCountToStorage(table, infoDate, inputRecordCount, outputRecordCount, jobStarted, jobFinished) + saveRecordCountToStorage(table, infoDate, inputRecordCount, outputRecordCount, recordsAppended, jobStarted, jobFinished) + if (recordsAppended.isEmpty) { + deleteNonCurrentBatchRecords(table, infoDate) + } } } @@ -82,13 +91,25 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper } } + final def getDataChunks(table: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] = { + val isTransient = this.synchronized { + transientDataChunks.contains(table.toLowerCase) + } + + if (isTransient || !isBookkeepingEnabled) { + getTransientDataChunks(table, Option(infoDate), Option(infoDate), batchId) + } else { + getDataChunksFromStorage(table, infoDate, batchId) + } + } + final def getDataChunksCount(table: String, dateBeginOpt: Option[LocalDate], dateEndOpt: Option[LocalDate]): Long = { val isTransient = this.synchronized { transientDataChunks.contains(table.toLowerCase) } if (isTransient || !isBookkeepingEnabled) { - getTransientDataChunks(table, dateBeginOpt, dateEndOpt).length + getTransientDataChunks(table, dateBeginOpt, dateEndOpt, None).length } else { getDataChunksCountFromStorage(table, dateBeginOpt, dateEndOpt) } @@ -100,7 +121,7 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper } private def getLatestTransientDate(table: String, from: Option[LocalDate], until: Option[LocalDate]): Option[LocalDate] = { - val chunks = getTransientDataChunks(table, from, until) + val chunks = getTransientDataChunks(table, from, until, None) if (chunks.isEmpty) { None @@ -112,12 +133,12 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper } private def getLatestTransientChunk(table: String, from: Option[LocalDate], until: Option[LocalDate]): Option[DataChunk] = { - getTransientDataChunks(table, from, until) + getTransientDataChunks(table, from, until, None) .lastOption } - private[core] def getTransientDataChunks(table: String, from: Option[LocalDate], until: Option[LocalDate]): Array[DataChunk] = { + private[core] def getTransientDataChunks(table: String, from: Option[LocalDate], until: Option[LocalDate], batchId: Option[Long]): Array[DataChunk] = { val minDate = from.map(_.toString).getOrElse("0000-00-00") val maxDate = until.map(_.toString).getOrElse("9999-99-99") val tableLowerCase = table.toLowerCase @@ -125,7 +146,12 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper transientDataChunks.getOrElse(tableLowerCase, Array.empty[DataChunk]) } - allChunks.filter(chunk => chunk.infoDate >= minDate && chunk.infoDate <= maxDate) + batchId match { + case Some(id) => + allChunks.filter(chunk => chunk.infoDate >= minDate && chunk.infoDate <= maxDate && chunk.batchId.contains(id)) + case None => + allChunks.filter(chunk => chunk.infoDate >= minDate && chunk.infoDate <= maxDate) + } } protected def getDateStr(date: 
LocalDate): String = DataChunk.dateFormatter.format(date) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala index 2638e97da..941c951fc 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaBase.scala @@ -26,7 +26,7 @@ import java.time.LocalDate import scala.reflect.ClassTag import scala.reflect.runtime.universe -abstract class BookkeeperDeltaBase extends BookkeeperHadoop { +abstract class BookkeeperDeltaBase(batchId: Long) extends BookkeeperHadoop(batchId) { def getBkDf(filter: Column): Dataset[DataChunk] @@ -60,30 +60,37 @@ abstract class BookkeeperDeltaBase extends BookkeeperHadoop { } final override def getLatestDataChunkFromStorage(table: String, infoDate: LocalDate): Option[DataChunk] = { - val infoDateFilter = getFilter(table, Option(infoDate), Option(infoDate)) + val infoDateFilter = getFilter(table, Option(infoDate), Option(infoDate), None) getBkData(infoDateFilter).lastOption } + final override def getDataChunksFromStorage(tableName: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] = { + val infoDateFilter = getFilter(tableName, Option(infoDate), Option(infoDate), batchId) + + getBkAllData(infoDateFilter) + } + final def getDataChunksCountFromStorage(table: String, dateBegin: Option[LocalDate], dateEnd: Option[LocalDate]): Long = { - getBkDf(getFilter(table, dateBegin, dateEnd)).count() + getBkDf(getFilter(table, dateBegin, dateEnd, None)).count() } final private[pramen] override def saveRecordCountToStorage(table: String, - infoDate: LocalDate, - inputRecordCount: Long, - outputRecordCount: Long, - jobStarted: Long, - jobFinished: Long): Unit = { + infoDate: LocalDate, + inputRecordCount: Long, + outputRecordCount: Long, + recordsAppended: Option[Long], + jobStarted: Long, + jobFinished: Long): Unit = { val dateStr = getDateStr(infoDate) - val chunk = DataChunk(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, jobStarted, jobFinished) + val chunk = DataChunk(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, jobStarted, jobFinished, Option(batchId), recordsAppended) saveRecordCountDelta(chunk) } final override def getLatestSchema(table: String, until: LocalDate): Option[(StructType, LocalDate)] = { - val filter = getFilter(table, None, Option(until)) + val filter = getFilter(table, None, Option(until), None) val df = getSchemasDeltaDf @@ -103,6 +110,12 @@ abstract class BookkeeperDeltaBase extends BookkeeperHadoop { saveSchemaDelta(tableSchema) } + private[core] def getBkAllData(filter: Column): Seq[DataChunk] = { + getBkDf(filter) + .collect() + .sortBy(_.jobFinished) + } + private[core] def getBkData(filter: Column): Seq[DataChunk] = { getBkDf(filter) .collect() diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala index 952418f93..0d9ee4eaa 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala @@ -16,6 +16,7 @@ package za.co.absa.pramen.core.bookkeeper +import io.delta.tables.DeltaTable import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions._ import 
org.apache.spark.sql.{Column, Dataset, SaveMode, SparkSession} @@ -23,7 +24,7 @@ import za.co.absa.pramen.core.bookkeeper.model.TableSchemaJson import za.co.absa.pramen.core.model.{DataChunk, TableSchema} import za.co.absa.pramen.core.utils.FsUtils -import java.time.Instant +import java.time.{Instant, LocalDate} import scala.reflect.ClassTag import scala.reflect.runtime.universe @@ -34,7 +35,7 @@ object BookkeeperDeltaPath { val locksDirName = "locks" } -class BookkeeperDeltaPath(bookkeepingPath: String)(implicit spark: SparkSession) extends BookkeeperDeltaBase { +class BookkeeperDeltaPath(bookkeepingPath: String, batchId: Long)(implicit spark: SparkSession) extends BookkeeperDeltaBase(batchId) { import BookkeeperDeltaPath._ import spark.implicits._ @@ -67,6 +68,14 @@ class BookkeeperDeltaPath(bookkeepingPath: String)(implicit spark: SparkSession) .save(recordsPath.toUri.toString) } + override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = { + val infoDateStr = DataChunk.dateFormatter.format(infoDate) + val filter = (col("tableName") === lit(table)) && (col("infoDate") === lit(infoDateStr)) && (col("batchId") =!= lit(batchId)) + + val deltaTable = DeltaTable.forPath(spark, recordsPath.toUri.toString) + deltaTable.delete(filter) + } + override def getSchemasDeltaDf: Dataset[TableSchemaJson] = { spark .read diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperHadoop.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperHadoop.scala index 9fa577215..a971f7123 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperHadoop.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperHadoop.scala @@ -18,14 +18,13 @@ package za.co.absa.pramen.core.bookkeeper import org.apache.spark.sql.Column import org.apache.spark.sql.functions._ -import za.co.absa.pramen.core.model.DataChunk import java.time.LocalDate -abstract class BookkeeperHadoop extends BookkeeperBase(true) { - private[core] def getFilter(tableName: String, infoDateBegin: Option[LocalDate], infoDateEnd: Option[LocalDate]): Column = { - (infoDateBegin, infoDateEnd) match { +abstract class BookkeeperHadoop(batchId: Long) extends BookkeeperBase(true, batchId) { + private[core] def getFilter(tableName: String, infoDateBegin: Option[LocalDate], infoDateEnd: Option[LocalDate], batchId: Option[Long]): Column = { + val baseFilter = (infoDateBegin, infoDateEnd) match { case (Some(begin), Some(end)) => val beginStr = getDateStr(begin) val endStr = getDateStr(end) @@ -39,5 +38,10 @@ abstract class BookkeeperHadoop extends BookkeeperBase(true) { case (None, None) => col("tableName") === tableName } + + batchId match { + case Some(id) => baseFilter && col("batchId") === lit(id) + case None => baseFilter + } } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala index 4d077dc5f..ee76b8ada 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperJdbc.scala @@ -24,12 +24,13 @@ import za.co.absa.pramen.core.model.{DataChunk, TableSchema} import za.co.absa.pramen.core.rdb.PramenDb.DEFAULT_RETRIES import za.co.absa.pramen.core.reader.JdbcUrlSelector import za.co.absa.pramen.core.reader.model.JdbcConfig -import za.co.absa.pramen.core.utils.SlickUtils +import 
za.co.absa.pramen.core.utils.SlickUtils.WARN_IF_LONGER_MS +import za.co.absa.pramen.core.utils.{AlgorithmUtils, SlickUtils, TimeUtils} import java.time.LocalDate import scala.util.control.NonFatal -class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) { +class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true, batchId) { import za.co.absa.pramen.core.utils.FutureImplicits._ private val log = LoggerFactory.getLogger(this.getClass) @@ -54,7 +55,7 @@ class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) { val chunks = try { SlickUtils.executeQuery[BookkeepingRecords, BookkeepingRecord](db, query) - .map(toChunk) + .map(DataChunk.fromRecord) } catch { case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the bookkeeping table.", ex) } @@ -67,14 +68,27 @@ class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) { } } + override def getDataChunksFromStorage(table: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] = { + val query = getFilter(table, Option(infoDate), Option(infoDate), batchId) + + try { + SlickUtils.executeQuery[BookkeepingRecords, BookkeepingRecord](db, query) + .map(DataChunk.fromRecord) + .toArray[DataChunk] + .sortBy(_.jobFinished) + } catch { + case NonFatal(ex) => throw new RuntimeException(s"Unable to read from the bookkeeping table.", ex) + } + } + override def getLatestDataChunkFromStorage(table: String, infoDate: LocalDate): Option[DataChunk] = { - val query = getFilter(table, Option(infoDate), Option(infoDate)) + val query = getFilter(table, Option(infoDate), Option(infoDate), None) .sortBy(r => r.jobFinished.desc) .take(1) try { val records = SlickUtils.executeQuery[BookkeepingRecords, BookkeepingRecord](db, query) - .map(toChunk) + .map(DataChunk.fromRecord) .toArray[DataChunk] if (records.length > 1) @@ -86,7 +100,7 @@ class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) { } def getDataChunksCountFromStorage(table: String, dateBeginOpt: Option[LocalDate], dateEndOpt: Option[LocalDate]): Long = { - val query = getFilter(table, dateBeginOpt, dateEndOpt) + val query = getFilter(table, dateBeginOpt, dateEndOpt, None) .length val count = try { @@ -98,15 +112,16 @@ class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) { count } - private[pramen] override def saveRecordCountToStorage(table: String, - infoDate: LocalDate, - inputRecordCount: Long, - outputRecordCount: Long, - jobStarted: Long, - jobFinished: Long): Unit = { + override def saveRecordCountToStorage(table: String, + infoDate: LocalDate, + inputRecordCount: Long, + outputRecordCount: Long, + recordsAppended: Option[Long], + jobStarted: Long, + jobFinished: Long): Unit = { val dateStr = DataChunk.dateFormatter.format(infoDate) - val record = BookkeepingRecord(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, jobStarted, jobFinished) + val record = BookkeepingRecord(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, recordsAppended, jobStarted, jobFinished, Option(batchId)) try { db.run( @@ -117,17 +132,32 @@ class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) { } } - private[pramen] override def getOffsetManager: OffsetManager = { - offsetManagement + override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = { + val dateStr = DataChunk.dateFormatter.format(infoDate) + + val query = BookkeepingRecords.records + .filter(r =>
r.pramenTableName === table && r.infoDate === dateStr && r.batchId =!= Option(batchId)) + .delete + + try { + AlgorithmUtils.runActionWithElapsedTimeEvent(WARN_IF_LONGER_MS) { + db.run(query).execute() + } { actualTimeMs => + val elapsedTime = TimeUtils.prettyPrintElapsedTimeShort(actualTimeMs) + val sql = query.statements.mkString("; ") + log.warn(s"Action execution time: $elapsedTime. SQL: $sql") + } + } catch { + case NonFatal(ex) => throw new RuntimeException(s"Unable to delete non-current batch records from the bookkeeping table.", ex) + } } - private def toChunk(r: BookkeepingRecord): DataChunk = { - DataChunk( - r.pramenTableName, r.infoDate, r.infoDateBegin, r.infoDateEnd, r.inputRecordCount, r.outputRecordCount, r.jobStarted, r.jobFinished) + private[pramen] override def getOffsetManager: OffsetManager = { + offsetManagement } - private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate]): Query[BookkeepingRecords, BookkeepingRecord, Seq] = { - (infoDateBeginOpt, infoDateEndOpt) match { + private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate], batchId: Option[Long]): Query[BookkeepingRecords, BookkeepingRecord, Seq] = { + val baseFilter = (infoDateBeginOpt, infoDateEndOpt) match { case (Some(infoDateBegin), Some(infoDateEnd)) => val date0Str = DataChunk.dateFormatter.format(infoDateBegin) val date1Str = DataChunk.dateFormatter.format(infoDateEnd) @@ -153,6 +183,11 @@ class BookkeeperJdbc(db: Database, batchId: Long) extends BookkeeperBase(true) { BookkeepingRecords.records .filter(r => r.pramenTableName === tableName) } + + batchId match { + case Some(id) => baseFilter.filter(r => r.batchId === Option(id)) + case None => baseFilter + } } override def getLatestSchema(table: String, infoDate: LocalDate): Option[(StructType, LocalDate)] = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala index f9678d8f8..9eca65253 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperMongoDb.scala @@ -29,6 +29,7 @@ import za.co.absa.pramen.core.dao.MongoDb import za.co.absa.pramen.core.dao.model.{ASC, IndexField} import za.co.absa.pramen.core.model.{DataChunk, TableSchema} import za.co.absa.pramen.core.mongo.MongoDbConnection +import za.co.absa.pramen.core.utils.{AlgorithmUtils, TimeUtils} import java.time.LocalDate import scala.util.control.NonFatal @@ -37,16 +38,16 @@ object BookkeeperMongoDb { val collectionName = "bookkeeping" val schemaCollectionName = "schemas" - val MODEL_VERSION = 2 + val MODEL_VERSION = 3 } -class BookkeeperMongoDb(mongoDbConnection: MongoDbConnection) extends BookkeeperBase(true) { - +class BookkeeperMongoDb(mongoDbConnection: MongoDbConnection, batchId: Long) extends BookkeeperBase(true, batchId) { import BookkeeperMongoDb._ import za.co.absa.pramen.core.dao.ScalaMongoImplicits._ private val log = LoggerFactory.getLogger(this.getClass) + private val queryWarningTimeoutMs = 10000L private val codecRegistry: CodecRegistry = fromRegistries(fromProviders(classOf[DataChunk], classOf[TableSchema]), DEFAULT_CODEC_REGISTRY) private val db = mongoDbConnection.getDatabase @@ -79,7 +80,7 @@ class BookkeeperMongoDb(mongoDbConnection: MongoDbConnection) extends Bookkeeper } override def getLatestDataChunkFromStorage(table: 
String, infoDate: LocalDate): Option[DataChunk] = { - val infoDateFilter = getFilter(table, Option(infoDate), Option(infoDate)) + val infoDateFilter = getFilter(table, Option(infoDate), Option(infoDate), None) collection.find(infoDateFilter) .sort(Sorts.descending("jobFinished")) @@ -89,25 +90,49 @@ class BookkeeperMongoDb(mongoDbConnection: MongoDbConnection) extends Bookkeeper } def getDataChunksCountFromStorage(table: String, dateBeginOpt: Option[LocalDate], dateEndOpt: Option[LocalDate]): Long = { - collection.countDocuments(getFilter(table, dateBeginOpt, dateEndOpt)).execute() + collection.countDocuments(getFilter(table, dateBeginOpt, dateEndOpt, None)).execute() + } + + override def getDataChunksFromStorage(table: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] = { + val chunks = collection.find(getFilter(table, Option(infoDate), Option(infoDate), batchId)).execute() + .sortBy(_.jobFinished) + log.debug(s"For $table ($infoDate) : ${chunks.mkString("[ ", ", ", " ]")}") + chunks } - private[pramen] override def saveRecordCountToStorage(table: String, - infoDate: LocalDate, - inputRecordCount: Long, - outputRecordCount: Long, - jobStarted: Long, - jobFinished: Long): Unit = { + override def saveRecordCountToStorage(table: String, + infoDate: LocalDate, + inputRecordCount: Long, + outputRecordCount: Long, + recordsAppended: Option[Long], + jobStarted: Long, + jobFinished: Long): Unit = { val dateStr = DataChunk.dateFormatter.format(infoDate) - val chunk = DataChunk(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, jobStarted, jobFinished) + val record = DataChunk(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, jobStarted, jobFinished, Option(batchId), recordsAppended) - val opts = (new ReplaceOptions).upsert(true) - collection.replaceOne(getFilter(table, Option(infoDate), Option(infoDate)), chunk, opts).execute() + collection.insertOne(record).execute() } - private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate]): Bson = { - (infoDateBeginOpt, infoDateEndOpt) match { + override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = { + val dateStr = DataChunk.dateFormatter.format(infoDate) + + val filter = Filters.and( + Filters.eq("tableName", table), + Filters.eq("infoDate", dateStr), + Filters.ne("batchId", batchId) + ) + + AlgorithmUtils.runActionWithElapsedTimeEvent(queryWarningTimeoutMs) { + collection.deleteMany(filter).execute() + }{ actualTimeMs => + val elapsedTime = TimeUtils.prettyPrintElapsedTimeShort(actualTimeMs) + log.warn(s"MongoDB query took too long ($elapsedTime) while deleting from $collectionName, tableName='$table', infoDate='$infoDate', batchId!=$batchId") + } + } + + private def getFilter(tableName: String, infoDateBeginOpt: Option[LocalDate], infoDateEndOpt: Option[LocalDate], batchId: Option[Long]): Bson = { + val baseFilter = (infoDateBeginOpt, infoDateEndOpt) match { case (Some(infoDateBegin), Some(infoDateEnd)) => val date0Str = DataChunk.dateFormatter.format(infoDateBegin) @@ -139,6 +164,10 @@ class BookkeeperMongoDb(mongoDbConnection: MongoDbConnection) extends Bookkeeper Filters.eq("tableName", tableName) } + batchId match { + case Some(id) => Filters.and(baseFilter, Filters.eq("batchId", id)) + case None => baseFilter + } } private def getSchemaGetFilter(tableName: String, until: LocalDate): Bson = { @@ -159,12 +188,18 @@ class BookkeeperMongoDb(mongoDbConnection: MongoDbConnection) extends Bookkeeper val dbVersion 
= d.getVersion() if (!d.doesCollectionExists(collectionName)) { d.createCollection(collectionName) - d.createIndex(collectionName, IndexField("tableName", ASC) :: IndexField("infoDate", ASC) :: Nil, unique = true) + d.createIndex(collectionName, IndexField("tableName", ASC) :: IndexField("infoDate", ASC) :: Nil) } if (dbVersion < 2) { d.createCollection(schemaCollectionName) d.createIndex(schemaCollectionName, IndexField("tableName", ASC) :: IndexField("infoDate", ASC) :: Nil, unique = true) } + if (dbVersion < 3 && dbVersion > 0) { + val keys = IndexField("tableName", ASC) :: IndexField("infoDate", ASC) :: Nil + // Make the bookkeeping index non-unique + d.dropIndex(collectionName, keys) + d.createIndex(collectionName, keys) + } if (dbVersion < MODEL_VERSION) { d.setVersion(MODEL_VERSION) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala index b5c9836f4..3b69b01fe 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperNull.scala @@ -26,23 +26,28 @@ import java.time.LocalDate * this bookkeeper implementation is used. It always returns the state as if every table is new and no information is * available. */ -class BookkeeperNull() extends BookkeeperBase(false) { +class BookkeeperNull() extends BookkeeperBase(false, 0L) { override val bookkeepingEnabled: Boolean = false override def getLatestProcessedDateFromStorage(table: String, until: Option[LocalDate]): Option[LocalDate] = None override def getLatestDataChunkFromStorage(table: String, infoDate: LocalDate): Option[DataChunk] = None + override def getDataChunksFromStorage(table: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] = Nil + override def getDataChunksCountFromStorage(table: String, dateBeginOpt: Option[LocalDate], dateEndOpt: Option[LocalDate]): Long = 0 private[pramen] override def saveRecordCountToStorage(table: String, infoDate: LocalDate, inputRecordCount: Long, outputRecordCount: Long, + recordsAppended: Option[Long], jobStarted: Long, jobFinished: Long): Unit = {} override def getLatestSchema(table: String, until: LocalDate): Option[(StructType, LocalDate)] = None override def saveSchema(table: String, infoDate: LocalDate, schema: StructType): Unit = {} + + override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {} } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala index 203859d92..42fe1b60d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperText.scala @@ -38,7 +38,7 @@ object BookkeeperText { val locksDirName = "locks" } -class BookkeeperText(bookkeepingPath: String)(implicit spark: SparkSession) extends BookkeeperHadoop { +class BookkeeperText(bookkeepingPath: String, batchId: Long)(implicit spark: SparkSession) extends BookkeeperHadoop(batchId) { import BookkeeperText._ import spark.implicits._ @@ -77,18 +77,25 @@ class BookkeeperText(bookkeepingPath: String)(implicit spark: SparkSession) exte } } + override def getDataChunksFromStorage(tableName: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] = { + val infoDateFilter = getFilter(tableName, Option(infoDate), 
Option(infoDate), batchId) + + getData(infoDateFilter) + } + override def getLatestDataChunkFromStorage(table: String, infoDate: LocalDate): Option[DataChunk] = { - getData(getFilter(table, Option(infoDate), Option(infoDate))).lastOption + getData(getFilter(table, Option(infoDate), Option(infoDate), None)).lastOption } def getDataChunksCountFromStorage(table: String, dateBegin: Option[LocalDate], dateEnd: Option[LocalDate]): Long = { - getDf(getFilter(table, dateBegin, dateEnd)).count() + getDf(getFilter(table, dateBegin, dateEnd, None)).count() } private[pramen] override def saveRecordCountToStorage(table: String, infoDate: LocalDate, inputRecordCount: Long, outputRecordCount: Long, + recordsAppended: Option[Long], jobStarted: Long, jobFinished: Long): Unit = { val lock: TokenLockHadoopPath = getLock @@ -96,7 +103,7 @@ class BookkeeperText(bookkeepingPath: String)(implicit spark: SparkSession) exte try { val dateStr = getDateStr(infoDate) - val chunk = DataChunk(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, jobStarted, jobFinished) + val chunk = DataChunk(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, jobStarted, jobFinished, Option(batchId), recordsAppended) val csv = CsvUtils.getRecord(chunk, '|') fsUtils.appendFile(bkFilePath, csv) @@ -151,16 +158,11 @@ class BookkeeperText(bookkeepingPath: String)(implicit spark: SparkSession) exte private def getData(filter: Column): Seq[DataChunk] = { getDf(filter) .collect() - .groupBy(v => (v.tableName, v.infoDate)) - .map { case (_, listChunks) => - listChunks.maxBy(c => c.jobFinished) - } - .toArray[DataChunk] .sortBy(_.jobFinished) } override def getLatestSchema(table: String, until: LocalDate): Option[(StructType, LocalDate)] = { - val filter = getFilter(table, None, Option(until)) + val filter = getFilter(table, None, Option(until), None) val df = spark .read @@ -191,4 +193,9 @@ class BookkeeperText(bookkeepingPath: String)(implicit spark: SparkSession) exte lock.release() } } + + override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = { + // No-op: CSV-based storage doesn't support efficient in-place deletion. + // Cross-batch replacement is not supported for BookkeeperText. 
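+ // Records from older batches therefore remain in the bookkeeping file; getLatestDataChunkFromStorage() still returns the record with the latest jobFinished.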
+ } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala index 97db34902..8d2e387ec 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecord.scala @@ -22,8 +22,7 @@ case class BookkeepingRecord(pramenTableName: String, infoDateEnd: String, inputRecordCount: Long, outputRecordCount: Long, + appendedRecordCount: Option[Long], jobStarted: Long, - jobFinished: Long) - - - + jobFinished: Long, + batchId: Option[Long]) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala index be9aac585..2562d6503 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/model/BookkeepingRecords.scala @@ -26,11 +26,13 @@ class BookkeepingRecords(tag: Tag) extends Table[BookkeepingRecord](tag, "bookke def infoDateEnd = column[String]("info_date_end", O.Length(20)) def inputRecordCount = column[Long]("input_record_count") def outputRecordCount = column[Long]("output_record_count") + def appendedRecordCount = column[Option[Long]]("appended_record_count") def jobStarted = column[Long]("job_started") def jobFinished = column[Long]("job_finished") + def batchId = column[Option[Long]]("batch_id") def * = (pramenTableName, infoDate, infoDateBegin, infoDateEnd, - inputRecordCount, outputRecordCount, - jobStarted, jobFinished) <> (BookkeepingRecord.tupled, BookkeepingRecord.unapply) + inputRecordCount, outputRecordCount, appendedRecordCount, + jobStarted, jobFinished, batchId) <> (BookkeepingRecord.tupled, BookkeepingRecord.unapply) def idx1 = index("bk_idx_1", (pramenTableName, infoDate), unique = false) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala index fc22caf01..a27b7a0e5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala @@ -88,7 +88,8 @@ class JournalHadoopCsv(journalPath: String) pipelineName = v.pipelineName, environmentName = v.environmentName, tenant = v.tenant, - country = v.country + country = v.country, + batchId = v.batchId.getOrElse(0L) )) } @@ -125,6 +126,7 @@ class JournalHadoopCsv(journalPath: String) t.environmentName.getOrElse("") :: t.tenant.getOrElse("") :: t.country.getOrElse("") :: + t.batchId :: Nil record.mkString("", s"$separator", "\n") } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala index ea3c69ced..d2e2cbaf2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala @@ -56,7 +56,8 @@ class JournalJdbc(db: Database) extends Journal { entry.pipelineName, entry.environmentName, entry.tenant, - entry.country) + entry.country, + Option(entry.batchId)) try { db.run( @@ -97,7 +98,8 @@ class JournalJdbc(db: Database) extends Journal { pipelineName = v.pipelineName, 
environmentName = v.environmentName, tenant = v.tenant, - country = v.country + country = v.country, + batchId = v.batchId.getOrElse(0L) ) }).toList } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala index b1aca8b38..74e5d2cd0 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTask.scala @@ -37,5 +37,6 @@ case class JournalTask( pipelineName: Option[String], environmentName: Option[String], tenant: Option[String], - country: Option[String] + country: Option[String], + batchId: Option[Long] ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala index 246fcf1fc..35d4637d7 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/JournalTasks.scala @@ -41,11 +41,12 @@ class JournalTasks(tag: Tag) extends Table[JournalTask](tag, "journal") { def environmentName = column[Option[String]]("environmentName", O.Length(128)) def tenant = column[Option[String]]("tenant", O.Length(200)) def country = column[Option[String]]("country", O.Length(50)) + def batchId = column[Option[Long]]("batch_id") def * = (jobName, pramenTableName, periodBegin, periodEnd, informationDate, inputRecordCount, inputRecordCountOld, outputRecordCount, outputRecordCountOld, appendedRecordCount, outputSize, startedAt, finishedAt, status, failureReason, sparkApplicationId, pipelineId, pipelineName, environmentName, - tenant, country) <> (JournalTask.tupled, JournalTask.unapply) + tenant, country, batchId) <> (JournalTask.tupled, JournalTask.unapply) def idx1 = index("idx_started_at", startedAt, unique = false) def idx2 = index("idx_finished_at", finishedAt, unique = false) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala index ae005e966..1fc2fb969 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala @@ -44,7 +44,8 @@ case class TaskCompleted( pipelineName: Option[String], environmentName: Option[String], tenant: Option[String], - country: Option[String] + country: Option[String], + batchId: Long ) object TaskCompleted { @@ -86,7 +87,8 @@ object TaskCompleted { Option(pipelineInfo.pipelineName), Option(pipelineInfo.environment), pipelineInfo.tenant, - pipelineInfo.country + pipelineInfo.country, + pipelineInfo.batchId ) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala index b0a2989ad..ee49e758d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala @@ -37,5 +37,6 @@ case class TaskCompletedCsv( pipelineName: Option[String], environmentName: Option[String], tenant: Option[String], - country: Option[String] + country: Option[String], + batchId: Option[Long] ) diff --git 
a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index 306d61634..47bf65856 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -140,7 +140,12 @@ class MetastoreImpl(appConfig: Config, stats.recordCount.foreach{recordCount => if (!skipBookKeepingUpdates && !nothingAppended) { - bookkeeper.setRecordCount(tableName, infoDate, inputRecordCount.getOrElse(recordCount), recordCount, start, finish, isTransient) + val overwrite = saveModeOverride.contains(SaveMode.Overwrite) + val recordsAppended = if (overwrite) + None + else + stats.recordCountAppended + bookkeeper.setRecordCount(tableName, infoDate, inputRecordCount.getOrElse(recordCount), recordCount, recordsAppended, start, finish, isTransient) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala index 4673984e4..b082a8e54 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala @@ -70,10 +70,19 @@ abstract class MetastoreReaderBase(metastore: Metastore, MetaTable.getMetaTableDef(metastore.getTableDef(tableName)) } - override def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo] = { - bookkeeper.getLatestDataChunk(tableName, infoDate) + override def getTableRunInfo(tableName: String, infoDate: LocalDate, batchId: Option[Long]): Seq[MetaTableRunInfo] = { + bookkeeper.getDataChunks(tableName, infoDate, batchId) .map(chunk => - MetaTableRunInfo(tableName, LocalDate.parse(chunk.infoDate), chunk.inputRecordCount, chunk.outputRecordCount, Instant.ofEpochSecond(chunk.jobStarted), Instant.ofEpochSecond(chunk.jobFinished)) + MetaTableRunInfo( + tableName, + LocalDate.parse(chunk.infoDate), + chunk.batchId.getOrElse(0L), + chunk.inputRecordCount, + chunk.outputRecordCount, + chunk.appendedRecordCount, + Instant.ofEpochSecond(chunk.jobStarted), + Instant.ofEpochSecond(chunk.jobFinished) + ) ) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala index 98a8a0d32..03444ab2b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/model/DataChunk.scala @@ -16,20 +16,33 @@ package za.co.absa.pramen.core.model +import za.co.absa.pramen.core.bookkeeper.model.BookkeepingRecord + import java.time.format.DateTimeFormatter +/** + * Represents a data chunk for storing bookkeeping records in non-relational storage (e.g., CSV files). + * The order of columns must be preserved to maintain compatibility with existing CSV files + * since the field order in CSV directly depends on it. 
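+ * The batchId and appendedRecordCount fields are optional because records written by earlier versions do not contain them.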
+ */ case class DataChunk(tableName: String, - infoDate: String, /* Use String to workaround serialization issues */ + infoDate: String, infoDateBegin: String, infoDateEnd: String, inputRecordCount: Long, outputRecordCount: Long, jobStarted: Long, - jobFinished: Long) + jobFinished: Long, + batchId: Option[Long], + appendedRecordCount: Option[Long]) object DataChunk { /* This is how info dates are stored */ val datePersistFormat = "yyyy-MM-dd" val dateFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern(datePersistFormat) + + def fromRecord(r: BookkeepingRecord): DataChunk = { + DataChunk(r.pramenTableName, r.infoDate, r.infoDateBegin, r.infoDateEnd, r.inputRecordCount, r.outputRecordCount, r.jobStarted, r.jobFinished, r.batchId, r.appendedRecordCount) + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala index b3014e9d8..75482b42d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala @@ -144,6 +144,7 @@ class PythonTransformationJob(operationDef: OperationDef, infoDate, recordCount, recordCount, + None, jobStarted.getEpochSecond, jobFinished.getEpochSecond, isTableTransient = false) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala index c6f6dab5e..20bc2b214 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala @@ -166,6 +166,7 @@ class SinkJob(operationDef: OperationDef, infoDate, inputRecordCount.getOrElse(sinkResult.recordsSent), sinkResult.recordsSent, + Option(sinkResult.recordsSent), jobStarted.getEpochSecond, jobFinished.getEpochSecond, isTransient diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala index 9ca535138..5f686a106 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/rdb/PramenDb.scala @@ -89,9 +89,16 @@ class PramenDb(val jdbcConfig: JdbcConfig, if (0 < dbVersion && dbVersion < 7) { addColumn(LockTickets.lockTickets.baseTableRow.tableName, "created_at", "bigint") } + if (0 < dbVersion && dbVersion < 8) { addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "country", "varchar(50)") } + + if (0 < dbVersion && dbVersion < 9) { + addColumn(BookkeepingRecords.records.baseTableRow.tableName, "batch_id", "bigint") + addColumn(BookkeepingRecords.records.baseTableRow.tableName, "appended_record_count", "bigint") + addColumn(JournalTasks.journalTasks.baseTableRow.tableName, "batch_id", "bigint") + } } def initTable(schema: H2Profile.SchemaDescription): Unit = { @@ -127,7 +134,7 @@ class PramenDb(val jdbcConfig: JdbcConfig, } object PramenDb { - val MODEL_VERSION = 8 + val MODEL_VERSION = 9 val DEFAULT_RETRIES = 3 def apply(jdbcConfig: JdbcConfig): PramenDb = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index a8042703e..3a79b39e8 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -327,7 +327,7 @@ abstract class TaskRunnerBase(conf: Config, log.info(s"SKIP validation failure for the task: $outputTableName for date: ${task.infoDate}. Reason: $msg") if (bookkeeper.getLatestDataChunk(outputTableName, task.infoDate).isEmpty) { val isTransient = task.job.outputTable.format.isTransient - bookkeeper.setRecordCount(outputTableName, task.infoDate, status.inputRecordsCount.getOrElse(0L), 0, started.getEpochSecond, Instant.now().getEpochSecond, isTransient) + bookkeeper.setRecordCount(outputTableName, task.infoDate, status.inputRecordsCount.getOrElse(0L), 0, None, started.getEpochSecond, Instant.now().getEpochSecond, isTransient) } Left(TaskResult(task.job.taskDef, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, newSchemaRegistered = false, Nil, status.dependencyWarnings, Nil, options)) case Reason.SkipOnce(msg) => diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index fc65e1db9..592ffd477 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -139,7 +139,8 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification pipelineNotificationFailures.toSeq, pipelineId, tenant, - country + country, + batchId ) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala index 3d0dd6024..cf2ea6bee 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/AlgorithmUtils.scala @@ -18,6 +18,7 @@ package za.co.absa.pramen.core.utils import org.slf4j.Logger +import java.time.{Duration, Instant} import scala.annotation.tailrec import scala.collection.mutable @@ -92,4 +93,16 @@ object AlgorithmUtils { } } } + + final def runActionWithElapsedTimeEvent[R](maxTimeMs: Long)(action: => R)(onMaxTimeBreach: Long => Unit): R = { + val start = Instant.now + val result = action + val finish = Instant.now + + val duration = Duration.between(start, finish).toMillis + if (duration > maxTimeMs) { + onMaxTimeBreach(duration) + } + result + } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CsvUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CsvUtils.scala index c16842c3c..5be7bada3 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CsvUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/CsvUtils.scala @@ -38,7 +38,14 @@ object CsvUtils { .filterNot(_.getName.contains('$')) .map(field => { field.setAccessible(true) - field.get(obj).toString.replace(separator, ' ') + val ref = field.get(obj) + // Handle Option[_] + val str = ref match { + case Some(x) => x.toString + case None => "" + case x => x.toString + } + str.replace(separator, ' ') }).mkString("", s"$separator", "\n") } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala index 865ae6366..e777d08e1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala +++ 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SlickUtils.scala @@ -30,7 +30,7 @@ object SlickUtils { private val log = LoggerFactory.getLogger(this.getClass) - private val WARN_IF_LONGER_MS = 1000L + val WARN_IF_LONGER_MS = 1000L /** * Synchronously executes a query against a JDBC connection. diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala b/pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala similarity index 86% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala rename to pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala index b93289600..476b41619 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala +++ b/pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.{Column, Dataset, SaveMode, SparkSession} import za.co.absa.pramen.core.bookkeeper.model.TableSchemaJson import za.co.absa.pramen.core.model.{DataChunk, TableSchema} -import java.time.Instant +import java.time.{Instant, LocalDate} import scala.reflect.ClassTag import scala.reflect.runtime.universe @@ -38,8 +38,9 @@ object BookkeeperDeltaTable { } class BookkeeperDeltaTable(database: Option[String], - tablePrefix: String) - (implicit spark: SparkSession) extends BookkeeperDeltaBase { + tablePrefix: String, + batchId: Long) + (implicit spark: SparkSession) extends BookkeeperDeltaBase(batchId) { import BookkeeperDeltaTable._ import spark.implicits._ @@ -56,15 +57,20 @@ class BookkeeperDeltaTable(database: Option[String], .as[DataChunk] } - override def saveRecordCountDelta(dataChunks: DataChunk): Unit = { - val df = Seq(dataChunks).toDF() + override def saveRecordCountDelta(dataChunk: DataChunk): Unit = { + val df = Seq(dataChunk).toDF() df.write + .format("delta") .mode(SaveMode.Append) .option("mergeSchema", "true") .saveAsTable(recordsFullTableName) } + override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = { + // This is not supported for Delta tables using Spark 2.* + } + override def getSchemasDeltaDf: Dataset[TableSchemaJson] = { spark.table(schemasFullTableName).as[TableSchemaJson] } @@ -75,6 +81,7 @@ class BookkeeperDeltaTable(database: Option[String], ).toDF() df.write + .format("delta") .mode(SaveMode.Append) .option("mergeSchema", "true") .saveAsTable(schemasFullTableName) @@ -84,7 +91,7 @@ class BookkeeperDeltaTable(database: Option[String], val df = Seq.empty[T].toDS df.write - .mode(SaveMode.Overwrite) + .format("delta") .saveAsTable(pathOrTable) } diff --git a/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala b/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala new file mode 100644 index 000000000..5c150abc1 --- /dev/null +++ b/pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala @@ -0,0 +1,119 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.bookkeeper + +import io.delta.tables.DeltaTable +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.{Column, Dataset, SaveMode, SparkSession} +import za.co.absa.pramen.core.bookkeeper.model.TableSchemaJson +import za.co.absa.pramen.core.model.{DataChunk, TableSchema} + +import java.time.{Instant, LocalDate} +import scala.reflect.ClassTag +import scala.reflect.runtime.universe + +object BookkeeperDeltaTable { + val recordsTable = "bookkeeping" + val schemasTable = "schemas" + + def getFullTableName(databaseOpt: Option[String], tablePrefix: String, tableName: String): String = { + databaseOpt match { + case Some(db) => s"$db.$tablePrefix$tableName" + case None => s"$tablePrefix$tableName" + } + } +} + +class BookkeeperDeltaTable(database: Option[String], + tablePrefix: String, + batchId: Long) + (implicit spark: SparkSession) extends BookkeeperDeltaBase(batchId) { + import BookkeeperDeltaTable._ + import spark.implicits._ + + private val recordsFullTableName = getFullTableName(database, tablePrefix, recordsTable) + private val schemasFullTableName = getFullTableName(database, tablePrefix, schemasTable) + + init() + + override def getBkDf(filter: Column): Dataset[DataChunk] = { + val df = spark.table(recordsFullTableName).as[DataChunk] + + df.filter(filter) + .orderBy(col("jobFinished")) + .as[DataChunk] + } + + override def saveRecordCountDelta(dataChunk: DataChunk): Unit = { + val df = Seq(dataChunk).toDF() + + df.write + .format("delta") + .mode(SaveMode.Append) + .option("mergeSchema", "true") + .saveAsTable(recordsFullTableName) + } + + override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = { + val infoDateStr = DataChunk.dateFormatter.format(infoDate) + val filter = (col("tableName") === lit(table)) && (col("infoDate") === lit(infoDateStr)) && (col("batchId") =!= lit(batchId)) + + val deltaTable = DeltaTable.forName(spark, recordsFullTableName) + deltaTable.delete(filter) + } + + override def getSchemasDeltaDf: Dataset[TableSchemaJson] = { + spark.table(schemasFullTableName).as[TableSchemaJson] + } + + override def saveSchemaDelta(schema: TableSchema): Unit = { + val df = Seq( + TableSchemaJson(schema.tableName, schema.infoDate, schema.schemaJson, Instant.now().toEpochMilli) + ).toDF() + + df.write + .format("delta") + .mode(SaveMode.Append) + .option("mergeSchema", "true") + .saveAsTable(schemasFullTableName) + } + + override def writeEmptyDataset[T <: Product : universe.TypeTag : ClassTag](pathOrTable: String): Unit = { + val df = Seq.empty[T].toDS + + df.write + .format("delta") + .saveAsTable(pathOrTable) + } + + def init(): Unit = { + initRecordsDirectory() + initSchemasDirectory() + } + + private def initRecordsDirectory(): Unit = { + if (!spark.catalog.tableExists(recordsFullTableName)) { + writeEmptyDataset[DataChunk](recordsFullTableName) + } + } + + private def initSchemasDirectory(): Unit = { + if (!spark.catalog.tableExists(schemasFullTableName)) { + writeEmptyDataset[TableSchemaJson](schemasFullTableName) + } + } +} diff --git 
a/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala b/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala new file mode 100644 index 000000000..5c150abc1 --- /dev/null +++ b/pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala @@ -0,0 +1,119 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.bookkeeper + +import io.delta.tables.DeltaTable +import org.apache.spark.sql.functions.{col, lit} +import org.apache.spark.sql.{Column, Dataset, SaveMode, SparkSession} +import za.co.absa.pramen.core.bookkeeper.model.TableSchemaJson +import za.co.absa.pramen.core.model.{DataChunk, TableSchema} + +import java.time.{Instant, LocalDate} +import scala.reflect.ClassTag +import scala.reflect.runtime.universe + +object BookkeeperDeltaTable { + val recordsTable = "bookkeeping" + val schemasTable = "schemas" + + def getFullTableName(databaseOpt: Option[String], tablePrefix: String, tableName: String): String = { + databaseOpt match { + case Some(db) => s"$db.$tablePrefix$tableName" + case None => s"$tablePrefix$tableName" + } + } +} + +class BookkeeperDeltaTable(database: Option[String], + tablePrefix: String, + batchId: Long) + (implicit spark: SparkSession) extends BookkeeperDeltaBase(batchId) { + import BookkeeperDeltaTable._ + import spark.implicits._ + + private val recordsFullTableName = getFullTableName(database, tablePrefix, recordsTable) + private val schemasFullTableName = getFullTableName(database, tablePrefix, schemasTable) + + init() + + override def getBkDf(filter: Column): Dataset[DataChunk] = { + val df = spark.table(recordsFullTableName).as[DataChunk] + + df.filter(filter) + .orderBy(col("jobFinished")) + .as[DataChunk] + } + + override def saveRecordCountDelta(dataChunk: DataChunk): Unit = { + val df = Seq(dataChunk).toDF() + + df.write + .format("delta") + .mode(SaveMode.Append) + .option("mergeSchema", "true") + .saveAsTable(recordsFullTableName) + } + + override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = { + val infoDateStr = DataChunk.dateFormatter.format(infoDate) + val filter = (col("tableName") === lit(table)) && (col("infoDate") === lit(infoDateStr)) && (col("batchId") =!= lit(batchId)) + + val deltaTable = DeltaTable.forName(spark, recordsFullTableName) + deltaTable.delete(filter) + } + + override def getSchemasDeltaDf: Dataset[TableSchemaJson] = { + spark.table(schemasFullTableName).as[TableSchemaJson] + } + + override def saveSchemaDelta(schema: TableSchema): Unit = { + val df = Seq( + TableSchemaJson(schema.tableName, schema.infoDate, schema.schemaJson, Instant.now().toEpochMilli) + ).toDF() + + df.write + .format("delta") + .mode(SaveMode.Append) + .option("mergeSchema", "true") + .saveAsTable(schemasFullTableName) + } + + override def writeEmptyDataset[T <: Product : universe.TypeTag : ClassTag](pathOrTable: String): Unit = { + val df = 
Seq.empty[T].toDS + + df.write + .format("delta") + .saveAsTable(pathOrTable) + } + + def init(): Unit = { + initRecordsDirectory() + initSchemasDirectory() + } + + private def initRecordsDirectory(): Unit = { + if (!spark.catalog.tableExists(recordsFullTableName)) { + writeEmptyDataset[DataChunk](recordsFullTableName) + } + } + + private def initSchemasDirectory(): Unit = { + if (!spark.catalog.tableExists(schemasFullTableName)) { + writeEmptyDataset[TableSchemaJson](schemasFullTableName) + } + } +} diff --git a/pramen/core/src/test/resources/log4j.properties b/pramen/core/src/test/resources/log4j.properties index b682b4759..64f1a173b 100644 --- a/pramen/core/src/test/resources/log4j.properties +++ b/pramen/core/src/test/resources/log4j.properties @@ -35,3 +35,4 @@ log4j.logger.za.co.absa.pramen.core.utils.ConfigUtils$=OFF log4j.logger.za.co.absa.pramen.core.utils.JdbcNativeUtils$=OFF log4j.logger.za.co.absa.pramen.core.utils.SparkUtils$=OFF log4j.logger.za.co.absa.pramen.core.utils.ThreadUtils$=OFF +log4j.logger.za.co.absa.pramen.core.mocks.dao.MongoDbSingleton$=WARN diff --git a/pramen/core/src/test/resources/log4j2.properties b/pramen/core/src/test/resources/log4j2.properties index 2686a2678..315056667 100644 --- a/pramen/core/src/test/resources/log4j2.properties +++ b/pramen/core/src/test/resources/log4j2.properties @@ -65,3 +65,6 @@ logger.sparkutils.level = OFF logger.threadutils.name = za.co.absa.pramen.core.utils.ThreadUtils$ logger.threadutils.level = OFF + +logger.mongodbsingleton.name = za.co.absa.pramen.core.mocks.dao.MongoDbSingleton$ +logger.mongodbsingleton.level = WARN diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/MongoDbFixture.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/MongoDbFixture.scala index 9298c2b03..4c8199d59 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/MongoDbFixture.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/fixtures/MongoDbFixture.scala @@ -28,7 +28,7 @@ trait MongoDbFixture extends BeforeAndAfterAll { import za.co.absa.pramen.core.dao.ScalaMongoImplicits._ - private val (mongoDbExecutable, mongoPort) = MongoDbSingleton.embeddedMongoDb + protected val (mongoDbExecutable, mongoPort) = MongoDbSingleton.embeddedMongoDb def connectionString: String = s"mongodb://localhost:$mongoPort" diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala index 3e4865ed5..2c6d13a86 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala @@ -249,7 +249,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF val df1 = m.getTable("table1", Some(infoDate), Some(infoDate)) assert(df1.count() == 3) - assert(b.getDataChunks("table1", infoDate, infoDate).nonEmpty) + assert(b.getDataChunks("table1", infoDate, None).nonEmpty, None) } } @@ -262,7 +262,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF val df1 = m.getTable("table1", Some(infoDate), Some(infoDate)) assert(df1.count() == 3) - assert(b.getDataChunks("table1", infoDate, infoDate).isEmpty) + assert(b.getDataChunks("table1", infoDate, None).isEmpty) } } } @@ -422,14 +422,14 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF m.saveTable("table1", infoDate, getDf) val reader = 
m.getMetastoreReader("table1" :: Nil, "output_table", infoDate, TaskRunReason.New, ReaderMode.Batch) - val runInfo1 = reader.getTableRunInfo("table1", infoDate) - val runInfo2 = reader.getTableRunInfo("table1", infoDate.plusDays(1)) + val runInfo1 = reader.getTableRunInfo("table1", infoDate, None) + val runInfo2 = reader.getTableRunInfo("table1", infoDate.plusDays(1), None) - assert(runInfo1.isDefined) + assert(runInfo1.nonEmpty) assert(runInfo2.isEmpty) - assert(runInfo1.get.tableName == "table1") - assert(runInfo1.get.infoDate == infoDate) + assert(runInfo1.head.tableName == "table1") + assert(runInfo1.head.infoDate == infoDate) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala index 1619a7662..7f51e1e50 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/DataChunkFactory.scala @@ -23,8 +23,10 @@ object DataChunkFactory { infoDate: String = "2022-01-15", inputRecordCount: Long = 1000, outputRecordCount: Long = 1000, + recordsAppended: Option[Long] = None, jobStarted: Long = 10000, - jobFinished: Long = 20000): DataChunk = { + jobFinished: Long = 20000, + batchId: Option[Long] = Some(123L)): DataChunk = { DataChunk(tableName = tableName, infoDate = infoDate, infoDateBegin = infoDate, @@ -32,6 +34,8 @@ object DataChunkFactory { inputRecordCount = inputRecordCount, outputRecordCount = outputRecordCount, jobStarted = jobStarted, - jobFinished = jobFinished) + jobFinished = jobFinished, + batchId = batchId, + appendedRecordCount = recordsAppended) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala index 4712b22e6..206e2f723 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineInfoFactory.scala @@ -34,7 +34,8 @@ object PipelineInfoFactory { pipelineNotificationFailures: Seq[PipelineNotificationFailure] = Seq.empty, pipelineId: String = "dummy_pipeline_id", tenant: Option[String] = Some("Dummy tenant"), - country: Option[String] = Some("noname")): PipelineInfo = { - PipelineInfo(pipelineName, environment, runtimeInfo, startedAt, finishedAt, warningFlag, sparkApplicationId, pipelineStatus, failureException, pipelineNotificationFailures, pipelineId, tenant, country) + country: Option[String] = Some("noname"), + batchId: Long = 123L): PipelineInfo = { + PipelineInfo(pipelineName, environment, runtimeInfo, startedAt, finishedAt, warningFlag, sparkApplicationId, pipelineStatus, failureException, pipelineNotificationFailures, pipelineId, tenant, country, batchId) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala index 35b83cbea..cfbb8ba61 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala @@ -43,7 +43,8 @@ object TaskCompletedFactory { pipelineName: Option[String] = Some("test"), environmentName: Option[String] = Some("DEV"), tenant: Option[String] = Some("Dummy tenant"), - country: Option[String] = Some("noname")): TaskCompleted = { + country: Option[String] = 
Some("noname"), + batchId: Long = 123L): TaskCompleted = { model.TaskCompleted( jobName, tableName, @@ -65,7 +66,8 @@ object TaskCompletedFactory { pipelineName, environmentName, tenant, - country + country, + batchId ) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala index d18557a7e..6a6eb3a8b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/bookkeeper/SyncBookkeeperMock.scala @@ -24,7 +24,7 @@ import java.time.LocalDate import scala.collection.mutable import scala.util.Try -class SyncBookkeeperMock extends Bookkeeper { +class SyncBookkeeperMock(batchId: Long = 123L) extends Bookkeeper { private val chunks = new mutable.HashMap[(String, LocalDate), DataChunk]() private val schemas = new mutable.ListBuffer[(String, (LocalDate, TableSchema))]() @@ -53,15 +53,14 @@ class SyncBookkeeperMock extends Bookkeeper { } override def getLatestDataChunk(table: String, infoDate: LocalDate): Option[DataChunk] = { - getDataChunks(table, infoDate, infoDate).lastOption + getDataChunks(table, infoDate, None).lastOption } - def getDataChunks(table: String, dateBegin: LocalDate, dateEnd: LocalDate): Seq[DataChunk] = { - chunks.toList.flatMap { case ((tblName, infoDate), chunk) => - val isInsidePeriod = tblName == table && (infoDate.isAfter(dateBegin) || infoDate.equals(dateBegin)) && - (infoDate.isBefore(dateEnd) || infoDate.equals(dateEnd)) + override def getDataChunks(table: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] = { + chunks.toList.flatMap { case ((tblName, date), chunk) => + val isInsidePeriod = tblName == table && date.equals(infoDate) if (isInsidePeriod) { - Some(chunk) + if (batchId.forall(chunk.batchId.contains)) Some(chunk) else None } else { None } @@ -84,12 +83,13 @@ class SyncBookkeeperMock extends Bookkeeper { } private[pramen] override def setRecordCount(table: String, - infoDate: LocalDate, - inputRecordCount: Long, - outputRecordCount: Long, - jobStarted: Long, - jobFinished: Long, - isTableTransient: Boolean): Unit = { + infoDate: LocalDate, + inputRecordCount: Long, + outputRecordCount: Long, + recordsAppended: Option[Long], + jobStarted: Long, + jobFinished: Long, + isTableTransient: Boolean): Unit = { val dateStr = DataChunk.dateFormatter.format(infoDate) val chunk = DataChunk(table, @@ -99,7 +99,9 @@ class SyncBookkeeperMock extends Bookkeeper { inputRecordCount, outputRecordCount, jobStarted, - jobFinished) + jobFinished, + Option(batchId), + recordsAppended) chunks += (table, infoDate) -> chunk } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/dao/MongoDbSingleton.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/dao/MongoDbSingleton.scala index 920730875..7507afaff 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/dao/MongoDbSingleton.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/dao/MongoDbSingleton.scala @@ -16,17 +16,27 @@ package za.co.absa.pramen.core.mocks.dao -import de.flapdoodle.embed.mongo.config.{MongodConfigBuilder, Net, RuntimeConfigBuilder} + +import de.flapdoodle.embed.mongo.commands.ServerAddress import de.flapdoodle.embed.mongo.distribution.Version -import de.flapdoodle.embed.mongo.{Command, MongodExecutable, MongodStarter} -import de.flapdoodle.embed.process.config.io.ProcessOutput -import 
de.flapdoodle.embed.process.runtime.Network -import org.slf4j.LoggerFactory +import de.flapdoodle.embed.mongo.transitions.{Mongod, RunningMongodProcess} +import de.flapdoodle.embed.process.io.{ProcessOutput, StreamProcessor} +import de.flapdoodle.reverse.transitions.Start +import de.flapdoodle.reverse.{StateID, TransitionWalker} +import org.slf4j.{Logger, LoggerFactory} object MongoDbSingleton { private val log = LoggerFactory.getLogger(this.getClass) - lazy val embeddedMongoDb: (Option[MongodExecutable], Int) = startEmbeddedMongoDb() + lazy val embeddedMongoDb: (Option[RunningMongodProcess], Int) = startEmbeddedMongoDb() + + final class Slf4jProcessor(logger: Logger, prefix: String) extends StreamProcessor { + override def process(block: String): Unit = { + if (block != null && block.nonEmpty) logger.info(s"$prefix$block") + } + + override def onProcessed(): Unit = () // no-op + } /** * Create and run a MongoDb instance. @@ -35,31 +45,35 @@ object MongoDbSingleton { * * @return A pair: a MongoDb executable object to be used to stop it and the port number the embedded MongoDB listens to. */ - private def startEmbeddedMongoDb(): (Option[MongodExecutable], Int) = { - val mongoPort: Int = Network.getFreeServerPort() - - // Do not print Embedded MongoDB logs - val runtimeConfig = new RuntimeConfigBuilder() - .defaultsWithLogger(Command.MongoD, log) - .processOutput(ProcessOutput.getDefaultInstanceSilent) - .build() + private def startEmbeddedMongoDb(): (Option[RunningMongodProcess], Int) = { + try { + val version: Version = Version.V8_2_2 + val mongod = Mongod.builder() + .processOutput( + Start.to(classOf[ProcessOutput]).initializedWith( + ProcessOutput.builder() + .output(new Slf4jProcessor(log, "[mongod-out] ")) + .error(new Slf4jProcessor(log, "[mongod-err] ")) + .commands(new Slf4jProcessor(log, "[mongod-cmd] ")) + .build() + ) + ) + .build() - val starter = MongodStarter.getInstance(runtimeConfig) + val executable: TransitionWalker.ReachedState[RunningMongodProcess] = + mongod.transitions(version) + .walker() + .initState(StateID.of(classOf[RunningMongodProcess])) - val mongodConfig = new MongodConfigBuilder() - .version(Version.Main.V4_0) - .net(new Net("localhost", mongoPort, Network.localhostIsIPv6())) - .build() + val addr: ServerAddress = executable.current().getServerAddress + val mongoPort: Int = addr.getPort - val executable = try { - val exec = starter.prepare(mongodConfig) - exec.start() - Some(exec) + (Option(executable.current()), mongoPort) } catch { - case _: Throwable => None + case ex: Throwable => + log.warn("Couldn't start embedded Mongodb. 
MongoDB tests will be skipped", ex) + (None, 0) } - - (executable, mongoPort) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala index d162134e1..06db156c6 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreReaderMock.scala @@ -65,7 +65,7 @@ class MetastoreReaderMock(tables: Seq[(String, DataFrame)], infoDate: LocalDate) } } - override def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo] = None + override def getTableRunInfo(tableName: String, infoDate: LocalDate, batchId: Option[Long]): Seq[MetaTableRunInfo] = Seq.empty override def metadataManager: MetadataManager = metadata diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala index ef392c011..6f1d6c533 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala @@ -164,7 +164,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), table.writeOptions) } - override def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo] = None + override def getTableRunInfo(tableName: String, infoDate: LocalDate, batchId: Option[Long]): Seq[MetaTableRunInfo] = Seq.empty override def getRunReason: TaskRunReason = TaskRunReason.New diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala index 4de5e7378..9285828bd 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala @@ -122,7 +122,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis "track already ran" in { val (bk, _, job) = getUseCase() - bk.setRecordCount("table1", infoDate, 4, 4, 123, 456, isTableTransient = false) + bk.setRecordCount("table1", infoDate, 4, 4, None, 123, 456, isTableTransient = false) val result = job.preRunCheckJob(infoDate, runReason, conf, Nil) @@ -133,7 +133,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis "some records, default minimum records" in { val (bk, _, job) = getUseCase() - bk.setRecordCount("table1", infoDate, 100, 100, 123, 456, isTableTransient = false) + bk.setRecordCount("table1", infoDate, 100, 100, None, 123, 456, isTableTransient = false) val result = job.preRunCheckJob(infoDate, runReason, conf, Nil) @@ -143,7 +143,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis "some records, custom minimum records" in { val (bk, _, job) = getUseCase(minRecords = Some(3)) - bk.setRecordCount("table1", infoDate, 100, 100, 123, 456, isTableTransient = false) + bk.setRecordCount("table1", infoDate, 100, 100, None, 123, 456, isTableTransient = false) val result = job.preRunCheckJob(infoDate, runReason, conf, Nil) @@ -199,7 +199,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis "needs update, some records, custom minimum records" in { val (bk, _, job) = 
getUseCase(minRecords = Some(5)) - bk.setRecordCount("table1", infoDate, 100, 100, 123, 456, isTableTransient = false) + bk.setRecordCount("table1", infoDate, 100, 100, None, 123, 456, isTableTransient = false) val result = job.preRunCheckJob(infoDate, runReason, conf, Nil) @@ -214,7 +214,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis val noDataInfoDate = infoDate.plusDays(1) - bk.setRecordCount("table1", noDataInfoDate, 4, 4, 123, 456, isTableTransient = false) + bk.setRecordCount("table1", noDataInfoDate, 4, 4, None, 123, 456, isTableTransient = false) val result = job.preRunCheckJob(noDataInfoDate, runReason, conf, Nil) @@ -226,7 +226,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis val noDataInfoDate = infoDate.plusDays(1) - bk.setRecordCount("table1", noDataInfoDate, 30, 30, 123, 456, isTableTransient = false) + bk.setRecordCount("table1", noDataInfoDate, 30, 30, None, 123, 456, isTableTransient = false) val result = job.preRunCheckJob(noDataInfoDate, runReason, conf, Nil) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala index 1192ffa9f..fe1bc91c3 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala @@ -193,9 +193,9 @@ class SinkJobSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix val outputTable = "table1->mysink" - assert(bk.getDataChunks(outputTable, infoDate, infoDate).nonEmpty) + assert(bk.getDataChunks(outputTable, infoDate, None).nonEmpty) - val chunk = bk.getDataChunks(outputTable, infoDate, infoDate).head + val chunk = bk.getDataChunks(outputTable, infoDate, None).head assert(chunk.inputRecordCount == 10) assert(chunk.outputRecordCount == 3) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala index f120d444a..98dcb4596 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala @@ -72,7 +72,7 @@ class TransferJobSuite extends AnyWordSpec with SparkTestBase with TextCompariso " the number of records do not match" in { val (job, bk) = getUseCase(numberOfRecords = 7) - bk.setRecordCount("table1->sink", infoDate, 3, 3, 10000, 1001, isTableTransient = false) + bk.setRecordCount("table1->sink", infoDate, 3, 3, None, 10000, 1001, isTableTransient = false) val actual = job.preRunCheckJob(infoDate, runReason, conf, Nil) @@ -82,7 +82,7 @@ class TransferJobSuite extends AnyWordSpec with SparkTestBase with TextCompariso "return AlreadyRan when the number of records didn't change" in { val (job, bk) = getUseCase(numberOfRecords = 7) - bk.setRecordCount("table1->sink", infoDate, 7, 3, 10000, 1001, isTableTransient = false) + bk.setRecordCount("table1->sink", infoDate, 7, 3, None, 10000, 1001, isTableTransient = false) val actual = job.preRunCheckJob(infoDate, runReason, conf, Nil) @@ -236,9 +236,9 @@ class TransferJobSuite extends AnyWordSpec with SparkTestBase with TextCompariso val outputTable = "table1->sink" - assert(bk.getDataChunks(outputTable, infoDate, infoDate).nonEmpty) + assert(bk.getDataChunks(outputTable, infoDate, None).nonEmpty) - val chunk = bk.getDataChunks(outputTable, infoDate, infoDate).head + 
val chunk = bk.getDataChunks(outputTable, infoDate, None).head assert(chunk.inputRecordCount == 10) assert(chunk.outputRecordCount == 3) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala index 5f2684a37..5060cda39 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala @@ -137,7 +137,7 @@ class SourceValidationSuite extends AnyWordSpec with BeforeAndAfterAll with Temp val state = new PipelineStateSpy - bookkeeper.setRecordCount("table_out", runDateIn.minusDays(1), 1, 1, 0, 0, isTableTransient = false) + bookkeeper.setRecordCount("table_out", runDateIn.minusDays(1), 1, 1, None, 0, 0, isTableTransient = false) val sourceTable = SourceTableFactory.getDummySourceTable() diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala index eaf6f502c..ec9f54a19 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperCommonSuite.scala @@ -16,16 +16,17 @@ package za.co.absa.pramen.core.tests.bookkeeper +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ import org.scalatest.wordspec.AnyWordSpec -import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, BookkeeperDeltaTable} +import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, BookkeeperDeltaTable, BookkeeperText} import za.co.absa.pramen.core.model.DataChunk import java.time.LocalDate +import scala.util.Try class BookkeeperCommonSuite extends AnyWordSpec { - - def testBookKeeper(getBookkeeper: () => Bookkeeper): Unit = { + def testBookKeeper(getBookkeeper: Long => Bookkeeper): Unit = { val infoDate1 = LocalDate.of(2020, 8, 11) val infoDate2 = LocalDate.of(2020, 8, 12) val infoDate3 = LocalDate.of(2020, 8, 13) @@ -42,15 +43,15 @@ class BookkeeperCommonSuite extends AnyWordSpec { "getLatestProcessedDate()" should { "return None if there are no entries" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) assert(bk.getLatestProcessedDate("table").isEmpty) } "return a date when there is an entry" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) - bk.setRecordCount("table", infoDate2, 100, 10, 1597318830, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate2, 100, 10, None, 1597318830, 1597318835, isTableTransient = false) val dateOpt = bk.getLatestProcessedDate("table") @@ -59,9 +60,9 @@ class BookkeeperCommonSuite extends AnyWordSpec { } "return None if the passed date is too old" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) - bk.setRecordCount("table", infoDate2, 100, 10, 1597318830, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate2, 100, 10, None, 1597318830, 1597318835, isTableTransient = false) val dateOpt = bk.getLatestProcessedDate("table", Some(infoDate1)) @@ -69,9 +70,9 @@ class BookkeeperCommonSuite extends AnyWordSpec { } "return a date when there is an entry and until date is passed" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) - bk.setRecordCount("table", infoDate2, 100, 10, 1597318830, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate2, 100, 10, 
None, 1597318830, 1597318835, isTableTransient = false) val dateOpt = bk.getLatestProcessedDate("table", Some(infoDate2)) @@ -80,11 +81,11 @@ class BookkeeperCommonSuite extends AnyWordSpec { } "return the latest date when there are several dates" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) - bk.setRecordCount("table", infoDate2, 100, 10, 1597318830, 1597318835, isTableTransient = false) - bk.setRecordCount("table", infoDate3, 200, 20, 1597318830, 1597318835, isTableTransient = false) - bk.setRecordCount("table", infoDate1, 400, 40, 1597318830, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate2, 100, 10, None, 1597318830, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate3, 200, 20, None, 1597318830, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 400, 40, None, 1597318830, 1597318835, isTableTransient = false) val dateOpt = bk.getLatestProcessedDate("table") @@ -95,17 +96,17 @@ class BookkeeperCommonSuite extends AnyWordSpec { "getLatestDataChunk()" should { "return None if there are no entries" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) assert(bk.getLatestDataChunk("table", infoDate1).isEmpty) } "return the latest date from the specified periods" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) - bk.setRecordCount("table", infoDate2, 100, 10, 1597318831, 1597318835, isTableTransient = false) - bk.setRecordCount("table", infoDate3, 200, 20, 1597318832, 1597318836, isTableTransient = false) - bk.setRecordCount("table", infoDate1, 400, 40, 1597318833, 1597318837, isTableTransient = false) + bk.setRecordCount("table", infoDate2, 100, 10, None, 1597318831, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate3, 200, 20, None, 1597318832, 1597318836, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 400, 40, None, 1597318833, 1597318837, isTableTransient = false) val chunkOpt = bk.getLatestDataChunk("table", infoDate3) val infoDate3Str = infoDate3.format(DataChunk.dateFormatter) @@ -124,17 +125,17 @@ class BookkeeperCommonSuite extends AnyWordSpec { "getDataChunksCount()" should { "return 0 if there are no entries" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) assert(bk.getDataChunksCount("table", Option(infoDate1), Option(infoDate1)) == 0) } "return the number of entries if there are entries" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) - bk.setRecordCount("table", infoDate1, 400, 40, 1597318833, 1597318837, isTableTransient = false) - bk.setRecordCount("table", infoDate2, 100, 10, 1597318831, 1597318835, isTableTransient = false) - bk.setRecordCount("table", infoDate3, 200, 20, 1597318832, 1597318836, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 400, 40, None, 1597318833, 1597318837, isTableTransient = false) + bk.setRecordCount("table", infoDate2, 100, 10, None, 1597318831, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate3, 200, 20, None, 1597318832, 1597318836, isTableTransient = false) val chunksCount = bk.getDataChunksCount("table", Option(infoDate1), Option(infoDate2)) @@ -143,25 +144,70 @@ class BookkeeperCommonSuite extends AnyWordSpec { } "setRecordCount()" should { - "overwrite the previous entry" in { - val bk = getBookkeeper() + "add a new entry for the same batch" in { + val bk = getBookkeeper(123L) - bk.setRecordCount("table", infoDate1, 100, 10, 1597318833, 1597318837, isTableTransient = false) - bk.setRecordCount("table", 
infoDate1, 200, 20, 1597318838, 1597318839, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 100, 10, None, 1597318833, 1597318837, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 200, 20, None, 1597318838, 1597318839, isTableTransient = false) val latestChunkOpt = bk.getLatestDataChunk("table", infoDate1) + val chunks = bk.getDataChunks("table", infoDate1, None) + + assert(latestChunkOpt.isDefined) + assert(chunks.length == 2) + + assert(latestChunkOpt.get.infoDate == "2020-08-11") + assert(latestChunkOpt.get.jobFinished == 1597318839) + } + + "replace the entry for a different batch" in { + val bk1 = getBookkeeper(123L) + val bk2 = getBookkeeper(456L) + + // The feature is not implemented in these implementations + assume(!bk1.isInstanceOf[BookkeeperText]) + + bk1.setRecordCount("table", infoDate1, 100, 10, None, 1597318833, 1597318837, isTableTransient = false) + bk2.setRecordCount("table", infoDate1, 200, 20, None, 1597318838, 1597318839, isTableTransient = false) + + val latestChunkOpt = bk2.getLatestDataChunk("table", infoDate1) + val chunks = bk2.getDataChunks("table", infoDate1, None) assert(latestChunkOpt.isDefined) + assert(chunks.length == 1) assert(latestChunkOpt.get.infoDate == "2020-08-11") assert(latestChunkOpt.get.jobFinished == 1597318839) } + "add the entry for a different batch non overwrite" in { + val bk1 = getBookkeeper(123L) + val bk2 = getBookkeeper(456L) + + val sparkVersion = Try { + SparkSession.active.version + }.toOption.getOrElse("") + + // The feature is not supported with Delta Table implementation and Spark 2.x + assume(!sparkVersion.startsWith("2.") || !bk1.isInstanceOf[BookkeeperDeltaTable]) + + bk1.setRecordCount("table", infoDate1, 100, 10, Some(1), 1597318833, 1597318837, isTableTransient = false) + bk2.setRecordCount("table", infoDate1, 200, 20, Some(1), 1597318838, 1597318839, isTableTransient = false) + + val latestChunkOpt = bk2.getLatestDataChunk("table", infoDate1) + val chunks = bk2.getDataChunks("table", infoDate1, None) + + assert(latestChunkOpt.isDefined) + assert(chunks.length == 2) + + assert(latestChunkOpt.get.infoDate == "2020-08-11") + assert(latestChunkOpt.get.jobFinished == 1597318839) + } } "saveSchema" should { "overwrite a schema entry" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) bk.saveSchema("table", infoDate2, schema1) Thread.sleep(10) @@ -176,7 +222,7 @@ class BookkeeperCommonSuite extends AnyWordSpec { "getLatestSchema" should { "return the latest schema" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) bk.saveSchema("table", infoDate1, schema1) bk.saveSchema("table", infoDate2, schema2) @@ -195,7 +241,7 @@ class BookkeeperCommonSuite extends AnyWordSpec { } "return None is schema is not available" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) bk.saveSchema("table", infoDate2, schema1) @@ -209,20 +255,20 @@ class BookkeeperCommonSuite extends AnyWordSpec { "Multiple bookkeepers" should { "share the state" in { - val bk = getBookkeeper() + val bk = getBookkeeper(123L) // A workaround for BookkeeperDeltaTable which outputs to different tables in unit tests - val bk2 = if (bk.isInstanceOf[BookkeeperDeltaTable]) bk else getBookkeeper() - val bk3 = if (bk.isInstanceOf[BookkeeperDeltaTable]) bk else getBookkeeper() + val bk2 = if (bk.isInstanceOf[BookkeeperDeltaTable]) bk else getBookkeeper(123L) + val bk3 = if (bk.isInstanceOf[BookkeeperDeltaTable]) bk else getBookkeeper(123L) - bk.setRecordCount("table", infoDate1, 100, 10, 1597318833, 1597318840, 
isTableTransient = false) - bk.setRecordCount("table", infoDate1, 200, 20, 1597318838, 1597318841, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 100, 10, None, 1597318833, 1597318840, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 200, 20, None, 1597318838, 1597318841, isTableTransient = false) - bk2.setRecordCount("table", infoDate2, 101, 10, 1597318833, 1597318843, isTableTransient = false) - bk2.setRecordCount("table", infoDate3, 201, 20, 1597318838, 1597318844, isTableTransient = false) + bk2.setRecordCount("table", infoDate2, 101, 10, None, 1597318833, 1597318843, isTableTransient = false) + bk2.setRecordCount("table", infoDate3, 201, 20, None, 1597318838, 1597318844, isTableTransient = false) - bk3.setRecordCount("table", infoDate3, 102, 10, 1597318833, 1597318842, isTableTransient = false) - bk3.setRecordCount("table", infoDate2, 202, 20, 1597318838, 1597318843, isTableTransient = false) + bk3.setRecordCount("table", infoDate3, 102, 10, None, 1597318833, 1597318842, isTableTransient = false) + bk3.setRecordCount("table", infoDate2, 202, 20, None, 1597318838, 1597318843, isTableTransient = false) val chunk1 = bk.getLatestDataChunk("table", infoDate3) val chunk2 = bk2.getLatestDataChunk("table", infoDate3) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala index f7cb07561..d80082267 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala @@ -38,20 +38,20 @@ class BookkeeperDeltaPathLongSuite extends BookkeeperCommonSuite with SparkTestB deleteDir(tmpDir) } - def getBookkeeper: BookkeeperDeltaPath = { - new BookkeeperDeltaPath(tmpDir) + def getBookkeeper(batchId: Long): BookkeeperDeltaPath = { + new BookkeeperDeltaPath(tmpDir, batchId) } "BookkeeperHadoopDeltaPath" when { "initialized" should { "Initialize bookkeeping directory" in { - getBookkeeper + getBookkeeper(123L) assert(fsUtils.exists(new Path(tmpDir, s"$bookkeepingRootPath/$recordsDirName"))) assert(fsUtils.exists(new Path(tmpDir, locksDirName))) } } - testBookKeeper(() => getBookkeeper) + testBookKeeper(batchId => getBookkeeper(batchId)) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala index 66de740be..0d0027bc8 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala @@ -16,14 +16,16 @@ package za.co.absa.pramen.core.tests.bookkeeper -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.bookkeeper.BookkeeperDeltaTable import java.io.File import scala.util.Random -class BookkeeperDeltaTableLongSuite extends BookkeeperCommonSuite with SparkTestBase with BeforeAndAfterAll { +class BookkeeperDeltaTableLongSuite extends BookkeeperCommonSuite with SparkTestBase with BeforeAndAfter with BeforeAndAfterAll { + val bookkeepingTablePrefix: String = getNewTablePrefix + override protected def beforeAll(): Unit = 
{ super.beforeAll() cleanUpWarehouse() @@ -34,24 +36,48 @@ class BookkeeperDeltaTableLongSuite extends BookkeeperCommonSuite with SparkTest super.afterAll() } - def getBookkeeper(prefix: String): BookkeeperDeltaTable = { - new BookkeeperDeltaTable(None, prefix) + before { + if (!spark.version.startsWith("2.")) { + val fullRecordsTableName = BookkeeperDeltaTable.getFullTableName(None, bookkeepingTablePrefix, BookkeeperDeltaTable.recordsTable) + val fullSchemasTableName = BookkeeperDeltaTable.getFullTableName(None, bookkeepingTablePrefix, BookkeeperDeltaTable.schemasTable) + + if (spark.catalog.tableExists(fullRecordsTableName)) { + spark.sql(s"DELETE FROM $fullRecordsTableName WHERE true") + } + + if (spark.catalog.tableExists(fullSchemasTableName)) { + spark.sql(s"DELETE FROM $fullSchemasTableName WHERE true") + } + } + } + + def getBookkeeper(prefix: String, batchId: Long): BookkeeperDeltaTable = { + new BookkeeperDeltaTable(None, prefix, batchId) } "BookkeeperHadoopDeltaTable" when { - testBookKeeper { () => - val rndInt = Math.abs(Random.nextInt()) - getBookkeeper(s"tbl${rndInt}_") + testBookKeeper { batchId => + if (spark.version.startsWith("2.")) { + getBookkeeper(getNewTablePrefix, batchId) + } else { + getBookkeeper(bookkeepingTablePrefix, batchId) + } } "test tables are created properly" in { - getBookkeeper(s"tbl0000_") + val prefix = getNewTablePrefix + getBookkeeper(prefix, 123L) - assert(spark.catalog.tableExists("tbl0000_bookkeeping")) - assert(spark.catalog.tableExists("tbl0000_schemas")) + assert(spark.catalog.tableExists(s"${prefix}bookkeeping")) + assert(spark.catalog.tableExists(s"${prefix}schemas")) } } + private def getNewTablePrefix: String = { + val rndInt = Math.abs(Random.nextInt()) + s"tbl${rndInt}_" + } + private def cleanUpWarehouse(): Unit = { val warehouseDir = new File("spark-warehouse") if (warehouseDir.exists()) { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala index 010707fcf..fcbf63365 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperJdbcSuite.scala @@ -37,14 +37,14 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture super.afterAll() } - def getBookkeeper: Bookkeeper = { - new BookkeeperJdbc(pramenDb.slickDb, 0L) + def getBookkeeper(batchId: Long): Bookkeeper = { + new BookkeeperJdbc(pramenDb.slickDb, batchId) } "BookkeeperJdbc" when { "initialized" should { "Initialize an empty database" in { - getBookkeeper + getBookkeeper(0L) assert(getTables.exists(_.equalsIgnoreCase("bookkeeping"))) assert(getTables.exists(_.equalsIgnoreCase("schemas"))) @@ -53,6 +53,6 @@ class BookkeeperJdbcSuite extends BookkeeperCommonSuite with RelationalDbFixture } } - testBookKeeper(() => getBookkeeper) + testBookKeeper(batchId => getBookkeeper(batchId)) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMemSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMemSuite.scala index 2a88264b7..bf3c82054 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMemSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMemSuite.scala @@ -43,7 +43,7 @@ class BookkeeperMemSuite extends AnyWordSpec with BeforeAndAfter 
{ } "return a date when there is an entry" in { - bk.setRecordCount("table", infoDate2, 100, 10, 1597318830, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate2, 100, 10, None, 1597318830, 1597318835, isTableTransient = false) val dateOpt = bk.getLatestProcessedDate("table") @@ -52,9 +52,9 @@ class BookkeeperMemSuite extends AnyWordSpec with BeforeAndAfter { } "return the latest date when there are several dates" in { - bk.setRecordCount("table", infoDate2, 100, 10, 1597318830, 1597318835, isTableTransient = false) - bk.setRecordCount("table", infoDate3, 200, 20, 1597318830, 1597318835, isTableTransient = false) - bk.setRecordCount("table", infoDate1, 400, 40, 1597318830, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate2, 100, 10, None, 1597318830, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate3, 200, 20, None, 1597318830, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 400, 40, None, 1597318830, 1597318835, isTableTransient = false) val dateOpt = bk.getLatestProcessedDate("table") @@ -69,9 +69,9 @@ class BookkeeperMemSuite extends AnyWordSpec with BeforeAndAfter { } "return the latest date from the specified periods" in { - bk.setRecordCount("table", infoDate2, 100, 10, 1597318831, 1597318835, isTableTransient = false) - bk.setRecordCount("table", infoDate3, 200, 20, 1597318832, 1597318836, isTableTransient = false) - bk.setRecordCount("table", infoDate1, 400, 40, 1597318833, 1597318837, isTableTransient = false) + bk.setRecordCount("table", infoDate2, 100, 10, None, 1597318831, 1597318835, isTableTransient = false) + bk.setRecordCount("table", infoDate3, 200, 20, None, 1597318832, 1597318836, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 400, 40, None, 1597318833, 1597318837, isTableTransient = false) val chunkOpt = bk.getLatestDataChunk("table", infoDate3) val infoDate3Str = infoDate3.format(DataChunk.dateFormatter) @@ -90,10 +90,10 @@ class BookkeeperMemSuite extends AnyWordSpec with BeforeAndAfter { "setRecordCount()" should { "overwrite the previous entry" in { - bk.setRecordCount("table", infoDate1, 100, 10, 1597318833, 1597318837, isTableTransient = false) - bk.setRecordCount("table", infoDate1, 200, 20, 1597318838, 1597318839, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 100, 10, None, 1597318833, 1597318837, isTableTransient = false) + bk.setRecordCount("table", infoDate1, 200, 20, None, 1597318838, 1597318839, isTableTransient = false) - val chunks = bk.asInstanceOf[SyncBookkeeperMock].getDataChunks("table", infoDate1, infoDate1) + val chunks = bk.asInstanceOf[SyncBookkeeperMock].getDataChunks("table", infoDate1, None) assert(chunks.size == 1) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala index 8561ce1e0..036511fd8 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperMongoDbLongSuite.scala @@ -36,15 +36,15 @@ class BookkeeperMongoDbLongSuite extends BookkeeperCommonSuite with MongoDbFixtu } } - def getBookkeeper: Bookkeeper = { - new BookkeeperMongoDb(connection) + def getBookkeeper(batchId: Long): Bookkeeper = { + new BookkeeperMongoDb(connection, batchId) } - if (db != null) { + if (mongoDbExecutable.nonEmpty) { 
"BookkeeperMongoDb" when { "initialized" should { "Initialize an empty database" in { - getBookkeeper + getBookkeeper(123L) assert(db.doesCollectionExists(collectionName)) @@ -53,7 +53,7 @@ class BookkeeperMongoDbLongSuite extends BookkeeperCommonSuite with MongoDbFixtu } } - testBookKeeper(() => getBookkeeper) + testBookKeeper(batchId => getBookkeeper(batchId)) } } else { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperNullSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperNullSuite.scala index aac61b580..9518feb5b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperNullSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperNullSuite.scala @@ -69,9 +69,9 @@ class BookkeeperNullSuite extends AnyWordSpec { def getBookkeeper: BookkeeperBase = { val bk = new BookkeeperNull - bk.saveRecordCountToStorage("table", infoDate2, 100, 10, 1597318830, 1597318835) - bk.saveRecordCountToStorage("table", infoDate3, 200, 20, 1597318830, 1597318835) - bk.saveRecordCountToStorage("table", infoDate1, 400, 40, 1597318830, 1597318835) + bk.saveRecordCountToStorage("table", infoDate2, 100, 10, None, 1597318830, 1597318835) + bk.saveRecordCountToStorage("table", infoDate3, 200, 20, None, 1597318830, 1597318835) + bk.saveRecordCountToStorage("table", infoDate1, 400, 40, None, 1597318830, 1597318835) bk } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala index a28293f2c..41adb3486 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperSuite.scala @@ -80,7 +80,7 @@ class BookkeeperSuite extends AnyWordSpec closable.close() } - if (db != null) { + if (mongoDbExecutable.nonEmpty) { "build bookkeeper, token lock, journal, and closable object for MongoDB" in { val bookkeepingConfig = BookkeepingConfigFactory.getDummyBookkeepingConfig( bookkeepingEnabled = true, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala index fa9991fe7..6722c6f51 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTextLongSuite.scala @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.tests.bookkeeper import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfter import za.co.absa.pramen.core.base.SparkTestBase -import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, BookkeeperText} +import za.co.absa.pramen.core.bookkeeper.BookkeeperText import za.co.absa.pramen.core.fixtures.TempDirFixture import za.co.absa.pramen.core.utils.FsUtils @@ -41,52 +41,68 @@ class BookkeeperTextLongSuite extends BookkeeperCommonSuite with SparkTestBase w deleteDir(tmpDir) } - def getBookkeeper: BookkeeperText = { - new BookkeeperText(tmpDir) + def getBookkeeper(batchId: Long): BookkeeperText = { + new BookkeeperText(tmpDir, batchId) } "BookkeeperHadoopText" when { "initialized" should { "Initialize bookkeeping directory" in { - getBookkeeper + getBookkeeper(123L) assert(fsUtils.exists(new Path(tmpDir, s"$bookkeepingRootPath/$recordsDirName"))) 
assert(fsUtils.exists(new Path(tmpDir, locksDirName))) } } - testBookKeeper(() => getBookkeeper) + testBookKeeper(batchId => getBookkeeper(batchId)) } "getFilter" should { "get a ranged filter" in { - val bk = getBookkeeper + val bk = getBookkeeper(123L) - val actual = bk.getFilter("table1", Some(LocalDate.of(2021, 1, 1)), Some(LocalDate.of(2021, 1, 2))).toString() + val actual = bk.getFilter("table1", Some(LocalDate.of(2021, 1, 1)), Some(LocalDate.of(2021, 1, 2)), None).toString() assert(actual == "(((tableName = table1) AND (infoDate >= 2021-01-01)) AND (infoDate <= 2021-01-02))") } + "get a ranged filter with batch id" in { + val bk = getBookkeeper(123L) + + val actual = bk.getFilter("table1", Some(LocalDate.of(2021, 1, 1)), Some(LocalDate.of(2021, 1, 2)), Some(123L)).toString() + + assert(actual == "((((tableName = table1) AND (infoDate >= 2021-01-01)) AND (infoDate <= 2021-01-02)) AND (batchId = 123))") + } + "get a from filter" in { - val bk = getBookkeeper + val bk = getBookkeeper(123L) - val actual = bk.getFilter("table1", Some(LocalDate.of(2021, 1, 1)), None).toString() + val actual = bk.getFilter("table1", Some(LocalDate.of(2021, 1, 1)), None, None).toString() assert(actual == "((tableName = table1) AND (infoDate >= 2021-01-01))") } "get a to filter" in { - val bk = getBookkeeper + val bk = getBookkeeper(123L) - val actual = bk.getFilter("table1", None, Some(LocalDate.of(2021, 1, 2))).toString() + val actual = bk.getFilter("table1", None, Some(LocalDate.of(2021, 1, 2)), None).toString() assert(actual == "((tableName = table1) AND (infoDate <= 2021-01-02))") } + "get a batchid filter" in { + val bk = getBookkeeper(123L) + + val actual = bk.getFilter("table1", None, None, Some(123L)).toString() + + assert(actual == "((tableName = table1) AND (batchId = 123))") + } + "get a table filter" in { - val bk = getBookkeeper + val bk = getBookkeeper(123L) - val actual = bk.getFilter("table1", None, None).toString() + val actual = bk.getFilter("table1", None, None, None).toString() assert(actual == "(tableName = table1)") } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTransientSuiteEager.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTransientSuiteEager.scala index 441e4bfb8..e79154777 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTransientSuiteEager.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperTransientSuiteEager.scala @@ -69,9 +69,9 @@ class BookkeeperTransientSuiteEager extends AnyWordSpec { "setRecordCount()" should { "return the newly added record" in { val bk = getBookkeeper - bk.setRecordCount("table1", infoDate2, 100, 10, 1597318830, 1597318835, isTableTransient = true) + bk.setRecordCount("table1", infoDate2, 100, 10, None, 1597318830, 1597318835, isTableTransient = true) - val chunks = bk.getTransientDataChunks("table1", Option(infoDate2), Option(infoDate2)) + val chunks = bk.getTransientDataChunks("table1", Option(infoDate2), Option(infoDate2), None) assert(chunks.length == 1) assert(chunks.head.infoDate == infoDate2.toString) @@ -80,9 +80,9 @@ class BookkeeperTransientSuiteEager extends AnyWordSpec { def getBookkeeper: BookkeeperBase = { val bk = new BookkeeperNull - bk.setRecordCount("table", infoDate2, 100, 10, 1597318830, 1597318835, isTableTransient = true) - bk.setRecordCount("table", infoDate3, 200, 20, 1597318830, 1597318836, isTableTransient = true) - bk.setRecordCount("table", infoDate1, 400, 40, 
1597318830, 1597318837, isTableTransient = true) + bk.setRecordCount("table", infoDate2, 100, 10, None, 1597318830, 1597318835, isTableTransient = true) + bk.setRecordCount("table", infoDate3, 200, 20, None, 1597318830, 1597318836, isTableTransient = true) + bk.setRecordCount("table", infoDate1, 400, 40, None, 1597318830, 1597318837, isTableTransient = true) bk } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala index 378bcb37e..d3e0825b1 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalMongoDbSuite.scala @@ -37,7 +37,7 @@ class JournalMongoDbSuite extends AnyWordSpec with MongoDbFixture with BeforeAnd } } - if (db != null) { + if (mongoDbExecutable.nonEmpty) { "Journal/MongoDB" should { "Initialize an empty database" in { db.doesCollectionExists("collectionName") diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala index 3dbc562c3..1871b6292 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala @@ -30,7 +30,7 @@ object TestCases { val instant2: Instant = Instant.ofEpochSecond(1597318835) val instant3: Instant = Instant.ofEpochSecond(1597318839) - val task1: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate1, infoDate1, infoDate1, Some(100), Some(0), Some(100), None, None, None, 597318830, 1597318830, "New", Some("Test1"), Some("abc123"), Some("p_id_1"), Some("p_1"), Some("TEST"), Some("T1"), Some("C1")) - val task2: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate2, infoDate2, infoDate2, Some(100), Some(0), Some(100), None, None, None, 1597318835, 1597318835, "Late", Some("Test2"), Some("abc123"), Some("p_id_2"), Some("p_2"), Some("TEST"), Some("T2"), Some("C2")) - val task3: TaskCompleted = model.TaskCompleted("job2", "table2", infoDate3, infoDate3, infoDate3, Some(100), Some(0), Some(100), None, None, None, 1597318839, 1597318839, "Fail", Some("Test3"), Some("abc123"), Some("p_id_3"), Some("p_2"), Some("TEST"), Some("T2"), Some("C3")) + val task1: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate1, infoDate1, infoDate1, Some(100), Some(0), Some(100), None, None, None, 597318830, 1597318830, "New", Some("Test1"), Some("abc123"), Some("p_id_1"), Some("p_1"), Some("TEST"), Some("T1"), Some("C1"), 123L) + val task2: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate2, infoDate2, infoDate2, Some(100), Some(0), Some(100), None, None, None, 1597318835, 1597318835, "Late", Some("Test2"), Some("abc123"), Some("p_id_2"), Some("p_2"), Some("TEST"), Some("T2"), Some("C2"), 123L) + val task3: TaskCompleted = model.TaskCompleted("job2", "table2", infoDate3, infoDate3, infoDate3, Some(100), Some(0), Some(100), None, None, None, 1597318839, 1597318839, "Fail", Some("Test3"), Some("abc123"), Some("p_id_3"), Some("p_2"), Some("TEST"), Some("T2"), Some("C3"), 123L) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockMongoDbSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockMongoDbSuite.scala index 579802b65..c974eb6d0 100644 --- 
a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockMongoDbSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockMongoDbSuite.scala @@ -22,8 +22,8 @@ import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.core.fixtures.MongoDbFixture import za.co.absa.pramen.core.lock.TokenLockMongoDb import za.co.absa.pramen.core.lock.TokenLockMongoDb.collectionName + import scala.concurrent.duration._ -import org.scalatest.concurrent.Eventually._ class TokenLockMongoDbSuite extends AnyWordSpec with MongoDbFixture with BeforeAndAfter { before { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala index e336082f9..40c9922af 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala @@ -175,7 +175,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis builder.addCompletedTask(TaskResultFactory.getDummyTaskResult( runStatus = TestPrototypes.runStatusWarning, schemaDifferences = SchemaDifferenceFactory.getDummySchemaDifference() :: Nil, - notificationTargetErrors = Seq(NotificationFailure("table1", "my_tagret", LocalDate.parse("2020-02-18"), new RuntimeException("Target 1 exception")))) + notificationTargetErrors = Seq(NotificationFailure("table1", "my_target", LocalDate.parse("2020-02-18"), new RuntimeException("Target 1 exception")))) ) val actual = builder.renderBody() diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala index d7c520828..ecee53c74 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala @@ -83,7 +83,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase { assert(results.head.runStatus.isInstanceOf[Failed]) assert(results.head.runInfo.get.infoDate == runDate) - assert(bk.asInstanceOf[SyncBookkeeperMock].getDataChunks("table_out", runDate, runDate).isEmpty) + assert(bk.asInstanceOf[SyncBookkeeperMock].getDataChunks("table_out", runDate, None).isEmpty) } "handle a successful multiple task job sequential execution" in { @@ -109,7 +109,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase { assert(results.head.runStatus.isInstanceOf[Failed]) assert(results.head.runInfo.get.infoDate == runDate) - assert(bk.asInstanceOf[SyncBookkeeperMock].getDataChunks("table_out", runDate, runDate).isEmpty) + assert(bk.asInstanceOf[SyncBookkeeperMock].getDataChunks("table_out", runDate, None).isEmpty) } "run job even if it is asking for more resources than maximum available" in { @@ -188,7 +188,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase { val state = new PipelineStateSpy - bookkeeper.setRecordCount("table_out", runDate.minusDays(1), 1, 1, 0, 0, isTableTransient = false) + bookkeeper.setRecordCount("table_out", runDate.minusDays(1), 1, 1, None, 0, 0, isTableTransient = false) val stats = 
MetaTableStats(Some(2), None, Some(100)) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala index 4aaaab653..b867778bd 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala @@ -498,7 +498,7 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar val failure = result.runStatus.asInstanceOf[Failed] assert(failure.ex.getMessage == "TestException") - assert(bk.asInstanceOf[SyncBookkeeperMock].getDataChunks("table_out", infoDate, infoDate).isEmpty) + assert(bk.asInstanceOf[SyncBookkeeperMock].getDataChunks("table_out", infoDate, None).isEmpty) } "handle a dry run" in { @@ -514,7 +514,7 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar assert(success.recordCount.contains(2)) assert(success.sizeBytes.isEmpty) - assert(bk.asInstanceOf[SyncBookkeeperMock].getDataChunks("table_out", infoDate, infoDate).isEmpty) + assert(bk.asInstanceOf[SyncBookkeeperMock].getDataChunks("table_out", infoDate, None).isEmpty) } "expose Hive table" in { diff --git a/pramen/extras/pom.xml b/pramen/extras/pom.xml index a2b26c464..c5508f36f 100644 --- a/pramen/extras/pom.xml +++ b/pramen/extras/pom.xml @@ -27,7 +27,7 @@ za.co.absa.pramen pramen - 1.12.14-SNAPSHOT + 1.13.0-SNAPSHOT diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala index f9aadea83..1809bf209 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala @@ -62,7 +62,7 @@ class EcsPipelineNotificationTargetSuite extends AnyWordSpec { val task3 = TestPrototypes.taskNotification.copy(taskDef = taskDef3) notificationTarget.sendNotification( - PipelineInfo("Dummy", "DEV", RuntimeInfo(LocalDate.parse("2022-02-18")), Instant.now, None, warningFlag = false, None, PipelineStatus.Success, None, Seq.empty, "pid_123", None, None), + PipelineInfo("Dummy", "DEV", RuntimeInfo(LocalDate.parse("2022-02-18")), Instant.now, None, warningFlag = false, None, PipelineStatus.Success, None, Seq.empty, "pid_123", None, None, 123L), Seq(task1, task2, task3), CustomNotification(Seq.empty, Seq.empty) ) diff --git a/pramen/pom.xml b/pramen/pom.xml index 7af348b32..9ddf16640 100644 --- a/pramen/pom.xml +++ b/pramen/pom.xml @@ -22,7 +22,7 @@ za.co.absa.pramen pramen - 1.12.14-SNAPSHOT + 1.13.0-SNAPSHOT pom Pramen @@ -383,7 +383,7 @@ de.flapdoodle.embed de.flapdoodle.embed.mongo - 2.2.0 + 4.22.0 test diff --git a/pramen/project/Versions.scala b/pramen/project/Versions.scala index 1083aa9cb..4cac97ac3 100644 --- a/pramen/project/Versions.scala +++ b/pramen/project/Versions.scala @@ -31,7 +31,7 @@ object Versions { val channelVersion = "0.2.1" val requestsVersion = "0.8.0" val javaXMailVersion = "1.6.2" - val embeddedMongoDbVersion = "2.2.0" + val embeddedMongoDbVersion = "4.22.0" val scalaCompatColsVersion = "2.12.0" val scalatestVersion = "3.2.14" val mockitoVersion = "2.28.2"
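
Editor's note (not part of the diff): the hunks above change getTableRunInfo to accept an optional batch ID and to return a sequence of run-info entries, and they add batchId/appendedRecordCount to MetaTableRunInfo. The following is a minimal, hypothetical sketch of how downstream code could consume that updated API; the object and method names (RunInfoExample, appendedInBatch) are illustrative only and do not appear in this change set.

import java.time.LocalDate
import za.co.absa.pramen.api.{MetaTableRunInfo, MetastoreReader}

// Hypothetical helper showing the new Seq-based run-info API introduced in this PR.
object RunInfoExample {
  // Sums how many records a given batch contributed to a metastore table on an info date.
  def appendedInBatch(metastore: MetastoreReader,
                      table: String,
                      infoDate: LocalDate,
                      batchId: Long): Long = {
    // Passing Some(batchId) restricts the result to chunks written by that batch;
    // passing None would return every recorded run for the table and info date.
    val runs: Seq[MetaTableRunInfo] = metastore.getTableRunInfo(table, infoDate, Some(batchId))

    // appendedRecordCount is only populated for append-style writes,
    // so fall back to the full output count when it is absent.
    runs.map(run => run.appendedRecordCount.getOrElse(run.outputRecordCount)).sum
  }
}

In pipeline-level code the current batch identifier would presumably come from PipelineInfo.batchId, which this change set adds alongside the per-chunk batchId in the bookkeeping records.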