
Commit 3e359a9
add: 1. Add Spark and Elasticsearch examples
change: 1. Update package names
Kyofin committed Aug 30, 2019
1 parent 7396537 commit 3e359a9
Showing 15 changed files with 169 additions and 15 deletions.
README.md (10 changes: 4 additions & 6 deletions)
@@ -122,17 +122,15 @@ sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("am
2. Run the corresponding main method directly in IDEA, setting the master to local in the Spark context:

```
- SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
+ SparkConf conf = new SparkConf().setMaster("local").setAppName("com.wugui.SparkFlatMapJava");
```



### Submitting a job to the local Spark environment

1. Start Spark:
```
~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --class "com.wugui.sparkstarter.SparkHiveNewVersion" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
```
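A minimal sketch of the start step itself, assuming Spark's standard standalone scripts (the block above shows a submit command rather than the start command):

```
# run from the Spark installation directory; starts one local master and one worker
sbin/start-all.sh
```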
![](https://raw.githubusercontent.com/huzekang/picbed/master/20190830154203.png)
You can see that a master process and a worker process have started.
![](https://raw.githubusercontent.com/huzekang/picbed/master/20190626112610.png)

@@ -145,7 +143,7 @@ SparkSession spark = SparkSession

3. Package the job with `mvn clean package`, then submit it to the locally installed Spark environment:
```
- ~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --class "com.wugui.sparkstarter.SparkHiveNewVersion" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
+ ~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --class "com.wugui.sparkstarter.com.wugui.SparkHiveNewVersion" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
```
4. Open the Spark server UI to see the completed Spark job.
@@ -169,7 +167,7 @@ export HADOOP_CONF_DIR=/Users/huzekang/opt/hadoop-cdh/hadoop-2.6.0-cdh5.14.2/et

3. Package the job with `mvn clean package`, then submit it to the locally installed YARN environment:
```
- ~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --master yarn --deploy-mode cluster --class "com.wugui.sparkstarter.SparkHiveNewVersion" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
+ ~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --master yarn --deploy-mode cluster --class "com.wugui.sparkstarter.com.wugui.SparkHiveNewVersion" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
```
4. Open YARN and observe that the job has completed.
![](https://raw.githubusercontent.com/huzekang/picbed/master/20190626133707.png)
spark-starter/pom.xml (26 changes: 25 additions & 1 deletion)
@@ -44,6 +44,7 @@
<version>2.4.0</version>
</dependency>


<!--<dependency>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-streaming_2.11</artifactId>-->
@@ -57,7 +58,11 @@
<!--</dependency>-->



<dependency>
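            <!-- es-hadoop Spark connector: "20" targets Spark 2.0+, and the _2.11 suffix must match the Scala version of your Spark build -->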
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.5.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
@@ -81,4 +86,23 @@

</dependencies>

<build>
<plugins>
<!-- Bundle all dependencies into the jar; build it with the command mvn assembly:assembly -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<!-- This section is optional; if included, a directly runnable jar is generated -->
<!--<archive>-->
<!--<manifest>-->
<!--<mainClass>${exec.mainClass}</mainClass>-->
<!--</manifest>-->
<!--</archive>-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
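A hedged sketch of building the fat jar with the assembly plugin above (the output name is assumed from the usual `artifactId-version-jar-with-dependencies` convention):

```
# package the module, then build the assembly with all dependencies included
mvn clean package assembly:assembly
# assumed output: target/spark-starter-1.0-SNAPSHOT-jar-with-dependencies.jar
```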
SparkFlatMapJava.java
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -9,7 +11,7 @@
public class SparkFlatMapJava {

public static void main(String[] args){
-        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
+        SparkConf conf = new SparkConf().setMaster("local").setAppName("com.wugui.SparkFlatMapJava");
JavaSparkContext sc = new JavaSparkContext(conf);


@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
SparkMapJava.java
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -8,7 +10,7 @@
public class SparkMapJava {

public static void main(String[] args){
-        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
+        SparkConf conf = new SparkConf().setMaster("local").setAppName("com.wugui.SparkFlatMapJava");
JavaSparkContext sc = new JavaSparkContext(conf);

// Java implementation
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;

import lombok.Data;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
SparkStarter.java
@@ -1,3 +1,5 @@
+package com.wugui.sparkstarter;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -14,7 +16,7 @@ public class SparkStarter
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
.setMaster("local[5]")
-                .setAppName("SparkStarter");
+                .setAppName("com.wugui.SparkStarter");
// from here on you are working with RDDs
JavaSparkContext sc = new JavaSparkContext(sparkConf);
// Should be some file on remote hdfs
@@ -1,4 +1,4 @@
-package demo1;
+package com.wugui.sparkstarter.demo1;

import lombok.AllArgsConstructor;
import lombok.Data;
@@ -1,4 +1,4 @@
-package demo1;
+package com.wugui.sparkstarter.demo1;

import lombok.Data;
import scala.math.Ordered;
@@ -1,4 +1,4 @@
-package demo1;
+package com.wugui.sparkstarter.demo1;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -1,4 +1,4 @@
-package demo1;
+package com.wugui.sparkstarter.demo1;

import java.sql.Connection;
import java.sql.DriverManager;
@@ -1,4 +1,4 @@
-package demo1;
+package com.wugui.sparkstarter.demo1;

import java.io.FileNotFoundException;
import java.io.FileOutputStream;
EsSparkTest.java (new file)
@@ -0,0 +1,89 @@
package com.wugui.sparkstarter.es;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.Map;

/**
 * Example of using Spark together with Elasticsearch.
 * Reference: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-native
 */
public class EsSparkTest {

public static void main(String[] args) {
// new EsSparkTest().writeEs();
// new EsSparkTest().readEs();
new EsSparkTest().writeBeanEs();
}

    /**
     * Write documents to ES as Maps
     */
public void writeEs() {
String elasticIndex = "spark/docs";
SparkConf sparkConf = new SparkConf()
.setAppName("writeEs")
.setMaster("local[*]")
.set("es.index.auto.create", "true")
.set("es.nodes", "192.168.1.25")
.set("es.port", "9200")
.set("es.nodes.wan.only", "true");

SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext()); // adapter: reuse the SparkSession's underlying SparkContext
Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
Map<String, ?> airports = ImmutableMap.of("city", "广州", "airportName", "广州白云机场");
JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, elasticIndex);
}

    /**
     * Write documents to ES as Java bean objects
     */
public void writeBeanEs() {
SparkConf sparkConf = new SparkConf()
.setAppName("writeEs")
.setMaster("local[*]")
.set("es.index.auto.create", "true")
.set("es.nodes", "192.168.1.25")
.set("es.port", "9200")
.set("es.nodes.wan.only", "true");
SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext()); // adapter: reuse the SparkSession's underlying SparkContext


TripBean upcoming = new TripBean("广州白云机场", "昆明机场");
TripBean lastWeek = new TripBean("昆明机场", "广州白云机场");

JavaRDD<TripBean> javaRDD = jsc.parallelize(
ImmutableList.of(upcoming, lastWeek));
JavaEsSpark.saveToEs(javaRDD, "spark/docs");
}

public void readEs() {
SparkConf sparkConf = new SparkConf()
.setAppName("writeEs")
.setMaster("local[*]")
.set("es.index.auto.create", "true")
.set("es.nodes", "192.168.1.25")
.set("es.port", "9200")
.set("es.nodes.wan.only", "true");

SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext()); // adapter: reuse the SparkSession's underlying SparkContext
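        // esRDD returns a pair RDD of (document id, source map); values() keeps just the source maps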
JavaRDD<Map<String, Object>> searchRdd = JavaEsSpark.esRDD(jsc, "spark/docs", "?q=广州").values();
for (Map<String, Object> item : searchRdd.collect()) {
item.forEach((key, value) -> System.out.println("search key:" + key + ", search value:" + value));
}
sparkSession.stop();
}



}
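To spot-check what the write examples indexed, a query against the node configured above (host, port, and index taken from the example config; adjust for your cluster) could look like:

```
curl 'http://192.168.1.25:9200/spark/docs/_search?q=广州&pretty'
```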
TripBean.java (new file)
@@ -0,0 +1,31 @@
package com.wugui.sparkstarter.es;

import java.io.Serializable;
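
// Serializable so Spark can ship bean instances to executors; the es-hadoop connector maps the bean's getters to document fields (departure, arrival)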

public class TripBean implements Serializable {
private String departure, arrival;

public TripBean(String departure, String arrival) {
setDeparture(departure);
setArrival(arrival);
}

public TripBean() {
}

public String getDeparture() {
return departure;
}

public String getArrival() {
return arrival;
}

public void setDeparture(String dep) {
departure = dep;
}

public void setArrival(String arr) {
arrival = arr;
}
}
