diff --git a/.gitignore b/.gitignore index 1e9c6c992cca6..5bdb70ed9939c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ target/ metastore_db/ .metals/ .mvn/ +.m2repo/ *.bloop/ *.vscode/ *.metals/ diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java index b31fdc94178af..0c87dcd6a05ed 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java @@ -43,6 +43,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import static org.apache.hudi.common.table.marker.MarkerOperation.ALL_MARKERS_URL; @@ -53,6 +54,7 @@ import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_BASEPATH_PARAM; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_DIR_PATH_PARAM; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_NAME_PARAM; +import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_REQUEST_ID_PARAM; /** * Marker operations of using timeline server as a proxy to create and delete markers. @@ -197,6 +199,7 @@ private Map getConfigMap( } else { paramsMap.put(MARKER_NAME_PARAM, partitionPath + "/" + markerFileName); } + paramsMap.put(MARKER_REQUEST_ID_PARAM, UUID.randomUUID().toString()); if (initEarlyConflictDetectionConfigs) { paramsMap.put(MARKER_BASEPATH_PARAM, basePath); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java index 965e8192dec77..42fcd8dca943b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java @@ -30,6 +30,7 @@ public class MarkerOperation implements Serializable { public static final String MARKER_DIR_PATH_PARAM = "markerdirpath"; public static final String MARKER_NAME_PARAM = "markername"; public static final String MARKER_BASEPATH_PARAM = "basepath"; + public static final String MARKER_REQUEST_ID_PARAM = "requestid"; // GET requests public static final String ALL_MARKERS_URL = String.format("%s/%s", BASE_URL, "all"); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 065e1457d6a17..c05c95e0dd3b5 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -545,11 +545,19 @@ private void registerMarkerAPI() { app.post(MarkerOperation.CREATE_MARKER_URL, new ViewHandler(ctx -> { metricsRegistry.add("CREATE_MARKER", 1); + String requestId = ctx.queryParam(MarkerOperation.MARKER_REQUEST_ID_PARAM); + // Require requestId for all incoming requests + if (requestId == null || requestId.trim().isEmpty()) { + ctx.status(400); + writeValueAsString(ctx, false); + return; + } ctx.future(markerHandler.createMarker( ctx, getMarkerDirParam(ctx), ctx.queryParamAsClass(MarkerOperation.MARKER_NAME_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(MarkerOperation.MARKER_BASEPATH_PARAM, String.class).getOrDefault(""))); + ctx.queryParamAsClass(MarkerOperation.MARKER_BASEPATH_PARAM, String.class).getOrDefault(""), + requestId)); }, false)); app.post(MarkerOperation.DELETE_MARKER_DIR_URL, new ViewHandler(ctx -> { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java index 7a35266720a6a..61a46e05770c2 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java @@ -184,9 +184,12 @@ public boolean doesMarkerDirExist(String markerDir) { * @param context Javalin app context * @param markerDir marker directory path * @param markerName marker name + * @param basePath base path + * @param requestId optional request ID for idempotent retries (POST only); null for legacy or when TLS recovers * @return the {@code CompletableFuture} instance for the request */ - public CompletableFuture createMarker(Context context, String markerDir, String markerName, String basePath) { + public CompletableFuture createMarker(Context context, String markerDir, String markerName, String basePath, + String requestId) { // Step1 do early conflict detection if enable if (timelineServiceConfig.earlyConflictDetectionEnable) { try { @@ -237,21 +240,19 @@ public CompletableFuture createMarker(Context context, String markerDir, log.warn("Failed to execute early conflict detection. Marker creation will continue.", e); // When early conflict detection fails to execute, we still allow the marker creation // to continue - return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName); + return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName, requestId); } } // Step 2 create marker - return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName); + return addMarkerCreationRequestForAsyncProcessing(context, markerDir, markerName, requestId); } private MarkerCreationFuture addMarkerCreationRequestForAsyncProcessing( - Context context, String markerDir, String markerName) { - log.debug("Request: Create marker: {}", markerName); - MarkerCreationFuture future = new MarkerCreationFuture(context, markerDir, markerName); - // Add the future to the list + Context context, String markerDir, String markerName, String requestId) { + log.debug("Request: Create marker: {} with requestId: {}", markerName, requestId); MarkerDirState markerDirState = getMarkerDirState(markerDir); - markerDirState.addMarkerCreationFuture(future); + MarkerCreationFuture future = markerDirState.getOrCreateMarkerCreationFuture(context, markerName, requestId); if (!firstCreationRequestSeen) { synchronized (firstCreationRequestSeenLock) { if (!firstCreationRequestSeen) { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java index 8b6d9192f9b61..f0580d65336b2 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCreationFuture.java @@ -29,6 +29,7 @@ /** * Future for async marker creation request. + * Request ID is optional; when present it enables idempotent retries (same request ID = same logical create). */ @Getter @Slf4j @@ -37,16 +38,22 @@ public class MarkerCreationFuture extends CompletableFuture { private final Context context; private final String markerDirPath; private final String markerName; + private final String requestId; private boolean isSuccessful; @Getter(AccessLevel.NONE) private final HoodieTimer timer; public MarkerCreationFuture(Context context, String markerDirPath, String markerName) { + this(context, markerDirPath, markerName, null); + } + + public MarkerCreationFuture(Context context, String markerDirPath, String markerName, String requestId) { super(); this.timer = HoodieTimer.start(); this.context = context; this.markerDirPath = markerDirPath; this.markerName = markerName; + this.requestId = requestId; this.isSuccessful = false; } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java index 9f804086f4811..151f7ae326b06 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java @@ -34,7 +34,7 @@ import org.apache.hudi.storage.StoragePath; import com.fasterxml.jackson.core.JsonProcessingException; -import lombok.Getter; +import io.javalin.http.Context; import lombok.extern.slf4j.Slf4j; import org.apache.hadoop.util.StringUtils; @@ -45,11 +45,13 @@ import java.io.Serializable; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -65,13 +67,14 @@ @Slf4j public class MarkerDirState implements Serializable { + private static final String NULL_REQUEST_ID = ""; // Marker directory private final StoragePath markerDirPath; private final HoodieStorage storage; private final Registry metricsRegistry; - // A cached copy of all markers in memory - @Getter - private final Set allMarkers = new HashSet<>(); + // Marker name -> request ID (empty string sentinel when TLS recovers from crash). + // Enables idempotent retries: same requestId for same marker is treated as success. + private final Map markerToRequestIdMap = new ConcurrentHashMap<>(); // A cached copy of marker entries in each marker file, stored in StringBuilder // for efficient appending // Mapping: {markerFileIndex -> markers} @@ -80,8 +83,9 @@ public class MarkerDirState implements Serializable { // {@code true} means the file is in use by a {@code BatchCreateMarkerRunnable}. // Index of the list is used for the filename, i.e., "1" -> "MARKERS1" private final List threadUseStatus; - // A list of pending futures from async marker creation requests - private final List markerCreationFutures = new ArrayList<>(); + // Map of in-flight requests: (markerName + "|" + requestId) -> MarkerCreationFuture + // Used for BOTH deduplication AND batch marker creation requests queue + private final Map inflightRequestMap = new ConcurrentHashMap<>(); private final int parallelism; private final Object markerCreationProcessingLock = new Object(); // Early conflict detection strategy if enabled @@ -107,6 +111,13 @@ public MarkerDirState(String markerDirPath, int markerBatchNumThreads, syncMarkersFromFileSystem(); } + /** + * @return all marker names in the directory (backward compatible). + */ + public Set getAllMarkers() { + return Collections.unmodifiableSet(new HashSet<>(markerToRequestIdMap.keySet())); + } + /** * @return {@code true} if the marker directory exists in the system. */ @@ -119,15 +130,27 @@ public boolean exists() { } /** - * Adds a {@code MarkerCreationCompletableFuture} instance from a marker - * creation request to the queue. + * Checks if an identical in-flight request exists and returns it, or creates a new future. + * This deduplicates concurrent requests with the same marker and requestId. * - * @param future {@code MarkerCreationCompletableFuture} instance. + * @param context Javalin context + * @param markerName Marker name + * @param requestId Request ID (non-null from clients, can be null only during recovery) + * @return Existing future if found, otherwise a new future */ - public void addMarkerCreationFuture(MarkerCreationFuture future) { - synchronized (markerCreationFutures) { - markerCreationFutures.add(future); + public MarkerCreationFuture getOrCreateMarkerCreationFuture(Context context, String markerName, String requestId) { + String dedupKey = markerName + "|" + (requestId != null ? requestId : NULL_REQUEST_ID); + + MarkerCreationFuture existingFuture = inflightRequestMap.get(dedupKey); + if (existingFuture != null) { + log.debug("Found existing in-flight request for marker {} with requestId {}. Returning same future.", + markerName, requestId != null ? requestId : "null"); + return existingFuture; } + + MarkerCreationFuture future = new MarkerCreationFuture(context, markerDirPath.toString(), markerName, requestId); + inflightRequestMap.put(dedupKey, future); + return future; } /** @@ -174,23 +197,35 @@ public List fetchPendingMarkerCreationRequests() { } /** - * @param shouldClear Should clear the internal request list or not. + * @param shouldClear Should clear the internal request map or not. * @return futures of pending marker creation requests. */ public List getPendingMarkerCreationRequests(boolean shouldClear) { - List pendingFutures; - synchronized (markerCreationFutures) { - if (markerCreationFutures.isEmpty()) { - return new ArrayList<>(); - } - pendingFutures = new ArrayList<>(markerCreationFutures); - if (shouldClear) { - markerCreationFutures.clear(); - } + if (inflightRequestMap.isEmpty()) { + return Collections.emptyList(); + } + + List pendingFutures = new ArrayList<>(inflightRequestMap.values()); + if (shouldClear) { + inflightRequestMap.clear(); } return pendingFutures; } + private boolean isMarkerAlreadySeen(MarkerCreationFuture future, String markerName, String requestId) { + if (!markerToRequestIdMap.containsKey(markerName)) { + return false; + } + + String existingRequestId = markerToRequestIdMap.get(markerName); + boolean idempotentMatch = !NULL_REQUEST_ID.equals(existingRequestId) + && requestId != null + && requestId.equals(existingRequestId); + + future.setIsSuccessful(idempotentMatch); + return true; + } + /** * Processes pending marker creation requests. * @@ -207,11 +242,13 @@ public void processMarkerCreationRequests( log.debug("timeMs={} markerDirPath={} numRequests={} fileIndex={}", System.currentTimeMillis(), markerDirPath, pendingMarkerCreationFutures.size(), fileIndex); boolean shouldFlushMarkers = false; - + synchronized (markerCreationProcessingLock) { for (MarkerCreationFuture future : pendingMarkerCreationFutures) { String markerName = future.getMarkerName(); - boolean exists = allMarkers.contains(markerName); + String requestId = future.getRequestId(); + + boolean exists = isMarkerAlreadySeen(future, markerName, requestId); if (!exists) { if (conflictDetectionStrategy.isPresent()) { try { @@ -225,16 +262,16 @@ public void processMarkerCreationRequests( log.warn("Failed to execute early conflict detection. Marker creation will continue.", e); // When early conflict detection fails to execute, we still allow the marker creation // to continue - addMarkerToMap(fileIndex, markerName); + addMarkerToMap(fileIndex, markerName, requestId); future.setIsSuccessful(true); shouldFlushMarkers = true; continue; } } - addMarkerToMap(fileIndex, markerName); + addMarkerToMap(fileIndex, markerName, requestId); + future.setIsSuccessful(true); shouldFlushMarkers = true; } - future.setIsSuccessful(!exists); } if (!isMarkerTypeWritten) { @@ -265,13 +302,14 @@ public void processMarkerCreationRequests( */ public boolean deleteAllMarkers() { boolean result = FSUtils.deleteDir(hoodieEngineContext, storage, markerDirPath, parallelism); - allMarkers.clear(); + markerToRequestIdMap.clear(); fileMarkersMap.clear(); return result; } /** * Syncs all markers maintained in the underlying files under the marker directory in the file system. + * When TLS recovers from crash, we have no request IDs so store sentinel value for each marker. */ private void syncMarkersFromFileSystem() { Map> fileMarkersSetMap = MarkerUtils.readTimelineServerBasedMarkersFromFileSystem( @@ -282,7 +320,9 @@ private void syncMarkersFromFileSystem() { int index = parseMarkerFileIndex(markersFilePathStr); if (index >= 0) { fileMarkersMap.put(index, new StringBuilder(StringUtils.join(",", fileMarkers))); - allMarkers.addAll(fileMarkers); + for (String marker : fileMarkers) { + markerToRequestIdMap.put(marker, NULL_REQUEST_ID); + } } } } @@ -301,9 +341,10 @@ private void syncMarkersFromFileSystem() { * * @param fileIndex Marker file index number. * @param markerName Marker name. + * @param requestId Request ID for idempotency; null for recovery. */ - private void addMarkerToMap(int fileIndex, String markerName) { - allMarkers.add(markerName); + private void addMarkerToMap(int fileIndex, String markerName, String requestId) { + markerToRequestIdMap.put(markerName, requestId == null ? NULL_REQUEST_ID : requestId); StringBuilder stringBuilder = fileMarkersMap.computeIfAbsent(fileIndex, k -> new StringBuilder(16384)); stringBuilder.append(markerName); stringBuilder.append('\n'); diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java index acb8e4bd0d763..83349e7a7eb1e 100644 --- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java +++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/TestRequestHandler.java @@ -33,23 +33,30 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.hadoop.conf.Configuration; +import org.apache.http.client.HttpResponseException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.apache.hudi.common.config.HoodieStorageConfig.HOODIE_STORAGE_CLASS; +import static org.apache.hudi.common.table.marker.MarkerOperation.ALL_MARKERS_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.CREATE_MARKER_URL; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_DIR_PATH_PARAM; import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_NAME_PARAM; +import static org.apache.hudi.common.table.marker.MarkerOperation.MARKER_REQUEST_ID_PARAM; import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.BASEPATH_PARAM; import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.LAST_INSTANT_TS; import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.REFRESH_TABLE_URL; import static org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.TIMELINE_HASH; +import static org.apache.hudi.timeline.TimelineServiceClientBase.RequestMethod.GET; import static org.apache.hudi.timeline.TimelineServiceClientBase.RequestMethod.POST; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; class TestRequestHandler extends HoodieCommonTestHarness { @@ -120,6 +127,93 @@ void testCreateMarkerAPIWithDifferentSchemes() throws IOException { assertMarkerCreation(tempDir.resolve("base-path-2").toUri().toString(), "test2:/"); } + @Test + void testMarkerIdempotency() throws IOException, InterruptedException { + String basePath = tempDir.resolve("base-path-idempotency").toUri().toString(); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, getTableType()); + String markerDir = metaClient.getMarkerFolderPath("102"); + String markerName = "partition1/file1.parquet.marker.CREATE"; + String requestId1 = java.util.UUID.randomUUID().toString(); + String requestId2 = java.util.UUID.randomUUID().toString(); + + Map queryParameters = new HashMap<>(); + queryParameters.put(BASEPATH_PARAM, basePath); + queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir); + queryParameters.put(MARKER_NAME_PARAM, markerName); + + // No requestId → server returns 400 + assertThrows(HttpResponseException.class, () -> + timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {})); + + queryParameters.put(MARKER_REQUEST_ID_PARAM, requestId1); + + // First request + boolean result1 = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {}); + assertTrue(result1, "First marker creation should succeed"); + + waitForMarkerCreation(markerDir, markerName, 1000); + + // Retry with same requestId (simulating timeout + retry) should succeed (idempotent) + boolean result2 = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {}); + assertTrue(result2, "Retry with same requestId should succeed (idempotent)"); + + // Retry with different requestId (distinct logical request) should fail + queryParameters.put(MARKER_REQUEST_ID_PARAM, requestId2); + boolean result3 = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {}); + assertFalse(result3, "Retry with different requestId should fail"); + } + + @Test + void testMarkerCreationAfterRecovery() throws IOException, InterruptedException { + String basePath = tempDir.resolve("base-path-recovery").toUri().toString(); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(basePath, getTableType()); + String markerDir = metaClient.getMarkerFolderPath("105"); + String markerName = "partition1/file1.parquet.marker.CREATE"; + String requestId = java.util.UUID.randomUUID().toString(); + + Map queryParameters = new HashMap<>(); + queryParameters.put(BASEPATH_PARAM, basePath); + queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir); + queryParameters.put(MARKER_NAME_PARAM, markerName); + queryParameters.put(MARKER_REQUEST_ID_PARAM, requestId); + + boolean result = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {}); + assertTrue(result, "Initial marker creation should succeed"); + waitForMarkerCreation(markerDir, markerName, 1000); + + // Restart server to simulate recovery + tearDown(); + setUp(); + + queryParameters.put(MARKER_REQUEST_ID_PARAM, java.util.UUID.randomUUID().toString()); + boolean afterRecovery = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference() {}); + assertFalse(afterRecovery, "After recovery (backfill NULL), different requestId should fail"); + } + private void assertMarkerCreation(String basePath, String schema) throws IOException { Map queryParameters = new HashMap<>(); String basePathScheme = getPathWithReplacedSchema(basePath, schema); @@ -129,6 +223,8 @@ private void assertMarkerCreation(String basePath, String schema) throws IOExcep queryParameters.put(BASEPATH_PARAM, basePathScheme); queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir); queryParameters.put(MARKER_NAME_PARAM, "marker-file-1"); + // Add requestId for idempotency + queryParameters.put(MARKER_REQUEST_ID_PARAM, java.util.UUID.randomUUID().toString()); boolean content = timelineServiceClient.makeRequest( TimelineServiceClient.Request.newBuilder(POST, CREATE_MARKER_URL) @@ -147,4 +243,28 @@ private String getPathWithReplacedSchema(String path, String schemaToUse) { } throw new IllegalArgumentException("Invalid file provided"); } + + private void waitForMarkerCreation(String markerDir, String expectedMarker, long timeoutMs) + throws IOException, InterruptedException { + long startTime = System.currentTimeMillis(); + long pollIntervalMs = 50; + + Map queryParameters = new HashMap<>(); + queryParameters.put(MARKER_DIR_PATH_PARAM, markerDir); + + while (System.currentTimeMillis() - startTime < timeoutMs) { + List markers = timelineServiceClient.makeRequest( + TimelineServiceClient.Request.newBuilder(GET, ALL_MARKERS_URL) + .addQueryParams(queryParameters) + .build()) + .getDecodedContent(new TypeReference>() { + }); + + if (markers != null && markers.contains(expectedMarker)) { + return; + } + + Thread.sleep(pollIntervalMs); + } + } }