Commit
add: 1. hands-on examples 2. official sample data 3. collected reference material
Kyofin committed Jun 20, 2019
1 parent 9c57653 commit 054ba26
Showing 17 changed files with 7,057 additions and 4 deletions.
46 changes: 43 additions & 3 deletions README.md
@@ -1,13 +1,53 @@
## Fundamentals
An RDD (Resilient Distributed Dataset) is a distributed collection of elements.
In Spark, every operation on data boils down to creating RDDs, transforming existing RDDs, or calling actions on RDDs to compute a result. Behind the scenes, Spark automatically distributes the data in an RDD across the cluster and parallelizes the operations performed on it.
An RDD in Spark is an immutable, distributed collection of objects. Each RDD is split into multiple partitions, which run on different nodes of the cluster. An RDD can hold objects of any type in Python, Java, or Scala, including user-defined classes.
There are two ways to create an RDD: load an external dataset, or distribute a collection of objects (such as a list or set) that lives in the driver program.
RDD transformations are lazily evaluated: calling a transformation does not execute it immediately. Instead, Spark internally records the requested operation. Rather than thinking of an RDD as a dataset holding specific values, it is better to think of each RDD as a list of instructions, built up through transformations, that describes how to compute its data. Loading data into an RDD is also lazy; the data is only read when it is actually needed. Both transformations and reads may be executed more than once.
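
A minimal Java sketch of these ideas, assuming a local master and inline sample data (the class name and data are illustrative, not files from this repo):

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class RddBasics {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("RddBasics");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Create an RDD by distributing a driver-side collection
        JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello spark", "hello rdd"));

        // Transformations are lazy: nothing runs here, Spark only records the lineage
        JavaRDD<Integer> lengths = lines.map(String::length);

        // An action (collect) triggers the actual computation
        System.out.println(lengths.collect()); // [11, 9]

        sc.stop();
    }
}
```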

## Code Samples
Spark Getting Started (Part 4) -- Spark's map, flatMap, and mapToPair:
https://juejin.im/post/5c77e383f265da2d8f474e29#heading-9

Official examples:
https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples

## Common APIs
1. map(func): applies func to each element and returns a new RDD; per-record processing.
2. filter(func): applies func to each element and keeps only those for which func returns true; used for filtering.
3. flatMap(func): like map, but each input element can produce zero or more output elements; useful for splitting one record into many.
4. groupByKey([numTasks]): returns (K, Seq[V]) pairs, i.e. the key/value-list form that a Hadoop reduce function receives.
5. reduceByKey(func, [numTasks]): applies the given reduce func to the values grouped under each key (the (K, Seq[V]) that groupByKey produces), e.g. to compute sums or averages.
6. sortByKey([ascending], [numTasks]): sorts the RDD by key; the ascending flag controls whether the order is ascending or descending (see the sketch below).
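
To show how these operators compose, here is a minimal word-count sketch in Java (the class name and the inline sample data are illustrative assumptions, not part of this repo):

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCountSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountSketch");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.parallelize(Arrays.asList("a b a", "b c"));

        // flatMap: one line in, many words out (row-to-rows expansion)
        JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        // mapToPair: turn each word into a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(w -> new Tuple2<>(w, 1));

        // reduceByKey: sum the counts per key; sortByKey: order the result by word
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(Integer::sum).sortByKey(true);

        // collect is an action and triggers the computation
        counts.collect().forEach(t -> System.out.println(t._1() + " -> " + t._2()));
        // a -> 2
        // b -> 2
        // c -> 1

        sc.stop();
    }
}
```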


## Local Testing and Job Submission
Reference: https://blog.csdn.net/dream_an/article/details/54915894

- Testing the Spark job in IDEA

- Submitting the job to a local Spark
Add the dependencies:
```
<dependency>
<!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<!-- Without this dependency, an org/glassfish/jersey/server/spi/Container not found error occurs -->
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-server</artifactId>
<version>2.0-m03</version>
</dependency>
```

- Submitting the job to a locally installed Spark environment

Submit the job JAR built with `mvn clean package` to the locally installed Spark:
```
~/opt/spark-2.4.0-bin-hadoop2.7 » bin/spark-submit --class "com.wugui.sparkstarter.SimpleApp" /Users/huzekang/study/spark-starter/target/spark-starter-1.0-SNAPSHOT.jar
```
50 changes: 49 additions & 1 deletion pom.xml
@@ -16,7 +16,8 @@
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark core dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
@@ -29,6 +30,53 @@
<version>2.0-m03</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>

<!--<dependency>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-hive_2.11</artifactId>-->
<!--<version>2.4.0</version>-->
<!--</dependency>-->

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
</dependency>

<!--<dependency>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-streaming_2.11</artifactId>-->
<!--<version>2.1.3</version>-->
<!--</dependency>-->

<!--<dependency>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-mllib_2.11</artifactId>-->
<!--<version>2.1.3</version>-->
<!--</dependency>-->



<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>


<!--<dependency>-->
<!--<groupId>org.apache.hadoop</groupId>-->
<!--<artifactId>hadoop-client</artifactId>-->
<!--<version>2.8.4</version>-->
<!--</dependency>-->



</dependencies>

</project>
153 changes: 153 additions & 0 deletions src/main/java/com/wugui/sparkstarter/SparkSqlJava.java
@@ -0,0 +1,153 @@
package com.wugui.sparkstarter;


import lombok.Data;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.List;

/**
* @program: spark-starter
* @author: huzekang
* @create: 2019-06-20 21:46
**/
public class SparkSqlJava {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.master("local")
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();


code(spark);


}

/**
* Quick-start example: read a text file into a DataFrame and show it.
*/
public static void quickStart(SparkSession spark) {
Dataset<Row> df = spark.read().text("/Users/huzekang/study/spark-starter/src/main/resources/students.txt");
df.show();
}


/**
* Infer the RDD's schema via reflection:
* Use this approach when the Spark application can infer the schema from a JavaBean. The reflection-based approach keeps the code more concise.
*/
public static void reflection(SparkSession spark) {
// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
.textFile("/Users/huzekang/study/spark-starter/src/main/resources/people.txt")
.javaRDD()
.map(line -> {
String[] parts = line.split(",");
Person person = new Person();
person.setName(parts[0]);
person.setAge(Integer.parseInt(parts[1].trim()));
return person;
});

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
}


/**
* Build a schema programmatically and apply it to the RDD.
* Use this approach when the Spark application cannot infer the RDD's schema in advance.
*/
public static void code(SparkSession spark) {
// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("/Users/huzekang/study/spark-starter/src/main/resources/people.txt", 1)
.toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
}

@Data
public static class Person {

private String name;
private Integer age;

}
}
28 changes: 28 additions & 0 deletions src/main/java/com/wugui/sparkstarter/SparkStarter.java
@@ -0,0 +1,28 @@
package com.wugui.sparkstarter;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

/**
* @program: spark-starter
* @author: huzekang
* @create: 2019-06-20 21:28
**/
public class SparkStarter {

public static void main(String[] args) {
SparkConf sparkConf = new SparkConf()
.setMaster("local[5]")
.setAppName("SparkStarter");
// From here on you work with the RDD API
JavaSparkContext sc = new JavaSparkContext(sparkConf);
sc.setLogLevel("ERROR");

JavaRDD<String> stringJavaRDD = sc.textFile("/Users/huzekang/study/spark-starter/src/main/resources/students.txt");

stringJavaRDD.foreach(o -> System.out.println(o));


}
}
21 changes: 21 additions & 0 deletions src/main/java/com/wugui/sparkstarter/demo1/AccessLogInfo.java
@@ -0,0 +1,21 @@
package com.wugui.sparkstarter.demo1;

import lombok.AllArgsConstructor;
import lombok.Data;

import java.io.Serializable;


// JavaBean representing an access-log record
@Data
@AllArgsConstructor
public class AccessLogInfo implements Serializable {

private static final long serialVersionUID = 1L;
private long timestamp;
private long upTraffic;
private long downTraffic;



}
