Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"lint": "eslint . --ext .ts",
"copy-content-files": "copyfiles -u 1 src/**/*.d.ts src/**/*.proto src/**/*.js build/",
"compile": "tsc --build",
"prebuild": "yarn clean && yarn format && yarn lint && echo Using TypeScript && tsc --version",
"prebuild": "yarn clean && yarn lint && echo Using TypeScript && tsc --version",
"build": "yarn compile && yarn copy-content-files",
"codegen": "buf generate",
"dev": "ts-node ./src/dev.ts",
Expand All @@ -36,7 +36,8 @@
"google-protobuf": "^3.20.0",
"lodash": "^4.17.21",
"node-cache": "^5.1.2",
"rxjs": "^7.5.5"
"rxjs": "^7.5.5",
"websocket-ts": "^1.1.1"
},
"devDependencies": {
"@types/dotenv": "^8.2.0",
Expand All @@ -45,6 +46,7 @@
"@types/jest": "^27.0.2",
"@types/js-yaml": "^4.0.1",
"@types/lodash": "^4.14.168",
"@types/node": "16.18.14",
"@typescript-eslint/eslint-plugin": "^5.8.1",
"@typescript-eslint/parser": "^5.8.1",
"copyfiles": "^2.4.1",
Expand Down
22 changes: 22 additions & 0 deletions src/client/getConsumerClientFactory.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export function getConsumerClientFactory() {
const factories = {};

// todo: can we have both clients on node.js ?
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we will be implementing this in the future it's worth mentioning that websockets are not working properly inside Node.js out of the box for some reason. So websocket client for Node.js won't be only about client creation

// conditionally require respective clients.
if (isNodejs()) {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const grpcClient = require("../streamdbConsumerClient/grpcClient");
factories["grpc"] = (endpoint) => new grpcClient.StreamDBConsumerGrpcClient(endpoint);
} else {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const httpClient = require("../streamdbConsumerClient/httpClient");
factories["http"] = (endpoint) => new httpClient.StreamDBConsumerHttpClient(endpoint);
}

return factories;
}

function isNodejs() {
return typeof process === "object" &&
typeof require === "function";
}
29 changes: 23 additions & 6 deletions src/client/proximaStreamClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { StreamDBConsumerClient } from "../streamdb/consumerClient";
import { StreamDBConsumerClient } from "../streamdbConsumerClient/consumerClient";
import { getConsumerClientFactory } from "./getConsumerClientFactory";
import { PausableStream, SimplePauseController } from "../lib/pausableStream";
import { Offset, StreamEndpoint, StreamEvent } from "../model";
import { StreamRegistryClient } from "./streamRegistryClient";
Expand Down Expand Up @@ -35,7 +36,7 @@ export class ProximaStreamClient {
`Can't fetch events for stream ${stream} ${offset.toString()}: no endpoints found`
);

const client = this.getStreamConsumerClient(endpoint.uri);
const client = this.getStreamConsumerClient(endpoint);
const events = await client.getEvents(stream, offset, count, direction);

// store last event's offset endpoint to the cache
Expand Down Expand Up @@ -79,7 +80,7 @@ export class ProximaStreamClient {
}

//new StreamDBConsumerClient(endpoint)
const client = this.getStreamConsumerClient(endpoint.uri);
const client = this.getStreamConsumerClient(endpoint);
currentEventStream = client.getEventsStream(
stream,
currentOffset,
Expand Down Expand Up @@ -130,14 +131,30 @@ export class ProximaStreamClient {
return endpoint;
}

private getStreamConsumerClient(endpoint: string) {
private getStreamConsumerClient(endpoint: StreamEndpoint) {
const createClient = () => {
if (endpoint.uri && consumerClientFactory["grpc"]) {
return consumerClientFactory.grpc(endpoint.uri);
}

if (endpoint.httpUri && consumerClientFactory["http"]) {
return consumerClientFactory.http(endpoint.httpUri);
}

throw new Error(`can't create client for ${JSON.stringify(endpoint)}`);
};

return (
this.clients[endpoint] ??
(this.clients[endpoint] = new StreamDBConsumerClient(endpoint))
this.clients[endpoint.uri] ??
(this.clients[endpoint.uri] = createClient())
);
}
}

export interface StreamClientOptions {
registry?: StreamRegistry;
}

const consumerClientFactory: Partial<
Record<"http" | "grpc", (endpoint: string) => StreamDBConsumerClient>
> = getConsumerClientFactory();
5 changes: 3 additions & 2 deletions src/client/singleStreamDbRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { StreamRegistry } from "./streamRegistry";
import { Offset, StreamEndpoint, StreamStats } from "../model";

export class SingleStreamDbRegistry implements StreamRegistry {
public constructor(private readonly streamDbUrl: string) {}
public constructor(private readonly streamDbUrl: string, private readonly streamDbHttpUrl?: string) {}

public async getStreamEndpoints(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand All @@ -14,7 +14,8 @@ export class SingleStreamDbRegistry implements StreamRegistry {
return [
new StreamEndpoint(
this.streamDbUrl,
new StreamStats(Offset.zero, undefined, undefined, undefined)
new StreamStats(Offset.zero, undefined, undefined, undefined),
this.streamDbHttpUrl,
),
];
}
Expand Down
1 change: 1 addition & 0 deletions src/client/streamRegistryClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ function parseStreamEndpoint(endpoint: any): StreamEndpoint {

return {
uri: endpoint.uri,
httpUri: endpoint.httpUri,
stats: {
start: Offset.fromString(stats.start),
end: Offset.fromString(stats.end),
Expand Down
14 changes: 6 additions & 8 deletions src/dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@ async function main() {
const client = new ProximaStreamClient();

const streamRegistry = new StreamRegistryClient();
const endpoints = await streamRegistry.getStreamEndpoints("abc", Offset.zero);
console.log("ENDPOINTS", endpoints);
const endpoints = await streamRegistry.getStreamEndpoints("proxima.exchange-rates.0_1", Offset.zero);
const allStreams = await streamRegistry.getStreams();
console.dir(allStreams, { depth: 10 });

let currentOffset = Offset.zero;
for (let i = 0; i < 10; i++) {
for (let i = 0; i < 2; i++) {
const events = await client.fetchEvents(
"proxima.eth-main.blocks.1_0",
"proxima.exchange-rates.0_1",
currentOffset,
100,
"next"
Expand All @@ -34,7 +32,7 @@ async function main() {
console.log("starting the stream");

const stream = await client.streamEvents(
"proxima.eth-main.blocks.1_0",
"proxima.exchange-rates.0_1",
Offset.zero
);
const reader = BufferedStreamReader.fromStream(stream, 10000);
Expand All @@ -45,8 +43,8 @@ async function main() {

console.log(
`doing some stuff with a batch of ${batch.length}. ${batch[
batch.length - 1
].offset.toString()}`
batch.length - 1
].offset.toString()}`
);
// simulate processing
await sleep(100);
Expand Down
3 changes: 2 additions & 1 deletion src/model/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ export class StreamStats {
export class StreamEndpoint {
public constructor(
public readonly uri: string,
public readonly stats: StreamStats
public readonly stats: StreamStats,
public readonly httpUri?: string,
) {}
}

Expand Down
17 changes: 17 additions & 0 deletions src/streamdbConsumerClient/consumerClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { StreamEvent, Offset } from "../model";
import { PausableStream, StreamController } from "../lib/pausableStream";

export interface StreamDBConsumerClient {
getEvents(
stream: string,
offset: Offset,
count: number,
direction: "next" | "last"
): Promise<StreamEvent[]>;

getEventsStream(
stream: string,
offset: Offset,
controller?: StreamController
): PausableStream<StreamEvent>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import {
Direction,
StreamStateTransitionsRequest,
} from "../gen/stream_consumer/v1alpha1/stream_consumer";
import { offsetToProto, stateTransitionProtoToStreamEvent } from "./converters";
import { offsetToProto, stateTransitionProtoToStreamEvent } from "../streamdb/converters";
import * as grpc from "@grpc/grpc-js";
import { StreamEvent, Offset } from "../model";

import { PausableStream, StreamController } from "../lib/pausableStream";
import {StreamDBConsumerClient} from "./consumerClient";

export class StreamDBConsumerClient {
export class StreamDBConsumerGrpcClient implements StreamDBConsumerClient {
private consumer: StreamConsumerServiceClient;

constructor(uri: string) {
Expand Down
74 changes: 74 additions & 0 deletions src/streamdbConsumerClient/httpClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import {Offset, StreamEvent} from "../model";
import {Axios} from "axios";
import {
GetStateTransitionsResponse,
StreamStateTransitionsResponse
} from "../gen/stream_consumer/v1alpha1/stream_consumer";
import {stateTransitionProtoToStreamEvent} from "../streamdb/converters";
import {PausableStream, StreamController} from "../lib/pausableStream";
import {ExponentialBackoff, WebsocketBuilder} from "websocket-ts";
import {StreamDBConsumerClient} from "./consumerClient";

export class StreamDBConsumerHttpClient implements StreamDBConsumerClient {
private readonly client: Axios;

constructor(private readonly uri: string) {
this.client = new Axios({
baseURL: "https://" + this.uri,
validateStatus: status =>
(status >= 200 && status < 300) || status == 404,
});
}

public getEvents(
stream: string,
offset: Offset,
count: number,
direction: "next" | "last"
): Promise<StreamEvent[]> {
return this.client
.post(`/api/consumer/${stream}/transitions`,
JSON.stringify({
offset: offset,
count: count,
direction: direction.toUpperCase(),
}, (key, value) => (typeof value === "bigint" ? value.toString() : value)
)
).then(resp =>
(JSON.parse(resp.data) as GetStateTransitionsResponse)
.stateTransitions
.map(stateTransitionProtoToStreamEvent)
);
}

public getEventsStream(
stream: string,
offset: Offset,
controller?: StreamController
): PausableStream<StreamEvent> {
return PausableStream.create<StreamEvent>((observer, _) => {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivanbenko is it necessary to use PauseState here? I haven't dove deeply into its internals

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, pauseState is important. Client should react to its state. It's being used by Buffered Reader to prevent overconsuming events

new WebsocketBuilder(webSocketUriForOffset(this.uri, stream, offset))
.onClose(() => observer.complete())
.onError((_, ev) => observer.error(ev.type))
.onMessage((_, ev) =>
(JSON.parse(ev.data)["result"] as StreamStateTransitionsResponse)
.stateTransition
.map(stateTransitionProtoToStreamEvent)
.forEach(transition => observer.next(transition)))
.withBackoff(new ExponentialBackoff(100, 7))
.build();
}, controller);
}
}

function webSocketUriForOffset(endpoint: string, streamId: string, offset: Offset): string {
const params = new URLSearchParams({
"offset.height": offset.height.toString(),
"offset.timestamp.epochMs": offset.timestamp.epochMs.toString(),
});

// creates "&offset.timestamp.parts=parts[0]&offset.timestamp.parts=parts[1]" etc. string
const parts = ["", ...offset.timestamp.parts].join("&offset.timestamp.parts=");

return `wss://${endpoint}/api/consumer/${streamId}/stream?${params.toString()}${parts}`;
}
1 change: 1 addition & 0 deletions tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"outDir": "./build",
"allowSyntheticDefaultImports": true,
"skipLibCheck": true,
"allowJs": true,
"jsx": "react",
"strict": true,
"declaration": true,
Expand Down
Loading