From 81c2c5caaf92f7ecd5fdd157847ec773a63cd91b Mon Sep 17 00:00:00 2001 From: Tyler Sturos <55340199+tjsturos@users.noreply.github.com> Date: Tue, 7 Oct 2025 17:12:12 -0800 Subject: [PATCH] add action queue --- .cursor/plans/background-5480a5f6.plan.md | 152 ++++++++++++++++++ src/actions/handlers/kickUser.ts | 27 ++++ src/actions/handlers/saveUserConfig.ts | 16 ++ src/actions/handlers/sendMessage.ts | 78 +++++++++ src/actions/types.ts | 19 +++ src/components/Layout.tsx | 22 +++ src/components/context/ActionQueue.tsx | 94 +++++++++++ src/components/dev/ActionQueuePanel.tsx | 50 ++++++ src/components/direct/DirectMessage.tsx | 65 +++++--- src/components/space/Channel.tsx | 72 ++++++--- src/config/actionQueue.ts | 10 ++ src/db/messages.ts | 133 ++++++++++++++- src/hooks/actions/useActionQueue.ts | 6 + .../business/channels/useChannelReadState.ts | 85 ++++++++++ .../mentions/useChannelMentionCounts.ts | 60 +++++++ src/hooks/business/user/useUserKicking.ts | 18 ++- src/hooks/business/user/useUserSettings.ts | 22 ++- src/services/ActionQueueService.ts | 146 +++++++++++++++++ web/main.tsx | 13 +- 19 files changed, 1025 insertions(+), 63 deletions(-) create mode 100644 .cursor/plans/background-5480a5f6.plan.md create mode 100644 src/actions/handlers/kickUser.ts create mode 100644 src/actions/handlers/saveUserConfig.ts create mode 100644 src/actions/handlers/sendMessage.ts create mode 100644 src/actions/types.ts create mode 100644 src/components/context/ActionQueue.tsx create mode 100644 src/components/dev/ActionQueuePanel.tsx create mode 100644 src/config/actionQueue.ts create mode 100644 src/hooks/actions/useActionQueue.ts create mode 100644 src/hooks/business/channels/useChannelReadState.ts create mode 100644 src/hooks/business/mentions/useChannelMentionCounts.ts create mode 100644 src/services/ActionQueueService.ts diff --git a/.cursor/plans/background-5480a5f6.plan.md b/.cursor/plans/background-5480a5f6.plan.md new file mode 100644 index 000000000..136a58684 --- /dev/null +++ b/.cursor/plans/background-5480a5f6.plan.md @@ -0,0 +1,152 @@ + +# Background Action Queue (Web, per-key serial, global concurrency) + +## Summary + +Build a persistent background queue for user actions (web-only for now) with per-key serial ordering and a global concurrency cap configured via environment variable. It integrates with existing `MessageService`, `ConfigService`, and uses `MessageDB` (IndexedDB) for persistence. UI remains non-blocking; we show toasts and expose a lightweight status hook. A full task panel will be a follow-up. + +- **Persistence**: New `action_queue` store in `MessageDB` (DB v4) +- **Concurrency**: Global cap from env `VITE_ACTION_QUEUE_CONCURRENCY` (default 4) with per-key serial (e.g., by conversation/user key) +- **Initial actions**: `send-message`, `save-user-config`, `kick-user` +- **Web scope now**: Storage + queue under web (native later via adapter) +- **UX**: Toasts + `useActionQueueStatus` hook; offline red banner when `navigator.onLine === false`; future full panel + +References: + +- Parallel concurrency pattern (chunked/limited Promise.all) [Medium](https://shnoman97.medium.com/parallel-processing-in-javascript-with-concurrency-c214e9facefd) +- React state and reset mechanics (avoid unintended resets when wiring providers) [React](https://react.dev/learn/preserving-and-resetting-state#resetting-state-at-the-same-position) +- State as component memory (queue status held in context + DB) [React](https://react.dev/learn/state-a-components-memory#when-a-regular-variable-isnt-enough) +- Responding to events (enqueue from UI handlers) [React](https://react.dev/learn/responding-to-events) +- Reducer for internal queue state transitions [React](https://react.dev/learn/extracting-state-logic-into-a-reducer) +- Effects for online/offline + resume [React](https://react.dev/reference/react/useEffect#displaying-different-content-on-the-server-and-the-client) + +## Files to Add + +- `src/actions/types.ts` + - `export type ActionType = 'send-message' | 'save-user-config' | 'kick-user'` + - `export interface QueueTask { id?: number; taskType: ActionType; context: any; key: string; status: 'pending'|'processing'|'completed'|'failed'; retryCount: number; createdAt: number; processedAt?: number; error?: string }` + - `export type BuildKey = (task: QueueTask['context']) => string` +- `src/actions/handlers/sendMessage.ts` + - Exports `handleSendMessage(ctx): Promise` that: saves pending locally, triggers network send via `MessageService`, marks sent or error in DB +- `src/actions/handlers/saveUserConfig.ts` + - Calls `ConfigService.saveConfig` and updates local state +- `src/actions/handlers/kickUser.ts` + - Calls the relevant service (re-uses existing permission/role APIs) +- `src/services/ActionQueueService.ts` + - Core queue logic: addTask, processQueue, resume, online/offline, retries, per-key serial + global concurrency + - Reads concurrency from env: `const concurrency = Math.max(1, Number(import.meta.env.VITE_ACTION_QUEUE_CONCURRENCY ?? 4));` +- `src/components/context/ActionQueue.tsx` + - Provider that instantiates `ActionQueueService`, exposes `addAction`, `useActionQueueStatus`, and starts processing on mount +- `src/hooks/actions/useActionQueue.ts` + - Thin hook over context for components to enqueue actions and read status +- `src/config/actionQueue.ts` + - Exports a helper to centralize the env read: +```ts +export const ACTION_QUEUE_CONCURRENCY = Math.max( + 1, + Number((import.meta as any).env?.VITE_ACTION_QUEUE_CONCURRENCY ?? 4) +); +``` + + +## File to Modify + +- `src/db/messages.ts` + - Bump DB_VERSION to 4; in `onupgradeneeded`, create `action_queue` store: +```ts +const queueStore = db.createObjectStore('action_queue', { keyPath: 'id', autoIncrement: true }); +queueStore.createIndex('by_status', ['status']); +queueStore.createIndex('by_createdAt', ['createdAt']); +queueStore.createIndex('by_key_status', ['key','status']); +``` + + - Add CRUD helpers: `addQueueTask`, `getPendingQueueTasks`, `updateQueueTask`, `deleteQueueTask`, `resetProcessingToPending` (for crash recovery) +- `web/main.tsx` + - Wrap `App` with `ActionQueueProvider` (after `MessageDBProvider` so services are available) +- `src/services/MessageService.ts` + - Export a helper usable by sendMessage handler (or reuse existing `submitMessage` flow without UI await); ensure it can be invoked headless +- A couple of minimal touchpoints to use `addAction` in DM/Channel submit paths (behind a flag or direct replacement) + +## Core Design Details + +- **Per-key serial**: `key` derived by action type: + - `send-message`: `${spaceId}/${channelId}` or direct address + - `save-user-config`: `${userAddress}` + - `kick-user`: `${spaceId}/${userAddress}` +- **Global concurrency**: Read from `VITE_ACTION_QUEUE_CONCURRENCY` (default 4). Up to that many distinct keys processed in parallel; tasks for a single key execute sequentially. +- **Algorithm**: + + 1. Load all `pending` tasks; group by `key`. + 2. Start up to `ACTION_QUEUE_CONCURRENCY` groups concurrently; within each group, await handlers sequentially. + 3. Mark task `processing` before run; on success set `completed` and delete (or keep with processedAt); on failure increment retry with backoff, set `failed` after max retries. + 4. On init, set any lingering `processing` to `pending` to recover. + +- **Online/Offline**: Process only when `navigator.onLine === true`. Listen to `online`/`offline` to pause/resume. +- **Crash/Refresh resilience**: Queue persisted; resumed by provider on mount. +- **Toasts + status**: Use existing notification utilities to show success/failure. Status hook returns counts per status and in-flight keys for badges. + +## Minimal Snippets + +- `addAction` usage (component): +```ts +const { addAction } = useActionQueue(); +await addAction('send-message', { spaceId, channelId, ...msg }); +``` + +- Concurrency setting (centralized): +```ts +import { ACTION_QUEUE_CONCURRENCY } from '@/config/actionQueue'; +// Use ACTION_QUEUE_CONCURRENCY inside ActionQueueService +``` + +- Concurrency skeleton in `ActionQueueService`: +```ts +async processQueue() { + if (this.processing) return; this.processing = true; + await this.db.resetProcessingToPending(); + const pending = await this.db.getPendingQueueTasks(); + const groups = groupByKey(pending); + const keys = Object.keys(groups); + for (let i = 0; i < keys.length; i += ACTION_QUEUE_CONCURRENCY) { + const slice = keys.slice(i, i + ACTION_QUEUE_CONCURRENCY); + await Promise.all(slice.map(k => this.processKeySerial(groups[k]))); + } + this.processing = false; +} +``` + + +## Integration Steps + +1. Add provider to `web/main.tsx`. Ensure provider order keeps `MessageDB` available to the queue. +2. Wire `send-message` path to enqueue instead of awaiting network send: + + - In `src/components/direct/DirectMessage.tsx` and `src/components/space/Channel.tsx`, call `addAction('send-message', ctx)` and immediately update UI (pending saved by handler). + +3. For `save-user-config` and `kick-user`, add non-blocking buttons that enqueue actions and optimistically update local state if applicable. + +## Success Criteria + +- Sending a message returns immediately; the message appears as pending, transitions to sent or error. +- Page refresh resumes queued tasks; interrupted tasks recover from `processing`. +- Multiple conversations can send in parallel; per conversation, order is preserved. +- Toasts appear on completion/failure; `useActionQueueStatus` shows correct counts. + +## Future Work + +- Native storage adapter and provider wrappers for React Native (AsyncStorage-based) +- Full background task panel listing, filtering, retry, and details +- Backoff tuning and per-action retry policies + +### To-dos + +- [ ] Add action_queue store and helpers to src/db/messages.ts +- [ ] Create ActionQueueService with per-key serial and global concurrency +- [ ] Implement sendMessage/saveUserConfig/kickUser handlers +- [ ] Add ActionQueueProvider and useActionQueue hook +- [ ] Wrap web/main.tsx with ActionQueueProvider +- [ ] Replace send-message calls in DirectMessage and Channel +- [ ] Replace save-user-config and kick-user calls +- [ ] Expose useActionQueueStatus and toasts for success/failure +- [ ] Add red OfflineBanner and wire to online/offline events +- [ ] Add full Background Tasks panel (list queued/failed) \ No newline at end of file diff --git a/src/actions/handlers/kickUser.ts b/src/actions/handlers/kickUser.ts new file mode 100644 index 000000000..3a7ee70f7 --- /dev/null +++ b/src/actions/handlers/kickUser.ts @@ -0,0 +1,27 @@ +type Deps = { + spaceService: any; + queryClient: any; +}; + +export function createKickUserHandler(deps: Deps) { + const { spaceService, queryClient } = deps; + + return async function handleKickUser(context: { + spaceId: string; + userAddress: string; + userKeyset: any; + deviceKeyset: any; + registration: any; + }) { + await spaceService.kickUser( + context.spaceId, + context.userAddress, + context.userKeyset, + context.deviceKeyset, + context.registration, + queryClient + ); + }; +} + + diff --git a/src/actions/handlers/saveUserConfig.ts b/src/actions/handlers/saveUserConfig.ts new file mode 100644 index 000000000..054df79d2 --- /dev/null +++ b/src/actions/handlers/saveUserConfig.ts @@ -0,0 +1,16 @@ +type Deps = { + configService: any; +}; + +export function createSaveUserConfigHandler(deps: Deps) { + const { configService } = deps; + + return async function handleSaveUserConfig(context: { + config: any; + keyset: any; + }) { + await configService.saveConfig(context); + }; +} + + diff --git a/src/actions/handlers/sendMessage.ts b/src/actions/handlers/sendMessage.ts new file mode 100644 index 000000000..a57654852 --- /dev/null +++ b/src/actions/handlers/sendMessage.ts @@ -0,0 +1,78 @@ +import { Message, PostMessage } from '../../api/quorumApi'; +import { t } from '@lingui/core/macro'; + +type Deps = { + messageDB: any; + submitChannelMessage: ( + spaceId: string, + channelId: string, + pendingMessage: string | object, + queryClient: any, + currentPasskeyInfo: any, + inReplyTo?: string, + skipSigning?: boolean + ) => Promise; + queryClient: any; +}; + +export function createSendMessageHandler(deps: Deps) { + const { messageDB, submitChannelMessage, queryClient } = deps; + + return async function handleSendMessage(context: { + spaceId: string; + channelId: string; + pendingMessage: string | object; + inReplyTo?: string; + skipSigning?: boolean; + }) { + const { spaceId, channelId, pendingMessage, inReplyTo, skipSigning } = context; + + // Optimistic local save of a pending message + const nonce = crypto.randomUUID(); + const messageIdBuffer = await crypto.subtle.digest( + 'SHA-256', + Buffer.from(JSON.stringify({ nonce, spaceId, channelId, pendingMessage }), 'utf-8') + ); + const message: Message = { + spaceId, + channelId, + messageId: Buffer.from(messageIdBuffer).toString('hex'), + digestAlgorithm: 'SHA-256', + nonce, + createdDate: Date.now(), + modifiedDate: Date.now(), + lastModifiedHash: '', + content: + typeof pendingMessage === 'string' + ? ({ + type: 'post', + senderId: '', + text: pendingMessage, + repliesToMessageId: inReplyTo, + } as PostMessage) + : { + ...(pendingMessage as any), + senderId: '', + }, + reactions: [], + mentions: { memberIds: [], roleIds: [], channelIds: [] }, + } as any; + + const conversation = await messageDB.getConversation({ + conversationId: spaceId + '/' + channelId, + }); + await messageDB.saveMessage( + message, + message.createdDate, + channelId, + 'group', + conversation?.conversation?.icon, + conversation?.conversation?.displayName + ); + + // Dispatch actual send via existing service logic (which updates caches and outbounds) + await submitChannelMessage(spaceId, channelId, pendingMessage, queryClient, (context as any).currentPasskeyInfo, inReplyTo, skipSigning); + }; +} + + diff --git a/src/actions/types.ts b/src/actions/types.ts new file mode 100644 index 000000000..b6fe36b51 --- /dev/null +++ b/src/actions/types.ts @@ -0,0 +1,19 @@ +export type ActionType = 'send-message' | 'save-user-config' | 'kick-user'; + +export type QueueTaskStatus = 'pending' | 'processing' | 'completed' | 'failed'; + +export interface QueueTask { + id?: number; + taskType: ActionType; + context: any; + key: string; // per-key serial grouping + status: QueueTaskStatus; + retryCount: number; + createdAt: number; + processedAt?: number; + error?: string; +} + +export type BuildKey = (context: any) => string; + + diff --git a/src/components/Layout.tsx b/src/components/Layout.tsx index e9463ab0a..223d164dc 100644 --- a/src/components/Layout.tsx +++ b/src/components/Layout.tsx @@ -13,6 +13,8 @@ import Connecting from './Connecting'; import { useModalManagement, useElectronDetection } from '../hooks'; import { useNavigationHotkeys } from '@/hooks/platform/interactions/useNavigationHotkeys'; import { useSidebar } from './context/SidebarProvider'; +import { ACTION_QUEUE_PANEL_ENABLED } from '../config/actionQueue'; +import { ActionQueuePanel } from './dev/ActionQueuePanel'; const Layout: React.FunctionComponent<{ children: React.ReactNode; @@ -36,6 +38,19 @@ const Layout: React.FunctionComponent<{ useSidebar(); useNavigationHotkeys(); + // Offline red banner + const [isOffline, setIsOffline] = React.useState(!navigator.onLine); + React.useEffect(() => { + const onOnline = () => setIsOffline(false); + const onOffline = () => setIsOffline(true); + window.addEventListener('online', onOnline); + window.addEventListener('offline', onOffline); + return () => { + window.removeEventListener('online', onOnline); + window.removeEventListener('offline', onOffline); + }; + }, []); + const [kickToast, setKickToast] = React.useState<{ message: string; variant?: 'info' | 'success' | 'warning' | 'error' } | null>(null); React.useEffect(() => { const kickHandler = (e: any) => { @@ -101,6 +116,11 @@ const Layout: React.FunctionComponent<{ )} {/* {joinSpaceVisible && setJoinSpaceVisible(false)}/>} */} + {isOffline && ( +
+ {`You are offline. Some actions will be queued.`} +
+ )} {}} @@ -153,6 +173,8 @@ const Layout: React.FunctionComponent<{ {rightSidebarContent} )} + + {ACTION_QUEUE_PANEL_ENABLED && } ); }; diff --git a/src/components/context/ActionQueue.tsx b/src/components/context/ActionQueue.tsx new file mode 100644 index 000000000..14d88c942 --- /dev/null +++ b/src/components/context/ActionQueue.tsx @@ -0,0 +1,94 @@ +import React, { createContext, useCallback, useContext, useMemo, useState, useEffect } from 'react'; +import { ActionQueueService } from '../../services/ActionQueueService'; +import { createSendMessageHandler } from '../../actions/handlers/sendMessage'; +import { createSaveUserConfigHandler } from '../../actions/handlers/saveUserConfig'; +import { createKickUserHandler } from '../../actions/handlers/kickUser'; +import { useMessageDB } from './useMessageDB'; +import { useQueryClient } from '@tanstack/react-query'; + +type ContextValue = { + addAction: (taskType: 'send-message' | 'save-user-config' | 'kick-user', context: any, key: string) => Promise; + isProcessing: boolean; + counts: { pending: number; processing: number; failed: number }; +}; + +const ActionQueueContext = createContext({ + addAction: async () => -1, + isProcessing: false, + counts: { pending: 0, processing: 0, failed: 0 }, +}); + +export const ActionQueueProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => { + const { messageDB } = useMessageDB(); + const queryClient = useQueryClient(); + + // Pull needed services/refs from MessageDB context providers via hooks + // We build handlers by reading from MessageDB provider runtime wiring + const messageDBContext = require('./MessageDB'); + const useMDB = messageDBContext.useMessageDB || (() => ({ })); + const { + submitChannelMessage, + saveConfig, + // SpaceService methods are proxied via MessageDB context + } = useMDB(); + + const messageService = { submitChannelMessage } as any; + const configService = { saveConfig } as any; + const spaceService = (useMDB() as any)?.spaceService || {}; + + const handlers = useMemo(() => ({ + 'send-message': createSendMessageHandler({ messageDB, messageService, queryClient, currentPasskeyInfo: (useMDB() as any)?.keyset ? (useMDB() as any)?.keyset?.currentPasskeyInfo : (useMDB() as any)?.currentPasskeyInfo }), + 'save-user-config': createSaveUserConfigHandler({ configService }), + 'kick-user': createKickUserHandler({ spaceService, queryClient }), + }), [messageDB, messageService, configService, spaceService, queryClient]); + + const [service] = useState(() => new ActionQueueService({ handlers, messageDB })); + const [isProcessing, setIsProcessing] = useState(false); + const [counts, setCounts] = useState({ pending: 0, processing: 0, failed: 0 }); + + const refreshCounts = useCallback(async () => { + const pending = (await messageDB.getQueueTasksByStatus('pending')).length; + const processing = (await messageDB.getQueueTasksByStatus('processing')).length; + const failed = (await messageDB.getQueueTasksByStatus('failed')).length; + setCounts({ pending, processing, failed }); + }, [messageDB]); + + const addAction = useCallback(async (taskType: any, context: any, key: string) => { + const id = await service.addTask(taskType, context, key); + refreshCounts(); + return id; + }, [service, refreshCounts]); + + useEffect(() => { + const handleOnline = () => service.processQueue(); + const handleOffline = () => {}; + window.addEventListener('online', handleOnline); + window.addEventListener('offline', handleOffline); + // Kick off processing at mount + service.processQueue(); + return () => { + window.removeEventListener('online', handleOnline); + window.removeEventListener('offline', handleOffline); + }; + }, [service]); + + useEffect(() => { + const onQueueUpdated = () => refreshCounts(); + (window as any).addEventListener('quorum:queue-updated', onQueueUpdated); + refreshCounts(); + return () => { + (window as any).removeEventListener('quorum:queue-updated', onQueueUpdated); + }; + }, [refreshCounts]); + + return ( + + {children} + + ); +}; + +export const useActionQueue = () => useContext(ActionQueueContext); +export { ActionQueueContext }; + + diff --git a/src/components/dev/ActionQueuePanel.tsx b/src/components/dev/ActionQueuePanel.tsx new file mode 100644 index 000000000..cfa762cc8 --- /dev/null +++ b/src/components/dev/ActionQueuePanel.tsx @@ -0,0 +1,50 @@ +import * as React from 'react'; +import { useActionQueue } from '../context/ActionQueue'; +import { useMessageDB } from '../context/useMessageDB'; + +export const ActionQueuePanel: React.FC = () => { + const { counts } = useActionQueue(); + const { messageDB } = useMessageDB(); + const [pending, setPending] = React.useState([]); + const [processing, setProcessing] = React.useState([]); + const [failed, setFailed] = React.useState([]); + + const refresh = React.useCallback(async () => { + setPending(await messageDB.getQueueTasksByStatus('pending')); + setProcessing(await messageDB.getQueueTasksByStatus('processing')); + setFailed(await messageDB.getQueueTasksByStatus('failed')); + }, [messageDB]); + + React.useEffect(() => { + refresh(); + }, [counts, refresh]); + + return ( +
+
Background Tasks
+
Pending: {counts.pending} • Processing: {counts.processing} • Failed: {counts.failed}
+
+
+
+
+ ); +}; + +const Section: React.FC<{ title: string; items: any[] }> = ({ title, items }) => { + if (!items.length) return null; + return ( +
+
{title}
+
    + {items.map((t) => ( +
  • +
    {t.taskType}
    +
    {JSON.stringify(t.context)}
    +
  • + ))} +
+
+ ); +}; + + diff --git a/src/components/direct/DirectMessage.tsx b/src/components/direct/DirectMessage.tsx index feadbe16d..dda4ab4bd 100644 --- a/src/components/direct/DirectMessage.tsx +++ b/src/components/direct/DirectMessage.tsx @@ -175,32 +175,47 @@ const DirectMessage: React.FC<{}> = () => { const effectiveSkip = nonRepudiable ? false : skipSigning; - if (typeof message === 'string') { - // Text message - await submitMessage( - address, - message, - self.registration!, - registration.registration!, - queryClient, - user.currentPasskeyInfo!, - keyset, - inReplyTo, - effectiveSkip - ); - } else { - // Embed message (image) - await submitMessage( - address, - message as EmbedMessage, - self.registration!, - registration.registration!, - queryClient, - user.currentPasskeyInfo!, - keyset, - inReplyTo, - effectiveSkip + try { + const { useActionQueue } = require('../../hooks/actions/useActionQueue'); + const { addAction } = useActionQueue(); + await addAction( + 'send-message', + { + spaceId: address, + channelId: address, + pendingMessage: message, + inReplyTo, + skipSigning: effectiveSkip, + currentPasskeyInfo: user.currentPasskeyInfo!, + }, + `${address}/${address}` ); + } catch { + if (typeof message === 'string') { + await submitMessage( + address, + message, + self.registration!, + registration.registration!, + queryClient, + user.currentPasskeyInfo!, + keyset, + inReplyTo, + effectiveSkip + ); + } else { + await submitMessage( + address, + message as EmbedMessage, + self.registration!, + registration.registration!, + queryClient, + user.currentPasskeyInfo!, + keyset, + inReplyTo, + effectiveSkip + ); + } } // Clear deletion flag after a short delay diff --git a/src/components/space/Channel.tsx b/src/components/space/Channel.tsx index a213ab876..d773749e0 100644 --- a/src/components/space/Channel.tsx +++ b/src/components/space/Channel.tsx @@ -147,15 +147,34 @@ const Channel: React.FC = ({ } const effectiveSkip = space?.isRepudiable ? skipSigning : false; - await submitChannelMessage( - spaceId, - channelId, - message, - queryClient, - user.currentPasskeyInfo!, - inReplyTo, - effectiveSkip - ); + // Enqueue send-message to background queue + try { + const { useActionQueue } = require('../../hooks/actions/useActionQueue'); + const { addAction } = useActionQueue(); + await addAction( + 'send-message', + { + spaceId, + channelId, + pendingMessage: message, + inReplyTo, + skipSigning: effectiveSkip, + currentPasskeyInfo: user.currentPasskeyInfo!, + }, + `${spaceId}/${channelId}` + ); + } catch { + // Fallback to direct submit on any error wiring queue + await submitChannelMessage( + spaceId, + channelId, + message, + queryClient, + user.currentPasskeyInfo!, + inReplyTo, + effectiveSkip + ); + } // Clear deletion flag after a short delay if (isDeletion) { @@ -193,15 +212,32 @@ const Channel: React.FC = ({ type: 'sticker', stickerId: stickerId, } as StickerMessage; - await submitChannelMessage( - spaceId, - channelId, - stickerMessage, - queryClient, - user.currentPasskeyInfo!, - inReplyTo, - false // Stickers are always signed - ); + try { + const { useActionQueue } = require('../../hooks/actions/useActionQueue'); + const { addAction } = useActionQueue(); + await addAction( + 'send-message', + { + spaceId, + channelId, + pendingMessage: stickerMessage, + inReplyTo, + skipSigning: false, + currentPasskeyInfo: user.currentPasskeyInfo!, + }, + `${spaceId}/${channelId}` + ); + } catch { + await submitChannelMessage( + spaceId, + channelId, + stickerMessage, + queryClient, + user.currentPasskeyInfo!, + inReplyTo, + false + ); + } // Auto-scroll to bottom after sending sticker setTimeout(() => { messageListRef.current?.scrollToBottom(); diff --git a/src/config/actionQueue.ts b/src/config/actionQueue.ts new file mode 100644 index 000000000..f6ffa280f --- /dev/null +++ b/src/config/actionQueue.ts @@ -0,0 +1,10 @@ +export const ACTION_QUEUE_CONCURRENCY = Math.max( + 1, + Number((import.meta as any).env?.VITE_ACTION_QUEUE_CONCURRENCY ?? 4) +); + +export const ACTION_QUEUE_PANEL_ENABLED = String( + (import.meta as any).env?.VITE_ACTION_QUEUE_PANEL +).toLowerCase() === 'true'; + + diff --git a/src/db/messages.ts b/src/db/messages.ts index aa7d6bee6..b2557db9f 100644 --- a/src/db/messages.ts +++ b/src/db/messages.ts @@ -72,7 +72,7 @@ export interface SearchResult { export class MessageDB { private db: IDBDatabase | null = null; private readonly DB_NAME = 'quorum_db'; - private readonly DB_VERSION = 3; + private readonly DB_VERSION = 4; private searchIndices: Map> = new Map(); private indexInitialized = false; @@ -147,6 +147,137 @@ export class MessageDB { ]); } } + + if (event.oldVersion < 4) { + // Persistent background action queue store + const queueStore = db.createObjectStore('action_queue', { + keyPath: 'id', + autoIncrement: true, + }); + queueStore.createIndex('by_status', 'status'); + queueStore.createIndex('by_createdAt', 'createdAt'); + queueStore.createIndex('by_key_status', ['key', 'status']); + } + }; + }); + } + + // --- Action Queue (persistent) --- + private ensureDb() { + if (!this.db) throw new Error('DB not initialized'); + } + + async addQueueTask(task: { + taskType: string; + context: any; + key: string; + status?: 'pending' | 'processing' | 'completed' | 'failed'; + retryCount?: number; + createdAt?: number; + processedAt?: number; + error?: string; + }): Promise { + await this.init(); + this.ensureDb(); + return new Promise((resolve, reject) => { + const tx = this.db!.transaction('action_queue', 'readwrite'); + const store = tx.objectStore('action_queue'); + const record = { + taskType: task.taskType, + context: task.context, + key: task.key, + status: task.status ?? 'pending', + retryCount: task.retryCount ?? 0, + createdAt: task.createdAt ?? Date.now(), + processedAt: task.processedAt, + error: task.error, + } as any; + const req = store.add(record); + req.onsuccess = () => resolve(req.result as number); + req.onerror = () => reject(req.error); + }); + } + + async getPendingQueueTasks(): Promise { + await this.init(); + this.ensureDb(); + return new Promise((resolve, reject) => { + const tx = this.db!.transaction('action_queue', 'readonly'); + const index = tx.objectStore('action_queue').index('by_status'); + const req = index.getAll('pending'); + req.onsuccess = () => resolve(req.result || []); + req.onerror = () => reject(req.error); + }); + } + + async getQueueTasksByKeyAndStatus(key: string, status: 'pending' | 'processing' | 'completed' | 'failed'): Promise { + await this.init(); + this.ensureDb(); + return new Promise((resolve, reject) => { + const tx = this.db!.transaction('action_queue', 'readonly'); + const index = tx.objectStore('action_queue').index('by_key_status'); + const range = IDBKeyRange.only([key, status]); + const req = index.getAll(range); + req.onsuccess = () => resolve(req.result || []); + req.onerror = () => reject(req.error); + }); + } + + async getQueueTasksByStatus(status: 'pending' | 'processing' | 'completed' | 'failed'): Promise { + await this.init(); + this.ensureDb(); + return new Promise((resolve, reject) => { + const tx = this.db!.transaction('action_queue', 'readonly'); + const index = tx.objectStore('action_queue').index('by_status'); + const req = index.getAll(status); + req.onsuccess = () => resolve(req.result || []); + req.onerror = () => reject(req.error); + }); + } + + async updateQueueTask(task: any): Promise { + await this.init(); + this.ensureDb(); + return new Promise((resolve, reject) => { + const tx = this.db!.transaction('action_queue', 'readwrite'); + const store = tx.objectStore('action_queue'); + const req = store.put(task); + req.onsuccess = () => resolve(); + req.onerror = () => reject(req.error); + }); + } + + async deleteQueueTask(id: number): Promise { + await this.init(); + this.ensureDb(); + return new Promise((resolve, reject) => { + const tx = this.db!.transaction('action_queue', 'readwrite'); + const store = tx.objectStore('action_queue'); + const req = store.delete(id); + req.onsuccess = () => resolve(); + req.onerror = () => reject(req.error); + }); + } + + async resetProcessingToPending(): Promise { + await this.init(); + this.ensureDb(); + return new Promise((resolve, reject) => { + const tx = this.db!.transaction('action_queue', 'readwrite'); + const store = tx.objectStore('action_queue'); + const index = store.index('by_status'); + const req = index.openCursor('processing'); + req.onerror = () => reject(req.error); + req.onsuccess = (ev) => { + const cursor = (ev.target as IDBRequest).result; + if (cursor) { + const value = cursor.value; + value.status = 'pending'; + store.put(value); + cursor.continue(); + } else { + resolve(); + } }; }); } diff --git a/src/hooks/actions/useActionQueue.ts b/src/hooks/actions/useActionQueue.ts new file mode 100644 index 000000000..c4a5e3040 --- /dev/null +++ b/src/hooks/actions/useActionQueue.ts @@ -0,0 +1,6 @@ +import { useContext } from 'react'; +import { ActionQueueContext } from '../../components/context/ActionQueue'; + +export const useActionQueue = () => useContext(ActionQueueContext as any); + + diff --git a/src/hooks/business/channels/useChannelReadState.ts b/src/hooks/business/channels/useChannelReadState.ts new file mode 100644 index 000000000..9e0d2cfd6 --- /dev/null +++ b/src/hooks/business/channels/useChannelReadState.ts @@ -0,0 +1,85 @@ +import { useCallback, useEffect, useMemo, useState } from 'react'; +import { useMessageDB } from '../../../components/context/useMessageDB'; +import { useInvalidateSpace } from '../../queries/space/useInvalidateSpace'; + +type ChannelReadState = { + lastReadTimestamp: number; + lastReadMessageId?: string; +}; + +const STORAGE_KEY_PREFIX = 'channel_read_state:'; + +function storageKey(spaceId: string, channelId: string, userAddress: string) { + return `${STORAGE_KEY_PREFIX}${userAddress}:${spaceId}/${channelId}`; +} + +export function useChannelReadState( + spaceId: string, + channelId: string, + userAddress?: string +) { + const { messageDB } = useMessageDB(); + const invalidateSpace = useInvalidateSpace(); + const [state, setState] = useState({ lastReadTimestamp: 0 }); + + // Load from localStorage on mount + useEffect(() => { + if (!userAddress) return; + try { + const raw = localStorage.getItem(storageKey(spaceId, channelId, userAddress)); + if (raw) { + const parsed = JSON.parse(raw) as ChannelReadState; + setState(parsed); + } + } catch {} + }, [spaceId, channelId, userAddress]); + + const markRead = useCallback( + async (timestamp?: number, lastReadMessageId?: string) => { + const next: ChannelReadState = { + lastReadTimestamp: timestamp ?? Date.now(), + lastReadMessageId, + }; + setState(next); + if (userAddress) { + try { + localStorage.setItem( + storageKey(spaceId, channelId, userAddress), + JSON.stringify(next) + ); + } catch {} + } + // Also persist conversation lastReadTimestamp for DM parity + try { + await messageDB.saveReadTime({ + conversationId: `${spaceId}/${channelId}`, + lastMessageTimestamp: next.lastReadTimestamp, + }); + } catch {} + // Invalidate space so mention counts recompute + invalidateSpace({ spaceId }); + }, + [spaceId, channelId, userAddress, messageDB, invalidateSpace] + ); + + return useMemo( + () => ({ + lastReadTimestamp: state.lastReadTimestamp, + lastReadMessageId: state.lastReadMessageId, + markRead, + }), + [state, markRead] + ); +} + + + + + + + + + + + + diff --git a/src/hooks/business/mentions/useChannelMentionCounts.ts b/src/hooks/business/mentions/useChannelMentionCounts.ts new file mode 100644 index 000000000..5986a010c --- /dev/null +++ b/src/hooks/business/mentions/useChannelMentionCounts.ts @@ -0,0 +1,60 @@ +import { useCallback, useEffect, useMemo, useState } from 'react'; +import { useMessageDB } from '../../../components/context/useMessageDB'; +import { usePasskeysContext } from '@quilibrium/quilibrium-js-sdk-channels'; +import { useChannelReadState } from '../channels/useChannelReadState'; + +export function useChannelMentionCounts(spaceId: string, channelIds: string[]) { + const { messageDB } = useMessageDB(); + const passkeys = usePasskeysContext(); + const userAddress = passkeys.currentPasskeyInfo?.address; + + const [counts, setCounts] = useState>({}); + + // Maintain per-channel read state to compute unread mentions + const readStates = channelIds.map((channelId) => ({ + channelId, + stateHook: useChannelReadState(spaceId, channelId, userAddress), + })); + + const refresh = useCallback(async () => { + if (!userAddress) return; + const entries = await Promise.all( + channelIds.map(async (channelId) => { + const rs = readStates.find((r) => r.channelId === channelId)!.stateHook; + const count = await messageDB.getUnreadMentionCountForChannel({ + spaceId, + channelId, + memberId: userAddress, + sinceTimestamp: rs.lastReadTimestamp || 0, + }); + return [channelId, count] as const; + }) + ); + setCounts(Object.fromEntries(entries)); + }, [channelIds, messageDB, readStates, spaceId, userAddress]); + + useEffect(() => { + refresh(); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [refresh]); + + return useMemo( + () => ({ + counts, + refresh, + }), + [counts, refresh] + ); +} + + + + + + + + + + + + diff --git a/src/hooks/business/user/useUserKicking.ts b/src/hooks/business/user/useUserKicking.ts index 7c66f4f4f..8ba5e1cd9 100644 --- a/src/hooks/business/user/useUserKicking.ts +++ b/src/hooks/business/user/useUserKicking.ts @@ -14,6 +14,8 @@ export const useUserKicking = () => { const queryClient = useQueryClient(); const { kickUser } = useMessageDB(); + const { useActionQueue } = require('../../hooks/actions/useActionQueue'); + const { addAction } = useActionQueue(); const { currentPasskeyInfo } = usePasskeysContext(); const { keyset } = useRegistrationContext(); const { data: registration } = useRegistration({ @@ -36,12 +38,16 @@ export const useUserKicking = () => { setKicking(true); try { - await kickUser( - spaceId, - userAddress, - keyset.userKeyset, - keyset.deviceKeyset, - registration.registration + await addAction( + 'kick-user', + { + spaceId, + userAddress, + userKeyset: keyset.userKeyset, + deviceKeyset: keyset.deviceKeyset, + registration: registration.registration, + }, + `${spaceId}/${userAddress}` ); // The kickUser function doesn't remove the user from local IndexedDB diff --git a/src/hooks/business/user/useUserSettings.ts b/src/hooks/business/user/useUserSettings.ts index 22016f881..960e7576b 100644 --- a/src/hooks/business/user/useUserSettings.ts +++ b/src/hooks/business/user/useUserSettings.ts @@ -48,6 +48,8 @@ export const useUserSettings = ( }); const { keyset } = useRegistrationContext(); const { saveConfig, getConfig, updateUserProfile } = useMessageDB(); + const { useActionQueue } = require('../../hooks/actions/useActionQueue'); + const { addAction } = useActionQueue(); const uploadRegistration = useUploadRegistration(); const existingConfig = useRef(null); @@ -139,15 +141,19 @@ export const useUserSettings = ( currentPasskeyInfo ); - // Save config - await saveConfig({ - config: { - ...existingConfig.current!, - allowSync, - nonRepudiable: nonRepudiable, + // Save config via background queue + await addAction( + 'save-user-config', + { + config: { + ...existingConfig.current!, + allowSync, + nonRepudiable: nonRepudiable, + }, + keyset: keyset, }, - keyset: keyset, - }); + currentPasskeyInfo.address + ); // If devices were removed, reconstruct and upload the registration if (removedDevices.length > 0 && stagedRegistration) { diff --git a/src/services/ActionQueueService.ts b/src/services/ActionQueueService.ts new file mode 100644 index 000000000..247edc42e --- /dev/null +++ b/src/services/ActionQueueService.ts @@ -0,0 +1,146 @@ +import { ACTION_QUEUE_CONCURRENCY } from '../config/actionQueue'; +import { QueueTask, ActionType } from '../actions/types'; + +type HandlersMap = { + [K in ActionType]?: (context: any) => Promise; +}; + +export class ActionQueueService { + private processing = false; + private concurrency: number; + private handlers: HandlersMap; + private messageDB: any; + + constructor({ handlers, messageDB, concurrency }: { handlers: HandlersMap; messageDB: any; concurrency?: number }) { + this.handlers = handlers; + this.messageDB = messageDB; + this.concurrency = Math.max(1, concurrency ?? ACTION_QUEUE_CONCURRENCY); + } + + async addTask(taskType: ActionType, context: any, key: string): Promise { + const id = await this.messageDB.addQueueTask({ taskType, context, key, status: 'pending', retryCount: 0, createdAt: Date.now() }); + // Trigger processing asynchronously + this.processQueue().catch(() => {}); + try { + (window as any).dispatchEvent(new CustomEvent('quorum:queue-updated')); + } catch {} + return id; + } + + async processQueue() { + if (this.processing) return; + this.processing = true; + try { + await this.messageDB.resetProcessingToPending(); + const pending: QueueTask[] = await this.messageDB.getPendingQueueTasks(); + if (!pending.length) return; + const groups = this.groupByKey(pending); + const keys = Object.keys(groups); + for (let i = 0; i < keys.length; i += this.concurrency) { + const slice = keys.slice(i, i + this.concurrency); + await Promise.all(slice.map((k) => this.processKeySerial(groups[k]))); + } + try { + (window as any).dispatchEvent(new CustomEvent('quorum:queue-updated')); + } catch {} + } finally { + this.processing = false; + } + } + + private groupByKey(tasks: QueueTask[]): Record { + return tasks.reduce((acc, t) => { + (acc[t.key] ||= []).push(t); + return acc; + }, {} as Record); + } + + private async processKeySerial(tasks: QueueTask[]) { + // sort by createdAt ascending to preserve order + tasks.sort((a, b) => a.createdAt - b.createdAt); + for (const task of tasks) { + await this.runTask(task); + } + } + + private async runTask(task: QueueTask) { + // mark processing + task.status = 'processing'; + task.processedAt = Date.now(); + await this.messageDB.updateQueueTask(task); + + const handler = this.handlers[task.taskType]; + if (!handler) { + task.status = 'failed'; + task.error = `No handler for task type: ${task.taskType}`; + await this.messageDB.updateQueueTask(task); + return; + } + + try { + await handler(task.context); + // delete tasks after success + if (task.id != null) { + await this.messageDB.deleteQueueTask(task.id); + } + try { + (window as any).dispatchEvent(new CustomEvent('quorum:queue-updated')); + } catch {} + try { + const msg = this.successMessage(task.taskType); + if (msg) { + (window as any).dispatchEvent( + new CustomEvent('quorum:toast', { detail: { message: msg, variant: 'success' } }) + ); + } + } catch {} + } catch (err: any) { + // simple retry policy: up to 3 attempts with exponential backoff trigger via next cycles + task.retryCount = (task.retryCount ?? 0) + 1; + task.status = task.retryCount >= 3 ? 'failed' : 'pending'; + task.error = err?.message || String(err); + await this.messageDB.updateQueueTask(task); + try { + (window as any).dispatchEvent(new CustomEvent('quorum:queue-updated')); + } catch {} + try { + if (task.status === 'failed') { + (window as any).dispatchEvent( + new CustomEvent('quorum:toast', { detail: { message: this.failureMessage(task.taskType, task.error), variant: 'error' } }) + ); + } + } catch {} + } + } + + private successMessage(type: ActionType): string | null { + switch (type) { + case 'send-message': + return 'Message sent'; + case 'save-user-config': + return 'Settings saved'; + case 'kick-user': + return 'User kicked'; + default: + return null; + } + } + + private failureMessage(type: ActionType, error?: string): string { + const base = (() => { + switch (type) { + case 'send-message': + return 'Failed to send message'; + case 'save-user-config': + return 'Failed to save settings'; + case 'kick-user': + return 'Failed to kick user'; + default: + return 'Task failed'; + } + })(); + return error ? `${base}: ${error}` : base; + } +} + + diff --git a/web/main.tsx b/web/main.tsx index cd9358ea1..a3c477df6 100644 --- a/web/main.tsx +++ b/web/main.tsx @@ -12,6 +12,7 @@ import { ThemeProvider } from '../src/components/primitives/theme'; import { i18n } from '@lingui/core'; import { I18nProvider } from '@lingui/react'; import { dynamicActivate, getUserLocale } from '../src/i18n/i18n'; +import { ActionQueueProvider } from '../src/components/context/ActionQueue'; const queryClient = new QueryClient({ defaultOptions: { @@ -40,11 +41,13 @@ const Root = () => { - - - - - + + + + + + +