Skip to content

Commit

Permalink
[feat][core/plugins] fix serialzition bug
Browse files Browse the repository at this point in the history
  • Loading branch information
lvyanquan authored and a49a committed Aug 23, 2021
1 parent 7f995f0 commit c4e635e
Show file tree
Hide file tree
Showing 44 changed files with 318 additions and 251 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
Expand All @@ -67,17 +68,18 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exce
String schema = binlogEventRow.getSchema();
String table = binlogEventRow.getTable();
String key = schema + ConstantValue.POINT_SYMBOL + table;
IDeserializationConverter[] converters = super.cdcConverterCacheMap.get(key);
List<IDeserializationConverter> converters = super.cdcConverterCacheMap.get(key);
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (converters == null) {
List<CanalEntry.Column> list = rowData.getBeforeColumnsList();
if (CollectionUtils.isEmpty(list)) {
list = rowData.getAfterColumnsList();
}
converters =
list.stream()
.map(x -> createInternalConverter(x.getMysqlType()))
.toArray(IDeserializationConverter[]::new);
Arrays.asList(
list.stream()
.map(x -> createInternalConverter(x.getMysqlType()))
.toArray(IDeserializationConverter[]::new));
cdcConverterCacheMap.put(key, converters);
}

Expand Down Expand Up @@ -162,7 +164,7 @@ public LinkedList<RowData> toInternal(BinlogEventRow binlogEventRow) throws Exce
* @param after
*/
private void parseColumnList(
IDeserializationConverter<String, AbstractBaseColumn>[] converters,
List<IDeserializationConverter> converters,
List<CanalEntry.Column> entryColumnList,
List<AbstractBaseColumn> columnList,
List<String> headerList,
Expand All @@ -171,7 +173,8 @@ private void parseColumnList(
for (int i = 0; i < entryColumnList.size(); i++) {
CanalEntry.Column entryColumn = entryColumnList.get(i);
if (!entryColumn.getIsNull()) {
AbstractBaseColumn column = converters[i].deserialize(entryColumn.getValue());
AbstractBaseColumn column =
(AbstractBaseColumn) converters.get(i).deserialize(entryColumn.getValue());
columnList.add(column);
} else {
columnList.add(new NullColumn());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.time.temporal.TemporalQueries;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -59,9 +60,9 @@ public class BinlogRowConverter extends AbstractCDCRowConverter<BinlogEventRow,
public BinlogRowConverter(RowType rowType, TimestampFormat timestampFormat) {
super.fieldNameList = rowType.getFieldNames();
this.timestampFormat = timestampFormat;
super.converters = new IDeserializationConverter[rowType.getFieldCount()];
super.converters = new ArrayList<>();
for (int i = 0; i < rowType.getFieldCount(); i++) {
super.converters[i] = createInternalConverter(rowType.getTypeAt(i));
super.converters.add(createInternalConverter(rowType.getTypeAt(i)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ public CassandraColumnConverter(RowType rowType, List<FieldConf> fieldConfList)
super(rowType);
this.fieldConfList = fieldConfList;
for (int i = 0; i < rowType.getFieldCount(); i++) {
toInternalConverters[i] =
toInternalConverters.add(
wrapIntoNullableInternalConverter(
createInternalConverter(fieldConfList.get(i).getType()));
toExternalConverters[i] =
createInternalConverter(fieldConfList.get(i).getType())));
toExternalConverters.add(
wrapIntoNullableExternalConverter(
createExternalConverter(fieldConfList.get(i).getType()),
fieldConfList.get(i).getType());
fieldConfList.get(i).getType()));
}
}

Expand All @@ -87,14 +87,14 @@ public ISerializationConverter<BoundStatement> wrapIntoNullableExternalConverter
@Override
@SuppressWarnings("unchecked")
public RowData toInternal(ResultSet resultSet) throws Exception {
ColumnRowData columnRowData = new ColumnRowData(toInternalConverters.length);
ColumnRowData columnRowData = new ColumnRowData(toInternalConverters.size());

Row row = resultSet.one();

for (int i = 0; i < toInternalConverters.length; i++) {
for (int i = 0; i < toInternalConverters.size(); i++) {
final Object value = row.getObject(i);
columnRowData.setField(
i, (AbstractBaseColumn) toInternalConverters[i].deserialize(value));
i, (AbstractBaseColumn) toInternalConverters.get(i).deserialize(value));
}
return columnRowData;
}
Expand All @@ -103,7 +103,7 @@ public RowData toInternal(ResultSet resultSet) throws Exception {
@SuppressWarnings("unchecked")
public BoundStatement toExternal(RowData rowData, BoundStatement statement) throws Exception {
for (int index = 0; index < rowData.getArity(); index++) {
toExternalConverters[index].serialize(rowData, index, statement);
toExternalConverters.get(index).serialize(rowData, index, statement);
}
return statement;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ public CassandraRowConverter(RowType rowType, List<String> columnNameList) {
super(rowType);
this.columnNameList = columnNameList;
for (int i = 0; i < rowType.getFieldCount(); i++) {
toInternalConverters[i] =
toInternalConverters.add(
wrapIntoNullableInternalConverter(
createInternalConverter(rowType.getTypeAt(i)));
toExternalConverters[i] =
createInternalConverter(rowType.getTypeAt(i))));
toExternalConverters.add(
wrapIntoNullableExternalConverter(
createExternalConverter(fieldTypes[i]), fieldTypes[i]);
createExternalConverter(fieldTypes[i]), fieldTypes[i]));
}
}

Expand All @@ -84,15 +84,15 @@ private RowData deserializeInput(Row input) throws Exception {
GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
Object field = input.getObject(pos);
genericRowData.setField(pos, toInternalConverters[pos].deserialize(field));
genericRowData.setField(pos, toInternalConverters.get(pos).deserialize(field));
}
return genericRowData;
}

@Override
public BoundStatement toExternal(RowData rowData, BoundStatement statement) throws Exception {
for (int index = 0; index < rowData.getArity(); index++) {
toExternalConverters[index].serialize(rowData, index, statement);
toExternalConverters.get(index).serialize(rowData, index, statement);
}
return statement;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ public Elasticsearch6ColumnConverter(RowType rowType) {
super(rowType);
List<String> fieldNames = rowType.getFieldNames();
for (int i = 0; i < rowType.getFieldCount(); i++) {
toInternalConverters[i] =
toInternalConverters.add(
wrapIntoNullableInternalConverter(
createInternalConverter(rowType.getTypeAt(i)));
toExternalConverters[i] =
createInternalConverter(rowType.getTypeAt(i))));
toExternalConverters.add(
wrapIntoNullableExternalConverter(
createExternalConverter(fieldTypes[i]), fieldTypes[i]);
createExternalConverter(fieldTypes[i]), fieldTypes[i]));
typeIndexList.add(new Tuple3<>(fieldNames.get(i), i, rowType.getTypeAt(i)));
}
}
Expand All @@ -96,7 +96,7 @@ protected ISerializationConverter wrapIntoNullableExternalConverter(
@Override
public RowData toInternal(Map<String, Object> input) throws Exception {
ColumnRowData columnRowData = new ColumnRowData(rowType.getFieldCount());
for (int i = 0; i < toInternalConverters.length; i++) {
for (int i = 0; i < toInternalConverters.size(); i++) {
final int index = i;
List<Tuple3<String, Integer, LogicalType>> collect =
typeIndexList.stream()
Expand All @@ -110,7 +110,8 @@ public RowData toInternal(Map<String, Object> input) throws Exception {

Tuple3<String, Integer, LogicalType> typeTuple = collect.get(0);
Object field = input.get(typeTuple._1());
columnRowData.addField((AbstractBaseColumn) toInternalConverters[i].deserialize(field));
columnRowData.addField(
(AbstractBaseColumn) toInternalConverters.get(i).deserialize(field));
}
return columnRowData;
}
Expand All @@ -119,7 +120,7 @@ public RowData toInternal(Map<String, Object> input) throws Exception {
public Map<String, Object> toExternal(RowData rowData, Map<String, Object> output)
throws Exception {
for (int index = 0; index < rowData.getArity(); index++) {
toExternalConverters[index].serialize(rowData, index, output);
toExternalConverters.get(index).serialize(rowData, index, output);
}
return output;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ public Elasticsearch6RowConverter(RowType rowType) {
super(rowType);
List<String> fieldNames = rowType.getFieldNames();
for (int i = 0; i < rowType.getFieldCount(); i++) {
toInternalConverters[i] =
toInternalConverters.add(
wrapIntoNullableInternalConverter(
createInternalConverter(rowType.getTypeAt(i)));
toExternalConverters[i] =
createInternalConverter(rowType.getTypeAt(i))));
toExternalConverters.add(
wrapIntoNullableExternalConverter(
createExternalConverter(fieldTypes[i]), fieldTypes[i]);
createExternalConverter(fieldTypes[i]), fieldTypes[i]));
typeIndexList.add(new Tuple3<>(fieldNames.get(i), i, rowType.getTypeAt(i)));
}
}
Expand Down Expand Up @@ -121,7 +121,7 @@ private GenericRowData genericRowData(Map<String, Object> input) throws Exceptio
Tuple3<String, Integer, LogicalType> typeTuple = collect.get(0);
genericRowData.setField(
typeTuple._2(),
toInternalConverters[typeTuple._2()].deserialize(input.get(key)));
toInternalConverters.get(typeTuple._2()).deserialize(input.get(key)));
}
return genericRowData;
}
Expand All @@ -130,7 +130,7 @@ private GenericRowData genericRowData(Map<String, Object> input) throws Exceptio
public Map<String, Object> toExternal(RowData rowData, Map<String, Object> output)
throws Exception {
for (int index = 0; index < rowData.getArity(); index++) {
toExternalConverters[index].serialize(rowData, index, output);
toExternalConverters.get(index).serialize(rowData, index, output);
}
return output;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ public FtpColumnConverter(FtpConfig ftpConfig) {

for (int i = 0; i < ftpConfig.getColumn().size(); i++) {
FieldConf fieldConf = ftpConfig.getColumn().get(i);
toInternalConverters[i] =
wrapIntoNullableInternalConverter(createInternalConverter(fieldConf));
toExternalConverters[i] =
toInternalConverters.add(
wrapIntoNullableInternalConverter(createInternalConverter(fieldConf)));
toExternalConverters.add(
wrapIntoNullableExternalConverter(
createExternalConverter(fieldConf), fieldConf);
createExternalConverter(fieldConf), fieldConf));
}
}

Expand All @@ -68,7 +68,8 @@ public RowData toInternal(RowData input) throws Exception {
if (input instanceof GenericRowData) {
GenericRowData genericRowData = (GenericRowData) input;
for (int i = 0; i < input.getArity(); i++) {
row.setField(i, toInternalConverters[i].deserialize(genericRowData.getField(i)));
row.setField(
i, toInternalConverters.get(i).deserialize(genericRowData.getField(i)));
}
} else {
throw new FlinkxRuntimeException(
Expand All @@ -85,7 +86,7 @@ public String toExternal(RowData rowData, String output) throws Exception {

List<String> columnData = new ArrayList<>(ftpConfig.getColumn().size());
for (int index = 0; index < rowData.getArity(); index++) {
toExternalConverters[index].serialize(rowData, index, columnData);
toExternalConverters.get(index).serialize(rowData, index, columnData);
if (index != 0) {
sb.append(ftpConfig.getFieldDelimiter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ public HBaseColumnConverter(List<FieldConf> fieldConfList) {
if (left > 0 && right > 0) {
type = type.substring(0, left);
}
toInternalConverters[i] =
wrapIntoNullableInternalConverter(createInternalConverter(type));
toExternalConverters[i] =
wrapIntoNullableExternalConverter(createExternalConverter(type), type);
toInternalConverters.add(
wrapIntoNullableInternalConverter(createInternalConverter(type)));
toExternalConverters.add(
wrapIntoNullableExternalConverter(createExternalConverter(type), type));
}
}

Expand All @@ -78,7 +78,8 @@ public RowData toInternal(RowData input) throws Exception {
if (input instanceof GenericRowData) {
GenericRowData genericRowData = (GenericRowData) input;
for (int i = 0; i < input.getArity(); i++) {
row.setField(i, toInternalConverters[i].deserialize(genericRowData.getField(i)));
row.setField(
i, toInternalConverters.get(i).deserialize(genericRowData.getField(i)));
}
} else {
throw new FlinkxRuntimeException(
Expand All @@ -92,7 +93,7 @@ public RowData toInternal(RowData input) throws Exception {
@SuppressWarnings("unchecked")
public Object[] toExternal(RowData rowData, Object[] data) throws Exception {
for (int index = 0; index < rowData.getArity(); index++) {
toExternalConverters[index].serialize(rowData, index, data);
toExternalConverters.get(index).serialize(rowData, index, data);
}
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ public class HBaseConverter extends AbstractRowConverter<RowData, RowData, Objec
public HBaseConverter(RowType rowType) {
super(rowType);
for (int i = 0; i < rowType.getFieldCount(); i++) {
toInternalConverters[i] =
toInternalConverters.add(
wrapIntoNullableInternalConverter(
createInternalConverter(rowType.getTypeAt(i)));
toExternalConverters[i] =
createInternalConverter(rowType.getTypeAt(i))));
toExternalConverters.add(
wrapIntoNullableExternalConverter(
createExternalConverter(fieldTypes[i]), fieldTypes[i]);
createExternalConverter(fieldTypes[i]), fieldTypes[i]));
}
}

Expand All @@ -51,7 +51,8 @@ public RowData toInternal(RowData input) throws Exception {
if (input instanceof GenericRowData) {
GenericRowData genericRowData = (GenericRowData) input;
for (int i = 0; i < input.getArity(); i++) {
row.setField(i, toInternalConverters[i].deserialize(genericRowData.getField(i)));
row.setField(
i, toInternalConverters.get(i).deserialize(genericRowData.getField(i)));
}
} else {
throw new FlinkxRuntimeException(
Expand All @@ -65,7 +66,7 @@ public RowData toInternal(RowData input) throws Exception {
@Override
public Object toExternal(RowData rowData, Object data) throws Exception {
for (int index = 0; index < rowData.getArity(); index++) {
toExternalConverters[index].serialize(rowData, index, data);
toExternalConverters.get(index).serialize(rowData, index, data);
}
return data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public HdfsOrcColumnConverter(List<FieldConf> fieldConfList) {
if (left > 0 && right > 0) {
type = type.substring(0, left);
}
toInternalConverters[i] =
wrapIntoNullableInternalConverter(createInternalConverter(type));
toExternalConverters[i] =
wrapIntoNullableExternalConverter(createExternalConverter(type), type);
toInternalConverters.add(
wrapIntoNullableInternalConverter(createInternalConverter(type)));
toExternalConverters.add(
wrapIntoNullableExternalConverter(createExternalConverter(type), type));
}
}

Expand All @@ -84,7 +84,9 @@ public RowData toInternal(RowData input) throws Exception {
for (int i = 0; i < input.getArity(); i++) {
row.addField(
(AbstractBaseColumn)
toInternalConverters[i].deserialize(genericRowData.getField(i)));
toInternalConverters
.get(i)
.deserialize(genericRowData.getField(i)));
}
} else {
throw new FlinkxRuntimeException(
Expand All @@ -99,7 +101,7 @@ public RowData toInternal(RowData input) throws Exception {
@SuppressWarnings("unchecked")
public Object[] toExternal(RowData rowData, Object[] data) throws Exception {
for (int index = 0; index < rowData.getArity(); index++) {
toExternalConverters[index].serialize(rowData, index, data);
toExternalConverters.get(index).serialize(rowData, index, data);
}
return data;
}
Expand Down
Loading

0 comments on commit c4e635e

Please sign in to comment.