背景
最近有局点客户有这么一个场景:利用Flink CDC读取MySql数据binlog日志,然后使用窗口进行聚合统计,遇到的问题就是Flink现有窗口的触发机制(定时或者定量)、不满足他们的实际需求(定时和定量)。温故而知新,可以为师矣。本文基于官网和源码梳理Flink现有窗口的类型、触发机制等内容。最后,基于自定义触发器实现定时定量触发机制解决该局点客户的实际场景问题。
Flink窗口分类
窗口是处理无限实时流的核心,将无界数据流切分为逻辑概念的桶、进行实际计算逻辑。
分类方法一
从窗口是否分组来说,Flink窗口分为两类:分组窗口(Keyed Windows)、非分组窗口(Non-Keyed Windows),单从概念上看,区别就是前者用于keyed streams(即keyby操作后的DataStream)、后者直接用于DataStream。
实际区别很大:keyed streams因为按照指定key(keyselector)进行分组、相同key的元素发送到相同subtask,所以keyed windows允许在多个subtask中并行化计算;而non-keyed streams不进行分组,所有non-keyed windows计算逻辑只能在单个stask中处理,即并行度只能为1。
分类方法二
从窗口属性来说,Flink窗口分为两类:基于时间的窗口(滚动窗口-Tumbling Window、滑动窗口-Sliding Window、会话窗口-Session Window)、基于计数的窗口(全局窗口-GlobalWindow)。很明显时间窗口与时间相关、每个窗口都有开始时间和结束时间;计数窗口与数据条数相关、与时间无关。内置的这几种窗口的详细介绍,后续单独发文描述。
两种分类相关性
两种分类方式的关系是什么呢?
从窗口是否分组的图中我们可以知道Keyed Windows使用window方法、Non-Keyed Windows使用windowAll方法,我们可以看下源码中这两个方法的使用情况。
这两个方法的入参都为WindowAssigner抽象类。
上图红线部分可以看出,window方法和windowAll方法都可以使用时间窗口和计数窗口;
上图标黄部分可以看出,window方法和windowAll方法可以使用相同的WindowAssigner(即TumblingWindow、SlidingWindow、GlobalWindow);
细心读者可能会发现,为何没有SessionWindows的踪影?
我们继续看下WindowAssigner抽象类的子类,一目了然、豁然开朗:
客户需求
客户原本是需要数据条数达到时触发后续操作,但是发现某些时间段(如非高峰期)数据条数长时间达不到以至于不触发后续操作。所以,需要本文开头所说的定量触发基础上加上定时触发。很显然,从上面介绍的Flink窗口分类来看,内置的这几类窗口类型并不满足(时间窗口属于定时触发、计数窗口属于定量触发)。
自定义触发器
那么,到底是选择计数窗口+自定义时间触发器还是时间窗口+自定义计数触发器?
根据现有Flink API架构上说,时间窗口+自定义计数触发器是唯一选择。
Trigger触发器决定了窗口function何时对窗口进行运算,自定义Trigger触发器需要继承实现Trigger抽象类。
该类包括抽象方法如下:
TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)
TriggerResult onEventTime(long time, W window, TriggerContext ctx)
boolean canMerge()
void onMerge(W window, OnMergeContext ctx)
void clear(W window, TriggerContext ctx)
该类有多种类型的实现子类,感兴趣可以自行阅读,方便自己实现自定义触发器:
案例实践:计数窗口
完整代码见github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/CountWindowDemo.java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
数据源:nc –l 4444
首先输入5个1(注意别忘回车)和1个2,过一会再输入4个2。
从上图可以看出,每5条数据触发一次窗口计算,效果与实际代码预期相符。
实际代码详见countWindowAll内部使用的CountTrigger触发器
public AllWindowedStream countWindowAll(long size) {return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));}
案例实践:时间窗口
完整代码见github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowDemo.java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
数据源:nc –l 4444
首先输入任意字符如felixzh,然后分别5秒内和5秒后输入。
从上图可以看出,每5秒触发一次窗口计算,效果与实际代码预期相符。
实际代码详见TumblingProcessingTimeWindows内部使用的ProcessingTimeTrigger触发器。
案例实践:时间窗口+CountTrigger
乐于思考的朋友很容易想到,既然有时间窗口也有CountTrigger触发器,直接组合不就解决背景所述的定时定量触发了吗?不需要自定义计数触发器了吧?
该思路下的完整代码见github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddCountTriggerDemo.java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);CountTrigger countTrigger = CountTrigger.of(5);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(countTrigger)//.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
数据源:nc –l 4444
输入任意6行字符
从上图可以看出,即使等待30秒之后,实际效果并非预期。
预期的定时定量触发:即5条数据达到触发条件,就要触发计算,这点没毛病。
而30秒达到触发条件,并没有触发计算。
究其原因:ProcessingTimeTrigger触发器onProcessingTime方法返回TriggerResult.FIRE;而CountTrigger触发器onProcessingTime方法返回TriggerResult.CONTINUE。
看完TriggerResult枚举类,相信你会一目了然:
/** No action is taken on the window. */CONTINUE(false, false),/** {@code FIRE_AND_PURGE} evaluates the window function and emits the window result. */FIRE_AND_PURGE(true, true),/*** On {@code FIRE}, the window is evaluated and results are emitted. The window is not purged,* though, all elements are retained.*/FIRE(true, false),/*** All elements in the window are cleared and the window is discarded, without evaluating the* window function or emitting any elements.*/PURGE(false, true);
简而言之一句话:FIRE会触发计算,CONTINUE不会触发计算。
案例实践:时间窗口+自定义计数触发器
经过上述描述,我们还是需要实现自定义的计数触发器,需要区分事件时间和处理时间。
当然,思路还是借鉴CountTrigger触发器的已有内容。
定义MyCountTrigger自定义触发器继承Trigger,完整代码见github:
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/MyCountTrigger.java
https://github.com/felixzh2020/felixzh-learning-flink/blob/master/MyWindowTrigger/src/main/java/TimeWindowAddMyCountTriggerDemo.java
案例实践代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream socket = env.socketTextStream(“felixzh”, 4444, “”);socket.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(30))).trigger(MyCountTrigger.of(5)).process(new ProcessAllWindowFunction() {@Overridepublic void process(Context context, Iterable elements, Collector out) throws Exception {elements.forEach(value -> out.collect(value + ” ” + new Date()));}}).print();env.execute();
数据源:nc –l 4444
输入任意6行字符
从上图可以看出,每5条数据触发一次窗口计算,每30秒触发一次窗口计算,效果与实际代码预期相符。
结论
以上,借鉴Flink原生CountTrigger和ProcessingTimeTrigger,实现自定义Trigger触发器,解决客户现场定时定量触发窗口聚合计算的效果。