This repository has been archived by the owner on Aug 28, 2018. It is now read-only.

flink-on-iae-messagehub-to-phoenix

Write Kafka data to Phoenix on IAE

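import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.table.api.Table;

// Open question: how to obtain `table` from the Kafka (Message Hub) source.
// One option is a SQL query against a registered table `ts` (`ste` is presumably the table environment):
// String query = "select CURRENT_DATE SEGMENTSTARTTIME, CURRENT_DATE SEGMENTENDTIME, cast(imsi as varchar) imsi, cast(imei as varchar) imei from ts";
// Table table = ste.sqlQuery(query);

// JDBC append sink that upserts each row into Phoenix; one parameter type per `?` placeholder.
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    .setDrivername("org.apache.phoenix.jdbc.PhoenixDriver")
    .setDBUrl("jdbc:phoenix:hosts:2181:/hbase-unsecure;autocommit=true")
    .setQuery("upsert INTO GEO_ANALYTICS_STREAMING_DATA (SEGMENTSTARTTIME, SEGMENTENDTIME, imsi, imei) values (?, ?, ?, ?)")
    .setParameterTypes(Types.SQL_DATE, Types.SQL_DATE, Types.STRING, Types.STRING)
    .build();
table.writeToSink(sink);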

Source: https://issues.apache.org/jira/browse/FLINK-8356
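
The sink above relies on the upsert having exactly one `?` placeholder per column, in the same order as the parameter types. A minimal sketch of generating the statement and the JDBC URL from their parts, using only the Java standard library, could look like the following. The class `PhoenixUpsert` and the helpers `buildUpsert` and `buildUrl` are hypothetical names introduced for illustration; they are not part of Flink or Phoenix.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PhoenixUpsert {

    // Hypothetical helper: builds "UPSERT INTO <table> (c1, c2, ...) VALUES (?, ?, ...)"
    // with exactly one placeholder per column.
    static String buildUpsert(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String cols = String.join(", ", columns);
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "UPSERT INTO " + table + " (" + cols + ") VALUES (" + placeholders + ")";
    }

    // Hypothetical helper: assembles a URL in the same shape as the one used above
    // (ZooKeeper quorum, port, znode, then the autocommit property).
    static String buildUrl(String zkQuorum, int zkPort, String znode) {
        return "jdbc:phoenix:" + zkQuorum + ":" + zkPort + ":" + znode + ";autocommit=true";
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("SEGMENTSTARTTIME", "SEGMENTENDTIME", "imsi", "imei");
        System.out.println(buildUpsert("GEO_ANALYTICS_STREAMING_DATA", columns));
        System.out.println(buildUrl("hosts", 2181, "/hbase-unsecure"));
    }
}
```

The resulting strings could be passed to `setQuery` and `setDBUrl`; the list of parameter types still has to be kept in step with the column list by hand.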

Query from Hive (not all reporting tools support Phoenix)

https://hortonworks.com/blog/hbase-hive-better-together/

Export to COS for access by other WDP tools

Refinery Hive connector? IAE Spark job?