From f6db969dba7f1d1affbf74f7b36b0744e921736f Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Mon, 2 Feb 2026 12:35:21 -0800 Subject: [PATCH 1/7] GI-18072 - Allow retries on marker files --- .../TimelineServerBasedWriteMarkers.java | 3 + .../common/table/marker/MarkerOperation.java | 1 + .../hudi/timeline/service/RequestHandler.java | 4 +- .../service/handlers/MarkerHandler.java | 13 +-- .../handlers/marker/MarkerCreationFuture.java | 7 ++ .../handlers/marker/MarkerDirState.java | 85 ++++++++++++------- .../timeline/service/TestRequestHandler.java | 48 +++++++++++ 7 files changed, 125 insertions(+), 36 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java index b31fdc94178af..0c87dcd6a05ed 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import static org.apache.hudi.common.table.marker.MarkerOperation.ALL_MARKERS_URL; @@ -53,6 +54,7 @@ import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_BASEPATH_PARAM; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_DIR_PATH_PARAM; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_NAME_PARAM; +import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_REQUEST_ID_PARAM; /** * Marker operations of using timeline server as a proxy to create and delete markers. @@ -197,6 +199,7 @@ private Map getConfigMap( } else { paramsMap.put(MARKER_NAME_PARAM, partitionPath + "/" + markerFileName); } + paramsMap.put(MARKER_REQUEST_ID_PARAM, UUID.randomUUID().toString()); if (initEarlyConflictDetectionConfigs) { paramsMap.put(MARKER_BASEPATH_PARAM, basePath); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java index 965e8192dec77..42fcd8dca943b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java @@ -30,6 +30,7 @@ public class MarkerOperation implements Serializable { public static final String MARKER_DIR_PATH_PARAM = "markerdirpath"; public static final String MARKER_NAME_PARAM = "markername"; public static final String MARKER_BASEPATH_PARAM = "basepath"; + public static final String MARKER_REQUEST_ID_PARAM = "requestid"; // GET requests public static final String ALL_MARKERS_URL = String.format("%s/%s", BASE_URL, "all"); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 065e1457d6a17..e9aa1f6c86bbe 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -545,11 +545,13 @@ private void registerMarkerAPI() { app.post(MarkerOperation.CREATE_MARKER_URL, new ViewHandler(ctx -> { metricsRegistry.add("CREATE_MARKER", 1); + String requestId = ctx.queryParam(MarkerOperation.MARKER_REQUEST_ID_PARAM); ctx.future(markerHandler.createMarker( ctx, getMarkerDirParam(ctx), ctx.queryParamAsClass(MarkerOperation.MARKER_NAME_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(MarkerOperation.MARKER_BASEPATH_PARAM, String.class).getOrDefault(""))); + ctx.queryParamAsClass(MarkerOperation.MARKER_BASEPATH_PARAM, String.class).getOrDefault(""), + requestId)); }, false)); app.post(MarkerOperation.DELETE_MARKER_DIR_URL, new ViewHandler(ctx -> { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java index 7a35266720a6a..f43bcebaad218 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java @@ -184,9 +184,12 @@ public boolean doesMarkerDirExist(String markerDir) { * @param context Javalin app context * @param markerDir marker directory path * @param markerName marker name + * @param basePath base path + * @param requestId optional request ID for idempotent retries (POST only); null for legacy or when TLS recovers * @return the {@code CompletableFuture} instance for the request */ - public CompletableFuture createMarker(Context context, String markerDir, String markerName, String basePath) { + public CompletableFuture createMarker(Context context, String markerDir, String markerName, String basePath, + String requestId) { // Step1 do early conflict detection if enable if (timelineServiceConfig.earlyConflictDetectionEnable) { try { @@ -237,18 +240,18 @@ public CompletableFuture createMarker(Context context, String markerDir, log.warn("Failed to execute early conflict detection. Marker creation will continue.", e); // When early conflict detection fails to execute, we still allow the marker creation // to continue - return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName); + return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName, requestId); } } // Step 2 create marker - return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName); + return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName, requestId); } private MarkerCreationFuture addMarkerCreationRequestForAsyncProcessing( - Context context, String markerDir, String markerName) { + Context context, String markerDir, String markerName, String requestId) { log.debug("Request: Create marker: {}", markerName); - MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName); + MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName, requestId); // Add the future to the list MarkerDirState markerDirState = getMarkerDirState(markerDir); markerDirState.addMarkerCreationFuture(future); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java index 8b6d9192f9b61..f0580d65336b2 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java @@ -29,6 +29,7 @@ /** * Future for async marker creation request. + * Request ID is optional; when present it enables idempotent retries (same request ID = same logical create). */ @Getter @Slf4j @@ -37,16 +38,22 @@ public class MarkerCreationFuture extends CompletableFuture { private final Context context; private final String markerDirPath; private final String markerName; + private final String requestId; private boolean isSuccessful; @Getter(AccessLevel.NONE) private final HoodieTimer timer; public MarkerCreationFuture(Context context, String markerDirPath, String markerName) { + this(context, markerDirPath, markerName, null); + } + + public MarkerCreationFuture(Context context, String markerDirPath, String markerName, String requestId) { super(); this.timer = HoodieTimer.start(); this.context = context; this.markerDirPath = markerDirPath; this.markerName = markerName; + this.requestId = requestId; this.isSuccessful = false; } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index 9f804086f4811..244a4f3eec978 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -34,7 +34,6 @@ import org.apache.hudi.storage.StoragePath; import com.fasterxml.jackson.core.JsonProcessingException; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.util.StringUtils; @@ -45,11 +44,13 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -65,13 +66,14 @@ @Slf4j public class MarkerDirState implements Serializable { + private static final String NULL_REQUEST_ID = ""; // Marker directory private final StoragePath markerDirPath; private final HoodieStorage storage; private final Registry metricsRegistry; - // A cached copy of all markers in memory - @Getter - private final Set allMarkers = new HashSet<>(); + // Marker name -> request ID (empty string sentinel when TLS recovers from crash or for legacy clients). + // Enables idempotent retries: same requestId for same marker is treated as success. + private final Map markerToRequestIdMap = new ConcurrentHashMap<>(); // A cached copy of marker entries in each marker file, stored in StringBuilder // for efficient appending // Mapping: {markerFileIndex -> markers} @@ -107,6 +109,13 @@ public MarkerDirState(String markerDirPath, int markerBatchNumThreads, syncMarkersFromFileSystem(); } + /** + * @return all marker names in the directory (backward compatible). + */ + public Set getAllMarkers() { + return Collections.unmodifiableSet(new HashSet<>(markerToRequestIdMap.keySet())); + } + /** * @return {@code true} if the marker directory exists in the system. */ @@ -211,30 +220,42 @@ public void processMarkerCreationRequests( synchronized (markerCreationProcessingLock) { for (MarkerCreationFuture future : pendingMarkerCreationFutures) { String markerName = future.getMarkerName(); - boolean exists = allMarkers.contains(markerName); - if (!exists) { - if (conflictDetectionStrategy.isPresent()) { - try { - conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary(); - } catch (HoodieEarlyConflictDetectionException he) { - log.error("Detected the write conflict due to a concurrent writer, " - + "failing the marker creation as the early conflict detection is enabled", he); - future.setIsSuccessful(false); - continue; - } catch (Exception e) { - log.warn("Failed to execute early conflict detection. Marker creation will continue.", e); - // When early conflict detection fails to execute, we still allow the marker creation - // to continue - addMarkerToMap(fileIndex, markerName); - future.setIsSuccessful(true); - shouldFlushMarkers = true; - continue; - } + String requestId = future.getRequestId(); + // Idempotent retry: marker already created with same requestId + if (markerToRequestIdMap.containsKey(markerName)) { + String existingRequestId = markerToRequestIdMap.get(markerName); + String normalizedRequestId = requestId == null ? NULL_REQUEST_ID : requestId; + boolean idempotentMatch = normalizedRequestId.equals(existingRequestId) + || NULL_REQUEST_ID.equals(existingRequestId) + || NULL_REQUEST_ID.equals(normalizedRequestId); + if (idempotentMatch) { + future.setIsSuccessful(true); + } else { + future.setIsSuccessful(false); } - addMarkerToMap(fileIndex, markerName); - shouldFlushMarkers = true; + continue; } - future.setIsSuccessful(!exists); + if (conflictDetectionStrategy.isPresent()) { + try { + conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary(); + } catch (HoodieEarlyConflictDetectionException he) { + log.error("Detected the write conflict due to a concurrent writer, " + + "failing the marker creation as the early conflict detection is enabled", he); + future.setIsSuccessful(false); + continue; + } catch (Exception e) { + log.warn("Failed to execute early conflict detection. Marker creation will continue.", e); + // When early conflict detection fails to execute, we still allow the marker creation + // to continue + addMarkerToMap(fileIndex, markerName, requestId); + future.setIsSuccessful(true); + shouldFlushMarkers = true; + continue; + } + } + addMarkerToMap(fileIndex, markerName, requestId); + future.setIsSuccessful(true); + shouldFlushMarkers = true; } if (!isMarkerTypeWritten) { @@ -265,13 +286,14 @@ public void processMarkerCreationRequests( */ public boolean deleteAllMarkers() { boolean result = FSUtils.deleteDir(hoodieEngineContext, storage, markerDirPath, parallelism); - allMarkers.clear(); + markerToRequestIdMap.clear(); fileMarkersMap.clear(); return result; } /** * Syncs all markers maintained in the underlying files under the marker directory in the file system. + * When TLS recovers from crash, we have no request IDs so store sentinel value for each marker. */ private void syncMarkersFromFileSystem() { Map> fileMarkersSetMap = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( @@ -282,7 +304,9 @@ private void syncMarkersFromFileSystem() { int index = parseMarkerFileIndex(markersFilePathStr); if (index >= 0) { fileMarkersMap.put(index, new StringBuilder(StringUtils.join(",", fileMarkers))); - allMarkers.addAll(fileMarkers); + for (String marker : fileMarkers) { + markerToRequestIdMap.put(marker, NULL_REQUEST_ID); + } } } } @@ -301,9 +325,10 @@ private void syncMarkersFromFileSystem() { * * @param fileIndex Marker file index number. * @param markerName Marker name. + * @param requestId Optional request ID for idempotency; null for legacy or recovery. */ - private void addMarkerToMap(int fileIndex, String markerName) { - allMarkers.add(markerName); + private void addMarkerToMap(int fileIndex, String markerName, String requestId) { + markerToRequestIdMap.put(markerName, requestId == null ? NULL_REQUEST_ID : requestId); StringBuilder stringBuilder = fileMarkersMap.computeIfAbsent(fileIndex, k -> new StringBuilder(16384)); stringBuilder.append(markerName); stringBuilder.append('\n'); diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java index acb8e4bd0d763..207b0d02345ff 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java @@ -45,11 +45,13 @@ import static org.apache.hudi.common.table.marker.MarkerOperation.CREATE_MARKER_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_DIR_PATH_PARAM; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_NAME_PARAM; +import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_REQUEST_ID_PARAM; import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.BASEPATH_PARAM; import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.LAST_INSTANT_TS; import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.REFRESH_TABLE_URL; import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.TIMELINE_HASH; import static org.apache.hudi.timeline.TimelineServiceClientBase.RequestMethod.POST; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; class TestRequestHandler extends HoodieCommonTestHarness { @@ -120,6 +122,50 @@ void testCreateMarkerAPIWithDifferentSchemes() throws IOException { assertMarkerCreation(tempDir.resolve("base-path-2").toUri().toString(), "test2:/"); } + @Test + void testMarkerIdempotency() throws IOException, InterruptedException { + String basePath = tempDir.resolve("base-path-idempotency").toUri().toString(); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, getTableType()); + String markerDir = metaClient.getMarkerFolderPath("102"); + String markerName = "partition1/file1.parquet.marker.CREATE"; + String requestId1 = java.util.UUID.randomUUID().toString(); + String requestId2 = java.util.UUID.randomUUID().toString(); + + Map queryParameters = new HashMap<>(); + queryParameters.put(BASEPATH_PARAM, basePath); + queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir); + queryParameters.put(MARKER_NAME_PARAM, markerName); + queryParameters.put(MARKER_REQUEST_ID_PARAM, requestId1); + + // First request + boolean result1 = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {}); + assertTrue(result1, "First marker creation should succeed"); + + // Give server time to process + Thread.sleep(500); + + // Retry with same requestId (simulating timeout + retry) should succeed (idempotent) + boolean result2 = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {}); + assertTrue(result2, "Retry with same requestId should succeed (idempotent)"); + + // Retry with different requestId (distinct logical request) should fail + queryParameters.put(MARKER_REQUEST_ID_PARAM, requestId2); + boolean result3 = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {}); + assertFalse(result3, "Retry with different requestId should fail"); + } + private void assertMarkerCreation(String basePath, String schema) throws IOException { Map queryParameters = new HashMap<>(); String basePathScheme = getPathWithReplacedSchema(basePath, schema); @@ -129,6 +175,8 @@ private void assertMarkerCreation(String basePath, String schema) throws IOExcep queryParameters.put(BASEPATH_PARAM, basePathScheme); queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir); queryParameters.put(MARKER_NAME_PARAM, "marker-file-1"); + // Add requestId for idempotency + queryParameters.put(MARKER_REQUEST_ID_PARAM, java.util.UUID.randomUUID().toString()); boolean content = timelineServiceClient.makeRequest( TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) From be90d4fffc55cebc20d8debe2ee5c7897c06619c Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Mon, 2 Feb 2026 12:46:34 -0800 Subject: [PATCH 2/7] GI-18072 - Allow retries on marker files - added more tests --- .../timeline/service/TestRequestHandler.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java index 207b0d02345ff..a23bc26228c9d 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java @@ -166,6 +166,43 @@ void testMarkerIdempotency() throws IOException, InterruptedException { assertFalse(result3, "Retry with different requestId should fail"); } + @Test + void testMarkerBackwardCompatibilityNullExistingRequestId() throws IOException, InterruptedException { + String basePath = tempDir.resolve("base-path-backward-compat").toUri().toString(); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, getTableType()); + String markerDir = metaClient.getMarkerFolderPath("104"); + String markerName = "partition1/file1.parquet.marker.CREATE"; + String newRequestId = java.util.UUID.randomUUID().toString(); + + // First request: no requestId (legacy client) → marker stored with null requestId + Map queryParametersLegacy = new HashMap<>(); + queryParametersLegacy.put(BASEPATH_PARAM, basePath); + queryParametersLegacy.put(MARKER_DIR_PATH_PARAM, markerDir); + queryParametersLegacy.put(MARKER_NAME_PARAM, markerName); + // Omit MARKER_REQUEST_ID_PARAM + + boolean result1 = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParametersLegacy) + .build()) + .getDecodedContent(new TypeReference() {}); + assertTrue(result1, "First marker creation (no requestId) should succeed"); + + // Give server time to process + Thread.sleep(500); + + // Second request: same marker with non-null requestId → should succeed (backward compat) + Map queryParametersWithRequestId = new HashMap<>(queryParametersLegacy); + queryParametersWithRequestId.put(MARKER_REQUEST_ID_PARAM, newRequestId); + + boolean result2 = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParametersWithRequestId) + .build()) + .getDecodedContent(new TypeReference() {}); + assertTrue(result2, "Request with non-null requestId when marker has null should succeed (backward compat)"); + } + private void assertMarkerCreation(String basePath, String schema) throws IOException { Map queryParameters = new HashMap<>(); String basePathScheme = getPathWithReplacedSchema(basePath, schema); From 3aa87e00d1d5d1895302c0df0371c4741ea407e4 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Tue, 3 Feb 2026 14:36:28 -0800 Subject: [PATCH 3/7] GI-18072 - Addressed comments --- .../service/handlers/MarkerHandler.java | 6 +- .../handlers/marker/MarkerDirState.java | 121 ++++++++++-------- .../timeline/service/TestRequestHandler.java | 37 +++++- 3 files changed, 101 insertions(+), 63 deletions(-) diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java index f43bcebaad218..61a46e05770c2 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java @@ -250,11 +250,9 @@ public CompletableFuture createMarker(Context context, String markerDir, private MarkerCreationFuture addMarkerCreationRequestForAsyncProcessing( Context context, String markerDir, String markerName, String requestId) { - log.debug("Request: Create marker: {}", markerName); - MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName, requestId); - // Add the future to the list + log.debug("Request: Create marker: {} with requestId: {}", markerName, requestId); MarkerDirState markerDirState = getMarkerDirState(markerDir); - markerDirState.addMarkerCreationFuture(future); + MarkerCreationFuture future = markerDirState.getOrCreateMarkerCreationFuture(context, markerName, requestId); if (!firstCreationRequestSeen) { synchronized (firstCreationRequestSeenLock) { if (!firstCreationRequestSeen) { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index 244a4f3eec978..d42e070bd4dc3 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -34,6 +34,7 @@ import org.apache.hudi.storage.StoragePath; import com.fasterxml.jackson.core.JsonProcessingException; +import io.javalin.http.Context; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.util.StringUtils; @@ -71,7 +72,7 @@ public class MarkerDirState implements Serializable { private final StoragePath markerDirPath; private final HoodieStorage storage; private final Registry metricsRegistry; - // Marker name -> request ID (empty string sentinel when TLS recovers from crash or for legacy clients). + // Marker name -> request ID (empty string sentinel when TLS recovers from crash). // Enables idempotent retries: same requestId for same marker is treated as success. private final Map markerToRequestIdMap = new ConcurrentHashMap<>(); // A cached copy of marker entries in each marker file, stored in StringBuilder @@ -82,8 +83,9 @@ public class MarkerDirState implements Serializable { // {@code true} means the file is in use by a {@code BatchCreateMarkerRunnable}. // Index of the list is used for the filename, i.e., "1" -> "MARKERS1" private final List threadUseStatus; - // A list of pending futures from async marker creation requests - private final List markerCreationFutures = new ArrayList<>(); + // Map of in-flight requests: (markerName + "|" + requestId) -> MarkerCreationFuture + // Used for BOTH deduplication AND batch marker creation requests queue + private final Map inflightRequestMap = new ConcurrentHashMap<>(); private final int parallelism; private final Object markerCreationProcessingLock = new Object(); // Early conflict detection strategy if enabled @@ -128,15 +130,27 @@ public boolean exists() { } /** - * Adds a {@code MarkerCreationCompletableFuture} instance from a marker - * creation request to the queue. + * Checks if an identical in-flight request exists and returns it, or creates a new future. + * This deduplicates concurrent requests with the same marker and requestId. * - * @param future {@code MarkerCreationCompletableFuture} instance. + * @param context Javalin context + * @param markerName Marker name + * @param requestId Request ID (non-null from clients, can be null only during recovery) + * @return Existing future if found, otherwise a new future */ - public void addMarkerCreationFuture(MarkerCreationFuture future) { - synchronized (markerCreationFutures) { - markerCreationFutures.add(future); + public MarkerCreationFuture getOrCreateMarkerCreationFuture(Context context, String markerName, String requestId) { + String dedupKey = markerName + "|" + (requestId != null ? requestId : NULL_REQUEST_ID); + + MarkerCreationFuture existingFuture = inflightRequestMap.get(dedupKey); + if (existingFuture != null) { + log.debug("Found existing in-flight request for marker {} with requestId {}. Returning same future.", + markerName, requestId != null ? requestId : "null"); + return existingFuture; } + + MarkerCreationFuture future = new MarkerCreationFuture(context, markerDirPath.toString(), markerName, requestId); + inflightRequestMap.put(dedupKey, future); + return future; } /** @@ -183,23 +197,34 @@ public List fetchPendingMarkerCreationRequests() { } /** - * @param shouldClear Should clear the internal request list or not. + * @param shouldClear Should clear the internal request map or not. * @return futures of pending marker creation requests. */ public List getPendingMarkerCreationRequests(boolean shouldClear) { - List pendingFutures; - synchronized (markerCreationFutures) { - if (markerCreationFutures.isEmpty()) { - return new ArrayList<>(); - } - pendingFutures = new ArrayList<>(markerCreationFutures); - if (shouldClear) { - markerCreationFutures.clear(); - } + if (inflightRequestMap.isEmpty()) { + return new ArrayList<>(); + } + + List pendingFutures = new ArrayList<>(inflightRequestMap.values()); + if (shouldClear) { + inflightRequestMap.clear(); } return pendingFutures; } + private boolean isMarkerAlreadySeen(MarkerCreationFuture future, String markerName, String requestId) { + if (!markerToRequestIdMap.containsKey(markerName)) { + return false; + } + + String existingRequestId = markerToRequestIdMap.get(markerName); + String normalizedRequestId = requestId == null ? NULL_REQUEST_ID : requestId; + boolean idempotentMatch = normalizedRequestId.equals(existingRequestId); + + future.setIsSuccessful(idempotentMatch); + return true; + } + /** * Processes pending marker creation requests. * @@ -216,46 +241,36 @@ public void processMarkerCreationRequests( log.debug("timeMs={} markerDirPath={} numRequests={} fileIndex={}", System.currentTimeMillis(), markerDirPath, pendingMarkerCreationFutures.size(), fileIndex); boolean shouldFlushMarkers = false; - + synchronized (markerCreationProcessingLock) { for (MarkerCreationFuture future : pendingMarkerCreationFutures) { String markerName = future.getMarkerName(); String requestId = future.getRequestId(); - // Idempotent retry: marker already created with same requestId - if (markerToRequestIdMap.containsKey(markerName)) { - String existingRequestId = markerToRequestIdMap.get(markerName); - String normalizedRequestId = requestId == null ? NULL_REQUEST_ID : requestId; - boolean idempotentMatch = normalizedRequestId.equals(existingRequestId) - || NULL_REQUEST_ID.equals(existingRequestId) - || NULL_REQUEST_ID.equals(normalizedRequestId); - if (idempotentMatch) { - future.setIsSuccessful(true); - } else { - future.setIsSuccessful(false); - } - continue; - } - if (conflictDetectionStrategy.isPresent()) { - try { - conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary(); - } catch (HoodieEarlyConflictDetectionException he) { - log.error("Detected the write conflict due to a concurrent writer, " - + "failing the marker creation as the early conflict detection is enabled", he); - future.setIsSuccessful(false); - continue; - } catch (Exception e) { - log.warn("Failed to execute early conflict detection. Marker creation will continue.", e); - // When early conflict detection fails to execute, we still allow the marker creation - // to continue - addMarkerToMap(fileIndex, markerName, requestId); - future.setIsSuccessful(true); - shouldFlushMarkers = true; - continue; + + boolean exists = isMarkerAlreadySeen(future, markerName, requestId); + if (!exists) { + if (conflictDetectionStrategy.isPresent()) { + try { + conflictDetectionStrategy.get().detectAndResolveConflictIfNecessary(); + } catch (HoodieEarlyConflictDetectionException he) { + log.error("Detected the write conflict due to a concurrent writer, " + + "failing the marker creation as the early conflict detection is enabled", he); + future.setIsSuccessful(false); + continue; + } catch (Exception e) { + log.warn("Failed to execute early conflict detection. Marker creation will continue.", e); + // When early conflict detection fails to execute, we still allow the marker creation + // to continue + addMarkerToMap(fileIndex, markerName, requestId); + future.setIsSuccessful(true); + shouldFlushMarkers = true; + continue; + } } + addMarkerToMap(fileIndex, markerName, requestId); + future.setIsSuccessful(true); + shouldFlushMarkers = true; } - addMarkerToMap(fileIndex, markerName, requestId); - future.setIsSuccessful(true); - shouldFlushMarkers = true; } if (!isMarkerTypeWritten) { diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java index a23bc26228c9d..1713019f0be9e 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java @@ -39,9 +39,11 @@ import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.apache.hudi.common.config.HoodieStorageConfig.HOODIE_STORAGE_CLASS; +import static org.apache.hudi.common.table.marker.MarkerOperation.ALL_MARKERS_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.CREATE_MARKER_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_DIR_PATH_PARAM; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_NAME_PARAM; @@ -50,6 +52,7 @@ import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.LAST_INSTANT_TS; import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.REFRESH_TABLE_URL; import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.TIMELINE_HASH; +import static org.apache.hudi.timeline.TimelineServiceClientBase.RequestMethod.GET; import static org.apache.hudi.timeline.TimelineServiceClientBase.RequestMethod.POST; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -145,8 +148,7 @@ void testMarkerIdempotency() throws IOException, InterruptedException { .getDecodedContent(new TypeReference() {}); assertTrue(result1, "First marker creation should succeed"); - // Give server time to process - Thread.sleep(500); + waitForMarkerCreation(markerDir, markerName, 1000); // Retry with same requestId (simulating timeout + retry) should succeed (idempotent) boolean result2 = timelineServiceClient.makeRequest( @@ -188,10 +190,9 @@ void testMarkerBackwardCompatibilityNullExistingRequestId() throws IOException, .getDecodedContent(new TypeReference() {}); assertTrue(result1, "First marker creation (no requestId) should succeed"); - // Give server time to process - Thread.sleep(500); + waitForMarkerCreation(markerDir, markerName, 1000); - // Second request: same marker with non-null requestId → should succeed (backward compat) + // Second request: same marker with non-null requestId → should fail (no backward compat) Map queryParametersWithRequestId = new HashMap<>(queryParametersLegacy); queryParametersWithRequestId.put(MARKER_REQUEST_ID_PARAM, newRequestId); @@ -200,7 +201,7 @@ void testMarkerBackwardCompatibilityNullExistingRequestId() throws IOException, .addQueryParams(queryParametersWithRequestId) .build()) .getDecodedContent(new TypeReference() {}); - assertTrue(result2, "Request with non-null requestId when marker has null should succeed (backward compat)"); + assertFalse(result2, "Request with non-null requestId when marker has null should fail (no backward compat)"); } private void assertMarkerCreation(String basePath, String schema) throws IOException { @@ -232,4 +233,28 @@ private String getPathWithReplacedSchema(String path, String schemaToUse) { } throw new IllegalArgumentException("Invalid file provided"); } + + private void waitForMarkerCreation(String markerDir, String expectedMarker, long timeoutMs) + throws IOException, InterruptedException { + long startTime = System.currentTimeMillis(); + long pollIntervalMs = 50; + + Map queryParameters = new HashMap<>(); + queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir); + + while (System.currentTimeMillis() - startTime < timeoutMs) { + List markers = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(GET, ALL_MARKERS_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference>() { + }); + + if (markers != null && markers.contains(expectedMarker)) { + return; + } + + Thread.sleep(pollIntervalMs); + } + } } From ae5f6a8e1130044c919606bad7271099c75d9981 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Tue, 3 Feb 2026 14:41:40 -0800 Subject: [PATCH 4/7] GI-18072 - Addressed comments --- .../hudi/timeline/service/handlers/marker/MarkerDirState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index d42e070bd4dc3..a40696daa7923 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -340,7 +340,7 @@ private void syncMarkersFromFileSystem() { * * @param fileIndex Marker file index number. * @param markerName Marker name. - * @param requestId Optional request ID for idempotency; null for legacy or recovery. + * @param requestId Request ID for idempotency; null for recovery. */ private void addMarkerToMap(int fileIndex, String markerName, String requestId) { markerToRequestIdMap.put(markerName, requestId == null ? NULL_REQUEST_ID : requestId); From cdee329c7c4a772fab0bf163a7441d9883f7ebbf Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Mon, 9 Feb 2026 14:36:21 -0800 Subject: [PATCH 5/7] GI-18072 - Addressed comments --- .../hudi/timeline/service/handlers/marker/MarkerDirState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index a40696daa7923..425fccd00eeea 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -202,7 +202,7 @@ public List fetchPendingMarkerCreationRequests() { */ public List getPendingMarkerCreationRequests(boolean shouldClear) { if (inflightRequestMap.isEmpty()) { - return new ArrayList<>(); + return Collections.emptyList(); } List pendingFutures = new ArrayList<>(inflightRequestMap.values()); From 140c3bbe87e979ce76f3d8fff02add267a2a0b33 Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Mon, 9 Feb 2026 17:09:27 -0800 Subject: [PATCH 6/7] GI-18072 - Addressed comments --- .gitignore | 1 + .../handlers/marker/MarkerDirState.java | 3 +- .../timeline/service/TestRequestHandler.java | 35 +++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 1e9c6c992cca6..5bdb70ed9939c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ target/ metastore_db/ .metals/ .mvn/ +.m2repo/ *.bloop/ *.vscode/ *.metals/ diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index 425fccd00eeea..ee5ddaef4dcfe 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -219,7 +219,8 @@ private boolean isMarkerAlreadySeen(MarkerCreationFuture future, String markerNa String existingRequestId = markerToRequestIdMap.get(markerName); String normalizedRequestId = requestId == null ? NULL_REQUEST_ID : requestId; - boolean idempotentMatch = normalizedRequestId.equals(existingRequestId); + boolean idempotentMatch = !NULL_REQUEST_ID.equals(existingRequestId) + && normalizedRequestId.equals(existingRequestId); future.setIsSuccessful(idempotentMatch); return true; diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java index 1713019f0be9e..873ce14b78576 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java @@ -204,6 +204,41 @@ void testMarkerBackwardCompatibilityNullExistingRequestId() throws IOException, assertFalse(result2, "Request with non-null requestId when marker has null should fail (no backward compat)"); } + @Test + void testMarkerCreationAfterRecovery() throws IOException, InterruptedException { + String basePath = tempDir.resolve("base-path-recovery").toUri().toString(); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, getTableType()); + String markerDir = metaClient.getMarkerFolderPath("105"); + String markerName = "partition1/file1.parquet.marker.CREATE"; + String requestId = java.util.UUID.randomUUID().toString(); + + Map queryParameters = new HashMap<>(); + queryParameters.put(BASEPATH_PARAM, basePath); + queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir); + queryParameters.put(MARKER_NAME_PARAM, markerName); + queryParameters.put(MARKER_REQUEST_ID_PARAM, requestId); + + boolean result = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {}); + assertTrue(result, "Initial marker creation should succeed"); + waitForMarkerCreation(markerDir, markerName, 1000); + + // Restart server to simulate recovery + tearDown(); + setUp(); + + queryParameters.put(MARKER_REQUEST_ID_PARAM, java.util.UUID.randomUUID().toString()); + boolean afterRecovery = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {}); + assertFalse(afterRecovery, "After recovery (backfill NULL), different requestId should fail"); + } + private void assertMarkerCreation(String basePath, String schema) throws IOException { Map queryParameters = new HashMap<>(); String basePathScheme = getPathWithReplacedSchema(basePath, schema); From ddf29f9f0ac22ae7f108d83a4e1edda96027cdbd Mon Sep 17 00:00:00 2001 From: Pavithran Ravichandiran Date: Mon, 9 Feb 2026 17:29:06 -0800 Subject: [PATCH 7/7] GI-18072 - Addressed comments - around requestId --- .../hudi/timeline/service/RequestHandler.java | 6 +++ .../handlers/marker/MarkerDirState.java | 4 +- .../timeline/service/TestRequestHandler.java | 47 +++++-------------- 3 files changed, 19 insertions(+), 38 deletions(-) diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index e9aa1f6c86bbe..c05c95e0dd3b5 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -546,6 +546,12 @@ private void registerMarkerAPI() { app.post(MarkerOperation.CREATE_MARKER_URL, new ViewHandler(ctx -> { metricsRegistry.add("CREATE_MARKER", 1); String requestId = ctx.queryParam(MarkerOperation.MARKER_REQUEST_ID_PARAM); + // Require requestId for all incoming requests + if (requestId == null || requestId.trim().isEmpty()) { + ctx.status(400); + writeValueAsString(ctx, false); + return; + } ctx.future(markerHandler.createMarker( ctx, getMarkerDirParam(ctx), diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index ee5ddaef4dcfe..151f7ae326b06 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -218,9 +218,9 @@ private boolean isMarkerAlreadySeen(MarkerCreationFuture future, String markerNa } String existingRequestId = markerToRequestIdMap.get(markerName); - String normalizedRequestId = requestId == null ? NULL_REQUEST_ID : requestId; boolean idempotentMatch = !NULL_REQUEST_ID.equals(existingRequestId) - && normalizedRequestId.equals(existingRequestId); + && requestId != null + && requestId.equals(existingRequestId); future.setIsSuccessful(idempotentMatch); return true; diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java index 873ce14b78576..83349e7a7eb1e 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java @@ -33,6 +33,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.hadoop.conf.Configuration; +import org.apache.http.client.HttpResponseException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -55,6 +56,7 @@ import static org.apache.hudi.timeline.TimelineServiceClientBase.RequestMethod.GET; import static org.apache.hudi.timeline.TimelineServiceClientBase.RequestMethod.POST; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class TestRequestHandler extends HoodieCommonTestHarness { @@ -138,6 +140,15 @@ void testMarkerIdempotency() throws IOException, InterruptedException { queryParameters.put(BASEPATH_PARAM, basePath); queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir); queryParameters.put(MARKER_NAME_PARAM, markerName); + + // No requestId → server returns 400 + assertThrows(HttpResponseException.class, () -> + timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {})); + queryParameters.put(MARKER_REQUEST_ID_PARAM, requestId1); // First request @@ -168,42 +179,6 @@ void testMarkerIdempotency() throws IOException, InterruptedException { assertFalse(result3, "Retry with different requestId should fail"); } - @Test - void testMarkerBackwardCompatibilityNullExistingRequestId() throws IOException, InterruptedException { - String basePath = tempDir.resolve("base-path-backward-compat").toUri().toString(); - HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, getTableType()); - String markerDir = metaClient.getMarkerFolderPath("104"); - String markerName = "partition1/file1.parquet.marker.CREATE"; - String newRequestId = java.util.UUID.randomUUID().toString(); - - // First request: no requestId (legacy client) → marker stored with null requestId - Map queryParametersLegacy = new HashMap<>(); - queryParametersLegacy.put(BASEPATH_PARAM, basePath); - queryParametersLegacy.put(MARKER_DIR_PATH_PARAM, markerDir); - queryParametersLegacy.put(MARKER_NAME_PARAM, markerName); - // Omit MARKER_REQUEST_ID_PARAM - - boolean result1 = timelineServiceClient.makeRequest( - TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) - .addQueryParams(queryParametersLegacy) - .build()) - .getDecodedContent(new TypeReference() {}); - assertTrue(result1, "First marker creation (no requestId) should succeed"); - - waitForMarkerCreation(markerDir, markerName, 1000); - - // Second request: same marker with non-null requestId → should fail (no backward compat) - Map queryParametersWithRequestId = new HashMap<>(queryParametersLegacy); - queryParametersWithRequestId.put(MARKER_REQUEST_ID_PARAM, newRequestId); - - boolean result2 = timelineServiceClient.makeRequest( - TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) - .addQueryParams(queryParametersWithRequestId) - .build()) - .getDecodedContent(new TypeReference() {}); - assertFalse(result2, "Request with non-null requestId when marker has null should fail (no backward compat)"); - } - @Test void testMarkerCreationAfterRecovery() throws IOException, InterruptedException { String basePath = tempDir.resolve("base-path-recovery").toUri().toString();