Skip to content

Commit

Permalink
Merge branch 'feat_1.12_pluginMerge_hbase_support' into 'feat_1.12_pl…
Browse files Browse the repository at this point in the history
…uginMerge'

Feat 1.12 plugin merge hbase support

See merge request dt-insight-engine/flinkx!550
  • Loading branch information
FlechazoW committed Aug 18, 2021
2 parents e58edfc + ae8d45e commit d9c641a
Show file tree
Hide file tree
Showing 10 changed files with 147 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.dtstack.flinkx.connector.hbase14.util.HBaseUtils;
import com.dtstack.flinkx.lookup.AbstractAllTableFunction;
import com.dtstack.flinkx.lookup.conf.LookupConf;

import com.dtstack.flinkx.security.KerberosUtil;

import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -91,7 +90,9 @@ protected void loadData(Object cacheRef) {
String principal = HBaseConfigUtils.getPrincipal(hbaseConf.getHbaseConfig());

HBaseConfigUtils.fillSyncKerberosConfig(conf, hbaseConf.getHbaseConfig());
String keytab = HBaseConfigUtils.loadKeyFromConf(hbaseConf.getHbaseConfig(), HBaseConfigUtils.KEY_KEY_TAB);
String keytab =
HBaseConfigUtils.loadKeyFromConf(
hbaseConf.getHbaseConfig(), HBaseConfigUtils.KEY_KEY_TAB);

LOG.info("kerberos principal:{},keytab:{}", principal, keytab);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public HBase14SinkFactory(SyncConf config) {
public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
HBaseOutputFormatBuilder builder = new HBaseOutputFormatBuilder();
builder.setConfig(hbaseConf);
builder.setColumMetaInfos(hbaseConf.getColumnMetaInfos());
builder.setColumnMetaInfos(hbaseConf.getColumnMetaInfos());
builder.setEncoding(hbaseConf.getEncoding());
builder.setHbaseConfig(hbaseConf.getHbaseConfig());
builder.setNullMode(hbaseConf.getNullMode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class HBaseDynamicTableSink implements DynamicTableSink {

private final HBaseConf conf;
private final TableSchema tableSchema;
private HBaseTableSchema hbaseSchema;
private final HBaseTableSchema hbaseSchema;

public HBaseDynamicTableSink(
HBaseConf conf, TableSchema tableSchema, HBaseTableSchema hbaseSchema) {
Expand Down Expand Up @@ -70,11 +70,11 @@ public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
}
HBaseOutputFormatBuilder builder = new HBaseOutputFormatBuilder();
if (conf.getColumn() != null) {
builder.setColumMetaInfos(conf.getColumn());
builder.setColumnMetaInfos(conf.getColumn());
} else if (conf.getColumnMetaInfos() != null) {
builder.setColumMetaInfos(conf.getColumnMetaInfos());
builder.setColumnMetaInfos(conf.getColumnMetaInfos());
} else if (!columnList.isEmpty()) {
builder.setColumMetaInfos(columnList);
builder.setColumnMetaInfos(columnList);
}
builder.setEncoding(conf.getEncoding());
builder.setHbaseConfig(conf.getHbaseConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,63 +72,52 @@
*/
public class HBaseOutputFormat extends BaseRichOutputFormat {

protected Map<String, Object> hbaseConfig;
private Map<String, Object> hbaseConfig;

protected String tableName;
private String tableName;
private String encoding;
private String nullMode;
private boolean walFlag;
private long writeBufferSize;

protected String encoding;
private List<String> columnTypes;
private List<String> columnNames;

protected String nullMode;
private String rowkeyExpress;
private Integer versionColumnIndex;

protected boolean walFlag;
private String versionColumnValue;
private List<String> rowKeyColumns = Lists.newArrayList();
private List<Integer> rowKeyColumnIndex = Lists.newArrayList();

protected long writeBufferSize;

protected List<String> columnTypes;

protected List<String> columnNames;

protected String rowkeyExpress;

protected Integer versionColumnIndex;

protected String versionColumnValue;
protected List<String> rowKeyColumns = Lists.newArrayList();
protected List<Integer> rowKeyColumnIndex = Lists.newArrayList();
private transient Connection connection;
private transient BufferedMutator bufferedMutator;
private transient FunctionTree functionTree;
private transient Map<String, String[]> nameMaps;

private transient Map<String, byte[][]> nameByteMaps;

private transient ThreadLocal<SimpleDateFormat> timeSecondFormatThreadLocal;

private transient ThreadLocal<SimpleDateFormat> timeMillisecondFormatThreadLocal;

private boolean openKerberos = false;
private transient Table table;

@Override
public void configure(Configuration parameters) {}

private Put generatePutCommand(RowData rowData) throws WriteRecordException {
RowData record = rowData;
int i = 0;
try {
byte[] rowkey = getRowkey(record);
byte[] rowkey = getRowkey(rowData);
Put put;
if (versionColumnIndex == null) {
put = new Put(rowkey);
if (!walFlag) {
put.setDurability(Durability.SKIP_WAL);
}
} else {
long timestamp = getVersion(record);
long timestamp = getVersion(rowData);
put = new Put(rowkey, timestamp);
}

for (; i < record.getArity(); ++i) {
for (; i < rowData.getArity(); ++i) {
if (rowKeyColumnIndex.contains(i)) {
continue;
}
Expand All @@ -155,10 +144,10 @@ private Put generatePutCommand(RowData rowData) throws WriteRecordException {

ColumnType columnType = ColumnType.getType(type);
Object column = null;
if (record instanceof GenericRowData) {
column = ((GenericRowData) record).getField(i);
} else if (record instanceof ColumnRowData) {
column = ((ColumnRowData) record).getField(i);
if (rowData instanceof GenericRowData) {
column = ((GenericRowData) rowData).getField(i);
} else if (rowData instanceof ColumnRowData) {
column = ((ColumnRowData) rowData).getField(i);
}
byte[] columnBytes = getColumnByte(columnType, column);
// columnBytes 为null忽略这列
Expand All @@ -168,9 +157,9 @@ private Put generatePutCommand(RowData rowData) throws WriteRecordException {
}
return put;
} catch (Exception ex) {
if (i < record.getArity()) {
if (i < rowData.getArity()) {
throw new WriteRecordException(
recordConvertDetailErrorMessage(i, record), ex, i, record);
recordConvertDetailErrorMessage(i, rowData), ex, i, rowData);
}
throw new WriteRecordException(ex.getMessage(), ex);
}
Expand All @@ -192,7 +181,7 @@ protected void writeSingleRecordInternal(RowData rawRecord) throws WriteRecordEx

@Override
public void openInternal(int taskNumber, int numTasks) throws IOException {
openKerberos = HBaseConfigUtils.isEnableKerberos(hbaseConfig);
boolean openKerberos = HBaseConfigUtils.isEnableKerberos(hbaseConfig);
if (openKerberos) {
sleepRandomTime();

Expand Down Expand Up @@ -517,7 +506,7 @@ private byte[] longToBytes(Object column) {
}

private byte[] doubleToBytes(Object column) {
Double doubleValue = null;
Double doubleValue;
if (column instanceof Integer) {
doubleValue = ((Integer) column).doubleValue();
} else if (column instanceof Long) {
Expand Down Expand Up @@ -620,4 +609,64 @@ public void closeInternal() throws IOException {
HBaseHelper.closeBufferedMutator(bufferedMutator);
HBaseHelper.closeConnection(connection);
}

public void setTableName(String tableName) {
this.tableName = tableName;
}

public void setHbaseConfig(Map<String, Object> hbaseConfig) {
this.hbaseConfig = hbaseConfig;
}

public void setColumnTypes(List<String> columnTypes) {
this.columnTypes = columnTypes;
}

public void setColumnNames(List<String> columnNames) {
this.columnNames = columnNames;
}

public void setRowkeyExpress(String rowkeyExpress) {
this.rowkeyExpress = rowkeyExpress;
}

public void setVersionColumnIndex(Integer versionColumnIndex) {
this.versionColumnIndex = versionColumnIndex;
}

public void setVersionColumnValue(String versionColumnValue) {
this.versionColumnValue = versionColumnValue;
}

public void setEncoding(String defaultEncoding) {
this.encoding = defaultEncoding;
}

public void setWriteBufferSize(Long writeBufferSize) {
this.writeBufferSize = writeBufferSize;
}

public void setNullMode(String nullMode) {
this.nullMode = nullMode;
}

public void setWalFlag(Boolean walFlag) {
this.walFlag = walFlag;
}

public String getTableName() {
return tableName;
}

public List<String> getColumnNames() {
return columnNames;
}

public List<String> getColumnTypes() {
return columnTypes;
}

public Map<String, Object> getHbaseConfig() {
return hbaseConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,87 +38,81 @@
*/
public class HBaseOutputFormatBuilder extends BaseRichOutputFormatBuilder {

private HBaseOutputFormat format;
private final HBaseOutputFormat format;

public HBaseOutputFormatBuilder() {
super.format = format = new HBaseOutputFormat();
}

public void setTableName(String tableName) {
format.tableName = tableName;
format.setTableName(tableName);
}

public void setHbaseConfig(Map<String, Object> hbaseConfig) {
format.hbaseConfig = hbaseConfig;
format.setHbaseConfig(hbaseConfig);
}

public void setColumnTypes(List<String> columnTypes) {
format.columnTypes = columnTypes;
format.setColumnTypes(columnTypes);
}

public void setColumnNames(List<String> columnNames) {
format.columnNames = columnNames;
format.setColumnNames(columnNames);
}

public void setRowkeyExpress(String rowkeyExpress) {
format.rowkeyExpress = rowkeyExpress;
format.setRowkeyExpress(rowkeyExpress);
}

public void setVersionColumnIndex(Integer versionColumnIndex) {
format.versionColumnIndex = versionColumnIndex;
format.setVersionColumnIndex(versionColumnIndex);
}

public void setVersionColumnValues(String versionColumnValue) {
format.versionColumnValue = versionColumnValue;
format.setVersionColumnValue(versionColumnValue);
}

public void setEncoding(String encoding) {
if (StringUtils.isEmpty(encoding)) {
format.encoding = HBaseConfigConstants.DEFAULT_ENCODING;
format.setEncoding(HBaseConfigConstants.DEFAULT_ENCODING);
} else {
format.encoding = encoding;
format.setEncoding(encoding);
}
}

public void setWriteBufferSize(Long writeBufferSize) {
if (writeBufferSize == null || writeBufferSize.longValue() == 0L) {
format.writeBufferSize = HBaseConfigConstants.DEFAULT_WRITE_BUFFER_SIZE;
if (writeBufferSize == null || writeBufferSize == 0L) {
format.setWriteBufferSize(HBaseConfigConstants.DEFAULT_WRITE_BUFFER_SIZE);
} else {
format.writeBufferSize = writeBufferSize;
format.setWriteBufferSize(writeBufferSize);
}
}

public void setNullMode(String nullMode) {
if (StringUtils.isEmpty(nullMode)) {
format.nullMode = HBaseConfigConstants.DEFAULT_NULL_MODE;
format.setNullMode(HBaseConfigConstants.DEFAULT_NULL_MODE);
} else {
format.nullMode = nullMode;
format.setNullMode(nullMode);
}
}

public void setWalFlag(Boolean walFlag) {
if (walFlag == null) {
format.walFlag = false;
format.setWalFlag(false);
} else {
format.walFlag = walFlag;
format.setWalFlag(walFlag);
}
}

@Override
protected void checkFormat() {
Preconditions.checkArgument(StringUtils.isNotEmpty(format.tableName));
Preconditions.checkNotNull(format.hbaseConfig);
Preconditions.checkNotNull(format.columnNames);
Preconditions.checkNotNull(format.columnTypes);

// if (format.getRestoreConfig() != null && format.getRestoreConfig().isRestore()){
// throw new UnsupportedOperationException("This plugin not support restore from
// failed state");
// }
// notSupportBatchWrite("HbaseWriter");
Preconditions.checkArgument(StringUtils.isNotEmpty(format.getTableName()));
Preconditions.checkNotNull(format.getHbaseConfig());
Preconditions.checkNotNull(format.getColumnNames());
Preconditions.checkNotNull(format.getColumnTypes());
}

public void setColumMetaInfos(List<FieldConf> columnMetaInfos) {
public void setColumnMetaInfos(List<FieldConf> columnMetaInfos) {
if (columnMetaInfos != null && !columnMetaInfos.isEmpty()) {
List<String> names =
columnMetaInfos.stream().map(FieldConf::getName).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,10 @@ public static String loadKeyFromConf(Map<String, Object> config, String key) {
String value = MapUtils.getString(config, key);
if (!StringUtils.isEmpty(value)) {
String krb5ConfPath;
if(Paths.get(value).toFile().exists()) {
if (Paths.get(value).toFile().exists()) {
krb5ConfPath = value;
}else {
krb5ConfPath =
System.getProperty("user.dir") + File.separator + value;
} else {
krb5ConfPath = System.getProperty("user.dir") + File.separator + value;
}
LOG.info("[{}]:{}", key, krb5ConfPath);
return krb5ConfPath;
Expand Down
Loading

0 comments on commit d9c641a

Please sign in to comment.