From 539ac24a7ab2ec045b05b65ab0c7e55ab9ab9b9d Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 23 Jan 2025 19:30:49 +0200 Subject: [PATCH] Completely remove gh-ost and pt-osc Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/schemamanager/tablet_executor_test.go | 21 +- go/vt/vttablet/onlineddl/executor.go | 697 +------------------- go/vt/vttablet/onlineddl/schema.go | 15 - go/vt/vttablet/onlineddl/util.go | 86 --- go/vt/vttablet/onlineddl/util_test.go | 32 - go/vt/vttablet/tabletserver/tabletserver.go | 13 - 6 files changed, 12 insertions(+), 852 deletions(-) delete mode 100644 go/vt/vttablet/onlineddl/util.go delete mode 100644 go/vt/vttablet/onlineddl/util_test.go diff --git a/go/vt/schemamanager/tablet_executor_test.go b/go/vt/schemamanager/tablet_executor_test.go index a683ef4d22e..ac6ba84b0c6 100644 --- a/go/vt/schemamanager/tablet_executor_test.go +++ b/go/vt/schemamanager/tablet_executor_test.go @@ -206,12 +206,7 @@ func TestIsOnlineSchemaDDL(t *testing.T) { query: "CREATE TABLE t(id int)", isOnlineDDL: false, }, - { - query: "CREATE TABLE t(id int)", - ddlStrategy: "gh-ost", - isOnlineDDL: true, - strategy: schema.DDLStrategyGhost, - }, + { query: "ALTER TABLE t ADD COLUMN i INT", ddlStrategy: "online", @@ -231,16 +226,16 @@ func TestIsOnlineSchemaDDL(t *testing.T) { }, { query: "ALTER TABLE t ADD COLUMN i INT", - ddlStrategy: "gh-ost", + ddlStrategy: "vitess", isOnlineDDL: true, - strategy: schema.DDLStrategyGhost, + strategy: schema.DDLStrategyVitess, }, { query: "ALTER TABLE t ADD COLUMN i INT", - ddlStrategy: "gh-ost --max-load=Threads_running=100", + ddlStrategy: "vitess --declarative", isOnlineDDL: true, - strategy: schema.DDLStrategyGhost, - options: "--max-load=Threads_running=100", + strategy: schema.DDLStrategyVitess, + options: "--declarative", }, { query: "TRUNCATE TABLE t", @@ -249,12 +244,12 @@ func TestIsOnlineSchemaDDL(t *testing.T) { }, { query: "TRUNCATE TABLE t", - ddlStrategy: "gh-ost", + ddlStrategy: "vitess", isOnlineDDL: false, }, { query: "RENAME TABLE t to t2", - ddlStrategy: "gh-ost", + ddlStrategy: "vitess", isOnlineDDL: false, }, } diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index 76c7af7fc2e..9449ceb99d4 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -43,7 +43,6 @@ import ( "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/syscallutil" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -69,7 +68,7 @@ import ( ) var ( - // ErrExecutorNotWritableTablet is generated when executor is asked to run gh-ost on a read-only server + // ErrExecutorNotWritableTablet is generated when executor is asked to run a migration on a read-only server ErrExecutorNotWritableTablet = errors.New("cannot run migration on non-writable tablet") // ErrExecutorMigrationAlreadyRunning is generated when an attempt is made to run an operation that conflicts with a running migration ErrExecutorMigrationAlreadyRunning = errors.New("cannot run migration since a migration is already running") @@ -90,8 +89,6 @@ var acceptableDropTableIfExistsErrorCodes = []sqlerror.ErrorCode{sqlerror.ERCant var copyAlgorithm = sqlparser.AlgorithmValue(sqlparser.CopyStr) var ( - ghostBinaryPath = "gh-ost" - ptOSCBinaryPath = "/usr/bin/pt-online-schema-change" migrationCheckInterval = 1 * time.Minute retainOnlineDDLTables = 24 * time.Hour maxConcurrentOnlineDDLs = 256 @@ -112,8 +109,6 @@ func init() { } func registerOnlineDDLFlags(fs *pflag.FlagSet) { - fs.StringVar(&ghostBinaryPath, "gh-ost-path", ghostBinaryPath, "override default gh-ost binary full path") - fs.StringVar(&ptOSCBinaryPath, "pt-osc-path", ptOSCBinaryPath, "override default pt-online-schema-change binary full path") fs.DurationVar(&migrationCheckInterval, "migration_check_interval", migrationCheckInterval, "Interval between migration checks") fs.DurationVar(&retainOnlineDDLTables, "retain_online_ddl_tables", retainOnlineDDLTables, "How long should vttablet keep an old migrated table before purging it") fs.IntVar(&maxConcurrentOnlineDDLs, "max_concurrent_online_ddl", maxConcurrentOnlineDDLs, "Maximum number of online DDL changes that may run concurrently") @@ -135,22 +130,7 @@ const ( vreplicationTestSuiteWaitSeconds = 5 ) -var ( - migrationLogFileName = "migration.log" - migrationFailureFileName = "migration-failure.log" - onlineDDLUser = "vt-online-ddl-internal" - onlineDDLGrant = fmt.Sprintf("'%s'@'%s'", onlineDDLUser, "%") -) - -type mysqlVariables struct { - host string - port int - readOnly bool - version string - versionComment string -} - -// Executor wraps and manages the execution of a gh-ost migration. +// Executor is a state machine running migrations type Executor struct { env tabletenv.Env pool *connpool.Pool @@ -219,7 +199,7 @@ func safeMigrationCutOverThreshold(threshold time.Duration) (time.Duration, erro } } -// NewExecutor creates a new gh-ost executor. +// NewExecutor creates a new executor. func NewExecutor(env tabletenv.Env, tabletAlias *topodatapb.TabletAlias, ts *topo.Server, lagThrottler *throttle.Throttler, tabletTypeFunc func() topodatapb.TabletType, @@ -400,7 +380,7 @@ func (e *Executor) allowConcurrentMigration(onlineDDL *schema.OnlineDDL) (action case sqlparser.RevertDDLAction: // REVERT is allowed to run concurrently. // Reminder that REVERT is supported for CREATE, DROP and for 'vitess' ALTER, but never for - // 'gh-ost' or 'pt-osc' ALTERs + // 'direct' or 'mysql' ALTERs return action, true } return action, false @@ -446,127 +426,10 @@ func (e *Executor) isAnyConflictingMigrationRunning(onlineDDL *schema.OnlineDDL) return (conflictingMigration != nil), conflictingMigration } -func (e *Executor) ghostPanicFlagFileName(uuid string) string { - return path.Join(os.TempDir(), fmt.Sprintf("ghost.%s.panic.flag", uuid)) -} - -func (e *Executor) createGhostPanicFlagFile(uuid string) error { - _, err := os.Create(e.ghostPanicFlagFileName(uuid)) - return err -} - -func (e *Executor) deleteGhostPanicFlagFile(uuid string) error { - // We use RemoveAll because if the file does not exist that's fine. Remove will return an error - // if file does not exist; RemoveAll does not. - return os.RemoveAll(e.ghostPanicFlagFileName(uuid)) -} - -func (e *Executor) ghostPostponeFlagFileName(uuid string) string { - return path.Join(os.TempDir(), fmt.Sprintf("ghost.%s.postpone.flag", uuid)) -} - -func (e *Executor) deleteGhostPostponeFlagFile(uuid string) error { - // We use RemoveAll because if the file does not exist that's fine. Remove will return an error - // if file does not exist; RemoveAll does not. - return os.RemoveAll(e.ghostPostponeFlagFileName(uuid)) -} - func (e *Executor) ptPidFileName(uuid string) string { return path.Join(os.TempDir(), fmt.Sprintf("pt-online-schema-change.%s.pid", uuid)) } -// readMySQLVariables contacts the backend MySQL server to read some of its configuration -func (e *Executor) readMySQLVariables(ctx context.Context) (variables *mysqlVariables, err error) { - conn, err := e.pool.Get(ctx, nil) - if err != nil { - return nil, err - } - defer conn.Recycle() - - tm, err := conn.Conn.Exec(ctx, `select - @@global.hostname as hostname, - @@global.port as port, - @@global.read_only as read_only, - @@global.version AS version, - @@global.version_comment AS version_comment - from dual`, 1, true) - if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "could not read MySQL variables: %v", err) - } - row := tm.Named().Row() - if row == nil { - return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "unexpected result for MySQL variables: %+v", tm.Rows) - } - variables = &mysqlVariables{} - - if e.env.Config().DB.Host != "" { - variables.host = e.env.Config().DB.Host - } else { - variables.host = row["hostname"].ToString() - } - - if e.env.Config().DB.Port != 0 { - variables.port = e.env.Config().DB.Port - } else if port, err := row.ToInt("port"); err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "could not parse @@global.port %v: %v", tm, err) - } else { - variables.port = port - } - if variables.readOnly, err = row.ToBool("read_only"); err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "could not parse @@global.read_only %v: %v", tm, err) - } - - variables.version = row["version"].ToString() - variables.versionComment = row["version_comment"].ToString() - - return variables, nil -} - -// createOnlineDDLUser creates a gh-ost or pt-osc user account with all -// necessary privileges and with a random password -func (e *Executor) createOnlineDDLUser(ctx context.Context) (password string, err error) { - conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaConnector()) - if err != nil { - return password, err - } - defer conn.Close() - - password = RandomHash()[0:maxPasswordLength] - - for _, query := range sqlCreateOnlineDDLUser { - parsed := sqlparser.BuildParsedQuery(query, onlineDDLGrant, password) - if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil { - return password, err - } - } - for _, query := range sqlGrantOnlineDDLSuper { - parsed := sqlparser.BuildParsedQuery(query, onlineDDLGrant) - conn.ExecuteFetch(parsed.Query, 0, false) - // We ignore failure, since we might not be able to grant - // SUPER privs (e.g. Aurora) - } - for _, query := range sqlGrantOnlineDDLUser { - parsed := sqlparser.BuildParsedQuery(query, onlineDDLGrant) - if _, err := conn.ExecuteFetch(parsed.Query, 0, false); err != nil { - return password, err - } - } - return password, err -} - -// dropOnlineDDLUser drops the given ddl user account at the end of migration -func (e *Executor) dropOnlineDDLUser(ctx context.Context) error { - conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaConnector()) - if err != nil { - return err - } - defer conn.Close() - - parsed := sqlparser.BuildParsedQuery(sqlDropOnlineDDLUser, onlineDDLGrant) - _, err = conn.ExecuteFetch(parsed.Query, 0, false) - return err -} - // tableExists checks if a given table exists. func (e *Executor) tableExists(ctx context.Context, tableName string) (bool, error) { tableName = strings.ReplaceAll(tableName, `_`, `\_`) @@ -610,16 +473,6 @@ func (e *Executor) getCreateTableStatement(ctx context.Context, tableName string return createTable, nil } -func (e *Executor) parseAlterOptions(ctx context.Context, onlineDDL *schema.OnlineDDL) string { - // Temporary hack (2020-08-11) - // Because sqlparser does not do full blown ALTER TABLE parsing, - // and because we don't want gh-ost to know about WITH_GHOST and WITH_PT syntax, - // we resort to regexp-based parsing of the query. - // TODO(shlomi): generate _alter options_ via sqlparser when it full supports ALTER TABLE syntax. - _, _, alterOptions := schema.ParseAlterTableOptions(onlineDDL.SQL) - return alterOptions -} - // executeDirectly runs a DDL query directly on the backend MySQL server func (e *Executor) executeDirectly(ctx context.Context, onlineDDL *schema.OnlineDDL, acceptableMySQLErrorCodes ...sqlerror.ErrorCode) (acceptableErrorCodeFound bool, err error) { conn, err := dbconnpool.NewDBConnection(ctx, e.env.Config().DB.DbaWithDB()) @@ -1589,438 +1442,6 @@ func (e *Executor) ExecuteWithVReplication(ctx context.Context, onlineDDL *schem return nil } -// ExecuteWithGhost validates and runs a gh-ost process. -// Validation included testing the backend MySQL server and the gh-ost binary itself -// Execution runs first a dry run, then an actual migration -func (e *Executor) ExecuteWithGhost(ctx context.Context, onlineDDL *schema.OnlineDDL) error { - if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY { - return ErrExecutorNotWritableTablet - } - variables, err := e.readMySQLVariables(ctx) - if err != nil { - log.Errorf("Error before running gh-ost: %+v", err) - return err - } - if variables.readOnly { - err := fmt.Errorf("Error before running gh-ost: MySQL server is read_only") - log.Errorf(err.Error()) - return err - } - onlineDDLPassword, err := e.createOnlineDDLUser(ctx) - if err != nil { - err := fmt.Errorf("Error creating gh-ost user: %+v", err) - log.Errorf(err.Error()) - return err - } - tempDir, err := createTempDir(onlineDDL.UUID) - if err != nil { - log.Errorf("Error creating temporary directory: %+v", err) - return err - } - credentialsConfigFileContent := fmt.Sprintf(`[client] -user=%s -password=${ONLINE_DDL_PASSWORD} -`, onlineDDLUser) - credentialsConfigFileName, err := createTempScript(tempDir, "gh-ost-conf.cfg", credentialsConfigFileContent) - if err != nil { - log.Errorf("Error creating config file: %+v", err) - return err - } - wrapperScriptContent := fmt.Sprintf(`#!/bin/bash -ghost_log_path="%s" -ghost_log_file="%s" -ghost_log_failure_file="%s" - -mkdir -p "$ghost_log_path" - -export ONLINE_DDL_PASSWORD -%s "$@" > "$ghost_log_path/$ghost_log_file" 2>&1 -exit_code=$? -grep -o '\bFATAL\b.*' "$ghost_log_path/$ghost_log_file" | tail -1 > "$ghost_log_path/$ghost_log_failure_file" -exit $exit_code - `, tempDir, migrationLogFileName, migrationFailureFileName, ghostBinaryPath, - ) - wrapperScriptFileName, err := createTempScript(tempDir, "gh-ost-wrapper.sh", wrapperScriptContent) - if err != nil { - log.Errorf("Error creating wrapper script: %+v", err) - return err - } - onHookContent := func(status schema.OnlineDDLStatus, hint string) string { - return fmt.Sprintf(`#!/bin/bash - curl --max-time 10 -s 'http://localhost:%d/schema-migration/report-status?uuid=%s&status=%s&hint=%s&dryrun='"$GH_OST_DRY_RUN"'&progress='"$GH_OST_PROGRESS"'&eta='"$GH_OST_ETA_SECONDS"'&rowscopied='"$GH_OST_COPIED_ROWS" - `, servenv.Port(), onlineDDL.UUID, string(status), hint) - } - if _, err := createTempScript(tempDir, "gh-ost-on-startup", onHookContent(schema.OnlineDDLStatusRunning, emptyHint)); err != nil { - log.Errorf("Error creating script: %+v", err) - return err - } - if _, err := createTempScript(tempDir, "gh-ost-on-status", onHookContent(schema.OnlineDDLStatusRunning, emptyHint)); err != nil { - log.Errorf("Error creating script: %+v", err) - return err - } - if _, err := createTempScript(tempDir, "gh-ost-on-success", onHookContent(schema.OnlineDDLStatusComplete, emptyHint)); err != nil { - log.Errorf("Error creating script: %+v", err) - return err - } - if _, err := createTempScript(tempDir, "gh-ost-on-failure", onHookContent(schema.OnlineDDLStatusFailed, emptyHint)); err != nil { - log.Errorf("Error creating script: %+v", err) - return err - } - if _, err := createTempScript(tempDir, "gh-ost-on-begin-postponed", onHookContent(schema.OnlineDDLStatusRunning, readyToCompleteHint)); err != nil { - log.Errorf("Error creating script: %+v", err) - return err - } - serveSocketFile := path.Join(tempDir, "serve.sock") - - if err := e.deleteGhostPanicFlagFile(onlineDDL.UUID); err != nil { - log.Errorf("Error removing gh-ost panic flag file %s: %+v", e.ghostPanicFlagFileName(onlineDDL.UUID), err) - return err - } - if err := e.deleteGhostPostponeFlagFile(onlineDDL.UUID); err != nil { - log.Errorf("Error removing gh-ost postpone flag file %s before migration: %+v", e.ghostPostponeFlagFileName(onlineDDL.UUID), err) - return err - } - // Validate gh-ost binary: - _ = e.updateMigrationMessage(ctx, onlineDDL.UUID, "validating gh-ost --version") - log.Infof("Will now validate gh-ost binary") - _, err = execCmd( - "bash", - []string{ - wrapperScriptFileName, - "--version", - }, - os.Environ(), - "/tmp", - nil, - nil, - ) - if err != nil { - log.Errorf("Error testing gh-ost binary: %+v", err) - return err - } - _ = e.updateMigrationMessage(ctx, onlineDDL.UUID, "validated gh-ost --version") - log.Infof("+ OK") - - if err := e.updateMigrationLogPath(ctx, onlineDDL.UUID, variables.host, tempDir); err != nil { - return err - } - - runGhost := func(execute bool) error { - alterOptions := e.parseAlterOptions(ctx, onlineDDL) - forceTableNames := fmt.Sprintf("%s_%s", onlineDDL.UUID, schema.ReadableTimestamp()) - - if err := e.updateArtifacts(ctx, onlineDDL.UUID, - fmt.Sprintf("_%s_gho", forceTableNames), - fmt.Sprintf("_%s_ghc", forceTableNames), - fmt.Sprintf("_%s_del", forceTableNames), - ); err != nil { - return err - } - - os.Setenv("ONLINE_DDL_PASSWORD", onlineDDLPassword) - args := []string{ - wrapperScriptFileName, - fmt.Sprintf(`--host=%s`, variables.host), - fmt.Sprintf(`--port=%d`, variables.port), - fmt.Sprintf(`--conf=%s`, credentialsConfigFileName), // user & password found here - `--allow-on-master`, - `--max-load=Threads_running=900`, - `--critical-load=Threads_running=1000`, - `--critical-load-hibernate-seconds=60`, - `--approve-renamed-columns`, - `--debug`, - `--exact-rowcount`, - `--default-retries=120`, - fmt.Sprintf("--force-table-names=%s", forceTableNames), - fmt.Sprintf("--serve-socket-file=%s", serveSocketFile), - fmt.Sprintf("--hooks-path=%s", tempDir), - fmt.Sprintf(`--hooks-hint-token=%s`, onlineDDL.UUID), - fmt.Sprintf(`--throttle-http=http://localhost:%d/throttler/check?app=%s:%s:%s&p=low`, servenv.Port(), throttlerapp.OnlineDDLName, throttlerapp.GhostName, onlineDDL.UUID), - fmt.Sprintf(`--database=%s`, e.dbName), - fmt.Sprintf(`--table=%s`, onlineDDL.Table), - fmt.Sprintf(`--alter=%s`, alterOptions), - fmt.Sprintf(`--panic-flag-file=%s`, e.ghostPanicFlagFileName(onlineDDL.UUID)), - fmt.Sprintf(`--execute=%t`, execute), - } - if onlineDDL.StrategySetting().IsAllowZeroInDateFlag() { - args = append(args, "--allow-zero-in-date") - } - if execute && onlineDDL.StrategySetting().IsPostponeCompletion() { - args = append(args, "--postpone-cut-over-flag-file", e.ghostPostponeFlagFileName(onlineDDL.UUID)) - } - - args = append(args, onlineDDL.StrategySetting().RuntimeOptions()...) - _ = e.updateMigrationMessage(ctx, onlineDDL.UUID, fmt.Sprintf("executing gh-ost --execute=%v", execute)) - _, err := execCmd("bash", args, os.Environ(), "/tmp", nil, nil) - _ = e.updateMigrationMessage(ctx, onlineDDL.UUID, fmt.Sprintf("executed gh-ost --execute=%v, err=%v", execute, err)) - if err != nil { - // See if we can get more info from the failure file - if content, ferr := os.ReadFile(path.Join(tempDir, migrationFailureFileName)); ferr == nil { - failureMessage := strings.TrimSpace(string(content)) - if failureMessage != "" { - // This message was produced by gh-ost itself. It is more informative than the default "migration failed..." message. Overwrite. - return errors.New(failureMessage) - } - } - } - return err - } - - e.ownedRunningMigrations.Store(onlineDDL.UUID, onlineDDL) - - go func() error { - defer e.ownedRunningMigrations.Delete(onlineDDL.UUID) - defer e.deleteGhostPostponeFlagFile(onlineDDL.UUID) // irrespective whether the file was in fact in use or not - defer e.dropOnlineDDLUser(ctx) - defer e.gcArtifacts(ctx) - - log.Infof("Will now dry-run gh-ost on: %s:%d", variables.host, variables.port) - if err := runGhost(false); err != nil { - // perhaps gh-ost was interrupted midway and didn't have the chance to send a "failed" status - _ = e.failMigration(ctx, onlineDDL, err) - - log.Errorf("Error executing gh-ost dry run: %+v", err) - return err - } - log.Infof("+ OK") - - log.Infof("Will now run gh-ost on: %s:%d", variables.host, variables.port) - startedMigrations.Add(1) - if err := runGhost(true); err != nil { - // perhaps gh-ost was interrupted midway and didn't have the chance to send a "failes" status - _ = e.failMigration(ctx, onlineDDL, err) - failedMigrations.Add(1) - log.Errorf("Error running gh-ost: %+v", err) - return err - } - // Migration successful! - defer e.reloadSchema(ctx) - successfulMigrations.Add(1) - log.Infof("+ OK") - return nil - }() - return nil -} - -// ExecuteWithPTOSC validates and runs a pt-online-schema-change process. -// Validation included testing the backend MySQL server and the pt-online-schema-change binary itself -// Execution runs first a dry run, then an actual migration -func (e *Executor) ExecuteWithPTOSC(ctx context.Context, onlineDDL *schema.OnlineDDL) error { - if e.tabletTypeFunc() != topodatapb.TabletType_PRIMARY { - return ErrExecutorNotWritableTablet - } - variables, err := e.readMySQLVariables(ctx) - if err != nil { - log.Errorf("Error before running pt-online-schema-change: %+v", err) - return err - } - if variables.readOnly { - err := fmt.Errorf("Error before running pt-online-schema-change: MySQL server is read_only") - log.Errorf(err.Error()) - return err - } - onlineDDLPassword, err := e.createOnlineDDLUser(ctx) - if err != nil { - err := fmt.Errorf("Error creating pt-online-schema-change user: %+v", err) - log.Errorf(err.Error()) - return err - } - tempDir, err := createTempDir(onlineDDL.UUID) - if err != nil { - log.Errorf("Error creating temporary directory: %+v", err) - return err - } - - wrapperScriptContent := fmt.Sprintf(`#!/bin/bash -pt_log_path="%s" -pt_log_file="%s" - -mkdir -p "$pt_log_path" - -export MYSQL_PWD -%s "$@" > "$pt_log_path/$pt_log_file" 2>&1 - `, tempDir, migrationLogFileName, ptOSCBinaryPath, - ) - wrapperScriptFileName, err := createTempScript(tempDir, "pt-online-schema-change-wrapper.sh", wrapperScriptContent) - if err != nil { - log.Errorf("Error creating wrapper script: %+v", err) - return err - } - pluginCode := ` - package pt_online_schema_change_plugin; - - use strict; - use LWP::Simple; - - sub new { - my($class, % args) = @_; - my $self = { %args }; - return bless $self, $class; - } - - sub init { - my($self, % args) = @_; - } - - sub before_create_new_table { - my($self, % args) = @_; - get("http://localhost:{{VTTABLET_PORT}}/schema-migration/report-status?uuid={{MIGRATION_UUID}}&status={{OnlineDDLStatusRunning}}&hint=&dryrun={{DRYRUN}}"); - } - - sub before_exit { - my($self, % args) = @_; - my $exit_status = $args{exit_status}; - if ($exit_status == 0) { - get("http://localhost:{{VTTABLET_PORT}}/schema-migration/report-status?uuid={{MIGRATION_UUID}}&status={{OnlineDDLStatusComplete}}&hint=&dryrun={{DRYRUN}}"); - } else { - get("http://localhost:{{VTTABLET_PORT}}/schema-migration/report-status?uuid={{MIGRATION_UUID}}&status={{OnlineDDLStatusFailed}}&hint=&dryrun={{DRYRUN}}"); - } - } - - sub get_slave_lag { - my ($self, %args) = @_; - - return sub { - if (head("http://localhost:{{VTTABLET_PORT}}/throttler/check?app={{THROTTLER_ONLINE_DDL_APP}}:{{THROTTLER_PT_OSC_APP}}:{{MIGRATION_UUID}}&p=low")) { - # Got HTTP 200 OK, means throttler is happy - return 0; - } else { - # Throttler requests to hold back - return 2147483647; # maxint, report *very* high lag - } - }; - } - - 1; - ` - pluginCode = strings.ReplaceAll(pluginCode, "{{VTTABLET_PORT}}", fmt.Sprintf("%d", servenv.Port())) - pluginCode = strings.ReplaceAll(pluginCode, "{{MIGRATION_UUID}}", onlineDDL.UUID) - pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_ONLINE_DDL_APP}}", throttlerapp.OnlineDDLName.String()) - pluginCode = strings.ReplaceAll(pluginCode, "{{THROTTLER_PT_OSC_APP}}", throttlerapp.PTOSCName.String()) - - pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusRunning}}", string(schema.OnlineDDLStatusRunning)) - pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusComplete}}", string(schema.OnlineDDLStatusComplete)) - pluginCode = strings.ReplaceAll(pluginCode, "{{OnlineDDLStatusFailed}}", string(schema.OnlineDDLStatusFailed)) - - // Validate pt-online-schema-change binary: - log.Infof("Will now validate pt-online-schema-change binary") - _, err = execCmd( - "bash", - []string{ - wrapperScriptFileName, - "--version", - }, - os.Environ(), - "/tmp", - nil, - nil, - ) - if err != nil { - log.Errorf("Error testing pt-online-schema-change binary: %+v", err) - return err - } - log.Infof("+ OK") - - if err := e.updateMigrationLogPath(ctx, onlineDDL.UUID, variables.host, tempDir); err != nil { - return err - } - - alterOptions := e.parseAlterOptions(ctx, onlineDDL) - - // The following sleep() is temporary and artificial. Because we create a new user for this - // migration, and because we throttle by replicas, we need to wait for the replicas to be - // caught up with the new user creation. Otherwise, the OSC tools will fail connecting to the replicas... - // Once we have a built in throttling service , we will no longer need to have the OSC tools probe the - // replicas. Instead, they will consult with our throttling service. - // TODO(shlomi): replace/remove this when we have a proper throttling solution - time.Sleep(time.Second) - - runPTOSC := func(execute bool) error { - os.Setenv("MYSQL_PWD", onlineDDLPassword) - newTableName := fmt.Sprintf("_%s_%s_new", onlineDDL.UUID, schema.ReadableTimestamp()) - - if err := e.updateArtifacts(ctx, onlineDDL.UUID, - fmt.Sprintf("_%s_old", onlineDDL.Table), - fmt.Sprintf("__%s_old", onlineDDL.Table), - newTableName, - ); err != nil { - return err - } - - executeFlag := "--dry-run" - if execute { - executeFlag = "--execute" - } - finalPluginCode := strings.ReplaceAll(pluginCode, "{{DRYRUN}}", fmt.Sprintf("%t", !execute)) - pluginFile, err := createTempScript(tempDir, "pt-online-schema-change-plugin", finalPluginCode) - if err != nil { - log.Errorf("Error creating script: %+v", err) - return err - } - args := []string{ - wrapperScriptFileName, - `--pid`, - e.ptPidFileName(onlineDDL.UUID), - `--plugin`, - pluginFile, - `--new-table-name`, - newTableName, - `--alter`, - alterOptions, - `--check-slave-lag`, // We use primary's identity so that pt-online-schema-change calls our lag plugin for exactly 1 server - fmt.Sprintf(`h=%s,P=%d,D=%s,t=%s,u=%s`, variables.host, variables.port, e.dbName, onlineDDL.Table, onlineDDLUser), - executeFlag, - fmt.Sprintf(`h=%s,P=%d,D=%s,t=%s,u=%s`, variables.host, variables.port, e.dbName, onlineDDL.Table, onlineDDLUser), - } - - if execute { - args = append(args, - `--no-drop-new-table`, - `--no-drop-old-table`, - ) - } - args = append(args, onlineDDL.StrategySetting().RuntimeOptions()...) - _, err = execCmd("bash", args, os.Environ(), "/tmp", nil, nil) - return err - } - - e.ownedRunningMigrations.Store(onlineDDL.UUID, onlineDDL) - - go func() error { - defer e.ownedRunningMigrations.Delete(onlineDDL.UUID) - defer e.dropOnlineDDLUser(ctx) - defer e.gcArtifacts(ctx) - - log.Infof("Will now dry-run pt-online-schema-change on: %s:%d", variables.host, variables.port) - if err := runPTOSC(false); err != nil { - // perhaps pt-osc was interrupted midway and didn't have the chance to send a "failes" status - _ = e.failMigration(ctx, onlineDDL, err) - _ = e.updateMigrationTimestamp(ctx, "completed_timestamp", onlineDDL.UUID) - log.Errorf("Error executing pt-online-schema-change dry run: %+v", err) - return err - } - log.Infof("+ OK") - - log.Infof("Will now run pt-online-schema-change on: %s:%d", variables.host, variables.port) - startedMigrations.Add(1) - if err := runPTOSC(true); err != nil { - // perhaps pt-osc was interrupted midway and didn't have the chance to send a "failes" status - _ = e.failMigration(ctx, onlineDDL, err) - _ = e.updateMigrationTimestamp(ctx, "completed_timestamp", onlineDDL.UUID) - _ = e.dropPTOSCMigrationTriggers(ctx, onlineDDL) - failedMigrations.Add(1) - log.Errorf("Error running pt-online-schema-change: %+v", err) - return err - } - // Migration successful! - defer e.reloadSchema(ctx) - successfulMigrations.Add(1) - log.Infof("+ OK") - return nil - }() - return nil -} - func (e *Executor) readMigration(ctx context.Context, uuid string) (onlineDDL *schema.OnlineDDL, row sqltypes.RowNamedValues, err error) { query, err := sqlparser.ParseAndBind(sqlSelectMigration, @@ -2087,36 +1508,6 @@ func (e *Executor) terminateMigration(ctx context.Context, onlineDDL *schema.Onl if err := e.terminateVReplMigration(ctx, onlineDDL.UUID); err != nil { return foundRunning, fmt.Errorf("Error terminating migration, vreplication exec error: %+v", err) } - case schema.DDLStrategyPTOSC: - // see if pt-osc is running (could have been executed by this vttablet or one that crashed in the past) - if running, pid, _ := e.isPTOSCMigrationRunning(ctx, onlineDDL.UUID); running { - foundRunning = true - // Because pt-osc doesn't offer much control, we take a brute force approach to killing it, - // revoking its privileges, and cleaning up its triggers. - if err := syscallutil.Kill(pid, syscall.SIGTERM); err != nil { - return foundRunning, nil - } - if err := syscallutil.Kill(pid, syscall.SIGKILL); err != nil { - return foundRunning, nil - } - if err := e.dropOnlineDDLUser(ctx); err != nil { - return foundRunning, nil - } - if err := e.dropPTOSCMigrationTriggers(ctx, onlineDDL); err != nil { - return foundRunning, nil - } - } - case schema.DDLStrategyGhost: - // double check: is the running migration the very same one we wish to cancel? - if _, ok := e.ownedRunningMigrations.Load(onlineDDL.UUID); ok { - // assuming all goes well in next steps, we can already report that there has indeed been a migration - foundRunning = true - } - // gh-ost migrations are easy to kill: just touch their specific panic flag files. We trust - // gh-ost to terminate. No need to KILL it. And there's no trigger cleanup. - if err := e.createGhostPanicFlagFile(onlineDDL.UUID); err != nil { - return foundRunning, fmt.Errorf("Error terminating gh-ost migration, flag file error: %+v", err) - } } return foundRunning, nil } @@ -3184,14 +2575,6 @@ func (e *Executor) executeAlterDDLActionMigration(ctx context.Context, onlineDDL if err := e.ExecuteWithVReplication(ctx, onlineDDL, nil); err != nil { return failMigration(err) } - case schema.DDLStrategyGhost: - if err := e.ExecuteWithGhost(ctx, onlineDDL); err != nil { - return failMigration(err) - } - case schema.DDLStrategyPTOSC: - if err := e.ExecuteWithPTOSC(ctx, onlineDDL); err != nil { - return failMigration(err) - } case schema.DDLStrategyMySQL: if _, err := e.executeDirectly(ctx, onlineDDL); err != nil { return failMigration(err) @@ -3840,37 +3223,6 @@ func (e *Executor) reviewRunningMigrations(ctx context.Context) (countRunnning i if err := reviewVReplRunningMigration(); err != nil { return countRunnning, cancellable, err } - case schema.DDLStrategyPTOSC: - { - // Since pt-osc doesn't have a "liveness" plugin entry point, we do it externally: - // if the process is alive, we update the `liveness_timestamp` for this migration. - running, _, err := e.isPTOSCMigrationRunning(ctx, uuid) - if err != nil { - return countRunnning, cancellable, err - } - if running { - _ = e.updateMigrationTimestamp(ctx, "liveness_timestamp", uuid) - } - if _, ok := e.ownedRunningMigrations.Load(uuid); !ok { - // Ummm, the migration is running but we don't own it. This means the migration - // is rogue. Maybe executed by another tablet. Anyway, if we don't own it, we can't - // complete the migration. Even if it runs, the logic around announcing it as complete - // is missing. So we may as well cancel it. - message := fmt.Sprintf("cancelling a pt-osc running migration %s which is not owned (not started, or is assumed to be terminated) by this executor", uuid) - cancellable = append(cancellable, newCancellableMigration(uuid, message)) - } - } - case schema.DDLStrategyGhost: - { - if _, ok := e.ownedRunningMigrations.Load(uuid); !ok { - // Ummm, the migration is running but we don't own it. This means the migration - // is rogue. Maybe executed by another tablet. Anyway, if we don't own it, we can't - // complete the migration. Even if it runs, the logic around announcing it as complete - // is missing. So we may as well cancel it. - message := fmt.Sprintf("cancelling a gh-ost running migration %s which is not owned by this executor. This can happen when the migration was started by a different tablet. Then, either a MySQL failure, a PRS, or ERS took place. gh-ost does not survive a MySQL restart or a shard failing over to a new PRIMARY", uuid) - cancellable = append(cancellable, newCancellableMigration(uuid, message)) - } - } } countRunnning++ } @@ -4205,21 +3557,6 @@ func (e *Executor) updateMigrationTimestamp(ctx context.Context, timestampColumn return err } -func (e *Executor) updateMigrationLogPath(ctx context.Context, uuid string, hostname, logPath string) error { - logFile := path.Join(logPath, migrationLogFileName) - hostLogPath := fmt.Sprintf("%s:%s", hostname, logPath) - query, err := sqlparser.ParseAndBind(sqlUpdateMigrationLogPath, - sqltypes.StringBindVariable(hostLogPath), - sqltypes.StringBindVariable(logFile), - sqltypes.StringBindVariable(uuid), - ) - if err != nil { - return err - } - _, err = e.execQuery(ctx, query) - return err -} - func (e *Executor) updateArtifacts(ctx context.Context, uuid string, artifacts ...string) error { bindArtifacts := strings.Join(artifacts, ",") query, err := sqlparser.ParseAndBind(sqlUpdateArtifacts, @@ -4817,11 +4154,6 @@ func (e *Executor) CompleteMigration(ctx context.Context, uuid string) (result * return nil, err } defer e.triggerNextCheckInterval() - if err := e.deleteGhostPostponeFlagFile(uuid); err != nil { - // This should work without error even if the migration is not a gh-ost migration, and even - // if the file does not exist. An error here indicates a general system error of sorts. - return nil, err - } rs, err := e.execQuery(ctx, query) if err != nil { return nil, err @@ -5247,27 +4579,6 @@ func (e *Executor) onSchemaMigrationStatus(ctx context.Context, return nil } -// OnSchemaMigrationStatus is called by TabletServer's API, which is invoked by a running gh-ost migration's hooks. -func (e *Executor) OnSchemaMigrationStatus(ctx context.Context, - uuidParam, statusParam, dryrunParam, progressParam, etaParam, rowsCopiedParam, hint string) (err error) { - status := schema.OnlineDDLStatus(statusParam) - dryRun := (dryrunParam == "true") - var progressPct float64 - if pct, err := strconv.ParseFloat(progressParam, 64); err == nil { - progressPct = pct - } - var etaSeconds int64 = etaSecondsUnknown - if eta, err := strconv.ParseInt(etaParam, 10, 64); err == nil { - etaSeconds = eta - } - var rowsCopied int64 - if rows, err := strconv.ParseInt(rowsCopiedParam, 10, 64); err == nil { - rowsCopied = rows - } - - return e.onSchemaMigrationStatus(ctx, uuidParam, status, dryRun, progressPct, etaSeconds, rowsCopied, hint) -} - // checkOnPreparedPool checks if there are any cross-shard prepared transactions on the given table func (e *Executor) checkOnPreparedPool(ctx context.Context, table string, waitTime time.Duration) error { if e.isPreparedPoolEmpty(table) { diff --git a/go/vt/vttablet/onlineddl/schema.go b/go/vt/vttablet/onlineddl/schema.go index 6c0bff1086f..15a3a4b1687 100644 --- a/go/vt/vttablet/onlineddl/schema.go +++ b/go/vt/vttablet/onlineddl/schema.go @@ -577,18 +577,3 @@ const ( metadata_locks.OBJECT_SCHEMA=database() AND metadata_locks.OBJECT_NAME=%a ` ) - -var ( - sqlCreateOnlineDDLUser = []string{ - `CREATE USER IF NOT EXISTS %s IDENTIFIED BY '%s'`, - `ALTER USER %s IDENTIFIED BY '%s'`, - } - sqlGrantOnlineDDLSuper = []string{ - `GRANT SUPER ON *.* TO %s`, - } - sqlGrantOnlineDDLUser = []string{ - `GRANT PROCESS, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO %s`, - `GRANT ALTER, CREATE, CREATE VIEW, SHOW VIEW, DELETE, DROP, INDEX, INSERT, LOCK TABLES, SELECT, TRIGGER, UPDATE ON *.* TO %s`, - } - sqlDropOnlineDDLUser = `DROP USER IF EXISTS %s` -) diff --git a/go/vt/vttablet/onlineddl/util.go b/go/vt/vttablet/onlineddl/util.go deleted file mode 100644 index 3d06e6df60e..00000000000 --- a/go/vt/vttablet/onlineddl/util.go +++ /dev/null @@ -1,86 +0,0 @@ -/* -Copyright 2019 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package onlineddl - -import ( - "crypto/rand" - "crypto/sha256" - "encoding/hex" - "fmt" - "io" - "os" - "os/exec" - "path/filepath" - "strings" - - "vitess.io/vitess/go/vt/log" -) - -// execCmd searches the PATH for a command and runs it, logging the output. -// If input is not nil, pipe it to the command's stdin. -func execCmd(name string, args, env []string, dir string, input io.Reader, output io.Writer) (cmd *exec.Cmd, err error) { - cmdPath, err := exec.LookPath(name) - if err != nil { - return cmd, err - } - log.Infof("execCmd: %v %v %v", name, cmdPath, args) - - cmd = exec.Command(cmdPath, args...) - cmd.Env = env - cmd.Dir = dir - if input != nil { - cmd.Stdin = input - } - if output != nil { - cmd.Stdout = output - cmd.Stderr = output - } - err = cmd.Run() - if err != nil { - err = fmt.Errorf("failed running command: %v %s; error=%v", name, strings.Join(args, " "), err) - log.Errorf(err.Error()) - } - log.Infof("execCmd success: %v", name) - return cmd, err -} - -// createTempDir creates a temporary directory and returns its name -func createTempDir(hint string) (dirName string, err error) { - if hint != "" { - return os.MkdirTemp("", fmt.Sprintf("online-ddl-%s-*", hint)) - } - return os.MkdirTemp("", "online-ddl-*") -} - -// createTempScript creates an executable file in given directory and with given text as content. -func createTempScript(dirName, fileName, text string) (fullName string, err error) { - fullName = filepath.Join(dirName, fileName) - bytes := []byte(text) - err = os.WriteFile(fullName, bytes, 0755) - return fullName, err -} - -// RandomHash returns a 64 hex character random string -func RandomHash() string { - size := 64 - rb := make([]byte, size) - _, _ = rand.Read(rb) - - hasher := sha256.New() - hasher.Write(rb) - return hex.EncodeToString(hasher.Sum(nil)) -} diff --git a/go/vt/vttablet/onlineddl/util_test.go b/go/vt/vttablet/onlineddl/util_test.go deleted file mode 100644 index 4beb154c0ae..00000000000 --- a/go/vt/vttablet/onlineddl/util_test.go +++ /dev/null @@ -1,32 +0,0 @@ -/* -Copyright 2019 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package onlineddl - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRandomHash(t *testing.T) { - h1 := RandomHash() - h2 := RandomHash() - - assert.Equal(t, len(h1), 64) - assert.Equal(t, len(h2), 64) - assert.NotEqual(t, h1, h2) -} diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 30f73d2d818..4be9fb59600 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -229,7 +229,6 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c tsv.registerTxlogzHandler() tsv.registerQueryListHandlers([]*QueryList{tsv.statelessql, tsv.statefulql, tsv.olapql}) tsv.registerTwopczHandler() - tsv.registerMigrationStatusHandler() tsv.registerThrottlerHandlers() tsv.registerDebugEnvHandler() @@ -1898,18 +1897,6 @@ func (tsv *TabletServer) registerTwopczHandler() { }) } -func (tsv *TabletServer) registerMigrationStatusHandler() { - tsv.exporter.HandleFunc("/schema-migration/report-status", func(w http.ResponseWriter, r *http.Request) { - ctx := tabletenv.LocalContext() - query := r.URL.Query() - if err := tsv.onlineDDLExecutor.OnSchemaMigrationStatus(ctx, query.Get("uuid"), query.Get("status"), query.Get("dryrun"), query.Get("progress"), query.Get("eta"), query.Get("rowscopied"), query.Get("hint")); err != nil { - http.Error(w, fmt.Sprintf("not ok: %v", err), http.StatusInternalServerError) - return - } - w.Write([]byte("ok")) - }) -} - // registerThrottlerCheckHandlers registers throttler "check" requests func (tsv *TabletServer) registerThrottlerCheckHandlers() { handle := func(path string, scope base.Scope) {