@@ -17,20 +17,19 @@
 package com._4paradigm.openmldb.batch.nodes

 import java.util
-
 import com._4paradigm.hybridse.vm.PhysicalWindowAggrerationNode
-import com._4paradigm.openmldb.batch.utils.{AutoDestructibleIterator, HybridseUtil, PhysicalNodeUtil, SkewUtils, SparkUtil}
+import com._4paradigm.openmldb.batch.utils.{AutoDestructibleIterator, HybridseUtil,
+  PhysicalNodeUtil, SkewUtils, SparkUtil}
 import com._4paradigm.openmldb.batch.window.WindowAggPlanUtil.WindowAggConfig
 import com._4paradigm.openmldb.batch.window.{WindowAggPlanUtil, WindowComputer}
-import com._4paradigm.openmldb.batch.{PlanContext, OpenmldbBatchConfig, SparkInstance}
+import com._4paradigm.openmldb.batch.{OpenmldbBatchConfig, PlanContext, SparkInstance}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType, TimestampType}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.util.SerializableConfiguration
 import org.slf4j.LoggerFactory

-import scala.collection.JavaConverters._
-
+import scala.collection.JavaConverters.{bufferAsJavaListConverter, asScalaBufferConverter}

 /** The planner which implements window agg physical node.
  *
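For context on the import change above: replacing the JavaConverters wildcard with two named converters makes explicit which Java/Scala bridges the file actually uses. A minimal standalone sketch of what each converter provides (values are illustrative):

import scala.collection.JavaConverters.{asScalaBufferConverter, bufferAsJavaListConverter}
import scala.collection.mutable.ArrayBuffer

object ConvertersDemo extends App {
  // bufferAsJavaListConverter: Scala mutable.Buffer -> java.util.List
  val javaList: java.util.List[String] = ArrayBuffer("a", "b").asJava
  // asScalaBufferConverter: java.util.List -> Scala mutable.Buffer
  val scalaBuffer: scala.collection.mutable.Buffer[String] = javaList.asScala
  println(scalaBuffer.mkString(","))
}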
@@ -68,7 +67,11 @@ object WindowAggPlan {
     val dfWithIndex = inputTable.getDfConsideringIndex(ctx, physicalNode.GetNodeId())

     // Do union if physical node has union flag
-    val unionTable = if (isWindowWithUnion) WindowAggPlanUtil.windowUnionTables(ctx, physicalNode, dfWithIndex) else dfWithIndex
+    val unionTable = if (isWindowWithUnion) {
+      WindowAggPlanUtil.windowUnionTables(ctx, physicalNode, dfWithIndex)
+    } else {
+      dfWithIndex
+    }

     // Do groupby and sort with window skew optimization or not
     val repartitionDf = if (isWindowSkewOptimization) {
@@ -99,7 +102,7 @@ object WindowAggPlan {
         val computer = WindowAggPlanUtil.createComputer(partitionIndex, hadoopConf, sparkFeConfig, windowAggConfig)
         unsafeWindowAggIter(computer, iter, sparkFeConfig, windowAggConfig, outputSchema)
       }
-      SparkUtil.RddInternalRowToDf(ctx.getSparkSession, outputInternalRowRdd, outputSchema)
+      SparkUtil.rddInternalRowToDf(ctx.getSparkSession, outputInternalRowRdd, outputSchema)

     } else { // isUnsafeRowOptimization is false
       val outputRdd = if (isWindowWithUnion) {
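The rename from RddInternalRowToDf to rddInternalRowToDf follows Scala's lower-camel-case method convention. The helper's actual body lives in OpenMLDB's SparkUtil; as a rough sketch of the same round trip using only public Spark APIs (an RDD of rows plus an explicit schema, rather than InternalRow):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object RddToDfDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
  // An explicit schema, analogous to the outputSchema passed above
  val schema = StructType(Seq(StructField("id", LongType, nullable = false)))
  val rdd = spark.sparkContext.parallelize(Seq(Row(1L), Row(2L)))
  // createDataFrame pairs the untyped rows with the schema
  val df = spark.createDataFrame(rdd, schema)
  df.show()
  spark.stop()
}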
@@ -170,7 +173,8 @@ object WindowAggPlan {
     val distributionTableName = "_DISTRIBUTION_TABLE_" + uniqueNamePostfix
     val countColumnName = "_COUNT_" + uniqueNamePostfix

-    val distributionSqlText = SkewUtils.genPercentileSql(inputTableName, quantile.intValue(), repartitionColNames, orderbyColName, countColumnName)
+    val distributionSqlText = SkewUtils
+      .genPercentileSql(inputTableName, quantile.intValue(), repartitionColNames, orderbyColName, countColumnName)
     logger.info(s"Generate distribution sql: $distributionSqlText")
     val distributionDf = ctx.sparksql(distributionSqlText)
     distributionDf.createOrReplaceTempView(distributionTableName)
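The temp-view registration above is what lets the generated SQL in the next hunk join against the distribution table by name. A minimal sketch of that round trip with plain Spark (the table and column names here are illustrative, not the generated ones):

import org.apache.spark.sql.SparkSession

object TempViewDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
  val distributionDf = spark.sql("SELECT 'key1' AS k, 100L AS cnt")
  // Register under a name so later SQL text can reference it
  distributionDf.createOrReplaceTempView("distribution_table_demo")
  spark.sql("SELECT k, cnt FROM distribution_table_demo").show()
  spark.stop()
}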
@@ -179,7 +183,8 @@ object WindowAggPlan {
     val keysMap = new util.HashMap[String, String]()
     keyScala.foreach(e => keysMap.put(e, e))

-    val addColumnsSqlText = SkewUtils.genPercentileTagSql(inputTableName, distributionTableName, quantile.intValue(), schemas, keysMap, orderbyColName,
+    val addColumnsSqlText = SkewUtils.genPercentileTagSql(inputTableName, distributionTableName,
+      quantile.intValue(), schemas, keysMap, orderbyColName,
       partColumnName, expandColumnName, countColumnName, ctx.getConf.skewCnt.longValue())
     logger.info(s"Generate add columns sql: $addColumnsSqlText")
     ctx.sparksql(addColumnsSqlText)
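For readers unfamiliar with the skew optimization: genPercentileSql and genPercentileTagSql build SQL that first computes per-key quantile boundaries of the order column, then tags each row with the bucket it falls into so hot keys can be spread across partitions. The generated SQL is OpenMLDB-specific, but the first step is conceptually similar to this hedged Spark SQL sketch (table and column names are hypothetical):

import org.apache.spark.sql.SparkSession

object PercentileDemo extends App {
  val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
  import spark.implicits._
  Seq(("key1", 1L), ("key1", 5L), ("key2", 3L)).toDF("k", "ts")
    .createOrReplaceTempView("input_table")
  // Per-key quantile boundaries of the order column; a tagging step can then
  // assign each row the bucket it falls into so hot keys get split.
  spark.sql(
    """SELECT k,
      |       percentile_approx(ts, array(0.25, 0.5, 0.75)) AS boundaries,
      |       count(*) AS cnt
      |FROM input_table
      |GROUP BY k""".stripMargin).show()
  spark.stop()
}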
@@ -189,14 +194,16 @@ object WindowAggPlan {

     val distributionMap = Map(distributionCollect.map(p => (p.get(0), p.get(1))): _*)

-    val outputSchema = inputDf.schema.add("_PART_", IntegerType, false).add("_EXPAND_", IntegerType, false)
+    val outputSchema = inputDf.schema.add("_PART_", IntegerType, false)
+      .add("_EXPAND_", IntegerType, false)

     val outputRdd = inputDf.rdd.map(row => {
       // Combine the repartition keys to one string which is equal to the first column of skew config
       val combineString = repartitionColIndexes.map(index => row.get(index)).mkString("_")
       // TODO: Support for more datatype of orderby columns
       val condition = if (orderbyColType.equals(TimestampType)) {
-        row.get(orderbyColIndex).asInstanceOf[java.sql.Timestamp].compareTo(distributionMap(combineString).asInstanceOf[java.sql.Timestamp])
+        row.get(orderbyColIndex).asInstanceOf[java.sql.Timestamp].compareTo(distributionMap(combineString)
+          .asInstanceOf[java.sql.Timestamp])
       } else if (orderbyColType.equals(LongType)) {
         row.get(orderbyColIndex).asInstanceOf[Long].compareTo(distributionMap(combineString).asInstanceOf[Long])
       } else {
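The TODO above notes that only TimestampType and LongType order columns are handled so far. A standalone sketch of the same type-dispatched comparison, using pattern matching instead of asInstanceOf casts (values are illustrative):

object CompareDemo extends App {
  // Compare a row value against a per-key boundary; mirrors the if/else
  // chain above but dispatches on the runtime type of both operands.
  def compareToBoundary(value: Any, boundary: Any): Int = (value, boundary) match {
    case (v: java.sql.Timestamp, b: java.sql.Timestamp) => v.compareTo(b)
    case (v: Long, b: Long) => v.compareTo(b)
    case _ => throw new UnsupportedOperationException("unsupported orderby column type")
  }

  println(compareToBoundary(5L, 3L)) // prints 1
}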