Skip to content

Commit c3ef625

Browse files
committed
[cdc] Allow preventing corrupted record logging
1 parent c4c8980 commit c3ef625

File tree

4 files changed

+49
-14
lines changed

4 files changed

+49
-14
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java

+14-4
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@
3535
import java.io.IOException;
3636
import java.util.Optional;
3737

38+
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.LOG_CORRUPT_RECORD;
3839
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
3940
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
4041
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
@@ -53,6 +54,8 @@ public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<Cdc
5354

5455
private final boolean skipCorruptRecord;
5556

57+
private final boolean logCorruptRecord;
58+
5659
private CdcDynamicBucketWriteOperator(
5760
StreamOperatorParameters<Committable> parameters,
5861
FileStoreTable table,
@@ -63,6 +66,7 @@ private CdcDynamicBucketWriteOperator(
6366
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
6467
this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
6568
this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
69+
this.logCorruptRecord = table.coreOptions().toConfiguration().get(LOG_CORRUPT_RECORD);
6670
}
6771

6872
@Override
@@ -79,11 +83,13 @@ protected boolean containLogSystem() {
7983
@Override
8084
public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) throws Exception {
8185
Tuple2<CdcRecord, Integer> record = element.getValue();
82-
Optional<GenericRow> optionalConverted = toGenericRow(record.f0, table.schema().fields());
86+
Optional<GenericRow> optionalConverted =
87+
toGenericRow(record.f0, table.schema().fields(), logCorruptRecord);
8388
if (!optionalConverted.isPresent()) {
8489
for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
8590
table = table.copyWithLatestSchema();
86-
optionalConverted = toGenericRow(record.f0, table.schema().fields());
91+
optionalConverted =
92+
toGenericRow(record.f0, table.schema().fields(), logCorruptRecord);
8793
if (optionalConverted.isPresent()) {
8894
break;
8995
}
@@ -94,9 +100,13 @@ public void processElement(StreamRecord<Tuple2<CdcRecord, Integer>> element) thr
94100

95101
if (!optionalConverted.isPresent()) {
96102
if (skipCorruptRecord) {
97-
LOG.warn("Skipping corrupt or unparsable record {}", record);
103+
LOG.warn(
104+
"Skipping corrupt or unparsable record {}",
105+
(logCorruptRecord ? record : "<redacted>"));
98106
} else {
99-
throw new RuntimeException("Unable to process element. Possibly a corrupt record");
107+
throw new RuntimeException(
108+
"Unable to process element. Possibly a corrupt record: "
109+
+ (logCorruptRecord ? record : "<redacted>"));
100110
}
101111
} else {
102112
try {

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java

+12-4
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@
5454
import java.util.concurrent.Executors;
5555
import java.util.stream.Collectors;
5656

57+
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.LOG_CORRUPT_RECORD;
5758
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.MAX_RETRY_NUM_TIMES;
5859
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME;
5960
import static org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.SKIP_CORRUPT_RECORD;
@@ -155,14 +156,17 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
155156

156157
((StoreSinkWriteImpl) write).withCompactExecutor(compactExecutor);
157158

159+
boolean logCorruptRecord = table.coreOptions().toConfiguration().get(LOG_CORRUPT_RECORD);
158160
Optional<GenericRow> optionalConverted =
159-
toGenericRow(record.record(), table.schema().fields());
161+
toGenericRow(record.record(), table.schema().fields(), logCorruptRecord);
160162
if (!optionalConverted.isPresent()) {
161163
FileStoreTable latestTable = table;
162164
for (int retry = 0; retry < retryCnt; ++retry) {
163165
latestTable = latestTable.copyWithLatestSchema();
164166
tables.put(tableId, latestTable);
165-
optionalConverted = toGenericRow(record.record(), latestTable.schema().fields());
167+
optionalConverted =
168+
toGenericRow(
169+
record.record(), latestTable.schema().fields(), logCorruptRecord);
166170
if (optionalConverted.isPresent()) {
167171
break;
168172
}
@@ -178,9 +182,13 @@ public void processElement(StreamRecord<CdcMultiplexRecord> element) throws Exce
178182

179183
if (!optionalConverted.isPresent()) {
180184
if (skipCorruptRecord) {
181-
LOG.warn("Skipping corrupt or unparsable record {}", record);
185+
LOG.warn(
186+
"Skipping corrupt or unparsable record {}",
187+
(logCorruptRecord ? record : "<redacted>"));
182188
} else {
183-
throw new RuntimeException("Unable to process element. Possibly a corrupt record");
189+
throw new RuntimeException(
190+
"Unable to process element. Possibly a corrupt record: "
191+
+ (logCorruptRecord ? record : "<redacted>"));
184192
}
185193
} else {
186194
try {

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java

+19-4
Original file line number | Diff line number | Diff line change
@@ -67,12 +67,21 @@ public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
6767
.defaultValue(false)
6868
.withDescription("Skip corrupt record if we fail to parse it");
6969

70+
public static final ConfigOption<Boolean> LOG_CORRUPT_RECORD =
71+
ConfigOptions.key("cdc.log-corrupt-record")
72+
.booleanType()
73+
.defaultValue(true)
74+
.withDescription(
75+
"Whether to allow corrupt record logging when unable to parse CDC records.");
76+
7077
private final long retrySleepMillis;
7178

7279
private final int maxRetryNumTimes;
7380

7481
private final boolean skipCorruptRecord;
7582

83+
private final boolean logCorruptRecord;
84+
7685
protected CdcRecordStoreWriteOperator(
7786
StreamOperatorParameters<Committable> parameters,
7887
FileStoreTable table,
@@ -83,6 +92,7 @@ protected CdcRecordStoreWriteOperator(
8392
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
8493
this.maxRetryNumTimes = table.coreOptions().toConfiguration().get(MAX_RETRY_NUM_TIMES);
8594
this.skipCorruptRecord = table.coreOptions().toConfiguration().get(SKIP_CORRUPT_RECORD);
95+
this.logCorruptRecord = table.coreOptions().toConfiguration().get(LOG_CORRUPT_RECORD);
8696
}
8797

8898
@Override
@@ -99,11 +109,12 @@ protected boolean containLogSystem() {
99109
@Override
100110
public void processElement(StreamRecord<CdcRecord> element) throws Exception {
101111
CdcRecord record = element.getValue();
102-
Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
112+
Optional<GenericRow> optionalConverted =
113+
toGenericRow(record, table.schema().fields(), logCorruptRecord);
103114
if (!optionalConverted.isPresent()) {
104115
for (int retry = 0; retry < maxRetryNumTimes; ++retry) {
105116
table = table.copyWithLatestSchema();
106-
optionalConverted = toGenericRow(record, table.schema().fields());
117+
optionalConverted = toGenericRow(record, table.schema().fields(), logCorruptRecord);
107118
if (optionalConverted.isPresent()) {
108119
break;
109120
}
@@ -114,9 +125,13 @@ public void processElement(StreamRecord<CdcRecord> element) throws Exception {
114125

115126
if (!optionalConverted.isPresent()) {
116127
if (skipCorruptRecord) {
117-
LOG.warn("Skipping corrupt or unparsable record {}", record);
128+
LOG.warn(
129+
"Skipping corrupt or unparsable record {}",
130+
(logCorruptRecord ? record : "<redacted>"));
118131
} else {
119-
throw new RuntimeException("Unable to process element. Possibly a corrupt record");
132+
throw new RuntimeException(
133+
"Unable to process element. Possibly a corrupt record: "
134+
+ (logCorruptRecord ? record : "<redacted>"));
120135
}
121136
} else {
122137
try {

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordUtils.java

+4-2
Original file line number | Diff line number | Diff line change
@@ -73,12 +73,14 @@ public static GenericRow projectAsInsert(CdcRecord record, List<DataField> dataF
7373
* CdcRecordUtils#projectAsInsert} instead.
7474
*
7575
* @param dataFields {@link DataField}s of the converted {@link GenericRow}.
76+
* @param logCorruptRecord whether to log data during conversion error
7677
* @return if all field names of {@code dataFields} existed in keys of {@code fields} and all
7778
* values of {@code fields} can be correctly converted to the specified type, an {@code
7879
* Optional#of(GenericRow)} will be returned, otherwise an {@code Optional#empty()} will be
7980
* returned
8081
*/
81-
public static Optional<GenericRow> toGenericRow(CdcRecord record, List<DataField> dataFields) {
82+
public static Optional<GenericRow> toGenericRow(
83+
CdcRecord record, List<DataField> dataFields, boolean logCorruptRecord) {
8284
GenericRow genericRow = new GenericRow(record.kind(), dataFields.size());
8385
List<String> fieldNames =
8486
dataFields.stream().map(DataField::name).collect(Collectors.toList());
@@ -105,7 +107,7 @@ public static Optional<GenericRow> toGenericRow(CdcRecord record, List<DataField
105107
} catch (Exception e) {
106108
LOG.info(
107109
"Failed to convert value "
108-
+ value
110+
+ (logCorruptRecord ? value : "<redacted>")
109111
+ " to type "
110112
+ type
111113
+ ". Waiting for schema update.",

0 commit comments

Comments (0)