@@ -90,6 +90,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
private PulsarKafkaConnectSinkConfig kafkaSinkConfig;

protected String topicName;
protected boolean useOptionalPrimitives;

private boolean sanitizeTopicName = false;
// This is a workaround for https://github.com/apache/pulsar/issues/19922
@@ -164,6 +165,7 @@ public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics();
useOptionalPrimitives = kafkaSinkConfig.isUseOptionalPrimitives();

useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
@@ -446,8 +448,11 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
&& sourceRecord.getSchema().getSchemaInfo() != null
&& sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
// Assume a KeyValue schema's key and value are always optional
keySchema = PulsarSchemaToKafkaSchema
.getOptionalKafkaConnectSchema(kvSchema.getKeySchema(), useOptionalPrimitives);
valueSchema = PulsarSchemaToKafkaSchema
.getOptionalKafkaConnectSchema(kvSchema.getValueSchema(), useOptionalPrimitives);

Object nativeObject = sourceRecord.getValue().getNativeObject();

@@ -464,12 +469,13 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
} else {
if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
key = sourceRecord.getMessage().get().getKeyBytes();
keySchema = Schema.BYTES_SCHEMA;
keySchema = useOptionalPrimitives ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
} else {
key = sourceRecord.getKey().orElse(null);
keySchema = Schema.STRING_SCHEMA;
keySchema = useOptionalPrimitives ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
}
valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema());
valueSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(sourceRecord.getSchema(), useOptionalPrimitives);
value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema);
}

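For context on why forcing optionality matters in the sink above: Kafka Connect's validation rejects a null key or value checked against a required schema, which is exactly what a Pulsar record with an absent key or a tombstone value produces. A minimal sketch against the stock Kafka Connect API (not part of this change; class and method names here are the standard connect-api ones):

import org.apache.kafka.connect.data.ConnectSchema;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;

public class RequiredVsOptionalSketch {
    public static void main(String[] args) {
        // An optional schema accepts null (an absent key or a tombstone value).
        ConnectSchema.validateValue(Schema.OPTIONAL_STRING_SCHEMA, null);

        // A required schema rejects null with a DataException.
        try {
            ConnectSchema.validateValue(Schema.STRING_SCHEMA, null);
        } catch (DataException expected) {
            System.out.println("required schema rejected null: " + expected.getMessage());
        }
    }
}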
@@ -99,6 +99,12 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
help = "Supply kafka record with topic name without -partition- suffix for partitioned topics.")
private boolean collapsePartitionedTopics = false;

@FieldDoc(
defaultValue = "false",
help = "Pulsar schema does not contain information whether the Schema is optional, Kafka's does. \n"
+ "This provides a way to force all primitive schemas to be optional for Kafka. \n")
private boolean useOptionalPrimitives = false;

public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);
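A quick way to exercise the new flag end to end through the YAML loader shown above. The package name org.apache.pulsar.io.kafka.connect and the Lombok-generated getter are assumptions based on the connector's layout; the config file contents are illustrative:

import java.nio.file.Files;
import java.nio.file.Path;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaConnectSinkConfig;

public class LoadConfigSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical sink config; only the new key matters here.
        Path yaml = Files.createTempFile("kafka-connect-sink", ".yaml");
        Files.writeString(yaml, "useOptionalPrimitives: true\n");

        PulsarKafkaConnectSinkConfig cfg = PulsarKafkaConnectSinkConfig.load(yaml.toString());
        System.out.println(cfg.isUseOptionalPrimitives());  // expected: true
    }
}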
@@ -115,11 +115,15 @@ public Schema schema() {
}

private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
private static final ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToOptionalKafkaSchema;
private static final ImmutableSet<String> kafkaLogicalSchemas;
private static final AvroData avroData = new AvroData(1000);
private static final Cache<byte[], Schema> schemaCache =
CacheBuilder.newBuilder().maximumSize(10000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();
private static final Cache<Schema, Schema> optionalSchemaCache =
CacheBuilder.newBuilder().maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES).build();

static {
pulsarSchemaTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
@@ -134,6 +138,17 @@ public Schema schema() {
.put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
.put(SchemaType.DATE, Date.SCHEMA)
.build();
pulsarSchemaTypeToOptionalKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
.put(SchemaType.BOOLEAN, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.put(SchemaType.INT8, Schema.OPTIONAL_INT8_SCHEMA)
.put(SchemaType.INT16, Schema.OPTIONAL_INT16_SCHEMA)
.put(SchemaType.INT32, Schema.OPTIONAL_INT32_SCHEMA)
.put(SchemaType.INT64, Schema.OPTIONAL_INT64_SCHEMA)
.put(SchemaType.FLOAT, Schema.OPTIONAL_FLOAT32_SCHEMA)
.put(SchemaType.DOUBLE, Schema.OPTIONAL_FLOAT64_SCHEMA)
.put(SchemaType.STRING, Schema.OPTIONAL_STRING_SCHEMA)
.put(SchemaType.BYTES, Schema.OPTIONAL_BYTES_SCHEMA)
.build();
kafkaLogicalSchemas = ImmutableSet.<String>builder()
.add(Timestamp.LOGICAL_NAME)
.add(Date.LOGICAL_NAME)
@@ -153,12 +168,33 @@ private static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
return parser.parse(schemaJson);
}

public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
Schema s = getKafkaConnectSchema(pulsarSchema);
return new OptionalForcingSchema(s);
public static Schema makeOptional(Schema s) {
if (s == null || s.isOptional()) {
return s;
}

String logicalSchemaName = s.name();
if (kafkaLogicalSchemas.contains(logicalSchemaName)) {
return s;
}

try {
return optionalSchemaCache.get(s, () -> new OptionalForcingSchema(s));
} catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
String msg = "Failed to create optional schema for " + s;
log.error(msg);
throw new IllegalStateException(msg, ee);
}
}

public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
public static Schema getOptionalKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
boolean useOptionalPrimitives) {
return makeOptional(getKafkaConnectSchema(pulsarSchema, useOptionalPrimitives));

}

public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
boolean useOptionalPrimitives) {
if (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null) {
throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
}
@@ -191,6 +227,11 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
throw new IllegalStateException("Unsupported Kafka Logical Schema " + logicalSchemaName);
}

if (useOptionalPrimitives
&& pulsarSchemaTypeToOptionalKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
return pulsarSchemaTypeToOptionalKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
}

if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
}
@@ -199,8 +240,10 @@ public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
getOptionalKafkaConnectSchema(kvSchema.getValueSchema()))
return SchemaBuilder.map(
makeOptional(getKafkaConnectSchema(kvSchema.getKeySchema(), useOptionalPrimitives)),
makeOptional(getKafkaConnectSchema(kvSchema.getValueSchema(), useOptionalPrimitives)))
.optional()
.build();
}
org.apache.avro.Schema avroSchema = parseAvroSchema(
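Putting the pieces above together: primitives now switch between the required and optional schema variants based on the flag, while Kafka logical types (Timestamp, Date, Time, Decimal) are never wrapped. A small sketch, with class names taken from the diff; the package org.apache.pulsar.io.kafka.connect.schema and the harness around it are assumed:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;

public class OptionalMappingSketch {
    public static void main(String[] args) {
        // The flag selects between the required and optional primitive mappings.
        Schema required = PulsarSchemaToKafkaSchema
                .getKafkaConnectSchema(org.apache.pulsar.client.api.Schema.STRING, false);
        Schema optional = PulsarSchemaToKafkaSchema
                .getKafkaConnectSchema(org.apache.pulsar.client.api.Schema.STRING, true);
        System.out.println(required.isOptional());  // false (Schema.STRING_SCHEMA)
        System.out.println(optional.isOptional());  // true  (Schema.OPTIONAL_STRING_SCHEMA)

        // makeOptional passes Kafka logical types through unchanged, per the
        // kafkaLogicalSchemas check in the diff above.
        System.out.println(PulsarSchemaToKafkaSchema.makeOptional(Timestamp.SCHEMA)
                == Timestamp.SCHEMA);  // true
    }
}

Caching the OptionalForcingSchema wrappers in optionalSchemaCache means per-record conversion does not allocate a fresh wrapper for every message that carries the same underlying schema.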
@@ -162,7 +162,7 @@ public T answer(InvocationOnMock invocationOnMock) throws Throwable {
}
}

private String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";
private final String offsetTopicName = "persistent://my-property/my-ns/kafka-connect-sink-offset";

private Path file;
private Map<String, Object> props;
@@ -797,7 +797,9 @@ public void kafkaLogicalTypesTimestampTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

java.util.Date date = getDateFromString("12/30/1999 11:12:13");
Object connectData = KafkaConnectData
@@ -815,7 +817,9 @@ public void kafkaLogicalTypesTimeTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

java.util.Date date = getDateFromString("01/01/1970 11:12:13");
Object connectData = KafkaConnectData
@@ -833,7 +837,9 @@ public void kafkaLogicalTypesDateTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

java.util.Date date = getDateFromString("12/31/2022 00:00:00");
Object connectData = KafkaConnectData
@@ -854,7 +860,9 @@ public void kafkaLogicalTypesDecimalTest() {
.build());

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
.getKafkaConnectSchema(schema, true);

Assert.assertFalse(kafkaSchema.isOptional());

Object connectData = KafkaConnectData
.getKafkaConnectData(Decimal.fromLogical(kafkaSchema, BigDecimal.valueOf(100L, 10)), kafkaSchema);
@@ -874,11 +882,11 @@ public void connectDataComplexAvroSchemaGenericRecordTest() {
getGenericRecord(value, pulsarAvroSchema));

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema));
.getKafkaConnectSchema(Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema), false);

Object connectData = KafkaConnectData.getKafkaConnectData(kv, kafkaSchema);

org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
Assert.assertTrue(kafkaSchema.isOptional());
Assert.assertTrue(kafkaSchema.keySchema().isOptional());
Assert.assertTrue(kafkaSchema.valueSchema().isOptional());
}

@Test
@@ -990,7 +998,8 @@ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchema pulsarAvroSchema) {
Object value = pojoAsAvroRecord(pojo, pulsarAvroSchema);

org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(pulsarAvroSchema);
.getKafkaConnectSchema(pulsarAvroSchema, false);
Assert.assertFalse(kafkaSchema.isOptional());

Object connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);

@@ -999,6 +1008,18 @@ private void testPojoAsAvroAndJsonConversionToConnectData(Object pojo, AvroSchema pulsarAvroSchema) {
Object jsonNode = pojoAsJsonNode(pojo);
connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);

kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(pulsarAvroSchema, true);
Assert.assertFalse(kafkaSchema.isOptional());

connectData = KafkaConnectData.getKafkaConnectData(value, kafkaSchema);

org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);

jsonNode = pojoAsJsonNode(pojo);
connectData = KafkaConnectData.getKafkaConnectData(jsonNode, kafkaSchema);
org.apache.kafka.connect.data.ConnectSchema.validateValue(kafkaSchema, connectData);
}

private JsonNode pojoAsJsonNode(Object pojo) {