Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails
import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse
import java.lang.AutoCloseable
import java.time.Duration
import java.time.LocalTime
import java.util.concurrent.atomic.AtomicReference
import kotlin.math.min

/**
* Interface for caching configuration data.
Expand Down Expand Up @@ -97,45 +99,78 @@ class DefaultConfigurationCache(
}

/**
* Holds both the configuration and its refresh timestamp atomically. This
* prevents race conditions between checking expiration and updating.
* Holds the configuration, its refresh timestamp, and the effective refresh
* interval atomically. This prevents race conditions between checking
* expiration and updating, and ensures we use the correct refresh interval
* from when the config was fetched.
*/
private data class CachedData(
/** Configuration stored by the refresh that created this entry. */
val configuration: ConfigurationResponse,
/** `System.currentTimeMillis()` value captured by the `getConfiguration` call that stored this entry. */
val timestamp: Long,
/** Effective refresh interval, in milliseconds, compared against the entry's age to decide staleness. */
val refreshInterval: Long,
)

private val cache = AtomicReference<CachedData?>(null)

/**
* Helper function to calculate effective refresh interval from a
* configuration response. If the configuration specifies a refresh
* interval, use the minimum of that and the default. Otherwise, use the
* default refresh interval.
*/
private fun calculateEffectiveRefreshInterval(
config: ConfigurationResponse?,
): Long {
val configRefreshInterval = config?.containerSettings?.refreshInterval
return if (configRefreshInterval?.isNotEmpty() == true) {
min(
this.refreshInterval.toMillis(),
LocalTime.parse(configRefreshInterval).toSecondOfDay() *
1000L,
Comment on lines +128 to +129
Copy link

Copilot AI Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

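The refresh interval is parsed with LocalTime.parse(), which expects a time of day (HH:mm:ss), but the value in the config is a TimeSpan-style duration string ("01:00:00"). "01:00:00" happens to parse as a LocalTime, but this is semantically incorrect - a duration is being parsed as a time of day - and it breaks for any interval of 24 hours or more (e.g. "24:00:00" or "1.00:00:00"), where LocalTime.parse() throws DateTimeParseException. Consider implementing proper TimeSpan parsing that handles this format correctly, or using Duration.parse() if the value can be supplied in ISO-8601 duration format (e.g., "PT1H"); note that Duration.parse() cannot read "01:00:00" directly.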

Copilot uses AI. Check for mistakes.
)
} else {
this.refreshInterval.toMillis()
}
}

override suspend fun getConfiguration(): ConfigurationResponse {
val currentTime = System.currentTimeMillis()
val cached = cache.get()
val cachedData = cache.get()

// Check if we need to refresh
// Check if we need to refresh based on the effective refresh interval
// stored with the cached data
val needsRefresh =
cached == null ||
(currentTime - cached.timestamp) >=
refreshInterval.toMillis()
cachedData == null ||
(currentTime - cachedData.timestamp) >=
cachedData.refreshInterval

if (needsRefresh) {
// Attempt to refresh - only one thread will succeed
val newConfig =
runCatching { provider() }
.getOrElse {
// If fetch fails, return cached if available, otherwise rethrow
cached?.configuration ?: throw it
cachedData?.configuration ?: throw it
}

// Calculate effective refresh interval from the NEW configuration
val newEffectiveRefreshInterval =
calculateEffectiveRefreshInterval(newConfig)

// Atomically update if still needed (prevents thundering herd)
cache.updateAndGet { current ->
val currentTimestamp = current?.timestamp ?: 0
// Only update if current is null or still stale
// Only update if current is null or still stale based on its
// stored effective interval
if (
current == null ||
(currentTime - currentTimestamp) >=
refreshInterval.toMillis()
(currentTime - current.timestamp) >=
current.refreshInterval
) {
CachedData(newConfig, currentTime)
CachedData(
newConfig,
currentTime,
newEffectiveRefreshInterval,
)
} else {
// Another thread already refreshed
current
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ abstract class ContainerUploaderBase(
private val retryPolicy: IngestRetryPolicy,
private val maxConcurrency: Int,
private val maxDataSize: Long,
private val configurationCache: ConfigurationCache,
protected val configurationCache: ConfigurationCache,
private val uploadMethod: UploadMethod,
private val tokenCredential: TokenCredential?,
) : IUploader {
Expand Down Expand Up @@ -100,7 +100,7 @@ abstract class ContainerUploaderBase(
}

// Get containers from configuration
val containers = selectContainers(configurationCache, uploadMethod)
val containers = selectContainers(uploadMethod)

if (containers.isEmpty()) {
logger.error("No containers available for upload")
Expand Down Expand Up @@ -706,17 +706,14 @@ abstract class ContainerUploaderBase(
}

/**
* Selects the appropriate containers for upload based on the provided
* configuration cache and upload method.
* Selects the appropriate containers for upload based on the uploader's
* configuration cache and the specified upload method.
*
* @param configurationCache The configuration cache to use for selecting
* containers.
* @param uploadMethod The upload method to consider when selecting
* containers.
* @return A list of selected container information.
*/
abstract suspend fun selectContainers(
configurationCache: ConfigurationCache,
uploadMethod: UploadMethod,
): List<ExtendedContainerInfo>
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ internal constructor(
}

override suspend fun selectContainers(
configurationCache: ConfigurationCache,
uploadMethod: UploadMethod,
): List<ExtendedContainerInfo> {
// This method is delegated to and this calls getConfiguration again to ensure fresh data is
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package com.microsoft.azure.kusto.ingest.v2.uploader

import com.microsoft.azure.kusto.ingest.v2.common.ConfigurationCache
import com.microsoft.azure.kusto.ingest.v2.common.serialization.OffsetDateTimeSerializer
import com.microsoft.azure.kusto.ingest.v2.models.ConfigurationResponse
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.serialization.json.Json
import kotlinx.serialization.modules.SerializersModule
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertNotNull
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Duration
import java.time.OffsetDateTime

class ManagedUploaderTest {

@ParameterizedTest(name = "PreferredUploadMethod={0}")
@CsvSource("DEFAULT", "STORAGE", "LAKE")
fun selectContainers(preferredUploadMethod: String): Unit = runBlocking {
val uploadMethod = UploadMethod.valueOf(preferredUploadMethod)
val configurationCache = TestConfigurationCache()
val managedUploader =
ManagedUploaderBuilder.create()
.withConfigurationCache(configurationCache)
.build()
val selectedContainers = managedUploader.selectContainers(uploadMethod)
assertNotNull(selectedContainers)
assertTrue(selectedContainers.isNotEmpty())
selectedContainers.forEach {
assertNotNull(it.containerInfo.path)
// If we get both lake and storage and user does not specify, lake is preferred. If user
// specifies, respect that.
Comment on lines +40 to +41
Copy link

Copilot AI Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test logic for DEFAULT upload method has an issue. According to the test comment on lines 40-41, when DEFAULT is used and both lake and storage are available, lake should be preferred. However, the config-response.json sets "preferredUploadMethod": "Lake", and the actual ManagedUploader implementation (lines 64-100 in ManagedUploader.kt) will honor this server preference. The test correctly expects lake, but the comment suggests this is a client-side default rather than respecting the server's preference. Consider clarifying whether this test is validating the server preference mechanism or the client-side default behavior.

Suggested change
// If we get both lake and storage and user does not specify, lake is preferred. If user
// specifies, respect that.
// When the server configuration prefers Lake and the user does not specify (DEFAULT),
// ManagedUploader should honor the server preference and use Lake. If the user explicitly
// specifies a method (e.g., STORAGE), that explicit choice is respected.

Copilot uses AI. Check for mistakes.
if (uploadMethod != UploadMethod.STORAGE) {
assertTrue(
it.containerInfo.path?.contains("alakefolder") ?: false,
)
assertFalse(
it.containerInfo.path?.contains("somecontainer")
?: false,
)
} else {
// User mentioned storage here, use that
assertFalse(
it.containerInfo.path?.contains("alakefolder") ?: false,
)
assertTrue(
it.containerInfo.path?.contains("somecontainer")
?: false,
)
}
}
}

/**
 * [ConfigurationCache] test double backed by the JSON fixture under
 * `src/test/resources/`. The fixture is read from disk and decoded again on
 * every [getConfiguration] call; nothing is cached between calls.
 */
private class TestConfigurationCache : ConfigurationCache {
    // Parser tolerant of fields the model does not declare; OffsetDateTime values
    // are resolved through the contextual serializer registered here.
    private val json = Json {
        serializersModule = SerializersModule {
            contextual(OffsetDateTime::class, OffsetDateTimeSerializer)
        }
        ignoreUnknownKeys = true
    }

    /** Fixed one-hour refresh interval reported by this test cache. */
    override val refreshInterval: Duration
        get() = Duration.ofHours(1)

    /**
     * Reads the fixture file as UTF-8 on the IO dispatcher, decodes it into a
     * [ConfigurationResponse], and asserts that the decoded response and its
     * container settings are non-null before returning it.
     */
    override suspend fun getConfiguration(): ConfigurationResponse {
        val fixturePath = Paths.get("src/test/resources/", "config-response.json")
        // Blocking file access is shifted onto the IO dispatcher.
        val rawJson =
            withContext(Dispatchers.IO) {
                Files.readString(fixturePath, StandardCharsets.UTF_8)
            }
        return json.decodeFromString<ConfigurationResponse>(rawJson).also { decoded ->
            assertNotNull(decoded)
            assertNotNull(decoded.containerSettings)
        }
    }

    override fun close() {
        // Nothing to release: this test implementation holds no resources.
    }
}
}
21 changes: 21 additions & 0 deletions ingest-v2/src/test/resources/config-response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"containerSettings": {
"containers": [
{
"path": "https://somecontainer.z11.blob.storage.azure.net/trdwvweg9nfnngghb1eey-20260108-ingestdata-e5c334ee145d4b4-0?sv=keys"
}
],
"lakeFolders": [
{
"path": "https://alakefolder.onelake.fabric.microsoft.com/17a97d10-a17f-4d72-8f38-858aac992978/bb9c26d4-4f99-44b5-9614-3ebb037f3510/Ingestions/20260108-lakedata"
}
],
"refreshInterval": "01:00:00",
"preferredUploadMethod": "Lake"
},
"ingestionSettings": {
"maxBlobsPerBatch": 20,
"maxDataSize": 6442450944,
"preferredIngestionMethod": "Rest"
}
}
Loading