From bc71e3889ed9f86eb299a1edd6ac80faca801ab6 Mon Sep 17 00:00:00 2001 From: ag-ramachandran Date: Fri, 9 Jan 2026 08:16:54 +0530 Subject: [PATCH 1/7] * Fix method signature for uploads --- .../v2/uploader/ContainerUploaderBase.kt | 7 +- .../ingest/v2/uploader/ManagedUploader.kt | 1 - .../ingest/v2/uploader/ManagedUploaderTest.kt | 77 +++++++++++++++++++ .../src/test/resources/config-response.json | 21 +++++ 4 files changed, 100 insertions(+), 6 deletions(-) create mode 100644 ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt create mode 100644 ingest-v2/src/test/resources/config-response.json diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt index 2058bf7c5..d0fb2fae5 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt @@ -48,7 +48,7 @@ abstract class ContainerUploaderBase( private val retryPolicy: IngestRetryPolicy, private val maxConcurrency: Int, private val maxDataSize: Long, - private val configurationCache: ConfigurationCache, + val configurationCache: ConfigurationCache, private val uploadMethod: UploadMethod, private val tokenCredential: TokenCredential?, ) : IUploader { @@ -100,7 +100,7 @@ abstract class ContainerUploaderBase( } // Get containers from configuration - val containers = selectContainers(configurationCache, uploadMethod) + val containers = selectContainers(uploadMethod) if (containers.isEmpty()) { logger.error("No containers available for upload") @@ -709,14 +709,11 @@ abstract class ContainerUploaderBase( * Selects the appropriate containers for upload based on the provided * configuration cache and upload method. * - * @param configurationCache The configuration cache to use for selecting - * containers. * @param uploadMethod The upload method to consider when selecting * containers. * @return A list of selected container information. */ abstract suspend fun selectContainers( - configurationCache: ConfigurationCache, uploadMethod: UploadMethod, ): List } diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt index a2084e200..df38b52ed 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt @@ -40,7 +40,6 @@ internal constructor( } override suspend fun selectContainers( - configurationCache: ConfigurationCache, uploadMethod: UploadMethod, ): List { // This method is delegated to and this calls getConfiguration again to ensure fresh data is diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt new file mode 100644 index 000000000..fabcc26cb --- /dev/null +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt @@ -0,0 +1,77 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +package com.microsoft.azure.kusto.ingest.v2.uploader + +import com.microsoft.azure.kusto.ingest.v2.common.ConfigurationCache +import com.microsoft.azure.kusto.ingest.v2.common.serialization.OffsetDateTimeSerializer +import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import kotlinx.serialization.json.Json +import kotlinx.serialization.modules.SerializersModule +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.Paths +import java.time.Duration +import java.time.OffsetDateTime + +class ManagedUploaderTest { + + @Test + fun selectContainers(): Unit = runBlocking { + val configurationCache = TestConfigurationCache() + val managedUploader = + ManagedUploaderBuilder.create() + .withConfigurationCache(configurationCache) + .build() + val selectedContainers = + managedUploader.selectContainers(UploadMethod.DEFAULT) + assertNotNull(selectedContainers) + assertTrue(selectedContainers.isNotEmpty()) + selectedContainers.forEach { + assertNotNull(it.containerInfo.path) + assertTrue(it.containerInfo.path?.contains("alakefolder") ?: false) + assertFalse( + it.containerInfo.path?.contains("somecontainer") ?: true, + ) + } + } + + private class TestConfigurationCache : ConfigurationCache { + private val json = Json { + ignoreUnknownKeys = true + serializersModule = SerializersModule { + contextual(OffsetDateTime::class, OffsetDateTimeSerializer) + } + } + override val refreshInterval: Duration + get() = Duration.ofHours(1) + + override suspend fun getConfiguration(): ConfigurationResponse { + val resourcesDirectory = "src/test/resources/" + val fileName = "config-response.json" + val configContent = + withContext(Dispatchers.IO) { + Files.readString( + Paths.get(resourcesDirectory + fileName), + StandardCharsets.UTF_8, + ) + } + val configurationResponse = + json.decodeFromString(configContent) + + assertNotNull(configurationResponse) + assertNotNull(configurationResponse.containerSettings) + return configurationResponse + } + + override fun close() { + // No resources to clean up in this test implementation + } + } +} diff --git a/ingest-v2/src/test/resources/config-response.json b/ingest-v2/src/test/resources/config-response.json new file mode 100644 index 000000000..4a3975593 --- /dev/null +++ b/ingest-v2/src/test/resources/config-response.json @@ -0,0 +1,21 @@ +{ + "containerSettings": { + "containers": [ + { + "path": "https://somecontainer.z11.blob.storage.azure.net/trdwvweg9nfnngghb1eey-20260108-ingestdata-e5c334ee145d4b4-0?sv=keys" + } + ], + "lakeFolders": [ + { + "path": "https://alakefolder.onelake.fabric.microsoft.com/17a97d10-a17f-4d72-8f38-858aac992978/bb9c26d4-4f99-44b5-9614-3ebb037f3510/Ingestions/20260108-lakedata%22" + } + ], + "refreshInterval": "01:00:00", + "preferredUploadMethod": "Lake" + }, + "ingestionSettings": { + "maxBlobsPerBatch": 20, + "maxDataSize": 6442450944, + "preferredIngestionMethod": "Rest" + } +} \ No newline at end of file From 29def521cbb6aacd0dcfe0564a1b8e89a43221e5 Mon Sep 17 00:00:00 2001 From: ag-ramachandran Date: Fri, 9 Jan 2026 11:41:08 +0530 Subject: [PATCH 2/7] * Minor edit to ConfigurationCache to determine refresh interval logic --- .../ingest/v2/common/ConfigurationCache.kt | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt index b716a7bb1..2e226da68 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt @@ -10,7 +10,9 @@ import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse import java.lang.AutoCloseable import java.time.Duration +import java.time.LocalTime import java.util.concurrent.atomic.AtomicReference +import kotlin.math.min /** * Interface for caching configuration data. @@ -109,13 +111,34 @@ class DefaultConfigurationCache( override suspend fun getConfiguration(): ConfigurationResponse { val currentTime = System.currentTimeMillis() - val cached = cache.get() + val cachedConfiguration = cache.get() + val configRefreshInterval = + cachedConfiguration + ?.configuration + ?.containerSettings + ?.refreshInterval + + /** + * Determine the effective refresh interval. If the configuration + * specifies a refresh interval, use the minimum of that and the + * default. Otherwise, use the default refresh interval. + */ + val effectiveRefreshInterval = + if (configRefreshInterval?.isNotEmpty() == true) { + min( + this.refreshInterval.toMillis(), + LocalTime.parse(configRefreshInterval) + .toSecondOfDay() * 1000L, + ) + } else { + this.refreshInterval.toMillis() + } // Check if we need to refresh val needsRefresh = - cached == null || - (currentTime - cached.timestamp) >= - refreshInterval.toMillis() + cachedConfiguration == null || + (currentTime - cachedConfiguration.timestamp) >= + effectiveRefreshInterval if (needsRefresh) { // Attempt to refresh - only one thread will succeed @@ -123,7 +146,7 @@ class DefaultConfigurationCache( runCatching { provider() } .getOrElse { // If fetch fails, return cached if available, otherwise rethrow - cached?.configuration ?: throw it + cachedConfiguration?.configuration ?: throw it } // Atomically update if still needed (prevents thundering herd) @@ -133,7 +156,7 @@ class DefaultConfigurationCache( if ( current == null || (currentTime - currentTimestamp) >= - refreshInterval.toMillis() + effectiveRefreshInterval ) { CachedData(newConfig, currentTime) } else { From 11d42f65b2191bfe6c60d4e8fd57a141828b9523 Mon Sep 17 00:00:00 2001 From: ag-ramachandran Date: Fri, 9 Jan 2026 12:34:06 +0530 Subject: [PATCH 3/7] * Additional tests for preferred upload combinations --- .../ingest/v2/uploader/ManagedUploaderTest.kt | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt index fabcc26cb..020e583b8 100644 --- a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt @@ -13,7 +13,8 @@ import kotlinx.serialization.modules.SerializersModule import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertNotNull import org.junit.jupiter.api.Assertions.assertTrue -import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.CsvSource import java.nio.charset.StandardCharsets import java.nio.file.Files import java.nio.file.Paths @@ -22,23 +23,38 @@ import java.time.OffsetDateTime class ManagedUploaderTest { - @Test - fun selectContainers(): Unit = runBlocking { + @ParameterizedTest(name = "PreferredUploadMethod={0}") + @CsvSource("DEFAULT", "STORAGE", "LAKE") + fun selectContainers(preferredUploadMethod: String): Unit = runBlocking { + val uploadMethod = UploadMethod.valueOf(preferredUploadMethod) val configurationCache = TestConfigurationCache() val managedUploader = ManagedUploaderBuilder.create() .withConfigurationCache(configurationCache) .build() - val selectedContainers = - managedUploader.selectContainers(UploadMethod.DEFAULT) + val selectedContainers = managedUploader.selectContainers(uploadMethod) assertNotNull(selectedContainers) assertTrue(selectedContainers.isNotEmpty()) selectedContainers.forEach { assertNotNull(it.containerInfo.path) - assertTrue(it.containerInfo.path?.contains("alakefolder") ?: false) - assertFalse( - it.containerInfo.path?.contains("somecontainer") ?: true, - ) + // If we get both lake and storage and user does not specify, lake is preferred. If user + // specifies, respect that. + if (uploadMethod != UploadMethod.STORAGE) { + assertTrue( + it.containerInfo.path?.contains("alakefolder") ?: false, + ) + assertFalse( + it.containerInfo.path?.contains("somecontainer") ?: true, + ) + } else { + // User mentioned storage here, use that + assertFalse( + it.containerInfo.path?.contains("alakefolder") ?: false, + ) + assertTrue( + it.containerInfo.path?.contains("somecontainer") ?: true, + ) + } } } From 23580f17af5858dd4e94fad11d9ba4d2171e78f1 Mon Sep 17 00:00:00 2001 From: Ramachandran A G <106139410+ag-ramachandran@users.noreply.github.com> Date: Fri, 9 Jan 2026 16:11:15 +0530 Subject: [PATCH 4/7] Update ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt index d0fb2fae5..a19254696 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt @@ -48,7 +48,7 @@ abstract class ContainerUploaderBase( private val retryPolicy: IngestRetryPolicy, private val maxConcurrency: Int, private val maxDataSize: Long, - val configurationCache: ConfigurationCache, + protected val configurationCache: ConfigurationCache, private val uploadMethod: UploadMethod, private val tokenCredential: TokenCredential?, ) : IUploader { From 32008a63801b0e19e90b441e44450063659a4a78 Mon Sep 17 00:00:00 2001 From: Ramachandran A G <106139410+ag-ramachandran@users.noreply.github.com> Date: Fri, 9 Jan 2026 16:11:24 +0530 Subject: [PATCH 5/7] Update ingest-v2/src/test/resources/config-response.json Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- ingest-v2/src/test/resources/config-response.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest-v2/src/test/resources/config-response.json b/ingest-v2/src/test/resources/config-response.json index 4a3975593..d58a019c0 100644 --- a/ingest-v2/src/test/resources/config-response.json +++ b/ingest-v2/src/test/resources/config-response.json @@ -7,7 +7,7 @@ ], "lakeFolders": [ { - "path": "https://alakefolder.onelake.fabric.microsoft.com/17a97d10-a17f-4d72-8f38-858aac992978/bb9c26d4-4f99-44b5-9614-3ebb037f3510/Ingestions/20260108-lakedata%22" + "path": "https://alakefolder.onelake.fabric.microsoft.com/17a97d10-a17f-4d72-8f38-858aac992978/bb9c26d4-4f99-44b5-9614-3ebb037f3510/Ingestions/20260108-lakedata" } ], "refreshInterval": "01:00:00", From da72756aa7900c9c1b6ea8425baed67e8f2c4486 Mon Sep 17 00:00:00 2001 From: Ramachandran A G <106139410+ag-ramachandran@users.noreply.github.com> Date: Fri, 9 Jan 2026 16:15:48 +0530 Subject: [PATCH 6/7] Update ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt index a19254696..0f69e3a75 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt @@ -706,8 +706,8 @@ abstract class ContainerUploaderBase( } /** - * Selects the appropriate containers for upload based on the provided - * configuration cache and upload method. + * Selects the appropriate containers for upload based on the uploader's + * configuration cache and the specified upload method. * * @param uploadMethod The upload method to consider when selecting * containers. From a51c91a6200f7b440119bf82d431d2be6bd05e41 Mon Sep 17 00:00:00 2001 From: ag-ramachandran Date: Fri, 9 Jan 2026 16:25:02 +0530 Subject: [PATCH 7/7] * Fix review comments --- .../ingest/v2/common/ConfigurationCache.kt | 80 +++++++++++-------- .../ingest/v2/uploader/ManagedUploaderTest.kt | 6 +- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt index 2e226da68..6f4976cf8 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt @@ -99,46 +99,50 @@ class DefaultConfigurationCache( } /** - * Holds both the configuration and its refresh timestamp atomically. This - * prevents race conditions between checking expiration and updating. + * Holds the configuration, its refresh timestamp, and the effective refresh + * interval atomically. This prevents race conditions between checking + * expiration and updating, and ensures we use the correct refresh interval + * from when the config was fetched. */ private data class CachedData( val configuration: ConfigurationResponse, val timestamp: Long, + val refreshInterval: Long, ) private val cache = AtomicReference(null) + /** + * Helper function to calculate effective refresh interval from a + * configuration response. If the configuration specifies a refresh + * interval, use the minimum of that and the default. Otherwise, use the + * default refresh interval. + */ + private fun calculateEffectiveRefreshInterval( + config: ConfigurationResponse?, + ): Long { + val configRefreshInterval = config?.containerSettings?.refreshInterval + return if (configRefreshInterval?.isNotEmpty() == true) { + min( + this.refreshInterval.toMillis(), + LocalTime.parse(configRefreshInterval).toSecondOfDay() * + 1000L, + ) + } else { + this.refreshInterval.toMillis() + } + } + override suspend fun getConfiguration(): ConfigurationResponse { val currentTime = System.currentTimeMillis() - val cachedConfiguration = cache.get() - val configRefreshInterval = - cachedConfiguration - ?.configuration - ?.containerSettings - ?.refreshInterval - - /** - * Determine the effective refresh interval. If the configuration - * specifies a refresh interval, use the minimum of that and the - * default. Otherwise, use the default refresh interval. - */ - val effectiveRefreshInterval = - if (configRefreshInterval?.isNotEmpty() == true) { - min( - this.refreshInterval.toMillis(), - LocalTime.parse(configRefreshInterval) - .toSecondOfDay() * 1000L, - ) - } else { - this.refreshInterval.toMillis() - } + val cachedData = cache.get() - // Check if we need to refresh + // Check if we need to refresh based on the effective refresh interval + // stored with the cached data val needsRefresh = - cachedConfiguration == null || - (currentTime - cachedConfiguration.timestamp) >= - effectiveRefreshInterval + cachedData == null || + (currentTime - cachedData.timestamp) >= + cachedData.refreshInterval if (needsRefresh) { // Attempt to refresh - only one thread will succeed @@ -146,19 +150,27 @@ class DefaultConfigurationCache( runCatching { provider() } .getOrElse { // If fetch fails, return cached if available, otherwise rethrow - cachedConfiguration?.configuration ?: throw it + cachedData?.configuration ?: throw it } + // Calculate effective refresh interval from the NEW configuration + val newEffectiveRefreshInterval = + calculateEffectiveRefreshInterval(newConfig) + // Atomically update if still needed (prevents thundering herd) cache.updateAndGet { current -> - val currentTimestamp = current?.timestamp ?: 0 - // Only update if current is null or still stale + // Only update if current is null or still stale based on its + // stored effective interval if ( current == null || - (currentTime - currentTimestamp) >= - effectiveRefreshInterval + (currentTime - current.timestamp) >= + current.refreshInterval ) { - CachedData(newConfig, currentTime) + CachedData( + newConfig, + currentTime, + newEffectiveRefreshInterval, + ) } else { // Another thread already refreshed current diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt index 020e583b8..03f55037a 100644 --- a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt @@ -44,7 +44,8 @@ class ManagedUploaderTest { it.containerInfo.path?.contains("alakefolder") ?: false, ) assertFalse( - it.containerInfo.path?.contains("somecontainer") ?: true, + it.containerInfo.path?.contains("somecontainer") + ?: false, ) } else { // User mentioned storage here, use that @@ -52,7 +53,8 @@ class ManagedUploaderTest { it.containerInfo.path?.contains("alakefolder") ?: false, ) assertTrue( - it.containerInfo.path?.contains("somecontainer") ?: true, + it.containerInfo.path?.contains("somecontainer") + ?: false, ) } }