Concurrent storage write batch support #425
Conversation
🦋 Changeset detected. Latest commit: 0fb42f9. The changes in this PR will be included in the next version bump. This PR includes changesets to release 17 packages.
Force-pushed from 6522374 to 841273e
stevensJourney
left a comment
Looks really cool. I could not spot anything major that stands out as a potential issue.
```diff
 if (afterId) {
   // Insert or update
-  const after_key: SourceKey = { g: this.group_id, t: sourceTable.id, k: afterId };
+  const after_key: SourceKey = { g: this.group_id, t: sourceTable.id as bson.ObjectId, k: afterId };
```
Shouldn't this type cast be using mongoTableId?
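To illustrate the suggestion, here is a minimal sketch of what a checked `mongoTableId` helper could look like in place of the raw `as bson.ObjectId` cast. The helper's implementation is an assumption (only its name appears in the review), and `ObjectIdLike` stands in for `bson.ObjectId` to keep the sketch self-contained:

```typescript
// ObjectIdLike is a stand-in for bson.ObjectId in this sketch.
type ObjectIdLike = { toHexString(): string };

interface SourceTableLike {
  id: string | ObjectIdLike;
}

// Hypothetical helper: narrow the table id to an ObjectId with a runtime
// check, instead of an unchecked `as` cast that would hide a string id.
function mongoTableId(table: SourceTableLike): ObjectIdLike {
  if (typeof table.id === 'string') {
    // Fail loudly rather than silently treating a string id as an ObjectId.
    throw new Error(`Expected an ObjectId table id, got string: ${table.id}`);
  }
  return table.id;
}

const table: SourceTableLike = {
  id: { toHexString: () => '507f1f77bcf86cd799439011' }
};
console.log(mongoTableId(table).toHexString()); // '507f1f77bcf86cd799439011'
```

Compared to a cast, the check turns a wrong storage backend pairing into an immediate error instead of a corrupted key.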
```ts
$cond: [
  can_checkpoint,
  {
    $max: ['$last_checkpoint', { $literal: this.persisted_op }, { $toLong: '$keepalive_op' }]
```
I'm not sure about the specifics here, but I noticed the Postgres implementation coerces the potentially null persisted_op to zero. Should we do that here as well?
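A minimal sketch of that suggestion, assuming the field names from the snippet above: wrapping the possibly-null `persisted_op` in MongoDB's standard `$ifNull` aggregation operator would coerce it to zero before taking the max, mirroring the Postgres behavior. The surrounding pipeline is omitted.

```typescript
// May be null before anything has been persisted (illustrative value).
const persisted_op: number | null = null;

// Sketch of the $max expression with the null coercion added; $ifNull
// replaces a null/missing first argument with the fallback (0 here).
const lastCheckpointExpr = {
  $max: [
    '$last_checkpoint',
    { $ifNull: [{ $literal: persisted_op }, 0] }, // null -> 0, as in Postgres storage
    { $toLong: '$keepalive_op' }
  ]
};
console.log(JSON.stringify(lastCheckpointExpr));
```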
This modifies the APIs and implementation of `BucketStorageBatch` to allow using multiple batches concurrently without introducing consistency issues. The first goal of this is to allow reading the replication stream while snapshotting - see #426 for a Postgres implementation.

The actual writes to the database still rely on exclusive database locks to a large extent, so there isn't true concurrency yet. But it does allow running a replication stream concurrently with replication snapshots without breaking.
Specifically, this changes:

- Unified `commit()` and `keepalive()`. The only remaining difference is that keepalive is essentially `commit({allowEmptyCommit: true})`.
- `snapshot_done` is now explicitly set after completing a snapshot, rather than being set automatically on commit. No commit will go through until `snapshot_done` is set.
- `current_data` deletes are now soft deletes (storing an empty document/row instead of deleting the document/row), converted into hard deletes on the next commit. We need this to consistently handle snapshots while streaming.
- Changed `SourceTable.id` to be strongly typed, specifically checking for `string` in Postgres storage and `ObjectId` in MongoDB storage. This unfortunately had a big knock-on effect in tests, including now having different checksums in Postgres storage vs MongoDB storage.

Other more minor changes:
- Added a `cause` to `ReplicationAbortedError`.
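The `current_data` soft-delete scheme described in the list above can be sketched in miniature. This is not the PR's implementation; it is a simplified in-memory model (all names are illustrative) of the stated semantics: a delete writes an empty record so a concurrent snapshot still sees the key, and the next commit turns accumulated soft deletes into hard deletes.

```typescript
interface CurrentDataRow {
  key: string;
  data: Uint8Array | null; // null marks a soft-deleted row
}

class CurrentDataStore {
  private rows = new Map<string, CurrentDataRow>();

  put(key: string, data: Uint8Array): void {
    this.rows.set(key, { key, data });
  }

  // Soft delete: keep the row (with empty data) so readers that started
  // before this write, e.g. an in-progress snapshot, still observe the key.
  softDelete(key: string): void {
    this.rows.set(key, { key, data: null });
  }

  // On commit, convert soft deletes into hard deletes.
  commit(): void {
    for (const [key, row] of this.rows) {
      if (row.data == null) this.rows.delete(key);
    }
  }

  has(key: string): boolean {
    return this.rows.has(key);
  }
}

const store = new CurrentDataStore();
store.put('a', new Uint8Array([1]));
store.softDelete('a');
console.log(store.has('a')); // true: still visible until commit
store.commit();
console.log(store.has('a')); // false: hard-deleted on commit
```

The two-phase delete is what lets a snapshot and a live replication stream run against the same batch without the stream's deletes pulling rows out from under the snapshot.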