diff --git a/api/message_bus/openapi.yaml b/api/message_bus/openapi.yaml index de7719c..28e630b 100644 --- a/api/message_bus/openapi.yaml +++ b/api/message_bus/openapi.yaml @@ -467,6 +467,10 @@ components: EventType: type: object + required: + - "sourceID" + - "name" + - "propertyTypeList" properties: sourceID: type: string @@ -515,6 +519,10 @@ components: ActionType: type: object + required: + - "sourceID" + - "name" + - "propertyTypeList" properties: sourceID: type: string diff --git a/cmd/message-bus-tool/cmd/constants.go b/cmd/message-bus-tool/cmd/constants.go deleted file mode 100644 index 1a356c3..0000000 --- a/cmd/message-bus-tool/cmd/constants.go +++ /dev/null @@ -1,8 +0,0 @@ -package cmd - -const ( - FlagSourceID = "source-id" - FlagEventNames = "event-name" - FlagMessageBufferSize = "message-buffer-size" - FlagBaseURL = "base-url" -) diff --git a/cmd/message-bus-tool/cmd/eventTypes.go b/cmd/message-bus-tool/cmd/eventTypes.go deleted file mode 100644 index 169a869..0000000 --- a/cmd/message-bus-tool/cmd/eventTypes.go +++ /dev/null @@ -1,81 +0,0 @@ -/* -Copyright © 2022 NAME HERE -*/ -package cmd - -import ( - "context" - "fmt" - "net/http" - "strings" - "text/tabwriter" - "time" - - "github.com/IceWhaleTech/CasaOS-MessageBus/codegen" - "github.com/spf13/cobra" -) - -// eventTypesCmd represents the eventTypes command -var eventTypesCmd = &cobra.Command{ - Use: "event-types", - Short: "list event types", - Run: func(cmd *cobra.Command, args []string) { - baseURL, err := rootCmd.PersistentFlags().GetString(FlagBaseURL) - if err != nil { - panic(err) - } - - url := fmt.Sprintf("http://%s/%s", strings.TrimRight(baseURL, "/"), basePath) - client, err := codegen.NewClientWithResponses(url) - if err != nil { - fmt.Println(err) - return - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - response, err := client.GetEventTypesWithResponse(ctx) - if err != nil { - fmt.Println(err) - return - } - - if response.StatusCode() != http.StatusOK { - fmt.Println("error") - return - } - - w := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 3, ' ', 0) - - if len(*response.JSON200) > 0 { - fmt.Fprintln(w, "SOURCE ID\tEVENT NAME\tPROPERTY TYPES") - fmt.Fprintln(w, "---------\t----------\t--------------") - } - - for _, eventType := range *response.JSON200 { - propertyTypes := make([]string, 0) - for _, propertyType := range *eventType.PropertyTypeList { - propertyTypes = append(propertyTypes, *propertyType.Name) - } - - fmt.Fprintf(w, "%s\t%s\t%s\n", *eventType.SourceID, *eventType.Name, strings.Join(propertyTypes, ",")) - } - - w.Flush() - }, -} - -func init() { - listCmd.AddCommand(eventTypesCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // eventTypesCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // eventTypesCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} diff --git a/cmd/message-bus-tool/cmd/list.go b/cmd/message-bus-tool/cmd/list.go deleted file mode 100644 index 4ee7f45..0000000 --- a/cmd/message-bus-tool/cmd/list.go +++ /dev/null @@ -1,28 +0,0 @@ -/* -Copyright © 2022 NAME HERE -*/ -package cmd - -import ( - "github.com/spf13/cobra" -) - -// listCmd represents the list command -var listCmd = &cobra.Command{ - Use: "list", - Short: "list messages", -} - -func init() { - rootCmd.AddCommand(listCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // listCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // listCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} diff --git a/cmd/message-bus-tool/cmd/root.go b/cmd/message-bus-tool/cmd/root.go deleted file mode 100644 index 10882d5..0000000 --- a/cmd/message-bus-tool/cmd/root.go +++ /dev/null @@ -1,49 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package cmd - -import ( - "os" - - "github.com/spf13/cobra" -) - -const basePath = "/v2/message_bus" - -// rootCmd represents the base command when called without any subcommands -var rootCmd = &cobra.Command{ - Use: "message-bus-tool", - Short: "This tool is used to test websocket endpoint of CasaOS MessageBus service", - // Uncomment the following line if your bare application - // has an action associated with it: - // Run: func(cmd *cobra.Command, args []string) { }, -} - -// Execute adds all child commands to the root command and sets flags appropriately. -// This is called by main.main(). It only needs to happen once to the rootCmd. -func Execute() { - err := rootCmd.Execute() - if err != nil { - os.Exit(1) - } -} - -func init() { - // Here you will define your flags and configuration settings. - // Cobra supports persistent flags, which, if defined here, - // will be global for your application. - - // rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.message-bus-tool.yaml)") - rootCmd.PersistentFlags().StringP(FlagBaseURL, "u", "localhost:80", "base url of CasaOS") -} diff --git a/cmd/message-bus-tool/cmd/subscribe.go b/cmd/message-bus-tool/cmd/subscribe.go deleted file mode 100644 index f2c69ba..0000000 --- a/cmd/message-bus-tool/cmd/subscribe.go +++ /dev/null @@ -1,88 +0,0 @@ -/* -Copyright © 2022 NAME HERE -*/ -package cmd - -import ( - "fmt" - "log" - "strings" - - "github.com/spf13/cobra" - "golang.org/x/net/websocket" -) - -const origin = "http://localhost/" - -// subscribeCmd represents the subscribe command -var subscribeCmd = &cobra.Command{ - Use: "subscribe", - Short: "subscribe to a websocket URL", - Run: func(cmd *cobra.Command, args []string) { - baseURL, err := rootCmd.PersistentFlags().GetString(FlagBaseURL) - if err != nil { - panic(err) - } - - sourceID, err := cmd.Flags().GetString(FlagSourceID) - if err != nil { - panic(err) - } - - eventNames, err := cmd.Flags().GetString(FlagEventNames) - if err != nil { - panic(err) - } - - var wsURL string - - if eventNames == "" { - wsURL = fmt.Sprintf("ws://%s%s/event/%s", strings.TrimRight(baseURL, "/"), basePath, sourceID) - } else { - wsURL = fmt.Sprintf("ws://%s%s/event/%s?names=%s", strings.TrimRight(baseURL, "/"), basePath, sourceID, eventNames) - } - fmt.Printf("subscribed to %s\n", wsURL) - - ws, err := websocket.Dial(wsURL, "", origin) - if err != nil { - log.Fatal(err) - } - defer ws.Close() - - bufferSize, err := cmd.Flags().GetUint(FlagMessageBufferSize) - if err != nil { - log.Fatal(err) - } - - for { - msg := make([]byte, bufferSize) - var n int - if n, err = ws.Read(msg); err != nil { - log.Fatal(err) - } - log.Printf("%s\n", msg[:n]) - } - }, -} - -func init() { - rootCmd.AddCommand(subscribeCmd) - - subscribeCmd.Flags().UintP(FlagMessageBufferSize, "m", 1024, "message buffer size in bytes") - subscribeCmd.Flags().StringP(FlagSourceID, "s", "", "source id") - subscribeCmd.Flags().StringP(FlagEventNames, "n", "", "event names (comma separated)") - - if err := subscribeCmd.MarkFlagRequired(FlagSourceID); err != nil { - panic(err) - } - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // subscribeCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // subscribeCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} diff --git a/cmd/message-bus-tool/cmd/version.go b/cmd/message-bus-tool/cmd/version.go deleted file mode 100644 index 3de85bf..0000000 --- a/cmd/message-bus-tool/cmd/version.go +++ /dev/null @@ -1,34 +0,0 @@ -/* -Copyright © 2022 NAME HERE -*/ -package cmd - -import ( - "fmt" - - "github.com/IceWhaleTech/CasaOS-MessageBus/common" - "github.com/spf13/cobra" -) - -// versionCmd represents the version command -var versionCmd = &cobra.Command{ - Use: "version", - Short: "Show version", - Run: func(cmd *cobra.Command, args []string) { - fmt.Println(common.MessageBusVersion) - }, -} - -func init() { - rootCmd.AddCommand(versionCmd) - - // Here you will define your flags and configuration settings. - - // Cobra supports Persistent Flags which will work for this command - // and all subcommands, e.g.: - // versionCmd.PersistentFlags().String("foo", "", "A help for foo") - - // Cobra supports local flags which will only run when this command - // is called directly, e.g.: - // versionCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") -} diff --git a/cmd/message-bus-tool/main.go b/cmd/message-bus-tool/main.go deleted file mode 100644 index a9895cb..0000000 --- a/cmd/message-bus-tool/main.go +++ /dev/null @@ -1,20 +0,0 @@ -/* -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package main - -import "github.com/IceWhaleTech/CasaOS-MessageBus/cmd/message-bus-tool/cmd" - -func main() { - cmd.Execute() -} diff --git a/common/constants.go b/common/constants.go index 1a87113..6fafa1b 100644 --- a/common/constants.go +++ b/common/constants.go @@ -4,6 +4,6 @@ const ( MessageBusVersion = "0.3.8" MessageBusServiceName = "message-bus" - MessageBusSourceID = "message-bus" - MessageBusHeartbeatEventName = "message-bus:heartbeat" + MessageBusSourceID = "message-bus" + MessageBusHeartbeatName = "message-bus:heartbeat" ) diff --git a/go.mod b/go.mod index 0f89aed..9517d4d 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/IceWhaleTech/CasaOS-Common v0.3.8-alpha1 github.com/gobwas/ws v1.1.0 github.com/json-iterator/go v1.1.12 - github.com/spf13/cobra v1.6.1 go.uber.org/goleak v1.1.11 gorm.io/driver/sqlite v1.4.3 gorm.io/gorm v1.24.1 @@ -31,7 +30,6 @@ require ( github.com/golang/snappy v0.0.2 // indirect github.com/google/go-cmp v0.5.8 // indirect github.com/gorilla/mux v1.8.0 // indirect - github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/klauspost/compress v1.11.4 // indirect @@ -45,7 +43,6 @@ require ( github.com/pelletier/go-toml/v2 v2.0.5 // indirect github.com/pierrec/lz4/v4 v4.1.2 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/spf13/pflag v1.0.5 // indirect github.com/ugorji/go/codec v1.2.7 // indirect github.com/ulikunitz/xz v0.5.9 // indirect github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect @@ -75,7 +72,7 @@ require ( go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.21.0 golang.org/x/crypto v0.1.0 // indirect - golang.org/x/net v0.1.0 + golang.org/x/net v0.1.0 // indirect golang.org/x/sys v0.1.0 // indirect golang.org/x/text v0.4.0 // indirect gopkg.in/ini.v1 v1.67.0 diff --git a/go.sum b/go.sum index f78b0ac..eb25ecb 100644 --- a/go.sum +++ b/go.sum @@ -13,7 +13,6 @@ github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pq github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= -github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -67,8 +66,6 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= -github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc= -github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/invopop/yaml v0.1.0 h1:YW3WGUoJEXYfzWBjn00zIlrw7brGVD0fUKRYDPAPhrc= github.com/invopop/yaml v0.1.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= @@ -139,11 +136,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= -github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/spf13/cobra v1.6.1 h1:o94oiPyS4KD1mPy2fmcYYHHfCxLqYjJOhGsCHFZtEzA= -github.com/spf13/cobra v1.6.1/go.mod h1:IOw/AERYS7UzyrGinqmz6HLUo219MORXGxhbaJUqzrY= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= -github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= diff --git a/model/event.go b/model/event.go deleted file mode 100644 index b1cb6b5..0000000 --- a/model/event.go +++ /dev/null @@ -1,9 +0,0 @@ -package model - -type Event struct { - ID uint `gorm:"primaryKey"` - SourceID string `gorm:"index"` - Name string `gorm:"index"` - Properties []Property `gorm:"foreignKey:Id"` - Timestamp int64 `gorm:"autoCreateTime:milli"` -} diff --git a/model/event_type.go b/model/event_type.go deleted file mode 100644 index 0a34dc4..0000000 --- a/model/event_type.go +++ /dev/null @@ -1,11 +0,0 @@ -package model - -// TODO - add validation - see https://github.com/go-playground/validator - -const PropertyTypeList = "PropertyTypeList" - -type EventType struct { - SourceID string `gorm:"primaryKey"` - Name string `gorm:"primaryKey"` - PropertyTypeList []PropertyType `gorm:"many2many:event_type_property_type;"` -} diff --git a/model/property.go b/model/property.go deleted file mode 100644 index a4afe0d..0000000 --- a/model/property.go +++ /dev/null @@ -1,7 +0,0 @@ -package model - -type Property struct { - ID uint `gorm:"primaryKey"` - Name string - Value string -} diff --git a/model/property_type.go b/model/property_type.go deleted file mode 100644 index ffed5c7..0000000 --- a/model/property_type.go +++ /dev/null @@ -1,5 +0,0 @@ -package model - -type PropertyType struct { - Name string `gorm:"primaryKey"` -} diff --git a/model/structs.go b/model/structs.go new file mode 100644 index 0000000..2ae59d6 --- /dev/null +++ b/model/structs.go @@ -0,0 +1,50 @@ +package model + +const ( + EventTypeList = "EventTypeList" + ActionTypeList = "ActionTypeList" + PropertyTypeList = "PropertyTypeList" +) + +type EventType struct { + SourceID string `gorm:"primaryKey"` + Name string `gorm:"primaryKey"` + PropertyTypeList []PropertyType `gorm:"many2many:event_type_property_type;"` +} + +type Event struct { + ID uint `gorm:"primaryKey"` + SourceID string `gorm:"index"` + Name string `gorm:"index"` + Properties []Property `gorm:"foreignKey:Id"` + Timestamp int64 `gorm:"autoCreateTime:milli"` +} + +type ActionType struct { + SourceID string `gorm:"primaryKey"` + Name string `gorm:"primaryKey"` + PropertyTypeList []PropertyType `gorm:"many2many:action_type_property_type;"` +} + +type Action struct { + ID uint `gorm:"primaryKey"` + SourceID string `gorm:"index"` + Name string `gorm:"index"` + Properties []Property `gorm:"foreignKey:Id"` + Timestamp int64 `gorm:"autoCreateTime:milli"` +} + +type PropertyType struct { + Name string `gorm:"primaryKey"` +} + +type Property struct { + ID uint `gorm:"primaryKey"` + Name string + Value string +} + +type GenericType struct { + SourceID string `gorm:"primaryKey"` + Name string `gorm:"primaryKey"` +} diff --git a/repository/repository.go b/repository/repository.go index 1c26d5b..80fb90f 100644 --- a/repository/repository.go +++ b/repository/repository.go @@ -7,5 +7,11 @@ type Repository interface { RegisterEventType(eventType model.EventType) (*model.EventType, error) GetEventTypesBySourceID(sourceID string) ([]model.EventType, error) GetEventType(sourceID string, name string) (*model.EventType, error) + + GetActionTypes() ([]model.ActionType, error) + RegisterActionType(actionType model.ActionType) (*model.ActionType, error) + GetActionTypesBySourceID(sourceID string) ([]model.ActionType, error) + GetActionType(sourceID string, name string) (*model.ActionType, error) + Close() } diff --git a/repository/repository_db.go b/repository/repository_db.go index 4b9b4c5..cd8a253 100644 --- a/repository/repository_db.go +++ b/repository/repository_db.go @@ -52,6 +52,22 @@ func (r *DatabaseRepository) GetEventType(sourceID string, name string) (*model. return &eventType, nil } +func (r *DatabaseRepository) GetActionTypes() ([]model.ActionType, error) { + return GetTypes[model.ActionType](r.db) +} + +func (r *DatabaseRepository) RegisterActionType(actionType model.ActionType) (*model.ActionType, error) { + return RegisterType(r.db, actionType) +} + +func (r *DatabaseRepository) GetActionTypesBySourceID(sourceID string) ([]model.ActionType, error) { + return GetTypesBySourceID[model.ActionType](r.db, sourceID) +} + +func (r *DatabaseRepository) GetActionType(sourceID string, name string) (*model.ActionType, error) { + return GetType[model.ActionType](r.db, sourceID, name) +} + func (r *DatabaseRepository) Close() { sqlDB, err := r.db.DB() if err == nil { @@ -59,6 +75,45 @@ func (r *DatabaseRepository) Close() { } } +func GetTypes[T any](db *gorm.DB) ([]T, error) { + var types []T + + if err := db.Preload(model.PropertyTypeList).Find(&types).Error; err != nil { + return nil, err + } + + return types, nil +} + +func RegisterType[T any](db *gorm.DB, t T) (*T, error) { + // upsert + if err := db.Clauses(clause.OnConflict{UpdateAll: true}).Create(&t).Error; err != nil { + return nil, err + } + + return &t, nil +} + +func GetTypesBySourceID[T any](db *gorm.DB, sourceID string) ([]T, error) { + var types []T + + if err := db.Preload(model.PropertyTypeList).Where(&model.GenericType{SourceID: sourceID}).Find(&types).Error; err != nil { + return nil, err + } + + return types, nil +} + +func GetType[T any](db *gorm.DB, sourceID string, name string) (*T, error) { + var t T + + if err := db.Preload(model.PropertyTypeList).Where(&model.GenericType{SourceID: sourceID, Name: name}).First(&t).Error; err != nil { + return nil, err + } + + return &t, nil +} + func NewDatabaseRepositoryInMemory() (Repository, error) { return NewDatabaseRepository("file::memory:?cache=shared") } @@ -74,7 +129,7 @@ func NewDatabaseRepository(databaseFilePath string) (Repository, error) { c.SetMaxOpenConns(100) c.SetConnMaxIdleTime(1000 * time.Second) - if err := db.AutoMigrate(&model.EventType{}, &model.PropertyType{}); err != nil { + if err := db.AutoMigrate(&model.EventType{}, &model.ActionType{}, &model.PropertyType{}); err != nil { return nil, err } diff --git a/route/adapter/in/action.go b/route/adapter/in/action.go new file mode 100644 index 0000000..40a1a15 --- /dev/null +++ b/route/adapter/in/action.go @@ -0,0 +1,25 @@ +package in + +import ( + "github.com/IceWhaleTech/CasaOS-MessageBus/codegen" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" +) + +func ActionAdapter(action codegen.Action) model.Action { + properties := make([]model.Property, 0) + for _, property := range *action.Properties { + properties = append(properties, PropertyAdapter(property)) + } + + var timestamp int64 + if action.Timestamp != nil { + timestamp = action.Timestamp.Unix() + } + + return model.Action{ + SourceID: *action.SourceID, + Name: *action.Name, + Properties: properties, + Timestamp: timestamp, + } +} diff --git a/route/adapter/in/action_type.go b/route/adapter/in/action_type.go new file mode 100644 index 0000000..9c2c518 --- /dev/null +++ b/route/adapter/in/action_type.go @@ -0,0 +1,19 @@ +package in + +import ( + "github.com/IceWhaleTech/CasaOS-MessageBus/codegen" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" +) + +func ActionTypeAdapter(actionType codegen.ActionType) model.ActionType { + propertyTypeList := make([]model.PropertyType, 0) + for _, propertyType := range actionType.PropertyTypeList { + propertyTypeList = append(propertyTypeList, PropertyTypeAdapter(propertyType)) + } + + return model.ActionType{ + SourceID: actionType.SourceID, + Name: actionType.Name, + PropertyTypeList: propertyTypeList, + } +} diff --git a/route/adapter/in/event_type.go b/route/adapter/in/event_type.go index 88687b2..bb200ca 100644 --- a/route/adapter/in/event_type.go +++ b/route/adapter/in/event_type.go @@ -7,13 +7,13 @@ import ( func EventTypeAdapter(eventType codegen.EventType) model.EventType { propertyTypeList := make([]model.PropertyType, 0) - for _, propertyType := range *eventType.PropertyTypeList { + for _, propertyType := range eventType.PropertyTypeList { propertyTypeList = append(propertyTypeList, PropertyTypeAdapter(propertyType)) } return model.EventType{ - SourceID: *eventType.SourceID, - Name: *eventType.Name, + SourceID: eventType.SourceID, + Name: eventType.Name, PropertyTypeList: propertyTypeList, } } diff --git a/route/adapter/out/action.go b/route/adapter/out/action.go new file mode 100644 index 0000000..6f0da68 --- /dev/null +++ b/route/adapter/out/action.go @@ -0,0 +1,23 @@ +package out + +import ( + "time" + + "github.com/IceWhaleTech/CasaOS-Common/utils" + "github.com/IceWhaleTech/CasaOS-MessageBus/codegen" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" +) + +func ActionAdapter(action model.Action) codegen.Action { + properties := make([]codegen.Property, 0) + for _, property := range action.Properties { + properties = append(properties, PropertyAdapter(property)) + } + + return codegen.Action{ + SourceID: &action.SourceID, + Name: &action.Name, + Properties: &properties, + Timestamp: utils.Ptr(time.Unix(action.Timestamp, 0)), + } +} diff --git a/route/adapter/out/action_type.go b/route/adapter/out/action_type.go new file mode 100644 index 0000000..e7fa241 --- /dev/null +++ b/route/adapter/out/action_type.go @@ -0,0 +1,19 @@ +package out + +import ( + "github.com/IceWhaleTech/CasaOS-MessageBus/codegen" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" +) + +func ActionTypeAdapter(actionType model.ActionType) codegen.ActionType { + propertyTypeList := make([]codegen.PropertyType, 0) + for _, propertyType := range actionType.PropertyTypeList { + propertyTypeList = append(propertyTypeList, PropertyTypeAdapter(propertyType)) + } + + return codegen.ActionType{ + SourceID: actionType.SourceID, + Name: actionType.Name, + PropertyTypeList: propertyTypeList, + } +} diff --git a/route/adapter/out/event.go b/route/adapter/out/event.go index 0dbcb42..1b0d6c2 100644 --- a/route/adapter/out/event.go +++ b/route/adapter/out/event.go @@ -3,6 +3,7 @@ package out import ( "time" + "github.com/IceWhaleTech/CasaOS-Common/utils" "github.com/IceWhaleTech/CasaOS-MessageBus/codegen" "github.com/IceWhaleTech/CasaOS-MessageBus/model" ) @@ -13,12 +14,10 @@ func EventAdapter(event model.Event) codegen.Event { properties = append(properties, PropertyAdapter(property)) } - timestamp := time.Unix(event.Timestamp, 0) - return codegen.Event{ SourceID: &event.SourceID, Name: &event.Name, Properties: &properties, - Timestamp: ×tamp, + Timestamp: utils.Ptr(time.Unix(event.Timestamp, 0)), } } diff --git a/route/adapter/out/event_type.go b/route/adapter/out/event_type.go index c3f3403..ebc143d 100644 --- a/route/adapter/out/event_type.go +++ b/route/adapter/out/event_type.go @@ -12,8 +12,8 @@ func EventTypeAdapter(eventType model.EventType) codegen.EventType { } return codegen.EventType{ - SourceID: &eventType.SourceID, - Name: &eventType.Name, - PropertyTypeList: &propertyTypeList, + SourceID: eventType.SourceID, + Name: eventType.Name, + PropertyTypeList: propertyTypeList, } } diff --git a/route/api_route.go b/route/api_route.go index 32af461..454700b 100644 --- a/route/api_route.go +++ b/route/api_route.go @@ -3,12 +3,15 @@ package route import ( "github.com/IceWhaleTech/CasaOS-MessageBus/codegen" "github.com/IceWhaleTech/CasaOS-MessageBus/service" + jsoniter "github.com/json-iterator/go" ) type APIRoute struct { services *service.Services } +var json = jsoniter.ConfigCompatibleWithStandardLibrary + func NewAPIRoute(services *service.Services) codegen.ServerInterface { return &APIRoute{ services: services, diff --git a/route/api_route_action.go b/route/api_route_action.go index 8a14bee..defc6ef 100644 --- a/route/api_route_action.go +++ b/route/api_route_action.go @@ -1,30 +1,199 @@ package route import ( + "net" + "net/http" + "time" + + "github.com/IceWhaleTech/CasaOS-Common/utils" + "github.com/IceWhaleTech/CasaOS-Common/utils/logger" "github.com/IceWhaleTech/CasaOS-MessageBus/codegen" + "github.com/IceWhaleTech/CasaOS-MessageBus/common" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" + "github.com/IceWhaleTech/CasaOS-MessageBus/route/adapter/in" + "github.com/IceWhaleTech/CasaOS-MessageBus/route/adapter/out" + "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" "github.com/labstack/echo/v4" + "go.uber.org/zap" ) -func (r *APIRoute) GetActionTypes(ctx echo.Context) error { - panic("implement me") // TODO: Implement +func (r *APIRoute) GetActionTypes(c echo.Context) error { + actionType, err := r.services.ActionService.GetActionTypes() + if err != nil { + message := err.Error() + return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) + } + + results := make([]codegen.ActionType, 0) + + for _, actionType := range actionType { + results = append(results, out.ActionTypeAdapter(actionType)) + } + + return c.JSON(http.StatusOK, results) } -func (r *APIRoute) RegisterActionType(ctx echo.Context) error { - panic("implement me") // TODO: Implement +func (r *APIRoute) RegisterActionType(c echo.Context) error { + var actionType codegen.ActionType + if err := c.Bind(&actionType); err != nil { + message := err.Error() + return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) + } + + result, err := r.services.ActionService.RegisterActionType(in.ActionTypeAdapter(actionType)) + if err != nil { + message := err.Error() + return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) + } + + return c.JSON(http.StatusCreated, result) } -func (r *APIRoute) GetActionTypesBySourceID(ctx echo.Context, sourceID codegen.SourceID) error { - panic("implement me") // TODO: Implement +func (r *APIRoute) GetActionTypesBySourceID(c echo.Context, sourceID codegen.SourceID) error { + results, err := r.services.ActionService.GetActionTypesBySourceID(sourceID) + if err != nil { + message := err.Error() + return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) + } + + return c.JSON(http.StatusOK, results) } -func (r *APIRoute) GetActionType(ctx echo.Context, sourceID codegen.SourceID, name codegen.EventName) error { - panic("implement me") // TODO: Implement +func (r *APIRoute) GetActionType(c echo.Context, sourceID codegen.SourceID, name codegen.EventName) error { + result, err := r.services.ActionService.GetActionType(sourceID, name) + if err != nil { + message := err.Error() + return c.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: &message}) + } + + if result == nil { + return c.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: utils.Ptr("not found")}) + } + + return c.JSON(http.StatusOK, result) } -func (r *APIRoute) TriggerAction(ctx echo.Context, sourceID codegen.SourceID, name codegen.EventName) error { - panic("implement me") // TODO: Implement +func (r *APIRoute) TriggerAction(c echo.Context, sourceID codegen.SourceID, name codegen.EventName) error { + actionType, err := r.services.ActionService.GetActionType(sourceID, name) + if err != nil { + message := err.Error() + return c.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: &message}) + } + + if actionType == nil { + return c.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: utils.Ptr("not found")}) + } + + var properties []codegen.Property + if err := c.Bind(&properties); err != nil { + message := err.Error() + return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) + } + + action := codegen.Action{ + SourceID: &sourceID, + Name: &name, + Properties: &properties, + Timestamp: utils.Ptr(time.Now()), + } + + result, err := r.services.ActionService.Trigger(in.ActionAdapter(action)) + if err != nil { + message := err.Error() + return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) + } + + return c.JSON(http.StatusOK, out.ActionAdapter(*result)) } -func (r *APIRoute) SubscribeAction(ctx echo.Context, sourceID codegen.SourceID, params codegen.SubscribeActionParams) error { - panic("implement me") // TODO: Implement +func (r *APIRoute) SubscribeAction(c echo.Context, sourceID codegen.SourceID, params codegen.SubscribeActionParams) error { + var actionNames []string + if params.Names != nil { + for _, actionName := range *params.Names { + actionType, err := r.services.ActionService.GetActionType(sourceID, actionName) + if err != nil { + message := err.Error() + return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) + } + + if actionType == nil { + return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: utils.Ptr("action type not found")}) + } + + actionNames = append(actionNames, actionName) + } + } else { + actionTypes, err := r.services.ActionService.GetActionTypesBySourceID(sourceID) + if err != nil { + message := err.Error() + return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) + } + + for _, actionType := range actionTypes { + actionNames = append(actionNames, actionType.Name) + } + } + + conn, _, _, err := ws.UpgradeHTTP(c.Request(), c.Response()) + if err != nil { + message := err.Error() + return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) + } + + channel, err := r.services.ActionService.Subscribe(sourceID, actionNames) + if err != nil { + conn.Close() // need to close connection here, instead of defer, because of the goroutine + message := err.Error() + return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) + } + + go func(conn net.Conn, channel chan model.Action, actionNames []string) { + defer conn.Close() + defer close(channel) + defer func(actionNames []string) { + for _, name := range actionNames { + if err := r.services.ActionService.Unsubscribe(sourceID, name, channel); err != nil { + logger.Error("error when trying to unsubscribe an action type", zap.Error(err), zap.String("source_id", sourceID), zap.String("name", name)) + } + } + }(actionNames) + + logger.Info("started", zap.String("remote_addr", conn.RemoteAddr().String())) + + for { + action, ok := <-channel + if !ok { + logger.Info("channel closed") + return + } + + if action.SourceID == common.MessageBusSourceID && action.Name == common.MessageBusHeartbeatName { + if err := wsutil.WriteServerMessage(conn, ws.OpPing, []byte{}); err != nil { + logger.Error("error when trying to send ping message", zap.Error(err)) + return + } + continue + } + + message, err := json.Marshal(out.ActionAdapter(action)) + if err != nil { + logger.Error("error when trying to marshal action", zap.Error(err)) + continue + } + + logger.Info("sending", zap.String("remote_addr", conn.RemoteAddr().String()), zap.String("message", string(message))) + + if err := wsutil.WriteServerBinary(conn, message); err != nil { + if _, ok := err.(*net.OpError); ok { + logger.Info("ended", zap.String("error", err.Error())) + } else { + logger.Error("error", zap.String("error", err.Error())) + } + return + } + } + }(conn, channel, actionNames) + + return nil } diff --git a/route/api_route_event.go b/route/api_route_event.go index bb94501..9342ea8 100644 --- a/route/api_route_event.go +++ b/route/api_route_event.go @@ -1,12 +1,12 @@ package route import ( + "fmt" "net" "net/http" "time" - jsoniter "github.com/json-iterator/go" - + "github.com/IceWhaleTech/CasaOS-Common/utils" "github.com/IceWhaleTech/CasaOS-Common/utils/logger" "github.com/IceWhaleTech/CasaOS-MessageBus/codegen" "github.com/IceWhaleTech/CasaOS-MessageBus/common" @@ -19,10 +19,8 @@ import ( "go.uber.org/zap" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary - func (r *APIRoute) GetEventTypes(ctx echo.Context) error { - eventTypes, err := r.services.EventTypeService.GetEventTypes() + eventTypes, err := r.services.EventService.GetEventTypes() if err != nil { message := err.Error() return ctx.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) @@ -44,18 +42,18 @@ func (r *APIRoute) RegisterEventType(ctx echo.Context) error { return ctx.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) } - result, err := r.services.EventTypeService.RegisterEventType(in.EventTypeAdapter(eventType)) + result, err := r.services.EventService.RegisterEventType(in.EventTypeAdapter(eventType)) if err != nil { message := err.Error() return ctx.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) } - return ctx.JSON(http.StatusOK, result) + return ctx.JSON(http.StatusCreated, result) } func (r *APIRoute) GetEventTypesBySourceID(ctx echo.Context, sourceID codegen.SourceID) error { - results, err := r.services.EventTypeService.GetEventTypesBySourceID(sourceID) - if err != nil || results == nil { + results, err := r.services.EventService.GetEventTypesBySourceID(sourceID) + if err != nil { message := err.Error() return ctx.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) } @@ -64,38 +62,44 @@ func (r *APIRoute) GetEventTypesBySourceID(ctx echo.Context, sourceID codegen.So } func (r *APIRoute) GetEventType(ctx echo.Context, sourceID codegen.SourceID, name codegen.EventName) error { - result, err := r.services.EventTypeService.GetEventType(sourceID, name) - if err != nil || result == nil { + result, err := r.services.EventService.GetEventType(sourceID, name) + if err != nil { message := err.Error() return ctx.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: &message}) } + if result == nil { + return ctx.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: utils.Ptr("not found")}) + } + return ctx.JSON(http.StatusOK, result) } func (r *APIRoute) PublishEvent(ctx echo.Context, sourceID codegen.SourceID, name codegen.EventName) error { - eventType, err := r.services.EventTypeService.GetEventType(sourceID, name) - if err != nil || eventType == nil { + eventType, err := r.services.EventService.GetEventType(sourceID, name) + if err != nil { message := err.Error() return ctx.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: &message}) } + if eventType == nil { + return ctx.JSON(http.StatusNotFound, codegen.ResponseNotFound{Message: utils.Ptr("not found")}) + } + var properties []codegen.Property if err := ctx.Bind(&properties); err != nil { message := err.Error() return ctx.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) } - timestamp := time.Now() - event := codegen.Event{ SourceID: &sourceID, Name: &name, Properties: &properties, - Timestamp: ×tamp, + Timestamp: utils.Ptr(time.Now()), } - result, err := r.services.EventTypeService.Publish(in.EventAdapter(event)) + result, err := r.services.EventService.Publish(in.EventAdapter(event)) if err != nil { message := err.Error() return ctx.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) @@ -108,17 +112,23 @@ func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, par var eventNames []string if params.Names != nil { for _, eventName := range *params.Names { - eventType, err := r.services.EventTypeService.GetEventType(sourceID, eventName) - if err != nil || eventType == nil { + eventType, err := r.services.EventService.GetEventType(sourceID, eventName) + if err != nil { message := err.Error() return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) } + + if eventType == nil { + return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: utils.Ptr(fmt.Sprintf("event type `%s` of source ID `%s` not found", eventName, sourceID))}) + } + eventNames = append(eventNames, eventName) } } else { - eventTypes, err := r.services.EventTypeService.GetEventTypesBySourceID(sourceID) + eventTypes, err := r.services.EventService.GetEventTypesBySourceID(sourceID) if err != nil { - return err + message := err.Error() + return c.JSON(http.StatusBadRequest, codegen.ResponseBadRequest{Message: &message}) } for _, eventType := range eventTypes { @@ -128,13 +138,15 @@ func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, par conn, _, _, err := ws.UpgradeHTTP(c.Request(), c.Response()) if err != nil { - return err + message := err.Error() + return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) } - channel, err := r.services.EventTypeService.Subscribe(sourceID, eventNames) + channel, err := r.services.EventService.Subscribe(sourceID, eventNames) if err != nil { conn.Close() // need to close connection here, instead of defer, because of the goroutine - return err + message := err.Error() + return c.JSON(http.StatusInternalServerError, codegen.ResponseInternalServerError{Message: &message}) } go func(conn net.Conn, channel chan model.Event, eventNames []string) { @@ -142,7 +154,7 @@ func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, par defer close(channel) defer func(eventNames []string) { for _, name := range eventNames { - if err := r.services.EventTypeService.Unsubscribe(sourceID, name, channel); err != nil { + if err := r.services.EventService.Unsubscribe(sourceID, name, channel); err != nil { logger.Error("error when trying to unsubscribe an event type", zap.Error(err), zap.String("source_id", sourceID), zap.String("name", name)) } } @@ -157,7 +169,7 @@ func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, par return } - if event.SourceID == common.MessageBusSourceID && event.Name == common.MessageBusHeartbeatEventName { + if event.SourceID == common.MessageBusSourceID && event.Name == common.MessageBusHeartbeatName { if err := wsutil.WriteServerMessage(conn, ws.OpPing, []byte{}); err != nil { logger.Error("error when trying to send ping message", zap.Error(err)) return @@ -167,7 +179,7 @@ func (r *APIRoute) SubscribeEvent(c echo.Context, sourceID codegen.SourceID, par message, err := json.Marshal(out.EventAdapter(event)) if err != nil { - logger.Error("failed to marshal event", zap.Error(err)) + logger.Error("error when trying to marshal event", zap.Error(err)) continue } diff --git a/route/api_route_event_test.go b/route/api_route_event_test.go index 51f34e0..7902999 100644 --- a/route/api_route_event_test.go +++ b/route/api_route_event_test.go @@ -54,7 +54,7 @@ func TestEventRoute(t *testing.T) { err = apiRoute.RegisterEventType(e.NewContext(req, rec)) assert.NilError(t, err) - assert.Equal(t, rec.Code, http.StatusOK) + assert.Equal(t, rec.Code, http.StatusCreated) var actualEventType model.EventType err = json2.UnmarshalFromString(rec.Body.String(), &actualEventType) diff --git a/service/action_type_service.go b/service/action_type_service.go new file mode 100644 index 0000000..21bf150 --- /dev/null +++ b/service/action_type_service.go @@ -0,0 +1,236 @@ +package service + +import ( + "context" + "errors" + "time" + + "github.com/IceWhaleTech/CasaOS-MessageBus/common" + "github.com/IceWhaleTech/CasaOS-MessageBus/model" + "github.com/IceWhaleTech/CasaOS-MessageBus/repository" +) + +type ActionService struct { + ctx *context.Context + repository *repository.Repository + inboundChannel chan model.Action + subscriberChannels map[string]map[string][]chan model.Action + stop chan struct{} +} + +var ( + ErrActionSourceIDNotFound = errors.New("event source id not found") + ErrActionNameNotFound = errors.New("event name not found") +) + +func (s *ActionService) GetActionTypes() ([]model.ActionType, error) { + return (*s.repository).GetActionTypes() +} + +func (s *ActionService) RegisterActionType(actionType model.ActionType) (*model.ActionType, error) { + // TODO - ensure sourceID and name are URL safe + + return (*s.repository).RegisterActionType(actionType) +} + +func (s *ActionService) GetActionTypesBySourceID(sourceID string) ([]model.ActionType, error) { + return (*s.repository).GetActionTypesBySourceID(sourceID) +} + +func (s *ActionService) GetActionType(sourceID string, name string) (*model.ActionType, error) { + return (*s.repository).GetActionType(sourceID, name) +} + +func (s *ActionService) Trigger(action model.Action) (*model.Action, error) { + if s.inboundChannel == nil { + return nil, ErrInboundChannelNotFound + } + + if action.Timestamp == 0 { + action.Timestamp = time.Now().Unix() + } + + // TODO - ensure properties are valid for event type + + select { + case s.inboundChannel <- action: + + case <-(*s.ctx).Done(): + return nil, (*s.ctx).Err() + + default: // drop event if no one is listening + } + + return &action, nil +} + +func (s *ActionService) Subscribe(sourceID string, names []string) (chan model.Action, error) { + if len(names) == 0 { + actionTypes, err := s.GetActionTypesBySourceID(sourceID) + if err != nil { + return nil, err + } + + for _, actionType := range actionTypes { + names = append(names, actionType.Name) + } + } + + for _, name := range names { + actionType, err := s.GetActionType(sourceID, name) + if err != nil { + return nil, err + } + + if actionType == nil { + return nil, ErrActionNameNotFound + } + } + + if s.subscriberChannels == nil { + s.subscriberChannels = make(map[string]map[string][]chan model.Action) + } + + if s.subscriberChannels[sourceID] == nil { + s.subscriberChannels[sourceID] = make(map[string][]chan model.Action) + } + + c := make(chan model.Action, 1) + + for _, name := range names { + if s.subscriberChannels[sourceID][name] == nil { + s.subscriberChannels[sourceID][name] = make([]chan model.Action, 0) + } + s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name], c) + } + + return c, nil +} + +func (s *ActionService) Unsubscribe(sourceID string, name string, c chan model.Action) error { + if s.subscriberChannels == nil { + return ErrSubscriberChannelsNotFound + } + + if s.subscriberChannels[sourceID] == nil { + return ErrActionSourceIDNotFound + } + + if s.subscriberChannels[sourceID][name] == nil { + return ErrActionNameNotFound + } + + for i, subscriber := range s.subscriberChannels[sourceID][name] { + if subscriber == c { + s.subscriberChannels[sourceID][name] = append(s.subscriberChannels[sourceID][name][:i], s.subscriberChannels[sourceID][name][i+1:]...) + return nil + } + } + + return nil +} + +func (s *ActionService) Start(ctx *context.Context) { + s.ctx = ctx + + s.inboundChannel = make(chan model.Action) + s.subscriberChannels = make(map[string]map[string][]chan model.Action) + s.stop = make(chan struct{}) + + defer func() { + if s.subscriberChannels != nil { + for sourceID, source := range s.subscriberChannels { + for actionName, subscribers := range source { + for _, subscriber := range subscribers { + select { + case _, ok := <-subscriber: + if ok { + close(subscriber) + } + default: + continue + } + } + delete(s.subscriberChannels[sourceID], actionName) + } + delete(s.subscriberChannels, sourceID) + } + s.subscriberChannels = nil + } + + close(s.inboundChannel) + s.inboundChannel = nil + + close(s.stop) + s.stop = nil + }() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + + case <-(*s.ctx).Done(): + return + + case action, ok := <-s.inboundChannel: + if !ok { + return + } + + if s.subscriberChannels == nil { + continue + } + + if s.subscriberChannels[action.SourceID] == nil { + continue + } + + if s.subscriberChannels[action.SourceID][action.Name] == nil { + continue + } + + for _, c := range s.subscriberChannels[action.SourceID][action.Name] { + select { + case c <- action: + case <-(*s.ctx).Done(): + return + default: // drop event if no one is listening + continue + } + } + + case <-ticker.C: + if s.subscriberChannels == nil { + continue + } + + heartbeat := model.Action{ + SourceID: common.MessageBusSourceID, + Name: common.MessageBusHeartbeatName, + Timestamp: time.Now().Unix(), + } + + for _, source := range s.subscriberChannels { + for _, subscribers := range source { + for _, subscriber := range subscribers { + select { + case subscriber <- heartbeat: + case <-(*s.ctx).Done(): + return + default: // drop event if no one is listening + continue + } + } + } + } + } + } +} + +func NewActionService(repository *repository.Repository) *ActionService { + return &ActionService{ + repository: repository, + } +} diff --git a/service/event_type_service.go b/service/event_type_service.go index afe2306..7f17a5c 100644 --- a/service/event_type_service.go +++ b/service/event_type_service.go @@ -12,7 +12,7 @@ import ( "go.uber.org/zap" ) -type EventTypeService struct { +type EventService struct { ctx *context.Context repository *repository.Repository inboundChannel chan model.Event @@ -21,31 +21,29 @@ type EventTypeService struct { } var ( - ErrInboundChannelNotFound = errors.New("inbound channel not found") - ErrSubscriberChannelsNotFound = errors.New("subscriber channels not found") - ErrEventSourceIDNotFound = errors.New("event source id not found") - ErrEventNameNotFound = errors.New("event name not found") + ErrEventSourceIDNotFound = errors.New("event source id not found") + ErrEventNameNotFound = errors.New("event name not found") ) -func (s *EventTypeService) GetEventTypes() ([]model.EventType, error) { +func (s *EventService) GetEventTypes() ([]model.EventType, error) { return (*s.repository).GetEventTypes() } -func (s *EventTypeService) RegisterEventType(eventType model.EventType) (*model.EventType, error) { +func (s *EventService) RegisterEventType(eventType model.EventType) (*model.EventType, error) { // TODO - ensure sourceID and name are URL safe return (*s.repository).RegisterEventType(eventType) } -func (s *EventTypeService) GetEventTypesBySourceID(sourceID string) ([]model.EventType, error) { +func (s *EventService) GetEventTypesBySourceID(sourceID string) ([]model.EventType, error) { return (*s.repository).GetEventTypesBySourceID(sourceID) } -func (s *EventTypeService) GetEventType(sourceID string, name string) (*model.EventType, error) { +func (s *EventService) GetEventType(sourceID string, name string) (*model.EventType, error) { return (*s.repository).GetEventType(sourceID, name) } -func (s *EventTypeService) Publish(event model.Event) (*model.Event, error) { +func (s *EventService) Publish(event model.Event) (*model.Event, error) { if s.inboundChannel == nil { return nil, ErrInboundChannelNotFound } @@ -68,7 +66,7 @@ func (s *EventTypeService) Publish(event model.Event) (*model.Event, error) { return &event, nil } -func (s *EventTypeService) Subscribe(sourceID string, names []string) (chan model.Event, error) { +func (s *EventService) Subscribe(sourceID string, names []string) (chan model.Event, error) { if len(names) == 0 { eventTypes, err := s.GetEventTypesBySourceID(sourceID) if err != nil { @@ -111,7 +109,7 @@ func (s *EventTypeService) Subscribe(sourceID string, names []string) (chan mode return c, nil } -func (s *EventTypeService) Unsubscribe(sourceID string, name string, c chan model.Event) error { +func (s *EventService) Unsubscribe(sourceID string, name string, c chan model.Event) error { if s.subscriberChannels == nil { return ErrSubscriberChannelsNotFound } @@ -135,7 +133,7 @@ func (s *EventTypeService) Unsubscribe(sourceID string, name string, c chan mode return nil } -func (s *EventTypeService) Start(ctx *context.Context) { +func (s *EventService) Start(ctx *context.Context) { s.ctx = ctx s.inboundChannel = make(chan model.Event) @@ -205,14 +203,15 @@ func (s *EventTypeService) Start(ctx *context.Context) { continue } } + case <-ticker.C: if s.subscriberChannels == nil { continue } - heartbeatEvent := model.Event{ + heartbeat := model.Event{ SourceID: common.MessageBusSourceID, - Name: common.MessageBusHeartbeatEventName, + Name: common.MessageBusHeartbeatName, Timestamp: time.Now().Unix(), } @@ -220,7 +219,7 @@ func (s *EventTypeService) Start(ctx *context.Context) { for _, subscribers := range source { for _, subscriber := range subscribers { select { - case subscriber <- heartbeatEvent: + case subscriber <- heartbeat: case <-(*s.ctx).Done(): return default: // drop event if no one is listening @@ -233,8 +232,8 @@ func (s *EventTypeService) Start(ctx *context.Context) { } } -func NewEventTypeService(repository *repository.Repository) *EventTypeService { - return &EventTypeService{ +func NewEventService(repository *repository.Repository) *EventService { + return &EventService{ repository: repository, } } diff --git a/service/event_type_service_test.go b/service/event_type_service_test.go index 49e0853..d194c5c 100644 --- a/service/event_type_service_test.go +++ b/service/event_type_service_test.go @@ -19,7 +19,7 @@ func TestEventTypeService(t *testing.T) { defer repository.Close() // new service - service := NewEventTypeService(&repository) + service := NewEventService(&repository) // new context ctx, cancel := context.WithCancel(context.Background()) diff --git a/service/services.go b/service/services.go index f0fba6a..f6932ac 100644 --- a/service/services.go +++ b/service/services.go @@ -2,20 +2,29 @@ package service import ( "context" + "errors" "github.com/IceWhaleTech/CasaOS-MessageBus/repository" ) type Services struct { - EventTypeService *EventTypeService + EventService *EventService + ActionService *ActionService } +var ( + ErrInboundChannelNotFound = errors.New("inbound channel not found") + ErrSubscriberChannelsNotFound = errors.New("subscriber channels not found") +) + func (s *Services) Start(ctx *context.Context) { - go s.EventTypeService.Start(ctx) + go s.EventService.Start(ctx) + go s.ActionService.Start(ctx) } func NewServices(repository *repository.Repository) Services { return Services{ - EventTypeService: NewEventTypeService(repository), + EventService: NewEventService(repository), + ActionService: NewActionService(repository), } }