@pieterbreed pieterbreed commented Mar 25, 2025

KTable checkpoints

We want to persist a KTable's value (including metadata) to a blob store. This should happen regularly, but not too often, so that the next time the KTable is initialized it can read the latest checkpoint, load it as the KTable's value, and then resume consuming the KTable's topic from the position at which that value was valid. This saves the time and effort of reading the entire KTable topic from the beginning just to boot up the KTable.

This is an optimization focused on making the KTable component's initialization phase faster.
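
As a rough illustration of the restore step described above, here is a minimal sketch. The checkpoint shape (`:value` plus an `:offsets` map of partition to next offset) and the function name `initial-ktable-state` are assumptions made for this example, not the format used in this PR; offset 0 simply stands for "read from the beginning".

```clojure
(ns ktable-restore-sketch)

(defn initial-ktable-state
  "Hypothetical helper: decides what a KTable starts with.
  `checkpoint` is nil or a map {:value {...} :offsets {partition next-offset}}.
  `partitions` is the collection of partition numbers of the KTable topic."
  [checkpoint partitions]
  (let [from-beginning (zipmap partitions (repeat 0))]
    (if checkpoint
      ;; Resume from the checkpointed offsets; partitions missing from the
      ;; checkpoint fall back to reading from the beginning.
      {:value         (:value checkpoint)
       :start-offsets (merge from-beginning (:offsets checkpoint))}
      ;; No checkpoint available: empty value, read every partition from the start.
      {:value         {}
       :start-offsets from-beginning})))
```

With a checkpoint present, only the records after the checkpointed offsets need to be consumed; without one, the behaviour is the same as before this change.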

Design requirements

  • We want to be able to opt in to this behaviour with an extra, optional property on the KTable component config.
  • We want blob store implementations for the local filesystem (REPL/local development) and for S3 (production).
  • The checkpoint does not need to be updated after every KTable update, but checkpoints must be taken at regular intervals.
  • We don't want the blob store to keep filling up, so garbage collection must also happen. The GC must retain at least some number n of checkpoints, and all checkpoints younger than a certain age.
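
The retention rule in the last requirement can be sketched as a pure function. This is only an illustration under assumptions: the checkpoint maps with a `:created-at` instant and the name `checkpoints-to-delete` are hypothetical and do not come from this PR.

```clojure
(ns checkpoint-gc-sketch
  (:import (java.time Duration Instant)))

(defn checkpoints-to-delete
  "Returns the checkpoints that may be garbage-collected.
  A checkpoint is retained when it is among the `min-nr` newest ones,
  or when it is younger than `min-lifetime`; all others are returned."
  [checkpoints ^Instant now ^Duration min-lifetime min-nr]
  (let [cutoff (.minus now min-lifetime)]
    (->> checkpoints
         (sort-by :created-at #(compare %2 %1)) ; newest first
         (drop min-nr)                          ; always keep the n newest
         (filter #(.isBefore ^Instant (:created-at %) cutoff)))))

;; Usage: five checkpoints aged 1, 2, 5, 20 and 30 hours,
;; keeping at least 2 checkpoints and everything younger than 12 hours.
(def example-result
  (let [now (Instant/parse "2025-04-04T12:00:00Z")
        cps (for [h [1 2 5 20 30]]
              {:id h :created-at (.minus now (Duration/ofHours h))})]
    (mapv :id (checkpoints-to-delete cps now (Duration/ofHours 12) 2))))
;; example-result is [20 30]
```

Both conditions protect a checkpoint independently, so a store with few but old checkpoints never drops below n, and a store with many recent checkpoints keeps all of them until they age out.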

Example configuration (from telemetry)

This shows the configuration for the blob storage engine (picker) and the checkpoint component.

 :afrolabs.components.kafka.checkpoint-storage.stores/checkpoint-storage-switcher
 {:type                       #or [#edn #option CHECKPOINT_STORAGE_TYPE
                                   #profile {:default :filesystem
                                             :prod    :s3}]
  :fs:store-root              #or [#option FS_CHECKPOINT_STORE_ROOT
                                   "./telemetry-checkpoint-storage"]
  :s3:bucketname              #option S3_CHECKPOINT_STORE_BUCKET
  :s3:path-prefix             #or [#option S3_CHECKPOINT_STORE_PATH_PREFIX
                                   #profile {:prod    "ktable-checkpoints"
                                             :default #join ["dev/" #or [#option USER
                                                                         "default_no_USER"]]}]
  :aws-client-config          #ig/ref :afrolabs.components.aws.client-config/aws-client-config}

 :afrolabs.components.kafka.checkpoint-storage/ktable-checkpoint-store
 {:clock                            #ig/ref :afrolabs.components.time/system-time
  :checkpoint-store                 #ig/ref :afrolabs.components.kafka.checkpoint-storage.stores/checkpoint-storage-switcher
  :admin-client                     #ig/ref :afrolabs.gometro-bridge.core/admin-client
  :parse-inst-as-java-time          true

  :checkpointing-period-duration    #or [#edn #option KTABLE_CHECKPOINTS_PERIOD_DURATION ;; '[120 :seconds]'
                                         #profile {:dev     [5 :minutes]
                                                   :default [1 :hours]}]
  :checkpoint-min-lifetime-duration #or [#edn #option KTABLE_CHECKPOINTS_MIN_LIFETIME_DURATION ;; '[1 :hours]'
                                         #profile {:dev     [1 :hours]
                                                   :default [12 :hours]}]
  :min-nr-of-checkpoints            #long #or [#option KTABLE_CHECKPOINTS_MIN_NR_OF_CHECKPOINTS
                                               10]}
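
The duration settings above are `[amount unit]` pairs, and the inline comments suggest the environment variable form is an EDN string such as `'[120 :seconds]'`. A minimal sketch of turning such a value into a `java.time.Duration` follows; the helper name `->duration` and the set of supported units are assumptions for this example, and the actual component may convert these values differently.

```clojure
(ns checkpoint-duration-sketch
  (:require [clojure.edn :as edn])
  (:import (java.time Duration)
           (java.time.temporal ChronoUnit)))

;; Assumed unit keywords; only the ones that appear in the example config.
(def unit->chrono
  {:seconds ChronoUnit/SECONDS
   :minutes ChronoUnit/MINUTES
   :hours   ChronoUnit/HOURS})

(defn ->duration
  "Converts a pair like [120 :seconds] into a java.time.Duration."
  [[amount unit]]
  (if-let [chrono (unit->chrono unit)]
    (Duration/of (long amount) chrono)
    (throw (ex-info "Unsupported duration unit" {:unit unit}))))

;; Usage: the value as it might arrive from an environment variable.
(def period (->duration (edn/read-string "[120 :seconds]")))
```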

This line in the KTable component config opts in to the checkpoint behaviour:

:ktable-checkpoint-storage  #ig/ref :afrolabs.components.kafka.checkpoint-storage/ktable-checkpoint-store

Example of local file system blob store implementation

Shows the checkpoints for the KTable location-replay-state on cluster lkc-ozxkny.

└── lkc-ozxkny_location-replay-state
    ├── 1743759556976-2025-04-04T09:39:16.976034124Z.edn.gz
    ├── 1743759619071-2025-04-04T09:40:19.071918855Z.edn.gz
    ├── 1743759681301-2025-04-04T09:41:21.301498816Z.edn.gz
    ├── 1743759743460-2025-04-04T09:42:23.460386139Z.edn.gz
    ├── 1743759805591-2025-04-04T09:43:25.591432312Z.edn.gz
    ├── 1743760117069-2025-04-04T09:48:37.069126124Z.edn.gz
    └── 1743760179738-2025-04-04T09:49:39.738436293Z.edn.gz
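
Judging only from the listing above, each file name appears to be `<epoch-millis>-<ISO-8601 instant>.edn.gz`. Under that assumption, picking the most recent checkpoint could look like the sketch below; the function names are hypothetical and the store implementation in this PR may select checkpoints differently.

```clojure
(ns checkpoint-names-sketch
  (:import (java.time Instant)))

;; Assumed naming scheme, inferred from the directory listing.
(def checkpoint-name-re #"(\d+)-(.+)\.edn\.gz")

(defn parse-checkpoint-name
  "Returns a map describing the checkpoint file, or nil when the name does not match."
  [file-name]
  (when-let [[_ millis iso] (re-matches checkpoint-name-re file-name)]
    {:file-name file-name
     :epoch-ms  (Long/parseLong millis)
     :instant   (Instant/parse iso)}))

(defn latest-checkpoint
  "Returns the parsed entry with the highest epoch-millis prefix, or nil when none match."
  [file-names]
  (->> file-names
       (keep parse-checkpoint-name)
       (sort-by :epoch-ms)
       last))
```

The numeric prefix makes the files sort chronologically, while the ISO timestamp keeps the names readable for humans.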

Pieter Breed added 29 commits March 25, 2025 10:21
…g-caches-for-ktable-values-to-improve-system-startup
- removed health trips from ktable checkpoints

@pieterbreed pieterbreed left a comment

Rather big piece of code. Happy to do part of this review in person.

@pieterbreed pieterbreed requested a review from zan-xhipe April 7, 2025 08:10
@pieterbreed pieterbreed marked this pull request as ready for review April 7, 2025 08:10
@pieterbreed pieterbreed merged commit c25aa95 into main Apr 17, 2025
@pieterbreed pieterbreed deleted the feature/bri-128-using-caches-for-ktable-values-to-improve-system-startup branch April 17, 2025 12:59