Flink實時大數(shù)據(jù)處理技術(shù) 課件 08章.Table API和SQL_第1頁
Flink實時大數(shù)據(jù)處理技術(shù) 課件 08章.Table API和SQL_第2頁
Flink實時大數(shù)據(jù)處理技術(shù) 課件 08章.Table API和SQL_第3頁
Flink實時大數(shù)據(jù)處理技術(shù) 課件 08章.Table API和SQL_第4頁
Flink實時大數(shù)據(jù)處理技術(shù) 課件 08章.Table API和SQL_第5頁
已閱讀5頁,還剩57頁未讀, 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進(jìn)行舉報或認(rèn)領(lǐng)

文檔簡介

第六章時間與窗口Flink實時大數(shù)據(jù)處理技術(shù)Table

API

&

SQL與關(guān)系型數(shù)據(jù)庫中的查詢相似基于數(shù)據(jù)表Table使用執(zhí)行計劃器(Planner)將關(guān)系型查詢轉(zhuǎn)換為可執(zhí)行的Flink作業(yè)Blink

Planner和Flink

Planner,Blink

Planner將逐漸取代Flink

PlannerTable

API

&

SQL迭代速度較快,最好參考最新的官方文檔創(chuàng)建執(zhí)行環(huán)境(ExecutionEnvironment)和表環(huán)境(TableEnvironment)獲取表使用TableAPI或SQL在表上做查詢等操作將結(jié)果輸出到外部系統(tǒng)執(zhí)行作業(yè)Table

API

&

SQL骨架程序//基于StreamExecutionEnvironment創(chuàng)建TableEnvironment

StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironmenttEnv=StreamTableEnvironment.create(env);//讀取數(shù)據(jù)源,創(chuàng)建數(shù)據(jù)表Table

user_behavior//注冊輸出數(shù)據(jù)表Table

output_table//使用TableAPI查詢user_behavior

TabletabApiResult=tableEnv.from("user_behavior").select(...);//使用SQL查詢TablesqlResult=tableEnv.sqlQuery("SELECT...FROMuser_behavior...");//將查詢結(jié)果輸出到outputTable

tabApiResult.insertInto("output_table");sqlResult.insertInto("output_table");Table

API

&

SQL程序主要步驟:注意添加Maven依賴TableEnvironment是整個程序的入口,功能包括:連接外部系統(tǒng)向目錄(Catalog)中注冊表或者從中獲取表執(zhí)行TableAPI或SQL操作注冊用戶自定義函數(shù)提供一些其他配置功能TableEnvironment

是最頂級的接口StreamTableEnvironment用于流處理,有DataStream和Table之間的轉(zhuǎn)換接口BatchTableEnvironment用于批處理,有DataSet和Table之間的轉(zhuǎn)換接口創(chuàng)建TableEnvironment共5個TableEnvironment,分別面向不同的場景和編程語言用Table來表示廣義的表:需要連接外部系統(tǒng),需要定義Schema,將外部系統(tǒng)數(shù)據(jù)轉(zhuǎn)化為Table。臨時表(TemporaryTable):Flink作業(yè)啟動后臨時創(chuàng)建的表,隨著這個Flink作業(yè)的結(jié)束,臨時表也被銷毀常駐表(PermanentTable):為整個集群上所有用戶和作業(yè)提供服務(wù),基于Catalog,作業(yè)結(jié)束后,Table元數(shù)據(jù)不會被銷毀。Catalog:維護(hù)著常駐表的名字、類型(文件、消息隊列或數(shù)據(jù)庫)、數(shù)據(jù)存儲位置等元數(shù)據(jù)數(shù)據(jù)管理團(tuán)隊在Catalog中創(chuàng)建常駐表,注冊好該表的Schema、注明該表使用何種底層技術(shù)、寫明數(shù)據(jù)存儲位置等;數(shù)據(jù)分析團(tuán)隊無需關(guān)心元數(shù)據(jù),無需了解這個表到底是存儲在Kafka還是HDFS,直接在這個表上進(jìn)行查詢。獲取表調(diào)用TableAPI或SQL進(jìn)行查詢可以在Table上使用TableAPI可以在Table上執(zhí)行SQL語句可以使用TableAPI生成一個表,在此之上進(jìn)行SQL查詢;也可以先進(jìn)行SQL查詢得到一個表,在此之上再調(diào)用TableAPI

在表上執(zhí)行語句StreamTableEnvironmenttEnv=...//創(chuàng)建一個TemporaryTable:user_behavior

TableuserBehaviorTable=tEnv.from("user_behavior");//在Table上使用TableAPI執(zhí)行關(guān)系型操作

TablegroupByUserId=userBehaviorTable.groupBy("user_id").select("user_id,COUNT(behavior)ascnt");//在Table上使用SQL執(zhí)行關(guān)系型操作

TablegroupByUserId=tEnv.sqlQuery("SELECTuser_id,COUNT(behavior)FROMuser_behaviorGROUPBYuser_id");Table

APISQL通過TableSink輸出到外部系統(tǒng)與DataStream

Sink相似將表結(jié)果輸出StreamTableEnvironmenttEnv=...//獲取名為CsvSinkTable的Table

//執(zhí)行查詢操作,得到一個名為result的Table

Tableresult=...//將result發(fā)送到名為CsvSinkTable的TableSink

result.executeInsert("CsvSinkTable");TableAPI或者SQL經(jīng)過Planner轉(zhuǎn)化為JobGraph,Planner在中間起到一個轉(zhuǎn)換和優(yōu)化的作用未經(jīng)優(yōu)化的邏輯執(zhí)行計劃(Logical

Plan)、優(yōu)化器(Optimizer)對Logical

Plan進(jìn)行優(yōu)化,得到物理執(zhí)行計劃(Physical

Plan),Physical

Plan最后轉(zhuǎn)換為Flink的JobGraph可以使用Table.explain()來查看語法樹、邏輯執(zhí)行計劃和物理執(zhí)行計劃執(zhí)行作業(yè)需要配置外部系統(tǒng)的必要參數(shù)、序列化方式、Schema:兩種方式:在程序中使用代碼編輯配置connect()或?qū)ataStream/DataSet轉(zhuǎn)化為表使用聲明式語言,如SQL

DDL或YAMLYAML只能和SQL

Client配合熟悉SQL

DDL的用戶多,未來將主要推廣SQL

DDL獲取表的具體方式流處理上的關(guān)系型查詢借鑒了物化視圖的實現(xiàn)思路批處理關(guān)系型查詢與流處理

批處理關(guān)系型查詢流處理輸入數(shù)據(jù)數(shù)據(jù)是有界的,在有限的數(shù)據(jù)上進(jìn)行查詢數(shù)據(jù)流是無界的,在源源不斷的數(shù)據(jù)流上進(jìn)行查詢執(zhí)行過程一次查詢是在一個批次的數(shù)據(jù)上進(jìn)行查詢,所查詢的數(shù)據(jù)是靜態(tài)確定的一次查詢啟動后需要等待數(shù)據(jù)不斷流入,所查詢的數(shù)據(jù)在未來源源不斷地到達(dá)查詢結(jié)果一次查詢完成后即結(jié)束。結(jié)果是確定的一次查詢會根據(jù)新流入數(shù)據(jù)不斷更新結(jié)果動態(tài)表(DynamicTable)用來表示不斷流入的數(shù)據(jù)表,表中的數(shù)據(jù)不斷更新。在動態(tài)表上進(jìn)行查詢,被稱為持續(xù)查詢。一個持續(xù)查詢的結(jié)果也是動態(tài)表。動態(tài)表上的持續(xù)查詢電商平臺用戶行為分析左側(cè)為數(shù)據(jù)流右側(cè)為轉(zhuǎn)化后的動態(tài)表動態(tài)表上的持續(xù)查詢按user_id字段分組,統(tǒng)計每個user_id所產(chǎn)生的行為總數(shù)新數(shù)據(jù)的插入會導(dǎo)致統(tǒng)計結(jié)果的更新動態(tài)表上的持續(xù)查詢SQL

1SELECT

user_id,COUNT(behavior)ASbehavior_cntFROMuser_behaviorGROUP

BYuser_id按照user_id字段分組,統(tǒng)計每分鐘每個user_id所產(chǎn)生的行為總數(shù)數(shù)據(jù)按照滾動時間窗口來分組動態(tài)表上的持續(xù)查詢SQL

2SELECT

user_id,COUNT(behavior)ASbehavior_cnt,TUMBLE_END(ts,INTERVAL

'1'

MINUTE)ASend_tsFROMuser_behaviorGROUP

BY

user_id,

TUMBLE(ts,INTERVAL

'1'

MINUTE)兩種生成結(jié)果的方式:SQL

2只追加結(jié)果,或者說只在結(jié)果表上進(jìn)行插入操作。SQL

1追加結(jié)果的同時,也對結(jié)果不斷更新,或者說既進(jìn)行插入操作又進(jìn)行更新操作或刪除操作。動態(tài)表上的持續(xù)查詢兩種輸出方式:追加(Append-only)模式:在結(jié)果末尾追加。更新(Update)模式:既在結(jié)果末尾追加,又對已有數(shù)據(jù)更新。對數(shù)據(jù)更新又分為兩種:先將舊數(shù)據(jù)撤回,再添加新數(shù)據(jù),被稱為撤回(Retract)模式直接在舊數(shù)據(jù)上做更新,被稱為插入更新(Upsert)模式動態(tài)表的兩種輸出方式結(jié)果共有3列(flag,user_id,behavior_cnt)其中第一列為標(biāo)志位,表示本行數(shù)據(jù)是加入還是撤回,后兩列是查詢結(jié)果。Retract模式//將table轉(zhuǎn)換為DataStream

//Retract模式,Boolean為標(biāo)志位

DataStream<Tuple2<Boolean,Row>>retractStream=tableEnv.toRetractStream(table,Row.class);輸出結(jié)果需有一個唯一ID,可以根據(jù)唯一ID更新結(jié)果例如user_id一般不重復(fù),可以被用來作為唯一IDUpsert模式要和特定的TableSink緊密結(jié)合Key-Value數(shù)據(jù)更適合進(jìn)行Upsert操作Upsert模式Flink通過狀態(tài)保存中間數(shù)據(jù),狀態(tài)不能無限增加,否則會突破存儲限制??臻e狀態(tài)數(shù)據(jù)是指該數(shù)據(jù)長時間沒有更新,仍然保留在狀態(tài)中。清除空閑狀態(tài)數(shù)據(jù):minTime和maxTime:空閑狀態(tài)至少會保留minTime的時間,這個時間內(nèi)數(shù)據(jù)不會被清理;超過maxTime的時間后,空閑狀態(tài)會被清除。部分狀態(tài)被清除后,會導(dǎo)致計算結(jié)果是近似準(zhǔn)確的狀態(tài)過期時間tEnv.getConfig.setIdleStateRetentionTime(Time.hours(1),Time.hours(2));時間屬性使用TIMESTAMP(intprecision)數(shù)據(jù)類型來表示,對應(yīng)SQL標(biāo)準(zhǔn)中的時間戳類型

precision為精度,表示秒以下保留幾位小數(shù)點時間的格式一般為:year-month-dayhour:minute:second[.fractional]絕大多數(shù)情況可以使用毫秒精度:TIMESTAMP(3)Flink提供的時間單位:MILLISECOND、SECOND、MINUTE、HOUR、DAY、MONTH和YEAR

時間屬性需要在Java/Scala代碼中設(shè)置使用哪種時間語義三種時間語義StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();//默認(rèn)使用ProcessingTime

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

//使用IngestionTime

env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);//使用EventTime

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);SQL

DDL時間屬性列proctime,使用PROCTIME()函數(shù)計算得到Processing

Time:時間屬性CREATE

TABLEuser_behavior(user_idBIGINT,item_idBIGINT,category_idBIGINT,behaviorSTRING,tsTIMESTAMP(3),--在原有Schema基礎(chǔ)上添加一列proctime

proctimeasPROCTIME())WITH(

...);將DataStream轉(zhuǎn)化為表時間屬性列proctimectime:使用proctime函數(shù),生成proctime列Processing

Time:時間屬性DataStream<UserBehavior>userBehaviorDataStream=...//定義了Schema中各字段的名字,其中proctime使用了.proctime屬性,這個屬性幫我們生成一個ProcessingTime

tEnv.createTemporaryView("user_behavior",userBehaviorDataStream,"userIdasuser_id,itemIdasitem_id,categoryIdascategory_id,behavior,ctime");指定時間屬性和Watermark策略SQL:使用WATERMARK關(guān)鍵字,并設(shè)置Watermark策略語法:WATERMARKFORrowtime_columnASwatermark_strategy_expressionEvent

Time:時間屬性&

WatermarkCREATE

TABLEuser_behavior(user_idBIGINT,item_idBIGINT,category_idBIGINT,behaviorSTRING,tsTIMESTAMP(3),--定義ts字段為EventTime時間戳,Watermark比監(jiān)測到的最晚時間還晚5秒

WATERMARKFORtsasts-INTERVAL

'5'

SECOND

)WITH(

...);

語法:WATERMARKFORrowtime_columnASwatermark_strategy_expressionrowtime_column為時間屬性,必須是TIMESTAMP(3)類型watermark_strategy_expression定義了Watermark的生成策略:時間戳嚴(yán)格單調(diào)遞增WATERMARKFORrowtime_columnASrowtime_columnWATERMARKFORrowtime_columnASrowtime_column-INTERVAL'0.001'SECOND監(jiān)測所有數(shù)據(jù)時間戳,并記錄時間戳最大值,在最大值基礎(chǔ)上添加一個1毫秒的延遲作為Watermark時間時間戳是亂序到達(dá)的WATERMARKFORrowtime_columnASrowtime_column-INTERVAL'duration'timeUnittimeUnit可以是SECOND、MINUTE或HOUR等時間單位Event

Time:時間屬性&

Watermark由DataStream轉(zhuǎn)換為表在DataStream

API中設(shè)置好時間戳和Watermarkts.rowtime:使用rowtime函數(shù),生成ts時間戳列Event

Time:時間屬性&

Watermarkenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);DataStream<UserBehavior>userBehaviorDataStream=env.addSource(...)//在DataStream里設(shè)置時間戳和Watermark

.assignTimestampsAndWatermarks(...);//創(chuàng)建一個user_behavior表//ts.rowtime表示該列使用EventTimeTimestamp

tEnv.createTemporaryView("user_behavior",userBehaviorDataStream,"userIdasuser_id,itemIdasitem_id,categoryIdascategory_id,behavior,ts.rowtime");基于時間屬性窗口分組GROUP

BYOVERWINDOW聚合窗口聚合GROUPBYfield1,time_attr_window:time_attr_window窗口分組函數(shù):例如TUMBLE(proctime,INTERVAL'1'MINUTE)所有含有相同field1+time_attr_window的行都會被分到一組再對這組數(shù)據(jù)中的其他字段(如field2)進(jìn)行聚合操作聚合操作:COUNT、SUM、AVG、MAX等將多行數(shù)據(jù)分到一組,然后對一組數(shù)據(jù)集進(jìn)行聚合:多行變一行GROUP

BYTUMBLE(time_attr,interval):滾動窗口窗口是定長的,長度為interval,窗口之間不重疊,滾動向前HOP(time_attr,slide_interval,size_interval)

窗口長度是定長的,長度為size_interval,窗口以slide_interval的速度向前滑動slide_interval

<

size_interval:窗口重疊slide_interval

>

size_interval:窗口之間有間隙SESSION(time_attr,interval):會話窗口窗口長度是變長的,根據(jù)interval劃分窗口時間間隔格式:INTERVAL‘duration’timeUnitINTERVAL'1'MINUTE窗口分組函數(shù)TUMBLE_START(time_attr,interval):當(dāng)前窗口的起始時間返回值不再是時間屬性TUMBLE_END(time_attr,interval)

:當(dāng)前窗口的結(jié)束時間返回值不再是時間屬性TUMBLE_ROWTIME(time_attr,interval)

:窗口的結(jié)束時間返回值是一個時間屬性,后續(xù)的查詢可以使用這個字段基于Event

TimeTUMBLE_PROCTIME(time-attr,interval)

:窗口的結(jié)束時間返回值是一個時間屬性,后續(xù)的查詢可以使用這個字段基于Processing

Time窗口的起始和結(jié)束時間TUMBLE_START

/

TUMBLE_END使用方法:TUMBLE(time_attr,interval)中的interval和TUMBLE_START(time_attr,interval)中的interval保持一致,即INTERVAL‘duration’timeUnit中的duration時間長度和timeUnit時間單位,兩者保持一致TUMBLE_START

/

TUMBLE_ENDTUMBLE_ROWTIME

/TUMBLE_PROCTIME使用方法可以用在內(nèi)聯(lián)視圖子查詢或Join上案例:先使用TUMBLE_ROWTIME創(chuàng)建一個10秒鐘的視圖再在視圖的基礎(chǔ)上進(jìn)行20分鐘的聚合TUMBLE_ROWTIME

/

TUMBLE_PROCTIMESELECTTUMBLE_END(rowtime,INTERVAL'20'MINUTE),user_id,SUM(cnt)

FROM

(SELECTuser_id,COUNT(behavior)AScnt,TUMBLE_ROWTIME(ts,INTERVAL'10'SECOND)ASrowtimeFROMuser_behaviorGROUPBYuser_id,TUMBLE(ts,INTERVAL'10'SECOND)

)GROUPBYTUMBLE(rowtime,INTERVAL'20'MINUTE),user_id每行數(shù)據(jù)生成窗口,在窗口上進(jìn)行聚合,聚合的結(jié)果會生成一個新字段:一行變一行OVER

WINDOW計算流程:先對field1做分組,包含相同field1的行被分到一起,按照時間屬性排序(PARTITION

BY

ORDER

BY

…)每行數(shù)據(jù)建立一個窗口,窗口起始點為第一行數(shù)據(jù),窗口結(jié)束點是當(dāng)前行對窗口內(nèi)field2字段做各類聚合操作,生成field2_agg的新字段(COUNT、SUM、AVG、MAX等)Flink為每行元素維護(hù)一個窗口,為每行元素執(zhí)行一次窗口計算,完成計算后清除過期數(shù)據(jù)OVER

WINDOWOVER

WINDOW的計算過程windowDefinition中定義了窗口規(guī)則使用哪些字段進(jìn)行PARTITIONBY使用時間屬性進(jìn)行ORDER

BY定義窗口的起始點和結(jié)束點在定義好的窗口上,使用聚合函數(shù)AGG_FUNCTION對某個字段進(jìn)行聚合計算COUNT、MAX等OVER

WINDOW語法SELECTAGG_FUNCTION(field2)OVER(windowDefinition2)ASfield2_agg,...AGG_FUNCTION(fieldN)OVER(windowDefinitionN)ASfieldN_aggFROMtab1SELECT

AGG_FUNCTION(field2)OVERwASfield2_agg,...FROMtab1WINDOWwAS(windowDefinition)在SQL語句最后,使用別名AS定義WINDOW使用OVER

windowDefinition

AS

…語法結(jié)構(gòu)定義窗口ROWS按行劃分WINDOWwAS(...)定義了名為w的窗口,根據(jù)user_id來分組,按照ts排序,相同user_id的行會分到一組,組內(nèi)按照時間戳ts來排序ROWSBETWEENUNBOUNDEDPRECEDINGANDCURRENTROW定義了窗口的起始點和結(jié)束點,起始點為UNBOUNDEDPRECEDING,即數(shù)據(jù)流的最開始的行,結(jié)束點為CURRENTROW當(dāng)前行ORDER

BY只支持時間屬性的排序,無法對其他字段進(jìn)行排序窗口劃分方式-

ROWSSELECT

user_id,behavior,COUNT(*)OVERwASbehavior_count,tsFROMuser_behaviorWINDOWwAS(PARTITION

BYuser_idORDER

BYtsROWS

BETWEEN

UNBOUNDED

PRECEDING

AND

CURRENT

ROW

)右圖上半部分,窗口起始點為數(shù)據(jù)流的第一個元素,結(jié)束點為當(dāng)前行右圖下半部分,窗口起始點本元素的前一個元素,結(jié)束點為當(dāng)前行右圖下半部分,最后兩個元素同時到達(dá),按行劃分,被劃分到2個窗口窗口劃分方式-

ROWSPARTITIONBY可選,根據(jù)一到多個字段對數(shù)據(jù)進(jìn)行分組ORDERBY之后必須是時間屬性,按照時間排序ROWSBETWEEN...AND...界定窗口的起始點和結(jié)束點窗口劃分方式-

ROWSSELECT

field1,AGG_FUNCTION(field2)OVER([PARTITION

BY(value_expression1,...,value_expressionN)]ORDER

BYtimeAttrROWS

BETWEEN(UNBOUNDED|rowCount)PRECEDING

AND

CURRENT

ROW)ASfieldNameFROMtab1--使用AS

SELECT

field1,AGG_FUNCTION(field2)OVERwASfieldNameFROMtab1WINDOWwAS([PARTITION

BY(value_expression1,...,value_expressionN)]ORDER

BYtimeAttrROWS

BETWEEN(UNBOUNDED|rowCount)PRECEDING

AND

CURRENT

ROW

)RANGE按時間段劃分WINDOWwAS(...)語法結(jié)構(gòu)與之前的類似使用RANGE關(guān)鍵字窗口的結(jié)束點是當(dāng)前行,起始點是當(dāng)前行之前的某個時間點(當(dāng)前行的時間-

interval)窗口劃分方式-

RANGESELECT

user_id,COUNT(*)OVERwASbehavior_count,tsFROMuser_behaviorWINDOWwAS(PARTITION

BYuser_idORDER

BYtsRANGE

BETWEEN

INTERVAL

'2'

SECOND

PRECEDING

AND

CURRENT

ROW

)右圖上半部分,窗口起始點為數(shù)據(jù)流的第一個元素,結(jié)束點為當(dāng)前元素。與ROWS不同,最后兩個元素同時到達(dá),被劃分到一個窗口w4中。右圖下半部分,窗口起始點為當(dāng)前元素減去2秒,結(jié)束點為當(dāng)前元素。最后兩個元素也被劃分到同一個窗口w4中。窗口劃分方式-

RANGEPARTITIONBY可選,根據(jù)一到多個字段對數(shù)據(jù)進(jìn)行分組ORDERBY之后必須是時間屬性,按照時間排序RANGE

BETWEEN...AND...界定窗口的起始點和結(jié)束點可以使用timeIntervalPRECEDING來表示當(dāng)前行之前的某個時間點作為起始點窗口劃分方式-

RANGESELECT

field1,AGG_FUNCTION(field2)OVER([PARTITION

BY(value_expression1,...,value_expressionN)]ORDER

BYtimeAttrRANGE

BETWEEN(UNBOUNDED|timeInterval)PRECEDING

AND

CURRENT

ROW)ASfieldNameFROMtab1--使用AS

SELECT

field1,AGG_FUNCTION(field2)OVERwASfieldNameFROMtab1WINDOWwAS([PARTITION

BY(value_expression1,...,value_expressionN)]ORDER

BYtimeAttrRANGE

BETWEEN(UNBOUNDED|timeInterval)PRECEDING

AND

CURRENT

ROW

)常見的Join:INNER

JOIN、LEFT/RIGHT/FULLOUTERJOIN使用批處理,在靜態(tài)數(shù)據(jù)集上進(jìn)行Join已經(jīng)比較成熟:嵌套循環(huán)、排序合并、哈希合并

Flink的三種Join時間窗口Join(Time-windowedJoin)臨時表Join(TemporalTableJoin)傳統(tǒng)意義上的Join(RegularJoin)JoinSELECT

orders.order_id,customers.customer_name,orders.order_dateFROMordersINNER

JOINcustomersONorders.customer_id=customers.customer_id;//循環(huán)遍歷orders的每個元素forrow_orderinorders://循環(huán)遍歷customers的每個元素forrow_customerincustomers:ifrow_order.customer_id=row_customer.customer_idreturn(row_order.order_id,row_customer.customer_mame,row_order.order_date)endend循環(huán)嵌套偽代碼一個INNER

JOIN案例案例:聊天對話數(shù)據(jù)流chat表包含了買家和賣家聊天信息,chat表與user_behavior進(jìn)行Join對item_id字段進(jìn)行Join,并增加時間窗口的限制時間窗口JoinSELECT

user_behavior.item_id,user_behavior.tsASbuy_tsFROMchat,user_behaviorWHEREchat.item_id=user_behavior.item_idANDuser_behavior.behavior='buy’

ANDuser_behavior.tsBETWEENchat.tsANDchat.ts+INTERVAL

'1'

MINUTE;與DataStream

API中的Interval

Join相似A表中所有包含在界限內(nèi)的元素與B表元素連接BETWEEN...AND...設(shè)置了時間窗口,也可以使用比較符號>,<,>=,<=A表和B表必須都是Append-only模式的表Flink使用狀態(tài)存儲時間窗口相關(guān)數(shù)據(jù)時間窗口JoinSELECT

*FROMA,BWHEREA.id=B.id

ANDA.tsBETWEENB.ts-lowBoundANDB.ts+upperBound;

將一個基于時間的日志表抽象成為臨時表(Temporal

Table)案例:商品價格日志表item_log,包含了每個商品的每次價格變動,price為當(dāng)前的價格,version_ts為價格改動的時間戳不同時間點,商品價格不同。TemporalTable為某個時間點的臨時表臨時表Join其他表與臨時表進(jìn)行Join,希望得到某個時間點的Join結(jié)果案例:user_behavior與item_log表進(jìn)行Join,得到產(chǎn)生用戶行為時間點的價格臨時表Joinitem_log與user_behavior進(jìn)行臨時表Join示意圖臨時表使用方法:注冊臨時表在SQL語句中使用臨時表registerFunction()注冊臨時表,名為item在SQL語句中,item(user_behavior.ts)按照user_behavior表中的ts來獲取該時間點上對應(yīng)的臨時表,將這個表命名為latest_itemuser_behavior與latest_item進(jìn)行Join臨時表JoinDataStream<Tuple3<Long,Long,Timestamp>>itemStream=...//獲取Table

TableitemTable=tEnv.fromDataStream(itemStream,"item_id,price,version_ts.rowtime");//注冊TemporalTableFunction,指定時間屬性和Key

tEnv.registerFunction("item",itemTable.createTemporalTableFunction("version_ts","item_id"));在Java代碼中注冊臨時表SELECT

user_behavior.item_id,latest_item.price,user_behavior.tsFROMuser_behavior,LATERALTABLE(item(user_behavior.ts))ASlatest_itemWHEREuser_behavior.item_id=latest_item.item_id ANDuser_behavior.behavior='buy'在SQL語句中使用臨時表item臨時表Join注意事項:A表必須是一個Append-only的追加表。臨時表B的數(shù)據(jù)源必須是一個Append-only的追加表,必須使用registerFunction()將該追加表注冊到Catalog中。注冊時需要指定Key和時間屬性。表A和臨時表B通過Key進(jìn)行等于謂詞匹配:A.id=B.id。Flink用狀態(tài)維護(hù)中間數(shù)據(jù)臨時表JoinSELECT

*FROMA,LATERALTABLE(B(A.ts))WHEREA.id=B.id從時間維度上理解臨時表Join最常規(guī)的Join案例:商品價格表只保存了當(dāng)前最新的價格,沒有保存修改記錄傳統(tǒng)意義上的Join

SELECT

user_behavior.item_id,item.priceFROMuser_behavior,itemWHEREuser_behavior.item_id=item.item_idANDuser_behavior.behavior='buy'A和B可以是Append-only的追加表,也可以是可更新的Update表,A、B兩個表中的數(shù)據(jù)可以插入、刪除和更新。A、B表對應(yīng)的元素都會被連接起來。盡量避免笛卡爾積式的連接。Flink用狀態(tài)存儲一些中間數(shù)據(jù),最好設(shè)置狀態(tài)過期時間。傳統(tǒng)意義上的Join

SELECT

*FROMAINNER

JOINBONA.id=B.idCatalog記錄并管理各類元數(shù)據(jù)信息有哪些數(shù)據(jù)庫(Database)、存儲形式為文件、消息隊列、數(shù)據(jù)庫數(shù)據(jù)庫中有哪些表有哪些可用的函數(shù)一個Catalog下有一到多個Database,一個Database下有一到多個表GenericInMemoryCatalog:將元數(shù)據(jù)存儲在內(nèi)存,只在一個Session內(nèi)生效HiveCatalog:可以將元數(shù)據(jù)持久化,數(shù)據(jù)管理團(tuán)隊將數(shù)據(jù)注冊到HiveCatalog中,數(shù)據(jù)分析團(tuán)隊從HiveCatalog中獲取表,直接進(jìn)行計算Catalog使用SQL

DDL:CREATE

TABLE

…未來將主要使用這種方式使用TableEnvironment.connect()未來將逐漸廢棄從Catalog中獲取已注冊的表如何獲取表USE、SHOWCREATE、DROP、ALTER

將SQL語句粘貼到Java代碼的executeSql()中執(zhí)行常見SQL

DDL根據(jù)是否為系統(tǒng)內(nèi)置來分類系統(tǒng)內(nèi)置函數(shù)(System

Function):Flink提供的內(nèi)置函數(shù)非系統(tǒng)內(nèi)置函數(shù):需要我們注冊到某個Catalog中,又被稱為目錄函數(shù)(Catalog

Function)根據(jù)是否為臨時函數(shù)來分類臨時函數(shù)(TemporaryFunction),只存在于某個Flink

Session中,Session結(jié)束后就不可使用。非臨時函數(shù),又被稱為持久化函數(shù)(PersistentFunction),可以是一個系統(tǒng)內(nèi)置函數(shù),也可以是一個目錄函數(shù)。函數(shù)根據(jù)上述維度分為:臨時系統(tǒng)內(nèi)置函數(shù)(TemporarySystemFunction)。持久化系統(tǒng)內(nèi)置函數(shù)(SystemFunction)。臨時目錄函數(shù)(TemporaryCatalogFunction)。持久化目錄函數(shù)(CatalogFunction)。函數(shù)分類標(biāo)量函數(shù)邏輯函數(shù)數(shù)學(xué)函數(shù)字符串函數(shù)時間函數(shù)判斷函數(shù)類型轉(zhuǎn)化函數(shù)集合函數(shù)聚合函數(shù)系統(tǒng)內(nèi)置函數(shù)當(dāng)系統(tǒng)內(nèi)置函數(shù)無法滿足特定的需求時,可以進(jìn)行用戶自定義函數(shù)registerFunction():將函數(shù)名和對應(yīng)實現(xiàn)記錄下來。自定義函數(shù):需要自己實現(xiàn)函數(shù)的業(yè)務(wù)邏輯三種自定義函數(shù):標(biāo)量函數(shù)表函數(shù)聚合函數(shù)用戶自定義函數(shù)標(biāo)量函數(shù)接收零個、一個或者多個輸入,生成一個單值輸出。案例:經(jīng)緯度,判斷給定經(jīng)緯度數(shù)據(jù)是否在北京四環(huán)以內(nèi)

繼承ScalarFunction,實現(xiàn)eval()方法注意eval()方法的輸入?yún)?shù)類型和返回結(jié)果類型eval()方法的輸入和輸出類型決定了函數(shù)的輸入和輸出類型標(biāo)量函數(shù)public

class

IsInFourRing

extends

ScalarFunction{//北京四環(huán)經(jīng)緯度范圍

private

static

doubleLON_EAST=116.48;private

static

doubleLON_WEST=116.27;private

static

doubleLAT_NORTH=39.988;private

static

doubleLAT_SOUTH=39.83;//判斷輸入的經(jīng)緯度是否在四環(huán)內(nèi)

public

boolean

eval(doublelon,doublelat)

{return!(lon>LON_EAST||lon<LON_WEST)&& !(lat>LAT_NORTH||lat<LAT_SOUTH);}}自定義好函數(shù)后,還需

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負(fù)責(zé)。
  • 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時也不承擔(dān)用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論