Skip to content

Conversation

@yruslan
Copy link
Collaborator

@yruslan yruslan commented Jan 7, 2026

Closes #674

Summary by CodeRabbit

  • New Features

    • Added an operation- and transformer-level setting ignore.schema.change to opt out of schema-change validation for specific operations.
  • Documentation

    • README examples updated to show the new configuration and transformer parameters (including parallel/attempt/threads options).
  • Tests

    • Test coverage extended to validate the new ignore-schema-change behavior across ingestion, transformation, and sink scenarios.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Jan 7, 2026

Walkthrough

Adds an operation-level ignoreSchemaChange flag, threads it through OperationDef, configuration, TaskRunnerBase.handleSchemaChange, and test fixtures; TaskRunnerBase now consults the flag to optionally bypass schema-change detection and notification.

Changes

Cohort / File(s) Summary
Documentation
README.md
Added operation/transformer examples showing ignore.schema.change and small whitespace formatting tweaks.
Core Configuration Model
pramen/core/src/main/scala/.../pipeline/OperationDef.scala
Added ignoreSchemaChange: Boolean field, introduced IGNORE_SCHEMA_CHANGE_KEY, and loaded the flag in fromConfig.
Schema Change Handler
pramen/core/src/main/scala/.../runner/task/TaskRunnerBase.scala
handleSchemaChange signature updated to accept OperationDef; logic now checks operationDef.ignoreSchemaChange (and raw table formats) to skip schema checks and returns updated (newSchemaRegistered, schemaDifferences) values; call sites updated.
Test Fixtures
pramen/core/src/test/scala/.../OperationDefFactory.scala
getDummyOperationDef gained ignoreSchemaChange: Boolean = false parameter and forwards it to OperationDef.
Configuration Tests
pramen/core/src/test/scala/.../pipeline/OperationDefSuite.scala
Tests updated to assert ignoreSchemaChange default and explicit values across ingestion, transformation, sink, and strict dependency cases.
Integration / Runner Tests
pramen/core/src/test/scala/.../tests/runner/task/TaskRunnerBaseSuite.scala
getUseCase now returns OperationDef (tuple extended); test destructuring and call sites updated; added test path exercising "ignore schema change if explicitly specified".

Sequence Diagram(s)

sequenceDiagram
    participant Runner as TaskRunnerBase
    participant OpDef as OperationDef
    participant DF as DataFrame
    participant Meta as MetaTable
    participant Journal as Journal/SchemaRegistry

    Note over Runner,OpDef: Run task with operation context
    Runner->>OpDef: read ignoreSchemaChange
    Runner->>DF: produce DataFrame from job
    Runner->>Meta: fetch registered schema
    alt ignoreSchemaChange == true or raw format
        Note right of Runner: Skip schema comparison\nreturn (false, [])
        Runner->>Journal: do not register schema change
    else normal path
        Runner->>DF: extract schema
        Runner->>Meta: compare schemas
        alt differences found
            Runner->>Journal: register new schema & record differences
            Journal-->>Runner: newSchemaRegistered = true
        else no differences
            Runner-->>Runner: newSchemaRegistered = false
        end
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Poem

🐰 Hopping through configs with a twitch of my nose,

I skip noisy changes where the quiet one goes.
A flag tucked in operation keeps alerts out of range,
Pipelines breathe easy — fewer schema-change bangs!
🥕✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: allowing operations to ignore schema changes, which directly matches the core feature implementation.
Linked Issues check ✅ Passed The PR implements the feature from #674: adds per-operation configuration to ignore schema changes via ignoreSchemaChange field, README examples, and updated handling logic.
Out of Scope Changes check ✅ Passed All changes are directly related to implementing the schema-change ignore feature; no unrelated modifications detected in code or tests.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


📜 Recent review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9d9bc83 and 0be24ea.

📒 Files selected for processing (3)
  • README.md
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T08:27:21.504Z
Learnt from: yruslan
Repo: AbsaOSS/pramen PR: 611
File: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala:29-29
Timestamp: 2025-06-18T08:27:21.504Z
Learning: In pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala, the constant variable name was changed from DATE_UNTIL_EXPR_KEY to DATE_TO_EXPR_KEY, but both constants hold the same string value "date.to". This is a variable name refactoring, not a configuration key change, so it doesn't affect backward compatibility.

Applied to files:

  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala
🧬 Code graph analysis (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala (1)
pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransformationJobSuite.scala (1)
  • getUseCase (128-141)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Coverage on Scala 2.12.18
🔇 Additional comments (9)
README.md (2)

1774-1776: LGTM! Documentation for the new ignore.schema.change option is clear and well-placed.

The new configuration option is properly documented with its default value (false) and behavior description. This aligns with the PR objective to reduce redundant schema change notifications in ingestion-centric multi-stage pipelines.


2283-2283: Documentation for transformer operations is consistent.

The same ignore.schema.change option is documented for transformation jobs with identical semantics, which is appropriate since schema changes can occur in both ingestion and transformation paths.

Also applies to: 2325-2343

pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala (3)

362-362: LGTM! Call site correctly passes the operation definition.

The handleSchemaChange call now correctly receives task.job.operation to enable the new ignoreSchemaChange functionality.


394-399: LGTM! Consistent handling for post-transformation schema changes.

The second schema change check (after transformations are applied) also correctly passes the operation definition, ensuring consistent behavior regardless of when schema changes are detected.


573-577: Comment typos remain - please fix for clarity.

The comments at lines 575-576 have two issues that were flagged in a previous review:

  1. Line 575: "Raw tables do not need schema check" should read "Raw tables do not need schema check" (missing "not")
  2. Line 576: "return no changes" should be "return no changes" (was "non changes")

Also note: when ignoreSchemaChange is true, this implementation skips both schema tracking (saving to bookkeeper) and notifications. This means if the flag is later changed to false, schema tracking starts fresh from that point. Please confirm this is the intended behavior.

📝 Suggested fix
   private[core] def handleSchemaChange(df: DataFrame, table: MetaTable, operationDef: OperationDef, infoDate: LocalDate): (Boolean, List[SchemaDifference]) = {
     if (table.format.isRaw || operationDef.ignoreSchemaChange) {
-      // Raw tables do not need schema check
-      // When schema changes are explicitly ignored - return no changes
+      // Raw tables do not need schema check.
+      // When schema changes are explicitly ignored, return no changes.
       return (false, List.empty[SchemaDifference])
     }

Likely an incorrect or invalid review comment.

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala (4)

63-63: LGTM! Tuple destructuring updated correctly.

All call sites throughout the test file have been updated to account for the new OperationDef element in the returned tuple. The unused _ placeholder is appropriate for tests that don't need to access the operation definition directly.


542-546: LGTM! handleSchemaChange call sites updated correctly.

The test now correctly passes the operation parameter to handleSchemaChange, aligning with the updated method signature.


613-640: LGTM! Comprehensive test for ignoreSchemaChange behavior.

The test properly verifies that when ignoreSchemaChange = true:

  1. The method returns (false, Nil) indicating no schema change detected
  2. The bookkeeper is not updated with the new schema
  3. The old schema (from infoDate.minusDays(10)) remains the latest for all queried dates

The typo from the previous review (hasSchamaChangedhasSchemaChanged) has been fixed on line 624.


654-686: LGTM! Test helper updated appropriately.

The getUseCase method signature and return statement are correctly updated to expose the OperationDef for tests that need to verify operation-level behavior like ignoreSchemaChange.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In
@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala:
- Around line 573-577: Fix the two comment typos in handleSchemaChange: change
"Raw tables do need schema check" to "Raw tables do NOT need schema check" and
"return non changes" to "return no changes". Then decide and implement the
intended behavior for operationDef.ignoreSchemaChange: either keep the current
early return (which skips both schema saving and notifications) and update the
comment to state that ignoreSchemaChange suppresses both tracking and
notifications, or change the logic so only notifications are suppressed while
schema is still saved to the bookkeeper (i.e., remove
operationDef.ignoreSchemaChange from the early return condition and instead
branch later to skip notification logic). Ensure references to
table.format.isRaw and operationDef.ignoreSchemaChange in handleSchemaChange are
updated accordingly.

In
@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala:
- Around line 613-640: The test has a typo in the destructured variable name:
change (hasSchamaChanged, changes) returned by runner.handleSchemaChange(...) to
(hasSchemaChanged, changes) and update the subsequent assertion
assert(!hasSchamaChanged) to assert(!hasSchemaChanged); ensure any other
references in this test to hasSchamaChanged are similarly renamed so the
variable name matches the intended hasSchemaChanged symbol.
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 262f41f and 9d9bc83.

📒 Files selected for processing (6)
  • README.md
  • pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/OperationDefFactory.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationDefSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-06-18T08:27:21.504Z
Learnt from: yruslan
Repo: AbsaOSS/pramen PR: 611
File: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala:29-29
Timestamp: 2025-06-18T08:27:21.504Z
Learning: In pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/MetastoreDependencyFactory.scala, the constant variable name was changed from DATE_UNTIL_EXPR_KEY to DATE_TO_EXPR_KEY, but both constants hold the same string value "date.to". This is a variable name refactoring, not a configuration key change, so it doesn't affect backward compatibility.

Applied to files:

  • pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
🧬 Code graph analysis (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ConfigUtils.scala (2)
  • ConfigUtils (33-612)
  • getOptionBoolean (36-42)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Test Coverage on Scala 2.12.18
  • GitHub Check: Test Spark 3.4.4 on Scala 2.12.20
  • GitHub Check: Test Spark 2.4.8 on Scala 2.11.12
  • GitHub Check: Test Spark 3.4.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.12.20
  • GitHub Check: Test Spark 3.3.4 on Scala 2.13.16
  • GitHub Check: Test Spark 3.5.5 on Scala 2.13.16
🔇 Additional comments (8)
pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala (1)

41-41: LGTM! Clean implementation of the new configuration option.

The ignoreSchemaChange field is properly integrated into the OperationDef model with:

  • Appropriate placement in the case class constructor
  • Config key following naming conventions
  • Default value of false ensuring backward compatibility
  • Consistent with other boolean flags like allowParallel and alwaysAttempt

Also applies to: 67-67, 105-105, 152-152

README.md (1)

1774-1776: Documentation is clear and helpful.

The new ignore.schema.change configuration option is well-documented with:

  • Clear description of its purpose
  • Correct default value shown
  • Appropriate placement in the operations configuration section
pramen/core/src/test/scala/za/co/absa/pramen/core/OperationDefFactory.scala (1)

33-33: Test factory correctly updated.

The ignoreSchemaChange parameter is properly added to the test factory with:

  • Correct default value (false) matching the production code
  • Appropriate parameter placement
  • Proper forwarding to the OperationDef constructor

Also applies to: 54-54

pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationDefSuite.scala (1)

112-112: Excellent test coverage for the new field.

The tests thoroughly validate the ignoreSchemaChange functionality:

  • Default value (false) verified across ingestion, transformation, and sink operations
  • Configured value (true) properly tested
  • Multiple operation types and scenarios covered

Also applies to: 131-131, 167-167, 232-232, 272-272

pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala (1)

362-362: Call sites correctly updated.

Both invocations of handleSchemaChange are properly updated to pass the task.job.operation as the new operationDef parameter, maintaining consistency across the codebase.

Also applies to: 396-396

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala (3)

63-445: LGTM! Destructuring updates are correct.

The mechanical updates to accommodate the new OperationDef return value from getUseCase are applied consistently across all test cases. The destructuring patterns correctly ignore the new element where it's not needed.


542-611: LGTM! Schema change test updates are correct.

All existing handleSchemaChange test cases are properly updated to pass the new operation parameter. The tests maintain their original intent and correctly adapt to the new method signature.


643-687: LGTM! Test helper method correctly updated.

The getUseCase helper method signature is properly updated to return OperationDef as the fifth element. The operationDef is created with appropriate test configuration and correctly positioned in the return tuple. All call sites have been consistently updated to handle the new return value.

@github-actions
Copy link

github-actions bot commented Jan 7, 2026

Unit Test Coverage

Overall Project 84.18% 🍏
Files changed 100% 🍏

Module Coverage
pramen:core Jacoco Report 86.17% 🍏
Files
Module File Coverage
pramen:core Jacoco Report OperationDef.scala 89.45% 🍏
TaskRunnerBase.scala 82.73% 🍏

@yruslan yruslan force-pushed the feature/674-mark-ignore-schema-changes branch from 9d9bc83 to 0be24ea Compare January 7, 2026 08:58
@yruslan yruslan merged commit d7406ca into main Jan 7, 2026
9 checks passed
@yruslan yruslan deleted the feature/674-mark-ignore-schema-changes branch January 7, 2026 09:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add the ability to mark operations to ignore schema changes

2 participants