diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/pom.xml b/flinkx-connectors/flinkx-connector-hbase-1.4/pom.xml
index 7b449dc8a4..ea2eefe666 100644
--- a/flinkx-connectors/flinkx-connector-hbase-1.4/pom.xml
+++ b/flinkx-connectors/flinkx-connector-hbase-1.4/pom.xml
@@ -175,6 +175,16 @@
                             ${project.basedir}/target/hbase-1.4
                             false
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.jboss.netty</pattern>
+                                    <shadedPattern>com.dtstack.flinkx.connector.hbase14.org.jboss.netty</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.http</pattern>
+                                    <shadedPattern>com.dtstack.flinkx.connector.hbase14.org.apache.http</shadedPattern>
+                                </relocation>
+                            </relocations>
                             org.slf4j:slf4j-api
diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/converter/AsyncHBaseSerde.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/converter/AsyncHBaseSerde.java
index 74191e833a..ad78f681ad 100644
--- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/converter/AsyncHBaseSerde.java
+++ b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/converter/AsyncHBaseSerde.java
@@ -28,8 +28,6 @@
 
 import java.util.Map;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * @program: flinkx
  * @author: wuren
@@ -45,7 +43,7 @@ public AsyncHBaseSerde(HBaseTableSchema hbaseSchema, String nullStringLiteral) {
      *
      * <p>Note: this method is thread-safe.
      */
-    public RowData convertToNewRow(Map<String, byte[]> result, byte[] rowkey) {
+    public RowData convertToNewRow(Map<String, Map<String, byte[]>> result, byte[] rowkey) {
         // The output rows needs to be initialized each time
         // to prevent the possibility of putting the output object into the cache.
         GenericRowData resultRow = new GenericRowData(fieldLength);
@@ -53,17 +51,18 @@ public RowData convertToNewRow(Map<String, byte[]> result, byte[] rowkey) {
         for (int f = 0; f < families.length; f++) {
             familyRows[f] = new GenericRowData(qualifiers[f].length);
         }
+
         return convertToRow(result, resultRow, familyRows, rowkey);
     }
 
     protected RowData convertToRow(
-            Map<String, byte[]> result,
+            Map<String, Map<String, byte[]>> result,
             GenericRowData resultRow,
             GenericRowData[] familyRows,
             byte[] rowkey) {
         for (int i = 0; i < fieldLength; i++) {
             if (rowkeyIndex == i) {
-                resultRow.setField(rowkeyIndex, rowkey);
+                resultRow.setField(rowkeyIndex, keyDecoder.decode(rowkey));
             } else {
                 int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
                 // get family key
@@ -73,8 +72,7 @@ protected RowData convertToRow(
                     // get quantifier key
                     byte[] qualifier = qualifiers[f][q];
                     // read value
-                    String key = new String(familyKey) + ":" + new String(qualifier);
-                    byte[] value = result.get(key);
+                    byte[] value = result.get(new String(familyKey)).get(new String(qualifier));
                     familyRow.setField(q, qualifierDecoders[f][q].decode(value));
                 }
                 resultRow.setField(i, familyRow);
@@ -82,15 +80,4 @@ protected RowData convertToRow(
         }
         return resultRow;
     }
-
-    public byte[] getRowKey(Object rowKey) {
-        checkArgument(keyEncoder != null, "row key is not set.");
-        rowWithRowKey.setField(0, rowKey);
-        byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0);
-        if (rowkey.length == 0) {
-            // drop dirty records, rowkey shouldn't be zero length
-            return null;
-        }
-        return rowkey;
-    }
 }
diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/HBaseDynamicTableSource.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/HBaseDynamicTableSource.java
index 25660a9173..c433972c67 100644
--- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/HBaseDynamicTableSource.java
+++ b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/HBaseDynamicTableSource.java
@@ -24,6 +24,7 @@
 import com.dtstack.flinkx.connector.hbase14.source.HBaseInputFormatBuilder;
 import com.dtstack.flinkx.connector.hbase14.table.lookup.HBaseAllTableFunction;
 import com.dtstack.flinkx.connector.hbase14.table.lookup.HBaseLruTableFunction;
+import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils;
 import com.dtstack.flinkx.converter.AbstractRowConverter;
 import com.dtstack.flinkx.enums.CacheType;
 import com.dtstack.flinkx.lookup.conf.LookupConf;
@@ -45,6 +46,8 @@
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.hadoop.conf.Configuration;
+
 import java.util.ArrayList;
 import java.util.List;
 
@@ -56,20 +59,24 @@
 public class HBaseDynamicTableSource
         implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown {
 
-    private final HBaseConf conf;
+    private final HBaseConf hBaseConf;
+    private Configuration conf;
     private TableSchema tableSchema;
     private final LookupConf lookupConf;
     private final HBaseTableSchema hbaseSchema;
+    protected final String nullStringLiteral;
 
     public HBaseDynamicTableSource(
             HBaseConf conf,
             TableSchema tableSchema,
             LookupConf lookupConf,
-            HBaseTableSchema hbaseSchema) {
-        this.conf = conf;
+            HBaseTableSchema hbaseSchema,
+            String nullStringLiteral) {
+        this.hBaseConf = conf;
         this.tableSchema = tableSchema;
         this.lookupConf = lookupConf;
         this.hbaseSchema = hbaseSchema;
+        this.nullStringLiteral = nullStringLiteral;
     }
 
     @Override
@@ -85,24 +92,25 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
             field.setIndex(i);
             columnList.add(field);
         }
-        conf.setColumn(columnList);
+        hBaseConf.setColumn(columnList);
         HBaseInputFormatBuilder builder = new HBaseInputFormatBuilder();
-        builder.setColumnMetaInfos(conf.getColumnMetaInfos());
-        builder.setConfig(conf);
-        builder.setEncoding(conf.getEncoding());
-        builder.setHbaseConfig(conf.getHbaseConfig());
-        builder.setTableName(conf.getTableName());
+        builder.setColumnMetaInfos(hBaseConf.getColumnMetaInfos());
+        builder.setConfig(hBaseConf);
+        builder.setEncoding(hBaseConf.getEncoding());
+        builder.setHbaseConfig(hBaseConf.getHbaseConfig());
+        builder.setTableName(hBaseConf.getTableName());
         AbstractRowConverter rowConverter = new HBaseConverter(rowType);
         builder.setRowConverter(rowConverter);
         return ParallelSourceFunctionProvider.of(
                 new DtInputFormatSourceFunction<>(builder.finish(), typeInformation),
                 true,
-                conf.getParallelism());
+                hBaseConf.getParallelism());
     }
 
     @Override
     public DynamicTableSource copy() {
-        return new HBaseDynamicTableSource(this.conf, tableSchema, lookupConf, hbaseSchema);
+        return new HBaseDynamicTableSource(
+                this.hBaseConf, tableSchema, lookupConf, hbaseSchema, nullStringLiteral);
     }
 
     @Override
@@ -124,19 +132,39 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
                     innerKeyArr.length == 1, "redis only support non-nested look up keys");
             keyNames[i] = tableSchema.getFieldNames()[innerKeyArr[0]];
         }
-        final RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
-
+        setConf();
+        hbaseSchema.setTableName(hBaseConf.getTableName());
         if (lookupConf.getCache().equalsIgnoreCase(CacheType.LRU.toString())) {
             return ParallelAsyncTableFunctionProvider.of(
-                    new HBaseLruTableFunction(conf, lookupConf, hbaseSchema),
+                    new HBaseLruTableFunction(
+                            conf, lookupConf, hbaseSchema, hBaseConf.getNullMode()),
                     lookupConf.getParallelism());
         }
         return ParallelTableFunctionProvider.of(
-                new HBaseAllTableFunction(
-                        conf, lookupConf, tableSchema.getFieldNames(), keyNames, hbaseSchema),
+                new HBaseAllTableFunction(conf, lookupConf, hbaseSchema, nullStringLiteral),
                 lookupConf.getParallelism());
     }
 
+    private void setConf() {
+        if (HBaseConfigUtils.isEnableKerberos(hBaseConf.getHbaseConfig())) {
+            conf = HBaseConfigUtils.getHadoopConfiguration(hBaseConf.getHbaseConfig());
+            String principal = HBaseConfigUtils.getPrincipal(hBaseConf.getHbaseConfig());
+            HBaseConfigUtils.fillSyncKerberosConfig(conf, hBaseConf.getHbaseConfig());
+            String keytab =
+                    HBaseConfigUtils.loadKeyFromConf(
+                            hBaseConf.getHbaseConfig(), HBaseConfigUtils.KEY_KEY_TAB);
+            String krb5Conf =
+                    HBaseConfigUtils.loadKeyFromConf(
+                            hBaseConf.getHbaseConfig(),
+                            HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF);
+            conf.set(HBaseConfigUtils.KEY_HBASE_CLIENT_KEYTAB_FILE, keytab);
+            conf.set(HBaseConfigUtils.KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL, principal);
+            conf.set(HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5Conf);
+        } else {
+            conf = HBaseConfigUtils.getConfig(hBaseConf.getHbaseConfig());
+        }
+    }
+
     @Override
     public boolean supportsNestedProjection() {
         return false;
diff --git
a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/Hbase14DynamicTableFactory.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/Hbase14DynamicTableFactory.java index 75c9f21215..e51eb06e1d 100644 --- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/Hbase14DynamicTableFactory.java +++ b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/Hbase14DynamicTableFactory.java @@ -119,7 +119,9 @@ public DynamicTableSource createDynamicTableSource(Context context) { LookupConf lookupConf = getLookupConf(config, context.getObjectIdentifier().getObjectName()); HBaseTableSchema hbaseSchema = HBaseTableSchema.fromTableSchema(physicalSchema); - return new HBaseDynamicTableSource(conf, physicalSchema, lookupConf, hbaseSchema); + String nullStringLiteral = helper.getOptions().get(NULL_STRING_LITERAL); + return new HBaseDynamicTableSource( + conf, physicalSchema, lookupConf, hbaseSchema, nullStringLiteral); } private static void validatePrimaryKey(TableSchema schema) { diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/lookup/HBaseAllTableFunction.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/lookup/HBaseAllTableFunction.java index 65564c0090..d5073760d2 100644 --- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/lookup/HBaseAllTableFunction.java +++ b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/lookup/HBaseAllTableFunction.java @@ -18,20 +18,20 @@ package com.dtstack.flinkx.connector.hbase14.table.lookup; -import com.dtstack.flinkx.conf.FieldConf; +import com.dtstack.flinkx.connector.hbase.HBaseConfigurationUtil; import com.dtstack.flinkx.connector.hbase.HBaseSerde; import com.dtstack.flinkx.connector.hbase.HBaseTableSchema; -import com.dtstack.flinkx.connector.hbase14.conf.HBaseConf; import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils; -import com.dtstack.flinkx.connector.hbase14.util.HBaseUtils; import com.dtstack.flinkx.lookup.AbstractAllTableFunction; import com.dtstack.flinkx.lookup.conf.LookupConf; import com.dtstack.flinkx.security.KerberosUtil; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; + +import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.ChoreService; import org.apache.hadoop.hbase.ScheduledChore; import org.apache.hadoop.hbase.TableName; @@ -41,78 +41,61 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.PrivilegedAction; -import java.util.HashMap; +import java.time.LocalDateTime; import java.util.Map; -import java.util.Optional; - -import static com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils.KEY_PRINCIPAL; public class HBaseAllTableFunction extends AbstractAllTableFunction { private 
static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(HBaseAllTableFunction.class); - private final HBaseConf hbaseConf; + private Configuration conf; + private final byte[] serializedConfig; private Connection conn; + private String tableName; private Table table; private ResultScanner resultScanner; - private final transient Map hbaseConfig; private final HBaseTableSchema hbaseTableSchema; private transient HBaseSerde serde; + private final String nullStringLiteral; public HBaseAllTableFunction( - HBaseConf conf, + Configuration conf, LookupConf lookupConf, - String[] fieldNames, - String[] keyNames, - HBaseTableSchema hbaseTableSchema) { - super(fieldNames, keyNames, lookupConf, null); - this.hbaseConf = conf; + HBaseTableSchema hbaseTableSchema, + String nullStringLiteral) { + super(null, null, lookupConf, null); + this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf); + this.tableName = hbaseTableSchema.getTableName(); this.hbaseTableSchema = hbaseTableSchema; - hbaseConfig = conf.getHbaseConfig(); + this.nullStringLiteral = nullStringLiteral; + } + + @Override + public void open(FunctionContext context) throws Exception { + this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral); + super.open(context); } @Override protected void loadData(Object cacheRef) { - Configuration conf; + conf = HBaseConfigurationUtil.prepareRuntimeConfiguration(serializedConfig); int loadDataCount = 0; try { - if (HBaseConfigUtils.isEnableKerberos(hbaseConfig)) { - conf = HBaseConfigUtils.getHadoopConfiguration(hbaseConf.getHbaseConfig()); - conf.set( - HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, - (String) - hbaseConf - .getHbaseConfig() - .get(HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM)); - conf.set( - HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, - (String) - hbaseConf - .getHbaseConfig() - .get(HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM)); - - String principal = HBaseConfigUtils.getPrincipal(hbaseConf.getHbaseConfig()); - - HBaseConfigUtils.fillSyncKerberosConfig(conf, hbaseConf.getHbaseConfig()); - String keytab = - HBaseConfigUtils.loadKeyFromConf( - hbaseConf.getHbaseConfig(), HBaseConfigUtils.KEY_KEY_TAB); - + if (HBaseConfigUtils.isEnableKerberos(conf)) { + String principal = conf.get(HBaseConfigUtils.KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL); + String keytab = conf.get(HBaseConfigUtils.KEY_HBASE_CLIENT_KEYTAB_FILE); + String krb5Conf = conf.get(HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF); LOG.info("kerberos principal:{},keytab:{}", principal, keytab); - - conf.set(HBaseConfigUtils.KEY_HBASE_CLIENT_KEYTAB_FILE, keytab); - conf.set(HBaseConfigUtils.KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL, principal); - + System.setProperty(HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, krb5Conf); UserGroupInformation userGroupInformation = - KerberosUtil.loginAndReturnUgi(conf.get(KEY_PRINCIPAL), principal, keytab); + KerberosUtil.loginAndReturnUgi(principal, keytab, krb5Conf); Configuration finalConf = conf; conn = userGroupInformation.doAs( @@ -139,47 +122,18 @@ protected void loadData(Object cacheRef) { }); } else { - conf = HBaseConfigUtils.getConfig(hbaseConf.getHbaseConfig()); - conf.set( - HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, - (String) - hbaseConf - .getHbaseConfig() - .get(HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM)); - conf.set( - HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, - (String) - hbaseConf - .getHbaseConfig() - .get(HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM)); conn = 
ConnectionFactory.createConnection(conf); } - table = conn.getTable(TableName.valueOf(hbaseConf.getTableName())); + table = conn.getTable(TableName.valueOf(tableName)); resultScanner = table.getScanner(new Scan()); + Map tmpCache = (Map) cacheRef; for (Result r : resultScanner) { - Map kv = new HashMap<>(); - for (Cell cell : r.listCells()) { - String family = Bytes.toString(CellUtil.cloneFamily(cell)); - String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell)); - StringBuilder key = new StringBuilder(); - key.append(family).append(":").append(qualifier); - Optional typeOption = - hbaseConf.getColumnMetaInfos().stream() - .filter(fieldConf -> key.toString().equals(fieldConf.getName())) - .map(FieldConf::getType) - .findAny(); - Object value = - HBaseUtils.convertByte( - CellUtil.cloneValue(cell), - typeOption.orElseThrow(IllegalArgumentException::new)); - kv.put((key.toString()), value); - } + tmpCache.put(serde.getRowKey(r.getRow()), serde.convertToReusedRow(r)); loadDataCount++; - fillData(kv); } } catch (IOException e) { - throw new RuntimeException(e); + LOG.error(e.getMessage(), e); } finally { LOG.info("load Data count: {}", loadDataCount); try { @@ -199,4 +153,34 @@ protected void loadData(Object cacheRef) { } } } + + @Override + protected void initCache() { + Map newCache = Maps.newConcurrentMap(); + cacheRef.set(newCache); + loadData(newCache); + } + + /** 定时加载数据库中数据 */ + @Override + protected void reloadCache() { + // reload cacheRef and replace to old cacheRef + Map newCache = Maps.newConcurrentMap(); + loadData(newCache); + cacheRef.set(newCache); + LOG.info( + "----- " + lookupConf.getTableName() + ": all cacheRef reload end:{}", + LocalDateTime.now()); + } + + /** + * The invoke entry point of lookup function. + * + * @param keys the lookup key. Currently only support single rowkey. + */ + @Override + public void eval(Object... 
keys) { + Map cache = (Map) cacheRef.get(); + collect(cache.get(keys[0])); + } } diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/lookup/HBaseLruTableFunction.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/lookup/HBaseLruTableFunction.java index fe1dafeb90..2c2d70cd3e 100644 --- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/lookup/HBaseLruTableFunction.java +++ b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/table/lookup/HBaseLruTableFunction.java @@ -18,9 +18,8 @@ package com.dtstack.flinkx.connector.hbase14.table.lookup; -import com.dtstack.flinkx.conf.FieldConf; +import com.dtstack.flinkx.connector.hbase.HBaseConfigurationUtil; import com.dtstack.flinkx.connector.hbase.HBaseTableSchema; -import com.dtstack.flinkx.connector.hbase14.conf.HBaseConf; import com.dtstack.flinkx.connector.hbase14.converter.AsyncHBaseSerde; import com.dtstack.flinkx.connector.hbase14.util.DtFileUtils; import com.dtstack.flinkx.connector.hbase14.util.HBaseConfigUtils; @@ -38,7 +37,7 @@ import com.google.common.collect.Maps; import com.stumbleupon.async.Deferred; -import org.apache.commons.collections.MapUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authentication.util.KerberosName; import org.hbase.async.Config; import org.hbase.async.GetRequest; @@ -50,9 +49,9 @@ import javax.security.auth.login.AppConfigurationEntry; -import java.io.File; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -63,33 +62,43 @@ public class HBaseLruTableFunction extends AbstractLruTableFunction { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(HBaseLruTableFunction.class); - private final HBaseConf conf; + private Config asyncClientConfig; + private Configuration conf; + private final byte[] serializedConfig; + private final String nullStringLiteral; private static final int DEFAULT_BOSS_THREADS = 1; private static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2; private static final int DEFAULT_POOL_SIZE = DEFAULT_IO_THREADS + DEFAULT_BOSS_THREADS; private transient HBaseClient hBaseClient; private String tableName; - private String[] colNames; private final HBaseTableSchema hbaseTableSchema; private transient AsyncHBaseSerde serde; public HBaseLruTableFunction( - HBaseConf conf, LookupConf lookupConf, HBaseTableSchema hbaseTableSchema) { + Configuration conf, + LookupConf lookupConf, + HBaseTableSchema hbaseTableSchema, + String nullStringLiteral) { super(lookupConf, null); - this.conf = conf; + this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf); this.lookupConf = lookupConf; this.hbaseTableSchema = hbaseTableSchema; + this.nullStringLiteral = nullStringLiteral; } @Override public void open(FunctionContext context) throws Exception { super.open(context); - this.serde = new AsyncHBaseSerde(hbaseTableSchema, conf.getNullMode()); - tableName = conf.getTableName(); - colNames = - conf.getColumnMetaInfos().stream().map(FieldConf::getName).toArray(String[]::new); - Map hbaseConfig = conf.getHbaseConfig(); + conf = HBaseConfigurationUtil.prepareRuntimeConfiguration(serializedConfig); + asyncClientConfig 
= new Config(); + Iterator> iterator = conf.iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + asyncClientConfig.overrideConfig(entry.getKey(), entry.getValue()); + } + this.serde = new AsyncHBaseSerde(hbaseTableSchema, nullStringLiteral); + tableName = hbaseTableSchema.getTableName(); ExecutorService executorService = new ThreadPoolExecutor( DEFAULT_POOL_SIZE, @@ -98,45 +107,31 @@ public void open(FunctionContext context) throws Exception { TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new FlinkxThreadFactory("hbase-async")); - - Config config = new Config(); - config.overrideConfig( - HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM, - (String) conf.getHbaseConfig().get(HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_QUORUM)); - config.overrideConfig( - HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM, - (String) - conf.getHbaseConfig() - .get(HBaseConfigUtils.KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM)); - hbaseConfig.forEach((key, value) -> config.overrideConfig(key, (String) value)); - - if (HBaseConfigUtils.isEnableKerberos(hbaseConfig)) { - HBaseConfigUtils.loadKrb5Conf(hbaseConfig); - String principal = MapUtils.getString(hbaseConfig, HBaseConfigUtils.KEY_PRINCIPAL); - HBaseConfigUtils.checkOpt(principal, HBaseConfigUtils.KEY_PRINCIPAL); - String regionserverPrincipal = - MapUtils.getString( - hbaseConfig, - HBaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL); - HBaseConfigUtils.checkOpt( - regionserverPrincipal, - HBaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL); - String keytab = MapUtils.getString(hbaseConfig, HBaseConfigUtils.KEY_KEY_TAB); - HBaseConfigUtils.checkOpt(keytab, HBaseConfigUtils.KEY_KEY_TAB); - String keytabPath = System.getProperty("user.dir") + File.separator + keytab; - DtFileUtils.checkExists(keytabPath); - + if (HBaseConfigUtils.isEnableKerberos(conf)) { + System.setProperty( + HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF, + asyncClientConfig.getString(HBaseConfigUtils.KEY_JAVA_SECURITY_KRB5_CONF)); + String principal = + asyncClientConfig.getString( + HBaseConfigUtils.KEY_HBASE_CLIENT_KERBEROS_PRINCIPAL); + String keytab = + asyncClientConfig.getString(HBaseConfigUtils.KEY_HBASE_CLIENT_KEYTAB_FILE); + DtFileUtils.checkExists(keytab); LOG.info("Kerberos login with keytab: {} and principal: {}", keytab, principal); String name = "HBaseClient"; - config.overrideConfig("hbase.sasl.clientconfig", name); + asyncClientConfig.overrideConfig("hbase.sasl.clientconfig", name); appendJaasConf(name, keytab, principal); refreshConfig(); } - hBaseClient = new HBaseClient(config, executorService); + hBaseClient = new HBaseClient(asyncClientConfig, executorService); try { Deferred deferred = - hBaseClient.ensureTableExists(tableName).addCallbacks(arg -> arg, arg -> arg); + hBaseClient + .ensureTableExists(tableName) + .addCallbacks( + arg -> new CheckResult(true, ""), + arg -> new CheckResult(false, arg.toString())); CheckResult result = (CheckResult) deferred.join(); if (!result.isConnect()) { @@ -160,12 +155,17 @@ public void handleAsyncInvoke( .addCallbacks( keyValues -> { try { - Map sideMap = Maps.newHashMap(); + Map> sideMap = Maps.newHashMap(); for (KeyValue keyValue : keyValues) { String cf = new String(keyValue.family()); String col = new String(keyValue.qualifier()); - String mapKey = cf + ":" + col; - sideMap.put(mapKey, keyValue.value()); + if (!sideMap.containsKey(cf)) { + Map cfMap = Maps.newHashMap(); + cfMap.put(col, keyValue.value()); + sideMap.put(cf, cfMap); + } else { + sideMap.get(cf).putIfAbsent(col, keyValue.value()); + } 
} RowData rowData = serde.convertToNewRow(sideMap, key); if (keyValues.size() > 0) { @@ -226,10 +226,6 @@ public void close() throws Exception { hBaseClient.shutdown(); } - protected RowData fillData(Object sideInput) throws Exception { - return rowConverter.toInternalLookup(sideInput); - } - class CheckResult { private boolean connect; diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/ByteUtils.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/ByteUtils.java deleted file mode 100644 index b73d394b50..0000000000 --- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/ByteUtils.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.flinkx.connector.hbase14.util; - -import org.apache.commons.io.Charsets; - -import java.nio.ByteBuffer; - -/** - * Date: 2018/8/28 Company: www.dtstack.com - * - * @author xuchao - */ -public class ByteUtils { - - public static boolean toBoolean(final byte[] b) { - if (b.length != 1) { - throw new IllegalArgumentException("Array has wrong size: " + b.length); - } - return b[0] != (byte) 0; - } - - public static String byteToString(byte[] bytes) { - return new String(bytes, Charsets.UTF_8); - } - - public static byte[] shortToByte4(short i) { - byte[] targets = new byte[2]; - targets[1] = (byte) (i & 0xFF); - targets[0] = (byte) (i >> 8 & 0xFF); - return targets; - } - - public static Short byte2ToShort(byte[] bytes) { - - if (bytes.length != 2) { - throw new RuntimeException("byte2ToUnsignedShort input bytes length need == 2"); - } - - short high = (short) (bytes[0] & 0xFF); - short low = (short) (bytes[1] & 0xFF); - return (short) (high << 8 | low); - } - - /*将INT类型转化为10进制byte数组(占4字节)*/ - public static byte[] int2Bytes(int num) { - byte[] byteNum = new byte[4]; - for (int ix = 0; ix < 4; ++ix) { - int offset = 32 - (ix + 1) * 8; - byteNum[ix] = (byte) ((num >> offset) & 0xff); - } - return byteNum; - } - - /** - * byte数组转换为int整数 - * - * @param byteNum byte数组 - * @return int整数 - */ - public static int byte4ToInt(byte[] byteNum) { - - if (byteNum.length != 4) { - throw new RuntimeException("byte4ToInt input bytes length need == 4"); - } - - int num = 0; - for (int ix = 0; ix < 4; ++ix) { - num <<= 8; - num |= (byteNum[ix] & 0xff); - } - return num; - } - - /*将长整形转化为byte数组*/ - public static byte[] long2Bytes(long num) { - byte[] byteNum = new byte[8]; - for (int ix = 0; ix < 8; ++ix) { - int offset = 64 - (ix + 1) * 8; - byteNum[ix] = (byte) ((num >> offset) & 0xff); - } - return byteNum; - } - - /*将byte数组(长度为8)转化为长整形*/ - public static long bytes2Long(byte[] byteNum) { - - if 
(byteNum.length != 8) { - throw new RuntimeException("bytes2Long input bytes length need == 8"); - } - - long num = 0; - for (int ix = 0; ix < 8; ++ix) { - num <<= 8; - num |= (byteNum[ix] & 0xff); - } - return num; - } - - public static byte bytes2Byte(byte[] byteNum) { - if (byteNum.length != 8) { - throw new RuntimeException("bytes2Byte input bytes length need == 1"); - } - - return byteNum[0]; - } - - /** 将float转化为byte数组,占用4个字节* */ - public static byte[] float2ByteArray(float value) { - return ByteBuffer.allocate(4).putFloat(value).array(); - } - - /** - * 将10进制byte数组转化为Float - * - * @param b 字节(至少4个字节) - * @return - */ - public static float bytes2Float(byte[] b) { - int l; - l = b[0]; - l &= 0xff; - l |= ((long) b[1] << 8); - l &= 0xffff; - l |= ((long) b[2] << 16); - l &= 0xffffff; - l |= ((long) b[3] << 24); - return Float.intBitsToFloat(l); - } - - public static byte[] double2Bytes(double d) { - long value = Double.doubleToRawLongBits(d); - byte[] byteRet = new byte[8]; - for (int i = 0; i < 8; i++) { - byteRet[i] = (byte) ((value >> 8 * i) & 0xff); - } - return byteRet; - } - - public static double bytes2Double(byte[] arr) { - long value = 0; - for (int i = 0; i < 8; i++) { - value |= ((long) (arr[i] & 0xff)) << (8 * i); - } - return Double.longBitsToDouble(value); - } -} diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseConfigUtils.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseConfigUtils.java index 0d702cbd4f..a0d2fa152a 100644 --- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseConfigUtils.java +++ b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseConfigUtils.java @@ -24,11 +24,11 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.hbase.async.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; import java.nio.file.Paths; import java.util.Arrays; import java.util.List; @@ -58,10 +58,6 @@ public class HBaseConfigUtils { "hbase.client.kerberos.principal"; private static final String KEY_HBASE_SECURITY_AUTH_ENABLE = "hbase.security.auth.enable"; - - public static final String KEY_HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; - public static final String KEY_HBASE_ZOOKEEPER_ZNODE_QUORUM = "hbase.zookeeper.znode.parent"; - public static final String KEY_ZOOKEEPER_SASL_CLIENT = "zookeeper.sasl.client"; public static final String KEY_JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; public static final String KEY_KEY_TAB = "hbase.keytab"; @@ -82,6 +78,21 @@ public static Configuration getConfig(Map hbaseConfigMap) { return hConfiguration; } + public static boolean isEnableKerberos(Configuration configuration) { + boolean hasAuthorization = + KRB_STR.equalsIgnoreCase(configuration.get(KEY_HBASE_SECURITY_AUTHORIZATION)); + boolean hasAuthentication = + KRB_STR.equalsIgnoreCase(configuration.get(KEY_HBASE_SECURITY_AUTHENTICATION)); + boolean hasAuthEnable = + Boolean.getBoolean(configuration.get(KEY_HBASE_SECURITY_AUTH_ENABLE)); + + if (hasAuthentication || hasAuthorization || hasAuthEnable) { + LOG.info("Enable kerberos for hbase."); + return true; + } + return false; + } + public static boolean isEnableKerberos(Map hbaseConfigMap) { boolean hasAuthorization = 
KRB_STR.equalsIgnoreCase( @@ -100,7 +111,7 @@ public static boolean isEnableKerberos(Map hbaseConfigMap) { return false; } - private static void setKerberosConf(Map hbaseConfigMap) { + public static void setKerberosConf(Map hbaseConfigMap) { hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTHORIZATION, KRB_STR); hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTHENTICATION, KRB_STR); hbaseConfigMap.put(KEY_HBASE_SECURITY_AUTH_ENABLE, true); @@ -113,7 +124,7 @@ public static Configuration getHadoopConfiguration(Map hbaseConf String.format("Must provide [%s] when authentication is Kerberos", key)); } } - return HBaseConfiguration.create(); + return HBaseConfigUtils.getConfig(hbaseConfigMap); } public static String getPrincipal(Map hbaseConfigMap) { @@ -125,17 +136,8 @@ public static String getPrincipal(Map hbaseConfigMap) { throw new IllegalArgumentException(KEY_PRINCIPAL + " is not set!"); } - public static String getKeytab(Map hbaseConfigMap) { - String keytab = MapUtils.getString(hbaseConfigMap, KEY_KEY_TAB); - if (StringUtils.isNotEmpty(keytab)) { - return keytab; - } - - throw new IllegalArgumentException(KEY_KEY_TAB + " is not exist"); - } - public static void fillSyncKerberosConfig( - Configuration config, Map hbaseConfigMap) throws IOException { + Configuration config, Map hbaseConfigMap) { if (StringUtils.isEmpty( MapUtils.getString(hbaseConfigMap, KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL))) { throw new IllegalArgumentException( @@ -149,13 +151,37 @@ public static void fillSyncKerberosConfig( HBaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL, regionServerPrincipal); config.set(HBaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, "true"); config.set(HBaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, KRB_STR); + config.set(HBaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, KRB_STR); + + if (!StringUtils.isEmpty(MapUtils.getString(hbaseConfigMap, KEY_ZOOKEEPER_SASL_CLIENT))) { + System.setProperty( + HBaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, + MapUtils.getString(hbaseConfigMap, KEY_ZOOKEEPER_SASL_CLIENT)); + } + } + + public static void fillSyncKerberosConfig(Config config, Map hbaseConfigMap) { + if (StringUtils.isEmpty( + MapUtils.getString(hbaseConfigMap, KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL))) { + throw new IllegalArgumentException( + "Must provide region server Principal when authentication is Kerberos"); + } + + String regionServerPrincipal = + MapUtils.getString(hbaseConfigMap, KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL); + config.overrideConfig( + HBaseConfigUtils.KEY_HBASE_MASTER_KERBEROS_PRINCIPAL, regionServerPrincipal); + config.overrideConfig( + HBaseConfigUtils.KEY_HBASE_REGIONSERVER_KERBEROS_PRINCIPAL, regionServerPrincipal); + config.overrideConfig(HBaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, "true"); + config.overrideConfig(HBaseConfigUtils.KEY_HBASE_SECURITY_AUTHENTICATION, KRB_STR); + config.overrideConfig(HBaseConfigUtils.KEY_HBASE_SECURITY_AUTHORIZATION, KRB_STR); if (!StringUtils.isEmpty(MapUtils.getString(hbaseConfigMap, KEY_ZOOKEEPER_SASL_CLIENT))) { System.setProperty( HBaseConfigUtils.KEY_ZOOKEEPER_SASL_CLIENT, MapUtils.getString(hbaseConfigMap, KEY_ZOOKEEPER_SASL_CLIENT)); } - loadKrb5Conf(hbaseConfigMap); } public static void loadKrb5Conf(Map config) { diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseReadWriteHelper.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseReadWriteHelper.java deleted file mode 100644 index 
1628d91000..0000000000 --- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseReadWriteHelper.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dtstack.flinkx.connector.hbase14.util; - -import com.dtstack.flinkx.connector.hbase.HBaseTableSchema; -import com.dtstack.flinkx.connector.hbase.HBaseTypeUtils; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.types.Row; - -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.Scan; - -import java.nio.charset.Charset; - -/** - * A read and write helper for HBase. The helper can used to create a {@link Scan} and {@link Get} - * for scanning or lookuping a HBase table, and create a {@link Put} and {@link Delete} for writing - * to HBase table, and supports converting the HBase {@link Result} to Flink {@link Row}. - */ -@Internal -public class HBaseReadWriteHelper { - - // family keys - private final byte[][] families; - // qualifier keys - private final byte[][][] qualifiers; - // qualifier types - private final int[][] qualifierTypes; - - // row key index in output row - private final int rowKeyIndex; - // type of row key - private final int rowKeyType; - - private final int fieldLength; - - // charset is not serializable - private final Charset charset; - - // row which is returned - private Row resultRow; - // nested family rows - private Row[] familyRows; - - public HBaseReadWriteHelper(HBaseTableSchema hbaseTableSchema) { - this.families = hbaseTableSchema.getFamilyKeys(); - this.qualifiers = new byte[this.families.length][][]; - this.qualifierTypes = new int[this.families.length][]; - this.familyRows = new Row[this.families.length]; - String[] familyNames = hbaseTableSchema.getFamilyNames(); - for (int f = 0; f < families.length; f++) { - this.qualifiers[f] = hbaseTableSchema.getQualifierKeys(familyNames[f]); - TypeInformation[] typeInfos = hbaseTableSchema.getQualifierTypes(familyNames[f]); - this.qualifierTypes[f] = new int[typeInfos.length]; - for (int i = 0; i < typeInfos.length; i++) { - qualifierTypes[f][i] = HBaseTypeUtils.getTypeIndex(typeInfos[i]); - } - this.familyRows[f] = new Row(typeInfos.length); - } - this.charset = Charset.forName(hbaseTableSchema.getStringCharset()); - // row key - this.rowKeyIndex = hbaseTableSchema.getRowKeyIndex(); - this.rowKeyType = - hbaseTableSchema.getRowKeyTypeInfo().map(HBaseTypeUtils::getTypeIndex).orElse(-1); - - // field length need take row key into account if it exists. 
- this.fieldLength = rowKeyIndex == -1 ? families.length : families.length + 1; - - // prepare output rows - this.resultRow = new Row(fieldLength); - } - - /** - * Returns an instance of Get that retrieves the matches records from the HBase table. - * - * @return The appropriate instance of Get for this use case. - */ - public Get createGet(Object rowKey) { - byte[] rowkey = HBaseTypeUtils.serializeFromObject(rowKey, rowKeyType, charset); - Get get = new Get(rowkey); - for (int f = 0; f < families.length; f++) { - byte[] family = families[f]; - for (byte[] qualifier : qualifiers[f]) { - get.addColumn(family, qualifier); - } - } - return get; - } - - /** - * Returns an instance of Scan that retrieves the required subset of records from the HBase - * table. - * - * @return The appropriate instance of Scan for this use case. - */ - public Scan createScan() { - Scan scan = new Scan(); - for (int f = 0; f < families.length; f++) { - byte[] family = families[f]; - for (int q = 0; q < qualifiers[f].length; q++) { - byte[] quantifier = qualifiers[f][q]; - scan.addColumn(family, quantifier); - } - } - return scan; - } - - /** Parses HBase {@link Result} into {@link Row}. */ - public Row parseToRow(Result result) { - if (rowKeyIndex == -1) { - return parseToRow(result, null); - } else { - Object rowkey = - HBaseTypeUtils.deserializeToObject(result.getRow(), rowKeyType, charset); - return parseToRow(result, rowkey); - } - } - - /** Parses HBase {@link Result} into {@link Row}. */ - public Row parseToRow(Result result, Object rowKey) { - for (int i = 0; i < fieldLength; i++) { - if (rowKeyIndex == i) { - resultRow.setField(rowKeyIndex, rowKey); - } else { - int f = (rowKeyIndex != -1 && i > rowKeyIndex) ? i - 1 : i; - // get family key - byte[] familyKey = families[f]; - Row familyRow = familyRows[f]; - for (int q = 0; q < this.qualifiers[f].length; q++) { - // get quantifier key - byte[] qualifier = qualifiers[f][q]; - // get quantifier type idx - int typeIdx = qualifierTypes[f][q]; - // read value - byte[] value = result.getValue(familyKey, qualifier); - if (value != null) { - familyRow.setField( - q, HBaseTypeUtils.deserializeToObject(value, typeIdx, charset)); - } else { - familyRow.setField(q, null); - } - } - resultRow.setField(i, familyRow); - } - } - return resultRow; - } - - /** - * Returns an instance of Put that writes record to HBase table. - * - * @return The appropriate instance of Put for this use case. - */ - public Put createPutMutation(Row row) { - assert rowKeyIndex != -1; - byte[] rowkey = - HBaseTypeUtils.serializeFromObject(row.getField(rowKeyIndex), rowKeyType, charset); - // upsert - Put put = new Put(rowkey); - for (int i = 0; i < fieldLength; i++) { - if (i != rowKeyIndex) { - int f = i > rowKeyIndex ? i - 1 : i; - // get family key - byte[] familyKey = families[f]; - Row familyRow = (Row) row.getField(i); - for (int q = 0; q < this.qualifiers[f].length; q++) { - // get quantifier key - byte[] qualifier = qualifiers[f][q]; - // get quantifier type idx - int typeIdx = qualifierTypes[f][q]; - // read value - byte[] value = - HBaseTypeUtils.serializeFromObject( - familyRow.getField(q), typeIdx, charset); - put.addColumn(familyKey, qualifier, value); - } - } - } - return put; - } - - /** - * Returns an instance of Delete that remove record from HBase table. - * - * @return The appropriate instance of Delete for this use case. 
- */ - public Delete createDeleteMutation(Row row) { - assert rowKeyIndex != -1; - byte[] rowkey = - HBaseTypeUtils.serializeFromObject(row.getField(rowKeyIndex), rowKeyType, charset); - // delete - Delete delete = new Delete(rowkey); - for (int i = 0; i < fieldLength; i++) { - if (i != rowKeyIndex) { - int f = i > rowKeyIndex ? i - 1 : i; - // get family key - byte[] familyKey = families[f]; - for (int q = 0; q < this.qualifiers[f].length; q++) { - // get quantifier key - byte[] qualifier = qualifiers[f][q]; - delete.addColumn(familyKey, qualifier); - } - } - } - return delete; - } -} diff --git a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseUtils.java b/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseUtils.java deleted file mode 100644 index bc7d25f9dc..0000000000 --- a/flinkx-connectors/flinkx-connector-hbase-1.4/src/main/java/com/dtstack/flinkx/connector/hbase14/util/HBaseUtils.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.dtstack.flinkx.connector.hbase14.util; - -import org.apache.hadoop.hbase.util.Bytes; - -/** - * Reason: Date: 2018/9/10 Company: www.dtstack.com - * - * @author xuchao - */ -public class HBaseUtils { - - public static Object convertByte(byte[] hbaseData, String type) { - if (type == null) { - return new String(hbaseData); - } - - switch (type.toLowerCase()) { - case "boolean": - return Bytes.toBoolean(hbaseData); - - case "int": - return Bytes.toInt(hbaseData); - - case "bigint": - return Bytes.toLong(hbaseData); - - case "tinyint": - case "byte": - return ByteUtils.bytes2Byte(hbaseData); - - case "short": - case "smallint": - return Bytes.toShort(hbaseData); - - case "char": - case "varchar": - case "string": - return Bytes.toString(hbaseData); - - case "float": - return Bytes.toFloat(hbaseData); - - case "double": - return Bytes.toDouble(hbaseData); - case "decimal": - return Bytes.toBigDecimal(hbaseData); - default: - throw new RuntimeException("not support type of " + type); - } - } -} diff --git a/flinkx-connectors/flinkx-connector-hbase-base/pom.xml b/flinkx-connectors/flinkx-connector-hbase-base/pom.xml index 8797b9a5a3..5be016d537 100644 --- a/flinkx-connectors/flinkx-connector-hbase-base/pom.xml +++ b/flinkx-connectors/flinkx-connector-hbase-base/pom.xml @@ -115,10 +115,6 @@ org.slf4j slf4j-log4j12 - - netty-all - io.netty - diff --git a/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseConfigurationUtil.java b/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseConfigurationUtil.java new file mode 100644 index 0000000000..04d597d273 --- /dev/null +++ b/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseConfigurationUtil.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dtstack.flinkx.connector.hbase; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.StringUtils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.IOException; + +/** This class helps to do serialization for hadoop Configuration and HBase-related classes. 
*/ +@Internal +public class HBaseConfigurationUtil { + + private static final Logger LOG = LoggerFactory.getLogger(HBaseConfigurationUtil.class); + + public static final String ENV_HBASE_CONF_DIR = "HBASE_CONF_DIR"; + + public static Configuration getHBaseConfiguration() { + + // Instantiate an HBaseConfiguration to load the hbase-default.xml and hbase-site.xml from + // the classpath. + Configuration result = HBaseConfiguration.create(); + boolean foundHBaseConfiguration = false; + + // We need to load both hbase-default.xml and hbase-site.xml to the hbase configuration + // The properties of a newly added resource will override the ones in previous resources, so + // a configuration + // file with higher priority should be added later. + + // Approach 1: HBASE_HOME environment variables + String possibleHBaseConfPath = null; + + final String hbaseHome = System.getenv("HBASE_HOME"); + if (hbaseHome != null) { + LOG.debug("Searching HBase configuration files in HBASE_HOME: {}", hbaseHome); + possibleHBaseConfPath = hbaseHome + "/conf"; + } + + if (possibleHBaseConfPath != null) { + foundHBaseConfiguration = addHBaseConfIfFound(result, possibleHBaseConfPath); + } + + // Approach 2: HBASE_CONF_DIR environment variable + String hbaseConfDir = System.getenv("HBASE_CONF_DIR"); + if (hbaseConfDir != null) { + LOG.debug("Searching HBase configuration files in HBASE_CONF_DIR: {}", hbaseConfDir); + foundHBaseConfiguration = + addHBaseConfIfFound(result, hbaseConfDir) || foundHBaseConfiguration; + } + + if (!foundHBaseConfiguration) { + LOG.warn( + "Could not find HBase configuration via any of the supported methods " + + "(Flink configuration, environment variables)."); + } + + return result; + } + + /** + * Search HBase configuration files in the given path, and add them to the configuration if + * found. + */ + private static boolean addHBaseConfIfFound( + Configuration configuration, String possibleHBaseConfPath) { + boolean foundHBaseConfiguration = false; + if (new File(possibleHBaseConfPath).exists()) { + if (new File(possibleHBaseConfPath + "/hbase-default.xml").exists()) { + configuration.addResource( + new org.apache.hadoop.fs.Path( + possibleHBaseConfPath + "/hbase-default.xml")); + LOG.debug( + "Adding " + + possibleHBaseConfPath + + "/hbase-default.xml to hbase configuration"); + foundHBaseConfiguration = true; + } + if (new File(possibleHBaseConfPath + "/hbase-site.xml").exists()) { + configuration.addResource( + new org.apache.hadoop.fs.Path(possibleHBaseConfPath + "/hbase-site.xml")); + LOG.debug( + "Adding " + + possibleHBaseConfPath + + "/hbase-site.xml to hbase configuration"); + foundHBaseConfiguration = true; + } + } + return foundHBaseConfiguration; + } + + /** Serialize a Hadoop {@link Configuration} into byte[]. */ + public static byte[] serializeConfiguration(Configuration conf) { + try { + return serializeWritable(conf); + } catch (IOException e) { + throw new RuntimeException( + "Encounter an IOException when serialize the Configuration.", e); + } + } + + /** + * Deserialize a Hadoop {@link Configuration} from byte[]. Deserialize configs to {@code + * targetConfig} if it is set. 
+ */ + public static Configuration deserializeConfiguration( + byte[] serializedConfig, Configuration targetConfig) { + if (null == targetConfig) { + targetConfig = new Configuration(); + } + try { + deserializeWritable(targetConfig, serializedConfig); + } catch (IOException e) { + throw new RuntimeException( + "Encounter an IOException when deserialize the Configuration.", e); + } + return targetConfig; + } + + /** + * Serialize writable byte[]. + * + * @param the type parameter + * @param writable the writable + * @return the byte [ ] + * @throws IOException the io exception + */ + private static byte[] serializeWritable(T writable) throws IOException { + Preconditions.checkArgument(writable != null); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + writable.write(outputStream); + return byteArrayOutputStream.toByteArray(); + } + + /** + * Deserialize writable. + * + * @param the type parameter + * @param writable the writable + * @param bytes the bytes + * @throws IOException the io exception + */ + private static void deserializeWritable(T writable, byte[] bytes) + throws IOException { + Preconditions.checkArgument(writable != null); + Preconditions.checkArgument(bytes != null); + + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); + DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream); + writable.readFields(dataInputStream); + } + + public static Configuration createHBaseConf() { + Configuration hbaseClientConf = HBaseConfiguration.create(); + + String hbaseConfDir = System.getenv(ENV_HBASE_CONF_DIR); + + if (hbaseConfDir != null) { + if (new File(hbaseConfDir).exists()) { + String coreSite = hbaseConfDir + "/core-site.xml"; + String hdfsSite = hbaseConfDir + "/hdfs-site.xml"; + String hbaseSite = hbaseConfDir + "/hbase-site.xml"; + if (new File(coreSite).exists()) { + hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(coreSite)); + LOG.info("Adding " + coreSite + " to hbase configuration"); + } + if (new File(hdfsSite).exists()) { + hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(hdfsSite)); + LOG.info("Adding " + hdfsSite + " to hbase configuration"); + } + if (new File(hbaseSite).exists()) { + hbaseClientConf.addResource(new org.apache.hadoop.fs.Path(hbaseSite)); + LOG.info("Adding " + hbaseSite + " to hbase configuration"); + } + } else { + LOG.warn( + "HBase config directory '{}' not found, cannot load HBase configuration.", + hbaseConfDir); + } + } else { + LOG.warn( + "{} env variable not found, cannot load HBase configuration.", + ENV_HBASE_CONF_DIR); + } + return hbaseClientConf; + } + + public static Configuration prepareRuntimeConfiguration(byte[] serializedConfig) { + // create default configuration from current runtime env (`hbase-site.xml` in classpath) + // first, + // and overwrite configuration using serialized configuration from client-side env + // (`hbase-site.xml` in classpath). 
+ // user params from client-side have the highest priority + Configuration runtimeConfig = + HBaseConfigurationUtil.deserializeConfiguration( + serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration()); + + // do validation: check key option(s) in final runtime configuration + if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) { + LOG.error( + "can not connect to HBase without {} configuration", + HConstants.ZOOKEEPER_QUORUM); + throw new IllegalArgumentException( + "check HBase configuration failed, lost: '" + + HConstants.ZOOKEEPER_QUORUM + + "'!"); + } + + return runtimeConfig; + } +} diff --git a/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseSerde.java b/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseSerde.java index d09452e473..68e2d2812d 100644 --- a/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseSerde.java +++ b/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseSerde.java @@ -527,4 +527,19 @@ protected static FieldDecoder createTimestampDecoder() { return TimestampData.fromEpochMillis(milliseconds); }; } + + public byte[] getRowKey(Object rowKey) { + checkArgument(keyEncoder != null, "row key is not set."); + rowWithRowKey.setField(0, rowKey); + byte[] rowkey = keyEncoder.encode(rowWithRowKey, 0); + if (rowkey.length == 0) { + // drop dirty records, rowkey shouldn't be zero length + return null; + } + return rowkey; + } + + public Object getRowKey(byte[] rowKey) { + return keyDecoder.decode(rowKey); + } } diff --git a/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseTableSchema.java b/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseTableSchema.java index 814b1194d7..601472cdea 100644 --- a/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseTableSchema.java +++ b/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase/HBaseTableSchema.java @@ -49,6 +49,8 @@ public class HBaseTableSchema implements Serializable { private static final long serialVersionUID = 1L; + private String tableName; + // A Map with key as column family. private final Map> familyMap = new LinkedHashMap<>(); @@ -58,6 +60,14 @@ public class HBaseTableSchema implements Serializable { // charset to parse HBase keys and strings. UTF-8 by default. private String charset = "UTF-8"; + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + /** * Adds a column defined by family, qualifier, and type to the table schema. * diff --git a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/lookup/RedisAllTableFunction.java b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/lookup/RedisAllTableFunction.java index 771b6fd1ef..012e432d2f 100644 --- a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/lookup/RedisAllTableFunction.java +++ b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/lookup/RedisAllTableFunction.java @@ -72,7 +72,9 @@ public void eval(Object... 
keys) {
         keyPattern
                 .append("_")
                 .append(Arrays.stream(keys).map(String::valueOf).collect(Collectors.joining("_")));
-        List<Map<String, Object>> cacheList = cacheRef.get().get(keyPattern.toString());
+        List<Map<String, Object>> cacheList =
+                ((Map<String, List<Map<String, Object>>>) cacheRef.get())
+                        .get(keyPattern.toString());
 
         // 有数据才往下发,(左/内)连接flink会做相应的处理
         if (!CollectionUtils.isEmpty(cacheList)) {
diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/lookup/AbstractAllTableFunction.java b/flinkx-core/src/main/java/com/dtstack/flinkx/lookup/AbstractAllTableFunction.java
index 331f2823b6..2a1b5ed6ab 100644
--- a/flinkx-core/src/main/java/com/dtstack/flinkx/lookup/AbstractAllTableFunction.java
+++ b/flinkx-core/src/main/java/com/dtstack/flinkx/lookup/AbstractAllTableFunction.java
@@ -55,8 +55,7 @@ public abstract class AbstractAllTableFunction extends TableFunction<RowData> {
     /** 和维表join字段的名称 */
     protected final String[] keyNames;
     /** 缓存 */
-    protected AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef =
-            new AtomicReference<>();
+    protected AtomicReference<Object> cacheRef = new AtomicReference<>();
     /** 定时加载 */
     private ScheduledExecutorService es;
     /** 维表配置 */
@@ -148,7 +147,8 @@ protected void buildCache(
      */
    public void eval(Object... keys) {
         String cacheKey = Arrays.stream(keys).map(String::valueOf).collect(Collectors.joining("_"));
-        List<Map<String, Object>> cacheList = cacheRef.get().get(cacheKey);
+        List<Map<String, Object>> cacheList =
+                ((Map<String, List<Map<String, Object>>>) (cacheRef.get())).get(cacheKey);
         // 有数据才往下发,(左/内)连接flink会做相应的处理
         if (!CollectionUtils.isEmpty(cacheList)) {
             cacheList.forEach(one -> collect(fillData(one)));
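
For reviewers unfamiliar with the new configuration flow: the lookup functions no longer keep an HBaseConf; they keep only a serialized Hadoop Configuration and rebuild it on the task side. A minimal, illustrative sketch of that round trip (class name and the ZooKeeper address are placeholders; assumes flinkx-connector-hbase-base, hadoop-common and hbase-common on the classpath):

    import com.dtstack.flinkx.connector.hbase.HBaseConfigurationUtil;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HConstants;

    public class ConfigRoundTripSketch {
        public static void main(String[] args) {
            // Client side (table source): collect user options into a Hadoop Configuration.
            Configuration clientConf = new Configuration();
            clientConf.set(HConstants.ZOOKEEPER_QUORUM, "zk-host:2181"); // placeholder value

            // This byte[] is what HBaseAllTableFunction / HBaseLruTableFunction keep in their
            // serializable serializedConfig field.
            byte[] serialized = HBaseConfigurationUtil.serializeConfiguration(clientConf);

            // Task side (open()/loadData()): classpath defaults (hbase-site.xml) are loaded first,
            // then overridden by the client-side settings; a missing hbase.zookeeper.quorum fails fast.
            Configuration runtimeConf = HBaseConfigurationUtil.prepareRuntimeConfiguration(serialized);
            System.out.println(runtimeConf.get(HConstants.ZOOKEEPER_QUORUM));
        }
    }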