Skip to content

Commit 7e98240

Browse files
Merge pull request #99 from pradeepmaripala/spark-3.1
Added Cache Argument Method for 3.1
2 parents 7fb6a7d + 73b3a70 commit 7e98240

File tree

6 files changed

+56
-26
lines changed

6 files changed

+56
-26
lines changed

README.md

+21-12
Original file line numberDiff line numberDiff line change
@@ -60,29 +60,32 @@ The Almaren Framework provides a simplified consistent minimalistic layer over A
6060
To add Almaren Framework dependency to your sbt build:
6161

6262
```
63-
libraryDependencies += "com.github.music-of-the-ainur" %% "almaren-framework" % "0.9.10-3.1"
63+
libraryDependencies += "com.github.music-of-the-ainur" %% "almaren-framework" % "0.9.11-3.1"
6464
```
6565

6666
To run in spark-shell:
6767

6868
```
69-
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.1"
69+
spark-shell --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.1"
7070
```
7171

7272
The Almaren Framework is available in the
7373
[Maven Central](https://mvnrepository.com/artifact/com.github.music-of-the-ainur)
7474
repository.
7575

76-
| version | Connector Artifact |
77-
|----------------------------|------------------------------------------------------------------|
78-
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.4` |
79-
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.4` |
80-
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.10-3.3` |
81-
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.3` |
82-
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.2` |
83-
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.1` |
84-
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-2.4` |
85-
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:almaren-framework_2.11:0.9.10-2.4` |
76+
| version | Connector Artifact |
77+
|----------------------------|-------------------------------------------------------------------|
78+
| Spark 3.5.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.5` |
79+
| Spark 3.5.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.5` |
80+
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.4` |
81+
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.4` |
82+
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:almaren-framework_2.13:0.9.11-3.3` |
83+
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.3` |
84+
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.2` |
85+
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-3.1` |
86+
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:almaren-framework_2.12:0.9.11-2.4` |
87+
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:almaren-framework_2.11:0.9.11-2.4` |
88+
8689

8790
### Batch Example
8891
```scala
@@ -352,6 +355,12 @@ Cache/Uncache both DataFrame or Table
352355
cache(true)
353356
```
354357

358+
Cache DataFrame with a Storage Level (the storage level is applied only when no table name is given)
359+
360+
```scala
361+
cache(true, storageLevel = Some(MEMORY_AND_DISK))
362+
```
363+
355364
#### Coalesce
356365

357366
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

build.sbt

+4-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ lazy val scala212 = "2.12.10"
66
crossScalaVersions := Seq(scala212)
77
ThisBuild / scalaVersion := scala212
88

9-
val sparkVersion = "3.1.2"
9+
val sparkVersion = "3.1.3"
1010
val majorVersionReg = "([0-9]+\\.[0-9]+).{0,}".r
1111

1212
val majorVersionReg(majorVersion) = sparkVersion
@@ -18,10 +18,10 @@ libraryDependencies ++= Seq(
1818
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
1919
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "provided" excludeAll(ExclusionRule(organization = "net.jpountz.lz4")),
2020
"org.apache.spark" %% "spark-avro" % sparkVersion,
21-
"com.databricks" %% "spark-xml" % "0.14.0",
21+
"com.databricks" %% "spark-xml" % "0.17.0",
2222
"com.github.music-of-the-ainur" %% "quenya-dsl" % s"1.2.3-${majorVersion}",
23-
"org.scalatest" %% "scalatest" % "3.2.14" % "test",
24-
"org.postgresql" % "postgresql" % "42.2.8" % "test"
23+
"org.scalatest" %% "scalatest" % "3.2.17" % "test",
24+
"org.postgresql" % "postgresql" % "42.6.0" % "test"
2525
)
2626

2727
enablePlugins(GitVersioning)

project/build.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version=1.7.1
1+
sbt.version=1.9.6

src/main/scala/com/github/music/of/the/ainur/almaren/builder/core/Main.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import com.github.music.of.the.ainur.almaren.Tree
44
import com.github.music.of.the.ainur.almaren.builder.Core
55
import com.github.music.of.the.ainur.almaren.state.core._
66
import org.apache.spark.sql.Column
7+
import org.apache.spark.storage.StorageLevel
78

89
private[almaren] trait Main extends Core {
910
def sql(sql: String): Option[Tree] =
@@ -12,8 +13,8 @@ private[almaren] trait Main extends Core {
1213
def alias(alias:String): Option[Tree] =
1314
Alias(alias)
1415

15-
def cache(opType:Boolean = true,tableName:Option[String] = None): Option[Tree] =
16-
Cache(opType, tableName)
16+
def cache(opType:Boolean = true,tableName:Option[String] = None,storageLevel: Option[StorageLevel] = None): Option[Tree] =
17+
Cache(opType, tableName = tableName, storageLevel = storageLevel)
1718

1819
def coalesce(size:Int): Option[Tree] =
1920
Coalesce(size)

src/main/scala/com/github/music/of/the/ainur/almaren/state/core/Main.scala

+14-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.github.music.of.the.ainur.almaren.state.core
33
import com.github.music.of.the.ainur.almaren.State
44
import com.github.music.of.the.ainur.almaren.util.Constants
55
import org.apache.spark.sql.{Column, DataFrame}
6+
import org.apache.spark.storage.StorageLevel
67

78
private[almaren] abstract class Main extends State {
89
override def executor(df: DataFrame): DataFrame = core(df)
@@ -81,22 +82,28 @@ case class Alias(alias:String) extends Main {
8182
}
8283
}
8384

84-
case class Cache(opType:Boolean = true,tableName:Option[String] = None) extends Main {
85+
case class Cache(opType: Boolean = true, tableName: Option[String] = None, storageLevel: Option[StorageLevel] = None) extends Main {
8586
override def core(df: DataFrame): DataFrame = cache(df)
8687
def cache(df: DataFrame): DataFrame = {
87-
logger.info(s"opType:{$opType}, tableName{$tableName}")
88+
logger.info(s"opType:{$opType}, tableName:{$tableName}, StorageType:{$storageLevel}")
8889
tableName match {
89-
case Some(t) => cacheTable(df,t)
90-
case None => cacheDf(df)
90+
case Some(t) => cacheTable(df, t)
91+
case None => cacheDf(df, storageLevel)
9192
}
9293
df
9394
}
94-
private def cacheDf(df:DataFrame): Unit = opType match {
95-
case true => df.persist()
95+
private def cacheDf(df: DataFrame, storageLevel: Option[StorageLevel]): Unit = opType match {
96+
case true => {
97+
storageLevel match {
98+
case Some(value) => df.persist(value)
99+
case None => df.persist()
100+
}
101+
}
96102
case false => df.unpersist()
97103

98104
}
99-
private def cacheTable(df:DataFrame,tableName: String): Unit =
105+
106+
private def cacheTable(df: DataFrame, tableName: String): Unit =
100107
opType match {
101108
case true => df.sqlContext.cacheTable(tableName)
102109
case false => df.sqlContext.uncacheTable(tableName)

src/test/scala/com/github/music/of/the/ainur/almaren/Test.scala

+13
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.apache.spark.sql.avro._
99
import java.io.File
1010
import scala.collection.immutable._
1111
import org.scalatest.funsuite.AnyFunSuite
12+
import org.apache.spark.storage.StorageLevel._
1213

1314
class Test extends AnyFunSuite with BeforeAndAfter {
1415
val almaren = Almaren("App Test")
@@ -381,6 +382,18 @@ class Test extends AnyFunSuite with BeforeAndAfter {
381382
assert(bool_cache)
382383
}
383384

385+
val testCacheDfStorage: DataFrame = almaren.builder.sourceSql("select * from cache_test").cache(true,storageLevel = Some(MEMORY_ONLY)).batch
386+
val bool_cache_storage = testCacheDfStorage.storageLevel.useMemory
387+
test("Testing Cache Memory Storage") {
388+
assert(bool_cache_storage)
389+
}
390+
391+
val testCacheDfDiskStorage: DataFrame = almaren.builder.sourceSql("select * from cache_test").cache(true, storageLevel = Some(DISK_ONLY)).batch
392+
val bool_cache_disk_storage = testCacheDfDiskStorage.storageLevel.useDisk
393+
test("Testing Cache Disk Storage") {
394+
assert(bool_cache_disk_storage)
395+
}
396+
384397
val testUnCacheDf = almaren.builder.sourceSql("select * from cache_test").cache(false).batch
385398
val bool_uncache = testUnCacheDf.storageLevel.useMemory
386399
test("Testing Uncache") {

0 commit comments

Comments
 (0)