Skip to content

Commit e072510

Browse files
committed
Support Flink-1.12
1 parent 63cc437 commit e072510

19 files changed

+50
-58
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ under the License.
2424
<artifactId>flink-sql-gateway</artifactId>
2525
<groupId>com.ververica</groupId>
2626
<name>flink-sql-gateway</name>
27-
<version>0.2-SNAPSHOT</version>
27+
<version>0.3-SNAPSHOT</version>
2828

2929
<description>
3030
Flink SQL gateway is a service that allows other applications
@@ -34,7 +34,7 @@ under the License.
3434
<packaging>jar</packaging>
3535

3636
<properties>
37-
<flink.version>1.11.1</flink.version>
37+
<flink.version>1.12.0</flink.version>
3838
<java.version>1.8</java.version>
3939
<slf4j.version>1.7.15</slf4j.version>
4040
<log4j.version>1.2.17</log4j.version>

src/main/java/com/ververica/flink/table/gateway/SqlGateway.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,12 +151,9 @@ public static void main(String[] args) {
151151

152152
private static void checkFlinkVersion() {
153153
String flinkVersion = EnvironmentInformation.getVersion();
154-
if (!flinkVersion.startsWith("1.11")) {
155-
LOG.error("Only Flink-1.11 is supported now!");
156-
throw new SqlGatewayException("Only Flink-1.11 is supported now!");
157-
} else if (flinkVersion.startsWith("1.11.0")) {
158-
LOG.error("Flink-1.11.0 is not supported, please use Flink >= 1.11.1!");
159-
throw new SqlGatewayException("Flink-1.11.0 is not supported, please use Flink >= 1.11.1!");
154+
if (!flinkVersion.startsWith("1.12")) {
155+
LOG.error("Only Flink-1.12 is supported now!");
156+
throw new SqlGatewayException("Only Flink-1.12 is supported now!");
160157
}
161158
}
162159

src/main/java/com/ververica/flink/table/gateway/context/ExecutionContext.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@
4545
import org.apache.flink.table.api.EnvironmentSettings;
4646
import org.apache.flink.table.api.Table;
4747
import org.apache.flink.table.api.TableConfig;
48-
import org.apache.flink.table.api.TableEnvironment;
4948
import org.apache.flink.table.api.TableException;
5049
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
5150
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
5251
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
5352
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
53+
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
5454
import org.apache.flink.table.catalog.Catalog;
5555
import org.apache.flink.table.catalog.CatalogManager;
5656
import org.apache.flink.table.catalog.FunctionCatalog;
@@ -120,7 +120,7 @@ public class ExecutionContext<ClusterID> {
120120
private final Configuration flinkConfig;
121121
private final ClusterClientFactory<ClusterID> clusterClientFactory;
122122

123-
private TableEnvironment tableEnv;
123+
private TableEnvironmentInternal tableEnv;
124124
private ExecutionEnvironment execEnv;
125125
private StreamExecutionEnvironment streamExecEnv;
126126
private Executor executor;
@@ -212,7 +212,7 @@ void wrapClassLoader(Runnable runnable) {
212212
}
213213
}
214214

215-
public TableEnvironment getTableEnvironment() {
215+
public TableEnvironmentInternal getTableEnvironment() {
216216
return tableEnv;
217217
}
218218

@@ -281,8 +281,7 @@ private static Configuration createExecutionConfig(
281281
availableCommandLines,
282282
activeCommandLine);
283283

284-
Configuration executionConfig = activeCommandLine.applyCommandLineOptionsToConfiguration(
285-
commandLine);
284+
Configuration executionConfig = activeCommandLine.toConfiguration(commandLine);
286285

287286
try {
288287
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
@@ -355,7 +354,7 @@ private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String
355354
throw new SqlExecutionException("Unsupported execution type for sinks.");
356355
}
357356

358-
private TableEnvironment createStreamTableEnvironment(
357+
private TableEnvironmentInternal createStreamTableEnvironment(
359358
StreamExecutionEnvironment env,
360359
EnvironmentSettings settings,
361360
TableConfig config,
@@ -525,9 +524,9 @@ private void initializeCatalogs() {
525524
}
526525
});
527526
// register table sources
528-
tableSources.forEach(tableEnv::registerTableSource);
527+
tableSources.forEach(tableEnv::registerTableSourceInternal);
529528
// register table sinks
530-
tableSinks.forEach(tableEnv::registerTableSink);
529+
tableSinks.forEach(tableEnv::registerTableSinkInternal);
531530

532531
//--------------------------------------------------------------------------------------------------------------
533532
// Step.4 Register temporal tables.

src/main/java/com/ververica/flink/table/gateway/deployment/ProgramDeployer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class ProgramDeployer {
4141
private final Configuration configuration;
4242
private final Pipeline pipeline;
4343
private final String jobName;
44+
private final ClassLoader classLoader;
4445

4546
/**
4647
* Deploys a table program on the cluster.
@@ -52,10 +53,12 @@ public class ProgramDeployer {
5253
public ProgramDeployer(
5354
Configuration configuration,
5455
String jobName,
55-
Pipeline pipeline) {
56+
Pipeline pipeline,
57+
ClassLoader classLoader) {
5658
this.configuration = configuration;
5759
this.pipeline = pipeline;
5860
this.jobName = jobName;
61+
this.classLoader = classLoader;
5962
}
6063

6164
public CompletableFuture<JobClient> deploy() {
@@ -78,7 +81,7 @@ public CompletableFuture<JobClient> deploy() {
7881

7982
final PipelineExecutor executor = executorFactory.getExecutor(configuration);
8083
try {
81-
return executor.execute(pipeline, configuration);
84+
return executor.execute(pipeline, configuration, classLoader);
8285
} catch (Exception e) {
8386
throw new RuntimeException("Could not execute program.", e);
8487
}

src/main/java/com/ververica/flink/table/gateway/operation/DescribeTableOperation.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,12 @@ public ResultSet execute() {
9090
String type = StringUtils.removeEnd(logicalType.toString(), " NOT NULL");
9191
boolean isNullable = logicalType.isNullable();
9292
String key = fieldToPrimaryKey.getOrDefault(column.getName(), null);
93-
String computedColumn = column.getExpr().orElse(null);
93+
final String computedColumn;
94+
if (column instanceof TableColumn.ComputedColumn) {
95+
computedColumn = ((TableColumn.ComputedColumn) column).getExpression();
96+
} else {
97+
computedColumn = null;
98+
}
9499
String watermark = fieldToWatermark.getOrDefault(column.getName(), null);
95100

96101
data.add(Row.of(name, type, isNullable, key, computedColumn, watermark));

src/main/java/com/ververica/flink/table/gateway/operation/InsertOperation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,8 @@ private <C> JobID executeUpdateInternal(ExecutionContext<C> executionContext) {
155155
configuration.set(DeploymentOptions.ATTACHED, false);
156156

157157
// create execution
158-
final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
158+
final ProgramDeployer deployer = new ProgramDeployer(
159+
configuration, jobName, pipeline, context.getExecutionContext().getClassLoader());
159160

160161
// blocking deployment
161162
try {

src/main/java/com/ververica/flink/table/gateway/operation/SelectOperation.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,13 +196,11 @@ private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> executionC
196196
executionContext.getFlinkConfig(),
197197
executionContext.getEnvironment(),
198198
removeTimeAttributes(table.getSchema()),
199-
executionContext.getExecutionConfig(),
200-
executionContext.getClassLoader());
199+
executionContext.getExecutionConfig());
201200
} else {
202201
result = ResultUtil.createBatchResult(
203202
removeTimeAttributes(table.getSchema()),
204-
executionContext.getExecutionConfig(),
205-
executionContext.getClassLoader());
203+
executionContext.getExecutionConfig());
206204
}
207205

208206
String jobName = getJobName(query);
@@ -211,7 +209,7 @@ private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> executionC
211209
try {
212210
// writing to a sink requires an optimization step that might reference UDFs during code compilation
213211
executionContext.wrapClassLoader(() -> {
214-
executionContext.getTableEnvironment().registerTableSink(tableName, result.getTableSink());
212+
executionContext.getTableEnvironment().registerTableSinkInternal(tableName, result.getTableSink());
215213
table.insertInto(tableName);
216214
return null;
217215
});
@@ -239,7 +237,8 @@ private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> executionC
239237
configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true);
240238

241239
// create execution
242-
final ProgramDeployer deployer = new ProgramDeployer(configuration, jobName, pipeline);
240+
final ProgramDeployer deployer = new ProgramDeployer(
241+
configuration, jobName, pipeline, context.getExecutionContext().getClassLoader());
243242

244243
JobClient jobClient;
245244
// blocking deployment

src/main/java/com/ververica/flink/table/gateway/operation/SqlCommandParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ private static Optional<SqlCommandCall> parseStmt(String stmt, boolean isBlinkPl
194194
operands = new String[0];
195195
} else if (node instanceof SqlUseCatalog) {
196196
cmd = SqlCommand.USE_CATALOG;
197-
operands = new String[] { ((SqlUseCatalog) node).getCatalogName() };
197+
operands = new String[] { ((SqlUseCatalog) node).getCatalogName().toString() };
198198
} else if (node instanceof SqlUseDatabase) {
199199
cmd = SqlCommand.USE;
200200
operands = new String[] { ((SqlUseDatabase) node).getDatabaseName().toString() };

src/main/java/com/ververica/flink/table/gateway/result/BatchResult.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@
3838
import java.util.concurrent.atomic.AtomicReference;
3939
import java.util.function.Consumer;
4040

41-
import static org.apache.flink.util.Preconditions.checkNotNull;
42-
4341
/**
4442
* Collects results using accumulators and returns them as table snapshots.
4543
*/
@@ -48,7 +46,6 @@ public class BatchResult<C> extends AbstractResult<C, Row> {
4846
private final String accumulatorName;
4947
private final CollectBatchTableSink tableSink;
5048
private final Object resultLock;
51-
private final ClassLoader classLoader;
5249

5350
private AtomicReference<SqlExecutionException> executionException = new AtomicReference<>();
5451
private List<Row> resultTable;
@@ -58,19 +55,17 @@ public class BatchResult<C> extends AbstractResult<C, Row> {
5855
public BatchResult(
5956
TableSchema tableSchema,
6057
RowTypeInfo outputType,
61-
ExecutionConfig config,
62-
ClassLoader classLoader) {
58+
ExecutionConfig config) {
6359
// TODO supports large result set
6460
accumulatorName = new AbstractID().toString();
6561
tableSink = new CollectBatchTableSink(accumulatorName, outputType.createSerializer(config), tableSchema);
6662
resultLock = new Object();
67-
this.classLoader = checkNotNull(classLoader);
6863
}
6964

7065
@Override
7166
public void startRetrieval(JobClient jobClient) {
7267
CompletableFuture.completedFuture(jobClient)
73-
.thenCompose(client -> client.getJobExecutionResult(classLoader))
68+
.thenCompose(JobClient::getJobExecutionResult)
7469
.thenAccept(new ResultRetrievalHandler())
7570
.whenComplete((unused, throwable) -> {
7671
if (throwable != null) {

src/main/java/com/ververica/flink/table/gateway/result/ChangelogResult.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,6 @@
4444
import java.util.concurrent.CompletableFuture;
4545
import java.util.concurrent.atomic.AtomicReference;
4646

47-
import static org.apache.flink.util.Preconditions.checkNotNull;
48-
4947
/**
5048
* A result that works similarly to {@link DataStreamUtils#collect(DataStream)}.
5149
*
@@ -56,7 +54,6 @@ public class ChangelogResult<C> extends AbstractResult<C, Tuple2<Boolean, Row>>
5654
private final SocketStreamIterator<Tuple2<Boolean, Row>> iterator;
5755
private final CollectStreamTableSink collectTableSink;
5856
private final ResultRetrievalThread retrievalThread;
59-
private final ClassLoader classLoader;
6057
private CompletableFuture<JobExecutionResult> jobExecutionResultFuture;
6158

6259
private final Object resultLock;
@@ -70,7 +67,6 @@ public ChangelogResult(
7067
ExecutionConfig config,
7168
InetAddress gatewayAddress,
7269
int gatewayPort,
73-
ClassLoader classLoader,
7470
int maxBufferSize) {
7571
resultLock = new Object();
7672

@@ -90,8 +86,6 @@ public ChangelogResult(
9086
iterator.getBindAddress(), iterator.getPort(), serializer, tableSchema);
9187
retrievalThread = new ResultRetrievalThread();
9288

93-
this.classLoader = checkNotNull(classLoader);
94-
9589
// prepare for changelog
9690
changeRecordBuffer = new ArrayList<>();
9791
this.maxBufferSize = maxBufferSize;
@@ -103,7 +97,7 @@ public void startRetrieval(JobClient jobClient) {
10397
retrievalThread.start();
10498

10599
jobExecutionResultFuture = CompletableFuture.completedFuture(jobClient)
106-
.thenCompose(client -> client.getJobExecutionResult(classLoader))
100+
.thenCompose(JobClient::getJobExecutionResult)
107101
.whenComplete((unused, throwable) -> {
108102
if (throwable != null) {
109103
executionException.compareAndSet(

0 commit comments

Comments
 (0)