Commit 04ca20e

GlutenColumnarWriteFilesExec
injectWriteFilesTempPath with optional fileName; remove supportTransformWriteFiles
1 parent 4e1fede commit 04ca20e
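
The change in a nutshell: IteratorApi.injectWriteFilesTempPath now accepts an optional file name in addition to the staging directory. The Velox backend leaves fileName as None and lets the native writer pick the file name, the ClickHouse backend requires it, and the supportTransformWriteFiles flag goes away (the validator now checks only enableNativeWriteFiles). The snippet below is only a hedged, self-contained sketch of that contract; WriteHook, VeloxLikeHook, ClickHouseLikeHook and Demo are made-up stand-ins, not Gluten classes, and the real signatures are in the diffs that follow.

// Hypothetical stand-ins illustrating the new optional-fileName contract.
trait WriteHook {
  // Mirrors IteratorApi.injectWriteFilesTempPath(path, fileName = None).
  def injectWriteFilesTempPath(path: String, fileName: Option[String] = None): Unit
}

// Velox-style: only the staging directory matters; the native writer names the file.
object VeloxLikeHook extends WriteHook {
  override def injectWriteFilesTempPath(path: String, fileName: Option[String] = None): Unit =
    println(s"staging dir: $path (file name chosen by the native writer)")
}

// ClickHouse-style: the caller must supply the file name (CHIteratorApi below throws otherwise).
object ClickHouseLikeHook extends WriteHook {
  override def injectWriteFilesTempPath(path: String, fileName: Option[String] = None): Unit = {
    require(fileName.nonEmpty, "fileName should not be empty.")
    println(s"staging file: $path/${fileName.get}")
  }
}

object Demo extends App {
  VeloxLikeHook.injectWriteFilesTempPath("/tmp/gluten-stage")
  ClickHouseLikeHook.injectWriteFilesTempPath("/tmp/gluten-stage", Some("part-00000.parquet"))
}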

19 files changed, +239 -170 lines changed

backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java (+8)

@@ -28,6 +28,7 @@
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.internal.SQLConf;

+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -78,6 +79,13 @@ private static Map<String, String> getNativeBackendConf() {
        BackendsApiManager.getSettings().getBackendConfigPrefix(), SQLConf.get().getAllConfs());
  }

+  public static void injectWriteFilesTempPath(String path, String fileName) {
+    ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath(
+        CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
+        path.getBytes(StandardCharsets.UTF_8),
+        fileName.getBytes(StandardCharsets.UTF_8));
+  }
+
  // Used by WholeStageTransform to create the native computing pipeline and
  // return a columnar result iterator.
  public static BatchIterator createKernelWithBatchIterator(

backends-clickhouse/src/main/java/org/apache/gluten/vectorized/ExpressionEvaluatorJniWrapper.java (+9)

@@ -42,4 +42,13 @@ public static native long nativeCreateKernelWithIterator(
      GeneralInIterator[] batchItr,
      byte[] confArray,
      boolean materializeInput);
+
+  /**
+   * Set the temp path for writing files.
+   *
+   * @param allocatorId allocator id for current task attempt(or thread)
+   * @param path the temp path for writing files
+   */
+  public static native void injectWriteFilesTempPath(
+      long allocatorId, byte[] path, byte[] filename);
 }

backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala (-9)

@@ -25,13 +25,11 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

@@ -286,13 +284,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
      .getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT)
  }

-  override def supportWriteFilesExec(
-      format: FileFormat,
-      fields: Array[StructField],
-      bucketSpec: Option[BucketSpec],
-      options: Map[String, String]): ValidationResult =
-    ValidationResult.failed("CH backend is unsupported.")
-
  override def enableNativeWriteFiles(): Boolean = {
    GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
  }

backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala (+7)

@@ -290,6 +290,13 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
      None,
      createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators))
  }
+
+  override def injectWriteFilesTempPath(path: String, fileName: Option[String]): Unit = {
+    if (fileName.isEmpty) {
+      throw new IllegalArgumentException("fileName should not be empty.")
+    }
+    CHNativeExpressionEvaluator.injectWriteFilesTempPath(path, fileName.get)
+  }
 }

 class CollectMetricIterator(

backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala (+4 -1)

@@ -49,7 +49,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.delta.files.TahoeFileIndex
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
-import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, WriteJobDescription}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
@@ -679,6 +679,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
    throw new GlutenNotSupportException("ColumnarWriteFilesExec is not support in ch backend.")
  }

+  def createBackendWrite(description: WriteJobDescription): BackendWrite =
+    throw new UnsupportedOperationException("createBackendWrite is not supported in ch backend.")
+
  override def createColumnarArrowEvalPythonExec(
      udfs: Seq[PythonUDF],
      resultAttrs: Seq[Attribute],

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala (-2)

@@ -494,8 +494,6 @@ object VeloxBackendSettings extends BackendSettingsApi {

  override def staticPartitionWriteOnly(): Boolean = true

-  override def supportTransformWriteFiles: Boolean = true
-
  override def allowDecimalArithmetic: Boolean = true

  override def enableNativeWriteFiles(): Boolean = {

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala (+2 -2)

@@ -161,7 +161,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
    (paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, metadataColumns)
  }

-  override def injectWriteFilesTempPath(path: String): Unit = {
+  override def injectWriteFilesTempPath(path: String, fileName: Option[String] = None): Unit = {
    val transKernel = NativePlanEvaluator.create()
    transKernel.injectWriteFilesTempPath(path)
  }
@@ -171,7 +171,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
      inputPartition: BaseGlutenPartition,
      context: TaskContext,
      pipelineTime: SQLMetric,
-      updateInputMetrics: (InputMetricsWrapper) => Unit,
+      updateInputMetrics: InputMetricsWrapper => Unit,
      updateNativeMetrics: IMetrics => Unit,
      partitionIndex: Int,
      inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = {

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala (+6 -2)

@@ -50,7 +50,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode}
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -557,7 +557,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
      bucketSpec: Option[BucketSpec],
      options: Map[String, String],
      staticPartitions: TablePartitionSpec): SparkPlan = {
-    VeloxColumnarWriteFilesExec(
+    GlutenColumnarWriteFilesExec(
      child,
      fileFormat,
      partitionColumns,
@@ -566,6 +566,10 @@
      staticPartitions)
  }

+  override def createBackendWrite(description: WriteJobDescription): BackendWrite = {
+    VeloxBackendWrite(description)
+  }
+
  override def createColumnarArrowEvalPythonExec(
      udfs: Seq[PythonUDF],
      resultAttrs: Seq[Attribute],

New file (+146)

@@ -0,0 +1,146 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators

import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import scala.collection.mutable

// Velox write files metrics start
//
// Follows the code in velox `HiveDataSink::close()`
// The json can be as following:
// {
//   "inMemoryDataSizeInBytes":0,
//   "containsNumberedFileNames":true,
//   "onDiskDataSizeInBytes":307,
//   "fileWriteInfos":[
//     {
//       "fileSize":307,
//       "writeFileName":
//         "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
//       "targetFileName":
//         "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
//     }
//   ],
//   "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
//   "rowCount":1,
//   "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
//   "updateMode":"NEW",
//   "name":"part1=1/part2=1"
// }
case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)

case class VeloxWriteFilesMetrics(
    name: String,
    updateMode: String,
    writePath: String,
    targetPath: String,
    fileWriteInfos: Seq[VeloxWriteFilesInfo],
    rowCount: Long,
    inMemoryDataSizeInBytes: Long,
    onDiskDataSizeInBytes: Long,
    containsNumberedFileNames: Boolean)

// Velox write files metrics end

case class VeloxBackendWrite(description: WriteJobDescription) extends BackendWrite with Logging {

  override def injectTempPath(commitProtocol: SparkWriteFilesCommitProtocol): String = {
    val writePath = commitProtocol.newTaskAttemptTempPath()
    logDebug(s"Velox staging write path: $writePath")
    BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath)
    writePath
  }

  override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = {
    // Currently, the cb contains three columns: row, fragments, and context.
    // The first row in the row column contains the number of written numRows.
    // The fragments column contains detailed information about the file writes.
    val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
    assert(loadedCb.numCols() == 3)
    val numWrittenRows = loadedCb.column(0).getLong(0)

    var updatedPartitions = Set.empty[String]
    val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
    var numBytes = 0L
    val objectMapper = new ObjectMapper()
    objectMapper.registerModule(DefaultScalaModule)
    for (i <- 0 until loadedCb.numRows() - 1) {
      val fragments = loadedCb.column(1).getUTF8String(i + 1)
      val metrics = objectMapper
        .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
      logDebug(s"Velox write files metrics: $metrics")

      val fileWriteInfos = metrics.fileWriteInfos
      assert(fileWriteInfos.length == 1)
      val fileWriteInfo = fileWriteInfos.head
      numBytes += fileWriteInfo.fileSize
      val targetFileName = fileWriteInfo.targetFileName
      val outputPath = description.path

      // part1=1/part2=1
      val partitionFragment = metrics.name
      // Write a partitioned table
      if (partitionFragment != "") {
        updatedPartitions += partitionFragment
        val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName
        val customOutputPath = description.customPartitionLocations.get(
          PartitioningUtils.parsePathFragment(partitionFragment))
        if (customOutputPath.isDefined) {
          addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
        }
      }
    }

    val numFiles = loadedCb.numRows() - 1
    val partitionsInternalRows = updatedPartitions.map {
      part =>
        val parts = new Array[Any](1)
        parts(0) = part
        new GenericInternalRow(parts)
    }.toSeq
    val stats = BasicWriteTaskStats(
      partitions = partitionsInternalRows,
      numFiles = numFiles,
      numBytes = numBytes,
      numRows = numWrittenRows)
    val summary =
      ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))

    // Write an empty iterator
    if (numFiles == 0) {
      None
    } else {
      Some(
        WriteTaskResult(
          new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
          summary))
    }
  }
}
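
A side note on the fragments format: the metrics JSON that Velox's HiveDataSink emits (sampled in the comment above) deserializes straight into these case classes through Jackson's Scala module. Below is a minimal standalone sketch, assuming only jackson-databind and jackson-module-scala on the classpath; the case classes are copied from the commit so the snippet compiles outside Gluten, and ParseMetricsDemo is a made-up driver, not Gluten code.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)

case class VeloxWriteFilesMetrics(
    name: String,
    updateMode: String,
    writePath: String,
    targetPath: String,
    fileWriteInfos: Seq[VeloxWriteFilesInfo],
    rowCount: Long,
    inMemoryDataSizeInBytes: Long,
    onDiskDataSizeInBytes: Long,
    containsNumberedFileNames: Boolean)

object ParseMetricsDemo extends App {
  // The sample payload from the comment above, reformatted as plain JSON.
  val json =
    """{
      |  "inMemoryDataSizeInBytes": 0,
      |  "containsNumberedFileNames": true,
      |  "onDiskDataSizeInBytes": 307,
      |  "fileWriteInfos": [{
      |    "fileSize": 307,
      |    "writeFileName": "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
      |    "targetFileName": "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
      |  }],
      |  "writePath": "file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
      |  "rowCount": 1,
      |  "targetPath": "file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
      |  "updateMode": "NEW",
      |  "name": "part1=1/part2=1"
      |}""".stripMargin

  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  val metrics = mapper.readValue(json, classOf[VeloxWriteFilesMetrics])
  println(metrics.name)                         // part1=1/part2=1
  println(metrics.fileWriteInfos.head.fileSize) // 307
}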

backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala (+2 -2)

@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.util.QueryExecutionListener

 class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
-  private var _spark: SparkSession = null
+  private var _spark: SparkSession = _

  override protected def beforeAll(): Unit = {
    super.beforeAll()
@@ -86,7 +86,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
    override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
      if (!nativeUsed) {
        nativeUsed = if (isSparkVersionGE("3.4")) {
-          qe.executedPlan.find(_.isInstanceOf[VeloxColumnarWriteFilesExec]).isDefined
+          qe.executedPlan.find(_.isInstanceOf[GlutenColumnarWriteFilesExec]).isDefined
        } else {
          qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined
        }

gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala (-2)

@@ -129,8 +129,6 @@ trait BackendSettingsApi {

  def staticPartitionWriteOnly(): Boolean = false

-  def supportTransformWriteFiles: Boolean = false
-
  def requiredInputFilePaths(): Boolean = false

  // TODO: Move this to test settings as used in UT only.

gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala (+7 -2)

@@ -47,8 +47,13 @@ trait IteratorApi {
  /**
   * Inject the task attempt temporary path for native write files, this method should be called
   * before `genFirstStageIterator` or `genFinalStageIterator`
+   * @param path
+   *   is the temporary directory for native write pipeline
+   * @param fileName
+   *   is the file name for native write pipeline, if None, backend will generate it.
   */
-  def injectWriteFilesTempPath(path: String): Unit = throw new UnsupportedOperationException()
+  def injectWriteFilesTempPath(path: String, fileName: Option[String] = None): Unit =
+    throw new UnsupportedOperationException()

  /**
   * Generate Iterator[ColumnarBatch] for first stage. ("first" means it does not depend on other
@@ -58,7 +63,7 @@
      inputPartition: BaseGlutenPartition,
      context: TaskContext,
      pipelineTime: SQLMetric,
-      updateInputMetrics: (InputMetricsWrapper) => Unit,
+      updateInputMetrics: InputMetricsWrapper => Unit,
      updateNativeMetrics: IMetrics => Unit,
      partitionIndex: Int,
      inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()

gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala (+5 -2)

@@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
-import org.apache.spark.sql.execution.datasources.FileFormat
+import org.apache.spark.sql.execution.{BackendWrite, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.BuildSideRelation
@@ -390,6 +390,9 @@
      options: Map[String, String],
      staticPartitions: TablePartitionSpec): SparkPlan

+  /** Create BackendWrite */
+  def createBackendWrite(description: WriteJobDescription): BackendWrite
+
  /** Create ColumnarArrowEvalPythonExec, for velox backend */
  def createColumnarArrowEvalPythonExec(
      udfs: Seq[PythonUDF],
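
For context on how this factory is meant to be used: presumably the shared GlutenColumnarWriteFilesExec (not shown in this commit view) obtains a backend-specific BackendWrite via createBackendWrite, injects the staging path before the native pipeline starts, and folds the per-batch metrics into the task commit. The sketch below only models that flow; BackendWriteLike, WriteTaskSummary, LoggingBackendWrite and runWriteTask are hypothetical stand-ins, not Gluten APIs.

// Hypothetical simplified model of the BackendWrite contract.
final case class WriteTaskSummary(numFiles: Int, numBytes: Long)

trait BackendWriteLike {
  // Mirrors BackendWrite.injectTempPath: called once per task, before the native write runs.
  def injectTempPath(): String
  // Mirrors BackendWrite.collectNativeWriteFilesMetrics: one call per native output batch,
  // returning None when the batch reports no written files.
  def collectMetrics(fileSizesInBatch: Seq[Long]): Option[WriteTaskSummary]
}

final class LoggingBackendWrite(outputPath: String) extends BackendWriteLike {
  override def injectTempPath(): String = {
    val staging = s"$outputPath/.staging-${java.util.UUID.randomUUID()}"
    println(s"native writer will stage files under $staging")
    staging
  }
  override def collectMetrics(fileSizesInBatch: Seq[Long]): Option[WriteTaskSummary] =
    if (fileSizesInBatch.isEmpty) None
    else Some(WriteTaskSummary(fileSizesInBatch.size, fileSizesInBatch.sum))
}

object WriteFlowSketch {
  // What a generic write exec could do with whatever createBackendWrite hands back.
  def runWriteTask(write: BackendWriteLike, batches: Seq[Seq[Long]]): WriteTaskSummary = {
    write.injectTempPath() // must happen before the native pipeline is started
    batches.flatMap(write.collectMetrics).foldLeft(WriteTaskSummary(0, 0L)) {
      (acc, s) => WriteTaskSummary(acc.numFiles + s.numFiles, acc.numBytes + s.numBytes)
    }
  }
}

// Example: two batches, each reporting a single 307-byte file.
// WriteFlowSketch.runWriteTask(new LoggingBackendWrite("/tmp/out"), Seq(Seq(307L), Seq(307L)))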

gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala (+1 -2)

@@ -141,8 +141,7 @@ object Validators {
    override def validate(plan: SparkPlan): Validator.OutCome = plan match {
      case p: ShuffleExchangeExec if !settings.supportColumnarShuffleExec() => fail(p)
      case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() => fail(p)
-      case p: WriteFilesExec
-        if !(settings.enableNativeWriteFiles() && settings.supportTransformWriteFiles) =>
+      case p: WriteFilesExec if !settings.enableNativeWriteFiles() =>
        fail(p)
      case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg =>
        fail(p)
