Skip to content

Commit

Permalink
add a more friendly method AddOnePath for DynamicStream
Browse files Browse the repository at this point in the history
  • Loading branch information
flowbehappy committed Aug 22, 2024
1 parent 3b63651 commit c1cf73b
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 4 deletions.
4 changes: 2 additions & 2 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ func NewDispatcher(tableSpan *common.TableSpan, sink sink.Sink, startTs uint64,

dispatcherEventsDynamicStream := appcontext.GetService[dynstream.DynamicStream[common.DispatcherID, *common.TxnEvent, *Dispatcher]](appcontext.DispatcherEventsDynamicStream)

err := dispatcherEventsDynamicStream.AddPath(dynstream.PathAndDest[common.DispatcherID, *Dispatcher]{Path: dispatcher.id, Dest: dispatcher})
err := dispatcherEventsDynamicStream.AddOnePath(dispatcher.id, dispatcher)
if err != nil {
log.Error("add dispatcher to dynamic stream failed", zap.Error(err))
}

dispatcherStatusDynamicStream := appcontext.GetService[dynstream.DynamicStream[common.DispatcherID, DispatcherStatusWithDispatcherID, *Dispatcher]](appcontext.DispatcherStatusDynamicStream)
err = dispatcherStatusDynamicStream.AddPath(dynstream.PathAndDest[common.DispatcherID, *Dispatcher]{Path: dispatcher.id, Dest: dispatcher})
err = dispatcherStatusDynamicStream.AddOnePath(dispatcher.id, dispatcher)
if err != nil {
log.Error("add dispatcher to dynamic stream failed", zap.Error(err))
}
Expand Down
4 changes: 2 additions & 2 deletions downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func NewHeartBeatCollector(serverId messaging.ServerId) *HeartBeatCollector {

func (c *HeartBeatCollector) RegisterEventDispatcherManager(m *EventDispatcherManager) error {
m.SetHeartbeatRequestQueue(c.requestQueue)
err := c.heartBeatResponseDynamicStream.AddPath(dynstream.PathAndDest[model.ChangeFeedID, *EventDispatcherManager]{Path: m.changefeedID, Dest: m})
err := c.heartBeatResponseDynamicStream.AddOnePath(m.changefeedID, m)
if err != nil {
log.Error("heartBeatResponseDynamicStream Failed to add path", zap.Any("ChangefeedID", m.changefeedID))
return err
}
err = c.schedulerDispatcherRequestDynamicStream.AddPath(dynstream.PathAndDest[model.ChangeFeedID, *EventDispatcherManager]{Path: m.changefeedID, Dest: m})
err = c.schedulerDispatcherRequestDynamicStream.AddOnePath(m.changefeedID, m)
if err != nil {
log.Error("schedulerDispatcherRequestDynamicStream Failed to add path", zap.Any("ChangefeedID", m.changefeedID))
return err
Expand Down
4 changes: 4 additions & 0 deletions utils/dynstream/dynamic_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ func (d *dynamicStreamImpl[P, T, D]) AddPath(paths ...PathAndDest[P, D]) error {
return add.error
}

func (d *dynamicStreamImpl[P, T, D]) AddOnePath(path P, dest D) error {
return d.AddPath(PathAndDest[P, D]{Path: path, Dest: dest})
}

func (d *dynamicStreamImpl[P, T, D]) RemovePath(paths ...P) []error {
remove := &removePathCmd[P]{paths: paths}
cmd := &cmd{
Expand Down
1 change: 1 addition & 0 deletions utils/dynstream/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type DynamicStream[P Path, T Event, D Dest] interface {
// If some paths already exist, it will return an ErrorTypeDuplicate error. And no paths are added.
// If the stream is closed, it will return an ErrorTypeClosed error.
AddPath(paths ...PathAndDest[P, D]) error
AddOnePath(path P, dest D) error

// RemovePath removes the paths from the dynamic stream.
// After this call return, future events with the paths will be dropped, including events which are already in the stream.
Expand Down

0 comments on commit c1cf73b

Please sign in to comment.