-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimple-consumer.ts
More file actions
43 lines (34 loc) · 1.04 KB
/
simple-consumer.ts
File metadata and controls
43 lines (34 loc) · 1.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import { SchemaRegistry } from '../lib/schema-registry';
import { AvroConsumer, AvroMessage } from '../lib/kafka';
/**
 * Consumer-level settings, passed as the first argument to `AvroConsumer`.
 * NOTE(review): the key names look like librdkafka configuration properties;
 * confirm how `../lib/kafka` forwards them.
 */
const consumerOpts = {
  'metadata.broker.list': 'kafka:9092',
  'group.id': 'my-group-id',
  'socket.nagle.disable': true,
  'socket.keepalive.enable': true,
  // Auto-commit is switched off; `startConsumer` commits each message itself.
  'enable.auto.commit': false,
  'enable.auto.offset.store': true,
  'log.connection.close': false,
};

/**
 * Offset-reset settings, passed as the second argument to `AvroConsumer`.
 */
const consumerOffset = {
  'auto.offset.reset': 'earliest',
};

/**
 * Stream settings, passed as the third argument to `AvroConsumer`.
 * Lists the single topic this example reads from.
 */
const streamOptions = {
  topics: ['simple-consumer-topic'],
};
/**
 * Boots the example consumer.
 *
 * Awaits `SchemaRegistry.load()`, then builds an `AvroConsumer` from the
 * module-level option objects plus the loaded schemas, and subscribes to its
 * `'avro'` event. Each received message is logged (offset, value, key) and
 * then committed explicitly.
 *
 * NOTE(review): the three `SchemaRegistry` arguments look like registry
 * address, topic/subject name and schema version; confirm against
 * `../lib/schema-registry`.
 *
 * @returns A promise that settles once the schemas are loaded and the
 *   listener is attached; it rejects if loading or construction throws.
 */
const startConsumer = async () => {
  const registry = new SchemaRegistry('schema-registry:8081', 'simple-consumer-topic', 'latest');
  await registry.load();

  const consumer = new AvroConsumer(
    consumerOpts,
    consumerOffset,
    streamOptions,
    registry.schemas
  );

  const handleMessage = (message: AvroMessage) => {
    console.log(`Received Message! (Offset: ${message.offset})`);
    console.log(`Value: ${message.value}`);
    console.log(`Key: ${message.key}`);

    // `enable.auto.commit` is false in `consumerOpts`, so every message is
    // committed by hand. Reaching through `stream.consumer` to do it is
    // awkward (the original author's words: "This is ugly").
    consumer.stream.consumer.commitMessage(message);
  };

  consumer.on('avro', handleMessage);
};
// Script entry point. `startConsumer` is async, so a failure while loading
// schemas or constructing the consumer arrives as a promise rejection. Handle
// it here instead of leaving the promise floating: log with context and mark
// the process as failed so supervisors still see a non-zero exit code.
startConsumer().catch((err: unknown) => {
  console.error('Failed to start consumer:', err);
  process.exitCode = 1;
});