Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,31 @@ interface IngestClient : Closeable {
interface MultiIngestClient : IngestClient {

/**
* Ingest data from multiple sources.
* Ingest data from multiple blob sources.
*
* @param sources The sources to ingest.
* **Important:** Multi-blob ingestion only supports [BlobSource]. This design avoids
* partial failure scenarios where some local sources might be uploaded successfully
* while others fail, leaving the user in an inconsistent state.
*
* **For local files/streams**, you have two options:
*
* 1. **Single-source ingestion**: Use `ingestAsync(source, properties)` with a single
* [com.microsoft.azure.kusto.ingest.v2.source.LocalSource] (FileSource or StreamSource).
* The client handles upload internally.
*
* 2. **Multi-source ingestion**: Use [com.microsoft.azure.kusto.ingest.v2.uploader.IUploader]
* to upload local sources to blob storage first, then call this method with the
* resulting [BlobSource] objects.
*
* @param sources The blob sources to ingest. All sources must be [BlobSource] instances.
* @param ingestRequestProperties Ingestion properties containing database,
* table, format, and other settings.
* @return An [IngestionOperation] object that can be used to track the
* status of the ingestion.
* @return An [ExtendedIngestResponse] containing the ingestion operation details.
*/
suspend fun ingestAsync(
database: String,
table: String,
sources: List<IngestionSource>,
sources: List<BlobSource>,
ingestRequestProperties: IngestRequestProperties?,
): ExtendedIngestResponse

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ import com.microsoft.azure.kusto.ingest.v2.uploader.IUploader
import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.future.future
import kotlinx.coroutines.withTimeoutOrNull
Expand Down Expand Up @@ -74,13 +71,17 @@ internal constructor(
private val logger = LoggerFactory.getLogger(QueuedIngestClient::class.java)

/**
* Ingests data from multiple sources with the given properties. This is the
* Ingests data from multiple blob sources with the given properties. This is the
* suspend function for Kotlin callers.
*
* Multi-blob ingestion only supports [BlobSource]. The blobs are assumed to already
* exist in blob storage, so no upload is performed - the request is sent directly
* to the Data Management service.
*/
override suspend fun ingestAsync(
database: String,
table: String,
sources: List<IngestionSource>,
sources: List<BlobSource>,
ingestRequestProperties: IngestRequestProperties?,
): ExtendedIngestResponse =
ingestAsyncInternal(
Expand Down Expand Up @@ -108,14 +109,18 @@ internal constructor(
)

/**
* Ingests data from multiple sources with the given properties. This is the
* Ingests data from multiple blob sources with the given properties. This is the
* Java-friendly version that returns a CompletableFuture.
*
* Multi-blob ingestion only supports [BlobSource]. The blobs are assumed to already
* exist in blob storage, so no upload is performed - the request is sent directly
* to the Data Management service.
*/
@JvmName("ingestAsync")
fun ingestAsyncJava(
database: String,
table: String,
sources: List<IngestionSource>,
sources: List<BlobSource>,
ingestRequestProperties: IngestRequestProperties?,
): CompletableFuture<ExtendedIngestResponse> =
CoroutineScope(Dispatchers.IO).future {
Expand Down Expand Up @@ -197,14 +202,18 @@ internal constructor(
)
}

/** Internal implementation of ingestAsync for multiple sources. */
/**
* Internal implementation of ingestAsync for multiple blob sources.
*
* This method only accepts [BlobSource] - no upload is performed.
* The blobs are assumed to already exist in blob storage.
*/
private suspend fun ingestAsyncInternal(
database: String,
table: String,
sources: List<IngestionSource>,
sources: List<BlobSource>,
ingestRequestProperties: IngestRequestProperties?,
): ExtendedIngestResponse {
// Extract database and table from properties
// Validate sources list is not empty
require(sources.isNotEmpty()) { "sources list cannot be empty" }
val maxBlobsPerBatch = getMaxSourcesPerMultiIngest()
Expand All @@ -228,16 +237,13 @@ internal constructor(
differentFormatBlob.joinToString(", "),
)
throw IngestClientException(
"All blobs in the request must have the same format. All blobs in the request must have the same format.Received formats: $differentFormatBlob",
message = "All blobs in the request must have the same format. Received formats: $differentFormatBlob",
)
}

// Split sources and upload local sources in parallel
val blobSources = uploadLocalSourcesAsync(sources)

// Check for duplicate blob URLs
val duplicates =
blobSources
sources
.groupBy { sanitizeBlobUrl(it.blobPath) }
.filter { it.value.size > 1 }

Expand All @@ -251,12 +257,13 @@ internal constructor(
"{Url: $url, Source Ids: [$sourceIds]}"
}
throw IngestClientException(
"Duplicate blob sources detected in the request: [$duplicateInfo]",
message = "Duplicate blob sources detected in the request: [$duplicateInfo]",
)
}

// Create blob objects for the request
val blobs =
blobSources.map {
sources.map {
Blob(
it.blobPath,
sourceId = it.sourceId.toString(),
Expand Down Expand Up @@ -340,7 +347,7 @@ internal constructor(
}
else -> {
throw IngestClientException(
"Unsupported ingestion source type: ${source::class.simpleName}",
message = "Unsupported ingestion source type: ${source::class.simpleName}",
)
}
}
Expand Down Expand Up @@ -382,55 +389,6 @@ internal constructor(
}
}

/**
 * Normalizes a mixed list of ingestion sources into blob sources.
 *
 * [BlobSource] entries are passed through unchanged; [LocalSource] entries
 * are uploaded to blob storage concurrently (one coroutine per source) and
 * replaced by the resulting blob references. Uploaded blobs are appended
 * after the pre-existing blob sources.
 *
 * @param sources The ingestion sources to normalize.
 * @return All sources as [BlobSource] instances (originals first, then uploads).
 * @throws IngestClientException if a source is neither a [BlobSource] nor a
 *     [LocalSource].
 */
private suspend fun uploadLocalSourcesAsync(
    sources: List<IngestionSource>,
): List<BlobSource> {
    val readyBlobs = mutableListOf<BlobSource>()
    val pendingLocals = mutableListOf<LocalSource>()

    // Classify each source, failing fast on the first unsupported type.
    for (source in sources) {
        when (source) {
            is BlobSource -> readyBlobs += source
            is LocalSource -> pendingLocals += source
            else ->
                throw IngestClientException(
                    "Unsupported ingestion source type: ${source::class.simpleName}",
                )
        }
    }

    // Nothing to upload — every source was already a blob.
    if (pendingLocals.isEmpty()) {
        return readyBlobs
    }

    logger.info(
        "Uploading ${pendingLocals.size} local source(s) to blob storage",
    )
    // Upload all local sources concurrently; awaitAll preserves input order
    // and propagates the first failure, cancelling the remaining uploads.
    val uploaded = coroutineScope {
        pendingLocals
            .map { pending -> async { uploader.uploadAsync(pending) } }
            .awaitAll()
    }
    readyBlobs += uploaded
    logger.info(
        "Successfully uploaded ${uploaded.size} local source(s)",
    )

    return readyBlobs
}

/**
* Sanitizes a blob URL by removing the SAS token and query parameters to
* allow proper duplicate detection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,57 @@ class DefaultConfigurationCache(
val clientDetails: ClientDetails,
val configurationProvider: (suspend () -> ConfigurationResponse)? = null,
) : ConfigurationCache {
companion object {
    /**
     * Java-friendly factory for a [DefaultConfigurationCache] that keeps every
     * optional setting at its default value.
     *
     * Exists so Java callers do not have to deal with Kotlin named/default
     * parameters on the primary constructor.
     *
     * @param dmUrl Data management endpoint URL.
     * @param tokenCredential Authentication credentials.
     * @param clientDetails Client identification details for tracking.
     * @return A freshly constructed [DefaultConfigurationCache].
     */
    @JvmStatic
    fun create(
        dmUrl: String,
        tokenCredential: TokenCredential,
        clientDetails: ClientDetails,
    ): DefaultConfigurationCache {
        return DefaultConfigurationCache(
            clientDetails = clientDetails,
            tokenCredential = tokenCredential,
            dmUrl = dmUrl,
        )
    }

    /**
     * Java-friendly factory for a [DefaultConfigurationCache] that exposes the
     * full set of configuration options.
     *
     * @param dmUrl Data management endpoint URL.
     * @param tokenCredential Authentication credentials.
     * @param skipSecurityChecks Whether to skip security validation.
     * @param clientDetails Client identification details for tracking.
     * @param refreshInterval Duration after which cached configuration is stale.
     * @return A freshly constructed [DefaultConfigurationCache].
     */
    @JvmStatic
    fun create(
        dmUrl: String,
        tokenCredential: TokenCredential,
        skipSecurityChecks: Boolean,
        clientDetails: ClientDetails,
        refreshInterval: Duration,
    ): DefaultConfigurationCache {
        return DefaultConfigurationCache(
            clientDetails = clientDetails,
            skipSecurityChecks = skipSecurityChecks,
            refreshInterval = refreshInterval,
            tokenCredential = tokenCredential,
            dmUrl = dmUrl,
        )
    }
}

init {
if (
configurationProvider == null &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import java.nio.file.Path
import java.util.UUID

/** Represents a file-based ingestion source. */
class FileSource(
class FileSource @JvmOverloads constructor(
val path: Path,
format: Format,
sourceId: UUID = UUID.randomUUID(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import java.io.InputStream
import java.util.UUID

/** Represents a stream-based ingestion source. */
class StreamSource(
class StreamSource @JvmOverloads constructor(
stream: InputStream,
format: Format,
sourceCompression: CompressionType,
sourceCompression: CompressionType = CompressionType.NONE,
sourceId: UUID = UUID.randomUUID(),
leaveOpen: Boolean = false,
) : LocalSource(format, leaveOpen, sourceCompression, sourceId) {
Expand Down
Loading
Loading