Skip to content

Commit

Permalink
add support to subscribe to more than one events of the same source id (
Browse files Browse the repository at this point in the history
  • Loading branch information
tigerinus authored Nov 10, 2022
1 parent fdc449d commit a992423
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 91 deletions.
115 changes: 96 additions & 19 deletions api/message_bus/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ tags:
x-tagGroups:
- name: Methods
tags:
- EventType methods
- Event methods
- ActionType methods
- Action methods

- name: Schemas
Expand All @@ -66,7 +68,7 @@ paths:
List all event types that are currently registered with the message bus.
operationId: getEventTypes
tags:
- Event methods
- EventType methods
responses:
"200":
$ref: "#/components/responses/GetEventTypesOK"
Expand All @@ -79,7 +81,7 @@ paths:
Register a new event type with the message bus.
operationId: RegisterEventType
tags:
- Event methods
- EventType methods
requestBody:
$ref: "#/components/requestBodies/RegisterEventType"
responses:
Expand All @@ -97,9 +99,9 @@ paths:
Get all event types that are registered with the message bus for a specific source ID.
operationId: getEventTypesBySourceID
tags:
- Event methods
- EventType methods
parameters:
- $ref: "#/components/parameters/SourceId"
- $ref: "#/components/parameters/SourceID"
responses:
"200":
$ref: "#/components/responses/GetEventTypesOK"
Expand All @@ -113,16 +115,17 @@ paths:
Get an event type that is registered with the message bus for a specific source ID and event name.
operationId: getEventType
tags:
- Event methods
- EventType methods
parameters:
- $ref: "#/components/parameters/SourceId"
- $ref: "#/components/parameters/Name"
- $ref: "#/components/parameters/SourceID"
- $ref: "#/components/parameters/EventName"
responses:
"200":
$ref: "#/components/responses/GetEventTypeOK"
"404":
$ref: "#/components/responses/ResponseNotFound"

/event/{source_id}/{name}:
post:
summary: Publish an event
description: |
Expand All @@ -131,8 +134,8 @@ paths:
tags:
- Event methods
parameters:
- $ref: "#/components/parameters/SourceId"
- $ref: "#/components/parameters/Name"
- $ref: "#/components/parameters/SourceID"
- $ref: "#/components/parameters/EventName"
requestBody:
$ref: "#/components/requestBodies/PublishEvent"
responses:
Expand All @@ -143,7 +146,7 @@ paths:
"404":
$ref: "#/components/responses/ResponseNotFound"

/event_type/{source_id}/{name}/ws:
/event/{source_id}:
get:
summary: Subscribe to an event type (WebSocket)
description: |
Expand All @@ -152,22 +155,32 @@ paths:
tags:
- Event methods
parameters:
- $ref: "#/components/parameters/SourceId"
- $ref: "#/components/parameters/Name"
- $ref: "#/components/parameters/SourceID"
- $ref: "#/components/parameters/EventNames"
responses:
"101":
description: |
The connection will be upgraded to a WebSocket connection and the client will receive events as they are published.
/action_type:
get:
summary: List action types
description: |
List all action types that are currently registered with the message bus.
operationId: getActionTypes
tags:
- ActionType methods
responses:
"200":
$ref: "#/components/responses/GetActionTypesOK"

post:
summary: Register an action type
description: |
Register a new action type with the message bus.
operationId: RegisterActionType
tags:
- ActionType methods
requestBody:
$ref: "#/components/requestBodies/RegisterActionType"
responses:
Expand All @@ -180,9 +193,14 @@ paths:

/action_type/{source_id}:
get:
summary: Get action types by source ID
description: |
Get all action types that are registered with the message bus for a specific source ID.
operationId: getActionTypesBySourceID
tags:
- ActionType methods
parameters:
- $ref: "#/components/parameters/SourceId"
- $ref: "#/components/parameters/SourceID"
responses:
"200":
$ref: "#/components/responses/GetActionTypesOK"
Expand All @@ -191,21 +209,32 @@ paths:

/action_type/{source_id}/{name}:
get:
summary: Get an action type by source ID and name
description: |
Get an action type that is registered with the message bus for a specific source ID and action name.
operationId: getActionType
tags:
- ActionType methods
parameters:
- $ref: "#/components/parameters/SourceId"
- $ref: "#/components/parameters/Name"
- $ref: "#/components/parameters/SourceID"
- $ref: "#/components/parameters/ActionName"
responses:
"200":
$ref: "#/components/responses/GetActionTypeOK"
"404":
$ref: "#/components/responses/ResponseNotFound"

/action/{source_id}/{name}:
post:
summary: Trigger an action
description: |
Trigger an action on the message bus.
operationId: triggerAction
tags:
- Action methods
parameters:
- $ref: "#/components/parameters/SourceId"
- $ref: "#/components/parameters/Name"
- $ref: "#/components/parameters/SourceID"
- $ref: "#/components/parameters/ActionName"
requestBody:
$ref: "#/components/requestBodies/TriggerAction"
responses:
Expand All @@ -216,6 +245,22 @@ paths:
"404":
$ref: "#/components/responses/ResponseNotFound"

/action/{source_id}:
get:
summary: Subscribe to an action type (WebSocket)
description: |
Subscribe to an action type by source ID and name via WebSocket.
operationId: subscribeAction
tags:
- Action methods
parameters:
- $ref: "#/components/parameters/SourceID"
- $ref: "#/components/parameters/ActionNames"
responses:
"101":
description: |
The connection will be upgraded to a WebSocket connection and the client will receive actions as they are triggered.
components:

securitySchemes:
Expand All @@ -225,19 +270,51 @@ components:
name: Authorization

parameters:
SourceId:
SourceID:
name: source_id
in: path
required: true
schema:
type: string
example: "local-storage"

EventName:
name: name
in: path
required: true
schema:
type: string
example: "local-storage:disk:added"

Name:
EventNames:
name: names
in: query
allowEmptyValue: false
required: false
schema:
type: array
items:
type: string
example: "local-storage:disk:added,local-storage:disk:removed"

ActionName:
name: name
in: path
required: true
schema:
type: string
example: "local-storage:partition:format"

ActionNames:
name: names
in: query
allowEmptyValue: false
required: false
schema:
type: array
items:
type: string
example: "local-storage:partition:format,local-storage:partition:mount"

requestBodies:
RegisterEventType:
Expand Down
2 changes: 1 addition & 1 deletion cmd/message-bus-tool/cmd/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cmd

const (
FlagSourceID = "source-id"
FlagEventName = "event-name"
FlagEventNames = "event-name"
FlagMessageBufferSize = "message-buffer-size"
FlagBaseURL = "base-url"
)
16 changes: 9 additions & 7 deletions cmd/message-bus-tool/cmd/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,18 @@ var subscribeCmd = &cobra.Command{
panic(err)
}

eventName, err := cmd.Flags().GetString(FlagEventName)
eventNames, err := cmd.Flags().GetString(FlagEventNames)
if err != nil {
panic(err)
}

wsURL := fmt.Sprintf("ws://%s%s/event_type/%s/%s/ws", strings.TrimRight(baseURL, "/"), basePath, sourceID, eventName)
var wsURL string

if eventNames == "" {
wsURL = fmt.Sprintf("ws://%s%s/event/%s", strings.TrimRight(baseURL, "/"), basePath, sourceID)
} else {
wsURL = fmt.Sprintf("ws://%s%s/event/%s?names=%s", strings.TrimRight(baseURL, "/"), basePath, sourceID, eventNames)
}
fmt.Printf("subscribed to %s\n", wsURL)

ws, err := websocket.Dial(wsURL, "", origin)
Expand Down Expand Up @@ -64,16 +70,12 @@ func init() {

subscribeCmd.Flags().UintP(FlagMessageBufferSize, "m", 1024, "message buffer size in bytes")
subscribeCmd.Flags().StringP(FlagSourceID, "s", "", "source id")
subscribeCmd.Flags().StringP(FlagEventName, "n", "", "event name")
subscribeCmd.Flags().StringP(FlagEventNames, "n", "", "event names (comma separated)")

if err := subscribeCmd.MarkFlagRequired(FlagSourceID); err != nil {
panic(err)
}

if err := subscribeCmd.MarkFlagRequired(FlagEventName); err != nil {
panic(err)
}

// Here you will define your flags and configuration settings.

// Cobra supports Persistent Flags which will work for this command
Expand Down
10 changes: 7 additions & 3 deletions route/api_route_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ func (r *APIRoute) RegisterActionType(ctx echo.Context) error {
panic("implement me") // TODO: Implement
}

func (r *APIRoute) GetActionTypesBySourceID(ctx echo.Context, sourceID codegen.SourceId) error {
func (r *APIRoute) GetActionTypesBySourceID(ctx echo.Context, sourceID codegen.SourceID) error {
panic("implement me") // TODO: Implement
}

func (r *APIRoute) GetActionType(ctx echo.Context, sourceID codegen.SourceId, name codegen.Name) error {
func (r *APIRoute) GetActionType(ctx echo.Context, sourceID codegen.SourceID, name codegen.EventName) error {
panic("implement me") // TODO: Implement
}

func (r *APIRoute) TriggerAction(ctx echo.Context, sourceID codegen.SourceId, name codegen.Name) error {
func (r *APIRoute) TriggerAction(ctx echo.Context, sourceID codegen.SourceID, name codegen.EventName) error {
panic("implement me") // TODO: Implement
}

func (r *APIRoute) SubscribeAction(ctx echo.Context, sourceID codegen.SourceID, params codegen.SubscribeActionParams) error {
panic("implement me") // TODO: Implement
}
Loading

0 comments on commit a992423

Please sign in to comment.