Skip to content

Commit 2873d6a

Browse files
luoyuxiawuchong
authored andcommitted
address comment2
1 parent ec3ae3b commit 2873d6a

File tree

3 files changed

+40
-22
lines changed

3 files changed

+40
-22
lines changed

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@
5757
/** Factory to create table source and table sink for Fluss. */
5858
public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
5959

60+
private static final ConfigOption<ConfigOptions.MergeEngine> MERGE_ENGINE_OPTION =
61+
key(ConfigOptions.TABLE_MERGE_ENGINE.key())
62+
.enumType(ConfigOptions.MergeEngine.class)
63+
.noDefaultValue();
64+
6065
private volatile LakeTableFactory lakeTableFactory;
6166

6267
@Override
@@ -146,20 +151,14 @@ public DynamicTableSink createDynamicTableSink(Context context) {
146151
== RuntimeExecutionMode.STREAMING;
147152

148153
RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
149-
ConfigOptions.MergeEngine mergeEngine =
150-
helper.getOptions()
151-
.get(
152-
key(ConfigOptions.TABLE_MERGE_ENGINE.key())
153-
.enumType(ConfigOptions.MergeEngine.class)
154-
.noDefaultValue());
155154

156155
return new FlinkTableSink(
157156
toFlussTablePath(context.getObjectIdentifier()),
158157
toFlussClientConfig(helper.getOptions(), context.getConfiguration()),
159158
rowType,
160159
context.getPrimaryKeyIndexes(),
161160
isStreamingMode,
162-
mergeEngine);
161+
helper.getOptions().get(MERGE_ENGINE_OPTION));
163162
}
164163

165164
@Override

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSink.java

+17-11
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,20 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
116116
// is 0, when no column specified, it's not partial update
117117
// see FLINK-36000
118118
&& context.getTargetColumns().get().length != 0) {
119-
// check partial update
120-
if (primaryKeyIndexes.length == 0
121-
&& context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
122-
throw new ValidationException(
123-
"Fluss table sink does not support partial updates for table without primary key. Please make sure the "
124-
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.");
119+
120+
// is partial update, check whether partial update is supported or not
121+
if (context.getTargetColumns().get().length != tableRowType.getFieldCount()) {
122+
if (primaryKeyIndexes.length == 0) {
123+
throw new ValidationException(
124+
"Fluss table sink does not support partial updates for table without primary key. Please make sure the "
125+
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.");
126+
} else if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
127+
throw new ValidationException(
128+
String.format(
129+
"Table %s uses the '%s' merge engine which does not support partial updates. Please make sure the "
130+
+ "number of specified columns in INSERT INTO matches columns of the Fluss table.",
131+
tablePath, ConfigOptions.MergeEngine.FIRST_ROW));
132+
}
125133
}
126134
int[][] targetColumns = context.getTargetColumns().get();
127135
targetColumnIndexes = new int[targetColumns.length];
@@ -291,13 +299,11 @@ private void validateUpdatableAndDeletable() {
291299
tablePath));
292300
}
293301

294-
if (mergeEngine != null && mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
302+
if (mergeEngine == ConfigOptions.MergeEngine.FIRST_ROW) {
295303
throw new UnsupportedOperationException(
296304
String.format(
297-
"Table %s is with merge engine '%s'. Table with '%s' merge engine doesn't support DELETE and UPDATE statements.",
298-
tablePath,
299-
ConfigOptions.MergeEngine.FIRST_ROW,
300-
ConfigOptions.MergeEngine.FIRST_ROW));
305+
"Table %s uses the '%s' merge engine which does not support DELETE or UPDATE statements.",
306+
tablePath, ConfigOptions.MergeEngine.FIRST_ROW));
301307
}
302308
}
303309

fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/sink/FlinkTableSinkITCase.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3838
import org.apache.flink.table.api.EnvironmentSettings;
3939
import org.apache.flink.table.api.TableEnvironment;
40+
import org.apache.flink.table.api.ValidationException;
4041
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
4142
import org.apache.flink.table.api.config.ExecutionConfigOptions;
4243
import org.apache.flink.types.Row;
@@ -746,21 +747,22 @@ void testUnsupportedDeleteAndUpdateStmtOnPartialPK() {
746747
}
747748

748749
@Test
749-
void testUnsupportedDeleteAndUpdateStmtOnFirstRowMergeEngine() {
750+
void testUnsupportedStmtOnFirstRowMergeEngine() {
750751
String t1 = "firstRowMergeEngineTable";
751752
TablePath tablePath = TablePath.of(DEFAULT_DB, t1);
752753
tBatchEnv.executeSql(
753754
String.format(
754755
"create table %s ("
755756
+ " a int not null,"
756-
+ " b bigint not null, "
757+
+ " b bigint null, "
758+
+ " c string null, "
757759
+ " primary key (a) not enforced"
758760
+ ") with ('table.merge-engine' = 'first_row')",
759761
t1));
760762
assertThatThrownBy(() -> tBatchEnv.executeSql("DELETE FROM " + t1 + " WHERE a = 1").await())
761763
.isInstanceOf(UnsupportedOperationException.class)
762764
.hasMessage(
763-
"Table %s is with merge engine 'first_row'. Table with 'first_row' merge engine doesn't support DELETE and UPDATE statements.",
765+
"Table %s uses the 'first_row' merge engine which does not support DELETE or UPDATE statements.",
764766
tablePath);
765767

766768
assertThatThrownBy(
@@ -770,7 +772,18 @@ void testUnsupportedDeleteAndUpdateStmtOnFirstRowMergeEngine() {
770772
.await())
771773
.isInstanceOf(UnsupportedOperationException.class)
772774
.hasMessage(
773-
"Table %s is with merge engine 'first_row'. Table with 'first_row' merge engine doesn't support DELETE and UPDATE statements.",
775+
"Table %s uses the 'first_row' merge engine which does not support DELETE or UPDATE statements.",
776+
tablePath);
777+
778+
assertThatThrownBy(
779+
() ->
780+
tBatchEnv
781+
.executeSql("INSERT INTO " + t1 + "(a, c) VALUES(1, 'c1')")
782+
.await())
783+
.isInstanceOf(ValidationException.class)
784+
.hasMessage(
785+
"Table %s uses the 'first_row' merge engine which does not support partial updates."
786+
+ " Please make sure the number of specified columns in INSERT INTO matches columns of the Fluss table.",
774787
tablePath);
775788
}
776789

0 commit comments

Comments
 (0)