Does this Flink JDBC driver only support batch processing? #9

Open

zjw940419 opened this issue May 7, 2020 · 1 comment

@zjw940419

My code is:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public static void main(String[] args) throws Exception {
    // Connect to the Flink SQL gateway through the JDBC driver, using the old planner
    Connection connection = DriverManager.getConnection("jdbc:flink://10.33.164.97:8083?planner=old");
    Statement statement = connection.createStatement();

    // Register a Kafka-backed table
    String sql = "CREATE TABLE user_behavior (\n" +
            "    user_id BIGINT,\n" +
            "    item_id BIGINT,\n" +
            "    category_id BIGINT,\n" +
            "    behavior STRING,\n" +
            "    ts TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "    'connector.type' = 'kafka',\n" +
            "    'connector.version' = 'universal',\n" +
            "    'connector.topic' = 'user_behavior',\n" +
            "    'connector.startup-mode' = 'earliest-offset',\n" +
            "    'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
            "    'connector.properties.bootstrap.servers' = 'localhost:9094',\n" +
            "    'format.type' = 'json'\n" +
            ")";
    int i = statement.executeUpdate(sql);

    // Query the table and print the 'behavior' column (index 4)
    ResultSet rs = statement.executeQuery("SELECT * FROM user_behavior");
    while (rs.next()) {
        System.out.println(rs.getString(4));
    }

    statement.close();
    connection.close();
}

But the query fails with the following error:

End of exception on server side>]
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
	at com.ververica.flink.table.jdbc.rest.SessionClient.submitStatement(SessionClient.java:158)
	... 5 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
com.ververica.flink.table.gateway.SqlExecutionException: Invalid SQL query.
	at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:253)
	at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:89)
	at com.ververica.flink.table.gateway.Session.runStatement(Session.java:84)
	at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)
	at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)
	at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)
	at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.
	at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:118)
	at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
	at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
	at org.apache.flink.table.api.internal.BatchTableEnvImpl.writeToSink(BatchTableEnvImpl.scala:138)
	at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:664)
	at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:616)
	at org.apache.flink.table.api.internal.BatchTableEnvImpl.insertInto(BatchTableEnvImpl.scala:60)
	at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:428)
	at com.ververica.flink.table.gateway.operation.SelectOperation.lambda$executeQueryInternal$0(SelectOperation.java:243)
	at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:230)
	at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:241)
	... 46 more

End of exception on server side>]
	at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

So what am I doing wrong? Or does this driver only support batch processing? Thanks~
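
For what it's worth, the bottom of the trace ("Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment") suggests the query was planned by the legacy planner's batch environment, which cannot consume a streaming Kafka source. Below is a minimal sketch of the kind of change that might avoid that, assuming the gateway is configured for streaming execution with the Blink planner and that the driver accepts planner=blink in the URL (only planner=old appears above, so that value is an assumption, not a verified option of this driver):

    // Sketch only: ask for the Blink planner so the query is planned in a
    // streaming TableEnvironment rather than the legacy BatchTableEnvironment.
    // "planner=blink" is an assumed URL parameter value; the gateway side must
    // also be set up for streaming execution in its defaults file.
    Connection connection = DriverManager.getConnection("jdbc:flink://10.33.164.97:8083?planner=blink");
    Statement statement = connection.createStatement();
    statement.executeUpdate(sql);   // sql is the same Kafka CREATE TABLE DDL shown above
    ResultSet rs = statement.executeQuery("SELECT * FROM user_behavior");
    while (rs.next()) {
        System.out.println(rs.getString(4));   // print the 'behavior' column
    }

Whether a SELECT over an unbounded Kafka topic then streams results back through this driver, rather than only working for bounded (batch) sources, is exactly what this issue is asking, so treat the sketch as a starting point rather than a confirmed answer.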

@egwada

egwada commented Jul 16, 2020

Hi all,

I get the same issue when I try to query an existing table from the Flink SQL training example, one that is already defined in the Flink SQL Gateway as described in this blog post (https://gethue.com/blog/sql-editor-for-apache-flink-sql/) and in the gateway's config file (https://raw.githubusercontent.com/romainr/flink-sql-gateway/master/docs/demo/sql-gateway-defaults.yaml).

Could you please indicate whether a property or a config parameter is needed somewhere?

Regards
