From 292f057b46169b1242faf039095c7d6dfd4cac6d Mon Sep 17 00:00:00 2001 From: Olin Blodgett Date: Thu, 2 Apr 2026 09:13:46 -0400 Subject: [PATCH 1/7] feat: replace BulkProcessor with custom RoutedBulkIndexer - New RoutedBulkIndexer class with shard routing, retry with backoff, and per-indexer stats (docStats, queueStats, esBatchRequestStats) - Removes 8 BulkProcessors, 8 builders, 8 clients, VCFJsonBulkIndexer - Each batch routed to a random shard for sequential writes - Proper 429/failure retry instead of silent data loss - System.exit(-1) after max retries exhausted to ensure data integrity - Indexing flag read from config; pipeline runs in both modes - ~300 lines of boilerplate removed from SourceDocumentCreation --- .../es/managers/RoutedBulkIndexer.java | 193 ++++++++++ .../es/managers/SourceDocumentCreation.java | 335 +----------------- 2 files changed, 210 insertions(+), 318 deletions(-) create mode 100644 agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java new file mode 100644 index 000000000..a70425410 --- /dev/null +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java @@ -0,0 +1,193 @@ +package org.alliancegenome.indexer.variant.es.managers; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ThreadLocalRandom; + +import org.alliancegenome.core.config.ConfigHelper; +import org.alliancegenome.core.variant.config.VariantConfigHelper; +import org.alliancegenome.es.util.EsClientFactory; +import org.alliancegenome.es.util.ProcessDisplayHelper; +import org.apache.commons.math3.stat.descriptive.SummaryStatistics; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.xcontent.XContentType; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RoutedBulkIndexer extends Thread { + + private final LinkedBlockingDeque> jsonQueue; + private final String indexName; + private final int shardCount; + private final long maxBulkSizeBytes; + private final int maxRetries; + private final long retryBaseMs; + private final String label; + + private RestHighLevelClient client; + private ProcessDisplayHelper ph; + + private final SummaryStatistics docStats = new SummaryStatistics(); + private final SummaryStatistics queueStats = new SummaryStatistics(); + private final SummaryStatistics esBatchRequestStats = new SummaryStatistics(); + private long totalBytes; + private long totalRetries; + private long totalFailedDocs; + + public RoutedBulkIndexer( + LinkedBlockingDeque> jsonQueue, + String indexName, + int shardCount, + int maxRetries, + String label + ) { + this.jsonQueue = jsonQueue; + this.indexName = indexName; + this.shardCount = shardCount; + this.maxBulkSizeBytes = ConfigHelper.getEsBulkSizeMB() * 1024 * 1024; + this.maxRetries = maxRetries; + this.retryBaseMs = 1000; + this.label = label; + } + + @Override + public void run() { + boolean indexing = VariantConfigHelper.isIndexing(); + if (indexing) { + client = EsClientFactory.getMustCloseSearchClient(); + } + ph = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); + ph.startProcess(label); + + try { + List pendingDocs = new ArrayList<>(); + long pendingBytes = 0; + + while (!Thread.currentThread().isInterrupted()) { + try { + List docs = jsonQueue.poll(1, TimeUnit.SECONDS); + if (docs == null) continue; + queueStats.addValue(docs.size()); + + for (String doc : docs) { + int docBytes = doc.getBytes(StandardCharsets.UTF_8).length; + docStats.addValue(docBytes); + + if (pendingBytes + docBytes > maxBulkSizeBytes && !pendingDocs.isEmpty()) { + submitBatch(pendingDocs); + pendingDocs = new ArrayList<>(); + pendingBytes = 0; + } + + pendingDocs.add(doc); + pendingBytes += docBytes; + totalBytes += docBytes; + ph.progressProcess(label + + " qs: (" + jsonQueue.size() + + ") q: (" + queueStats.getN() + "/" + queueStats.getMean() + + ") d: (" + docStats.getN() + "/" + docStats.getMean() + + ") es: (" + esBatchRequestStats.getN() + "/" + esBatchRequestStats.getMean() + + ") B/r/f: (" + totalBytes + "/" + totalRetries + "/" + totalFailedDocs + ")" + ); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + // Flush remaining + if (!pendingDocs.isEmpty()) { + submitBatch(pendingDocs); + } + + } finally { + ph.finishProcess(); + logStats(); + if (client != null) { + try { + client.close(); + } catch (IOException e) { + log.error(label + " Error closing ES client", e); + } + } + } + } + + private void submitBatch(List docs) { + esBatchRequestStats.addValue(docs.size()); + String routing = Integer.toString(ThreadLocalRandom.current().nextInt(shardCount)); + submitWithRetry(docs, routing, 0); + } + + private void submitWithRetry(List docs, String routing, int attempt) { + if (!VariantConfigHelper.isIndexing()) return; + BulkRequest bulkRequest = new BulkRequest(); + for (String doc : docs) { + bulkRequest.add(new IndexRequest(indexName).source(doc, XContentType.JSON).routing(routing)); + } + + try { + BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); + + if (response.hasFailures()) { + List failedDocs = new ArrayList<>(); + for (BulkItemResponse item : response) { + if (item.isFailed()) { + failedDocs.add(docs.get(item.getItemId())); + } + } + + if (!failedDocs.isEmpty() && attempt < maxRetries) { + totalRetries++; + long sleepMs = retryBaseMs * (1L << Math.min(attempt, 10)); + log.warn(label + " Retrying " + failedDocs.size() + " failed items (attempt " + (attempt + 1) + "), sleeping " + sleepMs + "ms"); + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + submitWithRetry(failedDocs, routing, attempt + 1); + } else if (!failedDocs.isEmpty()) { + totalFailedDocs += failedDocs.size(); + log.error(label + " Failed to index " + failedDocs.size() + " documents after " + maxRetries + " retries"); + log.error(label + " First failure: " + response.getItems()[0].getFailureMessage()); + } + } + + } catch (IOException e) { + if (attempt < maxRetries) { + totalRetries++; + long sleepMs = retryBaseMs * (1L << Math.min(attempt, 10)); + log.warn(label + " Bulk request failed: " + e.getMessage() + ", retrying (attempt " + (attempt + 1) + "), sleeping " + sleepMs + "ms"); + try { + Thread.sleep(sleepMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + submitWithRetry(docs, routing, attempt + 1); + } else { + log.error(label + " Bulk request failed after " + maxRetries + " retries: " + e.getMessage()); + System.exit(-1); + } + } + } + + private void logStats() { + log.info(label + " Doc Stats: " + docStats); + log.info(label + " Queue Stats: " + queueStats); + log.info(label + " ES Batch Stats: " + esBatchRequestStats); + log.info(label + " Total Bytes: " + totalBytes + " Retries: " + totalRetries + " Failed: " + totalFailedDocs); + } +} diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java index ee7bc6c2c..20984afc3 100644 --- a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java @@ -7,7 +7,6 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; -import org.alliancegenome.core.config.ConfigHelper; import org.alliancegenome.core.filedownload.model.DownloadSource; import org.alliancegenome.core.util.StatsCollector; import org.alliancegenome.core.variant.config.VariantConfigHelper; @@ -21,21 +20,9 @@ import org.alliancegenome.es.index.site.cache.GeneDocumentCache; import org.alliancegenome.es.model.VariantSearchResultDocument; import org.alliancegenome.es.rest.RestConfig; -import org.alliancegenome.es.util.EsClientFactory; import org.alliancegenome.es.util.ProcessDisplayHelper; import org.alliancegenome.exceptional.client.ExceptionCatcher; import org.alliancegenome.neo4j.entity.SpeciesType; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.xcontent.XContentType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; @@ -58,27 +45,6 @@ public class SourceDocumentCreation extends Thread { private String[] header; public static String indexName; - private BulkProcessor.Builder builder1; - private BulkProcessor.Builder builder2; - private BulkProcessor.Builder builder3; - private BulkProcessor.Builder builder4; - private BulkProcessor.Builder builder5; - private BulkProcessor.Builder builder6; - private BulkProcessor.Builder builder7; - private BulkProcessor.Builder builder8; - - private BulkProcessor bulkProcessor1; - private BulkProcessor bulkProcessor2; - private BulkProcessor bulkProcessor3; - private BulkProcessor bulkProcessor4; - private BulkProcessor bulkProcessor5; - private BulkProcessor bulkProcessor6; - private BulkProcessor bulkProcessor7; - private BulkProcessor bulkProcessor8; - - // public AlleleRepository repo = new AlleleRepository(); - - private boolean indexing = VariantConfigHelper.isIndexing(); private boolean gatherStats = VariantConfigHelper.isGatherStats(); private LinkedBlockingDeque> vcQueue; @@ -97,26 +63,14 @@ public class SourceDocumentCreation extends Thread { private ProcessDisplayHelper ph1 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private ProcessDisplayHelper ph2 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); - private ProcessDisplayHelper ph3 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); - private ProcessDisplayHelper ph4 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private ProcessDisplayHelper ph5 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private VariantSummaryConverter variantSummaryConverter; private SequenceSummaryConverter sequenceSummaryConverter; private VariantSearchResultConverter variantSearchResultConverter; - private StatsCollector statsCollector = new StatsCollector(); private String messageHeader = ""; - private RestHighLevelClient client1; - private RestHighLevelClient client2; - private RestHighLevelClient client3; - private RestHighLevelClient client4; - private RestHighLevelClient client5; - private RestHighLevelClient client6; - private RestHighLevelClient client7; - private RestHighLevelClient client8; - public SourceDocumentCreation(String downloadPath, DownloadSource source, GeneDocumentCache geneCache, HashSet variantsCache) { this.downloadPath = downloadPath; this.source = source; @@ -142,167 +96,6 @@ public void run() { jsonQueue7 = new LinkedBlockingDeque<>(250); jsonQueue8 = new LinkedBlockingDeque<>(250); - if (indexing) { - client1 = EsClientFactory.getMustCloseSearchClient(); - client2 = EsClientFactory.getMustCloseSearchClient(); - client3 = EsClientFactory.getMustCloseSearchClient(); - client4 = EsClientFactory.getMustCloseSearchClient(); - client5 = EsClientFactory.getMustCloseSearchClient(); - client6 = EsClientFactory.getMustCloseSearchClient(); - client7 = EsClientFactory.getMustCloseSearchClient(); - client8 = EsClientFactory.getMustCloseSearchClient(); - - builder1 = BulkProcessor.builder((request, bulkListener) -> client1.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor1 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder2 = BulkProcessor.builder((request, bulkListener) -> client2.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor2 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder3 = BulkProcessor.builder((request, bulkListener) -> client3.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor3 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder4 = BulkProcessor.builder((request, bulkListener) -> client4.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor4 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder5 = BulkProcessor.builder((request, bulkListener) -> client5.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor4 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder6 = BulkProcessor.builder((request, bulkListener) -> client6.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor4 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder7 = BulkProcessor.builder((request, bulkListener) -> client7.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor4 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder8 = BulkProcessor.builder((request, bulkListener) -> client8.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor4 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - int concurrentRequests = source.getBulkProcessorConcurrentRequests() != null ? source.getBulkProcessorConcurrentRequests() : ConfigHelper.getEsBulkConcurrentRequests(); - ByteSizeValue bulkSize = new ByteSizeValue(ConfigHelper.getEsBulkSizeMB(), ByteSizeUnit.MB); - BackoffPolicy backoff = BackoffPolicy.exponentialBackoff(TimeValue.timeValueSeconds(1L), 100); - - bulkProcessor1 = builder1.setBulkActions(7643).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor2 = builder2.setBulkActions(6410).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor3 = builder3.setBulkActions(6045).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor4 = builder4.setBulkActions(6241).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor5 = builder5.setBulkActions(3120).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor6 = builder6.setBulkActions(2181).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor7 = builder7.setBulkActions(1393).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor8 = builder8.setBulkActions(510).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - - } - ph1.startProcess(messageHeader + "VCFReader"); List readers = new ArrayList(); for (String filePath : source.getGenerateFilePaths()) { @@ -336,39 +129,20 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) producers.add(producer); } - ArrayList indexers = new ArrayList<>(); - - if (!indexing) { - indexName = "no_index"; - } - - ph3.startProcess(messageHeader + "VCFJsonIndexer BulkProcessor"); - ph4.startProcess(messageHeader + "VCFJsonIndexer Buckets"); - for (int i = 0; i < VariantConfigHelper.getIndexerBulkProcessorThreads(); i++) { - VCFJsonBulkIndexer indexer1 = new VCFJsonBulkIndexer(jsonQueue1, bulkProcessor1); - indexer1.start(); - indexers.add(indexer1); - VCFJsonBulkIndexer indexer2 = new VCFJsonBulkIndexer(jsonQueue2, bulkProcessor2); - indexer2.start(); - indexers.add(indexer2); - VCFJsonBulkIndexer indexer3 = new VCFJsonBulkIndexer(jsonQueue3, bulkProcessor3); - indexer3.start(); - indexers.add(indexer3); - VCFJsonBulkIndexer indexer4 = new VCFJsonBulkIndexer(jsonQueue4, bulkProcessor4); - indexer4.start(); - indexers.add(indexer4); - VCFJsonBulkIndexer indexer5 = new VCFJsonBulkIndexer(jsonQueue5, bulkProcessor5); - indexer5.start(); - indexers.add(indexer5); - VCFJsonBulkIndexer indexer6 = new VCFJsonBulkIndexer(jsonQueue6, bulkProcessor6); - indexer6.start(); - indexers.add(indexer6); - VCFJsonBulkIndexer indexer7 = new VCFJsonBulkIndexer(jsonQueue7, bulkProcessor7); - indexer7.start(); - indexers.add(indexer7); - VCFJsonBulkIndexer indexer8 = new VCFJsonBulkIndexer(jsonQueue8, bulkProcessor8); - indexer8.start(); - indexers.add(indexer8); + int shardCount = VariantConfigHelper.getIndexerShards(); + LinkedBlockingDeque>[] jsonQueues = new LinkedBlockingDeque[] { + jsonQueue1, jsonQueue2, jsonQueue3, jsonQueue4, + jsonQueue5, jsonQueue6, jsonQueue7, jsonQueue8 + }; + + ArrayList indexers = new ArrayList<>(); + for (int i = 0; i < jsonQueues.length; i++) { + RoutedBulkIndexer indexer = new RoutedBulkIndexer( + jsonQueues[i], indexName, shardCount, 100, + messageHeader + "BP(" + (i + 1) + ")" + ); + indexer.start(); + indexers.add(indexer); } try { @@ -409,7 +183,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) log.info(messageHeader + "JSONProducers shutdown"); ph5.finishProcess(); - log.info(messageHeader + "Waiting for jsonQueue to empty"); + log.info(messageHeader + "Waiting for jsonQueues to empty"); while ( !jsonQueue1.isEmpty() || !jsonQueue2.isEmpty() || !jsonQueue3.isEmpty() || !jsonQueue4.isEmpty() || !jsonQueue5.isEmpty() || !jsonQueue6.isEmpty() || !jsonQueue7.isEmpty() || !jsonQueue8.isEmpty() @@ -417,55 +191,12 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) Thread.sleep(1000); } - log.info(messageHeader + "Waiting for bulk processors to finish"); - - Thread.sleep(60000); - - log.info(messageHeader + "JSon Queue Empty shuting down bulk indexers"); - for (VCFJsonBulkIndexer indexer : indexers) { + log.info(messageHeader + "Shutting down bulk indexers"); + for (RoutedBulkIndexer indexer : indexers) { indexer.interrupt(); indexer.join(); } log.info(messageHeader + "Bulk Indexers shutdown"); - ph3.finishProcess(); - ph4.finishProcess(); - - - if (gatherStats) { - statsCollector.printOutput(speciesType.getModName()); - } - - if (indexing) { - bulkProcessor1.flush(); - bulkProcessor2.flush(); - bulkProcessor3.flush(); - bulkProcessor4.flush(); - bulkProcessor5.flush(); - bulkProcessor6.flush(); - bulkProcessor7.flush(); - bulkProcessor8.flush(); - - bulkProcessor1.awaitClose(10, TimeUnit.DAYS); - bulkProcessor2.awaitClose(10, TimeUnit.DAYS); - bulkProcessor3.awaitClose(10, TimeUnit.DAYS); - bulkProcessor4.awaitClose(10, TimeUnit.DAYS); - bulkProcessor5.awaitClose(10, TimeUnit.DAYS); - bulkProcessor6.awaitClose(10, TimeUnit.DAYS); - bulkProcessor7.awaitClose(10, TimeUnit.DAYS); - bulkProcessor8.awaitClose(10, TimeUnit.DAYS); - - client1.close(); - client2.close(); - client3.close(); - client4.close(); - client5.close(); - client6.close(); - client7.close(); - client8.close(); - - } - - log.info(messageHeader + "Threads finished: "); } catch (Exception e) { ExceptionCatcher.report(e); @@ -771,36 +502,4 @@ public void run() { } } - private class VCFJsonBulkIndexer extends Thread { - private LinkedBlockingDeque> jsonQueue; - private BulkProcessor bulkProcessor; - - public VCFJsonBulkIndexer(LinkedBlockingDeque> jsonQueue, BulkProcessor bulkProcessor) { - this.jsonQueue = jsonQueue; - this.bulkProcessor = bulkProcessor; - } - - @Override - public void run() { - while (!(Thread.currentThread().isInterrupted())) { - try { - List docs = jsonQueue.take(); - - for (String doc : docs) { - if (gatherStats) { - statsCollector.addDocument(doc); - } - if (indexing) { - bulkProcessor.add(new IndexRequest(indexName).source(doc, XContentType.JSON)); - } - ph3.progressProcess(); - } - ph4.progressProcess("JSon Queue: " + jsonQueue.size()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - } - } From 791a236751c47b866297121a7470ebe74024ef9b Mon Sep 17 00:00:00 2001 From: Olin Blodgett Date: Thu, 2 Apr 2026 15:08:10 -0400 Subject: [PATCH 2/7] Switched to Smile Format --- agr_java_core/pom.xml | 4 + .../alliancegenome/es/rest/RestConfig.java | 17 ++++ .../es/managers/RoutedBulkIndexer.java | 90 ++++++++++++------- .../es/managers/SourceDocumentCreation.java | 85 +++++++++--------- 4 files changed, 123 insertions(+), 73 deletions(-) diff --git a/agr_java_core/pom.xml b/agr_java_core/pom.xml index f595206d1..44a971a2c 100644 --- a/agr_java_core/pom.xml +++ b/agr_java_core/pom.xml @@ -165,6 +165,10 @@ com.fasterxml.jackson.datatype jackson-datatype-jsr310 + + com.fasterxml.jackson.dataformat + jackson-dataformat-smile + com.fasterxml.jackson.core jackson-core diff --git a/agr_java_core/src/main/java/org/alliancegenome/es/rest/RestConfig.java b/agr_java_core/src/main/java/org/alliancegenome/es/rest/RestConfig.java index 24fdf30a1..ef80eb18a 100644 --- a/agr_java_core/src/main/java/org/alliancegenome/es/rest/RestConfig.java +++ b/agr_java_core/src/main/java/org/alliancegenome/es/rest/RestConfig.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.module.blackbird.BlackbirdModule; @@ -69,5 +70,21 @@ public static ObjectMapper createObjectMapper() { ); return mapper; } + + public static ObjectMapper createSmileObjectMapper() { + ObjectMapper mapper = new ObjectMapper(new SmileFactory()); + mapper.registerModule(new JavaTimeModule()); + mapper.registerModule(new BlackbirdModule()); + mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION); + mapper.setSerializationInclusion(Include.NON_NULL); + mapper.setSerializationInclusion(Include.NON_EMPTY); + mapper.registerSubtypes( + new NamedType(ResourceDescriptor.class, "ResourceDescriptor"), + new NamedType(ResourceDescriptorPage.class, "ResourceDescriptorPage") + ); + return mapper; + } } diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java index a70425410..e5e3b70a3 100644 --- a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java @@ -1,12 +1,11 @@ package org.alliancegenome.indexer.variant.es.managers; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.alliancegenome.core.config.ConfigHelper; import org.alliancegenome.core.variant.config.VariantConfigHelper; @@ -26,7 +25,7 @@ @Slf4j public class RoutedBulkIndexer extends Thread { - private final LinkedBlockingDeque> jsonQueue; + private final LinkedBlockingDeque> jsonQueue; private final String indexName; private final int shardCount; private final long maxBulkSizeBytes; @@ -35,29 +34,35 @@ public class RoutedBulkIndexer extends Thread { private final String label; private RestHighLevelClient client; + private ProcessDisplayHelper phGlobal; private ProcessDisplayHelper ph; private final SummaryStatistics docStats = new SummaryStatistics(); private final SummaryStatistics queueStats = new SummaryStatistics(); private final SummaryStatistics esBatchRequestStats = new SummaryStatistics(); + + private boolean gatherStats = VariantConfigHelper.isGatherStats(); + private long totalBytes; private long totalRetries; private long totalFailedDocs; public RoutedBulkIndexer( - LinkedBlockingDeque> jsonQueue, + LinkedBlockingDeque> jsonQueue, String indexName, int shardCount, int maxRetries, - String label + String label, + ProcessDisplayHelper phGlobal ) { this.jsonQueue = jsonQueue; this.indexName = indexName; this.shardCount = shardCount; - this.maxBulkSizeBytes = ConfigHelper.getEsBulkSizeMB() * 1024 * 1024; + this.maxBulkSizeBytes = (ConfigHelper.getEsBulkSizeMB() * 1024 * 1024) * 10; //10MB * the multiplier this.maxRetries = maxRetries; this.retryBaseMs = 1000; this.label = label; + this.phGlobal = phGlobal; } @Override @@ -67,21 +72,30 @@ public void run() { client = EsClientFactory.getMustCloseSearchClient(); } ph = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); - ph.startProcess(label); + if(gatherStats) { + ph.startProcess(label); + } try { - List pendingDocs = new ArrayList<>(); + List pendingDocs = new ArrayList<>(); long pendingBytes = 0; while (!Thread.currentThread().isInterrupted()) { try { - List docs = jsonQueue.poll(1, TimeUnit.SECONDS); - if (docs == null) continue; - queueStats.addValue(docs.size()); + List docs = jsonQueue.poll(1, TimeUnit.SECONDS); + if (docs == null) { + continue; + } + if(gatherStats) { + queueStats.addValue(docs.size()); + } - for (String doc : docs) { - int docBytes = doc.getBytes(StandardCharsets.UTF_8).length; - docStats.addValue(docBytes); + for (byte[] smileDoc : docs) { + int docBytes = smileDoc.length; + + if(gatherStats) { + docStats.addValue(docBytes); + } if (pendingBytes + docBytes > maxBulkSizeBytes && !pendingDocs.isEmpty()) { submitBatch(pendingDocs); @@ -89,30 +103,38 @@ public void run() { pendingBytes = 0; } - pendingDocs.add(doc); + pendingDocs.add(smileDoc); pendingBytes += docBytes; - totalBytes += docBytes; - ph.progressProcess(label + - " qs: (" + jsonQueue.size() + - ") q: (" + queueStats.getN() + "/" + queueStats.getMean() + - ") d: (" + docStats.getN() + "/" + docStats.getMean() + - ") es: (" + esBatchRequestStats.getN() + "/" + esBatchRequestStats.getMean() + - ") B/r/f: (" + totalBytes + "/" + totalRetries + "/" + totalFailedDocs + ")" - ); + if(gatherStats) { + totalBytes += docBytes; + ph.progressProcess( + "qs: (" + jsonQueue.size() + + ") q: (" + queueStats.getN() + "/" + (int)queueStats.getMean() + + ") d: (" + docStats.getN() + "/" + (int)docStats.getMean() + + ") es: (" + esBatchRequestStats.getN() + "/" + (int)esBatchRequestStats.getMean() + + ") B/r/f: (" + totalBytes + "/" + totalRetries + "/" + totalFailedDocs + ")" + ); + } + phGlobal.progressProcess(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } + // Clear interrupt flag so the final bulk request can complete + Thread.interrupted(); + // Flush remaining if (!pendingDocs.isEmpty()) { submitBatch(pendingDocs); } } finally { - ph.finishProcess(); - logStats(); + if(gatherStats) { + ph.finishProcess(); + logStats(); + } if (client != null) { try { client.close(); @@ -123,24 +145,28 @@ public void run() { } } - private void submitBatch(List docs) { - esBatchRequestStats.addValue(docs.size()); + private void submitBatch(List docs) { + if(gatherStats) { + esBatchRequestStats.addValue(docs.size()); + } String routing = Integer.toString(ThreadLocalRandom.current().nextInt(shardCount)); submitWithRetry(docs, routing, 0); } - private void submitWithRetry(List docs, String routing, int attempt) { - if (!VariantConfigHelper.isIndexing()) return; + private void submitWithRetry(List docs, String routing, int attempt) { + if (!VariantConfigHelper.isIndexing()) { + return; + } BulkRequest bulkRequest = new BulkRequest(); - for (String doc : docs) { - bulkRequest.add(new IndexRequest(indexName).source(doc, XContentType.JSON).routing(routing)); + for (byte[] smileDoc : docs) { + bulkRequest.add(new IndexRequest(indexName).source(smileDoc, XContentType.SMILE).routing(routing)); } try { BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); if (response.hasFailures()) { - List failedDocs = new ArrayList<>(); + List failedDocs = new ArrayList<>(); for (BulkItemResponse item : response) { if (item.isFailed()) { failedDocs.add(docs.get(item.getItemId())); diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java index 20984afc3..3f9dee31e 100644 --- a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java @@ -8,7 +8,6 @@ import java.util.concurrent.TimeUnit; import org.alliancegenome.core.filedownload.model.DownloadSource; -import org.alliancegenome.core.util.StatsCollector; import org.alliancegenome.core.variant.config.VariantConfigHelper; import org.alliancegenome.core.variant.converters.SequenceSummaryConverter; import org.alliancegenome.core.variant.converters.VariantSearchResultConverter; @@ -45,25 +44,24 @@ public class SourceDocumentCreation extends Thread { private String[] header; public static String indexName; - private boolean gatherStats = VariantConfigHelper.isGatherStats(); - private LinkedBlockingDeque> vcQueue; private LinkedBlockingDeque> objectQueue; - private LinkedBlockingDeque> jsonQueue1; - private LinkedBlockingDeque> jsonQueue2; - private LinkedBlockingDeque> jsonQueue3; - private LinkedBlockingDeque> jsonQueue4; - private LinkedBlockingDeque> jsonQueue5; - private LinkedBlockingDeque> jsonQueue6; - private LinkedBlockingDeque> jsonQueue7; - private LinkedBlockingDeque> jsonQueue8; + private LinkedBlockingDeque> jsonQueue1; + private LinkedBlockingDeque> jsonQueue2; + private LinkedBlockingDeque> jsonQueue3; + private LinkedBlockingDeque> jsonQueue4; + private LinkedBlockingDeque> jsonQueue5; + private LinkedBlockingDeque> jsonQueue6; + private LinkedBlockingDeque> jsonQueue7; + private LinkedBlockingDeque> jsonQueue8; private long[][] jqs = new long[8][3]; // Json Queue Stats private ProcessDisplayHelper ph1 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private ProcessDisplayHelper ph2 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); - private ProcessDisplayHelper ph5 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); + private ProcessDisplayHelper ph3 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); + private ProcessDisplayHelper ph4 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private VariantSummaryConverter variantSummaryConverter; private SequenceSummaryConverter sequenceSummaryConverter; @@ -121,7 +119,7 @@ public void run() { } List producers = new ArrayList<>(); - ph5.startProcess(messageHeader + "JSONProducers"); + ph3.startProcess(messageHeader + "JSONProducers"); int producerThreadCount = source.getProducerThreads() != null ? source.getProducerThreads() : VariantConfigHelper.getProducerThreads(); for (int i = 0; i < producerThreadCount; i++) { JSONProducer producer = new JSONProducer(); @@ -130,16 +128,17 @@ public void run() { } int shardCount = VariantConfigHelper.getIndexerShards(); - LinkedBlockingDeque>[] jsonQueues = new LinkedBlockingDeque[] { + LinkedBlockingDeque>[] jsonQueues = new LinkedBlockingDeque[] { jsonQueue1, jsonQueue2, jsonQueue3, jsonQueue4, jsonQueue5, jsonQueue6, jsonQueue7, jsonQueue8 }; + ph4.startProcess(messageHeader + "RoutedBulkIndexers"); ArrayList indexers = new ArrayList<>(); for (int i = 0; i < jsonQueues.length; i++) { RoutedBulkIndexer indexer = new RoutedBulkIndexer( jsonQueues[i], indexName, shardCount, 100, - messageHeader + "BP(" + (i + 1) + ")" + messageHeader + "BP(" + (i + 1) + ")", ph4 ); indexer.start(); indexers.add(indexer); @@ -181,7 +180,7 @@ public void run() { p.join(); } log.info(messageHeader + "JSONProducers shutdown"); - ph5.finishProcess(); + ph3.finishProcess(); log.info(messageHeader + "Waiting for jsonQueues to empty"); while ( @@ -196,8 +195,9 @@ public void run() { indexer.interrupt(); indexer.join(); } + ph4.finishProcess(); log.info(messageHeader + "Bulk Indexers shutdown"); - + } catch (Exception e) { ExceptionCatcher.report(e); e.printStackTrace(); @@ -327,7 +327,7 @@ public void run() { private class JSONProducer extends Thread { - private ObjectMapper mapper = RestConfig.createObjectMapper(); + private ObjectMapper mapper = RestConfig.createSmileObjectMapper(); private ObjectWriter cachedWriter; private ObjectWriter sequenceWriter; private ObjectWriter searchWriter; @@ -348,31 +348,34 @@ public void run() { try { List docList = objectQueue.take(); - List docs1 = new ArrayList<>(); - List docs2 = new ArrayList<>(); - List docs3 = new ArrayList<>(); - List docs4 = new ArrayList<>(); - List docs5 = new ArrayList<>(); - List docs6 = new ArrayList<>(); - List docs7 = new ArrayList<>(); - List docs8 = new ArrayList<>(); + List docs1 = new ArrayList<>(); + List docs2 = new ArrayList<>(); + List docs3 = new ArrayList<>(); + List docs4 = new ArrayList<>(); + List docs5 = new ArrayList<>(); + List docs6 = new ArrayList<>(); + List docs7 = new ArrayList<>(); + List docs8 = new ArrayList<>(); if (!docList.isEmpty()) { for (ESDocument doc : docList) { try { - String jsonDoc = null; + byte[] smileDoc = null; if (doc instanceof SequenceSummaryDocument ssd) { - jsonDoc = sequenceWriter.writeValueAsString(ssd); + //jsonDoc = sequenceWriter.writeValueAsString(ssd); + smileDoc = sequenceWriter.writeValueAsBytes(ssd); } else if (doc instanceof VariantSummaryDocument vsd) { - jsonDoc = cachedWriter.writeValueAsString(vsd); - } else if (doc instanceof VariantSearchResultDocument vsd) { - jsonDoc = searchWriter.writeValueAsString(vsd); + //jsonDoc = cachedWriter.writeValueAsString(vsd); + smileDoc = cachedWriter.writeValueAsBytes(vsd); + } else if (doc instanceof VariantSearchResultDocument vsrd) { + //jsonDoc = searchWriter.writeValueAsString(vsrd); + smileDoc = searchWriter.writeValueAsBytes(vsrd); } else { log.error("Unexpected ESDocument type: " + doc.getClass().getName()); continue; // This should never happen } - int len = jsonDoc.length(); + int len = smileDoc.length; // Welford's online update for mean, M2, M3 // This code distributes the document via size over the 8 @@ -400,28 +403,28 @@ public void run() { int t7 = (int) (mean + (2.0 * upperWidth)); if (len < t1) { - docs1.add(jsonDoc); + docs1.add(smileDoc); jqs[0][2] += len; } else if (len < t2) { - docs2.add(jsonDoc); + docs2.add(smileDoc); jqs[1][2] += len; } else if (len < t3) { - docs3.add(jsonDoc); + docs3.add(smileDoc); jqs[2][2] += len; } else if (len < t4) { - docs4.add(jsonDoc); + docs4.add(smileDoc); jqs[3][2] += len; } else if (len < t5) { - docs5.add(jsonDoc); + docs5.add(smileDoc); jqs[4][2] += len; } else if (len < t6) { - docs6.add(jsonDoc); + docs6.add(smileDoc); jqs[5][2] += len; } else if (len < t7) { - docs7.add(jsonDoc); + docs7.add(smileDoc); jqs[6][2] += len; } else { - docs8.add(jsonDoc); + docs8.add(smileDoc); jqs[7][2] += len; } @@ -438,7 +441,7 @@ public void run() { // + " jsonQueue8(" + jqs[7][0] + "," + jqs[7][1] + "," + jqs[7][2] + "): " + jsonQueue8.size() // ); - ph5.progressProcess(); + ph3.progressProcess(); } catch (Exception e) { ExceptionCatcher.report(e); From 872f7f29898282c9b67bf7869d3ea90c707d48f8 Mon Sep 17 00:00:00 2001 From: Olin Blodgett Date: Thu, 2 Apr 2026 15:52:47 -0400 Subject: [PATCH 3/7] Removed json queues --- .../es/managers/RoutedBulkIndexer.java | 5 +- .../es/managers/SourceDocumentCreation.java | 151 ++---------------- 2 files changed, 14 insertions(+), 142 deletions(-) diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java index e5e3b70a3..0e295ae2b 100644 --- a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.XContentType; import lombok.extern.slf4j.Slf4j; @@ -58,7 +59,7 @@ public RoutedBulkIndexer( this.jsonQueue = jsonQueue; this.indexName = indexName; this.shardCount = shardCount; - this.maxBulkSizeBytes = (ConfigHelper.getEsBulkSizeMB() * 1024 * 1024) * 10; //10MB * the multiplier + this.maxBulkSizeBytes = (ConfigHelper.getEsBulkSizeMB() * 1024 * 1024); //10MB * the multiplier this.maxRetries = maxRetries; this.retryBaseMs = 1000; this.label = label; @@ -157,7 +158,7 @@ private void submitWithRetry(List docs, String routing, int attempt) { if (!VariantConfigHelper.isIndexing()) { return; } - BulkRequest bulkRequest = new BulkRequest(); + BulkRequest bulkRequest = new BulkRequest().timeout(TimeValue.timeValueHours(1)); for (byte[] smileDoc : docs) { bulkRequest.add(new IndexRequest(indexName).source(smileDoc, XContentType.SMILE).routing(routing)); } diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java index 3f9dee31e..4b0f60140 100644 --- a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java @@ -47,16 +47,7 @@ public class SourceDocumentCreation extends Thread { private LinkedBlockingDeque> vcQueue; private LinkedBlockingDeque> objectQueue; - private LinkedBlockingDeque> jsonQueue1; - private LinkedBlockingDeque> jsonQueue2; - private LinkedBlockingDeque> jsonQueue3; - private LinkedBlockingDeque> jsonQueue4; - private LinkedBlockingDeque> jsonQueue5; - private LinkedBlockingDeque> jsonQueue6; - private LinkedBlockingDeque> jsonQueue7; - private LinkedBlockingDeque> jsonQueue8; - - private long[][] jqs = new long[8][3]; // Json Queue Stats + private LinkedBlockingDeque> jsonQueue; private ProcessDisplayHelper ph1 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private ProcessDisplayHelper ph2 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); @@ -80,20 +71,12 @@ public SourceDocumentCreation(String downloadPath, DownloadSource source, GeneDo int objectQueueSize = source.getObjectQueueSize() != null ? source.getObjectQueueSize() : VariantConfigHelper.getSourceDocumentCreatorObjectQueueSize(); vcQueue = new LinkedBlockingDeque<>(vcQueueSize); objectQueue = new LinkedBlockingDeque<>(objectQueueSize); + jsonQueue = new LinkedBlockingDeque<>(250); } @Override public void run() { - jsonQueue1 = new LinkedBlockingDeque<>(250); - jsonQueue2 = new LinkedBlockingDeque<>(250); - jsonQueue3 = new LinkedBlockingDeque<>(250); - jsonQueue4 = new LinkedBlockingDeque<>(250); - jsonQueue5 = new LinkedBlockingDeque<>(250); - jsonQueue6 = new LinkedBlockingDeque<>(250); - jsonQueue7 = new LinkedBlockingDeque<>(250); - jsonQueue8 = new LinkedBlockingDeque<>(250); - ph1.startProcess(messageHeader + "VCFReader"); List readers = new ArrayList(); for (String filePath : source.getGenerateFilePaths()) { @@ -128,18 +111,11 @@ public void run() { } int shardCount = VariantConfigHelper.getIndexerShards(); - LinkedBlockingDeque>[] jsonQueues = new LinkedBlockingDeque[] { - jsonQueue1, jsonQueue2, jsonQueue3, jsonQueue4, - jsonQueue5, jsonQueue6, jsonQueue7, jsonQueue8 - }; ph4.startProcess(messageHeader + "RoutedBulkIndexers"); ArrayList indexers = new ArrayList<>(); - for (int i = 0; i < jsonQueues.length; i++) { - RoutedBulkIndexer indexer = new RoutedBulkIndexer( - jsonQueues[i], indexName, shardCount, 100, - messageHeader + "BP(" + (i + 1) + ")", ph4 - ); + for (int i = 0; i < shardCount * 4; i++) { + RoutedBulkIndexer indexer = new RoutedBulkIndexer(jsonQueue, indexName, shardCount, 100, messageHeader + "BP(" + (i + 1) + ")", ph4); indexer.start(); indexers.add(indexer); } @@ -183,10 +159,7 @@ public void run() { ph3.finishProcess(); log.info(messageHeader + "Waiting for jsonQueues to empty"); - while ( - !jsonQueue1.isEmpty() || !jsonQueue2.isEmpty() || !jsonQueue3.isEmpty() || !jsonQueue4.isEmpty() || - !jsonQueue5.isEmpty() || !jsonQueue6.isEmpty() || !jsonQueue7.isEmpty() || !jsonQueue8.isEmpty() - ) { + while (!jsonQueue.isEmpty()) { Thread.sleep(1000); } @@ -332,12 +305,6 @@ private class JSONProducer extends Thread { private ObjectWriter sequenceWriter; private ObjectWriter searchWriter; - // Welford's online algorithm state for mean, variance, and skewness - private long n; - private double mean; - private double m2; // second central moment (for variance/SD) - private double m3; // third central moment (for skewness) - @Override public void run() { mapper.disable(SerializationFeature.INDENT_OUTPUT); @@ -348,14 +315,7 @@ public void run() { try { List docList = objectQueue.take(); - List docs1 = new ArrayList<>(); - List docs2 = new ArrayList<>(); - List docs3 = new ArrayList<>(); - List docs4 = new ArrayList<>(); - List docs5 = new ArrayList<>(); - List docs6 = new ArrayList<>(); - List docs7 = new ArrayList<>(); - List docs8 = new ArrayList<>(); + List workBucket = new ArrayList<>(); if (!docList.isEmpty()) { for (ESDocument doc : docList) { @@ -372,61 +332,10 @@ public void run() { smileDoc = searchWriter.writeValueAsBytes(vsrd); } else { log.error("Unexpected ESDocument type: " + doc.getClass().getName()); - continue; // This should never happen + continue; } - int len = smileDoc.length; - - // Welford's online update for mean, M2, M3 - // This code distributes the document via size over the 8 - // queues so that each bulk processor works with same sized docs - n++; - double delta = len - mean; - double deltaN = delta / n; - double term1 = delta * deltaN * (n - 1); - mean += deltaN; - m3 += term1 * deltaN * (n - 2) - 3 * deltaN * m2; - m2 += term1; - - double sd = n > 1 ? Math.sqrt(m2 / (n - 1)) : 0.0; - double skew = (n > 2 && m2 > 0) ? (Math.sqrt(n) * m3 / Math.pow(m2, 1.5)) : 0.0; - - int lowerWidth = skew != 0.0 ? (int) (sd / skew) : (int) sd; - int upperWidth = (int) sd; - - int t1 = (int) (mean - (1.5 * lowerWidth)); - int t2 = (int) (mean - (1.0 * lowerWidth)); - int t3 = (int) (mean - (0.5 * lowerWidth)); - int t4 = (int) mean; - int t5 = (int) (mean + (0.5 * upperWidth)); - int t6 = (int) (mean + (1.0 * upperWidth)); - int t7 = (int) (mean + (2.0 * upperWidth)); - - if (len < t1) { - docs1.add(smileDoc); - jqs[0][2] += len; - } else if (len < t2) { - docs2.add(smileDoc); - jqs[1][2] += len; - } else if (len < t3) { - docs3.add(smileDoc); - jqs[2][2] += len; - } else if (len < t4) { - docs4.add(smileDoc); - jqs[3][2] += len; - } else if (len < t5) { - docs5.add(smileDoc); - jqs[4][2] += len; - } else if (len < t6) { - docs6.add(smileDoc); - jqs[5][2] += len; - } else if (len < t7) { - docs7.add(smileDoc); - jqs[6][2] += len; - } else { - docs8.add(smileDoc); - jqs[7][2] += len; - } + workBucket.add(smileDoc); // Left here for debugging purposes // ph5.progressProcess("M: " + (int) mean + " SD: " + (int) sd + " SK: " + skew @@ -450,48 +359,10 @@ public void run() { } try { - if (docs1.size() > 0) { - jsonQueue1.put(docs1); - jqs[0][0]++; - jqs[0][1] += docs1.size(); - } - if (docs2.size() > 0) { - jsonQueue2.put(docs2); - jqs[1][0]++; - jqs[1][1] += docs2.size(); - } - if (docs3.size() > 0) { - jsonQueue3.put(docs3); - jqs[2][0]++; - jqs[2][1] += docs3.size(); + if (workBucket.size() > 0) { + jsonQueue.put(workBucket); } - if (docs4.size() > 0) { - jsonQueue4.put(docs4); - jqs[3][0]++; - jqs[3][1] += docs4.size(); - } - if (docs5.size() > 0) { - jsonQueue5.put(docs5); - jqs[4][0]++; - jqs[4][1] += docs5.size(); - } - if (docs6.size() > 0) { - jsonQueue6.put(docs6); - jqs[5][0]++; - jqs[5][1] += docs6.size(); - } - if (docs7.size() > 0) { - jsonQueue7.put(docs7); - jqs[6][0]++; - jqs[6][1] += docs7.size(); - } - if (docs8.size() > 0) { - jsonQueue8.put(docs8); - jqs[7][0]++; - jqs[7][1] += docs8.size(); - } - - + } catch (InterruptedException e) { ExceptionCatcher.report(e); e.printStackTrace(); From 4d202b9a98a4859c57855e8f299cad34afe95bc7 Mon Sep 17 00:00:00 2001 From: Olin Blodgett Date: Thu, 2 Apr 2026 15:57:47 -0400 Subject: [PATCH 4/7] fix: checkstyle - whitespace after typecast, if, and operator wrap --- .../es/managers/RoutedBulkIndexer.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java index 0e295ae2b..e5f598ec8 100644 --- a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java @@ -73,7 +73,7 @@ public void run() { client = EsClientFactory.getMustCloseSearchClient(); } ph = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); - if(gatherStats) { + if (gatherStats) { ph.startProcess(label); } @@ -87,14 +87,14 @@ public void run() { if (docs == null) { continue; } - if(gatherStats) { + if (gatherStats) { queueStats.addValue(docs.size()); } for (byte[] smileDoc : docs) { int docBytes = smileDoc.length; - if(gatherStats) { + if (gatherStats) { docStats.addValue(docBytes); } @@ -106,14 +106,14 @@ public void run() { pendingDocs.add(smileDoc); pendingBytes += docBytes; - if(gatherStats) { + if (gatherStats) { totalBytes += docBytes; ph.progressProcess( - "qs: (" + jsonQueue.size() + - ") q: (" + queueStats.getN() + "/" + (int)queueStats.getMean() + - ") d: (" + docStats.getN() + "/" + (int)docStats.getMean() + - ") es: (" + esBatchRequestStats.getN() + "/" + (int)esBatchRequestStats.getMean() + - ") B/r/f: (" + totalBytes + "/" + totalRetries + "/" + totalFailedDocs + ")" + "qs: (" + jsonQueue.size() + + ") q: (" + queueStats.getN() + "/" + (int) queueStats.getMean() + + ") d: (" + docStats.getN() + "/" + (int) docStats.getMean() + + ") es: (" + esBatchRequestStats.getN() + "/" + (int) esBatchRequestStats.getMean() + + ") B/r/f: (" + totalBytes + "/" + totalRetries + "/" + totalFailedDocs + ")" ); } phGlobal.progressProcess(); @@ -132,7 +132,7 @@ public void run() { } } finally { - if(gatherStats) { + if (gatherStats) { ph.finishProcess(); logStats(); } @@ -147,7 +147,7 @@ public void run() { } private void submitBatch(List docs) { - if(gatherStats) { + if (gatherStats) { esBatchRequestStats.addValue(docs.size()); } String routing = Integer.toString(ThreadLocalRandom.current().nextInt(shardCount)); From ace585fac21aecbd9e737cca2d77b67e9abfde62 Mon Sep 17 00:00:00 2001 From: Olin Blodgett Date: Thu, 2 Apr 2026 16:03:08 -0400 Subject: [PATCH 5/7] fix: minor cleanup and config tuning for variant indexer Remove unnecessary parens in RoutedBulkIndexer, set non-MGD species inactive in VariantFileSet.yaml for MGD-only run, and increase local ES max_content_length to 1g. --- .../variant/es/managers/RoutedBulkIndexer.java | 2 +- .../src/main/resources/VariantFileSet.yaml | 12 ++++++------ docker/run_elasticsearch | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java index e5f598ec8..00c742790 100644 --- a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java @@ -59,7 +59,7 @@ public RoutedBulkIndexer( this.jsonQueue = jsonQueue; this.indexName = indexName; this.shardCount = shardCount; - this.maxBulkSizeBytes = (ConfigHelper.getEsBulkSizeMB() * 1024 * 1024); //10MB * the multiplier + this.maxBulkSizeBytes = ConfigHelper.getEsBulkSizeMB() * 1024 * 1024; //10MB * the multiplier this.maxRetries = maxRetries; this.retryBaseMs = 1000; this.label = label; diff --git a/agr_variant_indexer/src/main/resources/VariantFileSet.yaml b/agr_variant_indexer/src/main/resources/VariantFileSet.yaml index f8b51c7b5..36970eb9a 100644 --- a/agr_variant_indexer/src/main/resources/VariantFileSet.yaml +++ b/agr_variant_indexer/src/main/resources/VariantFileSet.yaml @@ -3,19 +3,19 @@ s3RootUrl: "https://download.alliancegenome.org/variants" downloadFileSources: - source: WB - active: true + active: false species: CELEGANS taxonId: "NCBITaxon:6239" chromosomeList: [I, II, III, IV, V, X, MtDNA] - source: RGD - active: true + active: false species: RAT taxonId: "NCBITaxon:10116" chromosomeList: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, X, Y] - source: FB - active: true + active: false species: FLY taxonId: "NCBITaxon:7227" chromosomeList: [2L, 2R, 3L, 3R, 4, X] @@ -32,19 +32,19 @@ downloadFileSources: objectQueueSize: 250 - source: SGD - active: true + active: false species: YEAST taxonId: "NCBITaxon:559292" chromosomeList: [chrI, chrII, chrIII, chrIV, chrV, chrVI, chrVII, chrVIII, chrIX, chrX, chrXI, chrXII, chrXIII, chrXIV, chrXV, chrXVI, chrmt] - source: ZFIN - active: true + active: false species: ZEBRAFISH taxonId: "NCBITaxon:7955" chromosomeList: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25] - source: HUMAN - active: true + active: false species: HUMAN taxonId: NCBITaxon:9606 chromosomeList: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, MT, X, Y] \ No newline at end of file diff --git a/docker/run_elasticsearch b/docker/run_elasticsearch index d85af80ef..973886d70 100755 --- a/docker/run_elasticsearch +++ b/docker/run_elasticsearch @@ -1 +1 @@ -docker run -d --net curation -p 9200:9200 -p 9300:9300 -e "xpack.security.enabled=false" -e "ingest.geoip.downloader.enabled=false" -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xmx8g -Xms8g" --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:7.14.0 +docker run -d --net curation -p 9200:9200 -p 9300:9300 -e "http.max_content_length=1g" -e "xpack.security.enabled=false" -e "ingest.geoip.downloader.enabled=false" -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xmx8g -Xms8g" --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:7.14.0 From f03fc01a614724ad1cf7c7d9a19fbdcafc9d0b59 Mon Sep 17 00:00:00 2001 From: Olin Blodgett Date: Thu, 2 Apr 2026 16:09:12 -0400 Subject: [PATCH 6/7] Bump curation version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 904dee1e8..f53dd8f08 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ 3.3.1 10.17.0 UTF-8 - v0.47.25 + v0.47.26 From b62c4953fa0865bde1659d7742522e7ab9bfac71 Mon Sep 17 00:00:00 2001 From: Olin Blodgett Date: Thu, 2 Apr 2026 16:13:27 -0400 Subject: [PATCH 7/7] Revert claude commit --- .../src/main/resources/VariantFileSet.yaml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/agr_variant_indexer/src/main/resources/VariantFileSet.yaml b/agr_variant_indexer/src/main/resources/VariantFileSet.yaml index 36970eb9a..f8b51c7b5 100644 --- a/agr_variant_indexer/src/main/resources/VariantFileSet.yaml +++ b/agr_variant_indexer/src/main/resources/VariantFileSet.yaml @@ -3,19 +3,19 @@ s3RootUrl: "https://download.alliancegenome.org/variants" downloadFileSources: - source: WB - active: false + active: true species: CELEGANS taxonId: "NCBITaxon:6239" chromosomeList: [I, II, III, IV, V, X, MtDNA] - source: RGD - active: false + active: true species: RAT taxonId: "NCBITaxon:10116" chromosomeList: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, X, Y] - source: FB - active: false + active: true species: FLY taxonId: "NCBITaxon:7227" chromosomeList: [2L, 2R, 3L, 3R, 4, X] @@ -32,19 +32,19 @@ downloadFileSources: objectQueueSize: 250 - source: SGD - active: false + active: true species: YEAST taxonId: "NCBITaxon:559292" chromosomeList: [chrI, chrII, chrIII, chrIV, chrV, chrVI, chrVII, chrVIII, chrIX, chrX, chrXI, chrXII, chrXIII, chrXIV, chrXV, chrXVI, chrmt] - source: ZFIN - active: false + active: true species: ZEBRAFISH taxonId: "NCBITaxon:7955" chromosomeList: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25] - source: HUMAN - active: false + active: true species: HUMAN taxonId: NCBITaxon:9606 chromosomeList: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, MT, X, Y] \ No newline at end of file