Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pulsar-io/kafka-connect-adaptor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
Expand Down Expand Up @@ -68,14 +70,15 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {

// pulsar io related variables
// Iterator over the records returned by the latest sourceTask.poll(); consumed by read().
private Iterator<SourceRecord> currentBatch = null;
// NOTE(review): both 'flushFuture' and 'flushFutureRef' appear below because this scrape
// interleaves the removed and added sides of a diff — each version of the file has only one.
private CompletableFuture<Void> flushFuture;
private OffsetBackingStore offsetStore;
private OffsetStorageReader offsetReader;
private String topicNamespace;
@Getter
public OffsetStorageWriter offsetWriter;
// number of outstandingRecords that have been polled but not been acked
private final AtomicInteger outstandingRecords = new AtomicInteger(0);
// Guards against concurrent offset flushes; set via compareAndSet in triggerOffsetsFlushIfNeeded().
private final AtomicBoolean flushing = new AtomicBoolean(false);
// Completed once the current batch's offsets have been flushed and the task committed.
private final AtomicReference<CompletableFuture<Void>> flushFutureRef = new AtomicReference<>();

public static final String CONNECTOR_CLASS = "kafkaConnectorSourceClass";

Expand Down Expand Up @@ -162,38 +165,103 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
sourceTask.initialize(sourceTaskContext);
sourceTask.start(taskConfig);
}
/**
 * Finalizes an offsets flush by completing {@code pendingFlush}.
 *
 * <p>If the flush itself failed, the in-flight flush is cancelled and the future is
 * failed with the cause attached. Otherwise the source task is asked to commit; an
 * {@link InterruptedException} re-asserts the thread's interrupt flag before the
 * future is failed, and any other throwable likewise cancels the flush.
 */
private void onOffsetsFlushed(Throwable flushError, CompletableFuture<Void> pendingFlush) {
    // Flush-level failure: nothing was persisted, so abandon it and surface the cause.
    if (flushError != null) {
        log.error("Failed to flush offsets to storage: ", flushError);
        offsetWriter.cancelFlush();
        pendingFlush.completeExceptionally(new Exception("No Offsets Added Error", flushError));
        return;
    }
    try {
        sourceTask.commit();
        if (log.isDebugEnabled()) {
            log.debug("Finished flushing offsets to storage");
        }
        pendingFlush.complete(null);
    } catch (Throwable commitFailure) {
        // SourceTask.commit() may throw InterruptedException or unchecked Kafka exceptions;
        // either way the flush must be cancelled and the future failed with the cause.
        if (commitFailure instanceof InterruptedException) {
            Thread.currentThread().interrupt();
            log.warn("Flush interrupted, cancelling", commitFailure);
        } else {
            log.warn("Flush failed, cancelling", commitFailure);
        }
        offsetWriter.cancelFlush();
        pendingFlush.completeExceptionally(new Exception("Failed to commit offsets", commitFailure));
    }
}

/**
 * Starts an offsets flush once every record of the in-flight batch has been acked.
 *
 * <p>Uses {@code flushing} as a CAS guard so only one caller runs a flush at a time.
 * The batch's future is snapshotted up-front so a concurrent batch swap in read()
 * cannot make this flush complete the wrong future.
 */
private void triggerOffsetsFlushIfNeeded() {
final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get();
// Only flush when we have a batch in flight, nothing outstanding, and a pending future
if (snapshotFlushFuture == null || snapshotFlushFuture.isDone() || outstandingRecords.get() != 0) {
return;
}
if (!flushing.compareAndSet(false, true)) {
return; // someone else is flushing
}
try {
// beginFlush() returns true when there are offsets to write; doFlush's callback
// completes the snapshot future and always releases the 'flushing' guard.
if (offsetWriter.beginFlush()) {
offsetWriter.doFlush((error, ignored) -> {
try {
onOffsetsFlushed(error, snapshotFlushFuture);
} finally {
flushing.set(false);
}
});
} else {
// Nothing staged to write: still run the success path (task commit + future completion).
try {
onOffsetsFlushed(null, snapshotFlushFuture);
} finally {
flushing.set(false);
}
}
} catch (ConnectException alreadyFlushing) {
// Another thread initiated the flush; let their callback complete the future.
// Keep 'flushing' = true until read() finalizes the batch.
} catch (Exception t) {
// Unexpected failure while starting the flush: fail the future via the common path.
try {
onOffsetsFlushed(t, snapshotFlushFuture);
} finally {
flushing.set(false);
}
}
}

// NOTE(review): this span is a diff-view scrape that interleaves the removed (flushFuture
// field) and added (flushFutureRef) implementations, and the method's closing braces fall
// beyond the collapsed region below — it is not compilable as shown. Comments only here.
@Override
public synchronized Record<T> read() throws Exception {
while (true) {
// No batch in flight: poll the Kafka source task for the next batch of records.
if (currentBatch == null) {
// NOTE(review): removed-side line retained by the scrape; superseded by flushFutureRef below.
flushFuture = new CompletableFuture<>();
List<SourceRecord> recordList = sourceTask.poll();
if (recordList == null || recordList.isEmpty()) {
continue;
}
outstandingRecords.addAndGet(recordList.size());
currentBatch = recordList.iterator();

// Publish a fresh future for this batch; acks complete it once offsets are flushed.
final CompletableFuture<Void> newFuture = new CompletableFuture<>();
flushFutureRef.set(newFuture);
}
if (currentBatch.hasNext()) {
AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next());
// Skip empty records, but still count them against the batch so the flush can trigger.
if (processRecord == null || processRecord.isEmpty()) {
outstandingRecords.decrementAndGet();
// NOTE(review): 'continue' without a semicolon and the call below are interleaved
// diff sides — the merged code presumably flushes, then continues; verify upstream.
continue
triggerOffsetsFlushIfNeeded();
} else {
return processRecord;
}
} else {
// Batch exhausted: wait for the sink to write everything and for the offsets to be
// committed before starting the next poll cycle.
final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get();
try {
// NOTE(review): removed-side line; the flushFutureRef snapshot below replaces it.
flushFuture.get();
if (snapshotFlushFuture != null) {
snapshotFlushFuture.get();
}
} catch (ExecutionException ex) {
// log the error, continue execution
log.error("execution exception while get flushFuture", ex);
throw new Exception("Flush failed", ex.getCause());
} finally {
flushFuture = null;
flushing.set(false);
// Clear only if this is still the current batch future
flushFutureRef.compareAndSet(snapshotFlushFuture, null);
currentBatch = null;
}
}
Expand Down Expand Up @@ -272,62 +340,19 @@ public boolean isEmpty() {
return this.value == null;
}

/**
 * Callback for {@code OffsetStorageWriter.doFlush()}: finalizes the current batch by
 * committing the source task's offsets, or by cancelling the in-flight flush on failure.
 *
 * @param error non-null when the offset flush itself failed
 * @param result unused (flush produces no value)
 */
private void completedFlushOffset(Throwable error, Void result) {
    if (error != null) {
        log.error("Failed to flush offsets to storage: ", error);
        currentBatch = null;
        offsetWriter.cancelFlush();
        // Fix: chain 'error' as the cause so flushFuture.get() callers see why the flush failed
        // (previously the cause was silently dropped).
        flushFuture.completeExceptionally(new Exception("No Offsets Added Error", error));
    } else {
        try {
            sourceTask.commit();

            log.info("Finished flushing offsets to storage");
            currentBatch = null;
            flushFuture.complete(null);
        } catch (InterruptedException exception) {
            // Fix: pass the exception as the last logger argument so the stack trace is logged.
            log.warn("Flush of {} offsets interrupted, cancelling", this, exception);
            // Re-assert the interrupt flag per InterruptedException convention.
            Thread.currentThread().interrupt();
            offsetWriter.cancelFlush();
            flushFuture.completeExceptionally(new Exception("Failed to commit offsets", exception));
        } catch (Throwable t) {
            // SourceTask can throw unchecked ConnectException/KafkaException.
            // Make sure the future is cancelled in that case
            log.warn("Flush of {} offsets failed, cancelling", this, t);
            offsetWriter.cancelFlush();
            flushFuture.completeExceptionally(new Exception("Failed to commit offsets", t));
        }
    }
}

// NOTE(review): the body below interleaves the removed (flushFuture/beginFlush inline) and
// added (triggerOffsetsFlushIfNeeded) implementations from a diff view. As literal Java it
// does not compile: outstandingRecords would be decremented twice per ack and the method's
// closing brace is missing. Comments only; verify against the merged file.
@Override
public void ack() {
// TODO: should flush for each batch. not wait for a time for acked all.
// How to handle order between each batch. QueueList<pair<batch, automicInt>>. check if head is all acked.
boolean canFlush = (outstandingRecords.decrementAndGet() == 0);

// consumed all the records, flush the offsets
if (canFlush && flushFuture != null) {
if (!offsetWriter.beginFlush()) {
log.error("When beginFlush, No offsets to commit!");
flushFuture.completeExceptionally(new Exception("No Offsets Added Error when beginFlush"));
return;
}

Future<Void> doFlush = offsetWriter.doFlush(this::completedFlushOffset);
if (doFlush == null) {
// Offsets added in processSourceRecord, But here no offsets to commit
log.error("No offsets to commit!");
flushFuture.completeExceptionally(new Exception("No Offsets Added Error"));
return;
}
// Decrement and let the centralized flusher decide if we should flush now
if (outstandingRecords.decrementAndGet() == 0) {
triggerOffsetsFlushIfNeeded();
}
}

// Fails the current batch: completes the batch's flush future exceptionally so read()
// aborts instead of committing offsets.
// NOTE(review): the flushFuture lines are the removed diff side retained by this scrape;
// the flushFutureRef snapshot below is the added side — only one exists in each version.
@Override
public void fail() {
if (flushFuture != null) {
flushFuture.completeExceptionally(new Exception("Sink Error"));
final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get();
if (snapshotFlushFuture != null) {
snapshotFlushFuture.completeExceptionally(new Exception("Sink Error"));
}
}
}
Expand Down
Loading
Loading