Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.alliancegenome.es.util.ProcessDisplayHelper;
import org.alliancegenome.exceptional.client.ExceptionCatcher;
import org.alliancegenome.indexer.config.IndexerConfig;
import org.alliancegenome.indexer.indexers.curation.interfaces.ElasticSearchInterface;
import org.alliancegenome.es.util.ElasticSearchInterface;

import com.fasterxml.jackson.databind.ObjectMapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ private Constants() {
public static final String VARIANT_INDEXER_SHARDS = "VARIANT_INDEXER_SHARDS";
public static final String VARIANT_INDEXER_BULK_PROCESSOR_THREADS = "VARIANT_INDEXER_BULK_PROCESSOR_THREADS";
public static final String VARIANT_BULK_PROCESSOR_SETTINGS = "VARIANT_BULK_PROCESSOR_SETTINGS";
public static final String VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE = "VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE";

// literature Indexer
public static final String BLUETEAM_ES_URL = "BLUETEAM_ES_URL";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_THREADS;
import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_VCQUEUE_BUCKET_SIZE;
import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_VCQUEUE_SIZE;
import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE;
import static org.alliancegenome.core.config.Constants.VARIANT_TRANSFORMER_THREADS;

import java.util.HashMap;
Expand Down Expand Up @@ -82,8 +83,8 @@ public static void init() {
defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_VCQUEUE_BUCKET_SIZE, "25");
defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_OBJECT_QUEUE_SIZE, "125");
defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_OBJECT_QUEUE_BUCKET_SIZE, "120");

defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE, String.valueOf(1 * 1024 * 1024));

defaults.put(VARIANT_PRODUCER_THREADS, "32");
defaults.put(VARIANT_TRANSFORMER_THREADS, "12");

Expand Down Expand Up @@ -335,6 +336,17 @@ public static int getSourceDocumentCreatorObjectQueueBucketSize() {
}
}

public static long getSourceDocumentCreatorJsonQueueBucketSize() {
if (!init) {
init();
}
try {
return Long.parseLong(config.get(VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE));
} catch (NumberFormatException e) {
return 1024 * 1024;
}
}

public static int getIndexerBulkProcessorThreads() {
if (!init) {
init();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.alliancegenome.indexer.indexers.curation.interfaces;
package org.alliancegenome.es.util;

import java.util.Map;

Expand All @@ -21,4 +21,8 @@ public interface ElasticSearchInterface {
@GET
@Path("/{index}/_count")
Map<String, Object> count(@PathParam("index") String index);

@GET
@Path("/_nodes/os")
Map<String, Object> getNodesOs();
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,27 +54,26 @@ public class SourceDocumentCreation extends Thread {
private ProcessDisplayHelper ph1 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());
private ProcessDisplayHelper ph2 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());
private ProcessDisplayHelper ph3 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());
private ProcessDisplayHelper ph4 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());

private VariantSummaryConverter variantSummaryConverter;
private SequenceSummaryConverter sequenceSummaryConverter;
private VariantSearchResultConverter variantSearchResultConverter;

private String messageHeader = "";

public SourceDocumentCreation(String downloadPath, DownloadSource source, GeneDocumentCache geneCache, HashSet<String> variantsCache, Map<String, Integer> severityRanking) {
public SourceDocumentCreation(String downloadPath, DownloadSource source, GeneDocumentCache geneCache, HashSet<String> variantsCache, Map<String, Integer> severityRanking, LinkedBlockingDeque<List<byte[]>> jsonQueue) {
this.downloadPath = downloadPath;
this.source = source;
this.geneCache = geneCache;
this.variantsCache = variantsCache;
this.severityRanking = severityRanking;
this.jsonQueue = jsonQueue;
speciesType = SpeciesType.getTypeByID(source.getTaxonId());
messageHeader = speciesType.getModName() + " ";
int vcQueueSize = source.getVcQueueSize() != null ? source.getVcQueueSize() : VariantConfigHelper.getSourceDocumentCreatorVCQueueSize();
int objectQueueSize = source.getObjectQueueSize() != null ? source.getObjectQueueSize() : VariantConfigHelper.getSourceDocumentCreatorObjectQueueSize();
vcQueue = new LinkedBlockingDeque<>(vcQueueSize);
objectQueue = new LinkedBlockingDeque<>(objectQueueSize);
jsonQueue = new LinkedBlockingDeque<>(250);
}

@Override
Expand Down Expand Up @@ -113,16 +112,6 @@ public void run() {
producers.add(producer);
}

int shardCount = VariantConfigHelper.getIndexerShards();

ph4.startProcess(messageHeader + "RoutedBulkIndexers");
ArrayList<RoutedBulkIndexer> indexers = new ArrayList<>();
for (int i = 0; i < shardCount * 4; i++) {
RoutedBulkIndexer indexer = new RoutedBulkIndexer(jsonQueue, indexName, messageHeader + "BP(" + (i + 1) + ")", ph4);
indexer.start();
indexers.add(indexer);
}

try {

log.info(messageHeader + "Waiting for VCFReader's to finish");
Expand Down Expand Up @@ -161,25 +150,10 @@ public void run() {
log.info(messageHeader + "JSONProducers shutdown");
ph3.finishProcess();

log.info(messageHeader + "Waiting for jsonQueues to empty");
while (!jsonQueue.isEmpty()) {
Thread.sleep(1000);
}

log.info(messageHeader + "Shutting down bulk indexers");
for (RoutedBulkIndexer indexer : indexers) {
indexer.interrupt();
indexer.join();
}
ph4.finishProcess();
log.info(messageHeader + "Bulk Indexers shutdown");

} catch (Exception e) {
ExceptionCatcher.report(e);
e.printStackTrace();
}

log.info(messageHeader + "Bulk Processors finished");
}

private class VCFReader extends Thread {
Expand Down Expand Up @@ -303,6 +277,8 @@ public void run() {

private class JSONProducer extends Thread {

private final long workBucketMaxBytes = VariantConfigHelper.getSourceDocumentCreatorJsonQueueBucketSize();

private ObjectMapper mapper = RestConfig.createSmileObjectMapper();
private ObjectWriter cachedWriter;
private ObjectWriter sequenceWriter;
Expand All @@ -314,67 +290,58 @@ public void run() {
searchWriter = mapper.writerWithView(CurationView.VariantSearchResultDocument.class);
cachedWriter = mapper.writerWithView(CurationView.VariantSummaryDocument.class);
sequenceWriter = mapper.writerWithView(CurationView.SequenceSummaryDocument.class);

List<byte[]> workBucket = new ArrayList<>();
long workBucketBytes = 0;

while (!(Thread.currentThread().isInterrupted())) {
try {
List<ESDocument> docList = objectQueue.take();

List<byte[]> workBucket = new ArrayList<>();

if (!docList.isEmpty()) {
for (ESDocument doc : docList) {
try {
byte[] smileDoc = null;
if (doc instanceof SequenceSummaryDocument ssd) {
//jsonDoc = sequenceWriter.writeValueAsString(ssd);
smileDoc = sequenceWriter.writeValueAsBytes(ssd);
} else if (doc instanceof VariantSummaryDocument vsd) {
//jsonDoc = cachedWriter.writeValueAsString(vsd);
smileDoc = cachedWriter.writeValueAsBytes(vsd);
} else if (doc instanceof VariantSearchResultDocument vsrd) {
//jsonDoc = searchWriter.writeValueAsString(vsrd);
smileDoc = searchWriter.writeValueAsBytes(vsrd);
} else {
log.error("Unexpected ESDocument type: " + doc.getClass().getName());
continue;
}

workBucket.add(smileDoc);
workBucketBytes += smileDoc.length;

if (workBucketBytes >= workBucketMaxBytes) {
jsonQueue.put(workBucket);
workBucket = new ArrayList<>();
workBucketBytes = 0;
}

// Left here for debugging purposes
// ph5.progressProcess("M: " + (int) mean + " SD: " + (int) sd + " SK: " + skew
// //+ " lw: " + lowerWidth + " uw: " + upperWidth + " t1: " + t1 + " t2: " + t2 + " t3: " + t3 + " t4: " + t4 + " t5: " + t5 + " t6: " + t6 + " t7: " + t7
// + " jsonQueue1(" + jqs[0][0] + "," + jqs[0][1] + "," + jqs[0][2] + "): " + jsonQueue1.size()
// + " jsonQueue2(" + jqs[1][0] + "," + jqs[1][1] + "," + jqs[1][2] + "): " + jsonQueue2.size()
// + " jsonQueue3(" + jqs[2][0] + "," + jqs[2][1] + "," + jqs[2][2] + "): " + jsonQueue3.size()
// + " jsonQueue4(" + jqs[3][0] + "," + jqs[3][1] + "," + jqs[3][2] + "): " + jsonQueue4.size()
// + " jsonQueue5(" + jqs[4][0] + "," + jqs[4][1] + "," + jqs[4][2] + "): " + jsonQueue5.size()
// + " jsonQueue6(" + jqs[5][0] + "," + jqs[5][1] + "," + jqs[5][2] + "): " + jsonQueue6.size()
// + " jsonQueue7(" + jqs[6][0] + "," + jqs[6][1] + "," + jqs[6][2] + "): " + jsonQueue7.size()
// + " jsonQueue8(" + jqs[7][0] + "," + jqs[7][1] + "," + jqs[7][2] + "): " + jsonQueue8.size()
// );

ph3.progressProcess();

} catch (Exception e) {
ExceptionCatcher.report(e);
e.printStackTrace();
}
}

try {
if (workBucket.size() > 0) {
jsonQueue.put(workBucket);
}

} catch (InterruptedException e) {
ExceptionCatcher.report(e);
e.printStackTrace();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

// Flush remaining docs
try {
if (!workBucket.isEmpty()) {
jsonQueue.put(workBucket);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;

import org.alliancegenome.core.config.ConfigHelper;
import org.alliancegenome.core.filedownload.model.DownloadFileSet;
import org.alliancegenome.core.filedownload.model.DownloadSource;
import org.alliancegenome.core.variant.config.VariantConfigHelper;
import org.alliancegenome.curation_api.interfaces.crud.ontology.SoTermCrudInterface;
import org.alliancegenome.curation_api.interfaces.document.VariantDocumentInterface;
import org.alliancegenome.es.index.site.cache.GeneDocumentCache;
import org.alliancegenome.es.rest.RestConfig;
import org.alliancegenome.es.util.ElasticSearchInterface;
import org.alliancegenome.es.util.ProcessDisplayHelper;
import org.alliancegenome.exceptional.client.ExceptionCatcher;
import org.alliancegenome.neo4j.repository.indexer.GeneIndexerRepository;

Expand Down Expand Up @@ -62,17 +66,56 @@ public void run() {
log.info("Fetching SO term severity ranking from curation API...");
Map<String, Integer> severityRanking = soTermApi.getSeverityRanking();
log.info("Fetched severity ranking for {} SO terms", severityRanking.size());

// Count active species and create shared jsonQueue
long activeCount = downloadSet.getDownloadFileSources().stream().filter(DownloadSource::getActive).count();
LinkedBlockingDeque<List<byte[]>> jsonQueue = new LinkedBlockingDeque<>((int) (250 * activeCount));

// Query cluster CPU count and create shared bulk indexer pool
int totalCpus = getClusterCpuCount();
int poolSize = totalCpus * 2;
log.info("ES cluster total CPUs: {}, RoutedBulkIndexer pool size: {}, active species: {}, jsonQueue capacity: {}", totalCpus, poolSize, activeCount, 250 * activeCount);

// Start shared RoutedBulkIndexer pool
ProcessDisplayHelper phPool = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());
phPool.startProcess("SharedRoutedBulkIndexers");
ArrayList<RoutedBulkIndexer> indexers = new ArrayList<>();
for (int i = 0; i < poolSize; i++) {
RoutedBulkIndexer indexer = new RoutedBulkIndexer(jsonQueue, SourceDocumentCreation.indexName, "SharedBP(" + (i + 1) + ")", phPool);
indexer.start();
indexers.add(indexer);
}

// Build and start active species creators
List<SourceDocumentCreation> creators = new ArrayList<>();
for (DownloadSource source : downloadSet.getDownloadFileSources()) {
if (source.getActive()) {
SourceDocumentCreation creator = new SourceDocumentCreation(downloadSet.getDownloadPath(), source, geneCache, variantsCache, severityRanking);
SourceDocumentCreation creator = new SourceDocumentCreation(downloadSet.getDownloadPath(), source, geneCache, variantsCache, severityRanking, jsonQueue);
creator.start();
creators.add(creator);
}
}

// Wait for all species threads to complete
for (SourceDocumentCreation creator : creators) {
creator.join();
}

// Wait for shared jsonQueue to drain
log.info("All species threads finished, waiting for jsonQueue to drain");
while (!jsonQueue.isEmpty()) {
Thread.sleep(1000);
}

// Shut down shared bulk indexer pool
log.info("Shutting down shared RoutedBulkIndexer pool");
for (RoutedBulkIndexer indexer : indexers) {
indexer.interrupt();
indexer.join();
}
phPool.finishProcess();
log.info("Shared RoutedBulkIndexer pool shutdown");

log.info("SourceDocumentCreationManager all species finished");

} catch (Exception e) {
Expand All @@ -81,4 +124,38 @@ public void run() {
System.exit(-1);
}
}

private int getClusterCpuCount() {
int defaultCpus = VariantConfigHelper.getIndexerShards() / 2;
try {
String firstHost = ConfigHelper.getEsHost().split(",")[0];
String esHost;
String esPort;
if (firstHost.contains(":")) {
esHost = firstHost.split(":")[0];
esPort = firstHost.split(":")[1];
} else {
esHost = firstHost;
esPort = String.valueOf(ConfigHelper.getEsPort());
}
String esUrl = "http://" + esHost + ":" + esPort;
ElasticSearchInterface esApi = RestProxyFactory.createProxy(ElasticSearchInterface.class, esUrl);
Map<String, Object> response = esApi.getNodesOs();
Map<String, Object> nodes = (Map<String, Object>) response.get("nodes");
int totalCpus = 0;
for (Object nodeObj : nodes.values()) {
Map<String, Object> node = (Map<String, Object>) nodeObj;
Map<String, Object> os = (Map<String, Object>) node.get("os");
if (os != null && os.containsKey("available_processors")) {
totalCpus += ((Number) os.get("available_processors")).intValue();
}
}
if (totalCpus > 0) {
return totalCpus;
}
} catch (Exception e) {
log.warn("Failed to query ES cluster CPU count, using default: {}", defaultCpus, e);
}
return defaultCpus;
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<maven-checkstyle-plugin.version>3.3.1</maven-checkstyle-plugin.version>
<checkstyle.version>10.17.0</checkstyle.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<curation.version>v0.47.32</curation.version>
<curation.version>v0.47.33</curation.version>
</properties>
<dependencyManagement>
<dependencies>
Expand Down
Loading