2 changes: 1 addition & 1 deletion pramen/api/pom.xml
@@ -27,7 +27,7 @@
<parent>
<groupId>za.co.absa.pramen</groupId>
<artifactId>pramen</artifactId>
<version>1.12.14-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>

<properties>
pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala
@@ -20,7 +20,9 @@ import java.time.{Instant, LocalDate}

case class MetaTableRunInfo(tableName: String,
infoDate: LocalDate,
batchId: Long,
inputRecordCount: Long,
outputRecordCount: Long,
appendedRecordCount: Option[Long],
jobStarted: Instant,
jobFinished: Instant)
Comment on lines 21 to 28
⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
echo "=== Searching for MetaTableRunInfo construction sites ==="
rg -n 'MetaTableRunInfo\s*\(' --type=scala -A3

echo -e "\n=== Searching for pattern matches on MetaTableRunInfo ==="
rg -n 'MetaTableRunInfo\s*\(' --type=scala | grep -E '(case|match|extract)'

echo -e "\n=== Searching for MetaTableRunInfo in the codebase ==="
rg -n 'MetaTableRunInfo' --type=scala

Repository: AbsaOSS/pramen

Length of output: 2571


🏁 Script executed:

#!/bin/bash
echo "=== Full MetaTableRunInfo construction in MetastoreReaderBase.scala ==="
sed -n '76,90p' pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreReaderBase.scala

echo -e "\n=== Case class definition in MetaTableRunInfo.scala ==="
sed -n '21,28p' pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala

echo -e "\n=== Search for other MetaTableRunInfo creations (broader search) ==="
rg -n 'new MetaTableRunInfo|MetaTableRunInfo\(' --type=scala -B2 -A8 | grep -v test | head -100

Repository: AbsaOSS/pramen

Length of output: 3638


🏁 Script executed:

#!/bin/bash
echo "=== Checking test files for MetaTableRunInfo construction ==="
find . -path ./target -prune -o -name "*Test.scala" -o -name "*Spec.scala" -type f | xargs grep -l "MetaTableRunInfo" 2>/dev/null

echo -e "\n=== Looking for MetaTableRunInfo construction in test files ==="
rg -n 'MetaTableRunInfo\(' --type=scala pramen/core/src/test | head -50

echo -e "\n=== Checking for deprecation or version info ==="
cat README.md 2>/dev/null | head -50

echo -e "\n=== Checking pom.xml or build.sbt for version ==="
find . -name "pom.xml" -o -name "build.sbt" | xargs head -30 2>/dev/null | head -80

Repository: AbsaOSS/pramen

Length of output: 6668


Breaking API change: case class constructor signature modified.

Adding the batchId and appendedRecordCount fields to the MetaTableRunInfo case class is a breaking change for external consumers of this public API. While the internal construction site in MetastoreReaderBase.scala has been updated to pass all parameters, external code that constructs this type or pattern-matches on it with the old arity will no longer compile.

For a public API release, ensure:

  1. A changelog entry documents this breaking change
  2. Migration guidance is provided for users (e.g., "if upgrading from X.Y, update MetaTableRunInfo construction calls to include the two new parameters")
  3. Consider a major version bump if not already planned (this PR bumps only the minor version: 1.12.14-SNAPSHOT → 1.13.0-SNAPSHOT)
🤖 Prompt for AI Agents
In pramen/api/src/main/scala/za/co/absa/pramen/api/MetaTableRunInfo.scala around
lines 21 to 28, the case class constructor was changed by adding batchId and
appendedRecordCount which is a breaking API change; update the release to
document this and provide migration guidance: add a changelog entry that
describes the new fields and their types, include a short migration note showing
example construction before and after (e.g., indicate default or derived values
callers should supply), and if this is part of a public release coordinate a
major version bump (or clearly state compatibility implications) so downstream
users are informed.
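
A minimal before/after construction sketch for such a migration note (the table name, counts, and the appendedRecordCount semantics are illustrative assumptions; field names, types, and order follow the diff above):

import java.time.{Instant, LocalDate}
import za.co.absa.pramen.api.MetaTableRunInfo

// Pramen 1.12.x: six-argument constructor
val before = MetaTableRunInfo(
  tableName = "my_table",
  infoDate = LocalDate.of(2024, 1, 1),
  inputRecordCount = 100L,
  outputRecordCount = 100L,
  jobStarted = Instant.now(),
  jobFinished = Instant.now()
)

// Pramen 1.13.0: callers must also supply the two new fields
val after = MetaTableRunInfo(
  tableName = "my_table",
  infoDate = LocalDate.of(2024, 1, 1),
  batchId = 12345L,                  // new: ID of the batch that wrote the records
  inputRecordCount = 100L,
  outputRecordCount = 100L,
  appendedRecordCount = None,        // new: Some(n) for append runs (assumed semantics)
  jobStarted = Instant.now(),
  jobFinished = Instant.now()
)

Using named arguments, as above, makes the intent of the two new parameters explicit at call sites.
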

@@ -128,9 +128,10 @@ trait MetastoreReader {
*
* @param tableName The name of the table in the metastore.
* @param infoDate The information date of the data.
* @param batchId An optional batch ID to filter by.
* @return The run info of the table if available.
*/
def getTableRunInfo(tableName: String, infoDate: LocalDate): Option[MetaTableRunInfo]
def getTableRunInfo(tableName: String, infoDate: LocalDate, batchId: Option[Long]): Seq[MetaTableRunInfo]

/**
* Returns the reason of running the task. This helps transformers and sinks to determine logic based on whether
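
A hedged usage sketch of the new getTableRunInfo signature from code holding a MetastoreReader (the helper name, table name, and batch ID are hypothetical, and the imports assume both types live in za.co.absa.pramen.api). Per the Delta implementation in this PR, results are sorted by job finish time, so the last element roughly corresponds to what the old Option-returning method gave:

import java.time.LocalDate
import za.co.absa.pramen.api.{MetaTableRunInfo, MetastoreReader}

// Hypothetical helper showing the new call shapes.
def inspectRuns(metastore: MetastoreReader, infoDate: LocalDate): Unit = {
  // batchId = None: all recorded runs for the table and info date.
  val allRuns: Seq[MetaTableRunInfo] = metastore.getTableRunInfo("my_table", infoDate, None)

  // Approximate replacement for the old Option-returning call: the latest run, if any.
  val latestRun: Option[MetaTableRunInfo] = allRuns.lastOption

  // batchId = Some(id): only runs written by that batch.
  val batchRuns = metastore.getTableRunInfo("my_table", infoDate, Some(12345L))

  latestRun.foreach { r =>
    println(s"${r.tableName} @ $infoDate: out=${r.outputRecordCount}, batch=${r.batchId}")
  }
  println(s"Runs for batch 12345: ${batchRuns.size}")
}
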
@@ -33,5 +33,6 @@ case class PipelineInfo(
pipelineNotificationFailures: Seq[PipelineNotificationFailure],
pipelineId: String,
tenant: Option[String],
country: Option[String]
country: Option[String],
batchId: Long
)
3 changes: 2 additions & 1 deletion pramen/build.sbt
@@ -28,7 +28,8 @@ ThisBuild / organization := "za.co.absa.pramen"
ThisBuild / scalaVersion := scala212
ThisBuild / crossScalaVersions := Seq(scala211, scala212, scala213)

ThisBuild / scalacOptions := Seq("-unchecked", "-deprecation")
ThisBuild / scalacOptions := Seq("-unchecked", "-deprecation", "-target:jvm-1.8")
ThisBuild / javacOptions := Seq("-source", "1.8", "-target", "1.8")

ThisBuild / versionScheme := Some("early-semver")

2 changes: 1 addition & 1 deletion pramen/core/pom.xml
@@ -27,7 +27,7 @@
<parent>
<groupId>za.co.absa.pramen</groupId>
<artifactId>pramen</artifactId>
<version>1.12.14-SNAPSHOT</version>
<version>1.13.0-SNAPSHOT</version>
</parent>

<properties>
@@ -42,6 +42,8 @@ trait Bookkeeper {

def getLatestDataChunk(table: String, infoDate: LocalDate): Option[DataChunk]

def getDataChunks(table: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk]

def getDataChunksCount(table: String, dateBeginOpt: Option[LocalDate], dateEndOpt: Option[LocalDate]): Long

def getLatestSchema(table: String, until: LocalDate): Option[(StructType, LocalDate)]
@@ -50,6 +52,7 @@
infoDate: LocalDate,
inputRecordCount: Long,
outputRecordCount: Long,
appendedRecordCount: Option[Long],
jobStarted: Long,
jobFinished: Long,
isTableTransient: Boolean): Unit
@@ -115,23 +118,23 @@ object Bookkeeper {
mongoDbConnection match {
case Some(connection) =>
log.info(s"Using MongoDB for bookkeeping.")
new BookkeeperMongoDb(connection)
new BookkeeperMongoDb(connection, batchId)
case None =>
bookkeepingConfig.bookkeepingHadoopFormat match {
case HadoopFormat.Text =>
val path = bookkeepingConfig.bookkeepingLocation.get
log.info(s"Using Hadoop (CSV for records, JSON for schemas) for bookkeeping at $path")
new BookkeeperText(path)
new BookkeeperText(path, batchId)
case HadoopFormat.Delta =>
bookkeepingConfig.deltaTablePrefix match {
case Some(tablePrefix) =>
val fullTableName = BookkeeperDeltaTable.getFullTableName(bookkeepingConfig.deltaDatabase, tablePrefix, "*")
log.info(s"Using Delta Lake managed table '$fullTableName' for bookkeeping.")
new BookkeeperDeltaTable(bookkeepingConfig.deltaDatabase, tablePrefix)
new BookkeeperDeltaTable(bookkeepingConfig.deltaDatabase, tablePrefix, batchId)
case None =>
val path = bookkeepingConfig.bookkeepingLocation.get
log.info(s"Using Delta Lake for bookkeeping at $path")
new BookkeeperDeltaPath(path)
new BookkeeperDeltaPath(path, batchId)
}
}
}
@@ -21,39 +21,48 @@ import za.co.absa.pramen.core.model.DataChunk
import java.time.LocalDate
import scala.collection.mutable

abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper {
abstract class BookkeeperBase(isBookkeepingEnabled: Boolean, batchId: Long) extends Bookkeeper {
private val transientDataChunks = new mutable.HashMap[String, Array[DataChunk]]()

def getLatestProcessedDateFromStorage(table: String, until: Option[LocalDate] = None): Option[LocalDate]

def getLatestDataChunkFromStorage(table: String, infoDate: LocalDate): Option[DataChunk]

def getDataChunksFromStorage(table: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk]

def getDataChunksCountFromStorage(table: String, dateBeginOpt: Option[LocalDate], dateEndOpt: Option[LocalDate]): Long

def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit

private[pramen] def saveRecordCountToStorage(table: String,
infoDate: LocalDate,
inputRecordCount: Long,
outputRecordCount: Long,
recordsAppended: Option[Long],
jobStarted: Long,
jobFinished: Long): Unit

private[pramen] final def setRecordCount(table: String,
infoDate: LocalDate,
inputRecordCount: Long,
outputRecordCount: Long,
recordsAppended: Option[Long],
jobStarted: Long,
jobFinished: Long,
isTableTransient: Boolean): Unit = {
if (isTableTransient || !isBookkeepingEnabled) {
val tableLowerCase = table.toLowerCase
val dataChunk = DataChunk(table, infoDate.toString, infoDate.toString, infoDate.toString, inputRecordCount, outputRecordCount, jobStarted, jobFinished)
val dataChunk = DataChunk(table, infoDate.toString, infoDate.toString, infoDate.toString, inputRecordCount, outputRecordCount, jobStarted, jobFinished, Option(batchId), recordsAppended)
this.synchronized {
val dataChunks = transientDataChunks.getOrElse(tableLowerCase, Array.empty[DataChunk])
val newDataChunks = (dataChunks :+ dataChunk).sortBy(_.jobFinished)
transientDataChunks += tableLowerCase -> newDataChunks
}
} else {
saveRecordCountToStorage(table, infoDate, inputRecordCount, outputRecordCount, jobStarted, jobFinished)
saveRecordCountToStorage(table, infoDate, inputRecordCount, outputRecordCount, recordsAppended, jobStarted, jobFinished)
if (recordsAppended.isEmpty) {
deleteNonCurrentBatchRecords(table, infoDate)
}
}
}

@@ -82,13 +91,25 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper
}
}

final def getDataChunks(table: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] = {
val isTransient = this.synchronized {
transientDataChunks.contains(table.toLowerCase)
}

if (isTransient || !isBookkeepingEnabled) {
getTransientDataChunks(table, Option(infoDate), Option(infoDate), batchId)
} else {
getDataChunksFromStorage(table, infoDate, batchId)
}
}

final def getDataChunksCount(table: String, dateBeginOpt: Option[LocalDate], dateEndOpt: Option[LocalDate]): Long = {
val isTransient = this.synchronized {
transientDataChunks.contains(table.toLowerCase)
}

if (isTransient || !isBookkeepingEnabled) {
getTransientDataChunks(table, dateBeginOpt, dateEndOpt).length
getTransientDataChunks(table, dateBeginOpt, dateEndOpt, None).length
} else {
getDataChunksCountFromStorage(table, dateBeginOpt, dateEndOpt)
}
@@ -100,7 +121,7 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper
}

private def getLatestTransientDate(table: String, from: Option[LocalDate], until: Option[LocalDate]): Option[LocalDate] = {
val chunks = getTransientDataChunks(table, from, until)
val chunks = getTransientDataChunks(table, from, until, None)

if (chunks.isEmpty) {
None
@@ -112,20 +133,25 @@ abstract class BookkeeperBase(isBookkeepingEnabled: Boolean) extends Bookkeeper
}

private def getLatestTransientChunk(table: String, from: Option[LocalDate], until: Option[LocalDate]): Option[DataChunk] = {
getTransientDataChunks(table, from, until)
getTransientDataChunks(table, from, until, None)
.lastOption
}


private[core] def getTransientDataChunks(table: String, from: Option[LocalDate], until: Option[LocalDate]): Array[DataChunk] = {
private[core] def getTransientDataChunks(table: String, from: Option[LocalDate], until: Option[LocalDate], batchId: Option[Long]): Array[DataChunk] = {
val minDate = from.map(_.toString).getOrElse("0000-00-00")
val maxDate = until.map(_.toString).getOrElse("9999-99-99")
val tableLowerCase = table.toLowerCase
val allChunks = this.synchronized {
transientDataChunks.getOrElse(tableLowerCase, Array.empty[DataChunk])
}

allChunks.filter(chunk => chunk.infoDate >= minDate && chunk.infoDate <= maxDate)
batchId match {
case Some(id) =>
allChunks.filter(chunk => chunk.infoDate >= minDate && chunk.infoDate <= maxDate && chunk.batchId.contains(id))
case None =>
allChunks.filter(chunk => chunk.infoDate >= minDate && chunk.infoDate <= maxDate)
}
}

protected def getDateStr(date: LocalDate): String = DataChunk.dateFormatter.format(date)
@@ -26,7 +26,7 @@ import java.time.LocalDate
import scala.reflect.ClassTag
import scala.reflect.runtime.universe

abstract class BookkeeperDeltaBase extends BookkeeperHadoop {
abstract class BookkeeperDeltaBase(batchId: Long) extends BookkeeperHadoop(batchId) {

def getBkDf(filter: Column): Dataset[DataChunk]

@@ -60,30 +60,37 @@
}

final override def getLatestDataChunkFromStorage(table: String, infoDate: LocalDate): Option[DataChunk] = {
val infoDateFilter = getFilter(table, Option(infoDate), Option(infoDate))
val infoDateFilter = getFilter(table, Option(infoDate), Option(infoDate), None)

getBkData(infoDateFilter).lastOption
}

final override def getDataChunksFromStorage(tableName: String, infoDate: LocalDate, batchId: Option[Long]): Seq[DataChunk] = {
val infoDateFilter = getFilter(tableName, Option(infoDate), Option(infoDate), batchId)

getBkAllData(infoDateFilter)
}

final def getDataChunksCountFromStorage(table: String, dateBegin: Option[LocalDate], dateEnd: Option[LocalDate]): Long = {
getBkDf(getFilter(table, dateBegin, dateEnd)).count()
getBkDf(getFilter(table, dateBegin, dateEnd, None)).count()
}

final private[pramen] override def saveRecordCountToStorage(table: String,
infoDate: LocalDate,
inputRecordCount: Long,
outputRecordCount: Long,
jobStarted: Long,
jobFinished: Long): Unit = {
infoDate: LocalDate,
inputRecordCount: Long,
outputRecordCount: Long,
recordsAppended: Option[Long],
jobStarted: Long,
jobFinished: Long): Unit = {
val dateStr = getDateStr(infoDate)

val chunk = DataChunk(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, jobStarted, jobFinished)
val chunk = DataChunk(table, dateStr, dateStr, dateStr, inputRecordCount, outputRecordCount, jobStarted, jobFinished, Option(batchId), recordsAppended)

saveRecordCountDelta(chunk)
}

final override def getLatestSchema(table: String, until: LocalDate): Option[(StructType, LocalDate)] = {
val filter = getFilter(table, None, Option(until))
val filter = getFilter(table, None, Option(until), None)

val df = getSchemasDeltaDf

Expand All @@ -103,6 +110,12 @@ abstract class BookkeeperDeltaBase extends BookkeeperHadoop {
saveSchemaDelta(tableSchema)
}

private[core] def getBkAllData(filter: Column): Seq[DataChunk] = {
getBkDf(filter)
.collect()
.sortBy(_.jobFinished)
}

private[core] def getBkData(filter: Column): Seq[DataChunk] = {
getBkDf(filter)
.collect()
@@ -16,14 +16,15 @@

package za.co.absa.pramen.core.bookkeeper

import io.delta.tables.DeltaTable
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, Dataset, SaveMode, SparkSession}
import za.co.absa.pramen.core.bookkeeper.model.TableSchemaJson
import za.co.absa.pramen.core.model.{DataChunk, TableSchema}
import za.co.absa.pramen.core.utils.FsUtils

import java.time.Instant
import java.time.{Instant, LocalDate}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe

@@ -34,7 +35,7 @@ object BookkeeperDeltaPath {
val locksDirName = "locks"
}

class BookkeeperDeltaPath(bookkeepingPath: String)(implicit spark: SparkSession) extends BookkeeperDeltaBase {
class BookkeeperDeltaPath(bookkeepingPath: String, batchId: Long)(implicit spark: SparkSession) extends BookkeeperDeltaBase(batchId) {
import BookkeeperDeltaPath._
import spark.implicits._

@@ -67,6 +68,14 @@ class BookkeeperDeltaPath(bookkeepingPath: String)(implicit spark: SparkSession)
.save(recordsPath.toUri.toString)
}

override def deleteNonCurrentBatchRecords(table: String, infoDate: LocalDate): Unit = {
val infoDateStr = DataChunk.dateFormatter.format(infoDate)
val filter = (col("tableName") === lit(table)) && (col("infoDate") === lit(infoDateStr)) && (col("batchId") =!= lit(batchId))

val deltaTable = DeltaTable.forPath(spark, recordsPath.toUri.toString)
deltaTable.delete(filter)
}

override def getSchemasDeltaDf: Dataset[TableSchemaJson] = {
spark
.read
@@ -18,14 +18,13 @@ package za.co.absa.pramen.core.bookkeeper

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import za.co.absa.pramen.core.model.DataChunk

import java.time.LocalDate


abstract class BookkeeperHadoop extends BookkeeperBase(true) {
private[core] def getFilter(tableName: String, infoDateBegin: Option[LocalDate], infoDateEnd: Option[LocalDate]): Column = {
(infoDateBegin, infoDateEnd) match {
abstract class BookkeeperHadoop(batchId: Long) extends BookkeeperBase(true, batchId) {
private[core] def getFilter(tableName: String, infoDateBegin: Option[LocalDate], infoDateEnd: Option[LocalDate], batchId: Option[Long]): Column = {
val baseFilter = (infoDateBegin, infoDateEnd) match {
case (Some(begin), Some(end)) =>
val beginStr = getDateStr(begin)
val endStr = getDateStr(end)
@@ -39,5 +38,10 @@
case (None, None) =>
col("tableName") === tableName
}

batchId match {
case Some(id) => baseFilter && col("batchId") === lit(id)
case None => baseFilter
}
}
}