Skip to content

Commit

Permalink
gRPC streaming clean up (dydxprotocol#1906)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayy04 authored Jul 13, 2024
1 parent cd75070 commit 629ddf5
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 143 deletions.
32 changes: 16 additions & 16 deletions protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,9 @@ import (
servicemetrics "github.com/skip-mev/slinky/service/metrics"
promserver "github.com/skip-mev/slinky/service/servers/prometheus"

// Grpc Streaming
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/grpc/types"
// Full Node Streaming
streaming "github.com/dydxprotocol/v4-chain/protocol/streaming"
streamingtypes "github.com/dydxprotocol/v4-chain/protocol/streaming/types"
)

var (
Expand Down Expand Up @@ -333,9 +333,9 @@ type App struct {
// module configurator
configurator module.Configurator

IndexerEventManager indexer_manager.IndexerEventManager
GrpcStreamingManager streamingtypes.GrpcStreamingManager
Server *daemonserver.Server
IndexerEventManager indexer_manager.IndexerEventManager
FullNodeStreamingManager streamingtypes.FullNodeStreamingManager
Server *daemonserver.Server

// startDaemons encapsulates the logic that starts all daemons and daemon services. This function contains a
// closure of all relevant data structures that are shared with various keepers. Daemon services startup is
Expand Down Expand Up @@ -469,8 +469,8 @@ func New(
if app.SlinkyClient != nil {
app.SlinkyClient.Stop()
}
if app.GrpcStreamingManager != nil {
app.GrpcStreamingManager.Stop()
if app.FullNodeStreamingManager != nil {
app.FullNodeStreamingManager.Stop()
}
return nil
},
Expand Down Expand Up @@ -732,7 +732,7 @@ func New(
indexerFlags.SendOffchainData,
)

app.GrpcStreamingManager = getGrpcStreamingManagerFromOptions(appFlags, logger)
app.FullNodeStreamingManager = getFullNodeStreamingManagerFromOptions(appFlags, logger)

timeProvider := &timelib.TimeProviderImpl{}

Expand Down Expand Up @@ -1041,7 +1041,7 @@ func New(
logger.Info("Parsed CLOB flags", "Flags", clobFlags)

memClob := clobmodulememclob.NewMemClobPriceTimePriority(app.IndexerEventManager.Enabled())
memClob.SetGenerateOrderbookUpdates(app.GrpcStreamingManager.Enabled())
memClob.SetGenerateOrderbookUpdates(app.FullNodeStreamingManager.Enabled())

app.ClobKeeper = clobmodulekeeper.NewKeeper(
appCodec,
Expand All @@ -1064,7 +1064,7 @@ func New(
app.StatsKeeper,
app.RewardsKeeper,
app.IndexerEventManager,
app.GrpcStreamingManager,
app.FullNodeStreamingManager,
txConfig.TxDecoder(),
clobFlags,
rate_limit.NewPanicRateLimiter[sdk.Msg](),
Expand Down Expand Up @@ -1960,15 +1960,15 @@ func getIndexerFromOptions(
return indexerMessageSender, indexerFlags
}

// getGrpcStreamingManagerFromOptions returns an instance of a streamingtypes.GrpcStreamingManager from the specified
// options. This function will default to returning a no-op instance.
func getGrpcStreamingManagerFromOptions(
// getFullNodeStreamingManagerFromOptions returns an instance of a streamingtypes.FullNodeStreamingManager
// from the specified options. This function will default to returning a no-op instance.
func getFullNodeStreamingManagerFromOptions(
appFlags flags.Flags,
logger log.Logger,
) (manager streamingtypes.GrpcStreamingManager) {
) (manager streamingtypes.FullNodeStreamingManager) {
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled")
return streaming.NewGrpcStreamingManager(
return streaming.NewFullNodeStreamingManager(
logger,
appFlags.GrpcStreamingFlushIntervalMs,
appFlags.GrpcStreamingMaxBatchSize,
Expand Down
4 changes: 2 additions & 2 deletions protocol/mocks/ClobKeeper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 629ddf5

Please sign in to comment.