Flink 1.10.0 SQL DDL中如何定义watermark和计算列

随着Flink1.10.0版本的发布,在SQL上一个重大的优化是支持了watermark语义的计算,在之前的Flink1.9.x版本中是不支持的,当时只能用SQL DDL进行processing time的计算,但是现在可以进行eventtime语义的计算了,那在Flink1.10.0版本中也推出了很多新的特性,这里就不在多介绍了,本篇文章主要是接上一篇文章,FlinkSQL使用DDL语句创建kafka源表,主要来介绍一下Flink1.10.0中怎么定义watermark.

SQL DDL 中的 watermark 和计算列语法

Flink 1.10 在 SQL DDL 中增加了针对流处理定义时间属性及产生 watermark 的语法扩展(FLIP-66 [22])。这使得用户可以在用 DDL 语句创建的表上进行基于时间的操作(例如窗口)以及定义 watermark 策略[23]。

CREATE TABLE table_name (
WATERMARK FOR columnName AS <watermark_strategy_expression>
) WITH (...)

Flink提供了几种常用的watermark策略。

1,严格意义上递增的时间戳,发出到目前为止已观察到的最大时间戳的水印。时间戳小于最大时间戳的行不会迟到。

WATERMARK FOR rowtime_column AS rowtime_column

2,递增的时间戳,发出到目前为止已观察到的最大时间戳为负1的水印。时间戳等于或小于最大时间戳的行不会迟到。

WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL ‘0.001’ SECOND.

3,有界时间戳(乱序) 发出水印,它是观察到的最大时间戳减去指定的延迟,例如,WATERMARK FOR rowtime_column AS rowtime_column-INTERVAL’5’SECOND是5秒的延迟水印策略。

WATERMARK FOR rowtime_column AS rowtime_column – INTERVAL ‘string’ timeUnit.

使用 DDL 创建 Kafka 表

val createTable = """ |CREATE TABLE PERSON ( | name VARCHAR COMMENT '姓名', | age VARCHAR COMMENT '年龄', | city VARCHAR COMMENT '所在城市', | address VARCHAR COMMENT '家庭住址', | ts BIGINT COMMENT '时间戳', | t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'yyyy-MM-dd HH:mm:ss')), | proctime as PROCTIME(), | WATERMARK FOR t AS t - INTERVAL '5' SECOND |) |WITH ( | 'connector.type' = 'kafka', -- 使用 kafka connector | 'connector.version' = '0.11', -- kafka 版本 | 'connector.topic' = '', -- kafka topic | 'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取 | 'connector.properties.zookeeper.connect' = '', -- zk连接信息 | 'connector.properties.bootstrap.servers' = '', -- broker连接信息 | 'connector.properties.group.id' = '', | 'update-mode' = 'append', | 'format.type' = 'json', -- 数据源格式为 json | 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则 |) """.stripMargin

需要注意:

1,Flink目前不支持直接把Long类型的转成Timestamp类型的,如果你的数据源中ts是Long类型的时间戳,建表语句不能直接写成ts TIMESTAMP(3),如果想用timestamp类型的,数据源中的时间需要时UTC格式的.或者可以用我上面那种写法,利用Flink的日期函数TO_TIMESTAMP把bigint类型的转成timestamp类型的.

2,PROCTIME是内置函数产生的一个虚拟列,可以参与后续的计算.

3,WATERMARK是在ts的列上声明了一个可以容忍5秒的乱序.

4,新版本的一些connector属性和之前略微有差异,比如
connector.properties.zookeeper.connect连接zk的属性,之前是

‘connector.properties.0.key’ = ‘zookeeper.connect’,  — 连接信息

 ‘connector.properties.0.value’ = ‘xxx’,

现在是合在一起了.

5,format.type目前支持的数据结构主要有三种,’csv’, ‘json’ and ‘avro’.

6,connector.version,在kafka0.11以上的版本可以写universal表示通用.

下面看完成的代码:

package flink.ddl
import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala._import org.apache.flink.table.api.EnvironmentSettingsimport org.apache.flink.table.api.scala.StreamTableEnvironmentimport org.apache.flink.types.Rowimport org.apache.flink.table.api.scala._
/** * @program: Flink1.9.0 * @description: Flink1.9.1使用DDL创建kafka源表和mysql sink表 * @author: JasonLee * @create: 2020-01-14 19:49 */object FlinkKafkaDDLDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(6) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build()
val tEnv = StreamTableEnvironment.create(env, settings) // 创建kafka源表 val createTable = """ |CREATE TABLE PERSON ( | name VARCHAR COMMENT '姓名', | age VARCHAR COMMENT '年龄', | city VARCHAR COMMENT '所在城市', | address VARCHAR COMMENT '家庭住址', | ts BIGINT COMMENT '时间戳', | t as TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000,'yyyy-MM-dd HH:mm:ss')), | proctime as PROCTIME(), | WATERMARK FOR t AS t - INTERVAL '5' SECOND |) |WITH ( | 'connector.type' = 'kafka', -- 使用 kafka connector | 'connector.version' = '0.11', -- kafka 版本 | 'connector.topic' = '', -- kafka topic | 'connector.startup-mode' = 'latest-offset', -- 从起始 offset 开始读取 | 'connector.properties.zookeeper.connect' = '', -- zk连接信息 | 'connector.properties.bootstrap.servers' = '', -- broker连接信息 | 'connector.properties.group.id' = '', | 'update-mode' = 'append', | 'format.type' = 'json', -- 数据源格式为 json | 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则 |) """.stripMargin
tEnv.sqlUpdate(createTable)
val query = """ | SELECT name,COUNT(age) as pv,count(distinct age) as uv, | TUMBLE_START(t, INTERVAL '5' second) as t_start, | TUMBLE_END(t, INTERVAL '5' second) as t_end | FROM PERSON GROUP BY name,TUMBLE(t, INTERVAL '5' second) """.stripMargin
val result = tEnv.sqlQuery(query) result.toRetractStream[Row].print()
tEnv.execute("Flink SQL DDL") }}

声明:文中观点不代表本站立场。本文传送门:https://eyangzhen.com/247151.html

联系我们
联系我们
分享本页
返回顶部