Conversation
8b77c15 to
7252997
Compare
|
Sounds good. Let's wait for stabilization before merging into bluest. |
30b6408 to
9fd1bb9
Compare
|
Can you make |
|
That would probably work, but it'd be quite unergonomic. I'd prefer to just wait rather than doing that change and then changing everything again when arbitrary_self_types is stable. |
|
Sure, makes sense. |
9429c1d to
96d6aac
Compare
617dcd8 to
0a0353e
Compare
|
I just considered continuing my progress on my fork https://github.com/wuwbobo2021/bluest/tree/android again, untill I found this PR #31. Here are my ideas (some of which are probably stupid):
I will definitely not start to implement these things unless you agree with my supposed "design". |
|
New idea: Dirbaio/java-spaghetti#1 (comment). |
|
Excuse me. I know that you are all online, and I'll assume that you don't disagree with my raw design seriously. I might still continue my progress a few weeks later, probably based on the not updated |
|
I've built `Excluder` and `Notifier` code#![allow(dead_code)]
use async_broadcast::{Receiver, Sender};
use async_lock::Mutex;
use std::pin::Pin;
use std::sync::{Arc, Weak};
use std::task;
pub struct Excluder<T: Send + Clone> {
inner: Mutex<Weak<Sender<()>>>,
last_val: Arc<Mutex<Option<T>>>,
}
pub struct ExcluderLock<T: Send + Clone> {
inner: Option<Arc<Sender<()>>>, // always `Some` before `drop()`
receiver: Receiver<()>,
last_val: Weak<Mutex<Option<T>>>,
}
impl<T: Send + Clone, E: Send + Clone> Excluder<Result<T, E>> {
/// Locks the excluder, does the operation that will produce the callback,
/// then waits for the callback's result.
pub async fn obtain(
&self,
operation: impl FnOnce() -> Result<(), E>,
) -> Result<Option<T>, E> {
let lock = self.lock().await;
operation()?;
if let Some(res) = lock.wait_unlock().await {
Ok(Some(res?))
} else {
Ok(None)
}
}
}
impl<T: Send + Clone> Excluder<T> {
/// Creates a new unlocked `Excluder`.
pub fn new() -> Self {
Self {
inner: Mutex::new(Weak::new()),
last_val: Arc::new(Mutex::new(None)),
}
}
/// Clones and returns the last value returned by the "foreign" callback.
pub fn last_value(&self) -> Option<T> {
self.last_val.lock_blocking().clone()
}
/// Checks if the excluder is locked.
pub fn is_locked(&self) -> bool {
// Don't call it in this module
self.inner.lock_blocking().strong_count() > 0
}
/// Waits until the excluder is unlocked and locks the excluder.
/// Call this right before calling a method that will produce a "foreign" callback;
/// after calling that method, call [ExcluderLock::wait_unlock] in the same task.
pub async fn lock(&self) -> ExcluderLock<T> {
// waits for the waking signal if the excluder is currently locked.
let receiver = {
let guard_inner = self.inner.lock().await;
guard_inner.upgrade().as_ref().map(|s| s.new_receiver())
};
if let Some(mut receiver) = receiver {
// to prevent dead lock, don't hold the `Arc<Sender<()>>` during waiting.
let _ = receiver.recv().await;
}
let mut guard_inner = self.inner.lock().await;
if guard_inner.strong_count() > 0 {
// race condition of multiple tasks trying to lock after receving unlock signal;
// one of them has already won, just wait for that new lock to be unlocked.
drop(guard_inner);
return Box::pin(self.lock()).await;
}
// sets the lock; don't drop the guard before it; `async_lock` is used for this requirement.
let (sender, receiver) = async_broadcast::broadcast(1);
let sender = Arc::new(sender);
*guard_inner = Arc::downgrade(&sender);
ExcluderLock {
inner: Some(sender),
receiver,
last_val: Arc::downgrade(&self.last_val),
}
}
/// Sends the "completed" (unlock) signal from the "foreign" callback.
pub fn unlock(&self, result: T) {
self.last_val.lock_blocking().replace(result);
let mut guard_inner = self.inner.lock_blocking();
if let Some(sender) = guard_inner.upgrade() {
// to prevent dead lock, invalidate the `Weak` in `Excluder` before broadcasting.
*guard_inner = Weak::new();
let _ = sender.broadcast_blocking(());
}
}
}
impl<T: Send + Clone> Default for Excluder<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Send + Clone> Drop for Excluder<T> {
fn drop(&mut self) {
// makes sure `ExcluderLock::wait_unlock` return `None`.
let _ = self.last_val.lock_blocking().take();
let mut guard_inner = self.inner.lock_blocking();
if let Some(sender) = guard_inner.upgrade() {
*guard_inner = Weak::new();
let _ = sender.broadcast_blocking(());
}
}
}
impl<T: Send + Clone> ExcluderLock<T> {
/// Waits until the unlock signal is sent from the "foreign" callback.
pub async fn wait_unlock(mut self) -> Option<T> {
self.receiver.recv().await.ok()?;
self.last_val
.upgrade()
.and_then(|arc| arc.lock_blocking().as_ref().cloned())
}
}
pub struct Notifier<T: Send + Clone> {
capacity: usize,
inner: Mutex<Weak<NotifierInner<T>>>,
}
struct NotifierInner<T: Send + Clone> {
sender: Sender<Option<T>>,
on_stop: Box<dyn Fn() + Send + Sync + 'static>,
}
pub struct NotifierReceiver<T: Send + Clone> {
holder: Option<Arc<NotifierInner<T>>>,
receiver: Receiver<Option<T>>,
}
impl<T: Send + Clone> Notifier<T> {
/// Creates a new inactive `Notifier`.
pub fn new(capacity: usize) -> Self {
Self {
capacity,
inner: Mutex::new(Weak::new()),
}
}
/// Checks if the notifier is active.
pub fn is_notifying(&self) -> bool {
// Don't call it in this module
self.inner.lock_blocking().strong_count() > 0
}
/// Creates a new `NotifierReceiver` for the caller to receive notifications.
/// - `on_start` is called while locking the notifier if the notifier is not active.
/// - `on_stop` is what the notifier should do when it is deactivated, but it is not
/// replaced if the notifier is already active.
pub async fn subscribe<E>(
&self,
on_start: impl FnOnce() -> Result<(), E>,
on_stop: impl Fn() + Send + Sync + 'static,
) -> Result<NotifierReceiver<T>, E> {
let mut guard_inner = self.inner.lock().await;
if let Some(inner) = guard_inner.upgrade() {
let receiver = inner.sender.new_receiver();
Ok(NotifierReceiver {
holder: Some(inner),
receiver,
})
} else {
on_start()?;
let (mut sender, receiver) = async_broadcast::broadcast(self.capacity);
sender.set_overflow(true);
let new_inner = Arc::new(NotifierInner {
sender,
on_stop: Box::new(on_stop),
});
*guard_inner = Arc::downgrade(&new_inner);
Ok(NotifierReceiver {
holder: Some(new_inner),
receiver,
})
}
}
/// Sends a notifcation value from the "foreign" callback.
pub fn notify(&self, value: T) {
let inner = self.inner.lock_blocking().upgrade();
if let Some(inner) = inner {
let _ = inner.sender.broadcast_blocking(Some(value));
}
}
}
impl<T: Send + Clone> futures_core::Stream for NotifierReceiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<T>> {
if self.holder.is_none() {
task::Poll::Ready(None)
} else if let task::Poll::Ready(result) = std::pin::pin!(&mut self.receiver).poll_next(cx) {
if let Some(value) = result.flatten() {
task::Poll::Ready(Some(value))
} else {
let _ = self.holder.take();
task::Poll::Ready(None)
}
} else {
task::Poll::Pending
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.receiver.size_hint()
}
}
impl<T: Send + Clone> Drop for Notifier<T> {
fn drop(&mut self) {
let inner = self.inner.lock_blocking().upgrade();
if let Some(inner) = inner {
let _ = inner.sender.broadcast_blocking(None);
}
}
}
impl<T: Send + Clone> Drop for NotifierInner<T> {
fn drop(&mut self) {
(self.on_stop)()
}
}
fn main() {} |
|
I am introducing GATT support for Android: #40. Suggestions and criticisms are welcome. |
Opening as draft for visibility and because I wanted to see what it looks like. It gets rid of that unsafe lifetime upgrade which is nice :)
This isn't quite ready for prime-time because it needs
feature(arbitrary_self_types). It's supposed to be stabilized Soon(TM) becauseit's high on Rust's priorities because Rust-for-Linux needs it. I'm not sure whether to release the new
java-spaghettiwithout waiting for it or not though. Would you be OK with bluest on Android requiring nightly until then?