Skip to content

Commit

Permalink
release 3.0: rebase master (#253)
Browse files Browse the repository at this point in the history
* diff: add connection manage (#237)

* column-mapping: set schemaID/tableID to 0 if numeric suffix is missing (#236)

* column-mapping: set schemaID/tableID to 0 if numeric suffix is missing

* column-mapping: demand an explicit separator

* pkg/column-mapping: addressed comments

* column-mapping: addressed comments

* diff: some minor fix (#242)

* remove row_id (#240)

* diff: update column's length for checkpoint table (#245)

* diff: add integration test (#239)

* diff: fix num is nil (#250)
  • Loading branch information
WangXiangUSTC authored and ericsyh committed Jun 24, 2019
1 parent 9aa2598 commit eb4fb5d
Show file tree
Hide file tree
Showing 40 changed files with 788 additions and 286 deletions.
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ test:
@export log_level=error; \
$(GOTEST) -cover $(PACKAGES)

integration_test: build
@which bin/tidb-server
@which bin/tikv-server
@which bin/pd-server
@which bin/sync_diff_inspector
@which bin/mydumper
@which bin/loader
@which bin/importer
tests/run.sh

fmt:
go fmt ./...
@goimports -w $(FILES)
Expand Down
3 changes: 0 additions & 3 deletions ddl_checker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/ddl-checker"
"github.com/pingcap/tidb-tools/pkg/utils"
)

var (
Expand Down Expand Up @@ -65,8 +64,6 @@ func main() {
initialise()
mainLoop()
destroy()

utils.SyncLog()
}

func initialise() {
Expand Down
2 changes: 0 additions & 2 deletions dump_region/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,4 @@ func main() {

fmt.Println(string(infos))
}

utils.SyncLog()
}
3 changes: 0 additions & 3 deletions importer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-tools/pkg/importer"
"github.com/pingcap/tidb-tools/pkg/utils"
"go.uber.org/zap"
)

Expand All @@ -47,6 +46,4 @@ func main() {
}

importer.DoProcess(importerCfg)

utils.SyncLog()
}
7 changes: 5 additions & 2 deletions pkg/column-mapping/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ add prefix, with arguments[prefix]

add suffix, with arguments[suffix]

partition id, with arguments [instance_id, prefix of schema, prefix of table]
partition id, with arguments [instance_id, prefix of schema, prefix of table, separator]
[1:1 bit][2:9 bits][3:10 bits][4:44 bits] int64 (using default bits length)
- 1: useless, no reason
- 2: schema ID (schema suffix)
- 3: table ID (table suffix)
- 4: origin ID (>= 0, <= 17592186044415)
And schema = arguments[1] + schema suffix, table = arguments[2] + table suffix
And
- schema = arguments[1] + arguments[3] + schema suffix or arguments[1]
- table = arguments[2] + arguments[3] + table suffix or arguments[2]
The separator argument defaults to an empty string if omitted.
```

## notice
Expand Down
76 changes: 45 additions & 31 deletions pkg/column-mapping/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ var Exprs = map[Expr]func(*mappingInfo, []interface{}) ([]interface{}, error){
// # 3 table ID (table suffix)
// # 4 origin ID (>= 0, <= 17592186044415)
//
// others: schema = arguments[1] + schema suffix
// table = arguments[2] + table suffix
// example: schema = schema_1 table = t_1 => arguments[1] = "schema_", arguments[2] = "t_"
// others: schema = arguments[1] or arguments[1] + arguments[3] + schema suffix
// table = arguments[2] or arguments[2] + arguments[3] + table suffix
// example: schema = schema_1 table = t_1
// => arguments[1] = "schema", arguments[2] = "t", arguments[3] = "_"
// if arguments[1]/arguments[2] == "", it means we don't use schemaID/tableID to compute partition ID
// if length of arguments is < 4, arguments[3] is set to "" (empty string)
PartitionID: partitionID,
}

Expand All @@ -90,7 +92,7 @@ func (r *Rule) ToLower() {

// Valid checks validity of rule.
// add prefix/suffix: it should have target column and one argument
// partition id: it should have 3 arguments
// partition id: it should have 3 to 4 arguments
func (r *Rule) Valid() error {
if _, ok := Exprs[r.Expression]; !ok {
return errors.NotFoundf("expression %s", r.Expression)
Expand All @@ -107,14 +109,25 @@ func (r *Rule) Valid() error {
}

if r.Expression == PartitionID {
if len(r.Arguments) != 3 {
switch len(r.Arguments) {
case 3, 4:
break
default:
return errors.NotValidf("arguments %v for patition id", r.Arguments)
}
}

return nil
}

// Adjust normalizes the rule into an easier-to-process form, e.g. filling in
// optional arguments with the default values.
func (r *Rule) Adjust() {
if r.Expression == PartitionID && len(r.Arguments) == 3 {
r.Arguments = append(r.Arguments, "")
}
}

// check source and target position
func (r *Rule) adjustColumnPosition(source, target int) (int, int, error) {
// if not found target, ignore it
Expand Down Expand Up @@ -165,8 +178,7 @@ func NewMapping(caseSensitive bool, rules []*Rule) (*Mapping, error) {
return m, nil
}

// AddRule adds a rule into mapping
func (m *Mapping) AddRule(rule *Rule) error {
func (m *Mapping) addOrUpdateRule(rule *Rule, isUpdate bool) error {
if m == nil || rule == nil {
return nil
}
Expand All @@ -178,37 +190,31 @@ func (m *Mapping) AddRule(rule *Rule) error {
if !m.caseSensitive {
rule.ToLower()
}
rule.Adjust()

m.resetCache()
err = m.Insert(rule.PatternSchema, rule.PatternTable, rule, false)
err = m.Insert(rule.PatternSchema, rule.PatternTable, rule, isUpdate)
if err != nil {
return errors.Annotatef(err, "add rule %+v into mapping", rule)
var method string
if isUpdate {
method = "update"
} else {
method = "add"
}
return errors.Annotatef(err, "%s rule %+v into mapping", method, rule)
}

return nil
}

// AddRule adds a rule into mapping
func (m *Mapping) AddRule(rule *Rule) error {
return m.addOrUpdateRule(rule, false)
}

// UpdateRule updates mapping rule
func (m *Mapping) UpdateRule(rule *Rule) error {
if m == nil || rule == nil {
return nil
}

err := rule.Valid()
if err != nil {
return errors.Trace(err)
}
if !m.caseSensitive {
rule.ToLower()
}

m.resetCache()
err = m.Insert(rule.PatternSchema, rule.PatternTable, rule, true)
if err != nil {
return errors.Annotatef(err, "update rule %+v in mapping", rule)
}

return nil
return m.addOrUpdateRule(rule, true)
}

// RemoveRule removes a rule from mapping
Expand Down Expand Up @@ -490,22 +496,30 @@ func computePartitionID(schema, table string, rule *Rule) (instanceID int64, sch
instanceID = int64(instanceIDUnsign << shiftCnt)
}

sep := rule.Arguments[3]

if schemaIDBitSize > 0 && len(rule.Arguments[1]) > 0 {
shiftCnt = shiftCnt - uint(schemaIDBitSize)
schemaID, err = computeID(schema, rule.Arguments[1], schemaIDBitSize, shiftCnt)
schemaID, err = computeID(schema, rule.Arguments[1], sep, schemaIDBitSize, shiftCnt)
if err != nil {
return
}
}

if tableIDBitSize > 0 && len(rule.Arguments[2]) > 0 {
shiftCnt = shiftCnt - uint(tableIDBitSize)
tableID, err = computeID(table, rule.Arguments[2], tableIDBitSize, shiftCnt)
tableID, err = computeID(table, rule.Arguments[2], sep, tableIDBitSize, shiftCnt)
}

return
}

func computeID(name string, prefix string, bitSize int, shiftCount uint) (int64, error) {
func computeID(name string, prefix, sep string, bitSize int, shiftCount uint) (int64, error) {
if name == prefix {
return 0, nil
}

prefix += sep
if len(prefix) >= len(name) || prefix != name[:len(prefix)] {
return 0, errors.NotValidf("%s is not the prefix of %s", prefix, name)
}
Expand Down
44 changes: 39 additions & 5 deletions pkg/column-mapping/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,28 +153,62 @@ func (t *testColumnMappingSuit) TestComputePartitionID(c *C) {
c.Assert(err, NotNil)

rule = &Rule{
Arguments: []string{"2", "test_", "t_"},
Arguments: []string{"2", "test", "t", "_"},
}
instanceID, schemaID, tableID, err := computePartitionID("test_1", "t_1", rule)
c.Assert(err, IsNil)
c.Assert(instanceID, Equals, int64(2<<59))
c.Assert(schemaID, Equals, int64(1<<52))
c.Assert(tableID, Equals, int64(1<<44))

// test default partition ID to zero
instanceID, schemaID, tableID, err = computePartitionID("test", "t_3", rule)
c.Assert(err, IsNil)
c.Assert(instanceID, Equals, int64(2<<59))
c.Assert(schemaID, Equals, int64(0))
c.Assert(tableID, Equals, int64(3<<44))

instanceID, schemaID, tableID, err = computePartitionID("test_5", "t", rule)
c.Assert(err, IsNil)
c.Assert(instanceID, Equals, int64(2<<59))
c.Assert(schemaID, Equals, int64(5<<52))
c.Assert(tableID, Equals, int64(0))

_, _, _, err = computePartitionID("unrelated", "t_6", rule)
c.Assert(err, ErrorMatches, "test_ is not the prefix of unrelated.*")

_, _, _, err = computePartitionID("test", "x", rule)
c.Assert(err, ErrorMatches, "t_ is not the prefix of x.*")

_, _, _, err = computePartitionID("test_0", "t_0xa", rule)
c.Assert(err, ErrorMatches, "the suffix of 0xa can't be converted to int64.*")

_, _, _, err = computePartitionID("test_0", "t_", rule)
c.Assert(err, ErrorMatches, "t_ is not the prefix of t_.*") // needs a better error message

_, _, _, err = computePartitionID("testx", "t_3", rule)
c.Assert(err, ErrorMatches, "test_ is not the prefix of testx.*")

SetPartitionRule(4, 0, 8)
rule = &Rule{
Arguments: []string{"2", "test_", "t_"},
Arguments: []string{"2", "test_", "t_", ""},
}
instanceID, schemaID, tableID, err = computePartitionID("test_1", "t_1", rule)
c.Assert(err, IsNil)
c.Assert(instanceID, Equals, int64(2<<59))
c.Assert(schemaID, Equals, int64(0))
c.Assert(tableID, Equals, int64(1<<51))

instanceID, schemaID, tableID, err = computePartitionID("test_", "t_", rule)
c.Assert(err, IsNil)
c.Assert(instanceID, Equals, int64(2<<59))
c.Assert(schemaID, Equals, int64(0))
c.Assert(tableID, Equals, int64(0))

// test ignore instance ID
SetPartitionRule(4, 7, 8)
rule = &Rule{
Arguments: []string{"", "test_", "t_"},
Arguments: []string{"", "test_", "t_", ""},
}
instanceID, schemaID, tableID, err = computePartitionID("test_1", "t_1", rule)
c.Assert(err, IsNil)
Expand All @@ -184,7 +218,7 @@ func (t *testColumnMappingSuit) TestComputePartitionID(c *C) {

// test ignore schema ID
rule = &Rule{
Arguments: []string{"2", "", "t_"},
Arguments: []string{"2", "", "t_", ""},
}
instanceID, schemaID, tableID, err = computePartitionID("test_1", "t_1", rule)
c.Assert(err, IsNil)
Expand All @@ -194,7 +228,7 @@ func (t *testColumnMappingSuit) TestComputePartitionID(c *C) {

// test ignore schema ID
rule = &Rule{
Arguments: []string{"2", "test_", ""},
Arguments: []string{"2", "test_", "", ""},
}
instanceID, schemaID, tableID, err = computePartitionID("test_1", "t_1", rule)
c.Assert(err, IsNil)
Expand Down
25 changes: 10 additions & 15 deletions pkg/dbutil/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ import (
)

const (
// ImplicitColName is name of implicit column in TiDB
ImplicitColName = "_tidb_rowid"

// ImplicitColID is ID implicit column in TiDB
ImplicitColID = -1

// DefaultRetryTime is the default retry time to execute sql
DefaultRetryTime = 10

Expand Down Expand Up @@ -72,6 +66,8 @@ type DBConfig struct {
Password string `toml:"password" json:"password"`

Schema string `toml:"schema" json:"schema"`

Snapshot string `toml:"snapshot" json:"snapshot"`
}

// String returns native format of database configuration
Expand Down Expand Up @@ -109,7 +105,14 @@ func GetDBConfigFromEnv(schema string) DBConfig {

// OpenDB opens a mysql connection FD
func OpenDB(cfg DBConfig) (*sql.DB, error) {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4", cfg.User, cfg.Password, cfg.Host, cfg.Port)
var dbDSN string
if len(cfg.Snapshot) != 0 {
log.Info("create connection with snapshot", zap.String("snapshot", cfg.Snapshot))
dbDSN = fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&tidb_snapshot=%s", cfg.User, cfg.Password, cfg.Host, cfg.Port, cfg.Snapshot)
} else {
dbDSN = fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4", cfg.User, cfg.Password, cfg.Host, cfg.Port)
}

dbConn, err := sql.Open("mysql", dbDSN)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -561,14 +564,6 @@ func GetTidbLatestTSO(ctx context.Context, db *sql.DB) (int64, error) {
return 0, errors.New("get slave cluster's ts failed")
}

// SetSnapshot set the snapshot variable for tidb
func SetSnapshot(ctx context.Context, db *sql.DB, snapshot string) error {
sql := fmt.Sprintf("SET @@tidb_snapshot='%s'", snapshot)
log.Info("set history snapshot", zap.String("sql", sql))
_, err := db.ExecContext(ctx, sql)
return errors.Trace(err)
}

// GetDBVersion returns the database's version
func GetDBVersion(ctx context.Context, db *sql.DB) (string, error) {
/*
Expand Down
Loading

0 comments on commit eb4fb5d

Please sign in to comment.