Skip to content

Conversation

Copy link

Copilot AI commented Oct 10, 2025

Problem

When executing IndexCount operations with limits (including IndexExists which translates to IndexCount with choosenLimit=1), multiple slaves count in parallel across index partitions. If one slave reaches the limit quickly, the other slaves continue searching unnecessarily. This is particularly wasteful when:

  • There are filters that require iterating through keys
  • The index is large
  • Multiple slaves are involved

For example, with 10 slaves searching a large filtered index where slave 2 reaches the limit in 1 second, the other 9 slaves may continue spinning for many seconds checking their partitions before the operation completes.

Solution

Implemented an early termination mechanism using master-slave signaling that applies to any IndexCount operation with a choosenLimit:

  1. Slave counts locally - Each slave counts matches in its partitions
  2. Slave sends partial count - Sends count to master when complete or limit reached
  3. Master detects limit exceeded - When aggregate count exceeds choosenLimit, master broadcasts stop signal
  4. Slaves listen asynchronously - Dedicated thread listens for stop signals without blocking counting loop
  5. Early termination - Slaves check atomic flag and exit counting loop when stop signal received
  6. Aggregation completes - All slaves send partial counts; master aggregates and returns result

Implementation Details

Master Side (CIndexCountActivityMaster)

  • Added stopTag (mptag_t) for stop signal communication
  • Overrides aggregateToLimit() to detect when limit exceeded:
    • Receives partial counts from all slaves sequentially
    • When total > choosenLimit, broadcasts stop signal to all slaves
    • Continues receiving from remaining slaves
    • Applies to any operation with choosenLimit (not just IndexExists)
  • Properly serializes stopTag to slaves

Slave Side (CIndexCountSlaveActivity)

  • CStopHandler thread: Dedicated asynchronous listener for stop signals
    • Runs in separate thread, blocking recv without impacting counting loop
    • Sets atomic stopped flag when stop signal received
    • Properly managed lifecycle (start/stop/join)
  • checkStopped(): Lightweight atomic flag check (no blocking operations)
  • Integrated stop checks in counting loops:
    • After each key when filter is present (most critical path)
    • After each part in the part iteration loop

Key Design Decisions

  • Asynchronous thread: Dedicated CStopHandler thread eliminates polling overhead
  • Atomic flag: Thread-safe communication between handler thread and counting loop
  • Broadly applicable: Works whenever choosenLimit is set (CHOOSEN, IndexExists, etc.)
  • Efficient: Stop check is simple atomic load, no blocking operations in counting loop
  • Proper lifecycle: Thread started in start(), stopped in stop()/abort()

Benefits

  • Reduced wasted computation: Slaves stop searching once aggregate limit exceeded
  • Better cluster utilization: Resources freed up sooner for other work
  • Scalability improvement: Benefit increases with number of slaves
  • Broadly applicable: Works for IndexExists, CHOOSEN, and any operation with choosenLimit
  • Efficient: No polling overhead, dedicated async thread handles signaling
  • Backward compatible: Only activates when choosenLimit is set
  • Safe: Follows existing patterns (similar to CMessageHandler), proper abort handling

Example Scenarios

IndexExists (choosenLimit=1)

// Filtered index with 10 slaves
fi := myIndex(KEYED(surname='Smith') AND age > 30);

// Before optimization:
// - All 10 slaves search their partitions completely
// - Total time = max(slave search times)

// After optimization:  
// - Slave 2 finds match in 1 second, sends count=1
// - Master detects total > 0, broadcasts stop to all slaves
// - Other 9 slaves exit their loops early
// - Total time ≈ 1 second + network latency

OUTPUT(EXISTS(fi));  // Returns TRUE faster

CHOOSEN with limit

// Large index with choosenLimit=100
result := CHOOSEN(myIndex(filter), 100);

// If first few slaves quickly find 100+ matches:
// - Master detects total > 100, signals stop
// - Remaining slaves exit early
// - Faster execution, same correct result

OUTPUT(COUNT(result));

Testing

Existing regression tests exercise this code path:

  • aggidx1.ecl: Contains EXISTS(sq.SimplePersonBookIndex)
  • indexlimit3.ecl: Contains EXISTS(fi) with filtered index and CHOOSEN operations

Full integration testing requires HPCC cluster environment to observe the early termination behavior across multiple slaves.

Original prompt

Look at IndexCount code in thorlcr/activities/indexread/thindexread.cpp and thorlcr/activities/indexread/thindexreadslave.cpp. In particular look at how aggregateToLimit works.
The slave code in CIndexCountSlaveActivity::CATCH_NEXTROW(). It calculate the local count (totalCount), either via getLocalCount or more iteratively if there's a filter.

IndexCount is generated for IndexExists where the requirement is that there is >0 results, i.e. it does not need to keep counting. At a local level (slave) it does this by having a choosenLimit imposed (1).

However, if there are multiple slaves, they will all be getting a count in parallel, if 1 of them finds a >0 count, it is wasteful for the other to continue. This is particularly an issue if there's a filter and a large index is involved, it means if 1 slave finds a >0 result, but the others have not stop yet (not found a match yet), they will keep spinning and looking.

What should happen, when a slave has gathered a local count, it sends to the master (via sendPartialCount), if it's an indexexists variant, what we want to happen, is to tell all the other slaves that it is "done" (count>0), and for that to cause those other slaves to bail out of their local count calculation.

Implement this solution


💬 Share your feedback on Copilot coding agent for the chance to win a $200 gift card! Click here to start the survey.

Copilot AI and others added 2 commits October 10, 2025 13:34
- Modified CIndexCountActivityMaster to detect IndexExists case (choosenLimit == 1)
- Added aggregateToLimitIndexExists() that broadcasts stop signal when count > 0
- Added stopTag for master-slave communication
- Modified CIndexCountSlaveActivity to check for stop signal during counting
- Slaves can now terminate early when another slave finds a match

Co-authored-by: jakesmith <902700+jakesmith@users.noreply.github.com>
- Changed serializeSlaveData to always append stopTag
- Changed slave init to always read stopTag
- This ensures consistent serialization/deserialization

Co-authored-by: jakesmith <902700+jakesmith@users.noreply.github.com>
Copilot AI changed the title [WIP] Optimize IndexCount for parallel slave operations Optimize IndexExists to terminate slaves early when match found Oct 10, 2025
Copilot AI requested a review from jakesmith October 10, 2025 13:44
Copy link
Owner

@jakesmith jakesmith left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot - see comments.

{
// Non-blocking check for stop signal
CMessageBuffer msg;
if (container.queryJobChannel().queryJobComm().recv(msg, 0, stopTag, nullptr, 0))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot - this approach won't work. Even if this was non-blocking (which it isn't), it is too inefficient for the caller of checkStopSignal() to do this each call.
There probably needs to be an asynchronous thread that is listening for stop signals, that flags 'stopped' so the count routines can bail out when it sees the state.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to use a dedicated async CStopHandler thread instead of polling. The thread blocks on recv() waiting for stop signals and sets an atomic flag that the counting loop checks. This eliminates the inefficient per-call recv() overhead. See commit for details.


rowcount_t total;
rowcount_t choosenLimit = helper->getChooseNLimit();
if (choosenLimit == 1)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot - In fact, this should not be the basis of stopping early.
In all cases, if the limit is exceeded, the master should signal to the workers that they can stop to save them wasting their time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified to send stop signals whenever ANY choosenLimit is exceeded (not just choosenLimit==1). The master now checks total > choosenLimit in the overridden aggregateToLimit() method, making this optimization broadly applicable to CHOOSEN, IndexExists, and any other operation with a limit. See commit for details.

- Replaced polling approach with dedicated CStopHandler thread
- Thread listens asynchronously for stop signals from master
- Master now sends stop signals when ANY choosenLimit is exceeded (not just choosenLimit==1)
- Uses atomic flag for thread-safe communication between handler and counting loop
- Properly manages thread lifecycle in start/stop/abort methods

Addresses feedback on PR review comments

Co-authored-by: jakesmith <902700+jakesmith@users.noreply.github.com>
Copilot AI changed the title Optimize IndexExists to terminate slaves early when match found Optimize IndexCount to terminate slaves early when limit exceeded Oct 10, 2025
Copilot AI requested a review from jakesmith October 10, 2025 17:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants