diff --git a/v3/src/main/java/com/skyflow/utils/Utils.java b/v3/src/main/java/com/skyflow/utils/Utils.java index 76edb9cd..2be9cb78 100644 --- a/v3/src/main/java/com/skyflow/utils/Utils.java +++ b/v3/src/main/java/com/skyflow/utils/Utils.java @@ -125,7 +125,7 @@ public static com.skyflow.vault.data.InsertResponse formatResponse(InsertRespons tokensMap.put(key, tokenList); } } - Success success = new Success(indexNumber, record.get(index).getSkyflowId().get(), tokensMap, null); + Success success = new Success(indexNumber, record.get(index).getSkyflowId().get(), tokensMap, record.get(index).getData().isPresent() ? record.get(index).getData().get() : null); successRecords.add(success); } indexNumber++; diff --git a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java index 40093fb0..a1843ff6 100644 --- a/v3/src/main/java/com/skyflow/vault/controller/VaultController.java +++ b/v3/src/main/java/com/skyflow/vault/controller/VaultController.java @@ -22,6 +22,7 @@ import io.github.cdimascio.dotenv.Dotenv; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -77,13 +78,13 @@ public CompletableFuture bulkInsertAsync( setBearerToken(); configureInsertConcurrencyAndBatchSize(insertRequest.getValues().size()); com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest request = super.getBulkInsertRequestBody(insertRequest, super.getVaultConfig()); - - List> futures = this.insertBatchFutures(request); + List errorRecords = Collections.synchronizedList(new ArrayList<>()); + List> futures = this.insertBatchFutures(request, errorRecords); return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> { List successRecords = new ArrayList<>(); - List errorRecords = new ArrayList<>(); +// List errorRecords = new ArrayList<>(); for (CompletableFuture future : futures) { com.skyflow.vault.data.InsertResponse futureResponse = future.join(); @@ -111,10 +112,10 @@ private com.skyflow.vault.data.InsertResponse processSync( ArrayList> originalPayload ) throws ExecutionException, InterruptedException { LogUtil.printInfoLog(InfoLogs.PROCESSING_BATCHES.getLog()); - List errorRecords = new ArrayList<>(); +// List errorRecords = new ArrayList<>(); List successRecords = new ArrayList<>(); - - List> futures = this.insertBatchFutures(insertRequest); + List errorRecords = Collections.synchronizedList(new ArrayList<>()); + List> futures = this.insertBatchFutures(insertRequest, errorRecords); CompletableFuture allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); allFutures.join(); @@ -137,8 +138,8 @@ private com.skyflow.vault.data.InsertResponse processSync( private List> insertBatchFutures( - com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest - ) { + com.skyflow.generated.rest.resources.recordservice.requests.InsertRequest insertRequest, + List errorRecords) { List records = insertRequest.getRecords().get(); ExecutorService executor = Executors.newFixedThreadPool(insertConcurrencyLimit); @@ -152,7 +153,10 @@ private List> insertBat CompletableFuture future = CompletableFuture .supplyAsync(() -> insertBatch(batch, insertRequest.getTableName().get()), executor) .thenApply(response -> formatResponse(response, batchNumber, insertBatchSize)) - .exceptionally(ex -> new com.skyflow.vault.data.InsertResponse(null, handleBatchException(ex, batch, batchNumber, batches))); + .exceptionally(ex -> { + errorRecords.addAll(handleBatchException(ex, batch, batchNumber, batches)); + return null; + }); futures.add(future); } } finally { @@ -181,7 +185,7 @@ private void configureInsertConcurrencyAndBatchSize(int totalRequests) { int batchSize = Integer.parseInt(userProvidedBatchSize); int maxBatchSize = Math.min(batchSize, Constants.MAX_INSERT_BATCH_SIZE); if (maxBatchSize > 0) { - this.insertBatchSize = batchSize; + this.insertBatchSize = maxBatchSize; } else { LogUtil.printWarningLog(WarningLogs.INVALID_BATCH_SIZE_PROVIDED.getLog()); this.insertBatchSize = Constants.INSERT_BATCH_SIZE;