, DatabusMessage> {
+public final class MessagePayloadAdapter {
/**
* The message deserializer.
*/
- private final Serializer
messageSerializer;
-
- /**
- * The headers map.
- */
- private final Headers headers;
+ private final Serializer
userSerializer;
/**
* Constructor
*
- * @param messageSerializer A {@link Serializer} instance
+ * @param userSerializer A {@link Serializer} instance
* or creating a {@link DatabusMessage}.
- * @param headers Headers map.
*/
- public MessagePayloadAdapter(final Serializer
messageSerializer,
- final Headers headers) {
+ public MessagePayloadAdapter(final Serializer
userSerializer) {
- this.messageSerializer = messageSerializer;
- this.headers = headers;
+ this.userSerializer = userSerializer;
}
/**
@@ -48,10 +40,10 @@ public MessagePayloadAdapter(final Serializer
messageSerializer,
*
* @param messagePayload a {@link MessagePayload} instance to be adapted.
* @return a {@link DatabusMessage} instance.
+ * @param headers the headers to be attached to the resulting {@link DatabusMessage}
*/
- @Override
- public DatabusMessage adapt(final MessagePayload
messagePayload) {
- final byte[] payload = messageSerializer.serialize(messagePayload.getPayload());
+ public DatabusMessage adapt(final MessagePayload
messagePayload, final Headers headers) {
+ final byte[] payload = userSerializer.serialize(messagePayload.getPayload());
return new DatabusMessage(headers, payload);
}
diff --git a/src/main/java/com/opendxl/databus/common/internal/util/HeaderInternalField.java b/src/main/java/com/opendxl/databus/common/internal/util/HeaderInternalField.java
index 84bb9fd..4169b12 100644
--- a/src/main/java/com/opendxl/databus/common/internal/util/HeaderInternalField.java
+++ b/src/main/java/com/opendxl/databus/common/internal/util/HeaderInternalField.java
@@ -10,6 +10,7 @@
*/
public final class HeaderInternalField {
+
private HeaderInternalField() {
}
@@ -27,4 +28,11 @@ private HeaderInternalField() {
* The topic name key name.
*/
public static final String TOPIC_NAME_KEY = INTERNAL_HEADER_IDENTIFIER + "TN" + INTERNAL_HEADER_IDENTIFIER;
+
+ public static final String TIER_STORAGE_BUCKET_NAME_KEY = INTERNAL_HEADER_IDENTIFIER + "BN"
+ + INTERNAL_HEADER_IDENTIFIER;
+
+ public static final String TIER_STORAGE_OBJECT_NAME_KEY = INTERNAL_HEADER_IDENTIFIER + "OB"
+ + INTERNAL_HEADER_IDENTIFIER;
+
}
diff --git a/src/main/java/com/opendxl/databus/consumer/DatabusConsumer.java b/src/main/java/com/opendxl/databus/consumer/DatabusConsumer.java
index b2eb98a..b959753 100644
--- a/src/main/java/com/opendxl/databus/consumer/DatabusConsumer.java
+++ b/src/main/java/com/opendxl/databus/consumer/DatabusConsumer.java
@@ -5,6 +5,7 @@
package com.opendxl.databus.consumer;
import com.opendxl.databus.credential.Credential;
+import com.opendxl.databus.entities.TierStorage;
import com.opendxl.databus.exception.DatabusClientRuntimeException;
import com.opendxl.databus.producer.DatabusProducer;
import com.opendxl.databus.serialization.Deserializer;
@@ -71,9 +72,26 @@ public class DatabusConsumer
extends Consumer
{
* @throws DatabusClientRuntimeException if a DatabusConsumer getInstance was not able to be created
*/
public DatabusConsumer(final Map configs, final Deserializer messageDeserializer) {
- this(configs, messageDeserializer, null);
+ this(configs, messageDeserializer, null, null);
}
+ /**
+ * A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
+ * are documented here. Values can be
+ * either strings or objects of the appropriate type (for example a numeric configuration would accept either the
+ * string "42" or the integer 42).
+ *
+ * Valid configuration strings are documented at {@link org.apache.kafka.clients.consumer.ConsumerConfig}
+ *
+ * @param configs The consumer configs
+ * @param messageDeserializer a {@link Deserializer} getInstance implemented by SDK's user
+ * @param tierStorage Tier Storage
+ * @throws DatabusClientRuntimeException if a DatabusConsumer getInstance was not able to be created
+ */
+ public DatabusConsumer(final Map configs, final Deserializer messageDeserializer,
+ final TierStorage tierStorage) {
+ this(configs, messageDeserializer, null, tierStorage);
+ }
/**
* A consumer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
* are documented here. Values can be
@@ -85,15 +103,16 @@ public DatabusConsumer(final Map configs, final Deserializer
* @param configs The consumer configs
* @param messageDeserializer a {@link Deserializer} getInstance implementd by SDK's user
* @param credential identity to authenticate/authorization
+ * @param tierStorage Tier Storage
*
* @throws DatabusClientRuntimeException if a DatabusConsumer getInstance was not able to be created
*/
public DatabusConsumer(final Map configs, final Deserializer messageDeserializer,
- final Credential credential) {
+ final Credential credential, final TierStorage tierStorage) {
try {
Map configuration = configureCredential(configs, credential);
configuration = configureClientId(configuration);
- setFieldMembers(messageDeserializer, configuration);
+ setFieldMembers(messageDeserializer, configuration, tierStorage);
setConsumer(new KafkaConsumer(configuration, getKeyDeserializer(), getValueDeserializer()));
} catch (DatabusClientRuntimeException e) {
throw e;
@@ -117,7 +136,12 @@ public DatabusConsumer(final Map configs, final Deserializer
* @throws DatabusClientRuntimeException if a DatabusConsumer getInstance was not able to be created
*/
public DatabusConsumer(final Properties properties, final Deserializer
messageDeserializer) {
- this(properties, messageDeserializer, null);
+ this(properties, messageDeserializer, null, null);
+ }
+
+ public DatabusConsumer(final Properties properties, final Deserializer
messageDeserializer,
+ final TierStorage tierStorage) {
+ this(properties, messageDeserializer, null, tierStorage);
}
/**
@@ -131,15 +155,17 @@ public DatabusConsumer(final Properties properties, final Deserializer
messag
* @param properties The consumer configuration properties
* @param messageDeserializer a {@link Deserializer} getInstance implementd by SDK's user
* @param credential identity to authenticate/authorization
+ * @param tierStorage Tier Storage
*
* @throws DatabusClientRuntimeException if a DatabusConsumer getInstance was not able to be created
*/
public DatabusConsumer(final Properties properties, final Deserializer
messageDeserializer,
- final Credential credential) {
+ final Credential credential, final TierStorage tierStorage) {
try {
Map configuration = configureCredential((Map) properties, credential);
configuration = configureClientId(configuration);
- setFieldMembers(messageDeserializer, configuration);
+ configuration.put(ConsumerConfiguration.ISOLATION_LEVEL_CONFIG, "read_committed");
+ setFieldMembers(messageDeserializer, configuration, tierStorage);
setConsumer(new KafkaConsumer(configuration, getKeyDeserializer(), getValueDeserializer()));
} catch (DatabusClientRuntimeException e) {
throw e;
@@ -156,14 +182,16 @@ public DatabusConsumer(final Properties properties, final Deserializer messag
* @param configuration The consumer configuration map.
* @param messageDeserializer a {@link Deserializer} getInstance implemented by SDK's user.
*/
- private void setFieldMembers(final Deserializer
messageDeserializer, final Map configuration) {
+ private void setFieldMembers(final Deserializer messageDeserializer,
+ final Map configuration,
+ final TierStorage tierStorage) {
if (messageDeserializer == null) {
throw new DatabusClientRuntimeException(DATABUS_CONSUMER_INSTANCE_CANNOT_BE_CREATED_MESSAGE
+ "Message Deserializer cannot be null" , DatabusConsumer.class);
}
setKeyDeserializer(new DatabusKeyDeserializer());
- setValueDeserializer(new MessageDeserializer());
+ setValueDeserializer(new MessageDeserializer(tierStorage));
setConsumerRecordsAdapter(new ConsumerRecordsAdapter(messageDeserializer));
setClientId((String) configuration.get(ConsumerConfiguration.CLIENT_ID_CONFIG));
}
diff --git a/src/main/java/com/opendxl/databus/consumer/DatabusPushConsumer.java b/src/main/java/com/opendxl/databus/consumer/DatabusPushConsumer.java
index 1dd2af1..4296ea9 100644
--- a/src/main/java/com/opendxl/databus/consumer/DatabusPushConsumer.java
+++ b/src/main/java/com/opendxl/databus/consumer/DatabusPushConsumer.java
@@ -6,6 +6,7 @@
import com.opendxl.databus.common.TopicPartition;
import com.opendxl.databus.credential.Credential;
+import com.opendxl.databus.entities.TierStorage;
import com.opendxl.databus.exception.DatabusClientRuntimeException;
import com.opendxl.databus.serialization.Deserializer;
import org.apache.kafka.common.errors.WakeupException;
@@ -14,8 +15,11 @@
import java.io.Closeable;
import java.time.Duration;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CancellationException;
@@ -86,6 +90,11 @@ public final class DatabusPushConsumer
extends DatabusConsumer
implements
*/
private CountDownLatch countDownLatch = new CountDownLatch(1);
+ /**
+ * A boolean to signal if the pause operation has to be refreshed
+ */
+ private AtomicBoolean refreshPause = new AtomicBoolean(false);
+
/**
* Constructor
*
@@ -100,6 +109,22 @@ public DatabusPushConsumer(final Map configs,
this.consumerListener = consumerListener;
}
+ /**
+ * Constructor
+ *
+ * @param configs consumer configuration
+ * @param messageDeserializer consumer message deserializer
+ * @param consumerListener consumer listener
+ * @param tierStorage Tier storage
+ */
+ public DatabusPushConsumer(final Map configs,
+ final Deserializer messageDeserializer,
+ final DatabusPushConsumerListener consumerListener,
+ final TierStorage tierStorage) {
+ super(configs, messageDeserializer, null, tierStorage);
+ this.consumerListener = consumerListener;
+ }
+
/**
* @param configs consumer configuration
* @param messageDeserializer consumer message deserializer
@@ -110,7 +135,23 @@ public DatabusPushConsumer(final Map configs,
final Deserializer messageDeserializer,
final DatabusPushConsumerListener consumerListener,
final Credential credential) {
- super(configs, messageDeserializer, credential);
+ super(configs, messageDeserializer, credential, null);
+ this.consumerListener = consumerListener;
+ }
+
+ /**
+ * @param configs consumer configuration
+ * @param messageDeserializer consumer message deserializer
+ * @param consumerListener consumer listener
+ * @param credential credential to get access to Databus in case security is enabled
+ * @param tierStorage Tier storage
+ */
+ public DatabusPushConsumer(final Map configs,
+ final Deserializer messageDeserializer,
+ final DatabusPushConsumerListener consumerListener,
+ final Credential credential,
+ final TierStorage tierStorage) {
+ super(configs, messageDeserializer, credential, tierStorage);
this.consumerListener = consumerListener;
}
@@ -127,6 +168,21 @@ public DatabusPushConsumer(final Properties properties,
}
+ /**
+ * @param properties consumer configuration
+ * @param messageDeserializer consumer message deserializer
+ * @param consumerListener consumer listener
+ * @param tierStorage Tier storage
+ */
+ public DatabusPushConsumer(final Properties properties,
+ final Deserializer
messageDeserializer,
+ final DatabusPushConsumerListener consumerListener,
+ final TierStorage tierStorage) {
+ super(properties, messageDeserializer, null, tierStorage);
+ this.consumerListener = consumerListener;
+
+ }
+
/**
* @param properties consumer configuration
* @param messageDeserializer consumer message deserializer
@@ -137,10 +193,59 @@ public DatabusPushConsumer(final Properties properties,
final Deserializer
messageDeserializer,
final DatabusPushConsumerListener consumerListener,
final Credential credential) {
- super(properties, messageDeserializer, credential);
+ super(properties, messageDeserializer, credential, null);
this.consumerListener = consumerListener;
}
+ /**
+ * @param properties consumer configuration
+ * @param messageDeserializer consumer message deserializer
+ * @param consumerListener consumer listener
+ * @param credential credential to get access to Databus in case security is enabled
+ * @param tierStorage Tier storage
+ */
+ public DatabusPushConsumer(final Properties properties,
+ final Deserializer
messageDeserializer,
+ final DatabusPushConsumerListener consumerListener,
+ final Credential credential,
+ final TierStorage tierStorage) {
+ super(properties, messageDeserializer, credential, tierStorage);
+ this.consumerListener = consumerListener;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void subscribe(final Map> groupTopics) {
+ super.subscribe(groupTopics, new PushConsumerRebalanceListener(null));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void subscribe(final Map> groupTopics,
+ final ConsumerRebalanceListener consumerRebalanceListener) {
+ super.subscribe(groupTopics, new PushConsumerRebalanceListener(consumerRebalanceListener));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void subscribe(final List topics,
+ final ConsumerRebalanceListener consumerRebalanceListener) {
+ super.subscribe(topics, new PushConsumerRebalanceListener(consumerRebalanceListener));
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void subscribe(final List topics) {
+ super.subscribe(topics, new PushConsumerRebalanceListener(null));
+ }
/**
* {@inheritDoc}
@@ -305,6 +410,13 @@ private void push(final DatabusPushConsumerFuture databusPushConsumerFuture,
LOG.info("Consumer " + super.getClientId() + " is resumed");
} catch (TimeoutException e) {
+ // refreshPause == true means a rebalance was performed and partitions might be reassigned.
+ // Then, in order to avoid reading messages and just sends the heartbeat when poll(),
+ // a pause() method has to be invoked with the updated partitions assignment.
+ if (refreshPause.get()) {
+ refreshPause.set(false);
+ pause(assignment());
+ }
// TimeoutException means that listener is still working.
// So, a poll is performed to heartbeat Databus
super.poll(Duration.ofMillis(0));
@@ -439,6 +551,28 @@ public void close() {
}
}
+ private class PushConsumerRebalanceListener implements ConsumerRebalanceListener {
+
+ private final ConsumerRebalanceListener customerListener;
+
+ PushConsumerRebalanceListener(final ConsumerRebalanceListener customerListener) {
+ this.customerListener = Optional.ofNullable(customerListener).orElse(new NoOpConsumerRebalanceListener());
+
+ }
+
+ @Override
+ public void onPartitionsRevoked(final Collection partitions) {
+ customerListener.onPartitionsRevoked(partitions);
+
+ }
+
+ @Override
+ public void onPartitionsAssigned(final Collection partitions) {
+ refreshPause.set(true);
+ customerListener.onPartitionsAssigned(partitions);
+
+ }
+ }
}
diff --git a/src/main/java/com/opendxl/databus/entities/RoutingData.java b/src/main/java/com/opendxl/databus/entities/RoutingData.java
index b08d720..7854cf1 100644
--- a/src/main/java/com/opendxl/databus/entities/RoutingData.java
+++ b/src/main/java/com/opendxl/databus/entities/RoutingData.java
@@ -16,7 +16,8 @@
* Represent a address where a message must be sent.
* It is used by {@link ProducerRecord}
* to know what the destination is.
- * It contains a mandatory topic name as well as optionals sharding key and tenant group and partitions.
+ * It contains a mandatory topic name as well as optionals sharding key, tenant group,
+ * partitions and tier storage metadata.
*
*
* See how to use in {@link DatabusProducer} example
@@ -37,27 +38,35 @@ public class RoutingData {
/**
* The topic name
*/
- private String topic = null;
+ private String topic;
/**
* The sharding key value
*/
- private String shardingKey = null;
+ private String shardingKey;
/**
* The tenant group
*/
private String tenantGroup = DEFAULT_TENANT_GROUP;
+ /**
+ * Tier Storage Metadata
+ */
+ private TierStorageMetadata tierStorageMetadata;
+
/**
* RoutingData constructor with only topic name parameter
*
* @param topic The topic name where the message must be sent
*/
public RoutingData(final String topic) {
- this(topic, null, null, null);
+ this(topic, null, null, null, null);
}
+ public RoutingData(final String topic, final TierStorageMetadata tierStorageMetadata) {
+ this(topic, null, null, null, tierStorageMetadata);
+ }
/**
* RoutingData constructor with topic name sharding key and tenant group parameters
*
@@ -66,9 +75,15 @@ public RoutingData(final String topic) {
* @param tenantGroup The name that groups topics
*/
public RoutingData(final String topic, final String shardingKey, final String tenantGroup) {
- this(topic, shardingKey, tenantGroup, null);
+ this(topic, shardingKey, tenantGroup, null, null);
}
+ public RoutingData(final String topic,
+ final String shardingKey,
+ final String tenantGroup,
+ final TierStorageMetadata tierStorageMetadata) {
+ this(topic, shardingKey, tenantGroup, null, tierStorageMetadata);
+ }
/**
* RoutingData constructor with all parameters
*
@@ -76,10 +91,11 @@ public RoutingData(final String topic, final String shardingKey, final String te
* @param shardingKey The Databus sharding key
* @param tenantGroup The name that groups topics
* @param partition The partition number
+ * @param tierStorageMetadata Tier Storage Metadata
+ *
*/
public RoutingData(final String topic, final String shardingKey, final String tenantGroup,
- final Integer partition) {
-
+ final Integer partition, final TierStorageMetadata tierStorageMetadata) {
if (StringUtils.isBlank(topic)) {
throw new DatabusClientRuntimeException("topic cannot be empty or null", RoutingData.class);
}
@@ -87,6 +103,7 @@ public RoutingData(final String topic, final String shardingKey, final String te
this.tenantGroup = Optional.ofNullable(tenantGroup).orElse("").trim();
this.shardingKey = shardingKey;
this.partition = partition;
+ this.tierStorageMetadata = tierStorageMetadata;
}
/**
@@ -124,4 +141,9 @@ public String getTenantGroup() {
public Integer getPartition() {
return partition;
}
+
+ public TierStorageMetadata getTierStorageMetadata() {
+ return tierStorageMetadata;
+ }
+
}
diff --git a/src/main/java/com/opendxl/databus/entities/S3TierStorage.java b/src/main/java/com/opendxl/databus/entities/S3TierStorage.java
new file mode 100644
index 0000000..a04f4ac
--- /dev/null
+++ b/src/main/java/com/opendxl/databus/entities/S3TierStorage.java
@@ -0,0 +1,172 @@
+package com.opendxl.databus.entities;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.internal.Mimetypes;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.opendxl.databus.exception.DatabusClientRuntimeException;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+/**
+ * It is a built-in AWS S3 Tier Storage.
+ *
+ * It implements mechanisms to upload and download AWS S3 objects
+ */
+public class S3TierStorage implements TierStorage {
+
+ /**
+ * The logger object.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(S3TierStorage.class);
+
+ /**
+ * S3 client
+ */
+ private AmazonS3 s3Client;
+
+ /**
+ * Constructor used to create a role-based authenticated tier storage instance.
+ *
+ * @param awsRegion AWS region
+ * @param config AWS client configuration
+ * @throws DatabusClientRuntimeException exception if the underlying AWS S3 client cannot be created
+ */
+ public S3TierStorage(final String awsRegion,
+ final ClientConfiguration config) {
+
+ AmazonS3ClientBuilder s3Builder = AmazonS3ClientBuilder.standard();
+ s3Builder.withCredentials(new InstanceProfileCredentialsProvider(false));
+ s3Builder.withRegion(awsRegion);
+ if (config != null) {
+ s3Builder.withClientConfiguration(config);
+ }
+ try {
+ this.s3Client = s3Builder.build();
+ } catch (Exception e) {
+ final String errMsg = "Error creating a S3 Tier Storage. Region: " + awsRegion + " " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, this.getClass());
+ }
+ }
+
+
+ /**
+ * Constructor used to create a tier storage instance with AWS access and secret key
+ *
+ * @param awsRegion AWS region
+ * @param config AWS client configuration
+ * @param awsAccessKey AWS access key
+ * @param awsSecretKey AWS secret key
+ * @throws DatabusClientRuntimeException exception if the underlying AWS S3 client cannot be created
+ */
+ public S3TierStorage(final String awsRegion,
+ final ClientConfiguration config,
+ final String awsAccessKey,
+ final String awsSecretKey) {
+
+
+ AmazonS3ClientBuilder s3Builder = AmazonS3ClientBuilder.standard();
+ s3Builder.withCredentials(
+ new AWSStaticCredentialsProvider(
+ new BasicAWSCredentials(awsAccessKey, awsSecretKey)));
+ s3Builder.withRegion(awsRegion);
+ if (config != null) {
+ s3Builder.withClientConfiguration(config);
+ }
+
+ try {
+ this.s3Client = s3Builder.build();
+ } catch (Exception e) {
+ final String errMsg = "Error creating a S3 Tier Storage. Region: " + awsRegion + " " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, this.getClass());
+ }
+ }
+
+
+ /**
+ * Upload a object to AWS S3 bucket
+ *
+ * @param s3BucketName AWS S3 bucket
+ * @param s3KeyName AWS S3 object name
+ * @param payload AWS object content
+ * @throws DatabusClientRuntimeException exception if the underlying AWS S3 fails.
+ *
+ */
+ @Override
+ public void put(final String s3BucketName, final String s3KeyName, final byte[] payload) {
+
+ try {
+ if (!s3Client.doesBucketExistV2(s3BucketName)) {
+ s3Client.createBucket(s3BucketName);
+ }
+
+ ObjectMetadata metadata = new ObjectMetadata();
+ metadata.setContentLength(payload.length);
+ metadata.setContentType(Mimetypes.MIMETYPE_HTML);
+ InputStream s3Object = new ByteArrayInputStream(payload);
+ s3Client.putObject(s3BucketName, s3KeyName, s3Object, metadata);
+
+ } catch (Exception e) {
+ final String errMsg = "Error uploading S3 object: Bucket: " + s3BucketName + " Object: "
+ + s3KeyName + " " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, this.getClass());
+ }
+
+ }
+
+
+ /**
+ * Check if an AWS S3 object exists
+ *
+ * @param s3BucketName AWS S3 bucket
+ * @param s3KeyName AWS object name
+ * @return a boolean
+ * @throws DatabusClientRuntimeException exception if the underlying AWS S3 fails.
+ */
+ @Override
+ public boolean doesObjectExist(final String s3BucketName, final String s3KeyName) {
+ try {
+ return s3Client.doesObjectExist(s3BucketName, s3KeyName);
+ } catch (Exception e) {
+ final String errMsg = "Error trying to find a S3 object: Bucket: " + s3BucketName + " Object: "
+ + s3KeyName + " " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, this.getClass());
+ }
+ }
+
+
+ /**
+ * Download a AWS S3 object content
+ *
+ * @param s3BucketName AWS S3 bucket name
+ * @param s3KeyName AWS S3 object name
+ * @return the object content
+ * @throws DatabusClientRuntimeException exception if the underlying AWS S3 fails.
+ */
+ @Override
+ public byte[] get(final String s3BucketName, final String s3KeyName) {
+ try {
+ S3Object s3Object = s3Client.getObject(new GetObjectRequest(s3BucketName, s3KeyName));
+ return IOUtils.toByteArray(s3Object.getObjectContent());
+ } catch (Exception e) {
+ final String errMsg = "Error reading S3 object: Bucket: " + s3BucketName + " Object: " + s3KeyName + " " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, this.getClass());
+ }
+ }
+
+}
diff --git a/src/main/java/com/opendxl/databus/entities/TierStorage.java b/src/main/java/com/opendxl/databus/entities/TierStorage.java
new file mode 100644
index 0000000..620ffb0
--- /dev/null
+++ b/src/main/java/com/opendxl/databus/entities/TierStorage.java
@@ -0,0 +1,8 @@
+package com.opendxl.databus.entities;
+
+public interface TierStorage {
+ void put(String bucketName, String objectName, byte[] payload);
+ byte[] get(String bucketName, String objectName);
+ boolean doesObjectExist(String bucketName, String objectName);
+}
+
diff --git a/src/main/java/com/opendxl/databus/entities/TierStorageMetadata.java b/src/main/java/com/opendxl/databus/entities/TierStorageMetadata.java
new file mode 100644
index 0000000..2b76455
--- /dev/null
+++ b/src/main/java/com/opendxl/databus/entities/TierStorageMetadata.java
@@ -0,0 +1,22 @@
+package com.opendxl.databus.entities;
+
+public class TierStorageMetadata {
+
+ private final String bucketName;
+ private final String objectName;
+
+ public TierStorageMetadata(final String bucketName, final String objectName) {
+ this.bucketName = bucketName.trim();
+ this.objectName = objectName.trim();
+ }
+
+ public String getBucketName() {
+ return bucketName;
+ }
+
+ public String getObjectName() {
+ return objectName;
+ }
+
+
+}
diff --git a/src/main/java/com/opendxl/databus/producer/DatabusProducer.java b/src/main/java/com/opendxl/databus/producer/DatabusProducer.java
index 95cff21..7c0549b 100644
--- a/src/main/java/com/opendxl/databus/producer/DatabusProducer.java
+++ b/src/main/java/com/opendxl/databus/producer/DatabusProducer.java
@@ -128,7 +128,7 @@ public DatabusProducer(final Map configs, final Serializer me
setFieldMembers(messageSerializer);
this.setConfiguration(overrideConfig(configs));
this.configureCredential(getConfiguration(), credential);
- setProducer(new KafkaProducer(this.getConfiguration(), getKeySerializer(), getValueSerializer()));
+ setProducer(new KafkaProducer(this.getConfiguration(), getKeySerializer(), getKafkaValueSerializer()));
setClientId((String) configs.get(ProducerConfig.CLIENT_ID_CONFIG));
} catch (DatabusClientRuntimeException e) {
throw e;
@@ -176,7 +176,7 @@ public DatabusProducer(final Properties properties, final Serializer
messageS
Properties fixedProperties = overrideConfig(properties);
this.setConfiguration((Map) fixedProperties);
this.configureCredential(getConfiguration(), credential);
- setProducer(new KafkaProducer(this.getConfiguration(), getKeySerializer(), getValueSerializer()));
+ setProducer(new KafkaProducer(this.getConfiguration(), getKeySerializer(), getKafkaValueSerializer()));
setClientId((String) fixedProperties.get(ProducerConfig.CLIENT_ID_CONFIG));
} catch (DatabusClientRuntimeException e) {
throw e;
@@ -197,8 +197,8 @@ private void setFieldMembers(final Serializer
messageSerializer) {
+ "Message Serializer cannot be null" , DatabusProducer.class);
}
- setKeySerializer(new DatabusKeySerializer());
- setValueSerializer(new MessageSerializer());
+ setKafkaKeySerializer(new DatabusKeySerializer());
+ setKafkaValueSerializer(new MessageSerializer());
setDatabusProducerRecordAdapter(new DatabusProducerRecordAdapter
(messageSerializer));
}
diff --git a/src/main/java/com/opendxl/databus/producer/DatabusTierStorageProducer.java b/src/main/java/com/opendxl/databus/producer/DatabusTierStorageProducer.java
new file mode 100644
index 0000000..ca3cc97
--- /dev/null
+++ b/src/main/java/com/opendxl/databus/producer/DatabusTierStorageProducer.java
@@ -0,0 +1,427 @@
+/*---------------------------------------------------------------------------*
+ * Copyright (c) 2019 McAfee, LLC - All Rights Reserved. *
+ *---------------------------------------------------------------------------*/
+
+package com.opendxl.databus.producer;
+
+import com.opendxl.databus.common.RecordMetadata;
+import com.opendxl.databus.common.internal.adapter.DatabusProducerRecordAdapter;
+import com.opendxl.databus.common.internal.adapter.MessagePayloadAdapter;
+import com.opendxl.databus.credential.Credential;
+import com.opendxl.databus.entities.MessagePayload;
+import com.opendxl.databus.entities.TierStorage;
+import com.opendxl.databus.entities.TierStorageMetadata;
+import com.opendxl.databus.entities.internal.DatabusMessage;
+import com.opendxl.databus.exception.DatabusClientRuntimeException;
+import com.opendxl.databus.serialization.Serializer;
+import com.opendxl.databus.serialization.internal.MessageSerializer;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * It writes a Message to Kafka and stores Payload and Header in a Tier Storage. The Kafka message is used as
+ * offsets control and to point to payload which is stored in the Tier Storage.
+ *
+ * @param
 Payload's type, typically a byte[]
+ */
+public class DatabusTierStorageProducer
extends DatabusProducer
{
+
+ /**
+ * The logger
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(DatabusTierStorageProducer.class);
+
+ /**
+ * Used to save the message in a separated store
+ */
+ private TierStorage tierStorage;
+
+ /**
+ * Transform a user payload in a {@link DatabusMessage}
+ */
+ private MessagePayloadAdapter
messagePayloadAdapter;
+
+ /**
+ * Constructor
+ *
+ * @param configs Producer configuration
+ * @param userSerializer user serializer
+ * @param tierStorage tier storage
+ */
+ public DatabusTierStorageProducer(final Map configs, final Serializer userSerializer,
+ final TierStorage tierStorage) {
+ this(configs, userSerializer, null, tierStorage);
+ }
+
+
+ /**
+ * Constructor
+ *
+ * @param configs producer configuration
+ * @param userSerializer user serializer
+ * @param credential credentials
+ * @param tierStorage tier storage
+ */
+ public DatabusTierStorageProducer(final Map configs, final Serializer userSerializer,
+ final Credential credential, final TierStorage tierStorage) {
+ super(configs, userSerializer, credential);
+ if (tierStorage == null) {
+ throw new IllegalArgumentException("Tier Storage cannot be null");
+ }
+ validateConfiguration(configs);
+ this.tierStorage = tierStorage;
+ setFieldMembers(userSerializer);
+ initTransactions();
+ }
+
+
+ /**
+ * Constructor
+ *
+ * @param properties producer configuration
+ * @param userSerializer user serializer
+ * @param tierStorage tier storage
+ */
+ public DatabusTierStorageProducer(final Properties properties, final Serializer
userSerializer,
+ final TierStorage tierStorage) {
+ this(properties, userSerializer, null, tierStorage);
+ }
+
+
+ /**
+ * Constructor
+ *
+ * @param properties producer configuration
+ * @param userSerializer user serializer
+ * @param credential credential
+ * @param tierStorage tier storage
+ */
+ public DatabusTierStorageProducer(final Properties properties, final Serializer
userSerializer,
+ final Credential credential, final TierStorage tierStorage) {
+
+ super(properties, userSerializer, credential);
+ if (tierStorage == null) {
+ throw new IllegalArgumentException("Tier Storage cannot be null");
+ }
+ validateConfiguration(properties);
+ this.tierStorage = tierStorage;
+ setFieldMembers(userSerializer);
+ initTransactions();
+ }
+
+ private void setFieldMembers(Serializer
userSerializer) {
+ setKafkaValueSerializer(new MessageSerializer()); // The serializer used by Kafka
+ this.messagePayloadAdapter = new MessagePayloadAdapter<>(userSerializer);
+ setDatabusProducerRecordAdapter(new DatabusProducerRecordAdapter<>(userSerializer));
+ }
+
+ private void validateConfiguration(final Map config) {
+ Properties properties = new Properties();
+ try {
+ properties.putAll(config);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Producer configuration is invalid ERROR:" + e.getMessage());
+ }
+ validateConfiguration(properties);
+ }
+
+ private void validateConfiguration(final Properties config) {
+ if (config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) == null) {
+ throw new IllegalArgumentException("Transaction Id cannot be null or empty");
+ }
+ final String transactionId = config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG).toString();
+ if (transactionId == null || transactionId.trim().isEmpty()) {
+ throw new IllegalArgumentException("Transaction Id cannot be null or empty");
+ }
+ }
+
+ /**
+ * It writes a Message to kafka and stores Payload and Header in Tier Storage.
+ * The kafka message has headers information pointing to Tier Storage payload.
+ * Both operations are in the same transaction. If something goes wrong, they will be consistently aborted
+ *
+ * @param producerRecord producer record
+ */
+ @Override
+ public void send(final ProducerRecord producerRecord) {
+ try {
+ validateTierStorageMetadata(producerRecord);
+
+ // Get the Tier Storage metadata from RoutingData which was already created by the user
+ final TierStorageMetadata tierStorageMetadata =
+ producerRecord.getRoutingData().getTierStorageMetadata();
+
+ // Serialize the producerRecord payload so it can be stored in Tier Storage
+ // within the same transaction as the Kafka send
+ final DatabusMessage databusMessage =
+ messagePayloadAdapter.adapt(producerRecord.payload(), producerRecord.getHeaders());
+ final byte[] databusMessageSerialized = getKafkaValueSerializer().serialize("", databusMessage);
+
+ // Remove the producerRecord payload to be written in kafka and keeps Headers.
+ final ProducerRecord
adaptedProducerRecord = new ProducerRecord<>(producerRecord.getRoutingData(),
+ producerRecord.getHeaders(),
+ new MessagePayload<>(null));
+
+ // Transform a Databus ProducerRecord in a Kafka Producer Record
+ org.apache.kafka.clients.producer.ProducerRecord targetProducerRecord =
+ getDatabusProducerRecordAdapter().adapt(adaptedProducerRecord);
+
+ try {
+ beginTransaction();
+ super.sendKafkaRecord(targetProducerRecord);
+ tierStorage.put(tierStorageMetadata.getBucketName(),
+ tierStorageMetadata.getObjectName(),
+ databusMessageSerialized);
+ commitTransaction();
+ LOG.info("Send Ok. Message was sent and payload was stored in Tier Storage");
+ } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
+ super.flush();
+ super.close();
+ final String errMsg = "Send cannot be performed. Producer throws an irrecoverable exception "
+ + "during a transaction. Producer is closed effective immediately. ERROR:" + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, this.getClass());
+
+ } catch (Exception e) {
+ abortTransaction();
+ final String errMsg = "Send cannot be performed. Producer throws an exception during a transaction. "
+ + "Producer continues active. Message should be sent again to retry. ERROR:" + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
+ }
+ } catch (Exception e) {
+ final String errMsg = "send cannot be performed: ERROR:" + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
+ }
+
+ }
+
+ protected void validateTierStorageMetadata(ProducerRecord producerRecord) {
+ if (producerRecord.getRoutingData().getTierStorageMetadata() == null
+ || producerRecord.getRoutingData().getTierStorageMetadata().getBucketName() == null
+ || producerRecord.getRoutingData().getTierStorageMetadata().getBucketName().isEmpty()
+ || producerRecord.getRoutingData().getTierStorageMetadata().getObjectName() == null
+ || producerRecord.getRoutingData().getTierStorageMetadata().getObjectName().isEmpty()
+ ) {
+ final String errMsg = "Send cannot be performed. Bucket metadatada is invalid";
+ LOG.error(errMsg);
+ throw new IllegalArgumentException(errMsg);
+ }
+ }
+
+ /**
+ * It writes a Message to kafka and stores Payload in Tier Storage.
+ * The kafka message has headers information pointing to Tier Storage payload. So that a Consumer can recover
+ * Both operations are in the same transaction. If something goes wrong, they will be consistently aborted
+ *
+ * @param producerRecord The non-null record to send
+ * @param callback A user-supplied callback to execute when the record has been acknowledged by the server
+ * (null indicates no callback)
+ */
+ @Override
+ public void send(ProducerRecord
producerRecord, final Callback callback) {
+
+ validateTierStorageMetadata(producerRecord);
+
+ if (callback == null) {
+ final String errMsg = "Send cannot be performed. Producer Callback is invalid";
+ LOG.error(errMsg);
+ throw new IllegalArgumentException(errMsg);
+ }
+
+ try {
+
+ // Get the Tier Storage metadata from RoutingData which was already created by the user
+ final TierStorageMetadata tierStorageMetadata =
+ producerRecord.getRoutingData().getTierStorageMetadata();
+
+ // Serialize the producerRecord payload to be stored in Tier Storage when the callback is invoked by Kafka
+ final DatabusMessage databusMessage =
+ messagePayloadAdapter.adapt(producerRecord.payload(), producerRecord.getHeaders());
+ final byte[] kafkaValueSerialized = getKafkaValueSerializer().serialize("", databusMessage);
+
+ // Remove the producerRecord payload to be written in kafka and keeps Headers.
+ final ProducerRecord
adaptedProducerRecord = new ProducerRecord<>(producerRecord.getRoutingData(),
+ producerRecord.getHeaders(),
+ new MessagePayload<>(null));
+
+ // Transform a Databus ProducerRecord in a Kafka Producer Record
+ org.apache.kafka.clients.producer.ProducerRecord targetProducerRecord =
+ getDatabusProducerRecordAdapter().adapt(adaptedProducerRecord);
+
+ // Create the callback
+ CountDownLatch latch = new CountDownLatch(1);
+ final CallbackAdapterTierStorage callbackAdapterTierStorage;
+ callbackAdapterTierStorage = new CallbackAdapterTierStorage(callback,
+ kafkaValueSerialized,
+ latch,
+ tierStorageMetadata);
+
+ try {
+ beginTransaction();
+ super.sendKafkaRecord(targetProducerRecord, callbackAdapterTierStorage);
+ // wait for callback ends
+ final boolean callbackFinished = latch.await(10000, TimeUnit.MILLISECONDS);
+ if (callbackFinished) { // means the callback finished before timeout
+ if (callbackAdapterTierStorage.isMessageAndPayloadStored()) {
+ commitTransaction();
+ LOG.info("Send OK. Message was sent and payload was stored in Tier Storage");
+ } else { // means something was wrong in kafka or tier storage
+ abortTransaction(); // Logging is already performed in the Callback
+ throw new DatabusClientRuntimeException("Send cannot be performed. Record not produced. "
+ + "Something was wrong producing the message in Kafka or "
+ + " storing the payload in Tier Storage", this.getClass());
+ }
+ } else { // means that the callback has not finished in time
+ abortTransaction();
+ final String errMsg = "Send cannot be performed. Record not produced. "
+ + "Timeout: Too long time taken by Kafka or Tier Storage.";
+ LOG.error(errMsg);
+ throw new DatabusClientRuntimeException(errMsg, this.getClass());
+ }
+ } catch (InterruptedException e) {
+ abortTransaction();
+ final String errMsg = "Send cannot be performed. Producer was interrupted while "
+ + "waiting for a Callback response. "
+ + "Producer continues active. Message should be sent again to retry. ERROR:" + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
+ } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
+ super.flush();
+ super.close();
+ final String errMsg = "Send cannot be performed. Producer throws an irrecoverable exception "
+ + "during a transaction. Producer is closed effective immediately. ERROR:" + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, this.getClass());
+
+ } catch (Exception e) {
+ abortTransaction();
+ final String errMsg = "Producer throws an exception during a transaction. "
+ + "Producer continues active. Message should be sent again to retry. ERROR:" + e.getMessage();
+ LOG.error(errMsg);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
+ }
+
+ } catch (Exception e) {
+ if (e instanceof DatabusClientRuntimeException) {
+ throw e;
+ }
+ final String errMsg = "Send cannot be performed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
+ }
+
+ }
+
+
+ /**
+ * Callback Adapter
+ *
+ * It forwards a Kafka callback result to the user-supplied Databus callback
+ */
+ private class CallbackAdapterTierStorage implements org.apache.kafka.clients.producer.Callback {
+ /**
+ * Callback defined by the user when invoking send method
+ */
+ private final Callback userCallback;
+
+ /**
+ * The serialized message value (bytes) to be stored in Tier Storage
+ */
+ private final byte[] kafkaValueSerialized;
+
+ /**
+ * An object to signal when callback has finished
+ */
+ private CountDownLatch latch;
+
+ /**
+ * The Tier Storage metadata describing where the payload is stored
+ */
+ private TierStorageMetadata tierStorageMetadata;
+
+ /**
+ * storage operation result
+ */
+ private AtomicBoolean isMessageAndPayloadStored = new AtomicBoolean(false);
+
+ /**
+ * @param userCallback user callback
+ * @param kafkaValueSerialized serialized message value bytes
+ * @param latch an object to signal when the callback finishes
+ */
+ CallbackAdapterTierStorage(final Callback userCallback,
+ final byte[] kafkaValueSerialized,
+ final CountDownLatch latch,
+ final TierStorageMetadata tierStorageMetadata) {
+ this.userCallback = userCallback;
+ this.kafkaValueSerialized = kafkaValueSerialized;
+ this.latch = latch;
+ this.tierStorageMetadata = tierStorageMetadata;
+ }
+
+ /**
+ * It is called as a send result. Then it is forwarded and adapted to databus callback
+ *
+ * @param recordMetadata Kafka RecordMetadata
+ * @param exception An exception thrown by the Kafka broker, or null if the send succeeded
+ */
+ @Override
+ public void onCompletion(final org.apache.kafka.clients.producer.RecordMetadata recordMetadata,
+ final Exception exception) {
+
+ if (exception != null) {
+ LOG.error("Send cannot be performed. The record was not produced. ERROR:"
+ + exception.getMessage(), exception);
+ response(recordMetadata, exception);
+ return;
+ }
+
+ try {
+
+ tierStorage.put(tierStorageMetadata.getBucketName(),
+ tierStorageMetadata.getObjectName(),
+ kafkaValueSerialized);
+ response(recordMetadata, exception);
+ } catch (DatabusClientRuntimeException databusException) {
+ LOG.error("Send cannot be performed. The record was not produced. ERROR:"
+ + databusException.getMessage(), databusException);
+ response(recordMetadata, databusException);
+ }
+ }
+
+ /**
+ * Send callback response
+ *
+ * @param kafkaRecordMetadata recordMetadata
+ * @param exception exception
+ */
+ private void response(final org.apache.kafka.clients.producer.RecordMetadata kafkaRecordMetadata,
+ final Exception exception) {
+ isMessageAndPayloadStored.set(exception == null);
+ latch.countDown();
+
+ RecordMetadata databusRecordMetadata = null;
+ if (kafkaRecordMetadata != null) {
+ databusRecordMetadata = new RecordMetadata(kafkaRecordMetadata);
+ }
+ userCallback.onCompletion(databusRecordMetadata, exception);
+ }
+
+ protected boolean isMessageAndPayloadStored() {
+ return isMessageAndPayloadStored.get();
+ }
+ }
+
+
+}
diff --git a/src/main/java/com/opendxl/databus/producer/Producer.java b/src/main/java/com/opendxl/databus/producer/Producer.java
index 8e7288e..81b1207 100644
--- a/src/main/java/com/opendxl/databus/producer/Producer.java
+++ b/src/main/java/com/opendxl/databus/producer/Producer.java
@@ -4,8 +4,6 @@
package com.opendxl.databus.producer;
-import com.opendxl.databus.consumer.Consumer;
-import com.opendxl.databus.exception.DatabusClientRuntimeException;
import com.opendxl.databus.common.MetricName;
import com.opendxl.databus.common.PartitionInfo;
import com.opendxl.databus.common.RecordMetadata;
@@ -13,24 +11,29 @@
import com.opendxl.databus.common.internal.adapter.DatabusProducerRecordAdapter;
import com.opendxl.databus.common.internal.adapter.MetricNameMapAdapter;
import com.opendxl.databus.common.internal.adapter.PartitionInfoListAdapter;
+import com.opendxl.databus.consumer.Consumer;
import com.opendxl.databus.consumer.OffsetAndMetadata;
import com.opendxl.databus.consumer.OffsetCommitCallback;
import com.opendxl.databus.entities.internal.DatabusMessage;
+import com.opendxl.databus.exception.DatabusClientRuntimeException;
import com.opendxl.databus.producer.metric.ProducerMetric;
import com.opendxl.databus.producer.metric.ProducerMetricBuilder;
import com.opendxl.databus.producer.metric.ProducerMetricEnum;
import com.opendxl.databus.serialization.internal.DatabusKeySerializer;
import org.apache.commons.lang.StringUtils;
-import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.List;
+import java.time.Duration;
+import java.time.temporal.TemporalUnit;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
- * A abstract producer, responsible for handling Databus outgoing messages.
+ * An abstract producer, responsible for handling Databus outgoing messages.
+ *
* @param
payload's type
*/
public abstract class Producer
{
@@ -43,7 +46,7 @@ public abstract class Producer
{
/**
* A Kafka Serializer of {@link DatabusMessage}.
*/
- private org.apache.kafka.common.serialization.Serializer valueSerializer;
+ private org.apache.kafka.common.serialization.Serializer kafkaValueSerializer;
/**
* A configuration map for the producer.
@@ -90,7 +93,7 @@ public Map getConfiguration() {
* and the buffer is full.
* InterruptException If the thread is interrupted while blocked
*/
- public void send(final ProducerRecord record) {
+ public void send(final ProducerRecord
record) {
send(record, null);
}
@@ -133,9 +136,9 @@ public void send(final ProducerRecord record) {
* expensive callbacks it is recommended to use your own {@link java.util.concurrent.Executor} in the callback body
* to parallelize processing.
*
- * @param producerRecord The non-null record to send
- * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null
- * indicates no callback)
+ * @param producerRecord The non-null record to send
+ * @param callback A user-supplied callback to execute when the record has been acknowledged by the server
+ * (null indicates no callback)
* @throws IllegalArgumentException If record argumet is null
* @throws DatabusClientRuntimeException If send method fails. The original cause could be any of these exceptions:
*
SerializationException If the key or value are not valid objects
@@ -161,13 +164,26 @@ public void send(final ProducerRecord
producerRecord, final Callback callback
callbackAdapter = null;
}
- producer.send(targetProducerRecord, callbackAdapter);
+ sendKafkaRecord(targetProducerRecord, callbackAdapter);
} catch (Exception e) {
- throw new DatabusClientRuntimeException("send cannot be performed: " + e.getMessage(), e, Producer.class);
+ final String errMsg = "send cannot be performed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
}
}
+ protected void
+ sendKafkaRecord(final org.apache.kafka.clients.producer.ProducerRecord record,
+ final org.apache.kafka.clients.producer.Callback callback) {
+ producer.send(record, callback);
+ }
+
+ protected void
+ sendKafkaRecord(final org.apache.kafka.clients.producer.ProducerRecord record) {
+ producer.send(record);
+ }
+
/**
* Invoking this method makes all buffered records immediately available to send (even if linger.ms is
* greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
@@ -205,7 +221,9 @@ public void flush() {
try {
producer.flush();
} catch (Exception e) {
- throw new DatabusClientRuntimeException("flush cannot be performed :" + e.getMessage(), e, Producer.class);
+ final String errMsg = "flush cannot be performed :" + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
}
}
@@ -216,7 +234,7 @@ public void flush() {
* @param topic to get info
* @return List of {@link PartitionInfo}
* @throws DatabusClientRuntimeException If partitionsFor method fails.
- * The original cause could be the following exception:
+ * The original cause could be the following exception:
* InterruptException If the thread is interrupted while blocked
*/
public List partitionsFor(final String topic) {
@@ -224,8 +242,9 @@ public List partitionsFor(final String topic) {
List partitions = producer.partitionsFor(topic);
return new PartitionInfoListAdapter().adapt(partitions);
} catch (Exception e) {
- throw new DatabusClientRuntimeException("partitionsFor cannot be performed :"
- + e.getMessage(), e, Producer.class);
+ final String errMsg = "partitionsFor cannot be performed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
}
}
@@ -242,8 +261,9 @@ public List partitionsFor(final String topic) {
return new MetricNameMapAdapter().adapt(metrics);
} catch (Exception e) {
- throw new DatabusClientRuntimeException("metrics cannot be performed :"
- + e.getMessage(), e, Producer.class);
+ final String errMsg = "metrics cannot be performed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
}
}
@@ -265,7 +285,9 @@ public void close() {
try {
producer.close();
} catch (Exception e) {
- throw new DatabusClientRuntimeException("close cannot be performed :" + e.getMessage(), e, Producer.class);
+ final String errMsg = "close cannot be performed :" + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
}
}
@@ -287,38 +309,72 @@ public void close() {
* InterruptException If the thread is interrupted while blocked
*
IllegalArgumentException If the timeout is negative.
*/
+ @Deprecated
public void close(final long timeout, final TimeUnit timeUnit) {
try {
producer.close(timeout, timeUnit);
} catch (Exception e) {
- throw new DatabusClientRuntimeException("close cannot be performed :" + e.getMessage(), e, Producer.class);
+ final String errMsg = "close cannot be performed :" + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
+ }
+
+ }
+
+ /**
+ * This method waits up to timeout for the producer to complete the sending of all incomplete requests.
+ *
+ * If the producer is unable to complete all requests before the timeout expires, this method will fail
+ * any unsent and unacknowledged records immediately.
+ *
+ * If invoked from within a {@link Callback} this method will not block and will be equivalent to
+ * close(0, TimeUnit.MILLISECONDS). This is done since no further sending will happen while
+ * blocking the I/O thread of the producer.
+ *
+ * @param duration The maximum time to wait for producer to complete any pending requests. The value should be
+ * non-negative. Specifying a timeout of zero means do not wait for pending send
+ * requests to complete.
+ * @param timeUnit The time unit for the timeout
+ * @throws DatabusClientRuntimeException If close method fails. The original cause could be any of these exceptions:
+ *
InterruptException If the thread is interrupted while blocked
+ *
IllegalArgumentException If the timeout is negative.
+ */
+ public void close(long duration, TemporalUnit timeUnit) {
+ try {
+ producer.close(Duration.of(duration, timeUnit));
+ } catch (Exception e) {
+ final String errMsg = "close cannot be performed :" + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
}
}
+
/**
* Set the DatabusKeySerializer in producer
*
* @param keySerializer A DatabusKeySerializer Instance
*/
- protected void setKeySerializer(final DatabusKeySerializer keySerializer) {
+ protected void setKafkaKeySerializer(final DatabusKeySerializer keySerializer) {
this.keySerializer = keySerializer;
}
/**
* Set the value serializer in producer
*
- * @param valueSerializer A Serializer object instance for the value serializer
+ * @param kafkaValueSerializer A Serializer object instance for the value serializer
*/
protected void
- setValueSerializer(final org.apache.kafka.common.serialization.Serializer valueSerializer) {
- this.valueSerializer = valueSerializer;
+ setKafkaValueSerializer(final org.apache.kafka.common.serialization.Serializer
+ kafkaValueSerializer) {
+ this.kafkaValueSerializer = kafkaValueSerializer;
}
/**
* Get the key serializer from producer
*
- * @return A {@link DatabusKeySerializer} object instance
+ * @return A {@link DatabusKeySerializer} object instance
*/
protected DatabusKeySerializer getKeySerializer() {
return keySerializer;
@@ -327,16 +383,15 @@ protected DatabusKeySerializer getKeySerializer() {
/**
* Get the value serializer from producer
*
- * @return A {@link org.apache.kafka.common.serialization.Serializer} object instance
+ * @return A {@link org.apache.kafka.common.serialization.Serializer} object instance
*/
- protected org.apache.kafka.common.serialization.Serializer getValueSerializer() {
- return valueSerializer;
+ protected org.apache.kafka.common.serialization.Serializer getKafkaValueSerializer() {
+ return kafkaValueSerializer;
}
/**
* Set a Kafka producer instance to the producer.
- *
- * @return A {@link org.apache.kafka.clients.producer.Producer} object instance to set in the producer
+ * @param producer Producer
*/
protected void setProducer(final org.apache.kafka.clients.producer.Producer producer) {
this.producer = producer;
@@ -351,6 +406,16 @@ protected void setDatabusProducerRecordAdapter(final DatabusProducerRecordAdapte
this.databusProducerRecordAdapter = databusProducerRecordAdapter;
}
+
+
+ /**
+ *
+ * @return Databus producer adapter
+ */
+ protected DatabusProducerRecordAdapter getDatabusProducerRecordAdapter() {
+ return this.databusProducerRecordAdapter;
+ }
+
/**
* Set the clientId to the producer
*
@@ -406,7 +471,7 @@ public void setConfiguration(final Map configuration) {
/**
* Needs to be called before any other methods when the transactional.id is set in the configuration.
- *
+ *
* This method does the following:
* 1. Ensures any transactions initiated by previous instances of the producer with the same
* transactional.id are completed. If the previous instance had failed with a transaction in
@@ -427,8 +492,9 @@ public void initTransactions() {
try {
producer.initTransactions();
} catch (Exception e) {
- throw new DatabusClientRuntimeException("initTransactions cannot be performed: "
- + e.getMessage(), e, Producer.class);
+ final String errMsg = "initTransactions cannot be performed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
}
}
@@ -450,8 +516,9 @@ public void beginTransaction() {
try {
producer.beginTransaction();
} catch (Exception e) {
- throw new DatabusClientRuntimeException("beginTransaction cannot be performed: "
- + e.getMessage(), e, Producer.class);
+ final String errMsg = "beginTransaction cannot be performed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
}
}
@@ -471,7 +538,7 @@ public void beginTransaction() {
* (via {@link Consumer#commitSync(Map) sync} or
* {@link Consumer#commitAsync(OffsetCommitCallback)} commits).
*
- * @param offsets offsets
+ * @param offsets offsets
* @param consumerGroupId consumer group id
* @throws DatabusClientRuntimeException If method fails. The original cause could be any of these exceptions:
*
IllegalStateException if no transactional.id has been configured or no transaction has been started
@@ -486,7 +553,7 @@ public void beginTransaction() {
* other unexpected error
*/
public void sendOffsetsToTransaction(final Map offsets,
- final String consumerGroupId) {
+ final String consumerGroupId) {
try {
Map adaptedOffsets = new HashMap();
@@ -501,35 +568,37 @@ public void sendOffsetsToTransaction(final Map
* Further, if any of the {@link #send(ProducerRecord)} calls which were part of the transaction hit irrecoverable
* errors, this method will throw the last received exception immediately and the transaction will not be committed.
* So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed.
- *
+ *
* DatabusClientRuntimeException If method fails. The original cause could be any of these exceptions:
*
IllegalStateException if no transactional.id has been configured or no transaction has been started
*
ProducerFencedException fatal error indicating another producer with the same transactional.id is active
*
org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
- * does not support transactions (i.e. if its version is lower than 0.11.0.0)
+ * does not support transactions (i.e. if its version is lower than 0.11.0.0)
*
org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
- * transactional.id is not authorized. See the exception for more details
+ * transactional.id is not authorized. See the exception for more details
*
KafkaException if the producer has encountered a previous fatal or abortable error, or for any
- * other unexpected error
+ * other unexpected error
*/
public void commitTransaction() {
try {
producer.commitTransaction();
} catch (Exception e) {
- throw new DatabusClientRuntimeException("commitTransaction cannot be performed: "
- + e.getMessage(), e, Producer.class);
+ final String errMsg = "commitTransaction cannot be performed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
}
}
@@ -544,17 +613,18 @@ public void commitTransaction() {
*
IllegalStateException if no transactional.id has been configured or no transaction has been started
*
ProducerFencedException fatal error indicating another producer with the same transactional.id is active
*
org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
- * does not support transactions (i.e. if its version is lower than 0.11.0.0)
+ * does not support transactions (i.e. if its version is lower than 0.11.0.0)
*
org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
- * transactional.id is not authorized. See the exception for more details
+ * transactional.id is not authorized. See the exception for more details
*
KafkaException if the producer has encountered a previous fatal error or for any other unexpected error
*/
public void abortTransaction() {
try {
producer.abortTransaction();
} catch (Exception e) {
- throw new DatabusClientRuntimeException("abortTransaction cannot be performed: "
- + e.getMessage(), e, Producer.class);
+ final String errMsg = "abortTransaction cannot be performed: " + e.getMessage();
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, Producer.class);
}
}
@@ -715,7 +785,7 @@ private ProducerMetric getMetricPerClientId(final ProducerMetricEnum producerMet
/**
* Gets a {@link ProducerMetric} given a Topic name and a {@link ProducerMetricEnum}.
*
- * @param topic The topic name.
+ * @param topic The topic name.
* @param producerMetricEnum The {@link ProducerMetricEnum} to get the metric.
* @return a {@link ProducerMetric} instance.
*/
diff --git a/src/main/java/com/opendxl/databus/serialization/SerdeDatabus.java b/src/main/java/com/opendxl/databus/serialization/SerdeDatabus.java
index a9393ff..6629124 100644
--- a/src/main/java/com/opendxl/databus/serialization/SerdeDatabus.java
+++ b/src/main/java/com/opendxl/databus/serialization/SerdeDatabus.java
@@ -4,6 +4,7 @@
package com.opendxl.databus.serialization;
+import com.opendxl.databus.entities.TierStorage;
import com.opendxl.databus.entities.internal.DatabusMessage;
import com.opendxl.databus.serialization.internal.MessageDeserializer;
import com.opendxl.databus.serialization.internal.MessageSerializer;
@@ -17,6 +18,17 @@
*/
public class SerdeDatabus implements Serde {
+ private final TierStorage tierStorage;
+
+ public SerdeDatabus(final TierStorage tierStorage) {
+
+ this.tierStorage = tierStorage;
+ }
+
+ public SerdeDatabus() {
+ this(null);
+ }
+
/**
* Not implemented.
*/
@@ -50,6 +62,6 @@ public Serializer serializer() {
*/
@Override
public Deserializer deserializer() {
- return new MessageDeserializer();
+ return new MessageDeserializer(tierStorage);
}
}
diff --git a/src/main/java/com/opendxl/databus/serialization/internal/AvroMessageDeserializer.java b/src/main/java/com/opendxl/databus/serialization/internal/AvroMessageDeserializer.java
index 3707b51..9df383f 100644
--- a/src/main/java/com/opendxl/databus/serialization/internal/AvroMessageDeserializer.java
+++ b/src/main/java/com/opendxl/databus/serialization/internal/AvroMessageDeserializer.java
@@ -5,9 +5,11 @@
package com.opendxl.databus.serialization.internal;
+import com.opendxl.databus.common.internal.util.HeaderInternalField;
import com.opendxl.databus.consumer.ConsumerRecord;
import com.opendxl.databus.consumer.DatabusConsumer;
import com.opendxl.databus.entities.Headers;
+import com.opendxl.databus.entities.TierStorage;
import com.opendxl.databus.exception.DatabusClientRuntimeException;
import com.opendxl.databus.common.internal.adapter.HeadersAvroDeserializedAdapter;
import com.opendxl.databus.common.internal.adapter.PayloadHeadersAvroDeserializedAdapter;
@@ -17,6 +19,8 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Avro Message Deserializer
@@ -25,6 +29,11 @@
*/
public final class AvroMessageDeserializer implements InternalDeserializer {
+ /**
+ * The logger object.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(AvroMessageDeserializer.class);
+
/**
* The schema to define the message.
*/
@@ -44,32 +53,77 @@ public AvroMessageDeserializer(final Schema schema) {
this.reader = new GenericDatumReader(schema);
}
+
/**
- * Deserialize a message
*
- * @param data The data to serialize.
+ * @param topic the topic where the message comes from
+ * @param data data to be deserialized
* @return A {@link DatabusMessage} instance.
*/
@Override
public DatabusMessage deserialize(final String topic, final byte[] data) {
+ return this.deserialize(topic, data, null);
+ }
+
+ /**
+ *
+ * @param topic the topic where the message comes from
+ * @param data data to be deserialized
+ * @param tierStorage tier storage where the payload should be read
+ * @return A {@link DatabusMessage} instance.
+ */
+ @Override
+ public DatabusMessage deserialize(final String topic, final byte[] data, final TierStorage tierStorage) {
try {
- final GenericRecord avroRecord = reader.read(null, DecoderFactory.get().binaryDecoder(data, null));
+ GenericRecord avroRecord = reader.read(null, DecoderFactory.get().binaryDecoder(data, null));
- final Headers headers =
+ Headers headers =
new HeadersAvroDeserializedAdapter()
.adapt(avroRecord.get("headers"));
- final byte[] payload =
+ byte[] payload =
new PayloadHeadersAvroDeserializedAdapter()
.adapt(avroRecord.get("payload"));
- final DatabusMessage message = new DatabusMessage(headers, payload);
- return message;
+
+ // Tier Storage Section.
+ // When it is not null, it will try to read the object from Tier Storage and override headers and payload.
+ if (tierStorage != null) {
+ final String bucketName = headers.get(HeaderInternalField.TIER_STORAGE_BUCKET_NAME_KEY);
+ final String objectName = headers.get(HeaderInternalField.TIER_STORAGE_OBJECT_NAME_KEY);
+
+ if (bucketName != null && objectName != null && !bucketName.isEmpty() && !objectName.isEmpty()) {
+ byte[] tierStorageObjectContent = null;
+ try {
+ tierStorageObjectContent = tierStorage.get(bucketName, objectName);
+ } catch (Exception e) {
+ LOG.error("Error when reading message from Tier Storage. Bucket Name: "
+ + bucketName + "Object Name: "
+ + objectName, e);
+ }
+
+ if (tierStorageObjectContent != null && tierStorageObjectContent.length > 0) {
+ MessageStructure messageStructure =
+ MessageStructureFactory.getStructure(tierStorageObjectContent);
+ avroRecord = reader
+ .read(null, DecoderFactory.get().binaryDecoder(messageStructure.getPayload(),
+ null));
+ headers = new HeadersAvroDeserializedAdapter().adapt(avroRecord.get("headers"));
+ payload = new PayloadHeadersAvroDeserializedAdapter().adapt(avroRecord.get("payload"));
+ } else {
+ LOG.warn("Object content read from Tier Storage is null or empty. Bucket: " + bucketName
+ + " Object: " + objectName);
+ }
+ }
+ }
+
+ return new DatabusMessage(headers, payload);
} catch (Exception e) {
- throw new DatabusClientRuntimeException("Error deserializing Avro schema:" + schema.toString(true),
- e, AvroMessageDeserializer.class);
+ final String errMsg = "Error deserializing Avro schema:" + schema.toString(true);
+ LOG.error(errMsg, e);
+ throw new DatabusClientRuntimeException(errMsg, e, AvroMessageDeserializer.class);
}
}
}
diff --git a/src/main/java/com/opendxl/databus/serialization/internal/InternalDeserializer.java b/src/main/java/com/opendxl/databus/serialization/internal/InternalDeserializer.java
index 3215ebb..d0eac6d 100644
--- a/src/main/java/com/opendxl/databus/serialization/internal/InternalDeserializer.java
+++ b/src/main/java/com/opendxl/databus/serialization/internal/InternalDeserializer.java
@@ -4,6 +4,8 @@
package com.opendxl.databus.serialization.internal;
+import com.opendxl.databus.entities.TierStorage;
+
/**
* Internal Deserializer
* Used by SDK to deserialize an object of P type,
@@ -22,4 +24,14 @@ public interface InternalDeserializer {
*/
P deserialize(String topic, byte[] data);
+ /**
+ *
+ * @param topic the topic where the message comes from
+ * @param data data to be deserialized
+ * @param tierStorage tier storage where the payload should be read
+ * @return data of type P
+ */
+ P deserialize(String topic, byte[] data, TierStorage tierStorage);
+
+
}
diff --git a/src/main/java/com/opendxl/databus/serialization/internal/LegacyMessageDeserializer.java b/src/main/java/com/opendxl/databus/serialization/internal/LegacyMessageDeserializer.java
index 8105ab7..d4af046 100644
--- a/src/main/java/com/opendxl/databus/serialization/internal/LegacyMessageDeserializer.java
+++ b/src/main/java/com/opendxl/databus/serialization/internal/LegacyMessageDeserializer.java
@@ -6,6 +6,7 @@
import com.google.gson.Gson;
+import com.opendxl.databus.entities.TierStorage;
import com.opendxl.databus.exception.DatabusClientRuntimeException;
import com.opendxl.databus.common.internal.builder.TopicNameBuilder;
import com.opendxl.databus.common.internal.util.HeaderInternalField;
@@ -32,6 +33,13 @@ public final class LegacyMessageDeserializer implements InternalDeserializer {
+
+ /**
+ * Tier Storage
+ */
+ private TierStorage tierStorage;
+
+ /**
+ * Constructor
+ *
+ * @param tierStorage If null it will be ignored and payload won't be read
+ */
+ public MessageDeserializer(final TierStorage tierStorage) {
+ this.tierStorage = tierStorage;
+ }
+
+ /**
+ * Constructor
+ */
+ public MessageDeserializer() {
+ this(null);
+ }
+
+
/**
* Not implemented.
*/
@@ -22,6 +46,7 @@ public void configure(final Map map, final boolean b) {
/**
* Deserialize a message to a {@link DatabusMessage}
+ * If tierStorage is not null will be used to read the payload from the underlying Tier Storage.
*
* @param topic The topic name.
* @param serializedMessage A serialized message.
@@ -33,7 +58,7 @@ public DatabusMessage deserialize(final String topic, final byte[] serializedMes
final MessageStructure messageStructure = MessageStructureFactory.getStructure(serializedMessage);
final Integer version = messageStructure.getVersion();
final InternalDeserializer deserializer = DeserializerRegistry.getDeserializer(version);
- return deserializer.deserialize(topic, messageStructure.getPayload());
+ return deserializer.deserialize(topic, messageStructure.getPayload(), tierStorage);
}
diff --git a/src/main/java/com/opendxl/databus/serialization/internal/RawMessageDeserializer.java b/src/main/java/com/opendxl/databus/serialization/internal/RawMessageDeserializer.java
index a2801d2..9659fc5 100644
--- a/src/main/java/com/opendxl/databus/serialization/internal/RawMessageDeserializer.java
+++ b/src/main/java/com/opendxl/databus/serialization/internal/RawMessageDeserializer.java
@@ -4,6 +4,7 @@
package com.opendxl.databus.serialization.internal;
+import com.opendxl.databus.entities.TierStorage;
import com.opendxl.databus.entities.internal.DatabusMessage;
/**
@@ -23,4 +24,16 @@ public final class RawMessageDeserializer implements InternalDeserializer();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ config.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-id-sample");
+ config.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+ config.put(ProducerConfig.BATCH_SIZE_CONFIG, "150000");
+ config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
+ config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ new DatabusTierStorageProducer(config, new ByteArraySerializer(), null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldFailWhenTransactionIdIsNotDefined() {
+ final Map config = new HashMap();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ config.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-id-sample");
+ config.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+ config.put(ProducerConfig.BATCH_SIZE_CONFIG, "150000");
+
+ // The following line is commented on purpose to show that transaction id is not configured
+ //config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
+
+ new DatabusTierStorageProducer(config, new ByteArraySerializer(), tierStorage);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void shouldFailWhenTransactionIdIsNull() {
+ final Map config = new HashMap();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ config.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-id-sample");
+ config.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+ config.put(ProducerConfig.BATCH_SIZE_CONFIG, "150000");
+
+ // The following line set TransactionId null
+ config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
+
+ new DatabusTierStorageProducer(config, new ByteArraySerializer(), tierStorage);
+ }
+
+ @Test(expected = DatabusClientRuntimeException.class)
+ public void shouldFailWhenTransactionIdIsEmpty() {
+ final Map config = new HashMap();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ config.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-id-sample");
+ config.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+ config.put(ProducerConfig.BATCH_SIZE_CONFIG, "150000");
+
+ // The following line set TransactionId empty
+ config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "");
+
+ new DatabusTierStorageProducer(config, new ByteArraySerializer(), tierStorage);
+ }
+
+
+ @Test
+ public void shouldProduceARecordWithCallBackAndTierStorageRecord() {
+ final String topicName = UUID.randomUUID().toString();
+
+ Producer producer = null;
+ Consumer consumer = null;
+
+ try {
+ producer = getProducer();
+
+ // Prepare a record
+ final String message = "Hello World at "+ LocalDateTime.now();
+ final byte[] payload = message.getBytes(Charset.defaultCharset());
+ final String key = UUID.randomUUID().toString();
+ final ProducerRecord producerRecord = getProducerRecord(topicName, payload, key);
+
+ // Send the record and set an anonymous callback for check the result
+ CountDownLatch latch = new CountDownLatch(1);
+ producer.send(producerRecord, (metadata, exception) -> {
+ try {
+ if(exception != null) {
+ Assert.fail(exception.getMessage());
+ }
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ // Wait for callback being invoked by Kafka
+ boolean isTimeout = latch.await(10000, TimeUnit.MILLISECONDS);
+ if(!isTimeout) {
+ Assert.fail("Producer take a long time to produce a record");
+ return;
+ }
+
+ // Consume the record
+ consumer = getConsumer();
+ consumer.subscribe(Collections.singletonList(topicName));
+ boolean closed = false;
+ while(!closed) {
+ final ConsumerRecords records = consumer.poll(1000);
+ for(ConsumerRecord record : records) {
+ if(record.getKey().equals(key)) {
+ final String actualMessage = new String(record.getMessagePayload().getPayload());
+ Assert.assertTrue(actualMessage.equals(message));
+ closed = true;
+ break;
+ }
+ }
+ }
+ Assert.assertTrue(tierStorage.doesObjectExist(BUCKET_NAME, topicName + key));
+
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ if (producer != null) {
+ producer.close();
+ }
+ if (consumer != null) {
+ consumer.close();
+ }
+
+ }
+ }
+
+ @Test
+ public void shouldProduceARecordWithoutCallBackAndTierStorageRecord() {
+ final String topicName = UUID.randomUUID().toString();
+
+ Producer producer = null;
+ Consumer consumer = null;
+
+ try {
+ producer = getProducer();
+
+ // Prepare a record
+ final String message = "Hello World at "+ LocalDateTime.now();
+ final byte[] payload = message.getBytes(Charset.defaultCharset());
+ final String key = UUID.randomUUID().toString();
+ final ProducerRecord producerRecord = getProducerRecord(topicName, payload, key);
+
+ // Send the record
+ producer.send(producerRecord);
+
+ // Consume the record
+ consumer = getConsumer();
+ consumer.subscribe(Collections.singletonList(topicName));
+ boolean closed = false;
+ while(!closed) {
+ final ConsumerRecords records = consumer.poll(1000);
+ for(ConsumerRecord record : records) {
+ if(record.getKey().equals(key)) {
+ final String actualMessage = new String(record.getMessagePayload().getPayload());
+ Assert.assertTrue(actualMessage.equals(message));
+ closed = true;
+ break;
+ }
+ }
+ }
+ Assert.assertTrue(tierStorage.doesObjectExist(BUCKET_NAME, topicName + key));
+
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ if (producer != null) {
+ producer.close();
+ }
+ if (consumer != null) {
+ consumer.close();
+ }
+
+ }
+ }
+
+
+ @Test
+ public void shouldConsumeWithoutTierStorage() {
+ final String topicName = UUID.randomUUID().toString();
+
+ Producer producer = null;
+ Consumer consumer = null;
+
+ try {
+ producer = getProducer();
+
+ // Prepare a record
+ final String message = "Hello World at "+ LocalDateTime.now();
+ final byte[] payload = message.getBytes(Charset.defaultCharset());
+ final String key = UUID.randomUUID().toString();
+ final ProducerRecord producerRecord = getProducerRecord(topicName, payload, key);
+
+ // Send the record and set an anonymous callback for check the result
+ CountDownLatch latch = new CountDownLatch(1);
+ producer.send(producerRecord, (metadata, exception) -> {
+ try {
+ if(exception != null) {
+ Assert.fail(exception.getMessage());
+ }
+ } finally {
+ latch.countDown();
+ }
+ });
+
+ // Wait for callback being invoked by Kafka
+ boolean isTimeout = latch.await(10000, TimeUnit.MILLISECONDS);
+ if(!isTimeout) {
+ Assert.fail("Producer take a long time to produce a record");
+ return;
+ }
+
+ // Consume the record
+ consumer = getConsumerWOTierStorage();
+ consumer.subscribe(Collections.singletonList(topicName));
+ boolean closed = false;
+ while(!closed) {
+ final ConsumerRecords records = consumer.poll(500);
+ for (ConsumerRecord record : records) {
+ final Headers headers = record.getHeaders();
+ final String bucketName = headers.get(HeaderInternalField.TIER_STORAGE_BUCKET_NAME_KEY);
+ final String objectName = headers.get(HeaderInternalField.TIER_STORAGE_OBJECT_NAME_KEY);
+ Assert.assertTrue(bucketName.equals(BUCKET_NAME));
+ Assert.assertTrue(objectName.equals(topicName + key));
+ closed = true;
+ break;
+ }
+ }
+
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ if (producer != null) {
+ producer.close();
+ }
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+ }
+
+
+ @Test
+ public void shouldConsumeWithoutCallbackAndTierStorage() {
+ final String topicName = UUID.randomUUID().toString();
+
+ Producer producer = null;
+ Consumer consumer = null;
+
+ try {
+ producer = getProducer();
+
+ // Prepare a record
+ final String message = "Hello World at "+ LocalDateTime.now();
+ final byte[] payload = message.getBytes(Charset.defaultCharset());
+ final String key = UUID.randomUUID().toString();
+ final ProducerRecord producerRecord = getProducerRecord(topicName, payload, key);
+
+ // Send the record
+ producer.send(producerRecord);
+
+ // Consume the record
+ consumer = getConsumerWOTierStorage();
+ consumer.subscribe(Collections.singletonList(topicName));
+ boolean closed = false;
+ while(!closed) {
+ final ConsumerRecords records = consumer.poll(500);
+ for (ConsumerRecord record : records) {
+ final Headers headers = record.getHeaders();
+ final String bucketName = headers.get(HeaderInternalField.TIER_STORAGE_BUCKET_NAME_KEY);
+ final String objectName = headers.get(HeaderInternalField.TIER_STORAGE_OBJECT_NAME_KEY);
+ Assert.assertTrue(bucketName.equals(BUCKET_NAME));
+ Assert.assertTrue(objectName.equals(topicName + key));
+ closed = true;
+ break;
+ }
+ }
+
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ } finally {
+ if (producer != null) {
+ producer.close();
+ }
+ if (consumer != null) {
+ consumer.close();
+ }
+ }
+ }
+
+ public Consumer getConsumer() {
+ final Properties consumerProps = new Properties();
+ consumerProps.put(ConsumerConfiguration.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ consumerProps.put(ConsumerConfiguration.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.put(ConsumerConfiguration.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ consumerProps.put(ConsumerConfiguration.SESSION_TIMEOUT_MS_CONFIG, "30000");
+ consumerProps.put(ConsumerConfiguration.CLIENT_ID_CONFIG, "consumer-id-sample");
+ consumerProps.put(ConsumerConfiguration.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return new DatabusConsumer(consumerProps, new ByteArrayDeserializer(), tierStorage );
+ }
+
+ public Consumer getConsumerWOTierStorage() {
+ final Properties consumerProps = new Properties();
+ consumerProps.put(ConsumerConfiguration.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ consumerProps.put(ConsumerConfiguration.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.put(ConsumerConfiguration.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ consumerProps.put(ConsumerConfiguration.SESSION_TIMEOUT_MS_CONFIG, "30000");
+ consumerProps.put(ConsumerConfiguration.CLIENT_ID_CONFIG, "consumer-id-sample");
+ consumerProps.put(ConsumerConfiguration.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return new DatabusConsumer(consumerProps, new ByteArrayDeserializer());
+ }
+ public Producer getProducer() {
+ final Map config = new HashMap();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ config.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-id-sample");
+ config.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+ config.put(ProducerConfig.BATCH_SIZE_CONFIG, "150000");
+ config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
+ return new DatabusTierStorageProducer(config, new ByteArraySerializer(), tierStorage);
+ }
+
+ public ProducerRecord getProducerRecord(final String topic, final byte[] payload, String key) {
+ final TierStorageMetadata tStorageMetadata =
+ new TierStorageMetadata(BUCKET_NAME, topic + key);
+ final RoutingData routingData = new RoutingData(topic, key, null, tStorageMetadata);
+ final Headers headers = new Headers();
+ final MessagePayload messagePayload = new MessagePayload<>(payload);
+ return new ProducerRecord<>(routingData, headers, messagePayload);
+ }
+
+
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/opendxl/databus/producer/DatabusTierStorageProducerWithoutS3Test.java b/src/test/java/com/opendxl/databus/producer/DatabusTierStorageProducerWithoutS3Test.java
new file mode 100644
index 0000000..6c2b99c
--- /dev/null
+++ b/src/test/java/com/opendxl/databus/producer/DatabusTierStorageProducerWithoutS3Test.java
@@ -0,0 +1,187 @@
+package com.opendxl.databus.producer;
+
+import broker.ClusterHelper;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.AnonymousAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.opendxl.databus.consumer.Consumer;
+import com.opendxl.databus.consumer.ConsumerConfiguration;
+import com.opendxl.databus.consumer.DatabusConsumer;
+import com.opendxl.databus.entities.*;
+import com.opendxl.databus.exception.DatabusClientRuntimeException;
+import com.opendxl.databus.serialization.ByteArrayDeserializer;
+import com.opendxl.databus.serialization.ByteArraySerializer;
+import io.findify.s3mock.S3Mock;
+import junit.extensions.PA;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.charset.Charset;
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class DatabusTierStorageProducerWithoutS3Test {
+
+ private static final String AWS_SECRET_KEY = "secretKey";
+ private static final String AWS_ACCESS_KEY = "accessKey";
+ private static final String AWS_REGION = "us-east-1";
+ private static final String BUCKET_NAME = "databus-poc-test";
+ private static S3Mock api;
+ private static AmazonS3Client client;
+ private static S3TierStorage tierStorage;
+
+ @BeforeClass
+ public static void beforeClass() {
+ // Start Kafka cluster
+ ClusterHelper
+ .getInstance()
+ .addBroker(9092)
+ .zookeeperPort(2181)
+ .start();
+
+ api = new S3Mock.Builder().withPort(8001).withInMemoryBackend().build();
+ // api.start is missing on purpose to keep Tier Storage down
+ AwsClientBuilder.EndpointConfiguration endpoint =
+ new AwsClientBuilder
+ .EndpointConfiguration("http://localhost:8001", "us-east-1");
+
+ client = (AmazonS3Client) AmazonS3ClientBuilder
+ .standard()
+ .withPathStyleAccessEnabled(true)
+ .withEndpointConfiguration(endpoint)
+ .withCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()))
+ .build();
+
+ tierStorage = new S3TierStorage(AWS_REGION, new ClientConfiguration(),
+ AWS_ACCESS_KEY, AWS_SECRET_KEY);
+ PA.setValue(tierStorage, "s3Client", client);
+
+ }
+ @AfterClass
+ public static void afterClass() {
+ ClusterHelper.getInstance().stop();
+ }
+
+ @Test
+ public void shouldFailBecauseTierStorageIsUnreachable() {
+ final String topicName = UUID.randomUUID().toString();
+
+ Producer producer = null;
+
+ try {
+ producer = getProducer();
+
+ // Prepare a record
+ final String message = "Hello World at " + LocalDateTime.now();
+ final byte[] payload = message.getBytes(Charset.defaultCharset());
+ final String key = UUID.randomUUID().toString();
+ final ProducerRecord producerRecord = getProducerRecord(topicName, payload, key);
+
+ // Send the record
+ CountDownLatch latch = new CountDownLatch(1);
+ producer.send(producerRecord, (metadata, exception) -> {
+ try {
+ if (exception != null) {
+ Assert.fail(exception.getMessage());
+ }
+ } finally {
+ latch.countDown();
+ }
+ });
+ latch.await(10000, TimeUnit.MILLISECONDS);
+ Assert.fail();
+ } catch (DatabusClientRuntimeException e) {
+ Assert.assertTrue(true);
+ } catch (Exception e) {
+ Assert.fail();
+ } finally {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+ }
+
+
+ @Test
+ public void shouldFailBecauseTierStorageIsUnreachable1() {
+ final String topicName = UUID.randomUUID().toString();
+
+ Producer producer = null;
+
+ try {
+ producer = getProducer();
+
+ // Prepare a record
+ final String message = "Hello World at " + LocalDateTime.now();
+ final byte[] payload = message.getBytes(Charset.defaultCharset());
+ final String key = UUID.randomUUID().toString();
+ final ProducerRecord producerRecord = getProducerRecord(topicName, payload, key);
+
+ // Send the record
+ producer.send(producerRecord);
+ Assert.fail();
+ } catch (DatabusClientRuntimeException e) {
+ Assert.assertTrue(true);
+ } catch (Exception e) {
+ Assert.fail();
+ } finally {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+ }
+
+ public Consumer getConsumer() {
+ final Properties consumerProps = new Properties();
+ consumerProps.put(ConsumerConfiguration.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ consumerProps.put(ConsumerConfiguration.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.put(ConsumerConfiguration.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ consumerProps.put(ConsumerConfiguration.SESSION_TIMEOUT_MS_CONFIG, "30000");
+ consumerProps.put(ConsumerConfiguration.CLIENT_ID_CONFIG, "consumer-id-sample");
+ consumerProps.put(ConsumerConfiguration.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return new DatabusConsumer(consumerProps, new ByteArrayDeserializer(), tierStorage );
+ }
+
+ public Consumer getConsumerWOTierStorage() {
+ final Properties consumerProps = new Properties();
+ consumerProps.put(ConsumerConfiguration.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ consumerProps.put(ConsumerConfiguration.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ consumerProps.put(ConsumerConfiguration.ENABLE_AUTO_COMMIT_CONFIG, "true");
+ consumerProps.put(ConsumerConfiguration.SESSION_TIMEOUT_MS_CONFIG, "30000");
+ consumerProps.put(ConsumerConfiguration.CLIENT_ID_CONFIG, "consumer-id-sample");
+ consumerProps.put(ConsumerConfiguration.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return new DatabusConsumer(consumerProps, new ByteArrayDeserializer());
+ }
+ public Producer getProducer() {
+ final Map config = new HashMap();
+ config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ config.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-id-sample");
+ config.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+ config.put(ProducerConfig.BATCH_SIZE_CONFIG, "150000");
+ config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
+ config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+ return new DatabusTierStorageProducer(config, new ByteArraySerializer(), tierStorage);
+ }
+
+ public ProducerRecord getProducerRecord(final String topic, final byte[] payload, String key) {
+ TierStorageMetadata tStorageMetadata =
+ new TierStorageMetadata(BUCKET_NAME, topic + key);
+ RoutingData routingData = new RoutingData(topic, key, null, tStorageMetadata);
+ Headers headers = new Headers();
+ MessagePayload messagePayload = new MessagePayload<>(payload);
+ return new ProducerRecord<>(routingData, headers, messagePayload);
+ }
+
+
+
+}
\ No newline at end of file
diff --git a/src/test/java/com/opendxl/databus/serialization/internal/MessageSerializationTest.java b/src/test/java/com/opendxl/databus/serialization/internal/MessageSerializationTest.java
index 3389c1c..30e93cf 100644
--- a/src/test/java/com/opendxl/databus/serialization/internal/MessageSerializationTest.java
+++ b/src/test/java/com/opendxl/databus/serialization/internal/MessageSerializationTest.java
@@ -30,7 +30,7 @@ public void shouldSerializeAndDeserializeADatabusMessage() {
DatabusMessage actualMessage = new DatabusMessage(new Headers(map),payload);
byte[] serializedMessage = ser.serialize(actualMessage);
- DatabusMessage deserializedMessage = des.deserialize(topic,serializedMessage);
+ DatabusMessage deserializedMessage = des.deserialize(topic, serializedMessage);
Assert.assertTrue(Arrays.equals(deserializedMessage.getPayload(),payload));
Assert.assertTrue(deserializedMessage.getHeaders().getAll().equals(headers.getAll()));