Skip to content

Commit 0091518

Browse files
authored
Merge pull request #1555 from alliance-genome/variant_indexer_improvements
refactor: shared bulk processor pool sized by cluster CPU count
2 parents 4fcede6 + 1d3a7a4 commit 0091518

File tree

7 files changed

+123
-62
lines changed

7 files changed

+123
-62
lines changed

agr_indexer/src/main/java/org/alliancegenome/indexer/indexers/LiteratureIndexer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import org.alliancegenome.es.util.ProcessDisplayHelper;
1111
import org.alliancegenome.exceptional.client.ExceptionCatcher;
1212
import org.alliancegenome.indexer.config.IndexerConfig;
13-
import org.alliancegenome.indexer.indexers.curation.interfaces.ElasticSearchInterface;
13+
import org.alliancegenome.es.util.ElasticSearchInterface;
1414

1515
import com.fasterxml.jackson.databind.ObjectMapper;
1616

agr_java_core/src/main/java/org/alliancegenome/core/config/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ private Constants() {
8181
public static final String VARIANT_INDEXER_SHARDS = "VARIANT_INDEXER_SHARDS";
8282
public static final String VARIANT_INDEXER_BULK_PROCESSOR_THREADS = "VARIANT_INDEXER_BULK_PROCESSOR_THREADS";
8383
public static final String VARIANT_BULK_PROCESSOR_SETTINGS = "VARIANT_BULK_PROCESSOR_SETTINGS";
84+
public static final String VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE = "VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE";
8485

8586
// literature Indexer
8687
public static final String BLUETEAM_ES_URL = "BLUETEAM_ES_URL";

agr_java_core/src/main/java/org/alliancegenome/core/variant/config/VariantConfigHelper.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_THREADS;
2222
import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_VCQUEUE_BUCKET_SIZE;
2323
import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_VCQUEUE_SIZE;
24+
import static org.alliancegenome.core.config.Constants.VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE;
2425
import static org.alliancegenome.core.config.Constants.VARIANT_TRANSFORMER_THREADS;
2526

2627
import java.util.HashMap;
@@ -82,8 +83,8 @@ public static void init() {
8283
defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_VCQUEUE_BUCKET_SIZE, "25");
8384
defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_OBJECT_QUEUE_SIZE, "125");
8485
defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_OBJECT_QUEUE_BUCKET_SIZE, "120");
85-
86-
86+
defaults.put(VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE, String.valueOf(1 * 1024 * 1024));
87+
8788
defaults.put(VARIANT_PRODUCER_THREADS, "32");
8889
defaults.put(VARIANT_TRANSFORMER_THREADS, "12");
8990

@@ -335,6 +336,17 @@ public static int getSourceDocumentCreatorObjectQueueBucketSize() {
335336
}
336337
}
337338

339+
public static long getSourceDocumentCreatorJsonQueueBucketSize() {
340+
if (!init) {
341+
init();
342+
}
343+
try {
344+
return Long.parseLong(config.get(VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE));
345+
} catch (NumberFormatException e) {
346+
return 1024 * 1024;
347+
}
348+
}
349+
338350
public static int getIndexerBulkProcessorThreads() {
339351
if (!init) {
340352
init();

agr_indexer/src/main/java/org/alliancegenome/indexer/indexers/curation/interfaces/ElasticSearchInterface.java renamed to agr_java_core/src/main/java/org/alliancegenome/es/util/ElasticSearchInterface.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.alliancegenome.indexer.indexers.curation.interfaces;
1+
package org.alliancegenome.es.util;
22

33
import java.util.Map;
44

@@ -21,4 +21,8 @@ public interface ElasticSearchInterface {
2121
@GET
2222
@Path("/{index}/_count")
2323
Map<String, Object> count(@PathParam("index") String index);
24+
25+
@GET
26+
@Path("/_nodes/os")
27+
Map<String, Object> getNodesOs();
2428
}

agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreation.java

Lines changed: 23 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -54,27 +54,26 @@ public class SourceDocumentCreation extends Thread {
5454
private ProcessDisplayHelper ph1 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());
5555
private ProcessDisplayHelper ph2 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());
5656
private ProcessDisplayHelper ph3 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());
57-
private ProcessDisplayHelper ph4 = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());
5857

5958
private VariantSummaryConverter variantSummaryConverter;
6059
private SequenceSummaryConverter sequenceSummaryConverter;
6160
private VariantSearchResultConverter variantSearchResultConverter;
6261

6362
private String messageHeader = "";
6463

65-
public SourceDocumentCreation(String downloadPath, DownloadSource source, GeneDocumentCache geneCache, HashSet<String> variantsCache, Map<String, Integer> severityRanking) {
64+
public SourceDocumentCreation(String downloadPath, DownloadSource source, GeneDocumentCache geneCache, HashSet<String> variantsCache, Map<String, Integer> severityRanking, LinkedBlockingDeque<List<byte[]>> jsonQueue) {
6665
this.downloadPath = downloadPath;
6766
this.source = source;
6867
this.geneCache = geneCache;
6968
this.variantsCache = variantsCache;
7069
this.severityRanking = severityRanking;
70+
this.jsonQueue = jsonQueue;
7171
speciesType = SpeciesType.getTypeByID(source.getTaxonId());
7272
messageHeader = speciesType.getModName() + " ";
7373
int vcQueueSize = source.getVcQueueSize() != null ? source.getVcQueueSize() : VariantConfigHelper.getSourceDocumentCreatorVCQueueSize();
7474
int objectQueueSize = source.getObjectQueueSize() != null ? source.getObjectQueueSize() : VariantConfigHelper.getSourceDocumentCreatorObjectQueueSize();
7575
vcQueue = new LinkedBlockingDeque<>(vcQueueSize);
7676
objectQueue = new LinkedBlockingDeque<>(objectQueueSize);
77-
jsonQueue = new LinkedBlockingDeque<>(250);
7877
}
7978

8079
@Override
@@ -113,16 +112,6 @@ public void run() {
113112
producers.add(producer);
114113
}
115114

116-
int shardCount = VariantConfigHelper.getIndexerShards();
117-
118-
ph4.startProcess(messageHeader + "RoutedBulkIndexers");
119-
ArrayList<RoutedBulkIndexer> indexers = new ArrayList<>();
120-
for (int i = 0; i < shardCount * 4; i++) {
121-
RoutedBulkIndexer indexer = new RoutedBulkIndexer(jsonQueue, indexName, messageHeader + "BP(" + (i + 1) + ")", ph4);
122-
indexer.start();
123-
indexers.add(indexer);
124-
}
125-
126115
try {
127116

128117
log.info(messageHeader + "Waiting for VCFReader's to finish");
@@ -161,25 +150,10 @@ public void run() {
161150
log.info(messageHeader + "JSONProducers shutdown");
162151
ph3.finishProcess();
163152

164-
log.info(messageHeader + "Waiting for jsonQueues to empty");
165-
while (!jsonQueue.isEmpty()) {
166-
Thread.sleep(1000);
167-
}
168-
169-
log.info(messageHeader + "Shutting down bulk indexers");
170-
for (RoutedBulkIndexer indexer : indexers) {
171-
indexer.interrupt();
172-
indexer.join();
173-
}
174-
ph4.finishProcess();
175-
log.info(messageHeader + "Bulk Indexers shutdown");
176-
177153
} catch (Exception e) {
178154
ExceptionCatcher.report(e);
179155
e.printStackTrace();
180156
}
181-
182-
log.info(messageHeader + "Bulk Processors finished");
183157
}
184158

185159
private class VCFReader extends Thread {
@@ -303,6 +277,8 @@ public void run() {
303277

304278
private class JSONProducer extends Thread {
305279

280+
private final long workBucketMaxBytes = VariantConfigHelper.getSourceDocumentCreatorJsonQueueBucketSize();
281+
306282
private ObjectMapper mapper = RestConfig.createSmileObjectMapper();
307283
private ObjectWriter cachedWriter;
308284
private ObjectWriter sequenceWriter;
@@ -314,67 +290,58 @@ public void run() {
314290
searchWriter = mapper.writerWithView(CurationView.VariantSearchResultDocument.class);
315291
cachedWriter = mapper.writerWithView(CurationView.VariantSummaryDocument.class);
316292
sequenceWriter = mapper.writerWithView(CurationView.SequenceSummaryDocument.class);
293+
294+
List<byte[]> workBucket = new ArrayList<>();
295+
long workBucketBytes = 0;
296+
317297
while (!(Thread.currentThread().isInterrupted())) {
318298
try {
319299
List<ESDocument> docList = objectQueue.take();
320300

321-
List<byte[]> workBucket = new ArrayList<>();
322-
323301
if (!docList.isEmpty()) {
324302
for (ESDocument doc : docList) {
325303
try {
326304
byte[] smileDoc = null;
327305
if (doc instanceof SequenceSummaryDocument ssd) {
328-
//jsonDoc = sequenceWriter.writeValueAsString(ssd);
329306
smileDoc = sequenceWriter.writeValueAsBytes(ssd);
330307
} else if (doc instanceof VariantSummaryDocument vsd) {
331-
//jsonDoc = cachedWriter.writeValueAsString(vsd);
332308
smileDoc = cachedWriter.writeValueAsBytes(vsd);
333309
} else if (doc instanceof VariantSearchResultDocument vsrd) {
334-
//jsonDoc = searchWriter.writeValueAsString(vsrd);
335310
smileDoc = searchWriter.writeValueAsBytes(vsrd);
336311
} else {
337312
log.error("Unexpected ESDocument type: " + doc.getClass().getName());
338313
continue;
339314
}
340315

341316
workBucket.add(smileDoc);
317+
workBucketBytes += smileDoc.length;
318+
319+
if (workBucketBytes >= workBucketMaxBytes) {
320+
jsonQueue.put(workBucket);
321+
workBucket = new ArrayList<>();
322+
workBucketBytes = 0;
323+
}
342324

343-
// Left here for debugging purposes
344-
// ph5.progressProcess("M: " + (int) mean + " SD: " + (int) sd + " SK: " + skew
345-
// //+ " lw: " + lowerWidth + " uw: " + upperWidth + " t1: " + t1 + " t2: " + t2 + " t3: " + t3 + " t4: " + t4 + " t5: " + t5 + " t6: " + t6 + " t7: " + t7
346-
// + " jsonQueue1(" + jqs[0][0] + "," + jqs[0][1] + "," + jqs[0][2] + "): " + jsonQueue1.size()
347-
// + " jsonQueue2(" + jqs[1][0] + "," + jqs[1][1] + "," + jqs[1][2] + "): " + jsonQueue2.size()
348-
// + " jsonQueue3(" + jqs[2][0] + "," + jqs[2][1] + "," + jqs[2][2] + "): " + jsonQueue3.size()
349-
// + " jsonQueue4(" + jqs[3][0] + "," + jqs[3][1] + "," + jqs[3][2] + "): " + jsonQueue4.size()
350-
// + " jsonQueue5(" + jqs[4][0] + "," + jqs[4][1] + "," + jqs[4][2] + "): " + jsonQueue5.size()
351-
// + " jsonQueue6(" + jqs[5][0] + "," + jqs[5][1] + "," + jqs[5][2] + "): " + jsonQueue6.size()
352-
// + " jsonQueue7(" + jqs[6][0] + "," + jqs[6][1] + "," + jqs[6][2] + "): " + jsonQueue7.size()
353-
// + " jsonQueue8(" + jqs[7][0] + "," + jqs[7][1] + "," + jqs[7][2] + "): " + jsonQueue8.size()
354-
// );
355-
356325
ph3.progressProcess();
357326

358327
} catch (Exception e) {
359328
ExceptionCatcher.report(e);
360329
e.printStackTrace();
361330
}
362331
}
363-
364-
try {
365-
if (workBucket.size() > 0) {
366-
jsonQueue.put(workBucket);
367-
}
368-
369-
} catch (InterruptedException e) {
370-
ExceptionCatcher.report(e);
371-
e.printStackTrace();
372-
}
373332
}
374333
} catch (InterruptedException e) {
375334
Thread.currentThread().interrupt();
376335
}
336+
}
377337

338+
// Flush remaining docs
339+
try {
340+
if (!workBucket.isEmpty()) {
341+
jsonQueue.put(workBucket);
342+
}
343+
} catch (InterruptedException e) {
344+
Thread.currentThread().interrupt();
378345
}
379346
}
380347
}

agr_variant_indexer/src/main/java/org/alliancegenome/indexer/variant/es/managers/SourceDocumentCreationManager.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,18 @@
55
import java.util.HashSet;
66
import java.util.List;
77
import java.util.Map;
8+
import java.util.concurrent.LinkedBlockingDeque;
89

910
import org.alliancegenome.core.config.ConfigHelper;
1011
import org.alliancegenome.core.filedownload.model.DownloadFileSet;
1112
import org.alliancegenome.core.filedownload.model.DownloadSource;
13+
import org.alliancegenome.core.variant.config.VariantConfigHelper;
1214
import org.alliancegenome.curation_api.interfaces.crud.ontology.SoTermCrudInterface;
1315
import org.alliancegenome.curation_api.interfaces.document.VariantDocumentInterface;
1416
import org.alliancegenome.es.index.site.cache.GeneDocumentCache;
1517
import org.alliancegenome.es.rest.RestConfig;
18+
import org.alliancegenome.es.util.ElasticSearchInterface;
19+
import org.alliancegenome.es.util.ProcessDisplayHelper;
1620
import org.alliancegenome.exceptional.client.ExceptionCatcher;
1721
import org.alliancegenome.neo4j.repository.indexer.GeneIndexerRepository;
1822

@@ -62,17 +66,56 @@ public void run() {
6266
log.info("Fetching SO term severity ranking from curation API...");
6367
Map<String, Integer> severityRanking = soTermApi.getSeverityRanking();
6468
log.info("Fetched severity ranking for {} SO terms", severityRanking.size());
69+
70+
// Count active species and create shared jsonQueue
71+
long activeCount = downloadSet.getDownloadFileSources().stream().filter(DownloadSource::getActive).count();
72+
LinkedBlockingDeque<List<byte[]>> jsonQueue = new LinkedBlockingDeque<>((int) (250 * activeCount));
73+
74+
// Query cluster CPU count and create shared bulk indexer pool
75+
int totalCpus = getClusterCpuCount();
76+
int poolSize = totalCpus * 2;
77+
log.info("ES cluster total CPUs: {}, RoutedBulkIndexer pool size: {}, active species: {}, jsonQueue capacity: {}", totalCpus, poolSize, activeCount, 250 * activeCount);
78+
79+
// Start shared RoutedBulkIndexer pool
80+
ProcessDisplayHelper phPool = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());
81+
phPool.startProcess("SharedRoutedBulkIndexers");
82+
ArrayList<RoutedBulkIndexer> indexers = new ArrayList<>();
83+
for (int i = 0; i < poolSize; i++) {
84+
RoutedBulkIndexer indexer = new RoutedBulkIndexer(jsonQueue, SourceDocumentCreation.indexName, "SharedBP(" + (i + 1) + ")", phPool);
85+
indexer.start();
86+
indexers.add(indexer);
87+
}
88+
89+
// Build and start active species creators
6590
List<SourceDocumentCreation> creators = new ArrayList<>();
6691
for (DownloadSource source : downloadSet.getDownloadFileSources()) {
6792
if (source.getActive()) {
68-
SourceDocumentCreation creator = new SourceDocumentCreation(downloadSet.getDownloadPath(), source, geneCache, variantsCache, severityRanking);
93+
SourceDocumentCreation creator = new SourceDocumentCreation(downloadSet.getDownloadPath(), source, geneCache, variantsCache, severityRanking, jsonQueue);
6994
creator.start();
7095
creators.add(creator);
7196
}
7297
}
98+
99+
// Wait for all species threads to complete
73100
for (SourceDocumentCreation creator : creators) {
74101
creator.join();
75102
}
103+
104+
// Wait for shared jsonQueue to drain
105+
log.info("All species threads finished, waiting for jsonQueue to drain");
106+
while (!jsonQueue.isEmpty()) {
107+
Thread.sleep(1000);
108+
}
109+
110+
// Shut down shared bulk indexer pool
111+
log.info("Shutting down shared RoutedBulkIndexer pool");
112+
for (RoutedBulkIndexer indexer : indexers) {
113+
indexer.interrupt();
114+
indexer.join();
115+
}
116+
phPool.finishProcess();
117+
log.info("Shared RoutedBulkIndexer pool shutdown");
118+
76119
log.info("SourceDocumentCreationManager all species finished");
77120

78121
} catch (Exception e) {
@@ -81,4 +124,38 @@ public void run() {
81124
System.exit(-1);
82125
}
83126
}
127+
128+
private int getClusterCpuCount() {
129+
int defaultCpus = VariantConfigHelper.getIndexerShards() / 2;
130+
try {
131+
String firstHost = ConfigHelper.getEsHost().split(",")[0];
132+
String esHost;
133+
String esPort;
134+
if (firstHost.contains(":")) {
135+
esHost = firstHost.split(":")[0];
136+
esPort = firstHost.split(":")[1];
137+
} else {
138+
esHost = firstHost;
139+
esPort = String.valueOf(ConfigHelper.getEsPort());
140+
}
141+
String esUrl = "http://" + esHost + ":" + esPort;
142+
ElasticSearchInterface esApi = RestProxyFactory.createProxy(ElasticSearchInterface.class, esUrl);
143+
Map<String, Object> response = esApi.getNodesOs();
144+
Map<String, Object> nodes = (Map<String, Object>) response.get("nodes");
145+
int totalCpus = 0;
146+
for (Object nodeObj : nodes.values()) {
147+
Map<String, Object> node = (Map<String, Object>) nodeObj;
148+
Map<String, Object> os = (Map<String, Object>) node.get("os");
149+
if (os != null && os.containsKey("available_processors")) {
150+
totalCpus += ((Number) os.get("available_processors")).intValue();
151+
}
152+
}
153+
if (totalCpus > 0) {
154+
return totalCpus;
155+
}
156+
} catch (Exception e) {
157+
log.warn("Failed to query ES cluster CPU count, using default: {}", defaultCpus, e);
158+
}
159+
return defaultCpus;
160+
}
84161
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
<maven-checkstyle-plugin.version>3.3.1</maven-checkstyle-plugin.version>
2424
<checkstyle.version>10.17.0</checkstyle.version>
2525
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
26-
<curation.version>v0.47.32</curation.version>
26+
<curation.version>v0.47.33</curation.version>
2727
</properties>
2828
<dependencyManagement>
2929
<dependencies>

0 commit comments

Comments
 (0)