diff --git a/doc/implementation planning/client-pool.md b/doc/implementation planning/client-pool.md new file mode 100644 index 0000000000..9dd91726f1 --- /dev/null +++ b/doc/implementation planning/client-pool.md @@ -0,0 +1,3 @@ +# Client pool + +WIP diff --git a/src/domain/RootViewModel.js b/src/domain/RootViewModel.js index 2896fba612..a009c321c0 100644 --- a/src/domain/RootViewModel.js +++ b/src/domain/RootViewModel.js @@ -22,6 +22,7 @@ import {LogoutViewModel} from "./LogoutViewModel"; import {ForcedLogoutViewModel} from "./ForcedLogoutViewModel"; import {SessionPickerViewModel} from "./SessionPickerViewModel.js"; import {ViewModel} from "./ViewModel"; +import {ClientPool} from "../pool/ClientPool"; export class RootViewModel extends ViewModel { constructor(options) { @@ -158,11 +159,11 @@ export class RootViewModel extends ViewModel { } _showSessionLoader(sessionId) { - const client = new Client(this.platform, this.features); - client.startWithExistingSession(sessionId); + this._clientPool = new ClientPool(this.platform, this.features); + const clientProxy = this._clientPool.loadSession(sessionId); this._setSection(() => { this._sessionLoadViewModel = new SessionLoadViewModel(this.childOptions({ - client, + clientProxy, ready: client => this._showSession(client) })); this._sessionLoadViewModel.start(); diff --git a/src/domain/SessionLoadViewModel.js b/src/domain/SessionLoadViewModel.js index 6a63145f4a..af1a9a8eb3 100644 --- a/src/domain/SessionLoadViewModel.js +++ b/src/domain/SessionLoadViewModel.js @@ -22,8 +22,8 @@ import {ViewModel} from "./ViewModel"; export class SessionLoadViewModel extends ViewModel { constructor(options) { super(options); - const {client, ready, homeserver, deleteSessionOnCancel} = options; - this._client = client; + const {clientProxy, ready, homeserver, deleteSessionOnCancel} = options; + this._clientProxy = clientProxy; this._ready = ready; this._homeserver = homeserver; this._deleteSessionOnCancel = deleteSessionOnCancel; @@ -31,7 +31,6 @@ export class SessionLoadViewModel extends ViewModel { this._error = null; this.backUrl = this.urlRouter.urlForSegment("session", true); this._accountSetupViewModel = undefined; - } async start() { @@ -41,16 +40,16 @@ export class SessionLoadViewModel extends ViewModel { try { this._loading = true; this.emitChange("loading"); - this._waitHandle = this._client.loadStatus.waitFor(s => { + this._waitHandle = this._clientProxy.loadStatus().waitFor(s => { if (s === LoadStatus.AccountSetup) { - this._accountSetupViewModel = new AccountSetupViewModel(this.childOptions({accountSetup: this._client.accountSetup})); + this._accountSetupViewModel = new AccountSetupViewModel(this.childOptions({accountSetup: this._clientProxy.accountSetup()})); } else { this._accountSetupViewModel = undefined; } this.emitChange("loadLabel"); // wait for initial sync, but not catchup sync const isCatchupSync = s === LoadStatus.FirstSync && - this._client.sync.status.get() === SyncStatus.CatchupSync; + this._clientProxy.syncStatus().get() === SyncStatus.CatchupSync; return isCatchupSync || s === LoadStatus.LoginFailed || s === LoadStatus.Error || @@ -67,14 +66,14 @@ export class SessionLoadViewModel extends ViewModel { // much like we will once you are in the app. Probably a good idea // did it finish or get stuck at LoginFailed or Error? - const loadStatus = this._client.loadStatus.get(); - const loadError = this._client.loadError; + const loadStatus = this._clientProxy.loadStatus().get(); + const loadError = this._clientProxy.loadError(); if (loadStatus === LoadStatus.FirstSync || loadStatus === LoadStatus.Ready) { - const client = this._client; + const client = this._clientProxy.client; // session container is ready, // don't dispose it anymore when // we get disposed - this._client = null; + this._clientProxy = null; this._ready(client); } if (loadError) { @@ -92,9 +91,9 @@ export class SessionLoadViewModel extends ViewModel { dispose() { - if (this._client) { - this._client.dispose(); - this._client = null; + if (this._clientProxy) { + this._clientProxy.dispose(); + this._clientProxy = null; } if (this._waitHandle) { // rejects with AbortError @@ -105,23 +104,23 @@ export class SessionLoadViewModel extends ViewModel { // to show a spinner or not get loading() { - const client = this._client; - if (client && client.loadStatus.get() === LoadStatus.AccountSetup) { + const clientProxy = this._clientProxy; + if (clientProxy && clientProxy.loadStatus().get() === LoadStatus.AccountSetup) { return false; } return this._loading; } get loadLabel() { - const client = this._client; + const clientProxy = this._clientProxy; const error = this._getError(); - if (error || (client && client.loadStatus.get() === LoadStatus.Error)) { + if (error || (clientProxy && clientProxy.loadStatus().get() === LoadStatus.Error)) { return `Something went wrong: ${error && error.message}.`; } // Statuses related to login are handled by respective login view models - if (client) { - switch (client.loadStatus.get()) { + if (clientProxy) { + switch (clientProxy.loadStatus().get()) { case LoadStatus.QueryAccount: return `Querying account encryption setup…`; case LoadStatus.AccountSetup: @@ -133,7 +132,7 @@ export class SessionLoadViewModel extends ViewModel { case LoadStatus.FirstSync: return `Getting your conversations from the server…`; default: - return this._client.loadStatus.get(); + return clientProxy.loadStatus().get(); } } @@ -141,7 +140,7 @@ export class SessionLoadViewModel extends ViewModel { } _getError() { - return this._error || this._client?.loadError; + return this._error || this._clientProxy.loadError(); } get hasError() { @@ -154,7 +153,7 @@ export class SessionLoadViewModel extends ViewModel { } async logout() { - await this._client.startLogout(this.navigation.path.get("session").value); + await this._clientProxy.startLogout(this.navigation.path.get("session").value); this.navigation.push("session", true); } diff --git a/src/matrix/Client.js b/src/matrix/Client.js index fabb489b67..d3354d6919 100644 --- a/src/matrix/Client.js +++ b/src/matrix/Client.js @@ -32,6 +32,7 @@ import {SSOLoginHelper} from "./login/SSOLoginHelper"; import {getDehydratedDevice} from "./e2ee/Dehydration.js"; import {Registration} from "./registration/Registration"; import {FeatureSet} from "../features"; +import {SyncInWorker} from "./SyncInWorker"; export const LoadStatus = createEnum( "NotLoading", @@ -291,7 +292,7 @@ export class Client { await log.wrap("createIdentity", log => this._session.createIdentity(log)); } - this._sync = new Sync({hsApi: this._requestScheduler.hsApi, storage: this._storage, session: this._session, logger: this._platform.logger}); + this._sync = new SyncInWorker({hsApi: this._requestScheduler.hsApi, storage: this._storage, session: this._session, logger: this._platform.logger}); // notify sync and session when back online this._reconnectSubscription = this._reconnector.connectionStatus.subscribe(state => { if (state === ConnectionStatus.Online) { @@ -502,7 +503,7 @@ export class Client { } } -class AccountSetup { +export class AccountSetup { constructor(encryptedDehydratedDevice, finishStage) { this._encryptedDehydratedDevice = encryptedDehydratedDevice; this._dehydratedDevice = undefined; diff --git a/src/matrix/SyncInWorker.ts b/src/matrix/SyncInWorker.ts new file mode 100644 index 0000000000..8c3648bda5 --- /dev/null +++ b/src/matrix/SyncInWorker.ts @@ -0,0 +1,34 @@ +import {Sync, SyncStatus} from "./Sync"; +import {HomeServerApi} from "./net/HomeServerApi"; +import {Session} from "./Session"; +import {Storage} from "./storage/idb/Storage"; +import {Logger} from "../logging/Logger"; + +interface SyncOptions { + hsApi: HomeServerApi, + session: Session, + storage: Storage, + logger: Logger +} + +export class SyncInWorker extends Sync { + constructor(options: SyncOptions) { + super(options); + } + + get status(): SyncStatus { + return super.status; + } + + get error(): Error { + return super.error; + } + + start(): void { + super.start(); + } + + stop(): void { + super.stop(); + } +} diff --git a/src/matrix/sessioninfo/localstorage/SessionInfoStorage.ts b/src/matrix/sessioninfo/localstorage/SessionInfoStorage.ts index ebe575f65d..d8225c9c13 100644 --- a/src/matrix/sessioninfo/localstorage/SessionInfoStorage.ts +++ b/src/matrix/sessioninfo/localstorage/SessionInfoStorage.ts @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -interface ISessionInfo { +export interface ISessionInfo { id: string; deviceId: string; userId: string; diff --git a/src/platform/web/Platform.js b/src/platform/web/Platform.js index be8c997078..6c4ac5e69f 100644 --- a/src/platform/web/Platform.js +++ b/src/platform/web/Platform.js @@ -43,6 +43,7 @@ import {MediaDevicesWrapper} from "./dom/MediaDevices"; import {DOMWebRTC} from "./dom/WebRTC"; import {ThemeLoader} from "./theming/ThemeLoader"; import {TimeFormatter} from "./dom/TimeFormatter"; +import {SyncWorkerPool} from "./worker-sync/SyncWorkerPool"; function addScript(src) { return new Promise(function (resolve, reject) { @@ -149,6 +150,7 @@ export class Platform { this._serviceWorkerHandler = new ServiceWorkerHandler(); this._serviceWorkerHandler.registerAndStart(assetPaths.serviceWorker); } + this._syncWorkerPool = null; this.notificationService = undefined; // Only try to use crypto when olm is provided if(this._assetPaths.olm) { @@ -173,6 +175,11 @@ export class Platform { this.mediaDevices = new MediaDevicesWrapper(navigator.mediaDevices); this.webRTC = new DOMWebRTC(); this._themeLoader = import.meta.env.DEV? null: new ThemeLoader(this); + + if (assetPaths.syncWorker && window.Worker) { + this._syncWorkerPool = new SyncWorkerPool(this._assetPaths.syncWorker, this.sessionInfoStorage); + this._syncWorkerPool.add("1646528482480255"); + } } async init() { diff --git a/src/platform/web/sdk/paths/vite.js b/src/platform/web/sdk/paths/vite.js index 48a17da45a..44d3e26747 100644 --- a/src/platform/web/sdk/paths/vite.js +++ b/src/platform/web/sdk/paths/vite.js @@ -2,6 +2,7 @@ import _downloadSandboxPath from "../../assets/download-sandbox.html?url"; // @ts-ignore import _workerPath from "../../worker/main.js?url"; +import _syncWorkerPath from "../../worker-sync/sync-worker.js?url"; // @ts-ignore import olmWasmPath from "@matrix-org/olm/olm.wasm?url"; // @ts-ignore @@ -12,6 +13,7 @@ import olmLegacyJsPath from "@matrix-org/olm/olm_legacy.js?url"; export default { downloadSandbox: _downloadSandboxPath, worker: _workerPath, + syncWorker: _syncWorkerPath, olm: { wasm: olmWasmPath, legacyBundle: olmLegacyJsPath, diff --git a/src/platform/web/worker-sync/SyncWorker.ts b/src/platform/web/worker-sync/SyncWorker.ts new file mode 100644 index 0000000000..d49fa3597f --- /dev/null +++ b/src/platform/web/worker-sync/SyncWorker.ts @@ -0,0 +1,69 @@ +import {ISessionInfo} from "../../../matrix/sessioninfo/localstorage/SessionInfoStorage"; +import {HomeServerApi} from "../../../matrix/net/HomeServerApi"; +import {createFetchRequest} from "../dom/request/fetch"; +import {Clock} from "../dom/Clock"; +import {Reconnector} from "../../../matrix/net/Reconnector"; +import {ExponentialRetryDelay} from "../../../matrix/net/ExponentialRetryDelay"; +import {OnlineStatus} from "../dom/OnlineStatus"; + +type Payload = object; + +export enum SyncWorkerMessageType { + StartSync, +} + +interface Message { + type: SyncWorkerMessageType, + payload: Payload +} + +export interface StartSyncPayload extends Payload { + sessionInfo: ISessionInfo, +} + +class SyncWorker { + private _clock: Clock; + private _reconnector: Reconnector; + + async start(payload: StartSyncPayload): Promise { + const sessionInfo = payload.sessionInfo; + console.log(`Starting sync worker for session with id ${sessionInfo.id}`); + + this._clock = new Clock; + + this._reconnector = new Reconnector({ + onlineStatus: new OnlineStatus(), + retryDelay: new ExponentialRetryDelay(this._clock.createTimeout), + createMeasure: this._clock.createMeasure + }); + + const hsApi = new HomeServerApi({ + homeserver: sessionInfo.homeserver, + accessToken: sessionInfo.accessToken, + request: createFetchRequest(this._clock.createTimeout), + reconnector: this._reconnector, + }); + + + return payload; + } +} + +const worker = new SyncWorker(); +// @ts-ignore +self.syncWorker = worker; + +self.onmessage = (event: MessageEvent) => { + const data: Message = event.data; + + let promise: Promise; + switch (data.type) { + case SyncWorkerMessageType.StartSync: + promise = worker.start(data.payload as StartSyncPayload); + break; + } + + promise.then((reply: Payload) => { + postMessage(reply); + }).catch(error => console.error(error)) +}; diff --git a/src/platform/web/worker-sync/SyncWorkerPool.ts b/src/platform/web/worker-sync/SyncWorkerPool.ts new file mode 100644 index 0000000000..778368e1d1 --- /dev/null +++ b/src/platform/web/worker-sync/SyncWorkerPool.ts @@ -0,0 +1,44 @@ +import {SyncWorkerMessageType} from "./SyncWorker"; +import {SessionInfoStorage} from "../../../matrix/sessioninfo/localstorage/SessionInfoStorage"; + +export type SessionId = string; + +export class SyncWorkerPool { + private readonly _workers: Map = new Map; + private readonly _path: string; + private readonly _sessionInfoStorage: SessionInfoStorage; + + constructor(path: string, sessionInfoStorage: SessionInfoStorage) { + this._path = path; + this._sessionInfoStorage = sessionInfoStorage; + } + + async add(sessionId: SessionId) { + if (this._workers.size > 0) { + throw "Currently there can only be one active sync worker"; + } + + if (this._workers.has(sessionId)) { + throw `Session with id ${sessionId} already has a sync worker`; + } + + const worker = new Worker(this._path, {type: "module"}); + this._workers.set(sessionId, worker); + worker.onmessage = event => { + const data = event.data; + console.log(data); + } + + const sessionInfo = await this._sessionInfoStorage.get(sessionId); + worker.postMessage({ + type: SyncWorkerMessageType.StartSync, + payload: { + sessionInfo: sessionInfo + }, + }); + } + + remove(sessionId: SessionId) { + // TODO + } +} diff --git a/src/platform/web/worker-sync/sync-worker.js b/src/platform/web/worker-sync/sync-worker.js new file mode 100644 index 0000000000..21c3ccb3d3 --- /dev/null +++ b/src/platform/web/worker-sync/sync-worker.js @@ -0,0 +1 @@ +import "./SyncWorker"; diff --git a/src/pool/ClientPool.ts b/src/pool/ClientPool.ts new file mode 100644 index 0000000000..dfdf2de5c1 --- /dev/null +++ b/src/pool/ClientPool.ts @@ -0,0 +1,32 @@ +import {Platform} from "../platform/web/Platform"; +import {FeatureSet} from "../features"; +import {Client} from "../matrix/Client"; +import {ClientProxy} from "./ClientProxy"; + +export type SessionId = string; + +export class ClientPool { + private readonly _clients: Map = new Map; + private readonly _platform: Platform; + private readonly _features: FeatureSet; + + constructor(platform: Platform, features: FeatureSet) { + this._platform = platform; + this._features = features; + } + + loadSession(sessionId: SessionId): ClientProxy { + const client = new Client(this._platform, this._features); + this._clients.set(sessionId, client); + + // TODO REFACTOR: Handle case where session doesn't yet exist. + client.startWithExistingSession(sessionId); + + return new ClientProxy(sessionId, this); + } + + // TODO REFACTOR: Make this method private since client should not be exposed. + client(sessionId: SessionId): Client | undefined { + return this._clients.get(sessionId); + } +} diff --git a/src/pool/ClientProxy.ts b/src/pool/ClientProxy.ts new file mode 100644 index 0000000000..fd3813ccfd --- /dev/null +++ b/src/pool/ClientProxy.ts @@ -0,0 +1,43 @@ +import {ClientPool, SessionId} from "./ClientPool"; +import {AccountSetup, Client, LoadStatus} from "../matrix/Client"; +import {SyncStatus} from "../matrix/Sync"; +import {ObservableValue} from "../observable/value"; + +export class ClientProxy { + private readonly _sessionId: SessionId; + private readonly _clientPool: ClientPool; + + constructor(sessionId: SessionId, clientPool: ClientPool) { + this._sessionId = sessionId; + this._clientPool = clientPool; + } + + // TODO REFACTOR: Make this method private since client should not be exposed. + public get client(): Client { + return this._clientPool.client(this._sessionId); + } + + loadStatus(): ObservableValue { + return this.client.loadStatus; + } + + loadError(): Error { + return this.client.loadError; + } + + accountSetup(): AccountSetup { + return this.client.accountSetup; + } + + syncStatus(): ObservableValue { + return this.client.sync.status; + } + + startLogout() { + return this.client.startLogout(this._sessionId); + } + + dispose() { + this.client.dispose(); + } +}