Conversation

@Tiihott Tiihott commented Aug 29, 2024

Contains:

  • Refactored the Offset interface into a KafkaRecord interface, with the ability to print itself in SyslogRecord format.
  • Implemented a separate KafkaRecordConverter class for converting the Kafka records to SyslogRecord format.
  • Refactored the DatabaseOutput class into BatchDistribution (interface + class), which now allocates the consumed batch records to PartitionFile objects based on each record's topic partition.
  • Implemented the PartitionFile interface and class for storing the consumed batch records in files that are then stored to HDFS.
  • Refactored the WriteableQueue and Config classes to be immutable.
  • Implemented consumer group rebalance handling using the IngestionRebalanceListener class.
  • Refactored tests and added new tests.
  • Refactored maximum file size checks to run after a record is added to the file, instead of checking the record size and file size before adding the record to the file.

Also contains, after further changes:

  • Timeout handling for when records are not arriving at a sufficient pace.
  • Config refactoring.
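The last refactoring point, checking the maximum file size only after a record has been added, can be sketched roughly as follows. The names and types here are illustrative, not the PR's actual PartitionFile API:

```java
// Illustrative sketch of the post-add size check: the record is always
// appended first, and the caller learns afterwards whether the file has
// reached its maximum size and should be flushed to HDFS.
final class SizeCappedFile {

    private final long maximumSizeBytes;
    private long currentSizeBytes = 0;

    SizeCappedFile(final long maximumSizeBytes) {
        this.maximumSizeBytes = maximumSizeBytes;
    }

    // Returns true when the file has reached its size cap after the add.
    boolean addRecord(final byte[] record) {
        currentSizeBytes += record.length;
        return currentSizeBytes >= maximumSizeBytes;
    }
}
```

Compared with checking record size plus file size before appending, checking after the append keeps the write path simpler, at the cost of letting the final file slightly exceed the cap.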

Tiihott added 26 commits August 21, 2024 10:31
…set interface to KafkaRecord. Renamed RecordOffset to KafkaRecordImpl. Renamed NullOffset to NullKafkaRecord. Implemented KafkaRecordConverter for converting kafka records to SyslogRecords. Refactored WriteableQueue to be immutable.
…Renamed DatabaseOutput.java to BatchDistribution.java. Removed unused record() and isNull() methods from KafkaRecord interface and replaced record() usage in tests.
…o allow appending via SyslogAvroWriter. Improved exception handling in PartitionFile. Refactoring ProcessingTest.java
…dded preliminary version of method for writing the file managed by PartitionFile to HDFS while skipping file size checks.
…erface and renamed original PartitionFile.java to PartitionFileImpl.java.
…interface and PartitionRecordsImpl class which generates SyslogRecords list from KafkaRecords.
… to use secondary constructor instead. Added Config tests.
…d Config.java by replacing missed setter with secondary constructor. Beginning refactoring tests to mirror the refactoring changes.
…rface, renamed BatchDistribution class to BatchDistributionImpl. Implemented IngestionRebalanceListener for handling kafka consumer group rebalance.
…rebalance() method to PartitionFile interface.
…e it to Ingestion0FilesLowSizeTest.java file.
…itialization in PartitionFileImpl.java and refactored ProcessingFailureTest.java which was affected by the fix. Cleaned comments.
@Tiihott Tiihott requested a review from 51-code August 29, 2024 09:57
@Tiihott Tiihott self-assigned this Aug 29, 2024
@51-code 51-code left a comment

Here are some general notes:

Multiple objects have long constructors with code in them; maybe that could be refactored?

Noticed that useMockKafkaConsumer is a bit odd: multiple objects have an if statement checking it. Could the objects instead implement an interface and provide a MockKafka version of the object?

Check which objects could be made final (public final class); probably most of them.
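The useMockKafkaConsumer note could be addressed along these lines. This is only a sketch with illustrative stand-in names, not the project's actual types: the consumer source becomes an interface, and tests wire in the mock implementation instead of each object branching on a flag.

```java
// Sketch: the production and mock consumer sources implement one
// interface, so consuming objects need no 'if (useMockKafkaConsumer)'.
// All names here are illustrative stand-ins.
interface ConsumerSource {
    String newConsumer(); // placeholder for returning a configured consumer
}

final class KafkaConsumerSource implements ConsumerSource {
    @Override
    public String newConsumer() {
        return "kafka-consumer";
    }
}

final class MockConsumerSource implements ConsumerSource {
    @Override
    public String newConsumer() {
        return "mock-consumer";
    }
}

// A consuming object depends only on the interface; tests pass the mock.
final class Reader {
    private final ConsumerSource source;

    Reader(final ConsumerSource source) {
        this.source = source;
    }

    String consumer() {
        return source.newConsumer();
    }
}
```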

…mpl.java. Refactored ConsumerRebalanceListenerImpl to support topic partition offset tracking. Fixed listener registration for the consumer in ReadCoordinator.java. Refactored MockKafkaConsumerFactory to use subscribe method instead of assign.
@Tiihott Tiihott requested a review from kortemik September 30, 2024 10:16
@Tiihott Tiihott (Author) commented Oct 1, 2024

Pair session result: Minor naming change to improve naming conventions.

@q22u q22u added the review label Oct 2, 2024
if (LOGGER.isDebugEnabled()) {
    LOGGER
        .debug(
            "Fuura searching your batch for <[{}]> with records <{}> and took <{}> milliseconds. <{}> EPS. ",
Member commented:
perhaps we should drop the joke about Fuura and have a proper message now?

Author replied:
Solved in commit 7a21cda

this.topicPartition = topicPartition;
this.batchOffsets = new ArrayList<>();
this.partitionRecords = new PartitionRecordsImpl(config);
try (SyslogAvroWriter syslogAvroWriter = new SyslogAvroWriter(syslogFile)) {
Member commented:
Could you please elaborate on this? The try-with-resources creates a new syslogAvroWriter and then there is a separate close for it, so it gets a double close. The other thing is that there should be no code in the constructor.

Author replied:
After further testing and rechecking how the syslogFile is handled, I concluded that initializing the avro-file in the constructor is redundant, since the commitRecords() method initializes the file if it doesn't exist yet.
Solved in commit 0a31fdd
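The resulting pattern (nothing opened in the constructor; commitRecords() initializes the file on first use) might look roughly like this sketch, which uses simplified stand-in types rather than the PR's actual SyslogAvroWriter handling:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of lazy initialization: the constructor opens nothing, and
// commitRecords() initializes the backing file only on first use.
final class LazyPartitionFile {

    private final List<String> committed = new ArrayList<>();
    private boolean fileInitialized = false;

    void commitRecords(final List<String> records) {
        if (!fileInitialized) {
            // here the real code would create/open the avro-file, once
            fileInitialized = true;
        }
        committed.addAll(records);
    }

    boolean isInitialized() {
        return fileInitialized;
    }

    int committedCount() {
        return committed.size();
    }
}
```

This avoids both problems from the review comment: no double close, and no code in the constructor.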

…va. Refactored tests to check if initialized avro-file exists or not as expected.
…nfiguration files to prevent unintentional triggering of the consumer timeout.
@Tiihott Tiihott requested a review from kortemik November 7, 2024 08:32
import java.util.Properties;

// This class will only hold the common configuration parameters.
public final class ConfigurationImpl implements Configuration {
Member commented:
use cnf_01 for configuration

Author replied:
Implemented cnf_01 usage on commit 37d2a70


public interface ConfigurationValidation {

public abstract void validate(Properties properties);
Member commented:
Validators are not necessary; rather, do the validation in the configuration accessor methods, like:

public int timeout() {
    String timeoutString = map.get("timeout");
    if (timeoutString == null) {
        throw new ConfigurationException("timeout not configured");
    }

    int timeout;
    try {
        timeout = Integer.parseInt(timeoutString);
    }
    catch (NumberFormatException e) {
        LOGGER.error("timeout <{}> is not a number", timeoutString);
        throw e;
    }

    if (timeout <= 0) {
        throw new ConfigurationException("timeout must be positive");
    }

    return timeout;
}

Author replied:
Implemented refactored configurations to the main code on commit 37d2a70

if (!partitionFileMap.containsKey(recordOffset.get("partition").getAsString())) {
    try {
        partitionFileMap
            .put(recordOffset.get("partition").getAsString(), new PartitionFileImpl(config, recordOffset));
@kortemik kortemik (Member) commented Nov 11, 2024:

Use an immutable partitionFileFactory or similar, so the created object is no longer configurable but always gets an immutable configuration, with only the recordOffset provided to partitionFileFactory.partitionFor(recordOffset). The partitionFileFactory must either be initialized in a secondary constructor or be passed directly to the constructor by the object creating this one.
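A minimal sketch of the suggested factory, using simplified stand-ins for the PR's Config and PartitionFile types (the method name partitionFor follows the comment above):

```java
// Sketch: the factory holds immutable configuration fixed at construction;
// callers supply only the record's partition. Stand-in types, not the
// PR's real Config/PartitionFileImpl.
interface PartitionFile {
    String partition();
}

final class PartitionFileFactory {

    private final String config; // stands in for an immutable Config

    PartitionFileFactory(final String config) {
        this.config = config;
    }

    // Mirrors the suggested partitionFor(recordOffset): configuration is
    // no longer supplied by the caller at creation time.
    PartitionFile partitionFor(final String partition) {
        return () -> partition;
    }
}
```

The same shape extends to the other currently configurable objects: each receives finished immutable state through its constructor rather than being configured after creation.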

Member commented:
Apply a similar pattern to the other currently configurable objects too, because objects must not be configurable.

Author replied:
Implemented PartitionFileFactory in commit 12bc003
Added improvements for it in commit 062ddcc and b6fe5dc

@Tiihott Tiihott requested a review from kortemik November 19, 2024 09:17
@Tiihott Tiihott (Author) commented May 27, 2025

Infrastructure side changes are ongoing for Kafka, which will require changes to the project once they are complete.

4 participants