forked from ovotech/castle
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathintegration.spec.ts
More file actions
309 lines (245 loc) · 11.7 KB
/
integration.spec.ts
File metadata and controls
309 lines (245 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
import { retry } from 'ts-retry-promise';
import * as ansiRegex from 'ansi-regex';
import * as uuid from 'uuid';
import { Output, castle } from '../src';
import { AvroKafka, SchemaRegistry } from '@ovotech/avro-kafkajs';
import { Kafka, logLevel, ConfigResourceTypes } from 'kafkajs';
import { join } from 'path';
import { readFileSync, writeFileSync } from 'fs';
// Per-run unique names for every Kafka/schema-registry resource the test
// creates, so repeated or concurrent runs never collide on leftover state.
const topic1 = `test_topic1_${uuid.v4()}`;
const topic2 = `test_topic2_${uuid.v4()}`;
const config = `test_config_${uuid.v4()}`;
const groupId = `test_groupId_${uuid.v4()}`;
/**
 * In-memory sink for CLI output. Captured stdout/stderr text is accumulated
 * (ANSI escape sequences stripped) so assertions can inspect plain text.
 */
class Logger {
  public std = '';
  public err = '';

  /** Record one stdout line, minus ANSI color codes, with a trailing newline. */
  public log(line: string): void {
    const plain = line.replace(ansiRegex(), '');
    this.std = `${this.std}${plain}\n`;
  }

  /** Record one stderr line, minus ANSI color codes, with a trailing newline. */
  public error(line: string): void {
    const plain = line.replace(ansiRegex(), '');
    this.err = `${this.err}${plain}\n`;
  }

  /** Drop everything captured so far on both streams. */
  public clear(): void {
    this.std = this.err = '';
  }
}
// Shared fixtures for the whole test run.
// Captures CLI output for assertions instead of printing to the console.
const logger = new Logger();
// Output adapter handed to the castle CLI; second argument disables
// interactive/colorized behavior so captured text is stable.
const output = new Output(logger, false);
// Avro-aware Kafka client pointed at the local test stack:
// schema registry on :8081, broker on :29092.
const kafka = new AvroKafka(
  new SchemaRegistry({ uri: 'http://localhost:8081' }),
  new Kafka({ brokers: ['localhost:29092'], logLevel: logLevel.ERROR, clientId: 'testClient' }),
);
const admin = kafka.admin();
// Fresh random group id so this verification consumer never inherits offsets.
const consumer = kafka.consumer({ groupId: uuid.v4() });
describe('Integration', () => {
  // End-to-end exercise of the castle CLI against a locally running Kafka
  // broker and schema registry. Each section invokes one CLI command via
  // castle(output).parseAsync(...), asserts on the captured output, then
  // clears the logger for the next section. The sections are order-dependent:
  // topics created early are produced to, consumed from, and inspected later.
  it(
    'Should process',
    async () => {
      await Promise.all([admin.connect(), consumer.connect()]);
      try {
        // Config set
        // ================================================
        const configSet1 = `node castle config set ${config} --schema-registry http://localhost:8081 --kafka-broker localhost:29092`;
        await castle(output).parseAsync(configSet1.split(' '));
        expect(logger.std).toContain(`Setting config "${config}"`);
        expect(logger.std).toContain(`Success`);
        logger.clear();

        // Config search
        // ================================================
        const configSearch1 = `node castle config search ${config}`;
        await castle(output).parseAsync(configSearch1.split(' '));
        expect(logger.std).toContain(`Searching for config "${config}"`);
        expect(logger.std).toContain(
          `${config} | Kafka: localhost:29092 SchemaRegistry: http://localhost:8081`,
        );
        logger.clear();

        // Config remove
        // ================================================
        const configRemove1 = `node castle config remove ${config}`;
        await castle(output).parseAsync(configRemove1.split(' '));
        expect(logger.std).toContain(`Removing config "${config}"`);
        expect(logger.std).toContain('Success');
        logger.clear();

        // Create small topic (defaults: 1 partition, replication factor 1)
        // ================================================
        const createTopic1 = `node castle topic create ${topic1}`;
        await castle(output).parseAsync(createTopic1.split(' '));
        expect(logger.std).toContain(`Creating topic "${topic1}"`);
        expect(logger.std).toContain(`Number of partitions | 1`);
        expect(logger.std).toContain(`Replication factor | 1`);
        expect(logger.std).toContain(`Complete`);
        logger.clear();

        // Create big topic (3 partitions plus an explicit config entry)
        // ================================================
        const createTopic2 = `node castle topic create ${topic2} --num-partitions 3 --config-entry file.delete.delay.ms=40000`;
        await castle(output).parseAsync(createTopic2.split(' '));
        expect(logger.std).toContain(`Creating topic "${topic2}"`);
        expect(logger.std).toContain(`Number of partitions | 3`);
        expect(logger.std).toContain(`Replication factor | 1`);
        expect(logger.std).toContain(`file.delete.delay.ms | 40000`);
        expect(logger.std).toContain(`Complete`);
        logger.clear();

        // Check topics actually created (verified via the admin client,
        // not the CLI, so the CLI output cannot mask a failure)
        // ================================================
        const metadata = await admin.fetchTopicMetadata({ topics: [topic1, topic2] });
        expect(metadata.topics).toContainEqual({
          name: topic1,
          partitions: [expect.any(Object)],
        });
        expect(metadata.topics).toContainEqual({
          name: topic2,
          partitions: [expect.any(Object), expect.any(Object), expect.any(Object)],
        });

        // Check topic-info for big topic
        // ================================================
        const topicInfo2 = `node castle topic show ${topic2}`;
        await castle(output).parseAsync(topicInfo2.split(' '));
        expect(logger.std).toContain(`Topic "${topic2}"`);
        expect(logger.std).toContain(`file.delete.delay.ms | 40000`);
        logger.clear();

        // Check topic-update works for big topic
        // ================================================
        const topicUpdate2 = `node castle topic update ${topic2} --config-entry file.delete.delay.ms=50000`;
        await castle(output).parseAsync(topicUpdate2.split(' '));
        expect(logger.std).toContain(`Updating topic "${topic2}"`);
        expect(logger.std).toContain(`file.delete.delay.ms | 50000`);
        expect(logger.std).toContain(`Complete`);
        logger.clear();

        // Check config was updated (again verified via the admin client)
        // ================================================
        const configs = await admin.describeConfigs({
          includeSynonyms: false,
          resources: [{ type: ConfigResourceTypes.TOPIC, name: topic2 }],
        });
        expect(configs.resources[0].configEntries).toContainEqual(
          expect.objectContaining({
            configName: 'file.delete.delay.ms',
            configValue: '50000',
          }),
        );

        // Check topics
        // ================================================
        const topics = `node castle topic search ${topic2}`;
        await castle(output).parseAsync(topics.split(' '));
        expect(logger.std).toContain(`Topics containing "${topic2}"`);
        expect(logger.std).toContain(`${topic2} | 3 | 168 Hours | delete`);
        logger.clear();

        // Produce Ad-Hoc Messages
        // ================================================
        // Start a verification consumer that collects every message produced
        // by the CLI into topicMessages for the retry-based assertions below.
        const topicMessages: unknown[] = [];
        await consumer.subscribe({ topic: topic1 });
        await consumer.subscribe({ topic: topic2 });
        await consumer.run({
          eachMessage: async ({ message }) => {
            topicMessages.push(message);
          },
        });

        const schemaFile = join(__dirname, 'schema1.json');
        const keySchemaFile = join(__dirname, 'schema2.json');
        const produceMessage1 = `node castle topic message ${topic1} --schema-file ${schemaFile} --key-schema-file ${keySchemaFile} --message {"field1":"other"} --key {"id":11}`;
        await castle(output).parseAsync(produceMessage1.split(' '));
        expect(logger.std).toContain(`Produce message in "${topic1}"`);
        expect(logger.std).toContain(`Success`);
        logger.clear();

        // Delivery is asynchronous, so poll until the message shows up.
        await retry(
          async () => {
            expect(topicMessages).toContainEqual(
              expect.objectContaining({ value: { field1: 'other' } }),
            );
          },
          { retries: 4, delay: 1000 },
        );

        // Produce Messages (batch produce from a generated file that points
        // the template at the per-run topic2 name)
        // ================================================
        const produceTemplate = JSON.parse(
          readFileSync(join(__dirname, 'produce-template.json'), 'utf8'),
        );
        const produceFile = join(__dirname, '__generated__', 'produce-file.json');
        writeFileSync(produceFile, JSON.stringify({ ...produceTemplate, topic: topic2 }));
        const produce2 = `node castle topic produce ${produceFile}`;
        await castle(output).parseAsync(produce2.split(' '));
        expect(logger.std).toContain(`Produce "10" messages for ${topic2}`);
        expect(logger.std).toContain('Success');
        logger.clear();

        // 1 ad-hoc message + 10 batch messages.
        await retry(
          async () => {
            expect(topicMessages).toHaveLength(11);
          },
          { retries: 4, delay: 1000 },
        );

        await consumer.stop();
        // Await the disconnect (it was previously a floating promise) so the
        // CLI `consume` commands below never race a consumer that is still
        // leaving the group.
        await consumer.disconnect();

        // Schema Search
        // ================================================
        const schemaSearch2 = `node castle schema search ${topic2}`;
        await castle(output).parseAsync(schemaSearch2.split(' '));
        expect(logger.std).toContain(`Searching for schemas "${topic2}"`);
        logger.clear();

        // Schema
        // ================================================
        const schema2 = `node castle schema show ${topic2}-value --depth 8`;
        await castle(output).parseAsync(schema2.split(' '));
        expect(logger.std).toContain(`Showing schema "${topic2}-value"`);
        expect(logger.std).toContain('Version 01');
        expect(logger.std).toContain("type: 'record'");
        expect(logger.std).toContain("name: 'Event'");
        expect(logger.std).toContain("fields: [ { name: 'field1', type: 'string' } ]");
        logger.clear();

        // Consume With Key
        // ================================================
        const consume1 = `node castle topic consume ${topic1} --group-id ${groupId} --encoded-key`;
        await castle(output).parseAsync(consume1.split(' '));
        expect(logger.std).toContain(`Consume "${topic1}"`);
        expect(logger.std).toContain('Key { id: 11 }');
        expect(logger.std).toContain("Event { field1: 'other' }");
        expect(logger.std).toContain('Success');
        logger.clear();

        // Consume (messages are spread across topic2's three partitions)
        // ================================================
        const consume2 = `node castle topic consume ${topic2} --group-id ${groupId}`;
        await castle(output).parseAsync(consume2.split(' '));
        expect(logger.std).toContain(`Consume "${topic2}"`);
        expect(logger.std).toContain('Partition 0 - Offsets 0...2 (100%)');
        expect(logger.std).toContain("Event { field1: 'test1' }");
        expect(logger.std).toContain("Event { field1: 'test4' }");
        expect(logger.std).toContain("Event { field1: 'test5' }");
        expect(logger.std).toContain('Partition 1 - Offsets 0...3 (100%)');
        expect(logger.std).toContain("Event { field1: 'test2' }");
        expect(logger.std).toContain("Event { field1: 'test6' }");
        expect(logger.std).toContain("Event { field1: 'test7' }");
        expect(logger.std).toContain("Event { field1: 'test8' }");
        expect(logger.std).toContain('Partition 2 - Offsets 0...2 (100%)');
        expect(logger.std).toContain("Event { field1: 'test3' }");
        expect(logger.std).toContain("Event { field1: 'test10' }");
        expect(logger.std).toContain("Event { field1: 'test11' }");
        expect(logger.std).toContain('Success');
        logger.clear();

        // Group Show
        // ================================================
        const groupShow = `node castle group show ${groupId} ${topic2}`;
        await castle(output).parseAsync(groupShow.split(' '));
        expect(logger.std).toContain(`Consumer group "${groupId}"`);
        expect(logger.std).toContain('Partition | Offset | Group Offset | Lag | Metadata');
        expect(logger.std).toContain('0 | 3 | 3 | 0 |');
        expect(logger.std).toContain('1 | 4 | 4 | 0 |');
        expect(logger.std).toContain('2 | 3 | 3 | 0 |');
        logger.clear();

        // Group Update Reset Offsets
        // ================================================
        const groupUpdate1 = `node castle group update ${groupId} ${topic2} --reset-offsets latest`;
        await castle(output).parseAsync(groupUpdate1.split(' '));
        expect(logger.std).toContain(`Success. Topic ${topic2} offsets reset to latest`);
        logger.clear();

        // Group Update Set Offsets
        // ================================================
        const groupUpdate2 = `node castle group update ${groupId} ${topic2} --set-offset 0=2 --set-offset 1=2`;
        await castle(output).parseAsync(groupUpdate2.split(' '));
        expect(logger.std).toContain(`Success. Topic ${topic2} offsets set`);
        expect(logger.std).toContain(`Partition | Offset`);
        expect(logger.std).toContain(`0 | 2`);
        expect(logger.std).toContain(`1 | 2`);
      } finally {
        // Disconnecting the consumer again after the mid-test disconnect is
        // intentional: it guarantees cleanup when an assertion throws before
        // that point.
        await Promise.all([admin.disconnect(), consumer.disconnect()]);
      }
    },
    // Timeout passed as the `it` third argument: calling jest.setTimeout()
    // *inside* the test body (as the original did) is applied too late to
    // affect the currently running test in modern jest.
    100000,
  );
});