Skip to content

Commit

Permalink
Add mutex for t.stopped to prevent data races (#11933)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
This addresses the data race present on the `t.stopped` variable in
`tail.go`.

```
==================
WARNING: DATA RACE
Write at 0x00c00098b198 by goroutine 568:
  github.com/grafana/loki/pkg/querier.(*Tailer).close()
      /Users/progers/dev/src/github.com/grafana/loki/pkg/querier/tail.go:272 +0x104
  github.com/grafana/loki/pkg/querier.TestTailer.func7.2()
      /Users/progers/dev/src/github.com/grafana/loki/pkg/querier/tail_test.go:169 +0x34
  runtime.deferreturn()
      /opt/homebrew/Cellar/go/1.21.6/libexec/src/runtime/panic.go:477 +0x34
  testing.tRunner()
      /opt/homebrew/Cellar/go/1.21.6/libexec/src/testing/testing.go:1595 +0x1b0
  testing.(*T).Run.func1()
      /opt/homebrew/Cellar/go/1.21.6/libexec/src/testing/testing.go:1648 +0x40

Previous read at 0x00c00098b198 by goroutine 569:
  github.com/grafana/loki/pkg/querier.(*Tailer).loop()
      /Users/progers/dev/src/github.com/grafana/loki/pkg/querier/tail.go:88 +0x13c
  github.com/grafana/loki/pkg/querier.newTailer.func1()
      /Users/progers/dev/src/github.com/grafana/loki/pkg/querier/tail.go:342 +0x34

Goroutine 568 (running) created at:
  testing.(*T).Run()
      /opt/homebrew/Cellar/go/1.21.6/libexec/src/testing/testing.go:1648 +0x5e8
  github.com/grafana/loki/pkg/querier.TestTailer()
      /Users/progers/dev/src/github.com/grafana/loki/pkg/querier/tail_test.go:158 +0x10dc
  testing.tRunner()
      /opt/homebrew/Cellar/go/1.21.6/libexec/src/testing/testing.go:1595 +0x1b0
  testing.(*T).Run.func1()
      /opt/homebrew/Cellar/go/1.21.6/libexec/src/testing/testing.go:1648 +0x40

Goroutine 569 (running) created at:
  github.com/grafana/loki/pkg/querier.newTailer()
      /Users/progers/dev/src/github.com/grafana/loki/pkg/querier/tail.go:342 +0x300
  github.com/grafana/loki/pkg/querier.TestTailer.func7()
      /Users/progers/dev/src/github.com/grafana/loki/pkg/querier/tail_test.go:168 +0x138
  testing.tRunner()
      /opt/homebrew/Cellar/go/1.21.6/libexec/src/testing/testing.go:1595 +0x1b0
  testing.(*T).Run.func1()
      /opt/homebrew/Cellar/go/1.21.6/libexec/src/testing/testing.go:1648 +0x40
==================
```


**Which issue(s) this PR fixes**:
Relates to: #8586

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)
  • Loading branch information
paul1r authored Feb 13, 2024
1 parent 1f9f9a6 commit b2e4cc3
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) {
}
level.Error(logger).Log("msg", "Error from client", "err", err)
break
} else if tailer.stopped {
} else if tailer.stopped.Load() {
return
}

Expand Down
15 changes: 10 additions & 5 deletions pkg/querier/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"go.uber.org/atomic"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
Expand Down Expand Up @@ -51,7 +53,7 @@ type Tailer struct {
querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters
querierTailClientsMtx sync.RWMutex

stopped bool
stopped atomic.Bool
delayFor time.Duration
responseChan chan *loghttp.TailResponse
closeErrChan chan error
Expand Down Expand Up @@ -85,7 +87,8 @@ func (t *Tailer) loop() {

droppedEntries := make([]loghttp.DroppedEntry, 0)

for !t.stopped {
stopped := t.stopped.Load()
for !stopped {
select {
case <-checkConnectionTicker.C:
// Try to reconnect dropped ingesters and connect to new ingesters
Expand Down Expand Up @@ -214,7 +217,8 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_

logger := util_log.WithContext(querierTailClient.Context(), t.logger)
for {
if t.stopped {
stopped := t.stopped.Load()
if stopped {
if err := querierTailClient.CloseSend(); err != nil {
level.Error(logger).Log("msg", "Error closing grpc tail client", "err", err)
}
Expand All @@ -223,7 +227,7 @@ func (t *Tailer) readTailClient(addr string, querierTailClient logproto.Querier_
resp, err = querierTailClient.Recv()
if err != nil {
// We don't want to log error when its due to stopping the tail request
if !t.stopped {
if !stopped {
level.Error(logger).Log("msg", "Error receiving response from grpc tail client", "err", err)
}
break
Expand Down Expand Up @@ -269,7 +273,8 @@ func (t *Tailer) close() error {
t.metrics.tailsActive.Dec()
t.metrics.tailedStreamsActive.Sub(t.activeStreamCount())

t.stopped = true
t.stopped.Store(true)

return t.openStreamIterator.Close()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/tail_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func readFromTailer(tailer *Tailer, maxEntries int) ([]*loghttp.TailResponse, er
timeoutTicker := time.NewTicker(timeout)
defer timeoutTicker.Stop()

for !tailer.stopped && entriesCount < maxEntries {
for !tailer.stopped.Load() && entriesCount < maxEntries {
select {
case <-timeoutTicker.C:
return nil, errors.New("timeout expired while reading responses from Tailer")
Expand Down

0 comments on commit b2e4cc3

Please sign in to comment.