Flink: Window、Time、Watermark

2020/12/07 Flink 共 4891 字,约 14 分钟

Flink通过Window、Time、Watermark完成乱序事件处理,定义窗口以及维护和更新用户定义状态等

Window

流式计算最终的目的是去统计数据产生汇总结果的,而在无界数据集上,如果做一个全局的窗口统计是不现实的,所以只能去划定一定大小的窗口范围去汇总

  • 滚动窗口(Tumbling Window): 窗口数据有固定的大小(时间,计数),窗口不会重叠
  • 滑动窗口(Sliding Window): 窗口数据有固定的大小(时间),有生成间隔,窗口会重叠
  • 会话窗口(Session Window): 窗口数据没有固定的大小,根据会话参数划分,窗口不会重叠

窗口是时间驱动(如每30s)或者事件驱动(如每100个元素)的。Flink使用assigner(分配record到特定窗口),trigger(触发窗口的执行的条件),evictor(trigger触发后,处理数据前,过滤一部分record)来配置一个窗口。以下代码描述了建立一个GlobalWindow,当窗口积累了1000个事件时,保留最新的100个并触发计算。

stream
  .window(GlobalWindow.create())
  .trigger(Count.of(1000))
  .evict(Count.of(100))

Time

通过env.setStreamTimeCharacteristic()可以设置使用时间

  • 事件时间(Event Time): 事件实际发生的时间,由数据生产方标记
    • 确定性,对于乱序、延时、或者数据重放等情况,都能给出正确的结果
    • 处理无序事件时性能和延迟受到影响
  • 摄入时间(Ingestion Time): 事件进入流处理框架的时间。
    • 处于 Event Time 和 Processing Time之间,性能和准确度的折中方案
    • 比起Event Time,Ingestion Time可以不需要设置复杂的Watermark,因此也不需要太多缓存,延迟较低。不能保证生产者到source的这段线路的有序性,因此不能处理无序事件和延迟数据
    • 比起Processing Time,Ingestion Time的时间是Souce赋值的,一个事件在整个处理过程从头至尾都使用这个时间,保证Flink内部有序,即后续算子不受前序算子处理速度的影响,计算结果相对准确一些,但计算成本稍高
  • 处理时间(Processing Time): 事件被处理的时间
    • 只依赖当前执行机器的系统时钟,无需缓存,最佳的性能和最低的延迟
    • 不确定性,每台机器不一样 ,容易受到各种因素影像(event产生的速度、到达flink的速度、在算子之间传输速度等)

WaterMark

WaterMark本质上是一个时间戳,是DataStream 中一个带有时间戳的元素,一般结合事件时间使用,为了解决实时计算中的数据乱序问题

  • WaterMark是Flink判断迟到数据的标准,同时也是窗口触发的标记。如果 Flink中出现了一个WaterMark(T),那么就意味着 EventTime < T 的数据都已经到达
  • 在程序并行度大于 1 的情况下,会有多个流产生WaterMark和窗口,这时候 Flink 会选取时间戳最小的WaterMark
  • WaterMark可以加在source算子或者非source算子,建议加在source算子上,只有当不可以加在source算子上才能加在非source算子上

如上图所示,对于有序的数据流来说,WaterMark周期性的出现在数据流中;对于无序的数据流来说,WaterMark表示该点之前,所有到特定事件时间的事件都应该到达。一旦WaterMark到达算子,算子就可以将其内部事件时钟提前到WaterMark的值。

如图所示,WaterMark由source直接生成,每个算子并行化的子任务都维护自己的WaterMark。当算子接收到WaterMark,它会将内部事件时钟提前到WaterMark的时间,并且为后序算子生成新的WaterMark。当算子从多个输入流获得WaterMark(如keyBy,partition等算子)时,算子会选择WaterMark中的最小值更新时间

WatermarkStrategy

如果不是Kafka或者是Kinesis,那么需要用WatermarkStrategy来指定TimestampAssigner

WatermarkStrategy.forMonotonousTimestamps()
    .withTimestampAssigner((event, timestamp) -> event.time);

Flink中已经实现WatermarkGenerator接口的预制OOTB的周期性Watermark生成器

  • 单调增加的生成策略: WatermarkStrategy.forMonotonousTimestamps()
  • 固定延迟的生成策略(基于事件时间): WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))

WatermarkGenerators

如果需要自定义WatermarkStrategy,需要继承WatermarkStrategy并实现createWatermarkGenerator()方法,TimestampAssigner对象可以在构建时传递,所以不需要实现createTimestampAssigner()方法。

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{

    /**
     * Instantiates a {@link TimestampAssigner} for assigning timestamps according to this
     * strategy.
     */
    @Override
    TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

    /**
     * Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
     */
    @Override
    WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

对于WatermarkGenerators来说,需要继承并实现以下方法

@Public
public interface WatermarkGenerator<T> {

    /**
     * 为每个事件调用,用于检查并记住事件时间戳,或基于事件本身产生Watermark
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 由ExecutionConfig.getAutoWatermarkInterval()时间间隔调用,产生Watermark
     */
    void onPeriodicEmit(WatermarkOutput output);
}

周期性 WatermarkGenerator

通过ExecutionConfig.setAutoWatermarkInterval()设置时间间隔

/**
 * 固定延迟的生成器(基于事件时间)
 */
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }

}

/**
 * 固定延迟的生成器(基于系统时间)
 */
public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        // don't need to do anything because we work on processing time
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
    }
}

Punctuated WatermarkGenerator

public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {

    @Override
    public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
        if (event.hasWatermarkMarker()) {
            output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
        }
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // don't need to do anything because we emit in reaction to events above
    }
}

Idle WaterMark

如果source的一个分区在某段时间内没有数据,那么这个分区产生的WaterMark值将停滞,由于Flink取时间戳最小的WaterMark,那么会导致下游整体的WaterMark停滞

WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(20))
    // 超过1min没有数据, 生成的Watermark会携带idle标记, 不会阻碍下游的运行
    .withIdleness(Duration.ofMinutes(1));

REFERENCE

  1. CARBONE P, KATSIFODIMOS A, EWEN S, 等. Apache flink: Stream and batch processing in a single engine[J]. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, IEEE Computer Society, 2015, 36(4).
  2. flink官方文档

文档信息

Search

    Table of Contents