Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,10 @@ public RelativeClock getInputActivityClock() {
for (int i = 0; i < rowDataSize; i++) {
row.setField(i, list.get(i));
}
generator.onEvent(row, Long.MIN_VALUE, output);
generator.onPeriodicEmit(output);

ctx.collect(row);

index++;
generator.onEvent(row, Long.MIN_VALUE, output);
generator.onPeriodicEmit(output);
}
ctx.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,6 @@ public RelativeClock getInputActivityClock() {
RowData next;
try {
next = serializer.deserialize(input);
generator.onEvent(next, Long.MIN_VALUE, output);
generator.onPeriodicEmit(output);
} catch (Exception e) {
throw new IOException(
"Failed to deserialize an element from the source. "
Expand All @@ -303,6 +301,8 @@ public RelativeClock getInputActivityClock() {
synchronized (lock) {
ctx.collect(next);
numElementsEmitted++;
generator.onEvent(next, Long.MIN_VALUE, output);
generator.onPeriodicEmit(output);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.table.planner.plan.nodes.exec.common;

import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecCalc;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc1;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc2;
Expand All @@ -28,14 +30,16 @@
import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;

import java.time.Instant;
import java.time.LocalDateTime;

/**
* {@link TableTestProgram} definitions for testing {@link
* org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecCalc}.
*/
/** {@link TableTestProgram}s for testing {@link StreamExecCalc} and {@link BatchExecCalc}. */
public class CalcTestPrograms {

// --------------------------------------------------------------------------------------------
// With restore data
// --------------------------------------------------------------------------------------------

public static final TableTestProgram SIMPLE_CALC =
TableTestProgram.of("calc-simple", "validates basic calc node")
.setupTableSource(
Expand Down Expand Up @@ -188,7 +192,7 @@ public class CalcTestPrograms {
"11 and 11 and 1702688461000",
"hello world11",
"$hello",
LocalDateTime.of(2023, 12, 16, 01, 01, 00, 0)))
LocalDateTime.of(2023, 12, 16, 1, 1, 0, 0)))
.consumedAfterRestore(
Row.of(
5L,
Expand All @@ -197,7 +201,7 @@ public class CalcTestPrograms {
"11 and 11 and 1702688461000",
"hello world11",
"$hello",
LocalDateTime.of(2023, 12, 16, 01, 01, 00, 0)))
LocalDateTime.of(2023, 12, 16, 1, 1, 0, 0)))
.build())
.runSql(
"INSERT INTO sink_t SELECT "
Expand All @@ -211,4 +215,38 @@ public class CalcTestPrograms {
+ "from source_t where "
+ "(udf1(a) > 0 or (a * b) < 100) and b > 10")
.build();

// --------------------------------------------------------------------------------------------
// Without restore data
// --------------------------------------------------------------------------------------------

public static final TableTestProgram CURRENT_WATERMARK =
TableTestProgram.of(
"calc-current-watermark", "validates the CURRENT_WATERMARK function")
.setupTableSource(
SourceTestStep.newBuilder("t")
.addSchema(
"name STRING",
"ts TIMESTAMP_LTZ(3)",
"WATERMARK FOR ts AS ts")
.producedValues(
Row.of("Bob", Instant.ofEpochMilli(0)),
Row.of("Bob", Instant.ofEpochMilli(1)),
Row.of("Alice", Instant.ofEpochMilli(2)),
Row.of("Bob", Instant.ofEpochMilli(3)))
.build())
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema(
"name STRING",
"ts TIMESTAMP_LTZ(3)",
"w TIMESTAMP_LTZ(3)")
.consumedValues(
"+I[Bob, 1970-01-01T00:00:00Z, null]",
"+I[Bob, 1970-01-01T00:00:00.001Z, 1970-01-01T00:00:00Z]",
"+I[Alice, 1970-01-01T00:00:00.002Z, 1970-01-01T00:00:00.001Z]",
"+I[Bob, 1970-01-01T00:00:00.003Z, 1970-01-01T00:00:00.002Z]")
.build())
.runSql("INSERT INTO sink_t SELECT name, ts, CURRENT_WATERMARK(ts) AS w FROM t")
.build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.planner.plan.nodes.exec.common.CalcTestPrograms;
import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
import org.apache.flink.table.test.program.TableTestProgram;

Expand All @@ -28,6 +29,8 @@ public class MiscSemanticTests extends SemanticTestBase {

@Override
public List<TableTestProgram> programs() {
return List.of(WindowRankTestPrograms.WINDOW_RANK_HOP_TVF_NAMED_MIN_TOP_1);
return List.of(
WindowRankTestPrograms.WINDOW_RANK_HOP_TVF_NAMED_MIN_TOP_1,
CalcTestPrograms.CURRENT_WATERMARK);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.junit.jupiter.params.provider.MethodSource;

import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -94,19 +93,14 @@ void runTests(TableTestProgram program) throws Exception {
final String id =
TestValuesTableFactory.registerData(
sourceTestStep.dataBeforeRestore);
final Map<String, String> options = new HashMap<>();
options.put("connector", "values");
options.put("data-id", id);
options.put("runtime-source", "NewSource");
final Map<String, String> options = createSourceOptions(id);
sourceTestStep.apply(env, options);
}
break;
case SINK_WITH_DATA:
{
final SinkTestStep sinkTestStep = (SinkTestStep) testStep;
final Map<String, String> options = new HashMap<>();
options.put("connector", "values");
options.put("sink-insert-only", "false");
final Map<String, String> options = createSinkOptions();
sinkTestStep.apply(env, options);
}
break;
Expand Down Expand Up @@ -135,6 +129,22 @@ void runTests(TableTestProgram program) throws Exception {
}
}

private static Map<String, String> createSourceOptions(String id) {
return Map.ofEntries(
Map.entry("connector", "values"),
Map.entry("data-id", id),
Map.entry("runtime-source", "NewSource"),
// Enforce per-record watermarks for testing
Map.entry("disable-lookup", "true"),
Map.entry("enable-watermark-push-down", "true"),
Map.entry("scan.watermark.emit.strategy", "on-event"));
}

private static Map<String, String> createSinkOptions() {
return Map.ofEntries(
Map.entry("connector", "values"), Map.entry("sink-insert-only", "false"));
}

private static List<String> getActualResults(SinkTestStep sinkTestStep, String tableName) {
if (sinkTestStep.shouldTestChangelogData()) {
return TestValuesTableFactory.getRawResultsAsStrings(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void testWatermarkNotMovingBack() {

tEnv().executeSql(ddl);
tEnv().getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);
String query = "SELECT a, c, current_watermark(c) FROM VirtualTable order by c";
String query = "SELECT a, c, CURRENT_WATERMARK(c) FROM VirtualTable ORDER BY c";

final List<Row> result = CollectionUtil.iteratorToList(tEnv().executeSql(query).collect());
final List<String> actualWatermarks =
Expand All @@ -79,30 +79,27 @@ void testWatermarkNotMovingBack() {

// Underneath, we use FromElementSourceFunctionWithWatermark which is a SourceFunction.
// SourceFunction does not support watermark moving back. SourceStreamTask does not support
// WatermarkGenerator natively. The test implementation calls
// WatermarkGenerator#onPeriodicEmit
// after each record, which makes the test deterministic.
// WatermarkGenerator natively.
//
// The test implementation calls WatermarkGenerator#onPeriodicEmit after each record, which
// makes the test deterministic.
//
// Additionally, the GeneratedWatermarkGeneratorSupplier does not deduplicate already
// emitted
// watermarks. This is usually handled by the target WatermarkOutput. In this test, we do
// not deduplicate watermarks because we use TestValuesWatermarkOutput.
// emitted watermarks. This is usually handled by the target WatermarkOutput. In this test,
// we do not deduplicate watermarks because we use TestValuesWatermarkOutput.
// Given the fact watermarks are generated after every record and we don't deduplicate them,
// we have "2024-01-03T00:00" twice in the expected watermarks.
//
// Since the third row is late, the ORDER BY filters the late event. Thus, the result
// contains only 2 elements.
assertThat(actualWatermarks)
.containsExactly("2024-01-01T00:00", "2024-01-03T00:00", "2024-01-03T00:00");
assertThat(result)
.containsExactly(
Row.of(
1,
LocalDateTime.parse("2024-01-01T00:00"),
LocalDateTime.parse("2024-01-01T00:00")),
Row.of(
2,
LocalDateTime.parse("2024-01-02T00:00"),
LocalDateTime.parse("2024-01-03T00:00")),
Row.of(1, LocalDateTime.parse("2024-01-01T00:00"), null),
Row.of(
3,
LocalDateTime.parse("2024-01-03T00:00"),
LocalDateTime.parse("2024-01-03T00:00")));
LocalDateTime.parse("2024-01-01T00:00")));
}
}