diff --git a/api/message_bus/openapi.yaml b/api/message_bus/openapi.yaml index 9b08a67..a327265 100644 --- a/api/message_bus/openapi.yaml +++ b/api/message_bus/openapi.yaml @@ -148,10 +148,12 @@ paths: /event/{source_id}: get: - summary: Subscribe to an event type (WebSocket) + summary: Subscribe to events by source ID (WebSocket) description: | - Subscribe to an event type by source ID and name via WebSocket. - operationId: subscribeEvent + Subscribe to event by `source_id` via WebSocket. + + > `names` can be specified in the query string to subscribe to specific event types. + operationId: subscribeEventWS tags: - Event methods parameters: @@ -162,6 +164,34 @@ paths: description: | The connection will be upgraded to a WebSocket connection and the client will receive events as they are published. + /event: + get: + summary: Subscribe to events by source ID (SocketIO) + description: | + Subscribe to events by `source_id` via SocketIO. + + > `source_id` should be specified as the SocketIO *event* (which is a different from *event* in message bus) when listening. + operationId: subscribeEventSIO + tags: + - Event methods + responses: + "101": + description: | + The connection will be upgraded to a SocketIO connection and the client will receive events as they are published. + + /event/: + get: + summary: Subscribe to all event types (SocketIO) + description: | + Subscribe to all event types via SocketIO. + > In URI definition, `/event` and `/event/` are different paths. This endpoint is a workaround to maximize compatibility. + operationId: subscribeEventSIO2 + responses: + "101": + description: | + The connection will be upgraded to a SocketIO connection and the client will receive events as they are published. + + /action_type: get: summary: List action types @@ -247,10 +277,12 @@ paths: /action/{source_id}: get: - summary: Subscribe to an action type (WebSocket) + summary: Subscribe to actions by source ID (WebSocket) description: | - Subscribe to an action type by source ID and name via WebSocket. - operationId: subscribeAction + Subscribe to actions by `source_id` via WebSocket. + + > `names` can be specified in the query string to subscribe to specific action types. + operationId: subscribeActionWS tags: - Action methods parameters: @@ -261,6 +293,33 @@ paths: description: | The connection will be upgraded to a WebSocket connection and the client will receive actions as they are triggered. + /action: + get: + summary: Subscribe to actions by source ID (SocketIO) + description: | + Subscribe to actions by `source_id` via SocketIO. + + > `source_id` should be specified as the SocketIO *event* (which is a different from *event* in message bus) when listening. + operationId: subscribeActionSIO + tags: + - Action methods + responses: + "101": + description: | + The connection will be upgraded to a SocketIO connection and the client will receive actions as they are triggered. + + /action/: + get: + summary: Subscribe to all action types (SocketIO) + description: | + Subscribe to all action types via SocketIO. + > In URI definition, `/action` and `/action/` are different paths. This endpoint is a workaround to maximize compatibility. + operationId: subscribeActionSIO2 + responses: + "101": + description: | + The connection will be upgraded to a SocketIO connection and the client will receive actions as they are triggered. + components: securitySchemes: access_token: diff --git a/common/constants.go b/common/constants.go index 7e8645c..3aac45e 100644 --- a/common/constants.go +++ b/common/constants.go @@ -1,7 +1,7 @@ package common const ( - MessageBusVersion = "0.4.1" + MessageBusVersion = "0.4.2" MessageBusServiceName = "message-bus" MessageBusSourceID = "message-bus" diff --git a/go.mod b/go.mod index 34a0458..286b4b2 100644 --- a/go.mod +++ b/go.mod @@ -27,11 +27,14 @@ require ( github.com/gobwas/pool v0.2.1 // indirect github.com/goccy/go-json v0.10.0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/gofrs/uuid v4.0.0+incompatible // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.4.3 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/gomodule/redigo v1.8.4 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/gorilla/mux v1.8.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/klauspost/compress v1.15.13 // indirect @@ -64,6 +67,7 @@ require ( github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/swag v0.22.3 // indirect github.com/google/uuid v1.3.0 + github.com/googollee/go-socket.io v1.6.2 github.com/invopop/yaml v0.2.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/labstack/echo/v4 v4.10.0 diff --git a/go.sum b/go.sum index c47d5f7..d15fe24 100644 --- a/go.sum +++ b/go.sum @@ -61,6 +61,8 @@ github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= +github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-jwt/jwt/v4 v4.4.3 h1:Hxl6lhQFj4AnOX6MLrsCb/+7tCj7DxP7VA+2rDIq5AU= @@ -69,6 +71,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gomodule/redigo v1.8.4 h1:Z5JUg94HMTR1XpwBaSH4vq3+PNSIykBLxMdglbw10gg= +github.com/gomodule/redigo v1.8.4/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= @@ -77,8 +81,13 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/googollee/go-socket.io v1.6.2 h1:olKLLHJtHz1IkL/OrTyNriZZvVQYEORNkJAqsOwPask= +github.com/googollee/go-socket.io v1.6.2/go.mod h1:0vGP8/dXR9SZUMMD4+xxaGo/lohOw3YWMh2WRiWeKxg= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/invopop/yaml v0.1.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q= github.com/invopop/yaml v0.2.0 h1:7zky/qH+O0DwAyoobXUqvVBwgBFRxKoQ/3FjcVpjTMY= @@ -160,6 +169,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= @@ -249,6 +259,7 @@ gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/route/api_route_action.go b/route/api_route_action.go index afa74df..08d1a75 100644 --- a/route/api_route_action.go +++ b/route/api_route_action.go @@ -19,7 +19,7 @@ import ( ) func (r *APIRoute) GetActionTypes(c echo.Context) error { - actionType, err := r.services.ActionService.GetActionTypes() + actionType, err := r.services.ActionTypeService.GetActionTypes() if err != nil { message := err.Error() return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) @@ -42,7 +42,7 @@ func (r *APIRoute) RegisterActionTypes(c echo.Context) error { } for _, actionType := range actionTypes { - _, err := r.services.ActionService.RegisterActionType(in.ActionTypeAdapter(actionType)) + _, err := r.services.ActionTypeService.RegisterActionType(in.ActionTypeAdapter(actionType)) if err != nil { message := err.Error() return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) @@ -53,7 +53,7 @@ func (r *APIRoute) RegisterActionTypes(c echo.Context) error { } func (r *APIRoute) GetActionTypesBySourceID(c echo.Context, sourceID codegen.SourceID) error { - results, err := r.services.ActionService.GetActionTypesBySourceID(sourceID) + results, err := r.services.ActionTypeService.GetActionTypesBySourceID(sourceID) if err != nil { message := err.Error() return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) @@ -63,7 +63,7 @@ func (r *APIRoute) GetActionTypesBySourceID(c echo.Context, sourceID codegen.Sou } func (r *APIRoute) GetActionType(c echo.Context, sourceID codegen.SourceID, name codegen.EventName) error { - result, err := r.services.ActionService.GetActionType(sourceID, name) + result, err := r.services.ActionTypeService.GetActionType(sourceID, name) if err != nil { message := err.Error() return c.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: &message}) @@ -77,7 +77,7 @@ func (r *APIRoute) GetActionType(c echo.Context, sourceID codegen.SourceID, name } func (r *APIRoute) TriggerAction(c echo.Context, sourceID codegen.SourceID, name codegen.EventName) error { - actionType, err := r.services.ActionService.GetActionType(sourceID, name) + actionType, err := r.services.ActionTypeService.GetActionType(sourceID, name) if err != nil { message := err.Error() return c.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: &message}) @@ -100,7 +100,7 @@ func (r *APIRoute) TriggerAction(c echo.Context, sourceID codegen.SourceID, name Timestamp: utils.Ptr(time.Now()), } - result, err := r.services.ActionService.Trigger(in.ActionAdapter(action)) + result, err := r.services.ActionServiceWS.Trigger(in.ActionAdapter(action)) if err != nil { message := err.Error() return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) @@ -109,11 +109,11 @@ func (r *APIRoute) TriggerAction(c echo.Context, sourceID codegen.SourceID, name return c.JSON(http.StatusOK, out.ActionAdapter(*result)) } -func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeActionParams) error { +func (r *APIRoute) SubscribeActionWS(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeActionWSParams) error { var actionNames []string if params.Names != nil { for _, actionName := range *params.Names { - actionType, err := r.services.ActionService.GetActionType(sourceID, actionName) + actionType, err := r.services.ActionTypeService.GetActionType(sourceID, actionName) if err != nil { message := err.Error() return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) @@ -126,7 +126,7 @@ func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, pa actionNames = append(actionNames, actionName) } } else { - actionTypes, err := r.services.ActionService.GetActionTypesBySourceID(sourceID) + actionTypes, err := r.services.ActionTypeService.GetActionTypesBySourceID(sourceID) if err != nil { message := err.Error() return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) @@ -143,7 +143,7 @@ func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, pa return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) } - channel, err := r.services.ActionService.Subscribe(sourceID, actionNames) + channel, err := r.services.ActionServiceWS.Subscribe(sourceID, actionNames) if err != nil { conn.Close() // need to close connection here, instead of defer, because of the goroutine message := err.Error() @@ -155,24 +155,24 @@ func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, pa defer close(channel) defer func(actionNames []string) { for _, name := range actionNames { - if err := r.services.ActionService.Unsubscribe(sourceID, name, channel); err != nil { - logger.Error("error when trying to unsubscribe an action type", zap.Error(err), zap.String("source_id", sourceID), zap.String("name", name)) + if err := r.services.ActionServiceWS.Unsubscribe(sourceID, name, channel); err != nil { + logger.Error("error when trying to unsubscribe an action type via websocket", zap.Error(err), zap.String("source_id", sourceID), zap.String("name", name)) } } }(actionNames) - logger.Info("started", zap.String("remote_addr", conn.RemoteAddr().String())) + logger.Info("a websocket connection has started for actions", zap.String("remote_addr", conn.RemoteAddr().String())) for { action, ok := <-channel if !ok { - logger.Info("channel closed") + logger.Info("websocket channel for events is closed") return } if action.SourceID == common.MessageBusSourceID && action.Name == common.MessageBusHeartbeatName { if err := wsutil.WriteServerMessage(conn, ws.OpPing, []byte{}); err != nil { - logger.Error("error when trying to send ping message", zap.Error(err)) + logger.Error("error when trying to send ping message via websocket", zap.Error(err)) return } continue @@ -180,17 +180,17 @@ func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, pa message, err := json.Marshal(out.ActionAdapter(action)) if err != nil { - logger.Error("error when trying to marshal action", zap.Error(err)) + logger.Error("error when trying to marshal action for websocket", zap.Error(err)) continue } - logger.Info("sending", zap.String("remote_addr", conn.RemoteAddr().String()), zap.String("message", string(message))) + logger.Info("sending action via websocket", zap.String("remote_addr", conn.RemoteAddr().String()), zap.String("message", string(message))) if err := wsutil.WriteServerBinary(conn, message); err != nil { if _, ok := err.(*net.OpError); ok { - logger.Info("ended", zap.String("error", err.Error())) + logger.Info("websocket connection ended", zap.String("error", err.Error())) } else { - logger.Error("error", zap.String("error", err.Error())) + logger.Error("error when sending event via websocket", zap.String("error", err.Error())) } return } @@ -199,3 +199,14 @@ func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, pa return nil } + +func (r *APIRoute) SubscribeActionSIO(ctx echo.Context) error { + server := r.services.ActionServiceSIO.Server() + server.ServeHTTP(ctx.Response(), ctx.Request()) + return nil +} + +// unfortunately need to duplicate this func to support both `/action` and `/action/` API endpoints +func (r *APIRoute) SubscribeActionSIO2(ctx echo.Context) error { + return r.SubscribeActionSIO(ctx) +} diff --git a/route/api_route_event.go b/route/api_route_event.go index fe76e7b..365146d 100644 --- a/route/api_route_event.go +++ b/route/api_route_event.go @@ -22,7 +22,7 @@ import ( ) func (r *APIRoute) GetEventTypes(ctx echo.Context) error { - eventTypes, err := r.services.EventService.GetEventTypes() + eventTypes, err := r.services.EventTypeService.GetEventTypes() if err != nil { message := err.Error() return ctx.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) @@ -45,7 +45,7 @@ func (r *APIRoute) RegisterEventTypes(ctx echo.Context) error { } for _, eventType := range eventTypes { - _, err := r.services.EventService.RegisterEventType(in.EventTypeAdapter(eventType)) + _, err := r.services.EventTypeService.RegisterEventType(in.EventTypeAdapter(eventType)) if err != nil { message := err.Error() return ctx.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) @@ -56,7 +56,7 @@ func (r *APIRoute) RegisterEventTypes(ctx echo.Context) error { } func (r *APIRoute) GetEventTypesBySourceID(ctx echo.Context, sourceID codegen.SourceID) error { - results, err := r.services.EventService.GetEventTypesBySourceID(sourceID) + results, err := r.services.EventTypeService.GetEventTypesBySourceID(sourceID) if err != nil { message := err.Error() return ctx.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) @@ -66,7 +66,7 @@ func (r *APIRoute) GetEventTypesBySourceID(ctx echo.Context, sourceID codegen.So } func (r *APIRoute) GetEventType(ctx echo.Context, sourceID codegen.SourceID, name codegen.EventName) error { - result, err := r.services.EventService.GetEventType(sourceID, name) + result, err := r.services.EventTypeService.GetEventType(sourceID, name) if err != nil { message := err.Error() return ctx.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: &message}) @@ -80,7 +80,7 @@ func (r *APIRoute) GetEventType(ctx echo.Context, sourceID codegen.SourceID, nam } func (r *APIRoute) PublishEvent(ctx echo.Context, sourceID codegen.SourceID, name codegen.EventName) error { - eventType, err := r.services.EventService.GetEventType(sourceID, name) + eventType, err := r.services.EventTypeService.GetEventType(sourceID, name) if err != nil { message := err.Error() return ctx.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: &message}) @@ -112,7 +112,9 @@ func (r *APIRoute) PublishEvent(ctx echo.Context, sourceID codegen.SourceID, nam Uuid: &uuidStr, } - result, err := r.services.EventService.Publish(in.EventAdapter(event)) + go r.services.EventServiceSIO.Publish(in.EventAdapter(event)) + + result, err := r.services.EventServiceWS.Publish(in.EventAdapter(event)) if err != nil { message := err.Error() return ctx.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) @@ -121,11 +123,11 @@ func (r *APIRoute) PublishEvent(ctx echo.Context, sourceID codegen.SourceID, nam return ctx.JSON(http.StatusOK, out.EventAdapter(*result)) } -func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeEventParams) error { +func (r *APIRoute) SubscribeEventWS(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeEventWSParams) error { var eventNames []string if params.Names != nil { for _, eventName := range *params.Names { - eventType, err := r.services.EventService.GetEventType(sourceID, eventName) + eventType, err := r.services.EventTypeService.GetEventType(sourceID, eventName) if err != nil { message := err.Error() return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) @@ -138,7 +140,7 @@ func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, par eventNames = append(eventNames, eventName) } } else { - eventTypes, err := r.services.EventService.GetEventTypesBySourceID(sourceID) + eventTypes, err := r.services.EventTypeService.GetEventTypesBySourceID(sourceID) if err != nil || len(eventTypes) == 0 { if err != nil { message := err.Error() @@ -159,7 +161,7 @@ func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, par return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) } - channel, err := r.services.EventService.Subscribe(sourceID, eventNames) + channel, err := r.services.EventServiceWS.Subscribe(sourceID, eventNames) if err != nil { conn.Close() // need to close connection here, instead of defer, because of the goroutine message := err.Error() @@ -171,24 +173,24 @@ func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, par defer close(channel) defer func(eventNames []string) { for _, name := range eventNames { - if err := r.services.EventService.Unsubscribe(sourceID, name, channel); err != nil { - logger.Error("error when trying to unsubscribe an event type", zap.Error(err), zap.String("source_id", sourceID), zap.String("name", name)) + if err := r.services.EventServiceWS.Unsubscribe(sourceID, name, channel); err != nil { + logger.Error("error when trying to unsubscribe an event type via websocket", zap.Error(err), zap.String("source_id", sourceID), zap.String("name", name)) } } }(eventNames) - logger.Info("started", zap.String("remote_addr", conn.RemoteAddr().String())) + logger.Info("a websocket connection has started for events", zap.String("remote_addr", conn.RemoteAddr().String())) for { event, ok := <-channel if !ok { - logger.Info("channel closed") + logger.Info("websocket channel for events is closed") return } if event.SourceID == common.MessageBusSourceID && event.Name == common.MessageBusHeartbeatName { if err := wsutil.WriteServerMessage(conn, ws.OpPing, []byte{}); err != nil { - logger.Error("error when trying to send ping message", zap.Error(err)) + logger.Error("error when trying to send ping message via websocket", zap.Error(err)) return } continue @@ -196,17 +198,17 @@ func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, par message, err := json.Marshal(out.EventAdapter(event)) if err != nil { - logger.Error("error when trying to marshal event", zap.Error(err)) + logger.Error("error when trying to marshal event for websocket", zap.Error(err)) continue } - logger.Info("sending", zap.String("remote_addr", conn.RemoteAddr().String()), zap.String("message", string(message))) + logger.Info("sending event via websocket", zap.String("remote_addr", conn.RemoteAddr().String()), zap.String("message", string(message))) if err := wsutil.WriteServerText(conn, message); err != nil { if _, ok := err.(*net.OpError); ok { - logger.Info("ended", zap.String("error", err.Error())) + logger.Info("websocket connection ended", zap.String("error", err.Error())) } else { - logger.Error("error", zap.String("error", err.Error())) + logger.Error("error when sending event via websocket", zap.String("error", err.Error())) } return } @@ -215,3 +217,14 @@ func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, par return nil } + +func (r *APIRoute) SubscribeEventSIO(ctx echo.Context) error { + server := r.services.EventServiceSIO.Server() + server.ServeHTTP(ctx.Response(), ctx.Request()) + return nil +} + +// unfortunately need to duplicate the func to support both `/event` and `/event/` API endpoints +func (r *APIRoute) SubscribeEventSIO2(ctx echo.Context) error { + return r.SubscribeEventSIO(ctx) +} diff --git a/route/api_route_event_test.go b/route/api_route_event_test.go index b4d2e10..4c2b47a 100644 --- a/route/api_route_event_test.go +++ b/route/api_route_event_test.go @@ -19,7 +19,10 @@ import ( var json2 = jsoniter.ConfigCompatibleWithStandardLibrary func TestEventRoute(t *testing.T) { - defer goleak.VerifyNone(t) + defer goleak.VerifyNone( + t, + goleak.IgnoreTopFunction("github.com/googollee/go-socket.io/engineio.(*Server).Accept"), // there is a goroutine leak in go-socket.io + ) sourceID := "Foo" name := "Bar" diff --git a/service/action_service_socketio.go b/service/action_service_socketio.go new file mode 100644 index 0000000..71d98ca --- /dev/null +++ b/service/action_service_socketio.go @@ -0,0 +1,34 @@ +package service + +import ( + "context" + + "github.com/IceWhaleTech/CasaOS-Common/utils/logger" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" + socketio "github.com/googollee/go-socket.io" + "go.uber.org/zap" +) + +type ActionServiceSIO struct { + server *socketio.Server +} + +func (s *ActionServiceSIO) Trigger(action model.Action) { + s.server.BroadcastToNamespace("/", action.SourceID, action) +} + +func (s *ActionServiceSIO) Start(ctx *context.Context) { + if err := s.server.Serve(); err != nil { + logger.Error("error when serving socketio for actions", zap.Error(err)) + } +} + +func (s *ActionServiceSIO) Server() *socketio.Server { + return s.server +} + +func NewActionServiceSIO() *ActionServiceSIO { + return &ActionServiceSIO{ + server: buildServer(), + } +} diff --git a/service/action_service_websocket.go b/service/action_service_websocket.go new file mode 100644 index 0000000..a2b2c3f --- /dev/null +++ b/service/action_service_websocket.go @@ -0,0 +1,225 @@ +package service + +import ( + "context" + "sync" + "time" + + "github.com/IceWhaleTech/CasaOS-Common/utils/logger" + "github.com/IceWhaleTech/CasaOS-MessageBus/common" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" + "go.uber.org/zap" +) + +type ActionServiceWS struct { + typeService *ActionTypeService + + ctx *context.Context + mutex sync.Mutex + stop chan struct{} + + inboundChannel chan model.Action + subscriberChannels map[string]map[string][]chan model.Action +} + +func (s *ActionServiceWS) Trigger(action model.Action) (*model.Action, error) { + if s.inboundChannel == nil { + return nil, ErrInboundChannelNotFound + } + + if action.Timestamp == 0 { + action.Timestamp = time.Now().Unix() + } + + // TODO - ensure properties are valid for event type + + select { + case s.inboundChannel <- action: + + case <-(*s.ctx).Done(): + return nil, (*s.ctx).Err() + + default: // drop event if no one is listening + } + + return &action, nil +} + +func (s *ActionServiceWS) Subscribe(sourceID string, names []string) (chan model.Action, error) { + if len(names) == 0 { + actionTypes, err := s.typeService.GetActionTypesBySourceID(sourceID) + if err != nil { + return nil, err + } + + for _, actionType := range actionTypes { + names = append(names, actionType.Name) + } + } + + for _, name := range names { + actionType, err := s.typeService.GetActionType(sourceID, name) + if err != nil { + return nil, err + } + + if actionType == nil { + return nil, ErrActionNameNotFound + } + } + + if s.subscriberChannels == nil { + s.subscriberChannels = make(map[string]map[string][]chan model.Action) + } + + if s.subscriberChannels[sourceID] == nil { + s.subscriberChannels[sourceID] = make(map[string][]chan model.Action) + } + + c := make(chan model.Action, 1) + + for _, name := range names { + if s.subscriberChannels[sourceID][name] == nil { + s.subscriberChannels[sourceID][name] = make([]chan model.Action, 0) + } + s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name], c) + } + + return c, nil +} + +func (s *ActionServiceWS) Unsubscribe(sourceID string, name string, c chan model.Action) error { + if s.subscriberChannels == nil { + return ErrSubscriberChannelsNotFound + } + + if s.subscriberChannels[sourceID] == nil { + return ErrActionSourceIDNotFound + } + + if s.subscriberChannels[sourceID][name] == nil { + return ErrActionNameNotFound + } + + for i, subscriber := range s.subscriberChannels[sourceID][name] { + s.mutex.Lock() + defer s.mutex.Unlock() + + if subscriber == c { + if i >= len(s.subscriberChannels[sourceID][name]) { + logger.Error("the i-th subscriber is removed before we get here - concurrency issue?", zap.Int("subscriber", i), zap.Int("total", len(s.subscriberChannels[sourceID][name]))) + return ErrAlreadySubscribed + } + s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name][:i], s.subscriberChannels[sourceID][name][i+1:]...) + return nil + } + } + + return nil +} + +func (s *ActionServiceWS) Start(ctx *context.Context) { + s.ctx = ctx + s.mutex = sync.Mutex{} + + s.inboundChannel = make(chan model.Action) + s.subscriberChannels = make(map[string]map[string][]chan model.Action) + s.stop = make(chan struct{}) + + defer func() { + if s.subscriberChannels != nil { + for sourceID, source := range s.subscriberChannels { + for actionName, subscribers := range source { + for _, subscriber := range subscribers { + select { + case _, ok := <-subscriber: + if ok { + close(subscriber) + } + default: + continue + } + } + delete(s.subscriberChannels[sourceID], actionName) + } + delete(s.subscriberChannels, sourceID) + } + s.subscriberChannels = nil + } + + close(s.inboundChannel) + s.inboundChannel = nil + + close(s.stop) + s.stop = nil + }() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + + case <-(*s.ctx).Done(): + return + + case action, ok := <-s.inboundChannel: + if !ok { + return + } + + if s.subscriberChannels == nil { + continue + } + + if s.subscriberChannels[action.SourceID] == nil { + continue + } + + if s.subscriberChannels[action.SourceID][action.Name] == nil { + continue + } + + for _, c := range s.subscriberChannels[action.SourceID][action.Name] { + select { + case c <- action: + case <-(*s.ctx).Done(): + return + default: // drop event if no one is listening + continue + } + } + + case <-ticker.C: + if s.subscriberChannels == nil { + continue + } + + heartbeat := model.Action{ + SourceID: common.MessageBusSourceID, + Name: common.MessageBusHeartbeatName, + Timestamp: time.Now().Unix(), + } + + for _, source := range s.subscriberChannels { + for _, subscribers := range source { + for _, subscriber := range subscribers { + select { + case subscriber <- heartbeat: + case <-(*s.ctx).Done(): + return + default: // drop event if no one is listening + continue + } + } + } + } + } + } +} + +func NewActionServiceWS(actionTypeService *ActionTypeService) *ActionServiceWS { + return &ActionServiceWS{ + typeService: actionTypeService, + } +} diff --git a/service/action_type_service.go b/service/action_type_service.go index 7ca6cd0..91f139b 100644 --- a/service/action_type_service.go +++ b/service/action_type_service.go @@ -1,25 +1,14 @@ package service import ( - "context" "errors" - "sync" - "time" - "github.com/IceWhaleTech/CasaOS-Common/utils/logger" - "github.com/IceWhaleTech/CasaOS-MessageBus/common" "github.com/IceWhaleTech/CasaOS-MessageBus/model" "github.com/IceWhaleTech/CasaOS-MessageBus/repository" - "go.uber.org/zap" ) -type ActionService struct { - ctx *context.Context - mutex sync.Mutex - repository *repository.Repository - inboundChannel chan model.Action - subscriberChannels map[string]map[string][]chan model.Action - stop chan struct{} +type ActionTypeService struct { + repository *repository.Repository } var ( @@ -27,222 +16,26 @@ var ( ErrActionNameNotFound = errors.New("event name not found") ) -func (s *ActionService) GetActionTypes() ([]model.ActionType, error) { +func (s *ActionTypeService) GetActionTypes() ([]model.ActionType, error) { return (*s.repository).GetActionTypes() } -func (s *ActionService) RegisterActionType(actionType model.ActionType) (*model.ActionType, error) { +func (s *ActionTypeService) RegisterActionType(actionType model.ActionType) (*model.ActionType, error) { // TODO - ensure sourceID and name are URL safe return (*s.repository).RegisterActionType(actionType) } -func (s *ActionService) GetActionTypesBySourceID(sourceID string) ([]model.ActionType, error) { +func (s *ActionTypeService) GetActionTypesBySourceID(sourceID string) ([]model.ActionType, error) { return (*s.repository).GetActionTypesBySourceID(sourceID) } -func (s *ActionService) GetActionType(sourceID string, name string) (*model.ActionType, error) { +func (s *ActionTypeService) GetActionType(sourceID string, name string) (*model.ActionType, error) { return (*s.repository).GetActionType(sourceID, name) } -func (s *ActionService) Trigger(action model.Action) (*model.Action, error) { - if s.inboundChannel == nil { - return nil, ErrInboundChannelNotFound - } - - if action.Timestamp == 0 { - action.Timestamp = time.Now().Unix() - } - - // TODO - ensure properties are valid for event type - - select { - case s.inboundChannel <- action: - - case <-(*s.ctx).Done(): - return nil, (*s.ctx).Err() - - default: // drop event if no one is listening - } - - return &action, nil -} - -func (s *ActionService) Subscribe(sourceID string, names []string) (chan model.Action, error) { - if len(names) == 0 { - actionTypes, err := s.GetActionTypesBySourceID(sourceID) - if err != nil { - return nil, err - } - - for _, actionType := range actionTypes { - names = append(names, actionType.Name) - } - } - - for _, name := range names { - actionType, err := s.GetActionType(sourceID, name) - if err != nil { - return nil, err - } - - if actionType == nil { - return nil, ErrActionNameNotFound - } - } - - if s.subscriberChannels == nil { - s.subscriberChannels = make(map[string]map[string][]chan model.Action) - } - - if s.subscriberChannels[sourceID] == nil { - s.subscriberChannels[sourceID] = make(map[string][]chan model.Action) - } - - c := make(chan model.Action, 1) - - for _, name := range names { - if s.subscriberChannels[sourceID][name] == nil { - s.subscriberChannels[sourceID][name] = make([]chan model.Action, 0) - } - s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name], c) - } - - return c, nil -} - -func (s *ActionService) Unsubscribe(sourceID string, name string, c chan model.Action) error { - if s.subscriberChannels == nil { - return ErrSubscriberChannelsNotFound - } - - if s.subscriberChannels[sourceID] == nil { - return ErrActionSourceIDNotFound - } - - if s.subscriberChannels[sourceID][name] == nil { - return ErrActionNameNotFound - } - - for i, subscriber := range s.subscriberChannels[sourceID][name] { - s.mutex.Lock() - defer s.mutex.Unlock() - - if subscriber == c { - if i >= len(s.subscriberChannels[sourceID][name]) { - logger.Error("the i-th subscriber is removed before we get here - concurrency issue?", zap.Int("subscriber", i), zap.Int("total", len(s.subscriberChannels[sourceID][name]))) - return ErrAlreadySubscribed - } - s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name][:i], s.subscriberChannels[sourceID][name][i+1:]...) - return nil - } - } - - return nil -} - -func (s *ActionService) Start(ctx *context.Context) { - s.ctx = ctx - s.mutex = sync.Mutex{} - - s.inboundChannel = make(chan model.Action) - s.subscriberChannels = make(map[string]map[string][]chan model.Action) - s.stop = make(chan struct{}) - - defer func() { - if s.subscriberChannels != nil { - for sourceID, source := range s.subscriberChannels { - for actionName, subscribers := range source { - for _, subscriber := range subscribers { - select { - case _, ok := <-subscriber: - if ok { - close(subscriber) - } - default: - continue - } - } - delete(s.subscriberChannels[sourceID], actionName) - } - delete(s.subscriberChannels, sourceID) - } - s.subscriberChannels = nil - } - - close(s.inboundChannel) - s.inboundChannel = nil - - close(s.stop) - s.stop = nil - }() - - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - - case <-(*s.ctx).Done(): - return - - case action, ok := <-s.inboundChannel: - if !ok { - return - } - - if s.subscriberChannels == nil { - continue - } - - if s.subscriberChannels[action.SourceID] == nil { - continue - } - - if s.subscriberChannels[action.SourceID][action.Name] == nil { - continue - } - - for _, c := range s.subscriberChannels[action.SourceID][action.Name] { - select { - case c <- action: - case <-(*s.ctx).Done(): - return - default: // drop event if no one is listening - continue - } - } - - case <-ticker.C: - if s.subscriberChannels == nil { - continue - } - - heartbeat := model.Action{ - SourceID: common.MessageBusSourceID, - Name: common.MessageBusHeartbeatName, - Timestamp: time.Now().Unix(), - } - - for _, source := range s.subscriberChannels { - for _, subscribers := range source { - for _, subscriber := range subscribers { - select { - case subscriber <- heartbeat: - case <-(*s.ctx).Done(): - return - default: // drop event if no one is listening - continue - } - } - } - } - } - } -} - -func NewActionService(repository *repository.Repository) *ActionService { - return &ActionService{ +func NewActionTypeService(repository *repository.Repository) *ActionTypeService { + return &ActionTypeService{ repository: repository, } } diff --git a/service/event_service_socketio.go b/service/event_service_socketio.go new file mode 100644 index 0000000..bf7a6eb --- /dev/null +++ b/service/event_service_socketio.go @@ -0,0 +1,34 @@ +package service + +import ( + "context" + + "github.com/IceWhaleTech/CasaOS-Common/utils/logger" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" + socketio "github.com/googollee/go-socket.io" + "go.uber.org/zap" +) + +type EventServiceSIO struct { + server *socketio.Server +} + +func (s *EventServiceSIO) Publish(event model.Event) { + s.server.BroadcastToNamespace("/", event.SourceID, event) +} + +func (s *EventServiceSIO) Start(ctx *context.Context) { + if err := s.server.Serve(); err != nil { + logger.Error("error when serving socketio for events", zap.Error(err)) + } +} + +func (s *EventServiceSIO) Server() *socketio.Server { + return s.server +} + +func NewEventServiceSIO() *EventServiceSIO { + return &EventServiceSIO{ + server: buildServer(), + } +} diff --git a/service/event_service_websocket.go b/service/event_service_websocket.go new file mode 100644 index 0000000..6707b96 --- /dev/null +++ b/service/event_service_websocket.go @@ -0,0 +1,227 @@ +package service + +import ( + "context" + "sync" + "time" + + "github.com/IceWhaleTech/CasaOS-Common/utils/logger" + + "github.com/IceWhaleTech/CasaOS-MessageBus/common" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" + "go.uber.org/zap" +) + +type EventServiceWS struct { + typeService *EventTypeService + + ctx *context.Context + mutex sync.Mutex + stop chan struct{} + + inboundChannel chan model.Event + subscriberChannels map[string]map[string][]chan model.Event +} + +func (s *EventServiceWS) Publish(event model.Event) (*model.Event, error) { + if s.inboundChannel == nil { + return nil, ErrInboundChannelNotFound + } + + if event.Timestamp == 0 { + event.Timestamp = time.Now().Unix() + } + + // TODO - ensure properties are valid for event type + + select { + case s.inboundChannel <- event: + + case <-(*s.ctx).Done(): + return nil, (*s.ctx).Err() + + default: // drop event if no one is listening + } + + return &event, nil +} + +func (s *EventServiceWS) Subscribe(sourceID string, names []string) (chan model.Event, error) { + if len(names) == 0 { + eventTypes, err := s.typeService.GetEventTypesBySourceID(sourceID) + if err != nil { + return nil, err + } + + for _, eventType := range eventTypes { + names = append(names, eventType.Name) + } + } + + for _, name := range names { + eventType, err := s.typeService.GetEventType(sourceID, name) + if err != nil { + return nil, err + } + + if eventType == nil { + return nil, ErrEventNameNotFound + } + } + + if s.subscriberChannels == nil { + s.subscriberChannels = make(map[string]map[string][]chan model.Event) + } + + if s.subscriberChannels[sourceID] == nil { + s.subscriberChannels[sourceID] = make(map[string][]chan model.Event) + } + + c := make(chan model.Event, 1) + + for _, name := range names { + if s.subscriberChannels[sourceID][name] == nil { + s.subscriberChannels[sourceID][name] = make([]chan model.Event, 0) + } + s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name], c) + } + + return c, nil +} + +func (s *EventServiceWS) Unsubscribe(sourceID string, name string, c chan model.Event) error { + if s.subscriberChannels == nil { + return ErrSubscriberChannelsNotFound + } + + if s.subscriberChannels[sourceID] == nil { + return ErrEventSourceIDNotFound + } + + if s.subscriberChannels[sourceID][name] == nil { + return ErrEventNameNotFound + } + + for i, subscriber := range s.subscriberChannels[sourceID][name] { + s.mutex.Lock() + defer s.mutex.Unlock() + + if subscriber == c { + logger.Info("unsubscribing from event type", zap.String("sourceID", sourceID), zap.String("name", name), zap.Int("subscriber", i)) + if i >= len(s.subscriberChannels[sourceID][name]) { + logger.Error("the i-th subscriber is removed before we get here - concurrency issue?", zap.Int("subscriber", i), zap.Int("total", len(s.subscriberChannels[sourceID][name]))) + return ErrAlreadySubscribed + } + s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name][:i], s.subscriberChannels[sourceID][name][i+1:]...) + return nil + } + } + + return nil +} + +func (s *EventServiceWS) Start(ctx *context.Context) { + s.ctx = ctx + s.mutex = sync.Mutex{} + + s.inboundChannel = make(chan model.Event) + s.subscriberChannels = make(map[string]map[string][]chan model.Event) + s.stop = make(chan struct{}) + + defer func() { + if s.subscriberChannels != nil { + for sourceID, source := range s.subscriberChannels { + for eventName, subscribers := range source { + for _, subscriber := range subscribers { + select { + case _, ok := <-subscriber: + if ok { + close(subscriber) + } + default: + continue + } + } + delete(s.subscriberChannels[sourceID], eventName) + } + delete(s.subscriberChannels, sourceID) + } + s.subscriberChannels = nil + } + + close(s.inboundChannel) + s.inboundChannel = nil + + close(s.stop) + s.stop = nil + }() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + + case <-(*s.ctx).Done(): + return + + case event, ok := <-s.inboundChannel: + if !ok { + return + } + + if s.subscriberChannels == nil { + continue + } + + if s.subscriberChannels[event.SourceID] == nil { + continue + } + + if s.subscriberChannels[event.SourceID][event.Name] == nil { + continue + } + + for _, c := range s.subscriberChannels[event.SourceID][event.Name] { + select { + case c <- event: + case <-(*s.ctx).Done(): + return + default: // drop event if no one is listening + continue + } + } + + case <-ticker.C: + if s.subscriberChannels == nil { + continue + } + + heartbeat := model.Event{ + SourceID: common.MessageBusSourceID, + Name: common.MessageBusHeartbeatName, + Timestamp: time.Now().Unix(), + } + + for _, source := range s.subscriberChannels { + for _, subscribers := range source { + for _, subscriber := range subscribers { + select { + case subscriber <- heartbeat: + case <-(*s.ctx).Done(): + return + default: // drop event if no one is listening + continue + } + } + } + } + } + } +} + +func NewEventServiceWS(eventTypeService *EventTypeService) *EventServiceWS { + return &EventServiceWS{ + typeService: eventTypeService, + } +} diff --git a/service/event_type_service.go b/service/event_type_service.go index eda3c59..c8cbf86 100644 --- a/service/event_type_service.go +++ b/service/event_type_service.go @@ -1,25 +1,14 @@ package service import ( - "context" "errors" - "sync" - "time" - "github.com/IceWhaleTech/CasaOS-Common/utils/logger" - "github.com/IceWhaleTech/CasaOS-MessageBus/common" "github.com/IceWhaleTech/CasaOS-MessageBus/model" "github.com/IceWhaleTech/CasaOS-MessageBus/repository" - "go.uber.org/zap" ) -type EventService struct { - ctx *context.Context - mutex sync.Mutex - repository *repository.Repository - inboundChannel chan model.Event - subscriberChannels map[string]map[string][]chan model.Event - stop chan struct{} +type EventTypeService struct { + repository *repository.Repository } var ( @@ -27,223 +16,26 @@ var ( ErrEventNameNotFound = errors.New("event name not found") ) -func (s *EventService) GetEventTypes() ([]model.EventType, error) { +func (s *EventTypeService) GetEventTypes() ([]model.EventType, error) { return (*s.repository).GetEventTypes() } -func (s *EventService) RegisterEventType(eventType model.EventType) (*model.EventType, error) { +func (s *EventTypeService) RegisterEventType(eventType model.EventType) (*model.EventType, error) { // TODO - ensure sourceID and name are URL safe return (*s.repository).RegisterEventType(eventType) } -func (s *EventService) GetEventTypesBySourceID(sourceID string) ([]model.EventType, error) { +func (s *EventTypeService) GetEventTypesBySourceID(sourceID string) ([]model.EventType, error) { return (*s.repository).GetEventTypesBySourceID(sourceID) } -func (s *EventService) GetEventType(sourceID string, name string) (*model.EventType, error) { +func (s *EventTypeService) GetEventType(sourceID string, name string) (*model.EventType, error) { return (*s.repository).GetEventType(sourceID, name) } -func (s *EventService) Publish(event model.Event) (*model.Event, error) { - if s.inboundChannel == nil { - return nil, ErrInboundChannelNotFound - } - - if event.Timestamp == 0 { - event.Timestamp = time.Now().Unix() - } - - // TODO - ensure properties are valid for event type - - select { - case s.inboundChannel <- event: - - case <-(*s.ctx).Done(): - return nil, (*s.ctx).Err() - - default: // drop event if no one is listening - } - - return &event, nil -} - -func (s *EventService) Subscribe(sourceID string, names []string) (chan model.Event, error) { - if len(names) == 0 { - eventTypes, err := s.GetEventTypesBySourceID(sourceID) - if err != nil { - return nil, err - } - - for _, eventType := range eventTypes { - names = append(names, eventType.Name) - } - } - - for _, name := range names { - eventType, err := s.GetEventType(sourceID, name) - if err != nil { - return nil, err - } - - if eventType == nil { - return nil, ErrEventNameNotFound - } - } - - if s.subscriberChannels == nil { - s.subscriberChannels = make(map[string]map[string][]chan model.Event) - } - - if s.subscriberChannels[sourceID] == nil { - s.subscriberChannels[sourceID] = make(map[string][]chan model.Event) - } - - c := make(chan model.Event, 1) - - for _, name := range names { - if s.subscriberChannels[sourceID][name] == nil { - s.subscriberChannels[sourceID][name] = make([]chan model.Event, 0) - } - s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name], c) - } - - return c, nil -} - -func (s *EventService) Unsubscribe(sourceID string, name string, c chan model.Event) error { - if s.subscriberChannels == nil { - return ErrSubscriberChannelsNotFound - } - - if s.subscriberChannels[sourceID] == nil { - return ErrEventSourceIDNotFound - } - - if s.subscriberChannels[sourceID][name] == nil { - return ErrEventNameNotFound - } - - for i, subscriber := range s.subscriberChannels[sourceID][name] { - s.mutex.Lock() - defer s.mutex.Unlock() - - if subscriber == c { - logger.Info("unsubscribing from event type", zap.String("sourceID", sourceID), zap.String("name", name), zap.Int("subscriber", i)) - if i >= len(s.subscriberChannels[sourceID][name]) { - logger.Error("the i-th subscriber is removed before we get here - concurrency issue?", zap.Int("subscriber", i), zap.Int("total", len(s.subscriberChannels[sourceID][name]))) - return ErrAlreadySubscribed - } - s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name][:i], s.subscriberChannels[sourceID][name][i+1:]...) - return nil - } - } - - return nil -} - -func (s *EventService) Start(ctx *context.Context) { - s.ctx = ctx - s.mutex = sync.Mutex{} - - s.inboundChannel = make(chan model.Event) - s.subscriberChannels = make(map[string]map[string][]chan model.Event) - s.stop = make(chan struct{}) - - defer func() { - if s.subscriberChannels != nil { - for sourceID, source := range s.subscriberChannels { - for eventName, subscribers := range source { - for _, subscriber := range subscribers { - select { - case _, ok := <-subscriber: - if ok { - close(subscriber) - } - default: - continue - } - } - delete(s.subscriberChannels[sourceID], eventName) - } - delete(s.subscriberChannels, sourceID) - } - s.subscriberChannels = nil - } - - close(s.inboundChannel) - s.inboundChannel = nil - - close(s.stop) - s.stop = nil - }() - - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - - case <-(*s.ctx).Done(): - return - - case event, ok := <-s.inboundChannel: - if !ok { - return - } - - if s.subscriberChannels == nil { - continue - } - - if s.subscriberChannels[event.SourceID] == nil { - continue - } - - if s.subscriberChannels[event.SourceID][event.Name] == nil { - continue - } - - for _, c := range s.subscriberChannels[event.SourceID][event.Name] { - select { - case c <- event: - case <-(*s.ctx).Done(): - return - default: // drop event if no one is listening - continue - } - } - - case <-ticker.C: - if s.subscriberChannels == nil { - continue - } - - heartbeat := model.Event{ - SourceID: common.MessageBusSourceID, - Name: common.MessageBusHeartbeatName, - Timestamp: time.Now().Unix(), - } - - for _, source := range s.subscriberChannels { - for _, subscribers := range source { - for _, subscriber := range subscribers { - select { - case subscriber <- heartbeat: - case <-(*s.ctx).Done(): - return - default: // drop event if no one is listening - continue - } - } - } - } - } - } -} - -func NewEventService(repository *repository.Repository) *EventService { - return &EventService{ +func NewEventTypeService(repository *repository.Repository) *EventTypeService { + return &EventTypeService{ repository: repository, } } diff --git a/service/event_type_service_test.go b/service/event_type_service_test.go index d7d2f22..2d07760 100644 --- a/service/event_type_service_test.go +++ b/service/event_type_service_test.go @@ -18,21 +18,22 @@ func TestEventTypeService(t *testing.T) { assert.NilError(t, err) defer repository.Close() - // new service - service := NewEventService(&repository) + // new typeService + typeService := NewEventTypeService(&repository) + wsService := NewEventServiceWS(typeService) // new context ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go service.Start(&ctx) + go wsService.Start(&ctx) sourceID := "Foo" eventNames := []string{"Bar", "Baz"} // register event type for _, name := range eventNames { - _, err = service.RegisterEventType(model.EventType{ + _, err = typeService.RegisterEventType(model.EventType{ SourceID: sourceID, Name: name, PropertyTypeList: []model.PropertyType{{Name: "Property1"}, {Name: "Property2"}}, @@ -42,25 +43,25 @@ func TestEventTypeService(t *testing.T) { assert.NilError(t, err) // get event types - eventTypes, err := service.GetEventTypes() + eventTypes, err := typeService.GetEventTypes() assert.NilError(t, err) assert.Equal(t, len(eventTypes), 2) // get event types by source id - eventTypes, err = service.GetEventTypesBySourceID(sourceID) + eventTypes, err = typeService.GetEventTypesBySourceID(sourceID) assert.NilError(t, err) assert.Equal(t, len(eventTypes), 2) // get event type for _, name := range eventNames { - eventType, err := service.GetEventType(sourceID, name) + eventType, err := typeService.GetEventType(sourceID, name) assert.NilError(t, err) assert.Equal(t, eventType.SourceID, sourceID) assert.Equal(t, eventType.Name, name) } // subscribe event type - channel, err := service.Subscribe(sourceID, eventNames) + channel, err := wsService.Subscribe(sourceID, eventNames) assert.NilError(t, err) outputChannel := make(chan model.Event) @@ -89,7 +90,7 @@ func TestEventTypeService(t *testing.T) { // }, } - actualEvent1, err := service.Publish(expectedEvent) + actualEvent1, err := wsService.Publish(expectedEvent) assert.NilError(t, err) assert.DeepEqual(t, model.Event{SourceID: actualEvent1.SourceID, Name: actualEvent1.Name, Properties: actualEvent1.Properties}, expectedEvent) diff --git a/service/services.go b/service/services.go index 0aa0afd..305e7d9 100644 --- a/service/services.go +++ b/service/services.go @@ -3,13 +3,26 @@ package service import ( "context" "errors" + "net/http" + "github.com/IceWhaleTech/CasaOS-Common/utils/logger" "github.com/IceWhaleTech/CasaOS-MessageBus/repository" + socketio "github.com/googollee/go-socket.io" + "github.com/googollee/go-socket.io/engineio" + "github.com/googollee/go-socket.io/engineio/transport" + "github.com/googollee/go-socket.io/engineio/transport/polling" + "github.com/googollee/go-socket.io/engineio/transport/websocket" + "go.uber.org/zap" ) type Services struct { - EventService *EventService - ActionService *ActionService + EventTypeService *EventTypeService + EventServiceWS *EventServiceWS + EventServiceSIO *EventServiceSIO + + ActionTypeService *ActionTypeService + ActionServiceWS *ActionServiceWS + ActionServiceSIO *ActionServiceSIO } var ( @@ -19,13 +32,59 @@ var ( ) func (s *Services) Start(ctx *context.Context) { - go s.EventService.Start(ctx) - go s.ActionService.Start(ctx) + go s.EventServiceWS.Start(ctx) + go s.ActionServiceWS.Start(ctx) + + go s.EventServiceSIO.Start(ctx) + go s.ActionServiceSIO.Start(ctx) } func NewServices(repository *repository.Repository) Services { + eventTypeService := NewEventTypeService(repository) + actionTypeService := NewActionTypeService(repository) + return Services{ - EventService: NewEventService(repository), - ActionService: NewActionService(repository), + EventTypeService: eventTypeService, + EventServiceWS: NewEventServiceWS(eventTypeService), + EventServiceSIO: NewEventServiceSIO(), + + ActionTypeService: actionTypeService, + ActionServiceWS: NewActionServiceWS(actionTypeService), + ActionServiceSIO: NewActionServiceSIO(), + } +} + +func buildServer() *socketio.Server { + websocketTransport := websocket.Default + websocketTransport.CheckOrigin = func(r *http.Request) bool { + return true // TODO remove this debug setting } + + pollingTransport := polling.Default + pollingTransport.CheckOrigin = func(r *http.Request) bool { + return true // TODO remove this debug setting + } + + server := socketio.NewServer(&engineio.Options{ + Transports: []transport.Transport{ + websocketTransport, + pollingTransport, + }, + }) + + server.OnConnect("/", func(s socketio.Conn) error { + s.SetContext("") + logger.Info("a socketio connection has started", zap.Any("remote_addr", s.RemoteAddr())) + return nil + }) + + server.OnError("/", func(s socketio.Conn, e error) { + logger.Error("error in socketio connnection", zap.Any("error", e)) + }) + + server.OnDisconnect("/", func(s socketio.Conn, reason string) { + logger.Info("a socketio connection is disconnected", zap.Any("reason", reason)) + }) + + return server }