diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt
index b716a7bb..6f4976cf 100644
--- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt
+++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt
@@ -10,7 +10,9 @@ import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails
 import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse
 import java.lang.AutoCloseable
 import java.time.Duration
+import java.time.LocalTime
 import java.util.concurrent.atomic.AtomicReference
+import kotlin.math.min
 
 /**
  * Interface for caching configuration data.
@@ -97,25 +99,68 @@ class DefaultConfigurationCache(
     }
 
     /**
-     * Holds both the configuration and its refresh timestamp atomically. This
-     * prevents race conditions between checking expiration and updating.
+     * Holds the configuration, the time it was fetched, and the effective
+     * refresh interval (both in milliseconds) as one atomically swapped value.
+     * This prevents races between checking expiration and updating, and keeps
+     * the interval that applied when the config was fetched.
      */
     private data class CachedData(
         val configuration: ConfigurationResponse,
         val timestamp: Long,
+        val refreshInterval: Long,
     )
 
     private val cache = AtomicReference(null)
 
+    /**
+     * Computes the effective refresh interval, in milliseconds, for a
+     * configuration response.
+     *
+     * The service-provided `containerSettings.refreshInterval` is parsed as
+     * an ISO local time (`HH:mm[:ss]`) and capped at the locally configured
+     * [refreshInterval]. The local default is used when the value is absent,
+     * blank, not parseable in that form (for example a span of 24 hours or
+     * more), or not positive, so a malformed or zero setting can neither fail
+     * a successful fetch nor force a refresh on every call.
+     *
+     * @param config The configuration whose refresh interval is inspected.
+     * @return The refresh interval to store with the cached entry, in
+     *   milliseconds.
+     */
+    private fun calculateEffectiveRefreshInterval(
+        config: ConfigurationResponse?,
+    ): Long {
+        val defaultMillis = this.refreshInterval.toMillis()
+        val configRefreshInterval = config?.containerSettings?.refreshInterval
+        if (configRefreshInterval.isNullOrBlank()) {
+            return defaultMillis
+        }
+        val configuredMillis =
+            try {
+                LocalTime.parse(configRefreshInterval.trim())
+                    .toSecondOfDay() * 1000L
+            } catch (e: java.time.format.DateTimeParseException) {
+                // LocalTime only covers 00:00:00..23:59:59; anything else
+                // falls back to the local default instead of throwing.
+                return defaultMillis
+            }
+        return if (configuredMillis > 0L) {
+            min(defaultMillis, configuredMillis)
+        } else {
+            defaultMillis
+        }
+    }
+
     override suspend fun getConfiguration(): ConfigurationResponse {
         val currentTime = System.currentTimeMillis()
-        val cached = cache.get()
+        val cachedData = cache.get()
 
-        // Check if we need to refresh
+        // Check if we need to refresh based on the effective refresh interval
+        // stored with the cached data
         val needsRefresh =
-            cached == null ||
-                (currentTime - cached.timestamp) >=
-                refreshInterval.toMillis()
+            cachedData == null ||
+                (currentTime - cachedData.timestamp) >=
+                cachedData.refreshInterval
 
         if (needsRefresh) {
             // Attempt to refresh - only one thread will succeed
@@ -123,19 +168,27 @@ class DefaultConfigurationCache(
                 runCatching { provider() }
                     .getOrElse {
                         // If fetch fails, return cached if available, otherwise rethrow
-                        cached?.configuration ?: throw it
+                        cachedData?.configuration ?: throw it
                     }
 
+            // Calculate effective refresh interval from the NEW configuration
+            val newEffectiveRefreshInterval =
+                calculateEffectiveRefreshInterval(newConfig)
+
             // Atomically update if still needed (prevents thundering herd)
             cache.updateAndGet { current ->
-                val currentTimestamp = current?.timestamp ?: 0
-                // Only update if current is null or still stale
+                // Only update if current is null or still stale based on its
+                // stored effective interval
                 if (
                     current == null ||
-                    (currentTime - currentTimestamp) >=
-                    refreshInterval.toMillis()
+                    (currentTime - current.timestamp) >=
+                    current.refreshInterval
                 ) {
-                    CachedData(newConfig, currentTime)
+                    CachedData(
+                        newConfig,
+                        currentTime,
+                        newEffectiveRefreshInterval,
+                    )
                 } else {
                     // Another thread already refreshed
                     current
diff
--git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt index 2058bf7c..0f69e3a7 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt @@ -48,7 +48,7 @@ abstract class ContainerUploaderBase( private val retryPolicy: IngestRetryPolicy, private val maxConcurrency: Int, private val maxDataSize: Long, - private val configurationCache: ConfigurationCache, + protected val configurationCache: ConfigurationCache, private val uploadMethod: UploadMethod, private val tokenCredential: TokenCredential?, ) : IUploader { @@ -100,7 +100,7 @@ abstract class ContainerUploaderBase( } // Get containers from configuration - val containers = selectContainers(configurationCache, uploadMethod) + val containers = selectContainers(uploadMethod) if (containers.isEmpty()) { logger.error("No containers available for upload") @@ -706,17 +706,14 @@ abstract class ContainerUploaderBase( } /** - * Selects the appropriate containers for upload based on the provided - * configuration cache and upload method. + * Selects the containers to upload to, using this uploader's own + * [configurationCache] property and the given upload method. * - * @param configurationCache The configuration cache to use for selecting - * containers. * @param uploadMethod The upload method to consider when selecting * containers. * @return A list of selected container information. 
 */ abstract suspend fun selectContainers( - configurationCache: ConfigurationCache, uploadMethod: UploadMethod, ): List } diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt index a2084e20..df38b52e 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploader.kt @@ -40,7 +40,6 @@ internal constructor( } override suspend fun selectContainers( - configurationCache: ConfigurationCache, uploadMethod: UploadMethod, ): List { // This method is delegated to and this calls getConfiguration again to ensure fresh data is diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt new file mode 100644 index 00000000..03f55037 --- /dev/null +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ManagedUploaderTest.kt @@ -0,0 +1,95 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+package com.microsoft.azure.kusto.ingest.v2.uploader
+
+import com.microsoft.azure.kusto.ingest.v2.common.ConfigurationCache
+import com.microsoft.azure.kusto.ingest.v2.common.serialization.OffsetDateTimeSerializer
+import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.withContext
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.modules.SerializersModule
+import org.junit.jupiter.api.Assertions.assertFalse
+import org.junit.jupiter.api.Assertions.assertNotNull
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.time.Duration
+import java.time.OffsetDateTime
+
+class ManagedUploaderTest {
+
+    @ParameterizedTest(name = "PreferredUploadMethod={0}")
+    @CsvSource("DEFAULT", "STORAGE", "LAKE")
+    fun selectContainers(preferredUploadMethod: String): Unit = runBlocking {
+        val uploadMethod = UploadMethod.valueOf(preferredUploadMethod)
+        val configurationCache = TestConfigurationCache()
+        val managedUploader =
+            ManagedUploaderBuilder.create()
+                .withConfigurationCache(configurationCache)
+                .build()
+        val selectedContainers = managedUploader.selectContainers(uploadMethod)
+        assertNotNull(selectedContainers)
+        assertTrue(selectedContainers.isNotEmpty())
+        selectedContainers.forEach {
+            assertNotNull(it.containerInfo.path)
+            // If we get both lake and storage and user does not specify, lake is preferred. If user
+            // specifies, respect that.
+            if (uploadMethod != UploadMethod.STORAGE) {
+                assertTrue(
+                    it.containerInfo.path?.contains("alakefolder") ?: false,
+                )
+                assertFalse(
+                    it.containerInfo.path?.contains("somecontainer")
+                        ?: false,
+                )
+            } else {
+                // User mentioned storage here, use that
+                assertFalse(
+                    it.containerInfo.path?.contains("alakefolder") ?: false,
+                )
+                assertTrue(
+                    it.containerInfo.path?.contains("somecontainer")
+                        ?: false,
+                )
+            }
+        }
+    }
+
+    private class TestConfigurationCache : ConfigurationCache {
+        private val json = Json {
+            ignoreUnknownKeys = true
+            serializersModule = SerializersModule {
+                contextual(OffsetDateTime::class, OffsetDateTimeSerializer)
+            }
+        }
+        override val refreshInterval: Duration
+            get() = Duration.ofHours(1)
+
+        override suspend fun getConfiguration(): ConfigurationResponse {
+            val resourcesDirectory = "src/test/resources/"
+            val fileName = "config-response.json"
+            val configContent =
+                withContext(Dispatchers.IO) {
+                    Files.readString(
+                        Paths.get(resourcesDirectory + fileName),
+                        StandardCharsets.UTF_8,
+                    )
+                }
+            val configurationResponse =
+                json.decodeFromString<ConfigurationResponse>(configContent)
+
+            assertNotNull(configurationResponse)
+            assertNotNull(configurationResponse.containerSettings)
+            return configurationResponse
+        }
+
+        override fun close() {
+            // No resources to clean up in this test implementation
+        }
+    }
+}
diff --git a/ingest-v2/src/test/resources/config-response.json b/ingest-v2/src/test/resources/config-response.json
new file mode 100644
index 00000000..d58a019c
--- /dev/null
+++ b/ingest-v2/src/test/resources/config-response.json
@@ -0,0 +1,21 @@
+{
+  "containerSettings": {
+    "containers": [
+      {
+        "path": "https://somecontainer.z11.blob.storage.azure.net/trdwvweg9nfnngghb1eey-20260108-ingestdata-e5c334ee145d4b4-0?sv=keys"
+      }
+    ],
+    "lakeFolders": [
+      {
+        "path": "https://alakefolder.onelake.fabric.microsoft.com/17a97d10-a17f-4d72-8f38-858aac992978/bb9c26d4-4f99-44b5-9614-3ebb037f3510/Ingestions/20260108-lakedata"
+      }
+    ],
+    "refreshInterval": "01:00:00",
+    "preferredUploadMethod": "Lake"
+  },
+  "ingestionSettings": {
+    "maxBlobsPerBatch": 20,
+    "maxDataSize": 6442450944,
+    "preferredIngestionMethod": "Rest"
+  }
+}
\ No newline at end of file