diff --git a/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java index d9c487780b49f..5d1d499ec49a9 100644 --- a/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java +++ b/flink-end-to-end-tests/flink-sql-client-test/src/main/java/org/apache/flink/table/toolbox/TestSourceFunction.java @@ -86,12 +86,10 @@ public RelativeClock getInputActivityClock() { for (int i = 0; i < rowDataSize; i++) { row.setField(i, list.get(i)); } - generator.onEvent(row, Long.MIN_VALUE, output); - generator.onPeriodicEmit(output); - ctx.collect(row); - index++; + generator.onEvent(row, Long.MIN_VALUE, output); + generator.onPeriodicEmit(output); } ctx.close(); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java index c626581dbb28c..5049332a438af 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java @@ -289,8 +289,6 @@ public RelativeClock getInputActivityClock() { RowData next; try { next = serializer.deserialize(input); - generator.onEvent(next, Long.MIN_VALUE, output); - generator.onPeriodicEmit(output); } catch (Exception e) { throw new IOException( "Failed to deserialize an element from the source. " @@ -303,6 +301,8 @@ public RelativeClock getInputActivityClock() { synchronized (lock) { ctx.collect(next); numElementsEmitted++; + generator.onEvent(next, Long.MIN_VALUE, output); + generator.onPeriodicEmit(output); } } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java index 0ccbaccc7a0ac..289633cfbf28e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/CalcTestPrograms.java @@ -18,6 +18,8 @@ package org.apache.flink.table.planner.plan.nodes.exec.common; +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc1; import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc2; @@ -28,14 +30,16 @@ import org.apache.flink.table.test.program.TableTestProgram; import org.apache.flink.types.Row; +import java.time.Instant; import java.time.LocalDateTime; -/** - * {@link TableTestProgram} definitions for testing {@link - * org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc}. - */ +/** {@link TableTestProgram}s for testing {@link StreamExecCalc} and {@link BatchExecCalc}. */ public class CalcTestPrograms { + // -------------------------------------------------------------------------------------------- + // With restore data + // -------------------------------------------------------------------------------------------- + public static final TableTestProgram SIMPLE_CALC = TableTestProgram.of("calc-simple", "validates basic calc node") .setupTableSource( @@ -188,7 +192,7 @@ public class CalcTestPrograms { "11 and 11 and 1702688461000", "hello world11", "$hello", - LocalDateTime.of(2023, 12, 16, 01, 01, 00, 0))) + LocalDateTime.of(2023, 12, 16, 1, 1, 0, 0))) .consumedAfterRestore( Row.of( 5L, @@ -197,7 +201,7 @@ public class CalcTestPrograms { "11 and 11 and 1702688461000", "hello world11", "$hello", - LocalDateTime.of(2023, 12, 16, 01, 01, 00, 0))) + LocalDateTime.of(2023, 12, 16, 1, 1, 0, 0))) .build()) .runSql( "INSERT INTO sink_t SELECT " @@ -211,4 +215,38 @@ public class CalcTestPrograms { + "from source_t where " + "(udf1(a) > 0 or (a * b) < 100) and b > 10") .build(); + + // -------------------------------------------------------------------------------------------- + // Without restore data + // -------------------------------------------------------------------------------------------- + + public static final TableTestProgram CURRENT_WATERMARK = + TableTestProgram.of( + "calc-current-watermark", "validates the CURRENT_WATERMARK function") + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema( + "name STRING", + "ts TIMESTAMP_LTZ(3)", + "WATERMARK FOR ts AS ts") + .producedValues( + Row.of("Bob", Instant.ofEpochMilli(0)), + Row.of("Bob", Instant.ofEpochMilli(1)), + Row.of("Alice", Instant.ofEpochMilli(2)), + Row.of("Bob", Instant.ofEpochMilli(3))) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "name STRING", + "ts TIMESTAMP_LTZ(3)", + "w TIMESTAMP_LTZ(3)") + .consumedValues( + "+I[Bob, 1970-01-01T00:00:00Z, null]", + "+I[Bob, 1970-01-01T00:00:00.001Z, 1970-01-01T00:00:00Z]", + "+I[Alice, 1970-01-01T00:00:00.002Z, 1970-01-01T00:00:00.001Z]", + "+I[Bob, 1970-01-01T00:00:00.003Z, 1970-01-01T00:00:00.002Z]") + .build()) + .runSql("INSERT INTO sink_t SELECT name, ts, CURRENT_WATERMARK(ts) AS w FROM t") + .build(); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscSemanticTests.java index 39e3c1b9fe9fc..7467e7cf45ec9 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MiscSemanticTests.java @@ -18,6 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; +import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms; import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase; import org.apache.flink.table.test.program.TableTestProgram; @@ -28,6 +29,8 @@ public class MiscSemanticTests extends SemanticTestBase { @Override public List programs() { - return List.of(WindowRankTestPrograms.WINDOW_RANK_HOP_TVF_NAMED_MIN_TOP_1); + return List.of( + WindowRankTestPrograms.WINDOW_RANK_HOP_TVF_NAMED_MIN_TOP_1, + CalcTestPrograms.CURRENT_WATERMARK); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/SemanticTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/SemanticTestBase.java index c983ce21cce3b..2d8f4546996fa 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/SemanticTestBase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/testutils/SemanticTestBase.java @@ -39,7 +39,6 @@ import org.junit.jupiter.params.provider.MethodSource; import java.util.EnumSet; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -94,19 +93,14 @@ void runTests(TableTestProgram program) throws Exception { final String id = TestValuesTableFactory.registerData( sourceTestStep.dataBeforeRestore); - final Map options = new HashMap<>(); - options.put("connector", "values"); - options.put("data-id", id); - options.put("runtime-source", "NewSource"); + final Map options = createSourceOptions(id); sourceTestStep.apply(env, options); } break; case SINK_WITH_DATA: { final SinkTestStep sinkTestStep = (SinkTestStep) testStep; - final Map options = new HashMap<>(); - options.put("connector", "values"); - options.put("sink-insert-only", "false"); + final Map options = createSinkOptions(); sinkTestStep.apply(env, options); } break; @@ -135,6 +129,22 @@ void runTests(TableTestProgram program) throws Exception { } } + private static Map createSourceOptions(String id) { + return Map.ofEntries( + Map.entry("connector", "values"), + Map.entry("data-id", id), + Map.entry("runtime-source", "NewSource"), + // Enforce per-record watermarks for testing + Map.entry("disable-lookup", "true"), + Map.entry("enable-watermark-push-down", "true"), + Map.entry("scan.watermark.emit.strategy", "on-event")); + } + + private static Map createSinkOptions() { + return Map.ofEntries( + Map.entry("connector", "values"), Map.entry("sink-insert-only", "false")); + } + private static List getActualResults(SinkTestStep sinkTestStep, String tableName) { if (sinkTestStep.shouldTestChangelogData()) { return TestValuesTableFactory.getRawResultsAsStrings(tableName); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/WatermarkITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/WatermarkITCase.java index 4f7530b27f453..6d811ee58b396 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/WatermarkITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/WatermarkITCase.java @@ -65,7 +65,7 @@ void testWatermarkNotMovingBack() { tEnv().executeSql(ddl); tEnv().getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1); - String query = "SELECT a, c, current_watermark(c) FROM VirtualTable order by c"; + String query = "SELECT a, c, CURRENT_WATERMARK(c) FROM VirtualTable ORDER BY c"; final List result = CollectionUtil.iteratorToList(tEnv().executeSql(query).collect()); final List actualWatermarks = @@ -79,30 +79,27 @@ void testWatermarkNotMovingBack() { // Underneath, we use FromElementSourceFunctionWithWatermark which is a SourceFunction. // SourceFunction does not support watermark moving back. SourceStreamTask does not support - // WatermarkGenerator natively. The test implementation calls - // WatermarkGenerator#onPeriodicEmit - // after each record, which makes the test deterministic. + // WatermarkGenerator natively. + // + // The test implementation calls WatermarkGenerator#onPeriodicEmit after each record, which + // makes the test deterministic. + // // Additionally, the GeneratedWatermarkGeneratorSupplier does not deduplicate already - // emitted - // watermarks. This is usually handled by the target WatermarkOutput. In this test, we do - // not deduplicate watermarks because we use TestValuesWatermarkOutput. + // emitted watermarks. This is usually handled by the target WatermarkOutput. In this test, + // we do not deduplicate watermarks because we use TestValuesWatermarkOutput. // Given the fact watermarks are generated after every record and we don't deduplicate them, // we have "2024-01-03T00:00" twice in the expected watermarks. + // + // Since the third row is late, the ORDER BY filters the late event. Thus, the result + // contains only 2 elements. assertThat(actualWatermarks) .containsExactly("2024-01-01T00:00", "2024-01-03T00:00", "2024-01-03T00:00"); assertThat(result) .containsExactly( - Row.of( - 1, - LocalDateTime.parse("2024-01-01T00:00"), - LocalDateTime.parse("2024-01-01T00:00")), - Row.of( - 2, - LocalDateTime.parse("2024-01-02T00:00"), - LocalDateTime.parse("2024-01-03T00:00")), + Row.of(1, LocalDateTime.parse("2024-01-01T00:00"), null), Row.of( 3, LocalDateTime.parse("2024-01-03T00:00"), - LocalDateTime.parse("2024-01-03T00:00"))); + LocalDateTime.parse("2024-01-01T00:00"))); } }