@@ -32,41 +32,37 @@ public class NasdaqKafkaAvroConsumer {
3232 private KafkaConsumer kafkaConsumer ;
3333 private String clientID ;
3434
35- private Properties securityProps ;
36- private Properties kafkaProps ;
35+ private Properties properties = new Properties ();
3736 private ReadSchemaTopic readSchemaTopic = new ReadSchemaTopic ();
3837
3938 public NasdaqKafkaAvroConsumer (Properties securityCfg ,Properties kafkaCfg ) throws Exception {
4039 try {
41- if (kafkaCfg == null )
40+ if (securityCfg == null ) {
41+ properties .setProperty (AuthenticationConfigLoader .OAUTH_CLIENT_ID , "unit-test" ); // Just for the unit tests.
42+ }
43+ else {
44+ properties .putAll (securityCfg );
45+ }
46+ if (kafkaCfg == null ) {
4247 if (IsItJunit .isJUnitTest ()) {
4348 Properties junitKafkaCfg = KafkaConfigLoader .loadConfig ();
44- kafkaProps = junitKafkaCfg ;
49+ properties . putAll ( junitKafkaCfg ) ;
4550 }
4651 else {
4752 throw new Exception ("Kafka Configuration not Defined " );
4853 }
49-
50- else {
51- kafkaProps = kafkaCfg ;
52- KafkaConfigLoader .validateAndAddSpecificProperties (kafkaProps );
53- }
54-
55- if (securityCfg == null ) {
56- securityProps = new Properties ();
57- securityProps .setProperty (AuthenticationConfigLoader .OAUTH_CLIENT_ID , "unit-test" ); // Just for the unit tests.
5854 }
5955 else {
60- securityProps = securityCfg ;
61-
56+ properties . putAll ( kafkaCfg ) ;
57+ KafkaConfigLoader . validateAndAddSpecificProperties ( properties );
6258 }
6359 }
6460 catch (Exception e ) {
6561 throw (e );
6662 }
67- readSchemaTopic .setSecurityProps (securityProps );
68- readSchemaTopic .setKafkaProps (kafkaProps );
69- this .clientID = getClientID (securityProps );
63+ readSchemaTopic .setSecurityProps (properties );
64+ readSchemaTopic .setKafkaProps (properties );
65+ this .clientID = getClientID (properties );
7066
7167 }
7268
@@ -86,7 +82,7 @@ public KafkaConsumer getKafkaConsumer(String streamName) throws Exception {
8682 kafkaConsumer = getConsumer (kafkaSchema , streamName );
8783 TopicPartition topicPartition = new TopicPartition (streamName + ".stream" ,0 );
8884 kafkaConsumer .assign (Collections .singletonList (topicPartition ));
89- if (kafkaProps .get (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ).equals (OffsetResetStrategy .EARLIEST .toString ().toLowerCase ())) {
85+ if (properties .get (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ).equals (OffsetResetStrategy .EARLIEST .toString ().toLowerCase ())) {
9086 return seekToMidNight (topicPartition );
9187 }
9288 }
@@ -144,21 +140,20 @@ public KafkaConsumer getKafkaConsumer(String streamName, long timestamp) throws
144140
145141 public KafkaAvroConsumer getConsumer (Schema avroSchema , String streamName ) throws Exception {
146142 try {
147- if (!IsItJunit .isJUnitTest ()) {
148- ConfigProperties .resolveAndExportToSystemProperties (securityProps );
149- }
143+ // if(!IsItJunit.isJUnitTest()) {
144+ // ConfigProperties.resolveAndExportToSystemProperties(securityProps);
145+ // }
150146 //Properties kafkaProps = KafkaConfigLoader.loadConfig();
151147
152- kafkaProps .put ("key.deserializer" , StringDeserializer .class .getName ());
153- kafkaProps .put ("value.deserializer" , AvroDeserializer .class .getName ());
154- if (!kafkaProps .containsKey (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG )) {
155- kafkaProps .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , OffsetResetStrategy .EARLIEST .toString ().toLowerCase ());
148+ properties .put ("key.deserializer" , StringDeserializer .class .getName ());
149+ properties .put ("value.deserializer" , AvroDeserializer .class .getName ());
150+ if (!properties .containsKey (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG )) {
151+ properties .put (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , OffsetResetStrategy .EARLIEST .toString ().toLowerCase ());
156152 }
157- if (!kafkaProps .containsKey (ConsumerConfig .GROUP_ID_CONFIG )) {
158- kafkaProps .put (ConsumerConfig .GROUP_ID_CONFIG , this .clientID + "_" + streamName + "_" + getDate ());
153+ if (!properties .containsKey (ConsumerConfig .GROUP_ID_CONFIG )) {
154+ properties .put (ConsumerConfig .GROUP_ID_CONFIG , this .clientID ); // + "_" + streamName + "_" + getDate());
159155 }
160- ConfigProperties .resolve (kafkaProps );
161- return new KafkaAvroConsumer (kafkaProps , avroSchema );
156+ return new KafkaAvroConsumer (properties , avroSchema );
162157 }
163158 catch (Exception e ) {
164159 throw e ;
@@ -211,7 +206,7 @@ public KafkaConsumer getNewsConsumer(String topic) throws Exception {
211206 kafkaConsumer = getConsumer (newsSchema , topic );
212207 TopicPartition topicPartition = new TopicPartition (topic + ".stream" ,0 );
213208 kafkaConsumer .assign (Collections .singletonList (topicPartition ));
214- if (kafkaProps .get (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ).equals (OffsetResetStrategy .EARLIEST .toString ().toLowerCase ())) {
209+ if (properties .get (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ).equals (OffsetResetStrategy .EARLIEST .toString ().toLowerCase ())) {
215210 return seekToMidNight (topicPartition );
216211 }
217212 return kafkaConsumer ;
0 commit comments