Commit 86269ac
[GLUTEN-6067][CH] Support CH backend with Spark 3.5 (Task 1 and Task 2)
Support CH backend with Spark 3.5:
1. Upgrade the Spark version to 3.5; compilation passes.
2. Upgrade the Delta version to 3.2; compilation passes.
3. CH backend UTs pass (currently only the MergeTree + Delta UTs).
4. Parquet native write passes.
5. Gluten UTs pass.
6. Support running the Gluten CH CI with Spark 3.5.
1 parent 142cf0f commit 86269ac

49 files changed: +8913 −279 lines


backends-clickhouse/pom.xml (+33)

@@ -215,6 +215,38 @@
       <version>8.5.9</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>${arrow.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${arrow.version}</version>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>

   <build>

@@ -272,6 +304,7 @@
     </includes>
     <excludes>
       <exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/*.scala</exclude>
+      <exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/commands/merge/*.scala</exclude>
       <exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/files/*.scala</exclude>
       <exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/DeltaLog.scala</exclude>
       <exclude>src/main/delta-${delta.binary.version}/org/apache/spark/sql/delta/Snapshot.scala</exclude>

backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala → backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala (+1 −15)

@@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FakeRowAdaptor, FileFormatWriter, WriteJobStatsTracker}
 import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeFileFormatWriter
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
-import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
 import org.apache.spark.util.{Clock, SerializableConfiguration}

 import org.apache.commons.lang3.exception.ExceptionUtils

@@ -139,20 +138,7 @@ class ClickhouseOptimisticTransaction(
     MergeTreeFileFormatWriter.write(
       sparkSession = spark,
       plan = newQueryPlan,
-      fileFormat = new DeltaMergeTreeFileFormat(
-        metadata,
-        tableV2.dataBaseName,
-        tableV2.tableName,
-        ClickhouseSnapshot.genSnapshotId(tableV2.snapshot),
-        tableV2.orderByKeyOption,
-        tableV2.lowCardKeyOption,
-        tableV2.minmaxIndexKeyOption,
-        tableV2.bfIndexKeyOption,
-        tableV2.setIndexKeyOption,
-        tableV2.primaryKeyOption,
-        tableV2.clickhouseTableConfigs,
-        tableV2.partitionColumns
-      ),
+      fileFormat = tableV2.getFileFormat(metadata),
       // formats.
       committer = committer,
       outputSpec = outputSpec,
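The write path no longer assembles a DeltaMergeTreeFileFormat inline; it asks the table for one. Judging from the argument list removed above and the getFileFormat context visible in the ClickHouseTableV2 diff further down, the helper is presumably a thin wrapper over fields the table already owns. A minimal sketch of that assumed shape (not the committed code):

import org.apache.spark.sql.delta.ClickhouseSnapshot
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat

// Sketch only: assumes getFileFormat forwards the table's existing option fields
// (names taken from the removed call site above) to the same constructor.
def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat =
  new DeltaMergeTreeFileFormat(
    meta,
    dataBaseName,                               // database/table resolved from the catalog entry
    tableName,
    ClickhouseSnapshot.genSnapshotId(snapshot), // id of the current Delta snapshot
    orderByKeyOption,                           // MergeTree layout options parsed from table properties
    lowCardKeyOption,
    minmaxIndexKeyOption,
    bfIndexKeyOption,
    setIndexKeyOption,
    primaryKeyOption,
    clickhouseTableConfigs,
    partitionColumns
  )

Keeping the construction behind the table object means the transaction code stays identical across Spark/Delta profiles while each profile's table class supplies its own format.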
@@ -14,10 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution.datasources.v2.clickhouse
+package org.apache.spark.sql.delta

-import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
-
-object DeltaLogAdapter {
-  def snapshot(deltaLog: DeltaLog): Snapshot = deltaLog.unsafeVolatileSnapshot
+object DeltaAdapter extends DeltaAdapterTrait {
+  override def snapshot(deltaLog: DeltaLog): Snapshot = deltaLog.snapshot
 }
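The standalone DeltaLogAdapter is replaced by a DeltaAdapter that implements a shared DeltaAdapterTrait, so each Spark/Delta profile can choose the snapshot accessor its Delta version provides (this hunk switches from unsafeVolatileSnapshot to deltaLog.snapshot). The trait itself is not part of this hunk; a minimal sketch of the assumed contract:

package org.apache.spark.sql.delta

// Sketch only: assumed shape of the shared contract referenced above.
trait DeltaAdapterTrait {
  // Each profile supplies the accessor appropriate to its Delta version.
  def snapshot(deltaLog: DeltaLog): Snapshot
}

// Hypothetical adapter for a newer profile that keeps the previous behaviour,
// i.e. reads the unsafe volatile snapshot as the old DeltaLogAdapter did.
object DeltaAdapterForNewerDelta extends DeltaAdapterTrait {
  override def snapshot(deltaLog: DeltaLog): Snapshot = deltaLog.unsafeVolatileSnapshot
}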

backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala → backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala (+11 −153)

@@ -17,11 +17,11 @@
 package org.apache.spark.sql.delta.catalog
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, DeltaTimeTravelSpec}
+import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog, DeltaTimeTravelSpec, Snapshot}
 import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table
 import org.apache.spark.sql.delta.sources.DeltaDataSource

@@ -54,8 +54,8 @@ class ClickHouseTableV2(
     tableIdentifier,
     timeTravelOpt,
     options,
-    cdcOptions) {
-  protected def getMetadata: Metadata = if (snapshot == null) Metadata() else snapshot.metadata
+    cdcOptions)
+  with ClickHouseTableV2Base {

   lazy val (rootPath, partitionFilters, timeTravelByPath) = {
     if (catalogTable.isDefined) {

@@ -93,126 +93,6 @@ class ClickHouseTableV2(
     new WriteIntoDeltaBuilder(deltaLog, info.options)
   }

-  lazy val dataBaseName = catalogTable
-    .map(_.identifier.database.getOrElse("default"))
-    .getOrElse("clickhouse")
-
-  lazy val tableName = catalogTable
-    .map(_.identifier.table)
-    .getOrElse(path.toUri.getPath)
-
-  lazy val bucketOption: Option[BucketSpec] = {
-    val tableProperties = properties()
-    if (tableProperties.containsKey("numBuckets")) {
-      val numBuckets = tableProperties.get("numBuckets").trim.toInt
-      val bucketColumnNames: Seq[String] =
-        tableProperties.get("bucketColumnNames").split(",").map(_.trim).toSeq
-      val sortColumnNames: Seq[String] = if (tableProperties.containsKey("orderByKey")) {
-        tableProperties.get("orderByKey").split(",").map(_.trim).toSeq
-      } else Seq.empty[String]
-      Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames))
-    } else {
-      None
-    }
-  }
-
-  lazy val lowCardKeyOption: Option[Seq[String]] = {
-    getCommaSeparatedColumns("lowCardKey")
-  }
-
-  lazy val minmaxIndexKeyOption: Option[Seq[String]] = {
-    getCommaSeparatedColumns("minmaxIndexKey")
-  }
-
-  lazy val bfIndexKeyOption: Option[Seq[String]] = {
-    getCommaSeparatedColumns("bloomfilterIndexKey")
-  }
-
-  lazy val setIndexKeyOption: Option[Seq[String]] = {
-    getCommaSeparatedColumns("setIndexKey")
-  }
-
-  private def getCommaSeparatedColumns(keyName: String) = {
-    val tableProperties = properties()
-    if (tableProperties.containsKey(keyName)) {
-      if (tableProperties.get(keyName).nonEmpty) {
-        val keys = tableProperties.get(keyName).split(",").map(_.trim).toSeq
-        keys.foreach(
-          s => {
-            if (s.contains(".")) {
-              throw new IllegalStateException(
-                s"$keyName $s can not contain '.' (not support nested column yet)")
-            }
-          })
-        Some(keys.map(s => s.toLowerCase()))
-      } else {
-        None
-      }
-    } else {
-      None
-    }
-  }
-
-  lazy val orderByKeyOption: Option[Seq[String]] = {
-    if (bucketOption.isDefined && bucketOption.get.sortColumnNames.nonEmpty) {
-      val orderByKes = bucketOption.get.sortColumnNames
-      val invalidKeys = orderByKes.intersect(partitionColumns)
-      if (invalidKeys.nonEmpty) {
-        throw new IllegalStateException(
-          s"partition cols $invalidKeys can not be in the order by keys.")
-      }
-      Some(orderByKes)
-    } else {
-      val tableProperties = properties()
-      if (tableProperties.containsKey("orderByKey")) {
-        if (tableProperties.get("orderByKey").nonEmpty) {
-          val orderByKes = tableProperties.get("orderByKey").split(",").map(_.trim).toSeq
-          val invalidKeys = orderByKes.intersect(partitionColumns)
-          if (invalidKeys.nonEmpty) {
-            throw new IllegalStateException(
-              s"partition cols $invalidKeys can not be in the order by keys.")
-          }
-          Some(orderByKes)
-        } else {
-          None
-        }
-      } else {
-        None
-      }
-    }
-  }
-
-  lazy val primaryKeyOption: Option[Seq[String]] = {
-    if (orderByKeyOption.isDefined) {
-      val tableProperties = properties()
-      if (tableProperties.containsKey("primaryKey")) {
-        if (tableProperties.get("primaryKey").nonEmpty) {
-          val primaryKeys = tableProperties.get("primaryKey").split(",").map(_.trim).toSeq
-          if (!orderByKeyOption.get.mkString(",").startsWith(primaryKeys.mkString(","))) {
-            throw new IllegalStateException(
-              s"Primary key $primaryKeys must be a prefix of the sorting key")
-          }
-          Some(primaryKeys)
-        } else {
-          None
-        }
-      } else {
-        None
-      }
-    } else {
-      None
-    }
-  }
-
-  lazy val partitionColumns = snapshot.metadata.partitionColumns
-
-  lazy val clickhouseTableConfigs: Map[String, String] = {
-    val tableProperties = properties()
-    val configs = scala.collection.mutable.Map[String, String]()
-    configs += ("storage_policy" -> tableProperties.getOrDefault("storage_policy", "default"))
-    configs.toMap
-  }
-
   def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
     new DeltaMergeTreeFileFormat(
       meta,

@@ -230,41 +110,19 @@ class ClickHouseTableV2(
     )
   }

-  def cacheThis(): Unit = {
-    deltaLog2Table.put(deltaLog, this)
-  }
+  override def deltaProperties(): ju.Map[String, String] = properties()

-  cacheThis()
+  override def deltaCatalog(): Option[CatalogTable] = catalogTable

-  def primaryKey(): String = primaryKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
-
-  def orderByKey(): String = orderByKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => "tuple()"
-  }
-
-  def lowCardKey(): String = lowCardKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
+  override def deltaPath(): Path = path

-  def minmaxIndexKey(): String = minmaxIndexKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
+  override def deltaSnapshot(): Snapshot = snapshot

-  def bfIndexKey(): String = bfIndexKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
+  def cacheThis(): Unit = {
+    deltaLog2Table.put(deltaLog, this)
   }

-  def setIndexKey(): String = setIndexKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
+  cacheThis()
 }

 @SuppressWarnings(Array("io.github.zhztheplayer.scalawarts.InheritFromCaseClass"))
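The MergeTree option parsing removed above (database/table names, bucket spec, orderBy/lowCard/minmax/bloom-filter/set index keys, primary key, partition columns, storage_policy config) is lifted out of the Spark-3.2-specific class into the shared ClickHouseTableV2Base mix-in, which is why the class now only overrides four accessors. The base trait is not shown in this hunk; a minimal sketch of the assumed contract, with one of the relocated properties expressed against the accessors:

import java.{util => ju}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.Snapshot

// Sketch only: the accessor signatures match the overrides above; the relocated
// members below are an assumption about how the shared trait would use them.
trait ClickHouseTableV2Base {
  def deltaProperties(): ju.Map[String, String]
  def deltaCatalog(): Option[CatalogTable]
  def deltaPath(): Path
  def deltaSnapshot(): Snapshot

  // e.g. the database/table names formerly computed directly on ClickHouseTableV2
  lazy val dataBaseName: String = deltaCatalog()
    .map(_.identifier.database.getOrElse("default"))
    .getOrElse("clickhouse")

  lazy val tableName: String = deltaCatalog()
    .map(_.identifier.table)
    .getOrElse(deltaPath().toUri.getPath)
}

Routing everything through deltaProperties()/deltaSnapshot() lets the Spark 3.2, 3.3, and 3.5 table classes share one copy of this logic even though their Delta base classes differ.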
