Skip to content

Commit c2e0502

Browse files
Merge pull request #101 from pradeepmaripala/spark-3.3
Added Cache Argument Method for Spark 3.3
2 parents d1d5deb + 827f76d commit c2e0502

File tree

6 files changed

+56
-27
lines changed

6 files changed

+56
-27
lines changed

README.md

+21-13
Original file line numberDiff line numberDiff line change
@@ -61,33 +61,35 @@ The Almaren Framework provides a simplified consistent minimalistic layer over A
6161
To add Almaren Framework dependency to your sbt build:
6262

6363
```
64-
libraryDependencies += "com.github.music-of-the-ainur" %% "almaren-framework" % "0.9.10-3.3"
64+
libraryDependencies += "com.github.music-of-the-ainur" %% "almaren-framework" % "0.9.11-3.3"
6565
```
6666

6767
To run in spark-shell:
6868
For Scala version 2.12:
6969
```
70-
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.3"
70+
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.3"
7171
```
7272
For Scala version 2.13:
7373
```
74-
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.3"
74+
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.3"
7575
```
7676

7777
Almaren connector is available in
7878
[Maven Central](https://mvnrepository.com/artifact/com.github.music-of-the-ainur)
7979
repository.
8080

81-
| version | Connector Artifact |
82-
|----------------------------|------------------------------------------------------------------|
83-
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.4` |
84-
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.4` |
85-
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.3` |
86-
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.3` |
87-
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.2` |
88-
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.1` |
89-
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-2.4` |
90-
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:almaren-framework_2.11:0.9.10-2.4` |
81+
| version | Connector Artifact |
82+
|----------------------------|-------------------------------------------------------------------|
83+
| Spark 3.5.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.5` |
84+
| Spark 3.5.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.5` |
85+
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.4` |
86+
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.4` |
87+
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.3` |
88+
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.3` |
89+
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.2` |
90+
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.1` |
91+
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-2.4` |
92+
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:almaren-framework_2.11:0.9.11-2.4` |
9193

9294
### Batch Example
9395
```scala
@@ -357,6 +359,12 @@ Cache/Uncache both DataFrame or Table
357359
cache(true)
358360
```
359361

362+
Cache DataFrame with Storage Level
363+
364+
```scala
365+
cache(true,storageLevel = Some(MEMORY_AND_DISK))
366+
```
367+
360368
#### Coalesce
361369

362370
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

build.sbt

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ lazy val scala213 = "2.13.9"
77
crossScalaVersions := Seq(scala212,scala213)
88
ThisBuild / scalaVersion := scala213
99

10-
val sparkVersion = "3.3.0"
10+
val sparkVersion = "3.3.3"
1111
val majorVersionReg = "([0-9]+\\.[0-9]+).{0,}".r
1212

1313
val majorVersionReg(majorVersion) = sparkVersion
@@ -19,10 +19,10 @@ libraryDependencies ++= Seq(
1919
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
2020
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided",
2121
"org.apache.spark" %% "spark-avro" % sparkVersion,
22-
"com.databricks" %% "spark-xml" % "0.14.0",
22+
"com.databricks" %% "spark-xml" % "0.17.0",
2323
"com.github.music-of-the-ainur" %% "quenya-dsl" % s"1.2.3-${majorVersion}",
24-
"org.scalatest" %% "scalatest" % "3.2.14" % "test",
25-
"org.postgresql" % "postgresql" % "42.2.8" % "test"
24+
"org.scalatest" %% "scalatest" % "3.2.17" % "test",
25+
"org.postgresql" % "postgresql" % "42.6.0" % "test"
2626
)
2727

2828
enablePlugins(GitVersioning)

project/build.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version=1.7.1
1+
sbt.version=1.9.6

src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Main.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import com.github.music.of.the.ainur.almaren.Tree
44
import com.github.music.of.the.ainur.almaren.builder.Core
55
import com.github.music.of.the.ainur.almaren.state.core._
66
import org.apache.spark.sql.Column
7+
import org.apache.spark.storage.StorageLevel
78

89
private[almaren] trait Main extends Core {
910
def sql(sql: String): Option[Tree] =
@@ -12,8 +13,8 @@ private[almaren] trait Main extends Core {
1213
def alias(alias:String): Option[Tree] =
1314
Alias(alias)
1415

15-
def cache(opType:Boolean = true,tableName:Option[String] = None): Option[Tree] =
16-
Cache(opType, tableName)
16+
def cache(opType:Boolean = true,tableName:Option[String] = None,storageLevel: Option[StorageLevel] = None): Option[Tree] =
17+
Cache(opType, tableName = tableName, storageLevel = storageLevel)
1718

1819
def coalesce(size:Int): Option[Tree] =
1920
Coalesce(size)

src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Main.scala

+14-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.github.music.of.the.ainur.almaren.state.core
33
import com.github.music.of.the.ainur.almaren.State
44
import com.github.music.of.the.ainur.almaren.util.Constants
55
import org.apache.spark.sql.{Column, DataFrame}
6+
import org.apache.spark.storage.StorageLevel
67

78
private[almaren] abstract class Main extends State {
89
override def executor(df: DataFrame): DataFrame = core(df)
@@ -81,22 +82,28 @@ case class Alias(alias:String) extends Main {
8182
}
8283
}
8384

84-
case class Cache(opType:Boolean = true,tableName:Option[String] = None) extends Main {
85+
case class Cache(opType: Boolean = true, tableName: Option[String] = None, storageLevel: Option[StorageLevel] = None) extends Main {
8586
override def core(df: DataFrame): DataFrame = cache(df)
87+
8688
def cache(df: DataFrame): DataFrame = {
87-
logger.info(s"opType:{$opType}, tableName{$tableName}")
89+
logger.info(s"opType:{$opType}, tableName:{$tableName}, StorageType:{$storageLevel}")
8890
tableName match {
89-
case Some(t) => cacheTable(df,t)
90-
case None => cacheDf(df)
91+
case Some(t) => cacheTable(df, t)
92+
case None => cacheDf(df, storageLevel)
9193
}
9294
df
9395
}
94-
private def cacheDf(df:DataFrame): Unit = opType match {
95-
case true => df.persist()
96+
private def cacheDf(df: DataFrame, storageLevel: Option[StorageLevel]): Unit = opType match {
97+
case true => {
98+
storageLevel match {
99+
case Some(value) => df.persist(value)
100+
case None => df.persist()
101+
}
102+
}
96103
case false => df.unpersist()
97104

98105
}
99-
private def cacheTable(df:DataFrame,tableName: String): Unit =
106+
private def cacheTable(df: DataFrame, tableName: String): Unit =
100107
opType match {
101108
case true => df.sqlContext.cacheTable(tableName)
102109
case false => df.sqlContext.uncacheTable(tableName)

src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala

+13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.scalatest.funsuite.AnyFunSuite
1010

1111
import java.io.File
1212
import scala.collection.immutable._
13+
import org.apache.spark.storage.StorageLevel._
1314

1415
class Test extends AnyFunSuite with BeforeAndAfter {
1516
val almaren = Almaren("App Test")
@@ -383,6 +384,18 @@ class Test extends AnyFunSuite with BeforeAndAfter {
383384
assert(bool_cache)
384385
}
385386

387+
val testCacheDfStorage: DataFrame = almaren.builder.sourceSql("select * from cache_test").cache(true,storageLevel = Some(MEMORY_ONLY)).batch
388+
val bool_cache_storage = testCacheDfStorage.storageLevel.useMemory
389+
test("Testing Cache Memory Storage") {
390+
assert(bool_cache_storage)
391+
}
392+
393+
val testCacheDfDiskStorage: DataFrame = almaren.builder.sourceSql("select * from cache_test").cache(true, storageLevel = Some(DISK_ONLY)).batch
394+
val bool_cache_disk_storage = testCacheDfDiskStorage.storageLevel.useDisk
395+
test("Testing Cache Disk Storage") {
396+
assert(bool_cache_disk_storage)
397+
}
398+
386399
val testUnCacheDf = almaren.builder.sourceSql("select * from cache_test").cache(false).batch
387400
val bool_uncache = testUnCacheDf.storageLevel.useMemory
388401
test("Testing Uncache") {

0 commit comments

Comments
 (0)