From 5bee7e58969279731d53c621aaf172ebbf70d42c Mon Sep 17 00:00:00 2001 From: congqixia Date: Fri, 22 Nov 2024 11:54:32 +0800 Subject: [PATCH] enhance: Refine Replica manager coll2Replicas secondary index (#37906) Related to #37630 This PR adds a new coll2Replicas secondary index to reduce map access & iteration when getting replicas by collection --------- Signed-off-by: Congqi Xia --- internal/querycoordv2/meta/replica_manager.go | 101 +++++++++++------- 1 file changed, 60 insertions(+), 41 deletions(-) diff --git a/internal/querycoordv2/meta/replica_manager.go b/internal/querycoordv2/meta/replica_manager.go index f59bc39cf3ed1..8b25e921677fe 100644 --- a/internal/querycoordv2/meta/replica_manager.go +++ b/internal/querycoordv2/meta/replica_manager.go @@ -35,18 +35,43 @@ import ( type ReplicaManager struct { rwmutex sync.RWMutex - idAllocator func() (int64, error) - replicas map[typeutil.UniqueID]*Replica - collIDToReplicaIDs map[typeutil.UniqueID]typeutil.UniqueSet - catalog metastore.QueryCoordCatalog + idAllocator func() (int64, error) + replicas map[typeutil.UniqueID]*Replica + coll2Replicas map[typeutil.UniqueID]*collectionReplicas // typeutil.UniqueSet + catalog metastore.QueryCoordCatalog +} + +// collectionReplicas maintains collection secondary index mapping +type collectionReplicas struct { + id2replicas map[typeutil.UniqueID]*Replica + replicas []*Replica +} + +func (crs *collectionReplicas) removeReplicas(replicaIDs ...int64) (empty bool) { + for _, replicaID := range replicaIDs { + delete(crs.id2replicas, replicaID) + } + crs.replicas = lo.Values(crs.id2replicas) + return len(crs.replicas) == 0 +} + +func (crs *collectionReplicas) putReplica(replica *Replica) { + crs.id2replicas[replica.GetID()] = replica + crs.replicas = lo.Values(crs.id2replicas) +} + +func newCollectionReplicas() *collectionReplicas { + return &collectionReplicas{ + id2replicas: make(map[typeutil.UniqueID]*Replica), + } } func NewReplicaManager(idAllocator func() (int64, error), 
catalog metastore.QueryCoordCatalog) *ReplicaManager { return &ReplicaManager{ - idAllocator: idAllocator, - replicas: make(map[int64]*Replica), - collIDToReplicaIDs: make(map[int64]typeutil.UniqueSet), - catalog: catalog, + idAllocator: idAllocator, + replicas: make(map[int64]*Replica), + coll2Replicas: make(map[int64]*collectionReplicas), + catalog: catalog, } } @@ -163,10 +188,10 @@ func (m *ReplicaManager) putReplicaInMemory(replicas ...*Replica) { m.replicas[replica.GetID()] = replica // update collIDToReplicaIDs. - if m.collIDToReplicaIDs[replica.GetCollectionID()] == nil { - m.collIDToReplicaIDs[replica.GetCollectionID()] = typeutil.NewUniqueSet() + if m.coll2Replicas[replica.GetCollectionID()] == nil { + m.coll2Replicas[replica.GetCollectionID()] = newCollectionReplicas() } - m.collIDToReplicaIDs[replica.GetCollectionID()].Insert(replica.GetID()) + m.coll2Replicas[replica.GetCollectionID()].putReplica(replica) } } @@ -217,7 +242,7 @@ func (m *ReplicaManager) MoveReplica(dstRGName string, toMove []*Replica) error // getSrcReplicasAndCheckIfTransferable checks if the collection can be transfer from srcRGName to dstRGName. func (m *ReplicaManager) getSrcReplicasAndCheckIfTransferable(collectionID typeutil.UniqueID, srcRGName string, replicaNum int) ([]*Replica, error) { // Check if collection is loaded. - if m.collIDToReplicaIDs[collectionID] == nil { + if m.coll2Replicas[collectionID] == nil { return nil, merr.WrapErrParameterInvalid( "Collection not loaded", fmt.Sprintf("collectionID %d", collectionID), @@ -248,11 +273,14 @@ func (m *ReplicaManager) RemoveCollection(collectionID typeutil.UniqueID) error if err != nil { return err } - // Remove all replica of collection and remove collection from collIDToReplicaIDs. - for replicaID := range m.collIDToReplicaIDs[collectionID] { - delete(m.replicas, replicaID) + + if collReplicas, ok := m.coll2Replicas[collectionID]; ok { + // Remove all replica of collection and remove collection from collIDToReplicaIDs. 
+ for _, replica := range collReplicas.replicas { + delete(m.replicas, replica.GetID()) + } + delete(m.coll2Replicas, collectionID) } - delete(m.collIDToReplicaIDs, collectionID) return nil } @@ -275,9 +303,8 @@ func (m *ReplicaManager) removeReplicas(collectionID typeutil.UniqueID, replicas delete(m.replicas, replica) } - m.collIDToReplicaIDs[collectionID].Remove(replicas...) - if m.collIDToReplicaIDs[collectionID].Len() == 0 { - delete(m.collIDToReplicaIDs, collectionID) + if m.coll2Replicas[collectionID].removeReplicas(replicas...) { + delete(m.coll2Replicas, collectionID) } return nil @@ -290,22 +317,20 @@ func (m *ReplicaManager) GetByCollection(collectionID typeutil.UniqueID) []*Repl } func (m *ReplicaManager) getByCollection(collectionID typeutil.UniqueID) []*Replica { - replicas := make([]*Replica, 0) - if m.collIDToReplicaIDs[collectionID] != nil { - for replicaID := range m.collIDToReplicaIDs[collectionID] { - replicas = append(replicas, m.replicas[replicaID]) - } + collReplicas, ok := m.coll2Replicas[collectionID] + if !ok { + return nil } - return replicas + + return collReplicas.replicas } func (m *ReplicaManager) GetByCollectionAndNode(collectionID, nodeID typeutil.UniqueID) *Replica { m.rwmutex.RLock() defer m.rwmutex.RUnlock() - if m.collIDToReplicaIDs[collectionID] != nil { - for replicaID := range m.collIDToReplicaIDs[collectionID] { - replica := m.replicas[replicaID] + if m.coll2Replicas[collectionID] != nil { + for _, replica := range m.coll2Replicas[collectionID].replicas { if replica.Contains(nodeID) { return replica } @@ -330,19 +355,14 @@ func (m *ReplicaManager) GetByNode(nodeID typeutil.UniqueID) []*Replica { } func (m *ReplicaManager) getByCollectionAndRG(collectionID int64, rgName string) []*Replica { - replicaIDs, ok := m.collIDToReplicaIDs[collectionID] + collReplicas, ok := m.coll2Replicas[collectionID] if !ok { - return make([]*Replica, 0) + return nil } - ret := make([]*Replica, 0) - replicaIDs.Range(func(replicaID 
typeutil.UniqueID) bool { - if m.replicas[replicaID].GetResourceGroup() == rgName { - ret = append(ret, m.replicas[replicaID]) - } - return true + return lo.Filter(collReplicas.replicas, func(replica *Replica, _ int) bool { + return replica.GetResourceGroup() == rgName }) - return ret } func (m *ReplicaManager) GetByResourceGroup(rgName string) []*Replica { @@ -426,17 +446,16 @@ func (m *ReplicaManager) validateResourceGroups(rgs map[string]typeutil.UniqueSe // getCollectionAssignmentHelper checks if the collection is recoverable and group replicas by resource group. func (m *ReplicaManager) getCollectionAssignmentHelper(collectionID typeutil.UniqueID, rgs map[string]typeutil.UniqueSet) (*collectionAssignmentHelper, error) { // check if the collection is exist. - replicaIDs, ok := m.collIDToReplicaIDs[collectionID] + collReplicas, ok := m.coll2Replicas[collectionID] if !ok { return nil, errors.Errorf("collection %d not loaded", collectionID) } rgToReplicas := make(map[string][]*Replica) - for replicaID := range replicaIDs { - replica := m.replicas[replicaID] + for _, replica := range collReplicas.replicas { rgName := replica.GetResourceGroup() if _, ok := rgs[rgName]; !ok { - return nil, errors.Errorf("lost resource group info, collectionID: %d, replicaID: %d, resourceGroup: %s", collectionID, replicaID, rgName) + return nil, errors.Errorf("lost resource group info, collectionID: %d, replicaID: %d, resourceGroup: %s", collectionID, replica.GetID(), rgName) } if _, ok := rgToReplicas[rgName]; !ok { rgToReplicas[rgName] = make([]*Replica, 0)