fix: Resolve some data races in chunk accesses #15080
base: main
@@ -239,8 +239,8 @@ func (s *streamIterator) Next() bool {
 	// remove the first stream
 	s.instances[0].streams = s.instances[0].streams[1:]

-	stream.chunkMtx.RLock()
-	defer stream.chunkMtx.RUnlock()
+	stream.chunkMtx.Lock()
toWireChunks writes to c.blocks[*].offset on pkg/chunkenc/memchunk.go:674. This races with chunk.Bounds(), which doesn't strictly read this value, but does iterate over c.blocks.
I briefly looked at memchunk.go to see if we could avoid the write to offset, but it's not easy to do. Offset is expected to be set after encoding a chunk, so all the tests fail if you keep track of offsets separately.
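To illustrate the thread above, here is a minimal sketch (not Loki's actual code) of why an encode step that records block offsets needs the write lock rather than the read lock. The types `block`, `chunk`, and `stream`, and the methods `encode`, `bounds`, `safeEncode`, and `safeBounds`, are hypothetical stand-ins; the fixed 16-byte block size is an assumption made only to keep the example small.

```go
package main

import (
	"fmt"
	"sync"
)

// block and chunk are hypothetical stand-ins for the real chunk types.
type block struct {
	mint, maxt int64
	offset     int // written as a side effect of encoding
}

type chunk struct {
	blocks []block
}

// encode mimics an encoder that records each block's offset while
// serializing. It mutates the chunk, so callers need exclusive access.
func (c *chunk) encode() int {
	off := 0
	for i := range c.blocks {
		c.blocks[i].offset = off
		off += 16 // assumption: every block serializes to 16 bytes
	}
	return off
}

// bounds never uses offset, but ranging over c.blocks copies each block,
// which reads the same memory that encode writes.
func (c *chunk) bounds() (int64, int64) {
	if len(c.blocks) == 0 {
		return 0, 0
	}
	mint, maxt := c.blocks[0].mint, c.blocks[0].maxt
	for _, b := range c.blocks[1:] {
		if b.mint < mint {
			mint = b.mint
		}
		if b.maxt > maxt {
			maxt = b.maxt
		}
	}
	return mint, maxt
}

type stream struct {
	chunkMtx sync.RWMutex
	chunk    chunk
}

// safeEncode takes the write lock because encode mutates the chunk.
// Holding only RLock here would let it run concurrently with safeBounds.
func (s *stream) safeEncode() int {
	s.chunkMtx.Lock()
	defer s.chunkMtx.Unlock()
	return s.chunk.encode()
}

// safeBounds only reads, so the shared read lock is sufficient.
func (s *stream) safeBounds() (int64, int64) {
	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()
	return s.chunk.bounds()
}

func main() {
	s := &stream{chunk: chunk{blocks: []block{{mint: 1, maxt: 5}, {mint: 4, maxt: 9}}}}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); s.safeEncode() }()
		go func() { defer wg.Done(); s.safeBounds() }()
	}
	wg.Wait()

	mint, maxt := s.safeBounds()
	fmt.Println(mint, maxt)
}
```

Changing `safeEncode` to use `RLock`/`RUnlock` and running the program with `go run -race` should make the race detector report the conflicting accesses, which is the situation the write lock is meant to prevent.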
@@ -441,9 +441,11 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
 	)

 	// encodeChunk mutates the chunk so we must pass by reference
+	chunkMtx.Lock()
Same as toWireChunks. encodeChunk ultimately writes to c.blocks[*].offset in pkg/chunkenc/memchunk.go:674.
@@ -737,8 +737,9 @@ func (i *instance) getStats(ctx context.Context, req *logproto.IndexStatsRequest

 	if err = i.forMatchingStreams(ctx, from, matchers, nil, func(s *stream) error {
 		// Consider streams which overlap our time range
+		s.chunkMtx.RLock()
We read s.chunk.Bounds() in shouldConsiderStreams
But Bounds() already acquires the RLock here. Do we still need this?
Good spot - we probably don't need this change then. Grabbing the write lock when encoding should be enough since that is what causes the conflict.
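As a rough sketch of the point agreed on above: when an accessor already takes the read lock internally, the caller does not need to take it again. The `stream` type, its `from`/`to` fields, and the `overlaps` helper below are hypothetical and only mirror the shape of the discussion, not the real ingester code.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical type whose accessor does its own locking.
type stream struct {
	chunkMtx sync.RWMutex
	from, to int64
}

// Bounds acquires the read lock itself, so callers can use it directly.
func (s *stream) Bounds() (int64, int64) {
	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()
	return s.from, s.to
}

// overlaps does not lock: Bounds already guards the read. Wrapping this
// call in another RLock would be a recursive read lock, which the
// sync.RWMutex documentation prohibits because a writer waiting in
// between the two RLock calls can cause a deadlock.
func overlaps(s *stream, from, to int64) bool {
	sFrom, sTo := s.Bounds()
	return sFrom <= to && sTo >= from
}

func main() {
	s := &stream{from: 10, to: 20}
	fmt.Println(overlaps(s, 15, 30), overlaps(s, 21, 30))
}
```

This is why taking the write lock around the encoding paths is the part that matters: the readers are already guarded, and the conflict comes from the writer.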
@@ -441,9 +441,11 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
 	)

 	// encodeChunk mutates the chunk so we must pass by reference
+	chunkMtx.Lock()
 	if err := i.encodeChunk(ctx, &ch, c); err != nil {
 		return err
Should you unlock here too?
Yep, on line 448 below. I can wrap this in a func if you prefer to use defer.
What this PR does / why we need it:
-race
.