diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/IngestClient.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/IngestClient.kt index 4799f850..41dac220 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/IngestClient.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/IngestClient.kt @@ -80,18 +80,31 @@ interface IngestClient : Closeable { interface MultiIngestClient : IngestClient { /** - * Ingest data from multiple sources. + * Ingest data from multiple blob sources. * - * @param sources The sources to ingest. + * **Important:** Multi-blob ingestion only supports [BlobSource]. This design avoids + * partial failure scenarios where some local sources might be uploaded successfully + * while others fail, leaving the user in an inconsistent state. + * + * **For local files/streams**, you have two options: + * + * 1. **Single-source ingestion**: Use `ingestAsync(source, properties)` with a single + * [com.microsoft.azure.kusto.ingest.v2.source.LocalSource] (FileSource or StreamSource). + * The client handles upload internally. + * + * 2. **Multi-source ingestion**: Use [com.microsoft.azure.kusto.ingest.v2.uploader.IUploader] + * to upload local sources to blob storage first, then call this method with the + * resulting [BlobSource] objects. + * + * @param sources The blob sources to ingest. All sources must be [BlobSource] instances. * @param ingestRequestProperties Ingestion properties containing database, * table, format, and other settings. - * @return An [IngestionOperation] object that can be used to track the - * status of the ingestion. + * @return An [ExtendedIngestResponse] containing the ingestion operation details. */ suspend fun ingestAsync( database: String, table: String, - sources: List, + sources: List, ingestRequestProperties: IngestRequestProperties?, ): ExtendedIngestResponse diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/QueuedIngestClient.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/QueuedIngestClient.kt index 038ac94d..e053aece 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/QueuedIngestClient.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/client/QueuedIngestClient.kt @@ -28,9 +28,6 @@ import com.microsoft.azure.kusto.ingest.v2.uploader.IUploader import io.ktor.http.HttpStatusCode import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.future.future import kotlinx.coroutines.withTimeoutOrNull @@ -74,13 +71,17 @@ internal constructor( private val logger = LoggerFactory.getLogger(QueuedIngestClient::class.java) /** - * Ingests data from multiple sources with the given properties. This is the + * Ingests data from multiple blob sources with the given properties. This is the * suspend function for Kotlin callers. + * + * Multi-blob ingestion only supports [BlobSource]. The blobs are assumed to already + * exist in blob storage, so no upload is performed - the request is sent directly + * to the Data Management service. 
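A minimal sketch of what this contract looks like from Java, using the CompletableFuture overload added further down in this file. The cluster URL, blob URLs, database, and table names are placeholders; the builder, `BlobSource`, and properties calls follow the ones used elsewhere in this change.

```java
import com.azure.identity.AzureCliCredentialBuilder;
import com.azure.identity.ChainedTokenCredential;
import com.azure.identity.ChainedTokenCredentialBuilder;
import com.microsoft.azure.kusto.ingest.v2.builders.QueuedIngestClientBuilder;
import com.microsoft.azure.kusto.ingest.v2.client.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.v2.common.models.ExtendedIngestResponse;
import com.microsoft.azure.kusto.ingest.v2.common.models.IngestRequestPropertiesBuilder;
import com.microsoft.azure.kusto.ingest.v2.models.Format;
import com.microsoft.azure.kusto.ingest.v2.models.IngestRequestProperties;
import com.microsoft.azure.kusto.ingest.v2.source.BlobSource;
import com.microsoft.azure.kusto.ingest.v2.source.CompressionType;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;

public class MultiBlobIngestSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder values - replace with your own cluster, database, and table.
        String dmUrl = "https://ingest-mycluster.region.kusto.windows.net";
        ChainedTokenCredential credential = new ChainedTokenCredentialBuilder()
                .addFirst(new AzureCliCredentialBuilder().build())
                .build();

        // Multi-source ingestion takes BlobSource only: the blobs must already exist in storage.
        List<BlobSource> blobs = Arrays.asList(
                new BlobSource("https://account.blob.core.windows.net/container/part1.json?<sas>",
                        Format.json, UUID.randomUUID(), CompressionType.NONE),
                new BlobSource("https://account.blob.core.windows.net/container/part2.json?<sas>",
                        Format.json, UUID.randomUUID(), CompressionType.NONE));

        IngestRequestProperties props =
                IngestRequestPropertiesBuilder.create().withEnableTracking(true).build();

        try (QueuedIngestClient client = QueuedIngestClientBuilder.create(dmUrl)
                .withAuthentication(credential)
                .build()) {
            // From Java this resolves to the CompletableFuture overload (@JvmName("ingestAsync")).
            ExtendedIngestResponse response =
                    client.ingestAsync("MyDatabase", "MyTable", blobs, props).join();
            System.out.println("Queued multi-blob ingestion, operation ID: "
                    + response.getIngestResponse().getIngestionOperationId());
        }
    }
}
```

Local files and streams go through the single-source overload or an `IUploader` first, as described above.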
*/ override suspend fun ingestAsync( database: String, table: String, - sources: List, + sources: List, ingestRequestProperties: IngestRequestProperties?, ): ExtendedIngestResponse = ingestAsyncInternal( @@ -108,14 +109,18 @@ internal constructor( ) /** - * Ingests data from multiple sources with the given properties. This is the + * Ingests data from multiple blob sources with the given properties. This is the * Java-friendly version that returns a CompletableFuture. + * + * Multi-blob ingestion only supports [BlobSource]. The blobs are assumed to already + * exist in blob storage, so no upload is performed - the request is sent directly + * to the Data Management service. */ @JvmName("ingestAsync") fun ingestAsyncJava( database: String, table: String, - sources: List, + sources: List, ingestRequestProperties: IngestRequestProperties?, ): CompletableFuture = CoroutineScope(Dispatchers.IO).future { @@ -197,14 +202,18 @@ internal constructor( ) } - /** Internal implementation of ingestAsync for multiple sources. */ + /** + * Internal implementation of ingestAsync for multiple blob sources. + * + * This method only accepts [BlobSource] - no upload is performed. + * The blobs are assumed to already exist in blob storage. + */ private suspend fun ingestAsyncInternal( database: String, table: String, - sources: List, + sources: List, ingestRequestProperties: IngestRequestProperties?, ): ExtendedIngestResponse { - // Extract database and table from properties // Validate sources list is not empty require(sources.isNotEmpty()) { "sources list cannot be empty" } val maxBlobsPerBatch = getMaxSourcesPerMultiIngest() @@ -228,16 +237,13 @@ internal constructor( differentFormatBlob.joinToString(", "), ) throw IngestClientException( - "All blobs in the request must have the same format. All blobs in the request must have the same format.Received formats: $differentFormatBlob", + message = "All blobs in the request must have the same format. Received formats: $differentFormatBlob", ) } - // Split sources and upload local sources in parallel - val blobSources = uploadLocalSourcesAsync(sources) - // Check for duplicate blob URLs val duplicates = - blobSources + sources .groupBy { sanitizeBlobUrl(it.blobPath) } .filter { it.value.size > 1 } @@ -251,12 +257,13 @@ internal constructor( "{Url: $url, Source Ids: [$sourceIds]}" } throw IngestClientException( - "Duplicate blob sources detected in the request: [$duplicateInfo]", + message = "Duplicate blob sources detected in the request: [$duplicateInfo]", ) } + // Create blob objects for the request val blobs = - blobSources.map { + sources.map { Blob( it.blobPath, sourceId = it.sourceId.toString(), @@ -340,7 +347,7 @@ internal constructor( } else -> { throw IngestClientException( - "Unsupported ingestion source type: ${source::class.simpleName}", + message = "Unsupported ingestion source type: ${source::class.simpleName}", ) } } @@ -382,55 +389,6 @@ internal constructor( } } - /** - * Splits sources into BlobSources and LocalSources, uploads LocalSources in - * parallel, and returns a unified list of BlobSources. 
- * - * @param sources The list of ingestion sources to process - * @return A list of BlobSources including both original BlobSources and - * uploaded LocalSources - * @throws IngestClientException if an unsupported source type is - * encountered - */ - private suspend fun uploadLocalSourcesAsync( - sources: List, - ): List { - // Split sources into BlobSources and LocalSources - val blobSources = mutableListOf() - val localSources = mutableListOf() - - sources.forEach { source -> - when (source) { - is BlobSource -> blobSources.add(source) - is LocalSource -> localSources.add(source) - else -> - throw IngestClientException( - "Unsupported ingestion source type: ${source::class.simpleName}", - ) - } - } - - // Upload LocalSources in parallel and collect the resulting BlobSources - if (localSources.isNotEmpty()) { - logger.info( - "Uploading ${localSources.size} local source(s) to blob storage", - ) - val uploadedBlobs = coroutineScope { - localSources - .map { localSource -> - async { uploader.uploadAsync(localSource) } - } - .awaitAll() - } - blobSources.addAll(uploadedBlobs) - logger.info( - "Successfully uploaded ${uploadedBlobs.size} local source(s)", - ) - } - - return blobSources - } - /** * Sanitizes a blob URL by removing the SAS token and query parameters to * allow proper duplicate detection. diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt index b716a7bb..99c902f4 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/common/ConfigurationCache.kt @@ -69,6 +69,57 @@ class DefaultConfigurationCache( val clientDetails: ClientDetails, val configurationProvider: (suspend () -> ConfigurationResponse)? = null, ) : ConfigurationCache { + companion object { + /** + * Creates a DefaultConfigurationCache for Java callers. + * + * This factory method provides a convenient way to create a cache from Java + * without dealing with Kotlin named parameters. + * + * @param dmUrl Data management endpoint URL + * @param tokenCredential Authentication credentials + * @param clientDetails Client identification details for tracking + * @return A new DefaultConfigurationCache instance + */ + @JvmStatic + fun create( + dmUrl: String, + tokenCredential: TokenCredential, + clientDetails: ClientDetails, + ): DefaultConfigurationCache = + DefaultConfigurationCache( + dmUrl = dmUrl, + tokenCredential = tokenCredential, + clientDetails = clientDetails, + ) + + /** + * Creates a DefaultConfigurationCache with all options for Java callers. 
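As a rough sketch of how these factories are meant to be consumed from Java (mirroring the quickstart changes later in this diff): the endpoint and client-details values below are placeholders, and the uploader builder options are simply the ones the sample uses, not a required set.

```java
import com.azure.identity.AzureCliCredentialBuilder;
import com.azure.identity.ChainedTokenCredential;
import com.azure.identity.ChainedTokenCredentialBuilder;
import com.microsoft.azure.kusto.ingest.v2.common.ConfigurationCache;
import com.microsoft.azure.kusto.ingest.v2.common.DefaultConfigurationCache;
import com.microsoft.azure.kusto.ingest.v2.common.SimpleRetryPolicy;
import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails;
import com.microsoft.azure.kusto.ingest.v2.uploader.ManagedUploader;
import com.microsoft.azure.kusto.ingest.v2.uploader.UploadMethod;

public class ConfigurationCacheSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder DM endpoint and client details.
        String dmUrl = "https://ingest-mycluster.region.kusto.windows.net";
        ChainedTokenCredential credential = new ChainedTokenCredentialBuilder()
                .addFirst(new AzureCliCredentialBuilder().build())
                .build();

        // The @JvmStatic factory keeps Java callers away from Kotlin named/default parameters.
        ConfigurationCache configCache = DefaultConfigurationCache.create(
                dmUrl, credential, new ClientDetails("MyApp", "1.0", "my-app-instance"));

        // The cache is typically handed to an uploader; these builder calls mirror the
        // quickstart sample further down in this change.
        ManagedUploader uploader = ManagedUploader.builder()
                .withConfigurationCache(configCache)
                .withRetryPolicy(new SimpleRetryPolicy())
                .withMaxConcurrency(4)
                .withUploadMethod(UploadMethod.STORAGE)
                .withTokenCredential(credential)
                .build();
        uploader.close();
    }
}
```

The full-options overload documented next additionally takes `skipSecurityChecks` and a refresh interval.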
+ * + * @param dmUrl Data management endpoint URL + * @param tokenCredential Authentication credentials + * @param skipSecurityChecks Whether to skip security validation + * @param clientDetails Client identification details for tracking + * @param refreshInterval Duration after which cached configuration is stale + * @return A new DefaultConfigurationCache instance + */ + @JvmStatic + fun create( + dmUrl: String, + tokenCredential: TokenCredential, + skipSecurityChecks: Boolean, + clientDetails: ClientDetails, + refreshInterval: Duration, + ): DefaultConfigurationCache = + DefaultConfigurationCache( + refreshInterval = refreshInterval, + dmUrl = dmUrl, + tokenCredential = tokenCredential, + skipSecurityChecks = skipSecurityChecks, + clientDetails = clientDetails, + ) + } + init { if ( configurationProvider == null && diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/source/FileSource.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/source/FileSource.kt index 359f1e8c..66c6d0d8 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/source/FileSource.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/source/FileSource.kt @@ -13,7 +13,7 @@ import java.nio.file.Path import java.util.UUID /** Represents a file-based ingestion source. */ -class FileSource( +class FileSource @JvmOverloads constructor( val path: Path, format: Format, sourceId: UUID = UUID.randomUUID(), diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/source/StreamSource.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/source/StreamSource.kt index 3b3e88a7..662e6c5e 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/source/StreamSource.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/source/StreamSource.kt @@ -7,10 +7,10 @@ import java.io.InputStream import java.util.UUID /** Represents a stream-based ingestion source. 
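With `@JvmOverloads` on `FileSource` above and on `StreamSource` in the hunk just below, Java callers can use the generated shorter overloads instead of spelling out every default. A small sketch, with placeholder paths and data:

```java
import com.microsoft.azure.kusto.ingest.v2.models.Format;
import com.microsoft.azure.kusto.ingest.v2.source.CompressionType;
import com.microsoft.azure.kusto.ingest.v2.source.FileSource;
import com.microsoft.azure.kusto.ingest.v2.source.StreamSource;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.UUID;

public class LocalSourceConstructionSketch {
    public static void main(String[] args) {
        // Short form: only the required parameters; the sourceId is generated and
        // (for StreamSource) compression defaults to NONE.
        FileSource csvFile = new FileSource(Paths.get("dataset.csv"), Format.csv);

        InputStream inMemory = new ByteArrayInputStream(
                "0,hello,2024-01-01".getBytes(StandardCharsets.UTF_8));
        StreamSource csvStream = new StreamSource(inMemory, Format.csv);

        // Fully specified form, as the existing quickstart does, when compression
        // or a stable source id matters.
        FileSource gzFile = new FileSource(
                Paths.get("dataset.csv.gz"), Format.csv, UUID.randomUUID(), CompressionType.GZIP);

        System.out.printf("%s %s %s%n",
                csvFile.getName(), csvStream.getSourceId(), gzFile.getCompressionType());
    }
}
```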
*/ -class StreamSource( +class StreamSource @JvmOverloads constructor( stream: InputStream, format: Format, - sourceCompression: CompressionType, + sourceCompression: CompressionType = CompressionType.NONE, sourceId: UUID = UUID.randomUUID(), leaveOpen: Boolean = false, ) : LocalSource(format, leaveOpen, sourceCompression, sourceId) { diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt index 2058bf7c..d947016e 100644 --- a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ContainerUploaderBase.kt @@ -26,11 +26,15 @@ import com.microsoft.azure.kusto.ingest.v2.uploader.compression.CompressionExcep import com.microsoft.azure.kusto.ingest.v2.uploader.models.UploadErrorCode import com.microsoft.azure.kusto.ingest.v2.uploader.models.UploadResult import com.microsoft.azure.kusto.ingest.v2.uploader.models.UploadResults +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay +import kotlinx.coroutines.future.future +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit import kotlinx.coroutines.withContext import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -41,6 +45,8 @@ import java.io.PipedOutputStream import java.time.Clock import java.time.Duration import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.atomic.AtomicInteger import java.util.zip.GZIPOutputStream /** Represents an abstract base class for uploaders to storage containers. */ @@ -59,6 +65,12 @@ abstract class ContainerUploaderBase( private val effectiveMaxConcurrency: Int = minOf(maxConcurrency, Runtime.getRuntime().availableProcessors()) + /** + * Atomic counter for round-robin container selection. Increments on each + * upload to distribute load evenly across containers. + */ + private val containerIndexCounter = AtomicInteger(0) + override var ignoreSizeLimit: Boolean = false override fun close() { @@ -251,9 +263,10 @@ abstract class ContainerUploaderBase( ) /** - * Uploads a stream with retry logic and container cycling. Randomly selects - * a starting container and cycles through containers on each retry. For - * example, with 2 containers and 3 retries: 1->2->1 or 2->1->2 + * Uploads a stream with retry logic and container cycling. Uses an + * incrementing counter (mod container count) for round-robin container + * selection, ensuring even load distribution across containers on each + * retry. 
For example, with 2 containers and 3 retries: 0->1->0 or 1->0->1 */ private suspend fun uploadWithRetries( local: LocalSource, @@ -262,11 +275,12 @@ abstract class ContainerUploaderBase( containers: List, effectiveCompressionType: CompressionType = local.compressionType, ): BlobSource { - // Select random starting container index - var containerIndex = (0 until containers.size).random() + // Select container using incrementing counter for round-robin distribution + var containerIndex = + containerIndexCounter.getAndIncrement() % containers.size logger.debug( - "Starting upload with {} containers, random start index: {}", + "Starting upload with {} containers, round-robin index: {}", containers.size, containerIndex, ) @@ -373,73 +387,54 @@ abstract class ContainerUploaderBase( localSources.size, maxConcurrency, ) - // Process sources in chunks to respect maxConcurrency at file level - val results = - localSources.chunked(maxConcurrency).flatMap { chunk -> - chunk.map { source -> - async { - val startedAt = - Instant.now(Clock.systemUTC()) - try { - val blobSource = uploadAsync(source) - val completedAt = - Instant.now(Clock.systemUTC()) - UploadResult.Success( - sourceName = source.name, - startedAt = startedAt, - completedAt = completedAt, - blobUrl = blobSource.blobPath, - sizeBytes = source.size() ?: -1, - ) - } catch (e: Exception) { - val completedAt = - Instant.now(Clock.systemUTC()) - val errorCode = - when { - e.message?.contains( - "size", - ) == true -> - UploadErrorCode - .SOURCE_SIZE_LIMIT_EXCEEDED - e.message?.contains( - "readable", - ) == true -> - UploadErrorCode - .SOURCE_NOT_READABLE - e.message?.contains( - "empty", - ) == true -> - UploadErrorCode - .SOURCE_IS_EMPTY - e.message?.contains( - "container", - ) == true -> - UploadErrorCode - .NO_CONTAINERS_AVAILABLE - else -> - UploadErrorCode - .UPLOAD_FAILED - } - - UploadResult.Failure( - sourceName = source.name, - startedAt = startedAt, - completedAt = completedAt, - errorCode = errorCode, - errorMessage = - e.message - ?: "Upload failed", - exception = e, - isPermanent = - e is IngestException && - e.isPermanent == - true, - ) + // TODO check and validate failure scenarios + // Use semaphore for true streaming parallelism + // This allows up to maxConcurrency concurrent uploads, starting new ones as soon as slots are available + val semaphore = Semaphore(maxConcurrency) + + // Launch all uploads concurrently, but semaphore limits actual concurrent execution + val results = localSources.map { source -> + async { + semaphore.withPermit { + val startedAt = Instant.now(Clock.systemUTC()) + try { + val blobSource = uploadAsync(source) + val completedAt = Instant.now(Clock.systemUTC()) + UploadResult.Success( + sourceName = source.name, + startedAt = startedAt, + completedAt = completedAt, + blobUrl = blobSource.blobPath, + sizeBytes = source.size() ?: -1, + ) + } catch (e: Exception) { + val completedAt = Instant.now(Clock.systemUTC()) + val errorCode = when { + e.message?.contains("size") == true -> + UploadErrorCode.SOURCE_SIZE_LIMIT_EXCEEDED + e.message?.contains("readable") == true -> + UploadErrorCode.SOURCE_NOT_READABLE + e.message?.contains("empty") == true -> + UploadErrorCode.SOURCE_IS_EMPTY + e.message?.contains("container") == true -> + UploadErrorCode.NO_CONTAINERS_AVAILABLE + else -> + UploadErrorCode.UPLOAD_FAILED } + + UploadResult.Failure( + sourceName = source.name, + startedAt = startedAt, + completedAt = completedAt, + errorCode = errorCode, + errorMessage = e.message ?: "Upload failed", + exception = e, 
+ isPermanent = e is IngestException && e.isPermanent == true, + ) } } - .awaitAll() } + }.awaitAll() val successes = results.filterIsInstance() val failures = results.filterIsInstance() @@ -705,6 +700,34 @@ abstract class ContainerUploaderBase( } } + /** + * Uploads the specified local source asynchronously. + * This is the Java-compatible version that returns a CompletableFuture. + * + * @param local The local source to upload. + * @return A CompletableFuture that will complete with the uploaded blob source. + */ + @JvmName("uploadAsync") + fun uploadAsyncJava(local: LocalSource): CompletableFuture = + CoroutineScope(Dispatchers.IO).future { + uploadAsync(local) + } + + /** + * Uploads the specified local sources asynchronously. + * This is the Java-compatible version that returns a CompletableFuture. + * + * @param localSources List of the local sources to upload. + * @return A CompletableFuture that will complete with the upload results. + */ + @JvmName("uploadManyAsync") + fun uploadManyAsyncJava( + localSources: List, + ): CompletableFuture = + CoroutineScope(Dispatchers.IO).future { + uploadManyAsync(localSources) + } + /** * Selects the appropriate containers for upload based on the provided * configuration cache and upload method. diff --git a/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ICustomUploader.kt b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ICustomUploader.kt new file mode 100644 index 00000000..229641d3 --- /dev/null +++ b/ingest-v2/src/main/kotlin/com/microsoft/azure/kusto/ingest/v2/uploader/ICustomUploader.kt @@ -0,0 +1,96 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +package com.microsoft.azure.kusto.ingest.v2.uploader + +import com.microsoft.azure.kusto.ingest.v2.source.BlobSource +import com.microsoft.azure.kusto.ingest.v2.source.LocalSource +import com.microsoft.azure.kusto.ingest.v2.uploader.models.UploadResults +import kotlinx.coroutines.future.await +import java.io.Closeable +import java.util.concurrent.CompletableFuture + +/** + * Java-compatible interface for creating custom uploaders. + * + * This interface uses [CompletableFuture] instead of Kotlin coroutines, + * allowing Java developers to implement custom upload logic without + * needing to understand Kotlin suspend functions. + * + */ +interface ICustomUploader : Closeable { + /** + * Indicates whether to ignore the max data size allowed during upload. + * Default should be false. + */ + fun getIgnoreSizeLimit(): Boolean + + /** + * Sets whether to ignore the max data size limit. + */ + fun setIgnoreSizeLimit(value: Boolean) + + /** + * Uploads the specified local source asynchronously. + * + * @param local The local source to upload. + * @return A CompletableFuture that completes with the uploaded blob source. + */ + fun uploadAsync(local: LocalSource): CompletableFuture + + /** + * Uploads the specified local sources asynchronously. + * + * @param localSources List of the local sources to upload. + * @return A CompletableFuture that completes with the upload results. + */ + fun uploadManyAsync(localSources: List): CompletableFuture + +} + +/** + * Extension function to convert [ICustomUploader] to [IUploader]. + * + * Kotlin users can use this as: `myCustomUploader.asUploader()` + */ +fun ICustomUploader.asUploader(): IUploader = CustomUploaderAdapter(this) + +/** + * Static helper methods for [ICustomUploader]. + * + * Provides Java-friendly static methods to work with custom uploaders. 
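For example, a Java application could wire a custom uploader into the queued client roughly like this. This is a sketch that assumes the `ingestv2.AzureBlobRestCustomUploader` sample added later in this change is on the classpath; the container URL, cluster URL, database, table, and file path are placeholders.

```java
import com.azure.identity.AzureCliCredentialBuilder;
import com.azure.identity.ChainedTokenCredentialBuilder;
import com.microsoft.azure.kusto.ingest.v2.builders.QueuedIngestClientBuilder;
import com.microsoft.azure.kusto.ingest.v2.client.QueuedIngestClient;
import com.microsoft.azure.kusto.ingest.v2.common.models.IngestRequestPropertiesBuilder;
import com.microsoft.azure.kusto.ingest.v2.models.Format;
import com.microsoft.azure.kusto.ingest.v2.source.FileSource;
import com.microsoft.azure.kusto.ingest.v2.uploader.CustomUploaderHelper;
import com.microsoft.azure.kusto.ingest.v2.uploader.ICustomUploader;
import com.microsoft.azure.kusto.ingest.v2.uploader.IUploader;

import ingestv2.AzureBlobRestCustomUploader;

import java.nio.file.Paths;

public class CustomUploaderWiringSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder values - a real container URL with SAS comes from the DM config API.
        String containerUrlWithSas = "https://account.blob.core.windows.net/container?<sas>";
        String dmUrl = "https://ingest-mycluster.region.kusto.windows.net";

        // A Java implementation of ICustomUploader (the sample class added later in this change).
        ICustomUploader custom = new AzureBlobRestCustomUploader(containerUrlWithSas);

        // Adapt the CompletableFuture-based uploader to the coroutine-based IUploader.
        IUploader uploader = CustomUploaderHelper.asUploader(custom);

        try (QueuedIngestClient client = QueuedIngestClientBuilder.create(dmUrl)
                .withAuthentication(new ChainedTokenCredentialBuilder()
                        .addFirst(new AzureCliCredentialBuilder().build())
                        .build())
                .withUploader(uploader, true) // true: the client closes the uploader on close()
                .build()) {
            // Local sources now flow through the custom uploader before being queued.
            client.ingestAsync("MyDatabase", "MyTable",
                    new FileSource(Paths.get("dataset.csv"), Format.csv),
                    IngestRequestPropertiesBuilder.create().withEnableTracking(true).build())
                    .join();
        }
    }
}
```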
+ */ +object CustomUploaderHelper { + /** + * Wraps an [ICustomUploader] with an adapter to create an [IUploader]. + * + * This is the Java-friendly way to convert a custom uploader: + * + */ + @JvmStatic + fun asUploader(customUploader: ICustomUploader): IUploader = CustomUploaderAdapter(customUploader) +} + +/** + * Adapter that wraps an [ICustomUploader] to implement the [IUploader] interface. + * + * This allows Java-implemented uploaders to be used anywhere an [IUploader] is expected, + * such as with QueuedIngestClient or ManagedStreamingIngestClient. + * + */ +class CustomUploaderAdapter(private val customUploader: ICustomUploader) : IUploader { + override var ignoreSizeLimit: Boolean + get() = customUploader.getIgnoreSizeLimit() + set(value) { customUploader.setIgnoreSizeLimit(value) } + + override suspend fun uploadAsync(local: LocalSource): BlobSource { + return customUploader.uploadAsync(local).await() + } + + override suspend fun uploadManyAsync(localSources: List): UploadResults { + return customUploader.uploadManyAsync(localSources).await() + } + + override fun close() { + customUploader.close() + } +} diff --git a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/QueuedIngestClientTest.kt b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/QueuedIngestClientTest.kt index 12cbd5ee..72539505 100644 --- a/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/QueuedIngestClientTest.kt +++ b/ingest-v2/src/test/kotlin/com/microsoft/azure/kusto/ingest/v2/QueuedIngestClientTest.kt @@ -412,47 +412,6 @@ class QueuedIngestClientTest : logger.info( "Large file upload completed: $largeSucceeded succeeded", ) - - // Batch upload (5 files) - logger.info("Testing batch upload (5 files)") - val batchSources = - (1..5).map { i -> - createTestStreamSource( - 1024 * i, - "combined_batch_$i.json", - ) - } - val batchResponse = - queuedIngestClient.ingestAsync( - database = database, - table = targetTable, - sources = batchSources, - ingestRequestProperties = - IngestRequestPropertiesBuilder.create() - .withEnableTracking(true) - .build(), - ) - assertNotNull(batchResponse.ingestResponse.ingestionOperationId) - val batchStatus = - queuedIngestClient.pollUntilCompletion( - database = database, - table = targetTable, - operationId = - batchResponse.ingestResponse - .ingestionOperationId, - pollingInterval = pollInterval, - timeout = pollTimeout, - ) - val batchSucceeded = - batchStatus.details?.count { - it.status == BlobStatus.Status.Succeeded - } ?: 0 - assert(batchSucceeded == batchSources.size) { - "Expected all batch files to succeed" - } - logger.info( - "Batch upload completed: $batchSucceeded/${batchSources.size} succeeded", - ) } catch (e: ConnectException) { assumeTrue(false, "Skipping test: ${e.message}") } catch (e: Exception) { @@ -465,37 +424,44 @@ class QueuedIngestClientTest : } @Test - fun `E2E - parallel processing with maxConcurrency`() = runBlocking { - logger.info("E2E: Testing parallel processing with maxConcurrency=5") + fun `E2E - multi-blob batch ingestion`() = runBlocking { + logger.info("E2E: Testing multi-blob batch ingestion using BlobSource") - val queuedIngestClient = createTestClient(maxConcurrency = 5) + val queuedIngestClient = createTestClient() - val sources = - (1..10).map { i -> - createTestStreamSource(512 * 1024, "parallel_$i.json") + // Multi-source API only accepts BlobSource - blobs already exist in storage, + // no upload needed, all blob URLs are submitted in a single request. 
+ val sampleJsonFiles = listOf( + "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json", + "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json", + ) + val blobSources = + sampleJsonFiles.map { url -> + BlobSource( + url, + format = Format.multijson, // Use multijson to handle different JSON structures + ) } try { - val startTime = System.currentTimeMillis() val response = queuedIngestClient.ingestAsync( database = database, table = targetTable, - sources = sources, + sources = blobSources, ingestRequestProperties = IngestRequestPropertiesBuilder.create() .withEnableTracking(true) .build(), ) - val uploadDuration = System.currentTimeMillis() - startTime val operationId = assertValidIngestionResponse( response, - "E2E - parallel processing", + "E2E - multi-blob batch ingestion", ) logger.info( - "Parallel upload submitted in ${uploadDuration}ms with operation ID: $operationId", + "Multi-blob batch submitted with operation ID: $operationId", ) val finalStatus = @@ -512,10 +478,10 @@ class QueuedIngestClientTest : it.status == BlobStatus.Status.Succeeded } ?: 0 logger.info( - "Parallel upload: $succeededCount/${sources.size} succeeded (avg ${uploadDuration / sources.size}ms per file)", + "Multi-blob batch: $succeededCount/${blobSources.size} blobs succeeded", ) - assert(succeededCount == sources.size) { - "Expected parallel uploads to succeed" + assert(succeededCount == blobSources.size) { + "Expected all blobs in batch to be ingested successfully" } } catch (e: ConnectException) { assumeTrue(false, "Skipping test: ${e.message}") @@ -623,7 +589,7 @@ class QueuedIngestClientTest : queuedIngestClient.ingestAsync( database = database, table = targetTable, - sources = listOf(source), + source = source, ingestRequestProperties = irp, ) @@ -771,49 +737,67 @@ class QueuedIngestClientTest : } } + @Test + fun `E2E - duplicate blob URLs should be rejected`(): Unit = runBlocking { + logger.info("E2E: Testing duplicate blob URL detection") + + val client = createTestClient() + + // Create sources with duplicate blob URLs (same URL used multiple times) + val duplicateBlobUrl = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json" + val sources = + listOf( + BlobSource(duplicateBlobUrl, format = Format.json), + BlobSource("https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json", format = Format.json), + BlobSource(duplicateBlobUrl, format = Format.json), // Duplicate! + ) + + logger.info( + "Ingesting ${sources.size} blob sources with duplicate URLs (should fail)", + ) + val exception = + assertThrows { + client.ingestAsync( + database = database, + table = targetTable, + sources = sources, + ingestRequestProperties = + IngestRequestPropertiesBuilder.create() + .withEnableTracking(true) + .build(), + ) + } + assertNotNull(exception, "Duplicate blob URLs should throw IngestClientException") + assert(exception.message?.contains("Duplicate blob sources detected") == true) { + "Exception message should indicate duplicate blob sources. 
Got: ${exception.message}" + } + logger.info("Duplicate blob URL detection test passed: ${exception.message}") + } + @Test fun `E2E - format mismatch and mixed format batch`(): Unit = runBlocking { logger.info("E2E: Testing format mismatch detection with mixed formats") val client = createTestClient() - val jsonContent = - """{"name":"test","value":123,"timestamp":"2024-01-01"}""" - val csvContent = - """name,value,timestamp -test,123,2024-01-01 -test2,456,2024-01-02""" - val sources = listOf( - StreamSource( - stream = - ByteArrayInputStream( - jsonContent.toByteArray(), - ), + BlobSource( + "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json", format = Format.json, - sourceCompression = CompressionType.NONE, ), - StreamSource( - stream = - ByteArrayInputStream( - csvContent.toByteArray(), - ), + BlobSource( + "https://kustosamplefiles.blob.core.windows.net/csvsamplefiles/simple.csv", format = Format.csv, - sourceCompression = CompressionType.NONE, ), - StreamSource( - stream = - ByteArrayInputStream( - jsonContent.toByteArray(), - ), + BlobSource( + "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json", format = Format.json, - sourceCompression = CompressionType.NONE, ), ) logger.info( - "Uploading ${sources.size} sources with mixed formats (JSON and CSV)", + "Ingesting ${sources.size} blob sources with mixed formats (JSON and CSV)", ) val exception = assertThrows { @@ -831,6 +815,9 @@ test2,456,2024-01-02""" exception, "Mixed formats are not permitted for ingestion", ) + assert(exception.message?.contains("same format") == true) { + "Exception message should indicate format mismatch. Got: ${exception.message}" + } } @ParameterizedTest( @@ -890,11 +877,12 @@ test2,456,2024-01-02""" .withEnableTracking(true) .build() + // Use single-source API for LocalSource val ingestionResponse = queuedIngestClient.ingestAsync( database = database, table = targetTable, - sources = listOf(source), + source = source, ingestRequestProperties = properties, ) diff --git a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java index 44591c4d..8dd33075 100644 --- a/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java +++ b/quickstart/src/main/java/com/microsoft/azure/kusto/quickstart/SampleApp.java @@ -20,13 +20,22 @@ import com.microsoft.azure.kusto.ingest.v2.builders.QueuedIngestClientBuilder; import com.microsoft.azure.kusto.ingest.v2.client.IngestionOperation; import com.microsoft.azure.kusto.ingest.v2.client.QueuedIngestClient; +import com.microsoft.azure.kusto.ingest.v2.common.ConfigurationCache; +import com.microsoft.azure.kusto.ingest.v2.common.DefaultConfigurationCache; +import com.microsoft.azure.kusto.ingest.v2.common.SimpleRetryPolicy; +import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails; import com.microsoft.azure.kusto.ingest.v2.common.models.ExtendedIngestResponse; import com.microsoft.azure.kusto.ingest.v2.common.models.IngestRequestPropertiesBuilder; import com.microsoft.azure.kusto.ingest.v2.models.*; +import com.microsoft.azure.kusto.ingest.v2.source.BlobSource; import com.microsoft.azure.kusto.ingest.v2.source.CompressionType; import com.microsoft.azure.kusto.ingest.v2.source.FileSource; -import com.microsoft.azure.kusto.ingest.v2.source.IngestionSource; +import com.microsoft.azure.kusto.ingest.v2.source.LocalSource; import com.microsoft.azure.kusto.ingest.v2.source.StreamSource; +import 
com.microsoft.azure.kusto.ingest.v2.uploader.ManagedUploader; +import com.microsoft.azure.kusto.ingest.v2.uploader.UploadMethod; +import com.microsoft.azure.kusto.ingest.v2.uploader.models.UploadResult; +import com.microsoft.azure.kusto.ingest.v2.uploader.models.UploadResults; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.exporters.logging.LoggingSpanExporter; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -73,7 +82,6 @@ enum SourceType { return null; } } - /** * AuthenticationModeOptions - represents the different options to authenticate to the system */ @@ -824,18 +832,82 @@ private static ChainedTokenCredential buildIngestV2Credential(@NotNull IngestV2Q } private static @NotNull CompletableFuture ingestV2BatchIngestion(ConfigJson config, IngestV2QuickstartConfig ingestV2Config, - @NotNull QueuedIngestClient queuedIngestClient) { - System.out.println("\n=== Queued ingestion from multiple sources (ingest-v2 batch) ==="); + @NotNull QueuedIngestClient queuedIngestClient) { + System.out.println("\n=== Queued batch ingestion: Upload local files to blob, then ingest (ingest-v2) ==="); + String clusterPath = ingestV2Config.getClusterPath(); + ChainedTokenCredential credential = buildIngestV2Credential(ingestV2Config); + + ConfigurationCache configCache = DefaultConfigurationCache.create( + clusterPath, + credential, + new ClientDetails("SampleApp", "1.0", "quickstart-sample")); + + ManagedUploader uploader = ManagedUploader.builder() + .withConfigurationCache(configCache) + .withRetryPolicy(new SimpleRetryPolicy()) + .withMaxConcurrency(ingestV2Config.getMaxConcurrency()) + .withMaxDataSize(4L * 1024 * 1024 * 1024) // 4GB max size + .withUploadMethod(UploadMethod.STORAGE) + .withTokenCredential(credential) + .build(); + + System.out.println("ManagedUploader created for batch upload"); + FileSource source1 = new FileSource(resolveQuickstartPath("dataset.csv"), Format.csv, UUID.randomUUID(), CompressionType.NONE); FileSource source2 = new FileSource(resolveQuickstartPath("dataset.csv"), Format.csv, UUID.randomUUID(), CompressionType.NONE); - List sources = Arrays.asList(source1, source2); + List localSources = Arrays.asList(source1, source2); + + System.out.println("Prepared " + localSources.size() + " local files for upload:"); + for (LocalSource source : localSources) { + System.out.println(" - " + source.getName() + " (format: " + source.getFormat() + ")"); + } IngestRequestProperties props = buildIngestV2RequestProperties(config, ingestV2Config, null); - return queuedIngestClient.ingestAsync(config.getDatabaseName(), config.getTableName(), sources, props) - .thenCompose(response -> { - System.out.println("Batch ingestion queued. 
Operation ID: " + response.getIngestResponse().getIngestionOperationId()); - System.out.println("Number of sources in batch: " + sources.size()); - return trackIngestV2Operation(config, ingestV2Config, queuedIngestClient, response, "Batch Ingestion"); + + + System.out.println("Uploading " + localSources.size() + " files to blob storage..."); + + return uploader.uploadManyAsync(localSources) + .thenCompose(uploadResults -> { + System.out.println("Upload completed:"); + System.out.println(" Successes: " + uploadResults.getSuccesses().size()); + System.out.println(" Failures: " + uploadResults.getFailures().size()); + + for (UploadResult.Failure failure : uploadResults.getFailures()) { + System.err.println(" Upload failed for " + failure.getSourceName() + + ": " + failure.getErrorMessage()); + } + + List blobSources = new ArrayList<>(); + for (UploadResult.Success success : uploadResults.getSuccesses()) { + System.out.println(" Uploaded: " + success.getSourceName() + + " -> " + success.getBlobUrl().split("\\?")[0]); // Hide SAS token in log + + BlobSource blobSource = new BlobSource( + success.getBlobUrl(), + Format.csv, // All our files are CSV format + UUID.randomUUID(), + CompressionType.GZIP); + blobSources.add(blobSource); + } + + if (blobSources.isEmpty()) { + return CompletableFuture.failedFuture( + new RuntimeException("All uploads failed - nothing to ingest")); + } + + System.out.println("Ingesting " + blobSources.size() + " blobs as a batch..."); + return queuedIngestClient.ingestAsync(config.getDatabaseName(), config.getTableName(), blobSources, props) + .thenCompose(response -> { + System.out.println("Batch ingestion queued. Operation ID: " + + response.getIngestResponse().getIngestionOperationId()); + System.out.println("Number of sources in batch: " + blobSources.size()); + return trackIngestV2Operation(config, ingestV2Config, queuedIngestClient, response, "Batch Upload & Ingest"); + }); + }) + .whenComplete((unused, throwable) -> { + uploader.close(); + System.out.println("ManagedUploader closed"); }); } diff --git a/samples/src/main/java/ingestv2/AzureBlobRestCustomUploader.java b/samples/src/main/java/ingestv2/AzureBlobRestCustomUploader.java new file mode 100644 index 00000000..b0ca14f8 --- /dev/null +++ b/samples/src/main/java/ingestv2/AzureBlobRestCustomUploader.java @@ -0,0 +1,541 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. 
+package ingestv2; + +import com.azure.core.credential.TokenCredential; +import com.azure.identity.AzureCliCredentialBuilder; +import com.azure.identity.ChainedTokenCredential; +import com.azure.identity.ChainedTokenCredentialBuilder; +import com.azure.identity.ClientSecretCredentialBuilder; +import com.microsoft.azure.kusto.data.StringUtils; +import com.microsoft.azure.kusto.ingest.v2.common.models.mapping.IngestionMapping; +import com.microsoft.azure.kusto.ingest.v2.source.BlobSource; +import com.microsoft.azure.kusto.ingest.v2.source.CompressionType; +import com.microsoft.azure.kusto.ingest.v2.source.FileSource; +import com.microsoft.azure.kusto.ingest.v2.source.StreamSource; +import com.microsoft.azure.kusto.ingest.v2.models.Format; +import com.microsoft.azure.kusto.ingest.v2.source.LocalSource; +import com.microsoft.azure.kusto.ingest.v2.uploader.ICustomUploader; +import com.microsoft.azure.kusto.ingest.v2.uploader.CustomUploaderHelper; +import com.microsoft.azure.kusto.ingest.v2.uploader.IUploader; +import com.microsoft.azure.kusto.ingest.v2.uploader.models.UploadResult; +import com.microsoft.azure.kusto.ingest.v2.uploader.models.UploadResults; +import com.microsoft.azure.kusto.ingest.v2.uploader.models.UploadErrorCode; +import com.microsoft.azure.kusto.ingest.v2.builders.QueuedIngestClientBuilder; +import com.microsoft.azure.kusto.ingest.v2.client.QueuedIngestClient; +import com.microsoft.azure.kusto.ingest.v2.common.models.IngestRequestPropertiesBuilder; +import com.microsoft.azure.kusto.ingest.v2.models.IngestRequestProperties; + +import java.io.*; +import java.net.*; +import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.*; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Custom uploader implementation using Azure Blob REST API with SAS tokens from DM config. + * + * This demonstrates end-to-end ICustomUploader usage: + * 1. Gets container URLs with SAS tokens from DM cluster's config API + * 2. Uses standard HTTP PUT to upload blobs (same approach works for S3/GCS) + * 3. Returns BlobSource with the uploaded blob URL for ingestion + * + * ICustomUploader pattern works end-to-end and can be adapted for: + * - AWS S3 (use AWS SDK or S3 REST API) + * - Google Cloud Storage (use GCS SDK or REST API) + * - Any HTTP-based blob storage + */ +public class AzureBlobRestCustomUploader implements ICustomUploader { + + private final String containerUrlWithSas; // Container URL with SAS token from DM config + private final ExecutorService executor; + private boolean ignoreSizeLimit = false; + + /** + * Creates a custom uploader using a container URL with SAS token. + * + * The container URL is obtained from DM cluster's config API: + * - ConfigurationResponse.containerSettings.containers[0].path + * + * @param containerUrlWithSas Full container URL including SAS token + * Example: https://account.blob.core.windows.net/container?sv=...&sig=... 
+ */ + public AzureBlobRestCustomUploader(String containerUrlWithSas) { + this.containerUrlWithSas = containerUrlWithSas; + this.executor = Executors.newFixedThreadPool(4); + } + + @Override + public boolean getIgnoreSizeLimit() { + return ignoreSizeLimit; + } + + @Override + public void setIgnoreSizeLimit(boolean value) { + this.ignoreSizeLimit = value; + } + + @Override + public CompletableFuture uploadAsync(LocalSource local) { + return CompletableFuture.supplyAsync(() -> { + try { + return uploadBlobUsingRest(local); + } catch (Exception e) { + throw new RuntimeException("Upload failed: " + e.getMessage(), e); + } + }, executor); + } + + @Override + public CompletableFuture uploadManyAsync(List localSources) { + return CompletableFuture.supplyAsync(() -> { + List successes = new ArrayList<>(); + List failures = new ArrayList<>(); + + for (LocalSource source : localSources) { + Instant startedAt = Instant.now(); + try { + BlobSource result = uploadBlobUsingRest(source); + Long size = source.size(); + successes.add(new UploadResult.Success( + source.getName(), + startedAt, + Instant.now(), + result.getBlobPath(), + size != null ? size : -1 + )); + } catch (Exception e) { + failures.add(new UploadResult.Failure( + source.getName(), + startedAt, + Instant.now(), + UploadErrorCode.UPLOAD_FAILED, + e.getMessage(), + e, + false + )); + } + } + + return new UploadResults(successes, failures); + }, executor); + } + + @Override + public void close() throws IOException { + executor.shutdown(); + } + + /** + * Uploads a blob using HTTP PUT with SAS token authentication. + * + * This is the core upload logic that can be adapted for any REST/SDK based storage + */ + private BlobSource uploadBlobUsingRest(LocalSource local) throws Exception { + String blobName = generateBlobName(local); + + // Read data from source using Kotlin's data() method + byte[] data; + try (InputStream inputStream = local.data()) { + data = inputStream.readAllBytes(); + } + + // Parse container URL and SAS + String[] parts = containerUrlWithSas.split("\\?", 2); + String containerUrl = parts[0]; + String sasToken = parts.length > 1 ? parts[1] : ""; + + // Build full blob URL with SAS + String blobUrl = containerUrl + "/" + blobName; + if (!sasToken.isEmpty()) { + blobUrl += "?" 
+ sasToken; + } + + System.out.println("Uploading to: " + containerUrl + "/" + blobName); + + // Create HTTP PUT request + URL url = new URL(blobUrl); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("PUT"); + connection.setDoOutput(true); + connection.setConnectTimeout(30000); + connection.setReadTimeout(60000); + + // Set required headers for Azure Blob REST API + connection.setRequestProperty("x-ms-blob-type", "BlockBlob"); + connection.setRequestProperty("x-ms-version", "2021-06-08"); + connection.setRequestProperty("Content-Type", getContentType(local.getFormat())); + connection.setRequestProperty("Content-Length", String.valueOf(data.length)); + + // Upload the data + try (OutputStream out = connection.getOutputStream()) { + out.write(data); + out.flush(); + } + + // Check response + int responseCode = connection.getResponseCode(); + if (responseCode < 200 || responseCode >= 300) { + String errorBody = ""; + try (InputStream errorStream = connection.getErrorStream()) { + if (errorStream != null) { + errorBody = new String(errorStream.readAllBytes(), StandardCharsets.UTF_8); + } + } catch (Exception ignored) {} + throw new RuntimeException( + "Upload failed with status " + responseCode + ": " + errorBody + ); + } + + System.out.println("Successfully uploaded blob: " + blobName + " (Status: " + responseCode + ", Size: " + data.length + " bytes)"); + + // Return BlobSource with the blob URL (including SAS for Kusto to access) + String resultBlobUrl = containerUrl + "/" + blobName; + if (!sasToken.isEmpty()) { + resultBlobUrl += "?" + sasToken; + } + + return new BlobSource( + resultBlobUrl, + local.getFormat(), + local.getSourceId(), + local.getCompressionType() + ); + } + + private String generateBlobName(LocalSource source) { + String name = source.getName(); + if (name == null || name.isEmpty()) { + name = UUID.randomUUID().toString(); + } + String extension = getExtension(source.getFormat(), source.getCompressionType()); + if (!name.endsWith(extension)) { + name = name + extension; + } + return "custom-upload/" + UUID.randomUUID().toString().substring(0, 8) + "_" + name; + } + + private String getExtension(Format format, CompressionType compression) { + String formatExt; + switch (format) { + case json: formatExt = ".json"; break; + case csv: formatExt = ".csv"; break; + case parquet: formatExt = ".parquet"; break; + case avro: formatExt = ".avro"; break; + case orc: formatExt = ".orc"; break; + default: formatExt = ".dat"; break; + } + + if (compression == CompressionType.GZIP) { + return formatExt + ".gz"; + } + return formatExt; + } + + private String getContentType(Format format) { + switch (format) { + case json: return "application/json"; + case csv: return "text/csv"; + default: return "application/octet-stream"; + } + } + + // ========================================== + // End-to-end demo using system properties (like other samples) + // ========================================== + + public static void main(String[] args) throws Exception { + // Get configuration from system properties (consistent with other samples) + String engineEndpoint = System.getProperty("clusterPath"); // "https://.kusto.windows.net" + String appId = System.getProperty("app-id"); + String appKey = System.getProperty("appKey"); + String tenant = System.getProperty("tenant"); + + String database = System.getProperty("dbName"); + String table = System.getProperty("tableName"); + String mapping = System.getProperty("dataMappingName"); + + // 
Container URL with SAS token (in production, get from DM config API) + String containerUrlWithSas = System.getProperty("containerUrl"); + + if (engineEndpoint == null || engineEndpoint.isEmpty()) { + System.out.println("=== Azure Blob REST Custom Uploader Demo ==="); + System.out.println(); + System.out.println("This demonstrates ICustomUploader end-to-end with Azure Blob REST API."); + System.out.println(); + System.out.println("Usage:"); + System.out.println(" mvn exec:java -pl samples -Dexec.mainClass=\"ingestv2.AzureBlobRestCustomUploader\" \\"); + System.out.println(" -DclusterPath=https://mycluster.region.kusto.windows.net \\"); + System.out.println(" -DdbName=MyDatabase \\"); + System.out.println(" -DtableName=MyTable \\"); + System.out.println(" -DdataMappingName=MyMapping \\"); + System.out.println(" -DcontainerUrl=https://account.blob.core.windows.net/container?sas=..."); + System.out.println(); + System.out.println("Optional authentication (defaults to Azure CLI credential):"); + System.out.println(" -Dapp-id= -DappKey= -Dtenant="); + System.out.println(); + System.out.println("Usage pattern:"); + System.out.println("```java"); + System.out.println("// 1. Get container with SAS from DM config API or provide your own"); + System.out.println("String containerUrlWithSas = ...; // e.g., from ConfigurationResponse"); + System.out.println(); + System.out.println("// 2. Create custom uploader"); + System.out.println("ICustomUploader customUploader = new AzureBlobRestCustomUploader(containerUrlWithSas);"); + System.out.println(); + System.out.println("// 3. Convert to IUploader using CustomUploaderHelper"); + System.out.println("IUploader uploader = CustomUploaderHelper.asUploader(customUploader);"); + System.out.println(); + System.out.println("// 4. Create QueuedIngestClient with the custom uploader"); + System.out.println("QueuedIngestClient client = QueuedIngestClientBuilder.create(dmUrl)"); + System.out.println(" .withAuthentication(credential)"); + System.out.println(" .withUploader(uploader, true) // true = client manages uploader lifecycle"); + System.out.println(" .build();"); + System.out.println(); + System.out.println("// 5. Ingest - the custom uploader handles the upload!"); + System.out.println("client.ingestAsync(fileSource, properties).join();"); + System.out.println("```"); + return; + } + + System.out.println("=== Running End-to-End Azure Blob REST Custom Uploader Test ==="); + System.out.println("Engine Endpoint: " + engineEndpoint); + System.out.println("Database: " + database); + System.out.println("Table: " + table); + System.out.println("Mapping: " + mapping); + System.out.println("Container URL: " + (containerUrlWithSas != null ? 
"[provided]" : "[not provided - using default uploader]")); + + // Create Azure AD credential + ChainedTokenCredential credential; + if (StringUtils.isNotBlank(appId) + && StringUtils.isNotBlank(appKey) + && StringUtils.isNotBlank(tenant)) { + System.out.println("Using Service Principal authentication"); + credential = new ChainedTokenCredentialBuilder() + .addFirst(new ClientSecretCredentialBuilder() + .clientId(appId) + .clientSecret(appKey) + .tenantId(tenant) + .build()) + .build(); + } else { + System.out.println("Using Azure CLI authentication"); + credential = new ChainedTokenCredentialBuilder() + .addFirst(new AzureCliCredentialBuilder().build()) + .build(); + } + + // Build the ingest URL (DM endpoint) + // The correct pattern: https://cluster.region.kusto.windows.net -> https://ingest-cluster.region.kusto.windows.net + String dmUrl = engineEndpoint; + if (engineEndpoint.startsWith("https://")) { + dmUrl = engineEndpoint.replace("https://", "https://ingest-"); + } else if (engineEndpoint.startsWith("http://")) { + dmUrl = engineEndpoint.replace("http://", "http://ingest-"); + } + System.out.println("DM Endpoint: " + dmUrl); + + QueuedIngestClient queuedIngestClient = null; + try { + // Create the QueuedIngestClient with or without custom uploader + QueuedIngestClientBuilder builder = QueuedIngestClientBuilder.create(dmUrl) + .withAuthentication(credential) + .withMaxConcurrency(10); + + // If container URL is not provided, fetch it from the cluster's configuration API + if (containerUrlWithSas == null || containerUrlWithSas.isEmpty()) { + System.out.println("\n1. Fetching container URL from Kusto cluster configuration API..."); + containerUrlWithSas = fetchContainerUrlFromKustoConfig(dmUrl, credential); + System.out.println(" Retrieved container URL from cluster configuration"); + } + + // Now create our custom uploader with the container URL + if (containerUrlWithSas != null && !containerUrlWithSas.isEmpty()) { + System.out.println("\n2. Creating AzureBlobRestCustomUploader with container URL"); + + // create the custom uploader + IUploader uploader = CustomUploaderHelper.asUploader(new AzureBlobRestCustomUploader(containerUrlWithSas)); + + // Configure the builder to use our custom uploader + // true = client will manage the uploader lifecycle (close it when done) + builder.withUploader(uploader, true); + + System.out.println(" Custom uploader configured successfully!"); + } else { + System.out.println("\n2. No container URL available - using default managed uploader"); + } + + // Build the client + queuedIngestClient = builder.build(); + System.out.println("\n3. QueuedIngestClient created successfully"); + + // Demonstrate ingestion using a file source + String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/"; + + // Try CSV file first + FileSource fileSource = new FileSource(Paths.get(resourcesDirectory + "dataset.csv"), Format.csv); + + IngestionMapping ingestionMapping = new IngestionMapping(mapping, IngestionMapping.IngestionMappingType.CSV); + + IngestRequestProperties properties; + if (mapping != null && !mapping.isEmpty()) { + properties = IngestRequestPropertiesBuilder.create() + .withIngestionMapping(ingestionMapping) + .withEnableTracking(true) + .build(); + } else { + properties = IngestRequestPropertiesBuilder.create() + .withEnableTracking(true) + .build(); + } + + System.out.println("\n4. 
Ingesting file: " + resourcesDirectory + "dataset.csv"); + System.out.println(" (This will use the custom uploader to upload to Azure Blob Storage!)"); + + // Perform ingestion - the custom uploader handles the upload! + var response = queuedIngestClient.ingestAsync(database, table, fileSource, properties).get(); + + System.out.println("\n5. Ingestion queued successfully!"); + System.out.println(" Operation ID: " + response.getIngestResponse().getIngestionOperationId()); + System.out.println(" Ingestion Type: " + response.getIngestionType()); + + IngestionMapping jsonMapping = new IngestionMapping(mapping, IngestionMapping.IngestionMappingType.JSON); + // Also demonstrate JSON ingestion with mapping + if (mapping != null && !mapping.isEmpty()) { + FileSource jsonFileSource = new FileSource(Paths.get(resourcesDirectory + "dataset.json"), Format.json); + + IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create() + .withIngestionMapping(jsonMapping) + .withEnableTracking(true) + .build(); + + System.out.println("\n6. Ingesting JSON file with mapping: " + resourcesDirectory + "dataset.json"); + + var jsonResponse = queuedIngestClient.ingestAsync(database, table, jsonFileSource, jsonProperties).get(); + + System.out.println(" JSON Ingestion queued successfully!"); + System.out.println(" Operation ID: " + jsonResponse.getIngestResponse().getIngestionOperationId()); + } + + // Demonstrate stream ingestion + String csvData = "0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,\"Zero\",0,00:00:00,,null"; + InputStream csvInputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(csvData).array()); + + StreamSource streamSource = new StreamSource(csvInputStream, Format.csv); + + IngestRequestProperties streamProperties = IngestRequestPropertiesBuilder.create() + .withEnableTracking(true) + .build(); + + System.out.println("\n7. Ingesting from stream (CSV data)"); + + var streamResponse = queuedIngestClient.ingestAsync(database, table,streamSource, streamProperties).get(); + + System.out.println(" Stream Ingestion queued successfully!"); + System.out.println(" Operation ID: " + streamResponse.getIngestResponse().getIngestionOperationId()); + + System.out.println("\n=== Azure Blob REST Custom Uploader Demo Complete ==="); + System.out.println(); + System.out.println("Key Integration Points:"); + System.out.println(" 1. AzureBlobRestCustomUploader implements ICustomUploader"); + System.out.println(" 2. CustomUploaderHelper.asUploader() converts to IUploader"); + System.out.println(" 3. QueuedIngestClientBuilder.withUploader() configures the custom uploader"); + System.out.println(" 4. client.ingestAsync() internally uses the custom uploader!"); + System.out.println(); + System.out.println("The same pattern works for any other source such as S3/GCP - just implement ICustomUploader!"); + + } catch (Exception e) { + System.err.println("Error during ingestion: " + e.getMessage()); + e.printStackTrace(); + } finally { + if (queuedIngestClient != null) { + queuedIngestClient.close(); + } + } + } + + /** + * Fetches container URL with SAS token from Kusto cluster's configuration API. + * + * The configuration API endpoint: {dmUrl}/v1/rest/ingestion/configuration + * Returns JSON with containerSettings.containers[0].path containing the container URL with SAS. 
+ * + * @param dmUrl The DM (Data Management) cluster URL + * @param credential Azure credential for authentication + * @return Container URL with SAS token, or null if not available + */ + private static String fetchContainerUrlFromKustoConfig(String dmUrl, TokenCredential credential) throws Exception { + String configUrl = dmUrl + "/v1/rest/ingestion/configuration"; + System.out.println(" Fetching configuration from: " + configUrl); + + // Get access token for the Kusto resource + String scope = dmUrl + "/.default"; + com.azure.core.credential.AccessToken token = credential.getToken( + new com.azure.core.credential.TokenRequestContext().addScopes(scope) + ).block(); + + if (token == null) { + throw new RuntimeException("Failed to get access token for " + scope); + } + + // Create HTTP GET request + URL url = new URL(configUrl); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty("Authorization", "Bearer " + token.getToken()); + connection.setRequestProperty("Accept", "application/json"); + connection.setConnectTimeout(30000); + connection.setReadTimeout(60000); + + // Read response + int responseCode = connection.getResponseCode(); + if (responseCode < 200 || responseCode >= 300) { + String errorBody = ""; + try (InputStream errorStream = connection.getErrorStream()) { + if (errorStream != null) { + errorBody = new String(errorStream.readAllBytes(), StandardCharsets.UTF_8); + } + } catch (Exception ignored) {} + throw new RuntimeException( + "Failed to get configuration from " + configUrl + " (status " + responseCode + "): " + errorBody + ); + } + + String responseBody; + try (InputStream inputStream = connection.getInputStream()) { + responseBody = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); + } + + // Parse JSON response to get container URL + ObjectMapper mapper = new ObjectMapper(); + JsonNode root = mapper.readTree(responseBody); + + // Navigate: containerSettings -> containers -> [0] -> path + JsonNode containerSettings = root.get("containerSettings"); + if (containerSettings == null) { + System.out.println(" Warning: No containerSettings in configuration response"); + return null; + } + + JsonNode containers = containerSettings.get("containers"); + if (containers == null || !containers.isArray() || containers.isEmpty()) { + System.out.println(" Warning: No containers in configuration response"); + return null; + } + + JsonNode firstContainer = containers.get(0); + JsonNode path = firstContainer.get("path"); + if (path == null || path.isNull()) { + System.out.println(" Warning: Container path is null"); + return null; + } + + String containerUrl = path.asText(); + System.out.println(" Found container: " + containerUrl.split("\\?")[0] + "?..."); + return containerUrl; + } +} diff --git a/samples/src/main/java/ingestv2/QueuedIngestV2.java b/samples/src/main/java/ingestv2/QueuedIngestV2.java index 317b34d0..bf68de0d 100644 --- a/samples/src/main/java/ingestv2/QueuedIngestV2.java +++ b/samples/src/main/java/ingestv2/QueuedIngestV2.java @@ -5,19 +5,34 @@ import com.azure.core.credential.TokenCredential; import com.azure.identity.AzureCliCredentialBuilder; +import com.azure.identity.ChainedTokenCredential; import com.azure.identity.ClientSecretCredentialBuilder; import com.microsoft.azure.kusto.data.StringUtils; import com.microsoft.azure.kusto.ingest.v2.builders.QueuedIngestClientBuilder; import com.microsoft.azure.kusto.ingest.v2.client.IngestionOperation; import 
com.microsoft.azure.kusto.ingest.v2.client.QueuedIngestClient; +import com.microsoft.azure.kusto.ingest.v2.common.DefaultConfigurationCache; +import com.microsoft.azure.kusto.ingest.v2.common.SimpleRetryPolicy; +import com.microsoft.azure.kusto.ingest.v2.common.models.ClientDetails; import com.microsoft.azure.kusto.ingest.v2.common.models.ExtendedIngestResponse; import com.microsoft.azure.kusto.ingest.v2.common.models.IngestRequestPropertiesBuilder; +import com.microsoft.azure.kusto.ingest.v2.models.BlobStatus; +import com.microsoft.azure.kusto.ingest.v2.models.Format; +import com.microsoft.azure.kusto.ingest.v2.models.IngestRequestProperties; +import com.microsoft.azure.kusto.ingest.v2.models.Status; +import com.microsoft.azure.kusto.ingest.v2.models.StatusResponse; +import com.microsoft.azure.kusto.ingest.v2.source.BlobSource; import com.microsoft.azure.kusto.ingest.v2.common.models.mapping.IngestionMapping; import com.microsoft.azure.kusto.ingest.v2.models.*; import com.microsoft.azure.kusto.ingest.v2.source.CompressionType; import com.microsoft.azure.kusto.ingest.v2.source.FileSource; -import com.microsoft.azure.kusto.ingest.v2.source.IngestionSource; +import com.microsoft.azure.kusto.ingest.v2.source.LocalSource; import com.microsoft.azure.kusto.ingest.v2.source.StreamSource; +import com.microsoft.azure.kusto.ingest.v2.uploader.IUploader; +import com.microsoft.azure.kusto.ingest.v2.uploader.ManagedUploader; +import com.microsoft.azure.kusto.ingest.v2.uploader.UploadMethod; +import com.microsoft.azure.kusto.ingest.v2.common.ConfigurationCache; + import org.jetbrains.annotations.NotNull; import java.io.ByteArrayInputStream; @@ -114,94 +129,98 @@ public static void main(String[] args) { /** * Demonstrates ingestion from various stream sources including: - In-memory string data as CSV * - Compressed file stream (CSV) - JSON file stream with mapping + * + *

Shows both source configuration with defaults) and source configuration with full control) approaches. */ static List> ingestFromStream() throws Exception { System.out.println("\n=== Queued Ingestion from Streams ==="); List> futures = new ArrayList<>(); - // Example 1: Ingest from in-memory CSV string - String csvData = "0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,\"Zero\",0,00:00:00,,null"; - InputStream csvInputStream = new ByteArrayInputStream(StandardCharsets.UTF_8.encode(csvData).array()); + // Example 1: Ingest from in-memory CSV string (only 2 required parameters) + // sourceCompression defaults to CompressionType.NONE, sourceId auto-generated, baseName null, leaveOpen false + String csvData = + "0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,\"Zero\",0,00:00:00,,null"; + InputStream csvInputStream = + new ByteArrayInputStream(StandardCharsets.UTF_8.encode(csvData).array()); - StreamSource csvStreamSource = new StreamSource( - csvInputStream, - Format.csv, - CompressionType.NONE, - UUID.randomUUID(), - false); + StreamSource csvStreamSource = new StreamSource(csvInputStream, Format.csv); IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create() .withEnableTracking(true) .build(); System.out.println("Queueing CSV data from string..."); - CompletableFuture csvFuture = queuedIngestClient - .ingestAsync(database, table, csvStreamSource, csvProperties) - .thenCompose( - response -> { - System.out.println( - "CSV ingestion queued. Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - return trackIngestionOperation(response, "CSV Stream"); - }) - .whenComplete((unused, throwable) -> closeQuietly(csvInputStream)); + CompletableFuture csvFuture = + queuedIngestClient + .ingestAsync(database, table, csvStreamSource, csvProperties) + .thenCompose( + response -> { + System.out.println( + "CSV ingestion queued. Operation ID: " + + response.getIngestResponse() + .getIngestionOperationId()); + return trackIngestionOperation(response, "CSV Stream"); + }) + .whenComplete((unused, throwable) -> closeQuietly(csvInputStream)); futures.add(csvFuture); - // Example 2: Ingest from compressed CSV file + // Example 2: Ingest from compressed CSV file (all 6 parameters needed) + // Explicitly specify compression, sourceId, baseName, and leaveOpen String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/"; InputStream compressedCsvStream = new ByteArrayInputStream(readResourceBytes(resourcesDirectory, "dataset.csv.gz")); - StreamSource compressedStreamSource = new StreamSource( - compressedCsvStream, - Format.csv, - CompressionType.GZIP, - UUID.randomUUID(), - false); + StreamSource compressedStreamSource = + new StreamSource( + compressedCsvStream, + Format.csv, + CompressionType.GZIP, + UUID.randomUUID(), + false); System.out.println("Queueing compressed CSV file..."); - CompletableFuture compressedFuture = queuedIngestClient - .ingestAsync(database, table, compressedStreamSource, csvProperties) - .thenCompose( - response -> { - System.out.println( - "Compressed CSV ingestion queued. 
Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - return trackIngestionOperation( - response, "Compressed CSV Stream"); - }) - .whenComplete((unused, throwable) -> closeQuietly(compressedCsvStream)); + CompletableFuture compressedFuture = + queuedIngestClient + .ingestAsync(database, table, compressedStreamSource, csvProperties) + .thenCompose( + response -> { + System.out.println( + "Compressed CSV ingestion queued. Operation ID: " + + response.getIngestResponse() + .getIngestionOperationId()); + return trackIngestionOperation( + response, "Compressed CSV Stream"); + }) + .whenComplete((unused, throwable) -> closeQuietly(compressedCsvStream)); futures.add(compressedFuture); - // Example 3: Ingest JSON with mapping - InputStream jsonStream = new ByteArrayInputStream(readResourceBytes(resourcesDirectory, "dataset.json")); + // Example 3: Ingest JSON with mapping - with defaults + // Uses defaults: sourceCompression=NONE, auto-generated sourceId, leaveOpen=false + InputStream jsonStream = + new ByteArrayInputStream(readResourceBytes(resourcesDirectory, "dataset.json")); + + StreamSource jsonStreamSource = new StreamSource(jsonStream, Format.json); - StreamSource jsonStreamSource = new StreamSource( - jsonStream, - Format.json, - CompressionType.NONE, - UUID.randomUUID(), - false); IngestionMapping mapping = new IngestionMapping(mappingName, IngestionMapping.IngestionMappingType.JSON); - IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create() - .withIngestionMapping(mapping) - .withEnableTracking(true) - .build(); + IngestRequestProperties jsonProperties = + IngestRequestPropertiesBuilder.create() + .withIngestionMapping(mapping) + .withEnableTracking(true) + .build(); System.out.println("Queueing JSON file with mapping..."); - CompletableFuture jsonFuture = queuedIngestClient - .ingestAsync(database, table, jsonStreamSource, jsonProperties) - .thenCompose( - response -> { - System.out.println( - "JSON ingestion queued. Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - return trackIngestionOperation(response, "JSON Stream"); - }) - .whenComplete((unused, throwable) -> closeQuietly(jsonStream)); + CompletableFuture jsonFuture = + queuedIngestClient + .ingestAsync(database, table, jsonStreamSource, jsonProperties) + .thenCompose( + response -> { + System.out.println( + "JSON ingestion queued. Operation ID: " + + response.getIngestResponse() + .getIngestionOperationId()); + return trackIngestionOperation(response, "JSON Stream"); + }) + .whenComplete((unused, throwable) -> closeQuietly(jsonStream)); futures.add(jsonFuture); return futures; @@ -210,6 +229,8 @@ static List> ingestFromStream() throws Exception { /** * Demonstrates ingestion from file sources including: - CSV file - Compressed JSON file with * mapping + * + * Shows both source configuration with defaults and source configuration with all params approaches. 
*/ static List> ingestFromFile() { System.out.println("\n=== Queued Ingestion from Files ==="); @@ -218,99 +239,243 @@ static List> ingestFromFile() { String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/"; - // Example 1: Ingest CSV file - FileSource csvFileSource = new FileSource( - Paths.get(resourcesDirectory + "dataset.csv"), - Format.csv, - UUID.randomUUID(), - CompressionType.NONE); + // Example 1: Ingest CSV file - with defaults + // compressionType auto-detected from filename (.csv = NONE), sourceId auto-generated, baseName auto-extracted + FileSource csvFileSource = new FileSource(Paths.get(resourcesDirectory + "dataset.csv"), Format.csv); - IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create() - .withEnableTracking(true) - .build(); + IngestRequestProperties csvProperties = + IngestRequestPropertiesBuilder.create() + .withEnableTracking(true) + .build(); System.out.println("Queueing CSV file..."); - CompletableFuture csvFuture = queuedIngestClient - .ingestAsync(database, table, csvFileSource, csvProperties) - .thenCompose( - response -> { - System.out.println( - "CSV file ingestion queued. Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - return trackIngestionOperation(response, "CSV File"); - }); + CompletableFuture csvFuture = + queuedIngestClient + .ingestAsync(database, table, csvFileSource, csvProperties) + .thenCompose( + response -> { + System.out.println( + "CSV file ingestion queued. Operation ID: " + + response.getIngestResponse() + .getIngestionOperationId()); + return trackIngestionOperation(response, "CSV File"); + }); futures.add(csvFuture); - // Example 2: Ingest compressed JSON file with mapping - FileSource jsonFileSource = new FileSource( - Paths.get(resourcesDirectory + "dataset.jsonz.gz"), - Format.json, - UUID.randomUUID(), - CompressionType.GZIP); + // Example 2: Ingest compressed JSON file with mapping - with all parameters specified + // Explicitly specify sourceId, compression (auto-detected from .gz), and baseName for full control + FileSource jsonFileSource = + new FileSource( + Paths.get(resourcesDirectory + "dataset.jsonz.gz"), + Format.json, + UUID.randomUUID(), + CompressionType.GZIP); + IngestionMapping mapping = new IngestionMapping(mappingName, IngestionMapping.IngestionMappingType.JSON); - IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create() - .withIngestionMapping(mapping) - .withEnableTracking(true) - .build(); + IngestRequestProperties jsonProperties = + IngestRequestPropertiesBuilder.create() + .withIngestionMapping(mapping) + .withEnableTracking(true) + .build(); System.out.println("Queueing compressed JSON file with mapping..."); - CompletableFuture jsonFuture = queuedIngestClient - .ingestAsync(database, table, jsonFileSource, jsonProperties) - .thenCompose( - response -> { - System.out.println( - "Compressed JSON file ingestion queued. Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - return trackIngestionOperation( - response, "Compressed JSON File"); - }); + CompletableFuture jsonFuture = + queuedIngestClient + .ingestAsync(database, table, jsonFileSource, jsonProperties) + .thenCompose( + response -> { + System.out.println( + "Compressed JSON file ingestion queued. 
Operation ID: " + + response.getIngestResponse() + .getIngestionOperationId()); + return trackIngestionOperation( + response, "Compressed JSON File"); + }); futures.add(jsonFuture); return futures; } /** - * Demonstrates batch ingestion from multiple sources in a single operation. This is more - * efficient than ingesting sources one by one when you have multiple files. + * Demonstrates batch ingestion from multiple blob sources in a single operation. + * + *

IMPORTANT: Multi-source ingestion only accepts BlobSource. For local sources + * (FileSource, StreamSource), you must either: + *

    + *
  1. Ingest them one by one using the single-source ingestAsync method, or
  2. + *
  3. First upload them to blob storage using uploadManyAsync, then pass the resulting + * BlobSource list to ingestAsync
  4. + *
+ * + *

This example uses public blob URLs from the Kusto sample files to demonstrate + * multi-blob batch ingestion. All blobs must have the same format. */ - static @NotNull CompletableFuture ingestMultipleSources() { - System.out.println("\n=== Queued Ingestion from Multiple Sources (Batch) ==="); + static CompletableFuture ingestMultipleSources() { + System.out.println("\n=== Queued Ingestion from Multiple Blob Sources (Batch) ==="); - String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/"; + // Multi-source API only accepts BlobSource - not FileSource or StreamSource. + // If you have local files, you must upload them to blob storage first. + // Here we use public sample blob URLs from Kusto sample files to demonstrate the pattern. - // Create multiple file sources - FileSource source1 = new FileSource( - Paths.get(resourcesDirectory + "dataset.csv"), - Format.csv, + // IMPORTANT: All sources in a batch must have the same format! + // BlobSource constructor requires: blobPath, format, sourceId, compressionType, baseName + + // Using multiple JSON files from Kusto public sample files + // All files are JSON format - this is required for batch ingestion + BlobSource blob1 = new BlobSource( + "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json", + Format.json, UUID.randomUUID(), CompressionType.NONE); - FileSource source2 = new FileSource( - Paths.get(resourcesDirectory + "dataset.csv.gz"), - Format.csv, + BlobSource blob2 = new BlobSource( + "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json", + Format.json, UUID.randomUUID(), - CompressionType.GZIP); + CompressionType.NONE); - List sources = Arrays.asList(source1, source2); + // Create list with all blob sources - all must have identical format + List blobSources = Arrays.asList(blob1, blob2); + IngestionMapping mapping = new IngestionMapping(mappingName, IngestionMapping.IngestionMappingType.JSON); + IngestRequestProperties properties = + IngestRequestPropertiesBuilder.create() + .withIngestionMapping(mapping) + .withEnableTracking(true) + .build(); - IngestRequestProperties properties = IngestRequestPropertiesBuilder.create() - .withEnableTracking(true) - .build(); + System.out.println("Queueing " + blobSources.size() + " blob sources in batch..."); + for (int i = 0; i < blobSources.size(); i++) { + System.out.println(" Blob " + (i + 1) + ": " + blobSources.get(i).getName()); + } - System.out.println("Queueing multiple sources in batch..."); return queuedIngestClient - .ingestAsync(database, table, sources, properties) - .thenCompose( - response -> { - System.out.println( - "Batch ingestion queued. Operation ID: " - + response.getIngestResponse() - .getIngestionOperationId()); - System.out.println("Number of sources in batch: " + sources.size()); - return trackIngestionOperation(response, "Batch Ingestion"); - }); + .ingestAsync(database, table, blobSources, properties) + .thenCompose(response -> { + System.out.println( + "Batch ingestion queued. Operation ID: " + + response.getIngestResponse().getIngestionOperationId()); + System.out.println("Number of sources in batch: " + blobSources.size()); + return trackIngestionOperation(response, "Batch Blob Ingestion"); + }); + } + + /** + * Demonstrates ingesting multiple local files by uploading them to blob storage first, + * then ingesting them as a batch using the multi-source ingestAsync API. + * + *

Pattern: + *

    + *
  1. Create a ManagedUploader with proper configuration
  2. + *
  3. Create list of LocalSource (FileSource) objects
  4. + *
  5. Call uploader.uploadManyAsyncJava(localSources) to upload all files to blob storage
  6. + *
  7. Convert successful upload results to BlobSource list
  8. + *
  9. Call queuedIngestClient.ingestAsync(blobSources, properties) to ingest as a batch
  10. + *
+ * + *

This approach allows batch ingestion of local files by first uploading them + * to blob storage, which is required because the multi-source API only accepts BlobSource. + */ + static CompletableFuture ingestMultipleLocalFilesViaBlobUpload( + String engineEndpoint, ChainedTokenCredential credential) { + System.out.println("\n=== Queued Ingestion: Upload Local Files to Blob, Then Ingest ==="); + + // Step 1: Create configuration cache (needed for ManagedUploader) + String dmUrl = engineEndpoint.replace(".kusto.", ".ingest-"); + + ConfigurationCache configCache = + DefaultConfigurationCache.create( + dmUrl, + credential, + new ClientDetails("QueuedIngestV2Sample", "1.0", "ingest-v2-sample")); + + // Step 2: Create ManagedUploader for batch uploading local files to blob storage + ManagedUploader uploader = + ManagedUploader.builder() + .withConfigurationCache(configCache) + .withRetryPolicy(new SimpleRetryPolicy()) + .withMaxConcurrency(10) + .withMaxDataSize(4L * 1024 * 1024 * 1024) // 4GB max size + .withUploadMethod(UploadMethod.STORAGE) + .withTokenCredential(credential) + .build(); + + System.out.println("ManagedUploader created for batch upload"); + + // Step 3: Prepare list of local files to upload (all same format - CSV) + String resourcesDirectory = System.getProperty("user.dir") + "/src/main/resources/"; + + // IMPORTANT: All files must have the same format for batch ingestion! + FileSource file1 = new FileSource(Paths.get(resourcesDirectory + "dataset.csv"), Format.csv); + FileSource file2 = new FileSource(Paths.get(resourcesDirectory + "dataset.csv.gz"), Format.csv); + + List localSources = Arrays.asList(file1, file2); + + System.out.println("Prepared " + localSources.size() + " local files for upload:"); + for (LocalSource source : localSources) { + System.out.println(" - " + source.getName() + " (format: " + source.getFormat() + ")"); + } + + IngestRequestProperties properties = + IngestRequestPropertiesBuilder.create() + .withEnableTracking(true) + .build(); + + // Step 4: Upload all local files to blob storage using uploadManyAsync + // Note: The Kotlin suspend function uploadManyAsync is exposed to Java as uploadManyAsync + // (via @JvmName annotation) and returns CompletableFuture + System.out.println("Uploading " + localSources.size() + " files to blob storage..."); + + return uploader.uploadManyAsync(localSources) + .thenCompose(uploadResults -> { + // Step 5: Process upload results + System.out.println("Upload completed:"); + System.out.println(" Successes: " + uploadResults.getSuccesses().size()); + System.out.println(" Failures: " + uploadResults.getFailures().size()); + + // Log any failures + for (var failure : uploadResults.getFailures()) { + System.err.println(" Upload failed for " + failure.getSourceName() + + ": " + failure.getErrorMessage()); + } + + // Step 6: Convert successful uploads to BlobSource list + List blobSources = new ArrayList<>(); + for (var success : uploadResults.getSuccesses()) { + System.out.println(" Uploaded: " + success.getSourceName() + + " -> " + success.getBlobUrl().split("\\?")[0]); // Hide SAS token in log + + // Create BlobSource from upload result + // Match format from original FileSource (CSV in this case) + BlobSource blobSource = new BlobSource( + success.getBlobUrl(), + Format.csv, // All our files are CSV format + UUID.randomUUID(), + CompressionType.GZIP // Uploader auto-compresses to GZIP + ); + blobSources.add(blobSource); + } + + if (blobSources.isEmpty()) { + return CompletableFuture.failedFuture( + new RuntimeException("All 
uploads failed - nothing to ingest")); + } + + // Step 7: Ingest all blobs as a batch + System.out.println("Ingesting " + blobSources.size() + " blobs as a batch..."); + return queuedIngestClient.ingestAsync(database, table, blobSources, properties) + .thenCompose(response -> { + System.out.println( + "Batch ingestion queued. Operation ID: " + + response.getIngestResponse().getIngestionOperationId()); + System.out.println("Number of sources in batch: " + blobSources.size()); + return trackIngestionOperation(response, "Local Files Via Blob Upload"); + }); + }) + .whenComplete((unused, throwable) -> { + // Clean up uploader + uploader.close(); + System.out.println("ManagedUploader closed"); + }); } /** @@ -369,9 +534,7 @@ private static CompletableFuture trackIngestionOperation( }); } - /** - * Prints detailed status information from a StatusResponse - */ + /** Prints detailed status information from a StatusResponse */ private static void printStatusResponse(StatusResponse statusResponse) { if (statusResponse == null) { System.out.println(" Status: null"); diff --git a/samples/src/main/java/ingestv2/StreamingIngestV2.java b/samples/src/main/java/ingestv2/StreamingIngestV2.java index 83269d71..7c3b7149 100644 --- a/samples/src/main/java/ingestv2/StreamingIngestV2.java +++ b/samples/src/main/java/ingestv2/StreamingIngestV2.java @@ -84,6 +84,9 @@ public static void main(String[] args) { /** * Demonstrates ingestion from various stream sources including: - In-memory string data as CSV * - Compressed file stream (CSV) - JSON file stream with mapping + * + *

NOTE: This example shows both source creation with defaults and source creation with full control + * StreamSource defaults: compression=NONE, sourceId=auto-generated, baseName=null, leaveOpen=false */ static void ingestFromStream() throws Exception { System.out.println("\n=== Ingesting from Streams ==="); @@ -127,15 +130,14 @@ static void ingestFromStream() throws Exception { compressedCsvStream.close(); // Example 3: Ingest JSON with mapping + // Demonstrating minimal parameters for quick prototyping FileInputStream jsonStream = new FileInputStream(resourcesDirectory + "dataset.json"); StreamSource jsonStreamSource = new StreamSource( jsonStream, - Format.json, - CompressionType.NONE, - UUID.randomUUID(), - false); + Format.json); + IngestionMapping mapping = new IngestionMapping(mappingName, IngestionMapping.IngestionMappingType.JSON); IngestRequestProperties jsonProperties = IngestRequestPropertiesBuilder.create() .withIngestionMapping(new IngestionMapping(mappingName, IngestionMapping.IngestionMappingType.JSON)) .withEnableTracking(true) @@ -152,18 +154,20 @@ static void ingestFromStream() throws Exception { /** * Demonstrates ingestion from file sources including: - CSV file - Compressed JSON file with * mapping + * + *

NOTE: This example shows both source creation with defaults and source creation with full control. + * FileSource defaults: sourceId=auto-generated, compression=auto-detected from extension, baseName=from-filename */ static void ingestFromFile() throws Exception { System.out.println("\n=== Ingesting from Files ==="); String resourcesDirectory = System.getProperty("user.dir") + "/samples/src/main/resources/"; - // Example 1: Ingest CSV file - FileSource csvFileSource = new FileSource( - Paths.get(resourcesDirectory + "dataset.csv"), - Format.csv, - UUID.randomUUID(), - CompressionType.NONE); + // Example 1: Ingest CSV file using with defaults + // Only providing required parameters: path and format + // Defaults: sourceId=auto-generated, compression=auto-detected (NONE for .csv), baseName="dataset.csv" + FileSource csvFileSource = + new FileSource(Paths.get(resourcesDirectory + "dataset.csv"), Format.csv); IngestRequestProperties csvProperties = IngestRequestPropertiesBuilder.create() .withEnableTracking(true)