
refactor: shared bulk processor pool sized by cluster CPU count #1555

Merged
oblodgett merged 2 commits into stage from variant_indexer_improvements on Apr 7, 2026


@oblodgett
Member

Move the RoutedBulkIndexer pool from per-species pools to a single shared pool managed by SourceDocumentCreationManager. The pool size is calculated dynamically as twice the ES cluster's total available CPUs, queried from the Elasticsearch _nodes/os API through a new method on ElasticSearchInterface.
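The sizing rule can be sketched as follows. NodesOsSource and BulkPoolSizing are hypothetical names, not the project's classes. The sketch assumes the per-node os.available_processors values from a _nodes/os response have already been parsed into a map, and it takes the fallback CPU count as a parameter because the PR does not state the real fallback value.

```java
import java.util.Map;

// Hypothetical abstraction over a _nodes/os query: each node's
// os.available_processors value keyed by node ID. Not the project's real API.
interface NodesOsSource {
    Map<String, Integer> availableProcessorsByNode() throws Exception;
}

final class BulkPoolSizing {
    // Multiplier from the PR description: pool size = cluster CPUs * 2.
    static final int THREADS_PER_CPU = 2;

    // Sums available processors across all nodes. Falls back to the given
    // count if the query throws or reports no usable CPUs.
    static int clusterCpuCount(NodesOsSource source, int fallback) {
        try {
            int total = 0;
            for (int cpus : source.availableProcessorsByNode().values()) {
                total += cpus;
            }
            return total > 0 ? total : fallback;
        } catch (Exception e) {
            return fallback;
        }
    }

    // Shared bulk indexer pool size derived from the cluster CPU count.
    static int poolSize(NodesOsSource source, int fallbackCpus) {
        return clusterCpuCount(source, fallbackCpus) * THREADS_PER_CPU;
    }
}
```

For example, two nodes reporting 8 and 16 processors give a pool of (8 + 16) * 2 = 48 threads.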

JSONProducer work buckets are now capped by a byte-size limit (1MB by default) instead of a document count; the limit is configurable via VARIANT_SOURCE_DOCUMENT_CREATOR_JSON_QUEUE_BUCKET_SIZE.
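A minimal sketch of byte-size bucketing, assuming documents arrive as JSON strings and size is measured in UTF-8 bytes (the PR does not say how size is measured). ByteSizeBucketer is a hypothetical name; workBucket and workBucketBytes only echo the names mentioned in the review.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: groups JSON documents into buckets capped by UTF-8 byte size.
final class ByteSizeBucketer {
    private final long maxBucketBytes;
    private final Consumer<List<String>> sink; // receives each completed bucket
    private List<String> workBucket = new ArrayList<>();
    private long workBucketBytes = 0;

    ByteSizeBucketer(long maxBucketBytes, Consumer<List<String>> sink) {
        this.maxBucketBytes = maxBucketBytes;
        this.sink = sink;
    }

    void add(String json) {
        long size = json.getBytes(StandardCharsets.UTF_8).length;
        // Flush first if this document would push a non-empty bucket over the limit.
        if (!workBucket.isEmpty() && workBucketBytes + size > maxBucketBytes) {
            flush();
        }
        workBucket.add(json);
        workBucketBytes += size;
    }

    // Emits any partially filled bucket; call on interrupt/shutdown so nothing is lost.
    void flush() {
        if (workBucket.isEmpty()) {
            return;
        }
        sink.accept(workBucket);
        workBucket = new ArrayList<>();
        workBucketBytes = 0;
    }
}
```

A single document larger than the limit still goes out, alone in its own bucket, so nothing is dropped. Calling flush() from the producer's interrupt path is what keeps a partially filled bucket from being lost.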

The shared jsonQueue capacity scales with the number of active species (250 * species count).
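One way to express the capacity rule, assuming the queue holds whole work buckets and is a bounded ArrayBlockingQueue; the PR names neither the element type nor the queue implementation. SharedJsonQueue is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class SharedJsonQueue {
    // Per-species allowance from the PR description.
    static final int BUCKETS_PER_SPECIES = 250;

    // Hypothetical factory: a bounded queue whose capacity grows with the number
    // of active species, so producers calling put() block instead of buffering
    // without limit.
    static BlockingQueue<List<String>> create(int activeSpeciesCount) {
        if (activeSpeciesCount < 1) {
            throw new IllegalArgumentException("need at least one active species");
        }
        return new ArrayBlockingQueue<>(BUCKETS_PER_SPECIES * activeSpeciesCount);
    }
}
```

With four active species, the queue accepts 1,000 buckets before producers calling put() start to block.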

Move ElasticSearchInterface from agr_indexer to agr_java_core for reuse across modules, and add a getNodesOs() method for cluster info queries.
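A hypothetical getNodesOs(), written against the JDK's java.net.http client for illustration only; the real ElasticSearchInterface method may use a different Elasticsearch client, return type and error handling, none of which the PR shows. It issues GET /_nodes/os (the Elasticsearch nodes info API restricted to the os metric) and returns the raw JSON body.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical stand-in for ElasticSearchInterface.getNodesOs().
final class NodesOsClient {
    private final HttpClient http = HttpClient.newHttpClient();
    private final URI baseUri;

    NodesOsClient(URI baseUri) {
        this.baseUri = baseUri;
    }

    // Absolute path, so any path on the base URI is replaced by /_nodes/os.
    URI nodesOsUri() {
        return baseUri.resolve("/_nodes/os");
    }

    // Returns per-node OS info (name, arch, processor counts) as raw JSON.
    String getNodesOs() throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(nodesOsUri())
                .header("Accept", "application/json")
                .GET()
                .build();
        HttpResponse<String> response =
                http.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("_nodes/os returned HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```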

oblodgett requested a review from a team as a code owner on April 6, 2026, 21:10
@claude

claude bot commented Apr 6, 2026

Code Review

This is a clean refactor that moves the bulk indexer pool from per-species to shared management, with dynamic sizing based on the ES cluster's CPU count. The changes look correct.

One minor observation (non-blocking):

In SourceDocumentCreationManager.java, the jsonQueue drain loop (while (!jsonQueue.isEmpty()) { Thread.sleep(1000); }) followed by interrupt() and join() relies on RoutedBulkIndexer completing its current in-flight batch after being interrupted. This is the same pattern that existed before (just hoisted up a level), so it's not a regression, but be aware that correctness depends on RoutedBulkIndexer.run() finishing its current bulk request before honoring the interrupt.
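The shutdown sequence described above can be sketched like this. BatchWorker and DrainThenStop are hypothetical stand-ins for RoutedBulkIndexer and the manager's shutdown code, and the sketch assumes the worker reacts to interrupts only while blocked on take(), so a batch it has already taken is always finished.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical worker: takes a batch, finishes it, then re-checks the interrupt flag.
final class BatchWorker implements Runnable {
    private final BlockingQueue<Integer> queue;
    final AtomicInteger processed = new AtomicInteger();

    BatchWorker(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            Integer batch;
            try {
                batch = queue.take(); // interruptible only while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // nothing in flight, safe to exit
            }
            sendBulk(batch); // must run to completion even if interrupted meanwhile
        }
    }

    // Stand-in for the bulk request. A real client call that aborts when the
    // interrupt flag is set would break the "finish in-flight batch" guarantee.
    private void sendBulk(Integer batch) {
        processed.incrementAndGet();
    }
}

final class DrainThenStop {
    // Drain loop, then interrupt and join, as in the reviewed code.
    static void shutdown(BlockingQueue<?> queue, Thread worker) throws InterruptedException {
        while (!queue.isEmpty()) {
            Thread.sleep(10);
        }
        worker.interrupt();
        worker.join();
    }
}
```

The drain loop sleeps 10 ms here rather than 1000 ms only to keep the demo fast. Because isEmpty() becomes true as soon as the last batch is taken, not when it is sent, the guarantee rests entirely on sendBulk() ignoring the interrupt; a poison-pill sentinel placed on the queue is a common alternative that does not depend on interrupt handling.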

Everything else looks good:

  • JSONProducer byte-size bucketing with a proper flush on interrupt is correct
  • getClusterCpuCount() has an appropriate try-catch with a sensible fallback
  • Thread-confined workBucket/workBucketBytes locals avoid concurrency issues
  • Moving ElasticSearchInterface to agr_java_core for cross-module reuse makes sense
  • The import in LiteratureIndexer is updated to match the new package location

oblodgett enabled auto-merge on April 6, 2026, 21:13
oblodgett disabled auto-merge on April 7, 2026, 01:58
oblodgett merged commit 0091518 into stage on Apr 7, 2026 (5 checks passed)
oblodgett deleted the variant_indexer_improvements branch on April 7, 2026, 01:59