diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..25df0ac
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,93 @@
+# Created by .ignore support plugin (hsz.mobi)
+### JetBrains template
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+.idea
+*.iml
+# User-specific stuff
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+# Sensitive or high-churn files
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+
+# Gradle
+.idea/**/gradle.xml
+.idea/**/libraries
+
+# CMake
+cmake-build-debug/
+cmake-build-release/
+
+# Mongo Explorer plugin
+.idea/**/mongoSettings.xml
+
+# File-based project format
+*.iws
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Cursive Clojure plugin
+.idea/replstate.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+# Editor-based Rest Client
+.idea/httpRequests
+### Maven template
+target/
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+### Java template
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# Avoid ignoring Maven wrapper jar file (.jar files are usually ignored)
+!/.mvn/wrapper/maven-wrapper.jar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..0315be2
--- /dev/null
+++ b/README.md
@@ -0,0 +1,18 @@
+
+### 代码样例
+https://juejin.im/post/5c77e383f265da2d8f474e29#heading-9
+
+### 本地测试和提交作业
+参考:https://blog.csdn.net/dream_an/article/details/54915894
+
+- idea上测试spark作业
+
+- 提交作业到本地spark
+将使用`mvn clean package`打包好的作业提交到本地安装好的spark上跑
+```
+~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --class "com.wugui.sparkstarter.SimpleApp" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
+
+```
+![](https://raw.githubusercontent.com/huzekang/picbed/master/20190620155332.png)
+
+- 提交作业到yarn
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..b91f7a7
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.wugui</groupId>
+    <artifactId>spark-starter</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <java.version>1.8</java.version>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>2.4.0</version>
+        </dependency>
+
+        <!-- NOTE(review): milestone Jersey build pinned next to Spark's own Jersey; confirm it is still needed -->
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-server</artifactId>
+            <version>2.0-m03</version>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/src/main/java/com/wugui/sparkstarter/SimpleApp.java b/src/main/java/com/wugui/sparkstarter/SimpleApp.java
new file mode 100644
index 0000000..54d8b20
--- /dev/null
+++ b/src/main/java/com/wugui/sparkstarter/SimpleApp.java
@@ -0,0 +1,27 @@
+package com.wugui.sparkstarter;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+/** Counts the lines of a text file that contain "a" and those that contain "b", on a local Spark master. */
+public class SimpleApp {
+    public static void main(String[] args) {
+        // Should be some file on your system
+        String logFile = "file:///Users/huzekang/study/spark-starter/src/main/resources/students.txt";
+        SparkConf conf = new SparkConf().setMaster("local").setAppName("Simple Application");
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        // Reduce log output
+        sc.setLogLevel("ERROR");
+
+        // cache() because the two count() jobs below both read logData
+        JavaRDD<String> logData = sc.textFile(logFile).cache();
+
+        long numAs = logData.filter(s -> s.contains("a")).count();
+
+        long numBs = logData.filter( s -> s.contains("b")).count();
+
+        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
+
+        sc.stop();
+    }
+}
diff --git a/src/main/java/com/wugui/sparkstarter/SparkFlatMapJava.java b/src/main/java/com/wugui/sparkstarter/SparkFlatMapJava.java
new file mode 100644
index 0000000..3d7bdd8
--- /dev/null
+++ b/src/main/java/com/wugui/sparkstarter/SparkFlatMapJava.java
@@ -0,0 +1,70 @@
+package com.wugui.sparkstarter;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Iterator;
+
+public class SparkFlatMapJava {
+    /** Runs the anonymous-class and the lambda flatMap examples on a local Spark master. */
+    public static void main(String[] args){
+        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        // Reduce log output
+        sc.setLogLevel("ERROR");
+
+        // Anonymous-class (pre-Java 8) implementation
+        flatMapJava(sc);
+
+
+        // Java 8 lambda implementation
+        flatMapJava8(sc);
+
+        sc.stop();
+    }
+    /** Splits each input line on single spaces, prints before/after stats, and saves to ./flatResultJava. */
+    public static void flatMapJava(JavaSparkContext sc){
+        // Input data path
+        JavaRDD<String> textData = sc.textFile("/Users/huzekang/study/spark-starter/src/main/resources/students.txt");
+
+        // Print the number of lines before processing
+        System.out.println("before:"+textData.count()+"行");
+
+        // Print the first line before processing
+        System.out.println("first line:"+textData.first()+"行");
+
+        // flatMap: every line becomes its space-separated pieces, flattened into one RDD
+        JavaRDD<String> flatData = textData.flatMap(new FlatMapFunction<String, String>() {
+            @Override
+            public Iterator<String> call(String s) throws Exception {
+                return Arrays.asList(s.split(" ")).iterator();
+            }
+        });
+
+        // Print the number of elements after processing
+        System.out.println("after:"+flatData.count()+"行");
+
+        // Print the first element after processing
+        System.out.println("first line:"+flatData.first()+"行");
+
+
+        String outPutPath = "./flatResultJava";
+        // Save the result under the ./flatResultJava directory
+        flatData.saveAsTextFile(outPutPath);
+    }
+
+    /** Lambda version of the same flatMap; saves the flattened pieces to ./flatMapJava8 without printing. */
+    public static void flatMapJava8(JavaSparkContext sc){
+        String outPutPath = "./flatMapJava8";
+
+
+        sc.textFile("/Users/huzekang/study/spark-starter/src/main/resources/students.txt")
+                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
+                .saveAsTextFile(outPutPath);
+    }
+
+}
diff --git a/src/main/java/com/wugui/sparkstarter/SparkMapJava.java b/src/main/java/com/wugui/sparkstarter/SparkMapJava.java
new file mode 100644
index 0000000..04d74b1
--- /dev/null
+++ b/src/main/java/com/wugui/sparkstarter/SparkMapJava.java
@@ -0,0 +1,84 @@
+package com.wugui.sparkstarter;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import scala.Tuple2;
+import scala.Tuple3;
+
+public class SparkMapJava {
+    /** Runs the anonymous-class and the lambda map examples on a local Spark master. */
+    public static void main(String[] args){
+        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkMapJava");
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        // Reduce log output
+        sc.setLogLevel("ERROR");
+
+        // Anonymous-class (pre-Java 8) implementation
+        mapJava(sc);
+
+        // Java 8 lambda implementation
+        mapJava8(sc);
+
+        sc.stop();
+    }
+    /** Maps each space-separated line to its last one, two and three fields, printing count and first element. */
+    public static void mapJava(JavaSparkContext sc){
+        JavaRDD<String> txtData = sc.textFile("/Users/huzekang/study/spark-starter/src/main/resources/students.txt");
+
+        // Keep only the last field
+        JavaRDD<String> mapData1 = txtData.map(new Function<String, String>() {
+            @Override
+            public String call(String s) throws Exception {
+                return s.split(" ")[2];
+            }
+        });
+
+        System.out.println(mapData1.count());
+        System.out.println(mapData1.first());
+
+        // Keep the last two fields
+        JavaRDD<Tuple2<String, String>> mapData2 = txtData.map(new Function<String, Tuple2<String, String>>() {
+            @Override
+            public Tuple2<String, String> call(String s) throws Exception {
+                return new Tuple2<>(s.split(" ")[1],s.split(" ")[2]);
+            }
+        });
+
+        System.out.println(mapData2.count());
+        System.out.println(mapData2.first());
+
+        // Keep all three fields
+        JavaRDD<Tuple3<String, String, String>> mapData3 = txtData.map(new Function<String, Tuple3<String, String, String>>() {
+            @Override
+            public Tuple3<String, String, String> call(String s) throws Exception {
+                return new Tuple3<>(s.split(" ")[0],s.split(" ")[1],s.split(" ")[2]);
+            }
+        });
+
+        System.out.println(mapData3.count());
+        System.out.println(mapData3.first());
+
+
+    }
+
+    /** Lambda version: maps each line to its first field, then to a 2-tuple and a 3-tuple of fields. */
+    public static void mapJava8(JavaSparkContext sc){
+        String path = "/Users/huzekang/study/spark-starter/src/main/resources/students.txt";
+        JavaRDD<String> mapData1 = sc.textFile(path).map(line -> line.split(" ")[0]);
+        System.out.println("count: "+mapData1.count());
+        System.out.println("first word:"+mapData1.first());
+
+        JavaRDD<Tuple2<String, String>> mapData2 = sc.textFile(path).map(line -> new Tuple2<String, String>(line.split(" ")[1],line.split(" ")[2]));
+        System.out.println(mapData2.count());
+        System.out.println(mapData2.first());
+
+        JavaRDD<Tuple3<String, String, String>> mapData3 = sc.textFile(path).map(line -> new Tuple3<String, String, String>(line.split(" ")[0],line.split(" ")[1],line.split(" ")[2]));
+        System.out.println(mapData3.count());
+        System.out.println(mapData3.first());
+
+    }
+
+}
+
diff --git a/src/main/resources/students.txt b/src/main/resources/students.txt
new file mode 100644
index 0000000..cc91311
--- /dev/null
+++ b/src/main/resources/students.txt
@@ -0,0 +1,5 @@
+俄洛伊 20 150.0
+盖伦 23 130.0
+赵信 19 111.0
+爆炸人 25 88.0
+黑寡妇 18 87.2
\ No newline at end of file