diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml new file mode 100644 index 0000000..31000a2 --- /dev/null +++ b/.github/workflows/rust.yml @@ -0,0 +1,22 @@ +name: Rust + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + +env: + CARGO_TERM_COLOR: always + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + - name: Build + run: cargo build --verbose + - name: Run tests + run: cargo test --verbose diff --git a/Cargo.toml b/Cargo.toml index 529e06f..2b6440a 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,18 @@ include = ["**/*.rs", "Cargo.toml"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "0.2", features = ["fs", "blocking", "rt-core"] } +tokio = { version = "1.37", features = [ + "fs", + "rt", + "rt-multi-thread", + "io-util", + "sync", + "time", +] } fs3 = "0.5.0" futures-lite = "1.11.3" [dev-dependencies] -tokio = { version = "0.2", features = ["macros"] } +tokio = { version = "1.37", features = ["macros"] } fork = "0.1.18" tempfile = "3.2.0" diff --git a/benches/main.rs b/benches/main.rs index fe59969..a48cf1e 100644 --- a/benches/main.rs +++ b/benches/main.rs @@ -3,22 +3,22 @@ extern crate test; -use test::Bencher; +use async_file_lock::FileLock; use tempfile::NamedTempFile; +use test::Bencher; use tokio::fs::File; -use tokio::prelude::io::AsyncWriteExt; -use async_file_lock::FileLock; +use tokio::io::AsyncWriteExt; #[bench] fn tokio_write(b: &mut Bencher) { let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut file = rt.block_on(async { - File::create(NamedTempFile::new().unwrap().into_temp_path()).await.unwrap() + File::create(NamedTempFile::new().unwrap().into_temp_path()) + .await + .unwrap() }); b.iter(|| { - rt.block_on(async { - file.write(b"a") - }); + rt.block_on(async { file.write(b"a") }); }) } @@ -26,14 +26,14 @@ fn tokio_write(b: &mut Bencher) { fn normal_write(b: &mut Bencher) { let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut file = rt.block_on(async { - let mut file = FileLock::create(NamedTempFile::new().unwrap().into_temp_path()).await.unwrap(); + let mut file = FileLock::create(NamedTempFile::new().unwrap().into_temp_path()) + .await + .unwrap(); file.lock_exclusive().await; file }); b.iter(|| { - rt.block_on(async { - file.write(b"a") - }); + rt.block_on(async { file.write(b"a") }); }) } @@ -41,11 +41,11 @@ fn normal_write(b: &mut Bencher) { fn auto_write(b: &mut Bencher) { let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut file = rt.block_on(async { - FileLock::create(NamedTempFile::new().unwrap().into_temp_path()).await.unwrap() + FileLock::create(NamedTempFile::new().unwrap().into_temp_path()) + .await + .unwrap() }); b.iter(|| { - rt.block_on(async { - file.write(b"a") - }); + rt.block_on(async { file.write(b"a") }); }) } diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..5d56faf --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "nightly" diff --git a/src/lib.rs b/src/lib.rs index f7543b2..be30cfc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,18 @@ #![deny(unused_must_use)] // #![cfg_attr(test, feature(test))] -use std::task::{Context, Poll}; -use std::pin::Pin; -use std::fmt::Formatter; +use futures_lite::{ready, FutureExt}; use std::fmt::Debug; +use std::fmt::Formatter; use std::future::Future; -use std::io::{Error, Result, SeekFrom, Seek}; +use std::io::{Error, Result, Seek, SeekFrom}; +use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; use tokio::fs::{File, OpenOptions}; use tokio::io::{AsyncRead, AsyncSeek, AsyncWrite}; use tokio::task::{spawn_blocking, JoinHandle}; -use futures_lite::{ready, FutureExt}; -use fs3::FileExt; -use std::path::Path; -use std::mem::MaybeUninit; +use {fs3::FileExt, tokio::io::ReadBuf}; /// Locks a file asynchronously. /// Auto locks a file if any read or write methods are called. If [Self::lock_exclusive] @@ -40,7 +39,13 @@ impl FileLock { /// Opens a file in read and write mode that is unlocked. // This function will create a file if it does not exist, and will truncate it if it does. pub async fn create(path: impl AsRef) -> Result { - let file = OpenOptions::new().write(true).read(true).create(true).truncate(true).open(path).await?; + let file = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .truncate(true) + .open(path) + .await?; Ok(FileLock::new_tokio(file).await) } @@ -61,7 +66,7 @@ impl FileLock { result: None, locking_fut: None, unlocking_fut: None, - seek_fut: None + seek_fut: None, } } @@ -76,7 +81,7 @@ impl FileLock { result: None, locking_fut: None, unlocking_fut: None, - seek_fut: None + seek_fut: None, } } @@ -96,11 +101,14 @@ impl FileLock { panic!("File already locked."); } self.is_manually_locked = true; - self.unlocked_file.as_mut().unwrap().try_lock_exclusive().map(|_| { - self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap())); - self.state = State::Locked; - - }) + self.unlocked_file + .as_mut() + .unwrap() + .try_lock_exclusive() + .map(|_| { + self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap())); + self.state = State::Locked; + }) } /// Locks the file for reading until [`Self::unlock`] is called. @@ -119,11 +127,14 @@ impl FileLock { panic!("File already locked."); } self.is_manually_locked = true; - self.unlocked_file.as_mut().unwrap().try_lock_shared().map(|_| { - self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap())); - self.state = State::Locked; - - }) + self.unlocked_file + .as_mut() + .unwrap() + .try_lock_shared() + .map(|_| { + self.locked_file = Some(File::from_std(self.unlocked_file.take().unwrap())); + self.state = State::Locked; + }) } /// Unlocks the file. @@ -154,11 +165,11 @@ impl FileLock { /// /// ```no_run /// use tokio::fs::File; - /// use tokio::prelude::*; + /// use tokio::io::AsyncWriteExt; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::create("foo.txt").await?; - /// file.write_all(b"hello, world!").await?; + /// file.write(b"hello, world!").await?; /// file.sync_all().await?; /// # Ok(()) /// # } @@ -173,9 +184,7 @@ impl FileLock { return file.sync_all().await; } let file = self.unlocked_file.take().unwrap(); - let (result, file) = spawn_blocking(|| { - (file.sync_all(), file) - }).await.unwrap(); + let (result, file) = spawn_blocking(|| (file.sync_all(), file)).await.unwrap(); self.unlocked_file = Some(file); result } @@ -193,11 +202,11 @@ impl FileLock { /// /// ```no_run /// use tokio::fs::File; - /// use tokio::prelude::*; + /// use tokio::io::AsyncWriteExt; /// /// # async fn dox() -> std::io::Result<()> { /// let mut file = File::create("foo.txt").await?; - /// file.write_all(b"hello, world!").await?; + /// file.write(b"hello, world!").await?; /// file.sync_data().await?; /// # Ok(()) /// # } @@ -212,9 +221,7 @@ impl FileLock { return file.sync_data().await; } let file = self.unlocked_file.take().unwrap(); - let (result, file) = spawn_blocking(|| { - (file.sync_data(), file) - }).await.unwrap(); + let (result, file) = spawn_blocking(|| (file.sync_data(), file)).await.unwrap(); self.unlocked_file = Some(file); result } @@ -316,7 +323,7 @@ macro_rules! poll_loop { SeekFrom::Current(0) => $self.state = State::Working, _ => { let mode = $self.mode; - $self.as_mut().start_seek($cx, mode); + $self.as_mut().start_seek(mode); $self.state = State::Seeking; } }, @@ -324,7 +331,7 @@ macro_rules! poll_loop { // println!("working"); $working // println!("worked"); - }, + } State::Locking => { if let Err(e) = ready!($self.$lock($cx)) { return Poll::Ready(Err(e)); @@ -403,17 +410,19 @@ impl AsyncWrite for FileLock { } } -impl AsyncRead for FileLock { - unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit]) -> bool { - false - } +// impl FileLock { +// unsafe fn prepare_uninitialized_buffer(&self, _: &mut [MaybeUninit]) -> bool { +// false +// } +// } +impl AsyncRead for FileLock { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - poll_loop! {self, cx, |x| x as usize, poll_shared_lock, + buf: &mut ReadBuf, + ) -> Poll> { + poll_loop! {self, cx, /* |x| x as usize */|_| (), poll_shared_lock, State::Working => { let result = ready!(Pin::new(self.locked_file.as_mut().unwrap()) .as_mut() @@ -423,7 +432,7 @@ impl AsyncRead for FileLock { return Poll::Ready(result); } else { self.state = State::Unlocking; - self.result = Some(result.map(|x| x as u64)); + self.result = Some(result.map(|_| 0u64/* x as u64 */)); } } }; @@ -431,28 +440,25 @@ impl AsyncRead for FileLock { } impl AsyncSeek for FileLock { - fn start_seek( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - position: SeekFrom, - ) -> Poll> { + fn start_seek(mut self: Pin<&mut Self>, position: SeekFrom) -> Result<()> { if let Some(ref mut locked_file) = self.locked_file { - return Pin::new(locked_file) - .as_mut() - .start_seek(cx, position); + return Pin::new(locked_file).as_mut().start_seek(position); } - let mut file = self.unlocked_file.take().expect("Cannot seek while in the process of locking/unlocking/seeking"); - self.seek_fut = Some(spawn_blocking(move || { - (file.seek(position), file) - })); - return Poll::Ready(Ok(())); + let mut file = self + .unlocked_file + .take() + .expect("Cannot seek while in the process of locking/unlocking/seeking"); + self.seek_fut = Some(spawn_blocking(move || (file.seek(position), file))); + return Ok(()); } fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Some(ref mut locked_file) = self.locked_file { - return Pin::new(locked_file) - .as_mut() - .poll_complete(cx) + return Pin::new(locked_file).as_mut().poll_complete(cx); + } + // NOTE: calling this without calling start_seek might return the same result + if let None = self.seek_fut { + return Poll::Ready(Ok(0)); // but we return 0 } let (result, file) = ready!(Pin::new(self.seek_fut.as_mut().unwrap()).poll(cx)).unwrap(); self.seek_fut = None; @@ -547,4 +553,3 @@ impl<'a> Future for UnlockFuture<'a> { self.file_lock.poll_unlock(cx) } } - diff --git a/tests/async_file_lock.rs b/tests/async_file_lock.rs index dc65048..caeef4d 100644 --- a/tests/async_file_lock.rs +++ b/tests/async_file_lock.rs @@ -1,176 +1,425 @@ #![deny(unused_must_use)] +#![feature(assert_matches)] -use async_file_lock::FileLock; -use tokio::test; -use std::io::Result; +use std::path::PathBuf; use tempfile::NamedTempFile; -use tokio::io::{AsyncWriteExt, SeekFrom, AsyncSeekExt, AsyncReadExt}; -use std::time::Instant; -use fork::{fork, Fork}; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom}; +use tokio::test; +use {async_file_lock::FileLock, std::assert_matches::assert_matches}; fn file() -> FileLock { FileLock::new_std(tempfile::tempfile().unwrap()) } -#[test(threaded_scheduler)] -async fn normal1() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - // let tmp_path = PathBuf::from("/tmp/a"); - match fork() { - Ok(Fork::Parent(_)) => { - // println!("parent {}", tmp_path.exists()); - let mut buf = String::new(); - let mut file = FileLock::create(&tmp_path).await?; - // println!("parent {}", tmp_path.exists()); - println!("written {}", file.write(b"a").await?); - std::thread::sleep(std::time::Duration::from_millis(200)); - println!("slept"); - println!("sought {}", file.seek(SeekFrom::Start(0)).await?); - file.read_to_string(&mut buf).await?; - println!("read"); - // // We write at location 0 then it gets overridden. - assert_eq!(buf, "b"); - } - Ok(Fork::Child) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - println!("child {}", tmp_path.exists()); - let mut file = FileLock::open(&tmp_path).await?; - println!("child opened"); - file.write(b"b").await?; - println!("done"); - std::process::exit(0); - }, - Err(_) => panic!("unable to fork"), - } - Ok(()) +#[test(flavor = "multi_thread")] +async fn normal1() { + // let tmp_path = NamedTempFile::new()?.into_temp_path(); + // // let tmp_path = PathBuf::from("/tmp/a"); + // match fork() { + // Ok(Fork::Parent(_)) => { + // // println!("parent {}", tmp_path.exists()); + // let mut buf = String::new(); + // let mut file = FileLock::create(&tmp_path).await?; + // // println!("parent {}", tmp_path.exists()); + // println!("writen {}", file.write(b"a").await?); + // std::thread::sleep(std::time::Duration::from_millis(200)); + // println!("slept"); + // println!("sought {}", file.seek(SeekFrom::Start(0)).await?); + // file.read_to_string(&mut buf).await?; + // println!("read"); + // // // We write at location 0 then it gets overridden. + // assert_eq!(buf, "b"); + // } + // Ok(Fork::Child) => { + // std::thread::sleep(std::time::Duration::from_millis(100)); + // println!("child {}", tmp_path.exists()); + // let mut file = FileLock::open(&tmp_path).await?; + // println!("child opened"); + // file.write(b"b").await?; + // println!("done"); + // std::process::exit(0); + // } + // Err(_) => panic!("unable to fork"), + // } + // Ok(()) + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (writen_tx, writen_rx) = tokio::sync::oneshot::channel::<()>(); + let (overwriten_tx, overwriten_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // open file + let mut file = FileLock::open(&tmp_path).await.unwrap(); + assert!(tmp_path.exists()); + + // write to file + assert_matches!(file.write(b"a").await, Ok(_)); + assert_matches!(writen_tx.send(()), Ok(())); + + // wait for overwriten + assert_matches!(overwriten_rx.await, Ok(())); + + // read content + let mut buf = String::new(); + assert_matches!(file.read_to_string(&mut buf).await, Ok(_)); + + // assert it has been overwriten + assert_eq!(buf, "b"); + }); + + // main thread + { + // open file + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // wait for writen signal + assert_matches!(writen_rx.await, Ok(())); + + // overwrite + assert_matches!(file.write(b"b").await, Ok(_)); + + // signal + assert_matches!(overwriten_tx.send(()), Ok(())); + }; } -#[test(threaded_scheduler)] -async fn append_mode() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - match fork() { - Ok(Fork::Parent(_)) => { - let mut buf = String::new(); - let mut file = FileLock::create(&tmp_path).await?; - file.set_seeking_mode(SeekFrom::End(0)); - file.write(b"a").await?; - // println!("parent {}", tmp_path.exists()); - std::thread::sleep(std::time::Duration::from_millis(200)); - file.write(b"a").await?; - file.seek(SeekFrom::Start(0)).await?; - // Turn off auto seeking mode - file.set_seeking_mode(SeekFrom::Current(0)); - file.read_to_string(&mut buf).await?; - // Each file handle has its own position. - assert_eq!(buf, "bba"); - } - Ok(Fork::Child) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - // println!("{}", tmp_path.exists()); - let mut file = FileLock::open(&tmp_path).await?; - file.write(b"bb").await?; - file.flush().await?; - // println!("done"); - std::process::exit(0); - }, - Err(_) => panic!("unable to fork"), - } - Ok(()) +#[test(flavor = "multi_thread")] +async fn append_mode() { + // let tmp_path = NamedTempFile::new()?.into_temp_path(); + // match fork() { + // Ok(Fork::Parent(_)) => { + // let mut buf = String::new(); + // let mut file = FileLock::create(&tmp_path).await?; + // file.set_seeking_mode(SeekFrom::End(0)); + // file.write(b"a").await?; + // // println!("parent {}", tmp_path.exists()); + // std::thread::sleep(std::time::Duration::from_millis(200)); + // file.write(b"a").await?; + // file.seek(SeekFrom::Start(0)).await?; + // // Turn off auto seeking mode + // file.set_seeking_mode(SeekFrom::Current(0)); + // file.read_to_string(&mut buf).await?; + // // Each file handle has its own position. + // assert_eq!(buf, "bba"); + // } + // Ok(Fork::Child) => { + // std::thread::sleep(std::time::Duration::from_millis(100)); + // // println!("{}", tmp_path.exists()); + // let mut file = FileLock::open(&tmp_path).await?; + // file.write(b"bb").await?; + // file.flush().await?; + // // println!("done"); + // std::process::exit(0); + // } + // Err(_) => panic!("unable to fork"), + // } + // Ok(()) + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (writen_tx, writen_rx) = tokio::sync::oneshot::channel::<()>(); + let (overwriten_tx, overwriten_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // open file + let mut file = FileLock::open(&tmp_path).await.unwrap(); + assert!(tmp_path.exists()); + + file.set_seeking_mode(SeekFrom::End(0)); + + // write to file + assert_matches!(file.write(b"a").await, Ok(_)); + assert_matches!(writen_tx.send(()), Ok(())); + + // wait for overwriten + assert_matches!(overwriten_rx.await, Ok(())); + + // write, it should append to the end + assert_matches!(file.write(b"a").await, Ok(_)); + + assert_matches!(file.seek(SeekFrom::Current(0)).await, Ok(_)); + + // turn off auto seeking mode + file.set_seeking_mode(SeekFrom::Current(0)); + + // read content + let mut buf = String::new(); + assert_matches!(file.read_to_string(&mut buf).await, Ok(_)); + + // assert it has been overwriten + assert_eq!(buf, "bba"); + }); + + // main thread + { + // wait for writen signal + assert_matches!(writen_rx.await, Ok(())); + + // open file + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // overwrite + assert_matches!(file.write(b"bb").await, Ok(_)); + assert_matches!(file.flush().await, Ok(_)); + + // signal + assert_matches!(overwriten_tx.send(()), Ok(())); + }; } -#[test(threaded_scheduler)] -async fn lock_shared() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - match fork() { - Ok(Fork::Parent(_)) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - let mut file = FileLock::open(&tmp_path).await?; - let instant = Instant::now(); - file.lock_shared().await?; - assert!(instant.elapsed().as_millis() < 100); - } - Ok(Fork::Child) => { - let mut file = FileLock::open(&tmp_path).await?; - file.lock_shared().await?; - std::thread::sleep(std::time::Duration::from_millis(1000)); - std::process::exit(0); - }, - Err(_) => panic!("unable to fork"), - } - Ok(()) +#[test(flavor = "multi_thread")] +async fn lock_shared() { + // let tmp_path = NamedTempFile::new()?.into_temp_path(); + // match fork() { + // Ok(Fork::Parent(_)) => { + // std::thread::sleep(std::time::Duration::from_millis(100)); + // let mut file = FileLock::open(&tmp_path).await?; + // let instant = Instant::now(); + // file.lock_shared().await?; + // assert!(instant.elapsed().as_millis() < 100); + // } + // Ok(Fork::Child) => { + // let mut file = FileLock::open(&tmp_path).await?; + // file.lock_shared().await?; + // std::thread::sleep(std::time::Duration::from_millis(1000)); + // std::process::exit(0); + // } + // Err(_) => panic!("unable to fork"), + // } + // Ok(()) + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (locked_tx, locked_rx) = tokio::sync::oneshot::channel::<()>(); + let (unlocked_tx, unlocked_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // wait for file locked by main thread + assert_matches!(locked_rx.await, Ok(())); + + assert!(tmp_path.exists()); + let mut file = FileLock::open(&tmp_path).await.unwrap(); + + // attemp to obtain a lock acquired should failed. + assert_matches!(file.try_lock_shared(), Ok(_)); + + // wait for lock released from main thread + assert_matches!(unlocked_rx.await, Ok(())); + + // attemp to obtain a lock should success. + assert_matches!(file.try_lock_shared(), Ok(())); + }); + + // main thread + { + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // obtain the lock + assert_matches!(file.lock_shared().await, Ok(_)); + + // signal child thread + assert_matches!(locked_tx.send(()), Ok(())); + + // sleep + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + // unlock + file.unlock().await; + + // signal + assert_matches!(unlocked_tx.send(()), Ok(())); + }; } +#[test(flavor = "multi_thread")] +async fn lock_exclusive() { + let file = NamedTempFile::new().unwrap(); -#[test(threaded_scheduler)] -async fn lock_exclusive() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - match fork() { - Ok(Fork::Parent(_)) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - println!("parent {}", tmp_path.exists()); - let mut file = FileLock::open(&tmp_path).await?; - let instant = Instant::now(); - file.lock_exclusive().await?; - assert!(instant.elapsed().as_millis() > 800); - } - Ok(Fork::Child) => { - let mut file = FileLock::create(&tmp_path).await?; - file.lock_exclusive().await?; - println!("child {}", tmp_path.exists()); - std::thread::sleep(std::time::Duration::from_millis(1000)); - std::process::exit(0); - }, - Err(_) => panic!("unable to fork"), - } - Ok(()) + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (locked_tx, locked_rx) = tokio::sync::oneshot::channel::<()>(); + let (unlocked_tx, unlocked_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // wait for file locked by main thread + assert_matches!(locked_rx.await, Ok(())); + + assert!(tmp_path.exists()); + let mut file = FileLock::open(&tmp_path).await.unwrap(); + + // attemp to obtain a lock acquired should failed. + assert_matches!(file.try_lock_exclusive(), Err(_)); + + // wait for lock released from main thread + assert_matches!(unlocked_rx.await, Ok(())); + + // attemp to obtain a lock should success. + assert_matches!(file.try_lock_exclusive(), Ok(())); + }); + + // main thread + { + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // obtain the lock + assert_matches!(file.lock_exclusive().await, Ok(_)); + + // signal child thread + assert_matches!(locked_tx.send(()), Ok(())); + + // sleep + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + // unlock + file.unlock().await; + + // signal + assert_matches!(unlocked_tx.send(()), Ok(())); + }; } -#[test(threaded_scheduler)] -async fn lock_exclusive_shared() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - match fork() { - Ok(Fork::Parent(_)) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - let mut file = FileLock::open(&tmp_path).await?; - let instant = Instant::now(); - file.lock_exclusive().await?; - assert!(instant.elapsed().as_millis() > 800); - } - Ok(Fork::Child) => { - let mut file = FileLock::open(&tmp_path).await?; - file.lock_shared().await?; - std::thread::sleep(std::time::Duration::from_millis(1000)); - std::process::exit(0); - }, - Err(_) => panic!("unable to fork"), - } - Ok(()) +#[test(flavor = "multi_thread")] +async fn lock_exclusive_shared() { + // let tmp_path = NamedTempFile::new()?.into_temp_path(); + // match fork() { + // Ok(Fork::Parent(_)) => { + // std::thread::sleep(std::time::Duration::from_millis(100)); + // let mut file = FileLock::open(&tmp_path).await?; + // let instant = Instant::now(); + // file.lock_exclusive().await?; + // assert!(instant.elapsed().as_millis() > 800); + // } + // Ok(Fork::Child) => { + // let mut file = FileLock::open(&tmp_path).await?; + // file.lock_shared().await?; + // std::thread::sleep(std::time::Duration::from_millis(1000)); + // std::process::exit(0); + // } + // Err(_) => panic!("unable to fork"), + // } + // Ok(()) + + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (locked_tx, locked_rx) = tokio::sync::oneshot::channel::<()>(); + let (unlocked_tx, unlocked_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // wait for file locked by main thread + assert_matches!(locked_rx.await, Ok(())); + + assert!(tmp_path.exists()); + let mut file = FileLock::open(&tmp_path).await.unwrap(); + + // attemp to obtain a lock acquired should failed. + assert_matches!(file.try_lock_exclusive(), Err(_)); + + // wait for lock released from main thread + assert_matches!(unlocked_rx.await, Ok(())); + + // attemp to obtain a lock should success. + assert_matches!(file.try_lock_exclusive(), Ok(())); + }); + + // main thread + { + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // obtain the lock + assert_matches!(file.lock_shared().await, Ok(_)); + + // signal child thread + assert_matches!(locked_tx.send(()), Ok(())); + + // sleep + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + // unlock + file.unlock().await; + + // signal + assert_matches!(unlocked_tx.send(()), Ok(())); + }; } -#[test(threaded_scheduler)] -async fn drop_lock_exclusive() -> Result<()> { - let tmp_path = NamedTempFile::new()?.into_temp_path(); - match fork() { - Ok(Fork::Parent(_)) => { - std::thread::sleep(std::time::Duration::from_millis(100)); - let mut file = FileLock::open(&tmp_path).await?; - let instant = Instant::now(); - file.lock_exclusive().await?; - assert!(instant.elapsed().as_millis() < 100); - } - Ok(Fork::Child) => { - let mut file = FileLock::open(&tmp_path).await?; - file.lock_exclusive().await?; - drop(file); - std::thread::sleep(std::time::Duration::from_millis(1000)); - std::process::exit(0); - }, - Err(_) => panic!("unable to fork"), - } - Ok(()) +#[test(flavor = "multi_thread")] +async fn drop_lock_exclusive() { + let file = NamedTempFile::new().unwrap(); + + let tmp_path: PathBuf = file.path().into(); + let tmp_path2 = tmp_path.clone(); + + // signal channels + let (locked_tx, locked_rx) = tokio::sync::oneshot::channel::<()>(); + let (unlocked_tx, unlocked_rx) = tokio::sync::oneshot::channel::<()>(); + + // child thread + tokio::spawn(async move { + // wait for file locked by main thread + assert_matches!(locked_rx.await, Ok(())); + + assert!(tmp_path.exists()); + let mut file = FileLock::open(&tmp_path).await.unwrap(); + + // attemp to obtain a lock acquired should failed. + assert_matches!(file.try_lock_exclusive(), Err(_)); + + // wait for lock released from main thread + assert_matches!(unlocked_rx.await, Ok(())); + + // attemp to obtain a lock should success. + assert_matches!(file.try_lock_exclusive(), Ok(())); + }); + + // main thread + { + let mut file = FileLock::open(&tmp_path2).await.unwrap(); + assert!(tmp_path2.exists()); + + // obtain the lock + assert_matches!(file.lock_exclusive().await, Ok(_)); + + // signal child thread + assert_matches!(locked_tx.send(()), Ok(())); + + // sleep + tokio::time::sleep(std::time::Duration::from_millis(1000)).await; + + // drop lock + drop(file); + + // signal + assert_matches!(unlocked_tx.send(()), Ok(())); + }; } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn exclusive_locking_locked_file_panics() { let mut file = file(); @@ -178,7 +427,7 @@ async fn exclusive_locking_locked_file_panics() { file.lock_exclusive().await.unwrap(); } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn shared_locking_locked_file_panics() { let mut file = file(); @@ -186,7 +435,7 @@ async fn shared_locking_locked_file_panics() { file.lock_shared().await.unwrap(); } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn shared_locking_exclusive_file_panics() { let mut file = file(); @@ -194,7 +443,7 @@ async fn shared_locking_exclusive_file_panics() { file.lock_shared().await.unwrap(); } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn exclusive_locking_shared_file_panics() { let mut file = file(); @@ -202,14 +451,14 @@ async fn exclusive_locking_shared_file_panics() { file.lock_exclusive().await.unwrap(); } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn unlocking_file_panics() { let mut file = file(); file.unlock().await; } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn unlocking_unlocked_file_panics() { let mut file = file(); @@ -218,7 +467,7 @@ async fn unlocking_unlocked_file_panics() { file.unlock().await; } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] async fn file_stays_locked() { let mut file = file(); file.lock_exclusive().await.unwrap(); @@ -226,10 +475,10 @@ async fn file_stays_locked() { file.unlock().await; } -#[test(threaded_scheduler)] +#[test(flavor = "multi_thread")] #[should_panic] async fn file_auto_unlocks() { let mut file = file(); file.write(b"a").await.unwrap(); file.unlock().await; -} \ No newline at end of file +}