diff --git a/route/api_route_action.go b/route/api_route_action.go index 556fd63..01bbb20 100644 --- a/route/api_route_action.go +++ b/route/api_route_action.go @@ -93,22 +93,17 @@ func (r *APIRoute) TriggerAction(c echo.Context, sourceID codegen.SourceID, name return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) } - action := codegen.Action{ + action := in.ActionAdapter(codegen.Action{ SourceID: sourceID, Name: name, Properties: properties, Timestamp: utils.Ptr(time.Now()), - } - - go r.services.SocketIOService.Publish(in.ActionAdapter(action)) + }) - result, err := r.services.ActionServiceWS.Trigger(in.ActionAdapter(action)) - if err != nil { - message := err.Error() - return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) - } + go r.services.SocketIOService.Publish(action) + go r.services.ActionServiceWS.Trigger(action) - return c.JSON(http.StatusOK, out.ActionAdapter(*result)) + return c.JSON(http.StatusOK, out.ActionAdapter(action)) } func (r *APIRoute) SubscribeActionWS(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeActionWSParams) error { diff --git a/route/api_route_event.go b/route/api_route_event.go index 27ff3d8..cc8819d 100644 --- a/route/api_route_event.go +++ b/route/api_route_event.go @@ -104,23 +104,18 @@ func (r *APIRoute) PublishEvent(ctx echo.Context, sourceID codegen.SourceID, nam } uuidStr := uuid.New().String() - event := codegen.Event{ + event := in.EventAdapter(codegen.Event{ SourceID: sourceID, Name: name, Properties: properties, Timestamp: utils.Ptr(time.Now()), Uuid: &uuidStr, - } - - go r.services.SocketIOService.Publish(in.EventAdapter(event)) + }) - result, err := r.services.EventServiceWS.Publish(in.EventAdapter(event)) - if err != nil { - message := err.Error() - return ctx.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) - } + go r.services.SocketIOService.Publish(event) + go r.services.EventServiceWS.Publish(event) - return ctx.JSON(http.StatusOK, out.EventAdapter(*result)) + return ctx.JSON(http.StatusOK, out.EventAdapter(event)) } func (r *APIRoute) SubscribeEventWS(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeEventWSParams) error { diff --git a/service/action_service_websocket.go b/service/action_service_websocket.go index a2b2c3f..5c78328 100644 --- a/service/action_service_websocket.go +++ b/service/action_service_websocket.go @@ -22,27 +22,28 @@ type ActionServiceWS struct { subscriberChannels map[string]map[string][]chan model.Action } -func (s *ActionServiceWS) Trigger(action model.Action) (*model.Action, error) { +func (s *ActionServiceWS) Trigger(action model.Action) { if s.inboundChannel == nil { - return nil, ErrInboundChannelNotFound + logger.Error("error when triggering action via websocket", zap.Error(ErrInboundChannelNotFound)) } if action.Timestamp == 0 { action.Timestamp = time.Now().Unix() } - // TODO - ensure properties are valid for event type + // TODO - ensure properties are valid for action type select { case s.inboundChannel <- action: case <-(*s.ctx).Done(): - return nil, (*s.ctx).Err() + if err := (*s.ctx).Err(); err != nil { + logger.Info(err.Error()) + } + return - default: // drop event if no one is listening + default: // drop action if no one is listening } - - return &action, nil } func (s *ActionServiceWS) Subscribe(sourceID string, names []string) (chan model.Action, error) { diff --git a/service/event_service_websocket.go b/service/event_service_websocket.go index 6707b96..1eb7018 100644 --- a/service/event_service_websocket.go +++ b/service/event_service_websocket.go @@ -23,9 +23,9 @@ type EventServiceWS struct { subscriberChannels map[string]map[string][]chan model.Event } -func (s *EventServiceWS) Publish(event model.Event) (*model.Event, error) { +func (s *EventServiceWS) Publish(event model.Event) { if s.inboundChannel == nil { - return nil, ErrInboundChannelNotFound + logger.Error("error when publishing event via websocket", zap.Error(ErrInboundChannelNotFound)) } if event.Timestamp == 0 { @@ -38,12 +38,13 @@ func (s *EventServiceWS) Publish(event model.Event) (*model.Event, error) { case s.inboundChannel <- event: case <-(*s.ctx).Done(): - return nil, (*s.ctx).Err() + if err := (*s.ctx).Err(); err != nil { + logger.Info(err.Error()) + } + return default: // drop event if no one is listening } - - return &event, nil } func (s *EventServiceWS) Subscribe(sourceID string, names []string) (chan model.Event, error) { diff --git a/service/event_type_service_test.go b/service/event_type_service_test.go index 2d07760..9a008e0 100644 --- a/service/event_type_service_test.go +++ b/service/event_type_service_test.go @@ -90,12 +90,11 @@ func TestEventTypeService(t *testing.T) { // }, } - actualEvent1, err := wsService.Publish(expectedEvent) + wsService.Publish(expectedEvent) assert.NilError(t, err) - assert.DeepEqual(t, model.Event{SourceID: actualEvent1.SourceID, Name: actualEvent1.Name, Properties: actualEvent1.Properties}, expectedEvent) - actualEvent2, ok := <-outputChannel + actualEvent, ok := <-outputChannel assert.Equal(t, ok, true) - assert.DeepEqual(t, actualEvent2, *actualEvent1) + assert.DeepEqual(t, model.Event{SourceID: actualEvent.SourceID, Name: actualEvent.Name, Properties: actualEvent.Properties}, expectedEvent) } } diff --git a/service/socketio_service.go b/service/socketio_service.go index c772493..e57cc52 100644 --- a/service/socketio_service.go +++ b/service/socketio_service.go @@ -18,18 +18,18 @@ type SocketIOService struct { server *socketio.Server } -func (s *SocketIOService) Publish(entity interface{}) { - if event, ok := entity.(model.Event); ok { +func (s *SocketIOService) Publish(message interface{}) { + if event, ok := message.(model.Event); ok { s.server.BroadcastToRoom("/", "event", event.Name, event) return } - if action, ok := entity.(model.Action); ok { + if action, ok := message.(model.Action); ok { s.server.BroadcastToRoom("/", "action", action.Name, action) return } - logger.Error("unknown entity type", zap.Any("entity", entity)) + logger.Error("unknown message type", zap.Any("message", message)) } func (s *SocketIOService) Start(ctx *context.Context) {