Skip to content

Commit

Permalink
*: move irrelevant code out of package "distsql" (#5893)
Browse files Browse the repository at this point in the history
  • Loading branch information
zz-jason authored Feb 24, 2018
1 parent e112181 commit 8268d4d
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 214 deletions.
99 changes: 8 additions & 91 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -68,11 +66,10 @@ type PartialResult interface {
}

type selectResult struct {
label string
aggregate bool
resp kv.Response
label string
resp kv.Response

results chan newResultWithErr
results chan resultWithErr
closed chan struct{}

rowLen int
Expand All @@ -86,7 +83,7 @@ type selectResult struct {
partialCount int64 // number of partial results.
}

type newResultWithErr struct {
type resultWithErr struct {
result []byte
err error
}
Expand All @@ -107,15 +104,15 @@ func (r *selectResult) fetch(ctx context.Context) {
for {
resultSubset, err := r.resp.Next(ctx)
if err != nil {
r.results <- newResultWithErr{err: errors.Trace(err)}
r.results <- resultWithErr{err: errors.Trace(err)}
return
}
if resultSubset == nil {
return
}

select {
case r.results <- newResultWithErr{result: resultSubset}:
case r.results <- resultWithErr{result: resultSubset}:
case <-r.closed:
// If selectResult called Close() already, make fetch goroutine exit.
return
Expand Down Expand Up @@ -321,7 +318,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
return &selectResult{
label: "dag",
resp: resp,
results: make(chan newResultWithErr, kvReq.Concurrency),
results: make(chan resultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
rowLen: len(fieldTypes),
fieldTypes: fieldTypes,
Expand All @@ -338,7 +335,7 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request) (SelectRe
result := &selectResult{
label: "analyze",
resp: resp,
results: make(chan newResultWithErr, kvReq.Concurrency),
results: make(chan resultWithErr, kvReq.Concurrency),
closed: make(chan struct{}),
}
return result, nil
Expand All @@ -348,83 +345,3 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request) (SelectRe
const (
codeInvalidResp = 1
)

// FieldTypeFromPBColumn creates a types.FieldType from tipb.ColumnInfo.
func FieldTypeFromPBColumn(col *tipb.ColumnInfo) *types.FieldType {
return &types.FieldType{
Tp: byte(col.GetTp()),
Flag: uint(col.Flag),
Flen: int(col.GetColumnLen()),
Decimal: int(col.GetDecimal()),
Elems: col.Elems,
Collate: mysql.Collations[uint8(col.GetCollation())],
}
}

func columnToProto(c *model.ColumnInfo) *tipb.ColumnInfo {
pc := &tipb.ColumnInfo{
ColumnId: c.ID,
Collation: collationToProto(c.FieldType.Collate),
ColumnLen: int32(c.FieldType.Flen),
Decimal: int32(c.FieldType.Decimal),
Flag: int32(c.Flag),
Elems: c.Elems,
}
pc.Tp = int32(c.FieldType.Tp)
return pc
}

// TODO: update it when more collate is supported.
func collationToProto(c string) int32 {
v := mysql.CollationNames[c]
if v == mysql.BinaryCollationID {
return int32(mysql.BinaryCollationID)
}
// We only support binary and utf8_bin collation.
// Setting other collations to utf8_bin for old data compatibility.
// For the data created when we didn't enforce utf8_bin collation in create table.
return int32(mysql.DefaultCollationID)
}

// ColumnsToProto converts a slice of model.ColumnInfo to a slice of tipb.ColumnInfo.
func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool) []*tipb.ColumnInfo {
cols := make([]*tipb.ColumnInfo, 0, len(columns))
for _, c := range columns {
col := columnToProto(c)
// TODO: Here `PkHandle`'s meaning is changed, we will change it to `IsHandle` when tikv's old select logic
// is abandoned.
if (pkIsHandle && mysql.HasPriKeyFlag(c.Flag)) || c.ID == model.ExtraHandleID {
col.PkHandle = true
} else {
col.PkHandle = false
}
cols = append(cols, col)
}
return cols
}

// IndexToProto converts a model.IndexInfo to a tipb.IndexInfo.
func IndexToProto(t *model.TableInfo, idx *model.IndexInfo) *tipb.IndexInfo {
pi := &tipb.IndexInfo{
TableId: t.ID,
IndexId: idx.ID,
Unique: idx.Unique,
}
cols := make([]*tipb.ColumnInfo, 0, len(idx.Columns)+1)
for _, c := range idx.Columns {
cols = append(cols, columnToProto(t.Columns[c.Offset]))
}
if t.PKIsHandle {
// Coprocessor needs to know PKHandle column info, so we need to append it.
for _, col := range t.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
colPB := columnToProto(col)
colPB.PkHandle = true
cols = append(cols, colPB)
break
}
}
}
pi.Columns = cols
return pi
}
117 changes: 2 additions & 115 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016 PingCAP, Inc.
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,124 +14,11 @@
package distsql

import (
"errors"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testleak"
"github.com/juju/errors"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)

func TestT(t *testing.T) {
CustomVerboseFlag = true
TestingT(t)
}

var _ = Suite(&testDistsqlSuite{})

type testDistsqlSuite struct{}

func (s *testDistsqlSuite) TestColumnToProto(c *C) {
defer testleak.AfterTest(c)()
// Make sure the Flag is set in tipb.ColumnInfo
tp := types.NewFieldType(mysql.TypeLong)
tp.Flag = 10
tp.Collate = "utf8_bin"
col := &model.ColumnInfo{
FieldType: *tp,
}
pc := columnToProto(col)
c.Assert(pc.GetFlag(), Equals, int32(10))
ntp := FieldTypeFromPBColumn(pc)
c.Assert(ntp, DeepEquals, tp)

cols := []*model.ColumnInfo{col, col}
pcs := ColumnsToProto(cols, false)
for _, v := range pcs {
c.Assert(v.GetFlag(), Equals, int32(10))
}
pcs = ColumnsToProto(cols, true)
for _, v := range pcs {
c.Assert(v.GetFlag(), Equals, int32(10))
}

// Make sure we only convert to supported collate.
tp = types.NewFieldType(mysql.TypeVarchar)
tp.Flag = 10
tp.Collate = "latin1_swedish_ci"
col = &model.ColumnInfo{
FieldType: *tp,
}
pc = columnToProto(col)
c.Assert(pc.Collation, Equals, int32(mysql.DefaultCollationID))
}

func (s *testDistsqlSuite) TestIndexToProto(c *C) {
defer testleak.AfterTest(c)()
cols := []*model.ColumnInfo{
{
ID: 1,
Name: model.NewCIStr("col1"),
Offset: 1,
},
{
ID: 2,
Name: model.NewCIStr("col2"),
Offset: 2,
},
}
cols[0].Flag |= mysql.PriKeyFlag

idxCols := []*model.IndexColumn{
{
Name: model.NewCIStr("col1"),
Offset: 1,
Length: 1,
},
{
Name: model.NewCIStr("col1"),
Offset: 1,
Length: 1,
},
}

idxInfos := []*model.IndexInfo{
{
ID: 1,
Name: model.NewCIStr("idx1"),
Table: model.NewCIStr("test"),
Columns: idxCols,
Unique: true,
Primary: true,
},
{
ID: 2,
Name: model.NewCIStr("idx2"),
Table: model.NewCIStr("test"),
Columns: idxCols,
Unique: true,
Primary: true,
},
}

tbInfo := model.TableInfo{
ID: 1,
Name: model.NewCIStr("test"),
Columns: cols,
Indices: idxInfos,
PKIsHandle: true,
}

pIdx := IndexToProto(&tbInfo, idxInfos[0])
c.Assert(pIdx.TableId, Equals, int64(1))
c.Assert(pIdx.IndexId, Equals, int64(1))
c.Assert(pIdx.Unique, Equals, true)
}

type mockResponse struct {
count int
}
Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,7 +1025,7 @@ func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plan.AnalyzeColumnsTa
BucketSize: maxBucketSize,
SampleSize: maxRegionSampleSize,
SketchSize: maxSketchSize,
ColumnsInfo: distsql.ColumnsToProto(cols, task.TableInfo.PKIsHandle),
ColumnsInfo: plan.ColumnsToProto(cols, task.TableInfo.PKIsHandle),
CmsketchDepth: &depth,
CmsketchWidth: &width,
}
Expand Down
74 changes: 71 additions & 3 deletions plan/plan_to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ package plan

import (
"github.com/juju/errors"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -93,7 +93,7 @@ func (p *PhysicalTableScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error)
columns := p.Columns
tsExec := &tipb.TableScan{
TableId: p.Table.ID,
Columns: distsql.ColumnsToProto(columns, p.Table.PKIsHandle),
Columns: ColumnsToProto(columns, p.Table.PKIsHandle),
Desc: p.Desc,
}
err := setPBColumnsDefaultValue(ctx, tsExec.Columns, p.Columns)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (p *PhysicalIndexScan) ToPB(ctx sessionctx.Context) (*tipb.Executor, error)
idxExec := &tipb.IndexScan{
TableId: p.Table.ID,
IndexId: p.Index.ID,
Columns: distsql.ColumnsToProto(columns, p.Table.PKIsHandle),
Columns: ColumnsToProto(columns, p.Table.PKIsHandle),
Desc: p.Desc,
}
unique := checkCoverIndex(p.Index, p.Ranges)
Expand Down Expand Up @@ -162,3 +162,71 @@ func setPBColumnsDefaultValue(ctx sessionctx.Context, pbColumns []*tipb.ColumnIn
}
return nil
}

// ColumnsToProto converts a slice of model.ColumnInfo to a slice of tipb.ColumnInfo.
func ColumnsToProto(columns []*model.ColumnInfo, pkIsHandle bool) []*tipb.ColumnInfo {
cols := make([]*tipb.ColumnInfo, 0, len(columns))
for _, c := range columns {
col := columnToProto(c)
// TODO: Here `PkHandle`'s meaning is changed, we will change it to `IsHandle` when tikv's old select logic
// is abandoned.
if (pkIsHandle && mysql.HasPriKeyFlag(c.Flag)) || c.ID == model.ExtraHandleID {
col.PkHandle = true
} else {
col.PkHandle = false
}
cols = append(cols, col)
}
return cols
}

// IndexToProto converts a model.IndexInfo to a tipb.IndexInfo.
func IndexToProto(t *model.TableInfo, idx *model.IndexInfo) *tipb.IndexInfo {
pi := &tipb.IndexInfo{
TableId: t.ID,
IndexId: idx.ID,
Unique: idx.Unique,
}
cols := make([]*tipb.ColumnInfo, 0, len(idx.Columns)+1)
for _, c := range idx.Columns {
cols = append(cols, columnToProto(t.Columns[c.Offset]))
}
if t.PKIsHandle {
// Coprocessor needs to know PKHandle column info, so we need to append it.
for _, col := range t.Columns {
if mysql.HasPriKeyFlag(col.Flag) {
colPB := columnToProto(col)
colPB.PkHandle = true
cols = append(cols, colPB)
break
}
}
}
pi.Columns = cols
return pi
}

func columnToProto(c *model.ColumnInfo) *tipb.ColumnInfo {
pc := &tipb.ColumnInfo{
ColumnId: c.ID,
Collation: collationToProto(c.FieldType.Collate),
ColumnLen: int32(c.FieldType.Flen),
Decimal: int32(c.FieldType.Decimal),
Flag: int32(c.Flag),
Elems: c.Elems,
}
pc.Tp = int32(c.FieldType.Tp)
return pc
}

// TODO: update it when more collate is supported.
func collationToProto(c string) int32 {
v := mysql.CollationNames[c]
if v == mysql.BinaryCollationID {
return int32(mysql.BinaryCollationID)
}
// We only support binary and utf8_bin collation.
// Setting other collations to utf8_bin for old data compatibility.
// For the data created when we didn't enforce utf8_bin collation in create table.
return int32(mysql.DefaultCollationID)
}
Loading

0 comments on commit 8268d4d

Please sign in to comment.