diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index 1a2b0607dd1e..ab5159be125a 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.UUID;
+import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.TimestampAssigner;
@@ -43,10 +44,12 @@
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.operator.LockRemover;
+import org.apache.iceberg.flink.maintenance.operator.LockRemoverOperatorFactory;
 import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
 import org.apache.iceberg.flink.maintenance.operator.TableChange;
 import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
 import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
+import org.apache.iceberg.flink.maintenance.operator.TriggerManagerOperatorFactory;
 import org.apache.iceberg.flink.sink.IcebergSink;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -71,39 +74,77 @@ private TableMaintenance() {}
    *
    * @param changeStream the table changes
    * @param tableLoader used for accessing the table
-   * @param lockFactory used for preventing concurrent task runs
+   * @param lockFactory used for preventing concurrent task runs; if null, the coordination lock
+   *     is used
    * @return builder for the maintenance stream
+   * @deprecated since 1.12.0, will be removed in 2.0.0. Use {@link #forChangeStream(DataStream,
+   *     TableLoader)} instead.
    */
+  @Deprecated
   @Internal
   public static Builder forChangeStream(
       DataStream<TableChange> changeStream,
       TableLoader tableLoader,
-      TriggerLockFactory lockFactory) {
+      @Nullable TriggerLockFactory lockFactory) {
     Preconditions.checkNotNull(changeStream, "The change stream should not be null");
     Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
-    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
 
     return new Builder(null, changeStream, tableLoader, lockFactory);
   }
 
+  /**
+   * Use when the change stream is already provided, like in the {@link
+   * IcebergSink#addPostCommitTopology(DataStream)}. Uses the coordination lock.
+   *
+   * @param changeStream the table changes
+   * @param tableLoader used for accessing the table
+   * @return builder for the maintenance stream
+   */
+  @Internal
+  public static Builder forChangeStream(
+      DataStream<TableChange> changeStream, TableLoader tableLoader) {
+    Preconditions.checkNotNull(changeStream, "The change stream should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+
+    return new Builder(null, changeStream, tableLoader, null);
+  }
+
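For context, a minimal usage sketch of the new coordination-lock entry point (the builder calls mirror the testE2eUseCoordinator test added further down; the table location, uid suffix, and the Duration/StreamExecutionEnvironment imports are placeholders/assumptions, not part of this patch):

    // Sketch only: a standalone maintenance job without a TriggerLockFactory; the
    // TableMaintenanceCoordinator introduced in this patch handles the locking.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl"); // placeholder
    TableMaintenance.forTable(env, tableLoader)
        .uidSuffix("maintenance-uid") // placeholder
        .rateLimit(Duration.ofMinutes(10))
        .lockCheckDelay(Duration.ofSeconds(10))
        .add(
            ExpireSnapshots.builder()
                .scheduleOnCommitCount(10)
                .maxSnapshotAge(Duration.ofMinutes(10))
                .retainLast(5))
        .append();
    env.executeAsync();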
  /**
   * Use this for standalone maintenance job. It creates a monitor source that detect table changes
   * and build the maintenance pipelines afterwards.
   *
   * @param env used to register the monitor source
   * @param tableLoader used for accessing the table
-   * @param lockFactory used for preventing concurrent task runs
+   * @param lockFactory used for preventing concurrent task runs. If null, the coordination lock
+   *     is used.
   * @return builder for the maintenance stream
+   * @deprecated since 1.12.0, will be removed in 2.0.0. Use {@link
+   *     #forTable(StreamExecutionEnvironment, TableLoader)} instead.
   */
+  @Deprecated
  public static Builder forTable(
-      StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
+      StreamExecutionEnvironment env,
+      TableLoader tableLoader,
+      @Nullable TriggerLockFactory lockFactory) {
    Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
-    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
 
    return new Builder(env, null, tableLoader, lockFactory);
  }
 
+  /**
+   * Use this for a standalone maintenance job. It creates a monitor source that detects table
+   * changes and builds the maintenance pipelines afterwards. Uses the coordination lock by
+   * default.
+   *
+   * @param env used to register the monitor source
+   * @param tableLoader used for accessing the table
+   * @return builder for the maintenance stream
+   */
+  public static Builder forTable(StreamExecutionEnvironment env, TableLoader tableLoader) {
+    Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+
+    return new Builder(env, null, tableLoader, null);
+  }
+
  public static class Builder {
    private final StreamExecutionEnvironment env;
    private final DataStream<TableChange> inputStream;
@@ -226,21 +267,43 @@ public void append() throws IOException {
     try (TableLoader loader = tableLoader.clone()) {
       loader.open();
       String tableName = loader.loadTable().name();
-      DataStream<Trigger> triggers =
-          DataStreamUtils.reinterpretAsKeyedStream(
-                  changeStream(tableName, loader), unused -> true)
-              .process(
-                  new TriggerManager(
-                      loader,
-                      lockFactory,
-                      taskNames,
-                      evaluators,
-                      rateLimit.toMillis(),
-                      lockCheckDelay.toMillis()))
-              .name(TRIGGER_MANAGER_OPERATOR_NAME)
-              .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
-              .slotSharingGroup(slotSharingGroup)
-              .forceNonParallel()
+      DataStream<Trigger> triggers;
+      if (lockFactory == null) {
+        triggers =
+            DataStreamUtils.reinterpretAsKeyedStream(
+                    changeStream(tableName, loader), unused -> true)
+                .transform(
+                    TRIGGER_MANAGER_OPERATOR_NAME,
+                    TypeInformation.of(Trigger.class),
+                    new TriggerManagerOperatorFactory(
+                        tableName,
+                        taskNames,
+                        evaluators,
+                        rateLimit.toMillis(),
+                        lockCheckDelay.toMillis()))
+                .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+                .slotSharingGroup(slotSharingGroup)
+                .forceNonParallel();
+      } else {
+        triggers =
+            DataStreamUtils.reinterpretAsKeyedStream(
+                    changeStream(tableName, loader), unused -> true)
+                .process(
+                    new TriggerManager(
+                        loader,
+                        lockFactory,
+                        taskNames,
+                        evaluators,
+                        rateLimit.toMillis(),
+                        lockCheckDelay.toMillis()))
+                .name(TRIGGER_MANAGER_OPERATOR_NAME)
+                .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+                .slotSharingGroup(slotSharingGroup)
+                .forceNonParallel();
+      }
+
+      triggers =
+          triggers
              .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
              .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
              .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
@@ -277,14 +340,25 @@ public void append() throws IOException {
       }
 
       // Add the LockRemover to the end
-      unioned
-          .transform(
-              LOCK_REMOVER_OPERATOR_NAME,
-              TypeInformation.of(Void.class),
-              new LockRemover(tableName, lockFactory, taskNames))
-          .forceNonParallel()
-          .uid("lock-remover-" + uidSuffix)
-          .slotSharingGroup(slotSharingGroup);
+      if (lockFactory ==
null) { + unioned + .transform( + LOCK_REMOVER_OPERATOR_NAME, + TypeInformation.of(Void.class), + new LockRemoverOperatorFactory(tableName, taskNames)) + .uid("lock-remover-" + uidSuffix) + .forceNonParallel() + .slotSharingGroup(slotSharingGroup); + } else { + unioned + .transform( + LOCK_REMOVER_OPERATOR_NAME, + TypeInformation.of(Void.class), + new LockRemover(tableName, lockFactory, taskNames)) + .forceNonParallel() + .uid("lock-remover-" + uidSuffix) + .slotSharingGroup(slotSharingGroup); + } } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockAcquireResultEvent.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockAcquireResultEvent.java new file mode 100644 index 000000000000..e23a1d708e51 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockAcquireResultEvent.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; + +/** + * Event sent from TriggerManager operator to TableMaintenanceCoordinator to confirm that the lock + * has been acquired and the maintenance task is starting. + */ +public class LockAcquireResultEvent implements OperatorEvent { + + private final boolean isLock; + private final String lockId; + private final long timestamp; + + public LockAcquireResultEvent(boolean isLock, String lockId, long timestamp) { + this.isLock = isLock; + this.lockId = lockId; + this.timestamp = timestamp; + } + + public String lockId() { + return lockId; + } + + public boolean isLock() { + return isLock; + } + + public long timestamp() { + return timestamp; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("lockId", lockId) + .add("isLockHeld", isLock) + .add("timestamp", timestamp) + .toString(); + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRegisterEvent.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRegisterEvent.java new file mode 100644 index 000000000000..5608d7a9a70a --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRegisterEvent.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Objects;
+
+/**
+ * Event sent from the TriggerManager operator to the TableMaintenanceCoordinator to register the
+ * lock id, so that lock release notifications can be routed back to the operator.
+ */
+public class LockRegisterEvent implements OperatorEvent {
+
+  private final String lockId;
+  private final long timestamp;
+
+  public LockRegisterEvent(String lockId, long timestamp) {
+    this.lockId = lockId;
+    this.timestamp = timestamp;
+  }
+
+  public String lockId() {
+    return lockId;
+  }
+
+  public long timestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (!(o instanceof LockRegisterEvent)) {
+      return false;
+    }
+
+    LockRegisterEvent that = (LockRegisterEvent) o;
+    return Objects.equal(timestamp, that.timestamp) && Objects.equal(lockId, that.lockId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(timestamp, lockId);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("timestamp", timestamp)
+        .add("lockId", lockId)
+        .toString();
+  }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockReleasedEvent.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockReleasedEvent.java
new file mode 100644
index 000000000000..1ad1ae6c0e21
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockReleasedEvent.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/**
+ * Event sent from the TableMaintenanceCoordinator to the TriggerManager operator to notify that a
+ * lock has been released (a task finished or a watermark arrived).
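+ * <p>The same event type also travels in the opposite direction: the LockRemover operator reports
+ * task results and watermarks to the coordinator with this event, and the coordinator then
+ * forwards it to the registered TriggerManager operator.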
+ */ +public class LockReleasedEvent implements OperatorEvent { + + private final String lockId; + private final long timestamp; + + public LockReleasedEvent(String lockId, long timestamp) { + this.lockId = lockId; + this.timestamp = timestamp; + } + + public long timestamp() { + return timestamp; + } + + public String lockId() { + return lockId; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("lockId", lockId) + .add("timestamp", timestamp) + .toString(); + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperator.java new file mode 100644 index 000000000000..7e5e0e07cf2c --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.iceberg.flink.maintenance.api.TaskResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Experimental +@Internal +public class LockRemoverOperator extends AbstractStreamOperator + implements OneInputStreamOperator, OperatorEventHandler { + private static final Logger LOG = LoggerFactory.getLogger(LockRemoverOperator.class); + + private static final long serialVersionUID = 1L; + + private final String tableName; + private final OperatorEventGateway operatorEventGateway; + private final List maintenanceTaskNames; + private transient List succeededTaskResultCounters; + private transient List failedTaskResultCounters; + private transient List taskLastRunDurationMs; + + LockRemoverOperator( + StreamOperatorParameters parameters, + OperatorEventGateway operatorEventGateway, + String tableName, + List 
maintenanceTaskNames) { + super(parameters); + this.tableName = tableName; + this.operatorEventGateway = operatorEventGateway; + this.maintenanceTaskNames = maintenanceTaskNames; + } + + @Override + public void open() throws Exception { + this.succeededTaskResultCounters = + Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); + this.failedTaskResultCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); + this.taskLastRunDurationMs = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); + for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) { + MetricGroup taskMetricGroup = + TableMaintenanceMetrics.groupFor( + getRuntimeContext(), tableName, maintenanceTaskNames.get(taskIndex), taskIndex); + succeededTaskResultCounters.add( + taskMetricGroup.counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER)); + failedTaskResultCounters.add( + taskMetricGroup.counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER)); + AtomicLong duration = new AtomicLong(0); + taskLastRunDurationMs.add(duration); + taskMetricGroup.gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, duration::get); + } + } + + @Override + public void handleOperatorEvent(OperatorEvent event) { + // no incoming events + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void processElement(StreamRecord streamRecord) { + TaskResult taskResult = streamRecord.getValue(); + LOG.info( + "Processing result {} for task {}", + taskResult, + maintenanceTaskNames.get(taskResult.taskIndex())); + long duration = System.currentTimeMillis() - taskResult.startEpoch(); + operatorEventGateway.sendEventToCoordinator( + new LockReleasedEvent(tableName, streamRecord.getTimestamp())); + + // Update the metrics + taskLastRunDurationMs.get(taskResult.taskIndex()).set(duration); + if (taskResult.success()) { + succeededTaskResultCounters.get(taskResult.taskIndex()).inc(); + } else { + failedTaskResultCounters.get(taskResult.taskIndex()).inc(); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + operatorEventGateway.sendEventToCoordinator( + new LockReleasedEvent(tableName, mark.getTimestamp())); + super.processWatermark(mark); + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperatorFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperatorFactory.java new file mode 100644 index 000000000000..1428994b7377 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperatorFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.iceberg.flink.maintenance.api.TaskResult; + +@Internal +public class LockRemoverOperatorFactory extends AbstractStreamOperatorFactory + implements CoordinatedOperatorFactory, OneInputStreamOperatorFactory { + private final String tableName; + private final List maintenanceTaskNames; + + public LockRemoverOperatorFactory(String tableName, List maintenanceTaskNames) { + this.tableName = tableName; + this.maintenanceTaskNames = maintenanceTaskNames; + } + + @Override + public OperatorCoordinator.Provider getCoordinatorProvider( + String operatorName, OperatorID operatorID) { + return new TableMaintenanceCoordinatorProvider(operatorName, operatorID); + } + + @SuppressWarnings("unchecked") + @Override + public > T createStreamOperator( + StreamOperatorParameters parameters) { + OperatorID operatorId = parameters.getStreamConfig().getOperatorID(); + OperatorEventGateway gateway = + parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId); + LockRemoverOperator lockRemoverOperator = + new LockRemoverOperator(parameters, gateway, tableName, maintenanceTaskNames); + parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, lockRemoverOperator); + + return (T) lockRemoverOperator; + } + + @SuppressWarnings("rawtypes") + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return LockRemoverOperator.class; + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceCoordinator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceCoordinator.java new file mode 100644 index 000000000000..31e0ef1f3bac --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceCoordinator.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.function.Consumer; +import javax.annotation.Nonnull; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FatalExitExceptionHandler; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Experimental +@Internal +public class TableMaintenanceCoordinator implements OperatorCoordinator { + + private static final Logger LOG = LoggerFactory.getLogger(TableMaintenanceCoordinator.class); + + private final String operatorName; + private final Context context; + + private final ExecutorService coordinatorExecutor; + private final CoordinatorExecutorThreadFactory coordinatorThreadFactory; + private transient boolean started; + private final transient SubtaskGateways subtaskGateways; + private static final Map> LOCK_RELEASE_CONSUMERS = + Maps.newConcurrentMap(); + + public TableMaintenanceCoordinator(String operatorName, Context context) { + this.operatorName = operatorName; + this.context = context; + + this.coordinatorThreadFactory = + new CoordinatorExecutorThreadFactory( + "TableMaintenanceCoordinator-" + operatorName, context.getUserCodeClassloader()); + this.coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory); + this.subtaskGateways = new SubtaskGateways(operatorName, context.currentParallelism()); + LOG.info("Created TableMaintenanceCoordinator: {}", operatorName); + } + + @Override + public void start() throws Exception { + LOG.info("Starting TableMaintenanceCoordinator: {}", operatorName); + this.started = true; + } + + @Override + public void close() throws Exception { + coordinatorExecutor.shutdown(); + this.started = false; + LOG.info("Closed TableMaintenanceCoordinator: {}", operatorName); + LOCK_RELEASE_CONSUMERS.clear(); + } + + @Override + public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) { + runInCoordinatorThread( + () -> { + LOG.debug( + "Handling event from subtask {} (#{}) of {}: {}", + subtask, + attemptNumber, + operatorName, + event); + if (event instanceof LockRegisterEvent) { + registerTriggerManagerReceiveReleaseEvent((LockRegisterEvent) event); + } else if (event instanceof LockReleasedEvent) { + handleReleaseLock((LockReleasedEvent) event); + } else { + throw new IllegalArgumentException( + "Invalid operator event type: " + event.getClass().getCanonicalName()); + } + }, + String.format( + Locale.ROOT, + "handling operator event %s from subtask %d (#%d)", + event.getClass(), + subtask, + attemptNumber)); + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void registerTriggerManagerReceiveReleaseEvent(LockRegisterEvent lockRegisterEvent) { + 
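+    // Remember a per-lock-id callback; handleReleaseLock() invokes it later to forward the
+    // release event to subtask 0 (the TriggerManager operator always runs with parallelism 1).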
LOCK_RELEASE_CONSUMERS.put(
+        lockRegisterEvent.lockId(),
+        lock -> {
+          LOG.info(
+              "Sending release event for lock id {}, timestamp: {} to operator {}",
+              lock.lockId(),
+              lock.timestamp(),
+              operatorName);
+          this.subtaskGateways.getSubtaskGateway(0).sendEvent(lock);
+        });
+  }
+
+  /** Forward the release event to the consumer registered for this lock id, if any. */
+  private void handleReleaseLock(LockReleasedEvent lockReleasedEvent) {
+    if (LOCK_RELEASE_CONSUMERS.containsKey(lockReleasedEvent.lockId())) {
+      LOCK_RELEASE_CONSUMERS.get(lockReleasedEvent.lockId()).accept(lockReleasedEvent);
+      LOG.info(
+          "Forwarded release event for lock id {}, timestamp: {}",
+          lockReleasedEvent.lockId(),
+          lockReleasedEvent.timestamp());
+    } else {
+      LOG.info(
+          "No consumer for lock id {}, timestamp: {}",
+          lockReleasedEvent.lockId(),
+          lockReleasedEvent.timestamp());
+    }
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
+    // We don't need to track how many locks are currently held: when recovering from state, a
+    // recovery lock is issued to ensure all tasks finish running and then release all locks.
+    // The TriggerManagerOperator already keeps the TableChange state and related information,
+    // so there's no need to store additional state here.
+    runInCoordinatorThread(
+        () -> {
+          resultFuture.complete(new byte[0]);
+        },
+        String.format(Locale.ROOT, "taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {}
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info("Subtask {} is reset to checkpoint {}", subtask, checkpointId);
+          Preconditions.checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.reset(subtask);
+        },
+        String.format(
+            Locale.ROOT, "handling subtask %d recovery to checkpoint %d", subtask, checkpointId));
+  }
+
+  @Override
+  public void executionAttemptFailed(int subtask, int attemptNumber, Throwable reason) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Unregistering gateway after failure for subtask {} (#{}) of operator {}",
+              subtask,
+              attemptNumber,
+              operatorName);
+          Preconditions.checkState(
+              this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
+        },
+        String.format(Locale.ROOT, "handling subtask %d (#%d) failure", subtask, attemptNumber));
+  }
+
+  @Override
+  public void executionAttemptReady(int subtask, int attemptNumber, SubtaskGateway gateway) {
+    Preconditions.checkArgument(subtask == gateway.getSubtask());
+    Preconditions.checkArgument(attemptNumber == gateway.getExecution().getAttemptNumber());
+    runInCoordinatorThread(
+        () -> {
+          Preconditions.checkState(
+              this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.registerSubtaskGateway(gateway);
+        },
+        String.format(
+            Locale.ROOT,
+            "making event gateway to subtask %d (#%d) available",
+            subtask,
+            attemptNumber));
+  }
+
+  private void runInCoordinatorThread(Runnable runnable, String actionString) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            runnable.run();
+          } catch (Throwable t) {
+            LOG.error(
+                "Uncaught exception in TableMaintenanceCoordinator while {}: {}",
+                actionString,
+                t.getMessage(),
+                t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  @VisibleForTesting
+  void
callInCoordinatorThread(Callable callable, String errorMessage) { + ensureStarted(); + // Ensure the task is done by the coordinator executor. + if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) { + try { + Callable guardedCallable = + () -> { + try { + return callable.call(); + } catch (Throwable t) { + LOG.error( + "Uncaught Exception in table maintenance coordinator: {} executor", + operatorName, + t); + ExceptionUtils.rethrowException(t); + return null; + } + }; + + coordinatorExecutor.submit(guardedCallable).get(); + } catch (InterruptedException | ExecutionException e) { + throw new FlinkRuntimeException(errorMessage, e); + } + } else { + try { + callable.call(); + } catch (Throwable t) { + LOG.error( + "Uncaught Exception in table maintenance coordinator: {} executor", operatorName, t); + throw new FlinkRuntimeException(errorMessage, t); + } + } + } + + private void ensureStarted() { + Preconditions.checkState(started, "The coordinator has not started yet."); + } + + /** Inner class to manage subtask gateways. */ + private static class SubtaskGateways { + private final String operatorName; + private final Map[] gateways; + + @SuppressWarnings("unchecked") + private SubtaskGateways(String operatorName, int parallelism) { + this.operatorName = operatorName; + gateways = new Map[parallelism]; + + for (int i = 0; i < parallelism; ++i) { + gateways[i] = new java.util.HashMap<>(); + } + } + + private void registerSubtaskGateway(SubtaskGateway gateway) { + int subtaskIndex = gateway.getSubtask(); + int attemptNumber = gateway.getExecution().getAttemptNumber(); + Preconditions.checkState( + !gateways[subtaskIndex].containsKey(attemptNumber), + "Coordinator of %s already has a subtask gateway for %d (#%d)", + operatorName, + subtaskIndex, + attemptNumber); + LOG.debug( + "Coordinator of {} registers gateway for subtask {} attempt {}", + operatorName, + subtaskIndex, + attemptNumber); + gateways[subtaskIndex].put(attemptNumber, gateway); + LOG.debug("Registered gateway for subtask {} attempt {}", subtaskIndex, attemptNumber); + } + + private void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber) { + gateways[subtaskIndex].remove(attemptNumber); + LOG.debug("Unregistered gateway for subtask {} attempt {}", subtaskIndex, attemptNumber); + } + + private SubtaskGateway getSubtaskGateway(int subtaskIndex) { + Preconditions.checkState( + !gateways[subtaskIndex].isEmpty(), + "Coordinator subtask %d is not ready yet to receive events", + subtaskIndex); + return gateways[subtaskIndex].values().iterator().next(); + } + + private void reset(int subtaskIndex) { + gateways[subtaskIndex].clear(); + } + } + + /** Custom thread factory for the coordinator executor. 
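+   * It backs the single-threaded coordinator executor and records the thread it creates, so
+   * isCurrentThreadCoordinatorThread() is a simple reference comparison.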
*/ + private static class CoordinatorExecutorThreadFactory + implements ThreadFactory, Thread.UncaughtExceptionHandler { + + private final String coordinatorThreadName; + private final ClassLoader classLoader; + private final Thread.UncaughtExceptionHandler errorHandler; + + private Thread thread; + + CoordinatorExecutorThreadFactory(String coordinatorThreadName, ClassLoader contextClassLoader) { + this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE); + } + + CoordinatorExecutorThreadFactory( + String coordinatorThreadName, + ClassLoader contextClassLoader, + Thread.UncaughtExceptionHandler errorHandler) { + this.coordinatorThreadName = coordinatorThreadName; + this.classLoader = contextClassLoader; + this.errorHandler = errorHandler; + } + + @Override + public synchronized Thread newThread(@Nonnull Runnable runnable) { + thread = new Thread(runnable, coordinatorThreadName); + thread.setContextClassLoader(classLoader); + thread.setUncaughtExceptionHandler(this); + return thread; + } + + @Override + public synchronized void uncaughtException(Thread t, Throwable e) { + errorHandler.uncaughtException(t, e); + } + + boolean isCurrentThreadCoordinatorThread() { + return Thread.currentThread() == thread; + } + } + + @VisibleForTesting + Map> lockHeldMap() { + return LOCK_RELEASE_CONSUMERS; + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceCoordinatorProvider.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceCoordinatorProvider.java new file mode 100644 index 000000000000..da1b808b8eb7 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TableMaintenanceCoordinatorProvider.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+
+@Internal
+public class TableMaintenanceCoordinatorProvider
+    extends RecreateOnResetOperatorCoordinator.Provider {
+
+  private final String operatorName;
+
+  public TableMaintenanceCoordinatorProvider(String operatorName, OperatorID operatorID) {
+    super(operatorID);
+    this.operatorName = operatorName;
+  }
+
+  @Override
+  public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
+    return new TableMaintenanceCoordinator(operatorName, context);
+  }
+}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
index f1f2b51c0943..6d8326645d42 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
@@ -226,7 +226,8 @@ private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) {
     Integer taskToStart =
-        nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, startsFrom);
+        TriggerUtil.nextTrigger(
+            evaluators, accumulatedChanges, lastTriggerTimes, current, startsFrom);
@@ ... @@
-  private Integer nextTrigger(
-      List<TriggerEvaluator> evaluators,
-      List<TableChange> changes,
-      List<Long> lastTriggerTimes,
-      long currentTime,
-      int startPos) {
-    int current = startPos;
-    do {
-      if (evaluators
-          .get(current)
-          .check(changes.get(current), lastTriggerTimes.get(current), currentTime)) {
-        return current;
-      }
-
-      current = (current + 1) % evaluators.size();
-    } while (current != startPos);
-
-    return null;
-  }
-
   private void init(Collector<Trigger> out, TimerService timerService) throws Exception {
     if (!inited) {
       long current = timerService.currentProcessingTime();
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java
new file mode 100644
index 000000000000..678ca9196b5a
--- /dev/null
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TriggerManagerOperator tracks locally whether a maintenance task is currently running. When no
+ * task holds the lock, it fires a trigger and marks the lock as held; otherwise it schedules the
+ * next attempt. The downstream LockRemover reports task completion to the coordinator, which
+ * notifies this operator that the lock has been released. When the job recovers from a failure,
+ * tasks from different execution paths of the previous run may still be running. Therefore the
+ * operator first registers its lock id with the coordinator and then emits a recovery trigger;
+ * only after the downstream releases this lock can we be sure that all previous tasks have fully
+ * stopped.
+ */
+@Experimental
+@Internal
+public class TriggerManagerOperator extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<TableChange, Trigger>,
+        OperatorEventHandler,
+        ProcessingTimeCallback {
+  private static final Logger LOG = LoggerFactory.getLogger(TriggerManagerOperator.class);
+
+  private final List<String> maintenanceTaskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private transient Long nextEvaluationTime;
+  private final long minFireDelayMs;
+  private final OperatorEventGateway operatorEventGateway;
+  private transient List<TableChange> accumulatedChanges;
+  private transient ListState<Long> nextEvaluationTimeState;
+  private transient ListState<TableChange> accumulatedChangesState;
+  private transient ListState<Long> lastTriggerTimesState;
+  private transient Counter rateLimiterTriggeredCounter;
+  private transient Counter concurrentRunThrottledCounter;
+  private transient Counter nothingToTriggerCounter;
+  private transient List<Counter> triggerCounters;
+  private final long lockCheckDelayMs;
+  private transient List<Long> lastTriggerTimes;
+  // To keep the task scheduling fair we keep the last triggered task position in memory.
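+  // (With three tasks and startsFrom == 1 the check order is 1 -> 2 -> 0; see
+  // TriggerUtil.nextTrigger.)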
+ // If we find a task to trigger, then we run it, but after it is finished, we start from the given + // position to prevent "starvation" of the tasks. + // When there is nothing to trigger, we start from the beginning, as the order of the tasks might + // be important (RewriteDataFiles first, and then RewriteManifestFiles later) + private transient int startsFrom = 0; + private transient boolean triggered = false; + private final String tableName; + private transient boolean restoreTasks = false; + private transient boolean lockHeld = false; + + public TriggerManagerOperator( + StreamOperatorParameters parameters, + OperatorEventGateway operatorEventGateway, + List maintenanceTaskNames, + List evaluators, + long minFireDelayMs, + long lockCheckDelayMs, + String tableName) { + super(parameters); + Preconditions.checkArgument( + maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty(), + "Invalid maintenance task names: null or empty"); + Preconditions.checkArgument( + evaluators != null && !evaluators.isEmpty(), "Invalid evaluators: null or empty"); + Preconditions.checkArgument( + maintenanceTaskNames.size() == evaluators.size(), + "Provide a name and evaluator for all of the maintenance tasks"); + Preconditions.checkArgument(minFireDelayMs > 0, "Minimum fire delay should be at least 1."); + Preconditions.checkArgument( + lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 ms."); + + this.maintenanceTaskNames = maintenanceTaskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + this.tableName = tableName; + this.operatorEventGateway = operatorEventGateway; + } + + @Override + public void open() throws Exception { + super.open(); + MetricGroup mainGroup = TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName); + this.rateLimiterTriggeredCounter = + mainGroup.counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED); + this.concurrentRunThrottledCounter = + mainGroup.counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED); + this.nothingToTriggerCounter = mainGroup.counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER); + this.triggerCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size()); + for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) { + triggerCounters.add( + TableMaintenanceMetrics.groupFor( + mainGroup, maintenanceTaskNames.get(taskIndex), taskIndex) + .counter(TableMaintenanceMetrics.TRIGGERED)); + } + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + this.nextEvaluationTimeState = + context + .getOperatorStateStore() + .getListState(new ListStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG)); + + this.accumulatedChangesState = + context + .getOperatorStateStore() + .getListState( + new ListStateDescriptor<>( + "triggerManagerAccumulatedChange", TypeInformation.of(TableChange.class))); + + this.lastTriggerTimesState = + context + .getOperatorStateStore() + .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG)); + + long current = getProcessingTimeService().getCurrentProcessingTime(); + + // Initialize from state + if (!Iterables.isEmpty(nextEvaluationTimeState.get())) { + nextEvaluationTime = Iterables.getOnlyElement(nextEvaluationTimeState.get()); + } + + this.accumulatedChanges = Lists.newArrayList(accumulatedChangesState.get()); + this.lastTriggerTimes = Lists.newArrayList(lastTriggerTimesState.get()); 
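+    // Note: nextEvaluationTime stays null on a fresh start, so the first incoming TableChange
+    // is evaluated immediately in processElement() instead of waiting for a timer.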
+ + // Initialize if the state was empty + if (accumulatedChanges.isEmpty()) { + for (int i = 0; i < evaluators.size(); ++i) { + accumulatedChanges.add(TableChange.empty()); + lastTriggerTimes.add(current); + } + } + + // register the lock register event + operatorEventGateway.sendEventToCoordinator(new LockRegisterEvent(tableName, current)); + + if (context.isRestored()) { + // When the job state is restored, there could be ongoing tasks. + // To prevent collision with the new triggers the following is done: + // - add a recovery lock + // - fire a recovery trigger + // This ensures that the tasks of the previous trigger are executed, and the lock is removed + // in the end. The result of the 'tryLock' is ignored as an already existing lock prevents + // collisions as well. + // register the recover lock + restoreTasks = true; + lockHeld = false; + output.collect(new StreamRecord<>(Trigger.recovery(current), current)); + if (nextEvaluationTime == null) { + schedule(getProcessingTimeService(), current + minFireDelayMs); + } else { + schedule(getProcessingTimeService(), nextEvaluationTime); + } + } else { + restoreTasks = false; + lockHeld = false; + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + nextEvaluationTimeState.clear(); + if (nextEvaluationTime != null) { + nextEvaluationTimeState.add(nextEvaluationTime); + } + + accumulatedChangesState.update(accumulatedChanges); + lastTriggerTimesState.update(lastTriggerTimes); + LOG.info( + "Storing state: nextEvaluationTime {}, accumulatedChanges {}, lastTriggerTimes {}", + nextEvaluationTime, + accumulatedChanges, + lastTriggerTimes); + } + + @Override + public void handleOperatorEvent(OperatorEvent event) { + if (event instanceof LockReleasedEvent) { + LOG.info("Received lock released event: {}", event); + handleLockReleaseResult((LockReleasedEvent) event); + } else { + throw new IllegalArgumentException( + "Invalid operator event type: " + event.getClass().getCanonicalName()); + } + } + + @SuppressWarnings("FutureReturnValueIgnored") + @Override + public void processElement(StreamRecord streamRecord) throws Exception { + TableChange change = streamRecord.getValue(); + accumulatedChanges.forEach(tableChange -> tableChange.merge(change)); + if (nextEvaluationTime == null) { + checkAndFire(getProcessingTimeService()); + } else { + LOG.info( + "Trigger manager rate limiter triggered current: {}, next: {}, accumulated changes: {},{}", + getProcessingTimeService().getCurrentProcessingTime(), + nextEvaluationTime, + accumulatedChanges, + maintenanceTaskNames); + rateLimiterTriggeredCounter.inc(); + } + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + super.processWatermark(mark); + } + + @Override + public void onProcessingTime(long l) { + this.nextEvaluationTime = null; + checkAndFire(getProcessingTimeService()); + } + + @Override + public void close() throws Exception { + super.close(); + this.lockHeld = false; + this.restoreTasks = false; + } + + @VisibleForTesting + void handleLockReleaseResult(LockReleasedEvent event) { + if (event.lockId().equals(tableName)) { + this.lockHeld = false; + this.restoreTasks = false; + } + } + + private void checkAndFire(ProcessingTimeService timerService) { + long current = timerService.getCurrentProcessingTime(); + if (restoreTasks) { + // Recovered tasks in progress. 
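The recovery Trigger emitted in initializeState() has not been confirmed released yet.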
Skip trigger check + LOG.info("The recovery lock is still held at {}", current); + schedule(timerService, current + lockCheckDelayMs); + return; + } + + Integer taskToStart = + TriggerUtil.nextTrigger( + evaluators, accumulatedChanges, lastTriggerTimes, current, startsFrom); + if (taskToStart == null) { + // Nothing to execute + if (!triggered) { + nothingToTriggerCounter.inc(); + LOG.debug("Nothing to execute at {} for collected: {}", current, accumulatedChanges); + } else { + LOG.debug("Execution check finished"); + } + + // Next time start from the beginning + startsFrom = 0; + triggered = false; + return; + } + + if (!lockHeld) { + this.lockHeld = true; + TableChange change = accumulatedChanges.get(taskToStart); + output.collect(new StreamRecord<>(Trigger.create(current, taskToStart), current)); + LOG.debug("Fired event with time: {}, collected: {} for {}", current, change, tableName); + triggerCounters.get(taskToStart).inc(); + accumulatedChanges.set(taskToStart, TableChange.empty()); + lastTriggerTimes.set(taskToStart, current); + schedule(timerService, current + minFireDelayMs); + startsFrom = (taskToStart + 1) % evaluators.size(); + triggered = true; + } else { + // A task is already running, waiting for it to finish + LOG.info("Failed to acquire lock. Delaying task to {}", current + lockCheckDelayMs); + + startsFrom = taskToStart; + concurrentRunThrottledCounter.inc(); + schedule(timerService, current + lockCheckDelayMs); + } + } + + @SuppressWarnings("FutureReturnValueIgnored") + private void schedule(ProcessingTimeService timerService, long time) { + this.nextEvaluationTime = time; + timerService.registerTimer(time, this); + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperatorFactory.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperatorFactory.java new file mode 100644 index 000000000000..2dcfa83d5b8c --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperatorFactory.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; +import org.apache.iceberg.flink.maintenance.api.Trigger; + +@Internal +public class TriggerManagerOperatorFactory extends AbstractStreamOperatorFactory + implements CoordinatedOperatorFactory, + OneInputStreamOperatorFactory { + + private final String lockId; + private final List maintenanceTaskNames; + private final List evaluators; + private final long minFireDelayMs; + private final long lockCheckDelayMs; + + public TriggerManagerOperatorFactory( + String lockId, + List maintenanceTaskNames, + List evaluators, + long minFireDelayMs, + long lockCheckDelayMs) { + this.lockId = lockId; + this.maintenanceTaskNames = maintenanceTaskNames; + this.evaluators = evaluators; + this.minFireDelayMs = minFireDelayMs; + this.lockCheckDelayMs = lockCheckDelayMs; + } + + @Override + public OperatorCoordinator.Provider getCoordinatorProvider( + String operatorName, OperatorID operatorID) { + return new TableMaintenanceCoordinatorProvider(operatorName, operatorID); + } + + @SuppressWarnings("unchecked") + @Override + public > T createStreamOperator( + StreamOperatorParameters parameters) { + OperatorID operatorId = parameters.getStreamConfig().getOperatorID(); + OperatorEventGateway gateway = + parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId); + + TriggerManagerOperator triggerManagerOperator = + new TriggerManagerOperator( + parameters, + gateway, + maintenanceTaskNames, + evaluators, + minFireDelayMs, + lockCheckDelayMs, + lockId); + + parameters + .getOperatorEventDispatcher() + .registerEventHandler(operatorId, triggerManagerOperator); + + return (T) triggerManagerOperator; + } + + @SuppressWarnings("rawtypes") + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return TriggerManagerOperator.class; + } +} diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerUtil.java new file mode 100644 index 000000000000..bcfee1423482 --- /dev/null +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerUtil.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.operator; + +import java.util.List; + +public class TriggerUtil { + + private TriggerUtil() {} + + public static Integer nextTrigger( + List evaluators, + List changes, + List lastTriggerTimes, + long currentTime, + int startPos) { + int current = startPos; + do { + if (evaluators + .get(current) + .check(changes.get(current), lastTriggerTimes.get(current), currentTime)) { + return current; + } + + current = (current + 1) % evaluators.size(); + } while (current != startPos); + + return null; + } +} diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java index 0a860fec4799..fe8457167a1f 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java @@ -77,4 +77,43 @@ void testE2e() throws Exception { closeJobClient(jobClient); } } + + @Test + void testE2eUseCoordinator() throws Exception { + TableMaintenance.forTable(env, tableLoader()) + .uidSuffix("E2eTestUID") + .rateLimit(Duration.ofMinutes(10)) + .lockCheckDelay(Duration.ofSeconds(10)) + .add( + ExpireSnapshots.builder() + .scheduleOnCommitCount(10) + .maxSnapshotAge(Duration.ofMinutes(10)) + .retainLast(5) + .deleteBatchSize(5) + .parallelism(8)) + .add( + RewriteDataFiles.builder() + .scheduleOnDataFileCount(10) + .partialProgressEnabled(true) + .partialProgressMaxCommits(10) + .maxRewriteBytes(1000L) + .targetFileSizeBytes(1000L) + .minFileSizeBytes(1000L) + .maxFileSizeBytes(1000L) + .minInputFiles(10) + .deleteFileThreshold(10) + .rewriteAll(false) + .maxFileGroupSizeBytes(1000L)) + .append(); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // Just make sure that we are able to instantiate the flow + assertThat(jobClient).isNotNull(); + } finally { + closeJobClient(jobClient); + } + } } diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java new file mode 100644 index 000000000000..0280872f2aca --- /dev/null +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.maintenance.api; + +import static org.apache.iceberg.flink.SimpleDataUtil.createRowData; +import static org.apache.iceberg.flink.maintenance.api.TableMaintenance.SOURCE_OPERATOR_NAME_PREFIX; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.flink.maintenance.operator.ManualSource; +import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase; +import org.apache.iceberg.flink.maintenance.operator.TableChange; +import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +class TestTableMaintenanceCoordinationLock extends OperatorTestBase { + private static final String MAINTENANCE_TASK_NAME = "TestTableMaintenance"; + private static final String[] TASKS = + new String[] {MAINTENANCE_TASK_NAME + " [0]", MAINTENANCE_TASK_NAME + " [1]"}; + private static final TableChange DUMMY_CHANGE = TableChange.builder().commitCount(1).build(); + private static final List PROCESSED = + Collections.synchronizedList(Lists.newArrayListWithCapacity(1)); + + private StreamExecutionEnvironment env; + private Table table; + + @TempDir private File checkpointDir; + + @BeforeEach + public void beforeEach() throws IOException { + Configuration config = new Configuration(); + config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem"); + config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + checkpointDir.getPath()); + this.env = StreamExecutionEnvironment.getExecutionEnvironment(config); + this.table = createTable(); + insert(table, 1, "a"); + + PROCESSED.clear(); + MaintenanceTaskBuilderForTest.counter = 0; + } + + @Test + void testForChangeStream() throws Exception { + ManualSource schedulerSource = + new ManualSource<>(env, 
TypeInformation.of(TableChange.class)); + + TableMaintenance.Builder streamBuilder = + TableMaintenance.forChangeStream(schedulerSource.dataStream(), tableLoader()) + .rateLimit(Duration.ofMillis(2)) + .lockCheckDelay(Duration.ofSeconds(3)) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .scheduleOnDataFileCount(2) + .scheduleOnDataFileSize(3L) + .scheduleOnEqDeleteFileCount(4) + .scheduleOnEqDeleteRecordCount(5L) + .scheduleOnPosDeleteFileCount(6) + .scheduleOnPosDeleteRecordCount(7L) + .scheduleOnInterval(Duration.ofHours(1))); + + sendEvents(schedulerSource, streamBuilder, ImmutableList.of(Tuple2.of(DUMMY_CHANGE, 1))); + } + + @Test + void testForTable() throws Exception { + TableLoader tableLoader = tableLoader(); + + env.enableCheckpointing(10); + + TableMaintenance.forTable(env, tableLoader) + .rateLimit(Duration.ofMillis(2)) + .maxReadBack(2) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(2)) + .append(); + + // Creating a stream for inserting data into the table concurrently + ManualSource insertSource = + new ManualSource<>(env, InternalTypeInfo.of(FlinkSchemaUtil.convert(table.schema()))); + FlinkSink.forRowData(insertSource.dataStream()) + .tableLoader(tableLoader) + .uidPrefix(UID_SUFFIX + "-iceberg-sink") + .append(); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + insertSource.sendRecord(createRowData(2, "b")); + + Awaitility.await().until(() -> PROCESSED.size() == 1); + } finally { + closeJobClient(jobClient); + } + } + + @Test + void testUidAndSlotSharingGroup() throws IOException { + TableMaintenance.forChangeStream( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader()) + .uidSuffix(UID_SUFFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .uidSuffix(UID_SUFFIX) + .slotSharingGroup(SLOT_SHARING_GROUP)) + .append(); + + checkUidsAreSet(env, UID_SUFFIX); + checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); + } + + @Test + void testUidAndSlotSharingGroupUnset() throws IOException { + TableMaintenance.forChangeStream( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader()) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1)) + .append(); + + checkUidsAreSet(env, null); + checkSlotSharingGroupsAreSet(env, null); + } + + @Test + void testUidAndSlotSharingGroupInherit() throws IOException { + TableMaintenance.forChangeStream( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader()) + .uidSuffix(UID_SUFFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1)) + .append(); + + checkUidsAreSet(env, UID_SUFFIX); + checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); + } + + @Test + void testUidAndSlotSharingGroupOverWrite() throws IOException { + String anotherUid = "Another-UID"; + String anotherSlotSharingGroup = "Another-SlotSharingGroup"; + TableMaintenance.forChangeStream( + new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), + tableLoader()) + .uidSuffix(UID_SUFFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .uidSuffix(anotherUid) + .slotSharingGroup(anotherSlotSharingGroup)) + .append(); + + // Choose an operator from the scheduler part of the graph + Transformation schedulerTransformation = + 
env.getTransformations().stream() + .filter(t -> t.getName().equals("Trigger manager")) + .findFirst() + .orElseThrow(); + assertThat(schedulerTransformation.getUid()).contains(UID_SUFFIX); + assertThat(schedulerTransformation.getSlotSharingGroup()).isPresent(); + assertThat(schedulerTransformation.getSlotSharingGroup().get().getName()) + .isEqualTo(SLOT_SHARING_GROUP); + + // Choose an operator from the maintenance task part of the graph + Transformation<?> scheduledTransformation = + env.getTransformations().stream() + .filter(t -> t.getName().startsWith(MAINTENANCE_TASK_NAME)) + .findFirst() + .orElseThrow(); + assertThat(scheduledTransformation.getUid()).contains(anotherUid); + assertThat(scheduledTransformation.getSlotSharingGroup()).isPresent(); + assertThat(scheduledTransformation.getSlotSharingGroup().get().getName()) + .isEqualTo(anotherSlotSharingGroup); + } + + @Test + void testUidAndSlotSharingGroupForMonitorSource() throws IOException { + TableMaintenance.forTable(env, tableLoader()) + .uidSuffix(UID_SUFFIX) + .slotSharingGroup(SLOT_SHARING_GROUP) + .add( + new MaintenanceTaskBuilderForTest(true) + .scheduleOnCommitCount(1) + .uidSuffix(UID_SUFFIX) + .slotSharingGroup(SLOT_SHARING_GROUP)) + .append(); + + Transformation<?> source = monitorSource(); + assertThat(source).isNotNull(); + assertThat(source.getUid()).contains(UID_SUFFIX); + assertThat(source.getSlotSharingGroup()).isPresent(); + assertThat(source.getSlotSharingGroup().get().getName()).isEqualTo(SLOT_SHARING_GROUP); + + checkUidsAreSet(env, UID_SUFFIX); + checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP); + } + + /** + * Sends the events through the {@link ManualSource} provided, and waits until the given number of + * records are processed. + * + * @param schedulerSource used for sending the events + * @param streamBuilder used for generating the job + * @param eventsAndResultNumbers pairs of the event to send and the expected number of processed records + * @throws Exception if any + */ + private void sendEvents( + ManualSource<TableChange> schedulerSource, + TableMaintenance.Builder streamBuilder, + List<Tuple2<TableChange, Integer>> eventsAndResultNumbers) + throws Exception { + streamBuilder.append(); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + eventsAndResultNumbers.forEach( + eventsAndResultNumber -> { + int expectedSize = PROCESSED.size() + eventsAndResultNumber.f1; + schedulerSource.sendRecord(eventsAndResultNumber.f0); + Awaitility.await().until(() -> PROCESSED.size() == expectedSize); + }); + } finally { + closeJobClient(jobClient); + } + }
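  // For readability of the uid/slot-sharing assertions above: checkUidsAreSet and
  // checkSlotSharingGroupsAreSet are inherited from OperatorTestBase. A hypothetical
  // sketch of the style of check they perform (the real helpers may differ):
  //
  //   for (Transformation<?> transformation : env.getTransformations()) {
  //     assertThat(transformation.getUid()).isNotNull();
  //     if (uidSuffix != null) {
  //       assertThat(transformation.getUid()).contains(uidSuffix);
  //     }
  //   }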
+ + /** + * Finds the {@link org.apache.iceberg.flink.maintenance.operator.MonitorSource} for testing + * purposes by parsing the transformation tree. + * + * @return the monitor source transformation + */ + private Transformation<?> monitorSource() { + assertThat(env.getTransformations()).isNotEmpty(); + assertThat(env.getTransformations().get(0).getInputs()).isNotEmpty(); + assertThat(env.getTransformations().get(0).getInputs().get(0).getInputs()).isNotEmpty(); + + Transformation<?> result = + env.getTransformations().get(0).getInputs().get(0).getInputs().get(0); + + // Some checks to make sure this is the transformation we are looking for + assertThat(result).isInstanceOf(SourceTransformation.class); + assertThat(result.getName()).startsWith(SOURCE_OPERATOR_NAME_PREFIX); + + return result; + } + + private static class MaintenanceTaskBuilderForTest + extends MaintenanceTaskBuilder<MaintenanceTaskBuilderForTest> { + private final boolean success; + private final int id; + private static int counter = 0; + + MaintenanceTaskBuilderForTest(boolean success) { + this.success = success; + this.id = counter; + ++counter; + } + + @Override + String maintenanceTaskName() { + return MAINTENANCE_TASK_NAME; + } + + @Override + DataStream<TaskResult> append(DataStream<Trigger> trigger) { + String name = TASKS[id]; + return trigger + .map(new DummyMaintenanceTask(success)) + .name(name) + .uid(uidSuffix() + "-test-mapper-" + name + "-" + id) + .slotSharingGroup(slotSharingGroup()) + .forceNonParallel(); + } + } + + private static class DummyMaintenanceTask + implements MapFunction<Trigger, TaskResult>, ResultTypeQueryable<TaskResult>, Serializable { + private final boolean success; + + private DummyMaintenanceTask(boolean success) { + this.success = success; + } + + @Override + public TaskResult map(Trigger trigger) { + // Record the trigger; locking is handled by the coordinator in this setup + PROCESSED.add(trigger); + + return new TaskResult( + trigger.taskId(), + trigger.timestamp(), + success, + success ? Collections.emptyList() : Lists.newArrayList(new Exception("Testing error"))); + } + + @Override + public TypeInformation<TaskResult> getProducedType() { + return TypeInformation.of(TaskResult.class); + } + } +} diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoverOperation.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoverOperation.java new file mode 100644 index 000000000000..e9f2c0083a68 --- /dev/null +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoverOperation.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.FAILED_TASK_COUNTER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.LAST_RUN_DURATION_MS; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.util.List; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.flink.maintenance.api.TaskResult; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(value = 10) +class TestLockRemoverOperation extends OperatorTestBase { + private static final String[] TASKS = new String[] {"task0", "task1", "task2"}; + private static final String OPERATOR_NAME = "TestCoordinator"; + private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L); + + private TableMaintenanceCoordinator tableMaintenanceCoordinator; + + @BeforeEach + public void before() { + MetricsReporterFactoryForTests.reset(); + this.tableMaintenanceCoordinator = createCoordinator(); + try { + tableMaintenanceCoordinator.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterEach + void after() throws IOException { + super.after(); + try { + tableMaintenanceCoordinator.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + void testProcess() throws Exception { + MockOperatorEventGateway mockGateway = new MockOperatorEventGateway(); + LockRemoverOperator operator = + new LockRemoverOperator(null, mockGateway, DUMMY_TASK_NAME, Lists.newArrayList(TASKS[0])); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + + operator.processElement( + new StreamRecord<>(new TaskResult(0, 0L, true, Lists.newArrayList()))); + assertThat(mockGateway.getEventsSent()).hasSize(1); + + operator.processWatermark(WATERMARK); + assertThat(mockGateway.getEventsSent()).hasSize(2); + } + } + + @Test + void testMetrics() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + ManualSource source = new ManualSource<>(env, TypeInformation.of(TaskResult.class)); + source + .dataStream() + .transform( + DUMMY_TASK_NAME, + TypeInformation.of(Void.class), + new LockRemoverOperatorFactory(DUMMY_TABLE_NAME, Lists.newArrayList(TASKS))) + .forceNonParallel(); + + JobClient jobClient = null; + long time = System.currentTimeMillis(); + try { + jobClient = env.executeAsync(); + // Start the 2 successful and one failed result trigger for 
task0, and 3 successful results for task1 + processAndCheck(source, new TaskResult(0, time, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(1, 0L, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(1, 0L, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(0, time, false, Lists.newArrayList())); + processAndCheck(source, new TaskResult(0, time, true, Lists.newArrayList())); + processAndCheck(source, new TaskResult(1, 0L, true, Lists.newArrayList())); + + Awaitility.await() + .until( + () -> + MetricsReporterFactoryForTests.counter( + ImmutableList.of( + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, + TASKS[1], + "1", + SUCCEEDED_TASK_COUNTER)) + .equals(3L)); + + // Final check of all the counters + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder<List<String>, Long>() + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", SUCCEEDED_TASK_COUNTER), + 2L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", FAILED_TASK_COUNTER), + 1L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", SUCCEEDED_TASK_COUNTER), + 3L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", FAILED_TASK_COUNTER), + 0L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", SUCCEEDED_TASK_COUNTER), + 0L) + .put( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", FAILED_TASK_COUNTER), + 0L) + .build()); + + assertThat( + MetricsReporterFactoryForTests.gauge( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", LAST_RUN_DURATION_MS))) + .isPositive(); + assertThat( + MetricsReporterFactoryForTests.gauge( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", LAST_RUN_DURATION_MS))) + .isGreaterThan(time); + assertThat( + MetricsReporterFactoryForTests.gauge( + ImmutableList.of( + DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", LAST_RUN_DURATION_MS))) + .isZero(); + } finally { + closeJobClient(jobClient); + } + } + + private void processAndCheck(ManualSource<TaskResult> source, TaskResult input) { + processAndCheck(source, input, null); + } + + private void processAndCheck( + ManualSource<TaskResult> source, TaskResult input, String counterPrefix) { + List<String> counterKey = + ImmutableList.of( + (counterPrefix != null ? counterPrefix : "") + DUMMY_TASK_NAME, + DUMMY_TABLE_NAME, + TASKS[input.taskIndex()], + String.valueOf(input.taskIndex()), + input.success() ? SUCCEEDED_TASK_COUNTER : FAILED_TASK_COUNTER); + Long counterValue = MetricsReporterFactoryForTests.counter(counterKey); + Long expected = counterValue != null ?
counterValue + 1 : 1L; + + source.sendRecord(input); + source.sendWatermark(input.startEpoch()); + + Awaitility.await() + .until(() -> expected.equals(MetricsReporterFactoryForTests.counter(counterKey))); + } + + private OneInputStreamOperatorTestHarness createHarness( + LockRemoverOperator lockRemoverOperator) throws Exception { + OneInputStreamOperatorTestHarness harness = + new OneInputStreamOperatorTestHarness<>(lockRemoverOperator); + harness.open(); + return harness; + } + + private static TableMaintenanceCoordinator createCoordinator() { + return new TableMaintenanceCoordinator( + OPERATOR_NAME, new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, 1)); + } +} diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTableMaintenanceCoordinator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTableMaintenanceCoordinator.java new file mode 100644 index 000000000000..3e1b6a84e639 --- /dev/null +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTableMaintenanceCoordinator.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.EventReceivingTasks; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.util.ExceptionUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(value = 10) +class TestTableMaintenanceCoordinator extends OperatorTestBase { + private static final String OPERATOR_NAME = "TestCoordinator"; + private static final String OPERATOR_NAME_1 = "TestCoordinator_1"; + private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L); + private static final OperatorID TEST_OPERATOR_ID_1 = new OperatorID(1235L, 5679L); + private static final int NUM_SUBTASKS = 1; + private static final LockRegisterEvent LOCK_REGISTER_EVENT = + new LockRegisterEvent(DUMMY_TABLE_NAME, 1L); + private static final LockRegisterEvent RECOVER_LOCK_ACQUIRED_EVENT = + new LockRegisterEvent(DUMMY_TABLE_NAME, Long.MAX_VALUE); + private static final LockReleasedEvent LOCK_RELEASE_EVENT = + new LockReleasedEvent(DUMMY_TABLE_NAME, 1L); + + private EventReceivingTasks receivingTasks; + private EventReceivingTasks receivingTasks1; + + @BeforeEach + public void before() { + this.receivingTasks = EventReceivingTasks.createForRunningTasks(); + this.receivingTasks1 = EventReceivingTasks.createForRunningTasks(); + } + + private void tasksReady(TableMaintenanceCoordinator coordinator) { + setAllTasksReady(coordinator, receivingTasks); + } + + @Test + public void testThrowExceptionWhenNotStarted() throws Exception { + try (TableMaintenanceCoordinator tableMaintenanceCoordinator = + createCoordinator(OPERATOR_NAME, TEST_OPERATOR_ID)) { + String failureMessage = "The coordinator has not started yet."; + assertThatThrownBy( + () -> tableMaintenanceCoordinator.handleEventFromOperator(0, 0, LOCK_REGISTER_EVENT)) + .isInstanceOf(IllegalStateException.class) + .hasMessage(failureMessage); + assertThatThrownBy(() -> tableMaintenanceCoordinator.executionAttemptFailed(0, 0, null)) + .isInstanceOf(IllegalStateException.class) + .hasMessage(failureMessage); + assertThatThrownBy(() -> tableMaintenanceCoordinator.checkpointCoordinator(0, null)) + .isInstanceOf(IllegalStateException.class) + .hasMessage(failureMessage); + } + } + + @Test + public void testEventHandling() throws Exception { + try (TableMaintenanceCoordinator tableMaintenanceCoordinator = + createCoordinator(OPERATOR_NAME, TEST_OPERATOR_ID); + TableMaintenanceCoordinator tableMaintenanceCoordinator1 = + createCoordinator(OPERATOR_NAME_1, TEST_OPERATOR_ID_1)) { + + tableMaintenanceCoordinator.start(); + tableMaintenanceCoordinator1.start(); + + tasksReady(tableMaintenanceCoordinator); + tasksReady(tableMaintenanceCoordinator1); + + tableMaintenanceCoordinator.handleEventFromOperator(0, 0, LOCK_REGISTER_EVENT); + waitForCoordinatorToProcessActions(tableMaintenanceCoordinator); + assertThat(receivingTasks.getSentEventsForSubtask(0).size()).isEqualTo(0); + + // release lock from coordinator1 and get one event from coordinator + tableMaintenanceCoordinator1.handleEventFromOperator(0, 0, LOCK_RELEASE_EVENT); + 
waitForCoordinatorToProcessActions(tableMaintenanceCoordinator1); + assertThat(receivingTasks.getSentEventsForSubtask(0).size()).isEqualTo(1); + assertThat(receivingTasks1.getSentEventsForSubtask(0).size()).isEqualTo(0); + } + } + + private static TableMaintenanceCoordinator createCoordinator( + String operatorName, OperatorID operatorID) { + return new TableMaintenanceCoordinator( + operatorName, new MockOperatorCoordinatorContext(operatorID, 1)); + } + + private static void setAllTasksReady( + TableMaintenanceCoordinator tableMaintenanceCoordinator, EventReceivingTasks receivingTasks) { + for (int i = 0; i < NUM_SUBTASKS; i++) { + tableMaintenanceCoordinator.executionAttemptReady( + i, 0, receivingTasks.createGatewayForSubtask(i, 0)); + } + } + + private static void waitForCoordinatorToProcessActions(TableMaintenanceCoordinator coordinator) { + CompletableFuture future = new CompletableFuture<>(); + coordinator.callInCoordinatorThread( + () -> { + future.complete(null); + return null; + }, + "Coordinator fails to process action"); + + try { + future.get(); + } catch (InterruptedException e) { + throw new AssertionError("test interrupted"); + } catch (ExecutionException e) { + ExceptionUtils.rethrow(ExceptionUtils.stripExecutionException(e)); + } + } +} diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerOperator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerOperator.java new file mode 100644 index 000000000000..2eefc965fd49 --- /dev/null +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerOperator.java @@ -0,0 +1,684 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.flink.maintenance.operator; + +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED; +import static org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.time.Duration; +import java.util.List; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; +import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.iceberg.Table; +import org.apache.iceberg.flink.maintenance.api.Trigger; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class TestTriggerManagerOperator extends OperatorTestBase { + private static final long DELAY = 10L; + private static final String OPERATOR_NAME = "TestCoordinator"; + private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 5678L); + private static final String[] TASKS = new String[] {"task0", "task1"}; + private long processingTime = 0L; + private String tableName; + private TableMaintenanceCoordinator tableMaintenanceCoordinator; + + @BeforeEach + void before() { + super.before(); + Table table = createTable(); + this.tableName = table.name(); + this.tableMaintenanceCoordinator = createCoordinator(); + try { + tableMaintenanceCoordinator.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterEach + void after() throws IOException { + super.after(); + try { + tableMaintenanceCoordinator.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + void testCommitCount() throws Exception { + MockOperatorEventGateway mockGateway = new MockOperatorEventGateway(); + TriggerManagerOperator operator = + createOperator(new TriggerEvaluator.Builder().commitCount(3).build(), mockGateway); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().commitCount(1).build(), 0); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().commitCount(2).build(), 1); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().commitCount(3).build(), 2); + addEventAndCheckResult( + operator, testHarness, 
TableChange.builder().commitCount(10).build(), 3); + + // No trigger in this case + addEventAndCheckResult( + operator, testHarness, TableChange.builder().commitCount(1).build(), 3); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().commitCount(1).build(), 3); + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().commitCount(1).build(), 4); + } + } + + @Test + void testDataFileCount() throws Exception { + TriggerManagerOperator operator = + createOperator( + new TriggerEvaluator.Builder().dataFileCount(3).build(), + new MockOperatorEventGateway()); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileCount(1).build(), 0); + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileCount(2).build(), 1); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileCount(3).build(), 2); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileCount(5).build(), 3); + + // No trigger in this case + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileCount(1).build(), 3); + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileCount(2).build(), 4); + } + } + + @Test + void testDataFileSizeInBytes() throws Exception { + TriggerManagerOperator operator = + createOperator( + new TriggerEvaluator.Builder().dataFileSizeInBytes(3).build(), + new MockOperatorEventGateway()); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileSizeInBytes(1L).build(), 0); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileSizeInBytes(2L).build(), 1); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileSizeInBytes(5L).build(), 2); + + // No trigger in this case + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileSizeInBytes(1L).build(), 2); + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().dataFileSizeInBytes(2L).build(), 3); + } + } + + @Test + void testPosDeleteFileCount() throws Exception { + TriggerManagerOperator operator = + createOperator( + new TriggerEvaluator.Builder().posDeleteFileCount(3).build(), + new MockOperatorEventGateway()); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteFileCount(1).build(), 0); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteFileCount(2).build(), 1); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteFileCount(3).build(), 2); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteFileCount(10).build(), 3); + + // No trigger in this case + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteFileCount(1).build(), 3); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteFileCount(1).build(), 3); + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteFileCount(1).build(), 4); + } + } + + @Test + void testPosDeleteRecordCount() throws Exception { + + TriggerManagerOperator operator = + createOperator( + new TriggerEvaluator.Builder().posDeleteRecordCount(3).build(), + new MockOperatorEventGateway()); + 
try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteRecordCount(1L).build(), 0); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteRecordCount(2L).build(), 1); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteRecordCount(5L).build(), 2); + + // No trigger in this case + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteRecordCount(1L).build(), 2); + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().posDeleteRecordCount(2L).build(), 3); + } + } + + @Test + void testEqDeleteFileCount() throws Exception { + + TriggerManagerOperator operator = + createOperator( + new TriggerEvaluator.Builder().eqDeleteFileCount(3).build(), + new MockOperatorEventGateway()); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteFileCount(1).build(), 0); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteFileCount(2).build(), 1); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteFileCount(3).build(), 2); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteFileCount(10).build(), 3); + + // No trigger in this case + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteFileCount(1).build(), 3); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteFileCount(1).build(), 3); + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteFileCount(1).build(), 4); + } + } + + @Test + void testEqDeleteRecordCount() throws Exception { + TriggerManagerOperator operator = + createOperator( + new TriggerEvaluator.Builder().eqDeleteRecordCount(3).build(), + new MockOperatorEventGateway()); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteRecordCount(1L).build(), 0); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteRecordCount(2L).build(), 1); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteRecordCount(5L).build(), 2); + + // No trigger in this case + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteRecordCount(1L).build(), 2); + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().eqDeleteRecordCount(2L).build(), 3); + } + } + + @Test + void testTimeout() throws Exception { + TriggerManagerOperator operator = + createOperator( + new TriggerEvaluator.Builder().timeout(Duration.ofSeconds(1)).build(), + new MockOperatorEventGateway()); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + TableChange event = TableChange.builder().dataFileCount(1).commitCount(1).build(); + + // Wait for some time + testHarness.processElement(event, EVENT_TIME); + assertThat(testHarness.extractOutputValues()).isEmpty(); + + // Wait for the timeout to expire + long newTime = EVENT_TIME + Duration.ofSeconds(1).toMillis(); + testHarness.setProcessingTime(newTime); + testHarness.processElement(event, newTime); + assertThat(testHarness.extractOutputValues()).hasSize(1); + + // Remove the lock to allow the next trigger + operator.handleLockReleaseResult(new LockReleasedEvent(tableName, 1L)); + + // 
Send a new event + testHarness.setProcessingTime(newTime + 1); + testHarness.processElement(event, newTime); + + // No trigger yet + assertThat(testHarness.extractOutputValues()).hasSize(1); + + // Send a new event + newTime += Duration.ofSeconds(1).toMillis(); + testHarness.setProcessingTime(newTime); + testHarness.processElement(event, newTime); + + // New trigger should arrive + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + @Test + void testStateRestore() throws Exception { + OperatorSubtaskState state; + TriggerManagerOperator operator = + createOperator( + new TriggerEvaluator.Builder().commitCount(2).build(), new MockOperatorEventGateway()); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + testHarness.processElement( + TableChange.builder().dataFileCount(1).commitCount(1).build(), EVENT_TIME); + + assertThat(testHarness.extractOutputValues()).isEmpty(); + + state = testHarness.snapshot(1, EVENT_TIME); + } + + // Restore the state, write some more data, create a checkpoint, check the data which is written + TriggerManagerOperator newOperator = + createOperator( + new TriggerEvaluator.Builder().commitCount(2).build(), new MockOperatorEventGateway()); + try (OneInputStreamOperatorTestHarness testHarness = + new OneInputStreamOperatorTestHarness<>(newOperator)) { + testHarness.initializeState(state); + testHarness.open(); + + // Mock a recovery trigger lock + assertTriggers( + testHarness.extractOutputValues(), + Lists.newArrayList(Trigger.recovery(testHarness.getProcessingTime()))); + + testHarness.processElement(TableChange.builder().commitCount(1).build(), EVENT_TIME_2); + + // Remove the lock to allow the next trigger + newOperator.handleOperatorEvent(new LockReleasedEvent(tableName, 1L)); + testHarness.setProcessingTime(EVENT_TIME_2); + + // At this point the output contains the recovery trigger and the real trigger + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + @Test + void testMinFireDelay() throws Exception { + TriggerManagerOperator operator = + createOperator( + tableName, + new TriggerEvaluator.Builder().commitCount(2).build(), + new MockOperatorEventGateway(), + DELAY, + 1); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + testHarness.open(); + + addEventAndCheckResult( + operator, testHarness, TableChange.builder().commitCount(2).build(), 1); + long currentTime = testHarness.getProcessingTime(); + + // No new fire yet + addEventAndCheckResult( + operator, testHarness, TableChange.builder().commitCount(2).build(), 1); + + // Check that the trigger fired after the delay + testHarness.setProcessingTime(currentTime + DELAY); + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + @Test + void testLockCheckDelay() throws Exception { + TriggerManagerOperator operator = + createOperator( + tableName, + new TriggerEvaluator.Builder().commitCount(2).build(), + new MockOperatorEventGateway(), + 1, + DELAY); + try (OneInputStreamOperatorTestHarness testHarness = + createHarness(operator)) { + testHarness.open(); + + // Create a lock to prevent execution, and check that there is no result + addEventAndCheckResult( + operator, testHarness, TableChange.builder().commitCount(2).build(), 1, false); + addEventAndCheckResult( + operator, testHarness, TableChange.builder().commitCount(2).build(), 1); + long currentTime = testHarness.getProcessingTime(); + + // Remove the lock, and still no trigger + operator.handleOperatorEvent(new 
LockReleasedEvent(tableName, 1L)); + assertThat(testHarness.extractOutputValues()).hasSize(1); + + // Check that the trigger fired after the delay + testHarness.setProcessingTime(currentTime + DELAY); + assertThat(testHarness.extractOutputValues()).hasSize(2); + } + } + + @Test + void testTriggerMetrics() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + ManualSource<TableChange> source = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + CollectingSink<Trigger> sink = new CollectingSink<>(); + + TriggerManagerOperatorFactory triggerManagerOperatorFactory = + new TriggerManagerOperatorFactory( + tableName, + Lists.newArrayList(TASKS), + Lists.newArrayList( + new TriggerEvaluator.Builder().commitCount(2).build(), + new TriggerEvaluator.Builder().commitCount(4).build()), + 1L, + 1L); + source + .dataStream() + .keyBy(unused -> true) + .transform( + DUMMY_TASK_NAME, TypeInformation.of(Trigger.class), triggerManagerOperatorFactory) + .forceNonParallel() + .sinkTo(sink); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // This one doesn't trigger - tests NOTHING_TO_TRIGGER + source.sendRecord(TableChange.builder().commitCount(1).build()); + + Awaitility.await() + .until( + () -> { + Long nothingCounter = + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, NOTHING_TO_TRIGGER)); + return nothingCounter != null && nothingCounter.equals(1L); + }); + + // Trigger one of the tasks - tests TRIGGERED + source.sendRecord(TableChange.builder().commitCount(1).build()); + // Wait until we receive the trigger + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + assertThat( + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED))) + .isEqualTo(1L); + + // manual unlock + tableMaintenanceCoordinator.handleEventFromOperator( + 0, 0, new LockReleasedEvent(tableName, 1L)); + // Trigger both of the tasks - tests TRIGGERED + source.sendRecord(TableChange.builder().commitCount(2).build()); + // Wait until we receive the trigger + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + + // manual unlock + tableMaintenanceCoordinator.handleEventFromOperator( + 0, 0, new LockReleasedEvent(tableName, 1L)); + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + + // manual unlock + tableMaintenanceCoordinator.handleEventFromOperator( + 0, 0, new LockReleasedEvent(tableName, 1L)); + assertThat( + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED))) + .isEqualTo(2L); + assertThat( + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[1], "1", TRIGGERED))) + .isEqualTo(1L); + + // Final check of all the counters + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder<List<String>, Long>() + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, RATE_LIMITER_TRIGGERED), -1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, CONCURRENT_RUN_THROTTLED), -1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED), 2L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[1], "1", TRIGGERED), 1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, NOTHING_TO_TRIGGER), 1L) + .build()); + } finally { + closeJobClient(jobClient); + } + }
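  // A note on the counter assertions in these metric tests (an assumption about the shared
  // MetricsReporterFactoryForTests helper, which is not part of this diff): a -1L value in
  // the expected map appears to act as a "do not assert this counter" marker, while every
  // other entry is checked for exact equality.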
+ + @Test + void testRateLimiterMetrics() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + ManualSource<TableChange> source = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + CollectingSink<Trigger> sink = new CollectingSink<>(); + + // High delay, so only triggered once + TriggerManagerOperatorFactory manager = manager(1_000_000L, 1L); + + source + .dataStream() + .keyBy(unused -> true) + .transform(DUMMY_TASK_NAME, TypeInformation.of(Trigger.class), manager) + .forceNonParallel() + .sinkTo(sink); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // Start the first trigger + source.sendRecord(TableChange.builder().commitCount(2).build()); + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + + // Remove the lock to allow the next trigger + tableMaintenanceCoordinator.handleEventFromOperator( + 0, 0, new LockReleasedEvent(tableName, 1L)); + + // The second trigger will be blocked by the rate limiter + source.sendRecord(TableChange.builder().commitCount(2).build()); + Awaitility.await() + .until( + () -> + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, RATE_LIMITER_TRIGGERED)) + .equals(1L)); + + // Final check of all the counters + assertCounters(1L, 0L); + } finally { + closeJobClient(jobClient); + } + } + + @Test + void testConcurrentRunMetrics() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + ManualSource<TableChange> source = + new ManualSource<>(env, TypeInformation.of(TableChange.class)); + CollectingSink<Trigger> sink = new CollectingSink<>(); + + // High delay, so only triggered once + TriggerManagerOperatorFactory manager = manager(1L, 1_000_000L); + + source + .dataStream() + .keyBy(unused -> true) + .transform(DUMMY_TASK_NAME, TypeInformation.of(Trigger.class), manager) + .forceNonParallel() + .sinkTo(sink); + + JobClient jobClient = null; + try { + jobClient = env.executeAsync(); + + // Start the first trigger - notice that we do not remove the lock after the trigger + source.sendRecord(TableChange.builder().commitCount(2).build()); + assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull(); + + // The second trigger will be blocked by the lock + source.sendRecord(TableChange.builder().commitCount(2).build()); + Awaitility.await() + .until( + () -> + MetricsReporterFactoryForTests.counter( + ImmutableList.of(DUMMY_TASK_NAME, tableName, CONCURRENT_RUN_THROTTLED)) + .equals(1L)); + + // Final check of all the counters + assertCounters(0L, 1L); + } finally { + closeJobClient(jobClient); + } + } + + private void assertCounters(long rateLimiterTrigger, long concurrentRunTrigger) { + MetricsReporterFactoryForTests.assertCounters( + new ImmutableMap.Builder<List<String>, Long>() + .put( + ImmutableList.of(DUMMY_TASK_NAME, tableName, RATE_LIMITER_TRIGGERED), + rateLimiterTrigger) + .put( + ImmutableList.of(DUMMY_TASK_NAME, tableName, CONCURRENT_RUN_THROTTLED), + concurrentRunTrigger) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", TRIGGERED), 1L) + .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, NOTHING_TO_TRIGGER), 0L) + .build()); + } + + private KeyedOneInputStreamOperatorTestHarness<Boolean, TableChange, Trigger> harness( + TriggerManager manager) throws Exception { + return new KeyedOneInputStreamOperatorTestHarness<>( + new KeyedProcessOperator<>(manager), value -> true, Types.BOOLEAN); + } + + private void addEventAndCheckResult( + TriggerManagerOperator operator, + OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness, + TableChange event, + int expectedSize) + throws Exception { + addEventAndCheckResult(operator, testHarness, event, expectedSize, true); + }
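  // The five-argument overload below advances the harness processing time by one tick per
  // event before processing it and, when removeLock is set, simulates the coordinator-side
  // unlock by handing a LockReleasedEvent directly to the operator, so the held lock does
  // not throttle the next trigger.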
+ + private void addEventAndCheckResult( + TriggerManagerOperator operator, + OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness, + TableChange event, + int expectedSize, + boolean removeLock) + throws Exception { + ++processingTime; + testHarness.setProcessingTime(processingTime); + testHarness.processElement(event, processingTime); + assertThat(testHarness.extractOutputValues()).hasSize(expectedSize); + if (removeLock) { + // Remove the lock to allow the next trigger + operator.handleLockReleaseResult(new LockReleasedEvent(tableName, 1L)); + } + } + + private TriggerManagerOperatorFactory manager(long minFireDelayMs, long lockCheckDelayMs) { + return new TriggerManagerOperatorFactory( + tableName, + Lists.newArrayList(TASKS[0]), + Lists.newArrayList(new TriggerEvaluator.Builder().commitCount(2).build()), + minFireDelayMs, + lockCheckDelayMs); + } + + private static void assertTriggers(List<Trigger> expected, List<Trigger> actual) { + assertThat(actual).hasSize(expected.size()); + for (int i = 0; i < expected.size(); ++i) { + Trigger expectedTrigger = expected.get(i); + Trigger actualTrigger = actual.get(i); + assertThat(actualTrigger.timestamp()).isEqualTo(expectedTrigger.timestamp()); + assertThat(actualTrigger.taskId()).isEqualTo(expectedTrigger.taskId()); + assertThat(actualTrigger.isRecovery()).isEqualTo(expectedTrigger.isRecovery()); + } + } + + private static TableMaintenanceCoordinator createCoordinator() { + return new TableMaintenanceCoordinator( + OPERATOR_NAME, new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, 2)); + } + + private TriggerManagerOperator createOperator( + TriggerEvaluator evaluator, OperatorEventGateway mockGateway) { + return createOperator(tableName, evaluator, mockGateway, 1, 1); + } + + private TriggerManagerOperator createOperator( + String lockId, + TriggerEvaluator evaluator, + OperatorEventGateway mockGateway, + long minFireDelayMs, + long lockCheckDelayMs) { + return new TriggerManagerOperator( + null, + mockGateway, + Lists.newArrayList(TASKS[0]), + Lists.newArrayList(evaluator), + minFireDelayMs, + lockCheckDelayMs, + lockId); + } + + private OneInputStreamOperatorTestHarness<TableChange, Trigger> createHarness( + TriggerManagerOperator triggerManagerOperator) throws Exception { + OneInputStreamOperatorTestHarness<TableChange, Trigger> harness = + new OneInputStreamOperatorTestHarness<>(triggerManagerOperator); + harness.open(); + return harness; + } +}
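For context on TriggerUtil.nextTrigger introduced earlier in this change: it performs a fair, round-robin scan over the per-task evaluators. A minimal usage sketch; the local names (evaluators, accumulatedChanges, lastTriggerTimes, startTaskIndex) are illustrative assumptions, not identifiers from this change:

// Scan the tasks starting at startTaskIndex; nextTrigger returns the index of the first
// task whose evaluator fires for the accumulated changes, or null when nothing is due.
Integer next =
    TriggerUtil.nextTrigger(
        evaluators, accumulatedChanges, lastTriggerTimes, System.currentTimeMillis(), startTaskIndex);
if (next != null) {
  // Fire task `next`, then resume the following scan at (next + 1) % evaluators.size(),
  // so a single frequently-firing task cannot starve the others.
}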