From fbb613152666ba3ef20f9373404c567e3894ae6e Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 02:05:49 +0800 Subject: [PATCH 01/13] chore: update tokio to 1.37 --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 529e06f..e17b587 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,11 @@ include = ["**/*.rs", "Cargo.toml"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "0.2", features = ["fs", "blocking", "rt-core"] } +tokio = { version = "1.37", features = ["fs"] } fs3 = "0.5.0" futures-lite = "1.11.3" [dev-dependencies] -tokio = { version = "0.2", features = ["macros"] } +tokio = { version = "1.37", features = ["macros"] } fork = "0.1.18" tempfile = "3.2.0" From 9644653e046ab4545af10f9373af4e26b629f240 Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 02:23:09 +0800 Subject: [PATCH 02/13] feat: align definition with tokio 1.37 --- Cargo.toml | 7 ++- src/lib.rs | 104 ++++++++++++++++++++------------------- tests/async_file_lock.rs | 51 ++++++++++--------- 3 files changed, 84 insertions(+), 78 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e17b587..2f73843 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,12 @@ include = ["**/*.rs", "Cargo.toml"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.37", features = ["fs"] } +tokio = { version = "1.37", features = [ + "fs", + "rt", + "rt-multi-thread", + "io-util", +] } fs3 = "0.5.0" futures-lite = "1.11.3" diff --git a/src/lib.rs b/src/lib.rs index f7543b2..b20e036 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,19 @@ #![deny(unused_must_use)] // #![cfg_attr(test, feature(test))] -use std::task::{Context, Poll}; -use std::pin::Pin; -use std::fmt::Formatter; +use futures_lite::{ready, FutureExt}; use std::fmt::Debug; +use std::fmt::Formatter; use std::future::Future; -use std::io::{Error, Result, SeekFrom, Seek}; +use std::io::{Error, Result, Seek, SeekFrom}; +use std::mem::MaybeUninit; +use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite}; use tokio::task::{spawn_blocking, JoinHandle}; -use futures_lite::{ready, FutureExt}; -use fs3::FileExt; -use std::path::Path; -use std::mem::MaybeUninit; +use {fs3::FileExt, tokio::io::ReadBuf}; /// Locks a file asynchronously. /// Auto locks a file if any read or write methods are called. If [Self::lock_exclusive] @@ -40,7 +40,13 @@ impl FileLock { /// Opens a file in read and write mode that is unlocked. // This function will create a file if it does not exist, and will truncate it if it does. pub async fn create(path: impl AsRef) -> Result { - let file = OpenOptions::new().write(true).read(true).create(true).truncate(true).open(path).await?; + let file = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .truncate(true) + .open(path) + .await?; Ok(FileLock::new_tokio(file).await) } @@ -61,7 +67,7 @@ impl FileLock { result: None, locking_fut: None, unlocking_fut: None, - seek_fut: None + seek_fut: None, } } @@ -76,7 +82,7 @@ impl FileLock { result: None, locking_fut: None, unlocking_fut: None, - seek_fut: None + seek_fut: None, } } @@ -96,11 +102,14 @@ impl FileLock { panic!("File already locked."); } self.is_manually_locked = true; - self.unlocked_file.as_mut().unwrap().try_lock_exclusive().map(|_| { - self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap())); - self.state = State::Locked; - - }) + self.unlocked_file + .as_mut() + .unwrap() + .try_lock_exclusive() + .map(|_| { + self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap())); + self.state = State::Locked; + }) } /// Locks the file for reading until [`Self::unlock`] is called. @@ -119,11 +128,14 @@ impl FileLock { panic!("File already locked."); } self.is_manually_locked = true; - self.unlocked_file.as_mut().unwrap().try_lock_shared().map(|_| { - self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap())); - self.state = State::Locked; - - }) + self.unlocked_file + .as_mut() + .unwrap() + .try_lock_shared() + .map(|_| { + self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap())); + self.state = State::Locked; + }) } /// Unlocks the file. @@ -173,9 +185,7 @@ impl FileLock { return file.sync_all().await; } let file = self.unlocked_file.take().unwrap(); - let (result, file) = spawn_blocking(|| { - (file.sync_all(), file) - }).await.unwrap(); + let (result, file) = spawn_blocking(|| (file.sync_all(), file)).await.unwrap(); self.unlocked_file = Some(file); result } @@ -212,9 +222,7 @@ impl FileLock { return file.sync_data().await; } let file = self.unlocked_file.take().unwrap(); - let (result, file) = spawn_blocking(|| { - (file.sync_data(), file) - }).await.unwrap(); + let (result, file) = spawn_blocking(|| (file.sync_data(), file)).await.unwrap(); self.unlocked_file = Some(file); result } @@ -316,7 +324,7 @@ macro_rules! poll_loop { SeekFrom::Current(0) => $self.state = State::Working, _ => { let mode = $self.mode; - $self.as_mut().start_seek($cx, mode); + $self.as_mut().start_seek(mode); $self.state = State::Seeking; } }, @@ -324,7 +332,7 @@ macro_rules! poll_loop { // println!("working"); $working // println!("worked"); - }, + } State::Locking => { if let Err(e) = ready!($self.$lock($cx)) { return Poll::Ready(Err(e)); @@ -403,17 +411,19 @@ impl AsyncWrite for FileLock { } } -impl AsyncRead for FileLock { +impl FileLock { unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit]) -> bool { false } +} +impl AsyncRead for FileLock { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - poll_loop! {self, cx, |x| x as usize, poll_shared_lock, + buf: &mut ReadBuf, + ) -> Poll> { + poll_loop! {self, cx, /* |x| x as usize */|_| (), poll_shared_lock, State::Working => { let result = ready!(Pin::new(self.locked_file.as_mut().unwrap()) .as_mut() @@ -423,7 +433,7 @@ impl AsyncRead for FileLock { return Poll::Ready(result); } else { self.state = State::Unlocking; - self.result = Some(result.map(|x| x as u64)); + self.result = Some(result.map(|x| 0u64/* x as u64 */)); } } }; @@ -431,28 +441,21 @@ impl AsyncRead for FileLock { } impl AsyncSeek for FileLock { - fn start_seek( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - position: SeekFrom, - ) -> Poll> { + fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<()> { if let Some(ref mut locked_file) = self.locked_file { - return Pin::new(locked_file) - .as_mut() - .start_seek(cx, position); + return Pin::new(locked_file).as_mut().start_seek(position); } - let mut file = self.unlocked_file.take().expect("Cannot seek while in the process of locking/unlocking/seeking"); - self.seek_fut = Some(spawn_blocking(move || { - (file.seek(position), file) - })); - return Poll::Ready(Ok(())); + let mut file = self + .unlocked_file + .take() + .expect("Cannot seek while in the process of locking/unlocking/seeking"); + self.seek_fut = Some(spawn_blocking(move || (file.seek(position), file))); + return Ok(()); } fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Some(ref mut locked_file) = self.locked_file { - return Pin::new(locked_file) - .as_mut() - .poll_complete(cx) + return Pin::new(locked_file).as_mut().poll_complete(cx); } let (result, file) = ready!(Pin::new(self.seek_fut.as_mut().unwrap()).poll(cx)).unwrap(); self.seek_fut = None; @@ -547,4 +550,3 @@ impl<'a> Future for UnlockFuture<'a> { self.file_lock.poll_unlock(cx) } } - diff --git a/tests/async_file_lock.rs b/tests/async_file_lock.rs index dc65048..6213f7c 100644 --- a/tests/async_file_lock.rs +++ b/tests/async_file_lock.rs @@ -1,18 +1,18 @@ #![deny(unused_must_use)] use async_file_lock::FileLock; -use tokio::test; +use fork::{fork, Fork}; use std::io::Result; -use tempfile::NamedTempFile; -use tokio::io::{AsyncWriteExt, SeekFrom, AsyncSeekExt, AsyncReadExt}; use std::time::Instant; -use fork::{fork, Fork}; +use tempfile::NamedTempFile; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; +use tokio::test; fn file() -> FileLock { FileLock::new_std(tempfile::tempfile().unwrap()) } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] async fn normal1() -> Result<()> { let tmp_path = NamedTempFile::new()?.into_temp_path(); // let tmp_path = PathBuf::from("/tmp/a"); @@ -39,13 +39,13 @@ async fn normal1() -> Result<()> { file.write(b"b").await?; println!("done"); std::process::exit(0); - }, + } Err(_) => panic!("unable to fork"), } Ok(()) } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] async fn append_mode() -> Result<()> { let tmp_path = NamedTempFile::new()?.into_temp_path(); match fork() { @@ -72,13 +72,13 @@ async fn append_mode() -> Result<()> { file.flush().await?; // println!("done"); std::process::exit(0); - }, + } Err(_) => panic!("unable to fork"), } Ok(()) } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] async fn lock_shared() -> Result<()> { let tmp_path = NamedTempFile::new()?.into_temp_path(); match fork() { @@ -94,14 +94,13 @@ async fn lock_shared() -> Result<()> { file.lock_shared().await?; std::thread::sleep(std::time::Duration::from_millis(1000)); std::process::exit(0); - }, + } Err(_) => panic!("unable to fork"), } Ok(()) } - -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] async fn lock_exclusive() -> Result<()> { let tmp_path = NamedTempFile::new()?.into_temp_path(); match fork() { @@ -119,13 +118,13 @@ async fn lock_exclusive() -> Result<()> { println!("child {}", tmp_path.exists()); std::thread::sleep(std::time::Duration::from_millis(1000)); std::process::exit(0); - }, + } Err(_) => panic!("unable to fork"), } Ok(()) } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] async fn lock_exclusive_shared() -> Result<()> { let tmp_path = NamedTempFile::new()?.into_temp_path(); match fork() { @@ -141,13 +140,13 @@ async fn lock_exclusive_shared() -> Result<()> { file.lock_shared().await?; std::thread::sleep(std::time::Duration::from_millis(1000)); std::process::exit(0); - }, + } Err(_) => panic!("unable to fork"), } Ok(()) } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] async fn drop_lock_exclusive() -> Result<()> { let tmp_path = NamedTempFile::new()?.into_temp_path(); match fork() { @@ -164,13 +163,13 @@ async fn drop_lock_exclusive() -> Result<()> { drop(file); std::thread::sleep(std::time::Duration::from_millis(1000)); std::process::exit(0); - }, + } Err(_) => panic!("unable to fork"), } Ok(()) } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn exclusive_locking_locked_file_panics() { let mut file = file(); @@ -178,7 +177,7 @@ async fn exclusive_locking_locked_file_panics() { file.lock_exclusive().await.unwrap(); } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn shared_locking_locked_file_panics() { let mut file = file(); @@ -186,7 +185,7 @@ async fn shared_locking_locked_file_panics() { file.lock_shared().await.unwrap(); } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn shared_locking_exclusive_file_panics() { let mut file = file(); @@ -194,7 +193,7 @@ async fn shared_locking_exclusive_file_panics() { file.lock_shared().await.unwrap(); } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn exclusive_locking_shared_file_panics() { let mut file = file(); @@ -202,14 +201,14 @@ async fn exclusive_locking_shared_file_panics() { file.lock_exclusive().await.unwrap(); } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn unlocking_file_panics() { let mut file = file(); file.unlock().await; } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn unlocking_unlocked_file_panics() { let mut file = file(); @@ -218,7 +217,7 @@ async fn unlocking_unlocked_file_panics() { file.unlock().await; } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] async fn file_stays_locked() { let mut file = file(); file.lock_exclusive().await.unwrap(); @@ -226,10 +225,10 @@ async fn file_stays_locked() { file.unlock().await; } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn file_auto_unlocks() { let mut file = file(); file.write(b"a").await.unwrap(); file.unlock().await; -} \ No newline at end of file +} From ac90a14f3745d220dbe8e960a67036f4c7a8fd45 Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 02:24:38 +0800 Subject: [PATCH 03/13] chore: create github action for rust --- .github/workflows/rust.yml | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 .github/workflows/rust.yml diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..31000a2 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,22 @@ +name: Rust + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +env: + CARGO_TERM_COLOR: always + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Build + run: cargo build --verbose + - name: Run tests + run: cargo test --verbose From 305c5b21104f1b29b7017f043d2422a38fbd650f Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 02:25:36 +0800 Subject: [PATCH 04/13] fix: tokio import in bench --- benches/main.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/benches/main.rs b/benches/main.rs index fe59969..a48cf1e 100644 --- a/benches/main.rs +++ b/benches/main.rs @@ -3,22 +3,22 @@ extern crate test; -use test::Bencher; +use async_file_lock::FileLock; use tempfile::NamedTempFile; +use test::Bencher; use tokio::fs::File; -use tokio::prelude::io::AsyncWriteExt; -use async_file_lock::FileLock; +use tokio::io::AsyncWriteExt; #[bench] fn tokio_write(b: &mut Bencher) { let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut file = rt.block_on(async { - File::create(NamedTempFile::new().unwrap().into_temp_path()).await.unwrap() + File::create(NamedTempFile::new().unwrap().into_temp_path()) + .await + .unwrap() }); b.iter(|| { - rt.block_on(async { - file.write(b"a") - }); + rt.block_on(async { file.write(b"a") }); }) } @@ -26,14 +26,14 @@ fn tokio_write(b: &mut Bencher) { fn normal_write(b: &mut Bencher) { let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut file = rt.block_on(async { - let mut file = FileLock::create(NamedTempFile::new().unwrap().into_temp_path()).await.unwrap(); + let mut file = FileLock::create(NamedTempFile::new().unwrap().into_temp_path()) + .await + .unwrap(); file.lock_exclusive().await; file }); b.iter(|| { - rt.block_on(async { - file.write(b"a") - }); + rt.block_on(async { file.write(b"a") }); }) } @@ -41,11 +41,11 @@ fn normal_write(b: &mut Bencher) { fn auto_write(b: &mut Bencher) { let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut file = rt.block_on(async { - FileLock::create(NamedTempFile::new().unwrap().into_temp_path()).await.unwrap() + FileLock::create(NamedTempFile::new().unwrap().into_temp_path()) + .await + .unwrap() }); b.iter(|| { - rt.block_on(async { - file.write(b"a") - }); + rt.block_on(async { file.write(b"a") }); }) } From 1b1cf769ff89fe99d9e955b05c0da9d3a947c741 Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 02:55:31 +0800 Subject: [PATCH 05/13] chore: remove unused fn --- src/lib.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b20e036..8e6732a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -411,11 +411,11 @@ impl AsyncWrite for FileLock { } } -impl FileLock { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit]) -> bool { - false - } -} +// impl FileLock { +// unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit]) -> bool { +// false +// } +// } impl AsyncRead for FileLock { fn poll_read( @@ -433,7 +433,7 @@ impl AsyncRead for FileLock { return Poll::Ready(result); } else { self.state = State::Unlocking; - self.result = Some(result.map(|x| 0u64/* x as u64 */)); + self.result = Some(result.map(|_| 0u64/* x as u64 */)); } } }; From a88f9d9e036a04647f5d9a067711a55eddf060f4 Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 03:15:24 +0800 Subject: [PATCH 06/13] chore: remove unused import --- src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 8e6732a..46136f7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,7 +6,6 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::future::Future; use std::io::{Error, Result, Seek, SeekFrom}; -use std::mem::MaybeUninit; use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; From e8ddffdd7c697f5820035ddd29012bb75a1ee073 Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 03:16:04 +0800 Subject: [PATCH 07/13] feat: rewrite lock_exclusive test for stable test result --- Cargo.toml | 2 + tests/async_file_lock.rs | 163 +++++++++++++++++++++++++++++---------- 2 files changed, 123 insertions(+), 42 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2f73843..2b6440a 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,8 @@ tokio = { version = "1.37", features = [ "rt", "rt-multi-thread", "io-util", + "sync", + "time", ] } fs3 = "0.5.0" futures-lite = "1.11.3" diff --git a/tests/async_file_lock.rs b/tests/async_file_lock.rs index 6213f7c..fb7261b 100644 --- a/tests/async_file_lock.rs +++ b/tests/async_file_lock.rs @@ -1,12 +1,16 @@ #![deny(unused_must_use)] +#![feature(assert_matches)] -use async_file_lock::FileLock; -use fork::{fork, Fork}; use std::io::Result; use std::time::Instant; use tempfile::NamedTempFile; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::test; +use {async_file_lock::FileLock, std::assert_matches::assert_matches}; +use { + fork::{fork, Fork}, + std::path::PathBuf, +}; fn file() -> FileLock { FileLock::new_std(tempfile::tempfile().unwrap()) @@ -101,49 +105,124 @@ async fn lock_shared() -> Result<()> { } #[test(flavor = "multi_thread")] -async fn lock_exclusive() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - match fork() { - Ok(Fork::Parent(_)) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - println!("parent {}", tmp_path.exists()); - let mut file = FileLock::open(&tmp_path).await?; - let instant = Instant::now(); - file.lock_exclusive().await?; - assert!(instant.elapsed().as_millis() > 800); - } - Ok(Fork::Child) => { - let mut file = FileLock::create(&tmp_path).await?; - file.lock_exclusive().await?; - println!("child {}", tmp_path.exists()); - std::thread::sleep(std::time::Duration::from_millis(1000)); - std::process::exit(0); - } - Err(_) => panic!("unable to fork"), - } - Ok(()) +async fn lock_exclusive() { + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (locked_tx, locked_rx) = tokio::sync::oneshot::channel::<()>(); + let (unlocked_tx, unlocked_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // wait for file locked by main thread + assert_matches!(locked_rx.await, Ok(())); + + assert!(tmp_path.exists()); + let mut file = FileLock::open(&tmp_path).await.unwrap(); + + // attemp to obtain a lock acquired should failed. + assert_matches!(file.try_lock_exclusive(), Err(_)); + + // wait for lock released from main thread + assert_matches!(unlocked_rx.await, Ok(())); + + // attemp to obtain a lock should success. + assert_matches!(file.try_lock_exclusive(), Ok(())); + }); + + // main thread + { + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // obtain the lock + assert_matches!(file.lock_exclusive().await, Ok(_)); + + // signal child thread + assert_matches!(locked_tx.send(()), Ok(())); + + // sleep + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + // unlock + file.unlock().await; + + // signal + assert_matches!(unlocked_tx.send(()), Ok(())); + }; } #[test(flavor = "multi_thread")] -async fn lock_exclusive_shared() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - match fork() { - Ok(Fork::Parent(_)) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - let mut file = FileLock::open(&tmp_path).await?; - let instant = Instant::now(); - file.lock_exclusive().await?; - assert!(instant.elapsed().as_millis() > 800); - } - Ok(Fork::Child) => { - let mut file = FileLock::open(&tmp_path).await?; - file.lock_shared().await?; - std::thread::sleep(std::time::Duration::from_millis(1000)); - std::process::exit(0); - } - Err(_) => panic!("unable to fork"), - } - Ok(()) +async fn lock_exclusive_shared() { + // let tmp_path = NamedTempFile::new()?.into_temp_path(); + // match fork() { + // Ok(Fork::Parent(_)) => { + // std::thread::sleep(std::time::Duration::from_millis(100)); + // let mut file = FileLock::open(&tmp_path).await?; + // let instant = Instant::now(); + // file.lock_exclusive().await?; + // assert!(instant.elapsed().as_millis() > 800); + // } + // Ok(Fork::Child) => { + // let mut file = FileLock::open(&tmp_path).await?; + // file.lock_shared().await?; + // std::thread::sleep(std::time::Duration::from_millis(1000)); + // std::process::exit(0); + // } + // Err(_) => panic!("unable to fork"), + // } + // Ok(()) + + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (locked_tx, locked_rx) = tokio::sync::oneshot::channel::<()>(); + let (unlocked_tx, unlocked_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // wait for file locked by main thread + assert_matches!(locked_rx.await, Ok(())); + + assert!(tmp_path.exists()); + let mut file = FileLock::open(&tmp_path).await.unwrap(); + + // attemp to obtain a lock acquired should failed. + assert_matches!(file.try_lock_exclusive(), Err(_)); + + // wait for lock released from main thread + assert_matches!(unlocked_rx.await, Ok(())); + + // attemp to obtain a lock should success. + assert_matches!(file.try_lock_exclusive(), Ok(())); + }); + + // main thread + { + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // obtain the lock + assert_matches!(file.lock_shared().await, Ok(_)); + + // signal child thread + assert_matches!(locked_tx.send(()), Ok(())); + + // sleep + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + // unlock + file.unlock().await; + + // signal + assert_matches!(unlocked_tx.send(()), Ok(())); + }; } #[test(flavor = "multi_thread")] From 7a3885d4cee6be3b23b63c090526a37d50147666 Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 03:16:51 +0800 Subject: [PATCH 08/13] feat: fix rust channel to nightly --- rust-toolchain.toml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 rust-toolchain.toml diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..5d56faf --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "nightly" From e767e53f69f6bc255c1c062536d02028d33e61a5 Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 03:50:09 +0800 Subject: [PATCH 09/13] fix: don't panic if poll before async seek starts --- src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 46136f7..7577f8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -456,6 +456,10 @@ impl AsyncSeek for FileLock { if let Some(ref mut locked_file) = self.locked_file { return Pin::new(locked_file).as_mut().poll_complete(cx); } + // NOTE: calling this without calling start_seek might return the same result + if let None = self.seek_fut { + return Poll::Ready(Ok(0)); // but we return 0 + } let (result, file) = ready!(Pin::new(self.seek_fut.as_mut().unwrap()).poll(cx)).unwrap(); self.seek_fut = None; self.unlocked_file = Some(file); From dc757ef769a80202c4663ff9a8465f98d07bc04a Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 03:50:51 +0800 Subject: [PATCH 10/13] feat: rewrite tests for stable result --- tests/async_file_lock.rs | 191 +++++++++++++++++++++++++++++---------- 1 file changed, 142 insertions(+), 49 deletions(-) diff --git a/tests/async_file_lock.rs b/tests/async_file_lock.rs index fb7261b..a640e4b 100644 --- a/tests/async_file_lock.rs +++ b/tests/async_file_lock.rs @@ -17,38 +17,84 @@ fn file() -> FileLock { } #[test(flavor = "multi_thread")] -async fn normal1() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - // let tmp_path = PathBuf::from("/tmp/a"); - match fork() { - Ok(Fork::Parent(_)) => { - // println!("parent {}", tmp_path.exists()); - let mut buf = String::new(); - let mut file = FileLock::create(&tmp_path).await?; - // println!("parent {}", tmp_path.exists()); - println!("written {}", file.write(b"a").await?); - std::thread::sleep(std::time::Duration::from_millis(200)); - println!("slept"); - println!("sought {}", file.seek(SeekFrom::Start(0)).await?); - file.read_to_string(&mut buf).await?; - println!("read"); - // // We write at location 0 then it gets overridden. - assert_eq!(buf, "b"); - } - Ok(Fork::Child) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - println!("child {}", tmp_path.exists()); - let mut file = FileLock::open(&tmp_path).await?; - println!("child opened"); - file.write(b"b").await?; - println!("done"); - std::process::exit(0); - } - Err(_) => panic!("unable to fork"), - } - Ok(()) +async fn normal1() { + // let tmp_path = NamedTempFile::new()?.into_temp_path(); + // // let tmp_path = PathBuf::from("/tmp/a"); + // match fork() { + // Ok(Fork::Parent(_)) => { + // // println!("parent {}", tmp_path.exists()); + // let mut buf = String::new(); + // let mut file = FileLock::create(&tmp_path).await?; + // // println!("parent {}", tmp_path.exists()); + // println!("writen {}", file.write(b"a").await?); + // std::thread::sleep(std::time::Duration::from_millis(200)); + // println!("slept"); + // println!("sought {}", file.seek(SeekFrom::Start(0)).await?); + // file.read_to_string(&mut buf).await?; + // println!("read"); + // // // We write at location 0 then it gets overridden. + // assert_eq!(buf, "b"); + // } + // Ok(Fork::Child) => { + // std::thread::sleep(std::time::Duration::from_millis(100)); + // println!("child {}", tmp_path.exists()); + // let mut file = FileLock::open(&tmp_path).await?; + // println!("child opened"); + // file.write(b"b").await?; + // println!("done"); + // std::process::exit(0); + // } + // Err(_) => panic!("unable to fork"), + // } + // Ok(()) + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (writen_tx, writen_rx) = tokio::sync::oneshot::channel::<()>(); + let (overwriten_tx, overwriten_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // open file + let mut file = FileLock::open(&tmp_path).await.unwrap(); + assert!(tmp_path.exists()); + + // write to file + assert_matches!(file.write(b"a").await, Ok(_)); + assert_matches!(writen_tx.send(()), Ok(())); + + // wait for overwriten + assert_matches!(overwriten_rx.await, Ok(())); + + // read content + let mut buf = String::new(); + assert_matches!(file.read_to_string(&mut buf).await, Ok(_)); + + // assert it has been overwriten + assert_eq!(buf, "b"); + }); + + // main thread + { + // open file + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // wait for writen signal + assert_matches!(writen_rx.await, Ok(())); + + // overwrite + assert_matches!(file.write(b"b").await, Ok(_)); + + // signal + assert_matches!(overwriten_tx.send(()), Ok(())); + }; } +#[ignore] #[test(flavor = "multi_thread")] async fn append_mode() -> Result<()> { let tmp_path = NamedTempFile::new()?.into_temp_path(); @@ -83,25 +129,72 @@ async fn append_mode() -> Result<()> { } #[test(flavor = "multi_thread")] -async fn lock_shared() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - match fork() { - Ok(Fork::Parent(_)) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - let mut file = FileLock::open(&tmp_path).await?; - let instant = Instant::now(); - file.lock_shared().await?; - assert!(instant.elapsed().as_millis() < 100); - } - Ok(Fork::Child) => { - let mut file = FileLock::open(&tmp_path).await?; - file.lock_shared().await?; - std::thread::sleep(std::time::Duration::from_millis(1000)); - std::process::exit(0); - } - Err(_) => panic!("unable to fork"), - } - Ok(()) +async fn lock_shared() { + // let tmp_path = NamedTempFile::new()?.into_temp_path(); + // match fork() { + // Ok(Fork::Parent(_)) => { + // std::thread::sleep(std::time::Duration::from_millis(100)); + // let mut file = FileLock::open(&tmp_path).await?; + // let instant = Instant::now(); + // file.lock_shared().await?; + // assert!(instant.elapsed().as_millis() < 100); + // } + // Ok(Fork::Child) => { + // let mut file = FileLock::open(&tmp_path).await?; + // file.lock_shared().await?; + // std::thread::sleep(std::time::Duration::from_millis(1000)); + // std::process::exit(0); + // } + // Err(_) => panic!("unable to fork"), + // } + // Ok(()) + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (locked_tx, locked_rx) = tokio::sync::oneshot::channel::<()>(); + let (unlocked_tx, unlocked_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // wait for file locked by main thread + assert_matches!(locked_rx.await, Ok(())); + + assert!(tmp_path.exists()); + let mut file = FileLock::open(&tmp_path).await.unwrap(); + + // attemp to obtain a lock acquired should failed. + assert_matches!(file.try_lock_shared(), Ok(_)); + + // wait for lock released from main thread + assert_matches!(unlocked_rx.await, Ok(())); + + // attemp to obtain a lock should success. + assert_matches!(file.try_lock_shared(), Ok(())); + }); + + // main thread + { + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // obtain the lock + assert_matches!(file.lock_shared().await, Ok(_)); + + // signal child thread + assert_matches!(locked_tx.send(()), Ok(())); + + // sleep + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + // unlock + file.unlock().await; + + // signal + assert_matches!(unlocked_tx.send(()), Ok(())); + }; } #[test(flavor = "multi_thread")] From b22682221ce62bd69c74bba43d5b9b6e4e997369 Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 03:52:40 +0800 Subject: [PATCH 11/13] feat: rewrite drop_lock_exclusive --- tests/async_file_lock.rs | 68 ++++++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/tests/async_file_lock.rs b/tests/async_file_lock.rs index a640e4b..72b42d2 100644 --- a/tests/async_file_lock.rs +++ b/tests/async_file_lock.rs @@ -319,26 +319,54 @@ async fn lock_exclusive_shared() { } #[test(flavor = "multi_thread")] -async fn drop_lock_exclusive() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - match fork() { - Ok(Fork::Parent(_)) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - let mut file = FileLock::open(&tmp_path).await?; - let instant = Instant::now(); - file.lock_exclusive().await?; - assert!(instant.elapsed().as_millis() < 100); - } - Ok(Fork::Child) => { - let mut file = FileLock::open(&tmp_path).await?; - file.lock_exclusive().await?; - drop(file); - std::thread::sleep(std::time::Duration::from_millis(1000)); - std::process::exit(0); - } - Err(_) => panic!("unable to fork"), - } - Ok(()) +async fn drop_lock_exclusive() { + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (locked_tx, locked_rx) = tokio::sync::oneshot::channel::<()>(); + let (unlocked_tx, unlocked_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // wait for file locked by main thread + assert_matches!(locked_rx.await, Ok(())); + + assert!(tmp_path.exists()); + let mut file = FileLock::open(&tmp_path).await.unwrap(); + + // attemp to obtain a lock acquired should failed. + assert_matches!(file.try_lock_exclusive(), Err(_)); + + // wait for lock released from main thread + assert_matches!(unlocked_rx.await, Ok(())); + + // attemp to obtain a lock should success. + assert_matches!(file.try_lock_exclusive(), Ok(())); + }); + + // main thread + { + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // obtain the lock + assert_matches!(file.lock_exclusive().await, Ok(_)); + + // signal child thread + assert_matches!(locked_tx.send(()), Ok(())); + + // sleep + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + // drop lock + drop(file); + + // signal + assert_matches!(unlocked_tx.send(()), Ok(())); + }; } #[test(flavor = "multi_thread")] From 85682ee2c2745c1fb6ff359e9c02ef04a7f01e43 Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 03:54:37 +0800 Subject: [PATCH 12/13] fix: fix doctests --- src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7577f8f..be30cfc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -165,11 +165,11 @@ impl FileLock { /// /// ```no_run /// use tokio::fs::File; - /// use tokio::prelude::*; + /// use tokio::io::AsyncWriteExt; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::create("foo.txt").await?; - /// file.write_all(b"hello, world!").await?; + /// file.write(b"hello, world!").await?; /// file.sync_all().await?; /// # Ok(()) /// # } @@ -202,11 +202,11 @@ impl FileLock { /// /// ```no_run /// use tokio::fs::File; - /// use tokio::prelude::*; + /// use tokio::io::AsyncWriteExt; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::create("foo.txt").await?; - /// file.write_all(b"hello, world!").await?; + /// file.write(b"hello, world!").await?; /// file.sync_data().await?; /// # Ok(()) /// # } From 4a8f90315f70c92f90a50f2e9aaa1a538f2c8a76 Mon Sep 17 00:00:00 2001 From: juicyenc Date: Thu, 18 Apr 2024 04:08:43 +0800 Subject: [PATCH 13/13] fix: stablize append_mode test --- tests/async_file_lock.rs | 124 +++++++++++++++++++++++++++------------ 1 file changed, 87 insertions(+), 37 deletions(-) diff --git a/tests/async_file_lock.rs b/tests/async_file_lock.rs index 72b42d2..caeef4d 100644 --- a/tests/async_file_lock.rs +++ b/tests/async_file_lock.rs @@ -1,16 +1,11 @@ #![deny(unused_must_use)] #![feature(assert_matches)] -use std::io::Result; -use std::time::Instant; +use std::path::PathBuf; use tempfile::NamedTempFile; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; use tokio::test; use {async_file_lock::FileLock, std::assert_matches::assert_matches}; -use { - fork::{fork, Fork}, - std::path::PathBuf, -}; fn file() -> FileLock { FileLock::new_std(tempfile::tempfile().unwrap()) @@ -94,38 +89,93 @@ async fn normal1() { }; } -#[ignore] #[test(flavor = "multi_thread")] -async fn append_mode() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - match fork() { - Ok(Fork::Parent(_)) => { - let mut buf = String::new(); - let mut file = FileLock::create(&tmp_path).await?; - file.set_seeking_mode(SeekFrom::End(0)); - file.write(b"a").await?; - // println!("parent {}", tmp_path.exists()); - std::thread::sleep(std::time::Duration::from_millis(200)); - file.write(b"a").await?; - file.seek(SeekFrom::Start(0)).await?; - // Turn off auto seeking mode - file.set_seeking_mode(SeekFrom::Current(0)); - file.read_to_string(&mut buf).await?; - // Each file handle has its own position. - assert_eq!(buf, "bba"); - } - Ok(Fork::Child) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - // println!("{}", tmp_path.exists()); - let mut file = FileLock::open(&tmp_path).await?; - file.write(b"bb").await?; - file.flush().await?; - // println!("done"); - std::process::exit(0); - } - Err(_) => panic!("unable to fork"), - } - Ok(()) +async fn append_mode() { + // let tmp_path = NamedTempFile::new()?.into_temp_path(); + // match fork() { + // Ok(Fork::Parent(_)) => { + // let mut buf = String::new(); + // let mut file = FileLock::create(&tmp_path).await?; + // file.set_seeking_mode(SeekFrom::End(0)); + // file.write(b"a").await?; + // // println!("parent {}", tmp_path.exists()); + // std::thread::sleep(std::time::Duration::from_millis(200)); + // file.write(b"a").await?; + // file.seek(SeekFrom::Start(0)).await?; + // // Turn off auto seeking mode + // file.set_seeking_mode(SeekFrom::Current(0)); + // file.read_to_string(&mut buf).await?; + // // Each file handle has its own position. + // assert_eq!(buf, "bba"); + // } + // Ok(Fork::Child) => { + // std::thread::sleep(std::time::Duration::from_millis(100)); + // // println!("{}", tmp_path.exists()); + // let mut file = FileLock::open(&tmp_path).await?; + // file.write(b"bb").await?; + // file.flush().await?; + // // println!("done"); + // std::process::exit(0); + // } + // Err(_) => panic!("unable to fork"), + // } + // Ok(()) + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (writen_tx, writen_rx) = tokio::sync::oneshot::channel::<()>(); + let (overwriten_tx, overwriten_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // open file + let mut file = FileLock::open(&tmp_path).await.unwrap(); + assert!(tmp_path.exists()); + + file.set_seeking_mode(SeekFrom::End(0)); + + // write to file + assert_matches!(file.write(b"a").await, Ok(_)); + assert_matches!(writen_tx.send(()), Ok(())); + + // wait for overwriten + assert_matches!(overwriten_rx.await, Ok(())); + + // write, it should append to the end + assert_matches!(file.write(b"a").await, Ok(_)); + + assert_matches!(file.seek(SeekFrom::Current(0)).await, Ok(_)); + + // turn off auto seeking mode + file.set_seeking_mode(SeekFrom::Current(0)); + + // read content + let mut buf = String::new(); + assert_matches!(file.read_to_string(&mut buf).await, Ok(_)); + + // assert it has been overwriten + assert_eq!(buf, "bba"); + }); + + // main thread + { + // wait for writen signal + assert_matches!(writen_rx.await, Ok(())); + + // open file + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // overwrite + assert_matches!(file.write(b"bb").await, Ok(_)); + assert_matches!(file.flush().await, Ok(_)); + + // signal + assert_matches!(overwriten_tx.send(()), Ok(())); + }; } #[test(flavor = "multi_thread")]