Skip to content

Commit

Permalink
add socket-io support (#22)
Browse files Browse the repository at this point in the history
Signed-off-by: Tiger Wang <[email protected]>
  • Loading branch information
tigerinus authored Jan 10, 2023
1 parent bc6e73d commit 2e9ce83
Show file tree
Hide file tree
Showing 15 changed files with 758 additions and 492 deletions.
71 changes: 65 additions & 6 deletions api/message_bus/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,12 @@ paths:

/event/{source_id}:
get:
summary: Subscribe to an event type (WebSocket)
summary: Subscribe to events by source ID (WebSocket)
description: |
Subscribe to an event type by source ID and name via WebSocket.
operationId: subscribeEvent
Subscribe to event by `source_id` via WebSocket.
> `names` can be specified in the query string to subscribe to specific event types.
operationId: subscribeEventWS
tags:
- Event methods
parameters:
Expand All @@ -162,6 +164,34 @@ paths:
description: |
The connection will be upgraded to a WebSocket connection and the client will receive events as they are published.
/event:
get:
summary: Subscribe to events by source ID (SocketIO)
description: |
Subscribe to events by `source_id` via SocketIO.
> `source_id` should be specified as the SocketIO *event* (which is a different from *event* in message bus) when listening.
operationId: subscribeEventSIO
tags:
- Event methods
responses:
"101":
description: |
The connection will be upgraded to a SocketIO connection and the client will receive events as they are published.
/event/:
get:
summary: Subscribe to all event types (SocketIO)
description: |
Subscribe to all event types via SocketIO.
> In URI definition, `/event` and `/event/` are different paths. This endpoint is a workaround to maximize compatibility.
operationId: subscribeEventSIO2
responses:
"101":
description: |
The connection will be upgraded to a SocketIO connection and the client will receive events as they are published.
/action_type:
get:
summary: List action types
Expand Down Expand Up @@ -247,10 +277,12 @@ paths:

/action/{source_id}:
get:
summary: Subscribe to an action type (WebSocket)
summary: Subscribe to actions by source ID (WebSocket)
description: |
Subscribe to an action type by source ID and name via WebSocket.
operationId: subscribeAction
Subscribe to actions by `source_id` via WebSocket.
> `names` can be specified in the query string to subscribe to specific action types.
operationId: subscribeActionWS
tags:
- Action methods
parameters:
Expand All @@ -261,6 +293,33 @@ paths:
description: |
The connection will be upgraded to a WebSocket connection and the client will receive actions as they are triggered.
/action:
get:
summary: Subscribe to actions by source ID (SocketIO)
description: |
Subscribe to actions by `source_id` via SocketIO.
> `source_id` should be specified as the SocketIO *event* (which is a different from *event* in message bus) when listening.
operationId: subscribeActionSIO
tags:
- Action methods
responses:
"101":
description: |
The connection will be upgraded to a SocketIO connection and the client will receive actions as they are triggered.
/action/:
get:
summary: Subscribe to all action types (SocketIO)
description: |
Subscribe to all action types via SocketIO.
> In URI definition, `/action` and `/action/` are different paths. This endpoint is a workaround to maximize compatibility.
operationId: subscribeActionSIO2
responses:
"101":
description: |
The connection will be upgraded to a SocketIO connection and the client will receive actions as they are triggered.
components:
securitySchemes:
access_token:
Expand Down
2 changes: 1 addition & 1 deletion common/constants.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package common

const (
MessageBusVersion = "0.4.1"
MessageBusVersion = "0.4.2"
MessageBusServiceName = "message-bus"

MessageBusSourceID = "message-bus"
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ require (
github.com/gobwas/pool v0.2.1 // indirect
github.com/goccy/go-json v0.10.0 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gofrs/uuid v4.0.0+incompatible // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang-jwt/jwt/v4 v4.4.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v1.8.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.15.13 // indirect
Expand Down Expand Up @@ -64,6 +67,7 @@ require (
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/google/uuid v1.3.0
github.com/googollee/go-socket.io v1.6.2
github.com/invopop/yaml v0.2.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/labstack/echo/v4 v4.10.0
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ github.com/goccy/go-json v0.10.0/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MG
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY=
github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I=
github.com/golang-jwt/jwt/v4 v4.4.3 h1:Hxl6lhQFj4AnOX6MLrsCb/+7tCj7DxP7VA+2rDIq5AU=
Expand All @@ -69,6 +71,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS
github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.8.4 h1:Z5JUg94HMTR1XpwBaSH4vq3+PNSIykBLxMdglbw10gg=
github.com/gomodule/redigo v1.8.4/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
Expand All @@ -77,8 +81,13 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googollee/go-socket.io v1.6.2 h1:olKLLHJtHz1IkL/OrTyNriZZvVQYEORNkJAqsOwPask=
github.com/googollee/go-socket.io v1.6.2/go.mod h1:0vGP8/dXR9SZUMMD4+xxaGo/lohOw3YWMh2WRiWeKxg=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/invopop/yaml v0.1.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q=
github.com/invopop/yaml v0.2.0 h1:7zky/qH+O0DwAyoobXUqvVBwgBFRxKoQ/3FjcVpjTMY=
Expand Down Expand Up @@ -160,6 +169,7 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down Expand Up @@ -249,6 +259,7 @@ gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
49 changes: 30 additions & 19 deletions route/api_route_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func (r *APIRoute) GetActionTypes(c echo.Context) error {
actionType, err := r.services.ActionService.GetActionTypes()
actionType, err := r.services.ActionTypeService.GetActionTypes()
if err != nil {
message := err.Error()
return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message})
Expand All @@ -42,7 +42,7 @@ func (r *APIRoute) RegisterActionTypes(c echo.Context) error {
}

for _, actionType := range actionTypes {
_, err := r.services.ActionService.RegisterActionType(in.ActionTypeAdapter(actionType))
_, err := r.services.ActionTypeService.RegisterActionType(in.ActionTypeAdapter(actionType))
if err != nil {
message := err.Error()
return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message})
Expand All @@ -53,7 +53,7 @@ func (r *APIRoute) RegisterActionTypes(c echo.Context) error {
}

func (r *APIRoute) GetActionTypesBySourceID(c echo.Context, sourceID codegen.SourceID) error {
results, err := r.services.ActionService.GetActionTypesBySourceID(sourceID)
results, err := r.services.ActionTypeService.GetActionTypesBySourceID(sourceID)
if err != nil {
message := err.Error()
return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message})
Expand All @@ -63,7 +63,7 @@ func (r *APIRoute) GetActionTypesBySourceID(c echo.Context, sourceID codegen.Sou
}

func (r *APIRoute) GetActionType(c echo.Context, sourceID codegen.SourceID, name codegen.EventName) error {
result, err := r.services.ActionService.GetActionType(sourceID, name)
result, err := r.services.ActionTypeService.GetActionType(sourceID, name)
if err != nil {
message := err.Error()
return c.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: &message})
Expand All @@ -77,7 +77,7 @@ func (r *APIRoute) GetActionType(c echo.Context, sourceID codegen.SourceID, name
}

func (r *APIRoute) TriggerAction(c echo.Context, sourceID codegen.SourceID, name codegen.EventName) error {
actionType, err := r.services.ActionService.GetActionType(sourceID, name)
actionType, err := r.services.ActionTypeService.GetActionType(sourceID, name)
if err != nil {
message := err.Error()
return c.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: &message})
Expand All @@ -100,7 +100,7 @@ func (r *APIRoute) TriggerAction(c echo.Context, sourceID codegen.SourceID, name
Timestamp: utils.Ptr(time.Now()),
}

result, err := r.services.ActionService.Trigger(in.ActionAdapter(action))
result, err := r.services.ActionServiceWS.Trigger(in.ActionAdapter(action))
if err != nil {
message := err.Error()
return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message})
Expand All @@ -109,11 +109,11 @@ func (r *APIRoute) TriggerAction(c echo.Context, sourceID codegen.SourceID, name
return c.JSON(http.StatusOK, out.ActionAdapter(*result))
}

func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeActionParams) error {
func (r *APIRoute) SubscribeActionWS(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeActionWSParams) error {
var actionNames []string
if params.Names != nil {
for _, actionName := range *params.Names {
actionType, err := r.services.ActionService.GetActionType(sourceID, actionName)
actionType, err := r.services.ActionTypeService.GetActionType(sourceID, actionName)
if err != nil {
message := err.Error()
return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message})
Expand All @@ -126,7 +126,7 @@ func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, pa
actionNames = append(actionNames, actionName)
}
} else {
actionTypes, err := r.services.ActionService.GetActionTypesBySourceID(sourceID)
actionTypes, err := r.services.ActionTypeService.GetActionTypesBySourceID(sourceID)
if err != nil {
message := err.Error()
return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message})
Expand All @@ -143,7 +143,7 @@ func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, pa
return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message})
}

channel, err := r.services.ActionService.Subscribe(sourceID, actionNames)
channel, err := r.services.ActionServiceWS.Subscribe(sourceID, actionNames)
if err != nil {
conn.Close() // need to close connection here, instead of defer, because of the goroutine
message := err.Error()
Expand All @@ -155,42 +155,42 @@ func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, pa
defer close(channel)
defer func(actionNames []string) {
for _, name := range actionNames {
if err := r.services.ActionService.Unsubscribe(sourceID, name, channel); err != nil {
logger.Error("error when trying to unsubscribe an action type", zap.Error(err), zap.String("source_id", sourceID), zap.String("name", name))
if err := r.services.ActionServiceWS.Unsubscribe(sourceID, name, channel); err != nil {
logger.Error("error when trying to unsubscribe an action type via websocket", zap.Error(err), zap.String("source_id", sourceID), zap.String("name", name))
}
}
}(actionNames)

logger.Info("started", zap.String("remote_addr", conn.RemoteAddr().String()))
logger.Info("a websocket connection has started for actions", zap.String("remote_addr", conn.RemoteAddr().String()))

for {
action, ok := <-channel
if !ok {
logger.Info("channel closed")
logger.Info("websocket channel for events is closed")
return
}

if action.SourceID == common.MessageBusSourceID && action.Name == common.MessageBusHeartbeatName {
if err := wsutil.WriteServerMessage(conn, ws.OpPing, []byte{}); err != nil {
logger.Error("error when trying to send ping message", zap.Error(err))
logger.Error("error when trying to send ping message via websocket", zap.Error(err))
return
}
continue
}

message, err := json.Marshal(out.ActionAdapter(action))
if err != nil {
logger.Error("error when trying to marshal action", zap.Error(err))
logger.Error("error when trying to marshal action for websocket", zap.Error(err))
continue
}

logger.Info("sending", zap.String("remote_addr", conn.RemoteAddr().String()), zap.String("message", string(message)))
logger.Info("sending action via websocket", zap.String("remote_addr", conn.RemoteAddr().String()), zap.String("message", string(message)))

if err := wsutil.WriteServerBinary(conn, message); err != nil {
if _, ok := err.(*net.OpError); ok {
logger.Info("ended", zap.String("error", err.Error()))
logger.Info("websocket connection ended", zap.String("error", err.Error()))
} else {
logger.Error("error", zap.String("error", err.Error()))
logger.Error("error when sending event via websocket", zap.String("error", err.Error()))
}
return
}
Expand All @@ -199,3 +199,14 @@ func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, pa

return nil
}

func (r *APIRoute) SubscribeActionSIO(ctx echo.Context) error {
server := r.services.ActionServiceSIO.Server()
server.ServeHTTP(ctx.Response(), ctx.Request())
return nil
}

// unfortunately need to duplicate this func to support both `/action` and `/action/` API endpoints
func (r *APIRoute) SubscribeActionSIO2(ctx echo.Context) error {
return r.SubscribeActionSIO(ctx)
}
Loading

0 comments on commit 2e9ce83

Please sign in to comment.