Skip to content

Commit

Permalink
update the doc and use the collection start position for the repeated…
Browse files Browse the repository at this point in the history
… collection

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG committed Jul 31, 2024
1 parent 9ff5f50 commit 97a9e23
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 31 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ jobs:
working-directory: server
shell: bash
run: |
cp ../deployment/docker/cdc.yaml configs/cdc.yaml
./milvus-cdc > server.log 2>&1 &
sleep 20s
Expand Down
7 changes: 7 additions & 0 deletions core/api/replicate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type ChannelManager interface {
GetChannelChan() <-chan string
GetMsgChan(pChannel string) <-chan *ReplicateMsg
GetEventChan() <-chan *ReplicateAPIEvent

GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error)
}

type TargetAPI interface {
Expand Down Expand Up @@ -131,6 +133,11 @@ func (d *DefaultChannelManager) GetEventChan() <-chan *ReplicateAPIEvent {
return nil
}

func (d *DefaultChannelManager) GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error) {
log.Warn("GetChannelLatestMsgID is not implemented, please check it")
return nil, nil
}

type DefaultTargetAPI struct{}

var _ TargetAPI = (*DefaultTargetAPI)(nil)
Expand Down
55 changes: 55 additions & 0 deletions core/mocks/channel_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions core/reader/collection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {

recordCreateCollectionTime := make(map[string]*pb.CollectionInfo)
repeatedCollectionID := make(map[int64]struct{})
repeatedCollectionName := make(map[string]struct{})
for _, info := range existedCollectionInfos {
collectionName := info.Schema.GetName()
createTime := info.CreateTime
Expand All @@ -208,6 +209,7 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
} else {
repeatedCollectionID[info.ID] = struct{}{}
}
repeatedCollectionName[collectionName] = struct{}{}
} else {
recordCreateCollectionTime[collectionName] = info
}
Expand Down Expand Up @@ -235,8 +237,8 @@ func (reader *CollectionReader) StartRead(ctx context.Context) {
seekPositions := make([]*msgpb.MsgPosition, 0)
if collectionSeekPositionMap != nil {
seekPositions = lo.Values(collectionSeekPositionMap)
} else {
log.Warn("server warn: the seek position of the existed collection is not found, use the collection start position.", zap.String("name", info.Schema.Name), zap.Int64("collection_id", info.ID))
} else if _, ok := repeatedCollectionName[info.Schema.Name]; ok {
log.Warn("server warn: find the repeated collection, the latest collection will use the collection start position.", zap.String("name", info.Schema.Name), zap.Int64("collection_id", info.ID))
for _, v := range info.StartPositions {
seekPositions = append(seekPositions, &msgstream.MsgPosition{
ChannelName: v.GetKey(),
Expand Down
6 changes: 5 additions & 1 deletion core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,10 @@ func (r *replicateChannelManager) GetEventChan() <-chan *api.ReplicateAPIEvent {
return r.apiEventChan
}

func (r *replicateChannelManager) GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error) {
return r.streamCreator.GetChannelLatestMsgID(ctx, channelName)
}

// startReadChannel start read channel
// pChannelName: source milvus channel name, collectionID: source milvus collection id, startPosition: start position of the source milvus collection
// targetInfo: target collection info, it will be used to replace the message info in the source milvus channel
Expand Down Expand Up @@ -1571,7 +1575,7 @@ func initReplicateChannelHandler(ctx context.Context,
targetClient api.TargetAPI, metaOp api.MetaOp, apiEventChan chan *api.ReplicateAPIEvent,
opts *model.HandlerOpts, streamCreator StreamCreator,
) (*replicateChannelHandler, error) {
err := streamCreator.CheckConnection(sourceInfo.VChannelName, sourceInfo.SeekPosition)
err := streamCreator.CheckConnection(ctx, sourceInfo.VChannelName, sourceInfo.SeekPosition)
if err != nil {
log.Warn("fail to connect the mq stream",
zap.String("channel_name", sourceInfo.VChannelName),
Expand Down
19 changes: 14 additions & 5 deletions core/reader/stream_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import (

type StreamCreator interface {
GetStreamChan(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) (<-chan *msgstream.MsgPack, io.Closer, error)
CheckConnection(vchannel string, seekPosition *msgstream.MsgPosition) error
CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error
GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error)
}

type FactoryStreamCreator struct {
Expand All @@ -58,15 +59,19 @@ func (fsc *FactoryStreamCreator) GetStreamChan(ctx context.Context,
}), nil
}

func (fsc *FactoryStreamCreator) CheckConnection(vchannel string, seekPosition *msgstream.MsgPosition) error {
stream, err := getStream(context.Background(), fsc.factory, vchannel, seekPosition)
func (fsc *FactoryStreamCreator) CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error {
stream, err := getStream(ctx, fsc.factory, vchannel, seekPosition)
if err != nil {
return err
}
stream.Close()
return nil
}

func (fsc *FactoryStreamCreator) GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error) {
return msgstream.GetChannelLatestMsgID(ctx, fsc.factory, channelName)
}

func getStream(ctx context.Context,
factory msgstream.Factory,
vchannel string,
Expand Down Expand Up @@ -140,15 +145,19 @@ func (dcsc *DisptachClientStreamCreator) GetStreamChan(ctx context.Context,
}), nil
}

func (dcsc *DisptachClientStreamCreator) CheckConnection(vchannel string, seekPosition *msgstream.MsgPosition) error {
stream, err := getStream(context.Background(), dcsc.factory, vchannel, seekPosition)
func (dcsc *DisptachClientStreamCreator) CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error {
stream, err := getStream(ctx, dcsc.factory, vchannel, seekPosition)
if err != nil {
return err
}
stream.Close()
return nil
}

func (dcsc *DisptachClientStreamCreator) GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error) {
return msgstream.GetChannelLatestMsgID(ctx, dcsc.factory, channelName)
}

type StreamCloser func()

func (sc StreamCloser) Close() error {
Expand Down
22 changes: 11 additions & 11 deletions doc/cdc-usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,18 @@ maxNameLength: 256
# cdc meta data config
metaStoreConfig:
# the metastore type, available value: etcd, mysql
storeType: etcd
storeType: mysql
# etcd address
etcd:
address:
- http://127.0.0.1:2379
enableAuth: false
username: root
password: root123456
enableTLS: false
tlsCertPath: deployment/cert/client.pem # path to your cert file
tlsKeyPath: deployment/cert/client.key # path to your key file
tlsCACertPath: deployment/cert/ca.pem # path to your CACert file
# etcd:
# address:
# - http://127.0.0.1:2379
# enableAuth: false
# username: root
# password: root123456
# enableTLS: false
# tlsCertPath: deployment/cert/client.pem # path to your cert file
# tlsKeyPath: deployment/cert/client.key # path to your key file
# tlsCACertPath: deployment/cert/ca.pem # path to your CACert file
# mysql connection address
mysqlSourceUrl: root:root@tcp(127.0.0.1:3306)/milvus-cdc?charset=utf8
# meta data prefix, if multiple cdc services use the same store service, you can set different rootPaths to achieve multi-tenancy
Expand Down
24 changes: 12 additions & 12 deletions server/configs/cdc.yaml
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
address: 0.0.0.0:8444
maxTaskNum: 100
metaStoreConfig:
storeType: etcd
etcd:
address:
- http://127.0.0.1:2379
enableAuth: false
username: root
password: root123456
enableTLS: false
tlsCertPath: deployment/cert/client.pem # path to your cert file
tlsKeyPath: deployment/cert/client.key # path to your key file
tlsCACertPath: deployment/cert/ca.pem # path to your CACert file
tlsMinVersion: 1.3
storeType: mysql
# etcd:
# address:
# - http://127.0.0.1:2379
# enableAuth: false
# username: root
# password: root123456
# enableTLS: false
# tlsCertPath: deployment/cert/client.pem # path to your cert file
# tlsKeyPath: deployment/cert/client.key # path to your key file
# tlsCACertPath: deployment/cert/ca.pem # path to your CACert file
# tlsMinVersion: 1.3
mysqlSourceUrl: root:root@tcp(127.0.0.1:3306)/milvuscdc?charset=utf8
rootPath: cdc-by-dev
sourceConfig:
Expand Down

0 comments on commit 97a9e23

Please sign in to comment.