Skip to content

Commit

Permalink
优化代码
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyofin committed Jun 22, 2019
1 parent 054ba26 commit 9a36e2e
Showing 1 changed file with 45 additions and 22 deletions.
67 changes: 45 additions & 22 deletions src/main/java/com/wugui/sparkstarter/SparkSqlJava.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.List;

/**
*
* @program: spark-starter
* @author: huzekang
* @create: 2019-06-20 21:46
Expand All @@ -27,9 +28,11 @@ public static void main(String[] args) {
.config("spark.some.config.option", "some-value")
.getOrCreate();

// quickStart(spark);

code(spark);
// code(spark);

reflection(spark);

}

Expand All @@ -43,6 +46,8 @@ public static void quickStart(SparkSession spark) {


/**
*
* 文件 => JavaRDD => DataFrame
* 使用反射机制推断RDD的数据结构 :
*   当spark应用可以推断RDD数据结构时,可使用这种方式。这种基于反射的方法可以使代码更简洁有效。
*/
Expand All @@ -65,7 +70,17 @@ public static void reflection(SparkSession spark) {
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
Dataset<Row> teenagersDF = spark.sql("SELECT name ,age FROM people WHERE age BETWEEN 13 AND 19");
teenagersDF.show();
// +------+---+
// | name|age|
// +------+---+
// |Justin| 19|
// +------+---+

// 写出df到指定文件
// 参考例子:![](https://i.loli.net/2019/06/22/5d0e1eac239d177427.png)
teenagersDF.select("age").write().mode(SaveMode.Append).format("parquet").save("tmp/person_age");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Expand Down Expand Up @@ -97,50 +112,58 @@ public static void reflection(SparkSession spark) {
*   当spark应用无法推断RDD数据结构时,可使用这种方式。
*/
public static void code(SparkSession spark) {
// Create an RDD
// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
.textFile("/Users/huzekang/study/spark-starter/src/main/resources/people.txt", 1)
.toJavaRDD();

// The schema is encoded in a string
// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
String[] attributes = record.split(",");
// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) line -> {
String[] attributes = line.split(",");
return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name ,age FROM people");
results.show();
// +-------+---+
// | name|age|
// +-------+---+
// |Michael| 29|
// | Andy| 30|
// | Justin| 19|
// +-------+---+

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+
}

@Data
Expand Down

0 comments on commit 9a36e2e

Please sign in to comment.