tbs: Fix potential data race #19948
Conversation
This pull request does not have a backport label. Could you fix it @ericywl? 🙏
Thanks. The setup looks about right, but the specific case that needs to be tested isn't the one I have in mind.
In this PR, there are txn1 and txn2, both part of trace1, which doesn't make sense to me: a trace should only have one root transaction. In your current setup, what should happen to txn2 is undefined.
To be clearer, what I'd like to test is similar: txn1 is the root transaction of trace1, and txn2 is a child of txn1.
At t1: APM Server receives txn1
At t2: background sampling goroutine makes the sampling decision for txn1
At t2': APM Server receives txn2
At t3: background sampling goroutine marks trace1 as sampled
The above is a race: APM Server receives txn2 between t2 and t3, and as a result txn2 is lost forever. If txn2 arrives either before t2 or after t3, it is exported correctly. There's a rough sketch of the two events below.
It gets a bit theoretical, but I believe it is possible. Let me know if you have any questions.
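For illustration, here is how those two events might be modelled, using the modelpb shapes this PR's tests already use. This is a sketch only; delivering txn2 between t2 and t3 is the part a real test would have to arrange, and the helper name is hypothetical.

```go
// Sketch: txn1 is the root transaction of trace1, txn2 is its child.
func makeTrace1Events() (txn1, txn2 modelpb.Batch) {
	trace := modelpb.Trace{Id: "trace1"}
	txn1 = modelpb.Batch{{
		Trace:       &trace,
		Transaction: &modelpb.Transaction{Id: "txn1"}, // root: no ParentId
	}}
	txn2 = modelpb.Batch{{
		Trace:       &trace,
		Transaction: &modelpb.Transaction{Id: "txn2"},
		ParentId:    "txn1", // child of txn1, received at t2'
	}}
	return txn1, txn2
}
```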
carsonip
left a comment
qq: should the test pass or fail here? I assume when the test passes, it means a race happened, right? If so, I think the test is correctly validating the race in its current state
carsonip
left a comment
thanks, the approach looks good now
carsonip
left a comment
In the benchmarks, do you mind also running at a higher GOMAXPROCS, e.g. -cpu=14,100, and seeing if it makes any difference?
carsonip
left a comment
I'm terribly sorry. Yes, the PR in its existing state will address the new race conditions introduced in 9.0 due to the lack of DB transactions, but correct me if I'm wrong: theoretically there's another type of race that is inherent in the RW design and also present in 8.x, where:
- goroutine A ProcessBatch: IsTraceSampled(traceID) returns ErrNotFound
- background goroutine B responsible for publishing: WriteTraceSampled(traceID, true)
- background goroutine B responsible for publishing: ReadTraceEvents(traceID)
- goroutine A ProcessBatch: WriteTraceEvent(traceID, event1)
In this case, event1 will be written to the DB but silently dropped, never published.
Maybe we'll have to zoom out and rethink this. Either:
- we give up addressing flush-time races; or
- we introduce processor-level locking (roughly sketched after this comment); or
- we implement some less expensive handling that recovers events that fall victim to this race; or
- we merge this PR as is, as an improvement that resolves the first kind of race, but create a follow-up issue to address this newly identified kind of race. I don't want us to merge this PR with the impression that we've already fixed the publish-time race.
Thoughts?
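For context, the processor-level locking option amounts to serializing, per trace ID, the read-check-write sequences in ProcessBatch and in the publishing goroutine. Below is a minimal illustrative sketch of such a sharded lock; the type and function names are assumptions, not the PR's actual ShardLockReadWriter code.

```go
package sampling

import (
	"hash/fnv"
	"sync"
)

// shardedTraceLock maps a trace ID onto one of a fixed number of RWMutexes,
// so goroutines working on the same trace serialize against each other while
// unrelated traces rarely contend.
type shardedTraceLock struct {
	locks []sync.RWMutex
}

func newShardedTraceLock(numShards int) *shardedTraceLock {
	if numShards <= 0 {
		panic("numShards must be positive")
	}
	// The zero value of sync.RWMutex is ready to use, so no per-element
	// initialization is needed.
	return &shardedTraceLock{locks: make([]sync.RWMutex, numShards)}
}

func (s *shardedTraceLock) shard(traceID string) *sync.RWMutex {
	h := fnv.New32a()
	h.Write([]byte(traceID))
	return &s.locks[int(h.Sum32())%len(s.locks)]
}

func (s *shardedTraceLock) Lock(traceID string)    { s.shard(traceID).Lock() }
func (s *shardedTraceLock) Unlock(traceID string)  { s.shard(traceID).Unlock() }
func (s *shardedTraceLock) RLock(traceID string)   { s.shard(traceID).RLock() }
func (s *shardedTraceLock) RUnlock(traceID string) { s.shard(traceID).RUnlock() }
```

Ingesting goroutines would take the shard lock around the IsTraceSampled check and WriteTraceEvent, and the publishing goroutine around the sampling decision, so the interleaving above can no longer happen for a given trace.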
> In that case, my previous solution (on top of ShardLockRW) should be able to catch this. The publishing will be deferred to another goroutine that waits until event1 is written.
I studied f187e90. My understanding is that it increments and decrements a per-trace-ID counter before and after WriteTraceEvent in the ingest goroutine, and in the background publishing goroutine, between WriteTraceSampled and ReadTraceEvents, it checks whether the counter is 0; if it isn't, it retries later.
The issue is that this doesn't prevent the following sequence of events:
- t1: ingest goroutine A IsTraceSampled
- t2: background goroutine B WriteTraceSampled
- t3: background goroutine B performs counter==0 check
- t4: background goroutine B ReadTraceEvents
- t4': ingest goroutine A +1 counter, WriteTraceEvent, -1 counter
The race happens and there is data loss when t4 < t4'. Therefore, in terms of correctness, f187e90 isn't race-proof.
(On a side note, if the +1 on the counter happened before IsTraceSampled instead of before WriteTraceEvent, I think it might be correct. But even then I'd prefer a simpler design whose performance and memory implications are easier to reason about.)
I have some ideas on how to fix it; let's take it offline. In any case, we might want a more generic test (in addition to, or replacing, the existing one) that sends a lot of events around sampling-decision time. It may not be deterministic, but it would give us confidence that this class of race conditions is eliminated, without specifying the exact sequence.
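Such a test could be shaped roughly like the sketch below. This is purely illustrative: the processor setup, flush interval, and the final reported-vs-processed accounting are elided, and the modelpb import path is assumed from apm-data.

```go
package sampling_test

import (
	"context"
	"fmt"
	"sync"
	"testing"

	"github.com/elastic/apm-data/model/modelpb"
)

// hammerSharedTrace is a sketch of the "more generic" test shape suggested
// above: several goroutines keep sending spans for one shared trace ID while
// the background goroutine makes its sampling decision.
func hammerSharedTrace(ctx context.Context, t *testing.T, processBatch func(context.Context, *modelpb.Batch) error) {
	const workers, perWorker = 4, 1000
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				batch := modelpb.Batch{{
					Trace:    &modelpb.Trace{Id: "trace1"}, // shared trace ID maximizes contention
					Span:     &modelpb.Span{Id: fmt.Sprintf("span-%d-%d", w, i)},
					ParentId: "txn1",
				}}
				if err := processBatch(ctx, &batch); err != nil {
					t.Error(err) // t.Error is safe to call from goroutines
					return
				}
			}
		}(w)
	}
	wg.Wait()
}
```

The key point is that all goroutines write events for a single trace ID, so some of them are very likely to land around the moment the sampling decision for that trace is made.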
carsonip
left a comment
Thanks! Code looks good, but I have a comment about reducing the write-lock hold time without losing correctness. Other than that, mostly nits on maintainability.
Also, I've benchmarked the PR with a modified test:
```go
func BenchmarkProcess(b *testing.B) {
	b.SetParallelism(10)
	cfg := newTempdirConfigLogger(b, logp.NewNopLogger()).Config
	cfg.FlushInterval = 1 * time.Second
	cfg.Policies[0].SampleRate = 1
	processor, err := sampling.NewProcessor(cfg, logp.NewNopLogger())
	require.NoError(b, err)
	go processor.Run()
	b.Cleanup(func() { processor.Stop(context.Background()) })
	b.RunParallel(func(pb *testing.PB) {
		var seed int64
		err := binary.Read(cryptorand.Reader, binary.LittleEndian, &seed)
		assert.NoError(b, err)
		rng := rand.New(rand.NewSource(seed))
		var traceID [16]byte
		for pb.Next() {
			binary.LittleEndian.PutUint64(traceID[:8], rng.Uint64())
			binary.LittleEndian.PutUint64(traceID[8:], rng.Uint64())
			transactionID := traceID[:8]
			spanID := traceID[8:]
			trace := modelpb.Trace{Id: hex.EncodeToString(traceID[:])}
			transaction := &modelpb.Transaction{
				Id: hex.EncodeToString(transactionID),
			}
			span := &modelpb.Span{
				Id: hex.EncodeToString(spanID),
			}
			batch := modelpb.Batch{
				{Trace: &trace, Transaction: transaction},
				{Trace: &trace, Span: span, ParentId: transaction.Id},
				//{Trace: &trace, Span: span, ParentId: transaction.Id},
				//{Trace: &trace, Span: span, ParentId: transaction.Id},
			}
			if err := processor.ProcessBatch(context.Background(), &batch); err != nil {
				b.Fatal(err)
			}
		}
	})
}
```
```
goos: linux
goarch: amd64
pkg: github.com/elastic/apm-server/x-pack/apm-server/sampling
cpu: AMD Ryzen 7 PRO 8840HS w/ Radeon 780M Graphics
            │ main-100-2.bench │ tbs-potential-data-race-100.bench │
            │      sec/op      │    sec/op       vs base           │
Process-10      191.7n ± 11%      185.7n ± 16%   ~ (p=0.310 n=6)
```
and I tend to believe there is no perf regression, at least on a fast NVMe SSD. Therefore I think the PR is good to merge after polishing and updating the subject and description.
```go
	return m.next.DeleteTraceEvent(traceID, id)
}

func TestPotentialRaceCondition(t *testing.T) {
```
q: what are your thoughts on this? should we keep this test?
I will remove it since the concurrent one catches it too.
```go
}

func TestPotentialRaceConditionConcurrent(t *testing.T) {
	flushInterval := 5 * time.Second
```
it can be way shorter, in ms or max 1s, because this affects total test time
```go
reportedMu.Lock()
defer reportedMu.Unlock()
reportedPlusLateArrivals := int64(len(reported)) + lateArrivals.Load()
assert.Equal(t, reportedPlusLateArrivals, processed.Load())
```
```diff
-assert.Equal(t, reportedPlusLateArrivals, processed.Load())
+assert.Equal(t, processed.Load(), reportedPlusLateArrivals)
```
nit
```go
var reportedMu sync.Mutex
reported := map[string]struct{}{}
```
nit: simplify to atomic int?
It's to deduplicate the transaction IDs.
```go
first := true
index := i * 100000000

timer := time.NewTimer(flushInterval + 2*time.Second)
```
```diff
-timer := time.NewTimer(flushInterval + 2*time.Second)
+timer := time.NewTimer(flushInterval * 2)
```
```go
for i := 0; i < numShards; i++ {
	locks[i] = sync.RWMutex{}
}
```
```diff
-for i := 0; i < numShards; i++ {
-	locks[i] = sync.RWMutex{}
-}
```
nit: zero value is ready to use
```diff
 events = events[:0]
-if err := p.eventStore.ReadTraceEvents(traceID, &events); err != nil {
+err = p.eventStore.ReadTraceEvents(traceID, &events)
+p.shardLock.Unlock(traceID)
```
CMIIW, but I believe it is sufficient to lock only around WriteTraceSampled, without holding the lock over ReadTraceEvents, given that once IsTraceSampled returns true we short-circuit and write to the batch processor immediately. Holding the write lock for as short a time as possible would be good for perf.
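A rough sketch of that narrower critical section, as a fragment of the publishing loop (field names follow the snippet quoted above; that WriteTraceSampled lives on the same storage handle is an assumption here):

```go
// Hold the per-trace write lock only while recording the sampling decision.
// Any ProcessBatch call that runs after this sees IsTraceSampled == true and
// publishes its events directly, so ReadTraceEvents no longer needs to run
// under the lock to observe them.
p.shardLock.Lock(traceID)
err := p.eventStore.WriteTraceSampled(traceID, true)
p.shardLock.Unlock(traceID)
if err != nil {
	return err
}

// Events buffered before the decision are already in storage; read them
// outside the lock.
events = events[:0]
if err := p.eventStore.ReadTraceEvents(traceID, &events); err != nil {
	return err
}
```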
```go
	}
}

p.shardLock.Lock(traceID)
```
do you mind adding a comment about why we lock, and what scenario we are trying to prevent?
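For example, the comment could read something like the following (wording illustrative, not taken from the PR):

```go
// Serialize with ProcessBatch for this trace: we are about to write the
// sampling decision and then read the buffered trace events. Without this
// lock, an event whose IsTraceSampled check returned ErrNotFound in
// ProcessBatch could be written after our ReadTraceEvents below, and would
// then sit in storage without ever being published.
p.shardLock.Lock(traceID)
```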
```go
}

batch := modelpb.Batch{{
	Trace: &modelpb.Trace{Id: fmt.Sprintf("trace%d", i)},
```
It'll be way easier to hit the race if the goroutines all share the same trace ID, likely created once before the loop (remember to still increment the processed count); see the sketch below.
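A rough sketch of that change, reusing the names quoted above and assuming `processed` is an atomic counter as in the test:

```go
// Created once, before the goroutines are started, so that every goroutine's
// events contend on the same trace ID.
sharedTrace := &modelpb.Trace{Id: "trace-shared"}

// Inside each goroutine's loop: use the shared trace and keep counting every
// event that is sent.
batch := modelpb.Batch{{
	Trace:       sharedTrace,
	Transaction: &modelpb.Transaction{Id: fmt.Sprintf("txn%d", index)},
}}
processed.Add(1)
```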
💚 Build Succeeded
cc @ericywl
carsonip
left a comment
lgtm thanks
@Mergifyio backport 9.1 9.2 9.3
✅ Backports have been created
* Add test confirming the potential data race
* Remove unnecessary sleeps
* Add assertion for transaction ids at the end
* Add parent id to transaction2
* Update potential race condition test
* Try fixing race condition
* Fix bug where multiple ongoing transactions can race to delete first
* Add ShardLockReadWriter
* Panic if numShards <= 0
* Remove unnecessary code
* Use RWMutex
* Make fmt
* Add shard lock on processor level instead
* Make fmt update
* Revert "Make fmt update" (reverts commit b788c3f)
* Update based on review

(cherry picked from commit 67a5a2b)
Summary
Fix potential data race between WriteTraceEvent in ProcessBatch and ReadTraceEvents in the sampling goroutine. Closes #17772.
Performance
Benchmarked variants: Baseline, Single Mutex, ShardLockReadWriter, ShardLockReadWriter with RWMutex.