From 3e359a930b01379975a022b0ba3c119ec5d65f3c Mon Sep 17 00:00:00 2001
From: huzekang <1040080742@qq.com>
Date: Fri, 30 Aug 2019 16:34:07 +0800
Subject: [PATCH] add: 1. Add Spark and Elasticsearch examples; change: 1. Update package names
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md                                     | 10 +--
 spark-starter/pom.xml                         | 26 +++++-
 .../wugui/sparkstarter}/SparkFlatMapJava.java |  4 +-
 .../sparkstarter}/SparkHiveNewVersion.java    |  2 +
 .../sparkstarter}/SparkHiveOldVersion.java    |  2 +
 .../wugui/sparkstarter}/SparkMapJava.java     |  4 +-
 .../sparkstarter}/SparkSessionStarter.java    |  2 +
 .../wugui/sparkstarter}/SparkStarter.java     |  4 +-
 .../sparkstarter}/demo1/AccessLogInfo.java    |  2 +-
 .../sparkstarter}/demo1/AccessLogSortKey.java |  2 +-
 .../demo1/AppLogSparkApplication.java         |  2 +-
 .../wugui/sparkstarter}/demo1/DBHelper.java   |  2 +-
 .../wugui/sparkstarter}/demo1/MockData.java   |  2 +-
 .../wugui/sparkstarter/es/EsSparkTest.java    | 89 +++++++++++++++++++
 .../com/wugui/sparkstarter/es/TripBean.java   | 31 +++++++
 15 files changed, 169 insertions(+), 15 deletions(-)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/SparkFlatMapJava.java (96%)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/SparkHiveNewVersion.java (99%)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/SparkHiveOldVersion.java (98%)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/SparkMapJava.java (97%)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/SparkSessionStarter.java (99%)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/SparkStarter.java (93%)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/demo1/AccessLogInfo.java (89%)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/demo1/AccessLogSortKey.java (98%)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/demo1/AppLogSparkApplication.java (99%)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/demo1/DBHelper.java (95%)
 rename spark-starter/src/main/java/{ => com/wugui/sparkstarter}/demo1/MockData.java (97%)
 create mode 100644 spark-starter/src/main/java/com/wugui/sparkstarter/es/EsSparkTest.java
 create mode 100644 spark-starter/src/main/java/com/wugui/sparkstarter/es/TripBean.java

diff --git a/README.md b/README.md
index b600e03..bd95ab8 100644
--- a/README.md
+++ b/README.md
@@ -122,7 +122,7 @@ sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("am
 2. Run the corresponding main method directly in IDEA, and set the master to local on the Spark context.
 ```
-SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
+SparkConf conf = new SparkConf().setMaster("local").setAppName("com.wugui.SparkFlatMapJava");
 ```
 
 
@@ -130,9 +130,7 @@ sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("am
 ### Submit a job to the local Spark environment
 1. Start Spark:
-```
-~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --class "com.wugui.sparkstarter.SparkHiveNewVersion" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
-```
+![](https://raw.githubusercontent.com/huzekang/picbed/master/20190830154203.png)
 
 You can see that a master process and a worker process have started.
 ![](https://raw.githubusercontent.com/huzekang/picbed/master/20190626112610.png)
@@ -145,7 +143,7 @@ SparkSession spark = SparkSession
 3. Package the job with `mvn clean package` and submit it to the locally installed Spark environment:
 ```
-~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --class "com.wugui.sparkstarter.SparkHiveNewVersion" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
+~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --class "com.wugui.sparkstarter.SparkHiveNewVersion" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
 ```
 
 4. Open the Spark server UI to see the completed Spark job.
@@ -169,7 +167,7 @@ export HADOOP_CONF_DIR=/Users/huzekang/opt/hadoop-cdh/hadoop-2.6.0-cdh5.14.2/et
 3. Package the job with `mvn clean package` and submit it to the locally installed YARN environment:
 ```
-~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --master yarn --deploy-mode cluster --class "com.wugui.sparkstarter.SparkHiveNewVersion" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
+~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --master yarn --deploy-mode cluster --class "com.wugui.sparkstarter.SparkHiveNewVersion" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
 ```
 4. Open YARN and observe that the job has finished.
 ![](https://raw.githubusercontent.com/huzekang/picbed/master/20190626133707.png)
\ No newline at end of file
diff --git a/spark-starter/pom.xml b/spark-starter/pom.xml
index 95619e6..4074f1d 100644
--- a/spark-starter/pom.xml
+++ b/spark-starter/pom.xml
@@ -44,6 +44,7 @@ 2.4.0
+
@@ -57,7 +58,11 @@
-
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch-spark-20_2.11</artifactId>
+            <version>5.5.1</version>
+        </dependency>
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
@@ -81,4 +86,23 @@
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/spark-starter/src/main/java/SparkFlatMapJava.java b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkFlatMapJava.java
similarity index 96%
rename from spark-starter/src/main/java/SparkFlatMapJava.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/SparkFlatMapJava.java
index 09c89ec..4a52b70 100644
--- a/spark-starter/src/main/java/SparkFlatMapJava.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkFlatMapJava.java
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -9,7 +11,7 @@ public class SparkFlatMapJava {
 
     public static void main(String[] args){
-        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
+        SparkConf conf = new SparkConf().setMaster("local").setAppName("com.wugui.SparkFlatMapJava");
         JavaSparkContext sc = new JavaSparkContext(conf);
 
diff --git a/spark-starter/src/main/java/SparkHiveNewVersion.java b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkHiveNewVersion.java
similarity index 99%
rename from spark-starter/src/main/java/SparkHiveNewVersion.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/SparkHiveNewVersion.java
index 84a168e..bdcfcde 100644
--- a/spark-starter/src/main/java/SparkHiveNewVersion.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkHiveNewVersion.java
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;
+
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
diff --git a/spark-starter/src/main/java/SparkHiveOldVersion.java b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkHiveOldVersion.java
similarity index 98%
rename from spark-starter/src/main/java/SparkHiveOldVersion.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/SparkHiveOldVersion.java
index fab01f0..d3144d0 100644
--- a/spark-starter/src/main/java/SparkHiveOldVersion.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkHiveOldVersion.java
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
diff --git a/spark-starter/src/main/java/SparkMapJava.java b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkMapJava.java
similarity index 97%
rename from spark-starter/src/main/java/SparkMapJava.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/SparkMapJava.java
index 9c48373..e763e50 100644
--- a/spark-starter/src/main/java/SparkMapJava.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkMapJava.java
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -8,7 +10,7 @@ public class SparkMapJava {
 
     public static void main(String[] args){
-        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
+        SparkConf conf = new SparkConf().setMaster("local").setAppName("com.wugui.SparkFlatMapJava");
         JavaSparkContext sc = new JavaSparkContext(conf);
 
         // Java implementation
diff --git a/spark-starter/src/main/java/SparkSessionStarter.java b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkSessionStarter.java
similarity index 99%
rename from spark-starter/src/main/java/SparkSessionStarter.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/SparkSessionStarter.java
index c264434..f62ab50 100644
--- a/spark-starter/src/main/java/SparkSessionStarter.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkSessionStarter.java
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;
+
 import lombok.Data;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.Function;
diff --git a/spark-starter/src/main/java/SparkStarter.java b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkStarter.java
similarity index 93%
rename from spark-starter/src/main/java/SparkStarter.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/SparkStarter.java
index 26c8eeb..cde467f 100644
--- a/spark-starter/src/main/java/SparkStarter.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/SparkStarter.java
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;
+
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -14,7 +16,7 @@ public class SparkStarter {
     public static void main(String[] args) {
         SparkConf sparkConf = new SparkConf()
                 .setMaster("local[5]")
-                .setAppName("SparkStarter");
+                .setAppName("com.wugui.SparkStarter");
         // from here on we work with the RDD API
         JavaSparkContext sc = new JavaSparkContext(sparkConf);
         // Should be some file on remote hdfs
diff --git a/spark-starter/src/main/java/demo1/AccessLogInfo.java b/spark-starter/src/main/java/com/wugui/sparkstarter/demo1/AccessLogInfo.java
similarity index 89%
rename from spark-starter/src/main/java/demo1/AccessLogInfo.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/demo1/AccessLogInfo.java
index c28d337..a154cf2 100644
--- a/spark-starter/src/main/java/demo1/AccessLogInfo.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/demo1/AccessLogInfo.java
@@ -1,4 +1,4 @@
-package demo1;
+package com.wugui.sparkstarter.demo1;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
diff --git a/spark-starter/src/main/java/demo1/AccessLogSortKey.java b/spark-starter/src/main/java/com/wugui/sparkstarter/demo1/AccessLogSortKey.java
similarity index 98%
rename from spark-starter/src/main/java/demo1/AccessLogSortKey.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/demo1/AccessLogSortKey.java
index 8b4e91c..f3425e5 100644
--- a/spark-starter/src/main/java/demo1/AccessLogSortKey.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/demo1/AccessLogSortKey.java
@@ -1,4 +1,4 @@
-package demo1;
+package com.wugui.sparkstarter.demo1;
 
 import lombok.Data;
 import scala.math.Ordered;
diff --git a/spark-starter/src/main/java/demo1/AppLogSparkApplication.java b/spark-starter/src/main/java/com/wugui/sparkstarter/demo1/AppLogSparkApplication.java
similarity index 99%
rename from spark-starter/src/main/java/demo1/AppLogSparkApplication.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/demo1/AppLogSparkApplication.java
index bd86ead..b1fbe2f 100644
--- a/spark-starter/src/main/java/demo1/AppLogSparkApplication.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/demo1/AppLogSparkApplication.java
@@ -1,4 +1,4 @@
-package demo1;
+package com.wugui.sparkstarter.demo1;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
diff --git a/spark-starter/src/main/java/demo1/DBHelper.java b/spark-starter/src/main/java/com/wugui/sparkstarter/demo1/DBHelper.java
similarity index 95%
rename from spark-starter/src/main/java/demo1/DBHelper.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/demo1/DBHelper.java
index c9dee88..7c1a556 100644
--- a/spark-starter/src/main/java/demo1/DBHelper.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/demo1/DBHelper.java
@@ -1,4 +1,4 @@
-package demo1;
+package com.wugui.sparkstarter.demo1;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
diff --git a/spark-starter/src/main/java/demo1/MockData.java b/spark-starter/src/main/java/com/wugui/sparkstarter/demo1/MockData.java
similarity index 97%
rename from spark-starter/src/main/java/demo1/MockData.java
rename to spark-starter/src/main/java/com/wugui/sparkstarter/demo1/MockData.java
index 286b821..2fb003f 100644
--- a/spark-starter/src/main/java/demo1/MockData.java
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/demo1/MockData.java
@@ -1,4 +1,4 @@
-package demo1;
+package com.wugui.sparkstarter.demo1;
 
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
diff --git a/spark-starter/src/main/java/com/wugui/sparkstarter/es/EsSparkTest.java b/spark-starter/src/main/java/com/wugui/sparkstarter/es/EsSparkTest.java
new file mode 100644
index 0000000..ab3a8f8
--- /dev/null
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/es/EsSparkTest.java
@@ -0,0 +1,89 @@
+package com.wugui.sparkstarter.es;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
+
+import java.util.Map;
+
+/**
+ * Example of using Spark together with Elasticsearch.
+ * Reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-native
+ */
+public class EsSparkTest {
+
+    public static void main(String[] args) {
+//        new EsSparkTest().writeEs();
+//        new EsSparkTest().readEs();
+        new EsSparkTest().writeBeanEs();
+    }
+
+    /**
+     * Write documents into ES as Maps.
+     */
+    public void writeEs() {
+        String elasticIndex = "spark/docs";
+        SparkConf sparkConf = new SparkConf()
+                .setAppName("writeEs")
+                .setMaster("local[*]")
+                .set("es.index.auto.create", "true")
+                .set("es.nodes", "192.168.1.25")
+                .set("es.port", "9200")
+                .set("es.nodes.wan.only", "true");
+
+        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext()); // adapter
+        Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
+        Map<String, ?> airports = ImmutableMap.of("city", "广州", "airportName", "广州白云机场");
+        JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
+        JavaEsSpark.saveToEs(javaRDD, elasticIndex);
+    }
+
+    /**
+     * Write documents into ES as Java objects (beans).
+     */
+    public void writeBeanEs() {
+        SparkConf sparkConf = new SparkConf()
+                .setAppName("writeEs")
+                .setMaster("local[*]")
+                .set("es.index.auto.create", "true")
+                .set("es.nodes", "192.168.1.25")
+                .set("es.port", "9200")
+                .set("es.nodes.wan.only", "true");
+        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext()); // adapter
+
+        TripBean upcoming = new TripBean("广州白云机场", "昆明机场");
+        TripBean lastWeek = new TripBean("昆明机场", "广州白云机场");
+
+        JavaRDD<TripBean> javaRDD = jsc.parallelize(
+                ImmutableList.of(upcoming, lastWeek));
+        JavaEsSpark.saveToEs(javaRDD, "spark/docs");
+    }
+
+    /**
+     * Read documents back from ES with a query string.
+     */
+    public void readEs() {
+        SparkConf sparkConf = new SparkConf()
+                .setAppName("writeEs")
+                .setMaster("local[*]")
+                .set("es.index.auto.create", "true")
+                .set("es.nodes", "192.168.1.25")
+                .set("es.port", "9200")
+                .set("es.nodes.wan.only", "true");
+
+        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
+        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext()); // adapter
+        JavaRDD<Map<String, Object>> searchRdd = JavaEsSpark.esRDD(jsc, "spark/docs", "?q=广州").values();
+        for (Map<String, Object> item : searchRdd.collect()) {
+            item.forEach((key, value) -> System.out.println("search key:" + key + ", search value:" + value));
+        }
+        sparkSession.stop();
+    }
+
+}
\ No newline at end of file
diff --git a/spark-starter/src/main/java/com/wugui/sparkstarter/es/TripBean.java b/spark-starter/src/main/java/com/wugui/sparkstarter/es/TripBean.java
new file mode 100644
index 0000000..2ccd231
--- /dev/null
+++ b/spark-starter/src/main/java/com/wugui/sparkstarter/es/TripBean.java
@@ -0,0 +1,31 @@
+package com.wugui.sparkstarter.es;
+
+import java.io.Serializable;
+
+public class TripBean implements Serializable {
+    private String departure, arrival;
+
+    public TripBean(String departure, String arrival) {
+        setDeparture(departure);
+        setArrival(arrival);
+    }
+
+    public TripBean() {
+    }
+
+    public String getDeparture() {
+        return departure;
+    }
+
+    public String getArrival() {
+        return arrival;
+    }
+
+    public void setDeparture(String dep) {
+        departure = dep;
+    }
+
+    public void setArrival(String arr) {
+        arrival = arr;
+    }
+}
\ No newline at end of file
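
The new `EsSparkTest` reads and writes through the RDD-level `JavaEsSpark` API. As a complement, below is a minimal sketch of reading the same index back through Spark SQL via the connector's data source, which ships in the same `elasticsearch-spark-20_2.11` artifact added to the pom. This sketch is not part of the patch: the class name `EsSparkSqlRead`, and the reuse of the `192.168.1.25:9200` node and the `spark/docs` index from the example above, are assumptions.

```java
package com.wugui.sparkstarter.es;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical companion to EsSparkTest: load the ES index as a DataFrame
// instead of an RDD of Maps.
public class EsSparkSqlRead {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("readEsAsDataset")
                .master("local[*]")
                // same connection settings as EsSparkTest; adjust to your cluster
                .config("es.nodes", "192.168.1.25")
                .config("es.port", "9200")
                .config("es.nodes.wan.only", "true")
                .getOrCreate();

        // The connector maps ES fields (e.g. departure, arrival from TripBean)
        // to DataFrame columns.
        Dataset<Row> trips = spark.read()
                .format("org.elasticsearch.spark.sql")
                .load("spark/docs");

        trips.printSchema();
        trips.show();

        spark.stop();
    }
}
```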