Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ lazy_static = "1.5.0"
anyhow = "1.0.100"
parking_lot = { version = "0.12.4", features = ["arc_lock", "send_guard"] }
async-trait = "0.1.77"
futures-util = "0.3.31"
4 changes: 2 additions & 2 deletions docs/deduplication.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,13 @@ File-level locking ensures consistency during concurrent operations:
// - File coordination: tokio::sync::RwLock (async, for I/O)

// PUT/DELETE operations
let lock = locks.prepare_lock(file_lock(bucket, path));
let lock = locks.prepare_lock(file_lock(bucket, path)).await;
let _guard = lock.acquire_exclusive().await;
// ... perform write operation ...
// Guard drops, lock released

// GET operations
let lock = locks.prepare_lock(file_lock(bucket, path));
let lock = locks.prepare_lock(file_lock(bucket, path)).await;
let _guard = lock.acquire_shared().await;
// ... perform read operation ...
// Guard drops, lock released
Expand Down
51 changes: 30 additions & 21 deletions src/locks/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::locks::{ExclusiveLockGuard, Lock, LockStorage, SharedLockGuard};
use async_trait::async_trait;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{RwLock as TokioRwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::task::spawn_blocking;

// HashMap management: parking_lot (sync, fast, held briefly)
type LockMap = Arc<parking_lot::RwLock<HashMap<String, Arc<TokioRwLock<()>>>>>;
Expand Down Expand Up @@ -45,24 +46,32 @@ impl<'a> Drop for LockedKey<'a> {
}

impl MemoryLocks {
fn get_or_create_lock(&self, key: String) -> Arc<TokioRwLock<()>> {
let mut locks = self.locks.write();
locks
.entry(key)
.or_insert_with(|| Arc::new(TokioRwLock::new(())))
.clone()
async fn get_or_create_lock(&self, key: String) -> Arc<TokioRwLock<()>> {
let locks = self.locks.clone();
// Because `.write()` returns guard with reference to a locked `RwLock`, we need to block on whole body.
// `tokio::task::block_in_place()` could be different approach, blocking only `self.locks.write()`, without needing to `.clone()` locks or worrying about lifetimes.
spawn_blocking(move || {
let mut locks = locks.write();
locks
.entry(key)
.or_insert_with(|| Arc::new(TokioRwLock::new(())))
.clone()
})
.await
.expect("`parking_lot::RwLock::write()` panicked")
}
}

#[async_trait]
impl LockStorage for MemoryLocks {
fn new() -> Box<Self> {
Box::new(Self {
locks: Arc::new(parking_lot::RwLock::new(HashMap::new())),
})
}

fn prepare_lock<'a>(&'a self, key: String) -> Box<dyn Lock + 'a + Send> {
let lock = self.get_or_create_lock(key.clone());
async fn prepare_lock<'a>(&'a self, key: String) -> Box<dyn Lock + 'a + Send> {
let lock = self.get_or_create_lock(key.clone()).await;
Box::new(LockedKey {
lock,
parent: self,
Expand All @@ -80,7 +89,7 @@ mod tests {
#[tokio::test]
async fn assert_locks_compile() {
let memory = MemoryLocks::new();
let lock = memory.prepare_lock("1".into());
let lock = memory.prepare_lock("1".into()).await;
let _guard = lock.acquire_exclusive().await;
}

Expand All @@ -93,7 +102,7 @@ mod tests {
let memory = memory.clone();
let tx = tx.clone();
tokio::spawn(async move {
let lock = memory.prepare_lock("key1".into());
let lock = memory.prepare_lock("key1".into()).await;
let _guard = lock.acquire_shared().await;
tx.send(format!("acquired_{}", i)).await.unwrap();
sleep(Duration::from_millis(50)).await;
Expand Down Expand Up @@ -121,7 +130,7 @@ mod tests {
let memory1 = memory.clone();
let tx1 = tx.clone();
tokio::spawn(async move {
let lock = memory1.prepare_lock("key1".into());
let lock = memory1.prepare_lock("key1".into()).await;
let _guard = lock.acquire_exclusive().await;
tx1.send("task1_acquired").await.unwrap();
sleep(Duration::from_millis(100)).await;
Expand All @@ -133,7 +142,7 @@ mod tests {
let memory2 = memory.clone();
let tx2 = tx.clone();
tokio::spawn(async move {
let lock = memory2.prepare_lock("key1".into());
let lock = memory2.prepare_lock("key1".into()).await;
let _guard = lock.acquire_exclusive().await;
tx2.send("task2_acquired").await.unwrap();
});
Expand Down Expand Up @@ -161,8 +170,8 @@ mod tests {
let memory1 = memory.clone();
let tx1 = tx.clone();
tokio::spawn(async move {
let lock = memory1.prepare_lock("key1".into());
let _guard = lock.acquire_shared();
let lock = memory1.prepare_lock("key1".into()).await;
let _guard = lock.acquire_shared().await;
tx1.send("shared_acquired").await.unwrap();
sleep(Duration::from_millis(100)).await;
tx1.send("shared_released").await.unwrap();
Expand All @@ -173,7 +182,7 @@ mod tests {
let memory2 = memory.clone();
let tx2 = tx.clone();
tokio::spawn(async move {
let lock = memory2.prepare_lock("key1".into());
let lock = memory2.prepare_lock("key1".into()).await;
let _guard = lock.acquire_exclusive().await;
tx2.send("exclusive_acquired").await.unwrap();
});
Expand All @@ -198,7 +207,7 @@ mod tests {
let memory = Arc::new(*MemoryLocks::new());

{
let lock = memory.prepare_lock("cleanup_key".into());
let lock = memory.prepare_lock("cleanup_key".into()).await;
let _guard = lock.acquire_exclusive().await;
}

Expand All @@ -219,7 +228,7 @@ mod tests {
let memory1 = memory.clone();
let tx1 = tx.clone();
tokio::spawn(async move {
let lock = memory1.prepare_lock("key1".into());
let lock = memory1.prepare_lock("key1".into()).await;
let _guard = lock.acquire_exclusive().await;
tx1.send("key1_acquired").await.unwrap();
sleep(Duration::from_millis(100)).await;
Expand All @@ -231,7 +240,7 @@ mod tests {
let memory2 = memory.clone();
let tx2 = tx.clone();
tokio::spawn(async move {
let lock = memory2.prepare_lock("key2".into());
let lock = memory2.prepare_lock("key2".into()).await;
let _guard = lock.acquire_exclusive().await;
tx2.send("key2_acquired").await.unwrap();
});
Expand All @@ -255,7 +264,7 @@ mod tests {
let memory1 = memory.clone();
let tx1 = tx.clone();
tokio::spawn(async move {
let lock = memory1.prepare_lock("shared_key".into());
let lock = memory1.prepare_lock("shared_key".into()).await;
let _guard = lock.acquire_exclusive().await;
tx1.send("task1_acquired").await.unwrap();
sleep(Duration::from_millis(50)).await;
Expand All @@ -267,7 +276,7 @@ mod tests {
let memory2 = memory.clone();
let tx2 = tx.clone();
tokio::spawn(async move {
let lock = memory2.prepare_lock("shared_key".into());
let lock = memory2.prepare_lock("shared_key".into()).await;
let _guard = lock.acquire_exclusive().await;
tx2.send("task2_acquired").await.unwrap();
sleep(Duration::from_millis(100)).await;
Expand All @@ -279,7 +288,7 @@ mod tests {
let memory3 = memory.clone();
let tx3 = tx.clone();
tokio::spawn(async move {
let lock = memory3.prepare_lock("shared_key".into());
let lock = memory3.prepare_lock("shared_key".into()).await;
let _guard = lock.acquire_exclusive().await;
tx3.send("task3_acquired").await.unwrap();
});
Expand Down
7 changes: 4 additions & 3 deletions src/locks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ pub(crate) trait Lock {
async fn acquire_exclusive<'a>(&'a self) -> Box<dyn ExclusiveLockGuard<'a> + Send + 'a>;
}

#[async_trait]
pub(crate) trait LockStorage {
fn new() -> Box<Self>;

fn prepare_lock<'a>(&'a self, key: String) -> Box<dyn Lock + 'a + Send>;
async fn prepare_lock<'a>(&'a self, key: String) -> Box<dyn Lock + 'a + Send>;
}

#[allow(private_interfaces)]
Expand All @@ -59,9 +60,9 @@ impl LocksStorage {
}
}

pub(crate) fn prepare_lock<'a>(&'a self, key: String) -> Box<dyn Lock + 'a + Send> {
pub(crate) async fn prepare_lock<'a>(&'a self, key: String) -> Box<dyn Lock + 'a + Send> {
match self {
LocksStorage::Memory(memory_locks) => memory_locks.prepare_lock(key),
LocksStorage::Memory(memory_locks) => memory_locks.prepare_lock(key).await,
}
}
}
11 changes: 5 additions & 6 deletions src/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::AppState;
use crate::filetracker_client::{FileMetadata, FiletrackerClient};
use crate::routes::ft::storage_helpers;
use anyhow::Result;
use futures_util::future::join_all;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tracing::{error, info, warn};
Expand Down Expand Up @@ -104,10 +105,8 @@ pub async fn migrate_all_files(
}

// Wait for this batch to complete before moving to next batch
// TODO: futures_util::join_all;
for handle in handles {
let _ = handle.await;
}
// TODO: Propagate errors? (Tokio returns `Err`, when thread from `JoinHandle` panicked).
let _ = join_all(handles).await;
}

let migrated_count = *migrated.lock().await;
Expand Down Expand Up @@ -163,7 +162,7 @@ pub async fn migrate_single_file_from_metadata(
// Acquire file lock
let lock_key = crate::locks::file_lock(&app_state.bucket_name, path);
let locks = &app_state.locks;
let lock = locks.prepare_lock(lock_key);
let lock = locks.prepare_lock(lock_key).await;
let _guard = lock.acquire_exclusive().await;

// Recheck if file was already migrated after acquiring lock (race condition protection)
Expand Down Expand Up @@ -306,7 +305,7 @@ async fn migrate_single_file(
// Acquire file lock
let lock_key = crate::locks::file_lock(&app_state.bucket_name, path);
let locks_storage = &app_state.locks;
let lock = locks_storage.prepare_lock(lock_key);
let lock = locks_storage.prepare_lock(lock_key).await;
let _guard = lock.acquire_exclusive().await;

// Recheck if file was already migrated after acquiring lock (race condition protection)
Expand Down
2 changes: 1 addition & 1 deletion src/routes/ft/delete_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub async fn ft_delete_file(
// 2. Acquire file lock (exclusive for write operation)
let lock_key = locks::file_lock(&state.bucket_name, path);
let locks_storage = &state.locks;
let lock = locks_storage.prepare_lock(lock_key);
let lock = locks_storage.prepare_lock(lock_key).await;
let _guard = lock.acquire_exclusive().await;

// 3. Check if file exists
Expand Down
2 changes: 1 addition & 1 deletion src/routes/ft/get_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub async fn ft_get_file(
// 1. Acquire file lock (shared lock for read operation)
let lock_key = locks::file_lock(&state.bucket_name, path);
let locks_storage = &state.locks;
let lock = locks_storage.prepare_lock(lock_key);
let lock = locks_storage.prepare_lock(lock_key).await;
let guard = lock.acquire_shared().await;

// 2. Check if file exists and get metadata
Expand Down
2 changes: 1 addition & 1 deletion src/routes/ft/put_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn ft_put_file(
// 4. Acquire file lock
let lock_key = locks::file_lock(&state.bucket_name, path);
let locks_storage = &state.locks;
let lock = locks_storage.prepare_lock(lock_key);
let lock = locks_storage.prepare_lock(lock_key).await;
let _guard = lock.acquire_exclusive().await;

// 5. Check existing version (matching original logic)
Expand Down