README.md: 9 changes (8 additions, 1 deletion)

@@ -941,8 +941,14 @@ pramen.sources = [

# [Optional] Set name for the struct field that contains Kafka record metadata
#custom.kafka.column = "kafka"

# [Optional] Set name for the Kafka key column
#key.column.name = "kafka_key"

# The Kafka key serializer when key.naming.strategy is NOT defined. Can be "none", "binary", "string".
# When key.naming.strategy IS defined in schema.registry, Avro deserialization is used automatically.
# Default is "binary".
#key.column.serializer = "none"

kafka {
bootstrap.servers = "mybroker1:9092,mybroker2:9092"
@@ -956,6 +962,7 @@ pramen.sources = [
schema.registry {
url = "https://my.schema.registry:8081"
value.naming.strategy = "topic.name"
#key.naming.strategy = "topic.name"

# Arbitrary options for Schema registry
basic.auth.credentials.source = "..."
@@ -982,7 +989,7 @@ pramen.operations = [

tables = [
{
input.table = "my_kafka_topic1"
input.topic = "my_kafka_topic1"
output.metastore.table = "table1"
}
]
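
For illustration, the three non-Avro `key.column.serializer` modes described above map onto simple Spark transformations of the raw binary key column that the Kafka reader exposes. A minimal runnable sketch — the object and method names here are illustrative, not part of this PR; the actual logic lives in `KafkaAvroSource`:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

object KeySerializerSketch {
  // How each non-Avro 'key.column.serializer' value treats the raw Kafka 'key'
  // column, which Spark's Kafka reader always exposes as binary.
  def applyKeySerializer(df: DataFrame, serializer: String, keyColumn: String): DataFrame =
    serializer match {
      case "none"   => df                                                    // no key column in the output
      case "binary" => df.withColumn(keyColumn, col("key"))                  // raw bytes (the default)
      case "string" => df.withColumn(keyColumn, col("key").cast(StringType)) // plain-text keys
      case other    => throw new IllegalArgumentException(s"Unknown Kafka key serializer '$other'.")
    }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("key-serializer-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for a Kafka read: a single binary 'key' column.
    val df = Seq("id-1".getBytes("UTF-8")).toDF("key")
    applyKeySerializer(df, "string", "kafka_key").show() // adds kafka_key = "id-1"
    spark.stop()
  }
}
```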
QueryBuilder.scala

@@ -25,19 +25,22 @@ object QueryBuilder {
val PATH_KEY = "path"
val TABLE_KEY = "table"
val DB_TABLE_KEY = "db.table" // Same as table - for backwards compatibility and config readability
val TOPIC_KEY = "topic" // Same as table - for sources where 'topic' better matches the terminology (e.g. Kafka)

def fromConfig(conf: Config, prefix: String, parentPath: String): Query = {
val p = if (prefix.isEmpty) "" else s"$prefix."

val hasSql = conf.hasPath(s"$p$SQL_KEY")
val hasPath = conf.hasPath(s"$p$PATH_KEY")
val hasDbTable = conf.hasPath(s"$p$TABLE_KEY") || conf.hasPath(s"$p$DB_TABLE_KEY")
val hasDbTable = conf.hasPath(s"$p$TABLE_KEY") || conf.hasPath(s"$p$DB_TABLE_KEY") || conf.hasPath(s"$p$TOPIC_KEY")
val hesSomething = if (prefix.isEmpty) !conf.isEmpty else conf.hasPath(prefix)

val tableDef = if (conf.hasPath(s"$p$TABLE_KEY")) {
Some(conf.getString(s"$p$TABLE_KEY"))
} else if (conf.hasPath(s"$p$DB_TABLE_KEY")) {
Some(conf.getString(s"$p$DB_TABLE_KEY"))
} else if (conf.hasPath(s"$p$TOPIC_KEY")) {
Some(conf.getString(s"$p$TOPIC_KEY"))
} else {
None
}
Expand All @@ -50,9 +53,9 @@ object QueryBuilder {
case _ =>
val parent = if (parentPath.isEmpty) "" else s" at $parentPath"
if (prefix.isEmpty)
throw new IllegalArgumentException(s"No options are specified for the query. Usually, it is one of: '$SQL_KEY', '$PATH_KEY', '$TABLE_KEY', '$DB_TABLE_KEY'$parent.")
throw new IllegalArgumentException(s"No options are specified for the query. Usually, it is one of: '$SQL_KEY', '$PATH_KEY', '$TABLE_KEY', '$DB_TABLE_KEY', '$TOPIC_KEY'$parent.")
else
throw new IllegalArgumentException(s"No options are specified for the '$prefix' query. Usually, it is one of: '$p$SQL_KEY', '$p$PATH_KEY', '$p$TABLE_KEY', '$p$DB_TABLE_KEY'$parent.")
throw new IllegalArgumentException(s"No options are specified for the '$prefix' query. Usually, it is one of: '$p$SQL_KEY', '$p$PATH_KEY', '$p$TABLE_KEY', '$p$DB_TABLE_KEY', '$p$TOPIC_KEY'$parent.")
}
}
}
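
The practical effect of the new `TOPIC_KEY` alias: a source table can now be declared with `input.topic` and resolves exactly like `input.table`. A minimal sketch, assuming pramen-core is on the classpath; the `QueryBuilder` package path below is a guess, since the diff does not show it:

```scala
import com.typesafe.config.ConfigFactory
// Assumed import: the diff does not show which package QueryBuilder lives in.
import za.co.absa.pramen.core.utils.QueryBuilder

object TopicAliasSketch {
  def main(args: Array[String]): Unit = {
    // The same HOCON shape used in the README's operation definition.
    val conf = ConfigFactory.parseString("input.topic = my_kafka_topic1")

    // With this PR, 'topic' is accepted alongside 'table' and 'db.table',
    // so this resolves to Query.Table("my_kafka_topic1").
    val query = QueryBuilder.fromConfig(conf, "input", "")
    println(query)
  }
}
```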
QueryBuilderSuite.scala

@@ -30,6 +30,8 @@ class QueryBuilderSuite extends AnyWordSpec {
TestCase("", """data.file.1 = "/some/data/file"""", Query.Custom(Map("data.file.1" -> "/some/data/file"))),
TestCase("input", """input.sql = "SELECT * FROM table"""", Query.Sql("SELECT * FROM table")),
TestCase("input", """input.table = table1""", Query.Table("table1")),
TestCase("input", """input.db.table = table1""", Query.Table("table1")),
TestCase("input", """input.topic = topic1""", Query.Table("topic1")),
TestCase("input", """input.path = /some/path""", Query.Path("/some/path")),
TestCase("input", "input.data.file.1 = /some/data/file1\ninput.data.file.2 = /some/data/file2",
Query.Custom(Map("data.file.1" -> "/some/data/file1", "data.file.2" -> "/some/data/file2")))
@@ -53,7 +55,7 @@ class QueryBuilderSuite extends AnyWordSpec {
QueryBuilder.fromConfig(conf, "", "")
}

assert(exception.getMessage == "No options are specified for the query. Usually, it is one of: 'sql', 'path', 'table', 'db.table'.")
assert(exception.getMessage == "No options are specified for the query. Usually, it is one of: 'sql', 'path', 'table', 'db.table', 'topic'.")
}

"throw an exception when the prefix is empty" in {
@@ -63,7 +65,7 @@
QueryBuilder.fromConfig(conf, "input", "")
}

assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table'.")
assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table', 'input.topic'.")
}

"throw an exception when the prefix is empty and parent is specified" in {
@@ -73,7 +75,7 @@
QueryBuilder.fromConfig(conf, "input", "my.parent")
}

assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table' at my.parent.")
assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table', 'input.topic' at my.parent.")
}
}
}
SourceTableParserSuite.scala

@@ -119,7 +119,7 @@ class SourceTableParserSuite extends AnyWordSpec {
SourceTableParser.fromConfig(conf, "source.tables")
}

assert(ex.getMessage.contains("No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table' at source.tables[0]."))
assert(ex.getMessage.contains("No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table', 'input.topic' at source.tables[0]."))
}

"throw an exception in case of duplicate entries" in {
KafkaAvroSource.scala

@@ -19,6 +19,7 @@ package za.co.absa.pramen.extras.source
import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types.StringType
import org.slf4j.LoggerFactory
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig
@@ -48,9 +49,15 @@ import java.time.LocalDate
*
* # [Optional] Set name for the struct field that contains Kafka record metadata
* custom.kafka.column = "kafka"
*
* # [Optional] Set name for the Kafka key column
* key.column.name = "kafka_key"
*
* # The Kafka key serializer when 'key.naming.strategy' is NOT defined. Can be "none", "binary", "string".
* # When 'key.naming.strategy' IS defined in 'schema.registry', Avro deserialization is used automatically.
* # Default is "binary".
* #key.column.serializer = "none"
*
* kafka {
* bootstrap.servers = "mybroker1:9092,mybroker2:9092"
*
@@ -66,6 +73,7 @@ import java.time.LocalDate
*
* # Can be one of: topic.name, record.name, topic.record.name
* value.naming.strategy = "topic.name"
* #key.naming.strategy = "topic.name"
*
* # If you want to force the specific schema id. Otherwise, the latest schema id will be used.
* # key.schema.id =
@@ -114,6 +122,9 @@ class KafkaAvroSource(sourceConfig: Config,

private val kafkaColumnName = ConfigUtils.getOptionString(sourceConfig, CUSTOM_KAFKA_COLUMN_KEY).getOrElse("kafka")
private val keyColumnName = ConfigUtils.getOptionString(sourceConfig, KEY_COLUMN_KEY).getOrElse("kafka_key")
private val keyColumnSerializer = ConfigUtils.getOptionString(sourceConfig, KEY_COLUMN_SERIALIZER_KEY).getOrElse("binary").toLowerCase.trim
private val tempKafkaColumnName = "tmp_pramen_kafka"
private val tempKafkaKeyColumnName = "tmp_pramen_kafka_key"

override def hasInfoDateColumn(query: Query): Boolean = false

@@ -206,32 +217,46 @@ class KafkaAvroSource(sourceConfig: Config,
col("timestampType").as("timestamp_type")
))

val hasKey = kafkaAvroConfig.keyNamingStrategy.isDefined || keyColumnSerializer != "none"

val df2 = kafkaAvroConfig.keyNamingStrategy match {
case Some(keyNamingStrategy) =>
val abrisKeyConfig = keyNamingStrategy
.applyNamingStrategyToAbrisConfig(abrisValueBase, topic, isKey = true)
.usingSchemaRegistry(schemaRegistryClientConfig)
df1.withColumn("tmp_pramen_kafka_key", from_avro(col("key"), abrisKeyConfig))
df1.withColumn(tempKafkaKeyColumnName, from_avro(col("key"), abrisKeyConfig))
case None =>
df1.withColumn("tmp_pramen_kafka_key", col("key"))
keyColumnSerializer match {
case "none" => df1
case "binary" => df1.withColumn(tempKafkaKeyColumnName, col("key"))
case "string" => df1.withColumn(tempKafkaKeyColumnName, col("key").cast(StringType))
case "avro" => throw new IllegalArgumentException("For the 'avro' serializer of Kafka topic key, 'schema.registry.key.naming.strategy' needs to be set.")
case x => throw new IllegalArgumentException(s"Unknown Kafka key serializer '$x'. Can be one of: none, binary, string, avro.")
}
}

val payloadFields = df2.select("data.*").schema.fieldNames.toSet
if (payloadFields.contains(kafkaColumnName)) {
log.warn(s"Payload field '$kafkaColumnName' conflicts with Kafka metadata struct name and will be replaced.")
log.warn(s"Payload field '$kafkaColumnName' conflicts with reserved Kafka metadata struct name and will be replaced.")
}
if (payloadFields.contains(keyColumnName)) {
log.warn(s"Payload field '$keyColumnName' conflicts with Kafka key column name and will be replaced.")
log.warn(s"Payload field '$keyColumnName' conflicts with reserved Kafka key column name and will be removed.")
}

// Put data fields to the root level of the schema, and if data struct already has kafka_key and kafka fields,
// drop them
val dfFinal = df2
.select("tmp_pramen_kafka_key", "data.*", "tmp_pramen_kafka")
.drop(kafkaColumnName)
.drop(keyColumnName)
.withColumnRenamed("tmp_pramen_kafka", kafkaColumnName)
.withColumnRenamed("tmp_pramen_kafka_key", keyColumnName)
val dfFinal = if (hasKey) {
df2.select(tempKafkaKeyColumnName, "data.*", tempKafkaColumnName)
.drop(kafkaColumnName)
.drop(keyColumnName)
.withColumnRenamed(tempKafkaColumnName, kafkaColumnName)
.withColumnRenamed(tempKafkaKeyColumnName, keyColumnName)
} else {
df2.select("data.*", tempKafkaColumnName)
.drop(kafkaColumnName)
.drop(keyColumnName)
.withColumnRenamed(tempKafkaColumnName, kafkaColumnName)
}

SourceResult(dfFinal)
}
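
The select/drop/rename chain above flattens the payload to the root of the schema, with the key column (when present) first and the metadata struct last. A standalone sketch of the same column surgery on stand-in data; the payload fields 'id' and 'amount' are hypothetical:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

object FinalLayoutSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("layout-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for df2: a deserialized key, a payload struct, and a metadata struct.
    val df2 = Seq(("k1", "a", 1, "t1", 0L))
      .toDF("tmp_pramen_kafka_key", "id", "amount", "topic", "offset")
      .select(
        $"tmp_pramen_kafka_key",
        struct($"id", $"amount").as("data"),
        struct($"topic", $"offset").as("tmp_pramen_kafka"))

    // Same reshaping as in the diff, for the default column names.
    val dfFinal = df2
      .select("tmp_pramen_kafka_key", "data.*", "tmp_pramen_kafka")
      .withColumnRenamed("tmp_pramen_kafka", "kafka")
      .withColumnRenamed("tmp_pramen_kafka_key", "kafka_key")

    dfFinal.printSchema()
    // root
    //  |-- kafka_key: string
    //  |-- id: string
    //  |-- amount: integer
    //  |-- kafka: struct (topic: string, offset: long)
    spark.stop()
  }
}
```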
@@ -243,6 +268,7 @@ object KafkaAvroSource extends ExternalChannelFactoryV2[KafkaAvroSource] {
val TOPIC_NAME_KEY = "topic.name"
val CUSTOM_KAFKA_COLUMN_KEY = "custom.kafka.column"
val KEY_COLUMN_KEY = "key.column.name"
val KEY_COLUMN_SERIALIZER_KEY = "key.column.serializer"

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Public constant looks good; ensure config default is discoverable.

Confirm key.column.serializer default is present in reference.conf for visibility.


🏁 Script executed:

```shell
#!/bin/bash
# Verify default presence and examples for key.column.serializer
fd -a reference.conf
rg -nC2 'key\.column\.serializer' -g 'pramen/**/reference.conf'
```

Length of output: 196


I need to inspect the reference.conf files directly to verify if the configuration default is present.


---

🏁 Script executed:

```shell
#!/bin/bash
# Search for key.column.serializer in all found reference.conf files
echo "=== Searching in extras reference.conf ==="
rg -nC2 'key\.column\.serializer' /home/jailuser/git/pramen/extras/src/main/resources/reference.conf

echo -e "\n=== Searching in core reference.conf ==="
rg -nC2 'key\.column\.serializer' /home/jailuser/git/pramen/core/src/main/resources/reference.conf

echo -e "\n=== Checking if extras reference.conf exists and contains kafka config ==="
cat /home/jailuser/git/pramen/extras/src/main/resources/reference.conf 2>/dev/null | head -50
```

Length of output: 1132


Let me verify more thoroughly by checking the full contents and looking for any Kafka configuration:


---

🏁 Script executed:

```shell
#!/bin/bash
# Check full extras reference.conf for Kafka sections
echo "=== Full extras reference.conf ==="
cat /home/jailuser/git/pramen/extras/src/main/resources/reference.conf

echo -e "\n=== Search for any 'kafka' in reference.conf files ==="
rg -in 'kafka' /home/jailuser/git/pramen/extras/src/main/resources/reference.conf /home/jailuser/git/pramen/core/src/main/resources/reference.conf

echo -e "\n=== Check KafkaAvroSource for default value handling ==="
rg -nC3 'key\.column\.serializer\|KEY_COLUMN_SERIALIZER' /home/jailuser/git/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
```

Length of output: 1120


Let me verify the file path and view the KafkaAvroSource code directly:

```shell
#!/bin/bash
# Find the correct KafkaAvroSource file
echo "=== Finding KafkaAvroSource.scala ==="
find /home/jailuser/git/pramen -name "KafkaAvroSource.scala" -type f

echo -e "\n=== View context around line 271 ==="
fd -e scala "KafkaAvroSource" /home/jailuser/git/pramen --exec cat {} \; | sed -n '265,280p'
```

---


🏁 Script executed:

```shell
#!/bin/bash
# Find KafkaAvroSource.scala and view context around line 271
find /home/jailuser/git/pramen -name "KafkaAvroSource.scala" -type f

echo -e "\n=== View lines 265-280 of KafkaAvroSource.scala ==="
find /home/jailuser/git/pramen -name "KafkaAvroSource.scala" -type f -exec sed -n '265,280p' {} +

echo -e "\n=== View surrounding context for KEY_COLUMN_SERIALIZER_KEY ==="
find /home/jailuser/git/pramen -name "KafkaAvroSource.scala" -type f -exec grep -nC5 "KEY_COLUMN_SERIALIZER" {} +
```

Length of output: 2062


Let me check if there are any Kafka configuration examples elsewhere in the codebase:


---

🏁 Script executed:

```shell
#!/bin/bash
# Check if Kafka config is documented elsewhere
echo "=== Search for Kafka config examples in test resources or documentation ==="
find /home/jailuser/git/pramen -type f \( -name "*.conf" -o -name "*.md" -o -name "*.txt" \) -exec grep -l "key\.column" {} +

echo -e "\n=== Check test resources for Kafka configuration examples ==="
find /home/jailuser/git/pramen -path "*/test/resources/*" -name "*.conf" -type f -exec cat {} \;
```

Length of output: 56619


Add Kafka configuration defaults to reference.conf for discoverability.

Verification confirms key.column.serializer is absent from reference.conf. While the code provides a hardcoded default of "binary", this configuration is not documented or discoverable. Add a Kafka source configuration section to pramen/extras/src/main/resources/reference.conf listing available options (including key.column.serializer, key.column.name, custom.kafka.column, and topic.name) with their defaults for user visibility.
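
A sketch of the kind of section the reviewer is asking for, wrapped in `ConfigFactory.parseString` only to show that it parses cleanly. The `pramen.extras.source.kafka` namespace is hypothetical; the defaults mirror the hardcoded ones visible in this diff:

```scala
import com.typesafe.config.ConfigFactory

// Proposed reference.conf fragment (HOCON), validated here via parseString.
// The namespace is hypothetical; defaults mirror the hardcoded ones in KafkaAvroSource.
object ReferenceConfSketch {
  val proposedFragment: String =
    """
      |pramen.extras.source.kafka {
      |  # Name of the struct field that carries Kafka record metadata.
      |  custom.kafka.column = "kafka"
      |
      |  # Name of the output column holding the Kafka key.
      |  key.column.name = "kafka_key"
      |
      |  # Key handling when schema.registry.key.naming.strategy is not set:
      |  # one of "none", "binary" (default), "string".
      |  key.column.serializer = "binary"
      |}
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.parseString(proposedFragment)
    println(conf.getString("pramen.extras.source.kafka.key.column.serializer")) // binary
  }
}
```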

🤖 Prompt for AI Agents
In
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
around line 271, the constant KEY_COLUMN_SERIALIZER_KEY
("key.column.serializer") is used but not documented in reference.conf; update
pramen/extras/src/main/resources/reference.conf to add a kafka source
configuration section (e.g., pramen.extras.source.kafka or similar existing
namespace) that lists the relevant keys and their defaults for discoverability:
key.column.serializer = "binary", key.column.name = "kafka_key", custom.kafka.column =
"kafka" (the hardcoded defaults in the code), and topic.name (no hardcoded default), plus
brief comments describing accepted options where useful; ensure keys match the
exact names used in code and place the section in alphabetical/consistent order
with existing configs.


val KAFKA_TOKENS_TO_REDACT = Set("password", "jaas.config", "auth.user.info")
