大家好,我是老羊,今天来学一波 Flink SQL 中的 DDL。
创新互联专注于高县网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供高县营销型网站建设,高县网站制作、高县网页设计、高县网站官网定制、小程序开发服务,打造高县网络公司原创品牌,更为您提供高县网站排名全网营销落地服务。
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
目前 Flink SQL 支持下列 CREATE 语句:
此节重点介绍建表,建数据库、视图和 UDF 会在后面的扩展章节进行介绍。
下面的 SQL 语句就是建表语句的定义,根据指定的表名创建一个表,如果同名表已经在 catalog 中存在了,则无法注册。
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{| | }[ , ...n]
[]
[][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [()] ] :
column_name column_type [] [COMMENT column_comment] :
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]:
column_name AS computed_column_expression [COMMENT column_comment]:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression:
[catalog_name.][db_name.]table_name:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
物理列是数据库中所说的常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。
其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取。
举一个仅包含常规列的表的案例:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
元数据列是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA 关键字标识。
例如,我们可以使用元数据列从 Kafka 数据中读取 Kafka 数据自带的时间戳(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记),然后我们可以在 Flink SQL 中使用这个时间戳,比如进行基于时间的窗口操作。
举例:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 读取 kafka 本身自带的时间戳
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);
元数据列可以用于后续数据的处理,或者写入到目标表中。
举例:
INSERT INTO MyTable
SELECT
user_id
, name
, record_time + INTERVAL '1' SECOND
FROM MyTable;
如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样的话,FROM xxx 子句是可以被省略的。
举例:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 读取 kafka 本身自带的时间戳
`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
'connector' = 'kafka'
...
);
关于 Flink SQL 的每种 Connector 都提供了哪些 metadata 字段,详细可见官网文档 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。
如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致的话,程序运行时会自动 cast 强转。但是这要求两种数据类型是可以强转的。举例如下:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);
默认情况下,Flink SQL planner 认为 metadata 列是可以 读取 也可以 写入 的。但是有些外部存储系统的元数据信息是只能用于读取,不能写入的。
那么在往一个表写入的场景下,我们就可以使用 VIRTUAL 关键字来标识某个元数据列不写入到外部存储中(不持久化)。
以 Kafka 举例:
CREATE TABLE MyTable (
-- sink 时会写入
`timestamp` BIGINT METADATA,
-- sink 时不写入
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
在上面这个案例中,Kafka 引擎的 offset 是只读的。所以我们在把 MyTable 作为数据源(输入)表时,schema 中是包含 offset 的。在把 MyTable 作为数据汇(输出)表时,schema 中是不包含 offset 的。如下:
-- 当做数据源(输入)的 schema
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
-- 当做数据汇(输出)的 schema
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)
所以这里在写入时需要注意,不要在 SQL 的 INSERT INTO 语句中写入 offset 列,否则 Flink SQL 任务会直接报错。
计算列其实就是在写建表的 DDL 时,可以拿已有的一些列经过一些自定义的运算生成的新列。这些列本身是没有以物理形式存储到数据源中的。
举例:
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
-- cost 就是使用 price 和 quanitity 生成的计算列,计算方式为 price * quanitity
`cost` AS price * quanitity,
) WITH (
'connector' = 'kafka'
...
);
注意!!!
计算列可以包含其他列、常量或者函数,但是不能写一个子查询进去。
小伙伴萌这时会问到一个问题,既然只能包含列、常量或者函数计算,我就直接在 DML query 代码中写就完事了呗,为啥还要专门在 DDL 中定义呢?
结论:没错,如果只是简单的四则运算的话直接写在 DML 中就可以,但是计算列一般是用于定义时间属性的(因为在 SQL 任务中时间属性只能在 DDL 中定义,不能在 DML 语句中定义)。比如要把输入数据的时间格式标准化。处理时间、事件时间分别举例如下:
注意!!!和虚拟 metadata 列是类似的,计算列也是只能读不能写的。
也就是说,我们在把 MyTable 作为数据源(输入)表时,schema 中是包含 cost 的。
在把 MyTable 作为数据汇(输出)表时,schema 中是不包含 cost 的。举例:
-- 当做数据源(输入)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)
-- 当做数据汇(输出)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)
Watermark 是在 Create Table 中进行定义的。具体 SQL 语法标准是 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression。
其中:
注意:
如果你使用的是事件时间语义,那么必须要设设置事件时间属性和 WATERMARK 生成策略。
Watermark 的发出频率:Watermark 发出一般是间隔一定时间的,Watermark 的发出间隔时间可以由 pipeline.auto-watermark-interval 进行配置,如果设置为 200ms 则每 200ms 会计算一次 Watermark,然如果比之前发出的 Watermark 大,则发出。如果间隔设为 0ms,则 Watermark 只要满足触发条件就会发出,不会受到间隔时间控制。
Flink SQL 提供了几种 WATERMARK 生产策略:
先看一个案例:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
可以看到 DDL 中 With 子句就是在建表时,描述数据源、数据汇的具体外部存储的元数据信息的。
一般 With 中的配置项由 Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的 With 配置项都是不同的。
注意:
Flink SQL 中 Connector 其实就是 Flink 用于链接外部数据源的接口。举一个类似的例子,在 Java 中想连接到 MySQL,需要使用 mysql-connector-java 包提供的 Java API 去链接。映射到 Flink SQL 中,在 Flink SQL 中要连接到 Kafka,需要使用 kafka connector。
Flink SQL 已经提供了一系列的内置 Connector,具体可见 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。
回到上述案例中,With 声明了以下几项信息:
从这里也可以看出来 With 中具体要配置哪些配置项都是和每种 Connector 决定的。
Like 子句是 Create Table 子句的一个延伸。举例:
下面定义了一张 Orders 表:
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
但是忘记定义 Watermark 了,那如果想加上 Watermark,就可以用 Like 子句定义一张带 Watermark 的新表:
CREATE TABLE Orders_with_watermark (
-- 1. 添加了 WATERMARK 定义
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- 2. 覆盖了原 Orders 表中 scan.startup.mode 参数
'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句声明是在原来的 Orders 表的基础上定义 Orders_with_watermark 表
LIKE Orders;
上面这个语句的效果就等同于:
CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
不过这种不常使用。就不过多介绍了。如果小伙伴萌感兴趣,直接去官网参考具体注意事项:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#like。
分享文章:FlinkSQL知其所以然:SQLDDL!
标题URL:http://www.36103.cn/qtweb/news23/23223.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联