[feat-3769][hbase] add hbase lookup support
shifang authored and a49a committed Nov 16, 2021
1 parent 2a093b6 commit 7e0f9f6
Showing 16 changed files with 489 additions and 652 deletions.
10 changes: 10 additions & 0 deletions flinkx-connectors/flinkx-connector-hbase-1.4/pom.xml
@@ -175,6 +175,16 @@
<configuration>
<outputDirectory>${project.basedir}/target/hbase-1.4</outputDirectory>
<createDependencyReducedPom>false</createDependencyReducedPom>
+ <relocations>
+ <relocation>
+ <pattern>org.jboss.netty</pattern>
+ <shadedPattern>com.dtstack.flinkx.connector.hbase14.org.jboss.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>com.dtstack.flinkx.connector.hbase14.org.apache.http</shadedPattern>
+ </relocation>
+ </relocations>
<artifactSet>
<excludes>
<exclude>org.slf4j:slf4j-api</exclude>
@@ -28,8 +28,6 @@

import java.util.Map;

- import static org.apache.flink.util.Preconditions.checkArgument;

/**
* @program: flinkx
* @author: wuren
@@ -45,25 +43,26 @@ public AsyncHBaseSerde(HBaseTableSchema hbaseSchema, String nullStringLiteral) {
*
* <p>Note: this method is thread-safe.
*/
- public RowData convertToNewRow(Map<String, byte[]> result, byte[] rowkey) {
+ public RowData convertToNewRow(Map<String, Map<String, byte[]>> result, byte[] rowkey) {
// The output rows needs to be initialized each time
// to prevent the possibility of putting the output object into the cache.
GenericRowData resultRow = new GenericRowData(fieldLength);
GenericRowData[] familyRows = new GenericRowData[families.length];
for (int f = 0; f < families.length; f++) {
familyRows[f] = new GenericRowData(qualifiers[f].length);
}

return convertToRow(result, resultRow, familyRows, rowkey);
}

protected RowData convertToRow(
- Map<String, byte[]> result,
+ Map<String, Map<String, byte[]>> result,
GenericRowData resultRow,
GenericRowData[] familyRows,
byte[] rowkey) {
for (int i = 0; i < fieldLength; i++) {
if (rowkeyIndex == i) {
- resultRow.setField(rowkeyIndex, rowkey);
+ resultRow.setField(rowkeyIndex, keyDecoder.decode(rowkey));
} else {
int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
// get family key
@@ -73,24 +72,12 @@ protected RowData convertToRow(
// get quantifier key
byte[] qualifier = qualifiers[f][q];
// read value
- String key = new String(familyKey) + ":" + new String(qualifier);
- byte[] value = result.get(key);
+ byte[] value = result.get(new String(familyKey)).get(new String(qualifier));
familyRow.setField(q, qualifierDecoders[f][q].decode(value));
}
resultRow.setField(i, familyRow);
}
}
return resultRow;
}

- public byte[] getRowKey(Object rowKey) {
- checkArgument(keyEncoder != null, "row key is not set.");
- rowWithRowKey.setField(0, rowKey);
- byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0);
- if (rowkey.length == 0) {
- // drop dirty records, rowkey shouldn't be zero length
- return null;
- }
- return rowkey;
- }
}
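For context, the new signature groups cell values by column family and then by qualifier instead of using a flat "family:qualifier" string key. Below is a minimal sketch of the nested map a caller would hand to convertToNewRow; the family name, qualifiers, values, and the surrounding class are illustrative assumptions, not part of this commit.

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: builds the family -> qualifier -> value structure expected by the
// reworked convertToNewRow(Map<String, Map<String, byte[]>> result, byte[] rowkey).
// The "info" family and its qualifiers are example names, not taken from the connector.
public class NestedResultSketch {

    public static Map<String, Map<String, byte[]>> buildResult() {
        Map<String, byte[]> infoFamily = new HashMap<>();
        infoFamily.put("name", "alice".getBytes(StandardCharsets.UTF_8));
        infoFamily.put("city", "hangzhou".getBytes(StandardCharsets.UTF_8));

        Map<String, Map<String, byte[]>> result = new HashMap<>();
        result.put("info", infoFamily);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<String, byte[]>> result = buildResult();
        byte[] rowkey = "row-001".getBytes(StandardCharsets.UTF_8);
        // With an AsyncHBaseSerde instance in scope, a lookup function would then call:
        // RowData row = serde.convertToNewRow(result, rowkey);
        // The previous version instead required a flat map keyed by "family:qualifier".
        System.out.println(result.keySet() + " -> rowkey " + new String(rowkey, StandardCharsets.UTF_8));
    }
}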
@@ -24,6 +24,7 @@
import com.dtstack.flinkx.connector.hbase14.source.HBaseInputFormatBuilder;
import com.dtstack.flinkx.connector.hbase14.table.lookup.HBaseAllTableFunction;
import com.dtstack.flinkx.connector.hbase14.table.lookup.HBaseLruTableFunction;
+ import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils;
import com.dtstack.flinkx.converter.AbstractRowConverter;
import com.dtstack.flinkx.enums.CacheType;
import com.dtstack.flinkx.lookup.conf.LookupConf;
@@ -45,6 +46,8 @@
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;

+ import org.apache.hadoop.conf.Configuration;

import java.util.ArrayList;
import java.util.List;

@@ -56,20 +59,24 @@
public class HBaseDynamicTableSource
implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {

- private final HBaseConf conf;
+ private final HBaseConf hBaseConf;
+ private Configuration conf;
private TableSchema tableSchema;
private final LookupConf lookupConf;
private final HBaseTableSchema hbaseSchema;
+ protected final String nullStringLiteral;

public HBaseDynamicTableSource(
HBaseConf conf,
TableSchema tableSchema,
LookupConf lookupConf,
- HBaseTableSchema hbaseSchema) {
- this.conf = conf;
+ HBaseTableSchema hbaseSchema,
+ String nullStringLiteral) {
+ this.hBaseConf = conf;
this.tableSchema = tableSchema;
this.lookupConf = lookupConf;
this.hbaseSchema = hbaseSchema;
+ this.nullStringLiteral = nullStringLiteral;
}

@Override
@@ -85,24 +92,25 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
field.setIndex(i);
columnList.add(field);
}
- conf.setColumn(columnList);
+ hBaseConf.setColumn(columnList);
HBaseInputFormatBuilder builder = new HBaseInputFormatBuilder();
- builder.setColumnMetaInfos(conf.getColumnMetaInfos());
- builder.setConfig(conf);
- builder.setEncoding(conf.getEncoding());
- builder.setHbaseConfig(conf.getHbaseConfig());
- builder.setTableName(conf.getTableName());
+ builder.setColumnMetaInfos(hBaseConf.getColumnMetaInfos());
+ builder.setConfig(hBaseConf);
+ builder.setEncoding(hBaseConf.getEncoding());
+ builder.setHbaseConfig(hBaseConf.getHbaseConfig());
+ builder.setTableName(hBaseConf.getTableName());
AbstractRowConverter rowConverter = new HBaseConverter(rowType);
builder.setRowConverter(rowConverter);
return ParallelSourceFunctionProvider.of(
new DtInputFormatSourceFunction<>(builder.finish(), typeInformation),
true,
- conf.getParallelism());
+ hBaseConf.getParallelism());
}

@Override
public DynamicTableSource copy() {
- return new HBaseDynamicTableSource(this.conf, tableSchema, lookupConf, hbaseSchema);
+ return new HBaseDynamicTableSource(
+ this.hBaseConf, tableSchema, lookupConf, hbaseSchema, nullStringLiteral);
}

@Override
@@ -124,19 +132,39 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
innerKeyArr.length == 1, "redis only support non-nested look up keys");
keyNames[i] = tableSchema.getFieldNames()[innerKeyArr[0]];
}
- final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();

+ setConf();
+ hbaseSchema.setTableName(hBaseConf.getTableName());
if (lookupConf.getCache().equalsIgnoreCase(CacheType.LRU.toString())) {
return ParallelAsyncTableFunctionProvider.of(
- new HBaseLruTableFunction(conf, lookupConf, hbaseSchema),
+ new HBaseLruTableFunction(
+ conf, lookupConf, hbaseSchema, hBaseConf.getNullMode()),
lookupConf.getParallelism());
}
return ParallelTableFunctionProvider.of(
- new HBaseAllTableFunction(
- conf, lookupConf, tableSchema.getFieldNames(), keyNames, hbaseSchema),
+ new HBaseAllTableFunction(conf, lookupConf, hbaseSchema, nullStringLiteral),
lookupConf.getParallelism());
}

+ private void setConf() {
+ if (HBaseConfigUtils.isEnableKerberos(hBaseConf.getHbaseConfig())) {
+ conf = HBaseConfigUtils.getHadoopConfiguration(hBaseConf.getHbaseConfig());
+ String principal = HBaseConfigUtils.getPrincipal(hBaseConf.getHbaseConfig());
+ HBaseConfigUtils.fillSyncKerberosConfig(conf, hBaseConf.getHbaseConfig());
+ String keytab =
+ HBaseConfigUtils.loadKeyFromConf(
+ hBaseConf.getHbaseConfig(), HBaseConfigUtils.KEY_KEY_TAB);
+ String krb5Conf =
+ HBaseConfigUtils.loadKeyFromConf(
+ hBaseConf.getHbaseConfig(),
+ HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF);
+ conf.set(HBaseConfigUtils.KEY_HBASE_CLIENT_KEYTAB_FILE, keytab);
+ conf.set(HBaseConfigUtils.KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL, principal);
+ conf.set(HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5Conf);
+ } else {
+ conf = HBaseConfigUtils.getConfig(hBaseConf.getHbaseConfig());
+ }
+ }

@Override
public boolean supportsNestedProjection() {
return false;
@@ -119,7 +119,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
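The new setConf() branch decides between a Kerberos-secured and a plain HBase client configuration based on what the user-supplied hbaseConfig map contains. The exact keys HBaseConfigUtils inspects are not shown in this commit, so the sketch below uses common HBase client and Kerberos property names purely as placeholders.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: the general shape of an hbaseConfig map that setConf() consumes.
// The key names below are standard HBase/Kerberos properties used as placeholders;
// HBaseConfigUtils may expect different keys.
public class HBaseLookupConfigSketch {

    public static Map<String, Object> kerberosConfig() {
        Map<String, Object> hbaseConfig = new HashMap<>();
        hbaseConfig.put("hbase.zookeeper.quorum", "zk-1:2181,zk-2:2181");

        // When Kerberos entries are detected, setConf() copies principal, keytab and
        // krb5.conf locations into the Hadoop Configuration; otherwise it falls back
        // to HBaseConfigUtils.getConfig(...) with only the plain client settings.
        hbaseConfig.put("hbase.security.authentication", "kerberos");
        hbaseConfig.put("hbase.client.kerberos.principal", "hbase/_HOST@EXAMPLE.COM");
        hbaseConfig.put("hbase.client.keytab.file", "/opt/keytabs/hbase.keytab");
        hbaseConfig.put("java.security.krb5.conf", "/etc/krb5.conf");
        return hbaseConfig;
    }
}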
LookupConf lookupConf =
getLookupConf(config, context.getObjectIdentifier().getObjectName());
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(physicalSchema);
- return new HBaseDynamicTableSource(conf, physicalSchema, lookupConf, hbaseSchema);
+ String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL);
+ return new HBaseDynamicTableSource(
+ conf, physicalSchema, lookupConf, hbaseSchema, nullStringLiteral);
}

private static void validatePrimaryKey(TableSchema schema) {
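With the factory now passing the null-string-literal option through to HBaseDynamicTableSource, the connector can serve as a dimension table in a lookup join. The sketch below shows what such usage could look like in the Table API; the connector identifier, WITH option keys, and cache setting are assumptions for illustration and are not defined in this commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Illustrative only: a hypothetical lookup-join setup against the HBase connector.
// The connector name and WITH options are placeholders; the real keys live in the
// table factory's option definitions, which are not part of the hunks shown above.
public class HBaseLookupJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // A datagen-backed fact stream with a processing-time attribute.
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  user_id STRING,"
                        + "  order_id BIGINT,"
                        + "  proc_time AS PROCTIME()"
                        + ") WITH ("
                        + "  'connector' = 'datagen',"
                        + "  'rows-per-second' = '1'"
                        + ")");

        // Hypothetical HBase dimension table; 'hbase14-x' and the option keys are assumed.
        // 'lookup.cache-type' = 'lru' would select the async HBaseLruTableFunction path,
        // any other cache type falls back to the HBaseAllTableFunction full-cache path.
        tEnv.executeSql(
                "CREATE TABLE dim_user ("
                        + "  rowkey STRING,"
                        + "  info ROW<name STRING, city STRING>,"
                        + "  PRIMARY KEY (rowkey) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'hbase14-x',"
                        + "  'table-name' = 'dim_user',"
                        + "  'zookeeper.quorum' = 'zk-1:2181',"
                        + "  'null-string-literal' = 'null',"
                        + "  'lookup.cache-type' = 'lru'"
                        + ")");

        // Lookup join: each order row is enriched from HBase at processing time.
        tEnv.executeSql(
                        "SELECT o.order_id, u.info.name"
                                + " FROM orders AS o"
                                + " JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u"
                                + " ON o.user_id = u.rowkey")
                .print();
    }
}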
