Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@
import * as findUp from "find-up";
import path from "path";

const root = path.dirname(await findUp.findUp(".git", { type: "directory" }));
// In git worktrees, `.git` is a *file* (not a directory), so don't constrain the type.
// Fall back to workspace markers for non-git environments.
const rootMarker =
(await findUp.findUp(".git")) ??
(await findUp.findUp("pnpm-workspace.yaml")) ??
(await findUp.findUp("package.json"));
if (!rootMarker) {
throw new Error("Unable to locate repo root (no .git/workspace marker found)");
}
const root = path.dirname(rootMarker);

export default {
// global options
Expand Down
12 changes: 12 additions & 0 deletions debugging-plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Findings (2026-02-06)
- `queryAll` uses RPC default timeout of 10s; search/iterate does not override this even when `remote.wait.timeout` is much longer. Under churn, RPC requests can time out before `waitForResolved` deadlines, leading to missing responses that are silently swallowed unless `throwOnMissing` is set. This can cause partial results to be treated as complete. (see `packages/programs/rpc/src/controller.ts` and `packages/programs/data/document/document/src/search.ts`)
- `MissingResponsesError` is caught in `queryCommence` and only logged unless `remote.throwOnMissing` is set, so caller has no signal to keep iterator open or requery missing peers.

# Changes Applied
- `queryCommence` now aligns RPC timeout with `remote.wait.timeout` when provided, reducing premature MissingResponsesError during churn.
- `MissingResponsesError` now carries `missingGroups` metadata and `queryCommence` exposes this via an `onMissingResponses` callback.
- `iterate` marks the initial fetch as incomplete when missing responses are detected to avoid prematurely closing the iterator.

# Tests
- Attempted: `node ./node_modules/aegir/src/index.js run test --roots ./packages/programs/rpc -- -t node --grep "queryAll"`
- Result: failed (missing local dependency `node_modules/aegir`)
11 changes: 10 additions & 1 deletion packages/clients/peerbit/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@ import fs from "fs";
import { createRequire } from "module";
import path from "path";

const root = path.dirname(await findUp.findUp(".git", { type: "directory" }));
// In git worktrees, `.git` is a *file* (not a directory), so don't constrain the type.
// Fall back to workspace markers for non-git environments.
const rootMarker =
(await findUp.findUp(".git")) ??
(await findUp.findUp("pnpm-workspace.yaml")) ??
(await findUp.findUp("package.json"));
if (!rootMarker) {
throw new Error("Unable to locate repo root (no .git/workspace marker found)");
}
const root = path.dirname(rootMarker);
const resolverFromRoot = createRequire(path.join(root, "package.json"));
const resolverFromLocal = createRequire(import.meta.url);

Expand Down
21 changes: 21 additions & 0 deletions packages/programs/data/document/document/src/search.ts
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ type QueryDetailedOptions<
response: types.AbstractSearchResult,
from: PublicSignKey,
) => void | Promise<void>;
onMissingResponses?: (error: MissingResponsesError) => void | Promise<void>;
remote?: {
from?: string[]; // if specified, only query these peers
};
Expand Down Expand Up @@ -2237,6 +2238,15 @@ export class DocumentIndex<
// this will lead to bad UX as you usually want to list/expore whats going on before doing any replication work
remote.priority = 2;
}
if (remote && remote.timeout == null && options?.remote) {
const waitPolicy =
typeof options.remote === "object" ? options.remote.wait : undefined;
const waitTimeout =
typeof waitPolicy === "object" ? waitPolicy.timeout : undefined;
if (waitTimeout != null) {
remote.timeout = waitTimeout;
}
}

if (!local && !remote) {
throw new Error(
Expand Down Expand Up @@ -2403,6 +2413,9 @@ export class DocumentIndex<
} catch (error) {
if (error instanceof MissingResponsesError) {
warn("Did not reciveve responses from all shard");
if (options?.onMissingResponses) {
await options.onMissingResponses(error);
}
if (remote?.throwOnMissing) {
throw error;
}
Expand Down Expand Up @@ -2961,6 +2974,7 @@ export class DocumentIndex<
): Promise<boolean> => {
await warmupPromise;
let hasMore = false;
let missingResponses = false;
const discoverTargets =
typeof options?.remote === "object"
? options.remote.reach?.discover
Expand Down Expand Up @@ -3096,10 +3110,17 @@ export class DocumentIndex<
);
}
},
onMissingResponses: () => {
missingResponses = true;
},
},
fetchOptions?.fetchedFirstForRemote,
);

if (missingResponses) {
hasMore = true;
unsetDone();
}
if (!hasMore) {
maybeSetDone();
}
Expand Down
5 changes: 4 additions & 1 deletion packages/programs/rpc/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import type {
} from "./io.js";

export class MissingResponsesError extends Error {
constructor(message: string) {
missingGroups: string[][];
constructor(message: string, missingGroups: string[][] = []) {
super(message);
this.missingGroups = missingGroups;
}
}
export type RPCRequestAllOptions<_Q, R> = RPCRequestResponseOptions<R> &
Expand Down Expand Up @@ -84,6 +86,7 @@ export const queryAll = async <Q, R>(
throw new MissingResponsesError(
"Did not receive responses from all shards: " +
JSON.stringify(missingReponses),
missingReponses,
);
}
};
21 changes: 21 additions & 0 deletions packages/programs/rpc/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { TestSession } from "@peerbit/test-utils";
import { AbortError, delay, waitFor, waitForResolved } from "@peerbit/time";
import { expect } from "chai";
import {
MissingResponsesError,
RPC,
type RPCResponse,
type RequestEvent,
Expand Down Expand Up @@ -734,4 +735,24 @@ describe("queryAll", () => {
await expect(promise).rejectedWith("TestAborted");
expect(+new Date() - t1).lessThan(1000);
});

it("reports missing groups on timeout", async () => {
clients[1].delay = 200;
const missingGroup = [[clients[1].node.identity.publicKey.hashcode()]];
try {
await queryAll(
clients[0].query,
missingGroup,
new Body({ arr: new Uint8Array([1]) }),
() => {},
{ timeout: 50 },
);
expect.fail("Expected MissingResponsesError");
} catch (error) {
expect(error).to.be.instanceOf(MissingResponsesError);
expect((error as MissingResponsesError).missingGroups).to.deep.equal(
missingGroup,
);
}
});
});
11 changes: 10 additions & 1 deletion packages/utils/any-store/any-store/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@ import fs from "fs";
import { createRequire } from "module";
import path from "path";

const root = path.dirname(await findUp.findUp(".git", { type: "directory" }));
// In git worktrees, `.git` is a *file* (not a directory), so don't constrain the type.
// Fall back to workspace markers for non-git environments.
const rootMarker =
(await findUp.findUp(".git")) ??
(await findUp.findUp("pnpm-workspace.yaml")) ??
(await findUp.findUp("package.json"));
if (!rootMarker) {
throw new Error("Unable to locate repo root (no .git/workspace marker found)");
}
const root = path.dirname(rootMarker);
const resolverFromRoot = createRequire(path.join(root, "package.json"));
const resolverFromLocal = createRequire(import.meta.url);

Expand Down
11 changes: 10 additions & 1 deletion packages/utils/indexer/sqlite3/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ import * as findUp from "find-up";
import fs from "fs";
import path from "path";

const root = path.dirname(await findUp.findUp(".git", { type: "directory" }));
// In git worktrees, `.git` is a *file* (not a directory), so don't constrain the type.
// Fall back to workspace markers for non-git environments.
const rootMarker =
(await findUp.findUp(".git")) ??
(await findUp.findUp("pnpm-workspace.yaml")) ??
(await findUp.findUp("package.json"));
if (!rootMarker) {
throw new Error("Unable to locate repo root (no .git/workspace marker found)");
}
const root = path.dirname(rootMarker);

export default {
// test cmd options
Expand Down
Loading