Merge remote-tracking branch 'origin/main' into vschema_topo_version
Signed-off-by: Matt Lord <[email protected]>
mattlord committed Jan 23, 2025
2 parents 60e9de3 + 1df1dc8 commit 2e008f6
Showing 35 changed files with 5,807 additions and 4,120 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check_make_vtadmin_web_proto.yml
@@ -61,7 +61,7 @@ jobs:
uses: actions/setup-node@1e60f620b9541d16bece96c5465dc8ee9832be0b # v4.0.3
with:
# node-version should match package.json
node-version: '20.12.2'
node-version: '22.13.1'

- name: Install npm dependencies
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.proto_changes == 'true'
2 changes: 1 addition & 1 deletion .github/workflows/static_checks_etc.yml
@@ -230,7 +230,7 @@ jobs:
uses: actions/setup-node@1e60f620b9541d16bece96c5465dc8ee9832be0b # v4.0.3
with:
# make proto requires newer node than the pre-installed one
node-version: '20.12.2'
node-version: '22.13.1'

- name: check_make_proto
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.proto_changes == 'true'
2 changes: 1 addition & 1 deletion .github/workflows/vtadmin_web_build.yml
@@ -44,7 +44,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false'
with:
# node-version should match package.json
node-version: '20.12.2'
node-version: '22.13.1'

- name: Install dependencies
if: steps.skip-workflow.outputs.skip-workflow == 'false'
2 changes: 1 addition & 1 deletion .github/workflows/vtadmin_web_lint.yml
@@ -44,7 +44,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false'
with:
# node-version should match package.json
node-version: '20.12.2'
node-version: '22.13.1'

- name: Install dependencies
if: steps.skip-workflow.outputs.skip-workflow == 'false'
2 changes: 1 addition & 1 deletion .github/workflows/vtadmin_web_unit_tests.yml
@@ -44,7 +44,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false'
with:
# node-version should match package.json
node-version: '20.12.2'
node-version: '22.13.1'

- name: Install dependencies
if: steps.skip-workflow.outputs.skip-workflow == 'false'
9 changes: 9 additions & 0 deletions changelog/22.0/22.0.0/summary.md
@@ -19,6 +19,8 @@
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
- **[Topology read concurrency behaviour changes](#topo-read-concurrency-changes)**
- **[VTAdmin](#vtadmin)**
- [Updated to node v22.13.1](#updated-node)

## <a id="major-changes"/>Major Changes</a>

@@ -150,3 +152,10 @@ While the flag will continue to accept float values (interpreted as seconds) for
The `--topo_read_concurrency` flag was added to all components that access the topology and the provided limit is now applied separately for each global or local cell _(default `32`)_.

All topology read calls _(`Get`, `GetVersion`, `List` and `ListDir`)_ now respect this per-cell limit. Prior to this version, a single limit was shared across all cells, and many topology calls did not respect it.
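
As an illustration of the per-cell semantics only (a hedged sketch, not the actual Vitess topo server code; names such as cellLimiter are invented for the example), each cell now effectively gets its own concurrency budget instead of all cells sharing one:

package main

import (
	"fmt"
	"sync"
)

// cellLimiter caps concurrent topo reads per cell, mirroring the behaviour
// described above where the limit applies to each global or local cell separately.
type cellLimiter struct {
	mu    sync.Mutex
	limit int
	sems  map[string]chan struct{}
}

func newCellLimiter(limit int) *cellLimiter {
	return &cellLimiter{limit: limit, sems: make(map[string]chan struct{})}
}

// acquire blocks until the given cell has a free slot and returns a release func.
func (l *cellLimiter) acquire(cell string) (release func()) {
	l.mu.Lock()
	sem, ok := l.sems[cell]
	if !ok {
		sem = make(chan struct{}, l.limit)
		l.sems[cell] = sem
	}
	l.mu.Unlock()
	sem <- struct{}{}
	return func() { <-sem }
}

func main() {
	limiter := newCellLimiter(32) // the default per-cell limit mentioned above
	release := limiter.acquire("zone1")
	defer release()
	fmt.Println("performing a topo Get/List call against zone1")
}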

### <a id="vtadmin"/>VTAdmin

#### <a id="updated-node"/>vtadmin-web updated to node v22.13.1 (LTS)

Building `vtadmin-web` now requires node >= v22.13.0 (LTS). Breaking changes from v20 to v22 can be found at https://nodejs.org/en/blog/release/v22.13.0, with no known issues that apply to VTAdmin.
Full details on the node v22.13.1 release can be found at https://nodejs.org/en/blog/release/v22.13.1.
2 changes: 1 addition & 1 deletion docker/binaries/vtadmin/Dockerfile
@@ -17,7 +17,7 @@ ARG DEBIAN_VER=bookworm-slim

FROM vitess/lite:${VT_BASE_VER} AS lite

FROM node:20-${DEBIAN_VER} as node
FROM node:22-${DEBIAN_VER} as node

# Prepare directory structure.
RUN mkdir -p /vt/web
4 changes: 4 additions & 0 deletions go/cmd/vtctldclient/command/vreplication/workflow/state.go
@@ -60,14 +60,17 @@ func commandUpdateState(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

var state binlogdatapb.VReplicationWorkflowState
var shards []string
switch strings.ToLower(cmd.Name()) {
case "start":
if err := common.CanRestartWorkflow(baseOptions.Keyspace, baseOptions.Workflow); err != nil {
return err
}
state = binlogdatapb.VReplicationWorkflowState_Running
shards = baseOptions.Shards
case "stop":
state = binlogdatapb.VReplicationWorkflowState_Stopped
shards = baseOptions.Shards
default:
return fmt.Errorf("invalid workflow state: %s", args[0])
}
@@ -80,6 +83,7 @@ func commandUpdateState(cmd *cobra.Command, args []string) error {
Cells: textutil.SimulatedNullStringSlice,
TabletTypes: textutil.SimulatedNullTabletTypeSlice,
State: &state,
Shards: shards,
},
}

8 changes: 5 additions & 3 deletions go/mysql/binlog_dump.go
@@ -76,11 +76,13 @@ func (c *Conn) parseComBinlogDumpGTID(data []byte) (logFile string, logPos uint6
if !ok {
return logFile, logPos, position, readPacketErr
}
if gtid := string(data[pos : pos+int(dataSize)]); gtid != "" {
position, err = replication.DecodePosition(gtid)
if gtidBytes := data[pos : pos+int(dataSize)]; len(gtidBytes) != 0 {
gtid, err := replication.NewMysql56GTIDSetFromSIDBlock(gtidBytes)
if err != nil {
return logFile, logPos, position, err
return logFile, logPos, position, vterrors.Wrapf(err, "error parsing GTID from BinlogDumpGTID packet")
}
// ComBinlogDumpGTID is a MySQL specific protocol. The GTID flavor is necessarily MySQL 56
position = replication.Position{GTIDSet: gtid}
}
if flags2&BinlogDumpNonBlock != 0 {
return logFile, logPos, position, io.EOF
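
As a hedged aside (a minimal, self-contained sketch rather than code from this change): the round trip between a MySQL 5.6 GTID set and the SID block carried in a ComBinlogDumpGTID packet, using the replication helpers this diff calls; the GTID value is borrowed from the test further down.

package main

import (
	"fmt"
	"log"

	"vitess.io/vitess/go/mysql/replication"
)

func main() {
	// Parse a MySQL 5.6 GTID set from its text form.
	gtidSet, err := replication.ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243")
	if err != nil {
		log.Fatal(err)
	}

	// Encode it as the binary SID block that a ComBinlogDumpGTID packet carries.
	sidBlock := gtidSet.SIDBlock()

	// Decode it back, as parseComBinlogDumpGTID now does on the server side.
	decoded, err := replication.NewMysql56GTIDSetFromSIDBlock(sidBlock)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(decoded.String()) // 16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243
}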
1 change: 1 addition & 0 deletions go/mysql/decimal/cached_size.go

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion go/mysql/flavor_mysql.go
@@ -218,7 +218,10 @@ func (mysqlFlavor) sendBinlogDumpCommand(c *Conn, serverID uint32, binlogFilenam
}

// Build the command.
sidBlock := gtidSet.SIDBlock()
var sidBlock []byte
if gtidSet != nil {
sidBlock = gtidSet.SIDBlock()
}
var flags2 uint16
if binlogFilename != "" {
flags2 |= BinlogThroughPosition
9 changes: 5 additions & 4 deletions go/mysql/replication.go
@@ -81,7 +81,8 @@ func (c *Conn) AnalyzeSemiSyncAckRequest(buf []byte) (strippedBuf []byte, ackReq
// WriteComBinlogDumpGTID writes a ComBinlogDumpGTID command.
// Only works with MySQL 5.6+ (and not MariaDB).
// See http://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html for syntax.
func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16, gtidSet []byte) error {
// sidBlock must be the result of a gtidSet.SIDBlock() function.
func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16, sidBlock []byte) error {
c.sequence = 0
length := 1 + // ComBinlogDumpGTID
2 + // flags
@@ -90,16 +91,16 @@ func (c *Conn) WriteComBinlogDumpGTID(serverID uint32, binlogFilename string, bi
len(binlogFilename) + // binlog-filename
8 + // binlog-pos
4 + // data-size
len(gtidSet) // data
len(sidBlock) // data
data, pos := c.startEphemeralPacketWithHeader(length)
pos = writeByte(data, pos, ComBinlogDumpGTID) // nolint
pos = writeUint16(data, pos, flags) // nolint
pos = writeUint32(data, pos, serverID) // nolint
pos = writeUint32(data, pos, uint32(len(binlogFilename))) // nolint
pos = writeEOFString(data, pos, binlogFilename) // nolint
pos = writeUint64(data, pos, binlogPos) // nolint
pos = writeUint32(data, pos, uint32(len(gtidSet))) // nolint
pos += copy(data[pos:], gtidSet) // nolint
pos = writeUint32(data, pos, uint32(len(sidBlock))) // nolint
pos += copy(data[pos:], sidBlock) // nolint
if err := c.writeEphemeralPacket(); err != nil {
return sqlerror.NewSQLErrorf(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "%v", err)
}
48 changes: 45 additions & 3 deletions go/mysql/replication_test.go
@@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/test/utils"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
@@ -88,14 +89,50 @@ func TestComBinlogDumpGTID(t *testing.T) {
cConn.Close()
}()

t.Run("WriteComBinlogDumpGTIDEmptyGTID", func(t *testing.T) {
// Write ComBinlogDumpGTID packet, read it, compare.
var flags uint16 = 0x0d0e
err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{})
assert.NoError(t, err)
data, err := sConn.ReadPacket()
require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err)
require.NotEmpty(t, data)
require.EqualValues(t, data[0], ComBinlogDumpGTID)

expectedData := []byte{
ComBinlogDumpGTID,
0x0e, 0x0d, // flags
0x04, 0x03, 0x02, 0x01, // server-id
0x07, 0x00, 0x00, 0x00, // binlog-filename-len
'm', 'o', 'o', 'f', 'a', 'r', 'm', // binlog-filename
0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos
0x00, 0x00, 0x00, 0x00, // data-size is zero, no GTID payload
}
assert.Equal(t, expectedData, data)
logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data)
require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err)
assert.Equal(t, "moofarm", logFile)
assert.Equal(t, uint64(0x05060708090a0b0c), logPos)
assert.True(t, pos.IsZero())
})

sConn.sequence = 0

t.Run("WriteComBinlogDumpGTID", func(t *testing.T) {
// Write ComBinlogDumpGTID packet, read it, compare.
var flags uint16 = 0x0d0e
assert.Equal(t, flags, flags|BinlogThroughGTID)
err := cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, []byte{0xfa, 0xfb})
gtidSet, err := replication.ParseMysql56GTIDSet("16b1039f-22b6-11ed-b765-0a43f95f28a3:1-243")
require.NoError(t, err)
sidBlock := gtidSet.SIDBlock()
assert.Len(t, sidBlock, 48)

err = cConn.WriteComBinlogDumpGTID(0x01020304, "moofarm", 0x05060708090a0b0c, flags, sidBlock)
assert.NoError(t, err)
data, err := sConn.ReadPacket()
require.NoError(t, err, "sConn.ReadPacket - ComBinlogDumpGTID failed: %v", err)
require.NotEmpty(t, data)
require.EqualValues(t, data[0], ComBinlogDumpGTID)

expectedData := []byte{
ComBinlogDumpGTID,
@@ -104,10 +141,15 @@ func TestComBinlogDumpGTID(t *testing.T) {
0x07, 0x00, 0x00, 0x00, // binlog-filename-len
'm', 'o', 'o', 'f', 'a', 'r', 'm', // binlog-filename
0x0c, 0x0b, 0x0a, 0x09, 0x08, 0x07, 0x06, 0x05, // binlog-pos
0x02, 0x00, 0x00, 0x00, // data-size
0xfa, 0xfb, // data
0x30, 0x00, 0x00, 0x00, // data-size
}
expectedData = append(expectedData, sidBlock...) // data
assert.Equal(t, expectedData, data)
logFile, logPos, pos, err := sConn.parseComBinlogDumpGTID(data)
require.NoError(t, err, "parseComBinlogDumpGTID failed: %v", err)
assert.Equal(t, "moofarm", logFile)
assert.Equal(t, uint64(0x05060708090a0b0c), logPos)
assert.Equal(t, gtidSet, pos.GTIDSet)
})

sConn.sequence = 0
@@ -25,6 +25,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/encoding/protojson"

@@ -501,6 +502,28 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards
waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Stopped.String())
rs.Start()
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())

t.Run("Test --shards in workflow start/stop", func(t *testing.T) {
// This subtest expects the workflow to be running at the start and restarts it at the end.
type tCase struct {
shards string
action string
expected int
}
testCases := []tCase{
{"-40", "stop", 1},
{"40-80", "stop", 1},
{"-40,40-80", "start", 2},
}
for _, tc := range testCases {
output, err := vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", keyspace, tc.action, "--workflow", workflowName, "--shards", tc.shards)
require.NoError(t, err, "failed to %s workflow: %v", tc.action, err)
cnt := gjson.Get(output, "details.#").Int()
require.EqualValuesf(t, tc.expected, cnt, "expected %d shards, got %d for action %s, shards %s", tc.expected, cnt, tc.action, tc.shards)
}
})
waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", keyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String())

for _, targetTab := range targetTabs {
catchup(t, targetTab, workflowName, "Reshard")
}