From 5514052bec6df48c9f333d3b2d2e0a330ce375ea Mon Sep 17 00:00:00 2001 From: huzekang <1040080742@qq.com> Date: Sun, 23 Jun 2019 02:33:02 +0800 Subject: [PATCH] =?UTF-8?q?change:=201.=E6=9B=B4=E6=96=B0readme=E5=8A=A0?= =?UTF-8?q?=E5=85=A5SparkSql=E7=9B=B8=E5=85=B3=E7=AC=94=E8=AE=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 63 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index c7bebe7..c8a0fda 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,13 @@ ## 基本原理 -RDD,弹性分布式数据集,即分布式的元素集合。 +### RDD,弹性分布式数据集,即分布式的元素集合。 在spark中,对所有数据的操作不外乎是创建RDD、转化已有的RDD以及调用RDD操作进行求值。在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化。 Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象。 用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序中分发驱动器程序中的对象集合,比如list或者set。 RDD的转化操作都是惰性求值的,这意味着我们对RDD调用转化操作,操作不会立即执行。相反,Spark会在内部记录下所要求执行的操作的相关信息。我们不应该把RDD看做存放着特定数据的数据集,而最好把每个RDD当做我们通过转化操作构建出来的、记录如何计算数据的指令列表。数据读取到RDD中的操作也是惰性的,数据只会在必要时读取。转化操作和读取操作都有可能多次执行。 - +### Dataframe/Dataset API简介 +Dataframe/Dataset也是分布式数据集,但与RDD不同的是其带有schema信息,类似一张表。 +可以用下面一张图详细对比Dataset/dataframe和rdd的区别: +![](https://pic3.zhimg.com/80/v2-2224e315ac70f1ad22c238c9b5798ade_hd.jpg) ## 代码样例 Spark入门(四)--Spark的map、flatMap、mapToPair: @@ -23,7 +26,49 @@ key-valuelist 生的(K,Seq[V]),比如求和,求平均数 6. sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending +## SparkSql 使用 +可以参考:https://zhuanlan.zhihu.com/p/45729547 + + * 不使用hive元数据: + * SparkSession spark = SparkSession.builder().getOrCreate() + * 使用hive元数据 + * SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate() + +### 基本操作 + +``` +val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”) +df.show() +import spark.implicits._ +df.printSchema() +df.select("name").show() +df.select($"name", $"age" + 1).show() +df.filter($"age" > 21).show() +df.groupBy("age").count().show() +spark.stop() +``` + +### 分区分桶 排序 + +``` +分桶排序保存hive表 +df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”) +分区以parquet输出到指定目录 +df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") +分区分桶保存到hive表 +df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed") +``` + +### cube rullup pivot +``` +cube +sales.cube("city", "year”).agg(sum("amount")as "amount”) .show() +rull up +sales.rollup("city", "year”).agg(sum("amount")as "amount”).show() +pivot 只能跟在groupby之后 +sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show() +``` ## 本地测试和提交作业 参考:https://blog.csdn.net/dream_an/article/details/54915894 @@ -31,8 +76,8 @@ key-valuelist 引入依赖 ``` - + org.apache.spark spark-core_2.11 2.4.0 @@ -44,6 +89,18 @@ key-valuelist jersey-server 2.0-m03 + + + org.apache.spark + spark-hive_2.11 + 2.4.0 + + + + org.apache.spark + spark-sql_2.11 + 2.4.0 + ``` - 提交作业到本机的spark环境