From b1bd50b2185b820188f30ff658d04200cceedf5a Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Fri, 24 Oct 2025 09:55:59 +0200
Subject: [PATCH 1/3] #648 Add support for selecting whether a key is present
 and the ability to use a binary or string key serializer.

---
 README.md                               |  7 ++++
 .../extras/source/KafkaAvroSource.scala | 42 +++++++++++++++----
 2 files changed, 41 insertions(+), 8 deletions(-)

diff --git a/README.md b/README.md
index d04b3cc9..9fc7d757 100644
--- a/README.md
+++ b/README.md
@@ -941,8 +941,14 @@ pramen.sources = [
     # [Optional] Set name for the struct field that contains Kafka record metadata
     #custom.kafka.column = "kafka"
+
     # [Optional] Set name for the Kafka key column
     #key.column.name = "kafka_key"
+
+    # The Kafka key serializer. Can be "none", "binary", "string", "avro".
+    # When "avro", "key.naming.strategy" should be defined in the "schema.registry" section.
+    # Default is "binary", but if "key.naming.strategy" is defined, "avro" is selected automatically.
+    #key.column.serializer = "none"
 
     kafka {
       bootstrap.servers = "mybroker1:9092,mybroker2:9092"
 
@@ -956,6 +962,7 @@ pramen.sources = [
     schema.registry {
       url = "https://my.schema.registry:8081"
       value.naming.strategy = "topic.name"
+      #key.naming.strategy = "topic.name"
 
       # Arbitrary options for Schema registry
       basic.auth.credentials.source = "..."
diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
index 3141afea..c8ef7aa9 100644
--- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
+++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
@@ -19,6 +19,7 @@ package za.co.absa.pramen.extras.source
 import com.typesafe.config.Config
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.{col, struct}
+import org.apache.spark.sql.types.StringType
 import org.slf4j.LoggerFactory
 import za.co.absa.abris.avro.functions.from_avro
 import za.co.absa.abris.config.AbrisConfig
@@ -48,9 +49,15 @@ import java.time.LocalDate
  *
  *   # [Optional] Set name for the struct field that contains Kafka record metadata
  *   custom.kafka.column = "kafka"
+ *
  *   # [Optional] Set name for the Kafka key column
  *   key.column.name = "kafka_key"
  *
+ *   # The Kafka key serializer. Can be "none", "binary", "string", "avro".
+ *   # When "avro", "key.naming.strategy" should be defined in the "schema.registry" section.
+ *   # Default is "binary", but if "key.naming.strategy" is defined, "avro" is selected automatically.
+ *   #key.column.serializer = "none"
+ *
  *   kafka {
  *     bootstrap.servers = "mybroker1:9092,mybroker2:9092"
  *
@@ -66,6 +73,7 @@ import java.time.LocalDate
  *
  *     # Can be one of: topic.name, record.name, topic.record.name
  *     value.naming.strategy = "topic.name"
+ *     #key.naming.strategy = "topic.name"
 *
  *     # If you want to force the specific schema id. Otherwise, the latest schema id will be used.
  *     # key.schema.id =
@@ -114,6 +122,9 @@ class KafkaAvroSource(sourceConfig: Config,
   private val kafkaColumnName = ConfigUtils.getOptionString(sourceConfig, CUSTOM_KAFKA_COLUMN_KEY).getOrElse("kafka")
   private val keyColumnName = ConfigUtils.getOptionString(sourceConfig, KEY_COLUMN_KEY).getOrElse("kafka_key")
+  private val keyColumnSerializer = ConfigUtils.getOptionString(sourceConfig, KEY_COLUMN_SERIALIZER_KEY).getOrElse("binary").toLowerCase.trim
+  private val tempKafkaColumnName = "tmp_pramen_kafka"
+  private val tempKafkaKeyColumnName = "tmp_pramen_kafka_key"
 
   override def hasInfoDateColumn(query: Query): Boolean = false
 
@@ -206,14 +217,22 @@ class KafkaAvroSource(sourceConfig: Config,
       col("timestampType").as("timestamp_type")
     ))
 
+    val hasKey = keyColumnSerializer != "none"
+
     val df2 = kafkaAvroConfig.keyNamingStrategy match {
       case Some(keyNamingStrategy) =>
         val abrisKeyConfig = keyNamingStrategy
           .applyNamingStrategyToAbrisConfig(abrisValueBase, topic, isKey = true)
           .usingSchemaRegistry(schemaRegistryClientConfig)
-        df1.withColumn("tmp_pramen_kafka_key", from_avro(col("key"), abrisKeyConfig))
+        df1.withColumn(tempKafkaKeyColumnName, from_avro(col("key"), abrisKeyConfig))
       case None =>
-        df1.withColumn("tmp_pramen_kafka_key", col("key"))
+        keyColumnSerializer match {
+          case "none" => df1
+          case "binary" => df1.withColumn(tempKafkaKeyColumnName, col("key"))
+          case "string" => df1.withColumn(tempKafkaKeyColumnName, col("key").cast(StringType))
+          case "avro" => throw new IllegalArgumentException("For the 'avro' serializer of Kafka topic key, 'schema.registry.key.naming.strategy' needs to be set.")
+          case x => throw new IllegalArgumentException("Unknown Kafka key serializer. Can be one of: none, binary, long, string, avro.")
+        }
     }
 
     val payloadFields = df2.select("data.*").schema.fieldNames.toSet
@@ -226,12 +245,18 @@ class KafkaAvroSource(sourceConfig: Config,
 
     // Put data fields to the root level of the schema, and if data struct already has kafka_key and kafka fields,
     // drop them
-    val dfFinal = df2
-      .select("tmp_pramen_kafka_key", "data.*", "tmp_pramen_kafka")
-      .drop(kafkaColumnName)
-      .drop(keyColumnName)
-      .withColumnRenamed("tmp_pramen_kafka", kafkaColumnName)
-      .withColumnRenamed("tmp_pramen_kafka_key", keyColumnName)
+    val dfFinal = if (hasKey) {
+      df2.select(tempKafkaKeyColumnName, "data.*", tempKafkaColumnName)
+        .drop(kafkaColumnName)
+        .drop(keyColumnName)
+        .withColumnRenamed(tempKafkaColumnName, kafkaColumnName)
+        .withColumnRenamed(tempKafkaKeyColumnName, keyColumnName)
+    } else {
+      df2.select("data.*", tempKafkaColumnName)
+        .drop(kafkaColumnName)
+        .drop(keyColumnName)
+        .withColumnRenamed(tempKafkaColumnName, kafkaColumnName)
+    }
 
     SourceResult(dfFinal)
   }
@@ -243,6 +268,7 @@ object KafkaAvroSource extends ExternalChannelFactoryV2[KafkaAvroSource] {
   val TOPIC_NAME_KEY = "topic.name"
   val CUSTOM_KAFKA_COLUMN_KEY = "custom.kafka.column"
   val KEY_COLUMN_KEY = "key.column.name"
+  val KEY_COLUMN_SERIALIZER_KEY = "key.column.serializer"
 
   val KAFKA_TOKENS_TO_REDACT = Set("password", "jaas.config", "auth.user.info")

From 6cde1292437cf83ac8736a9547dcd790b40d3a98 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Wed, 29 Oct 2025 08:59:19 +0100
Subject: [PATCH 2/3] #649 Allow 'input.topic' for specifying the source topic
 name in Kafka Avro ingestion jobs.
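
This lets Kafka ingestion jobs refer to the source topic directly instead
of using the generic 'input.table' key. A minimal sketch of a table
definition using the new key (topic and table names are illustrative):

    tables = [
      {
        # 'input.topic' is accepted as a synonym of 'input.table' / 'input.db.table'
        input.topic = "my_kafka_topic1"
        output.metastore.table = "table1"
      }
    ]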
---
 README.md                                                 | 2 +-
 .../scala/za/co/absa/pramen/core/model/QueryBuilder.scala | 5 ++++-
 .../za/co/absa/pramen/core/model/QueryBuilderSuite.scala  | 2 ++
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 9fc7d757..0aab29ba 100644
--- a/README.md
+++ b/README.md
@@ -989,7 +989,7 @@ pramen.operations = [
     tables = [
       {
-        input.table = "my_kafka_topic1"
+        input.topic = "my_kafka_topic1"
         output.metastore.table = "table1"
       }
     ]
 
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala
index 8d36a27e..da68e133 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala
@@ -25,19 +25,22 @@ object QueryBuilder {
   val PATH_KEY = "path"
   val TABLE_KEY = "table"
   val DB_TABLE_KEY = "db.table" // Same as table - for backwards compatibility and config readability
+  val TOPIC_KEY = "topic" // Same as table - for sources where 'topic' better matches the terminology
 
   def fromConfig(conf: Config, prefix: String, parentPath: String): Query = {
     val p = if (prefix.isEmpty) "" else s"$prefix."
 
     val hasSql = conf.hasPath(s"$p$SQL_KEY")
     val hasPath = conf.hasPath(s"$p$PATH_KEY")
-    val hasDbTable = conf.hasPath(s"$p$TABLE_KEY") || conf.hasPath(s"$p$DB_TABLE_KEY")
+    val hasDbTable = conf.hasPath(s"$p$TABLE_KEY") || conf.hasPath(s"$p$DB_TABLE_KEY") || conf.hasPath(s"$p$TOPIC_KEY")
     val hesSomething = if (prefix.isEmpty) !conf.isEmpty else conf.hasPath(prefix)
 
     val tableDef = if (conf.hasPath(s"$p$TABLE_KEY")) {
       Some(conf.getString(s"$p$TABLE_KEY"))
     } else if (conf.hasPath(s"$p$DB_TABLE_KEY")) {
       Some(conf.getString(s"$p$DB_TABLE_KEY"))
+    } else if (conf.hasPath(s"$p$TOPIC_KEY")) {
+      Some(conf.getString(s"$p$TOPIC_KEY"))
     } else {
       None
     }
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala
index cfd378f7..46f553a4 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala
@@ -30,6 +30,8 @@ class QueryBuilderSuite extends AnyWordSpec {
     TestCase("", """data.file.1 = "/some/data/file"""", Query.Custom(Map("data.file.1" -> "/some/data/file"))),
     TestCase("input", """input.sql = "SELECT * FROM table"""", Query.Sql("SELECT * FROM table")),
     TestCase("input", """input.table = table1""", Query.Table("table1")),
+    TestCase("input", """input.db.table = table1""", Query.Table("table1")),
+    TestCase("input", """input.topic = topic1""", Query.Table("topic1")),
     TestCase("input", """input.path = /some/path""", Query.Path("/some/path")),
     TestCase("input", "input.data.file.1 = /some/data/file1\ninput.data.file.2 = /some/data/file2",
       Query.Custom(Map("data.file.1" -> "/some/data/file1", "data.file.2" -> "/some/data/file2")))

From 549e0821e9b72dc30ba9fba0969a8312aa387e55 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Wed, 29 Oct 2025 11:39:31 +0100
Subject: [PATCH 3/3] #649 Fix PR suggestions - thanks @coderabbitai.
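
Notable behavior fix: the key column is now included whenever
'key.naming.strategy' is defined, even if 'key.column.serializer' is left
at its default. A minimal config sketch (the registry URL is a placeholder)
where the key is deserialized from Avro without setting
'key.column.serializer' at all:

    schema.registry {
      url = "https://my.schema.registry:8081"
      value.naming.strategy = "topic.name"
      # Defining this alone enables Avro deserialization of the Kafka key
      key.naming.strategy = "topic.name"
    }

The unknown-serializer error now also reports the offending value and no
longer lists the unsupported 'long' option.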
---
 README.md                                          |  6 +++---
 .../co/absa/pramen/core/model/QueryBuilder.scala   |  4 ++--
 .../absa/pramen/core/model/QueryBuilderSuite.scala |  6 +++---
 .../core/pipeline/SourceTableParserSuite.scala     |  2 +-
 .../pramen/extras/source/KafkaAvroSource.scala     | 14 +++++++-------
 5 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/README.md b/README.md
index 0aab29ba..665a3a9a 100644
--- a/README.md
+++ b/README.md
@@ -945,9 +945,9 @@ pramen.sources = [
     # [Optional] Set name for the Kafka key column
     #key.column.name = "kafka_key"
 
-    # The Kafka key serializer. Can be "none", "binary", "string", "avro".
-    # When "avro", "key.naming.strategy" should be defined in the "schema.registry" section.
-    # Default is "binary", but if "key.naming.strategy" is defined, "avro" is selected automatically.
+    # The Kafka key serializer when key.naming.strategy is NOT defined. Can be "none", "binary", "string".
+    # When key.naming.strategy IS defined in schema.registry, Avro deserialization is used automatically.
+    # Default is "binary".
     #key.column.serializer = "none"
 
     kafka {
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala
index da68e133..0be7daa3 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala
@@ -53,9 +53,9 @@ object QueryBuilder {
       case _ =>
         val parent = if (parentPath.isEmpty) "" else s" at $parentPath"
         if (prefix.isEmpty)
-          throw new IllegalArgumentException(s"No options are specified for the query. Usually, it is one of: '$SQL_KEY', '$PATH_KEY', '$TABLE_KEY', '$DB_TABLE_KEY'$parent.")
+          throw new IllegalArgumentException(s"No options are specified for the query. Usually, it is one of: '$SQL_KEY', '$PATH_KEY', '$TABLE_KEY', '$DB_TABLE_KEY', '$TOPIC_KEY'$parent.")
         else
-          throw new IllegalArgumentException(s"No options are specified for the '$prefix' query. Usually, it is one of: '$p$SQL_KEY', '$p$PATH_KEY', '$p$TABLE_KEY', '$p$DB_TABLE_KEY'$parent.")
+          throw new IllegalArgumentException(s"No options are specified for the '$prefix' query. Usually, it is one of: '$p$SQL_KEY', '$p$PATH_KEY', '$p$TABLE_KEY', '$p$DB_TABLE_KEY', '$p$TOPIC_KEY'$parent.")
     }
   }
 }
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala
index 46f553a4..c5f4fa71 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala
@@ -55,7 +55,7 @@ class QueryBuilderSuite extends AnyWordSpec {
         QueryBuilder.fromConfig(conf, "", "")
       }
 
-      assert(exception.getMessage == "No options are specified for the query. Usually, it is one of: 'sql', 'path', 'table', 'db.table'.")
+      assert(exception.getMessage == "No options are specified for the query. Usually, it is one of: 'sql', 'path', 'table', 'db.table', 'topic'.")
     }
 
     "throw an exception when the prefix is empty" in {
@@ -65,7 +65,7 @@ class QueryBuilderSuite extends AnyWordSpec {
         QueryBuilder.fromConfig(conf, "input", "")
       }
 
-      assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table'.")
+      assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table', 'input.topic'.")
     }
 
     "throw an exception when the prefix is empty and parent is specified" in {
@@ -75,7 +75,7 @@ class QueryBuilderSuite extends AnyWordSpec {
         QueryBuilder.fromConfig(conf, "input", "my.parent")
      }
 
-      assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table' at my.parent.")
+      assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table', 'input.topic' at my.parent.")
     }
   }
 }
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala
index 3e55b6fa..d08b071b 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala
@@ -119,7 +119,7 @@ class SourceTableParserSuite extends AnyWordSpec {
         SourceTableParser.fromConfig(conf, "source.tables")
       }
 
-      assert(ex.getMessage.contains("No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table' at source.tables[0]."))
+      assert(ex.getMessage.contains("No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table', 'input.topic' at source.tables[0]."))
     }
 
     "throw an exception in case of duplicate entries" in {
diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
index c8ef7aa9..9862ff99 100644
--- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
+++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
@@ -53,9 +53,9 @@ import java.time.LocalDate
  *   # [Optional] Set name for the Kafka key column
  *   key.column.name = "kafka_key"
  *
- *   # The Kafka key serializer. Can be "none", "binary", "string", "avro".
- *   # When "avro", "key.naming.strategy" should be defined in the "schema.registry" section.
- *   # Default is "binary", but if "key.naming.strategy" is defined, "avro" is selected automatically.
+ *   # The Kafka key serializer when 'key.naming.strategy' is NOT defined. Can be "none", "binary", "string".
+ *   # When 'key.naming.strategy' IS defined in 'schema.registry', Avro deserialization is used automatically.
+ *   # Default is "binary".
  *   #key.column.serializer = "none"
 *
  *   kafka {
@@ -217,7 +217,7 @@ class KafkaAvroSource(sourceConfig: Config,
       col("timestampType").as("timestamp_type")
     ))
 
-    val hasKey = keyColumnSerializer != "none"
+    val hasKey = kafkaAvroConfig.keyNamingStrategy.isDefined || keyColumnSerializer != "none"
 
     val df2 = kafkaAvroConfig.keyNamingStrategy match {
       case Some(keyNamingStrategy) =>
@@ -231,16 +231,16 @@ class KafkaAvroSource(sourceConfig: Config,
           case "binary" => df1.withColumn(tempKafkaKeyColumnName, col("key"))
           case "string" => df1.withColumn(tempKafkaKeyColumnName, col("key").cast(StringType))
           case "avro" => throw new IllegalArgumentException("For the 'avro' serializer of Kafka topic key, 'schema.registry.key.naming.strategy' needs to be set.")
-          case x => throw new IllegalArgumentException("Unknown Kafka key serializer. Can be one of: none, binary, long, string, avro.")
+          case x => throw new IllegalArgumentException(s"Unknown Kafka key serializer '$x'. Can be one of: none, binary, string, avro.")
         }
     }
 
     val payloadFields = df2.select("data.*").schema.fieldNames.toSet
     if (payloadFields.contains(kafkaColumnName)) {
-      log.warn(s"Payload field '$kafkaColumnName' conflicts with Kafka metadata struct name and will be replaced.")
+      log.warn(s"Payload field '$kafkaColumnName' conflicts with the reserved Kafka metadata struct name and will be replaced.")
     }
     if (payloadFields.contains(keyColumnName)) {
-      log.warn(s"Payload field '$keyColumnName' conflicts with Kafka key column name and will be replaced.")
+      log.warn(s"Payload field '$keyColumnName' conflicts with the reserved Kafka key column name and will be removed.")
     }
 
     // Put data fields to the root level of the schema, and if data struct already has kafka_key and kafka fields,
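
Taken together, a source definition that exposes the Kafka key as a string
column might look like the following sketch. The broker and registry
addresses are placeholders, and the surrounding fields are abbreviated from
the README example:

    pramen.sources = [
      {
        name = "kafka_avro"
        factory.class = "za.co.absa.pramen.extras.source.KafkaAvroSource"

        key.column.name = "kafka_key"
        # Applies because no 'key.naming.strategy' is defined under 'schema.registry'
        key.column.serializer = "string"

        kafka {
          bootstrap.servers = "mybroker1:9092,mybroker2:9092"
        }

        schema.registry {
          url = "https://my.schema.registry:8081"
          value.naming.strategy = "topic.name"
        }
      }
    ]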