diff --git a/agr_indexer/src/main/java/org/alliancegenome/indexer/indexers/LiteratureIndexer.java b/agr_indexer/src/main/java/org/alliancegenome/indexer/indexers/LiteratureIndexer.java index 80c348a03..b85b0e467 100644 --- a/agr_indexer/src/main/java/org/alliancegenome/indexer/indexers/LiteratureIndexer.java +++ b/agr_indexer/src/main/java/org/alliancegenome/indexer/indexers/LiteratureIndexer.java @@ -10,7 +10,7 @@ import org.alliancegenome.es.util.ProcessDisplayHelper; import org.alliancegenome.exceptional.client.ExceptionCatcher; import org.alliancegenome.indexer.config.IndexerConfig; -import org.alliancegenome.indexer.indexers.curation.interfaces.ElasticSearchInterface; +import org.alliancegenome.es.util.ElasticSearchInterface; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/agr_java_core/src/main/java/org/alliancegenome/core/config/Constants.java b/agr_java_core/src/main/java/org/alliancegenome/core/config/Constants.java index 983c316ef..a7f46ac4b 100644 --- a/agr_java_core/src/main/java/org/alliancegenome/core/config/Constants.java +++ b/agr_java_core/src/main/java/org/alliancegenome/core/config/Constants.java @@ -81,6 +81,7 @@ private Constants() { public static final String VARIANT_INDEXER_SHARDS = "VARIANT_INDEXER_SHARDS"; public static final String VARIANT_INDEXER_BULK_PROCESSOR_THREADS = "VARIANT_INDEXER_BULK_PROCESSOR_THREADS"; public static final String VARIANT_BULK_PROCESSOR_SETTINGS = "VARIANT_BULK_PROCESSOR_SETTINGS"; + public static final String VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE = "VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE"; // literature Indexer public static final String BLUETEAM_ES_URL = "BLUETEAM_ES_URL"; diff --git a/agr_java_core/src/main/java/org/alliancegenome/core/variant/config/VariantConfigHelper.java b/agr_java_core/src/main/java/org/alliancegenome/core/variant/config/VariantConfigHelper.java index f4dda0f78..08f0789e5 100644 --- a/agr_java_core/src/main/java/org/alliancegenome/core/variant/config/VariantConfigHelper.java +++ b/agr_java_core/src/main/java/org/alliancegenome/core/variant/config/VariantConfigHelper.java @@ -21,6 +21,7 @@ import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_THREADS; import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_VCQUEUE_BUCKET_SIZE; import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_VCQUEUE_SIZE; +import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE; import static org.alliancegenome.core.config.Constants.VARIANT_TRANSFORMER_THREADS; import java.util.HashMap; @@ -82,8 +83,8 @@ public static void init() { defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_VCQUEUE_BUCKET_SIZE, "25"); defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_OBJECT_QUEUE_SIZE, "125"); defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_OBJECT_QUEUE_BUCKET_SIZE, "120"); - - + defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE, String.valueOf(1 * 1024 * 1024)); + defaults.put(VARIANT_PRODUCER_THREADS, "32"); defaults.put(VARIANT_TRANSFORMER_THREADS, "12"); @@ -335,6 +336,17 @@ public static int getSourceDocumentCreatorObjectQueueBucketSize() { } } + public static long getSourceDocumentCreatorJsonQueueBucketSize() { + if (!init) { + init(); + } + try { + return Long.parseLong(config.get(VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE)); + } catch (NumberFormatException e) { + return 1024 * 1024; + } + } + public static int getIndexerBulkProcessorThreads() { if (!init) { init(); diff --git a/agr_indexer/src/main/java/org/alliancegenome/indexer/indexers/curation/interfaces/ElasticSearchInterface.java b/agr_java_core/src/main/java/org/alliancegenome/es/util/ElasticSearchInterface.java similarity index 86% rename from agr_indexer/src/main/java/org/alliancegenome/indexer/indexers/curation/interfaces/ElasticSearchInterface.java rename to agr_java_core/src/main/java/org/alliancegenome/es/util/ElasticSearchInterface.java index 427a673ee..52663e98b 100644 --- a/agr_indexer/src/main/java/org/alliancegenome/indexer/indexers/curation/interfaces/ElasticSearchInterface.java +++ b/agr_java_core/src/main/java/org/alliancegenome/es/util/ElasticSearchInterface.java @@ -1,4 +1,4 @@ -package org.alliancegenome.indexer.indexers.curation.interfaces; +package org.alliancegenome.es.util; import java.util.Map; @@ -21,4 +21,8 @@ public interface ElasticSearchInterface { @GET @Path("/{index}/_count") Map count(@PathParam("index") String index); + + @GET + @Path("/_nodes/os") + Map getNodesOs(); } diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java index e1e94a01a..a5557fe68 100644 --- a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java @@ -54,7 +54,6 @@ public class SourceDocumentCreation extends Thread { private ProcessDisplayHelper ph1 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private ProcessDisplayHelper ph2 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private ProcessDisplayHelper ph3 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); - private ProcessDisplayHelper ph4 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); private VariantSummaryConverter variantSummaryConverter; private SequenceSummaryConverter sequenceSummaryConverter; @@ -62,19 +61,19 @@ public class SourceDocumentCreation extends Thread { private String messageHeader = ""; - public SourceDocumentCreation(String downloadPath, DownloadSource source, GeneDocumentCache geneCache, HashSet variantsCache, Map severityRanking) { + public SourceDocumentCreation(String downloadPath, DownloadSource source, GeneDocumentCache geneCache, HashSet variantsCache, Map severityRanking, LinkedBlockingDeque> jsonQueue) { this.downloadPath = downloadPath; this.source = source; this.geneCache = geneCache; this.variantsCache = variantsCache; this.severityRanking = severityRanking; + this.jsonQueue = jsonQueue; speciesType = SpeciesType.getTypeByID(source.getTaxonId()); messageHeader = speciesType.getModName() + " "; int vcQueueSize = source.getVcQueueSize() != null ? source.getVcQueueSize() : VariantConfigHelper.getSourceDocumentCreatorVCQueueSize(); int objectQueueSize = source.getObjectQueueSize() != null ? source.getObjectQueueSize() : VariantConfigHelper.getSourceDocumentCreatorObjectQueueSize(); vcQueue = new LinkedBlockingDeque<>(vcQueueSize); objectQueue = new LinkedBlockingDeque<>(objectQueueSize); - jsonQueue = new LinkedBlockingDeque<>(250); } @Override @@ -113,16 +112,6 @@ public void run() { producers.add(producer); } - int shardCount = VariantConfigHelper.getIndexerShards(); - - ph4.startProcess(messageHeader + "RoutedBulkIndexers"); - ArrayList indexers = new ArrayList<>(); - for (int i = 0; i < shardCount * 4; i++) { - RoutedBulkIndexer indexer = new RoutedBulkIndexer(jsonQueue, indexName, messageHeader + "BP(" + (i + 1) + ")", ph4); - indexer.start(); - indexers.add(indexer); - } - try { log.info(messageHeader + "Waiting for VCFReader's to finish"); @@ -161,25 +150,10 @@ public void run() { log.info(messageHeader + "JSONProducers shutdown"); ph3.finishProcess(); - log.info(messageHeader + "Waiting for jsonQueues to empty"); - while (!jsonQueue.isEmpty()) { - Thread.sleep(1000); - } - - log.info(messageHeader + "Shutting down bulk indexers"); - for (RoutedBulkIndexer indexer : indexers) { - indexer.interrupt(); - indexer.join(); - } - ph4.finishProcess(); - log.info(messageHeader + "Bulk Indexers shutdown"); - } catch (Exception e) { ExceptionCatcher.report(e); e.printStackTrace(); } - - log.info(messageHeader + "Bulk Processors finished"); } private class VCFReader extends Thread { @@ -303,6 +277,8 @@ public void run() { private class JSONProducer extends Thread { + private final long workBucketMaxBytes = VariantConfigHelper.getSourceDocumentCreatorJsonQueueBucketSize(); + private ObjectMapper mapper = RestConfig.createSmileObjectMapper(); private ObjectWriter cachedWriter; private ObjectWriter sequenceWriter; @@ -314,24 +290,23 @@ public void run() { searchWriter = mapper.writerWithView(CurationView.VariantSearchResultDocument.class); cachedWriter = mapper.writerWithView(CurationView.VariantSummaryDocument.class); sequenceWriter = mapper.writerWithView(CurationView.SequenceSummaryDocument.class); + + List workBucket = new ArrayList<>(); + long workBucketBytes = 0; + while (!(Thread.currentThread().isInterrupted())) { try { List docList = objectQueue.take(); - List workBucket = new ArrayList<>(); - if (!docList.isEmpty()) { for (ESDocument doc : docList) { try { byte[] smileDoc = null; if (doc instanceof SequenceSummaryDocument ssd) { - //jsonDoc = sequenceWriter.writeValueAsString(ssd); smileDoc = sequenceWriter.writeValueAsBytes(ssd); } else if (doc instanceof VariantSummaryDocument vsd) { - //jsonDoc = cachedWriter.writeValueAsString(vsd); smileDoc = cachedWriter.writeValueAsBytes(vsd); } else if (doc instanceof VariantSearchResultDocument vsrd) { - //jsonDoc = searchWriter.writeValueAsString(vsrd); smileDoc = searchWriter.writeValueAsBytes(vsrd); } else { log.error("Unexpected ESDocument type: " + doc.getClass().getName()); @@ -339,20 +314,14 @@ public void run() { } workBucket.add(smileDoc); + workBucketBytes += smileDoc.length; + + if (workBucketBytes >= workBucketMaxBytes) { + jsonQueue.put(workBucket); + workBucket = new ArrayList<>(); + workBucketBytes = 0; + } - // Left here for debugging purposes -// ph5.progressProcess("M: " + (int) mean + " SD: " + (int) sd + " SK: " + skew -// //+ " lw: " + lowerWidth + " uw: " + upperWidth + " t1: " + t1 + " t2: " + t2 + " t3: " + t3 + " t4: " + t4 + " t5: " + t5 + " t6: " + t6 + " t7: " + t7 -// + " jsonQueue1(" + jqs[0][0] + "," + jqs[0][1] + "," + jqs[0][2] + "): " + jsonQueue1.size() -// + " jsonQueue2(" + jqs[1][0] + "," + jqs[1][1] + "," + jqs[1][2] + "): " + jsonQueue2.size() -// + " jsonQueue3(" + jqs[2][0] + "," + jqs[2][1] + "," + jqs[2][2] + "): " + jsonQueue3.size() -// + " jsonQueue4(" + jqs[3][0] + "," + jqs[3][1] + "," + jqs[3][2] + "): " + jsonQueue4.size() -// + " jsonQueue5(" + jqs[4][0] + "," + jqs[4][1] + "," + jqs[4][2] + "): " + jsonQueue5.size() -// + " jsonQueue6(" + jqs[5][0] + "," + jqs[5][1] + "," + jqs[5][2] + "): " + jsonQueue6.size() -// + " jsonQueue7(" + jqs[6][0] + "," + jqs[6][1] + "," + jqs[6][2] + "): " + jsonQueue7.size() -// + " jsonQueue8(" + jqs[7][0] + "," + jqs[7][1] + "," + jqs[7][2] + "): " + jsonQueue8.size() -// ); - ph3.progressProcess(); } catch (Exception e) { @@ -360,21 +329,19 @@ public void run() { e.printStackTrace(); } } - - try { - if (workBucket.size() > 0) { - jsonQueue.put(workBucket); - } - - } catch (InterruptedException e) { - ExceptionCatcher.report(e); - e.printStackTrace(); - } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } + } + // Flush remaining docs + try { + if (!workBucket.isEmpty()) { + jsonQueue.put(workBucket); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } } diff --git a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreationManager.java b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreationManager.java index c4986b4f7..792911f75 100644 --- a/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreationManager.java +++ b/agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreationManager.java @@ -5,14 +5,18 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.LinkedBlockingDeque; import org.alliancegenome.core.config.ConfigHelper; import org.alliancegenome.core.filedownload.model.DownloadFileSet; import org.alliancegenome.core.filedownload.model.DownloadSource; +import org.alliancegenome.core.variant.config.VariantConfigHelper; import org.alliancegenome.curation_api.interfaces.crud.ontology.SoTermCrudInterface; import org.alliancegenome.curation_api.interfaces.document.VariantDocumentInterface; import org.alliancegenome.es.index.site.cache.GeneDocumentCache; import org.alliancegenome.es.rest.RestConfig; +import org.alliancegenome.es.util.ElasticSearchInterface; +import org.alliancegenome.es.util.ProcessDisplayHelper; import org.alliancegenome.exceptional.client.ExceptionCatcher; import org.alliancegenome.neo4j.repository.indexer.GeneIndexerRepository; @@ -62,17 +66,56 @@ public void run() { log.info("Fetching SO term severity ranking from curation API..."); Map severityRanking = soTermApi.getSeverityRanking(); log.info("Fetched severity ranking for {} SO terms", severityRanking.size()); + + // Count active species and create shared jsonQueue + long activeCount = downloadSet.getDownloadFileSources().stream().filter(DownloadSource::getActive).count(); + LinkedBlockingDeque> jsonQueue = new LinkedBlockingDeque<>((int) (250 * activeCount)); + + // Query cluster CPU count and create shared bulk indexer pool + int totalCpus = getClusterCpuCount(); + int poolSize = totalCpus * 2; + log.info("ES cluster total CPUs: {}, RoutedBulkIndexer pool size: {}, active species: {}, jsonQueue capacity: {}", totalCpus, poolSize, activeCount, 250 * activeCount); + + // Start shared RoutedBulkIndexer pool + ProcessDisplayHelper phPool = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval()); + phPool.startProcess("SharedRoutedBulkIndexers"); + ArrayList indexers = new ArrayList<>(); + for (int i = 0; i < poolSize; i++) { + RoutedBulkIndexer indexer = new RoutedBulkIndexer(jsonQueue, SourceDocumentCreation.indexName, "SharedBP(" + (i + 1) + ")", phPool); + indexer.start(); + indexers.add(indexer); + } + + // Build and start active species creators List creators = new ArrayList<>(); for (DownloadSource source : downloadSet.getDownloadFileSources()) { if (source.getActive()) { - SourceDocumentCreation creator = new SourceDocumentCreation(downloadSet.getDownloadPath(), source, geneCache, variantsCache, severityRanking); + SourceDocumentCreation creator = new SourceDocumentCreation(downloadSet.getDownloadPath(), source, geneCache, variantsCache, severityRanking, jsonQueue); creator.start(); creators.add(creator); } } + + // Wait for all species threads to complete for (SourceDocumentCreation creator : creators) { creator.join(); } + + // Wait for shared jsonQueue to drain + log.info("All species threads finished, waiting for jsonQueue to drain"); + while (!jsonQueue.isEmpty()) { + Thread.sleep(1000); + } + + // Shut down shared bulk indexer pool + log.info("Shutting down shared RoutedBulkIndexer pool"); + for (RoutedBulkIndexer indexer : indexers) { + indexer.interrupt(); + indexer.join(); + } + phPool.finishProcess(); + log.info("Shared RoutedBulkIndexer pool shutdown"); + log.info("SourceDocumentCreationManager all species finished"); } catch (Exception e) { @@ -81,4 +124,38 @@ public void run() { System.exit(-1); } } + + private int getClusterCpuCount() { + int defaultCpus = VariantConfigHelper.getIndexerShards() / 2; + try { + String firstHost = ConfigHelper.getEsHost().split(",")[0]; + String esHost; + String esPort; + if (firstHost.contains(":")) { + esHost = firstHost.split(":")[0]; + esPort = firstHost.split(":")[1]; + } else { + esHost = firstHost; + esPort = String.valueOf(ConfigHelper.getEsPort()); + } + String esUrl = "http://" + esHost + ":" + esPort; + ElasticSearchInterface esApi = RestProxyFactory.createProxy(ElasticSearchInterface.class, esUrl); + Map response = esApi.getNodesOs(); + Map nodes = (Map) response.get("nodes"); + int totalCpus = 0; + for (Object nodeObj : nodes.values()) { + Map node = (Map) nodeObj; + Map os = (Map) node.get("os"); + if (os != null && os.containsKey("available_processors")) { + totalCpus += ((Number) os.get("available_processors")).intValue(); + } + } + if (totalCpus > 0) { + return totalCpus; + } + } catch (Exception e) { + log.warn("Failed to query ES cluster CPU count, using default: {}", defaultCpus, e); + } + return defaultCpus; + } } diff --git a/pom.xml b/pom.xml index b941dda21..fe3502d0d 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ 3.3.1 10.17.0 UTF-8 - v0.47.32 + v0.47.33