Skip to content

Commit df16019

Browse files
committed
fix: critical bugs found during storage audit

- Fix list/query methods returning compressed data without decompressing
- Fix AuditLog race condition: add write_lock mutex
- Fix AtomicCounter underflow: clamp to 0 on negative values
- Add integration tests for compression roundtrip
- Add backward compatibility test for pre-compression data
- Fix clippy warnings in platform-rpc and platform-storage
1 parent 760aaff commit df16019

File tree

3 files changed

+142
-12
lines changed

3 files changed

+142
-12
lines changed

crates/distributed-storage/src/audit.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
use serde::{Deserialize, Serialize};
66
use std::sync::Arc;
7+
use tokio::sync::Mutex;
78

89
use crate::error::{StorageError, StorageResult};
910
use crate::store::{DistributedStore, GetOptions, PutOptions, StorageKey};
@@ -113,6 +114,8 @@ pub struct AuditLog {
113114
storage: Arc<dyn DistributedStore>,
114115
/// Maximum entries per block (to limit storage size)
115116
max_entries_per_block: usize,
117+
/// Write lock to prevent race conditions on read-modify-write
118+
write_lock: Mutex<()>,
116119
}
117120

118121
impl AuditLog {
@@ -121,11 +124,14 @@ impl AuditLog {
121124
Self {
122125
storage,
123126
max_entries_per_block: 10_000,
127+
write_lock: Mutex::new(()),
124128
}
125129
}
126130

127131
/// Append an audit entry
128132
pub async fn append(&self, entry: AuditEntry) -> StorageResult<()> {
133+
let _guard = self.write_lock.lock().await;
134+
129135
// Store by block number
130136
let block_key = StorageKey::new("audit_block", format!("{:016x}", entry.block_number));
131137

crates/distributed-storage/src/index.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,10 @@ impl IndexManager {
281281
}
282282
}
283283

284-
/// Atomic counter for tracking counts
284+
/// Storage-backed counter.
285+
///
286+
/// NOTE: Not truly atomic under concurrent access -- uses read-modify-write
287+
/// without CAS. Safe for single-writer scenarios only.
285288
pub struct AtomicCounter {
286289
storage: Arc<dyn DistributedStore>,
287290
}
@@ -306,12 +309,13 @@ impl AtomicCounter {
306309
Ok(0)
307310
}
308311

309-
/// Increment a counter and return the new value
312+
/// Increment a counter and return the new value.
313+
/// Clamps to 0 on the low end to avoid underflow.
310314
pub async fn increment(&self, namespace: &str, name: &str, delta: i64) -> StorageResult<i64> {
311315
let key = StorageKey::new("counter", format!("{}:{}", namespace, name));
312316

313317
let current = self.get(namespace, name).await? as i64;
314-
let new_value = current.saturating_add(delta);
318+
let new_value = current.saturating_add(delta).max(0);
315319

316320
let data = (new_value as u64).to_le_bytes().to_vec();
317321
self.storage.put(key, data, PutOptions::default()).await?;

crates/distributed-storage/src/tracked.rs

Lines changed: 129 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,22 @@ impl TrackedStorage {
193193
hasher.finalize().into()
194194
}
195195

196+
/// Decompress all items in a list result
197+
fn decompress_list_result(&self, mut result: ListResult) -> StorageResult<ListResult> {
198+
for (_key, value) in &mut result.items {
199+
value.data = self.decompress(&value.data)?;
200+
}
201+
Ok(result)
202+
}
203+
204+
/// Decompress all items in a query result
205+
fn decompress_query_result(&self, mut result: QueryResult) -> StorageResult<QueryResult> {
206+
for (_key, value) in &mut result.items {
207+
value.data = self.decompress(&value.data)?;
208+
}
209+
Ok(result)
210+
}
211+
196212
/// Create an index for a namespace
197213
pub async fn create_index(
198214
&self,
@@ -377,9 +393,11 @@ impl DistributedStore for TrackedStorage {
377393
limit: usize,
378394
continuation_token: Option<&[u8]>,
379395
) -> StorageResult<ListResult> {
380-
self.inner
396+
let result = self
397+
.inner
381398
.list_prefix(namespace, prefix, limit, continuation_token)
382-
.await
399+
.await?;
400+
self.decompress_list_result(result)
383401
}
384402

385403
async fn stats(&self) -> StorageResult<StorageStats> {
@@ -392,9 +410,11 @@ impl DistributedStore for TrackedStorage {
392410
block_id: u64,
393411
limit: usize,
394412
) -> StorageResult<QueryResult> {
395-
self.inner
413+
let result = self
414+
.inner
396415
.list_before_block(namespace, block_id, limit)
397-
.await
416+
.await?;
417+
self.decompress_query_result(result)
398418
}
399419

400420
async fn list_after_block(
@@ -403,9 +423,11 @@ impl DistributedStore for TrackedStorage {
403423
block_id: u64,
404424
limit: usize,
405425
) -> StorageResult<QueryResult> {
406-
self.inner
426+
let result = self
427+
.inner
407428
.list_after_block(namespace, block_id, limit)
408-
.await
429+
.await?;
430+
self.decompress_query_result(result)
409431
}
410432

411433
async fn list_range(
@@ -415,17 +437,20 @@ impl DistributedStore for TrackedStorage {
415437
end_block: u64,
416438
limit: usize,
417439
) -> StorageResult<QueryResult> {
418-
self.inner
440+
let result = self
441+
.inner
419442
.list_range(namespace, start_block, end_block, limit)
420-
.await
443+
.await?;
444+
self.decompress_query_result(result)
421445
}
422446

423447
async fn count_by_namespace(&self, namespace: &str) -> StorageResult<u64> {
424448
self.inner.count_by_namespace(namespace).await
425449
}
426450

427451
async fn query(&self, query: QueryBuilder) -> StorageResult<QueryResult> {
428-
self.inner.query(query).await
452+
let result = self.inner.query(query).await?;
453+
self.decompress_query_result(result)
429454
}
430455

431456
async fn put_with_block(
@@ -481,4 +506,99 @@ mod tests {
481506
let hash2 = TrackedStorage::hash_data(data);
482507
assert_eq!(hash, hash2);
483508
}
509+
510+
#[tokio::test]
511+
async fn test_put_get_roundtrip_with_compression() {
512+
let config = TrackedStorageConfig {
513+
compression: CompressionMode::Lz4,
514+
compression_threshold: 10,
515+
enable_audit: true,
516+
enable_indexing: true,
517+
..Default::default()
518+
};
519+
520+
let storage = crate::local::LocalStorage::in_memory("test".to_string()).unwrap();
521+
let tracked = TrackedStorage::new(Arc::new(storage), config);
522+
tracked.set_block(100);
523+
524+
// Write large data (will be compressed)
525+
let key = StorageKey::new("test", "large-value");
526+
let data = vec![42u8; 500]; // 500 bytes of 42s
527+
tracked
528+
.put(key.clone(), data.clone(), PutOptions::default())
529+
.await
530+
.unwrap();
531+
532+
// Read back - should be decompressed
533+
let result = tracked
534+
.get(&key, GetOptions::default())
535+
.await
536+
.unwrap()
537+
.unwrap();
538+
assert_eq!(result.data, data);
539+
540+
// Verify stats
541+
let stats = tracked.tracked_stats().await;
542+
assert_eq!(stats.total_writes, 1);
543+
assert_eq!(stats.total_reads, 1);
544+
assert!(stats.compression_ratio < 1.0); // Data was compressed
545+
}
546+
547+
#[tokio::test]
548+
async fn test_list_prefix_decompresses() {
549+
let config = TrackedStorageConfig {
550+
compression: CompressionMode::Lz4,
551+
compression_threshold: 10,
552+
enable_audit: false,
553+
enable_indexing: false,
554+
..Default::default()
555+
};
556+
557+
let storage = crate::local::LocalStorage::in_memory("test".to_string()).unwrap();
558+
let tracked = TrackedStorage::new(Arc::new(storage), config);
559+
560+
// Write multiple values
561+
for i in 0..3 {
562+
let key = StorageKey::new("ns", format!("key-{}", i));
563+
let data = vec![i as u8; 500]; // Will be compressed
564+
tracked.put(key, data, PutOptions::default()).await.unwrap();
565+
}
566+
567+
// List prefix should return decompressed data
568+
let result = tracked.list_prefix("ns", None, 10, None).await.unwrap();
569+
assert_eq!(result.items.len(), 3);
570+
for (i, (_key, value)) in result.items.iter().enumerate() {
571+
assert_eq!(value.data.len(), 500);
572+
assert_eq!(value.data[0], i as u8);
573+
}
574+
}
575+
576+
#[tokio::test]
577+
async fn test_backward_compatibility_uncompressed() {
578+
let inner = crate::local::LocalStorage::in_memory("test".to_string()).unwrap();
579+
let inner_arc = Arc::new(inner);
580+
581+
// Write directly to inner (uncompressed, simulating old data)
582+
let key = StorageKey::new("test", "old-data");
583+
let data = b"this is old uncompressed data that was stored before compression".to_vec();
584+
inner_arc
585+
.put(key.clone(), data.clone(), PutOptions::default())
586+
.await
587+
.unwrap();
588+
589+
// Now wrap with TrackedStorage and read
590+
let config = TrackedStorageConfig {
591+
compression: CompressionMode::Lz4,
592+
compression_threshold: 10,
593+
..Default::default()
594+
};
595+
let tracked = TrackedStorage::new(inner_arc, config);
596+
597+
let result = tracked
598+
.get(&key, GetOptions::default())
599+
.await
600+
.unwrap()
601+
.unwrap();
602+
assert_eq!(result.data, data); // Should read old data unchanged
603+
}
484604
}

0 commit comments

Comments (0)