Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ target/
metastore_db/
.metals/
.mvn/
.m2repo/
*.bloop/
*.vscode/
*.metals/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.marker.MarkerOperation.ALL_MARKERS_URL;
Expand All @@ -53,6 +54,7 @@
import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_BASEPATH_PARAM;
import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_DIR_PATH_PARAM;
import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_NAME_PARAM;
import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_REQUEST_ID_PARAM;

/**
* Marker operations that use the timeline server as a proxy to create and delete markers.
Expand Down Expand Up @@ -197,6 +199,7 @@ private Map<String, String> getConfigMap(
} else {
paramsMap.put(MARKER_NAME_PARAM, partitionPath + "/" + markerFileName);
}
paramsMap.put(MARKER_REQUEST_ID_PARAM, UUID.randomUUID().toString());

if (initEarlyConflictDetectionConfigs) {
paramsMap.put(MARKER_BASEPATH_PARAM, basePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class MarkerOperation implements Serializable {
public static final String MARKER_DIR_PATH_PARAM = "markerdirpath";
public static final String MARKER_NAME_PARAM = "markername";
public static final String MARKER_BASEPATH_PARAM = "basepath";
public static final String MARKER_REQUEST_ID_PARAM = "requestid";

// GET requests
public static final String ALL_MARKERS_URL = String.format("%s/%s", BASE_URL, "all");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,11 +545,19 @@ private void registerMarkerAPI() {

app.post(MarkerOperation.CREATE_MARKER_URL, new ViewHandler(ctx -> {
metricsRegistry.add("CREATE_MARKER", 1);
String requestId = ctx.queryParam(MarkerOperation.MARKER_REQUEST_ID_PARAM);
// Require requestId for all incoming requests
if (requestId == null || requestId.trim().isEmpty()) {
ctx.status(400);
writeValueAsString(ctx, false);
return;
}
ctx.future(markerHandler.createMarker(
ctx,
getMarkerDirParam(ctx),
ctx.queryParamAsClass(MarkerOperation.MARKER_NAME_PARAM, String.class).getOrDefault(""),
ctx.queryParamAsClass(MarkerOperation.MARKER_BASEPATH_PARAM, String.class).getOrDefault("")));
ctx.queryParamAsClass(MarkerOperation.MARKER_BASEPATH_PARAM, String.class).getOrDefault(""),
requestId));
}, false));

app.post(MarkerOperation.DELETE_MARKER_DIR_URL, new ViewHandler(ctx -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,12 @@ public boolean doesMarkerDirExist(String markerDir) {
* @param context Javalin app context
* @param markerDir marker directory path
* @param markerName marker name
* @param basePath base path
* @param requestId optional request ID for idempotent retries (POST only); null for legacy or when TLS recovers
* @return the {@code CompletableFuture} instance for the request
*/
public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName, String basePath) {
public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName, String basePath,
String requestId) {
// Step1 do early conflict detection if enable
if (timelineServiceConfig.earlyConflictDetectionEnable) {
try {
Expand Down Expand Up @@ -237,21 +240,19 @@ public CompletableFuture<String> createMarker(Context context, String markerDir,
log.warn("Failed to execute early conflict detection. Marker creation will continue.", e);
// When early conflict detection fails to execute, we still allow the marker creation
// to continue
return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName);
return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName, requestId);
}
}

// Step 2 create marker
return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName);
return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName, requestId);
}

private MarkerCreationFuture addMarkerCreationRequestForAsyncProcessing(
Context context, String markerDir, String markerName) {
log.debug("Request: Create marker: {}", markerName);
MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName);
// Add the future to the list
Context context, String markerDir, String markerName, String requestId) {
log.debug("Request: Create marker: {} with requestId: {}", markerName, requestId);
MarkerDirState markerDirState = getMarkerDirState(markerDir);
markerDirState.addMarkerCreationFuture(future);
MarkerCreationFuture future = markerDirState.getOrCreateMarkerCreationFuture(context, markerName, requestId);
if (!firstCreationRequestSeen) {
synchronized (firstCreationRequestSeenLock) {
if (!firstCreationRequestSeen) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

/**
* Future for async marker creation request.
* Request ID is optional; when present it enables idempotent retries (same request ID = same logical create).
*/
@Getter
@Slf4j
Expand All @@ -37,16 +38,22 @@ public class MarkerCreationFuture extends CompletableFuture<String> {
private final Context context;
private final String markerDirPath;
private final String markerName;
private final String requestId;
private boolean isSuccessful;
@Getter(AccessLevel.NONE)
private final HoodieTimer timer;

/**
* Creates a future for a marker creation request that carries no request ID.
* Delegates to the four-argument constructor, passing {@code null} as the request ID.
*
* @param context Javalin app context
* @param markerDirPath marker directory path
* @param markerName marker name
*/
public MarkerCreationFuture(Context context, String markerDirPath, String markerName) {
this(context, markerDirPath, markerName, null);
}

/**
 * Creates a future for a marker creation request.
 * The timer is started at construction, and the request starts out as not successful.
 *
 * @param context       Javalin app context
 * @param markerDirPath marker directory path
 * @param markerName    marker name
 * @param requestId     request ID of the create call; may be {@code null}
 */
public MarkerCreationFuture(Context context, String markerDirPath, String markerName, String requestId) {
  // The superclass no-arg constructor is invoked implicitly.
  this.timer = HoodieTimer.start();
  this.requestId = requestId;
  this.markerName = markerName;
  this.markerDirPath = markerDirPath;
  this.context = context;
  this.isSuccessful = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.hudi.storage.StoragePath;

import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.Getter;
import io.javalin.http.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.util.StringUtils;

Expand All @@ -45,11 +45,13 @@
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -65,13 +67,14 @@
@Slf4j
public class MarkerDirState implements Serializable {

private static final String NULL_REQUEST_ID = "";
// Marker directory
private final StoragePath markerDirPath;
private final HoodieStorage storage;
private final Registry metricsRegistry;
// A cached copy of all markers in memory
@Getter
private final Set<String> allMarkers = new HashSet<>();
// Marker name -> request ID (empty string sentinel when TLS recovers from crash).
// Enables idempotent retries: same requestId for same marker is treated as success.
private final Map<String, String> markerToRequestIdMap = new ConcurrentHashMap<>();
// A cached copy of marker entries in each marker file, stored in StringBuilder
// for efficient appending
// Mapping: {markerFileIndex -> markers}
Expand All @@ -80,8 +83,9 @@ public class MarkerDirState implements Serializable {
// {@code true} means the file is in use by a {@code BatchCreateMarkerRunnable}.
// Index of the list is used for the filename, i.e., "1" -> "MARKERS1"
private final List<Boolean> threadUseStatus;
// A list of pending futures from async marker creation requests
private final List<MarkerCreationFuture> markerCreationFutures = new ArrayList<>();
// Map of in-flight requests: (markerName + "|" + requestId) -> MarkerCreationFuture
// Used for BOTH deduplication AND batch marker creation requests queue
private final Map<String, MarkerCreationFuture> inflightRequestMap = new ConcurrentHashMap<>();
private final int parallelism;
private final Object markerCreationProcessingLock = new Object();
// Early conflict detection strategy if enabled
Expand All @@ -107,6 +111,13 @@ public MarkerDirState(String markerDirPath, int markerBatchNumThreads,
syncMarkersFromFileSystem();
}

/**
 * Returns the names of all markers currently tracked for this directory.
 * The result is an unmodifiable copy taken at call time, so later marker
 * creations or deletions are not reflected in it.
 *
 * @return unmodifiable snapshot of all marker names in the directory.
 */
public Set<String> getAllMarkers() {
  Set<String> markerNamesSnapshot = new HashSet<>(markerToRequestIdMap.keySet());
  return Collections.unmodifiableSet(markerNamesSnapshot);
}

/**
* @return {@code true} if the marker directory exists in the system.
*/
Expand All @@ -119,15 +130,27 @@ public boolean exists() {
}

/**
* Adds a {@code MarkerCreationCompletableFuture} instance from a marker
* creation request to the queue.
* Checks if an identical in-flight request exists and returns it, or creates a new future.
* This deduplicates concurrent requests with the same marker and requestId.
*
* @param future {@code MarkerCreationCompletableFuture} instance.
* @param context Javalin context
* @param markerName Marker name
* @param requestId Request ID (non-null from clients, can be null only during recovery)
* @return Existing future if found, otherwise a new future
*/
public void addMarkerCreationFuture(MarkerCreationFuture future) {
synchronized (markerCreationFutures) {
markerCreationFutures.add(future);
public MarkerCreationFuture getOrCreateMarkerCreationFuture(Context context, String markerName, String requestId) {
  // A null request ID is folded into the empty-string sentinel so it can be part of the key.
  String dedupKey = markerName + "|" + (requestId != null ? requestId : NULL_REQUEST_ID);

  // Fast path: skip allocating a new future when the same request is already in flight.
  MarkerCreationFuture existingFuture = inflightRequestMap.get(dedupKey);
  if (existingFuture == null) {
    MarkerCreationFuture future = new MarkerCreationFuture(context, markerDirPath.toString(), markerName, requestId);
    // putIfAbsent makes the check-and-insert atomic. With a separate get() and put(), two
    // concurrent requests carrying the same key could both miss, both insert, and leave one
    // caller holding a future that is no longer registered in the map.
    existingFuture = inflightRequestMap.putIfAbsent(dedupKey, future);
    if (existingFuture == null) {
      // This call won the race (or there was none): the new future is now the registered one.
      return future;
    }
  }

  log.debug("Found existing in-flight request for marker {} with requestId {}. Returning same future.",
      markerName, requestId != null ? requestId : "null");
  return existingFuture;
}

/**
Expand Down Expand Up @@ -174,23 +197,35 @@ public List<MarkerCreationFuture> fetchPendingMarkerCreationRequests() {
}

/**
* @param shouldClear Should clear the internal request list or not.
* @param shouldClear Should clear the internal request map or not.
* @return futures of pending marker creation requests.
*/
public List<MarkerCreationFuture> getPendingMarkerCreationRequests(boolean shouldClear) {
List<MarkerCreationFuture> pendingFutures;
synchronized (markerCreationFutures) {
if (markerCreationFutures.isEmpty()) {
return new ArrayList<>();
}
pendingFutures = new ArrayList<>(markerCreationFutures);
if (shouldClear) {
markerCreationFutures.clear();
}
if (inflightRequestMap.isEmpty()) {
return Collections.emptyList();
}

List<MarkerCreationFuture> pendingFutures = new ArrayList<>(inflightRequestMap.values());
if (shouldClear) {
inflightRequestMap.clear();
}
return pendingFutures;
}

/**
 * Checks whether a marker with the given name is already recorded and, if so, marks the
 * future as successful only when the recorded request ID matches the incoming one.
 * The future is left untouched when the marker has not been recorded.
 *
 * @param future     future of the marker creation request being processed
 * @param markerName marker name
 * @param requestId  request ID of the incoming request; may be {@code null}
 * @return {@code true} if the marker is already recorded, {@code false} otherwise
 */
private boolean isMarkerAlreadySeen(MarkerCreationFuture future, String markerName, String requestId) {
  // Single lookup instead of containsKey() + get(): the map is a ConcurrentHashMap, which never
  // holds null values, so a null result means "absent". This also removes the window in which
  // the entry could disappear between the two calls.
  String existingRequestId = markerToRequestIdMap.get(markerName);
  if (existingRequestId == null) {
    return false;
  }

  // The sentinel stands for "request ID unknown", so it never counts as an idempotent match.
  boolean idempotentMatch = !NULL_REQUEST_ID.equals(existingRequestId)
      && requestId != null
      && requestId.equals(existingRequestId);

  future.setIsSuccessful(idempotentMatch);
  return true;
}

/**
* Processes pending marker creation requests.
*
Expand All @@ -207,11 +242,13 @@ public void processMarkerCreationRequests(
log.debug("timeMs={} markerDirPath={} numRequests={} fileIndex={}",
System.currentTimeMillis(), markerDirPath, pendingMarkerCreationFutures.size(), fileIndex);
boolean shouldFlushMarkers = false;

synchronized (markerCreationProcessingLock) {
for (MarkerCreationFuture future : pendingMarkerCreationFutures) {
String markerName = future.getMarkerName();
boolean exists = allMarkers.contains(markerName);
String requestId = future.getRequestId();

boolean exists = isMarkerAlreadySeen(future, markerName, requestId);
if (!exists) {
if (conflictDetectionStrategy.isPresent()) {
try {
Expand All @@ -225,16 +262,16 @@ public void processMarkerCreationRequests(
log.warn("Failed to execute early conflict detection. Marker creation will continue.", e);
// When early conflict detection fails to execute, we still allow the marker creation
// to continue
addMarkerToMap(fileIndex, markerName);
addMarkerToMap(fileIndex, markerName, requestId);
future.setIsSuccessful(true);
shouldFlushMarkers = true;
continue;
}
}
addMarkerToMap(fileIndex, markerName);
addMarkerToMap(fileIndex, markerName, requestId);
future.setIsSuccessful(true);
shouldFlushMarkers = true;
}
future.setIsSuccessful(!exists);
}

if (!isMarkerTypeWritten) {
Expand Down Expand Up @@ -265,13 +302,14 @@ public void processMarkerCreationRequests(
*/
public boolean deleteAllMarkers() {
boolean result = FSUtils.deleteDir(hoodieEngineContext, storage, markerDirPath, parallelism);
allMarkers.clear();
markerToRequestIdMap.clear();
fileMarkersMap.clear();
return result;
}

/**
* Syncs all markers maintained in the underlying files under the marker directory in the file system.
* When TLS recovers from crash, we have no request IDs so store sentinel value for each marker.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we cannot know what the requestID was, we can avoid backfilling this map with the NULL_REQUEST_ID and let the requests fail to be safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But to fail the request, we end up looking it up in markerToRequestIdMap, right? So, to even fail a marker request for an already existing marker, we need to populate entries in the map.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now we will allow the request to succeed if they are both NULL_REQUEST_ID, which is different from the existing behavior, where we fail in this case. I think it is easier to reason about if we don't backfill this map.

*/
private void syncMarkersFromFileSystem() {
Map<String, Set<String>> fileMarkersSetMap = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem(
Expand All @@ -282,7 +320,9 @@ private void syncMarkersFromFileSystem() {
int index = parseMarkerFileIndex(markersFilePathStr);
if (index >= 0) {
fileMarkersMap.put(index, new StringBuilder(StringUtils.join(",", fileMarkers)));
allMarkers.addAll(fileMarkers);
for (String marker : fileMarkers) {
markerToRequestIdMap.put(marker, NULL_REQUEST_ID);
}
}
}
}
Expand All @@ -301,9 +341,10 @@ private void syncMarkersFromFileSystem() {
*
* @param fileIndex Marker file index number.
* @param markerName Marker name.
* @param requestId Request ID for idempotency; null for recovery.
*/
private void addMarkerToMap(int fileIndex, String markerName) {
allMarkers.add(markerName);
private void addMarkerToMap(int fileIndex, String markerName, String requestId) {
markerToRequestIdMap.put(markerName, requestId == null ? NULL_REQUEST_ID : requestId);
StringBuilder stringBuilder = fileMarkersMap.computeIfAbsent(fileIndex, k -> new StringBuilder(16384));
stringBuilder.append(markerName);
stringBuilder.append('\n');
Expand Down
Loading
Loading