Skip to content

Commit

Permalink
add: 1.新增spark连接hbase原生示例
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyofin committed Aug 30, 2019
1 parent 3e359a9 commit 65ef143
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 1 deletion.
50 changes: 49 additions & 1 deletion spark-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@
<!--<version>2.1.3</version>-->
<!--</dependency>-->


<!--es-->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.5.1</version>
</dependency>


<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
Expand All @@ -76,6 +78,52 @@
<version>42.2.3.jre7</version>
</dependency>

<!--hbase-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.2.4</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>hbase-client</artifactId>
<groupId>org.apache.hbase</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.4</version>
<exclusions>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
</exclusions>
</dependency>


<!--<dependency>-->
<!--<groupId>org.apache.hadoop</groupId>-->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.wugui.sparkstarter.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Minimal example of reading an HBase table through Spark's native Hadoop-input API
 * ({@code newAPIHadoopRDD} + {@code TableInputFormat}).
 * Reference: http://dblab.xmu.edu.cn/blog/1094-2/
 *
 * @program: bigdata-starter
 * @author: huzekang
 * @create: 2019-08-30 16:58
 **/
public class SparkHbaseRdd {

    /** Column family this example reads ("fileInfo:name", "fileInfo:type"). */
    private static final byte[] FILE_INFO_FAMILY = "fileInfo".getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) throws IOException {
        String tableName = "FileTable";
        SparkSession spark = SparkSession.builder()
                .appName("SparkHBaseRDD")
                .master("local[1]")
                .getOrCreate();

        Configuration hbaseConf = HBaseConfiguration.create();
        // ZooKeeper quorum of the HBase cluster. Could also come from hbase-site.xml on
        // the classpath, but setting it in code keeps the example self-contained.
        hbaseConf.set("hbase.zookeeper.quorum", "cdh01");
        // ZooKeeper client port (2181 is the default).
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181");
        // Table that TableInputFormat will scan.
        hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName);

        // try-with-resources releases the admin connection even when table creation
        // or the Spark job throws (the original only closed it on the happy path).
        try (HBaseAdmin admin = new HBaseAdmin(hbaseConf)) {
            // Create the table if it does not exist yet.
            if (!admin.isTableAvailable(tableName)) {
                HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                // An HBase table must declare at least one column family; creating a
                // descriptor without one makes createTable throw IllegalArgumentException.
                tableDesc.addFamily(new HColumnDescriptor(FILE_INFO_FAMILY));
                admin.createTable(tableDesc);
            }

            // Read the table as an RDD of (row key, row result) pairs.
            RDD<Tuple2<ImmutableBytesWritable, Result>> hBaseRDD = spark.sparkContext()
                    .newAPIHadoopRDD(hbaseConf, TableInputFormat.class,
                            ImmutableBytesWritable.class, Result.class);

            JavaRDD<Tuple2<ImmutableBytesWritable, Result>> hBaseJavaRdd = hBaseRDD.toJavaRDD();
            hBaseJavaRdd.foreach(v1 -> {
                Result row = v1._2;
                // Row key, decoded explicitly as UTF-8 (new String(bytes) would use the
                // platform default charset, which differs across executors/JVMs).
                String key = new String(row.getRow(), StandardCharsets.UTF_8);
                // Cells looked up by family + qualifier; getValue returns null for a
                // missing cell, so decode null-safely instead of NPE-ing the executor.
                String name = cellToString(row.getValue(FILE_INFO_FAMILY,
                        "name".getBytes(StandardCharsets.UTF_8)));
                String type = cellToString(row.getValue(FILE_INFO_FAMILY,
                        "type".getBytes(StandardCharsets.UTF_8)));
                System.out.println("Row key:" + key + "\tfileInfo.Name:" + name + "\tfileInfo.type:" + type);
            });
        } finally {
            // Always tear down the local Spark session, even if the HBase work failed.
            spark.stop();
        }
    }

    /**
     * Decodes an HBase cell value as UTF-8.
     *
     * @param value raw cell bytes, or {@code null} if the row has no such cell
     * @return the decoded string, or an empty string for an absent cell
     */
    private static String cellToString(byte[] value) {
        return value == null ? "" : new String(value, StandardCharsets.UTF_8);
    }
}

0 comments on commit 65ef143

Please sign in to comment.