版權(quán)說(shuō)明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡(jiǎn)介
時(shí)間和窗口本章將重點(diǎn)介紹Flink的時(shí)間和窗口相關(guān)知識(shí)。通過(guò)本章的學(xué)習(xí)我們將了解不同時(shí)間語(yǔ)義下對(duì)數(shù)據(jù)流的相關(guān)處理。Window是Flink流計(jì)算的核心,我們將帶著大家學(xué)習(xí)窗口的生命周期,不同類型窗口的相關(guān)操作等。通過(guò)本節(jié)學(xué)習(xí)您將可以:熟悉Flink的時(shí)間語(yǔ)義。熟悉Flink最底層的API:ProcessFunction。掌握窗口以及窗口上的計(jì)算。掌握數(shù)據(jù)流上的Join操作。了解如何處理遲到數(shù)據(jù)。Flink的時(shí)間語(yǔ)義ProcessFunction系列函數(shù)窗口算子的使用雙流關(guān)聯(lián)處理遲到數(shù)據(jù)
三種時(shí)間語(yǔ)義事件到達(dá)Flink的時(shí)間可能是亂序的Event
Time:事件發(fā)生的時(shí)間無(wú)需擔(dān)心亂序到達(dá)問(wèn)題Watermark假設(shè)不會(huì)有更晚的數(shù)據(jù)需要用緩存存儲(chǔ)中間數(shù)據(jù),增大了延遲Processing
Time:當(dāng)前節(jié)點(diǎn)的系統(tǒng)時(shí)鐘時(shí)間計(jì)算結(jié)果有不確定性不需要設(shè)置Watermark延遲較低Ingestion
Time事件到達(dá)FlinkSource的時(shí)間不需要設(shè)置Watermark延遲較低在執(zhí)行環(huán)境層面設(shè)置使用哪種時(shí)間語(yǔ)義TimeCharacteristic.EventTimeTimeCharacteristic.ProcessingTimeTimeCharacteristic.IngestionTimefromElements()
或fromCollection()
創(chuàng)建的DataStream應(yīng)該使用Event
Time,并對(duì)數(shù)據(jù)流中的每個(gè)元素的Event
Time賦值使用帶時(shí)序性的Source:Socket、Kafka等或者不帶時(shí)序性的Source,使用Event
Time,并生成Watermark:文件等設(shè)置時(shí)間語(yǔ)義env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);EventTime時(shí)間語(yǔ)義使用時(shí)間戳為數(shù)據(jù)流中的每個(gè)事件的Event
Time賦值生成WatermarkUnix時(shí)間戳系統(tǒng)、毫秒精度Watermark是插入到數(shù)據(jù)流中的一種特殊數(shù)據(jù)結(jié)構(gòu),它包含一個(gè)時(shí)間戳,并假設(shè)后續(xù)不會(huì)有小于該時(shí)間戳的數(shù)據(jù)Watermark時(shí)間戳必須單調(diào)遞增Watermark與事件時(shí)間越緊湊,越容易產(chǎn)生延遲數(shù)據(jù);越寬松,等待時(shí)間越長(zhǎng)、延遲越高Event
Time和WatermarkWatermark需要在并行環(huán)境下向前傳播每個(gè)算子子任務(wù)維護(hù)一個(gè)針對(duì)該子任務(wù)的Event
Time時(shí)鐘,時(shí)鐘記錄了這個(gè)算子子任務(wù)的處理速度上游算子的Watermark數(shù)據(jù)不斷向下發(fā)送,算子子任務(wù)的Event
Time時(shí)鐘也要不斷向前更新每個(gè)算子子任務(wù)也要維護(hù)來(lái)自上游多個(gè)分區(qū)的Watermark信息:
PartitionWatermark
分布式環(huán)境下Watermark的傳播Flink先判斷新流入的Watermark時(shí)間戳是否大于PartitionWatermark列表內(nèi)該分區(qū)的歷史Watermark時(shí)間戳,大于則更新該P(yáng)artition
Watermark時(shí)間戳遍歷Partition
Watermark列表,選擇最小的作為該算子子任務(wù)Event
Time時(shí)鐘,如果更新了Event
Time時(shí)鐘,則將更新的Event
Time作為Watermark發(fā)送給下游所有算子子任務(wù)分布式環(huán)境下Watermark的傳播抽取時(shí)間戳和生成Watermark兩者緊密結(jié)合只在Event
Time語(yǔ)義下有效時(shí)間越早設(shè)置越好設(shè)置方法:在Source中設(shè)置在Source之后設(shè)置抽取時(shí)間戳及生成Watermark老的Source接口:實(shí)現(xiàn)SourceFunction或RichSourceFunction抽取時(shí)間戳:collectWithTimestamp()生成Watermark:emitWatermark()SourceclassMySourceextendsRichSourceFunction[MyType]{@Overridepublicvoidrun(SourceContext<MyType>ctx)throwsException{while(/*condition*/){MyTypenext=getNext();ctx.collectWithTimestamp(next,next.eventTime);if(next.hasWatermarkTime()){
ctx.emitWatermark(newWatermark(next.watermarkTime));}}}}assignTimestampsAndWatermarks()方法:Flink
1.11之后進(jìn)行了重構(gòu)WatermarkStrategy抽取時(shí)間戳:withTimestampAssigner().withTimestampAssigner((event,timestamp)->event.eventTime)生成Watermark:forGenerator()實(shí)現(xiàn)自己的Watermark策略周期性地Periodic逐個(gè)式地PunctuatedSource之后DataStream<MyType>stream=...DataStream<MyType>withTimestampsAndWatermarks=stream.assignTimestampsAndWatermarks( WatermarkStrategy
.forGenerator(...)
.withTimestampAssigner(...));周期可以設(shè)置默認(rèn)每200毫秒生成一次env.getConfig.setAutoWatermarkInterval(5000L)實(shí)現(xiàn)WatermarkGeneratoronEvent():數(shù)據(jù)流中每個(gè)元素到達(dá)后調(diào)用該方法onPeriodicEmit():定期發(fā)射WatermarkMyPeriodicGeneratorcurrentMaxTimestamp記錄已抽取的時(shí)間戳最大值時(shí)間戳最大值減1分鐘作為Watermark發(fā)送出去周期性地生成Watermark//定期生成Watermark
//數(shù)據(jù)流元素Tuple2<String,Long>共兩個(gè)字段
//第一個(gè)字段為數(shù)據(jù)本身
//第二個(gè)字段是時(shí)間戳
public
static
class
MyPeriodicGenerator
implements
WatermarkGenerator<Tuple2<String,Long>>{private
final
longmaxOutOfOrderness=60*1000;//1分鐘
private
longcurrentMaxTimestamp;//已抽取的Timestamp最大值
@Overridepublic
void
onEvent(Tuple2<String,Long>event,longeventTimestamp,WatermarkOutputoutput)
{//更新currentMaxTimestamp為當(dāng)前遇到的最大值
currentMaxTimestamp=Math.max(currentMaxTimestamp,eventTimestamp);}@Overridepublic
void
onPeriodicEmit(WatermarkOutputoutput)
{//Watermark比currentMaxTimestamp最大值慢1分鐘
output.emitWatermark(newWatermark(currentMaxTimestamp-maxOutOfOrderness));}}用forGenerator()方法調(diào)用MyPeriodicGenerator類基于時(shí)間戳最大值的場(chǎng)景比較普遍,F(xiàn)link做了進(jìn)一步封裝BoundedOutOfOrdernessWatermarksforBoundedOutOfOrderness()方法AscendingTimestampsWatermarksEvent
Time時(shí)間戳單調(diào)遞增forMonotonousTimestamps()方法周期性地生成Watermark//第二個(gè)字段是時(shí)間戳
DataStream<Tuple2<String,Long>>watermark=input.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((context->newMyPeriodicGenerator())).withTimestampAssigner((event,recordTimestamp)->event.f1));//第二個(gè)字段是時(shí)間戳
DataStream<Tuple2<String,Long>>input=env.addSource(newMySource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event,timestamp)->event.f1));使用自己實(shí)現(xiàn)的MyPeriodicGenerator使用Flink封裝的forBoundedOutOfOrderness數(shù)據(jù)流元素有特殊標(biāo)記,標(biāo)記哪些元素為Watermark根據(jù)元素是否有特殊標(biāo)記,判斷是否生成Watermark每個(gè)元素都生成一個(gè)Watermark,增大下游計(jì)算的延遲,拖累整個(gè)Flink作業(yè)的性能。逐個(gè)式地生成Watermark//逐個(gè)檢查數(shù)據(jù)流中的元素,根據(jù)元素中的特殊字段,判斷是否要生成Watermark
//數(shù)據(jù)流元素Tuple3<String,Long,Boolean>共三個(gè)字段
//第一個(gè)字段為數(shù)據(jù)本身
//第二個(gè)字段是時(shí)間戳
//第三個(gè)字段判斷是否為Watermark的標(biāo)記
public
static
class
MyPunctuatedGenerator
implements
WatermarkGenerator<Tuple3<String,Long,Boolean>>{@Overridepublic
void
onEvent(Tuple3<String,Long,Boolean>event,longeventTimestamp,WatermarkOutputoutput)
{if(event.f2){output.emitWatermark(newWatermark(event.f1));}}@Overridepublic
void
onPeriodicEmit(WatermarkOutputoutput)
{//這里不需要做任何事情,因?yàn)槲覀冊(cè)趏nEvent()方法中生成了Watermark
}}Watermark策略:在延遲和準(zhǔn)確性之間平衡Watermark策略沒(méi)有標(biāo)準(zhǔn)答案我們無(wú)法預(yù)知流處理有多少遲到數(shù)據(jù)Watermark與事件時(shí)間戳貼合較緊,一些數(shù)據(jù)會(huì)被當(dāng)成遲到數(shù)據(jù),影響計(jì)算結(jié)果的準(zhǔn)確性Watarmark設(shè)置得較松,更多數(shù)據(jù)會(huì)先緩存起來(lái)以等待計(jì)算,整個(gè)應(yīng)用的延遲增加;增大了內(nèi)存的壓力延遲與準(zhǔn)確性Flink的時(shí)間語(yǔ)義ProcessFunction系列函數(shù)窗口算子的使用雙流關(guān)聯(lián)處理遲到數(shù)據(jù)Flink體系中最底層的API提供了對(duì)數(shù)據(jù)流更細(xì)粒度的操作權(quán)限訪問(wèn)和更新狀態(tài)獲取時(shí)間戳、使用定時(shí)器(Timer)主要包括:KeyedProcessFunction、ProcessFunction、CoProcessFunction等ProcessFunction系列函數(shù)Timer就像一個(gè)鬧鐘先在Timer中注冊(cè)一個(gè)未來(lái)的時(shí)間當(dāng)這個(gè)時(shí)間到達(dá),“鬧鐘”響起,程序執(zhí)行回調(diào)函數(shù),回調(diào)函數(shù)執(zhí)行一定的業(yè)務(wù)邏輯ProcessFunction兩大重要接口:processElement()方法處理數(shù)據(jù)流中的一條元素,并通過(guò)Collector<O>輸出出來(lái)。Context是processElement()方法的特色,可以獲取時(shí)間戳、訪問(wèn)TimerService,設(shè)置TimeronTimer()是回調(diào)函數(shù),當(dāng)?shù)搅恕棒[鐘”時(shí)間,F(xiàn)link會(huì)調(diào)用onTimer()方法,執(zhí)行一些業(yè)務(wù)邏輯Timer的使用方法//處理數(shù)據(jù)流中的一條元素
public
abstract
void
processElement(Ivalue,Contextctx,Collector<O>out)//時(shí)間到達(dá)后的回調(diào)函數(shù)
public
void
onTimer(longtimestamp,OnTimerContextctx,Collector<O>out)
Timer的使用方法在processElement()方法中通過(guò)Context注冊(cè)一個(gè)未來(lái)的時(shí)間戳t在onTimer()方法中實(shí)現(xiàn)一些邏輯,到達(dá)t時(shí)刻,onTimer()方法被自動(dòng)調(diào)用。只能在KeyedStream上注冊(cè)Timer每個(gè)Key下可以注冊(cè)多個(gè)不同時(shí)間戳作為Timer每個(gè)Key下某個(gè)時(shí)間戳下只能注冊(cè)一個(gè)Timer未來(lái)的時(shí)間戳t可以是Processing
Time也可以是Event
Time從Context中,我們可以獲取一個(gè)TimerService,這是一個(gè)訪問(wèn)時(shí)間戳和Timer的接口Context.timerService.registerProcessingTimeTimer()注冊(cè)一個(gè)Processing
Time的TimerContext.timerService.deleteProcessingTimeTimer()刪除之前注冊(cè)的TimerContext.timerService.currentProcessingTime()獲取當(dāng)前時(shí)間戳某支股票未來(lái)某段interval時(shí)間間隔是否一致連續(xù)上漲如果未來(lái)interval間隔內(nèi)一直上漲,發(fā)送一個(gè)提示解決思路:如果新數(shù)據(jù)比上次數(shù)據(jù)價(jià)格更高且沒(méi)有注冊(cè)Timer,注冊(cè)一個(gè)未來(lái)interval之后的Timer在interval期間內(nèi),如果價(jià)格回落,則把剛才的Timer刪掉在interval期間內(nèi),如果價(jià)格一直上升,觸發(fā)onTimer()onTimer()發(fā)送提示Timer案例:股票交易場(chǎng)景將一部分?jǐn)?shù)據(jù)發(fā)送到另一個(gè)流中兩個(gè)流數(shù)據(jù)類型可以不一樣通過(guò)OutputTag<T>標(biāo)記另外一個(gè)數(shù)據(jù)流將交易量大于100的數(shù)據(jù)流側(cè)輸出側(cè)輸出OutputTag<StockPrice>highVolumeOutput=
newOutputTag<StockPrice>("high-volume-trade"){};
public
static
class
SideOutputFunction
extends
KeyedProcessFunction<String,StockPrice,String>{@Overridepublic
void
processElement(StockPricestock,Contextcontext,Collector<String>out)
throwsException{if(stock.volume>100){
context.output(highVolumeOutput,stock);}else{
out.collect("normaltickdata");}}}DataStream<StockPrice>inputStream=...SingleOutputStreamOperator<String>mainStream=inputStream.keyBy(stock->stock.symbol)//調(diào)用process函數(shù),包含側(cè)輸出邏輯
.process(newSideOutputFunction());DataStream<StockPrice>sideOutputStream=mainStream.getSideOutput(highVolumeOutput);CoProcessFunction或KeyedCoProcessFunctionprocessElement1()方法:對(duì)第一個(gè)數(shù)據(jù)流的每個(gè)元素處理processElement2()方法:對(duì)第二個(gè)數(shù)據(jù)流的每個(gè)元素處理第一個(gè)流、第二個(gè)流可以共享狀態(tài)第一個(gè)流、第二個(gè)流、輸出流三者的數(shù)據(jù)類型可以不一樣案例:實(shí)現(xiàn)兩個(gè)數(shù)據(jù)流上的Join:創(chuàng)建狀態(tài),兩個(gè)流都可以訪問(wèn)狀態(tài),例如狀態(tài)變量aprocessElement1()方法處理第一個(gè)數(shù)據(jù)流,更新狀態(tài)a。processElement2()方法處理第二個(gè)數(shù)據(jù)流,根據(jù)狀態(tài)a中的數(shù)據(jù),生成相應(yīng)的輸出。兩個(gè)流上使用ProcessFunction股票流包含價(jià)格、交易量、時(shí)間戳等媒體評(píng)價(jià)流包含了對(duì)各支股票的正負(fù)評(píng)價(jià)兩支數(shù)據(jù)流一起流入KeyedCoProcessFunction主邏輯中先將兩個(gè)數(shù)據(jù)流connect(),然后按照股票代號(hào)進(jìn)行keyBy(),進(jìn)而使用process():案例:股票價(jià)格流與媒體評(píng)價(jià)流做Join/**
*四個(gè)泛型:Key,第一個(gè)流類型,第二個(gè)流類型,輸出。*/
publicstaticclass
JoinStockMediaProcessFunction
extends
KeyedCoProcessFunction<String,StockPrice,Media,StockPrice>{//mediaState
privateValueState<String>mediaState;@Overridepublicvoidopen(Configurationparameters)throwsException{//從RuntimeContext中獲取狀態(tài)
mediaState=getRuntimeContext().getState(newValueStateDescriptor<String>("mediaStatusState",Types.STRING));}@OverridepublicvoidprocessElement1(StockPricestock,Contextcontext,Collector<StockPrice>collector)throwsException{StringmediaStatus=mediaState.value();if(null!=mediaStatus){stock.mediaStatus=mediaStatus;collector.collect(stock);}}@OverridepublicvoidprocessElement2(Mediamedia,Contextcontext,Collector<StockPrice>collector)throwsException{//第二個(gè)流更新mediaState
mediaState.update(media.status);}}//讀入股票數(shù)據(jù)流
DataStream<StockPrice>stockStream=...//讀入媒體評(píng)價(jià)數(shù)據(jù)流
DataStream<Media>mediaStream=...DataStream<StockPrice>joinStream=stockStream.connect(mediaStream).keyBy("symbol","symbol")//調(diào)用process函數(shù)
.process(newJoinStockMediaProcessFunction());Flink的時(shí)間語(yǔ)義ProcessFunction系列函數(shù)窗口算子的使用雙流關(guān)聯(lián)處理遲到數(shù)據(jù)根據(jù)是否keyBy()分為Keyed
Window和Non-Keyed
WindowkeyBy()windowAll()下游算子并行度為1窗口程序兩個(gè)必須操作使用窗口分配器(WindowAssigner)將數(shù)據(jù)流中的元素分配到對(duì)應(yīng)的窗口當(dāng)滿足窗口觸發(fā)條件后,對(duì)窗口內(nèi)的數(shù)據(jù)使用窗口處理函數(shù)(WindowFunction)進(jìn)行處理,常用的WindowFunction有reduce()、aggregate()、process()其他的trigger()、evictor()則是窗口的觸發(fā)和銷毀過(guò)程中的附加選項(xiàng),主要面向需要更多自定義的高級(jí)編程者,如果不設(shè)置則會(huì)使用默認(rèn)的配置。窗口程序的骨架結(jié)構(gòu)//KeyedWindow
stream.keyBy(<KeySelector>)
//按照一個(gè)Key進(jìn)行分組
.window(<WindowAssigner>)
//將數(shù)據(jù)流中的元素分配到相應(yīng)的窗口中
[.trigger(<Trigger>)]//指定觸發(fā)器Trigger(可選)
[.evictor(<Evictor>)]//指定清除器Evictor(可選)
.reduce/aggregate/process()//窗口處理函數(shù)WindowFunction
//Non-KeyedWindow
stream.windowAll(WindowAssigner)//不分組,將數(shù)據(jù)流中的所有元素分配到相應(yīng)的窗口中
[.trigger(<Trigger>)]//指定觸發(fā)器Trigger(可選)
[.evictor(<Evictor>)]//指定清除器Evictor(可選)
.reduce/aggregate/process()//窗口處理函數(shù)WindowFunction窗口分配器WindowAssigner將元素分配給不同的時(shí)間窗口時(shí)間窗口上進(jìn)行Window
Function計(jì)算案例:設(shè)置10分鐘的時(shí)間窗口確定窗口的長(zhǎng)度0:00
–
0:10、0:10
–
0:20數(shù)據(jù)流元素流入,根據(jù)元素的時(shí)間,分配到不同的窗口當(dāng)窗口滿足了觸發(fā)條件,觸發(fā)相應(yīng)的Window
Function計(jì)算窗口的生命周期窗口之間不重疊,窗口長(zhǎng)度(Size)是固定的可以設(shè)置偏移量Offset內(nèi)置的窗口劃分方法–滾動(dòng)窗口DataStream<T>input=...//基于Event
Time的時(shí)間窗口
input.keyBy(<KeySelector>).window(TumblingEventTimeWindows.of(Time.seconds(5))).<windowfunction>(...)//在小時(shí)級(jí)滾動(dòng)窗口上設(shè)置15分鐘的Offset偏移
input.keyBy(<KeySelector>).window(TumblingEventTimeWindows.of(Time.hours(1),Time.minutes(15))).<windowfunction>(...)以一個(gè)步長(zhǎng)(Slide)不斷向前滑動(dòng),窗口的長(zhǎng)度(Size)固定Slide小于窗口的Size時(shí),相鄰窗口會(huì)重疊,一個(gè)元素會(huì)被分配到多個(gè)窗口Slide大于Size,有些元素可能被丟掉
內(nèi)置的窗口劃分方法–滑動(dòng)窗口DataStream<T>input=...//基于EventTime的滑動(dòng)窗口
input.keyBy(<KeySelector>).window(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5))).<windowfunction>(...)兩個(gè)窗口之間有一個(gè)間隙,被稱為SessionGap當(dāng)一個(gè)窗口在大于SessionGap的時(shí)間內(nèi)沒(méi)有接收到新數(shù)據(jù)時(shí),窗口將關(guān)閉窗口的長(zhǎng)度可變、窗口的開始和結(jié)束時(shí)間不確定可以設(shè)置定長(zhǎng)的SessionGap,也可以使用SessionWindowTimeGapExtractor動(dòng)態(tài)地確定SessionGap的長(zhǎng)度內(nèi)置的窗口劃分方法–會(huì)話窗口DataStream<T>input=...//基于EventTime定長(zhǎng)SessionGap的會(huì)話窗口
input.keyBy(<KeySelector>).window(EventTimeSessionWindows.withGap(Time.minutes(10))).<windowfunction>(...)//基于EventTime變長(zhǎng)SessionGap的會(huì)話窗口
input.keyBy(<KeySelector>).window(EventTimeSessionWindows .withDynamicGap((element)->{//返回SessionGap的長(zhǎng)度
})).<windowfunction>(...)數(shù)據(jù)流元素經(jīng)過(guò)WindowAssigner后,被分配給不同的窗口使用窗口函數(shù),在每個(gè)窗口上對(duì)窗口內(nèi)的元素進(jìn)行處理窗口函數(shù)分為兩類:增量計(jì)算,如reduce()和aggregate()全量計(jì)算,如process()增量計(jì)算:窗口保存一份中間數(shù)據(jù),每流入一個(gè)新元素,新元素與中間數(shù)據(jù)兩兩合一,生成新的中間數(shù)據(jù),再保存到窗口中。全量計(jì)算:窗口先緩存所有元素,等到觸發(fā)條件后對(duì)窗口內(nèi)的全量元素執(zhí)行計(jì)算。窗口函數(shù)ReduceFunction使用reduce()需要實(shí)現(xiàn)ReduceFunctionReduceFunction接受兩個(gè)相同類型的輸入,生成一個(gè)輸出。兩兩合一地進(jìn)行匯總操作,生成一個(gè)同類型的新元素需要維護(hù)一個(gè)狀態(tài)數(shù)據(jù),狀態(tài)數(shù)據(jù)的數(shù)據(jù)類型和輸入、輸出的數(shù)據(jù)類型是一致的優(yōu)點(diǎn):狀態(tài)數(shù)據(jù)小,ReduceFunction好實(shí)現(xiàn)缺點(diǎn):功能受限//讀入股票數(shù)據(jù)流
DataStream<StockPrice>stockStream=...senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)//reduce的返回類型必須和輸入類型StockPrice一致
DataStream<StockPrice>sum=stockStream.keyBy(s->s.symbol).timeWindow(Time.seconds(10)).reduce((s1,s2)->StockPrice.of(s1.symbol,s2.price,s2.ts,s1.volume+s2.volume));AggregateFunction使用aggregate()需要實(shí)現(xiàn)AggregateFunction實(shí)現(xiàn)起來(lái)稍復(fù)雜:輸入類型IN、輸出類型OUT、中間狀態(tài)數(shù)據(jù)ACC三者不相同,可以自定義ACC的數(shù)據(jù)結(jié)構(gòu)需要實(shí)現(xiàn)多個(gè)虛方法:createAccumulator()、add()、getResult()public
interface
AggregateFunction<IN,ACC,OUT>extends
Function,Serializable{//在一次新的aggregate發(fā)起時(shí),創(chuàng)建一個(gè)新的Accumulator,Accumulator是我們所說(shuō)的中間狀態(tài)數(shù)據(jù),簡(jiǎn)稱ACC
//這個(gè)函數(shù)一般在初始化時(shí)調(diào)用
ACCcreateAccumulator();//當(dāng)一個(gè)新元素流入時(shí),將新元素與狀態(tài)數(shù)據(jù)ACC合并,返回狀態(tài)數(shù)據(jù)ACC
ACCadd(INvalue,ACCaccumulator);//將兩個(gè)ACC合并
ACCmerge(ACCa,ACCb);//將中間數(shù)據(jù)轉(zhuǎn)成結(jié)果數(shù)據(jù)
OUTgetResult(ACCaccumulator);}AggregateFunction源碼AggregateFunction計(jì)算一個(gè)窗口內(nèi)某支股票的平均值A(chǔ)CC中要保存總和(sum)、個(gè)數(shù)(count)以及股票代號(hào)(symbol)createAccumulator():創(chuàng)建新的ACC,初始化ACC數(shù)據(jù)add():新數(shù)據(jù)到達(dá),更新ACC中的sum和countgetResult():將ACC轉(zhuǎn)換為最終結(jié)果merge():窗口融合時(shí),多個(gè)窗口里的ACC合并,生成新的ACCAggregateFunction計(jì)算流程
AggregateFunction/**
*接收三個(gè)泛型:*IN:StockPrice
*ACC:(String,Double,Int)-(symbol,sum,count)
*OUT:(String,Double)-(symbol,average)
*/
publicstaticclass
AverageAggregate
implements
AggregateFunction<StockPrice,Tuple3<String,Double,Integer>,Tuple2<String,Double>>{@OverridepublicTuple3<String,Double,Integer>createAccumulator(){returnTuple3.of("",0d,0);}@OverridepublicTuple3<String,Double,Integer>add(StockPriceitem,Tuple3<String,Double,Integer>accumulator){doubleprice=accumulator.f1+item.price;intcount=accumulator.f2+1;returnTuple3.of(item.symbol,price,count);}@OverridepublicTuple2<String,Double>getResult(Tuple3<String,Double,Integer>accumulator){returnTuple2.of(accumulator.f0,accumulator.f1/accumulator.f2);}@OverridepublicTuple3<String,Double,Integer>merge(Tuple3<String,Double,Integer>a,Tuple3<String,Double,Integer>b){returnTuple3.of(a.f0,a.f1+b.f1,a.f2+b.f2);}}DataStream<StockPrice>stockStream=...DataStream<Tuple2<String,Double>>average=stockStream.keyBy(s->s.symbol).timeWindow(Time.seconds(10)).aggregate(newAverageAggregate());主程序?qū)崿F(xiàn)一個(gè)AggregateFunctionProcessWindowFunctionProcessWindowFunction要對(duì)窗口內(nèi)的全量數(shù)據(jù)都緩存Flink將某個(gè)Key下某個(gè)窗口的所有元素都緩存在Iterable<IN>中,對(duì)其進(jìn)行處理后,用Collector<OUT>收集輸出Context獲取窗口內(nèi)更多的信息,包括時(shí)間、狀態(tài)、遲到數(shù)據(jù)發(fā)送位置等/**
*函數(shù)接收四個(gè)泛型*IN輸入類型*OUT輸出類型*KEYkeyBy中按照Key分組,Key的類型*W窗口的類型*/
public
abstract
class
ProcessWindowFunction<IN,OUT,KEY,W
extends
Window>extends
AbstractRichFunction{/**
*對(duì)一個(gè)窗口內(nèi)的元素進(jìn)行處理,窗口內(nèi)的元素緩存在Iterable<IN>,進(jìn)行處理后輸出到Collector<OUT>中*我們可以輸出一到多個(gè)結(jié)果*/
public
abstract
void
process(KEYkey,Contextcontext,Iterable<IN>elements,Collector<OUT>out)
throwsException;/**
*當(dāng)窗口執(zhí)行完畢被清理時(shí),刪除各類狀態(tài)數(shù)據(jù)。*/
public
void
clear(Contextcontext)
throwsException{}…}ProcessWindowFunction源碼ProcessWindowFunction與增量計(jì)算相結(jié)合想訪問(wèn)窗口中Context等元數(shù)據(jù),又想使用增量計(jì)算,不緩存所有數(shù)據(jù),可以將ProcessWindowFunction與增量計(jì)算函數(shù)reduce()或aggregate()相結(jié)合Flink先進(jìn)行增量計(jì)算,窗口結(jié)束前,將增量計(jì)算結(jié)果發(fā)送給ProcessWindowFunction再處理案例:計(jì)算時(shí)間窗口下股票的最大值、最小值以及窗口結(jié)束時(shí)間戳窗口的最大值、最小值可以由reduce()增量計(jì)算得到窗口結(jié)束時(shí)間戳需要從Context中獲得觸發(fā)器Trigger每個(gè)窗口都有一個(gè)Trigger,Trigger決定了窗口何時(shí)啟動(dòng)Window
Function執(zhí)行計(jì)算、何時(shí)清理窗口中的數(shù)據(jù)例如:Processing
Time下的時(shí)間窗口帶有一個(gè)默認(rèn)的Trigger,當(dāng)?shù)竭_(dá)這個(gè)窗口的結(jié)束時(shí)間,觸發(fā)相應(yīng)的計(jì)算其他窗口觸發(fā)的特例:窗口中遇到某些特定的元素、元素總數(shù)達(dá)到一定數(shù)量或窗口中元素按照某個(gè)特定模式順序到達(dá)針對(duì)這些特例,可以自定義Trigger觸發(fā)器TriggerTrigger返回結(jié)果:CONTINUE:什么都不做。FIRE:?jiǎn)?dòng)計(jì)算并將結(jié)果發(fā)送給下游,不清理窗口數(shù)據(jù)。PURGE:清理窗口數(shù)據(jù)但不執(zhí)行計(jì)算。FIRE_AND_PURGE:?jiǎn)?dòng)計(jì)算,發(fā)送結(jié)果然后清理窗口數(shù)據(jù)。Trigger本質(zhì)上是一種定時(shí)器Timer,注冊(cè)一個(gè)合適的時(shí)間,到達(dá)這個(gè)時(shí)間,根據(jù)業(yè)務(wù)邏輯決定發(fā)送上面四個(gè)結(jié)果中的一個(gè)。清除器EvictorEvictor用來(lái)清除數(shù)據(jù)增量計(jì)算沒(méi)必要使用EvictorevictBefore()和evictAfter()分別在WindowFunction之前和之后被調(diào)用,方法里可以自定義一些業(yè)務(wù)邏輯,清除窗口中的數(shù)據(jù)Flink的時(shí)間語(yǔ)義ProcessFunction系列函數(shù)窗口算子的使用雙流關(guān)聯(lián)處理遲到數(shù)據(jù)雙流關(guān)聯(lián)將兩個(gè)數(shù)據(jù)流的數(shù)據(jù)關(guān)聯(lián)(Join)流處理的Join是在時(shí)間窗口上進(jìn)行兩個(gè)流的Join目前,F(xiàn)link支持兩種:窗口連接Window
Join時(shí)間間隔連接Interval
Join窗口連接Window
Join同一個(gè)窗口上兩個(gè)流按照某個(gè)Key進(jìn)行Join窗口劃分可以使用滾動(dòng)窗口、滑動(dòng)窗口和會(huì)話窗口一個(gè)窗口包含來(lái)自兩個(gè)數(shù)據(jù)流中的元素,兩個(gè)流之間以Inner
Join語(yǔ)義關(guān)聯(lián),形成數(shù)據(jù)對(duì)窗口結(jié)束時(shí)間,F(xiàn)link使用JoinFunction來(lái)對(duì)窗口中的數(shù)據(jù)進(jìn)行處理input1.join(input2)
.where(<KeySelector>) <-input1使用哪個(gè)字段作為Key.equalTo(<KeySelector>) <-input2使用哪個(gè)字段作為Key.window(<WindowAssigner>) <-指定WindowAssigner[.trigger(<Trigger>)] <-指定Trigger(可選)[.evictor(<Evictor>)] <-指定Evictor(可選).apply(<JoinFunction>) <-指定JoinFunction
窗口連接Window
Joinpublic
static
class
MyJoinFunction
implements
JoinFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,String>{@OverridepublicStringjoin(Tup
溫馨提示
- 1. 本站所有資源如無(wú)特殊說(shuō)明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁(yè)內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒(méi)有圖紙預(yù)覽就沒(méi)有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫(kù)網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年度KTV停車場(chǎng)設(shè)施改造合同范本3篇
- 道閘安裝合同
- 二零二五年人工智能研發(fā)與技術(shù)服務(wù)合同2篇
- 二零二五年度二婚離婚財(cái)產(chǎn)評(píng)估與分配合同
- 二零二五年度中小企業(yè)設(shè)備更新借款合同3篇
- 2024至2030年三丁基磷項(xiàng)目投資價(jià)值分析報(bào)告
- 二零二五InvitrogenGeneArt生物科研設(shè)備維護(hù)與升級(jí)服務(wù)合同2篇
- 北京政法職業(yè)學(xué)院《新媒體營(yíng)銷》2023-2024學(xué)年第一學(xué)期期末試卷
- 2025版酒吧場(chǎng)地租賃合同范本及營(yíng)銷合作協(xié)議3篇
- 北京語(yǔ)言大學(xué)《軟件構(gòu)造》2023-2024學(xué)年第一學(xué)期期末試卷
- 高中英語(yǔ)外刊-小貓釣魚50篇
- 【打油詩(shī)】72則創(chuàng)意期末評(píng)語(yǔ)模板-每頁(yè)8張
- QBT 2460-1999 聚碳酸酯(PC)飲用水罐
- 2024新《公司法》修訂重點(diǎn)解讀課件
- 《電子吊秤校準(zhǔn)規(guī)范》公示件
- 《跟上兔子》繪本四年級(jí)第1季Can-I-Play-with-You教學(xué)課件
- 手術(shù)室敏感指標(biāo)構(gòu)建
- 書法創(chuàng)作設(shè)計(jì)方案
- MOOC 軟件工程概論-北京聯(lián)合大學(xué) 中國(guó)大學(xué)慕課答案
- 2023年鐵路工務(wù)安全規(guī)則正文
- MOOC 傳熱學(xué)-西安交通大學(xué) 中國(guó)大學(xué)慕課答案
評(píng)論
0/150
提交評(píng)論