From ae66c81a7f895a237df6cf39f475108c3ae2b182 Mon Sep 17 00:00:00 2001 From: Valentino Hudhra Date: Sun, 1 Mar 2026 14:23:09 +0000 Subject: [PATCH] Add e2e test for cross-client query invalidation via Centrifugo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two-browser Playwright test that proves the realtime query invalidation pipeline works end-to-end: User B's mutation publishes to Centrifugo, User A's WebSocket subscription triggers queryClient.invalidateQueries, and the listJoinRequests query refetches without a page refresh. Primary assertion is a network intercept (waitForResponse) catching the refetch on User A's page — the direct proof that invalidation fired. Also adds Centrifugo env vars to dev:e2e and playwright config, and starts a Centrifugo Docker container in the CI workflow. --- .github/workflows/e2e-tests.yml | 17 ++- package.json | 3 +- tests/e2e/playwright.config.ts | 5 + tests/e2e/tests/query-invalidation.spec.ts | 160 +++++++++++++++++++++ 4 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 tests/e2e/tests/query-invalidation.spec.ts diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index 58d255cce..a1a269562 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -43,7 +43,7 @@ jobs: SUPABASE_AUTH_EXTERNAL_GOOGLE_CLIENT_SECRET: test-client-secret # TipTap Cloud (collaborative editing - stub for tests) NEXT_PUBLIC_TIPTAP_APP_ID: test-tiptap-app-id - # Centrifugo (realtime - stub values for tests) + # Centrifugo (realtime messaging server) CENTRIFUGO_API_URL: http://localhost:8000/api CENTRIFUGO_API_KEY: test_api_key CENTRIFUGO_TOKEN_SECRET: test_token_secret @@ -127,6 +127,21 @@ jobs: - name: Seed test database run: pnpm w:db seed:e2e + - name: Start Centrifugo + run: | + docker run -d --rm --name centrifugo \ + -p 8000:8000 \ + --env-file services/realtime/.env.test \ + centrifugo/centrifugo:v6 centrifugo + echo "Waiting for Centrifugo to be ready..." + for i in $(seq 1 30); do + if curl -sf http://localhost:8000/health > /dev/null 2>&1; then + echo "Centrifugo is ready" + break + fi + sleep 1 + done + - name: Run E2E tests run: pnpm -C ./tests/e2e exec playwright test --shard=${{ matrix.shard }}/2 --reporter=blob diff --git a/package.json b/package.json index 2df124f89..23efd7925 100644 --- a/package.json +++ b/package.json @@ -15,10 +15,11 @@ "deps:override": "tsx scripts/depCheck.ts --multiuse && pnpm prettier --write **/package.json --write", "deps:viz": "node ./scripts/visualize-deps.mjs", "dev": "turbo dev", - "dev:e2e": "cross-env E2E=true NEXT_PUBLIC_SUPABASE_URL=http://127.0.0.1:56321 DATABASE_URL=postgresql://postgres:postgres@127.0.0.1:56322/postgres S3_ASSET_ROOT=http://127.0.0.1:56321/storage/v1/object/public/assets TIPTAP_SECRET=e2e NEXT_PUBLIC_TIPTAP_APP_ID=e2e turbo dev:e2e", + "dev:e2e": "cross-env E2E=true NEXT_PUBLIC_SUPABASE_URL=http://127.0.0.1:56321 DATABASE_URL=postgresql://postgres:postgres@127.0.0.1:56322/postgres S3_ASSET_ROOT=http://127.0.0.1:56321/storage/v1/object/public/assets TIPTAP_SECRET=e2e NEXT_PUBLIC_TIPTAP_APP_ID=e2e CENTRIFUGO_API_URL=http://localhost:8000/api CENTRIFUGO_API_KEY=test_api_key CENTRIFUGO_TOKEN_SECRET=test_token_secret NEXT_PUBLIC_CENTRIFUGO_WS_URL=ws://localhost:8000/connection/websocket turbo dev:e2e", "dev:test": "dotenv -e .env.test -o -- turbo dev", "e2e": "pnpm -C ./tests/e2e e2e", "e2e:ui": "pnpm -C ./tests/e2e e2e:ui", + "end-to-end": "pnpm -C ./tests/e2e e2e", "format": "prettier --write \"**/*.{ts,tsx,md}\"", "format:changes": "git diff --name-only --diff-filter=ACM | xargs pnpx prettier --write", "format:check": "turbo format:check", diff --git a/tests/e2e/playwright.config.ts b/tests/e2e/playwright.config.ts index dfb9849c1..df1b44a12 100644 --- a/tests/e2e/playwright.config.ts +++ b/tests/e2e/playwright.config.ts @@ -23,6 +23,11 @@ Object.assign(process.env, { // getProposalDocumentsContent guards on their presence before calling the client. TIPTAP_SECRET: 'e2e', NEXT_PUBLIC_TIPTAP_APP_ID: 'e2e', + // Centrifugo (realtime messaging) — must match the running Centrifugo instance + CENTRIFUGO_API_URL: 'http://localhost:8000/api', + CENTRIFUGO_API_KEY: 'test_api_key', + CENTRIFUGO_TOKEN_SECRET: 'test_token_secret', + NEXT_PUBLIC_CENTRIFUGO_WS_URL: 'ws://localhost:8000/connection/websocket', }); /** diff --git a/tests/e2e/tests/query-invalidation.spec.ts b/tests/e2e/tests/query-invalidation.spec.ts new file mode 100644 index 000000000..53b15daeb --- /dev/null +++ b/tests/e2e/tests/query-invalidation.spec.ts @@ -0,0 +1,160 @@ +import { users } from '@op/db/schema'; +import { db, eq } from '@op/db/test'; + +import { + TEST_USER_DEFAULT_PASSWORD, + authenticateAsUser, + createOrganization, + createSupabaseAdminClient, + expect, + test, +} from '../fixtures/index.js'; + +/** + * Tests the channel-based query invalidation system end-to-end across two + * separate browser sessions connected via Centrifugo WebSocket. + * + * The full pipeline under test: + * 1. User A loads the landing page. The `listJoinRequests` query registers + * on the `profileJoinRequest:target:{orgProfileId}` channel via + * `ctx.registerQueryChannels`. The client-side tRPC link extracts the + * channel from the response and registers it in the `queryChannelRegistry`. + * `QueryInvalidationSubscriber` subscribes to the channel over WebSocket. + * 2. User B (in a separate browser) visits User A's org profile and clicks + * "Request". The `createJoinRequest` mutation registers on the same + * channel via `ctx.registerMutationChannels`. The `withChannelMeta` + * middleware publishes a message to Centrifugo on that channel. + * 3. Centrifugo broadcasts the message to User A's WebSocket subscription. + * 4. User A's `QueryInvalidationSubscriber` receives the message and calls + * `queryClient.invalidateQueries` for all queries on matching channels. + * 5. The `listJoinRequests` query refetches and User A's UI updates to show + * the new join request — without a page refresh. + * + * The primary assertion is a **network intercept**: we wait for User A's + * browser to issue a new `profile.listJoinRequests` HTTP request after User B's + * mutation. This is the direct proof that `queryClient.invalidateQueries` fired + * as a result of the WebSocket message — no page refresh, no navigation, just + * the realtime pipeline triggering a refetch. + * + * Requires Centrifugo to be running (see services/realtime/start-centrifugo.sh). + */ +test.describe('Query invalidation via realtime', () => { + test('mutation on one client invalidates a query on another client via Centrifugo', async ({ + browser, + org, + }) => { + const supabaseAdmin = createSupabaseAdminClient(); + + // ── User A setup ────────────────────────────────────────────────── + // User A is the org admin. Set their active profile to the org so + // the landing page renders OrgNotifications (which includes + // JoinProfileRequestsNotifications → listJoinRequests query). + const userA = org.adminUser; + + await db + .update(users) + .set({ currentProfileId: org.organizationProfile.id }) + .where(eq(users.authUserId, userA.authUserId)); + + // ── User B setup ────────────────────────────────────────────────── + // User B is a member of a different org. Their active profile must + // be their individual profile so the "Request" button appears on + // User A's org profile page. + const userBOrg = await createOrganization({ + testId: `qi-b-${Date.now()}`, + supabaseAdmin, + users: { admin: 1, member: 0 }, + }); + const userB = userBOrg.adminUser; + + const [userBRecord] = await db + .select() + .from(users) + .where(eq(users.authUserId, userB.authUserId)); + + if (!userBRecord?.profileId) { + throw new Error('User B has no individual profile'); + } + + await db + .update(users) + .set({ currentProfileId: userBRecord.profileId }) + .where(eq(users.authUserId, userB.authUserId)); + + // ── User A: open landing page ───────────────────────────────────── + const contextA = await browser.newContext(); + const pageA = await contextA.newPage(); + await authenticateAsUser(pageA, { + email: userA.email, + password: TEST_USER_DEFAULT_PASSWORD, + }); + + await pageA.goto('/en/'); + + // Verify landing page loaded + await expect( + pageA.getByRole('heading', { level: 1, name: /Welcome back/ }), + ).toBeVisible({ timeout: 15000 }); + + // The listJoinRequests query has fired (0 results → component renders + // nothing). The query is registered on the realtime channel and the + // WebSocket subscription is active. + await expect(pageA.getByText('Join requests')).not.toBeVisible(); + + // ── Set up network intercept on User A BEFORE User B acts ───────── + // This is the primary proof: we wait for User A's browser to issue a + // new listJoinRequests request. This can only happen if the WebSocket + // message triggered queryClient.invalidateQueries on that query. + const refetchPromise = pageA.waitForResponse( + (response) => + response.url().includes('profile.listJoinRequests') && + response.status() === 200, + { timeout: 15000 }, + ); + + // ── User B: open org profile and send join request ──────────────── + const contextB = await browser.newContext(); + const pageB = await contextB.newPage(); + await authenticateAsUser(pageB, { + email: userB.email, + password: TEST_USER_DEFAULT_PASSWORD, + }); + + const orgSlug = org.organizationProfile.slug; + await pageB.goto(`/en/profile/${orgSlug}`); + + // Wait for the profile page to load + await expect( + pageB.getByRole('heading', { name: org.organizationProfile.name }), + ).toBeVisible({ timeout: 15000 }); + + // Click "Request" to create a join request. + // Server-side: withChannelMeta publishes to Centrifugo on the + // profileJoinRequest:target channel. + const requestButton = pageB.getByRole('button', { name: 'Request' }); + await expect(requestButton).toBeVisible({ timeout: 10000 }); + await requestButton.click(); + + // ── Primary assertion: network-level proof of invalidation ──────── + // Wait for the refetch response on User A's page. This HTTP request + // was triggered by: + // Centrifugo message → RealtimeManager → QueryInvalidationSubscriber + // → queryClient.invalidateQueries → listJoinRequests refetch + // No navigation or refresh occurred — this is purely the realtime + // invalidation pipeline. + const refetchResponse = await refetchPromise; + expect(refetchResponse.ok()).toBe(true); + + // ── Secondary assertion: UI confirms the data actually rendered ─── + await expect(pageA.getByText('Join requests')).toBeVisible({ + timeout: 5000, + }); + await expect( + pageA.getByText('wants to join your organization'), + ).toBeVisible({ timeout: 5000 }); + + // ── Cleanup ─────────────────────────────────────────────────────── + await contextA.close(); + await contextB.close(); + }); +});