Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
cb2f89d
Add base redis cche manager
johnewart Feb 23, 2026
990dfde
Add pipelining to index operations for atomicity
johnewart Feb 23, 2026
60bd3ff
Add tests and make index operations pipelined
johnewart Feb 23, 2026
2231959
Formatting fixes
johnewart Feb 23, 2026
f9bb52e
Cleanup tests
johnewart Feb 23, 2026
64a5f7f
Add tests for index operations
johnewart Feb 24, 2026
901effd
Update src/fides/common/cache/manager.py
johnewart Feb 24, 2026
2d3d6e6
Add TTL management (opt-in) to manager
johnewart Feb 24, 2026
bbd2926
Add new tests dir to noxfiles
johnewart Feb 24, 2026
fe7bd8b
Formatting
johnewart Feb 24, 2026
b1a43cc
Add changelog
johnewart Feb 24, 2026
42495fa
Part 2 of ENG-740
johnewart Feb 11, 2026
34b86a2
Add DSR cache store + tests
johnewart Feb 23, 2026
1e953d0
Cleanup tests
johnewart Feb 11, 2026
5a80d71
Add DSR store to exports
johnewart Feb 23, 2026
9ed6d6e
Formatting
johnewart Feb 23, 2026
9365451
Formatting fixes
johnewart Feb 23, 2026
932774f
Part 3: Add identity operations to DSR cache store; added tests for t…
johnewart Feb 11, 2026
48c0bb7
Dedupe import and add a TODO about moving it
johnewart Feb 24, 2026
fb37da5
Simplify some tests
johnewart Feb 11, 2026
2bf58e0
Part 4: custom fields and encryption
johnewart Feb 11, 2026
6f938c4
DRP request body caching updates
johnewart Feb 18, 2026
ebfd156
Update tests/common/cache/test_dsr_store.py
johnewart Feb 25, 2026
9a8bfd1
Merge branch 'main' into johnewart/ENG-740/1-dsr-cache-storage
johnewart Feb 25, 2026
36af735
Add DSR cache store + tests
johnewart Feb 23, 2026
0181bda
Add DSR store to exports
johnewart Feb 23, 2026
90ec2c1
Formatting fixes
johnewart Feb 23, 2026
c0536fb
Dedupe import and add a TODO about moving it
johnewart Feb 24, 2026
864e94e
Part 2 of ENG-740
johnewart Feb 11, 2026
f9450f5
Cleanup tests
johnewart Feb 11, 2026
0fefc95
Formatting
johnewart Feb 23, 2026
bada4fb
Style fix
johnewart Feb 24, 2026
94be49b
Add missing commit from PR split
johnewart Feb 24, 2026
c0b5aee
Fix mypy
johnewart Feb 25, 2026
c5b6886
Merge branch 'main' into johnewart/ENG-740/1-dsr-cache-storage
johnewart Mar 19, 2026
db882eb
Fix merge error
johnewart Mar 19, 2026
6965fb3
Fix another merge issue
johnewart Mar 19, 2026
1a3fdf6
Add missing mock pipeline
johnewart Mar 19, 2026
6a78e01
Merge branch 'johnewart/ENG-740/2-key-clearing-logic' into johnewart/…
johnewart Mar 19, 2026
ecf9493
Merge branch 'johnewart/ENG-740/3-identity-caching' into johnewart/re…
johnewart Mar 19, 2026
a980a18
Merge branch 'johnewart/ENG-740/4-custom-fields-and-encryption' into …
johnewart Mar 19, 2026
d0101cb
Merge branch 'johnewart/ENG-740/5-cache-drp-request-body' into johnew…
johnewart Mar 19, 2026
368d217
Minor fixes
johnewart Mar 19, 2026
06fd04c
Merge branch 'main' into johnewart/redis-dsr-cache-integration
johnewart Mar 19, 2026
1dde1de
Fix some merge-introduced issues
johnewart Mar 19, 2026
2551f78
Fix some typing issues
johnewart Mar 20, 2026
21b88cd
Update some tests, add some very specific additional tests to simulat…
johnewart Mar 20, 2026
005017f
Formatting fixes
johnewart Mar 20, 2026
c56d7aa
Add changelog
johnewart Mar 20, 2026
bb75535
Fix 3 TLA+-confirmed concurrency bugs in DSRCacheStore, replace hand-…
Mar 20, 2026
d3445a0
Review cleanup: deduplicate DSRCacheStore, remove context manager cer…
Mar 20, 2026
9cb04d5
Address review feedback: fix hyphenated field extraction, remove dead…
Mar 23, 2026
70f4dc3
Merge branch 'main' into johnewart/redis-dsr-cache-integration
JadeCara Mar 23, 2026
385290e
Migrate async_execution cache path to DSRCacheStore
Mar 23, 2026
7ca932b
Address PR review feedback from code review
Mar 23, 2026
1d2b450
Fix test failures from async_execution migration
Mar 24, 2026
bf48363
Address PR review feedback: DSRCacheStore API improvements
Mar 24, 2026
5616747
Merge branch 'main' into johnewart/redis-dsr-cache-integration
JadeCara Mar 24, 2026
5df038a
Fix test failures from DSRCacheStore API refactor
Mar 25, 2026
f52434d
Merge branch 'main' into johnewart/redis-dsr-cache-integration
JadeCara Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/7708-adopt-structured-cache-for-dsrs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type: Changed
description: Migrated DSR workflows to use structured caching mechanism with secondary index, ensuring backward compatibility with legacy cache keys for in-flight requests during deployment.
pr: 7708
labels: []
7 changes: 0 additions & 7 deletions src/fides/api/models/privacy_request/consent.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
CustomPrivacyRequestField as CustomPrivacyRequestFieldSchema,
)
from fides.api.schemas.redis_cache import IdentityBase
from fides.api.util.cache import FidesopsRedis, get_cache
from fides.api.util.identity_verification import IdentityVerificationMixin
from fides.config import CONFIG

Expand Down Expand Up @@ -95,12 +94,6 @@ class ConsentRequest(IdentityVerificationMixin, Base):
privacy_request_id = Column(String, ForeignKey("privacyrequest.id"), nullable=True)
privacy_request = relationship("PrivacyRequest")

def get_cached_identity_data(self) -> Dict[str, Any]:
"""Retrieves any identity data pertaining to this request from the cache."""
cache: FidesopsRedis = get_cache()
keys = cache.get_keys_by_prefix(f"id-{self.id}-identity-")
return {key.split("-")[-1]: cache.get(key) for key in keys}

def verify_identity(
self,
db: Session,
Expand Down
171 changes: 87 additions & 84 deletions src/fides/api/models/privacy_request/privacy_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,8 @@
from fides.api.tasks import celery_app
from fides.api.util.cache import (
FidesopsRedis,
get_all_cache_keys_for_privacy_request,
get_async_task_tracking_cache_key,
get_cache,
get_custom_privacy_request_field_cache_key,
get_drp_request_body_cache_key,
get_encryption_cache_key,
get_identity_cache_key,
get_dsr_cache_store,
)
from fides.api.util.collection_util import Row
from fides.api.util.constants import API_DATE_FORMAT
Expand Down Expand Up @@ -471,10 +466,8 @@ def clear_cached_values(self) -> None:
Clears all cached values associated with this privacy request from Redis.
"""
logger.info(f"Clearing cached values for privacy request {self.id}")
cache: FidesopsRedis = get_cache()
all_keys = get_all_cache_keys_for_privacy_request(privacy_request_id=self.id)
for key in all_keys:
cache.delete(key)
store = get_dsr_cache_store(self.id)
store.clear()

def delete(self, db: Session) -> None:
"""
Expand Down Expand Up @@ -506,19 +499,22 @@ def cache_identity(
self, identity: Union[Identity, Dict[str, LabeledIdentity]]
) -> None:
"""Sets the identity's values at their specific locations in the Fides app cache"""
cache: FidesopsRedis = get_cache()

if isinstance(identity, dict):
identity = Identity(**identity)

identity_dict: Dict[str, Any] = identity.labeled_dict()

for key, value in identity_dict.items():
if value is not None:
cache.set_with_autoexpire(
get_identity_cache_key(self.id, key),
FidesopsRedis.encode_obj(value),
)
store = get_dsr_cache_store(self.id)
# Encode values for Redis storage
encoded_dict = {
key: FidesopsRedis.encode_obj(value)
for key, value in identity_dict.items()
if value is not None
}
store.cache_identity_data(
encoded_dict,
expire_seconds=CONFIG.redis.default_ttl_seconds,
)

def cache_custom_privacy_request_fields(
self,
Expand All @@ -534,13 +530,17 @@ def cache_custom_privacy_request_fields(
return

if CONFIG.execution.allow_custom_privacy_request_fields_in_request_execution:
cache: FidesopsRedis = get_cache()
for key, item in custom_privacy_request_fields.items():
if item is not None:
cache.set_with_autoexpire(
get_custom_privacy_request_field_cache_key(self.id, key),
json.dumps(item.value, cls=CustomJSONEncoder),
)
store = get_dsr_cache_store(self.id)
# Encode values for Redis storage
encoded_fields = {
key: json.dumps(item.value, cls=CustomJSONEncoder)
for key, item in custom_privacy_request_fields.items()
if item is not None
}
store.cache_custom_fields(
encoded_fields,
expire_seconds=CONFIG.redis.default_ttl_seconds,
)
else:
logger.info(
"Custom fields from privacy request {}, but config setting 'CONFIG.execution.allow_custom_privacy_request_fields_in_request_execution' is set to false and prevents their usage.",
Expand Down Expand Up @@ -681,14 +681,20 @@ def verify_identity(self, db: Session, provided_code: str) -> "PrivacyRequest":

def get_cached_encryption_key(self) -> Optional[str]:
"""Gets the cached encryption key for this privacy request."""
cache: FidesopsRedis = get_cache()
encryption_key = cache.get(get_encryption_cache_key(self.id, "key"))
return encryption_key
store = get_dsr_cache_store(self.id)
raw = store.get_encryption("key")
if raw is None:
return None
if isinstance(raw, bytes):
return raw.decode(CONFIG.security.encoding)
return str(raw)

def get_cached_task_id(self) -> Optional[str]:
"""Gets the cached task ID for this privacy request."""
cache: FidesopsRedis = get_cache()
task_id = cache.get(get_async_task_tracking_cache_key(self.id))
store = get_dsr_cache_store(self.id)
task_id = store.get_async_execution()
if isinstance(task_id, bytes):
return task_id.decode(CONFIG.security.encoding)
return task_id

def get_async_execution_task(self) -> Optional[AsyncResult]:
Expand All @@ -698,32 +704,35 @@ def get_async_execution_task(self) -> Optional[AsyncResult]:
return res

def cache_drp_request_body(self, drp_request_body: DrpPrivacyRequestCreate) -> None:
"""Sets the identity's values at their specific locations in the Fides app cache"""
cache: FidesopsRedis = get_cache()
"""Sets the DRP request body values at their specific locations in the Fides app cache"""
drp_request_body_dict: Dict[str, Any] = dict(drp_request_body)

# Serialize complex objects to repr format for storage
serialized_body: Dict[str, Any] = {}
for key, value in drp_request_body_dict.items():
if value is not None:
# handle nested dict/objects
# Handle nested dict/objects
if not isinstance(value, (bytes, str, int, float)):
cache.set_with_autoexpire(
get_drp_request_body_cache_key(self.id, key),
repr(value),
)
serialized_body[key] = repr(value)
else:
cache.set_with_autoexpire(
get_drp_request_body_cache_key(self.id, key),
value,
)
serialized_body[key] = value

store = get_dsr_cache_store(self.id)
store.cache_drp_request_body(
serialized_body,
expire_seconds=CONFIG.redis.default_ttl_seconds,
)

def cache_encryption(self, encryption_key: Optional[str] = None) -> None:
"""Sets the encryption key in the Fides app cache if provided"""
if not encryption_key:
return

cache: FidesopsRedis = get_cache()
cache.set_with_autoexpire(
get_encryption_cache_key(self.id, "key"),
store = get_dsr_cache_store(self.id)
store.write_encryption(
"key",
encryption_key,
expire_seconds=CONFIG.redis.default_ttl_seconds,
)

def persist_masking_secrets(
Expand All @@ -745,53 +754,43 @@ def persist_masking_secrets(
},
)

def identity_prefix_cache_and_keys(self) -> Tuple[str, FidesopsRedis, List[str]]:
"""Returns the prefix and cache keys for the identity data for this request"""
prefix = f"id-{self.id}-identity-*"
cache: FidesopsRedis = get_cache()
keys = cache.get_keys_by_prefix(f"id-{self.id}-identity-")
return prefix, cache, keys

def verify_cache_for_identity_data(self) -> bool:
"""Verifies if the identity data is cached for this request"""
_, _, keys = self.identity_prefix_cache_and_keys()
return len(keys) > 0
store = get_dsr_cache_store(self.id)
return store.has_cached_identity_data()

def get_cached_identity_data(self) -> Dict[str, Any]:
"""Retrieves any identity data pertaining to this request from the cache"""
result: Dict[str, Any] = {}
prefix, cache, keys = self.identity_prefix_cache_and_keys()
store = get_dsr_cache_store(self.id)
result = store.get_cached_identity_data()

if not keys:
if not result:
logger.debug(f"Cache miss for request {self.id}, falling back to DB")
identity = self.get_persisted_identity()
self.cache_identity(identity)
keys = cache.get_keys_by_prefix(f"id-{self.id}-identity-")

for key in keys:
value = cache.get(key)
if value:
try:
# try parsing the value as JSON
parsed_value = json.loads(value)
except json.JSONDecodeError:
# if parsing as JSON fails, assume it's a string.
# this is purely for backward compatibility: to ensure
# that identity data stored pre-2.34.0 in the "old" format
# can still be correctly retrieved from the cache.
parsed_value = value
result[key.split("-")[-1]] = parsed_value
return result
result = store.get_cached_identity_data()

# Parse JSON values for backward compatibility
parsed_result: Dict[str, Any] = {}
for key, value in result.items():
try:
# try parsing the value as JSON
parsed_result[key] = json.loads(value)
except json.JSONDecodeError:
# if parsing as JSON fails, assume it's a string.
# this is purely for backward compatibility: to ensure
# that identity data stored pre-2.34.0 in the "old" format
# can still be correctly retrieved from the cache.
parsed_result[key] = value

return parsed_result

def get_cached_custom_privacy_request_fields(self) -> Dict[str, Any]:
"""Retrieves any custom fields pertaining to this request from the cache"""
result: Dict[str, Any] = {}
prefix = f"id-{self.id}-custom-privacy-request-field-"
store = get_dsr_cache_store(self.id)
result = store.get_cached_custom_fields()

cache: FidesopsRedis = get_cache()
keys = cache.get_keys_by_prefix(prefix)

if not keys:
if not result:
logger.debug(f"Cache miss for request {self.id}, falling back to DB")
custom_privacy_request_fields = (
self.get_persisted_custom_privacy_request_fields()
Expand All @@ -802,13 +801,17 @@ def get_cached_custom_privacy_request_fields(self) -> Dict[str, Any]:
for key, value in custom_privacy_request_fields.items()
}
)
keys = cache.get_keys_by_prefix(prefix)
result = store.get_cached_custom_fields()

# Parse JSON values
parsed_result: Dict[str, Any] = {}
for key, value in result.items():
try:
parsed_result[key] = json.loads(value)
except json.JSONDecodeError:
parsed_result[key] = value

for key in keys:
value = cache.get(key)
if value:
result[key.split("-")[-1]] = json.loads(value)
return result
return parsed_result

def cache_email_connector_template_contents(
self,
Expand Down
10 changes: 5 additions & 5 deletions src/fides/api/models/privacy_request/request_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
from fides.api.schemas.base_class import FidesSchema
from fides.api.schemas.policy import ActionType
from fides.api.util.cache import (
FidesopsRedis,
celery_tasks_in_flight,
get_async_task_tracking_cache_key,
get_cache,
get_dsr_cache_store,
)
from fides.api.util.collection_util import Row
from fides.config import CONFIG
Expand Down Expand Up @@ -247,8 +245,10 @@ def allowed_action_types(cls) -> List[str]:

def get_cached_task_id(self) -> Optional[str]:
"""Gets the cached celery task ID for this request task."""
cache: FidesopsRedis = get_cache()
task_id = cache.get(get_async_task_tracking_cache_key(self.id))
store = get_dsr_cache_store(self.id)
task_id = store.get_async_execution()
if isinstance(task_id, bytes):
return task_id.decode(CONFIG.security.encoding)
return task_id

def cleanup_external_storage(self) -> None:
Expand Down
8 changes: 5 additions & 3 deletions src/fides/api/service/privacy_request/request_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
from fides.api.util.cache import (
FidesopsRedis,
celery_tasks_in_flight,
get_async_task_tracking_cache_key,
get_cache,
get_dsr_cache_store,
get_privacy_request_retry_count,
increment_privacy_request_retry_count,
reset_privacy_request_retry_count,
Expand Down Expand Up @@ -336,9 +336,11 @@ def get_cached_task_id(entity_id: str) -> Optional[str]:

Raises Exception if cache operations fail, allowing callers to handle cache failures appropriately.
"""
cache: FidesopsRedis = get_cache()
try:
task_id = cache.get(get_async_task_tracking_cache_key(entity_id))
store = get_dsr_cache_store(entity_id)
task_id = store.get_async_execution()
if isinstance(task_id, bytes):
return task_id.decode(CONFIG.security.encoding)
return task_id
except Exception as exc:
logger.error(f"Failed to get cached task ID for entity {entity_id}: {exc}")
Expand Down
16 changes: 9 additions & 7 deletions src/fides/api/tasks/encryption_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Optional, Union

from fides.api.cryptography.cryptographic_util import bytes_to_b64_str
from fides.api.util.cache import get_cache, get_encryption_cache_key
from fides.api.util.cache import get_dsr_cache_store
from fides.api.util.encryption.aes_gcm_encryption_scheme import (
encrypt_to_bytes_verify_secrets_length,
)
Expand All @@ -19,15 +19,17 @@ def encrypt_access_request_results(data: Union[str, bytes], request_id: str) ->
Returns:
str: The encrypted data as a string
"""
cache = get_cache()
encryption_cache_key = get_encryption_cache_key(
privacy_request_id=request_id,
encryption_attr="key",
)
if isinstance(data, bytes):
data = data.decode(CONFIG.security.encoding)

encryption_key: Optional[str] = cache.get(encryption_cache_key)
store = get_dsr_cache_store(request_id)
raw = store.get_encryption("key")
if raw is None:
return data
if isinstance(raw, bytes):
encryption_key = raw.decode(CONFIG.security.encoding)
else:
encryption_key = str(raw)
if not encryption_key:
return data

Expand Down
Loading
Loading