Conversation

Contributor

@MadLittleMods MadLittleMods commented Jan 22, 2026

When purging room history, send cache replication message for _get_event_cache

As described in https://github.com/element-hq/synapse-rust-apps/issues/157

Dev notes

Relevant code:

def invalidate_get_event_cache_after_txn(
    self, txn: LoggingTransaction, event_id: str
) -> None:
    """
    Prepares a database transaction to invalidate the get event cache for a given
    event ID when executed successfully. This is achieved by attaching two callbacks
    to the transaction, one to invalidate the async cache and one for the in memory
    sync cache (importantly called in that order).

    Arguments:
        txn: the database transaction to attach the callbacks to
        event_id: the event ID to be invalidated from caches
    """
    txn.async_call_after(self._invalidate_async_get_event_cache, event_id)
    txn.call_after(self._invalidate_local_get_event_cache, event_id)

async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
    """
    Invalidates an event in the asynchronous get event cache, which may be remote.

    Arguments:
        event_id: the event ID to invalidate
    """
    await self._get_event_cache.invalidate((event_id,))

def _invalidate_local_get_event_cache(self, event_id: str) -> None:
    """
    Invalidates an event in local in-memory get event caches.

    Arguments:
        event_id: the event ID to invalidate
    """
    self._get_event_cache.invalidate_local((event_id,))
    self._event_ref.pop(event_id, None)
    self._current_event_fetches.pop(event_id, None)

def _invalidate_local_get_event_cache_room_id(self, room_id: str) -> None:
    """Clears the in-memory get event caches for a room.

    Used when we purge room history.
    """
    self._get_event_cache.invalidate_on_extra_index_local((room_id,))
    self._event_ref.clear()
    self._current_event_fetches.clear()

def _invalidate_async_get_event_cache_room_id(self, room_id: str) -> None:
    """
    Clears the async get_event cache for a room. Currently a no-op until
    an async get_event cache is implemented - see https://github.com/matrix-org/synapse/pull/13242
    for preliminary work.
    """

async def _get_events_from_cache(
    self, events: Iterable[str], update_metrics: bool = True
) -> dict[str, EventCacheEntry]:
    """Fetch events from the caches, both in memory and any external.

    May return rejected events.

    Args:
        events: list of event_ids to fetch
        update_metrics: Whether to update the cache hit ratio metrics
    """
    event_map = self._get_events_from_local_cache(
        events, update_metrics=update_metrics
    )

    missing_event_ids = [e for e in events if e not in event_map]
    event_map.update(
        await self._get_events_from_external_cache(
            events=missing_event_ids,
            update_metrics=update_metrics,
        )
    )

    return event_map
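The txn-callback mechanism above can be sketched with a minimal stand-in transaction. This is a toy, not Synapse's real LoggingTransaction: the point is just that both invalidations are deferred until the transaction commits, and fire in registration order (async/external cache first, then the local in-memory cache, as the docstring requires).

```python
import asyncio

class ToyTransaction:
    """Stand-in for Synapse's LoggingTransaction: collects callbacks to run
    only after the transaction commits successfully."""

    def __init__(self):
        self._after = []  # (is_async, fn, args)

    def call_after(self, fn, *args):
        self._after.append((False, fn, args))

    def async_call_after(self, fn, *args):
        self._after.append((True, fn, args))

    async def commit(self):
        # Callbacks fire in registration order: the async (possibly external)
        # cache invalidation first, then the local in-memory one.
        for is_async, fn, args in self._after:
            result = fn(*args)
            if is_async:
                await result

calls = []

async def invalidate_async(event_id):
    calls.append(("async", event_id))

def invalidate_local(event_id):
    calls.append(("local", event_id))

txn = ToyTransaction()
txn.async_call_after(invalidate_async, "$event1")
txn.call_after(invalidate_local, "$event1")
asyncio.run(txn.commit())
print(calls)  # [('async', '$event1'), ('local', '$event1')]
```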

  • invalidate_get_event_cache_after_txn
  • _invalidate_async_get_event_cache
  • _invalidate_local_get_event_cache
  • _invalidate_local_get_event_cache_room_id
  • _invalidate_async_get_event_cache_room_id

_invalidate_caches_for_room_events

Pull Request Checklist

  • Pull request is based on the develop branch
  • Pull request includes a changelog file. The entry should:
    • Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from EventStore to EventWorkerStore.".
    • Use markdown where necessary, mostly for code blocks.
    • End with either a period (.) or an exclamation mark (!).
    • Start with a capital letter.
    • Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry.
  • Code style is correct (run the linters)

Comment on lines 403 to +408
self.invalidate_get_event_cache_after_txn(txn, event_id)
# Send that invalidation to replication so that other workers also invalidate
# the event cache.
self._send_invalidation_to_replication(
    txn, "_get_event_cache", (event_id,)
)
Contributor Author

@MadLittleMods MadLittleMods Jan 23, 2026


Can we just move the _send_invalidation_to_replication call inside invalidate_get_event_cache_after_txn instead of doing it separately?

Is there a reason that we sometimes call invalidate_get_event_cache_after_txn but don't want to send the invalidation over replication?
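To make the proposal concrete, here is a hedged sketch (stand-in classes and a synchronous toy transaction, not Synapse's real ones) of folding the replication send into invalidate_get_event_cache_after_txn, so every caller gets both the cache invalidation and the replication message:

```python
class ToyTxn:
    """Toy transaction: runs registered callbacks on commit."""

    def __init__(self):
        self._after = []

    def call_after(self, fn, *args):
        self._after.append((fn, args))

    def commit(self):
        for fn, args in self._after:
            fn(*args)


class ToyStore:
    """Stand-in store showing the proposed shape: one entry point that both
    invalidates the cache and queues the replication message."""

    def __init__(self):
        self.log = []

    def _invalidate_local_get_event_cache(self, event_id):
        self.log.append(("local_invalidate", event_id))

    def _send_invalidation_to_replication(self, txn, cache_name, keys):
        # The real method writes rows the replication stream picks up;
        # here we just record the intent.
        self.log.append(("replicate", cache_name, keys))

    def invalidate_get_event_cache_after_txn(self, txn, event_id):
        txn.call_after(self._invalidate_local_get_event_cache, event_id)
        # Proposed addition: always notify other workers too.
        self._send_invalidation_to_replication(txn, "_get_event_cache", (event_id,))


store = ToyStore()
txn = ToyTxn()
store.invalidate_get_event_cache_after_txn(txn, "$event1")
txn.commit()
print(store.log)
# [('replicate', '_get_event_cache', ('$event1',)), ('local_invalidate', '$event1')]
```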

Member

@devonh devonh Jan 23, 2026


The downside of adding additional replication call locations is the potential for a large amount of replication overhead per event, which can consume a lot of resources on each worker.
In this case, it appears that the ph_cache_fake cache function is used to propagate a single replication message to invalidate events for the entire room (see below).
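The overhead argument can be illustrated with a toy comparison (illustrative numbers and a made-up room ID, nothing measured from Synapse): a per-event scheme sends one replication message per invalidated event, while the room-level ph_cache_fake approach sends a single message and lets each worker invalidate its own get_event entries for that room.

```python
# Toy comparison of per-event vs room-level invalidation traffic.
event_ids = [f"$event{i}" for i in range(1000)]  # a room being purged

# Per-event: one replication message per invalidated event.
per_event_messages = [("_get_event_cache", (eid,)) for eid in event_ids]

# Room-level: a single message keyed on the room; each worker then clears
# its local get_event entries for that room.
room_level_messages = [("ph_cache_fake", ("!room:example.com",))]

print(len(per_event_messages), len(room_level_messages))  # 1000 1
```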

Comment on lines 403 to +408
self.invalidate_get_event_cache_after_txn(txn, event_id)
# Send that invalidation to replication so that other workers also invalidate
# the event cache.
self._send_invalidation_to_replication(
    txn, "_get_event_cache", (event_id,)
)
Contributor Author


https://github.com/element-hq/synapse-rust-apps/issues/157 only calls out this spot we're fixing, but there are a few more places where we call invalidate_get_event_cache_after_txn without calling _send_invalidation_to_replication 🤔

# Once the txn completes, invalidate all of the relevant caches. Note that we do this
# up here because it captures all the events_and_contexts before any are removed.
for event, _ in events_and_contexts:
    self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
    if event.redacts:
        self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)

Related to the other discussion about doing it all the time: #19404 (comment)

Comment on lines +404 to +408
# Send that invalidation to replication so that other workers also invalidate
# the event cache.
self._send_invalidation_to_replication(
    txn, "_get_event_cache", (event_id,)
)
Contributor Author

@MadLittleMods MadLittleMods Jan 23, 2026


To test this properly, I think this would be best served by some end-to-end Complement tests (WORKERS=1), but that means we need to introduce in-repo Complement tests to utilize the Synapse admin APIs.

Perhaps we can also accomplish a test with BaseMultiWorkerStreamTestCase 🤞 although that's not very satisfying.

(putting this up for review without a test to answer the other questions)

Contributor Author


It looks like we have some Sytest tests for purging history: matrix-org/sytest -> tests/48admin.pl#L91-L618

The After /purge_history users still get pushed for new messages test is known to be flaky and even failed in the CI for this PR.

Since Sytest is a nightmare to use and debug, I'd rather work on other tests than try to add a test there that stresses the correct things. It's also surprising that those tests haven't been failing (perhaps the cache is already being cleared somehow).

Member

@devonh devonh Jan 23, 2026


The tests passing may not be surprising anymore based on my latest knowledge outlined below: #19404 (comment)

@MadLittleMods MadLittleMods marked this pull request as ready for review January 23, 2026 02:49
@MadLittleMods MadLittleMods requested a review from a team as a code owner January 23, 2026 02:49

logger.info("[purge] done")

self._invalidate_caches_for_room_events_and_stream(txn, room_id)
Member


Hmm. Maybe it's not necessary to add a replication line above, because it is handled here.

The replication logic in Synapse Pro doesn't handle all streams yet, so I assumed at the time that we only needed to support the _get_event cache func.
It appears that Synapse relies on this purge-room-history invalidation (which uses the ph_cache_fake cache func name) going out over replication in order to invalidate the get_event cache.

I think my original assessment of needing another outgoing replication message is wrong. Instead, Synapse Pro just needs to add support for invalidating the get_event cache based on the purge room history cache func replication messages.

Digging down here:
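In code terms, the Pro-side handling could look roughly like the following hedged sketch (written in Python for illustration; the real consumer would live in the rust apps, and the handler names here are made up). It dispatches on the cache_func name carried by caches-stream rows, with ph_cache_fake mapped to a room-level get_event invalidation:

```python
# Toy replication consumer: dispatch on the cache_func name of a caches-stream row.
invalidated = []

def invalidate_event(keys):
    # Single-event invalidation, keyed on the event ID.
    invalidated.append(("event", keys[0]))

def invalidate_room_events(keys):
    # Purge-history invalidation: drop every cached event for the room.
    invalidated.append(("room", keys[0]))

CACHE_FUNC_HANDLERS = {
    "_get_event_cache": invalidate_event,
    # The purge path sends its invalidation under this fake cache name:
    "ph_cache_fake": invalidate_room_events,
}

def process_replication_row(cache_func, keys):
    handler = CACHE_FUNC_HANDLERS.get(cache_func)
    if handler is None:
        raise ValueError(f"unhandled cache_func: {cache_func}")
    handler(keys)

process_replication_row("ph_cache_fake", ("!room:example.com",))
print(invalidated)  # [('room', '!room:example.com')]
```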

Contributor Author


Really appreciate you diving into this 🙇

This feels like a very brittle system if we forever have to stay aligned with the various replication tasks that Synapse does. We're basically having to copy all of the process_replication_rows(...) logic into the separate Synapse Pro codebase. And it's a total cat-and-mouse game where a feature could ship in Synapse, we forget to patch the replication in the Synapse Pro rust apps, and there's no way to catch what's wrong.

We could slightly mitigate things by having a known list of cache_func names for the caches replication stream and blowing up in CI/testing when we see a new/unknown cache_func that we need to handle before shipping. This assumes proper test coverage of all features, including stressing the replication. And it doesn't help the case where a bug is fixed for an existing cache_func condition.
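A minimal sketch of that guard, assuming a hand-maintained allowlist (the names below are examples only, not an exhaustive inventory of Synapse's cache_func values):

```python
# Allowlist guard: fail loudly (e.g. in CI/tests) on any cache_func name
# that Synapse Pro doesn't yet know how to handle.
KNOWN_CACHE_FUNCS = {
    "_get_event_cache",
    "ph_cache_fake",
}

class UnknownCacheFuncError(Exception):
    pass

def check_cache_func(cache_func: str) -> None:
    if cache_func not in KNOWN_CACHE_FUNCS:
        raise UnknownCacheFuncError(
            f"Replication sent unknown cache_func {cache_func!r}; "
            "add handling before shipping."
        )

check_cache_func("_get_event_cache")  # known: passes silently

caught = False
try:
    check_cache_func("brand_new_cache")
except UnknownCacheFuncError:
    caught = True
print(caught)  # True
```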

Member


Indeed it is seeming more and more fragile the closer I look.
Perhaps this is one to discuss in our Monday sync to get more minds involved in the discussion.
