Implement series cache invalidation #1752
JamesGuthrie merged 1 commit into feature/series-cache-invalidation from
Conversation
Force-pushed from 71e1489 to a773990
antekresic left a comment
Overall looks good; a couple of questions/suggestions left.
| err, _ = insertSeries(ctx, conn, true, reqs...)
| }
| if isEpochAbort(err) {
|     // Epoch abort is non-recoverable. Return error here instead of retrying.
Just to confirm, this is non-recoverable because the series metadata is dropped at this point?
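To illustrate the control flow under discussion: a minimal sketch of a retry loop that bails out on an epoch abort instead of retrying. `insertSeries` and `isEpochAbort` are the names from the diff; everything else (the error value, the loop shape) is a hypothetical stand-in, not the real promscale implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errEpochAbort is a hypothetical stand-in for the database error raised
// when the insert's epoch assertion fails.
var errEpochAbort = errors.New("epoch abort: series cache was stale")

func isEpochAbort(err error) bool {
	return errors.Is(err, errEpochAbort)
}

// insertWithRetry sketches the control flow around insertSeries: transient
// errors are retried, but an epoch abort is returned immediately, because
// the cached series ids may refer to rows that have since been deleted.
func insertWithRetry(insert func() error, maxRetries int) error {
	var err error
	for i := 0; i <= maxRetries; i++ {
		err = insert()
		if err == nil {
			return nil
		}
		if isEpochAbort(err) {
			// Non-recoverable: the series metadata behind the cached
			// ids may already be gone, so retrying cannot succeed.
			return err
		}
	}
	return err
}

func main() {
	err := insertWithRetry(func() error { return errEpochAbort }, 3)
	fmt.Println(isEpochAbort(err)) // true: returned without retrying
}
```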
pkg/pgmodel/ingestor/ingestor.go (Outdated)
| }
| // Normalize and canonicalize ts.Labels.
| // After this point ts.Labels should never be used again.
|
|
| return
| }
| if cacheCurrentEpoch.After(dbCurrentEpoch) {
|     log.Warn("msg", "The connector's cache epoch is greater than the databases. This is unexpected", "connector_current_epoch", cacheCurrentEpoch, "database_current_epoch", dbCurrentEpoch)
I'd flush the cache in this case. Otherwise we will just get spammed by these messages, and the cache epoch is invalid anyway.
If we do flush the cache, it would probably make sense to also reset the cache's epoch to the epoch we just got from the DB. Seems like a sensible move.
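The behaviour the reviewers converge on here can be sketched as follows. This is a hypothetical miniature (`seriesCache`, `reconcileEpoch`, and the map-based storage are all illustrative names, not the real cache types): when the cache's epoch is somehow ahead of the database's, flush the cache and adopt the database epoch instead of just logging a warning on every check.

```go
package main

import "fmt"

// seriesCache is a hypothetical miniature of the connector's series cache,
// holding cached series ids keyed by label set, plus the epoch under which
// they were read.
type seriesCache struct {
	entries      map[string]int64
	currentEpoch int64
}

// reconcileEpoch flushes the cache and resets its epoch to the database's
// value whenever the cache claims to be ahead of the database, which is an
// invalid state.
func (c *seriesCache) reconcileEpoch(dbCurrentEpoch int64) {
	if c.currentEpoch > dbCurrentEpoch {
		// Invalid state: drop everything and resynchronise with the DB.
		c.entries = make(map[string]int64)
		c.currentEpoch = dbCurrentEpoch
	}
}

func main() {
	c := &seriesCache{
		entries:      map[string]int64{`metric{job="a"}`: 42},
		currentEpoch: 200,
	}
	c.reconcileEpoch(100) // DB reports an older epoch than the cache claims
	fmt.Println(len(c.entries), c.currentEpoch) // 0 100
}
```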
Force-pushed from a773990 to d70d62c
The first pass. I still need to take a better look at the tests.
Assuming I understand the following correctly, I think it more or less matches our model:
There is always a single Ingestor per connector process. The Ingestor spawns a single Dispatcher, which, in turn, spawns several Copier goroutines. Ingestor and Dispatcher together act as one of CacheRefreshWorkers from the model.
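A toy rendering of the topology described above: one Dispatcher fanning work out to several Copier goroutines over a channel. All names here are illustrative shorthand for the review discussion, not the real promscale types.

```go
package main

import (
	"fmt"
	"sync"
)

// request stands in for a batch of samples handed to a Copier.
type request struct{ id int }

// runDispatcher spawns numCopiers goroutines that drain the shared request
// channel, and returns once the channel is closed and fully drained.
func runDispatcher(reqs <-chan request, numCopiers int) {
	var wg sync.WaitGroup
	for i := 0; i < numCopiers; i++ {
		wg.Add(1)
		go func() { // one Copier goroutine
			defer wg.Done()
			for range reqs {
				// each Copier takes batches off the channel and would
				// write them to the database here
			}
		}()
	}
	wg.Wait()
}

func main() {
	reqs := make(chan request)
	go func() {
		for i := 0; i < 10; i++ {
			reqs <- request{id: i}
		}
		close(reqs)
	}()
	runDispatcher(reqs, 4)
	fmt.Println("all copiers drained")
}
```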
sumerman left a comment
Looks great. There's one extra-paranoia point that we discussed offline.
| return model.SeriesEpoch(dbCurrentEpoch), model.SeriesEpoch(dbDeleteEpoch), nil
| }
|
|
|
| func (p *PgxDispatcher) RefreshSeriesEpoch() {
My quick experiments with the TLA model suggest that it is safe as it is, but my paranoia wants a lock for this method to prevent more than one instance of it from running concurrently. The concurrent attempt doesn't have to block on the mutex; TryLock and bail should be sufficient.
I just observed this happening in the test setup. TryLock is a smart approach. Done.
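The TryLock-and-bail pattern agreed on here can be sketched like this (a minimal illustration, not the real `RefreshSeriesEpoch`; `refreshGuard` and `doRefresh` are hypothetical names). Note that `sync.Mutex.TryLock` requires Go 1.18 or later.

```go
package main

import (
	"fmt"
	"sync"
)

// refreshGuard ensures at most one refresh runs at a time: a concurrent
// caller returns immediately instead of blocking on the mutex.
type refreshGuard struct {
	mu sync.Mutex
}

// refresh returns true if this call performed the refresh, false if it
// bailed because another refresh was already in flight. doRefresh stands
// in for the actual epoch-refresh logic.
func (g *refreshGuard) refresh(doRefresh func()) bool {
	if !g.mu.TryLock() { // non-blocking; requires Go 1.18+
		return false // another refresh is running: bail
	}
	defer g.mu.Unlock()
	doRefresh()
	return true
}

func main() {
	g := &refreshGuard{}
	ran := g.refresh(func() {})
	fmt.Println(ran) // true: the lock was free, so the refresh ran
}
```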
Force-pushed from f85d794 to 80ffe83
I've squashed down all of the changes during review and retargeted against the branch
This change implements invalidation of the series cache, and mechanisms to prevent the ingestion of data based on stale cache information.

In principle, the cache invalidation mechanism works as follows:

In the database, we track two values: `current_epoch` and `delete_epoch`. These are unix timestamps (which, for reasons of backwards-compatibility, are stored in a BIGINT field), updated every time that rows in the series table are deleted. `current_epoch` is set from `now()`, and `delete_epoch` is set from `now() - epoch_duration`. `epoch_duration` is a configurable parameter.

When a series row is to be deleted, instead of immediately deleting it, we set the `delete_epoch` column of the series row to the `current_epoch` timestamp (the time at which we decided that it will be deleted). After `epoch_duration` time elapses, the row is removed.

When the connector starts, it reads `current_epoch` from the database and stores this value with the series cache as `cache_current_epoch`. The connector periodically fetches the ids of series rows where `delete_epoch` is not null, together with `current_epoch`. It removes these entries from the cache, and updates `cache_current_epoch` to the value of `current_epoch` which was fetched from the database.

As the connector receives samples to be inserted, it tracks the smallest value of `cache_current_epoch` that it saw for that batch. When it comes to inserting the samples in a batch into the database, it asserts (in the database) that the cache which was read from was not stale. This is expressed as: `cache_current_epoch > delete_epoch`. If this is not the case, the insert is aborted.

This is a companion change to [1] which implements the database-side logic required for cache invalidation.

[1]: timescale/promscale_extension#529
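The staleness invariant from the description above can be modelled in a few lines. The real check runs inside the database; this is only an illustration of the comparison, with the variable names taken from the commit message.

```go
package main

import "fmt"

// insertAllowed models the database-side assertion described above: a
// batch insert is allowed only if the epoch under which the cache was
// read is strictly newer than the database's delete_epoch. Otherwise the
// cache may still hold ids of series rows already marked for deletion.
func insertAllowed(cacheCurrentEpoch, deleteEpoch int64) bool {
	return cacheCurrentEpoch > deleteEpoch
}

func main() {
	// Cache read at epoch 1000, deletions processed up to epoch 900: OK.
	fmt.Println(insertAllowed(1000, 900)) // true
	// Cache read at epoch 800, deletions processed up to epoch 900: abort.
	fmt.Println(insertAllowed(800, 900)) // false
}
```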
Force-pushed from 80ffe83 to df28b69