Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
Expand All @@ -43,10 +44,12 @@
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.LockRemover;
import org.apache.iceberg.flink.maintenance.operator.LockRemoverOperatorFactory;
import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
import org.apache.iceberg.flink.maintenance.operator.TriggerManagerOperatorFactory;
import org.apache.iceberg.flink.sink.IcebergSink;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -71,39 +74,77 @@ private TableMaintenance() {}
*
* @param changeStream the table changes
* @param tableLoader used for accessing the table
* @param lockFactory used for preventing concurrent task runs
* @param lockFactory used for preventing concurrent task runs, if null, use coordination lock.
* @return builder for the maintenance stream
* @deprecated since 1.12.0, will be removed in 2.0.0. Use {@link #forChangeStream(DataStream,
* TableLoader)} instead.
*/
@Deprecated
@Internal
public static Builder forChangeStream(
DataStream<TableChange> changeStream,
TableLoader tableLoader,
TriggerLockFactory lockFactory) {
@Nullable TriggerLockFactory lockFactory) {
Preconditions.checkNotNull(changeStream, "The change stream should not be null");
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");

return new Builder(null, changeStream, tableLoader, lockFactory);
}

/**
* Use when the change stream is already provided, like in the {@link
* IcebergSink#addPostCommitTopology(DataStream)}.
*
* @param changeStream the table changes
* @param tableLoader used for accessing the table
* @return builder for the maintenance stream
*/
@Internal
public static Builder forChangeStream(
DataStream<TableChange> changeStream, TableLoader tableLoader) {
Preconditions.checkNotNull(changeStream, "The change stream should not be null");
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");

return new Builder(null, changeStream, tableLoader, null);
}

/**
* Use this for standalone maintenance job. It creates a monitor source that detect table changes
* and build the maintenance pipelines afterwards.
*
* @param env used to register the monitor source
* @param tableLoader used for accessing the table
* @param lockFactory used for preventing concurrent task runs
* @param lockFactory used for preventing concurrent task runs. If null, use coordination lock.
* @return builder for the maintenance stream
* @deprecated since 1.12.0, will be removed in 2.0.0. Use {@link
* #forTable(StreamExecutionEnvironment, TableLoader)} instead.
*/
@Deprecated
public static Builder forTable(
StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
StreamExecutionEnvironment env,
TableLoader tableLoader,
@Nullable TriggerLockFactory lockFactory) {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");

return new Builder(env, null, tableLoader, lockFactory);
}

/**
* Use this for standalone maintenance job. It creates a monitor source that detect table changes
* and build the maintenance pipelines afterwards. But use coordination lock default.
*
* @param env used to register the monitor source
* @param tableLoader used for accessing the table
* @return builder for the maintenance stream
*/
public static Builder forTable(StreamExecutionEnvironment env, TableLoader tableLoader) {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");

return new Builder(env, null, tableLoader, null);
}

public static class Builder {
private final StreamExecutionEnvironment env;
private final DataStream<TableChange> inputStream;
Expand Down Expand Up @@ -226,21 +267,43 @@ public void append() throws IOException {
try (TableLoader loader = tableLoader.clone()) {
loader.open();
String tableName = loader.loadTable().name();
DataStream<Trigger> triggers =
DataStreamUtils.reinterpretAsKeyedStream(
changeStream(tableName, loader), unused -> true)
.process(
new TriggerManager(
loader,
lockFactory,
taskNames,
evaluators,
rateLimit.toMillis(),
lockCheckDelay.toMillis()))
.name(TRIGGER_MANAGER_OPERATOR_NAME)
.uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
.slotSharingGroup(slotSharingGroup)
.forceNonParallel()
DataStream<Trigger> triggers;
if (lockFactory == null) {
triggers =
DataStreamUtils.reinterpretAsKeyedStream(
changeStream(tableName, loader), unused -> true)
.transform(
TRIGGER_MANAGER_OPERATOR_NAME,
TypeInformation.of(Trigger.class),
new TriggerManagerOperatorFactory(
tableName,
taskNames,
evaluators,
rateLimit.toMillis(),
lockCheckDelay.toMillis()))
.uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
.slotSharingGroup(slotSharingGroup)
.forceNonParallel();
} else {
triggers =
DataStreamUtils.reinterpretAsKeyedStream(
changeStream(tableName, loader), unused -> true)
.process(
new TriggerManager(
loader,
lockFactory,
taskNames,
evaluators,
rateLimit.toMillis(),
lockCheckDelay.toMillis()))
.name(TRIGGER_MANAGER_OPERATOR_NAME)
.uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
.slotSharingGroup(slotSharingGroup)
.forceNonParallel();
}

triggers =
triggers
.assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
.name(WATERMARK_ASSIGNER_OPERATOR_NAME)
.uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
Expand Down Expand Up @@ -277,14 +340,25 @@ public void append() throws IOException {
}

// Add the LockRemover to the end
unioned
.transform(
LOCK_REMOVER_OPERATOR_NAME,
TypeInformation.of(Void.class),
new LockRemover(tableName, lockFactory, taskNames))
.forceNonParallel()
.uid("lock-remover-" + uidSuffix)
.slotSharingGroup(slotSharingGroup);
if (lockFactory == null) {
unioned
.transform(
LOCK_REMOVER_OPERATOR_NAME,
TypeInformation.of(Void.class),
new LockRemoverOperatorFactory(tableName, taskNames))
.uid("lock-remover-" + uidSuffix)
.forceNonParallel()
.slotSharingGroup(slotSharingGroup);
} else {
unioned
.transform(
LOCK_REMOVER_OPERATOR_NAME,
TypeInformation.of(Void.class),
new LockRemover(tableName, lockFactory, taskNames))
.forceNonParallel()
.uid("lock-remover-" + uidSuffix)
.slotSharingGroup(slotSharingGroup);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.operator;

import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/**
* Event sent from TriggerManager operator to TableMaintenanceCoordinator to confirm that the lock
* has been acquired and the maintenance task is starting.
*/
public class LockAcquireResultEvent implements OperatorEvent {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this anymore


private final boolean isLock;
private final String lockId;
private final long timestamp;

public LockAcquireResultEvent(boolean isLock, String lockId, long timestamp) {
this.isLock = isLock;
this.lockId = lockId;
this.timestamp = timestamp;
}

public String lockId() {
return lockId;
}

public boolean isLock() {
return isLock;
}

public long timestamp() {
return timestamp;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("lockId", lockId)
.add("isLockHeld", isLock)
.add("timestamp", timestamp)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.operator;

import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Objects;

/**
* Event sent from TriggerManager operator to TableMaintenanceCoordinator to confirm that the lock
* has been acquired and the maintenance task is starting.
*/
public class LockRegisterEvent implements OperatorEvent {

private final String lockId;
private final long timestamp;

public LockRegisterEvent(String lockId, long timestamp) {
this.lockId = lockId;
this.timestamp = timestamp;
}

public String lockId() {
return lockId;
}

public long timestamp() {
return timestamp;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (!(o instanceof LockRegisterEvent)) {
return false;
}

LockRegisterEvent that = (LockRegisterEvent) o;
return Objects.equal(timestamp, that.timestamp) && Objects.equal(lockId, that.lockId);
}

@Override
public int hashCode() {
return Objects.hashCode(timestamp, lockId);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("timeStamp", timestamp)
.add("lockId", lockId)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.maintenance.operator;

import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/**
* Event sent from TriggerManagerOperatorCoordinator to TriggerManager operator to notify that a
* lock has been released (watermark arrived).
*/
public class LockReleasedEvent implements OperatorEvent {

private final String lockId;
private final long timestamp;

public LockReleasedEvent(String lockId, long timestamp) {
this.lockId = lockId;
this.timestamp = timestamp;
}

public long timestamp() {
return timestamp;
}

public String lockId() {
return lockId;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("lockId", lockId)
.add("timestamp", timestamp)
.toString();
}
}
Loading