diff --git a/modules/rpc/thrift.go b/modules/rpc/thrift.go index a32e18d44..b07d4ffae 100644 --- a/modules/rpc/thrift.go +++ b/modules/rpc/thrift.go @@ -64,7 +64,8 @@ func newYARPCThriftModule( reg := func(mod *YARPCModule) { _setupMu.Lock() defer _setupMu.Unlock() - Dispatcher().Register(registrants) + + mod.controller.dispatcher.Register(registrants) } return newYARPCModule(mi, reg, options...) diff --git a/modules/rpc/thrift_test.go b/modules/rpc/thrift_test.go index ec7171070..db75c9556 100644 --- a/modules/rpc/thrift_test.go +++ b/modules/rpc/thrift_test.go @@ -25,11 +25,13 @@ import ( "testing" "go.uber.org/fx/config" + "go.uber.org/fx/dig" "go.uber.org/fx/modules" "go.uber.org/fx/service" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/yarpc" "go.uber.org/yarpc/api/transport" "go.uber.org/yarpc/encoding/thrift" ) @@ -74,6 +76,11 @@ modules: testInitRunModule(t, goofy[0], mci) testInitRunModule(t, gopher[0], mci) + + // Dispatcher must be resolved in the default graph + var dispatcher *yarpc.Dispatcher + assert.NoError(t, dig.Resolve(&dispatcher)) + assert.Equal(t, 2, len(dispatcher.Inbounds())) } func TestThriftModule_BadOptions(t *testing.T) { @@ -105,7 +112,8 @@ func testInitRunModule(t *testing.T, mod service.Module, mci service.ModuleCreat func mch() service.ModuleCreateInfo { return service.ModuleCreateInfo{ - Host: service.NopHost(), + Host: service.NopHost(), + Items: make(map[string]interface{}), } } diff --git a/modules/rpc/yarpc.go b/modules/rpc/yarpc.go index f26c09848..0a83a3d4d 100644 --- a/modules/rpc/yarpc.go +++ b/modules/rpc/yarpc.go @@ -24,6 +24,7 @@ import ( "context" "errors" "fmt" + "strconv" "sync" "go.uber.org/fx/modules" @@ -32,6 +33,7 @@ import ( "go.uber.org/fx/ulog" errs "github.com/pkg/errors" + "go.uber.org/fx/dig" "go.uber.org/yarpc" "go.uber.org/yarpc/api/middleware" "go.uber.org/yarpc/api/transport" @@ -42,13 +44,17 @@ import ( // YARPCModule is an implementation of a core RPC module using YARPC. // All the YARPC modules share the same dispatcher and middleware. // Dispatcher will start when any created module calls Start(). +// The YARPC team advised dispatcher to be a 'singleton' to control +// the lifecycle of all of the in/out bound traffic, so we will +// register it in a dig.Graph provided with options/default graph. type YARPCModule struct { modules.ModuleBase - register registerServiceFunc - config yarpcConfig - log ulog.Log - stateMu sync.RWMutex - isRunning bool + register registerServiceFunc + config yarpcConfig + log ulog.Log + stateMu sync.RWMutex + isRunning bool + controller *dispatcherController } var ( @@ -61,12 +67,6 @@ var ( _starterFn = defaultYARPCStarter _ service.Module = &YARPCModule{} - - // Controller represents a collection of all YARPC configs - // that are stored together to create a shared dispatcher. - // The YARPC team advised it to be a 'singleton' to control - // the lifecycle of all of the in/out bound traffic. - _controller dispatcherController ) type registerServiceFunc func(module *YARPCModule) @@ -91,6 +91,24 @@ type Inbound struct { HTTP *Address } +func (i *Inbound) String() string { + if i == nil { + return "" + } + + http := "none" + if i.HTTP != nil { + http = strconv.Itoa(i.HTTP.Port) + } + + tchannel := "none" + if i.TChannel != nil { + tchannel = strconv.Itoa(i.TChannel.Port) + } + + return fmt.Sprintf("Inbound:{HTTP: %s; TChannel: %s}", http, tchannel) +} + // Address is a struct that have a required port for tchannel/http transports. // TODO(alsam) make it optional type Address struct { @@ -111,7 +129,7 @@ type dispatcherController struct { startError error configs []*yarpcConfig - dispatcher *yarpc.Dispatcher + dispatcher yarpc.Dispatcher } // Adds the config to the controller @@ -155,12 +173,14 @@ func (c *dispatcherController) Start(host service.Host) error { _dispatcherMu.Lock() defer _dispatcherMu.Unlock() - if c.dispatcher, err = _dispatcherFn(host, cfg); err != nil { + var d *yarpc.Dispatcher + if d, err = _dispatcherFn(host, cfg); err != nil { c.startError = err return } - c.startError = _starterFn(c.dispatcher) + c.dispatcher = *d + c.startError = _starterFn(&c.dispatcher) }) return c.startError @@ -249,7 +269,24 @@ func newYARPCModule( module.config.inboundMiddleware = inboundMiddlewareFromCreateInfo(mi) module.config.onewayInboundMiddleware = onewayInboundMiddlewareFromCreateInfo(mi) - _controller.addConfig(module.config) + // Try to resolve a controller first + // TODO(alsam) use dig options when available, because we can overwrite the controller in case of multiple + // modules registering a controller. + if err := dig.Resolve(&module.controller); err != nil { + + // Try to register it then + module.controller = &dispatcherController{} + if errCr := dig.Register(module.controller); errCr != nil { + return nil, errs.Wrap(errCr, "can't register a dispatcher controller") + } + + // Register dispatcher + if err := dig.Register(&module.controller.dispatcher); err != nil { + return nil, errs.Wrap(err, "unable to register the dispatcher") + } + } + + module.controller.addConfig(module.config) module.log.Info("Module successfuly created", "inbounds", module.config.Inbounds) @@ -294,7 +331,7 @@ func (m *YARPCModule) Start(readyCh chan<- struct{}) <-chan error { defer m.stateMu.Unlock() // TODO(alsam) allow services to advertise with a name separate from the host name. - if err := _controller.Start(m.Host()); err != nil { + if err := m.controller.Start(m.Host()); err != nil { ret <- errs.Wrap(err, "unable to start dispatcher") return ret } @@ -316,8 +353,9 @@ func (m *YARPCModule) Stop() error { m.stateMu.Lock() defer m.stateMu.Unlock() + m.isRunning = false - return _controller.Stop() + return m.controller.Stop() } // IsRunning returns whether a module is running @@ -354,9 +392,3 @@ func RegisterStarter(startFn StarterFn) { func defaultYARPCStarter(dispatcher *yarpc.Dispatcher) error { return dispatcher.Start() } - -// Dispatcher returns a dispatcher that can be used to create clients. -// It should be called after at least one module have been started, otherwise it will be nil. -func Dispatcher() *yarpc.Dispatcher { - return _controller.dispatcher -} diff --git a/modules/rpc/yarpc_test.go b/modules/rpc/yarpc_test.go index 75c3243bd..167c563cb 100644 --- a/modules/rpc/yarpc_test.go +++ b/modules/rpc/yarpc_test.go @@ -21,6 +21,7 @@ package rpc import ( + "fmt" "testing" "go.uber.org/fx/service" @@ -68,3 +69,18 @@ func TestMergeOfEmptyConfigCollectionReturnsError(t *testing.T) { assert.EqualError(t, err, "unable to merge empty configs") assert.EqualError(t, c.Start(service.NopHost()), err.Error()) } + +func TestInboundPrint(t *testing.T) { + t.Parallel() + var i *Inbound + assert.Equal(t, "", fmt.Sprint(i)) + + i = &Inbound{} + assert.Equal(t, "Inbound:{HTTP: none; TChannel: none}", fmt.Sprint(i)) + i.HTTP = &Address{8080} + assert.Equal(t, "Inbound:{HTTP: 8080; TChannel: none}", fmt.Sprint(i)) + i.TChannel = &Address{9876} + assert.Equal(t, "Inbound:{HTTP: 8080; TChannel: 9876}", fmt.Sprint(i)) + i.HTTP = nil + assert.Equal(t, "Inbound:{HTTP: none; TChannel: 9876}", fmt.Sprint(i)) +}