Skip to content

Commit

Permalink
[INLONG-11520][SDK] Remove DirtyServerType, use SinkType
Browse files Browse the repository at this point in the history
  • Loading branch information
vernedeng committed Nov 22, 2024
1 parent 22d9a33 commit 26f3b75
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

public class SinkType {

public static final String ICEBERG = "ICEBERG";
public static final String HIVE = "HIVE";
public static final String KAFKA = "KAFKA";
public static final String PULSAR = "PULSAR";
public static final String CLS = "CLS";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
public class DirtyMessageWrapper {

private static DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private String delimiter;
@Builder.Default
private String delimiter = "|";
@Builder.Default
@Getter
private int retryTimes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.inlong.sort.base.dirty;

import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
import org.apache.inlong.sort.base.util.PatternReplaceUtils;

import org.apache.flink.table.types.logical.LogicalType;
Expand Down Expand Up @@ -65,7 +64,7 @@ public class DirtyData<T> {
*/
private final DirtyType dirtyType;

private final DirtyServerType serverType;
private final String serverType;
/**
* Dirty describe message, it is the cause of dirty data
*/
Expand All @@ -88,7 +87,7 @@ public class DirtyData<T> {
private final T data;

public DirtyData(T data, String identifier, String labels,
String logTag, DirtyType dirtyType, DirtyServerType serverType, String dirtyMessage,
String logTag, DirtyType dirtyType, String serverType, String dirtyMessage,
@Nullable LogicalType rowType, long dataTime, String extParams) {
this.data = data;
this.dirtyType = dirtyType;
Expand Down Expand Up @@ -131,7 +130,7 @@ public DirtyType getDirtyType() {
return dirtyType;
}

public DirtyServerType getServerType() {
public String getServerType() {
return serverType;
}

Expand Down Expand Up @@ -162,7 +161,7 @@ public static class Builder<T> {
private String labels;
private String logTag;
private DirtyType dirtyType = DirtyType.UNDEFINED;
private DirtyServerType serverType = DirtyServerType.UNDEFINED;
private String serverType;
private String dirtyMessage;
private LogicalType rowType;
private long dataTime;
Expand All @@ -184,7 +183,7 @@ public Builder<T> setDirtyType(DirtyType dirtyType) {
return this;
}

public Builder<T> setServerType(DirtyServerType serverType) {
public Builder<T> setServerType(String serverType) {
this.serverType = serverType;
return this;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.inlong.sort.tubemq.table;

import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
Expand Down Expand Up @@ -144,7 +144,7 @@ public void deserialize(Message message, Collector<RowData> out) throws IOExcept

builder.setData(message.getData())
.setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR)
.setServerType(DirtyServerType.TUBE_MQ)
.setServerType(MQType.TUBEMQ)
.setDirtyDataTime(dataTime)
.setExtParams(message.getAttribute())
.setLabels(dirtyOptions.getLabels())
Expand Down

0 comments on commit 26f3b75

Please sign in to comment.