@@ -0,0 +1,9 @@
set hive.auto.convert.join=true;
set hive.convert.join.bucket.mapjoin.tez=true;

CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG;
INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2');

EXPLAIN
SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
@@ -0,0 +1,65 @@
PREHOOK: query: CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@tbl
POSTHOOK: query: CREATE TABLE tbl (foid string, part string, id string) PARTITIONED BY SPEC(bucket(10, id), bucket(10, part)) STORED BY ICEBERG
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@tbl
PREHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@tbl
POSTHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl
PREHOOK: query: EXPLAIN
SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: EXPLAIN
SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl
POSTHOOK: Output: hdfs://### HDFS PATH ###
Plan optimized by CBO.

Vertex dependency in root stage
Map 1 <- Map 2 (CUSTOM_EDGE)

Stage-0
Fetch Operator
limit:-1
Stage-1
Map 1 vectorized
File Output Operator [FS_53]
Map Join Operator [MAPJOIN_52] (rows=2 width=530)
BucketMapJoin:true,Conds:SEL_51._col1, _col2=RS_49._col1, _col2(Inner),Output:["_col0","_col1","_col2","_col3","_col4","_col5"]
<-Map 2 [CUSTOM_EDGE] vectorized
MULTICAST [RS_49]
PartitionCols:_col2, _col1
[Contributor comment] I verified that this list is inverted on the master branch, and the result would be empty.

PartitionCols:_col1, _col2

Select Operator [SEL_48] (rows=2 width=265)
Output:["_col0","_col1","_col2"]
Filter Operator [FIL_47] (rows=2 width=265)
predicate:(id is not null and part is not null)
TableScan [TS_3] (rows=2 width=265)
default@tbl,tbl2,Tbl:COMPLETE,Col:COMPLETE,Output:["foid","part","id"]
<-Select Operator [SEL_51] (rows=2 width=265)
Output:["_col0","_col1","_col2"]
Filter Operator [FIL_50] (rows=2 width=265)
predicate:(id is not null and part is not null)
TableScan [TS_0] (rows=2 width=265)
default@tbl,tbl,Tbl:COMPLETE,Col:COMPLETE,Grouping Num Buckets:100,Grouping Partition Columns:["id","part"],Output:["foid","part","id"]

PREHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl
PREHOOK: Output: hdfs://### HDFS PATH ###
POSTHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl
POSTHOOK: Output: hdfs://### HDFS PATH ###
1234 PART_123 1 1234 PART_123 1
1235 PART_124 2 1235 PART_124 2
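
The contributor comment above captures why the column order matters: the small-table side of the CUSTOM_EDGE multicasts each row to a bucketized map task based on its ReduceSink partition columns, so if those columns are hashed in an order that does not line up with how the big table's Iceberg buckets were produced, every broadcast row lands on a task whose big-table split cannot contain its match and the join comes back empty. The minimal sketch below illustrates the effect with a toy, order-sensitive hash; it is not Hive's or Iceberg's actual bucketing code (Iceberg uses Murmur3-based bucket transforms), and the class and method names are invented for illustration.

import java.util.List;

// Toy, order-sensitive bucket function: List.hashCode() depends on element order.
// This is NOT Hive's or Iceberg's real hashing; it only demonstrates that hashing the
// same key fields in two different orders generally yields two different bucket ids.
public class BucketOrderIllustration {

  static int bucket(List<String> keyInThisOrder, int numBuckets) {
    return Math.floorMod(keyInThisOrder.hashCode(), numBuckets);
  }

  public static void main(String[] args) {
    int numBuckets = 10;
    // Row ('1234', 'PART_123', '1') from the test above; the join keys are part and id.
    int bigTableSide = bucket(List.of("1", "PART_123"), numBuckets);    // hashed as (id, part)
    int smallTableSide = bucket(List.of("PART_123", "1"), numBuckets);  // hashed as (part, id)
    System.out.println("big-table bucket   = " + bigTableSide);
    System.out.println("small-table bucket = " + smallTableSide);
    // The two bucket ids generally differ, so the broadcast row never reaches the task
    // that scans the bucket holding its matching big-table row.
  }
}
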
@@ -656,32 +656,39 @@ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext
     // on small table(s).
     ReduceSinkOperator bigTableRS = (ReduceSinkOperator)joinOp.getParentOperators().get(bigTablePosition);
     OpTraits opTraits = bigTableRS.getOpTraits();
-    List<List<String>> listBucketCols = opTraits.getBucketColNames();
+    // It is guaranteed there is only 1 list within bigTableRS.getOpTraits().getBucketColNames().
+    List<String> listBucketCols = opTraits.getBucketColNames().get(0);
     List<ExprNodeDesc> bigTablePartitionCols = bigTableRS.getConf().getPartitionCols();
-    boolean updatePartitionCols = false;
+    boolean updatePartitionCols = listBucketCols.size() != bigTablePartitionCols.size();
     List<Integer> positions = new ArrayList<>();
 
-    CustomBucketFunction bucketFunction = opTraits.getCustomBucketFunctions().get(0);
-    if (listBucketCols.get(0).size() != bigTablePartitionCols.size()) {
-      updatePartitionCols = true;
-      // Prepare updated partition columns for small table(s).
-      // Get the positions of bucketed columns
-
-      int bigTableExprPos = 0;
-      Map<String, ExprNodeDesc> colExprMap = bigTableRS.getColumnExprMap();
-      final boolean[] retainedColumns = new boolean[listBucketCols.get(0).size()];
-      for (ExprNodeDesc bigTableExpr : bigTablePartitionCols) {
-        // It is guaranteed there is only 1 list within listBucketCols.
-        for (int i = 0; i < listBucketCols.get(0).size(); i++) {
-          final String colName = listBucketCols.get(0).get(i);
-          if (colExprMap.get(colName).isSame(bigTableExpr)) {
-            positions.add(bigTableExprPos);
-            retainedColumns[i] = true;
-          }
+    // Compare the partition columns and the bucket columns of bigTableRS.
+    Map<String, ExprNodeDesc> colExprMap = bigTableRS.getColumnExprMap();
+    final boolean[] retainedColumns = new boolean[listBucketCols.size()];
+    for (int bucketColIdx = 0; bucketColIdx < listBucketCols.size(); bucketColIdx++) {
+      for (int bigTablePartIdx = 0; bigTablePartIdx < bigTablePartitionCols.size(); bigTablePartIdx++) {
+        ExprNodeDesc bigTablePartExpr = bigTablePartitionCols.get(bigTablePartIdx);
+        ExprNodeDesc bucketColExpr = colExprMap.get(listBucketCols.get(bucketColIdx));
+        if (bigTablePartExpr.isSame(bucketColExpr)) {
+          positions.add(bigTablePartIdx);
+          retainedColumns[bucketColIdx] = true;
+          // If the positions of the partition column and the bucket column are not the same,
+          // then we need to update the position of the partition column in small tables.
+          updatePartitionCols = updatePartitionCols || bucketColIdx != bigTablePartIdx;
+          break;
         }
-        bigTableExprPos = bigTableExprPos + 1;
       }
     }
 
+    // If the number of partition columns is less than the number of bucket columns,
+    // then we cannot properly distribute small tables onto bucketized map tasks.
+    // Bail out.
+    if (positions.size() < listBucketCols.size()) {
+      return false;
+    }
+
+    CustomBucketFunction bucketFunction = opTraits.getCustomBucketFunctions().get(0);
     if (updatePartitionCols) {
       Preconditions.checkState(opTraits.getCustomBucketFunctions().size() == 1);
       if (opTraits.getCustomBucketFunctions().get(0) != null) {
         final Optional<CustomBucketFunction> selected =
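
To see what the rewritten loop in convertJoinBucketMapJoin computes, here is a simplified, standalone sketch of the same matching logic with plain strings standing in for ExprNodeDesc and the operator plumbing stripped away; the class, method, and field names are invented for illustration and are not Hive API. For each bucket column of the big table it records the index of the matching ReduceSink partition column, flags when the indexes disagree (so the small-table partition columns must be realigned), and reports the bail-out case in which some bucket column has no matching partition column at all.

import java.util.ArrayList;
import java.util.List;

// Simplified, self-contained sketch of the matching logic added in the hunk above.
// Plain strings stand in for ExprNodeDesc; all names here are invented for illustration.
public class BucketColumnMatching {

  static final class Result {
    final List<Integer> positions = new ArrayList<>(); // partition-column index per bucket column
    boolean updatePartitionCols;                       // small tables must be repartitioned
    boolean convertible = true;                        // false => reject the bucket map join
    @Override
    public String toString() {
      return "positions=" + positions + ", updatePartitionCols=" + updatePartitionCols
          + ", convertible=" + convertible;
    }
  }

  static Result match(List<String> bucketCols, List<String> partitionCols) {
    Result r = new Result();
    r.updatePartitionCols = bucketCols.size() != partitionCols.size();
    for (int bucketColIdx = 0; bucketColIdx < bucketCols.size(); bucketColIdx++) {
      for (int partIdx = 0; partIdx < partitionCols.size(); partIdx++) {
        if (bucketCols.get(bucketColIdx).equals(partitionCols.get(partIdx))) {
          r.positions.add(partIdx);
          // A match at a different index means the join keys are emitted in a different
          // order than the bucket columns, so the small-table side must be realigned.
          r.updatePartitionCols |= bucketColIdx != partIdx;
          break;
        }
      }
    }
    // Some bucket column has no matching partition column: the small tables cannot be
    // distributed onto the bucketized map tasks, so the conversion has to be rejected.
    if (r.positions.size() < bucketCols.size()) {
      r.convertible = false;
    }
    return r;
  }

  public static void main(String[] args) {
    // Bucket columns (id, part) matched against join keys emitted as (part, id):
    // still convertible, but updatePartitionCols is set so the small-table ReduceSinks
    // are realigned with the big table's bucketing.
    System.out.println(match(List.of("id", "part"), List.of("part", "id")));
    // Joining only on id leaves the part bucket column uncovered: bail out.
    System.out.println(match(List.of("id", "part"), List.of("id")));
  }
}
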
9 changes: 9 additions & 0 deletions ql/src/test/queries/clientpositive/bucketmapjoin14.q
@@ -0,0 +1,9 @@
set hive.auto.convert.join=true;
set hive.convert.join.bucket.mapjoin.tez=true;

CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS;
INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2');

EXPLAIN
SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part;
112 changes: 112 additions & 0 deletions ql/src/test/results/clientpositive/llap/bucketmapjoin14.q.out
@@ -0,0 +1,112 @@
PREHOOK: query: CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@tbl
POSTHOOK: query: CREATE TABLE tbl (foid string, part string, id string) CLUSTERED BY (id, part) INTO 64 BUCKETS
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@tbl
PREHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@tbl
POSTHOOK: query: INSERT INTO tbl VALUES ('1234', 'PART_123', '1'), ('1235', 'PART_124', '2')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl
POSTHOOK: Lineage: tbl.foid SCRIPT []
POSTHOOK: Lineage: tbl.id SCRIPT []
POSTHOOK: Lineage: tbl.part SCRIPT []
PREHOOK: query: EXPLAIN
SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl
#### A masked pattern was here ####
POSTHOOK: query: EXPLAIN
SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl
#### A masked pattern was here ####
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Tez
#### A masked pattern was here ####
Edges:
Map 1 <- Map 2 (CUSTOM_EDGE)
#### A masked pattern was here ####
Vertices:
Map 1
Map Operator Tree:
TableScan
alias: tbl
filterExpr: (id is not null and part is not null) (type: boolean)
Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: (id is not null and part is not null) (type: boolean)
Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: foid (type: string), part (type: string), id (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
Map Join Operator
condition map:
Inner Join 0 to 1
keys:
0 _col1 (type: string), _col2 (type: string)
1 _col1 (type: string), _col2 (type: string)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
input vertices:
1 Map 2
Statistics: Num rows: 2 Data size: 1060 Basic stats: COMPLETE Column stats: COMPLETE
File Output Operator
compressed: false
Statistics: Num rows: 2 Data size: 1060 Basic stats: COMPLETE Column stats: COMPLETE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Execution mode: vectorized, llap
LLAP IO: all inputs
Map 2
Map Operator Tree:
TableScan
alias: tbl2
filterExpr: (id is not null and part is not null) (type: boolean)
Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
Filter Operator
predicate: (id is not null and part is not null) (type: boolean)
Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: foid (type: string), part (type: string), id (type: string)
outputColumnNames: _col0, _col1, _col2
Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
Reduce Output Operator
key expressions: _col1 (type: string), _col2 (type: string)
null sort order: zz
sort order: ++
Map-reduce partition columns: _col2 (type: string), _col1 (type: string)
[Contributor comment] I verified that this list is inverted on the master branch, and the result would be empty.

Map-reduce partition columns: _col1 (type: string), _col2 (type: string)

Statistics: Num rows: 2 Data size: 530 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: string)
Execution mode: vectorized, llap
LLAP IO: all inputs

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

PREHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl
#### A masked pattern was here ####
POSTHOOK: query: SELECT * FROM tbl JOIN tbl tbl2 ON tbl.id = tbl2.id AND tbl.part = tbl2.part
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl
#### A masked pattern was here ####
1234 PART_123 1 1234 PART_123 1
1235 PART_124 2 1235 PART_124 2