diff --git a/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogColumnConverter.java b/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogColumnConverter.java index ac642c29d1..3d0bc48604 100644 --- a/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogColumnConverter.java @@ -41,6 +41,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -67,7 +68,7 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce String schema = binlogEventRow.getSchema(); String table = binlogEventRow.getTable(); String key = schema + ConstantValue.POINT_SYMBOL + table; - IDeserializationConverter[] converters = super.cdcConverterCacheMap.get(key); + List converters = super.cdcConverterCacheMap.get(key); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (converters == null) { List list = rowData.getBeforeColumnsList(); @@ -75,9 +76,10 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce list = rowData.getAfterColumnsList(); } converters = - list.stream() - .map(x -> createInternalConverter(x.getMysqlType())) - .toArray(IDeserializationConverter[]::new); + Arrays.asList( + list.stream() + .map(x -> createInternalConverter(x.getMysqlType())) + .toArray(IDeserializationConverter[]::new)); cdcConverterCacheMap.put(key, converters); } @@ -162,7 +164,7 @@ public LinkedList toInternal(BinlogEventRow binlogEventRow) throws Exce * @param after */ private void parseColumnList( - IDeserializationConverter[] converters, + List converters, List entryColumnList, List columnList, List headerList, @@ -171,7 +173,8 @@ private void parseColumnList( for (int i = 0; i < entryColumnList.size(); i++) { CanalEntry.Column entryColumn = entryColumnList.get(i); if (!entryColumn.getIsNull()) { - AbstractBaseColumn column = converters[i].deserialize(entryColumn.getValue()); + AbstractBaseColumn column = + (AbstractBaseColumn) converters.get(i).deserialize(entryColumn.getValue()); columnList.add(column); } else { columnList.add(new NullColumn()); diff --git a/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogRowConverter.java b/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogRowConverter.java index 19f4968903..4f0a4993a0 100644 --- a/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogRowConverter.java +++ b/flinkx-connectors/flinkx-connector-binlog/src/main/java/com/dtstack/flinkx/connector/binlog/converter/BinlogRowConverter.java @@ -44,6 +44,7 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalQueries; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -59,9 +60,9 @@ public class BinlogRowConverter extends AbstractCDCRowConverter(); for (int i = 0; i < rowType.getFieldCount(); i++) { - super.converters[i] = createInternalConverter(rowType.getTypeAt(i)); + super.converters.add(createInternalConverter(rowType.getTypeAt(i))); } } diff --git a/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/converter/CassandraColumnConverter.java b/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/converter/CassandraColumnConverter.java index a8c9e57cad..ed5083f72d 100644 --- a/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/converter/CassandraColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/converter/CassandraColumnConverter.java @@ -61,13 +61,13 @@ public CassandraColumnConverter(RowType rowType, List fieldConfList) super(rowType); this.fieldConfList = fieldConfList; for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(fieldConfList.get(i).getType())); - toExternalConverters[i] = + createInternalConverter(fieldConfList.get(i).getType()))); + toExternalConverters.add( wrapIntoNullableExternalConverter( createExternalConverter(fieldConfList.get(i).getType()), - fieldConfList.get(i).getType()); + fieldConfList.get(i).getType())); } } @@ -87,14 +87,14 @@ public ISerializationConverter wrapIntoNullableExternalConverter @Override @SuppressWarnings("unchecked") public RowData toInternal(ResultSet resultSet) throws Exception { - ColumnRowData columnRowData = new ColumnRowData(toInternalConverters.length); + ColumnRowData columnRowData = new ColumnRowData(toInternalConverters.size()); Row row = resultSet.one(); - for (int i = 0; i < toInternalConverters.length; i++) { + for (int i = 0; i < toInternalConverters.size(); i++) { final Object value = row.getObject(i); columnRowData.setField( - i, (AbstractBaseColumn) toInternalConverters[i].deserialize(value)); + i, (AbstractBaseColumn) toInternalConverters.get(i).deserialize(value)); } return columnRowData; } @@ -103,7 +103,7 @@ public RowData toInternal(ResultSet resultSet) throws Exception { @SuppressWarnings("unchecked") public BoundStatement toExternal(RowData rowData, BoundStatement statement) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, statement); + toExternalConverters.get(index).serialize(rowData, index, statement); } return statement; } diff --git a/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/converter/CassandraRowConverter.java b/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/converter/CassandraRowConverter.java index a5105a9f78..1042380e7a 100644 --- a/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/converter/CassandraRowConverter.java +++ b/flinkx-connectors/flinkx-connector-cassandra/src/main/java/com/dtstack/flinkx/connector/cassandra/converter/CassandraRowConverter.java @@ -59,12 +59,12 @@ public CassandraRowConverter(RowType rowType, List columnNameList) { super(rowType); this.columnNameList = columnNameList; for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -84,7 +84,7 @@ private RowData deserializeInput(Row input) throws Exception { GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { Object field = input.getObject(pos); - genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); + genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field)); } return genericRowData; } @@ -92,7 +92,7 @@ private RowData deserializeInput(Row input) throws Exception { @Override public BoundStatement toExternal(RowData rowData, BoundStatement statement) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, statement); + toExternalConverters.get(index).serialize(rowData, index, statement); } return statement; } diff --git a/flinkx-connectors/flinkx-connector-elasticsearch6/src/main/java/com/dtstack/flinkx/connector/elasticsearch6/converter/Elasticsearch6ColumnConverter.java b/flinkx-connectors/flinkx-connector-elasticsearch6/src/main/java/com/dtstack/flinkx/connector/elasticsearch6/converter/Elasticsearch6ColumnConverter.java index 6cae1264f3..746a5dd44c 100644 --- a/flinkx-connectors/flinkx-connector-elasticsearch6/src/main/java/com/dtstack/flinkx/connector/elasticsearch6/converter/Elasticsearch6ColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-elasticsearch6/src/main/java/com/dtstack/flinkx/connector/elasticsearch6/converter/Elasticsearch6ColumnConverter.java @@ -68,12 +68,12 @@ public Elasticsearch6ColumnConverter(RowType rowType) { super(rowType); List fieldNames = rowType.getFieldNames(); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); typeIndexList.add(new Tuple3<>(fieldNames.get(i), i, rowType.getTypeAt(i))); } } @@ -96,7 +96,7 @@ protected ISerializationConverter wrapIntoNullableExternalConverter( @Override public RowData toInternal(Map input) throws Exception { ColumnRowData columnRowData = new ColumnRowData(rowType.getFieldCount()); - for (int i = 0; i < toInternalConverters.length; i++) { + for (int i = 0; i < toInternalConverters.size(); i++) { final int index = i; List> collect = typeIndexList.stream() @@ -110,7 +110,8 @@ public RowData toInternal(Map input) throws Exception { Tuple3 typeTuple = collect.get(0); Object field = input.get(typeTuple._1()); - columnRowData.addField((AbstractBaseColumn) toInternalConverters[i].deserialize(field)); + columnRowData.addField( + (AbstractBaseColumn) toInternalConverters.get(i).deserialize(field)); } return columnRowData; } @@ -119,7 +120,7 @@ public RowData toInternal(Map input) throws Exception { public Map toExternal(RowData rowData, Map output) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, output); + toExternalConverters.get(index).serialize(rowData, index, output); } return output; } diff --git a/flinkx-connectors/flinkx-connector-elasticsearch6/src/main/java/com/dtstack/flinkx/connector/elasticsearch6/converter/Elasticsearch6RowConverter.java b/flinkx-connectors/flinkx-connector-elasticsearch6/src/main/java/com/dtstack/flinkx/connector/elasticsearch6/converter/Elasticsearch6RowConverter.java index 0f5bcf424f..76fb7319f4 100644 --- a/flinkx-connectors/flinkx-connector-elasticsearch6/src/main/java/com/dtstack/flinkx/connector/elasticsearch6/converter/Elasticsearch6RowConverter.java +++ b/flinkx-connectors/flinkx-connector-elasticsearch6/src/main/java/com/dtstack/flinkx/connector/elasticsearch6/converter/Elasticsearch6RowConverter.java @@ -70,12 +70,12 @@ public Elasticsearch6RowConverter(RowType rowType) { super(rowType); List fieldNames = rowType.getFieldNames(); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); typeIndexList.add(new Tuple3<>(fieldNames.get(i), i, rowType.getTypeAt(i))); } } @@ -121,7 +121,7 @@ private GenericRowData genericRowData(Map input) throws Exceptio Tuple3 typeTuple = collect.get(0); genericRowData.setField( typeTuple._2(), - toInternalConverters[typeTuple._2()].deserialize(input.get(key))); + toInternalConverters.get(typeTuple._2()).deserialize(input.get(key))); } return genericRowData; } @@ -130,7 +130,7 @@ private GenericRowData genericRowData(Map input) throws Exceptio public Map toExternal(RowData rowData, Map output) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, output); + toExternalConverters.get(index).serialize(rowData, index, output); } return output; } diff --git a/flinkx-connectors/flinkx-connector-ftp/src/main/java/com/dtstack/flinkx/connector/ftp/converter/FtpColumnConverter.java b/flinkx-connectors/flinkx-connector-ftp/src/main/java/com/dtstack/flinkx/connector/ftp/converter/FtpColumnConverter.java index cc3b38c4e1..269d32cd30 100644 --- a/flinkx-connectors/flinkx-connector-ftp/src/main/java/com/dtstack/flinkx/connector/ftp/converter/FtpColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-ftp/src/main/java/com/dtstack/flinkx/connector/ftp/converter/FtpColumnConverter.java @@ -54,11 +54,11 @@ public FtpColumnConverter(FtpConfig ftpConfig) { for (int i = 0; i < ftpConfig.getColumn().size(); i++) { FieldConf fieldConf = ftpConfig.getColumn().get(i); - toInternalConverters[i] = - wrapIntoNullableInternalConverter(createInternalConverter(fieldConf)); - toExternalConverters[i] = + toInternalConverters.add( + wrapIntoNullableInternalConverter(createInternalConverter(fieldConf))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldConf), fieldConf); + createExternalConverter(fieldConf), fieldConf)); } } @@ -68,7 +68,8 @@ public RowData toInternal(RowData input) throws Exception { if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; for (int i = 0; i < input.getArity(); i++) { - row.setField(i, toInternalConverters[i].deserialize(genericRowData.getField(i))); + row.setField( + i, toInternalConverters.get(i).deserialize(genericRowData.getField(i))); } } else { throw new FlinkxRuntimeException( @@ -85,7 +86,7 @@ public String toExternal(RowData rowData, String output) throws Exception { List columnData = new ArrayList<>(ftpConfig.getColumn().size()); for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, columnData); + toExternalConverters.get(index).serialize(rowData, index, columnData); if (index != 0) { sb.append(ftpConfig.getFieldDelimiter()); } diff --git a/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase14/HBaseColumnConverter.java b/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase14/HBaseColumnConverter.java index 7794e1208f..1a7c26db1f 100644 --- a/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase14/HBaseColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase14/HBaseColumnConverter.java @@ -64,10 +64,10 @@ public HBaseColumnConverter(List fieldConfList) { if (left > 0 && right > 0) { type = type.substring(0, left); } - toInternalConverters[i] = - wrapIntoNullableInternalConverter(createInternalConverter(type)); - toExternalConverters[i] = - wrapIntoNullableExternalConverter(createExternalConverter(type), type); + toInternalConverters.add( + wrapIntoNullableInternalConverter(createInternalConverter(type))); + toExternalConverters.add( + wrapIntoNullableExternalConverter(createExternalConverter(type), type)); } } @@ -78,7 +78,8 @@ public RowData toInternal(RowData input) throws Exception { if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; for (int i = 0; i < input.getArity(); i++) { - row.setField(i, toInternalConverters[i].deserialize(genericRowData.getField(i))); + row.setField( + i, toInternalConverters.get(i).deserialize(genericRowData.getField(i))); } } else { throw new FlinkxRuntimeException( @@ -92,7 +93,7 @@ public RowData toInternal(RowData input) throws Exception { @SuppressWarnings("unchecked") public Object[] toExternal(RowData rowData, Object[] data) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, data); + toExternalConverters.get(index).serialize(rowData, index, data); } return data; } diff --git a/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase14/HBaseConverter.java b/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase14/HBaseConverter.java index 9e23b7569d..671297a7b0 100644 --- a/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase14/HBaseConverter.java +++ b/flinkx-connectors/flinkx-connector-hbase-base/src/main/java/com/dtstack/flinkx/connector/hbase14/HBaseConverter.java @@ -36,12 +36,12 @@ public class HBaseConverter extends AbstractRowConverter fieldConfList) { if (left > 0 && right > 0) { type = type.substring(0, left); } - toInternalConverters[i] = - wrapIntoNullableInternalConverter(createInternalConverter(type)); - toExternalConverters[i] = - wrapIntoNullableExternalConverter(createExternalConverter(type), type); + toInternalConverters.add( + wrapIntoNullableInternalConverter(createInternalConverter(type))); + toExternalConverters.add( + wrapIntoNullableExternalConverter(createExternalConverter(type), type)); } } @@ -84,7 +84,9 @@ public RowData toInternal(RowData input) throws Exception { for (int i = 0; i < input.getArity(); i++) { row.addField( (AbstractBaseColumn) - toInternalConverters[i].deserialize(genericRowData.getField(i))); + toInternalConverters + .get(i) + .deserialize(genericRowData.getField(i))); } } else { throw new FlinkxRuntimeException( @@ -99,7 +101,7 @@ public RowData toInternal(RowData input) throws Exception { @SuppressWarnings("unchecked") public Object[] toExternal(RowData rowData, Object[] data) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, data); + toExternalConverters.get(index).serialize(rowData, index, data); } return data; } diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsOrcRowConverter.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsOrcRowConverter.java index ed0e294452..4db16e5921 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsOrcRowConverter.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsOrcRowConverter.java @@ -54,12 +54,12 @@ public class HdfsOrcRowConverter public HdfsOrcRowConverter(RowType rowType) { super(rowType); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -70,7 +70,8 @@ public RowData toInternal(RowData input) throws Exception { if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; for (int i = 0; i < input.getArity(); i++) { - row.setField(i, toInternalConverters[i].deserialize(genericRowData.getField(i))); + row.setField( + i, toInternalConverters.get(i).deserialize(genericRowData.getField(i))); } } else { throw new FlinkxRuntimeException( @@ -85,7 +86,7 @@ public RowData toInternal(RowData input) throws Exception { @SuppressWarnings("unchecked") public Object[] toExternal(RowData rowData, Object[] data) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, data); + toExternalConverters.get(index).serialize(rowData, index, data); } return data; } diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsParquetColumnConverter.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsParquetColumnConverter.java index 13cc67f8f8..5a88882bd0 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsParquetColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsParquetColumnConverter.java @@ -72,10 +72,10 @@ public HdfsParquetColumnConverter(List fieldConfList) { if (left > 0 && right > 0) { type = type.substring(0, left); } - toInternalConverters[i] = - wrapIntoNullableInternalConverter(createInternalConverter(type)); - toExternalConverters[i] = - wrapIntoNullableExternalConverter(createExternalConverter(type), type); + toInternalConverters.add( + wrapIntoNullableInternalConverter(createInternalConverter(type))); + toExternalConverters.add( + wrapIntoNullableExternalConverter(createExternalConverter(type), type)); } } @@ -88,7 +88,9 @@ public RowData toInternal(RowData input) throws Exception { for (int i = 0; i < input.getArity(); i++) { row.addField( (AbstractBaseColumn) - toInternalConverters[i].deserialize(genericRowData.getField(i))); + toInternalConverters + .get(i) + .deserialize(genericRowData.getField(i))); } } else { throw new FlinkxRuntimeException( @@ -103,7 +105,7 @@ public RowData toInternal(RowData input) throws Exception { @SuppressWarnings("unchecked") public Group toExternal(RowData rowData, Group group) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, group); + toExternalConverters.get(index).serialize(rowData, index, group); } return group; } diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsParquetRowConverter.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsParquetRowConverter.java index 6bfdcee8ee..24eaf6eb09 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsParquetRowConverter.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsParquetRowConverter.java @@ -60,12 +60,12 @@ public class HdfsParquetRowConverter public HdfsParquetRowConverter(RowType rowType) { super(rowType); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -76,7 +76,8 @@ public RowData toInternal(RowData input) throws Exception { if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; for (int i = 0; i < input.getArity(); i++) { - row.setField(i, toInternalConverters[i].deserialize(genericRowData.getField(i))); + row.setField( + i, toInternalConverters.get(i).deserialize(genericRowData.getField(i))); } } else { throw new FlinkxRuntimeException( @@ -91,7 +92,7 @@ public RowData toInternal(RowData input) throws Exception { @SuppressWarnings("unchecked") public Group toExternal(RowData rowData, Group group) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, group); + toExternalConverters.get(index).serialize(rowData, index, group); } return group; } diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsTextColumnConverter.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsTextColumnConverter.java index 7833f02ef2..0d5442f8ff 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsTextColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsTextColumnConverter.java @@ -57,10 +57,10 @@ public HdfsTextColumnConverter(List fieldConfList) { if (left > 0 && right > 0) { type = type.substring(0, left); } - toInternalConverters[i] = - wrapIntoNullableInternalConverter(createInternalConverter(type)); - toExternalConverters[i] = - wrapIntoNullableExternalConverter(createExternalConverter(type), type); + toInternalConverters.add( + wrapIntoNullableInternalConverter(createInternalConverter(type))); + toExternalConverters.add( + wrapIntoNullableExternalConverter(createExternalConverter(type), type)); } } @@ -73,7 +73,9 @@ public RowData toInternal(RowData input) throws Exception { for (int i = 0; i < input.getArity(); i++) { row.addField( (AbstractBaseColumn) - toInternalConverters[i].deserialize(genericRowData.getField(i))); + toInternalConverters + .get(i) + .deserialize(genericRowData.getField(i))); } } else { throw new FlinkxRuntimeException( @@ -88,7 +90,7 @@ public RowData toInternal(RowData input) throws Exception { @SuppressWarnings("unchecked") public String[] toExternal(RowData rowData, String[] data) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, data); + toExternalConverters.get(index).serialize(rowData, index, data); } return data; } diff --git a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsTextRowConverter.java b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsTextRowConverter.java index 2977f8472c..59230d3181 100644 --- a/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsTextRowConverter.java +++ b/flinkx-connectors/flinkx-connector-hdfs/src/main/java/com/dtstack/flinkx/connector/hdfs/converter/HdfsTextRowConverter.java @@ -55,12 +55,12 @@ public class HdfsTextRowConverter public HdfsTextRowConverter(RowType rowType) { super(rowType); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -71,7 +71,8 @@ public RowData toInternal(RowData input) throws Exception { if (input instanceof GenericRowData) { GenericRowData genericRowData = (GenericRowData) input; for (int i = 0; i < input.getArity(); i++) { - row.setField(i, toInternalConverters[i].deserialize(genericRowData.getField(i))); + row.setField( + i, toInternalConverters.get(i).deserialize(genericRowData.getField(i))); } } else { throw new FlinkxRuntimeException( @@ -86,7 +87,7 @@ public RowData toInternal(RowData input) throws Exception { @SuppressWarnings("unchecked") public String[] toExternal(RowData rowData, String[] data) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, data); + toExternalConverters.get(index).serialize(rowData, index, data); } return data; } diff --git a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcColumnConverter.java b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcColumnConverter.java index 6a207167fb..694fef05fb 100644 --- a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcColumnConverter.java @@ -58,12 +58,12 @@ public JdbcColumnConverter(RowType rowType) { public JdbcColumnConverter(RowType rowType, FlinkxCommonConf commonConf) { super(rowType, commonConf); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -92,7 +92,7 @@ public RowData toInternal(ResultSet resultSet) throws Exception { Object field = resultSet.getObject(converterIndex + 1); baseColumn = (AbstractBaseColumn) - toInternalConverters[converterIndex].deserialize(field); + toInternalConverters.get(converterIndex).deserialize(field); converterIndex++; } result.addField(assembleFieldProps(fieldConf, baseColumn)); @@ -104,7 +104,7 @@ public RowData toInternal(ResultSet resultSet) throws Exception { public FieldNamedPreparedStatement toExternal( RowData rowData, FieldNamedPreparedStatement statement) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, statement); + toExternalConverters.get(index).serialize(rowData, index, statement); } return statement; } diff --git a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcRowConverter.java b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcRowConverter.java index 81697a44dc..1613a133ae 100644 --- a/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcRowConverter.java +++ b/flinkx-connectors/flinkx-connector-jdbc-base/src/main/java/com/dtstack/flinkx/connector/jdbc/converter/JdbcRowConverter.java @@ -57,12 +57,12 @@ public class JdbcRowConverter public JdbcRowConverter(RowType rowType) { super(rowType); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -91,7 +91,7 @@ public RowData toInternal(ResultSet resultSet) throws Exception { GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { Object field = resultSet.getObject(pos + 1); - genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); + genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field)); } return genericRowData; } @@ -101,7 +101,7 @@ public RowData toInternalLookup(JsonArray jsonArray) throws Exception { GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { Object field = jsonArray.getValue(pos); - genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); + genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field)); } return genericRowData; } @@ -110,7 +110,7 @@ public RowData toInternalLookup(JsonArray jsonArray) throws Exception { public FieldNamedPreparedStatement toExternal( RowData rowData, FieldNamedPreparedStatement statement) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, statement); + toExternalConverters.get(index).serialize(rowData, index, statement); } return statement; } diff --git a/flinkx-connectors/flinkx-connector-kafka/src/main/java/com/dtstack/flinkx/connector/kafka/converter/KafkaColumnConverter.java b/flinkx-connectors/flinkx-connector-kafka/src/main/java/com/dtstack/flinkx/connector/kafka/converter/KafkaColumnConverter.java index 3be24ff022..5ac9f5103d 100644 --- a/flinkx-connectors/flinkx-connector-kafka/src/main/java/com/dtstack/flinkx/connector/kafka/converter/KafkaColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-kafka/src/main/java/com/dtstack/flinkx/connector/kafka/converter/KafkaColumnConverter.java @@ -42,6 +42,7 @@ import java.math.BigDecimal; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -94,10 +95,10 @@ public KafkaColumnConverter(KafkaConf kafkaConf) { kafkaConf.getColumn().stream() .map(FieldConf::getType) .collect(Collectors.toList()); - this.toInternalConverters = new IDeserializationConverter[typeList.size()]; - for (int i = 0; i < typeList.size(); i++) { - toInternalConverters[i] = - wrapIntoNullableInternalConverter(createInternalConverter(typeList.get(i))); + this.toInternalConverters = new ArrayList<>(); + for (String s : typeList) { + toInternalConverters.add( + wrapIntoNullableInternalConverter(createInternalConverter(s))); } } } @@ -106,7 +107,7 @@ public KafkaColumnConverter(KafkaConf kafkaConf) { public RowData toInternal(String input) throws Exception { Map map = decode.decode(input); ColumnRowData result; - if (toInternalConverters == null || toInternalConverters.length == 0) { + if (toInternalConverters == null || toInternalConverters.size() == 0) { result = new ColumnRowData(1); result.addField(new MapColumn(map)); } else { @@ -116,7 +117,7 @@ public RowData toInternal(String input) throws Exception { FieldConf fieldConf = fieldConfList.get(i); Object value = map.get(fieldConf.getName()); AbstractBaseColumn baseColumn = - (AbstractBaseColumn) toInternalConverters[i].deserialize(value); + (AbstractBaseColumn) toInternalConverters.get(i).deserialize(value); result.addField(assembleFieldProps(fieldConf, baseColumn)); } } diff --git a/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/converter/KuduColumnConverter.java b/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/converter/KuduColumnConverter.java index 207c5a3d6d..e1132534ac 100644 --- a/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/converter/KuduColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/converter/KuduColumnConverter.java @@ -59,13 +59,13 @@ public KuduColumnConverter(RowType rowType, List columnName) { super(rowType); this.columnName = columnName; for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i).getTypeRoot().name())); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i).getTypeRoot().name()))); + toExternalConverters.add( wrapIntoNullableExternalConverter( createExternalConverter(rowType.getTypeAt(i).getTypeRoot().name()), - rowType.getTypeAt(i).getTypeRoot().name()); + rowType.getTypeAt(i).getTypeRoot().name())); } } @@ -88,7 +88,7 @@ public RowData toInternal(RowResult input) throws Exception { ColumnRowData data = new ColumnRowData(rowType.getFieldCount()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { Object field = input.getObject(pos); - data.addField((AbstractBaseColumn) toInternalConverters[pos].deserialize(field)); + data.addField((AbstractBaseColumn) toInternalConverters.get(pos).deserialize(field)); } return data; } @@ -97,7 +97,7 @@ public RowData toInternal(RowResult input) throws Exception { @SuppressWarnings("unchecked") public Operation toExternal(RowData rowData, Operation operation) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, operation); + toExternalConverters.get(index).serialize(rowData, index, operation); } return operation; } diff --git a/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/converter/KuduRowConverter.java b/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/converter/KuduRowConverter.java index 047ebfeca3..7a90afd666 100644 --- a/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/converter/KuduRowConverter.java +++ b/flinkx-connectors/flinkx-connector-kudu/src/main/java/com/dtstack/flinkx/connector/kudu/converter/KuduRowConverter.java @@ -58,12 +58,12 @@ public KuduRowConverter(RowType rowType, List columnName) { super(rowType); this.columnName = columnName; for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(rowType.getTypeAt(i)), rowType.getTypeAt(i)); + createExternalConverter(rowType.getTypeAt(i)), rowType.getTypeAt(i))); } } @@ -97,7 +97,7 @@ private RowData deserializeInput(RowResult input) throws Exception { GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { Object field = input.getObject(pos); - genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); + genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field)); } return genericRowData; } @@ -105,7 +105,7 @@ private RowData deserializeInput(RowResult input) throws Exception { @Override public Operation toExternal(RowData rowData, Operation operation) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, operation); + toExternalConverters.get(index).serialize(rowData, index, operation); } return operation; } diff --git a/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/converter/MongodbColumnConverter.java b/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/converter/MongodbColumnConverter.java index 70c6d6383a..9f39482ea6 100644 --- a/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/converter/MongodbColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/converter/MongodbColumnConverter.java @@ -39,6 +39,8 @@ import java.sql.Date; import java.sql.Time; +import java.util.ArrayList; +import java.util.List; /** * @author Ada Wong @@ -48,22 +50,22 @@ public class MongodbColumnConverter extends AbstractRowConverter { - private final MongoDeserializationConverter[] toInternalConverters; - private final MongoSerializationConverter[] toExternalConverters; + private final List toInternalConverters; + private final List toExternalConverters; private final String[] fieldNames; public MongodbColumnConverter(RowType rowType, String[] fieldNames) { super(rowType); this.fieldNames = fieldNames; - toInternalConverters = new MongoDeserializationConverter[rowType.getFieldCount()]; - toExternalConverters = new MongoSerializationConverter[rowType.getFieldCount()]; + toInternalConverters = new ArrayList<>(); + toExternalConverters = new ArrayList<>(); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createMongoInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createMongoInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableMongodbExternalConverter( - createMongodbExternalConverter(fieldTypes[i]), fieldTypes[i]); + createMongodbExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -93,13 +95,13 @@ protected MongoSerializationConverter wrapIntoNullableMongodbExternalConverter( @Override public RowData toInternal(Document document) { - ColumnRowData data = new ColumnRowData(toInternalConverters.length); - for (int pos = 0; pos < toInternalConverters.length; pos++) { + ColumnRowData data = new ColumnRowData(toInternalConverters.size()); + for (int pos = 0; pos < toInternalConverters.size(); pos++) { Object field = document.get(fieldNames[pos]); if (field instanceof ObjectId) { field = field.toString(); } - data.addField((AbstractBaseColumn) toInternalConverters[pos].deserialize(field)); + data.addField((AbstractBaseColumn) toInternalConverters.get(pos).deserialize(field)); } return data; } @@ -107,7 +109,7 @@ public RowData toInternal(Document document) { @Override public Document toExternal(RowData rowData, Document document) { for (int pos = 0; pos < rowData.getArity(); pos++) { - toExternalConverters[pos].serialize(rowData, pos, fieldNames[pos], document); + toExternalConverters.get(pos).serialize(rowData, pos, fieldNames[pos], document); } return document; } diff --git a/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/converter/MongodbRowConverter.java b/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/converter/MongodbRowConverter.java index e1a73c3818..d0d6afefd8 100644 --- a/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/converter/MongodbRowConverter.java +++ b/flinkx-connectors/flinkx-connector-mongodb/src/main/java/com/dtstack/flinkx/connector/mongodb/converter/MongodbRowConverter.java @@ -41,6 +41,8 @@ import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalTime; +import java.util.ArrayList; +import java.util.List; /** * @author Ada Wong @@ -50,22 +52,22 @@ public final class MongodbRowConverter extends AbstractRowConverter { - private final MongoDeserializationConverter[] toInternalConverters; - private final MongoSerializationConverter[] toExternalConverters; + private final List toInternalConverters; + private final List toExternalConverters; private final String[] fieldNames; public MongodbRowConverter(RowType rowType, String[] fieldNames) { super(rowType); this.fieldNames = fieldNames; - toInternalConverters = new MongoDeserializationConverter[rowType.getFieldCount()]; - toExternalConverters = new MongoSerializationConverter[rowType.getFieldCount()]; + toInternalConverters = new ArrayList<>(); + toExternalConverters = new ArrayList<>(); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createMongoInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createMongoInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableMongodbExternalConverter( - createMongodbExternalConverter(fieldTypes[i]), fieldTypes[i]); + createMongodbExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -101,7 +103,7 @@ public RowData toInternal(Document document) { if (field instanceof ObjectId) { field = field.toString(); } - genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); + genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field)); } return genericRowData; } @@ -114,7 +116,7 @@ public RowData toInternalLookup(Document document) { @Override public Document toExternal(RowData rowData, Document document) { for (int pos = 0; pos < rowData.getArity(); pos++) { - toExternalConverters[pos].serialize(rowData, pos, fieldNames[pos], document); + toExternalConverters.get(pos).serialize(rowData, pos, fieldNames[pos], document); } return document; } diff --git a/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/converter/LogMinerColumnConverter.java b/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/converter/LogMinerColumnConverter.java index 1ff309d27f..299a9e8736 100644 --- a/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/converter/LogMinerColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/converter/LogMinerColumnConverter.java @@ -48,6 +48,7 @@ import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalQueries; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -83,7 +84,7 @@ public LinkedList toInternal(EventRow eventRow) throws Exception { String schema = eventRow.getSchema(); String table = eventRow.getTable(); String key = schema + ConstantValue.POINT_SYMBOL + table; - IDeserializationConverter[] converters = super.cdcConverterCacheMap.get(key); + List converters = super.cdcConverterCacheMap.get(key); TableMetaData metadata = tableMetaDataCacheMap.get(key); List beforeColumnList = eventRow.getBeforeColumnList(); @@ -91,7 +92,7 @@ public LinkedList toInternal(EventRow eventRow) throws Exception { // 如果缓存为空 或者 长度变了 或者名字变了 重新更新缓存 if (Objects.isNull(converters) || Objects.isNull(metadata) - || beforeColumnList.size() != converters.length + || beforeColumnList.size() != converters.size() || !beforeColumnList.stream() .map(EventRowData::getName) .collect(Collectors.toCollection(HashSet::new)) @@ -99,10 +100,13 @@ public LinkedList toInternal(EventRow eventRow) throws Exception { Pair, List> latestMetaData = JdbcUtil.getTableMetaData(schema, table, connection); converters = - latestMetaData.getRight().stream() - .map(x -> wrapIntoNullableInternalConverter(createInternalConverter(x))) - .toArray(IDeserializationConverter[]::new); - + Arrays.asList( + latestMetaData.getRight().stream() + .map( + x -> + wrapIntoNullableInternalConverter( + createInternalConverter(x))) + .toArray(IDeserializationConverter[]::new)); metadata = new TableMetaData( schema, table, latestMetaData.getLeft(), latestMetaData.getRight()); @@ -199,7 +203,7 @@ public LinkedList toInternal(EventRow eventRow) throws Exception { * @param prefix after_/before_ */ private void parseColumnList( - IDeserializationConverter[] converters, + List converters, List fieldList, List entryColumnList, List columnList, @@ -220,7 +224,8 @@ private void parseColumnList( + GsonUtil.GSON.toJson(fieldList)); } - AbstractBaseColumn column = converters[index].deserialize(entryColumn.getData()); + AbstractBaseColumn column = + (AbstractBaseColumn) converters.get(index).deserialize(entryColumn.getData()); columnList.add(column); headerList.add(prefix + entryColumn.getName()); } diff --git a/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/converter/LogMinerRowConverter.java b/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/converter/LogMinerRowConverter.java index 06f4f2950b..222a9d7995 100644 --- a/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/converter/LogMinerRowConverter.java +++ b/flinkx-connectors/flinkx-connector-oraclelogminer/src/main/java/com/dtstack/flinkx/connector/oraclelogminer/converter/LogMinerRowConverter.java @@ -42,6 +42,7 @@ import java.time.LocalTime; import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalQueries; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -55,9 +56,9 @@ public class LogMinerRowConverter extends AbstractCDCRowConverter(); for (int i = 0; i < rowType.getFieldCount(); i++) { - super.converters[i] = createInternalConverter(rowType.getTypeAt(i)); + super.converters.add(createInternalConverter(rowType.getTypeAt(i))); } } diff --git a/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/converter/PGWalColumnConverter.java b/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/converter/PGWalColumnConverter.java index 012f4d0347..4b3d1d01b3 100644 --- a/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/converter/PGWalColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/converter/PGWalColumnConverter.java @@ -57,6 +57,7 @@ import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -107,20 +108,21 @@ public LinkedList toInternal(ChangeLog entity) throws Exception { String schema = entity.getSchema(); String table = entity.getTable(); String key = schema + ConstantValue.POINT_SYMBOL + table; - IDeserializationConverter[] converters = cdcConverterCacheMap.get(key); + List converters = cdcConverterCacheMap.get(key); List columnList = entity.getColumnList(); if (converters == null || needChange(entity)) { cdcConverterCacheMap.put( key, - columnList.stream() - .map( - column -> - createInternalConverter( - translateDataType(column.getType()) - .getLogicalType())) - .toArray(IDeserializationConverter[]::new)); + Arrays.asList( + columnList.stream() + .map( + column -> + createInternalConverter( + translateDataType(column.getType()) + .getLogicalType())) + .toArray(IDeserializationConverter[]::new))); } converters = cdcConverterCacheMap.get(key); @@ -268,7 +270,7 @@ private Map processColumnList(List columnList, Objec * @return */ private List parseColumnList( - IDeserializationConverter[] converters, + List converters, List entryColumnList, List columnList, List headerList, @@ -279,7 +281,8 @@ private List parseColumnList( Object entryColumn = entryColumnList.get(i); if (entryColumn != null) { AbstractBaseColumn column = - converters[i].deserialize(entryColumnList.get(i).toString()); + (AbstractBaseColumn) + converters.get(i).deserialize(entryColumnList.get(i).toString()); columnList.add(column); originList.add(after + headerList.get(i)); } diff --git a/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/converter/PGWalRowConverter.java b/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/converter/PGWalRowConverter.java index c3853000d2..1353286e77 100644 --- a/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/converter/PGWalRowConverter.java +++ b/flinkx-connectors/flinkx-connector-pgwal/src/main/java/com/dtstack/flinkx/connector/pgwal/converter/PGWalRowConverter.java @@ -42,6 +42,7 @@ import java.time.format.DateTimeFormatter; import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalQueries; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; @@ -53,9 +54,9 @@ public class PGWalRowConverter extends AbstractCDCRowConverter(); for (int i = 0; i < rowType.getFieldCount(); i++) { - super.converters[i] = createInternalConverter(rowType.getTypeAt(i)); + super.converters.add(createInternalConverter(rowType.getTypeAt(i))); } } diff --git a/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/HBaseColumnConverter.java b/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/HBaseColumnConverter.java index bffac4f83b..2986d31341 100644 --- a/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/HBaseColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/HBaseColumnConverter.java @@ -96,9 +96,9 @@ public HBaseColumnConverter(RowType rowType, RowProjector rowProjector) { phoenixTypeList = new ArrayList<>(fieldNames.size()); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); + createInternalConverter(rowType.getTypeAt(i)))); phoenixTypeList.add(getPDataType(fieldTypes[i].getTypeRoot().toString())); this.rowProjector = rowProjector; @@ -139,7 +139,8 @@ public RowData toInternal(NoTagsKeyValue input) throws Exception { ColumnProjector columnProjector = rowProjector.getColumnProjector(i); // phoenix PDataType pDataType = phoenixTypeList.get(i); // phoenix-core Object value = columnProjector.getValue(resultTuple, pDataType, pointer); - columnRowData.addField((AbstractBaseColumn) toInternalConverters[i].deserialize(value)); + columnRowData.addField( + (AbstractBaseColumn) toInternalConverters.get(i).deserialize(value)); } return columnRowData; } diff --git a/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/HBaseRowConverter.java b/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/HBaseRowConverter.java index 20cc589df4..a978b2b957 100644 --- a/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/HBaseRowConverter.java +++ b/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/HBaseRowConverter.java @@ -95,9 +95,9 @@ public HBaseRowConverter(RowType rowType, RowProjector rowProjector) { phoenixTypeList = new ArrayList<>(fieldNames.size()); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); + createInternalConverter(rowType.getTypeAt(i)))); phoenixTypeList.add(getPDataType(fieldTypes[i].getTypeRoot().toString())); this.rowProjector = rowProjector; } @@ -137,7 +137,7 @@ public RowData toInternal(NoTagsKeyValue input) throws Exception { ColumnProjector columnProjector = rowProjector.getColumnProjector(i); PDataType pDataType = phoenixTypeList.get(i); Object value = columnProjector.getValue(resultTuple, pDataType, pointer); - genericRowData.setField(i, toInternalConverters[i].deserialize(value)); + genericRowData.setField(i, toInternalConverters.get(i).deserialize(value)); } return genericRowData; } diff --git a/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/Phoenix5ColumnConverter.java b/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/Phoenix5ColumnConverter.java index cc1e692577..094b479958 100644 --- a/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/Phoenix5ColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/Phoenix5ColumnConverter.java @@ -52,12 +52,12 @@ public class Phoenix5ColumnConverter public Phoenix5ColumnConverter(RowType rowType, FlinkxCommonConf commonConf) { super(rowType, commonConf); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -76,10 +76,10 @@ public Phoenix5ColumnConverter(RowType rowType, FlinkxCommonConf commonConf) { @Override public RowData toInternal(ResultSet resultSet) throws Exception { - ColumnRowData data = new ColumnRowData(toInternalConverters.length); - for (int i = 0; i < toInternalConverters.length; i++) { + ColumnRowData data = new ColumnRowData(toInternalConverters.size()); + for (int i = 0; i < toInternalConverters.size(); i++) { Object field = resultSet.getObject(i + 1); - data.addField((AbstractBaseColumn) toInternalConverters[i].deserialize(field)); + data.addField((AbstractBaseColumn) toInternalConverters.get(i).deserialize(field)); } return data; } @@ -88,7 +88,7 @@ public RowData toInternal(ResultSet resultSet) throws Exception { public FieldNamedPreparedStatement toExternal( RowData rowData, FieldNamedPreparedStatement statement) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, statement); + toExternalConverters.get(index).serialize(rowData, index, statement); } return statement; } diff --git a/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/Phoenix5RowConverter.java b/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/Phoenix5RowConverter.java index 58fdc3fe25..ae02b8ba44 100644 --- a/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/Phoenix5RowConverter.java +++ b/flinkx-connectors/flinkx-connector-phoenix5/src/main/java/com/dtstack/flinkx/connector/phoenix5/converter/Phoenix5RowConverter.java @@ -58,12 +58,12 @@ public class Phoenix5RowConverter public Phoenix5RowConverter(RowType rowType) { super(rowType); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -91,7 +91,7 @@ public RowData toInternal(ResultSet resultSet) throws Exception { GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { Object field = resultSet.getObject(pos + 1); - genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); + genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field)); } return genericRowData; } @@ -101,7 +101,7 @@ public RowData toInternalLookup(JsonArray jsonArray) throws Exception { GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { Object field = jsonArray.getValue(pos); - genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); + genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field)); } return genericRowData; } @@ -110,7 +110,7 @@ public RowData toInternalLookup(JsonArray jsonArray) throws Exception { public FieldNamedPreparedStatement toExternal( RowData rowData, FieldNamedPreparedStatement statement) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, statement); + toExternalConverters.get(index).serialize(rowData, index, statement); } return statement; } diff --git a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisRowConverter.java b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisRowConverter.java index 64b4567df2..f24096f7f7 100644 --- a/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisRowConverter.java +++ b/flinkx-connectors/flinkx-connector-redis/src/main/java/com/dtstack/flinkx/connector/redis/converter/RedisRowConverter.java @@ -68,9 +68,9 @@ public RedisRowConverter(RowType rowType) { super(rowType); List fieldNames = rowType.getFieldNames(); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); + createInternalConverter(rowType.getTypeAt(i)))); typeIndexList.add(new Triplet(fieldNames.get(i), i, rowType.getTypeAt(i))); } } @@ -78,9 +78,9 @@ public RedisRowConverter(RowType rowType) { public RedisRowConverter(RowType rowType, RedisConf redisConf) { super(rowType); for (int i = 0; i < rowType.getFieldCount(); i++) { - toExternalConverters[i] = + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); } this.redisConf = redisConf; } @@ -120,7 +120,7 @@ private GenericRowData getGenericRowData(Map input) throws Excep Triplet typeTriplet = collect.get(0); genericRowData.setField( typeTriplet.second, - toInternalConverters[typeTriplet.second].deserialize(input.get(key))); + toInternalConverters.get(typeTriplet.second).deserialize(input.get(key))); } return genericRowData; @@ -131,7 +131,7 @@ public JedisCommands toExternal(RowData rowData, JedisCommands jedis) throws Exc List fieldNames = rowType.getFieldNames(); List fieldValue = new ArrayList<>(); for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, fieldValue); + toExternalConverters.get(index).serialize(rowData, index, fieldValue); } Map collect = diff --git a/flinkx-connectors/flinkx-connector-restapi/src/main/java/com/dtstack/flinkx/connector/restapi/convert/RestapiColumnConverter.java b/flinkx-connectors/flinkx-connector-restapi/src/main/java/com/dtstack/flinkx/connector/restapi/convert/RestapiColumnConverter.java index 694cf558c9..b5bdf0e69d 100644 --- a/flinkx-connectors/flinkx-connector-restapi/src/main/java/com/dtstack/flinkx/connector/restapi/convert/RestapiColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-restapi/src/main/java/com/dtstack/flinkx/connector/restapi/convert/RestapiColumnConverter.java @@ -40,6 +40,7 @@ import org.apache.commons.collections.CollectionUtils; import java.math.BigDecimal; +import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.Map; @@ -65,10 +66,11 @@ public RestapiColumnConverter(HttpRestConfig httpRestConfig) { httpRestConfig.getColumn().stream() .map(FieldConf::getType) .collect(Collectors.toList()); - this.toInternalConverters = new IDeserializationConverter[typeList.size()]; + this.toInternalConverters = new ArrayList<>(); for (int i = 0; i < typeList.size(); i++) { - toInternalConverters[i] = - wrapIntoNullableInternalConverter(createInternalConverter(typeList.get(i))); + toInternalConverters.add( + wrapIntoNullableInternalConverter( + createInternalConverter(typeList.get(i)))); } } } @@ -84,10 +86,10 @@ public RowData toInternal(String input) throws Exception { ColumnRowData row; if (httpRestConfig.getDecode().equals(ConstantValue.DEFAULT_DECODE) && toInternalConverters != null - && toInternalConverters.length > 0) { + && toInternalConverters.size() > 0) { Map result = DefaultRestHandler.gson.fromJson(input, GsonUtil.gsonMapTypeToken); - row = new ColumnRowData(toInternalConverters.length); + row = new ColumnRowData(toInternalConverters.size()); List column = httpRestConfig.getColumn(); for (int i = 0; i < column.size(); i++) { Object value = @@ -95,7 +97,7 @@ public RowData toInternal(String input) throws Exception { result, column.get(i).getName(), httpRestConfig.getFieldDelimiter()); - row.addField((AbstractBaseColumn) toInternalConverters[i].deserialize(value)); + row.addField((AbstractBaseColumn) toInternalConverters.get(i).deserialize(value)); } } else { row = new ColumnRowData(1); diff --git a/flinkx-connectors/flinkx-connector-restapi/src/main/java/com/dtstack/flinkx/connector/restapi/convert/RestapiRowConverter.java b/flinkx-connectors/flinkx-connector-restapi/src/main/java/com/dtstack/flinkx/connector/restapi/convert/RestapiRowConverter.java index c86f24e099..57eda6560b 100644 --- a/flinkx-connectors/flinkx-connector-restapi/src/main/java/com/dtstack/flinkx/connector/restapi/convert/RestapiRowConverter.java +++ b/flinkx-connectors/flinkx-connector-restapi/src/main/java/com/dtstack/flinkx/connector/restapi/convert/RestapiRowConverter.java @@ -52,9 +52,9 @@ public class RestapiRowConverter extends AbstractRowConverter { - protected SolrSerializationConverter[] toExternalConverters; + protected List toExternalConverters; protected String[] fieldNames; public SolrColumnConverter(RowType rowType, String[] fieldNames) { @@ -61,21 +62,21 @@ public SolrColumnConverter(RowType rowType, String[] fieldNames) { rowType.getFields().stream() .map(RowType.RowField::getType) .toArray(LogicalType[]::new); - this.toInternalConverters = new IDeserializationConverter[rowType.getFieldCount()]; - this.toExternalConverters = new SolrSerializationConverter[rowType.getFieldCount()]; + this.toInternalConverters = new ArrayList<>(); + this.toExternalConverters = new ArrayList<>(); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableSolrExternalConverter( - createSolrExternalConverter(fieldTypes[i])); + createSolrExternalConverter(fieldTypes[i]))); } } @Override public RowData toInternal(SolrDocument input) throws Exception { - ColumnRowData columnRowData = new ColumnRowData(toInternalConverters.length); + ColumnRowData columnRowData = new ColumnRowData(toInternalConverters.size()); for (int pos = 0; pos < rowType.getFieldCount(); pos++) { Object field = input.getFieldValue(fieldNames[pos]); // when Solr collection is schemaless, it will return a ArrayList. @@ -83,7 +84,7 @@ public RowData toInternal(SolrDocument input) throws Exception { field = ((ArrayList) field).get(0); } columnRowData.addField( - (AbstractBaseColumn) toInternalConverters[pos].deserialize(field)); + (AbstractBaseColumn) toInternalConverters.get(pos).deserialize(field)); } return columnRowData; } @@ -92,7 +93,9 @@ public RowData toInternal(SolrDocument input) throws Exception { public SolrInputDocument toExternal(RowData rowData, SolrInputDocument solrInputDocument) throws Exception { for (int pos = 0; pos < rowData.getArity(); pos++) { - toExternalConverters[pos].serialize(rowData, pos, fieldNames[pos], solrInputDocument); + toExternalConverters + .get(pos) + .serialize(rowData, pos, fieldNames[pos], solrInputDocument); } return solrInputDocument; } diff --git a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/converter/SolrRowConverter.java b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/converter/SolrRowConverter.java index 9aff295b0c..82bc25ac9d 100644 --- a/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/converter/SolrRowConverter.java +++ b/flinkx-connectors/flinkx-connector-solr/src/main/java/com/dtstack/flinkx/connector/solr/converter/SolrRowConverter.java @@ -42,6 +42,7 @@ import java.time.LocalDate; import java.time.LocalTime; import java.util.ArrayList; +import java.util.List; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -53,7 +54,7 @@ public class SolrRowConverter extends AbstractRowConverter { - protected SolrSerializationConverter[] toExternalConverters; + protected List toExternalConverters; protected String[] fieldNames; public SolrRowConverter(RowType rowType, String[] fieldNames) { @@ -63,15 +64,15 @@ public SolrRowConverter(RowType rowType, String[] fieldNames) { rowType.getFields().stream() .map(RowType.RowField::getType) .toArray(LogicalType[]::new); - this.toInternalConverters = new IDeserializationConverter[rowType.getFieldCount()]; - this.toExternalConverters = new SolrSerializationConverter[rowType.getFieldCount()]; + this.toInternalConverters = new ArrayList<>(); + this.toExternalConverters = new ArrayList<>(); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableSolrExternalConverter( - createSolrExternalConverter(fieldTypes[i])); + createSolrExternalConverter(fieldTypes[i]))); } } @@ -84,7 +85,7 @@ public RowData toInternal(SolrDocument input) throws Exception { if (field instanceof ArrayList) { field = ((ArrayList) field).get(0); } - genericRowData.setField(pos, toInternalConverters[pos].deserialize(field)); + genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field)); } return genericRowData; } @@ -92,7 +93,9 @@ public RowData toInternal(SolrDocument input) throws Exception { @Override public SolrInputDocument toExternal(RowData rowData, SolrInputDocument solrInputDocument) { for (int pos = 0; pos < rowData.getArity(); pos++) { - toExternalConverters[pos].serialize(rowData, pos, fieldNames[pos], solrInputDocument); + toExternalConverters + .get(pos) + .serialize(rowData, pos, fieldNames[pos], solrInputDocument); } return solrInputDocument; } diff --git a/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/converter/SqlserverJtdsColumnConverter.java b/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/converter/SqlserverJtdsColumnConverter.java index 5bc267f124..71d76dcd8c 100644 --- a/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/converter/SqlserverJtdsColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/converter/SqlserverJtdsColumnConverter.java @@ -64,7 +64,7 @@ public RowData toInternal(ResultSet resultSet) throws Exception { } else { baseColumn = (AbstractBaseColumn) - toInternalConverters[converterIndex].deserialize(field); + toInternalConverters.get(converterIndex).deserialize(field); } converterIndex++; } diff --git a/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/converter/SqlserverMicroSoftColumnConverter.java b/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/converter/SqlserverMicroSoftColumnConverter.java index 0f2aebe1f0..c649b1bba7 100644 --- a/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/converter/SqlserverMicroSoftColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-sqlserver/src/main/java/com/dtstack/flinkx/connector/sqlserver/converter/SqlserverMicroSoftColumnConverter.java @@ -77,7 +77,7 @@ public RowData toInternal(ResultSet resultSet) throws Exception { } else { baseColumn = (AbstractBaseColumn) - toInternalConverters[converterIndex].deserialize(field); + toInternalConverters.get(converterIndex).deserialize(field); } converterIndex++; } diff --git a/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/convert/SqlServerCdcColumnConverter.java b/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/convert/SqlServerCdcColumnConverter.java index ea088177ea..64fbd033f1 100644 --- a/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/convert/SqlServerCdcColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/convert/SqlServerCdcColumnConverter.java @@ -43,6 +43,7 @@ import java.nio.charset.StandardCharsets; import java.sql.Timestamp; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.LinkedList; import java.util.List; @@ -72,14 +73,15 @@ public LinkedList toInternal(SqlServerCdcEventRow sqlServerCdcEventRow) String schema = sqlServerCdcEventRow.getSchema(); String table = sqlServerCdcEventRow.getTable(); String key = schema + ConstantValue.POINT_SYMBOL + table; - IDeserializationConverter[] converters = super.cdcConverterCacheMap.get(key); + List converters = super.cdcConverterCacheMap.get(key); if (converters == null) { List columnTypes = sqlServerCdcEventRow.getColumnTypes(); converters = - columnTypes.stream() - .map(x -> createInternalConverter(x)) - .toArray(IDeserializationConverter[]::new); + Arrays.asList( + columnTypes.stream() + .map(x -> createInternalConverter(x)) + .toArray(IDeserializationConverter[]::new)); cdcConverterCacheMap.put(key, converters); } @@ -222,7 +224,7 @@ public LinkedList toInternal(SqlServerCdcEventRow sqlServerCdcEventRow) * @param after */ private void parseColumnList( - IDeserializationConverter[] converters, + List converters, Object[] data, List columnsNames, List columnList, @@ -232,7 +234,8 @@ private void parseColumnList( for (int i = 0; i < data.length; i++) { headerList.add(after + columnsNames.get(i)); if (data[i] != null) { - AbstractBaseColumn column = converters[i].deserialize(data[i]); + AbstractBaseColumn column = + (AbstractBaseColumn) converters.get(i).deserialize(data[i]); columnList.add(column); } else { columnList.add(null); diff --git a/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/convert/SqlServerCdcRowConverter.java b/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/convert/SqlServerCdcRowConverter.java index c1422cc987..7d62db53ca 100644 --- a/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/convert/SqlServerCdcRowConverter.java +++ b/flinkx-connectors/flinkx-connector-sqlservercdc/src/main/java/com/dtstack/flinkx/connector/sqlservercdc/convert/SqlServerCdcRowConverter.java @@ -46,6 +46,7 @@ import java.time.ZoneOffset; import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalQueries; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -63,9 +64,9 @@ public class SqlServerCdcRowConverter public SqlServerCdcRowConverter(RowType rowType, TimestampFormat timestampFormat) { super.fieldNameList = rowType.getFieldNames(); this.timestampFormat = timestampFormat; - super.converters = new IDeserializationConverter[rowType.getFieldCount()]; + super.converters = new ArrayList<>(); for (int i = 0; i < rowType.getFieldCount(); i++) { - super.converters[i] = createInternalConverter(rowType.getTypeAt(i)); + super.converters.add(createInternalConverter(rowType.getTypeAt(i))); } } diff --git a/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/converter/StreamColumnConverter.java b/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/converter/StreamColumnConverter.java index 4abde374c6..bb7e1cfbf0 100644 --- a/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/converter/StreamColumnConverter.java +++ b/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/converter/StreamColumnConverter.java @@ -36,6 +36,7 @@ import java.math.BigDecimal; import java.time.LocalTime; +import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.concurrent.atomic.AtomicLong; @@ -58,14 +59,13 @@ public StreamColumnConverter(FlinkxCommonConf commonConf) { .map(FieldConf::getType) .collect(Collectors.toList()); super.commonConf = commonConf; - super.toInternalConverters = new IDeserializationConverter[typeList.size()]; - super.toExternalConverters = new ISerializationConverter[typeList.size()]; + toInternalConverters = new ArrayList<>(typeList.size()); + toExternalConverters = new ArrayList<>(typeList.size()); - for (int i = 0; i < typeList.size(); i++) { - toInternalConverters[i] = createInternalConverter(typeList.get(i)); - toExternalConverters[i] = - wrapIntoNullableExternalConverter( - createExternalConverter(typeList.get(i)), typeList.get(i)); + for (String s : typeList) { + toInternalConverters.add(createInternalConverter(s)); + toExternalConverters.add( + wrapIntoNullableExternalConverter(createExternalConverter(s), s)); } } @@ -128,7 +128,7 @@ public ColumnRowData toInternal(ColumnRowData rowData) throws Exception { ColumnRowData result = new ColumnRowData(fieldConfList.size()); for (int i = 0; i < fieldConfList.size(); i++) { AbstractBaseColumn baseColumn = - (AbstractBaseColumn) toInternalConverters[i].deserialize(null); + (AbstractBaseColumn) toInternalConverters.get(i).deserialize(null); result.addField(assembleFieldProps(fieldConfList.get(i), baseColumn)); } return result; diff --git a/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/converter/StreamRowConverter.java b/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/converter/StreamRowConverter.java index 3c68353a4d..a51c03b278 100644 --- a/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/converter/StreamRowConverter.java +++ b/flinkx-connectors/flinkx-connector-stream/src/main/java/com/dtstack/flinkx/connector/stream/converter/StreamRowConverter.java @@ -58,12 +58,12 @@ public class StreamRowConverter public StreamRowConverter(RowType rowType) { super(rowType); for (int i = 0; i < rowType.getFieldCount(); i++) { - toInternalConverters[i] = + toInternalConverters.add( wrapIntoNullableInternalConverter( - createInternalConverter(rowType.getTypeAt(i))); - toExternalConverters[i] = + createInternalConverter(rowType.getTypeAt(i)))); + toExternalConverters.add( wrapIntoNullableExternalConverter( - createExternalConverter(fieldTypes[i]), fieldTypes[i]); + createExternalConverter(fieldTypes[i]), fieldTypes[i])); } } @@ -85,7 +85,7 @@ protected ISerializationConverter wrapIntoNullableExternalConver public RowData toInternal(RowData input) throws Exception { GenericRowData row = new GenericRowData(input.getArity()); for (int i = 0; i < input.getArity(); i++) { - row.setField(i, toInternalConverters[i].deserialize(input)); + row.setField(i, toInternalConverters.get(i).deserialize(input)); } return row; } @@ -93,7 +93,7 @@ public RowData toInternal(RowData input) throws Exception { @Override public RowData toExternal(RowData rowData, RowData output) throws Exception { for (int index = 0; index < rowData.getArity(); index++) { - toExternalConverters[index].serialize(rowData, index, output); + toExternalConverters.get(index).serialize(rowData, index, output); } return output; } diff --git a/flinkx-core/pom.xml b/flinkx-core/pom.xml index 039c546eaf..6beea0e44c 100644 --- a/flinkx-core/pom.xml +++ b/flinkx-core/pom.xml @@ -19,6 +19,7 @@ org.apache.logging.log4j log4j-core 2.12.1 + provided @@ -26,8 +27,28 @@ org.apache.logging.log4j log4j-slf4j-impl 2.12.1 + provided + + org.slf4j + slf4j-api + 1.7.21 + provided + + + + ch.qos.logback + logback-classic + 1.1.7 + + + + ch.qos.logback + logback-core + 1.1.7 + + com.google.guava guava diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/converter/AbstractCDCRowConverter.java b/flinkx-core/src/main/java/com/dtstack/flinkx/converter/AbstractCDCRowConverter.java index 8ecc9d1e80..d285bf8560 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/converter/AbstractCDCRowConverter.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/converter/AbstractCDCRowConverter.java @@ -87,13 +87,13 @@ public abstract class AbstractCDCRowConverter implements Serializabl .appendPattern("'Z'") .toFormatter(); - protected final Map cdcConverterCacheMap = + protected final Map> cdcConverterCacheMap = new ConcurrentHashMap<>(32); protected final SnowflakeIdWorker idWorker = new SnowflakeIdWorker(1, 1); protected boolean pavingData; protected boolean splitUpdate; protected List fieldNameList; - protected IDeserializationConverter[] converters; + protected List converters; /** * 将外部数据库类型转换为flink内部类型 @@ -152,7 +152,7 @@ protected RowKind getRowKindByType(String type) { @SuppressWarnings("unchecked") protected RowData createRowDataByConverters( List fieldNameList, - IDeserializationConverter[] converters, + List converters, Map valueMap) throws Exception { GenericRowData genericRowData = new GenericRowData(fieldNameList.size()); @@ -160,7 +160,7 @@ protected RowData createRowDataByConverters( String fieldName = fieldNameList.get(i); Object value = valueMap.get(fieldName); if (value != null) { - value = converters[i].deserialize(value); + value = converters.get(i).deserialize(value); } genericRowData.setField(i, value); } diff --git a/flinkx-core/src/main/java/com/dtstack/flinkx/converter/AbstractRowConverter.java b/flinkx-core/src/main/java/com/dtstack/flinkx/converter/AbstractRowConverter.java index 3f94a3f8b6..a2391738c5 100644 --- a/flinkx-core/src/main/java/com/dtstack/flinkx/converter/AbstractRowConverter.java +++ b/flinkx-core/src/main/java/com/dtstack/flinkx/converter/AbstractRowConverter.java @@ -33,6 +33,7 @@ import java.io.Serializable; import java.sql.ResultSet; +import java.util.ArrayList; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -51,8 +52,8 @@ public abstract class AbstractRowConverter implement private static final long serialVersionUID = 1L; protected RowType rowType; - protected IDeserializationConverter[] toInternalConverters; - protected ISerializationConverter[] toExternalConverters; + protected ArrayList toInternalConverters; + protected ArrayList toExternalConverters; protected LogicalType[] fieldTypes; protected FlinkxCommonConf commonConf; @@ -78,8 +79,8 @@ public AbstractRowConverter(RowType rowType, FlinkxCommonConf commonConf) { } public AbstractRowConverter(int converterSize) { - this.toInternalConverters = new IDeserializationConverter[converterSize]; - this.toExternalConverters = new ISerializationConverter[converterSize]; + this.toInternalConverters = new ArrayList<>(converterSize); + this.toExternalConverters = new ArrayList<>(converterSize); } protected IDeserializationConverter wrapIntoNullableInternalConverter(