Skip to content

Commit

Permalink
fix: Prevent simultaneous balance of segments and channels (#37850) (#…
Browse files Browse the repository at this point in the history
…37939)

issue: #33550
pr: #37850
balance segment and balance segment execute at same time, which will
cause bounch of corner case.

This PR disable simultaneous balance of segments and channels

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Nov 26, 2024
1 parent 8601f3e commit 93063ce
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 45 deletions.
2 changes: 1 addition & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ queryCoord:
balanceCostThreshold: 0.001 # the threshold of balance cost, if the difference of cluster's cost after executing the balance plan is less than this value, the plan will not be executed
checkSegmentInterval: 1000
checkChannelInterval: 1000
checkBalanceInterval: 10000
checkBalanceInterval: 3000
checkIndexInterval: 10000
channelTaskTimeout: 60000 # 1 minute
segmentTaskTimeout: 120000 # 2 minute
Expand Down
8 changes: 8 additions & 0 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ func (b *RoundRobinBalancer) BalanceReplica(replica *meta.Replica) ([]SegmentAss
return nil, nil
}

func (b *RoundRobinBalancer) permitBalanceChannel(collectionID int64) bool {
return b.scheduler.GetSegmentTaskNum(task.WithCollectionID2TaskFilter(collectionID), task.WithTaskTypeFilter(task.TaskTypeMove)) == 0
}

func (b *RoundRobinBalancer) permitBalanceSegment(collectionID int64) bool {
return b.scheduler.GetChannelTaskNum(task.WithCollectionID2TaskFilter(collectionID), task.WithTaskTypeFilter(task.TaskTypeMove)) == 0
}

func (b *RoundRobinBalancer) getNodes(nodes []int64) []*session.NodeInfo {
ret := make([]*session.NodeInfo, 0, len(nodes))
for _, n := range nodes {
Expand Down
11 changes: 7 additions & 4 deletions internal/querycoordv2/balance/channel_level_score_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,19 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(replica *meta.Replica) (segme
zap.Any("available nodes", rwNodes),
)
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, channelName, rwNodes, roNodes)...)
if len(channelPlans) == 0 {
if b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, channelName, rwNodes, roNodes)...)
}

if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, channelName, rwNodes, roNodes)...)
}
} else {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genChannelPlan(replica, channelName, rwNodes)...)
}

if len(channelPlans) == 0 {
if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genSegmentPlan(br, replica, channelName, rwNodes)...)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (suite *ChannelLevelScoreBalancerTestSuite) SetupTest() {

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *ChannelLevelScoreBalancerTestSuite) TearDownTest() {
Expand Down
10 changes: 6 additions & 4 deletions internal/querycoordv2/balance/multi_target_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,16 +497,18 @@ func (b *MultiTargetBalancer) BalanceReplica(replica *meta.Replica) (segmentPlan
zap.Any("available nodes", rwNodes),
)
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, rwNodes, roNodes)...)
if len(channelPlans) == 0 {
if b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, rwNodes, roNodes)...)
}
if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, rwNodes, roNodes)...)
}
} else {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genChannelPlan(br, replica, rwNodes)...)
}

if len(channelPlans) == 0 {
if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = b.genSegmentPlan(replica, rwNodes)
}
}
Expand Down
11 changes: 7 additions & 4 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,19 @@ func (b *RowCountBasedBalancer) BalanceReplica(replica *meta.Replica) (segmentPl
zap.Any("available nodes", rwNodes),
)
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, rwNodes, roNodes)...)
if len(channelPlans) == 0 {
if b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, rwNodes, roNodes)...)
}

if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, rwNodes, roNodes)...)
}
} else {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genChannelPlan(br, replica, rwNodes)...)
}

if len(channelPlans) == 0 {
if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genSegmentPlan(replica, rwNodes)...)
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/querycoordv2/balance/rowcount_based_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() {

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *RowCountBasedBalancerTestSuite) TearDownTest() {
Expand Down
10 changes: 6 additions & 4 deletions internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,16 +308,18 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) (segmentPlans
)
br.AddRecord(StrRecordf("executing stopping balance: %v", roNodes))
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, rwNodes, roNodes)...)
if len(channelPlans) == 0 {
if b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genStoppingChannelPlan(replica, rwNodes, roNodes)...)
}
if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(replica, rwNodes, roNodes)...)
}
} else {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genChannelPlan(br, replica, rwNodes)...)
}

if len(channelPlans) == 0 {
if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genSegmentPlan(br, replica, rwNodes)...)
}
}
Expand Down
91 changes: 91 additions & 0 deletions internal/querycoordv2/balance/score_based_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() {

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *ScoreBasedBalancerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -605,6 +607,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() {
for i, node := range c.nodes {
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(1)).Return(c.deltaCounts[i]).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(-1)).Return(c.deltaCounts[i]).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
}

// 4. balance and verify result
Expand Down Expand Up @@ -1123,3 +1127,90 @@ func (suite *ScoreBasedBalancerTestSuite) getCollectionBalancePlans(balancer *Sc
}
return segmentPlans, channelPlans
}

func (suite *ScoreBasedBalancerTestSuite) TestBalanceSegmentAndChannel() {
nodes := []int64{1, 2, 3}
collectionID := int64(1)
replicaID := int64(1)
collectionsSegments := []*datapb.SegmentInfo{
{ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1},
}
states := []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}

balancer := suite.balancer

collection := utils.CreateTestCollection(collectionID, int32(replicaID))
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(
nil, collectionsSegments, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, collectionID).Return([]int64{collectionID}, nil).Maybe()
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(collectionID, collectionID))
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(replicaID, collectionID, nodes))
balancer.targetMgr.UpdateCollectionNextTarget(collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(collectionID)

for i := range nodes {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: nodes[i],
Address: "127.0.0.1:0",
Hostname: "localhost",
Version: common.Version,
})
nodeInfo.SetState(states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.HandleNodeUp(nodes[i])
}
utils.RecoverAllCollection(balancer.meta)

// set unbalance segment distribution
balancer.dist.SegmentDistManager.Update(1, []*meta.Segment{
{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1},
{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 1},
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 10}, Node: 1},
}...)

// expect to generate 2 balance segment task
suite.mockScheduler.ExpectedCalls = nil
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
segmentPlans, _ := suite.getCollectionBalancePlans(balancer, collectionID)
suite.Equal(len(segmentPlans), 2)

// mock balance channel is executing, expect to generate 0 balance segment task
suite.mockScheduler.ExpectedCalls = nil
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(1).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
segmentPlans, _ = suite.getCollectionBalancePlans(balancer, collectionID)
suite.Equal(len(segmentPlans), 0)

// set unbalance channel distribution
balancer.dist.ChannelDistManager.Update(1, []*meta.DmChannel{
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "channel1"}, Node: 1},
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "channel2"}, Node: 1},
{VchannelInfo: &datapb.VchannelInfo{CollectionID: 1, ChannelName: "channel3"}, Node: 1},
}...)

// expect to generate 2 balance segment task
suite.mockScheduler.ExpectedCalls = nil
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
_, channelPlans := suite.getCollectionBalancePlans(balancer, collectionID)
suite.Equal(len(channelPlans), 2)

// mock balance channel is executing, expect to generate 0 balance segment task
suite.mockScheduler.ExpectedCalls = nil
suite.mockScheduler.EXPECT().GetChannelTaskNum(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskNum(mock.Anything, mock.Anything).Return(1).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
_, channelPlans = suite.getCollectionBalancePlans(balancer, collectionID)
suite.Equal(len(channelPlans), 0)
}
68 changes: 48 additions & 20 deletions internal/querycoordv2/task/mock_scheduler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 93063ce

Please sign in to comment.