Skip to content

Commit

Permalink
minor optimization in publishing message via websocket (#24)
Browse files Browse the repository at this point in the history
Signed-off-by: Tiger Wang <[email protected]>
  • Loading branch information
tigerinus authored Jan 13, 2023
1 parent 95ba219 commit e1c1b78
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 40 deletions.
15 changes: 5 additions & 10 deletions route/api_route_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,17 @@ func (r *APIRoute) TriggerAction(c echo.Context, sourceID codegen.SourceID, name
return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message})
}

action := codegen.Action{
action := in.ActionAdapter(codegen.Action{
SourceID: sourceID,
Name: name,
Properties: properties,
Timestamp: utils.Ptr(time.Now()),
}

go r.services.SocketIOService.Publish(in.ActionAdapter(action))
})

result, err := r.services.ActionServiceWS.Trigger(in.ActionAdapter(action))
if err != nil {
message := err.Error()
return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message})
}
go r.services.SocketIOService.Publish(action)
go r.services.ActionServiceWS.Trigger(action)

return c.JSON(http.StatusOK, out.ActionAdapter(*result))
return c.JSON(http.StatusOK, out.ActionAdapter(action))
}

func (r *APIRoute) SubscribeActionWS(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeActionWSParams) error {
Expand Down
15 changes: 5 additions & 10 deletions route/api_route_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,18 @@ func (r *APIRoute) PublishEvent(ctx echo.Context, sourceID codegen.SourceID, nam
}

uuidStr := uuid.New().String()
event := codegen.Event{
event := in.EventAdapter(codegen.Event{
SourceID: sourceID,
Name: name,
Properties: properties,
Timestamp: utils.Ptr(time.Now()),
Uuid: &uuidStr,
}

go r.services.SocketIOService.Publish(in.EventAdapter(event))
})

result, err := r.services.EventServiceWS.Publish(in.EventAdapter(event))
if err != nil {
message := err.Error()
return ctx.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message})
}
go r.services.SocketIOService.Publish(event)
go r.services.EventServiceWS.Publish(event)

return ctx.JSON(http.StatusOK, out.EventAdapter(*result))
return ctx.JSON(http.StatusOK, out.EventAdapter(event))
}

func (r *APIRoute) SubscribeEventWS(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeEventWSParams) error {
Expand Down
15 changes: 8 additions & 7 deletions service/action_service_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,28 @@ type ActionServiceWS struct {
subscriberChannels map[string]map[string][]chan model.Action
}

func (s *ActionServiceWS) Trigger(action model.Action) (*model.Action, error) {
func (s *ActionServiceWS) Trigger(action model.Action) {
if s.inboundChannel == nil {
return nil, ErrInboundChannelNotFound
logger.Error("error when triggering action via websocket", zap.Error(ErrInboundChannelNotFound))
}

if action.Timestamp == 0 {
action.Timestamp = time.Now().Unix()
}

// TODO - ensure properties are valid for event type
// TODO - ensure properties are valid for action type

select {
case s.inboundChannel <- action:

case <-(*s.ctx).Done():
return nil, (*s.ctx).Err()
if err := (*s.ctx).Err(); err != nil {
logger.Info(err.Error())
}
return

default: // drop event if no one is listening
default: // drop action if no one is listening
}

return &action, nil
}

func (s *ActionServiceWS) Subscribe(sourceID string, names []string) (chan model.Action, error) {
Expand Down
11 changes: 6 additions & 5 deletions service/event_service_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ type EventServiceWS struct {
subscriberChannels map[string]map[string][]chan model.Event
}

func (s *EventServiceWS) Publish(event model.Event) (*model.Event, error) {
func (s *EventServiceWS) Publish(event model.Event) {
if s.inboundChannel == nil {
return nil, ErrInboundChannelNotFound
logger.Error("error when publishing event via websocket", zap.Error(ErrInboundChannelNotFound))
}

if event.Timestamp == 0 {
Expand All @@ -38,12 +38,13 @@ func (s *EventServiceWS) Publish(event model.Event) (*model.Event, error) {
case s.inboundChannel <- event:

case <-(*s.ctx).Done():
return nil, (*s.ctx).Err()
if err := (*s.ctx).Err(); err != nil {
logger.Info(err.Error())
}
return

default: // drop event if no one is listening
}

return &event, nil
}

func (s *EventServiceWS) Subscribe(sourceID string, names []string) (chan model.Event, error) {
Expand Down
7 changes: 3 additions & 4 deletions service/event_type_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,11 @@ func TestEventTypeService(t *testing.T) {
// },
}

actualEvent1, err := wsService.Publish(expectedEvent)
wsService.Publish(expectedEvent)
assert.NilError(t, err)
assert.DeepEqual(t, model.Event{SourceID: actualEvent1.SourceID, Name: actualEvent1.Name, Properties: actualEvent1.Properties}, expectedEvent)

actualEvent2, ok := <-outputChannel
actualEvent, ok := <-outputChannel
assert.Equal(t, ok, true)
assert.DeepEqual(t, actualEvent2, *actualEvent1)
assert.DeepEqual(t, model.Event{SourceID: actualEvent.SourceID, Name: actualEvent.Name, Properties: actualEvent.Properties}, expectedEvent)
}
}
8 changes: 4 additions & 4 deletions service/socketio_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ type SocketIOService struct {
server *socketio.Server
}

func (s *SocketIOService) Publish(entity interface{}) {
if event, ok := entity.(model.Event); ok {
func (s *SocketIOService) Publish(message interface{}) {
if event, ok := message.(model.Event); ok {
s.server.BroadcastToRoom("/", "event", event.Name, event)
return
}

if action, ok := entity.(model.Action); ok {
if action, ok := message.(model.Action); ok {
s.server.BroadcastToRoom("/", "action", action.Name, action)
return
}

logger.Error("unknown entity type", zap.Any("entity", entity))
logger.Error("unknown message type", zap.Any("message", message))
}

func (s *SocketIOService) Start(ctx *context.Context) {
Expand Down

0 comments on commit e1c1b78

Please sign in to comment.