From 2a5b6cbcc12b0cfd0414c1a3853079947e2d54d9 Mon Sep 17 00:00:00 2001 From: Tiger Wang Date: Tue, 17 Jan 2023 12:33:02 -0500 Subject: [PATCH] fix a race condition (#25) --- .github/workflows/codecov.yml | 24 +++++++++++++ .goreleaser.debug.yaml | 2 +- .goreleaser.yaml | 2 +- README.md | 2 +- go.mod | 2 +- go.sum | 4 +-- service/event_service_websocket.go | 55 ++++++++++++++++++------------ 7 files changed, 63 insertions(+), 28 deletions(-) create mode 100644 .github/workflows/codecov.yml diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml new file mode 100644 index 0000000..581786b --- /dev/null +++ b/.github/workflows/codecov.yml @@ -0,0 +1,24 @@ +name: Collect Code Coverage + +on: + push: + branches: + - main + pull_request: + branches: + - main + +jobs: + build: + runs-on: ubuntu-22.04 + steps: + - name: Checkout + uses: actions/checkout@v2 + with: + fetch-depth: 0 + - name: Go Generate + run: go generate + - name: Run coverage + run: go test -race -failfast -coverprofile=coverage.txt -covermode=atomic -v ./... + - name: Upload coverage to Codecov + uses: codecov/codecov-action@v3 diff --git a/.goreleaser.debug.yaml b/.goreleaser.debug.yaml index 2ec9b6a..bc1ddfa 100644 --- a/.goreleaser.debug.yaml +++ b/.goreleaser.debug.yaml @@ -6,7 +6,7 @@ before: - go generate - go run github.com/google/go-licenses@latest check . --disallowed_types=restricted - go mod tidy - - go test -v ./... + - go test -race -v ./... builds: - id: casaos-message-bus-amd64 binary: build/sysroot/usr/bin/casaos-message-bus diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 1d1e069..78cceab 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -6,7 +6,7 @@ before: - go generate - go run github.com/google/go-licenses@latest check . --disallowed_types=restricted - go mod tidy - - go test -v ./... + - go test -race -v ./... builds: - id: casaos-message-bus-amd64 binary: build/sysroot/usr/bin/casaos-message-bus diff --git a/README.md b/README.md index 15b45aa..3a12da0 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # CasaOS-MessageBus -[![Go Reference](https://pkg.go.dev/badge/github.com/IceWhaleTech/CasaOS-MessageBus.svg)](https://pkg.go.dev/github.com/IceWhaleTech/CasaOS-MessageBus) [![Go Report Card](https://goreportcard.com/badge/github.com/IceWhaleTech/CasaOS-MessageBus)](https://goreportcard.com/report/github.com/IceWhaleTech/CasaOS-MessageBus) [![goreleaser](https://github.com/IceWhaleTech/CasaOS-MessageBus/actions/workflows/release.yml/badge.svg)](https://github.com/IceWhaleTech/CasaOS-MessageBus/actions/workflows/release.yml) +[![Go Reference](https://pkg.go.dev/badge/github.com/IceWhaleTech/CasaOS-MessageBus.svg)](https://pkg.go.dev/github.com/IceWhaleTech/CasaOS-MessageBus) [![Go Report Card](https://goreportcard.com/badge/github.com/IceWhaleTech/CasaOS-MessageBus)](https://goreportcard.com/report/github.com/IceWhaleTech/CasaOS-MessageBus) [![goreleaser](https://github.com/IceWhaleTech/CasaOS-MessageBus/actions/workflows/release.yml/badge.svg)](https://github.com/IceWhaleTech/CasaOS-MessageBus/actions/workflows/release.yml) [![codecov](https://codecov.io/gh/IceWhaleTech/CasaOS-MessageBus/branch/main/graph/badge.svg?token=U4S4ZSZAL9)](https://codecov.io/gh/IceWhaleTech/CasaOS-MessageBus) Message bus accepts events and actions from various sources and delivers them to subscribers. diff --git a/go.mod b/go.mod index 93cc5bd..246769a 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,7 @@ require ( modernc.org/libc v1.22.2 // indirect modernc.org/mathutil v1.5.0 // indirect modernc.org/memory v1.5.0 // indirect - modernc.org/sqlite v1.20.1 // indirect + modernc.org/sqlite v1.20.2 // indirect ) require ( diff --git a/go.sum b/go.sum index cc811d9..398933a 100644 --- a/go.sum +++ b/go.sum @@ -300,8 +300,8 @@ modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU= modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0= modernc.org/sqlite v1.20.0/go.mod h1:EsYz8rfOvLCiYTy5ZFsOYzoCcRMu98YYkwAcCw5YIYw= -modernc.org/sqlite v1.20.1 h1:z6qRLw72B0VfRrJjs3l6hWkzYDx1bo0WGVrBGP4ohhM= -modernc.org/sqlite v1.20.1/go.mod h1:fODt+bFmc/j8LcoCbMSkAuKuGmhxjG45KGc25N2705M= +modernc.org/sqlite v1.20.2 h1:9AaVzJH1Yf0u9iOZRjjuvqxLoGqybqVFbAUC5rvi9u8= +modernc.org/sqlite v1.20.2/go.mod h1:zKcGyrICaxNTMEHSr1HQ2GUraP0j+845GYw37+EyT6A= modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw= modernc.org/tcl v1.15.0/go.mod h1:xRoGotBZ6dU+Zo2tca+2EqVEeMmOUBzHnhIwq4YrVnE= modernc.org/token v1.0.1/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/service/event_service_websocket.go b/service/event_service_websocket.go index 1eb7018..34518e2 100644 --- a/service/event_service_websocket.go +++ b/service/event_service_websocket.go @@ -15,14 +15,15 @@ import ( type EventServiceWS struct { typeService *EventTypeService - ctx *context.Context - mutex sync.Mutex - stop chan struct{} + ctx *context.Context + stop chan struct{} inboundChannel chan model.Event subscriberChannels map[string]map[string][]chan model.Event } +var mutex = &sync.Mutex{} + func (s *EventServiceWS) Publish(event model.Event) { if s.inboundChannel == nil { logger.Error("error when publishing event via websocket", zap.Error(ErrInboundChannelNotFound)) @@ -70,22 +71,28 @@ func (s *EventServiceWS) Subscribe(sourceID string, names []string) (chan model. } } - if s.subscriberChannels == nil { - s.subscriberChannels = make(map[string]map[string][]chan model.Event) - } + c := func() chan model.Event { + mutex.Lock() + defer mutex.Unlock() - if s.subscriberChannels[sourceID] == nil { - s.subscriberChannels[sourceID] = make(map[string][]chan model.Event) - } + if s.subscriberChannels == nil { + s.subscriberChannels = make(map[string]map[string][]chan model.Event) + } + + if s.subscriberChannels[sourceID] == nil { + s.subscriberChannels[sourceID] = make(map[string][]chan model.Event) + } - c := make(chan model.Event, 1) + c := make(chan model.Event, 1) - for _, name := range names { - if s.subscriberChannels[sourceID][name] == nil { - s.subscriberChannels[sourceID][name] = make([]chan model.Event, 0) + for _, name := range names { + if s.subscriberChannels[sourceID][name] == nil { + s.subscriberChannels[sourceID][name] = make([]chan model.Event, 0) + } + s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name], c) } - s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name], c) - } + return c + }() return c, nil } @@ -104,8 +111,8 @@ func (s *EventServiceWS) Unsubscribe(sourceID string, name string, c chan model. } for i, subscriber := range s.subscriberChannels[sourceID][name] { - s.mutex.Lock() - defer s.mutex.Unlock() + mutex.Lock() + defer mutex.Unlock() if subscriber == c { logger.Info("unsubscribing from event type", zap.String("sourceID", sourceID), zap.String("name", name), zap.Int("subscriber", i)) @@ -122,12 +129,16 @@ func (s *EventServiceWS) Unsubscribe(sourceID string, name string, c chan model. } func (s *EventServiceWS) Start(ctx *context.Context) { - s.ctx = ctx - s.mutex = sync.Mutex{} + func() { + mutex.Lock() + defer mutex.Unlock() + + s.ctx = ctx - s.inboundChannel = make(chan model.Event) - s.subscriberChannels = make(map[string]map[string][]chan model.Event) - s.stop = make(chan struct{}) + s.inboundChannel = make(chan model.Event) + s.subscriberChannels = make(map[string]map[string][]chan model.Event) + s.stop = make(chan struct{}) + }() defer func() { if s.subscriberChannels != nil {