- 支持Window TVF
- 时区和时间函数
- 提升Hive语法兼容性
- 改善SQL Client
- 增强DataStream和Table的转换
- 1.13之前是一个特殊的GroupWindowFunction,1.13使用Table-Valued Function进行语法标准化。
# 1.13之前
select TUMBLE_START(bidtime,INTERVAL '10' MINUTE) as window_start,
TUMBLE_END(bidtime,INTERVAL '10' MINUTES) as window_end,
TUMBLE_ROWTIME(bidtime,INTERVAL '10' MINUTES) as rt,
sum(price)
from MyTable
GROUP BY TUMBLE(bidtime,INTERVAL '10' MINUTE);
# 1.13
select window_start,window_end,window_time,sum(price)
from TABLE(TUMBLE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL '10' MINUTES))
GROUP BY window_start,window_end;
- 完整的关系代数表达式,输入是一个关系,输出也是一个关系,每个关系对应一个数据集
![](./img/window vtf1.jpg)
- 支持TUMBLE、HOP WIndow
- 新增CUMULATE WINDOW,适合计算增量UV统计需求,比如当前uv从10点到下午6点增量数据
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
# 10分钟进行计算,没2分钟累加计算输出一次
SELECT window_start, window_end, SUM(price)
FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
+------------------+------------------+-------+
| window_start | window_end | price |
+------------------+------------------+-------+
| 2020-04-15 08:00 | 2020-04-15 08:06 | 4.00 |
| 2020-04-15 08:00 | 2020-04-15 08:08 | 6.00 |
| 2020-04-15 08:00 | 2020-04-15 08:10 | 11.00 |
| 2020-04-15 08:10 | 2020-04-15 08:12 | 3.00 |
| 2020-04-15 08:10 | 2020-04-15 08:14 | 4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:16 | 4.00 |
| 2020-04-15 08:10 | 2020-04-15 08:18 | 10.00 |
| 2020-04-15 08:10 | 2020-04-15 08:20 | 10.00 |
+------------------+------------------+-------+
![](./img/cumulate window.png)
- 内存优化:通过内存预分配缓存window的数据,通过window watermark触发计算。
- 切片优化:将window切片,尽可能复用已计算结果,如hop window,cumulate window。
- 算子优化:window支持local-global优化,同时支持count(distinct)自动解热点优化。
- 迟到数据:支持迟到数据计算到后续分片,保证数据准确性。
- GROUPING SETS:支持对多纬度数据分组
- ROLLUP:下卷
- CUBE
- Row Number:window topN
- 处理事件窗口刚好晚8个小时
- BIGINT 转 TIMESTAMP是否隐含时区信息
- watermark + 8h 问题
- SQL 时区问题
- CURRENT_DATE 函数问题
- CURRENT_DATE, CURRENT_TIMESTAMP 时区问题
- long 转 timestamp 怎么做
- Hive 分区提交的时区差问题
- PROCTIME()函数语义问题
- 毫秒值如何声明成时间类型
# 通过设置本地时区来决定timestamp_ltz时区
CREATE VIEW T1 AS SELECT TO_TIMESTAMP_LTZ(1621649473000,3)
set table.local-time-zone=UTC;
SELECT * FROM T1;
- PROCTIME()、CURRENT_TIMESTAMP类型为TIMESTAMP_LTZ,LOCALTIMESTAMP类型为TIMESTAMP
- EVENT_TIME也支持TIMESTAMP_LTZ,时间窗口可以根据本地时区而定。
- SORT、CLUSTER、DISTRIBUTE BY
- Group By
- Join
- Union
- LATERL VIEW
- Window Functions
- SubQueries
- CTE:cte
- INSERT INTO dest schema
- Implicit type conversions
# 使用hive catalog、hive module
create catalog myhive with('type'='hive');
use catalog myhive;
load module hive;
use modules hive,core;
set table.sql-dialect=hive;
# 创建hive function
create function function_name as class_name;
drop function [if exists]function_name;
# -i参数支持初始化表的多个ddl语句
sql-client.sh -i init.sql
# -f参数,其中sqlfile文件支持DML(insert into)语句
sql-client.sh -i init.sql -f sqlfile
# 开启verbose模式,输出异常堆栈信息
set sql-client.verbose=true;
# 支持设置作业模式
set execution.runtime-mode=streaming;
# 设置作业名
set pipeline.name=xxx;
# 设置作业savepoint路径
set execution.savepoint.path=xxx
# 指定insert into同步或异步执行,批模式下有依赖的pipeline非常有用
set table.dml-sync=true;
- 支持statement set
begin statement set;
insert into xx select a,b,c from d;
insert into xx1 select a,b,c from d;
end;
- 支持Datastream和Table转换时传递event time和watermark
Table table=tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime","TIMESTAMP(3)")
.watermark("rowtime","SOURCE_WATERMARK()")
.build());
- 支持changlog数据流在table和datastream间互相转换
# DataStream转Table
StreamTableEnvironment.fromChangelogStream(DataStream<Row>):Table
StreamTableEnvironment.fromChangelogStream(DataStream<Row>,Schema):Table
# Table转DataStream
StreamTableEnvironment.toChangelogStream(Table):DataStream<Row>
StreamTableEnvironment.toChangelogStream(Table,Schema):DataStream<Row>
- 响应模式在调整事件上重新启动作业,并从最近完成的检查点恢复作业。这意味着没有创建保存点的开销(手动调整作业需要保存点)。此外,重新调整后重新处理的数据量取决于检查点间隔,恢复时间取决于状态大小。
./bin/standalone-job.sh start -Dscheduler-mode=reactive -Dexecution.checkpointing.interval="10s" -j org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
# 动态增加tm
./bin/taskmanager.sh start
# 动态减少tm
./bin/taaskmanager.sh stop
- scheduler-mode=reactive开启reactive模式
- 可以删除字段。一旦删除,被删除字段的前值将会在将来的 checkpoints 以及 savepoints 中删除。
- 可以添加字段。新字段会使用类型对应的默认值进行初始化
- 不可以修改字段的声明类型。
- 不可以改变 POJO 类型的类名,包括类的命名空间。
-
Flink 完全支持 Avro 状态类型的升级,只要数据结构的修改是被 Avro 的数据结构解析规则认为兼容的即可。
-
一个例外是如果新的 Avro 数据 schema 生成的类无法被重定位或者使用了不同的命名空间,在作业恢复时状态数据会被认为是不兼容的。