
Conversation

@yruslan (Collaborator) commented Oct 29, 2025

Summary by CodeRabbit

Release Notes

  • New Features

    • Added support for "topic" as an input source configuration option (alongside table, db.table, path, and sql)
    • Added key.column.serializer configuration option for Kafka key handling (supports binary, string, avro, or none)
  • Documentation

    • Updated Kafka Avro source configuration examples and documentation

@coderabbitai coderabbitai bot commented Oct 29, 2025

Walkthrough

This pull request extends Pramen's Kafka integration by adding topic-based query support and implementing configurable key column serialization strategies. QueryBuilder now recognizes "topic" as a valid query configuration option alongside existing "table" and "db.table" options. KafkaAvroSource gains a new key.column.serializer configuration supporting binary, string, and Avro serialization formats with conditional logic for each strategy.
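For a quick sense of the new surface area, here is a minimal configuration sketch of the two options (the surrounding structure and values are illustrative, based on examples quoted later in this review):

```hocon
# Topic-based input, as an alternative to input.table / input.db.table:
input.topic = "topic1"

# How the Kafka key is materialized: one of "none", "binary", "string", "avro".
# Default is "binary"; "avro" requires key.naming.strategy under schema.registry.
key.column.serializer = "binary"
```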

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **Documentation Updates**<br>`README.md` | Documents new `key.column.serializer` configuration option; updates Kafka Avro source examples to reference `input.topic` instead of `input.table`. |
| **Topic-based Query Support**<br>`pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala` | Adds public constant `TOPIC_KEY = "topic"`; extends table detection and resolution logic to handle topic configurations; updates error messages to include topic among valid query options. |
| **Topic Query Tests**<br>`pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala` | Adds test cases for topic-based queries (`input.topic = topic1`); updates error message assertions to include 'topic' and 'input.topic' in valid options. |
| **Kafka Source Parser Tests**<br>`pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala` | Updates error message expectations to include `input.topic` in the list of valid source configuration options. |
| **Kafka Avro Key Serialization**<br>`pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala` | Introduces `KEY_COLUMN_SERIALIZER_KEY` public constant; adds `key.column.serializer` configuration with "binary" default; implements conditional serialization logic branching on "none", "binary", "string", and "avro" strategies; conditionally materializes key columns into the DataFrame based on key naming strategy and serializer selection. |
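To make the branching concrete, here is a minimal Scala sketch of the logic the last row describes — the helper `deserializeAvroKey` is a hypothetical stand-in, and the exact shapes differ from the actual source:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType

// Hypothetical stand-in for the ABRiS-based Avro decoding of the key.
def deserializeAvroKey(namingStrategy: String): Column = ???

// Sketch only: decide how (and whether) the Kafka key reaches the output.
def keyColumn(keyNamingStrategy: Option[String], keyColumnSerializer: String): Option[Column] =
  keyNamingStrategy match {
    case Some(strategy) =>
      Some(deserializeAvroKey(strategy))                   // naming strategy wins: Avro decode
    case None =>
      keyColumnSerializer match {
        case "none"   => None                              // key omitted from the DataFrame
        case "binary" => Some(col("key"))                  // raw key bytes (default)
        case "string" => Some(col("key").cast(StringType)) // bytes cast to string
        case "avro" =>
          throw new IllegalArgumentException("'avro' requires 'key.naming.strategy' under 'schema.registry'.")
        case other =>
          throw new IllegalArgumentException(s"Unknown key.column.serializer: '$other'.")
      }
  }
```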

Sequence Diagram

```mermaid
sequenceDiagram
    participant Config as Configuration
    participant KafkaAvroSource
    participant Kafka as Kafka Data
    participant DataFrame

    Config->>KafkaAvroSource: key.column.serializer: "binary"<br/>(or "string", "avro", "none")
    Config->>KafkaAvroSource: key.naming.strategy (optional)

    alt key.naming.strategy defined
        KafkaAvroSource->>Kafka: Read Avro key
        Kafka-->>KafkaAvroSource: Deserialized key
        KafkaAvroSource->>DataFrame: Select & rename to kafka_key
    else key.column.serializer: "binary"
        KafkaAvroSource->>Kafka: Read binary key
        Kafka-->>KafkaAvroSource: Raw key bytes
        KafkaAvroSource->>DataFrame: Materialize as kafka_key
    else key.column.serializer: "string"
        KafkaAvroSource->>Kafka: Read key
        Kafka-->>KafkaAvroSource: Key bytes
        KafkaAvroSource->>DataFrame: Cast to StringType, rename to kafka_key
    else key.column.serializer: "none"
        KafkaAvroSource->>DataFrame: Omit key handling
    end

    KafkaAvroSource->>DataFrame: Finalize with kafka & kafka_key columns
    DataFrame-->>KafkaAvroSource: Output DataFrame
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

  • KafkaAvroSource.scala: The new key serialization logic introduces multiple conditional branches (binary, string, avro, none strategies) with different DataFrame transformation patterns; requires careful review of each branch's correctness and interaction with key.naming.strategy configuration.
  • QueryBuilder.scala: While relatively straightforward constant addition and logic extension, the impact on query resolution warrants verification that topic handling doesn't introduce conflicts with existing table/db.table logic.
  • Test coverage: Verify that the new test cases for topic-based queries and error messages adequately cover all combinations of key serialization strategies and naming strategies.

Poem

🐰 Topics hop where tables stood before,
Keys serialized in ways galore,
Binary, string, or Avro's dance,
The Kafka rabbit takes its chance! 🐇

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 0.00%, which is insufficient. The required threshold is 80.00%. | You can run `@coderabbitai generate docstrings` to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title Check | ✅ Passed | The PR title "Add ability to select how key is serialized in Kafka Avro source" directly aligns with the primary objective of this changeset. The main substantive change is in KafkaAvroSource.scala, which introduces the KEY_COLUMN_SERIALIZER_KEY configuration option allowing users to select between different key serialization methods ("binary", "string", "avro", "none"). The title accurately and concisely summarizes this core feature without vague terminology or extraneous details. Supporting changes like the TOPIC_KEY additions in QueryBuilder are secondary infrastructure adjustments that enable the main feature. |

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala (1)

48-59: Update error messages to include TOPIC_KEY.

The error messages on lines 56 and 58 list the allowed query options but don't mention TOPIC_KEY. Users encountering these errors won't know they can use topic as an alternative.

Apply this diff:

```diff
       val parent = if (parentPath.isEmpty) "" else s" at $parentPath"
       if (prefix.isEmpty)
-        throw new IllegalArgumentException(s"No options are specified for the query. Usually, it is one of: '$SQL_KEY', '$PATH_KEY', '$TABLE_KEY', '$DB_TABLE_KEY'$parent.")
+        throw new IllegalArgumentException(s"No options are specified for the query. Usually, it is one of: '$SQL_KEY', '$PATH_KEY', '$TABLE_KEY', '$DB_TABLE_KEY', '$TOPIC_KEY'$parent.")
       else
-        throw new IllegalArgumentException(s"No options are specified for the '$prefix' query. Usually, it is one of: '$p$SQL_KEY', '$p$PATH_KEY', '$p$TABLE_KEY', '$p$DB_TABLE_KEY'$parent.")
+        throw new IllegalArgumentException(s"No options are specified for the '$prefix' query. Usually, it is one of: '$p$SQL_KEY', '$p$PATH_KEY', '$p$TABLE_KEY', '$p$DB_TABLE_KEY', '$p$TOPIC_KEY'$parent.")
```
🧹 Nitpick comments (1)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (1)

56-59: Clarify documentation about automatic serializer selection.

The comment states that "avro" is selected automatically when key.naming.strategy is defined, but this is misleading. The code doesn't change keyColumnSerializer to "avro"; instead, when keyNamingStrategy is present (lines 222-227), it bypasses the keyColumnSerializer switch entirely and always uses Avro deserialization.

Consider clarifying the documentation:

```diff
-  *    # The Kafka key serializer. Can be "none", "binary", "string", "avro".
-  *    # When "avro", "key.naming.strategy" should be defined at the "schema.registry" section.
-  *    # Default is "binary", but if "key.naming.strategy" is defined, "avro" is selected automatically.
+  *    # The Kafka key serializer when key.naming.strategy is NOT defined. Can be "none", "binary", "string".
+  *    # When key.naming.strategy IS defined in schema.registry, Avro deserialization is used automatically.
+  *    # Default is "binary".
```
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2981884 and 6cde129.

📒 Files selected for processing (4)
  • README.md (3 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala (1 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2)
  • ConfigUtils (24-205)
  • getOptionString (43-49)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (1)
  • applyNamingStrategyToAbrisConfig (81-103)
pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala (1)
pramen/api/src/main/scala/za/co/absa/pramen/api/Query.scala (2)
  • Query (28-56)
  • Table (37-41)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (6)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala (1)

28-46: LGTM! Clean implementation of TOPIC_KEY support.

The addition of TOPIC_KEY follows the existing pattern for TABLE_KEY and DB_TABLE_KEY. The implementation correctly extends both the hasDbTable check and the tableDef resolution logic.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (3)

22-22: LGTM! Good string normalization.

The addition of the keyColumnSerializer config with a sensible default of "binary" is appropriate. Applying toLowerCase.trim ensures consistent comparison regardless of user input formatting.

Also applies to: 125-128


238-244: LGTM! Good conflict detection.

The warnings for payload field name conflicts with metadata column names provide helpful feedback to users and clearly indicate when their data fields will be replaced.


248-259: LGTM! Correct conditional DataFrame assembly.

The conditional logic properly handles both scenarios:

  • When a key is present: includes the key column in the output
  • When no key: excludes the key column entirely

The consistent pattern of dropping conflicting columns before renaming temp columns ensures the final schema is correct.
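A tiny sketch of that drop-then-rename pattern, with hypothetical parameter names (not the exact source):

```scala
import org.apache.spark.sql.DataFrame

// Drop any payload column that clashes with the final name, then promote the
// temp column so the output schema contains exactly one column under that name.
def promoteTempColumn(df: DataFrame, tempName: String, finalName: String): DataFrame =
  df.drop(finalName).withColumnRenamed(tempName, finalName)
```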

pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala (1)

33-34: LGTM! Good test coverage for TOPIC_KEY.

The test cases appropriately verify that:

  • input.db.table resolves to a Query.Table
  • input.topic resolves to a Query.Table

This confirms that both keys are treated equivalently, which aligns with the implementation in QueryBuilder.scala.

README.md (1)

992-992: LGTM! Consistent terminology for Kafka topics.

Changing input.table to input.topic for the Kafka ingestion example improves clarity and aligns with the domain-specific terminology introduced by TOPIC_KEY.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
README.md (1)

944-952: Documentation correctly describes key serialization behavior.

The documentation clearly explains:

  • key.column.serializer is used when key.naming.strategy is NOT defined
  • Automatic Avro deserialization when key.naming.strategy IS defined
  • Valid serializer options and defaults

Note: The typo "deined" mentioned in past reviews has been fixed.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2)

52-60: Documentation comments are clear and correct.

The key serialization behavior is well documented. The typo "deined" mentioned in past reviews has been corrected to "defined".


222-236: Key deserialization logic is well structured.

The branching logic correctly handles:

  • Avro deserialization when keyNamingStrategy is defined
  • Binary, string, and none serializers when it's not
  • Clear error for "avro" serializer without naming strategy
  • Helpful error message for unknown serializers

The issue mentioned in past reviews about "long" in the error message has been fixed—it's no longer listed as a valid option.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6cde129 and 1d9fbf0.

📒 Files selected for processing (3)
  • README.md (3 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala (2 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2)
  • ConfigUtils (24-205)
  • getOptionString (43-49)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (1)
  • applyNamingStrategyToAbrisConfig (81-103)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala (1)

28-58: LGTM! Clean extension to support topic-based queries.

The addition of TOPIC_KEY and its consistent integration throughout the query resolution logic is well done. This provides better semantic clarity for Kafka and other message-queue sources while maintaining backward compatibility with existing table-based queries.

README.md (1)

993-993: Good update to use topic-based terminology.

Changing from input.table to input.topic in the Kafka example improves clarity and aligns with the new TOPIC_KEY support in QueryBuilder.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2)

248-259: Final DataFrame assembly logic is correct.

The conditional assembly properly:

  • Selects appropriate columns based on hasKey
  • Drops conflicting field names from the payload
  • Renames temporary columns to final names
  • Maintains sensible column ordering

However, this logic depends on hasKey being calculated correctly (see comment on line 220).


271-271: LGTM! Public constant follows existing patterns.

Adding KEY_COLUMN_SERIALIZER_KEY makes the configuration key accessible externally and is consistent with other constants in the companion object.

@yruslan yruslan force-pushed the feature/649-kafka-source-key-serialization branch from 1d9fbf0 to 51a7acf on October 29, 2025 10:20
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (1)

238-259: Update warning message to accurately reflect behavior when hasKey is false.

Lines 242-244 warn that a payload field with the same name as keyColumnName "will be replaced", but when hasKey is false (lines 255-259), the field is actually dropped, not replaced with a Kafka key column.

Consider updating the warning to reflect the actual behavior:

```diff
 if (payloadFields.contains(keyColumnName)) {
-  log.warn(s"Payload field '$keyColumnName' conflicts with Kafka key column name and will be replaced.")
+  log.warn(s"Payload field '$keyColumnName' conflicts with reserved Kafka key column name and will be removed.")
 }
```

Alternatively, only drop keyColumnName when hasKey is true to preserve payload fields when no key is being added.
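The alternative could look roughly like this (a sketch; `df`, `keyColumnName`, and `hasKey` as discussed in the surrounding comment):

```scala
import org.apache.spark.sql.DataFrame

// Drop the conflicting payload field only when a Kafka key column is actually
// added; when hasKey is false, the payload field is preserved.
def dropKeyConflict(df: DataFrame, keyColumnName: String, hasKey: Boolean): DataFrame =
  if (hasKey) df.drop(keyColumnName) else df
```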

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1d9fbf0 and 51a7acf.

📒 Files selected for processing (6)
  • README.md (3 hunks)
  • pramen-py/pyproject.toml (1 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala (2 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala (4 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala (1)
pramen/api/src/main/scala/za/co/absa/pramen/api/Query.scala (2)
  • Query (28-56)
  • Table (37-41)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2)
  • ConfigUtils (24-205)
  • getOptionString (43-49)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (1)
  • applyNamingStrategyToAbrisConfig (81-103)
🪛 GitHub Actions: python_ci
pramen-py/pyproject.toml

[warning] 1-1: Warning: poetry.lock is not consistent with pyproject.toml. You may be getting improper dependencies. Run poetry lock [--no-update] to fix it.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (3)
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala (1)

122-122: LGTM!

The error message correctly reflects the new input.topic option introduced in the QueryBuilder changes.

pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala (1)

28-58: LGTM!

The TOPIC_KEY addition is implemented consistently throughout:

  • Properly defined as a public constant
  • Integrated into hasDbTable checks
  • Included in tableDef resolution
  • Reflected in all error messages

The implementation correctly treats topic as an alternative name for table, which aligns well with Kafka source terminology.

pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala (1)

33-78: LGTM!

Excellent test coverage for the new TOPIC_KEY functionality:

  • Validates that input.db.table continues to work (backward compatibility)
  • Validates that input.topic produces the expected Query.Table result
  • Error message expectations correctly updated to include 'topic' option

In pramen-py/pyproject.toml:

```diff
   {version="^22.1.0", python = ">=3.7,<4.0"},
 ]
-pyspark = "3.1.3"
+pyspark = "3.3.4"
```
@coderabbitai coderabbitai bot left a comment

⚠️ Potential issue | 🔴 Critical

Update poetry.lock to match pyproject.toml.

The pipeline is reporting that poetry.lock is inconsistent with this change. Run poetry lock --no-update to regenerate the lock file.

🤖 Prompt for AI Agents
In pramen-py/pyproject.toml around line 133, the pyproject declares pyspark =
"3.3.4" but poetry.lock is now out of sync; run `poetry lock --no-update` in the
project root to regenerate the lockfile without changing other dependency
versions, commit the updated poetry.lock, and push the change so the pipeline
and repository are consistent.
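In practice the fix is just regenerating and committing the lock file, roughly as follows (the commit message is illustrative):

```shell
cd pramen-py
poetry lock --no-update   # regenerate poetry.lock without bumping other dependencies
git add poetry.lock
git commit -m "Sync poetry.lock with pyspark 3.3.4"
```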

@github-actions
Copy link

github-actions bot commented Oct 29, 2025

Unit Test Coverage

| Scope | Coverage | Δ |
| --- | --- | --- |
| Overall Project | 83.47% | -0.25% 🍏 |
| Files changed | 43.54% | |

| Module | Coverage |
| --- | --- |
| pramen:core (Jacoco Report) | 85.42% 🍏 |
| pramen-extras (Jacoco Report) | 66.28% (-2.47%) |

Files

| Module | File | Coverage |
| --- | --- | --- |
| pramen:core (Jacoco Report) | QueryBuilder.scala | 99.2% 🍏 |
| pramen-extras (Jacoco Report) | KafkaAvroSource.scala | 0% (-28.09%) |

@yruslan yruslan force-pushed the feature/649-kafka-source-key-serialization branch from 51a7acf to c984e75 on October 29, 2025 10:34
@yruslan yruslan force-pushed the feature/649-kafka-source-key-serialization branch from c984e75 to 549e082 on October 29, 2025 10:39
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (7)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala (2)

35-35: Rename to reflect topic support.

Variable now also covers topic. Consider renaming hasDbTable to hasTableOrTopic for clarity.

```diff
-    val hasDbTable = conf.hasPath(s"$p$TABLE_KEY") || conf.hasPath(s"$p$DB_TABLE_KEY") || conf.hasPath(s"$p$TOPIC_KEY")
+    val hasTableOrTopic = conf.hasPath(s"$p$TABLE_KEY") || conf.hasPath(s"$p$DB_TABLE_KEY") || conf.hasPath(s"$p$TOPIC_KEY")
@@
-    (hasSql, hasPath, hasDbTable, hesSomething) match {
+    (hasSql, hasPath, hasTableOrTopic, hesSomething) match {
```

56-59: Improve error when multiple query options are set.

Current message targets “no options specified”. Add a branch to detect multiple specified options and throw a targeted error (e.g., “Only one of: sql, path, table, db.table, topic is allowed.”).

```diff
@@
-    (hasSql, hasPath, hasDbTable, hesSomething) match {
+    val specified = Seq(
+      if (hasSql) Some(SQL_KEY) else None,
+      if (hasPath) Some(PATH_KEY) else None,
+      if (hasTableOrTopic) Some(s"$TABLE_KEY/$DB_TABLE_KEY/$TOPIC_KEY") else None
+    ).flatten
+    if (specified.size > 1) {
+      val parent = if (parentPath.isEmpty) "" else s" at $parentPath"
+      if (prefix.isEmpty)
+        throw new IllegalArgumentException(s"Multiple options specified for the query$parent. Only one of: '$SQL_KEY', '$PATH_KEY', '$TABLE_KEY', '$DB_TABLE_KEY', '$TOPIC_KEY' is allowed.")
+      else
+        throw new IllegalArgumentException(s"Multiple options specified for the '$prefix' query$parent. Only one of: '$p$SQL_KEY', '$p$PATH_KEY', '$p$TABLE_KEY', '$p$DB_TABLE_KEY', '$p$TOPIC_KEY' is allowed.")
+    }
+    (hasSql, hasPath, hasTableOrTopic, hesSomething) match {
       case (true, false, false, _)     => Query.Sql(conf.getString(s"$p$SQL_KEY"))
       case (false, true, false, _)     => Query.Path(conf.getString(s"$p$PATH_KEY"))
       case (false, false, true, _)     => Query.Table(tableDef.get)
       case (false, false, false, true) => Query.Custom(ConfigUtils.getExtraOptions(conf, prefix))
       case _                           =>
```
pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala (1)

78-79: Consider adding multi-option conflict tests.

Add cases where two options are set (e.g., input.table + input.topic) and assert an “only one allowed” error if you implement that branch in QueryBuilder.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (3)

229-235: Clarify string decoding or support charset.

Casting binary → string assumes UTF‑8. Either:

  • Document that “string” uses UTF‑8; or
  • Allow string:<charset> (e.g., string:utf-16) and use from_csv-style decode or a small UDF.

Example (documentation-only change preferred in this PR):

```diff
-  *    # The Kafka key serializer when 'key.naming.strategy' is NOT defined. Can be "none", "binary", "string".
+  *    # The Kafka key serializer when 'key.naming.strategy' is NOT defined. Can be "none", "binary", "string" (UTF‑8).
```
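If the hypothetical `string:<charset>` form were ever implemented, the decode could be a small UDF along these lines (a sketch under that assumption, not part of this PR):

```scala
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

// Decode raw key bytes with an explicit charset instead of relying on the
// binary -> string cast (UTF-8). Capturing only the charset name (a String)
// keeps the UDF closure serializable.
def decodeKeyAs(charsetName: String): UserDefinedFunction =
  udf((bytes: Array[Byte]) => if (bytes == null) null else new String(bytes, charsetName))

// Usage sketch: df.withColumn("kafka_key", decodeKeyAs("UTF-16")(col("key")))
```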

125-128: Default and temp column names.

  • Good default for key.column.serializer = "binary".
  • Optionally make temp names even less collision‑prone (e.g., prefix with __pramen_tmp_).
```diff
-  private val tempKafkaColumnName = "tmp_pramen_kafka"
-  private val tempKafkaKeyColumnName = "tmp_pramen_kafka_key"
+  private val tempKafkaColumnName = "__pramen_tmp_kafka"
+  private val tempKafkaKeyColumnName = "__pramen_tmp_kafka_key"
```

56-60: Doc tweak: make UTF‑8 explicit for “string”.

Minor clarity: note that “string” decodes key bytes as UTF‑8 (given the cast).

README.md (1)

948-951: State encoding for “string” serializer.

Add “(UTF‑8)” to align with Spark binary→string cast behavior.

```diff
-    # The Kafka key serializer when key.naming.strategy is NOT defined. Can be "none", "binary", "string".
+    # The Kafka key serializer when key.naming.strategy is NOT defined. Can be "none", "binary", "string" (UTF‑8).
```
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 51a7acf and 549e082.

📒 Files selected for processing (5)
  • README.md (3 hunks)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala (2 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala (4 hunks)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala (1 hunks)
  • pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala
🧰 Additional context used
🧬 Code graph analysis (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (2)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/utils/ConfigUtils.scala (2)
  • ConfigUtils (24-205)
  • getOptionString (43-49)
pramen/extras/src/main/scala/za/co/absa/pramen/extras/writer/model/NamingStrategy.scala (1)
  • applyNamingStrategyToAbrisConfig (81-103)
pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala (1)
pramen/api/src/main/scala/za/co/absa/pramen/api/Query.scala (2)
  • Query (28-56)
  • Table (37-41)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (7)
pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala (2)

28-28: Good addition: explicit TOPIC_KEY constant.

Clear, consistent with existing keys; enables topic-based configs cleanly.


42-43: Topic resolution path looks right.

Topic falls back after table/db.table; consistent precedence.

pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala (3)

33-35: Tests for db.table and topic look good.

Covers both aliases; maintains expected Query.Table resolution.


58-58: Error expectation updated correctly.

Message now includes 'topic'.


68-68: Prefixed error expectation updated correctly.

Includes 'input.topic' as intended.

pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala (1)

220-220: hasKey fix is correct.

Including keyNamingStrategy.isDefined prevents losing the deserialized key when serializer is none. Nice.
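In other words, the check presumably has this shape (a sketch, not the exact source):

```scala
// A key column is produced either via Avro decoding (naming strategy defined)
// or via any key serializer other than "none".
def hasKey(keyNamingStrategy: Option[String], keyColumnSerializer: String): Boolean =
  keyNamingStrategy.isDefined || keyColumnSerializer != "none"
```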

README.md (1)

992-992: Docs match code: input.topic example

Good update to input.topic; consistent with QueryBuilder changes.

In pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala:

```scala
val TOPIC_NAME_KEY = "topic.name"
val CUSTOM_KAFKA_COLUMN_KEY = "custom.kafka.column"
val KEY_COLUMN_KEY = "key.column.name"
val KEY_COLUMN_SERIALIZER_KEY = "key.column.serializer"
```
@coderabbitai coderabbitai bot left a comment

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Public constant looks good; ensure config default is discoverable.

Confirm key.column.serializer default is present in reference.conf for visibility.


🏁 Script executed:

```shell
#!/bin/bash
# Verify default presence and examples for key.column.serializer
fd -a reference.conf
rg -nC2 'key\.column\.serializer' -g 'pramen/**/reference.conf'
```

Length of output: 196


I need to inspect the reference.conf files directly to verify if the configuration default is present.


---

🏁 Script executed:

```shell
#!/bin/bash
# Search for key.column.serializer in all found reference.conf files
echo "=== Searching in extras reference.conf ==="
rg -nC2 'key\.column\.serializer' /home/jailuser/git/pramen/extras/src/main/resources/reference.conf

echo -e "\n=== Searching in core reference.conf ==="
rg -nC2 'key\.column\.serializer' /home/jailuser/git/pramen/core/src/main/resources/reference.conf

echo -e "\n=== Checking if extras reference.conf exists and contains kafka config ==="
cat /home/jailuser/git/pramen/extras/src/main/resources/reference.conf 2>/dev/null | head -50
```

Length of output: 1132


Let me verify more thoroughly by checking the full contents and looking for any Kafka configuration:


---

🏁 Script executed:

```shell
#!/bin/bash
# Check full extras reference.conf for Kafka sections
echo "=== Full extras reference.conf ==="
cat /home/jailuser/git/pramen/extras/src/main/resources/reference.conf

echo -e "\n=== Search for any 'kafka' in reference.conf files ==="
rg -in 'kafka' /home/jailuser/git/pramen/extras/src/main/resources/reference.conf /home/jailuser/git/pramen/core/src/main/resources/reference.conf

echo -e "\n=== Check KafkaAvroSource for default value handling ==="
rg -nC3 'key\.column\.serializer\|KEY_COLUMN_SERIALIZER' /home/jailuser/git/pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
```

Length of output: 1120


Let me verify the file path and view the KafkaAvroSource code directly:

```shell
#!/bin/bash
# Find the correct KafkaAvroSource file
echo "=== Finding KafkaAvroSource.scala ==="
find /home/jailuser/git/pramen -name "KafkaAvroSource.scala" -type f

echo -e "\n=== View context around line 271 ==="
fd -e scala "KafkaAvroSource" /home/jailuser/git/pramen --exec cat {} \; | sed -n '265,280p'
```

---


🏁 Script executed:

```shell
#!/bin/bash
# Find KafkaAvroSource.scala and view context around line 271
find /home/jailuser/git/pramen -name "KafkaAvroSource.scala" -type f

echo -e "\n=== View lines 265-280 of KafkaAvroSource.scala ==="
find /home/jailuser/git/pramen -name "KafkaAvroSource.scala" -type f -exec sed -n '265,280p' {} +

echo -e "\n=== View surrounding context for KEY_COLUMN_SERIALIZER_KEY ==="
find /home/jailuser/git/pramen -name "KafkaAvroSource.scala" -type f -exec grep -nC5 "KEY_COLUMN_SERIALIZER" {} +
```

Length of output: 2062


Let me check if there are any Kafka configuration examples elsewhere in the codebase:


---

🏁 Script executed:

```shell
#!/bin/bash
# Check if Kafka config is documented elsewhere
echo "=== Search for Kafka config examples in test resources or documentation ==="
find /home/jailuser/git/pramen -type f \( -name "*.conf" -o -name "*.md" -o -name "*.txt" \) -exec grep -l "key\.column" {} +

echo -e "\n=== Check test resources for Kafka configuration examples ==="
find /home/jailuser/git/pramen -path "*/test/resources/*" -name "*.conf" -type f -exec cat {} \;
```

Length of output: 56619


Add Kafka configuration defaults to reference.conf for discoverability.

Verification confirms key.column.serializer is absent from reference.conf. While the code provides a hardcoded default of "binary", this configuration is not documented or discoverable. Add a Kafka source configuration section to pramen/extras/src/main/resources/reference.conf listing available options (including key.column.serializer, key.column.name, custom.kafka.column, and topic.name) with their defaults for user visibility.

🤖 Prompt for AI Agents
In
pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala
around line 271, the constant KEY_COLUMN_SERIALIZER_KEY
("key.column.serializer") is used but not documented in reference.conf; update
pramen/extras/src/main/resources/reference.conf to add a kafka source
configuration section (e.g., pramen.extras.source.kafka or similar existing
namespace) that lists the relevant keys and their defaults for discoverability:
key.column.serializer = "binary", key.column.name = "key", custom.kafka.column =
"" (or appropriate default), and topic.name = "" (or appropriate default), plus
brief comments describing accepted options where useful; ensure keys match the
exact names used in code and place the section in alphabetical/consistent order
with existing configs.
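A possible shape for that section (a sketch only — the namespace and the commented-out keys are assumptions; only the "binary" default is confirmed by the code):

```hocon
# Hypothetical reference.conf section for Kafka Avro source defaults.
pramen.extras.source.kafka {
  # One of: none, binary, string, avro ("avro" requires key.naming.strategy
  # under the schema.registry section). Default confirmed in code:
  key.column.serializer = "binary"

  # topic.name, key.column.name and custom.kafka.column would be listed here
  # with their defaults as well (omitted: not confirmed from the source).
}
```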

@yruslan yruslan merged commit a2bb9af into main Oct 29, 2025
9 checks passed
@yruslan yruslan deleted the feature/649-kafka-source-key-serialization branch October 29, 2025 10:55