diff --git a/agr_java_core/pom.xml b/agr_java_core/pom.xml index f595206d1..44a971a2c 100644 --- a/agr_java_core/pom.xml +++ b/agr_java_core/pom.xml @@ -165,6 +165,10 @@ com.fasterxml.jackson.datatype jackson-datatype-jsr310 + + com.fasterxml.jackson.dataformat + jackson-dataformat-smile + com.fasterxml.jackson.core jackson-core diff --git a/agr_java_core/src/main/java/org/alliancegenome/es/rest/RestConfig.java b/agr_java_core/src/main/java/org/alliancegenome/es/rest/RestConfig.java index 24fdf30a1..ef80eb18a 100644 --- a/agr_java_core/src/main/java/org/alliancegenome/es/rest/RestConfig.java +++ b/agr_java_core/src/main/java/org/alliancegenome/es/rest/RestConfig.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.module.blackbird.BlackbirdModule; @@ -69,5 +70,21 @@ public static ObjectMapper createObjectMapper() { ); return mapper; } + + public static ObjectMapper createSmileObjectMapper() { + ObjectMapper mapper = new ObjectMapper(new SmileFactory()); + mapper.registerModule(new JavaTimeModule()); + mapper.registerModule(new BlackbirdModule()); + mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + mapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION); + mapper.setSerializationInclusion(Include.NON_NULL); + mapper.setSerializationInclusion(Include.NON_EMPTY); + mapper.registerSubtypes( + new NamedType(ResourceDescriptor.class, "ResourceDescriptor"), + new NamedType(ResourceDescriptorPage.class, "ResourceDescriptorPage") + ); + return mapper; + } } diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java new file mode 100644 index 000000000..00c742790 --- /dev/null +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/RoutedBulkIndexer.java @@ -0,0 +1,220 @@ +package org.alliancegenome.indexer.variant.es.managers; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.alliancegenome.core.config.ConfigHelper; +import org.alliancegenome.core.variant.config.VariantConfigHelper; +import org.alliancegenome.es.util.EsClientFactory; +import org.alliancegenome.es.util.ProcessDisplayHelper; +import org.apache.commons.math3.stat.descriptive.SummaryStatistics; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.xcontent.XContentType; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RoutedBulkIndexer extends Thread { + + private final LinkedBlockingDeque> jsonQueue; + private final String indexName; + private final int shardCount; + private final long maxBulkSizeBytes; + private final int maxRetries; + private final long retryBaseMs; + private final String label; + + private RestHighLevelClient client; + private ProcessDisplayHelper phGlobal; + private ProcessDisplayHelper ph; + + private final SummaryStatistics docStats = new SummaryStatistics(); + private final SummaryStatistics queueStats = new SummaryStatistics(); + private final SummaryStatistics esBatchRequestStats = new SummaryStatistics(); + + private boolean gatherStats = VariantConfigHelper.isGatherStats(); + + private long totalBytes; + private long totalRetries; + private long totalFailedDocs; + + public RoutedBulkIndexer( + LinkedBlockingDeque> jsonQueue, + String indexName, + int shardCount, + int maxRetries, + String label, + ProcessDisplayHelper phGlobal + ) { + this.jsonQueue = jsonQueue; + this.indexName = indexName; + this.shardCount = shardCount; + this.maxBulkSizeBytes = ConfigHelper.getEsBulkSizeMB() * 1024 * 1024; //10MB * the multiplier + this.maxRetries = maxRetries; + this.retryBaseMs = 1000; + this.label = label; + this.phGlobal = phGlobal; + } + + @Override + public void run() { + boolean indexing = VariantConfigHelper.isIndexing(); + if (indexing) { + client = EsClientFactory.getMustCloseSearchClient(); + } + ph = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); + if (gatherStats) { + ph.startProcess(label); + } + + try { + List pendingDocs = new ArrayList<>(); + long pendingBytes = 0; + + while (!Thread.currentThread().isInterrupted()) { + try { + List docs = jsonQueue.poll(1, TimeUnit.SECONDS); + if (docs == null) { + continue; + } + if (gatherStats) { + queueStats.addValue(docs.size()); + } + + for (byte[] smileDoc : docs) { + int docBytes = smileDoc.length; + + if (gatherStats) { + docStats.addValue(docBytes); + } + + if (pendingBytes + docBytes > maxBulkSizeBytes && !pendingDocs.isEmpty()) { + submitBatch(pendingDocs); + pendingDocs = new ArrayList<>(); + pendingBytes = 0; + } + + pendingDocs.add(smileDoc); + pendingBytes += docBytes; + if (gatherStats) { + totalBytes += docBytes; + ph.progressProcess( + "qs: (" + jsonQueue.size() + + ") q: (" + queueStats.getN() + "/" + (int) queueStats.getMean() + + ") d: (" + docStats.getN() + "/" + (int) docStats.getMean() + + ") es: (" + esBatchRequestStats.getN() + "/" + (int) esBatchRequestStats.getMean() + + ") B/r/f: (" + totalBytes + "/" + totalRetries + "/" + totalFailedDocs + ")" + ); + } + phGlobal.progressProcess(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + // Clear interrupt flag so the final bulk request can complete + Thread.interrupted(); + + // Flush remaining + if (!pendingDocs.isEmpty()) { + submitBatch(pendingDocs); + } + + } finally { + if (gatherStats) { + ph.finishProcess(); + logStats(); + } + if (client != null) { + try { + client.close(); + } catch (IOException e) { + log.error(label + " Error closing ES client", e); + } + } + } + } + + private void submitBatch(List docs) { + if (gatherStats) { + esBatchRequestStats.addValue(docs.size()); + } + String routing = Integer.toString(ThreadLocalRandom.current().nextInt(shardCount)); + submitWithRetry(docs, routing, 0); + } + + private void submitWithRetry(List docs, String routing, int attempt) { + if (!VariantConfigHelper.isIndexing()) { + return; + } + BulkRequest bulkRequest = new BulkRequest().timeout(TimeValue.timeValueHours(1)); + for (byte[] smileDoc : docs) { + bulkRequest.add(new IndexRequest(indexName).source(smileDoc, XContentType.SMILE).routing(routing)); + } + + try { + BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); + + if (response.hasFailures()) { + List failedDocs = new ArrayList<>(); + for (BulkItemResponse item : response) { + if (item.isFailed()) { + failedDocs.add(docs.get(item.getItemId())); + } + } + + if (!failedDocs.isEmpty() && attempt < maxRetries) { + totalRetries++; + long sleepMs = retryBaseMs * (1L << Math.min(attempt, 10)); + log.warn(label + " Retrying " + failedDocs.size() + " failed items (attempt " + (attempt + 1) + "), sleeping " + sleepMs + "ms"); + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + submitWithRetry(failedDocs, routing, attempt + 1); + } else if (!failedDocs.isEmpty()) { + totalFailedDocs += failedDocs.size(); + log.error(label + " Failed to index " + failedDocs.size() + " documents after " + maxRetries + " retries"); + log.error(label + " First failure: " + response.getItems()[0].getFailureMessage()); + } + } + + } catch (IOException e) { + if (attempt < maxRetries) { + totalRetries++; + long sleepMs = retryBaseMs * (1L << Math.min(attempt, 10)); + log.warn(label + " Bulk request failed: " + e.getMessage() + ", retrying (attempt " + (attempt + 1) + "), sleeping " + sleepMs + "ms"); + try { + Thread.sleep(sleepMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + submitWithRetry(docs, routing, attempt + 1); + } else { + log.error(label + " Bulk request failed after " + maxRetries + " retries: " + e.getMessage()); + System.exit(-1); + } + } + } + + private void logStats() { + log.info(label + " Doc Stats: " + docStats); + log.info(label + " Queue Stats: " + queueStats); + log.info(label + " ES Batch Stats: " + esBatchRequestStats); + log.info(label + " Total Bytes: " + totalBytes + " Retries: " + totalRetries + " Failed: " + totalFailedDocs); + } +} diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java index ee7bc6c2c..4b0f60140 100644 --- a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java @@ -7,9 +7,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; -import org.alliancegenome.core.config.ConfigHelper; import org.alliancegenome.core.filedownload.model.DownloadSource; -import org.alliancegenome.core.util.StatsCollector; import org.alliancegenome.core.variant.config.VariantConfigHelper; import org.alliancegenome.core.variant.converters.SequenceSummaryConverter; import org.alliancegenome.core.variant.converters.VariantSearchResultConverter; @@ -21,21 +19,9 @@ import org.alliancegenome.es.index.site.cache.GeneDocumentCache; import org.alliancegenome.es.model.VariantSearchResultDocument; import org.alliancegenome.es.rest.RestConfig; -import org.alliancegenome.es.util.EsClientFactory; import org.alliancegenome.es.util.ProcessDisplayHelper; import org.alliancegenome.exceptional.client.ExceptionCatcher; import org.alliancegenome.neo4j.entity.SpeciesType; -import org.elasticsearch.action.bulk.BackoffPolicy; -import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.core.TimeValue; -import org.elasticsearch.xcontent.XContentType; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; @@ -58,65 +44,22 @@ public class SourceDocumentCreation extends Thread { private String[] header; public static String indexName; - private BulkProcessor.Builder builder1; - private BulkProcessor.Builder builder2; - private BulkProcessor.Builder builder3; - private BulkProcessor.Builder builder4; - private BulkProcessor.Builder builder5; - private BulkProcessor.Builder builder6; - private BulkProcessor.Builder builder7; - private BulkProcessor.Builder builder8; - - private BulkProcessor bulkProcessor1; - private BulkProcessor bulkProcessor2; - private BulkProcessor bulkProcessor3; - private BulkProcessor bulkProcessor4; - private BulkProcessor bulkProcessor5; - private BulkProcessor bulkProcessor6; - private BulkProcessor bulkProcessor7; - private BulkProcessor bulkProcessor8; - - // public AlleleRepository repo = new AlleleRepository(); - - private boolean indexing = VariantConfigHelper.isIndexing(); - private boolean gatherStats = VariantConfigHelper.isGatherStats(); - private LinkedBlockingDeque> vcQueue; private LinkedBlockingDeque> objectQueue; - private LinkedBlockingDeque> jsonQueue1; - private LinkedBlockingDeque> jsonQueue2; - private LinkedBlockingDeque> jsonQueue3; - private LinkedBlockingDeque> jsonQueue4; - private LinkedBlockingDeque> jsonQueue5; - private LinkedBlockingDeque> jsonQueue6; - private LinkedBlockingDeque> jsonQueue7; - private LinkedBlockingDeque> jsonQueue8; - - private long[][] jqs = new long[8][3]; // Json Queue Stats + private LinkedBlockingDeque> jsonQueue; private ProcessDisplayHelper ph1 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private ProcessDisplayHelper ph2 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private ProcessDisplayHelper ph3 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private ProcessDisplayHelper ph4 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); - private ProcessDisplayHelper ph5 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private VariantSummaryConverter variantSummaryConverter; private SequenceSummaryConverter sequenceSummaryConverter; private VariantSearchResultConverter variantSearchResultConverter; - private StatsCollector statsCollector = new StatsCollector(); private String messageHeader = ""; - private RestHighLevelClient client1; - private RestHighLevelClient client2; - private RestHighLevelClient client3; - private RestHighLevelClient client4; - private RestHighLevelClient client5; - private RestHighLevelClient client6; - private RestHighLevelClient client7; - private RestHighLevelClient client8; - public SourceDocumentCreation(String downloadPath, DownloadSource source, GeneDocumentCache geneCache, HashSet variantsCache) { this.downloadPath = downloadPath; this.source = source; @@ -128,181 +71,12 @@ public SourceDocumentCreation(String downloadPath, DownloadSource source, GeneDo int objectQueueSize = source.getObjectQueueSize() != null ? source.getObjectQueueSize() : VariantConfigHelper.getSourceDocumentCreatorObjectQueueSize(); vcQueue = new LinkedBlockingDeque<>(vcQueueSize); objectQueue = new LinkedBlockingDeque<>(objectQueueSize); + jsonQueue = new LinkedBlockingDeque<>(250); } @Override public void run() { - jsonQueue1 = new LinkedBlockingDeque<>(250); - jsonQueue2 = new LinkedBlockingDeque<>(250); - jsonQueue3 = new LinkedBlockingDeque<>(250); - jsonQueue4 = new LinkedBlockingDeque<>(250); - jsonQueue5 = new LinkedBlockingDeque<>(250); - jsonQueue6 = new LinkedBlockingDeque<>(250); - jsonQueue7 = new LinkedBlockingDeque<>(250); - jsonQueue8 = new LinkedBlockingDeque<>(250); - - if (indexing) { - client1 = EsClientFactory.getMustCloseSearchClient(); - client2 = EsClientFactory.getMustCloseSearchClient(); - client3 = EsClientFactory.getMustCloseSearchClient(); - client4 = EsClientFactory.getMustCloseSearchClient(); - client5 = EsClientFactory.getMustCloseSearchClient(); - client6 = EsClientFactory.getMustCloseSearchClient(); - client7 = EsClientFactory.getMustCloseSearchClient(); - client8 = EsClientFactory.getMustCloseSearchClient(); - - builder1 = BulkProcessor.builder((request, bulkListener) -> client1.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor1 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder2 = BulkProcessor.builder((request, bulkListener) -> client2.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor2 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder3 = BulkProcessor.builder((request, bulkListener) -> client3.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor3 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder4 = BulkProcessor.builder((request, bulkListener) -> client4.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor4 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder5 = BulkProcessor.builder((request, bulkListener) -> client5.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor4 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder6 = BulkProcessor.builder((request, bulkListener) -> client6.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor4 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder7 = BulkProcessor.builder((request, bulkListener) -> client7.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor4 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - builder8 = BulkProcessor.builder((request, bulkListener) -> client8.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(messageHeader + "BulkProcessor4 Request Failure: " + failure.getMessage()); - failure.printStackTrace(); - System.exit(-1); - } - }); - - int concurrentRequests = source.getBulkProcessorConcurrentRequests() != null ? source.getBulkProcessorConcurrentRequests() : ConfigHelper.getEsBulkConcurrentRequests(); - ByteSizeValue bulkSize = new ByteSizeValue(ConfigHelper.getEsBulkSizeMB(), ByteSizeUnit.MB); - BackoffPolicy backoff = BackoffPolicy.exponentialBackoff(TimeValue.timeValueSeconds(1L), 100); - - bulkProcessor1 = builder1.setBulkActions(7643).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor2 = builder2.setBulkActions(6410).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor3 = builder3.setBulkActions(6045).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor4 = builder4.setBulkActions(6241).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor5 = builder5.setBulkActions(3120).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor6 = builder6.setBulkActions(2181).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor7 = builder7.setBulkActions(1393).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - bulkProcessor8 = builder8.setBulkActions(510).setConcurrentRequests(concurrentRequests).setBulkSize(bulkSize).setBackoffPolicy(backoff).build(); - - } - ph1.startProcess(messageHeader + "VCFReader"); List readers = new ArrayList(); for (String filePath : source.getGenerateFilePaths()) { @@ -328,7 +102,7 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) } List producers = new ArrayList<>(); - ph5.startProcess(messageHeader + "JSONProducers"); + ph3.startProcess(messageHeader + "JSONProducers"); int producerThreadCount = source.getProducerThreads() != null ? source.getProducerThreads() : VariantConfigHelper.getProducerThreads(); for (int i = 0; i < producerThreadCount; i++) { JSONProducer producer = new JSONProducer(); @@ -336,39 +110,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) producers.add(producer); } - ArrayList indexers = new ArrayList<>(); + int shardCount = VariantConfigHelper.getIndexerShards(); - if (!indexing) { - indexName = "no_index"; - } - - ph3.startProcess(messageHeader + "VCFJsonIndexer BulkProcessor"); - ph4.startProcess(messageHeader + "VCFJsonIndexer Buckets"); - for (int i = 0; i < VariantConfigHelper.getIndexerBulkProcessorThreads(); i++) { - VCFJsonBulkIndexer indexer1 = new VCFJsonBulkIndexer(jsonQueue1, bulkProcessor1); - indexer1.start(); - indexers.add(indexer1); - VCFJsonBulkIndexer indexer2 = new VCFJsonBulkIndexer(jsonQueue2, bulkProcessor2); - indexer2.start(); - indexers.add(indexer2); - VCFJsonBulkIndexer indexer3 = new VCFJsonBulkIndexer(jsonQueue3, bulkProcessor3); - indexer3.start(); - indexers.add(indexer3); - VCFJsonBulkIndexer indexer4 = new VCFJsonBulkIndexer(jsonQueue4, bulkProcessor4); - indexer4.start(); - indexers.add(indexer4); - VCFJsonBulkIndexer indexer5 = new VCFJsonBulkIndexer(jsonQueue5, bulkProcessor5); - indexer5.start(); - indexers.add(indexer5); - VCFJsonBulkIndexer indexer6 = new VCFJsonBulkIndexer(jsonQueue6, bulkProcessor6); - indexer6.start(); - indexers.add(indexer6); - VCFJsonBulkIndexer indexer7 = new VCFJsonBulkIndexer(jsonQueue7, bulkProcessor7); - indexer7.start(); - indexers.add(indexer7); - VCFJsonBulkIndexer indexer8 = new VCFJsonBulkIndexer(jsonQueue8, bulkProcessor8); - indexer8.start(); - indexers.add(indexer8); + ph4.startProcess(messageHeader + "RoutedBulkIndexers"); + ArrayList indexers = new ArrayList<>(); + for (int i = 0; i < shardCount * 4; i++) { + RoutedBulkIndexer indexer = new RoutedBulkIndexer(jsonQueue, indexName, shardCount, 100, messageHeader + "BP(" + (i + 1) + ")", ph4); + indexer.start(); + indexers.add(indexer); } try { @@ -407,66 +156,21 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) p.join(); } log.info(messageHeader + "JSONProducers shutdown"); - ph5.finishProcess(); + ph3.finishProcess(); - log.info(messageHeader + "Waiting for jsonQueue to empty"); - while ( - !jsonQueue1.isEmpty() || !jsonQueue2.isEmpty() || !jsonQueue3.isEmpty() || !jsonQueue4.isEmpty() || - !jsonQueue5.isEmpty() || !jsonQueue6.isEmpty() || !jsonQueue7.isEmpty() || !jsonQueue8.isEmpty() - ) { + log.info(messageHeader + "Waiting for jsonQueues to empty"); + while (!jsonQueue.isEmpty()) { Thread.sleep(1000); } - log.info(messageHeader + "Waiting for bulk processors to finish"); - - Thread.sleep(60000); - - log.info(messageHeader + "JSon Queue Empty shuting down bulk indexers"); - for (VCFJsonBulkIndexer indexer : indexers) { + log.info(messageHeader + "Shutting down bulk indexers"); + for (RoutedBulkIndexer indexer : indexers) { indexer.interrupt(); indexer.join(); } - log.info(messageHeader + "Bulk Indexers shutdown"); - ph3.finishProcess(); ph4.finishProcess(); - - - if (gatherStats) { - statsCollector.printOutput(speciesType.getModName()); - } - - if (indexing) { - bulkProcessor1.flush(); - bulkProcessor2.flush(); - bulkProcessor3.flush(); - bulkProcessor4.flush(); - bulkProcessor5.flush(); - bulkProcessor6.flush(); - bulkProcessor7.flush(); - bulkProcessor8.flush(); - - bulkProcessor1.awaitClose(10, TimeUnit.DAYS); - bulkProcessor2.awaitClose(10, TimeUnit.DAYS); - bulkProcessor3.awaitClose(10, TimeUnit.DAYS); - bulkProcessor4.awaitClose(10, TimeUnit.DAYS); - bulkProcessor5.awaitClose(10, TimeUnit.DAYS); - bulkProcessor6.awaitClose(10, TimeUnit.DAYS); - bulkProcessor7.awaitClose(10, TimeUnit.DAYS); - bulkProcessor8.awaitClose(10, TimeUnit.DAYS); - - client1.close(); - client2.close(); - client3.close(); - client4.close(); - client5.close(); - client6.close(); - client7.close(); - client8.close(); - - } - - log.info(messageHeader + "Threads finished: "); - + log.info(messageHeader + "Bulk Indexers shutdown"); + } catch (Exception e) { ExceptionCatcher.report(e); e.printStackTrace(); @@ -596,17 +300,11 @@ public void run() { private class JSONProducer extends Thread { - private ObjectMapper mapper = RestConfig.createObjectMapper(); + private ObjectMapper mapper = RestConfig.createSmileObjectMapper(); private ObjectWriter cachedWriter; private ObjectWriter sequenceWriter; private ObjectWriter searchWriter; - // Welford's online algorithm state for mean, variance, and skewness - private long n; - private double mean; - private double m2; // second central moment (for variance/SD) - private double m3; // third central moment (for skewness) - @Override public void run() { mapper.disable(SerializationFeature.INDENT_OUTPUT); @@ -617,82 +315,27 @@ public void run() { try { List docList = objectQueue.take(); - List docs1 = new ArrayList<>(); - List docs2 = new ArrayList<>(); - List docs3 = new ArrayList<>(); - List docs4 = new ArrayList<>(); - List docs5 = new ArrayList<>(); - List docs6 = new ArrayList<>(); - List docs7 = new ArrayList<>(); - List docs8 = new ArrayList<>(); + List workBucket = new ArrayList<>(); if (!docList.isEmpty()) { for (ESDocument doc : docList) { try { - String jsonDoc = null; + byte[] smileDoc = null; if (doc instanceof SequenceSummaryDocument ssd) { - jsonDoc = sequenceWriter.writeValueAsString(ssd); + //jsonDoc = sequenceWriter.writeValueAsString(ssd); + smileDoc = sequenceWriter.writeValueAsBytes(ssd); } else if (doc instanceof VariantSummaryDocument vsd) { - jsonDoc = cachedWriter.writeValueAsString(vsd); - } else if (doc instanceof VariantSearchResultDocument vsd) { - jsonDoc = searchWriter.writeValueAsString(vsd); + //jsonDoc = cachedWriter.writeValueAsString(vsd); + smileDoc = cachedWriter.writeValueAsBytes(vsd); + } else if (doc instanceof VariantSearchResultDocument vsrd) { + //jsonDoc = searchWriter.writeValueAsString(vsrd); + smileDoc = searchWriter.writeValueAsBytes(vsrd); } else { log.error("Unexpected ESDocument type: " + doc.getClass().getName()); - continue; // This should never happen + continue; } - int len = jsonDoc.length(); - - // Welford's online update for mean, M2, M3 - // This code distributes the document via size over the 8 - // queues so that each bulk processor works with same sized docs - n++; - double delta = len - mean; - double deltaN = delta / n; - double term1 = delta * deltaN * (n - 1); - mean += deltaN; - m3 += term1 * deltaN * (n - 2) - 3 * deltaN * m2; - m2 += term1; - - double sd = n > 1 ? Math.sqrt(m2 / (n - 1)) : 0.0; - double skew = (n > 2 && m2 > 0) ? (Math.sqrt(n) * m3 / Math.pow(m2, 1.5)) : 0.0; - - int lowerWidth = skew != 0.0 ? (int) (sd / skew) : (int) sd; - int upperWidth = (int) sd; - - int t1 = (int) (mean - (1.5 * lowerWidth)); - int t2 = (int) (mean - (1.0 * lowerWidth)); - int t3 = (int) (mean - (0.5 * lowerWidth)); - int t4 = (int) mean; - int t5 = (int) (mean + (0.5 * upperWidth)); - int t6 = (int) (mean + (1.0 * upperWidth)); - int t7 = (int) (mean + (2.0 * upperWidth)); - - if (len < t1) { - docs1.add(jsonDoc); - jqs[0][2] += len; - } else if (len < t2) { - docs2.add(jsonDoc); - jqs[1][2] += len; - } else if (len < t3) { - docs3.add(jsonDoc); - jqs[2][2] += len; - } else if (len < t4) { - docs4.add(jsonDoc); - jqs[3][2] += len; - } else if (len < t5) { - docs5.add(jsonDoc); - jqs[4][2] += len; - } else if (len < t6) { - docs6.add(jsonDoc); - jqs[5][2] += len; - } else if (len < t7) { - docs7.add(jsonDoc); - jqs[6][2] += len; - } else { - docs8.add(jsonDoc); - jqs[7][2] += len; - } + workBucket.add(smileDoc); // Left here for debugging purposes // ph5.progressProcess("M: " + (int) mean + " SD: " + (int) sd + " SK: " + skew @@ -707,7 +350,7 @@ public void run() { // + " jsonQueue8(" + jqs[7][0] + "," + jqs[7][1] + "," + jqs[7][2] + "): " + jsonQueue8.size() // ); - ph5.progressProcess(); + ph3.progressProcess(); } catch (Exception e) { ExceptionCatcher.report(e); @@ -716,48 +359,10 @@ public void run() { } try { - if (docs1.size() > 0) { - jsonQueue1.put(docs1); - jqs[0][0]++; - jqs[0][1] += docs1.size(); - } - if (docs2.size() > 0) { - jsonQueue2.put(docs2); - jqs[1][0]++; - jqs[1][1] += docs2.size(); - } - if (docs3.size() > 0) { - jsonQueue3.put(docs3); - jqs[2][0]++; - jqs[2][1] += docs3.size(); + if (workBucket.size() > 0) { + jsonQueue.put(workBucket); } - if (docs4.size() > 0) { - jsonQueue4.put(docs4); - jqs[3][0]++; - jqs[3][1] += docs4.size(); - } - if (docs5.size() > 0) { - jsonQueue5.put(docs5); - jqs[4][0]++; - jqs[4][1] += docs5.size(); - } - if (docs6.size() > 0) { - jsonQueue6.put(docs6); - jqs[5][0]++; - jqs[5][1] += docs6.size(); - } - if (docs7.size() > 0) { - jsonQueue7.put(docs7); - jqs[6][0]++; - jqs[6][1] += docs7.size(); - } - if (docs8.size() > 0) { - jsonQueue8.put(docs8); - jqs[7][0]++; - jqs[7][1] += docs8.size(); - } - - + } catch (InterruptedException e) { ExceptionCatcher.report(e); e.printStackTrace(); @@ -771,36 +376,4 @@ public void run() { } } - private class VCFJsonBulkIndexer extends Thread { - private LinkedBlockingDeque> jsonQueue; - private BulkProcessor bulkProcessor; - - public VCFJsonBulkIndexer(LinkedBlockingDeque> jsonQueue, BulkProcessor bulkProcessor) { - this.jsonQueue = jsonQueue; - this.bulkProcessor = bulkProcessor; - } - - @Override - public void run() { - while (!(Thread.currentThread().isInterrupted())) { - try { - List docs = jsonQueue.take(); - - for (String doc : docs) { - if (gatherStats) { - statsCollector.addDocument(doc); - } - if (indexing) { - bulkProcessor.add(new IndexRequest(indexName).source(doc, XContentType.JSON)); - } - ph3.progressProcess(); - } - ph4.progressProcess("JSon Queue: " + jsonQueue.size()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - } - } diff --git a/docker/run_elasticsearch b/docker/run_elasticsearch index d85af80ef..973886d70 100755 --- a/docker/run_elasticsearch +++ b/docker/run_elasticsearch @@ -1 +1 @@ -docker run -d --net curation -p 9200:9200 -p 9300:9300 -e "xpack.security.enabled=false" -e "ingest.geoip.downloader.enabled=false" -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xmx8g -Xms8g" --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:7.14.0 +docker run -d --net curation -p 9200:9200 -p 9300:9300 -e "http.max_content_length=1g" -e "xpack.security.enabled=false" -e "ingest.geoip.downloader.enabled=false" -e "discovery.type=single-node" -e "ES_JAVA_OPTS=-Xmx8g -Xms8g" --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:7.14.0 diff --git a/pom.xml b/pom.xml index 904dee1e8..f53dd8f08 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ 3.3.1 10.17.0 UTF-8 - v0.47.25 + v0.47.26