更新時(shí)間:2021年01月15日17時(shí)49分 來(lái)源:傳智教育 瀏覽次數(shù):
問題分析
假如我們自己寫一個(gè)流式框架。我們?cè)撊绾翁幚硐?。正常情況下,我們看到消息按照順序一個(gè)個(gè)發(fā)送,接受后按照順序處理,這是沒有什么問題的。然而也要考慮到一些特殊情況下,消息不在是按照順序發(fā)送,產(chǎn)生了亂序,這時(shí)候該怎么處理?
核心問題講解
(1)watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)。我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說(shuō)late element)。但是對(duì)于late element,我們又不能無(wú)限期的等下去,必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算。這個(gè)特別的機(jī)制,就是watermark。
(2)通常,在接收到source的數(shù)據(jù)后,應(yīng)該立刻生成watermark;但是,也可以在接收source后,應(yīng)用簡(jiǎn)單的map或者filter操作,然后再生成watermark。
(3)如果延遲的數(shù)據(jù)有業(yè)務(wù)需要,則設(shè)置好允許延遲的時(shí)間,因?yàn)槲覀儾荒軣o(wú)限期的等下去。每個(gè)窗口都有屬于自己的最大等待延遲數(shù)據(jù)的時(shí)間限制,窗口結(jié)束時(shí)間+延遲時(shí)間=最大waterMark值,即當(dāng)waterMark值大于的上述計(jì)算出的最大waterMark值,該窗口內(nèi)的數(shù)據(jù)就屬于遲到的數(shù)據(jù),無(wú)法參與window計(jì)算。
問題擴(kuò)展
結(jié)合項(xiàng)目中使用
watermark如何處理亂序數(shù)據(jù)?
假如我們?cè)O(shè)置10s的時(shí)間窗口(window),那么0~10s,10~20s都是一個(gè)窗口,以0~10s為例,0位start-time,10為end-time。假如有4個(gè)數(shù)據(jù)的event-time分別是8(A),12.5(B),9(C),13.5(D),我們?cè)O(shè)置Watermarks為當(dāng)前所有到達(dá)數(shù)據(jù)event-time的最大值減去延遲值3.5秒。
當(dāng)A到達(dá)的時(shí)候,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會(huì)觸發(fā)計(jì)算;
當(dāng)B到達(dá)的時(shí)候,Watermarks為max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不會(huì)觸發(fā)計(jì)算;
當(dāng)C到達(dá)的時(shí)候,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會(huì)觸發(fā)計(jì)算;
當(dāng)D到達(dá)的時(shí)候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發(fā)計(jì)算;
觸發(fā)計(jì)算的時(shí)候,會(huì)將AC(因?yàn)樗麄兌夹∮?0)都計(jì)算進(jìn)去。
通過上面這種方式,我們就將遲到的C計(jì)算進(jìn)去了。這里的延遲3.5s是我們假設(shè)一個(gè)數(shù)據(jù)到達(dá)的時(shí)候,比他早3.5s的數(shù)據(jù)肯定也都到達(dá)了,這個(gè)是需要根據(jù)經(jīng)驗(yàn)推算的,加入D到達(dá)以后有到達(dá)了一個(gè)E,event-time=6,但是由于0~10的時(shí)間窗口已經(jīng)開始計(jì)算了,所以E就丟了。
猜你喜歡:
Flink cep庫(kù)在處理事件時(shí)間延遲問題
Scala重寫父類有哪些注意事項(xiàng)?重寫代碼演示
北京校區(qū)