[GLUTEN-6067][CH] Support Spark3.5 with Scala2.13 for CH backend (#6311)
Support Spark 3.5 with Scala 2.13 for the CH backend:
1. Add a profile for Scala 2.13.
2. Add `toSeq` for all the ArrayBuffer usages.
zzcclp authored Jul 3, 2024
1 parent 403be39 commit 7b0caf4
Showing 39 changed files with 223 additions and 67 deletions.
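
Most of the mechanical churn below comes from Scala 2.13 redefining the default `Seq` alias as `scala.collection.immutable.Seq`, so a mutable `ArrayBuffer` no longer passes where a `Seq` is expected. A minimal sketch of the incompatibility (object and method names are illustrative, not from this commit):

```scala
import scala.collection.mutable.ArrayBuffer

object SeqCompatSketch {
  // `Seq` here is scala.collection.Seq on 2.12 but immutable.Seq on 2.13.
  def describe(xs: Seq[Int]): String = xs.mkString(",")

  def main(args: Array[String]): Unit = {
    val buf = ArrayBuffer(1, 2, 3)
    // `describe(buf)` compiles on 2.12 only; the explicit copy works on both.
    println(describe(buf.toSeq)) // 1,2,3
  }
}
```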
6 changes: 3 additions & 3 deletions backends-clickhouse/pom.xml
@@ -100,7 +100,7 @@
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
-      <version>1.13.5</version>
+      <version>1.17.0</version>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -126,13 +126,13 @@
     </dependency>
     <dependency>
       <groupId>org.scalatestplus</groupId>
-      <artifactId>scalatestplus-mockito_2.12</artifactId>
+      <artifactId>scalatestplus-mockito_${scala.binary.version}</artifactId>
       <version>1.0.0-M2</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.scalatestplus</groupId>
-      <artifactId>scalatestplus-scalacheck_2.12</artifactId>
+      <artifactId>scalatestplus-scalacheck_${scala.binary.version}</artifactId>
       <version>3.1.0.0-RC2</version>
       <scope>test</scope>
     </dependency>
@@ -50,7 +50,7 @@ class ClickhouseOptimisticTransaction(
   def this(
       deltaLog: DeltaLog,
       catalogTable: Option[CatalogTable],
-      snapshotOpt: Option[Snapshot] = None) {
+      snapshotOpt: Option[Snapshot] = None) = {
     this(
       deltaLog,
       catalogTable,
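
The `) = {` change replaces Scala's procedure syntax, which 2.13 deprecates (and rejects under strict-warning builds). A standalone sketch of the same fix, using a hypothetical `Widget` class:

```scala
class Widget(val id: Int, val name: String) {
  // 2.12-only procedure syntax for an auxiliary constructor:
  //   def this(id: Int) { this(id, "unnamed") }
  // 2.13-friendly form: an explicit `=` before the body.
  def this(id: Int) = {
    this(id, "unnamed")
  }
}

object WidgetDemo {
  def main(args: Array[String]): Unit =
    println(new Widget(1).name) // unnamed
}
```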
@@ -33,7 +33,6 @@ import org.apache.spark.sql.delta.stats.StatisticsCollection
 import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
 import org.apache.spark.sql.delta.util.FileNames
 import org.apache.spark.sql.delta.util.StateCache
-import org.apache.spark.sql.util.ScalaExtensions._
 import org.apache.hadoop.fs.{FileStatus, Path}

 import org.apache.spark.sql._
@@ -126,7 +125,27 @@ class Snapshot(
    * This potentially triggers an IO operation to read the inCommitTimestamp.
    * This is a lazy val, so repeated calls will not trigger multiple IO operations.
    */
-  protected lazy val getInCommitTimestampOpt: Option[Long] =
+  protected lazy val getInCommitTimestampOpt: Option[Long] = {
+    // --- modified start
+    // This implicit is for Scala 2.12; it is copied from the Scala 2.13 Option companion.
+    implicit class OptionExtCompanion(opt: Option.type) {
+      /**
+       * When a given condition is true, evaluates the `a` argument and returns Some(a).
+       * When the condition is false, `a` is not evaluated and None is returned.
+       */
+      def when[A](cond: Boolean)(a: => A): Option[A] = if (cond) Some(a) else None
+
+      /**
+       * When a given condition is false, evaluates the `a` argument and returns Some(a).
+       * When the condition is true, `a` is not evaluated and None is returned.
+       */
+      def whenNot[A](cond: Boolean)(a: => A): Option[A] = if (!cond) Some(a) else None
+
+      /** Sum up all the `options`, substituting `default` for each `None`. */
+      def sum[N: Numeric](default: N)(options: Option[N]*): N =
+        options.map(_.getOrElse(default)).sum
+    }
+    // --- modified end
     Option.when(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(metadata)) {
       _reconstructedProtocolMetadataAndICT.inCommitTimestamp
         .getOrElse {
@@ -158,6 +177,7 @@
         }
       }
     }
+  }


   private[delta] lazy val nonFileActions: Seq[Action] = {
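
For reference, a small sketch of how the polyfilled `Option.when` behaves; the helper exists natively from Scala 2.13, and the demo object name is hypothetical:

```scala
object OptionWhenSketch {
  def main(args: Array[String]): Unit = {
    def expensive(): Int = { println("evaluated"); 42 }

    val a = Option.when(true)(expensive())  // Some(42); body runs once
    val b = Option.when(false)(expensive()) // None; body never evaluated
    println((a, b))                         // (Some(42),None)
  }
}
```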
@@ -712,7 +712,7 @@ trait VacuumCommandImpl extends DeltaCommand {
           // This is never going to be a path relative to `basePath` for DVs.
           None
         }
-      case None => None
+      case _ => None
     }
   }
 }
@@ -55,7 +55,7 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
       setIndexKeyOption: Option[Seq[String]],
       primaryKeyOption: Option[Seq[String]],
       clickhouseTableConfigs: Map[String, String],
-      partitionColumns: Seq[String]) {
+      partitionColumns: Seq[String]) = {
     this(protocol, metadata)
     this.database = database
     this.tableName = tableName
@@ -58,7 +58,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
       }
       dataSchema += newField
     }
-    StructType(dataSchema)
+    StructType(dataSchema.toSeq)
   }

   private def createNativeIterator(
@@ -114,7 +114,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
     if (scan.fileFormat == ReadFileFormat.TextReadFormat) {
       val names =
         ConverterUtils.collectAttributeNamesWithoutExprId(scan.outputAttributes())
-      localFilesNode.setFileSchema(getFileSchema(scan.getDataSchema, names.asScala))
+      localFilesNode.setFileSchema(getFileSchema(scan.getDataSchema, names.asScala.toSeq))
     }
   }

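
The recurring `asScala.toSeq` pattern exists because `asScala` wraps a Java collection as a mutable `Buffer`, which 2.13's immutable `Seq` alias no longer accepts. A minimal sketch (illustrative names):

```scala
import java.util.Arrays
import scala.collection.JavaConverters._ // on 2.13+, scala.jdk.CollectionConverters._

object AsScalaSketch {
  def takesSeq(xs: Seq[String]): Int = xs.length

  def main(args: Array[String]): Unit = {
    val jlist: java.util.List[String] = Arrays.asList("a", "b")
    // jlist.asScala is a mutable.Buffer: a valid Seq on 2.12, but not on 2.13.
    println(takesSeq(jlist.asScala.toSeq)) // 2
  }
}
```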
@@ -177,10 +177,12 @@ object MetricsUtil extends Logging {

   /** Get all processors */
   def getAllProcessorList(metricData: MetricsData): Seq[MetricsProcessor] = {
-    metricData.steps.asScala.flatMap(
-      step => {
-        step.processors.asScala
-      })
+    metricData.steps.asScala
+      .flatMap(
+        step => {
+          step.processors.asScala
+        })
+      .toSeq
   }

   /** Update extra time metric by the processors */
@@ -127,7 +127,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
         sparkSession
       )
     }
-    partitions
+    partitions.toSeq
   }

   def genInputPartitionSeq(
@@ -117,10 +117,12 @@ abstract class MergeTreeFileFormatDataWriter(
     releaseResources()
     val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
       // committer.commitTask(taskAttemptContext)
-      val statuses = returnedMetrics.map(
-        v => {
-          v._2
-        })
+      val statuses = returnedMetrics
+        .map(
+          v => {
+            v._2
+          })
+        .toSeq
       new TaskCommitMessage(statuses)
     }

@@ -24,6 +24,7 @@ import org.apache.spark.sql.types.DoubleType
 import java.util.concurrent.ForkJoinPool

 import scala.collection.parallel.ForkJoinTaskSupport
+import scala.collection.parallel.immutable.ParVector

 class GlutenClickHouseTPCHParquetAQEConcurrentSuite
   extends GlutenClickHouseTPCHAbstractSuite
@@ -74,7 +75,7 @@

   test("fix race condition at the global variable of ColumnarOverrideRules::isAdaptiveContext") {

-    val queries = ((1 to 22) ++ (1 to 22) ++ (1 to 22) ++ (1 to 22)).par
+    val queries = ParVector((1 to 22) ++ (1 to 22) ++ (1 to 22) ++ (1 to 22): _*)
     queries.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(22))
     queries.map(queryId => runTPCHQuery(queryId) { df => })

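
On Scala 2.13 the `.par` extension moved to the separate scala-parallel-collections module, so the suite builds a `ParVector` directly, which works on both versions when that module is on the classpath. A sketch of the pattern (hypothetical object name, assumes scala-parallel-collections is available):

```scala
import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

object ParVectorSketch {
  def main(args: Array[String]): Unit = {
    val work = ParVector(1 to 8: _*)
    // Bound the parallelism explicitly, as the suite above does with 22.
    work.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))
    val doubled = work.map(_ * 2)
    println(doubled.sum) // 72
  }
}
```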
6 changes: 3 additions & 3 deletions backends-velox/pom.xml
@@ -87,7 +87,7 @@
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
-      <version>1.13.5</version>
+      <version>1.17.0</version>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -113,13 +113,13 @@
     </dependency>
     <dependency>
       <groupId>org.scalatestplus</groupId>
-      <artifactId>scalatestplus-mockito_2.12</artifactId>
+      <artifactId>scalatestplus-mockito_${scala.binary.version}</artifactId>
       <version>1.0.0-M2</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.scalatestplus</groupId>
-      <artifactId>scalatestplus-scalacheck_2.12</artifactId>
+      <artifactId>scalatestplus-scalacheck_${scala.binary.version}</artifactId>
       <version>3.1.0.0-RC2</version>
       <scope>test</scope>
     </dependency>
4 changes: 2 additions & 2 deletions gluten-celeborn/clickhouse/pom.xml
@@ -127,7 +127,7 @@
     </dependency>
     <dependency>
       <groupId>org.scalatestplus</groupId>
-      <artifactId>scalatestplus-mockito_2.12</artifactId>
+      <artifactId>scalatestplus-mockito_${scala.binary.version}</artifactId>
       <version>1.0.0-M2</version>
       <scope>test</scope>
     </dependency>
@@ -138,7 +138,7 @@
     </dependency>
     <dependency>
       <groupId>org.scalatestplus</groupId>
-      <artifactId>scalatestplus-scalacheck_2.12</artifactId>
+      <artifactId>scalatestplus-scalacheck_${scala.binary.version}</artifactId>
       <version>3.1.0.0-RC2</version>
       <scope>test</scope>
     </dependency>
6 changes: 3 additions & 3 deletions gluten-core/pom.xml
@@ -84,7 +84,7 @@
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
-      <version>1.13.5</version>
+      <version>1.17.0</version>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -111,13 +111,13 @@
     </dependency>
     <dependency>
       <groupId>org.scalatestplus</groupId>
-      <artifactId>scalatestplus-mockito_2.12</artifactId>
+      <artifactId>scalatestplus-mockito_${scala.binary.version}</artifactId>
       <version>1.0.0-M2</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.scalatestplus</groupId>
-      <artifactId>scalatestplus-scalacheck_2.12</artifactId>
+      <artifactId>scalatestplus-scalacheck_${scala.binary.version}</artifactId>
       <version>3.1.0.0-RC2</version>
       <scope>test</scope>
     </dependency>
@@ -300,7 +300,7 @@ private[gluten] class GlutenSessionExtensions extends (SparkSessionExtensions =>
 }

 private[gluten] trait GlutenSparkExtensionsInjector {
-  def inject(extensions: SparkSessionExtensions)
+  def inject(extensions: SparkSessionExtensions): Unit
 }

 private[gluten] object GlutenPlugin {
@@ -265,7 +265,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
     }

     transformChildren(child, basicScanExecTransformers)
-    basicScanExecTransformers
+    basicScanExecTransformers.toSeq
   }

   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
@@ -73,7 +73,7 @@ object ConverterUtils extends Logging {
   }

   def collectAttributeTypeNodes(attributes: JList[Attribute]): JList[TypeNode] = {
-    collectAttributeTypeNodes(attributes.asScala)
+    collectAttributeTypeNodes(attributes.asScala.toSeq)
   }

   def collectAttributeTypeNodes(attributes: Seq[Attribute]): JList[TypeNode] = {
@@ -85,7 +85,7 @@
   }

   def collectAttributeNamesWithExprId(attributes: JList[Attribute]): JList[String] = {
-    collectAttributeNamesWithExprId(attributes.asScala)
+    collectAttributeNamesWithExprId(attributes.asScala.toSeq)
   }

   def collectAttributeNamesWithExprId(attributes: Seq[Attribute]): JList[String] = {
@@ -197,7 +197,7 @@
         val (field, nullable) = parseFromSubstraitType(typ)
         StructField("", field, nullable)
       }
-      (StructType(fields), isNullable(substraitType.getStruct.getNullability))
+      (StructType(fields.toSeq), isNullable(substraitType.getStruct.getNullability))
     case Type.KindCase.LIST =>
       val list = substraitType.getList
       val (elementType, containsNull) = parseFromSubstraitType(list.getType)
@@ -32,7 +32,7 @@ object UDFMappings extends Logging {
   val pythonUDFMap: Map[String, String] = Map()
   val scalaUDFMap: Map[String, String] = Map()

-  private def appendKVToMap(key: String, value: String, res: Map[String, String]) {
+  private def appendKVToMap(key: String, value: String, res: Map[String, String]): Unit = {
     if (key.isEmpty || value.isEmpty()) {
       throw new IllegalArgumentException(s"key:$key or value:$value is empty")
     }
@@ -46,7 +46,7 @@
     res.put(key.toLowerCase(Locale.ROOT), value)
   }

-  private def parseStringToMap(input: String, res: Map[String, String]) {
+  private def parseStringToMap(input: String, res: Map[String, String]): Unit = {
     input.split(",").map {
       item =>
         val keyValue = item.split(":")
@@ -57,7 +57,7 @@
     }
   }

-  def loadFromSparkConf(conf: SparkConf) {
+  def loadFromSparkConf(conf: SparkConf): Unit = {
     val strHiveUDFs = conf.get(GlutenConfig.GLUTEN_SUPPORTED_HIVE_UDFS, "")
     if (!StringUtils.isBlank(strHiveUDFs)) {
       parseStringToMap(strHiveUDFs, hiveUDFMap)
@@ -45,7 +45,12 @@ class EnumeratedApplier(session: SparkSession)
   with Logging
   with LogLevelUtil {
   // An empirical value.
-  private val aqeStackTraceIndex = 16
+  private val aqeStackTraceIndex =
+    if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
+      16
+    } else {
+      14
+    }
   private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)

   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
@@ -40,7 +40,12 @@ class HeuristicApplier(session: SparkSession)
   with Logging
   with LogLevelUtil {
   // This is an empirical value, may need to be changed for supporting other versions of spark.
-  private val aqeStackTraceIndex = 19
+  private val aqeStackTraceIndex =
+    if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
+      19
+    } else {
+      17
+    }
   private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)

   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
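
Both appliers now derive the stack-trace index from the running Scala version rather than hard-coding it. A standalone sketch of that detection (hypothetical object name):

```scala
object ScalaVersionGateSketch {
  def main(args: Array[String]): Unit = {
    // releaseVersion is e.g. Some("2.12.18") or Some("2.13.14");
    // it can be None on non-release (snapshot) builds of Scala.
    val release = scala.util.Properties.releaseVersion
    val index = if (release.exists(_.startsWith("2.12"))) 19 else 17
    println(s"scala=$release aqeStackTraceIndex=$index")
  }
}
```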
@@ -97,7 +97,7 @@ object Validators {
     if (buffer.isEmpty) {
       NoopValidator
     } else {
-      new ValidatorPipeline(buffer)
+      new ValidatorPipeline(buffer.toSeq)
     }
   }
 }
@@ -61,7 +61,7 @@ object GlutenOptimization {
       GlutenMetadataModel(),
       GlutenPropertyModel(),
       GlutenExplain,
-      RasRule.Factory.reuse(rules))
+      RasRule.Factory.reuse(rules.toSeq))
     }
   }
 }
@@ -263,7 +263,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging {
       rand.shuffle(hosts)
       logOnLevel(logLevel, s"get host for $f: ${hosts.distinct.mkString(",")}")
     }
-    hosts.distinct
+    hosts.distinct.toSeq
   }

   def updatePartitionMap(f: FilePartition, rddId: Int): Unit = {
@@ -205,8 +205,8 @@ object GlutenImplicits {
     FallbackSummary(
       totalNumGlutenNodes,
       totalNumFallbackNodes,
-      totalPhysicalPlanDescription,
-      totalFallbackNodeToReason
+      totalPhysicalPlanDescription.toSeq,
+      totalFallbackNodeToReason.toSeq
     )
   }

@@ -139,7 +139,7 @@ class ShuffledColumnarBatchRDD(
     }
   }

-  override def clearDependencies() {
+  override def clearDependencies(): Unit = {
     super.clearDependencies()
     dependency = null
   }