Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: [2.4] Refine Replica manager coll2Replicas secondary index (#37906) #37970

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 60 additions & 41 deletions internal/querycoordv2/meta/replica_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,43 @@
type ReplicaManager struct {
rwmutex sync.RWMutex

idAllocator func() (int64, error)
replicas map[typeutil.UniqueID]*Replica
collIDToReplicaIDs map[typeutil.UniqueID]typeutil.UniqueSet
catalog metastore.QueryCoordCatalog
idAllocator func() (int64, error)
replicas map[typeutil.UniqueID]*Replica
coll2Replicas map[typeutil.UniqueID]*collectionReplicas // typeutil.UniqueSet
catalog metastore.QueryCoordCatalog
}

// collectionReplicas is the per-collection secondary index kept by
// ReplicaManager: it holds every replica of one collection both keyed by
// replica ID and as a ready-made slice.
type collectionReplicas struct {
// id2replicas maps a replica ID to its Replica; putReplica and
// removeReplicas modify this map.
id2replicas map[typeutil.UniqueID]*Replica
// replicas caches the values of id2replicas. It is rebuilt from the map
// after every put/remove, and lookups by collection read it directly.
replicas []*Replica
}

// removeReplicas deletes the given replica IDs from the index and rebuilds the
// cached replica slice from the entries that remain. IDs that are not present
// are ignored.
//
// It reports whether the index is empty afterwards, so the caller can drop the
// collection's entry altogether. A nil receiver (the collection has no index
// entry) is reported as empty instead of panicking, which keeps the
// lookup-then-call pattern `m.coll2Replicas[id].removeReplicas(...)` safe for
// collections that are not loaded.
func (crs *collectionReplicas) removeReplicas(replicaIDs ...int64) (empty bool) {
	if crs == nil {
		// Nothing is indexed for this collection; there is nothing to remove.
		return true
	}
	for _, replicaID := range replicaIDs {
		delete(crs.id2replicas, replicaID)
	}
	// The slice is rebuilt from the map rather than edited in place.
	// NOTE(review): callers that already hold the previous slice presumably
	// rely on it not being mutated — confirm before changing this.
	crs.replicas = lo.Values(crs.id2replicas)
	return len(crs.replicas) == 0
}

// putReplica registers replica under its own ID, overwriting any entry that
// already uses that ID, and then regenerates the cached slice from the map.
func (crs *collectionReplicas) putReplica(replica *Replica) {
	index := crs.id2replicas
	index[replica.GetID()] = replica
	crs.replicas = lo.Values(index)
}

// newCollectionReplicas returns an empty index: the ID map is allocated so it
// can be written to immediately, while the cached slice starts out nil.
func newCollectionReplicas() *collectionReplicas {
	crs := new(collectionReplicas)
	crs.id2replicas = make(map[typeutil.UniqueID]*Replica)
	return crs
}

func NewReplicaManager(idAllocator func() (int64, error), catalog metastore.QueryCoordCatalog) *ReplicaManager {
return &ReplicaManager{
idAllocator: idAllocator,
replicas: make(map[int64]*Replica),
collIDToReplicaIDs: make(map[int64]typeutil.UniqueSet),
catalog: catalog,
idAllocator: idAllocator,
replicas: make(map[int64]*Replica),
coll2Replicas: make(map[int64]*collectionReplicas),
catalog: catalog,
}
}

Expand Down Expand Up @@ -163,10 +188,10 @@
m.replicas[replica.GetID()] = replica

// update coll2Replicas.
if m.collIDToReplicaIDs[replica.GetCollectionID()] == nil {
m.collIDToReplicaIDs[replica.GetCollectionID()] = typeutil.NewUniqueSet()
if m.coll2Replicas[replica.GetCollectionID()] == nil {
m.coll2Replicas[replica.GetCollectionID()] = newCollectionReplicas()
}
m.collIDToReplicaIDs[replica.GetCollectionID()].Insert(replica.GetID())
m.coll2Replicas[replica.GetCollectionID()].putReplica(replica)
}
}

Expand Down Expand Up @@ -217,7 +242,7 @@
// getSrcReplicasAndCheckIfTransferable checks if the collection can be transferred from srcRGName to dstRGName.
func (m *ReplicaManager) getSrcReplicasAndCheckIfTransferable(collectionID typeutil.UniqueID, srcRGName string, replicaNum int) ([]*Replica, error) {
// Check if collection is loaded.
if m.collIDToReplicaIDs[collectionID] == nil {
if m.coll2Replicas[collectionID] == nil {
return nil, merr.WrapErrParameterInvalid(
"Collection not loaded",
fmt.Sprintf("collectionID %d", collectionID),
Expand Down Expand Up @@ -248,11 +273,14 @@
if err != nil {
return err
}
// Remove all replica of collection and remove collection from collIDToReplicaIDs.
for replicaID := range m.collIDToReplicaIDs[collectionID] {
delete(m.replicas, replicaID)

if collReplicas, ok := m.coll2Replicas[collectionID]; ok {
// Remove all replicas of the collection and remove the collection from coll2Replicas.
for _, replica := range collReplicas.replicas {
delete(m.replicas, replica.GetID())
}
delete(m.coll2Replicas, collectionID)
}
delete(m.collIDToReplicaIDs, collectionID)
return nil
}

Expand All @@ -275,9 +303,8 @@
delete(m.replicas, replica)
}

m.collIDToReplicaIDs[collectionID].Remove(replicas...)
if m.collIDToReplicaIDs[collectionID].Len() == 0 {
delete(m.collIDToReplicaIDs, collectionID)
if m.coll2Replicas[collectionID].removeReplicas(replicas...) {
delete(m.coll2Replicas, collectionID)

Check warning on line 307 in internal/querycoordv2/meta/replica_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/meta/replica_manager.go#L307

Added line #L307 was not covered by tests
}

return nil
Expand All @@ -290,22 +317,20 @@
}

func (m *ReplicaManager) getByCollection(collectionID typeutil.UniqueID) []*Replica {
replicas := make([]*Replica, 0)
if m.collIDToReplicaIDs[collectionID] != nil {
for replicaID := range m.collIDToReplicaIDs[collectionID] {
replicas = append(replicas, m.replicas[replicaID])
}
collReplicas, ok := m.coll2Replicas[collectionID]
if !ok {
return nil
}
return replicas

return collReplicas.replicas
}

func (m *ReplicaManager) GetByCollectionAndNode(collectionID, nodeID typeutil.UniqueID) *Replica {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()

if m.collIDToReplicaIDs[collectionID] != nil {
for replicaID := range m.collIDToReplicaIDs[collectionID] {
replica := m.replicas[replicaID]
if m.coll2Replicas[collectionID] != nil {
for _, replica := range m.coll2Replicas[collectionID].replicas {
if replica.Contains(nodeID) {
return replica
}
Expand All @@ -330,19 +355,14 @@
}

func (m *ReplicaManager) getByCollectionAndRG(collectionID int64, rgName string) []*Replica {
replicaIDs, ok := m.collIDToReplicaIDs[collectionID]
collReplicas, ok := m.coll2Replicas[collectionID]
if !ok {
return make([]*Replica, 0)
return nil

Check warning on line 360 in internal/querycoordv2/meta/replica_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/meta/replica_manager.go#L360

Added line #L360 was not covered by tests
}

ret := make([]*Replica, 0)
replicaIDs.Range(func(replicaID typeutil.UniqueID) bool {
if m.replicas[replicaID].GetResourceGroup() == rgName {
ret = append(ret, m.replicas[replicaID])
}
return true
return lo.Filter(collReplicas.replicas, func(replica *Replica, _ int) bool {
return replica.GetResourceGroup() == rgName
})
return ret
}

func (m *ReplicaManager) GetByResourceGroup(rgName string) []*Replica {
Expand Down Expand Up @@ -426,17 +446,16 @@
// getCollectionAssignmentHelper checks if the collection is recoverable and group replicas by resource group.
func (m *ReplicaManager) getCollectionAssignmentHelper(collectionID typeutil.UniqueID, rgs map[string]typeutil.UniqueSet) (*collectionAssignmentHelper, error) {
// check if the collection exists.
replicaIDs, ok := m.collIDToReplicaIDs[collectionID]
collReplicas, ok := m.coll2Replicas[collectionID]
if !ok {
return nil, errors.Errorf("collection %d not loaded", collectionID)
}

rgToReplicas := make(map[string][]*Replica)
for replicaID := range replicaIDs {
replica := m.replicas[replicaID]
for _, replica := range collReplicas.replicas {
rgName := replica.GetResourceGroup()
if _, ok := rgs[rgName]; !ok {
return nil, errors.Errorf("lost resource group info, collectionID: %d, replicaID: %d, resourceGroup: %s", collectionID, replicaID, rgName)
return nil, errors.Errorf("lost resource group info, collectionID: %d, replicaID: %d, resourceGroup: %s", collectionID, replica.GetID(), rgName)

Check warning on line 458 in internal/querycoordv2/meta/replica_manager.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/meta/replica_manager.go#L458

Added line #L458 was not covered by tests
}
if _, ok := rgToReplicas[rgName]; !ok {
rgToReplicas[rgName] = make([]*Replica, 0)
Expand Down
Loading