Skip to content

Commit

Permalink
fix a race condition (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
tigerinus authored Jan 17, 2023
1 parent e1c1b78 commit 2a5b6cb
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 28 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: Collect Code Coverage

on:
push:
branches:
- main
pull_request:
branches:
- main

jobs:
build:
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Go Generate
run: go generate
- name: Run coverage
run: go test -race -failfast -coverprofile=coverage.txt -covermode=atomic -v ./...
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
2 changes: 1 addition & 1 deletion .goreleaser.debug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ before:
- go generate
- go run github.com/google/go-licenses@latest check . --disallowed_types=restricted
- go mod tidy
- go test -v ./...
- go test -race -v ./...
builds:
- id: casaos-message-bus-amd64
binary: build/sysroot/usr/bin/casaos-message-bus
Expand Down
2 changes: 1 addition & 1 deletion .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ before:
- go generate
- go run github.com/google/go-licenses@latest check . --disallowed_types=restricted
- go mod tidy
- go test -v ./...
- go test -race -v ./...
builds:
- id: casaos-message-bus-amd64
binary: build/sysroot/usr/bin/casaos-message-bus
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# CasaOS-MessageBus

[![Go Reference](https://pkg.go.dev/badge/github.com/IceWhaleTech/CasaOS-MessageBus.svg)](https://pkg.go.dev/github.com/IceWhaleTech/CasaOS-MessageBus) [![Go Report Card](https://goreportcard.com/badge/github.com/IceWhaleTech/CasaOS-MessageBus)](https://goreportcard.com/report/github.com/IceWhaleTech/CasaOS-MessageBus) [![goreleaser](https://github.com/IceWhaleTech/CasaOS-MessageBus/actions/workflows/release.yml/badge.svg)](https://github.com/IceWhaleTech/CasaOS-MessageBus/actions/workflows/release.yml)
[![Go Reference](https://pkg.go.dev/badge/github.com/IceWhaleTech/CasaOS-MessageBus.svg)](https://pkg.go.dev/github.com/IceWhaleTech/CasaOS-MessageBus) [![Go Report Card](https://goreportcard.com/badge/github.com/IceWhaleTech/CasaOS-MessageBus)](https://goreportcard.com/report/github.com/IceWhaleTech/CasaOS-MessageBus) [![goreleaser](https://github.com/IceWhaleTech/CasaOS-MessageBus/actions/workflows/release.yml/badge.svg)](https://github.com/IceWhaleTech/CasaOS-MessageBus/actions/workflows/release.yml) [![codecov](https://codecov.io/gh/IceWhaleTech/CasaOS-MessageBus/branch/main/graph/badge.svg?token=U4S4ZSZAL9)](https://codecov.io/gh/IceWhaleTech/CasaOS-MessageBus)

Message bus accepts events and actions from various sources and delivers them to subscribers.

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ require (
modernc.org/libc v1.22.2 // indirect
modernc.org/mathutil v1.5.0 // indirect
modernc.org/memory v1.5.0 // indirect
modernc.org/sqlite v1.20.1 // indirect
modernc.org/sqlite v1.20.2 // indirect
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ modernc.org/memory v1.5.0/go.mod h1:PkUhL0Mugw21sHPeskwZW4D6VscE/GQJOnIpCnW6pSU=
modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
modernc.org/sqlite v1.20.0/go.mod h1:EsYz8rfOvLCiYTy5ZFsOYzoCcRMu98YYkwAcCw5YIYw=
modernc.org/sqlite v1.20.1 h1:z6qRLw72B0VfRrJjs3l6hWkzYDx1bo0WGVrBGP4ohhM=
modernc.org/sqlite v1.20.1/go.mod h1:fODt+bFmc/j8LcoCbMSkAuKuGmhxjG45KGc25N2705M=
modernc.org/sqlite v1.20.2 h1:9AaVzJH1Yf0u9iOZRjjuvqxLoGqybqVFbAUC5rvi9u8=
modernc.org/sqlite v1.20.2/go.mod h1:zKcGyrICaxNTMEHSr1HQ2GUraP0j+845GYw37+EyT6A=
modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=
modernc.org/tcl v1.15.0/go.mod h1:xRoGotBZ6dU+Zo2tca+2EqVEeMmOUBzHnhIwq4YrVnE=
modernc.org/token v1.0.1/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM=
Expand Down
55 changes: 33 additions & 22 deletions service/event_service_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ import (
type EventServiceWS struct {
typeService *EventTypeService

ctx *context.Context
mutex sync.Mutex
stop chan struct{}
ctx *context.Context
stop chan struct{}

inboundChannel chan model.Event
subscriberChannels map[string]map[string][]chan model.Event
}

var mutex = &sync.Mutex{}

func (s *EventServiceWS) Publish(event model.Event) {
if s.inboundChannel == nil {
logger.Error("error when publishing event via websocket", zap.Error(ErrInboundChannelNotFound))
Expand Down Expand Up @@ -70,22 +71,28 @@ func (s *EventServiceWS) Subscribe(sourceID string, names []string) (chan model.
}
}

if s.subscriberChannels == nil {
s.subscriberChannels = make(map[string]map[string][]chan model.Event)
}
c := func() chan model.Event {
mutex.Lock()
defer mutex.Unlock()

if s.subscriberChannels[sourceID] == nil {
s.subscriberChannels[sourceID] = make(map[string][]chan model.Event)
}
if s.subscriberChannels == nil {
s.subscriberChannels = make(map[string]map[string][]chan model.Event)
}

if s.subscriberChannels[sourceID] == nil {
s.subscriberChannels[sourceID] = make(map[string][]chan model.Event)
}

c := make(chan model.Event, 1)
c := make(chan model.Event, 1)

for _, name := range names {
if s.subscriberChannels[sourceID][name] == nil {
s.subscriberChannels[sourceID][name] = make([]chan model.Event, 0)
for _, name := range names {
if s.subscriberChannels[sourceID][name] == nil {
s.subscriberChannels[sourceID][name] = make([]chan model.Event, 0)
}
s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name], c)
}
s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name], c)
}
return c
}()

return c, nil
}
Expand All @@ -104,8 +111,8 @@ func (s *EventServiceWS) Unsubscribe(sourceID string, name string, c chan model.
}

for i, subscriber := range s.subscriberChannels[sourceID][name] {
s.mutex.Lock()
defer s.mutex.Unlock()
mutex.Lock()
defer mutex.Unlock()

if subscriber == c {
logger.Info("unsubscribing from event type", zap.String("sourceID", sourceID), zap.String("name", name), zap.Int("subscriber", i))
Expand All @@ -122,12 +129,16 @@ func (s *EventServiceWS) Unsubscribe(sourceID string, name string, c chan model.
}

func (s *EventServiceWS) Start(ctx *context.Context) {
s.ctx = ctx
s.mutex = sync.Mutex{}
func() {
mutex.Lock()
defer mutex.Unlock()

s.ctx = ctx

s.inboundChannel = make(chan model.Event)
s.subscriberChannels = make(map[string]map[string][]chan model.Event)
s.stop = make(chan struct{})
s.inboundChannel = make(chan model.Event)
s.subscriberChannels = make(map[string]map[string][]chan model.Event)
s.stop = make(chan struct{})
}()

defer func() {
if s.subscriberChannels != nil {
Expand Down

0 comments on commit 2a5b6cb

Please sign in to comment.