diff --git a/api/message_bus/openapi.yaml b/api/message_bus/openapi.yaml index a327265..3ee61f4 100644 --- a/api/message_bus/openapi.yaml +++ b/api/message_bus/openapi.yaml @@ -26,6 +26,14 @@ tags: description: |- methods for managing action types, as well as publishing and subscribing to actions + - name: Websocket Endpoints + description: |- + Endpoint paths for subscribing to events and actions via WebSocket + + - name: SocketIO Endpoints + description: |- + Endpoint paths for subscribing to events and actions via SocketIO + - name: EventType description: |- @@ -42,6 +50,10 @@ tags: description: |- + - name: PropertyType + description: |- + + x-tagGroups: - name: Methods tags: @@ -50,12 +62,18 @@ x-tagGroups: - ActionType methods - Action methods + - name: Endpoints + tags: + - WebSocket Endpoints + - SocketIO Endpoints + - name: Schemas tags: - EventType - Event - ActionType - Action + - PropertyType security: - access_token: [] @@ -155,7 +173,7 @@ paths: > `names` can be specified in the query string to subscribe to specific event types. operationId: subscribeEventWS tags: - - Event methods + - WebSocket Endpoints parameters: - $ref: "#/components/parameters/SourceID" - $ref: "#/components/parameters/EventNames" @@ -164,34 +182,6 @@ paths: description: | The connection will be upgraded to a WebSocket connection and the client will receive events as they are published. - /event: - get: - summary: Subscribe to events by source ID (SocketIO) - description: | - Subscribe to events by `source_id` via SocketIO. - - > `source_id` should be specified as the SocketIO *event* (which is a different from *event* in message bus) when listening. - operationId: subscribeEventSIO - tags: - - Event methods - responses: - "101": - description: | - The connection will be upgraded to a SocketIO connection and the client will receive events as they are published. - - /event/: - get: - summary: Subscribe to all event types (SocketIO) - description: | - Subscribe to all event types via SocketIO. - > In URI definition, `/event` and `/event/` are different paths. This endpoint is a workaround to maximize compatibility. - operationId: subscribeEventSIO2 - responses: - "101": - description: | - The connection will be upgraded to a SocketIO connection and the client will receive events as they are published. - - /action_type: get: summary: List action types @@ -284,7 +274,7 @@ paths: > `names` can be specified in the query string to subscribe to specific action types. operationId: subscribeActionWS tags: - - Action methods + - WebSocket Endpoints parameters: - $ref: "#/components/parameters/SourceID" - $ref: "#/components/parameters/ActionNames" @@ -293,32 +283,60 @@ paths: description: | The connection will be upgraded to a WebSocket connection and the client will receive actions as they are triggered. - /action: + /socket.io: get: - summary: Subscribe to actions by source ID (SocketIO) + summary: Subscribe to events and actions (SocketIO) description: | - Subscribe to actions by `source_id` via SocketIO. - - > `source_id` should be specified as the SocketIO *event* (which is a different from *event* in message bus) when listening. - operationId: subscribeActionSIO + Subscribe to events and actions via SocketIO. + + - SocketIO `room` should be either `event` or `action` + + operationId: subscribeSIO tags: - - Action methods + - SocketIO Endpoints responses: "101": description: | - The connection will be upgraded to a SocketIO connection and the client will receive actions as they are triggered. + The connection will be upgraded to a SocketIO connection and the client will receive events as they are published. + "200": + description: | + Polling initialized for SocketIO - /action/: + post: + summary: Poll events and actions (SocketIO) + description: | + Poll events and actions by via SocketIO. + operationId: pollSIO + tags: + - SocketIO Endpoints + responses: + "200": + description: | + Polling continued from SocketIO + + /socket.io/: get: - summary: Subscribe to all action types (SocketIO) + summary: Subscribe to events and actions (SocketIO) description: | - Subscribe to all action types via SocketIO. - > In URI definition, `/action` and `/action/` are different paths. This endpoint is a workaround to maximize compatibility. - operationId: subscribeActionSIO2 + > Same as `/socket.io` except it comes with a trailing slash `/` - a workaround to maximize compatibility + operationId: subscribeSIO2 responses: "101": description: | - The connection will be upgraded to a SocketIO connection and the client will receive actions as they are triggered. + The connection will be upgraded to a SocketIO connection and the client will receive events as they are published. + "200": + description: | + Polling initialized for SocketIO + + post: + summary: Poll events and actions (SocketIO) + description: | + > Same as `/socket.io` except it comes with a trailing slash `/` - a workaround to maximize compatibility + operationId: pollSIO2 + responses: + "200": + description: | + Polling continued from SocketIO components: securitySchemes: @@ -527,7 +545,14 @@ components: properties: name: type: string - description: property name + description: |- + property name + + > It is recommended for a property name to be as descriptive as possible. One option is to prefix with a namespace. + > - If the property is source specific, prefix with source ID. For example, `local-storage:vendor` + > - Otherwise, prefix with `common:`. For example, `common:email` + > + > Some bad examples are `id`, `avail`, `blk`...which can be ambiguous and confusing. example: "local-storage:vendor" description: type: string diff --git a/go.mod b/go.mod index 286b4b2..93cc5bd 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang-jwt/jwt/v4 v4.4.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/gomodule/redigo v1.8.4 // indirect + github.com/gomodule/redigo v2.0.0+incompatible // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect diff --git a/go.sum b/go.sum index d15fe24..cc811d9 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,9 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gomodule/redigo v1.8.4 h1:Z5JUg94HMTR1XpwBaSH4vq3+PNSIykBLxMdglbw10gg= github.com/gomodule/redigo v1.8.4/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= +github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= +github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= diff --git a/route/api_route_action.go b/route/api_route_action.go index 08d1a75..556fd63 100644 --- a/route/api_route_action.go +++ b/route/api_route_action.go @@ -100,6 +100,8 @@ func (r *APIRoute) TriggerAction(c echo.Context, sourceID codegen.SourceID, name Timestamp: utils.Ptr(time.Now()), } + go r.services.SocketIOService.Publish(in.ActionAdapter(action)) + result, err := r.services.ActionServiceWS.Trigger(in.ActionAdapter(action)) if err != nil { message := err.Error() @@ -199,14 +201,3 @@ func (r *APIRoute) SubscribeActionWS(c echo.Context, sourceID codegen.SourceID, return nil } - -func (r *APIRoute) SubscribeActionSIO(ctx echo.Context) error { - server := r.services.ActionServiceSIO.Server() - server.ServeHTTP(ctx.Response(), ctx.Request()) - return nil -} - -// unfortunately need to duplicate this func to support both `/action` and `/action/` API endpoints -func (r *APIRoute) SubscribeActionSIO2(ctx echo.Context) error { - return r.SubscribeActionSIO(ctx) -} diff --git a/route/api_route_event.go b/route/api_route_event.go index 365146d..27ff3d8 100644 --- a/route/api_route_event.go +++ b/route/api_route_event.go @@ -112,7 +112,7 @@ func (r *APIRoute) PublishEvent(ctx echo.Context, sourceID codegen.SourceID, nam Uuid: &uuidStr, } - go r.services.EventServiceSIO.Publish(in.EventAdapter(event)) + go r.services.SocketIOService.Publish(in.EventAdapter(event)) result, err := r.services.EventServiceWS.Publish(in.EventAdapter(event)) if err != nil { @@ -217,14 +217,3 @@ func (r *APIRoute) SubscribeEventWS(c echo.Context, sourceID codegen.SourceID, p return nil } - -func (r *APIRoute) SubscribeEventSIO(ctx echo.Context) error { - server := r.services.EventServiceSIO.Server() - server.ServeHTTP(ctx.Response(), ctx.Request()) - return nil -} - -// unfortunately need to duplicate the func to support both `/event` and `/event/` API endpoints -func (r *APIRoute) SubscribeEventSIO2(ctx echo.Context) error { - return r.SubscribeEventSIO(ctx) -} diff --git a/route/api_route_socketio.go b/route/api_route_socketio.go new file mode 100644 index 0000000..fcd0bf1 --- /dev/null +++ b/route/api_route_socketio.go @@ -0,0 +1,25 @@ +package route + +import "github.com/labstack/echo/v4" + +func (r *APIRoute) SubscribeSIO(ctx echo.Context) error { + server := r.services.SocketIOService.Server() + server.ServeHTTP(ctx.Response(), ctx.Request()) + return nil +} + +// unfortunately need to duplicate the func to support both `/socket.io` and `/socket.io/` (with a trailing slash) API endpoints +func (r *APIRoute) SubscribeSIO2(ctx echo.Context) error { + return r.SubscribeSIO(ctx) +} + +func (r *APIRoute) PollSIO(ctx echo.Context) error { + server := r.services.SocketIOService.Server() + server.ServeHTTP(ctx.Response(), ctx.Request()) + return nil +} + +// unfortunately need to duplicate the func to support both `/socket.io` and `/socket.io/` (with a trailing slash) API endpoints +func (r *APIRoute) PollSIO2(ctx echo.Context) error { + return r.PollSIO(ctx) +} diff --git a/service/action_service_socketio.go b/service/action_service_socketio.go deleted file mode 100644 index 71d98ca..0000000 --- a/service/action_service_socketio.go +++ /dev/null @@ -1,34 +0,0 @@ -package service - -import ( - "context" - - "github.com/IceWhaleTech/CasaOS-Common/utils/logger" - "github.com/IceWhaleTech/CasaOS-MessageBus/model" - socketio "github.com/googollee/go-socket.io" - "go.uber.org/zap" -) - -type ActionServiceSIO struct { - server *socketio.Server -} - -func (s *ActionServiceSIO) Trigger(action model.Action) { - s.server.BroadcastToNamespace("/", action.SourceID, action) -} - -func (s *ActionServiceSIO) Start(ctx *context.Context) { - if err := s.server.Serve(); err != nil { - logger.Error("error when serving socketio for actions", zap.Error(err)) - } -} - -func (s *ActionServiceSIO) Server() *socketio.Server { - return s.server -} - -func NewActionServiceSIO() *ActionServiceSIO { - return &ActionServiceSIO{ - server: buildServer(), - } -} diff --git a/service/event_service_socketio.go b/service/event_service_socketio.go deleted file mode 100644 index bf7a6eb..0000000 --- a/service/event_service_socketio.go +++ /dev/null @@ -1,34 +0,0 @@ -package service - -import ( - "context" - - "github.com/IceWhaleTech/CasaOS-Common/utils/logger" - "github.com/IceWhaleTech/CasaOS-MessageBus/model" - socketio "github.com/googollee/go-socket.io" - "go.uber.org/zap" -) - -type EventServiceSIO struct { - server *socketio.Server -} - -func (s *EventServiceSIO) Publish(event model.Event) { - s.server.BroadcastToNamespace("/", event.SourceID, event) -} - -func (s *EventServiceSIO) Start(ctx *context.Context) { - if err := s.server.Serve(); err != nil { - logger.Error("error when serving socketio for events", zap.Error(err)) - } -} - -func (s *EventServiceSIO) Server() *socketio.Server { - return s.server -} - -func NewEventServiceSIO() *EventServiceSIO { - return &EventServiceSIO{ - server: buildServer(), - } -} diff --git a/service/services.go b/service/services.go index 305e7d9..34d5e76 100644 --- a/service/services.go +++ b/service/services.go @@ -3,26 +3,18 @@ package service import ( "context" "errors" - "net/http" - "github.com/IceWhaleTech/CasaOS-Common/utils/logger" "github.com/IceWhaleTech/CasaOS-MessageBus/repository" - socketio "github.com/googollee/go-socket.io" - "github.com/googollee/go-socket.io/engineio" - "github.com/googollee/go-socket.io/engineio/transport" - "github.com/googollee/go-socket.io/engineio/transport/polling" - "github.com/googollee/go-socket.io/engineio/transport/websocket" - "go.uber.org/zap" ) type Services struct { EventTypeService *EventTypeService EventServiceWS *EventServiceWS - EventServiceSIO *EventServiceSIO ActionTypeService *ActionTypeService ActionServiceWS *ActionServiceWS - ActionServiceSIO *ActionServiceSIO + + SocketIOService *SocketIOService } var ( @@ -35,8 +27,7 @@ func (s *Services) Start(ctx *context.Context) { go s.EventServiceWS.Start(ctx) go s.ActionServiceWS.Start(ctx) - go s.EventServiceSIO.Start(ctx) - go s.ActionServiceSIO.Start(ctx) + go s.SocketIOService.Start(ctx) } func NewServices(repository *repository.Repository) Services { @@ -46,45 +37,9 @@ func NewServices(repository *repository.Repository) Services { return Services{ EventTypeService: eventTypeService, EventServiceWS: NewEventServiceWS(eventTypeService), - EventServiceSIO: NewEventServiceSIO(), + SocketIOService: NewSocketIOService(), ActionTypeService: actionTypeService, ActionServiceWS: NewActionServiceWS(actionTypeService), - ActionServiceSIO: NewActionServiceSIO(), - } -} - -func buildServer() *socketio.Server { - websocketTransport := websocket.Default - websocketTransport.CheckOrigin = func(r *http.Request) bool { - return true // TODO remove this debug setting - } - - pollingTransport := polling.Default - pollingTransport.CheckOrigin = func(r *http.Request) bool { - return true // TODO remove this debug setting } - - server := socketio.NewServer(&engineio.Options{ - Transports: []transport.Transport{ - websocketTransport, - pollingTransport, - }, - }) - - server.OnConnect("/", func(s socketio.Conn) error { - s.SetContext("") - logger.Info("a socketio connection has started", zap.Any("remote_addr", s.RemoteAddr())) - return nil - }) - - server.OnError("/", func(s socketio.Conn, e error) { - logger.Error("error in socketio connnection", zap.Any("error", e)) - }) - - server.OnDisconnect("/", func(s socketio.Conn, reason string) { - logger.Info("a socketio connection is disconnected", zap.Any("reason", reason)) - }) - - return server } diff --git a/service/socketio_service.go b/service/socketio_service.go new file mode 100644 index 0000000..c772493 --- /dev/null +++ b/service/socketio_service.go @@ -0,0 +1,88 @@ +package service + +import ( + "context" + "net/http" + + "github.com/IceWhaleTech/CasaOS-Common/utils/logger" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" + socketio "github.com/googollee/go-socket.io" + "github.com/googollee/go-socket.io/engineio" + "github.com/googollee/go-socket.io/engineio/transport" + "github.com/googollee/go-socket.io/engineio/transport/polling" + "github.com/googollee/go-socket.io/engineio/transport/websocket" + "go.uber.org/zap" +) + +type SocketIOService struct { + server *socketio.Server +} + +func (s *SocketIOService) Publish(entity interface{}) { + if event, ok := entity.(model.Event); ok { + s.server.BroadcastToRoom("/", "event", event.Name, event) + return + } + + if action, ok := entity.(model.Action); ok { + s.server.BroadcastToRoom("/", "action", action.Name, action) + return + } + + logger.Error("unknown entity type", zap.Any("entity", entity)) +} + +func (s *SocketIOService) Start(ctx *context.Context) { + if err := s.server.Serve(); err != nil { + logger.Error("error when serving socketio for events", zap.Error(err)) + } +} + +func (s *SocketIOService) Server() *socketio.Server { + return s.server +} + +func NewSocketIOService() *SocketIOService { + return &SocketIOService{ + server: buildServer(), + } +} + +func buildServer() *socketio.Server { + websocketTransport := websocket.Default + websocketTransport.CheckOrigin = func(r *http.Request) bool { + return true // TODO remove this debug setting + } + + pollingTransport := polling.Default + pollingTransport.CheckOrigin = func(r *http.Request) bool { + return true // TODO remove this debug setting + } + + server := socketio.NewServer(&engineio.Options{ + Transports: []transport.Transport{ + websocketTransport, + pollingTransport, + }, + }) + + server.OnConnect("/", func(s socketio.Conn) error { + s.SetContext("") + logger.Info("a socketio connection has started", zap.Any("remote_addr", s.RemoteAddr())) + + s.Join("event") + s.Join("action") + + return nil + }) + + server.OnError("/", func(s socketio.Conn, e error) { + logger.Error("error in socketio connnection", zap.Any("error", e)) + }) + + server.OnDisconnect("/", func(s socketio.Conn, reason string) { + logger.Info("a socketio connection is disconnected", zap.Any("reason", reason)) + }) + + return server +}