Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.
The code should be ready for some early review. For this PR to be fully ready, I want to add more benchmarks.

Please ask your LLM to generate an ASCII chart of the implemented solution.
There was a problem hiding this comment.
Typo in commit message:
There was a but -> There was a bug
// Initialize the direct-poll bridge once per process.
// This sets up the Tokio reactor thread and the wake mechanism used by all
// bridged async Rust functions (session queries, paging, etc.).
rust.initPollBridge();
There was a problem hiding this comment.
❓ When is this executed? Is this guaranteed to be executed at most once? Is this idempotent?
There was a problem hiding this comment.
> When is this executed
When the file is first imported.
> Is this guaranteed to be executed at most once
Almost always, yes: https://nodejs.org/docs/latest/api/modules.html#caching
> Is this idempotent
No. Any subsequent call will lead to a panic.
There was a problem hiding this comment.
> Is this idempotent
Do you think it's worth making it idempotent, i.e. setting an atomic flag at the beginning of its execution that prevents double initialization?
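A minimal sketch of the atomic-flag guard suggested here. The name `init_poll_bridge_once` and the `BRIDGE_INITIALIZED` flag are hypothetical, and the real setup work (runtime, TSFN) is represented by a closure, so this only illustrates the idea rather than the PR's actual code:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical guard flag; the PR's own `INITIALIZED` flag may be used differently.
static BRIDGE_INITIALIZED: AtomicBool = AtomicBool::new(false);

/// Runs `setup` only on the first call and returns `true` if it ran.
/// Later calls are no-ops instead of panicking.
pub fn init_poll_bridge_once<F: FnOnce()>(setup: F) -> bool {
    // compare_exchange flips false -> true atomically, so exactly one caller wins.
    if BRIDGE_INITIALIZED
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
    {
        setup();
        true
    } else {
        false
    }
}
```

Note that the flag is set before `setup` finishes, so a concurrent second caller could return while initialization is still in progress. If that matters, `std::sync::Once` (or `OnceLock`) blocks later callers until the first call completes.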
extern crate napi_derive;

// Link other files
pub mod casync;
There was a problem hiding this comment.
🤔 I dislike the name. Can we use something more intuitive?
future_bridge?
task_bridge?
async_bridge? -> I like this the most.
impl std::fmt::Display for ConvertedError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {}", self.name, self.msg)
    }
}
There was a problem hiding this comment.
Let's have this as a separate commit.
src/casync.rs
Outdated
// Create the TSFN from a no-op C callback.
// `build_callback` replaces the JS call - the noop is never invoked.
let noop_fn = env.create_function::<(), ()>("pollBridgeNoop", noop_callback_c_callback)?;

let tsfn = noop_fn
    .build_threadsafe_function::<()>()
    .weak::<true>()
    .build_callback(|ctx| {
        let raw_env = ctx.env;
        REGISTRY.with(|r| {
            r.borrow_mut().poll_woken(raw_env);
        });
        Ok(())
    })?;
There was a problem hiding this comment.
💭 This trickery makes me wonder if noop_callback is really needed.
There was a problem hiding this comment.
❓ What does it mean that a TSFN is weak?
There was a problem hiding this comment.
> What does it mean that a TSFN is weak
It means its existence will not prevent Node from exiting gracefully.
There was a problem hiding this comment.
If so, then why do we need to ref and unref it?
There was a problem hiding this comment.
We manually ref / unref it while promises are pending, so that we keep Node.js from exiting only while some queries are running.
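The ref/unref bookkeeping described here can be sketched as a pending counter that touches the handle only on the 0 -> 1 and 1 -> 0 transitions. `LoopHandle`, `PendingTracker`, and `RecordingHandle` are hypothetical names for illustration; in the PR the handle would be the weak TSFN:

```rust
/// Hypothetical stand-in for the two TSFN operations the bridge needs.
pub trait LoopHandle {
    fn ref_loop(&mut self);
    fn unref_loop(&mut self);
}

/// Keeps the event loop referenced only while at least one promise is pending.
pub struct PendingTracker<H: LoopHandle> {
    pub handle: H,
    pub pending: usize,
}

impl<H: LoopHandle> PendingTracker<H> {
    pub fn new(handle: H) -> Self {
        Self { handle, pending: 0 }
    }

    /// Call when a future is submitted; refs the handle on the 0 -> 1 transition.
    pub fn on_submit(&mut self) {
        if self.pending == 0 {
            self.handle.ref_loop();
        }
        self.pending += 1;
    }

    /// Call when a promise settles; unrefs the handle on the 1 -> 0 transition.
    pub fn on_settle(&mut self) {
        if self.pending == 0 {
            return; // Nothing pending; ignore a spurious settle.
        }
        self.pending -= 1;
        if self.pending == 0 {
            self.handle.unref_loop();
        }
    }
}

/// Test double that records whether the loop is currently held alive.
#[derive(Default)]
pub struct RecordingHandle {
    pub referenced: bool,
    pub transitions: u32,
}

impl LoopHandle for RecordingHandle {
    fn ref_loop(&mut self) {
        self.referenced = true;
        self.transitions += 1;
    }
    fn unref_loop(&mut self) {
        self.referenced = false;
        self.transitions += 1;
    }
}
```

With this shape, an idle driver never blocks process exit, while any number of in-flight queries costs exactly one ref and one unref.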
src/casync.rs
Outdated
// Cleanup hook - shut down the runtime when Node exits.
env.add_env_cleanup_hook((), |_| {
    REGISTRY.with(|r| {
        r.borrow_mut().shutdown();
    });
})?;
There was a problem hiding this comment.
This explains why Mutex<Option<Tsfn>> instead of OnceLock<Tsfn>.
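To illustrate the difference, here is a small sketch with a hypothetical `FakeTsfn` type standing in for the real handle. A `Mutex<Option<_>>` slot can be emptied from a cleanup hook, whereas a value stored in a shared `OnceLock` static stays there for the life of the process:

```rust
use std::sync::{Mutex, OnceLock};

/// Hypothetical stand-in for the real TSFN handle.
pub struct FakeTsfn(pub u32);

static TSFN_SLOT: Mutex<Option<FakeTsfn>> = Mutex::new(None);
static TSFN_ONCE: OnceLock<FakeTsfn> = OnceLock::new();

/// Store the handle during initialization.
pub fn install(handle: FakeTsfn) {
    *TSFN_SLOT.lock().unwrap() = Some(handle);
}

/// Body of a cleanup hook: take the handle out so it can be dropped right away.
pub fn shutdown() -> Option<FakeTsfn> {
    TSFN_SLOT.lock().unwrap().take()
}

/// `OnceLock::take` needs `&mut self`, which a shared static cannot provide,
/// so once a value is set it cannot be removed again.
pub fn once_lock_keeps_value() -> bool {
    let _ = TSFN_ONCE.set(FakeTsfn(1));
    TSFN_ONCE.get().is_some()
}
```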
/// Submit a typed Rust future to be polled directly by the Node event loop.
///
/// Future can return a typed value `T` on success
/// or an error `E` on failure. Both `T` and `E` are converted to JS values via
/// `ToNapiValue` on the main thread when the future settles.
///
/// The error type `E` should produce a JS Error object from `to_napi_value` so
/// that the rejection value is a proper error (e.g. `ConvertedError`).
pub fn submit_future<F, T>(env: &Env, fut: F) -> ConvertedResult<JsPromise<T>>
where
    F: Future<Output = std::result::Result<T, ConvertedError>> + Send + 'static,
    T: napi::bindgen_prelude::ToNapiValue + Send + 'static,
{
    // This is a driver error, so panic is warranted here. There is no reasonable way to recover.
    assert!(
        INITIALIZED.load(Ordering::Relaxed),
        "init_poll_bridge must be called before submit_future. This is a bug in the driver."
    );

    let (deferred, promise) = create_promise(env)?;

    let boxed: BoxFuture = Box::pin(async move {
        let result = fut.await;
        Box::new(move |env: Env, deferred| unsafe {
            // SAFETY: This closure is only ever invoked from `poll_woken`, which runs
            // on the Node main thread inside the TSFN callback - the only place where
            // `env` is a valid napi_env. `deferred` is consumed exactly once here,
            // satisfying the napi contract that each deferred is resolved or rejected
            // exactly once. `to_napi_value` receives the same valid `env`.
            let (js_val, resolve) = match result {
                Ok(val) => (T::to_napi_value(env.raw(), val), true),
                Err(err) => (ConvertedError::to_napi_value(env.raw(), err), false),
            };
            let status = js_val
                // First we try to accept / reject with converted value / error.
                .and_then(|v| {
                    if resolve {
                        check_status!(sys::napi_resolve_deferred(env.raw(), deferred, v))
                    } else {
                        check_status!(sys::napi_reject_deferred(env.raw(), deferred, v))
                    }
                })
                // If this fails, or we failed to convert the value / error into a JS value,
                // we reject with a fallback reason.
                .or_else(|e| reject_with_reason(env, deferred, &e.reason));

            if let Err(e) = status {
                // If both fail, we assume something terrible has happened. We cannot
                // inform JS side about the error by regular error handling, so we panic to
                // avoid silent failures and orphaned promises.
                panic!(
                    "Failed to settle promise in TSFN callback. This may indicate either a bug in the driver or a severe runtime error.\nRoot cause:\n {}",
                    e.reason
                );
            }
        }) as SettleCallback
    });

    REGISTRY.with(|r| r.borrow_mut().insert(env, boxed, deferred))?;
    Ok(JsPromise(promise, PhantomData))
}
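The type erasure used in `submit_future` can be shown in isolation. This is a simplified sketch, not the PR's code: the settle step here pushes a string into a `Vec` instead of resolving a JS promise, and `erase` and `registry_demo` are hypothetical names. The point is that futures with different output types end up as one `BoxFuture` type and can share a single `HashMap`:

```rust
use std::collections::HashMap;
use std::fmt::Display;
use std::future::Future;
use std::pin::Pin;

/// Stand-in for the settle step: writes a string instead of resolving a promise.
pub type SettleCallback = Box<dyn FnOnce(&mut Vec<String>)>;
/// Every registered future yields the same erased callback type.
pub type BoxFuture = Pin<Box<dyn Future<Output = SettleCallback>>>;

/// Wrap a future with any displayable output into the single erased type.
pub fn erase<F, T>(fut: F) -> BoxFuture
where
    F: Future<Output = T> + 'static,
    T: Display + 'static,
{
    Box::pin(async move {
        let value = fut.await;
        // The concrete `T` is captured here and hidden behind `dyn FnOnce`.
        Box::new(move |out: &mut Vec<String>| out.push(value.to_string())) as SettleCallback
    })
}

/// Two futures with different output types stored in one map.
pub fn registry_demo() -> HashMap<u64, BoxFuture> {
    let mut registry: HashMap<u64, BoxFuture> = HashMap::new();
    registry.insert(1, erase(async { 42u32 }));
    registry.insert(2, erase(async { "rows" }));
    registry
}
```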
There was a problem hiding this comment.
💭 I'm wondering if it makes sense to perform the first poll() straightaway. This could reduce latency. When executing prepared statements (the main point of our interest), the logic is as follows:
- serialize statement's bound values,
- calculate token,
- configure the execution,
- ask LBP for routing decision,
- create a request frame,
- send the frame via a channel to a tokio task managing the connection (router),
- wait until the response arrives.
If I'm not mistaken, all points but the last can happen during a single poll! The rest is on the router, which is driven by the tokio runtime worker thread(s).
There was a problem hiding this comment.
> wait until the response arrives.
I agree that all points except this one can happen in a single poll.
There was a problem hiding this comment.
I insist on this, because the latency gains can be significant.
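A rough sketch of what an eager first poll could look like, using only the standard library. `submit_with_eager_poll`, `Submitted`, and `noop_waker` are hypothetical names. In the real bridge the waker passed to the first poll would have to be the future's actual per-future waker (otherwise the wake-up for the response would be lost), and the poll would need to run with the Tokio runtime context entered:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;

/// Outcome of submitting a future with an immediate first poll.
pub enum Submitted<T> {
    /// The future finished during the first poll; nothing needs to be stored.
    Ready(T),
    /// The future made progress and is now waiting; store it for later polls.
    Pending(BoxFuture<T>),
}

/// Poll once right away, so all synchronous work (serialization, routing,
/// handing the frame to the connection task) happens before returning.
pub fn submit_with_eager_poll<T>(
    fut: impl Future<Output = T> + 'static,
    waker: &Waker,
) -> Submitted<T> {
    let mut boxed: BoxFuture<T> = Box::pin(fut);
    let mut cx = Context::from_waker(waker);
    match boxed.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Submitted::Ready(value),
        Poll::Pending => Submitted::Pending(boxed),
    }
}

/// Placeholder waker for demonstration only; it drops every wake-up.
pub struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

pub fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```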
In general, looks promising!
This commit adds a custom async bridge between Rust and JavaScript using N-API. It allows async tasks to be scheduled without tokio::spawn, which is what the napi-rs macros use when creating async functions.
The main motivation for this change was to improve the performance of the driver. With the existing approach we spend a lot of CPU time on synchronization between the main thread and the Tokio threads. By reducing CPU time, I aim to also improve the driver runtime.
This approach polls all the futures on the Node.js main thread, replacing napi-rs's built-in async task system, which polls on Tokio worker threads.
Architecture:
- Single weak ThreadsafeFunction (TSFN) shared across all futures, with manual ref/unref to control Node.js event loop lifetime
- FutureRegistry (thread-local on main thread) stores in-flight futures paired with their napi_deferred handles
- Per-future Waker backed by Arc<WakerInner> implementing the Wake trait, which pushes the future id into a shared woken_ids vec and signals the TSFN
- Coalesced signaling via AtomicBool prevents flooding the event loop when multiple wakers fire simultaneously
- Single-threaded Tokio runtime drives the I/O reactor only; futures are polled on the main thread inside the TSFN callback with the Tokio runtime context entered
Key design decisions:
- Polling on main thread ensures napi_env is always valid during ToNapiValue conversion, avoiding cross-thread napi safety issues
- Type-erased BoxFuture and SettleCallback allow heterogeneous futures in a single HashMap without leaking generic parameters
- Promise created via raw napi_create_promise/napi_resolve_deferred to bypass napi-rs's async machinery entirely
[This commit, including this commit message, was created with heavy use of LLM tools. At the moment, the code has been slightly refactored to partially match the existing style of this repository.]
There was a but that lead to incorrect assertion in the benchmark
Rebased on main.

Addressed some comments and added a new wrapper for safety (this one written entirely by hand). The changes are not yet properly split into components.
impl<T> ToNapiValue for JsPromise<T> {
    /// # Safety
    /// No constraints on safety. The unsafe is required by the trait.

type Tsfn = napi::threadsafe_function::ThreadsafeFunction<(), (), (), Status, false, true>;

/// Single Thread safe function, coalesced wake signals
struct WakerBridge {
There was a problem hiding this comment.
single-thread-safety is not a thing...
This commit adds a custom async bridge between Rust and JavaScript using N-API.
It allows async tasks to be scheduled without tokio::spawn,
which is what the napi-rs macros use when creating async functions.
The main motivation for this change was to improve the performance of the driver.
With the existing approach we spend a lot of CPU time on synchronization
between the main thread and the Tokio threads. By reducing CPU time,
I aim to also improve the driver runtime.
This approach polls all the futures on the Node.js main thread,
replacing napi-rs's built-in async task system, which polls on
Tokio worker threads.
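Architecture:
- Single weak ThreadsafeFunction (TSFN) shared across all futures,
  with manual ref/unref to control Node.js event loop lifetime
- FutureRegistry (thread-local on main thread) stores in-flight futures
  paired with their napi_deferred handles
- Per-future Waker backed by Arc<WakerInner> implementing the Wake trait,
  which pushes the future id into a shared woken_ids vec and signals
  the TSFN
- Coalesced signaling via AtomicBool prevents flooding the event loop
  when multiple wakers fire simultaneously
- Single-threaded Tokio runtime drives the I/O reactor only; futures
  are polled on the main thread inside the TSFN callback with the
  Tokio runtime context entered
Key design decisions:
- Polling on main thread ensures napi_env is always valid during
  ToNapiValue conversion, avoiding cross-thread napi safety issues
- Type-erased BoxFuture and SettleCallback allow heterogeneous futures
  in a single HashMap without leaking generic parameters
- Promise created via raw napi_create_promise/napi_resolve_deferred
  to bypass napi-rs's async machinery entirely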
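The per-future waker with coalesced signaling could look roughly like the sketch below. It is an illustration under assumptions rather than the PR's implementation: `Shared`, `signals_sent`, `drain_woken`, and `waker_for` are hypothetical names, and a counter replaces the actual TSFN call so the coalescing is observable:

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// State shared by all wakers (stand-in for the PR's waker bridge).
#[derive(Default)]
pub struct Shared {
    pub woken_ids: Mutex<Vec<u64>>,
    pub signal_pending: AtomicBool,
    /// Counts event-loop signals; the real code would call the TSFN instead.
    pub signals_sent: AtomicUsize,
}

pub struct WakerInner {
    pub id: u64,
    pub shared: Arc<Shared>,
}

impl Wake for WakerInner {
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.shared.woken_ids.lock().unwrap().push(self.id);
        // `swap` returns the previous value, so only the first wake since the
        // last drain signals the event loop; later ones are coalesced.
        if !self.shared.signal_pending.swap(true, Ordering::AcqRel) {
            self.shared.signals_sent.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// What the main-thread callback would do before polling the woken futures.
pub fn drain_woken(shared: &Shared) -> Vec<u64> {
    // Clear the flag before taking the ids, so a wake racing with the drain
    // sends a fresh signal instead of being lost.
    shared.signal_pending.store(false, Ordering::Release);
    std::mem::take(&mut *shared.woken_ids.lock().unwrap())
}

/// Build a standard `Waker` for the future with the given id.
pub fn waker_for(id: u64, shared: &Arc<Shared>) -> Waker {
    Waker::from(Arc::new(WakerInner { id, shared: Arc::clone(shared) }))
}
```

In this sketch three wake-ups between drains produce a single signal, and a duplicate id in the drained list simply means the same future is due for one poll.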
[This PR was created with heavy use of LLM tools. Since then, the code has been significantly refactored to match the existing style of this repository and to improve error handling.]
This PR aims to significantly improve the performance of the driver.
Refs: #75. With this optimisation, the performance for the GA release should not be a problem any longer.
Some early results: