在介绍完一些基本概念之后,我们来认识一下,Flink SQL 中的数据类型。
Flink SQL 内置了很多常见的数据类型,并且也为用户提供了自定义数据类型的能力。
总共包含 3 部分:
1、字符串类型:
2、二进制字符串类型:
3、 精确数值类型:
4、有损精度数值类型:
5、布尔类型:BOOLEAN。
6、NULL 类型:NULL。
7、Raw 类型:RAW('class', 'snapshot') 。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以 Java 举例,class 参数代表具体对应的 Java 类型,snapshot 代表类型在发生网络传输时的序列化器。
8、日期、时间类型:
CREATE TABLE sink_table (
result_interval_year TIMESTAMP(3),
result_interval_year_p TIMESTAMP(3),
result_interval_year_p_to_month TIMESTAMP(3),
result_interval_month TIMESTAMP(3),
result_interval_day TIMESTAMP(3),
result_interval_day_p1 TIMESTAMP(3),
result_interval_day_p1_to_hour TIMESTAMP(3),
result_interval_day_p1_to_minute TIMESTAMP(3),
result_interval_day_p1_to_second_p2 TIMESTAMP(3),
result_interval_hour TIMESTAMP(3),
result_interval_hour_to_minute TIMESTAMP(3),
result_interval_hour_to_second TIMESTAMP(3),
result_interval_minute TIMESTAMP(3),
result_interval_minute_to_second_p2 TIMESTAMP(3),
result_interval_second TIMESTAMP(3),
result_interval_second_p2 TIMESTAMP(3)
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
-- Flink SQL 支持的所有 INTERVAL 子句如下,总体可以分为 `年-月`、`日-小时-秒` 两种
-- 1. 年-月。取值范围为 [-9999-11, +9999-11],其中 p 是指有效位数,取值范围 [1, 4],默认值为 2。比如如果值为 1000,但是 p = 2,则会直接报错。
-- INTERVAL YEAR
f1 + INTERVAL '10' YEAR as result_interval_year
-- INTERVAL YEAR(p)
, f1 + INTERVAL '100' YEAR(3) as result_interval_year_p
-- INTERVAL YEAR(p) TO MONTH
, f1 + INTERVAL '10-03' YEAR(3) TO MONTH as result_interval_year_p_to_month
-- INTERVAL MONTH
, f1 + INTERVAL '13' MONTH as result_interval_month
-- 2. 日-小时-秒。取值范围为 [-999999 23:59:59.999999999, +999999 23:59:59.999999999],其中 p1\p2 都是有效位数,p1 取值范围 [1, 6],默认值为 2;p2 取值范围 [0, 9],默认值为 6
-- INTERVAL DAY
, f1 + INTERVAL '10' DAY as result_interval_day
-- INTERVAL DAY(p1)
, f1 + INTERVAL '100' DAY(3) as result_interval_day_p1
-- INTERVAL DAY(p1) TO HOUR
, f1 + INTERVAL '10 03' DAY(3) TO HOUR as result_interval_day_p1_to_hour
-- INTERVAL DAY(p1) TO MINUTE
, f1 + INTERVAL '10 03:12' DAY(3) TO MINUTE as result_interval_day_p1_to_minute
-- INTERVAL DAY(p1) TO SECOND(p2)
, f1 + INTERVAL '10 00:00:00.004' DAY TO SECOND(3) as result_interval_day_p1_to_second_p2
-- INTERVAL HOUR
, f1 + INTERVAL '10' HOUR as result_interval_hour
-- INTERVAL HOUR TO MINUTE
, f1 + INTERVAL '10:03' HOUR TO MINUTE as result_interval_hour_to_minute
-- INTERVAL HOUR TO SECOND(p2)
, f1 + INTERVAL '00:00:00.004' HOUR TO SECOND(3) as result_interval_hour_to_second
-- INTERVAL MINUTE
, f1 + INTERVAL '10' MINUTE as result_interval_minute
-- INTERVAL MINUTE TO SECOND(p2)
, f1 + INTERVAL '05:05.006' MINUTE TO SECOND(3) as result_interval_minute_to_second_p2
-- INTERVAL SECOND
, f1 + INTERVAL '3' SECOND as result_interval_second
-- INTERVAL SECOND(p2)
, f1 + INTERVAL '300' SECOND(3) as result_interval_second_p2
FROM (SELECT TO_TIMESTAMP_LTZ(1640966476500, 3) as f1)
用户自定义类型就是运行用户使用 Java 等语言自定义一个数据类型出来。但是目前数据类型不支持使用 CREATE TABLE 的 DDL 进行定义,只支持作为函数的输入输出参数。如下案例:
public class User {
// 1. 基础类型,Flink 可以通过反射类型信息自动把数据类型获取到
// 关于 SQL 类型和 Java 类型之间的映射见:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/types/#data-type-extraction
public int age;
public String name;
// 2. 复杂类型,用户可以通过 @DataTypeHint("DECIMAL(10, 2)") 注解标注此字段的数据类型
public @DataTypeHint("DECIMAL(10, 2)") BigDecimal totalBalance;
}
public class UserScalarFunction extends ScalarFunction {
// 1. 自定义数据类型作为输出参数
public User eval(long i) {
if (i > 0 && i <= 5) {
User u = new User();
u.age = (int) i;
u.name = "name1";
u.totalBalance = new BigDecimal(1.1d);
return u;
} else {
User u = new User();
u.age = (int) i;
u.name = "name2";
u.totalBalance = new BigDecimal(2.2d);
return u;
}
}
// 2. 自定义数据类型作为输入参数
public String eval(User i) {
if (i.age > 0 && i.age <= 5) {
User u = new User();
u.age = 1;
u.name = "name1";
u.totalBalance = new BigDecimal(1.1d);
return u.name;
} else {
User u = new User();
u.age = 2;
u.name = "name2";
u.totalBalance = new BigDecimal(2.2d);
return u.name;
}
}
}
-- 1. 创建 UDF
CREATE FUNCTION user_scalar_func AS 'flink.examples.sql._12_data_type._02_user_defined.UserScalarFunction';
-- 2. 创建数据源表
CREATE TABLE source_table (
user_id BIGINT NOT NULL COMMENT '用户 id'
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10'
);
-- 3. 创建数据汇表
CREATE TABLE sink_table (
result_row_1 ROW,
result_row_2 STRING
) WITH (
'connector' = 'print'
);
-- 4. SQL 查询语句
INSERT INTO sink_table
select
-- 4.a. 用户自定义类型作为输出
user_scalar_func(user_id) as result_row_1,
-- 4.b. 用户自定义类型作为输出及输入
user_scalar_func(user_scalar_func(user_id)) as result_row_2
from source_table;
-- 5. 查询结果
+I[+I[9, name2, 2.20], name2]
+I[+I[1, name1, 1.10], name1]
+I[+I[5, name1, 1.10], name1]
新闻名称:Flink SQL 知其所以然:SQL 数据类型大全!
分享地址:http://www.csdahua.cn/qtweb/news27/354777.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网