Skip to content

Commit ca415e2

Browse files
SDK 0.3.0 release
Updating SDK to use Kafka 2.6.0 Updating base docker image for 0.20.0 Updating mirrormaker Updating certificate management code to download the correct cert Adding date in consumer-group-id
1 parent d1d87d7 commit ca415e2

File tree

11 files changed

+171
-94
lines changed

11 files changed

+171
-94
lines changed

docker/Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ RUN mvn -B \
1010

1111
### Build Images ###
1212
## SDK app ##
13-
FROM strimzi/kafka:0.14.0-kafka-2.3.0 as sdk-app
13+
FROM strimzi/kafka:0.20.0-kafka-2.6.0 as sdk-app
1414

1515
COPY . /home/kafka
1616

@@ -24,4 +24,4 @@ ENV JAVAX_NET_SSL_TRUSTSTORE=truststore/ncdsTrustStore.p12
2424

2525
ENTRYPOINT ["bash","docker/run-sdk-app.sh"]
2626

27-
CMD ["-opt", "TOPICS"]
27+
CMD ["-opt", "TOPICS"]

docker/mirrormaker/template/kafka-mirror-maker.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ kind: KafkaMirrorMaker
33
metadata:
44
name: my-mirror-maker
55
spec:
6-
version: 2.3.0
6+
version: 2.6.0
77
replicas: 1
88
consumer:
99
bootstrapServers: clouddataservice.broker.bootstrap.nasdaq.com:9094

ncds-sdk/pom.xml

Lines changed: 73 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,20 @@
77
<parent>
88
<groupId>com.nasdaq.ncds</groupId>
99
<artifactId>ncds</artifactId>
10-
<version>0.0.2</version>
10+
<version>0.3.0</version>
1111
<relativePath>../pom.xml</relativePath>
1212
</parent>
1313

1414
<artifactId>ncds-sdk</artifactId>
15-
<version>0.0.2</version>
15+
<packaging>jar</packaging>
16+
<properties>
17+
<kafkaScalaVersion>kafka_2.12</kafkaScalaVersion>
18+
<junit5.version>5.6.2</junit5.version>
19+
<junit5PlatformProvider.version>1.3.2</junit5PlatformProvider.version>
20+
<curatorTestVersion>2.12.0</curatorTestVersion>
21+
<slf4jVersion>1.7.30</slf4jVersion>
22+
<surefire.version>2.22.2</surefire.version>
23+
</properties>
1624

1725
<name>SDK</name>
1826
<description>Provide Development Kit to connect with Kafka</description>
@@ -43,76 +51,88 @@
4351
<dependency>
4452
<groupId>io.strimzi</groupId>
4553
<artifactId>kafka-oauth-common</artifactId>
46-
<version>0.1.0</version>
54+
<version>0.6.0</version>
4755
<scope>compile</scope>
4856
</dependency>
4957

50-
<dependency>
51-
<groupId>org.apache.kafka</groupId>
52-
<artifactId>kafka_2.11</artifactId>
53-
<version>${kafka.version}</version>
54-
</dependency>
5558
<!-- Testing -->
59+
60+
<!-- Kafka -->
61+
<dependency>
62+
<groupId>org.apache.kafka</groupId>
63+
<artifactId>${kafkaScalaVersion}</artifactId>
64+
<version>${kafka.version}</version>
65+
<exclusions>
66+
<!-- Don't bring in kafka's logging framework -->
67+
<exclusion>
68+
<groupId>org.slf4j</groupId>
69+
<artifactId>slf4j-log4j12</artifactId>
70+
</exclusion>
71+
<exclusion>
72+
<groupId>javax.mail</groupId>
73+
<artifactId>mail</artifactId>
74+
</exclusion>
75+
</exclusions>
76+
<scope>test</scope>
77+
</dependency>
78+
5679
<dependency>
57-
<artifactId>kafka-junit-core</artifactId>
5880
<groupId>com.salesforce.kafka.test</groupId>
59-
<version>3.2.0</version>
60-
81+
<artifactId>kafka-junit-core</artifactId>
82+
<version>3.2.2</version>
83+
<scope>test</scope>
6184
</dependency>
6285

86+
<!-- JUnit5 tests -->
6387
<dependency>
6488
<groupId>org.junit.jupiter</groupId>
6589
<artifactId>junit-jupiter-api</artifactId>
66-
<version>5.5.2</version>
67-
<scope>compile</scope>
90+
<version>${junit5.version}</version>
91+
<scope>test</scope>
6892
</dependency>
6993

7094
<dependency>
7195
<groupId>org.junit.jupiter</groupId>
72-
<artifactId>junit-jupiter-engine</artifactId>
73-
<version>5.5.2</version>
96+
<artifactId>junit-jupiter-params</artifactId>
97+
<version>${junit5.version}</version>
98+
<scope>test</scope>
7499
</dependency>
75100

101+
<!-- Mockito for mocks in tests -->
76102
<dependency>
77-
<groupId>org.junit.jupiter</groupId>
78-
<artifactId>junit-jupiter-params</artifactId>
79-
<version>5.5.2</version>
80-
103+
<groupId>org.mockito</groupId>
104+
<artifactId>mockito-core</artifactId>
105+
<version>2.28.2</version>
106+
<scope>test</scope>
81107
</dependency>
82108

83109
<dependency>
84-
<groupId>org.hamcrest</groupId>
85-
<artifactId>hamcrest-all</artifactId>
86-
<version>1.3</version>
87-
110+
<groupId>org.apache.curator</groupId>
111+
<artifactId>curator-test</artifactId>
112+
<version>${curatorTestVersion}</version>
113+
<scope>test</scope>
88114
</dependency>
89115

116+
<!-- Logging in tests -->
90117
<dependency>
91-
<groupId>org.apache.maven.plugins</groupId>
92-
<artifactId>maven-checkstyle-plugin</artifactId>
93-
<version>3.1.0</version>
94-
<type>maven-plugin</type>
95-
118+
<groupId>org.slf4j</groupId>
119+
<artifactId>slf4j-simple</artifactId>
120+
<version>${slf4jVersion}</version>
121+
<scope>test</scope>
96122
</dependency>
97123

124+
<!-- Testing support class -->
98125
<dependency>
99126
<groupId>com.github.stephenc.high-scale-lib</groupId>
100127
<artifactId>high-scale-lib</artifactId>
101128
<version>1.1.4</version>
102-
129+
<scope>test</scope>
103130
</dependency>
104131

105132
<dependency>
106133
<groupId>org.apache.commons</groupId>
107134
<artifactId>commons-lang3</artifactId>
108135
<version>3.3.2</version>
109-
110-
</dependency>
111-
112-
<dependency>
113-
<groupId>org.apache.logging.log4j</groupId>
114-
<artifactId>log4j-core</artifactId>
115-
<version>2.13.2</version>
116136
<scope>test</scope>
117137
</dependency>
118138

@@ -122,22 +142,36 @@
122142
<version>1.3.176</version>
123143
<scope>test</scope>
124144
</dependency>
145+
125146
</dependencies>
126147

127148
<build>
128149
<plugins>
129150
<plugin>
130151
<groupId>org.apache.maven.plugins</groupId>
131152
<artifactId>maven-compiler-plugin</artifactId>
153+
<version>3.8.0</version>
132154
<configuration>
133-
<source>8</source>
134-
<target>8</target>
155+
<source>1.8</source>
156+
<target>1.8</target>
135157
</configuration>
136158
</plugin>
137159
<plugin>
138160
<groupId>org.apache.maven.plugins</groupId>
139161
<artifactId>maven-surefire-plugin</artifactId>
140-
<version>2.22.1</version>
162+
<version>${surefire.version}</version>
163+
<dependencies>
164+
<dependency>
165+
<groupId>org.junit.platform</groupId>
166+
<artifactId>junit-platform-surefire-provider</artifactId>
167+
<version>${junit5PlatformProvider.version}</version>
168+
</dependency>
169+
<dependency>
170+
<groupId>org.junit.jupiter</groupId>
171+
<artifactId>junit-jupiter-engine</artifactId>
172+
<version>${junit5.version}</version>
173+
</dependency>
174+
</dependencies>
141175
</plugin>
142176
</plugins>
143177
</build>

ncds-sdk/src/main/java/com/nasdaq/ncdsclient/consumer/NasdaqKafkaAvroConsumer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import org.apache.kafka.common.serialization.StringDeserializer;
1818

1919

20+
import java.text.DateFormat;
21+
import java.text.SimpleDateFormat;
2022
import java.util.*;
2123

2224
import static com.nasdaq.ncdsclient.internal.utils.AuthenticationConfigLoader.getClientID;
@@ -151,7 +153,7 @@ public KafkaAvroConsumer getConsumer(Schema avroSchema) throws Exception {
151153
if(!kafkaProps.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
152154
kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
153155
}
154-
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" + UUID.randomUUID().toString());
156+
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, this.clientID + "_" +getDate() + "_" + UUID.randomUUID().toString());
155157
ConfigProperties.resolve(kafkaProps);
156158
return new KafkaAvroConsumer(kafkaProps, avroSchema);
157159
}
@@ -211,4 +213,12 @@ public KafkaConsumer getNewsConsumer(String topic) throws Exception {
211213
throw (e);
212214
}
213215
}
216+
217+
private String getDate(){
218+
// Get Today's EST date
219+
DateFormat dateformat = new SimpleDateFormat("yyyy-MM-dd");
220+
dateformat.setTimeZone(TimeZone.getTimeZone("America/New_York"));
221+
String date = dateformat.format(new Date());
222+
return date;
223+
}
214224
}

ncds-sdk/src/main/java/com/nasdaq/ncdsclient/internal/utils/InstallCertificates.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
import org.apache.http.impl.client.HttpClients;
99
import org.apache.http.message.BasicHeader;
1010
import org.apache.http.util.EntityUtils;
11+
import org.bouncycastle.asn1.x509.*;
1112
import org.json.JSONObject;
13+
import sun.security.x509.URIName;
1214
import sun.security.x509.X509CertImpl;
1315

1416
import javax.net.ssl.HttpsURLConnection;
@@ -17,6 +19,8 @@
1719
import java.security.KeyStore;
1820
import java.security.cert.Certificate;
1921
import java.security.cert.CertificateFactory;
22+
import java.security.cert.X509Certificate;
23+
import java.util.List;
2024

2125

2226
public class InstallCertificates {
@@ -123,14 +127,26 @@ private void installCertsToTrustStore(String password) throws Exception {
123127

124128
}
125129

126-
private Certificate getAuthCertificate() throws IOException {
130+
private Certificate getAuthCertificate() throws Exception {
127131
URL url = new URL(null, authUrl, new sun.net.www.protocol.https.Handler());
128132
HttpsURLConnection con = (HttpsURLConnection)url.openConnection();
129133
con.connect();
130134
Certificate[] certs = con.getServerCertificates();
131-
for(Certificate cert : certs){
132-
if(((X509CertImpl) cert).getSubjectDN().getName().contains("keycloak")){
133-
return cert;
135+
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
136+
for(Certificate cert : certs) {
137+
List<sun.security.x509.AccessDescription> descriptions = ((X509CertImpl) cert).getAuthorityInfoAccessExtension().getAccessDescriptions();
138+
for (sun.security.x509.AccessDescription ad : descriptions) {
139+
// check if it's a URL to issuer's certificate
140+
if (ad.getAccessMethod().toString().equals(X509ObjectIdentifiers.id_ad_caIssuers.toString())) {
141+
sun.security.x509.GeneralName location = ad.getAccessLocation();
142+
if (location.getType() == GeneralName.uniformResourceIdentifier) {
143+
// Get issuer's URL
144+
String issuerUrl = ((URIName) location.getName()).getURI().toString();
145+
URL url1 = new URL(issuerUrl);
146+
X509Certificate issuer = (X509Certificate) certificateFactory.generateCertificate(url1.openStream());
147+
return issuer;
148+
}
149+
}
134150
}
135151
}
136152
return null;

ncds-sdk/src/test/java/com/nasdaq/ncdsclient/NCDSSDKJunitTest.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
package com.nasdaq.ncdsclient;
22

3-
import junit.framework.Assert;
43
import com.nasdaq.ncdsclient.utils.NCDSTestUtil;
54
import org.apache.avro.generic.GenericRecord;
65
import org.apache.kafka.clients.consumer.ConsumerRecord;
76
import org.apache.kafka.clients.consumer.ConsumerRecords;
87
import org.junit.jupiter.api.*;
98

10-
import java.util.ArrayList;
11-
import java.util.Arrays;
12-
import java.util.Collections;
13-
import java.util.Iterator;
9+
import java.util.*;
1410

11+
import static org.junit.jupiter.api.Assertions.assertEquals;
12+
import static org.junit.jupiter.api.Assertions.assertNotNull;
1513

1614
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
1715
class NCDSSDKJunitTest {
@@ -23,16 +21,16 @@ class NCDSSDKJunitTest {
2321
ncdsTestUtil = new NCDSTestUtil();
2422
} catch (Exception e) {
2523
e.printStackTrace();
24+
System.exit(0);
2625
}
2726
}
2827

29-
3028
@Test
3129
@Order(1)
32-
void testNCDSClient() {
30+
public void testNCDSClient() {
3331
try {
3432
NCDSClient ncdsClient = new NCDSClient(null, null);
35-
Assertions.assertNotNull(ncdsClient);
33+
assertNotNull(ncdsClient);
3634
} catch (Exception e){
3735
new AssertionError("Error");
3836
System.out.println(e.getMessage());
@@ -42,41 +40,41 @@ void testNCDSClient() {
4240

4341
@Test
4442
@Order(2)
45-
void testListTopicsForTheClient() {
43+
public void testListTopicsForTheClient() {
4644
try {
4745
NCDSClient ncdsClient = new NCDSClient(null, null);
4846
String[] topics = ncdsClient.ListTopicsForTheClient();
4947
Collections.sort(Arrays.asList(topics));
5048
String[] addedTopics = ncdsTestUtil.getAddedTopics();
5149
Collections.sort(Arrays.asList(addedTopics));
52-
Assert.assertEquals(Arrays.asList(topics),Arrays.asList(addedTopics));
50+
assertEquals(Arrays.asList(topics),Arrays.asList(addedTopics));
5351
} catch (Exception e) {
54-
Assert.fail();
52+
Assertions.fail();
5553
System.out.println(e.getMessage());
5654
}
5755
}
5856

5957

6058
@Test
6159
@Order(3)
62-
void testgetSchemaForTheTopic(){
60+
public void testgetSchemaForTheTopic(){
6361
try {
6462
NCDSClient ncdsClient = new NCDSClient(null, null);
6563
String topic = "GIDS";
6664
String schemaFromSDK = ncdsClient.getSchemaForTheTopic(topic);
6765
String schemaFile = "testGIDS.avsc";
6866
String schemaFromFile = ncdsTestUtil.getSchemaForTopic(schemaFile);
6967

70-
Assert.assertEquals(schemaFromSDK,schemaFromFile);
68+
assertEquals(schemaFromSDK,schemaFromFile);
7169
} catch (Exception e) {
72-
Assert.fail();
70+
Assertions.fail();
7371
System.out.println(e.getMessage());
7472
}
7573
}
7674

7775
@Test
7876
@Order(4)
79-
void testInsertion(){
77+
public void testInsertion(){
8078
ArrayList<GenericRecord> mockRecords = ncdsTestUtil.getMockMessages();
8179
ArrayList<GenericRecord> mockRecordsFromKafka = new ArrayList<>();
8280

@@ -90,9 +88,9 @@ void testInsertion(){
9088
mockRecordsFromKafka.add(iteratorMocker.next().value());
9189
}
9290
//mockRecordsFromKafka.remove(0);
93-
Assert.assertEquals(mockRecords,mockRecordsFromKafka);
91+
assertEquals(mockRecords,mockRecordsFromKafka);
9492
} catch (Exception e) {
95-
Assert.fail();
93+
Assertions.fail();
9694
e.printStackTrace();
9795
}
9896
}

0 commit comments

Comments
 (0)