广东医院项目Flink开发需求:定时定量窗口触发器(从入门到精通)

广东医院项目Flink开发需求:定时定量窗口触发器(从入门到精通)

背景

最近有局点客户有这么一个场景:利用Flink CDC读取MySql数据binlog日志,然后使用窗口进行聚合统计,遇到的问题就是Flink现有窗口的触发机制(定时或者定量)、不满足他们的实际需求(定时和定量)。温故而知新,可以为师矣。本文基于官网和源码梳理Flink现有窗口的类型、触发机制等内容。最后,基于自定义触发器实现定时定量触发机制解决该局点客户的实际场景问题。

Flink窗口分类

窗口是处理无限实时流的核心,将无界数据流切分为逻辑概念的桶、进行实际计算逻辑。

分类方法

从窗口是否分组来说,Flink窗口分为两类:分组窗口(Keyed Windows)、非分组窗口(Non-Keyed Windows),单从概念上看,区别就是前者用于keyed streams(即keyby操作后的DataStream)、后者直接用于DataStream。

实际区别很大:keyed streams因为按照指定key(keyselector)进行分组、相同key的元素发送到相同subtask,所以keyed windows允许在多个subtask中并行化计算;而non-keyed streams不进行分组,所有non-keyed windows计算逻辑只能在单个stask中处理,即并行度只能为1。

分类方法二

从窗口属性来说,Flink窗口分为两类:基于时间的窗口(滚动窗口-Tumbling Window、滑动窗口-Sliding Window、会话窗口-Session Window)、基于计数的窗口(全局窗口-GlobalWindow)。很明显时间窗口与时间相关、每个窗口都有开始时间和结束时间;计数窗口与数据条数相关、与时间无关。内置的这几种窗口的详细介绍,后续单独发文描述。

两种分类相关性

两种分类方式的关系是什么呢?

从窗口是否分组的图中我们可以知道Keyed Windows使用window方法、Non-Keyed Windows使用windowAll方法,我们可以看下源码中这两个方法的使用情况。

这两个方法的入参都为WindowAssigner抽象类。

上图红线部分可以看出,window方法和windowAll方法都可以使用时间窗口和计数窗口;

上图标黄部分可以看出,window方法和windowAll方法可以使用相同的WindowAssigner(即TumblingWindow、SlidingWindow、GlobalWindow);

细心读者可能会发现,为何没有SessionWindows的踪影?

我们继续看下WindowAssigner抽象类的子类,一目了然、豁然开朗:

客户需求

客户原本是需要数据条数达到时触发后续操作,但是发现某些时间段(如非高峰期)数据条数长时间达不到以至于不触发后续操作。所以,需要本文开头所说的定量触发基础上加上定时触发。很显然,从上面介绍的Flink窗口分类来看,内置的这几类窗口类型并不满足(时间窗口属于定时触发、计数窗口属于定量触发)。

自定义触发器

那么,到底是选择计数窗口+自定义时间触发器还是时间窗口+自定义计数触发器?

根据现有Flink API架构上说,时间窗口+自定义计数触发器是唯一选择。

Trigger触发器决定了窗口function何时对窗口进行运算,自定义Trigger触发器需要继承实现Trigger抽象类。

该类包括抽象方法如下:

TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)

TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)

TriggerResult onEventTime(long time, W window, TriggerContext ctx)

boolean canMerge()

void onMerge(W window, OnMergeContext ctx)

void clear(W window, TriggerContext ctx)

该类有多种类型的实现子类,感兴趣可以自行阅读,方便自己实现自定义触发器:

案例实践:计数窗口

完整代码见github:

https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/CountWindowDemo.java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

数据源:nc –l 4444

首先输入5个1(注意别忘回车)和1个2,过一会再输入4个2。

从上图可以看出,每5条数据触发一次窗口计算,效果与实际代码预期相符。

实际代码详见countWindowAll内部使用的CountTrigger触发器

public AllWindowedStream countWindowAll(long size) {return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}

案例实践:时间窗口

完整代码见github:

https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowDemo.java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

数据源:nc –l 4444

首先输入任意字符如felixzh,然后分别5秒内和5秒后输入。

从上图可以看出,每5秒触发一次窗口计算,效果与实际代码预期相符。

实际代码详见TumblingProcessingTimeWindows内部使用的ProcessingTimeTrigger触发器。

案例实践:时间窗口+CountTrigger

乐于思考的朋友很容易想到,既然有时间窗口也有CountTrigger触发器,直接组合不就解决背景所述的定时定量触发了吗?不需要自定义计数触发器了吧?

该思路下的完整代码见github:

https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddCountTriggerDemo.java

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);CountTrigger countTrigger = CountTrigger.of(5);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(countTrigger)//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

数据源:nc –l 4444

输入任意6行字符

从上图可以看出,即使等待30秒之后,实际效果并非预期。

预期的定时定量触发:即5条数据达到触发条件,就要触发计算,这点没毛病。

而30秒达到触发条件,并没有触发计算。

究其原因:ProcessingTimeTrigger触发器onProcessingTime方法返回TriggerResult.FIRE;而CountTrigger触发器onProcessingTime方法返回TriggerResult.CONTINUE。

看完TriggerResult枚举类,相信你会一目了然:

/** No action is taken on the window. */CONTINUE(false, false),/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */FIRE_AND_PURGE(true, true),/*** On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,* though, all elements are retained.*/FIRE(true, false),/*** All elements in the window are cleared and the window is discarded, without evaluating the* window function or emitting any elements.*/PURGE(false, true);

简而言之一句话:FIRE会触发计算,CONTINUE不会触发计算。

案例实践:时间窗口+自定义计数触发器

经过上述描述,我们还是需要实现自定义的计数触发器,需要区分事件时间和处理时间。

当然,思路还是借鉴CountTrigger触发器的已有内容。

定义MyCountTrigger自定义触发器继承Trigger,完整代码见github:

https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/MyCountTrigger.java

https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddMyCountTriggerDemo.java

案例实践代码如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(MyCountTrigger.of(5)).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();

数据源:nc –l 4444

输入任意6行字符

从上图可以看出,每5条数据触发一次窗口计算,每30秒触发一次窗口计算,效果与实际代码预期相符。

结论

以上,借鉴Flink原生CountTrigger和ProcessingTimeTrigger,实现自定义Trigger触发器,解决客户现场定时定量触发窗口聚合计算的效果。

郑重声明:本文内容及图片均整理自互联网,不代表本站立场,版权归原作者所有,如有侵权请联系管理员(admin#wlmqw.com)删除。
(0)
用户投稿
上一篇 2022年6月20日
下一篇 2022年6月20日

相关推荐

  • 了解数据库、SQL

    数据库基础 数据库(database)保存有组织的数据的容器(通常是一个文件或一组文件)。 表(table) 某种特定类型数据的结构化清单。 模式(schema)关于数据库和表的布…

    2022年6月15日
  • KPL DRG.GK战队的高光时刻

    Game1:蓝方苏州KSG(原YTG 平头哥) 红方佛山DRG.GK(原佛山GK) 【BP 环节】 蓝方Ban:沈梦溪 蒙恬 干将莫邪 狄仁杰 蓝方Pick:对抗路阿泽老夫子 打野…

    2022年9月23日
  • 连夺3块省级招牌!连南这个网红打卡点你去过吗?

    广东第二届国土空间生态修复十大范例、广东省自然教育基地、广东省科普教育基地!这是今年来,连南瑶族自治县万山朝王国家石漠公园接连获得的3块省级招牌,彰显了这颗“绿色明珠”的魅力! 2…

    2022年8月11日
  • 小米12 Ultra最新爆料:没有徕卡可乐标 时间定档7月上旬

    小米和徕卡合作的首款新机小米12 Ultra在上个月的时候就已经开始了预热,这段时间小米创始人雷军也不断地在其个人微博发布很多有关徕卡相机的消息,让网友可以更多的了解这个百年影像品…

    2022年6月24日
  • 皱纹 多种皱纹的去除方法

    面部的皱纹最容易暴露年龄,想要面容生动不留衰老迹象, 就要肯下功夫,将皱纹抚平,做个光洁亮丽的时光美人。 去除表情纹、缺水纹 脆弱的肌肤容易形成皱纹,适当按摩能够舒展肌肤,减少皱纹…

    2022年7月3日
  • 刘亦菲新古装剧首播,效果出人意料,网友:村姑装都这么好看

    刘亦菲主演的古装剧《梦华录》最近播出了,首播播放量就刷新了记录,拿到了1.72亿的播放量。重回古装剧的刘亦菲,号召力还是这么强。 从六月一日首播之后,小编就肝完了目前放出的7集,最…

    2022年6月5日
  • 给大学生监考是一种怎样的体验?

    工作中最痛苦的事是监考,监考中最痛苦的事是看到你答错了,却不能传道授业解惑,还把自己气个半死。 为啥大家都交卷了,他怎么还不写完! 两个监考老师陪最后交卷的同学坚持到最后一刻 大学…

    2022年7月29日
  • 仿淘宝大流量高并发电商领域核心项目已上线(完整流程 + 白皮书)

    面对近年来网络的飞速发展,大家已经都习惯了网络购物,从而出现了一些衍生品例如:某宝/某东/拼夕夕等大型网站以及购物 APP! 并且从而导致很多大型互联网企业以及中小厂都需要你有完整…

    2022年8月22日
  • 一张618期间销售排行图,再次刷新国人对iPhone13的喜爱印象

    一张618期间销售排行图,再次刷新国人对iPhone13的喜爱印象 数据证明:618活脱脱地过成了苹果促销日 2022年年终大促已经落下帷幕,如今各种销售数据也相继出炉了,而京东方…

    2022年6月25日
  • 微信查看已删除的好友(微信如何查看删除的好友)

    有时候因为一时的冲动,或者不小心把微信好友删除了,然而需要联系对方的时候,却找不到对方的微信号,这时候别着急,用下面这个方法10秒就能悄悄找回来。 重新找回好友需要具备2个条件: …

    2022年11月3日

联系我们

联系邮箱:admin#wlmqw.com
工作时间:周一至周五,10:30-18:30,节假日休息