Skip to content

Commit

Permalink
change: 1.修改phoenix工具类测试 2.精简同步作业
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyofin committed Oct 14, 2019
1 parent 22c0189 commit 0179167
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 57 deletions.
11 changes: 11 additions & 0 deletions hbase-starter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@
<scope>test</scope>
</dependency>

<!--hikari数据源-->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
Expand Down
16 changes: 16 additions & 0 deletions hbase-starter/src/test/java/ColumnInfoVO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import lombok.AllArgsConstructor;
import lombok.Data;

/**
 * Immutable-ish value object describing one column of a Phoenix table:
 * its display name, its JDBC type code (see {@link java.sql.Types}),
 * and the corresponding SQL type name. Populated from Phoenix
 * {@code ColumnInfo} in the metadata tests.
 *
 * NOTE(review): Lombok {@code @Data} generates getters/setters,
 * equals/hashCode and toString; {@code @AllArgsConstructor} generates a
 * three-arg constructor in field-declaration order — do not reorder fields.
 *
 * @program: bigdata-starter
 * @author: huzekang
 * @create: 2019-10-12 11:10
 **/
@Data
@AllArgsConstructor
public class ColumnInfoVO {
    // Column name as reported by Phoenix ColumnInfo.getDisplayName().
    private String displayColumnName;
    // JDBC type code from java.sql.Types.
    private int sqlType;
    // Human-readable SQL type name, e.g. "VARCHAR".
    private String sqlTypeName;

}
56 changes: 48 additions & 8 deletions hbase-starter/src/test/java/PhoenixUtilTest.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,50 @@
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.lang.StringUtils;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Test;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* @program: bigdata-starter
* @author: huzekang
* @create: 2019-10-11 15:09
**/
public class PhoenixUtilTest {
private DataSource getHikariDataSource() {
HikariConfig jdbcConfig = new HikariConfig();
jdbcConfig.setPoolName(getClass().getName());
jdbcConfig.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
jdbcConfig.setJdbcUrl("jdbc:phoenix:cdh01:2181");
jdbcConfig.setMaximumPoolSize(10);
jdbcConfig.setMaxLifetime(1000);
jdbcConfig.setConnectionTimeout(5000);
jdbcConfig.setIdleTimeout(2000);

return new HikariDataSource(jdbcConfig);
}

private Connection getConnection() throws ClassNotFoundException, SQLException {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
// 写上zookeeper地址
Connection connection = DriverManager.getConnection("jdbc:phoenix:cdh01:2181");
return connection;
// Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
// // 写上zookeeper地址
// Connection connection = DriverManager.getConnection("jdbc:phoenix:cdh01:2181");
DataSource dataSource = getHikariDataSource();

return dataSource.getConnection();
}

@Test
public void checkConnection() throws ClassNotFoundException, SQLException {

Connection connection = getConnection();
if (Objects.nonNull(connection)) {
try {
Expand Down Expand Up @@ -74,15 +93,36 @@ public void getTableList() throws SQLException, ClassNotFoundException {

}

/**
* 获取主键
* @throws SQLException
* @throws ClassNotFoundException
*/
@Test
public void getPrimaryKey() throws SQLException, ClassNotFoundException {
Connection connection = getConnection();
final String schema = "HZK_SCHEMA";
final String tableName = "HZK_SCHEMA";
ResultSet rs = connection.getMetaData().getPrimaryKeys(null, schema, tableName);
final String tableName = "TESTTABLE";
ResultSet rs = connection.prepareStatement(String.format("SELECT COLUMN_NAME,DATA_TYPE,NULLABLE,ORDINAL_POSITION,KEY_SEQ from SYSTEM.CATALOG where TABLE_SCHEM = '%s' AND TABLE_NAME = '%s' AND KEY_SEQ = 1", schema,tableName)).executeQuery();
while (rs.next()) {
System.out.println(rs.getString(1));
}

}

/**
* 获取指定表的列信息
* @throws SQLException
* @throws ClassNotFoundException
*/
@Test
public void getColumnInfoFromTable() throws SQLException, ClassNotFoundException {
Connection connection = getConnection();
final String schema = "HZK_SCHEMA";
final String tableName = "TESTTABLE";
List<ColumnInfo> columnInfos = SchemaUtil.generateColumnInfo(connection, schema + QueryConstants.NAME_SEPARATOR + tableName, null, true);
List<ColumnInfoVO> columnInfoVOS = columnInfos.stream().map(e -> new ColumnInfoVO(e.getDisplayName(), e.getSqlType(), e.getPDataType().getSqlTypeName())).collect(Collectors.toList());
System.out.println(columnInfoVOS);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public static void main(String[] args) throws AnalysisException {
.master("local")
.getOrCreate();

readOutPatientRecordFromHbase(sparkSession);
// readOutPatientRecordFromHbase(sparkSession);

readInPatientRecordFromHbase(sparkSession);

Expand Down Expand Up @@ -96,7 +96,7 @@ private static void readInPatientRecordFromHbase(SparkSession sparkSession) thro
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("url", JDBC_URL_PHOENIX)
.option("dbtable", "\"gzhonghui\".\"hospitalized_case_index\"")
.load();
hospitalizedCaseIndex.createOrReplaceTempView("hospitalized_case_index");
Expand All @@ -106,7 +106,7 @@ private static void readInPatientRecordFromHbase(SparkSession sparkSession) thro
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("url", JDBC_URL_PHOENIX)
.option("dbtable", "\"gzhonghui\".\"patient_basic_information\"")
.load();
patientBasicInformation.createOrReplaceTempView("patient_basic_information");
Expand All @@ -116,7 +116,7 @@ private static void readInPatientRecordFromHbase(SparkSession sparkSession) thro
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("url", JDBC_URL_PHOENIX)
.option("dbtable", "\"gzhonghui\".\"inspection_record\"")
.load();
inspectionRecord.createOrReplaceTempView("inspection_record");
Expand All @@ -126,7 +126,7 @@ private static void readInPatientRecordFromHbase(SparkSession sparkSession) thro
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("url", JDBC_URL_PHOENIX)
.option("dbtable", "\"gzhonghui\".\"assay_record\"")
.load();
assayRecord.createOrReplaceTempView("assay_record");
Expand All @@ -136,39 +136,18 @@ private static void readInPatientRecordFromHbase(SparkSession sparkSession) thro
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("url", JDBC_URL_PHOENIX)
.option("dbtable", "\"gzhonghui\".\"consultation_note\"")
.load();
consultationNote.createOrReplaceTempView("consultation_note");
}

private static void readOutPatientRecordFromHbase(SparkSession sparkSession) throws AnalysisException {
// 使用phoenix jdbc连接驱动读取数据
Dataset<Row> sanitationNegligenceRecord = sparkSession.read()
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("dbtable", "\"gzhonghui\".\"sanitation_negligence_record\"")
.load();
sanitationNegligenceRecord.createOrReplaceTempView("sanitation_negligence_record");


Dataset<Row> patientBasicInformation = sparkSession.read()
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("dbtable", "\"gzhonghui\".\"patient_basic_information\"")
.load();
patientBasicInformation.createOrReplaceTempView("patient_basic_information");


Dataset<Row> outpatientRecord = sparkSession.read()
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("url", JDBC_URL_PHOENIX)
.option("dbtable", "\"gzhonghui\".\"outpatient_record\"")
.load();
outpatientRecord.createOrReplaceTempView("outpatient_record");
Expand All @@ -178,39 +157,29 @@ private static void readOutPatientRecordFromHbase(SparkSession sparkSession) thr
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("url", JDBC_URL_PHOENIX)
.option("dbtable", "\"gzhonghui\".\"emergency_observation_record\"")
.load();
emergencyObservationRecord.createOrReplaceTempView("emergency_observation_record");


Dataset<Row> inspectionRecord = sparkSession.read()
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("dbtable", "\"gzhonghui\".\"inspection_record\"")
.load();
inspectionRecord.createOrReplaceTempView("inspection_record");


Dataset<Row> assayRecord = sparkSession.read()
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("dbtable", "\"gzhonghui\".\"assay_record\"")
.load();
assayRecord.createOrReplaceTempView("assay_record");


Dataset<Row> westernMedicinePrescription = sparkSession.read()
.format("jdbc")
.option("driver", "org.apache.phoenix.jdbc.PhoenixDriver")
.option("phoenix.schema.isNamespaceMappingEnabled", "true")
.option("url", "jdbc:phoenix:cdh01:2181")
.option("url", JDBC_URL_PHOENIX)
.option("dbtable", "\"gzhonghui\".\"western_medicine_prescription\"")
.load();
westernMedicinePrescription.createOrReplaceTempView("western_medicine_prescription");
}

    // Reads outpatient records from HBase via the Phoenix JDBC driver.
    // NOTE(review): the body is currently empty — the table registrations
    // were removed in this commit and the call site in main() appears to be
    // commented out as well; confirm whether this stub should be deleted.
    private static void readOutPatientRecordFromHbase(SparkSession sparkSession) throws AnalysisException {
        // 使用phoenix jdbc连接驱动读取数据 (reads data via the Phoenix JDBC driver)




    }
}

0 comments on commit 0179167

Please sign in to comment.