From 6c18ca7eddd98fe67c9bf1a820a1fb2b001ef490 Mon Sep 17 00:00:00 2001
From: Joe Martin
Date: Wed, 15 Jul 2020 21:15:00 -0700
Subject: [PATCH 1/4] Refresh the examples so they are runnable again; simplify the sbt build
---
 aerojoin-example/build.sbt                    | 36 ++++++++
 .../com/aerospike/spark/aeroJoinExample.scala | 86 +++++++++++++++++++
 build.sbt                                     |  3 +-
 3 files changed, 123 insertions(+), 2 deletions(-)
 create mode 100644 aerojoin-example/build.sbt
 create mode 100644 aerojoin-example/src/main/scala/com/aerospike/spark/aeroJoinExample.scala

diff --git a/aerojoin-example/build.sbt b/aerojoin-example/build.sbt
new file mode 100644
index 0000000..e24bacb
--- /dev/null
+++ b/aerojoin-example/build.sbt
@@ -0,0 +1,36 @@
+import scala.util.Properties
+
+name := "joexample"
+
+version := "0.1"
+
+scalaVersion := "2.12.11"
+def sysPropOrDefault(propName: String, default: String): String = Option(System.getProperty(propName)).getOrElse(default)
+
+val sparkVer = sysPropOrDefault("version", "2.4.4")
+val hadoopVer = "2.7.3"
+val asClientVer = "4.4.13"
+val asSparkConnectorVer = "2.3.0"
+val sparkHome = Properties.envOrElse("SPARK_HOME", "/opt/spark")
+
+//libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" % "provided"
+//libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.4" % "provided"
+
+libraryDependencies ++= Seq(
+  "org.apache.spark" %% "spark-core" % sparkVer % Provided,
+  "org.apache.spark" %% "spark-sql" % sparkVer,
+  "org.apache.hadoop" % "hadoop-common" % hadoopVer,
+  "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVer % Provided,
+  "com.aerospike" %% "aerospike-spark" % "provided" from s"file://${sparkHome}/jars/aerospike-spark-assembly-${asSparkConnectorVer}.jar",
+  "com.aerospike" % "aerospike-client" % asClientVer
+
+)
+//
+//resolvers ++= Seq("Local Maven" at Path.userHome.asFile.toURI.toURL + ".m2/repository")
+//resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"
+//resolvers += Resolver.url("bintray-sbt-plugins", url("http://dl.bintray.com/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
+////resolvers += "Party" at "file://" + Path.userHome.absolutePath + "/jars"
+//resolvers += Resolver.url("my-test-repo", url("file://" + Path.userHome.absolutePath + "/jars"))( Patterns("/[artifact].[ext]") )
+resolvers ++= Seq("Local Maven" at Path.userHome.asFile.toURI.toURL + ".m2/repository")
+resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"
+publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath + "/.m2/repository")))
diff --git a/aerojoin-example/src/main/scala/com/aerospike/spark/aeroJoinExample.scala b/aerojoin-example/src/main/scala/com/aerospike/spark/aeroJoinExample.scala
new file mode 100644
index 0000000..2230a07
--- /dev/null
+++ b/aerojoin-example/src/main/scala/com/aerospike/spark/aeroJoinExample.scala
@@ -0,0 +1,86 @@
+package com.aerospike.spark
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+
+/**
+ * This example loads some customer data into the "test" namespace of an Aerospike database.
+ * It then uses aeroJoin to take a sequence of IDs, load the matching customer records, filter them, and print the result.
+ *
+ * Prereqs: a working Aerospike Connect for Spark install, and an Aerospike server running on the default port (3000) on localhost with a namespace named "test".
+ */
+object aeroJoinExample extends Serializable {
+  val conf: SparkConf = new SparkConf()
+    .setAppName("AeroJoin")
+    .set("aerospike.seedhost", "localhost")
+    .set("aerospike.port", "3000")
+    .set("aerospike.namespace", "test")
+
+  val session: SparkSession = SparkSession.builder()
+    .config(conf)
+    .master("local[*]")
+    .appName("Aerospike Example(II)")
+    .config("spark.ui.enabled", "false")
+    .getOrCreate()
+
+  def main(args: Array[String]): Unit = {
+    import session.implicits._
+
+    loadCustomerData()
+
+    val ids = Seq("customer1", "customer2",
+      "customer3", "customer4", "IDontExist")
+
+    val customerIdsDF = ids.toDF("customer_id").as[CustomerID]
+
+    val customerDS = customerIdsDF.aeroJoin[CustomerKV]("customer_id", "Customers")
+    customerDS.foreach(b => println(b))
+
+    val bestCustomers = customerDS.filter(customer => customer.stars > 4)
+    bestCustomers.foreach(b => println(b))
+
+    bestCustomers.map(c => new Customer(c.key, c.customer_id, c.first, c.last, c.stars)).toDF("key", "customer_id", "first", "last", "stars").
+      write.mode(SaveMode.Overwrite).
+      format("com.aerospike.spark.sql").
+      option("aerospike.updateByKey", "customer_id").
+      option("aerospike.set", "BestCustomers").
+      save()
+    session.stop()
+  }
+
+  def loadCustomerData(): Unit = {
+
+    val schema: StructType = new StructType(Array(
+      StructField("key", StringType, nullable = true),
+      StructField("customer_id", StringType, nullable = false),
+      StructField("last", StringType, nullable = true),
+      StructField("first", StringType, nullable = true),
+      StructField("stars", IntegerType, nullable = true)
+    ))
+
+    val rows = Seq(
+      Row("Fraser_Malcolm", "customer1", "Fraser", "Malcolm", 5),
+      Row("Hawke_Bob", "customer2", "Hawke", "Bob", 4),
+      Row("Keating_Paul", "customer3", "Keating", "Paul", 1),
+      Row("Im_Nothere", "secretcustomer", "Nothere", "Im", 5),
+      Row("Howard_John", "customer4", "Howard", "John", 1)
+    )
+
+    val customerRDD = session.sparkContext.parallelize(rows)
+    val customerDF = session.createDataFrame(customerRDD, schema)
+
+    customerDF.write.
+      mode(SaveMode.Overwrite).
+      format("com.aerospike.spark.sql").
+      option("aerospike.updateByKey", "customer_id").
+      option("aerospike.set", "Customers").
+      save()
+  }
+}
+
+case class CustomerID(customer_id: String)
+
+case class Customer(key: String, customer_id: String, first: String, last: String, stars: Long)
+
+case class CustomerKV(__key: Any, key: String, customer_id: String, first: String, last: String, stars: Long) extends AeroKV
diff --git a/build.sbt b/build.sbt
index 260caec..293096a 100755
--- a/build.sbt
+++ b/build.sbt
@@ -1,4 +1,3 @@
-import sbtassembly.MergeStrategy._
 import scala.util.Properties
 
 name := "spark-aerojoin-example"
@@ -13,7 +12,7 @@ crossScalaVersions := Seq("2.10.6", "2.11.8", "2.12.0")
 
 scalaVersion := "2.11.8"
 
-val sparkHome = Properties.envOrElse("RAF_HOME", "/opt/spark")
+val sparkHome = Properties.envOrElse("SPARK_HOME", "/opt/spark")
 
 javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

From 50cabc52476afc1e955a3841a819fbe9593f1aab Mon Sep 17 00:00:00 2001
From: Joe Martin
Date: Wed, 15 Jul 2020 22:29:16 -0700
Subject: [PATCH 2/4] Remove old non-working build and example code
---
 build.sbt                                     | 85 ------------------
 project/assembly.sbt                          |  1 -
 .../com/aerospike/spark/aeroJoinExample.scala | 87 -------------------
 3 files changed, 173 deletions(-)
 delete mode 100755 build.sbt
 delete mode 100644 project/assembly.sbt
 delete mode 100644 src/main/scala/com/aerospike/spark/aeroJoinExample.scala

diff --git a/build.sbt b/build.sbt
deleted file mode 100755
index 293096a..0000000
--- a/build.sbt
+++ /dev/null
@@ -1,85 +0,0 @@
-import scala.util.Properties
-
-name := "spark-aerojoin-example"
-
-version := "1.0"
-
-organization := "com.aerospike"
-mainClass in (Compile, run) := Some("com.aerospike.spark.aeroJoinExample")
-mainClass in assembly := Some("com.aerospike.spark.aeroJoinExample")
-
-crossScalaVersions := Seq("2.10.6", "2.11.8", "2.12.0")
-
-scalaVersion := "2.11.8"
-
-val sparkHome = Properties.envOrElse("SPARK_HOME", "/opt/spark")
-
-javacOptions ++= Seq("-source", "1.7", "-target", "1.7")
-
-parallelExecution in test := false
-
-fork in test := true
-
-libraryDependencies ++= Seq(
-  "org.apache.spark" %% "spark-core" % "2.2.0",
-  "org.apache.spark" %% "spark-sql" % "2.2.0",
-  "org.apache.spark" %% "spark-mllib" % "2.2.0",
-  "org.apache.spark" %% "spark-streaming" % "2.2.0",
-  "com.ning" % "async-http-client" % "1.9.10",
-
-  "com.databricks" %% "spark-csv" % "1.5.0",
-  "com.aerospike" %% "aerospike-spark" % "provided" from s"file://${sparkHome}/jars/aerospike-spark-assembly-1.0.0.jar",
-  "com.aerospike" % "aerospike-helper-java" % "1.2.2",
-  "com.aerospike" % "aerospike-client" % "4.0.8",
-  "com.typesafe.scala-logging" %% "scala-logging-slf4j" % "2.1.2",
-  "org.scalatest" %% "scalatest" % "2.2.1" % Test,
-  "joda-time" % "joda-time" % "2.9.9" % Test
-)
-
-resolvers ++= Seq("Local Maven" at Path.userHome.asFile.toURI.toURL + ".m2/repository")
-resolvers += "Local Maven Repository" at "file://"+Path.userHome.absolutePath+"/.m2/repository"
-publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath+"/.m2/repository")))
-
-cancelable in Global := true
-
-assemblyMergeStrategy in assembly := {
-  case x if Assembly.isConfigFile(x) =>
-    MergeStrategy.concat
-  case PathList(ps @ _*) if Assembly.isReadme(ps.last) || Assembly.isLicenseFile(ps.last) =>
-    MergeStrategy.rename
-  case PathList("META-INF", "maven","com.aerospike","aerospike-client", "pom.properties") =>
-    MergeStrategy.discard
-  case PathList("META-INF", "maven","com.aerospike","aerospike-client", "pom.xml") =>
-    MergeStrategy.discard
-  case PathList("META-INF", "maven","org.slf4j","slf4j-api", "pom.xml") =>
-    MergeStrategy.discard
-  case PathList("META-INF", "maven","com.fasterxml.jackson.core","jackson-annotations", "pom.xml") =>
PathList("META-INF", "maven","com.fasterxml.jackson.core","jackson-annotations", "pom.xml") => - MergeStrategy.discard - case PathList("META-INF", "maven","com.fasterxml.jackson.core","jackson-core", "pom.xml") => - MergeStrategy.discard - case PathList("META-INF", "maven","com.fasterxml.jackson.core","jackson-databind", "pom.xml") => - MergeStrategy.discard - case PathList("META-INF", "maven","commons-logging","commons-logging", "pom.xml") => - MergeStrategy.discard - case PathList("META-INF", "maven","com.google.guava","guava", "pom.xml") => - MergeStrategy.discard - case PathList("META-INF", "maven","jline","jline", "pom.xml") => - MergeStrategy.discard - case PathList(ps @ _*) if ps.last endsWith "pom.properties" => - MergeStrategy.discard - case PathList("META-INF", xs @ _*) => - xs.map(_.toLowerCase) match { - case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => - MergeStrategy.discard - case ps @ (x :: _) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") => - MergeStrategy.discard - case "plexus" :: _ => - MergeStrategy.discard - case "services" :: _ => - MergeStrategy.filterDistinctLines - case ("spring.schemas" :: Nil) | ("spring.handlers" :: Nil) => - MergeStrategy.filterDistinctLines - case _ => MergeStrategy.deduplicate - } - case _ => MergeStrategy.first -} diff --git a/project/assembly.sbt b/project/assembly.sbt deleted file mode 100644 index d6540b8..0000000 --- a/project/assembly.sbt +++ /dev/null @@ -1 +0,0 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.8") diff --git a/src/main/scala/com/aerospike/spark/aeroJoinExample.scala b/src/main/scala/com/aerospike/spark/aeroJoinExample.scala deleted file mode 100644 index 99d0490..0000000 --- a/src/main/scala/com/aerospike/spark/aeroJoinExample.scala +++ /dev/null @@ -1,87 +0,0 @@ -package com.aerospike.spark - -import com.typesafe.scalalogging.slf4j.LazyLogging -import org.apache.spark.SparkConf -import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} -import org.apache.spark.sql.{Row, SaveMode, SparkSession} - -/** - * This example will load some data into a "Business" namespace in an aerospike database. - * It will then use aeroJoin to take a sequence of ids and load the appropriate customer data, filter it, and print out the result. - * - * Prereqs: A working Aerospike Connect for Spark and an Aerospike server running on default port on localhost with at least 1 namespace named "Business" - */ -object aeroJoinExample extends LazyLogging with Serializable { - val conf: SparkConf = new SparkConf() - .setAppName("AeroJoin") - .set("aerospike.seedhost", "localhost") - .set("aerospike.port", "3000") - .set("aerospike.namespace", "Business") - - val session: SparkSession = SparkSession.builder() - .config(conf) - .master("local[*]") - .appName("Aerospike Example(II)") - .config("spark.ui.enabled", "false") - .getOrCreate() - - def main(args: Array[String]) { - import session.implicits._ - - loadCustomerData() - - val ids = Seq("customer1", "customer2", - "customer3", "customer4", "IDontExist") - - val customerIdsDF = ids.toDF("customer_id").as[CustomerID] - - val customerDS = customerIdsDF.aeroJoin[CustomerKV]("customer_id", "Customers") - customerDS.foreach(b => println(b)) - - val bestCustomers = customerDS.filter(customer => customer.stars > 4) - bestCustomers.foreach(b => println(b)) - - bestCustomers.map(c => new Customer(c.key, c.customer_id, c.first, c.last, c.stars)).toDF("key", "customer_id", "last", "first", "stars"). - write.mode(SaveMode.Overwrite). 
-      format("com.aerospike.spark.sql").
-      option("aerospike.updateByKey", "customer_id").
-      option("aerospike.set", "BestCustomers").
-      save()
-    session.stop()
-  }
-
-  def loadCustomerData(): Unit = {
-
-    val schema: StructType = new StructType(Array(
-      StructField("key", StringType, nullable = true),
-      StructField("customer_id", StringType, nullable = false),
-      StructField("last", StringType, nullable = true),
-      StructField("first", StringType, nullable = true),
-      StructField("stars", IntegerType, nullable = true)
-    ))
-
-    val rows = Seq(
-      Row("Fraser_Malcolm", "customer1", "Fraser", "Malcolm", 5),
-      Row("Hawke_Bob", "customer2", "Hawke", "Bob", 4),
-      Row("Keating_Paul", "customer3", "Keating", "Paul", 1),
-      Row("Im_Nothere", "secretcustomer", "Nothere", "Im", 5),
-      Row("Howard_John", "customer4", "Howard", "John", 1)
-    )
-
-    val customerRDD = session.sparkContext.parallelize(rows)
-    val customerDF = session.createDataFrame(customerRDD, schema)
-
-    customerDF.write.
-      mode(SaveMode.Overwrite).
-      format("com.aerospike.spark.sql").
-      option("aerospike.updateByKey", "customer_id").
-      option("aerospike.set", "Customers").
-      save()
-  }
-}
-
-case class CustomerID(customer_id: String)
-
-case class Customer(key: String, customer_id: String, first: String, last: String, stars: Long)
-
-case class CustomerKV(__key: Any, key: String, customer_id: String, first: String, last: String, stars: Long) extends AeroKV

From c2ae7bd895dc4f272562f5fe6e18c5a64ee6c9c3 Mon Sep 17 00:00:00 2001
From: Joe Martin
Date: Wed, 15 Jul 2020 22:34:17 -0700
Subject: [PATCH 3/4] Remove leftover "joexample" name and commented-out build settings
---
 aerojoin-example/build.sbt | 10 +---------
 1 file changed, 1 insertion(+), 9 deletions(-)

diff --git a/aerojoin-example/build.sbt b/aerojoin-example/build.sbt
index e24bacb..1095f28 100644
--- a/aerojoin-example/build.sbt
+++ b/aerojoin-example/build.sbt
@@ -1,6 +1,6 @@
 import scala.util.Properties
 
-name := "joexample"
+name := "aerojoin-example"
 
 version := "0.1"
 
@@ -13,8 +13,6 @@ val asClientVer = "4.4.13"
 val asSparkConnectorVer = "2.3.0"
 val sparkHome = Properties.envOrElse("SPARK_HOME", "/opt/spark")
 
-//libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" % "provided"
-//libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.4" % "provided"
 
 libraryDependencies ++= Seq(
   "org.apache.spark" %% "spark-core" % sparkVer % Provided,
@@ -25,12 +23,6 @@ libraryDependencies ++= Seq(
   "com.aerospike" % "aerospike-client" % asClientVer
 
 )
-//
-//resolvers ++= Seq("Local Maven" at Path.userHome.asFile.toURI.toURL + ".m2/repository")
-//resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"
-//resolvers += Resolver.url("bintray-sbt-plugins", url("http://dl.bintray.com/sbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)
-////resolvers += "Party" at "file://" + Path.userHome.absolutePath + "/jars"
-//resolvers += Resolver.url("my-test-repo", url("file://" + Path.userHome.absolutePath + "/jars"))( Patterns("/[artifact].[ext]") )
 resolvers ++= Seq("Local Maven" at Path.userHome.asFile.toURI.toURL + ".m2/repository")
 resolvers += "Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"
 publishTo := Some(Resolver.file("file", new File(Path.userHome.absolutePath + "/.m2/repository")))

From 8f5fb774fd7cec1fdc9a17ef1d50f999c3ded691 Mon Sep 17 00:00:00 2001
From: Joe Martin
Date: Wed, 15 Jul 2020 22:36:20 -0700
Subject: [PATCH 4/4] More cleanup: bump example version to 1.2.0
---
 aerojoin-example/build.sbt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git
a/aerojoin-example/build.sbt b/aerojoin-example/build.sbt
index 1095f28..2896545 100644
--- a/aerojoin-example/build.sbt
+++ b/aerojoin-example/build.sbt
@@ -2,7 +2,7 @@ import scala.util.Properties
 
 name := "aerojoin-example"
 
-version := "0.1"
+version := "1.2.0"
 
 scalaVersion := "2.12.11"
 def sysPropOrDefault(propName: String, default: String): String = Option(System.getProperty(propName)).getOrElse(default)
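
A note on running and verifying the refreshed example: the aerojoin-example build reads the Spark version from the JVM system property "version" (falling back to 2.4.4), so it can be overridden when invoking sbt, e.g. sbt -Dversion=2.4.4 run. After the job runs, the filtered records land in the "BestCustomers" set of the "test" namespace. The Scala snippet below is a minimal sketch of reading that set back through the connector to confirm the write; it reuses the example's own SparkSession ("session") and data source ("com.aerospike.spark.sql"), and assumes the connector's standard DataFrame read path with the "aerospike.set" option.

    // Read the "BestCustomers" set back and print it; the namespace and
    // seed host come from the SparkConf already attached to `session`.
    val bestCustomersDF = session.read.
      format("com.aerospike.spark.sql").
      option("aerospike.set", "BestCustomers").
      load()
    bestCustomersDF.show()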