在介绍完一些基本概念之后,我们来认识一下,FlinkSQL中的数据类型。 FlinkSQL内置了很多常见的数据类型,并且也为用户提供了自定义数据类型的能力。 总共包含3部分:原子数据类型复合数据类型用户自定义数据类型1。原子数据类型 字符串类型:CHAR、CHAR(n):定长字符串,就和Java中的Char一样,n代表字符的定长,取值范围〔1,2,147,483,647〕。如果不指定n,则默认为1。VARCHAR、VARCHAR(n)、STRING:可变长字符串,就和Java中的String一样,n代表字符的最大长度,取值范围〔1,2,147,483,647〕。如果不指定n,则默认为1。STRING等同于VARCHAR(2147483647)。 二进制字符串类型:BINARY、BINARY(n):定长二进制字符串,n代表定长,取值范围〔1,2,147,483,647〕。如果不指定n,则默认为1。VARBINARY、VARBINARY(n)、BYTES:可变长二进制字符串,n代表字符的最大长度,取值范围〔1,2,147,483,647〕。如果不指定n,则默认为1。BYTES等同于VARBINARY(2147483647)。 精确数值类型:DECIMAL、DECIMAL(p)、DECIMAL(p,s)、DEC、DEC(p)、DEC(p,s)、NUMERIC、NUMERIC(p)、NUMERIC(p,s):固定长度和精度的数值类型,就和Java中的BigDecimal一样,p代表数值位数(长度),取值范围〔1,38〕;s代表小数点后的位数(精度),取值范围〔0,p〕。如果不指定,p默认为10,s默认为0。TINYINT:128到127的1字节大小的有符号整数,就和Java中的byte一样。SMALLINT:32,768to32,767的2字节大小的有符号整数,就和Java中的short一样。INT、INTEGER:2,147,483,648to2,147,483,647的4字节大小的有符号整数,就和Java中的int一样。BIGINT:9,223,372,036,854,775,808to9,223,372,036,854,775,807的8字节大小的有符号整数,就和Java中的long一样。 有损精度数值类型:FLOAT:4字节大小的单精度浮点数值,就和Java中的float一样。DOUBLE、DOUBLEPRECISION:8字节大小的双精度浮点数值,就和Java中的double一样。关于FLOAT和DOUBLE的区别可见https:www。runoob。comw3cnotefloatanddoubledifferent。html 布尔类型:BOOLEAN NULL类型:NULL Raw类型:RAW(class,snapshot)。只会在数据发生网络传输时进行序列化,反序列化操作,可以保留其原始数据。以Java举例,class参数代表具体对应的Java类型,snapshot代表类型在发生网络传输时的序列化器 日期、时间类型:DATE:由年月日组成的不带时区含义的日期类型,取值范围〔00000101,99991231〕TIME、TIME(p):由小时:分钟:秒〔。小数秒〕组成的不带时区含义的的时间的数据类型,精度高达纳秒,取值范围〔00:00:00。000000000到23:59:59。9999999〕。其中p代表小数秒的位数,取值范围〔0,9〕,如果不指定p,默认为0。TIMESTAMP、TIMESTAMP(p)、TIMESTAMPWITHOUTTIMEZONE、TIMESTAMP(p)WITHOUTTIMEZONE:由年月日小时:分钟:秒〔。小数秒〕组成的不带时区含义的时间类型,取值范围〔0000010100:00:00。000000000,9999123123:59:59。999999999〕。其中p代表小数秒的位数,取值范围〔0,9〕,如果不指定p,默认为6。TIMESTAMPWITHTIMEZONE、TIMESTAMP(p)WITHTIMEZONE:由年月日小时:分钟:秒〔。小数秒〕时区组成的带时区含义的时间类型,取值范围〔0000010100:00:00。00000000014:59,9999123123:59:59。99999999914:59〕。其中p代表小数秒的位数,取值范围〔0,9〕,如果不指定p,默认为6。TIMESTAMPLTZ、TIMESTAMPLTZ(p):由年月日小时:分钟:秒〔。小数秒〕时区组成的带时区含义的时间类型,取值范围〔0000010100:00:00。00000000014:59,9999123123:59:59。99999999914:59〕。其中p代表小数秒的位数,取值范围〔0,9〕,如果不指定p,默认为6。TIMESTAMPLTZ与TIMESTAMPWITHTIMEZONE的区别在于:TIMESTAMPWITHTIMEZONE的时区信息是携带在数据中的,举例:其输入数据应该是2022010100:00:00。00000000008:00;TIMESTAMPLTZ的时区信息不是携带在数据中的,而是由FlinkSQL任务的全局配置决定的,我们可以由table。localtimezone参数来设置时区。INTERVALYEARTOMONTH、INTERVALDAYTOSECOND:interval的涉及到的种类比较多。INTERVAL主要是用于给TIMESTAMP、TIMESTAMPLTZ添加偏移量的。举例,比如给TIMESTAMP加、减几天、几个月、几年。INTERVAL子句总共涉及到的语法种类如下FlinkSQL案例所示。CREATETABLEsinktable(resultintervalyearTIMESTAMP(3),resultintervalyearpTIMESTAMP(3),resultintervalyearptomonthTIMESTAMP(3),resultintervalmonthTIMESTAMP(3),resultintervaldayTIMESTAMP(3),resultintervaldayp1TIMESTAMP(3),resultintervaldayp1tohourTIMESTAMP(3),resultintervaldayp1tominuteTIMESTAMP(3),resultintervaldayp1tosecondp2TIMESTAMP(3),resultintervalhourTIMESTAMP(3),resultintervalhourtominuteTIMESTAMP(3),resultintervalhourtosecondTIMESTAMP(3),resultintervalminuteTIMESTAMP(3),resultintervalminutetosecondp2TIMESTAMP(3),resultintervalsecondTIMESTAMP(3),resultintervalsecondp2TIMESTAMP(3))WITH(connectorprint);INSERTINTOsinktableSELECTFlinkSQL支持的所有INTERVAL子句如下,总体可以分为年月、日小时秒两种1。年月。取值范围为〔999911,999911〕,其中p是指有效位数,取值范围〔1,4〕,默认值为2。比如如果值为1000,但是p2,则会直接报错。INTERVALYEARf1INTERVAL10YEARasresultintervalyearINTERVALYEAR(p),f1INTERVAL100YEAR(3)asresultintervalyearpINTERVALYEAR(p)TOMONTH,f1INTERVAL1003YEAR(3)TOMONTHasresultintervalyearptomonthINTERVALMONTH,f1INTERVAL13MONTHasresultintervalmonth2。日小时秒。取值范围为〔99999923:59:59。999999999,99999923:59:59。999999999〕,其中p1p2都是有效位数,p1取值范围〔1,6〕,默认值为2;p2取值范围〔0,9〕,默认值为6INTERVALDAY,f1INTERVAL10DAYasresultintervaldayINTERVALDAY(p1),f1INTERVAL100DAY(3)asresultintervaldayp1INTERVALDAY(p1)TOHOUR,f1INTERVAL1003DAY(3)TOHOURasresultintervaldayp1tohourINTERVALDAY(p1)TOMINUTE,f1INTERVAL1003:12DAY(3)TOMINUTEasresultintervaldayp1tominuteINTERVALDAY(p1)TOSECOND(p2),f1INTERVAL1000:00:00。004DAYTOSECOND(3)asresultintervaldayp1tosecondp2INTERVALHOUR,f1INTERVAL10HOURasresultintervalhourINTERVALHOURTOMINUTE,f1INTERVAL10:03HOURTOMINUTEasresultintervalhourtominuteINTERVALHOURTOSECOND(p2),f1INTERVAL00:00:00。004HOURTOSECOND(3)asresultintervalhourtosecondINTERVALMINUTE,f1INTERVAL10MINUTEasresultintervalminuteINTERVALMINUTETOSECOND(p2),f1INTERVAL05:05。006MINUTETOSECOND(3)asresultintervalminutetosecondp2INTERVALSECOND,f1INTERVAL3SECONDasresultintervalsecondINTERVALSECOND(p2),f1INTERVAL300SECOND(3)asresultintervalsecondp2FROM(SELECTTOTIMESTAMPLTZ(1640966476500,3)asf1)2。复合数据类型 数组类型:ARRAY、tARRAY。数组最大长度为2,147,483,647。t代表数组内的数据类型。举例ARRAY、ARRAY,其等同于INTARRAY、STRINGARRAY Map类型:MAPkt,vt。Map类型就和Java中的Map类型一样,key是没有重复的。举例MapSTRING,INT、MapBIGINT,STRING 集合类型:MULTISET、tMULTISET。就和Java中的List类型,一样,运行重复的数据。举例MULTISET,其等同于INTMULTISET 对象类型:ROW、ROW、ROW(n0t0,n1t1,。。。、ROW(n0t0d0,n1t1d1,。。。)。就和Java中的自定义对象一样。举例:ROW(myFieldINT,myOtherFieldBOOLEAN),其等同于ROW3。用户自定义数据类型 用户自定义类型就是运行用户使用Java等语言自定义一个数据类型出来。但是目前数据类型不支持使用CREATETABLE的DDL进行定义,只支持作为函数的输入输出参数。如下案例:第一步,自定义数据类型publicclassUser{1。基础类型,Flink可以通过反射类型信息自动把数据类型获取到关于SQL类型和Java类型之间的映射见:https:nightlies。apache。orgflinkflinkdocsrelease1。13docsdevtabletypesdatatypeextractionpublicintage;publicStringname;2。复杂类型,用户可以通过DataTypeHint(DECIMAL(10,2))注解标注此字段的数据类型publicDataTypeHint(DECIMAL(10,2))BigDecimaltotalBalance;}第二步,在UDF中使用此数据类型publicclassUserScalarFunctionextendsScalarFunction{1。自定义数据类型作为输出参数publicUsereval(longi){if(i0i5){UserunewUser();u。age(int)i;u。namename1;u。totalBalancenewBigDecimal(1。1d);returnu;}else{UserunewUser();u。age(int)i;u。namename2;u。totalBalancenewBigDecimal(2。2d);returnu;}}2。自定义数据类型作为输入参数publicStringeval(Useri){if(i。age0i。age5){UserunewUser();u。age1;u。namename1;u。totalBalancenewBigDecimal(1。1d);returnu。name;}else{UserunewUser();u。age2;u。namename2;u。totalBalancenewBigDecimal(2。2d);returnu。name;}}}第三步,在FlinkSQL中使用1。创建UDFCREATEFUNCTIONuserscalarfuncASflink。examples。sql。12datatype。02userdefined。UserScalarFunction;2。创建数据源表CREATETABLEsourcetable(useridBIGINTNOTNULLCOMMENT用户id)WITH(connectordatagen,rowspersecond1,fields。userid。min1,fields。userid。max10);3。创建数据汇表CREATETABLEsinktable(resultrow1ROW,resultrow2STRING)WITH(connectorprint);4。SQL查询语句INSERTINTOsinktableselect4。a。用户自定义类型作为输出userscalarfunc(userid)asresultrow1,4。b。用户自定义类型作为输出及输入userscalarfunc(userscalarfunc(userid))asresultrow2fromsourcetable;5。查询结果I〔I〔9,name2,2。20〕,name2〕I〔I〔1,name1,1。10〕,name1〕I〔I〔5,name1,1。10〕,name1〕