diff --git a/.changeset/wet-rings-repair.md b/.changeset/wet-rings-repair.md
new file mode 100644
index 000000000000..cc25b15aa5aa
--- /dev/null
+++ b/.changeset/wet-rings-repair.md
@@ -0,0 +1,5 @@
+---
+'@sveltejs/kit': minor
+---
+
+feat: experimental `query.live` function
diff --git a/documentation/docs/20-core-concepts/60-remote-functions.md b/documentation/docs/20-core-concepts/60-remote-functions.md
index 7b60d7181e86..2109f6476159 100644
--- a/documentation/docs/20-core-concepts/60-remote-functions.md
+++ b/documentation/docs/20-core-concepts/60-remote-functions.md
@@ -227,6 +227,45 @@ export const getWeather = query.batch(v.string(), async (cityIds) => {
{/if}
```
+## query.live
+
+`query.live` is for accessing real-time data from the server. It behaves similarly to `query`, but the callback — typically an async [generator function](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Operators/function*) — returns an `AsyncIterable`:
+
+```js
+import { query } from '$app/server';
+
+export const getTime = query.live(async function* () {
+ while (true) {
+ yield new Date();
+ await new Promise((f) => setTimeout(f, 1000));
+ }
+});
+```
+
+During server-side rendering, `await getTime()` returns the first yielded value then closes the iterator. This initial value is serialized and reused during hydration.
+
+On the client, the query stays connected while it's actively used in a component. Multiple instances share a connection. When there are no active uses left, the stream disconnects and server-side iteration is stopped.
+
+Live queries expose a `connected` property and `reconnect()` method:
+
+```svelte
+
+
+
{await time}
+connected: {time.connected}
+ time.reconnect()}>Reconnect
+```
+
+If the connection drops, `connected` becomes `false`. SvelteKit will attempt to reconnect passively, with exponential backoff, and actively if `navigator.onLine` goes from `false` to `true`.
+
+Unlike `query`, live queries do not have a `refresh()` method, as they are self-updating.
+
+As with `query` and `query.batch`, call `.run()` outside render when you need imperative access. For live queries, `run()` returns a `Promise>`.
+
## form
The `form` function makes it easy to write data to the server. It takes a callback that receives `data` constructed from the submitted [`FormData`](https://developer.mozilla.org/en-US/docs/Web/API/FormData)...
diff --git a/packages/kit/src/exports/internal/remote-functions.js b/packages/kit/src/exports/internal/remote-functions.js
index 100488ab4c6a..f7da7b41742d 100644
--- a/packages/kit/src/exports/internal/remote-functions.js
+++ b/packages/kit/src/exports/internal/remote-functions.js
@@ -1,7 +1,7 @@
/** @import { RemoteInternals } from 'types' */
/** @type {RemoteInternals['type'][]} */
-const types = ['command', 'form', 'prerender', 'query', 'query_batch'];
+const types = ['command', 'form', 'prerender', 'query', 'query_batch', 'query_live'];
/**
* @param {Record} module
diff --git a/packages/kit/src/exports/public.d.ts b/packages/kit/src/exports/public.d.ts
index 60a50033add7..75b49809fd36 100644
--- a/packages/kit/src/exports/public.d.ts
+++ b/packages/kit/src/exports/public.d.ts
@@ -2200,6 +2200,21 @@ export type RemoteQuery = RemoteResource & {
withOverride(update: (current: T) => T): RemoteQueryOverride;
};
+export type RemoteLiveQuery = RemoteResource & {
+ /**
+ * Returns an async iterator with live updates.
+ * Unlike awaiting the resource directly, this can only be used _outside_ render
+ * (i.e. in load functions, event handlers and so on)
+ */
+ run(): Promise>;
+ /** `true` if the live stream is currently connected. */
+ readonly connected: boolean;
+ /** `true` once the live stream iterator has completed. */
+ readonly finished: boolean;
+ /** Reconnects the live stream immediately. */
+ reconnect(): void;
+};
+
export type RemoteQueryOverride = () => void;
/**
@@ -2216,4 +2231,11 @@ export type RemoteQueryFunction = (
arg: undefined extends Input ? Input | void : Input
) => RemoteQuery;
+/**
+ * The return value of a remote `query.live` function. See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query.live) for full documentation.
+ */
+export type RemoteLiveQueryFunction = (
+ arg: undefined extends Input ? Input | void : Input
+) => RemoteLiveQuery;
+
export * from './index.js';
diff --git a/packages/kit/src/runtime/app/server/remote/command.js b/packages/kit/src/runtime/app/server/remote/command.js
index 2265ee3e7e7a..5df57cd5b355 100644
--- a/packages/kit/src/runtime/app/server/remote/command.js
+++ b/packages/kit/src/runtime/app/server/remote/command.js
@@ -78,6 +78,7 @@ export function command(validate_or_fn, maybe_fn) {
}
state.remote.refreshes ??= {};
+ state.remote.reconnects ??= new Set();
const promise = Promise.resolve(
run_remote_function(event, state, true, () => validate(arg), fn)
diff --git a/packages/kit/src/runtime/app/server/remote/form.js b/packages/kit/src/runtime/app/server/remote/form.js
index 1e9c8f012872..707fc5935fdc 100644
--- a/packages/kit/src/runtime/app/server/remote/form.js
+++ b/packages/kit/src/runtime/app/server/remote/form.js
@@ -137,6 +137,7 @@ export function form(validate_or_fn, maybe_fn) {
}
state.remote.refreshes ??= {};
+ state.remote.reconnects ??= new Set();
const issue = create_issues();
diff --git a/packages/kit/src/runtime/app/server/remote/query.js b/packages/kit/src/runtime/app/server/remote/query.js
index eb64bd395e0d..7d6f3d5e1514 100644
--- a/packages/kit/src/runtime/app/server/remote/query.js
+++ b/packages/kit/src/runtime/app/server/remote/query.js
@@ -1,5 +1,5 @@
-/** @import { RemoteQuery, RemoteQueryFunction } from '@sveltejs/kit' */
-/** @import { RemoteInternals, MaybePromise, RequestState, RemoteQueryBatchInternals, RemoteQueryInternals } from 'types' */
+/** @import { RemoteLiveQuery, RemoteLiveQueryFunction, RemoteQuery, RemoteQueryFunction } from '@sveltejs/kit' */
+/** @import { RemoteInternals, MaybePromise, RequestState, RemoteQueryLiveInternals, RemoteQueryBatchInternals, RemoteQueryInternals } from 'types' */
/** @import { StandardSchemaV1 } from '@standard-schema/spec' */
import { get_request_store } from '@sveltejs/kit/internal/server';
import { create_remote_key, stringify, stringify_remote_arg } from '../../../shared.js';
@@ -113,6 +113,103 @@ export function mark_argument_validated(__, state, arg) {
return arg;
}
+/**
+ * Creates a live remote query. When called from the browser, the function will be invoked on the server via a streaming `fetch` call.
+ *
+ * See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query.live) for full documentation.
+ *
+ * @template Output
+ * @overload
+ * @param {(arg: void) => MaybePromise | AsyncIterator | AsyncIterable>} fn
+ * @returns {RemoteLiveQueryFunction}
+ */
+/**
+ * @template Input
+ * @template Output
+ * @overload
+ * @param {'unchecked'} validate
+ * @param {(arg: Input) => MaybePromise | AsyncIterator | AsyncIterable>} fn
+ * @returns {RemoteLiveQueryFunction }
+ */
+/**
+ * @template {StandardSchemaV1} Schema
+ * @template Output
+ * @overload
+ * @param {Schema} schema
+ * @param {(arg: StandardSchemaV1.InferOutput) => MaybePromise | AsyncIterator | AsyncIterable>} fn
+ * @returns {RemoteLiveQueryFunction, Output>}
+ */
+/**
+ * @template Input
+ * @template Output
+ * @param {any} validate_or_fn
+ * @param {(args: Input) => MaybePromise | AsyncIterator | AsyncIterable>} [maybe_fn]
+ * @returns {RemoteLiveQueryFunction }
+ */
+/*@__NO_SIDE_EFFECTS__*/
+function live(validate_or_fn, maybe_fn) {
+ /** @type {(arg: Input) => MaybePromise | AsyncIterator | AsyncIterable>} */
+ const fn = maybe_fn ?? validate_or_fn;
+
+ /** @type {(arg?: any) => MaybePromise } */
+ const validate = create_validator(validate_or_fn, maybe_fn);
+
+ /**
+ * @param {any} event
+ * @param {any} state
+ * @param {any} arg
+ */
+ const run = async (event, state, arg) => {
+ return await run_remote_function(
+ event,
+ state,
+ false,
+ () => validate(arg),
+ async (input) => to_async_iterator(await fn(input), __.name)
+ );
+ };
+
+ /** @type {RemoteQueryLiveInternals} */
+ const __ = { type: 'query_live', id: '', name: '', run };
+
+ /** @type {RemoteLiveQueryFunction & { __: RemoteQueryLiveInternals }} */
+ const wrapper = (arg) => {
+ if (prerendering) {
+ throw new Error(
+ `Cannot call query.live '${__.name}' while prerendering, as prerendered pages need static data. Use 'prerender' from $app/server instead`
+ );
+ }
+
+ const { event, state } = get_request_store();
+
+ return create_live_query_resource(
+ __,
+ arg,
+ state,
+ async () => {
+ const iterator = await run(event, state, arg);
+
+ try {
+ const { value, done } = await iterator.next();
+
+ if (done) {
+ throw new Error(`query.live '${__.name}' did not yield a value`);
+ }
+
+ return value;
+ } finally {
+ await iterator.return?.();
+ }
+ },
+ async () => run(event, state, arg)
+ );
+ };
+
+ Object.defineProperty(wrapper, '__', { value: __ });
+
+ return wrapper;
+}
+
/**
* Creates a batch query function that collects multiple calls and executes them in a single request
*
@@ -334,8 +431,90 @@ function create_query_resource(__, arg, state, fn) {
};
}
+/**
+ * @param {RemoteQueryLiveInternals} __
+ * @param {any} arg
+ * @param {RequestState} state
+ * @param {() => Promise} get_first_value
+ * @param {() => MaybePromise>} get_iterator
+ * @returns {RemoteLiveQuery}
+ */
+function create_live_query_resource(__, arg, state, get_first_value, get_iterator) {
+ /** @type {Promise | null} */
+ let promise = null;
+
+ const get_promise = () => {
+ return (promise ??= get_response(__, arg, state, get_first_value));
+ };
+
+ return {
+ /** @type {Promise['catch']} */
+ catch(onrejected) {
+ return get_promise().catch(onrejected);
+ },
+ current: undefined,
+ error: undefined,
+ /** @type {Promise['finally']} */
+ finally(onfinally) {
+ return get_promise().finally(onfinally);
+ },
+ finished: false,
+ loading: true,
+ ready: false,
+ connected: false,
+ reconnect() {
+ const reconnects = state.remote.reconnects;
+
+ if (!reconnects) {
+ throw new Error(
+ `Cannot call reconnect on query.live '${__.name}' because it is not executed in the context of a command/form remote function`
+ );
+ }
+
+ reconnects.add(create_remote_key(__.id, stringify_remote_arg(arg, state.transport)));
+ },
+ async run() {
+ if (!state.is_in_universal_load) {
+ throw new Error(
+ 'On the server, .run() can only be called in universal `load` functions. Anywhere else, just await the query directly'
+ );
+ }
+
+ return get_iterator();
+ },
+ /** @type {Promise['then']} */
+ then(onfulfilled, onrejected) {
+ return get_promise().then(onfulfilled, onrejected);
+ },
+ get [Symbol.toStringTag]() {
+ return 'LiveQueryResource';
+ }
+ };
+}
+
// Add batch as a property to the query function
Object.defineProperty(query, 'batch', { value: batch, enumerable: true });
+Object.defineProperty(query, 'live', { value: live, enumerable: true });
+
+/**
+ * @template T
+ * @param {Generator | AsyncIterator | AsyncIterable} source
+ * @param {string} name
+ * @returns {AsyncIterator}
+ */
+function to_async_iterator(source, name) {
+ const maybe = /** @type {any} */ (source);
+
+ if (maybe && typeof maybe[Symbol.asyncIterator] === 'function') {
+ return maybe[Symbol.asyncIterator]();
+ }
+
+ if (maybe && typeof maybe.next === 'function') {
+ return maybe;
+ }
+
+ throw new Error(`query.live '${name}' must return an AsyncIterator or AsyncIterable`);
+}
/**
* @param {RemoteInternals} __
diff --git a/packages/kit/src/runtime/client/client.js b/packages/kit/src/runtime/client/client.js
index 4414c8daf9a7..d883a4431293 100644
--- a/packages/kit/src/runtime/client/client.js
+++ b/packages/kit/src/runtime/client/client.js
@@ -1,4 +1,4 @@
-/** @import { RemoteQueryCacheEntry } from './remote-functions/query.svelte.js' */
+/** @import { RemoteLiveQueryCacheEntry, RemoteQueryCacheEntry } from './remote-functions/query.svelte.js' */
import { BROWSER, DEV } from 'esm-env';
import * as svelte from 'svelte';
import { HttpError, Redirect, SvelteKitError } from '@sveltejs/kit/internal';
@@ -306,6 +306,12 @@ export let pending_invalidate;
*/
export const query_map = new Map();
+/**
+ * @type {Map>}
+ * A map of id -> live query info with all live queries that currently exist in the app.
+ */
+export const live_query_map = new Map();
+
/**
* @param {import('./types.js').SvelteKitApp} _app
* @param {HTMLElement} _target
diff --git a/packages/kit/src/runtime/client/remote-functions/command.svelte.js b/packages/kit/src/runtime/client/remote-functions/command.svelte.js
index 8c8983699f21..3a7e1b18c0d4 100644
--- a/packages/kit/src/runtime/client/remote-functions/command.svelte.js
+++ b/packages/kit/src/runtime/client/remote-functions/command.svelte.js
@@ -8,7 +8,8 @@ import { stringify_remote_arg } from '../../shared.js';
import {
get_remote_request_headers,
apply_refreshes,
- categorize_updates
+ categorize_updates,
+ reconnect_live_queries,
} from './shared.svelte.js';
/**
@@ -79,6 +80,10 @@ export function command(id) {
apply_refreshes(result.refreshes);
}
+ if (result.reconnects) {
+ reconnect_live_queries(result.reconnects);
+ }
+
return devalue.parse(result.result, app.decoders);
}
} finally {
diff --git a/packages/kit/src/runtime/client/remote-functions/form.svelte.js b/packages/kit/src/runtime/client/remote-functions/form.svelte.js
index 1af3bffdcc6a..7af9eecac4e3 100644
--- a/packages/kit/src/runtime/client/remote-functions/form.svelte.js
+++ b/packages/kit/src/runtime/client/remote-functions/form.svelte.js
@@ -7,7 +7,7 @@ import { DEV } from 'esm-env';
import { HttpError } from '@sveltejs/kit/internal';
import { app, query_responses, _goto, set_nearest_error_page, invalidateAll } from '../client.js';
import { tick } from 'svelte';
-import { apply_refreshes, categorize_updates } from './shared.svelte.js';
+import { apply_refreshes, categorize_updates, reconnect_live_queries } from './shared.svelte.js';
import { createAttachmentKey } from 'svelte/attachments';
import {
convert_formdata,
@@ -239,6 +239,10 @@ export function form(id) {
} else {
void invalidateAll();
}
+
+ if (form_result.reconnects) {
+ reconnect_live_queries(form_result.reconnects);
+ }
}
return succeeded;
@@ -247,6 +251,11 @@ export function form(id) {
if (stringified_refreshes) {
apply_refreshes(stringified_refreshes);
}
+
+ if (form_result.reconnects) {
+ reconnect_live_queries(form_result.reconnects);
+ }
+
// Use internal version to allow redirects to external URLs
void _goto(form_result.location, { invalidateAll: !stringified_refreshes }, 0);
return true;
diff --git a/packages/kit/src/runtime/client/remote-functions/index.js b/packages/kit/src/runtime/client/remote-functions/index.js
index 4b20cabddd92..6f71dd028e12 100644
--- a/packages/kit/src/runtime/client/remote-functions/index.js
+++ b/packages/kit/src/runtime/client/remote-functions/index.js
@@ -1,4 +1,4 @@
export { command } from './command.svelte.js';
export { form } from './form.svelte.js';
export { prerender } from './prerender.svelte.js';
-export { query, query_batch } from './query.svelte.js';
+export { query, query_batch, query_live } from './query.svelte.js';
diff --git a/packages/kit/src/runtime/client/remote-functions/query.svelte.js b/packages/kit/src/runtime/client/remote-functions/query.svelte.js
index 628872c35089..1755994c3f97 100644
--- a/packages/kit/src/runtime/client/remote-functions/query.svelte.js
+++ b/packages/kit/src/runtime/client/remote-functions/query.svelte.js
@@ -1,7 +1,7 @@
-/** @import { RemoteQueryFunction } from '@sveltejs/kit' */
+/** @import { RemoteLiveQueryFunction, RemoteQueryFunction } from '@sveltejs/kit' */
/** @import { RemoteFunctionResponse } from 'types' */
import { app_dir, base } from '$app/paths/internal/client';
-import { app, goto, query_map, query_responses } from '../client.js';
+import { app, goto, live_query_map, query_map, query_responses } from '../client.js';
import {
get_remote_request_headers,
QUERY_FUNCTION_ID,
@@ -25,6 +25,15 @@ import { create_remote_key, stringify_remote_arg, unfriendly_hydratable } from '
* }} RemoteQueryCacheEntry
*/
+/**
+ * @template T
+ * @typedef {{
+ * count: number;
+ * resource: LiveQuery;
+ * cleanup: () => void;
+ * }} RemoteLiveQueryCacheEntry
+ */
+
/**
* @returns {boolean} Returns `true` if we are in an effect
*/
@@ -72,6 +81,22 @@ export function query(id) {
return wrapper;
}
+/**
+ * @param {string} id
+ * @returns {RemoteLiveQueryFunction}
+ */
+export function query_live(id) {
+ if (DEV) {
+ for (const [key, entry] of live_query_map) {
+ if (key === id || key.startsWith(id + '/')) {
+ void entry.resource.reconnect();
+ }
+ }
+ }
+
+ return (arg) => new LiveQueryProxy(id, arg);
+}
+
/**
* @param {string} id
* @returns {RemoteQueryFunction}
@@ -402,6 +427,451 @@ export class Query {
}
}
+/**
+ * @param {Response} response
+ * @returns {Promise>}
+ */
+async function get_stream_reader(response) {
+ const content_type = response.headers.get('content-type') ?? '';
+
+ if (response.ok && content_type.includes('application/json')) {
+ // we can end up here if we e.g. redirect in `handle`
+ const result = await response.json();
+
+ if (result.type === 'redirect') {
+ await goto(result.location);
+ throw new Redirect(307, result.location);
+ }
+
+ if (result.type === 'error') {
+ throw new HttpError(result.status ?? 500, result.error);
+ }
+
+ throw new HttpError(500, 'Invalid query.live response');
+ }
+
+ if (!response.ok) {
+ const result = await response.json().catch(() => ({
+ type: 'error',
+ status: response.status,
+ error: response.statusText
+ }));
+
+ throw new HttpError(result.status ?? response.status ?? 500, result.error);
+ }
+
+ if (!response.body) {
+ throw new Error('Expected query.live response body to be a ReadableStream');
+ }
+
+ return response.body.getReader();
+}
+
+/**
+ * @param {ReadableStreamDefaultReader} reader
+ */
+function create_stream_reader(reader) {
+ let done = false;
+ let buffer = '';
+ const text_decoder = new TextDecoder();
+
+ return async () => {
+ while (true) {
+ const split = buffer.indexOf('\n');
+ if (split !== -1) {
+ const line = buffer.slice(0, split).trim();
+ buffer = buffer.slice(split + 1);
+
+ if (!line) continue;
+
+ const node = JSON.parse(line);
+
+ if (node.type === 'result') {
+ return devalue.parse(node.result, app.decoders);
+ }
+
+ if (node.type === 'redirect') {
+ await goto(node.location);
+ throw new Redirect(307, node.location);
+ }
+
+ if (node.type === 'error') {
+ throw new HttpError(node.status ?? 500, node.error);
+ }
+
+ throw new Error('Invalid query.live response');
+ }
+
+ if (done) {
+ if (buffer.trim()) {
+ const node = JSON.parse(buffer.trim());
+ buffer = '';
+
+ if (node.type === 'result') {
+ return devalue.parse(node.result, app.decoders);
+ }
+
+ if (node.type === 'redirect') {
+ await goto(node.location);
+ throw new Redirect(307, node.location);
+ }
+
+ if (node.type === 'error') {
+ throw new HttpError(node.status ?? 500, node.error);
+ }
+ }
+
+ return undefined;
+ }
+
+ const chunk = await reader.read();
+ done = chunk.done;
+ if (chunk.value) {
+ buffer += text_decoder.decode(chunk.value, { stream: true });
+ }
+ }
+ };
+}
+
+/**
+ * @template T
+ * @param {string} id
+ * @param {string} payload
+ * @returns {Promise>}
+ */
+async function create_live_iterator(id, payload) {
+ const controller = new AbortController();
+ const url = `${base}/${app_dir}/remote/${id}${payload ? `?payload=${payload}` : ''}`;
+ const response = await fetch(url, {
+ headers: get_remote_request_headers(),
+ signal: controller.signal
+ });
+ const reader = await get_stream_reader(response);
+ const next_value = create_stream_reader(reader);
+
+ let closed = false;
+
+ /** @type {AsyncIterableIterator} */
+ const iterator = {
+ [Symbol.asyncIterator]() {
+ return iterator;
+ },
+ async next() {
+ if (closed) {
+ return { value: undefined, done: true };
+ }
+
+ const value = await next_value();
+ if (value === undefined) {
+ closed = true;
+ return { value: undefined, done: true };
+ }
+
+ return { value, done: false };
+ },
+ async return(value) {
+ closed = true;
+ controller.abort();
+ try {
+ await reader.cancel();
+ } catch {
+ // already closed
+ }
+ return { value, done: true };
+ }
+ };
+
+ return iterator;
+}
+
+/**
+ * @template T
+ * @implements {Promise}
+ */
+export class LiveQuery {
+ _key;
+ #id;
+ #payload;
+ #loading = $state(true);
+ #ready = $state(false);
+ #connected = $state(false);
+ #finished = $state(false);
+ #version = $state(0);
+ /** @type {T | undefined} */
+ #raw = $state.raw();
+ /** @type {any} */
+ #error = $state.raw(undefined);
+ /** @type {Promise} */
+ #promise;
+
+ /** @type {Promise['then']} */
+ // @ts-expect-error TS doesn't understand that the promise returns something
+ #then = $derived.by(() => {
+ this.#version;
+ const p = this.#promise;
+
+ return (resolve, reject) => {
+ const result = p.then(tick).then(() => /** @type {T} */ (this.#raw));
+
+ if (resolve || reject) {
+ return result.then(resolve, reject);
+ }
+
+ return result;
+ };
+ });
+ /** @type {(value: T | PromiseLike) => void} */
+ #resolve_first;
+ /** @type {(reason?: any) => void} */
+ #reject_first;
+ #active = false;
+ #destroyed = false;
+ #attempt = 0;
+ /** @type {ReturnType | null} */
+ #retry_timer = null;
+ /** @type {AbortController | null} */
+ #controller = null;
+ #connection = 0;
+
+ /**
+ * @param {string} id
+ * @param {string} key
+ * @param {string} payload
+ */
+ constructor(id, key, payload) {
+ this.#id = id;
+ this._key = key;
+ this.#payload = payload;
+
+ const { promise, resolve, reject } = with_resolvers();
+ this.#promise = promise;
+ this.#resolve_first = resolve;
+ this.#reject_first = reject;
+
+ if (Object.hasOwn(query_responses, key)) {
+ this.#set_value(query_responses[key]);
+ this.#resolve_first(query_responses[key]);
+ }
+ }
+
+ #clear_retry() {
+ if (this.#retry_timer) {
+ clearTimeout(this.#retry_timer);
+ this.#retry_timer = null;
+ }
+ }
+
+ /** @param {T} value */
+ #set_value(value) {
+ this.#ready = true;
+ this.#loading = false;
+ this.#error = undefined;
+ this.#raw = value;
+ this.#version += 1;
+ }
+
+ #disconnect_current() {
+ this.#controller?.abort();
+ this.#controller = null;
+ this.#connected = false;
+ }
+
+ #schedule_reconnect() {
+ if (!this.#active || this.#destroyed || this.#finished || this.#retry_timer) return;
+
+ if (typeof navigator !== 'undefined' && navigator.onLine === false) {
+ return;
+ }
+
+ const base_delay = Math.min(250 * 2 ** this.#attempt, 10_000);
+ const jitter = base_delay * (Math.random() * 0.4 - 0.2);
+ const delay = Math.max(0, Math.round(base_delay + jitter));
+ this.#attempt += 1;
+
+ this.#retry_timer = setTimeout(() => {
+ this.#retry_timer = null;
+ void this.#connect_stream();
+ }, delay);
+ }
+
+ async #connect_stream() {
+ if (!this.#active || this.#destroyed || this.#finished) return;
+
+ const connection = ++this.#connection;
+ const controller = new AbortController();
+ this.#controller = controller;
+
+ const url = `${base}/${app_dir}/remote/${this.#id}${this.#payload ? `?payload=${this.#payload}` : ''}`;
+
+ try {
+ const response = await fetch(url, {
+ headers: get_remote_request_headers(),
+ signal: controller.signal
+ });
+
+ if (connection !== this.#connection || !this.#active || this.#destroyed) {
+ return;
+ }
+
+ const reader = await get_stream_reader(response);
+ const next_value = create_stream_reader(reader);
+ let finished = false;
+ this.#connected = true;
+ this.#attempt = 0;
+
+ while (this.#active && !this.#destroyed && connection === this.#connection) {
+ const value = await next_value();
+ if (value === undefined) {
+ finished = true;
+ break;
+ }
+
+ if (!this.#ready) {
+ this.#resolve_first(value);
+ }
+
+ this.#set_value(value);
+ }
+
+ if (finished && this.#active && !this.#destroyed && connection === this.#connection) {
+ this.#finished = true;
+ }
+ } catch (error) {
+ if (controller.signal.aborted || connection !== this.#connection) {
+ return;
+ }
+
+ this.#connected = false;
+ this.#error = /** @type {any} */ (error);
+ if (!this.#ready) {
+ this.#loading = false;
+ this.#reject_first(error);
+ }
+ } finally {
+ if (connection === this.#connection) {
+ this.#connected = false;
+ this.#controller = null;
+
+ if (this.#active && !this.#destroyed && !this.#finished) {
+ this.#schedule_reconnect();
+ }
+ }
+ }
+ }
+
+ #on_online = () => {
+ if (!this.#active || this.#destroyed || this.#finished) return;
+ this.#clear_retry();
+ void this.#connect_stream();
+ };
+
+ #on_offline = () => {
+ this.#disconnect_current();
+ };
+
+ #on_pagehide = () => {
+ this.#disconnect_current();
+ };
+
+ connect() {
+ this.#active = true;
+
+ if (typeof window !== 'undefined') {
+ window.addEventListener('online', this.#on_online);
+ window.addEventListener('offline', this.#on_offline);
+ window.addEventListener('pagehide', this.#on_pagehide);
+ window.addEventListener('beforeunload', this.#on_pagehide);
+ }
+
+ this.#clear_retry();
+ if (!this.#controller && !this.#finished) {
+ void this.#connect_stream();
+ }
+ }
+
+ disconnect() {
+ this.#active = false;
+ this.#clear_retry();
+ this.#disconnect_current();
+
+ if (typeof window !== 'undefined') {
+ window.removeEventListener('online', this.#on_online);
+ window.removeEventListener('offline', this.#on_offline);
+ window.removeEventListener('pagehide', this.#on_pagehide);
+ window.removeEventListener('beforeunload', this.#on_pagehide);
+ }
+ }
+
+ destroy() {
+ this.#destroyed = true;
+ this.disconnect();
+ }
+
+ get then() {
+ return this.#then;
+ }
+
+ get catch() {
+ this.#then;
+ return (/** @type {any} */ reject) => {
+ return this.#then(undefined, reject);
+ };
+ }
+
+ get finally() {
+ this.#then;
+ return (/** @type {any} */ fn) => {
+ return this.#then(
+ (value) => {
+ fn();
+ return value;
+ },
+ (error) => {
+ fn();
+ throw error;
+ }
+ );
+ };
+ }
+
+ get current() {
+ return this.#raw;
+ }
+
+ get error() {
+ return this.#error;
+ }
+
+ get loading() {
+ return this.#loading;
+ }
+
+ get ready() {
+ return this.#ready;
+ }
+
+ get connected() {
+ return this.#connected;
+ }
+
+ get finished() {
+ return this.#finished;
+ }
+
+ reconnect() {
+ if (!this.#active || this.#destroyed) return;
+ this.#finished = false;
+ this.#attempt = 0;
+ this.#clear_retry();
+ this.#disconnect_current();
+ void this.#connect_stream();
+ }
+
+ get [Symbol.toStringTag]() {
+ return 'LiveQuery';
+ }
+}
+
/**
* Manages the caching layer between the user and the actual {@link Query} instance. This is the thing
* the developer actually gets to interact with in their application code.
@@ -573,7 +1043,7 @@ class QueryProxy {
/** @type {Query['withOverride']} */
withOverride(fn) {
const entry = this.#get_or_create_cache_entry();
- const override = entry.resource.withOverride(fn);
+ const override = /** @type {Query} */ (entry.resource).withOverride(fn);
const release = /** @type {(() => void) & { [QUERY_OVERRIDE_KEY]: string }} */ (
() => {
@@ -609,3 +1079,166 @@ class QueryProxy {
return 'QueryProxy';
}
}
+
+/**
+ * @template T
+ * @implements {Promise}
+ */
+class LiveQueryProxy {
+ _key;
+ #id;
+ #payload;
+ #active = true;
+ #tracking = is_in_effect();
+
+ /**
+ * @param {string} id
+ * @param {any} arg
+ */
+ constructor(id, arg) {
+ this.#id = id;
+ this.#payload = stringify_remote_arg(arg, app.hooks.transport);
+ this._key = create_remote_key(id, this.#payload);
+
+ if (!this.#tracking) {
+ this.#active = false;
+ return;
+ }
+
+ const entry = this.#get_or_create_cache_entry();
+ entry.resource.connect();
+
+ $effect.pre(() => () => {
+ const die = this.#release(entry);
+ void tick().then(die);
+ });
+ }
+
+ /** @returns {RemoteLiveQueryCacheEntry} */
+ #get_or_create_cache_entry() {
+ let cached = live_query_map.get(this._key);
+
+ if (!cached) {
+ const c = (cached = {
+ count: 0,
+ resource: /** @type {LiveQuery} */ (/** @type {unknown} */ (null)),
+ cleanup: /** @type {() => void} */ (/** @type {unknown} */ (null))
+ });
+
+ c.cleanup = $effect.root(() => {
+ c.resource = new LiveQuery(this.#id, this._key, this.#payload);
+ });
+
+ live_query_map.set(this._key, cached);
+ }
+
+ cached.count += 1;
+
+ return /** @type {RemoteLiveQueryCacheEntry} */ (cached);
+ }
+
+ /**
+ * @param {RemoteLiveQueryCacheEntry} entry
+ * @param {boolean} [deactivate]
+ */
+ #release(entry, deactivate = true) {
+ this.#active &&= !deactivate;
+ entry.count -= 1;
+
+ return () => {
+ const cached = live_query_map.get(this._key);
+ if (cached?.count === 0) {
+ cached.resource.disconnect();
+ cached.resource.destroy();
+ cached.cleanup();
+ live_query_map.delete(this._key);
+ }
+ };
+ }
+
+ #get_cached_query() {
+ if (!this.#tracking) {
+ throw new Error(
+ 'This live query was not created in a reactive context and is limited to calling `.run` and `.reconnect`.'
+ );
+ }
+
+ if (!this.#active) {
+ throw new Error(
+ 'This query instance is no longer active and can no longer be used for reactive state access. ' +
+ 'This typically means you created the query in a tracking context and stashed it somewhere outside of a tracking context.'
+ );
+ }
+
+ const cached = live_query_map.get(this._key);
+
+ if (!cached) {
+ throw new Error(
+ 'No cached query found. This should be impossible. Please file a bug report.'
+ );
+ }
+
+ return /** @type {LiveQuery} */ (cached.resource);
+ }
+
+ #safe_get_cached_query() {
+ return live_query_map.get(this._key)?.resource;
+ }
+
+ get current() {
+ return this.#get_cached_query().current;
+ }
+
+ get error() {
+ return this.#get_cached_query().error;
+ }
+
+ get loading() {
+ return this.#get_cached_query().loading;
+ }
+
+ get ready() {
+ return this.#get_cached_query().ready;
+ }
+
+ get connected() {
+ return this.#get_cached_query().connected;
+ }
+
+ get finished() {
+ return this.#get_cached_query().finished;
+ }
+
+ run() {
+ if (is_in_effect()) {
+ throw new Error(
+ 'On the client, .run() can only be called outside render, e.g. in universal `load` functions and event handlers. In render, await the query directly'
+ );
+ }
+
+ return create_live_iterator(this.#id, this.#payload);
+ }
+
+ reconnect() {
+ this.#safe_get_cached_query()?.reconnect();
+ }
+
+ get then() {
+ const cached = this.#get_cached_query();
+ return cached.then.bind(cached);
+ }
+
+ get catch() {
+ const cached = this.#get_cached_query();
+ return cached.catch.bind(cached);
+ }
+
+ get finally() {
+ const cached = this.#get_cached_query();
+ return cached.finally.bind(cached);
+ }
+
+ get [Symbol.toStringTag]() {
+ return 'LiveQueryProxy';
+ }
+}
diff --git a/packages/kit/src/runtime/client/remote-functions/shared.svelte.js b/packages/kit/src/runtime/client/remote-functions/shared.svelte.js
index c74bf43076d7..8bd7dd21d51e 100644
--- a/packages/kit/src/runtime/client/remote-functions/shared.svelte.js
+++ b/packages/kit/src/runtime/client/remote-functions/shared.svelte.js
@@ -1,7 +1,7 @@
/** @import { RemoteFunctionResponse, RemoteRefreshMap } from 'types' */
/** @import { RemoteQueryUpdate } from '@sveltejs/kit' */
import * as devalue from 'devalue';
-import { app, goto, query_map } from '../client.js';
+import { app, goto, live_query_map, query_map } from '../client.js';
import { HttpError, Redirect } from '@sveltejs/kit/internal';
import { untrack } from 'svelte';
import { create_remote_key, split_remote_key } from '../../shared.js';
@@ -153,3 +153,10 @@ export function apply_refreshes(stringified_refreshes) {
}
}
}
+
+/** @param {string[]} reconnects */
+export function reconnect_live_queries(reconnects) {
+ for (const key of reconnects) {
+ live_query_map.get(key)?.resource.reconnect();
+ }
+}
diff --git a/packages/kit/src/runtime/server/remote.js b/packages/kit/src/runtime/server/remote.js
index 056859b8dbfe..2e4ea29853aa 100644
--- a/packages/kit/src/runtime/server/remote.js
+++ b/packages/kit/src/runtime/server/remote.js
@@ -121,7 +121,8 @@ async function handle_remote_call_internal(event, state, options, manifest, id)
/** @type {RemoteFunctionResponse} */ ({
type: 'result',
result: stringify(result, transport),
- refreshes: result.issues ? undefined : await serialize_refreshes()
+ refreshes: result.issues ? undefined : await serialize_refreshes(),
+ reconnects: serialize_reconnects()
})
);
}
@@ -137,11 +138,115 @@ async function handle_remote_call_internal(event, state, options, manifest, id)
/** @type {RemoteFunctionResponse} */ ({
type: 'result',
result: stringify(data, transport),
- refreshes: await serialize_refreshes()
+ refreshes: await serialize_refreshes(),
+ reconnects: serialize_reconnects()
})
);
}
+ if (internals.type === 'query_live') {
+ if (event.request.method !== 'GET') {
+ throw new SvelteKitError(
+ 405,
+ 'Method Not Allowed',
+ `\`query.live\` functions must be invoked via GET request, not ${event.request.method}`
+ );
+ }
+
+ const payload = /** @type {string} */ (
+ new URL(event.request.url).searchParams.get('payload')
+ );
+
+ const iterator = await internals.run(event, state, parse_remote_arg(payload, transport));
+
+ const encoder = new TextEncoder();
+
+ /**
+ * @param {ReadableStreamDefaultController} controller
+ * @param {any} payload
+ */
+ function send(controller, payload) {
+ controller.enqueue(encoder.encode(JSON.stringify(payload) + '\n'));
+ }
+
+ let closed = false;
+
+ /** @type {string | undefined} */
+ let result = undefined;
+
+ async function cancel() {
+ if (closed) return;
+ closed = true;
+ await iterator.return?.();
+ }
+
+ event.request.signal.addEventListener('abort', cancel, { once: true });
+
+ return new Response(
+ new ReadableStream({
+ async pull(controller) {
+ if (event.request.signal.aborted) {
+ await cancel();
+ controller.close();
+ return;
+ }
+
+ try {
+ while (true) {
+ const { value, done } = await iterator.next();
+
+ if (done) {
+ await cancel();
+ controller.close();
+ return;
+ }
+
+ // only send changed data
+ if (result !== (result = stringify(value, transport))) {
+ send(controller, {
+ type: 'result',
+ result
+ });
+
+ return;
+ }
+ }
+ } catch (error) {
+ if (!event.request.signal.aborted) {
+ if (error instanceof Redirect) {
+ send(controller, {
+ type: 'redirect',
+ location: error.location
+ });
+ } else {
+ const status =
+ error instanceof HttpError || error instanceof SvelteKitError
+ ? error.status
+ : 500;
+
+ send(controller, {
+ type: 'error',
+ error: await handle_error_and_jsonify(event, state, options, error),
+ status
+ });
+ }
+ }
+
+ await cancel();
+ controller.close();
+ }
+ },
+ cancel
+ }),
+ {
+ headers: {
+ 'cache-control': 'private, no-store',
+ 'content-type': 'application/x-ndjson'
+ }
+ }
+ );
+ }
+
const payload =
internals.type === 'prerender'
? additional_args
@@ -166,7 +271,8 @@ async function handle_remote_call_internal(event, state, options, manifest, id)
/** @type {RemoteFunctionResponse} */ ({
type: 'redirect',
location: error.location,
- refreshes: await serialize_refreshes()
+ refreshes: await serialize_refreshes(),
+ reconnects: serialize_reconnects()
})
);
}
@@ -221,6 +327,15 @@ async function handle_remote_call_internal(event, state, options, manifest, id)
return stringify(Object.fromEntries(results), transport);
}
+
+ function serialize_reconnects() {
+ const reconnects = state.remote.reconnects;
+ if (!reconnects || reconnects.size === 0) {
+ return undefined;
+ }
+
+ return Array.from(reconnects);
+ }
}
/**
diff --git a/packages/kit/src/runtime/server/respond.js b/packages/kit/src/runtime/server/respond.js
index e32ec56431e9..063ff6ee8545 100644
--- a/packages/kit/src/runtime/server/respond.js
+++ b/packages/kit/src/runtime/server/respond.js
@@ -152,6 +152,7 @@ export async function internal_respond(request, options, manifest, state) {
forms: null,
/** A map of remote function key to corresponding single-flight-mutation promise */
refreshes: null,
+ reconnects: null,
/** A map of remote function ID to payloads requested for refreshing by the client */
requested: null,
/**
diff --git a/packages/kit/src/types/internal.d.ts b/packages/kit/src/types/internal.d.ts
index de22d467faab..5f50d17a9640 100644
--- a/packages/kit/src/types/internal.d.ts
+++ b/packages/kit/src/types/internal.d.ts
@@ -303,6 +303,7 @@ export type RemoteFunctionResponse =
| (ServerRedirectNode & {
/** devalue'd Record */
refreshes?: string;
+ reconnects?: string[];
})
| ServerErrorNode
| {
@@ -310,6 +311,7 @@ export type RemoteFunctionResponse =
result: string;
/** devalue'd Record */
refreshes: string | undefined;
+ reconnects?: string[];
};
export type RemoteRefreshResult = {
@@ -596,11 +598,7 @@ export interface RemoteQueryInternals extends BaseRemoteInternals {
}
export interface RemoteQueryLiveInternals extends BaseRemoteInternals {
type: 'query_live';
- run(
- event: RequestEvent,
- state: RequestState,
- arg: any
- ): Promise<{ iterator: AsyncIterator; cancel: () => void }>;
+ run(event: RequestEvent, state: RequestState, arg: any): Promise>;
}
export interface RemoteQueryBatchInternals extends BaseRemoteInternals {
@@ -662,6 +660,7 @@ export interface RequestState {
>;
forms: null | Map;
refreshes: null | Record>;
+ reconnects: null | Set;
requested: null | Map;
validated: null | Map>;
};
diff --git a/packages/kit/test/apps/async/src/routes/remote/live/+page.svelte b/packages/kit/test/apps/async/src/routes/remote/live/+page.svelte
new file mode 100644
index 000000000000..6f080282e873
--- /dev/null
+++ b/packages/kit/test/apps/async/src/routes/remote/live/+page.svelte
@@ -0,0 +1,27 @@
+
+
+ increment()}>increment
+ reset()}>reset
+ notify_only()}>notify only
+ drop()}>drop connection
+ reconnect_live()}>reconnect live query
+ (show_live = !show_live)}>toggle live query
+refresh stats
+
+{#if show_live}
+
+{:else}
+ detached
+{/if}
+{stats}
diff --git a/packages/kit/test/apps/async/src/routes/remote/live/LiveView.svelte b/packages/kit/test/apps/async/src/routes/remote/live/LiveView.svelte
new file mode 100644
index 000000000000..8764fb324a9b
--- /dev/null
+++ b/packages/kit/test/apps/async/src/routes/remote/live/LiveView.svelte
@@ -0,0 +1,31 @@
+
+
+{String(live.ready)}
+{String(live.connected)}
+{live.current}
+{await live}
+ live.reconnect()}>reconnect
+
+{finite.current}
+{String(finite.connected)}
+{String(finite.finished)}
+ finite.reconnect()}>reconnect finite
+
+{duplicate_payload.current?.count}
+{duplicate_updates}
diff --git a/packages/kit/test/apps/async/src/routes/remote/live/live.remote.js b/packages/kit/test/apps/async/src/routes/remote/live/live.remote.js
new file mode 100644
index 000000000000..919b63e7ccaf
--- /dev/null
+++ b/packages/kit/test/apps/async/src/routes/remote/live/live.remote.js
@@ -0,0 +1,117 @@
+import { command, getRequestEvent, query } from '$app/server';
+
+let count = 0;
+let drop_next = false;
+let active_connections = 0;
+let cleanup_count = 0;
+let finite_connection_count = 0;
+
+/** @type {Set<() => void>} */
+const listeners = new Set();
+
+function notify() {
+ for (const listener of listeners) {
+ listener();
+ }
+
+ listeners.clear();
+}
+
+/** @param {AbortSignal} signal */
+function wait_for_change(signal) {
+ return new Promise((resolve) => {
+ const on_change = () => {
+ signal.removeEventListener('abort', on_abort);
+ resolve('changed');
+ };
+
+ const on_abort = () => {
+ listeners.delete(on_change);
+ resolve('aborted');
+ };
+
+ listeners.add(on_change);
+ signal.addEventListener('abort', on_abort, { once: true });
+ });
+}
+
+export const get_count = query.live(async function* () {
+ const signal = getRequestEvent().request.signal;
+
+ active_connections += 1;
+
+ try {
+ yield count;
+
+ while (true) {
+ const status = await wait_for_change(signal);
+
+ if (status === 'aborted') {
+ return;
+ }
+
+ if (drop_next) {
+ drop_next = false;
+ throw new Error('stream dropped');
+ }
+
+ yield count;
+ }
+ } finally {
+ active_connections -= 1;
+ cleanup_count += 1;
+ }
+});
+
+export const get_finite_count = query.live(async function* () {
+ finite_connection_count += 1;
+ yield count;
+});
+
+export const get_duplicate_payload = query.live(async function* () {
+ const signal = getRequestEvent().request.signal;
+
+ yield { count };
+
+ while (true) {
+ const status = await wait_for_change(signal);
+
+ if (status === 'aborted') {
+ return;
+ }
+
+ yield { count };
+ }
+});
+
+export const increment = command(() => {
+ count += 1;
+ notify();
+});
+
+export const reset = command(() => {
+ count = 0;
+ notify();
+});
+
+export const notify_only = command(() => {
+ notify();
+});
+
+export const drop = command(() => {
+ drop_next = true;
+ notify();
+});
+
+export const reconnect_live = command(() => {
+ get_count().reconnect();
+});
+
+export const get_stats = query(() => {
+ return {
+ active_connections,
+ cleanup_count,
+ finite_connection_count,
+ count
+ };
+});
diff --git a/packages/kit/test/apps/async/src/routes/remote/validation/+page.svelte b/packages/kit/test/apps/async/src/routes/remote/validation/+page.svelte
index ce19243df816..369e7fa5d400 100644
--- a/packages/kit/test/apps/async/src/routes/remote/validation/+page.svelte
+++ b/packages/kit/test/apps/async/src/routes/remote/validation/+page.svelte
@@ -3,6 +3,8 @@
import {
validated_query_no_args,
validated_query_with_arg,
+ validated_live_query_no_args,
+ validated_live_query_with_arg,
validated_prerendered_query_no_args,
validated_prerendered_query_with_arg,
validated_command_no_args,
@@ -17,6 +19,21 @@
}
}
+ async function read_live(resource) {
+ const iterator = await resource.run();
+
+ try {
+ const result = await iterator.next();
+ if (result.done) {
+ throw new Error('query.live did not yield a value');
+ }
+
+ return result.value;
+ } finally {
+ await iterator.return?.();
+ }
+ }
+
let status = $state('pending');
@@ -27,10 +44,12 @@
status = 'pending';
try {
validate_result(await validated_query_no_args().run());
+ validate_result(await read_live(validated_live_query_no_args()));
validate_result(await validated_prerendered_query_no_args());
validate_result(await validated_command_no_args());
validate_result(await validated_query_with_arg('valid').run());
+ validate_result(await read_live(validated_live_query_with_arg('valid')));
validate_result(await validated_prerendered_query_with_arg('valid'));
validate_result(await validated_command_with_arg('valid'));
@@ -56,15 +75,21 @@
} catch {
try {
// @ts-expect-error
- await validated_prerendered_query_no_args('invalid');
+ await validated_live_query_no_args('invalid').run();
status = 'error';
} catch {
try {
// @ts-expect-error
- await validated_command_no_args('invalid');
+ await validated_prerendered_query_no_args('invalid');
status = 'error';
} catch {
- status = 'success';
+ try {
+ // @ts-expect-error
+ await validated_command_no_args('invalid');
+ status = 'error';
+ } catch {
+ status = 'success';
+ }
}
}
}
@@ -85,18 +110,20 @@
status = 'wrong error message';
return;
}
+
try {
// @ts-expect-error
- await validated_prerendered_query_with_arg(1);
+ await validated_live_query_with_arg(1).run();
status = 'error';
} catch (e) {
if (!isHttpError(e) || e.body.message !== 'Input must be a string') {
status = 'wrong error message';
return;
}
+
try {
// @ts-expect-error
- await validated_command_with_arg(1);
+ await validated_prerendered_query_with_arg(1);
status = 'error';
} catch (e) {
if (!isHttpError(e) || e.body.message !== 'Input must be a string') {
@@ -106,14 +133,26 @@
try {
// @ts-expect-error
- await validated_batch_query_with_validation(123).run();
+ await validated_command_with_arg(1);
status = 'error';
} catch (e) {
if (!isHttpError(e) || e.body.message !== 'Input must be a string') {
status = 'wrong error message';
return;
}
- status = 'success';
+
+ try {
+ // @ts-expect-error
+ await validated_batch_query_with_validation(123).run();
+ status = 'error';
+ } catch (e) {
+ if (!isHttpError(e) || e.body.message !== 'Input must be a string') {
+ status = 'wrong error message';
+ return;
+ }
+
+ status = 'success';
+ }
}
}
}
@@ -130,6 +169,8 @@
// @ts-expect-error
validate_result(await validated_query_with_arg('valid', 'ignored').run());
// @ts-expect-error
+ validate_result(await read_live(validated_live_query_with_arg('valid', 'ignored')));
+ // @ts-expect-error
validate_result(await validated_prerendered_query_with_arg('valid', 'ignored'));
// @ts-expect-error
validate_result(await validated_command_with_arg('valid', 'ignored'));
diff --git a/packages/kit/test/apps/async/src/routes/remote/validation/validation.remote.js b/packages/kit/test/apps/async/src/routes/remote/validation/validation.remote.js
index 7061b401411f..e1f66420a2b1 100644
--- a/packages/kit/test/apps/async/src/routes/remote/validation/validation.remote.js
+++ b/packages/kit/test/apps/async/src/routes/remote/validation/validation.remote.js
@@ -19,6 +19,12 @@ export const validated_query_no_args = query(
export const validated_query_with_arg = query(schema, (...arg) =>
typeof arg[0] === 'string' && arg.length === 1 ? 'success' : 'failure'
);
+export const validated_live_query_no_args = query.live(function* (arg) {
+ yield arg === undefined ? 'success' : 'failure';
+});
+export const validated_live_query_with_arg = query.live(schema, function* (arg) {
+ yield typeof arg === 'string' ? 'success' : 'failure';
+});
export const validated_prerendered_query_no_args = prerender(
/** @param {string} [arg] */
diff --git a/packages/kit/test/apps/async/test/client.test.js b/packages/kit/test/apps/async/test/client.test.js
index 2e7fe18b38eb..6f530d23aab2 100644
--- a/packages/kit/test/apps/async/test/client.test.js
+++ b/packages/kit/test/apps/async/test/client.test.js
@@ -426,6 +426,126 @@ test.describe('remote function mutations', () => {
await expect(page.locator('#phrase')).toHaveText('i am your father');
});
+ test('query.live streams updates and reconnects after disconnect', async ({ page }) => {
+ await page.goto('/remote/live');
+ await page.click('#reset');
+
+ await expect(page.locator('#first-value')).toHaveText('0');
+ await expect(page.locator('#count')).toHaveText('0');
+ await expect(page.locator('#connected')).toHaveText('true');
+
+ await page.click('#increment');
+ await expect(page.locator('#count')).toHaveText('1');
+ await expect(page.locator('#first-value')).toHaveText('1');
+
+ await page.click('#drop');
+
+ await page.click('#increment');
+ await expect(page.locator('#count')).toHaveText('2');
+
+ await page.click('#reconnect');
+ await expect(page.locator('#connected')).toHaveText('true');
+ });
+
+ test('query.live marks finite iterators as finished and only reconnects explicitly', async ({
+ page
+ }) => {
+ await page.goto('/remote/live');
+ await page.click('#reset');
+
+ await expect(page.locator('#finite-finished')).toHaveText('true');
+ await expect(page.locator('#finite-connected')).toHaveText('false');
+
+ await page.waitForTimeout(200);
+ await page.click('#stats');
+ await expect(page.locator('#stats-value')).not.toHaveText('pending');
+ const stats = JSON.parse((await page.locator('#stats-value').textContent()) ?? '{}');
+ const before = stats.finite_connection_count;
+
+ await page.waitForTimeout(200);
+ await page.click('#stats');
+ await expect(page.locator('#stats-value')).not.toHaveText('pending');
+ const after_wait = JSON.parse((await page.locator('#stats-value').textContent()) ?? '{}');
+ expect(after_wait.finite_connection_count).toBe(before);
+
+ await page.click('#finite-reconnect');
+ await expect
+ .poll(async () => {
+ await page.click('#stats');
+ await expect(page.locator('#stats-value')).not.toHaveText('pending');
+ return JSON.parse((await page.locator('#stats-value').textContent()) ?? '{}')
+ .finite_connection_count;
+ })
+ .toBeGreaterThan(before);
+ await expect(page.locator('#finite-finished')).toHaveText('true');
+ });
+
+ test('query.live can be reconnected from server command handlers', async ({ page }) => {
+ await page.goto('/remote/live');
+ await page.click('#reset');
+
+ await page.click('#stats');
+ await expect(page.locator('#stats-value')).not.toHaveText('pending');
+ const before = JSON.parse((await page.locator('#stats-value').textContent()) ?? '{}');
+
+ await page.click('#reconnect-live');
+
+ await expect
+ .poll(async () => {
+ await page.click('#stats');
+ const value = (await page.locator('#stats-value').textContent()) ?? '{}';
+ if (value === 'pending') return before.cleanup_count;
+ return JSON.parse(value).cleanup_count;
+ })
+ .toBeGreaterThan(before.cleanup_count);
+
+ await expect(page.locator('#connected')).toHaveText('true');
+ });
+
+ test('query.live can be detached from the page', async ({ page }) => {
+ await page.goto('/remote/live');
+ await page.click('#reset');
+ await expect(page.locator('#count')).toHaveText('0');
+
+ await page.click('#toggle-live');
+ await expect(page.locator('#detached')).toHaveText('detached');
+ });
+
+ test('query.live does not resend unchanged devalue payloads', async ({ page }) => {
+ await page.goto('/remote/live');
+ await page.click('#reset');
+
+ await expect(page.locator('#duplicate-payload-count')).toHaveText('0');
+ const before = Number(await page.locator('#duplicate-updates').textContent());
+
+ await page.click('#notify-only');
+ await page.waitForTimeout(100);
+ await expect(page.locator('#duplicate-payload-count')).toHaveText('0');
+ await expect(page.locator('#duplicate-updates')).toHaveText(String(before));
+ });
+
+ test('query.live cleans up server iterator on reload', async ({ page }) => {
+ await page.goto('/remote/live');
+ await page.click('#stats');
+ await expect(page.locator('#stats-value')).not.toHaveText('pending');
+ const before_cleanup = JSON.parse(
+ (await page.locator('#stats-value').textContent()) ?? '{}'
+ ).cleanup_count;
+
+ await page.reload();
+ await expect(page.locator('#count')).toBeVisible();
+
+ await expect
+ .poll(async () => {
+ await page.click('#stats');
+ const value = (await page.locator('#stats-value').textContent()) ?? '{}';
+ if (value === 'pending') return before_cleanup;
+ const stats = JSON.parse(value);
+ return stats.cleanup_count;
+ })
+ .toBeGreaterThan(before_cleanup);
+ });
+
test.describe('query runtime guardrails', () => {
test('query created outside tracking context can run but cannot expose reactive state', async ({
page
diff --git a/packages/kit/test/apps/async/test/test.js b/packages/kit/test/apps/async/test/test.js
index 1099624a885b..9a8b5ca4e063 100644
--- a/packages/kit/test/apps/async/test/test.js
+++ b/packages/kit/test/apps/async/test/test.js
@@ -20,6 +20,11 @@ test.describe('remote functions', () => {
await expect(page.locator('body')).not.toContainText('Loading todo');
});
+ test('query.live renders the first yielded value during SSR', async ({ page }) => {
+ await page.goto('/remote/live');
+ await expect(page.locator('#first-value')).toHaveText('0');
+ });
+
test('run is blocked during server render', async ({ page }) => {
await page.goto('/remote/query-runtime-errors/run-in-render');
await expect(page.locator('#error')).toContainText(
diff --git a/packages/kit/test/types/remote.test.ts b/packages/kit/test/types/remote.test.ts
index 33be713459ed..f6ae062d7729 100644
--- a/packages/kit/test/types/remote.test.ts
+++ b/packages/kit/test/types/remote.test.ts
@@ -3,6 +3,7 @@ import { StandardSchemaV1 } from '@standard-schema/spec';
import {
RemoteForm,
RemoteFormInput,
+ RemoteLiveQueryFunction,
RemotePrerenderFunction,
RemoteQueryFunction,
invalid
@@ -83,6 +84,61 @@ function query_tests() {
}
query_tests();
+function live_query_tests() {
+ const no_args: RemoteLiveQueryFunction = query.live(async function* () {
+ yield 'hello';
+ });
+ void no_args();
+ // @ts-expect-error
+ void no_args('');
+
+ const one_arg: RemoteLiveQueryFunction = query.live(
+ 'unchecked',
+ async function* (a: number) {
+ yield a.toString();
+ }
+ );
+ void one_arg(1);
+ // @ts-expect-error
+ void one_arg('1');
+ // @ts-expect-error
+ void one_arg();
+
+ async function live_without_args() {
+ const q = query.live(async function* () {
+ yield 'Hello world';
+ });
+
+ const result: string = await q();
+ result;
+
+ const iterator: AsyncIterator = await q().run();
+ iterator;
+
+ q().connected === true;
+ q().finished === false;
+ q().reconnect();
+ // @ts-expect-error
+ q().refresh();
+ // @ts-expect-error
+ q().set('x');
+ }
+ void live_without_args();
+
+ async function live_with_schema() {
+ const q: RemoteLiveQueryFunction = query.live(schema, async function* (a) {
+ yield a;
+ });
+
+ const result: string = await q('x');
+ result;
+ // @ts-expect-error
+ void q(123);
+ }
+ void live_with_schema();
+}
+live_query_tests();
+
function prerender_tests() {
const no_args: RemotePrerenderFunction = prerender(() => 'Hello world');
void no_args();
diff --git a/packages/kit/types/index.d.ts b/packages/kit/types/index.d.ts
index d23a0a4b7380..b511072b1449 100644
--- a/packages/kit/types/index.d.ts
+++ b/packages/kit/types/index.d.ts
@@ -2174,6 +2174,21 @@ declare module '@sveltejs/kit' {
withOverride(update: (current: T) => T): RemoteQueryOverride;
};
+ export type RemoteLiveQuery = RemoteResource & {
+ /**
+ * Returns an async iterator with live updates.
+ * Unlike awaiting the resource directly, this can only be used _outside_ render
+ * (i.e. in load functions, event handlers and so on)
+ */
+ run(): Promise>;
+ /** `true` if the live stream is currently connected. */
+ readonly connected: boolean;
+ /** `true` once the live stream iterator has completed. */
+ readonly finished: boolean;
+ /** Reconnects the live stream immediately. */
+ reconnect(): void;
+ };
+
export type RemoteQueryOverride = () => void;
/**
@@ -2189,6 +2204,13 @@ declare module '@sveltejs/kit' {
export type RemoteQueryFunction = (
arg: undefined extends Input ? Input | void : Input
) => RemoteQuery;
+
+ /**
+ * The return value of a remote `query.live` function. See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query.live) for full documentation.
+ */
+ export type RemoteLiveQueryFunction = (
+ arg: undefined extends Input ? Input | void : Input
+ ) => RemoteLiveQuery;
interface AdapterEntry {
/**
* A string that uniquely identifies an HTTP service (e.g. serverless function) and is used for deduplication.
@@ -3254,7 +3276,7 @@ declare module '$app/paths' {
}
declare module '$app/server' {
- import type { RequestEvent, RemoteCommand, RemoteForm, RemoteFormInput, InvalidField, RemotePrerenderFunction, RemoteQueryFunction, RequestedResult } from '@sveltejs/kit';
+ import type { RequestEvent, RemoteCommand, RemoteForm, RemoteFormInput, InvalidField, RemotePrerenderFunction, RemoteQueryFunction, RemoteLiveQueryFunction, RequestedResult } from '@sveltejs/kit';
import type { StandardSchemaV1 } from '@standard-schema/spec';
/**
* Read the contents of an imported asset from the filesystem
@@ -3399,6 +3421,17 @@ declare module '$app/server' {
* @since 2.35
*/
function batch(schema: Schema, fn: (args: StandardSchemaV1.InferOutput[]) => MaybePromise<(arg: StandardSchemaV1.InferOutput, idx: number) => Output>): RemoteQueryFunction, Output>;
+ /**
+ * Creates a live remote query. When called from the browser, the function will be invoked on the server via a streaming `fetch` call.
+ *
+ * See [Remote functions](https://svelte.dev/docs/kit/remote-functions#query.live) for full documentation.
+ *
+ * */
+ function live(fn: (arg: void) => MaybePromise | AsyncIterator | AsyncIterable>): RemoteLiveQueryFunction;
+
+ function live (validate: "unchecked", fn: (arg: Input) => MaybePromise | AsyncIterator | AsyncIterable>): RemoteLiveQueryFunction ;
+
+ function live(schema: Schema, fn: (arg: StandardSchemaV1.InferOutput) => MaybePromise | AsyncIterator | AsyncIterable>): RemoteLiveQueryFunction, Output>;
}
/**
* In the context of a remote `command` or `form` request, returns an iterable
@@ -3683,4 +3716,4 @@ declare module '$app/types' {
export type Asset = ReturnType;
}
-//# sourceMappingURL=index.d.ts.map
\ No newline at end of file
+//# sourceMappingURL=index.d.ts.map