Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 190 additions & 98 deletions packages/programs/data/shared-log/src/index.ts

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions packages/programs/data/shared-log/src/replication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
ReplicationRangeMessage,
ReplicationRangeMessageU32,
} from "./ranges.js";
import { denormalizer } from "./integers.js";
import { Observer, Replicator, Role } from "./role.js";

export type ReplicationLimits = { min: MinReplicas; max?: MinReplicas };
Expand Down Expand Up @@ -64,14 +65,15 @@ export class ResponseRoleMessage extends TransportMessage {
}

toReplicationInfoMessage(): AllReplicatingSegmentsMessage {
const denormalizeru32 = denormalizer("u32");
return new AllReplicatingSegmentsMessage({
segments:
this.role instanceof Replicator
? this.role.segments.map((x) => {
return new ReplicationRangeMessageU32({
id: randomBytes(32),
offset: x.offset,
factor: x.factor,
offset: denormalizeru32(x.offset),
factor: denormalizeru32(x.factor),
timestamp: x.timestamp,
mode: ReplicationIntent.NonStrict,
});
Expand Down
2 changes: 1 addition & 1 deletion packages/programs/data/shared-log/src/role.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class RoleReplicationSegment {
if (offset > 1 || offset < 0) {
throw new Error("Expecting offset to be between 0 and 1, got: " + offset);
}
this.offsetNominator = denormalizeru32(factor);
this.offsetNominator = denormalizeru32(offset);
}

get factor(): number {
Expand Down
2 changes: 1 addition & 1 deletion packages/programs/data/shared-log/test/migration.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ describe(`migration-8-9`, function () {
if (msg instanceof RequestReplicationInfoMessage) {
// TODO we never respond to this message, nor in older version do we need to send it
// we are keeping this here to mimic the old behaviour
await db.log.rpc.send(
await db1.log.rpc.send(
new ResponseRoleMessage({
role: new Replicator({ factor: 1, offset: 0 }),
}),
Expand Down
59 changes: 38 additions & 21 deletions packages/programs/data/shared-log/test/replicate.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,31 +328,48 @@ describe(`replicate`, () => {
await checkRoleIsDynamic(db1.log);
});

it("waitForReplicator waits until maturity", async () => {
const store = new EventStore<string, any>();
it("waitForReplicator waits until maturity", async () => {
const store = new EventStore<string, any>();

const db1 = await session.peers[0].open(store.clone(), {
args: {
replicate: {
factor: 1,
const db1 = await session.peers[0].open(store.clone(), {
args: {
replicate: {
factor: 1,
},
},
},
});
const db2 = await session.peers[1].open(store.clone(), {
args: {
replicate: {
factor: 1,
});
const db2 = await session.peers[1].open(store.clone(), {
args: {
replicate: {
factor: 1,
},
},
},
});
const roleAgeMs = 3e3;
db2.log.getDefaultMinRoleAge = () => Promise.resolve(roleAgeMs);

// Ensure we have observed db1's replication segment so we can compute the
// remaining time until maturity. Depending on timing, the segment may be
// present slightly before we start waiting.
await waitForResolved(async () => {
const rects = await db2.log.replicationIndex
.iterate({ query: { hash: db1.node.identity.publicKey.hashcode() } })
.all();
expect(rects[0]?.value).to.exist;
});

const rect = (
await db2.log.replicationIndex
.iterate({ query: { hash: db1.node.identity.publicKey.hashcode() } })
.all()
)[0]!.value;

const t0 = +new Date();
await db2.log.waitForReplicator(db1.node.identity.publicKey);
const t1 = +new Date();
const remaining = Math.max(0, roleAgeMs - (t0 - Number(rect.timestamp)));
expect(t1 - t0).greaterThanOrEqual(remaining - 100); // - 100 for timer inaccuracy
});
db2.log.getDefaultMinRoleAge = () => Promise.resolve(3e3);
const t0 = +new Date();
await db2.log.waitForReplicator(db1.node.identity.publicKey);
const t1 = +new Date();
expect(t1 - t0).greaterThanOrEqual(
(await db2.log.getDefaultMinRoleAge()) - 100,
); // - 100 for handle timer inaccuracy
});

it("waitForReplicator eager resolves before maturity", async () => {
const store = new EventStore<string, any>();
Expand Down
110 changes: 58 additions & 52 deletions packages/programs/data/shared-log/test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,62 +223,68 @@ export const checkReplicas = async (
entryCount: number,
) => {
try {
await waitForResolved(async () => {
const map = new Map<string, number>();
const hashToEntry = new Map<string, Entry<any>>();
for (const db of dbs) {
for (const value of await db.log.log.toArray()) {
// eslint-disable-next-line @typescript-eslint/no-unused-expressions
expect(await db.log.log.blocks.has(value.hash)).to.be.true;
map.set(value.hash, (map.get(value.hash) || 0) + 1);
hashToEntry.set(value.hash, value);
// Under heavy CI load, full replication can take longer than the default
// waitForResolved timeout (10s). Also, polling too frequently increases
// load by repeatedly materializing log arrays.
await waitForResolved(
async () => {
const map = new Map<string, number>();
const hashToEntry = new Map<string, Entry<any>>();
for (const db of dbs) {
for (const value of await db.log.log.toArray()) {
// eslint-disable-next-line @typescript-eslint/no-unused-expressions
expect(await db.log.log.blocks.has(value.hash)).to.be.true;
map.set(value.hash, (map.get(value.hash) || 0) + 1);
hashToEntry.set(value.hash, value);
}
}
}
for (const [_k, v] of map) {
try {
expect(v).greaterThanOrEqual(minReplicas);
} catch (error) {
const entry = hashToEntry.get(_k)!;
const gid = entry.meta.gid;
const coordinates = await dbs[0].log.createCoordinates(
entry,
minReplicas,
);
throw new Error(
"Did not fulfill min replicas level for " +
entry.hash +
" coordinates" +
JSON.stringify(coordinates.map((x) => x.toString())) +
" of: " +
minReplicas +
" got " +
v +
". Gid to peer history? " +
JSON.stringify(
dbs.map(
(x) =>
[...(x.log._gidPeersHistory.get(gid) || [])].filter(
(id) => id !== x.log.node.identity.publicKey.hashcode(),
).length || 0,
) +
". Has? " +
JSON.stringify(
await Promise.all(
dbs.map((x) => x.log.log.has(entry.hash)),
),
for (const [_k, v] of map) {
try {
expect(v).greaterThanOrEqual(minReplicas);
} catch (error) {
const entry = hashToEntry.get(_k)!;
const gid = entry.meta.gid;
const coordinates = await dbs[0].log.createCoordinates(
entry,
minReplicas,
);
throw new Error(
"Did not fulfill min replicas level for " +
entry.hash +
" coordinates" +
JSON.stringify(coordinates.map((x) => x.toString())) +
" of: " +
minReplicas +
" got " +
v +
". Gid to peer history? " +
JSON.stringify(
dbs.map(
(x) =>
[...(x.log._gidPeersHistory.get(gid) || [])].filter(
(id) => id !== x.log.node.identity.publicKey.hashcode(),
).length || 0,
) +
", sync in flight ? " +
JSON.stringify(
dbs.map((x) =>
x.log.syncronizer.syncInFlight.has(entry.hash),
". Has? " +
JSON.stringify(
await Promise.all(
dbs.map((x) => x.log.log.has(entry.hash)),
),
) +
", sync in flight ? " +
JSON.stringify(
dbs.map((x) =>
x.log.syncronizer.syncInFlight.has(entry.hash),
),
),
),
),
);
),
);
}
expect(v).lessThanOrEqual(dbs.length);
}
expect(v).lessThanOrEqual(dbs.length);
}
});
},
{ timeout: 20_000, delayInterval: 250 },
);
} catch (error) {
await dbgLogs(dbs.map((x) => x.log));
throw error;
Expand Down
14 changes: 14 additions & 0 deletions packages/transport/pubsub/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ export class DirectSub extends DirectStream<PubSubEvents> implements PubSub {

async subscribe(topic: string) {
// this.debounceUnsubscribeAggregator.delete(topic);
if (!this.topics.has(topic)) {
this.initializeTopic(topic);
}
return this.debounceSubscribeAggregator.add({ key: topic });
}

Expand Down Expand Up @@ -178,6 +181,8 @@ export class DirectSub extends DirectStream<PubSubEvents> implements PubSub {
async unsubscribe(topic: string) {
if (this.debounceSubscribeAggregator.has(topic)) {
this.debounceSubscribeAggregator.delete(topic); // cancel subscription before it performed
this.topics.delete(topic);
this.topicsToPeers.delete(topic);
return false;
}
const subscriptions = this.subscriptions.get(topic);
Expand Down Expand Up @@ -729,6 +734,15 @@ export class DirectSub extends DirectStream<PubSubEvents> implements PubSub {
const mySubscriptions = this.getSubscriptionOverlap(
pubsubMessage.topics,
);
// Also include topics pending in the debounce window
for (const topic of pubsubMessage.topics) {
if (
!mySubscriptions.includes(topic) &&
this.debounceSubscribeAggregator.has(topic)
) {
mySubscriptions.push(topic);
}
}
if (mySubscriptions.length > 0) {
const response = new DataMessage({
data: toUint8Array(
Expand Down
Loading
Loading