Skip to content

Commit 5a9295e

Browse files
authored
Merge pull request #1548 from alliance-genome/custom-bulk-processor
Replace BulkProcessor with custom RoutedBulkIndexer
2 parents 71f41ff + b62c495 commit 5a9295e

File tree

6 files changed

+276
-462
lines changed

6 files changed

+276
-462
lines changed

agr_java_core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,10 @@
165165
<groupId>com.fasterxml.jackson.datatype</groupId>
166166
<artifactId>jackson-datatype-jsr310</artifactId>
167167
</dependency>
168+
<dependency>
169+
<groupId>com.fasterxml.jackson.dataformat</groupId>
170+
<artifactId>jackson-dataformat-smile</artifactId>
171+
</dependency>
168172
<dependency>
169173
<groupId>com.fasterxml.jackson.core</groupId>
170174
<artifactId>jackson-core</artifactId>

agr_java_core/src/main/java/org/alliancegenome/es/rest/RestConfig.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.fasterxml.jackson.databind.ObjectMapper;
1313
import com.fasterxml.jackson.databind.SerializationFeature;
1414
import com.fasterxml.jackson.databind.jsontype.NamedType;
15+
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
1516
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
1617
import com.fasterxml.jackson.module.blackbird.BlackbirdModule;
1718

@@ -69,5 +70,21 @@ public static ObjectMapper createObjectMapper() {
6970
);
7071
return mapper;
7172
}
73+
74+
public static ObjectMapper createSmileObjectMapper() {
75+
ObjectMapper mapper = new ObjectMapper(new SmileFactory());
76+
mapper.registerModule(new JavaTimeModule());
77+
mapper.registerModule(new BlackbirdModule());
78+
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
79+
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
80+
mapper.disable(MapperFeature.DEFAULT_VIEW_INCLUSION);
81+
mapper.setSerializationInclusion(Include.NON_NULL);
82+
mapper.setSerializationInclusion(Include.NON_EMPTY);
83+
mapper.registerSubtypes(
84+
new NamedType(ResourceDescriptor.class, "ResourceDescriptor"),
85+
new NamedType(ResourceDescriptorPage.class, "ResourceDescriptorPage")
86+
);
87+
return mapper;
88+
}
7289

7390
}
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package org.alliancegenome.indexer.variant.es.managers;
2+
3+
import java.io.IOException;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.concurrent.LinkedBlockingDeque;
7+
import java.util.concurrent.ThreadLocalRandom;
8+
import java.util.concurrent.TimeUnit;
9+
10+
import org.alliancegenome.core.config.ConfigHelper;
11+
import org.alliancegenome.core.variant.config.VariantConfigHelper;
12+
import org.alliancegenome.es.util.EsClientFactory;
13+
import org.alliancegenome.es.util.ProcessDisplayHelper;
14+
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
15+
import org.elasticsearch.action.bulk.BulkItemResponse;
16+
import org.elasticsearch.action.bulk.BulkRequest;
17+
import org.elasticsearch.action.bulk.BulkResponse;
18+
import org.elasticsearch.action.index.IndexRequest;
19+
import org.elasticsearch.client.RequestOptions;
20+
import org.elasticsearch.client.RestHighLevelClient;
21+
import org.elasticsearch.core.TimeValue;
22+
import org.elasticsearch.xcontent.XContentType;
23+
24+
import lombok.extern.slf4j.Slf4j;
25+
26+
@Slf4j
27+
public class RoutedBulkIndexer extends Thread {
28+
29+
private final LinkedBlockingDeque<List<byte[]>> jsonQueue;
30+
private final String indexName;
31+
private final int shardCount;
32+
private final long maxBulkSizeBytes;
33+
private final int maxRetries;
34+
private final long retryBaseMs;
35+
private final String label;
36+
37+
private RestHighLevelClient client;
38+
private ProcessDisplayHelper phGlobal;
39+
private ProcessDisplayHelper ph;
40+
41+
private final SummaryStatistics docStats = new SummaryStatistics();
42+
private final SummaryStatistics queueStats = new SummaryStatistics();
43+
private final SummaryStatistics esBatchRequestStats = new SummaryStatistics();
44+
45+
private boolean gatherStats = VariantConfigHelper.isGatherStats();
46+
47+
private long totalBytes;
48+
private long totalRetries;
49+
private long totalFailedDocs;
50+
51+
public RoutedBulkIndexer(
52+
LinkedBlockingDeque<List<byte[]>> jsonQueue,
53+
String indexName,
54+
int shardCount,
55+
int maxRetries,
56+
String label,
57+
ProcessDisplayHelper phGlobal
58+
) {
59+
this.jsonQueue = jsonQueue;
60+
this.indexName = indexName;
61+
this.shardCount = shardCount;
62+
this.maxBulkSizeBytes = ConfigHelper.getEsBulkSizeMB() * 1024 * 1024; //10MB * the multiplier
63+
this.maxRetries = maxRetries;
64+
this.retryBaseMs = 1000;
65+
this.label = label;
66+
this.phGlobal = phGlobal;
67+
}
68+
69+
@Override
70+
public void run() {
71+
boolean indexing = VariantConfigHelper.isIndexing();
72+
if (indexing) {
73+
client = EsClientFactory.getMustCloseSearchClient();
74+
}
75+
ph = new ProcessDisplayHelper(VariantConfigHelper.getDisplayInterval());
76+
if (gatherStats) {
77+
ph.startProcess(label);
78+
}
79+
80+
try {
81+
List<byte[]> pendingDocs = new ArrayList<>();
82+
long pendingBytes = 0;
83+
84+
while (!Thread.currentThread().isInterrupted()) {
85+
try {
86+
List<byte[]> docs = jsonQueue.poll(1, TimeUnit.SECONDS);
87+
if (docs == null) {
88+
continue;
89+
}
90+
if (gatherStats) {
91+
queueStats.addValue(docs.size());
92+
}
93+
94+
for (byte[] smileDoc : docs) {
95+
int docBytes = smileDoc.length;
96+
97+
if (gatherStats) {
98+
docStats.addValue(docBytes);
99+
}
100+
101+
if (pendingBytes + docBytes > maxBulkSizeBytes && !pendingDocs.isEmpty()) {
102+
submitBatch(pendingDocs);
103+
pendingDocs = new ArrayList<>();
104+
pendingBytes = 0;
105+
}
106+
107+
pendingDocs.add(smileDoc);
108+
pendingBytes += docBytes;
109+
if (gatherStats) {
110+
totalBytes += docBytes;
111+
ph.progressProcess(
112+
"qs: (" + jsonQueue.size()
113+
+ ") q: (" + queueStats.getN() + "/" + (int) queueStats.getMean()
114+
+ ") d: (" + docStats.getN() + "/" + (int) docStats.getMean()
115+
+ ") es: (" + esBatchRequestStats.getN() + "/" + (int) esBatchRequestStats.getMean()
116+
+ ") B/r/f: (" + totalBytes + "/" + totalRetries + "/" + totalFailedDocs + ")"
117+
);
118+
}
119+
phGlobal.progressProcess();
120+
}
121+
} catch (InterruptedException e) {
122+
Thread.currentThread().interrupt();
123+
}
124+
}
125+
126+
// Clear interrupt flag so the final bulk request can complete
127+
Thread.interrupted();
128+
129+
// Flush remaining
130+
if (!pendingDocs.isEmpty()) {
131+
submitBatch(pendingDocs);
132+
}
133+
134+
} finally {
135+
if (gatherStats) {
136+
ph.finishProcess();
137+
logStats();
138+
}
139+
if (client != null) {
140+
try {
141+
client.close();
142+
} catch (IOException e) {
143+
log.error(label + " Error closing ES client", e);
144+
}
145+
}
146+
}
147+
}
148+
149+
private void submitBatch(List<byte[]> docs) {
150+
if (gatherStats) {
151+
esBatchRequestStats.addValue(docs.size());
152+
}
153+
String routing = Integer.toString(ThreadLocalRandom.current().nextInt(shardCount));
154+
submitWithRetry(docs, routing, 0);
155+
}
156+
157+
private void submitWithRetry(List<byte[]> docs, String routing, int attempt) {
158+
if (!VariantConfigHelper.isIndexing()) {
159+
return;
160+
}
161+
BulkRequest bulkRequest = new BulkRequest().timeout(TimeValue.timeValueHours(1));
162+
for (byte[] smileDoc : docs) {
163+
bulkRequest.add(new IndexRequest(indexName).source(smileDoc, XContentType.SMILE).routing(routing));
164+
}
165+
166+
try {
167+
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
168+
169+
if (response.hasFailures()) {
170+
List<byte[]> failedDocs = new ArrayList<>();
171+
for (BulkItemResponse item : response) {
172+
if (item.isFailed()) {
173+
failedDocs.add(docs.get(item.getItemId()));
174+
}
175+
}
176+
177+
if (!failedDocs.isEmpty() && attempt < maxRetries) {
178+
totalRetries++;
179+
long sleepMs = retryBaseMs * (1L << Math.min(attempt, 10));
180+
log.warn(label + " Retrying " + failedDocs.size() + " failed items (attempt " + (attempt + 1) + "), sleeping " + sleepMs + "ms");
181+
try {
182+
Thread.sleep(sleepMs);
183+
} catch (InterruptedException e) {
184+
Thread.currentThread().interrupt();
185+
return;
186+
}
187+
submitWithRetry(failedDocs, routing, attempt + 1);
188+
} else if (!failedDocs.isEmpty()) {
189+
totalFailedDocs += failedDocs.size();
190+
log.error(label + " Failed to index " + failedDocs.size() + " documents after " + maxRetries + " retries");
191+
log.error(label + " First failure: " + response.getItems()[0].getFailureMessage());
192+
}
193+
}
194+
195+
} catch (IOException e) {
196+
if (attempt < maxRetries) {
197+
totalRetries++;
198+
long sleepMs = retryBaseMs * (1L << Math.min(attempt, 10));
199+
log.warn(label + " Bulk request failed: " + e.getMessage() + ", retrying (attempt " + (attempt + 1) + "), sleeping " + sleepMs + "ms");
200+
try {
201+
Thread.sleep(sleepMs);
202+
} catch (InterruptedException ie) {
203+
Thread.currentThread().interrupt();
204+
return;
205+
}
206+
submitWithRetry(docs, routing, attempt + 1);
207+
} else {
208+
log.error(label + " Bulk request failed after " + maxRetries + " retries: " + e.getMessage());
209+
System.exit(-1);
210+
}
211+
}
212+
}
213+
214+
private void logStats() {
215+
log.info(label + " Doc Stats: " + docStats);
216+
log.info(label + " Queue Stats: " + queueStats);
217+
log.info(label + " ES Batch Stats: " + esBatchRequestStats);
218+
log.info(label + " Total Bytes: " + totalBytes + " Retries: " + totalRetries + " Failed: " + totalFailedDocs);
219+
}
220+
}

0 commit comments

Comments
 (0)