diff --git a/go/mysql/binlog_dump.go b/go/mysql/binlog_dump.go index de91484fc64..cfa37cc0878 100644 --- a/go/mysql/binlog_dump.go +++ b/go/mysql/binlog_dump.go @@ -77,7 +77,8 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6 return logFile, logPos, position, readPacketErr } if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" { - position, err = replication.DecodePosition(gtid) + // ComBinlogDumpGTID is a MySQL specific protocol. The GTID flavor is necessarily MySQL 56 + position, _, err = replication.DecodePositionMySQL56(gtid) if err != nil { return logFile, logPos, position, err } diff --git a/go/mysql/replication_test.go b/go/mysql/replication_test.go index ded22d838f6..ca69ec05f2a 100644 --- a/go/mysql/replication_test.go +++ b/go/mysql/replication_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/test/utils" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -88,14 +89,46 @@ func TestComBinlogDumpGTID(t *testing.T) { cConn.Close() }() + t.Run("WriteComBinlogDumpGTIDEmptyGTID", func(t *testing.T) { + // Write ComBinlogDumpGTID packet, read it, compare. + var flags uint16 = 0x0d0e + err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{}) + assert.NoError(t, err) + data, err := sConn.ReadPacket() + require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err) + require.NotEmpty(t, data) + require.EqualValues(t, data[0], ComBinlogDumpGTID) + + expectedData := []byte{ + ComBinlogDumpGTID, + 0x0e, 0x0d, // flags + 0x04, 0x03, 0x02, 0x01, // server-id + 0x07, 0x00, 0x00, 0x00, // binlog-filename-len + 'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename + 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos + 0x00, 0x00, 0x00, 0x00, // data-size is zero, no GTID payload + } + assert.Equal(t, expectedData, data) + logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data) + require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err) + assert.Equal(t, "moofarm", logFile) + assert.Equal(t, uint64(0x05060708090a0b0c), logPos) + assert.True(t, pos.IsZero()) + }) + t.Run("WriteComBinlogDumpGTID", func(t *testing.T) { // Write ComBinlogDumpGTID packet, read it, compare. var flags uint16 = 0x0d0e assert.Equal(t, flags, flags|BinlogThroughGTID) - err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{0xfa, 0xfb}) + gtidSet, err := replication.ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243") + require.NoError(t, err) + // dataSize := uint32(len(gtidSet.String())) + err = cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte(gtidSet.String())) assert.NoError(t, err) data, err := sConn.ReadPacket() require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err) + require.NotEmpty(t, data) + require.EqualValues(t, data[0], ComBinlogDumpGTID) expectedData := []byte{ ComBinlogDumpGTID, @@ -104,10 +137,15 @@ func TestComBinlogDumpGTID(t *testing.T) { 0x07, 0x00, 0x00, 0x00, // binlog-filename-len 'm', 'o', 'o', 'f', 'a', 'r', 'm', // bilog-filename 0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos - 0x02, 0x00, 0x00, 0x00, // data-size - 0xfa, 0xfb, // data + 0x02a, 0x00, 0x00, 0x00, // data-size + 0x31, 0x36, 0x62, 0x31, 0x30, 0x33, 0x39, 0x66, 0x2d, 0x32, 0x32, 0x62, 0x36, 0x2d, 0x31, 0x31, 0x65, 0x64, 0x2d, 0x62, 0x37, 0x36, 0x35, 0x2d, 0x30, 0x61, 0x34, 0x33, 0x66, 0x39, 0x35, 0x66, 0x32, 0x38, 0x61, 0x33, 0x3a, 0x31, 0x2d, 0x32, 0x34, 0x33, // data } assert.Equal(t, expectedData, data) + logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data) + require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err) + assert.Equal(t, "moofarm", logFile) + assert.Equal(t, uint64(0x05060708090a0b0c), logPos) + assert.Equal(t, gtidSet, pos.GTIDSet) }) sConn.sequence = 0