WZX's blog 海滩上捡贝壳的孩子

Flink基本概念


Flink是一个分布式处理流式或者批量数据的系统,适用于多种数据处理应用,如实时分析,历史数据处理,迭代算法等,将它们转化为拥有流水机制和容错机制的数据流进行处理。

架构

部署层

  • 本地模式:本地JVM模拟集群
  • 集群模式:standalone,YARN
  • 云模式:部署在云服务器上

runtime层

runtime层以JobGraph的形式接收程序并执行

  • client:把代码转化为数据流图,提交给 JobManager
  • JobManager:协调数据流的分布式执行
    • 跟踪每个运算符和流的状态和进度,调度新算子
    • 协调检查点,当配置了高可用以后,JobManager 可以把每个检查点处最小的元数据集持久化到一个具有容错性的存储中。之后standby JobManager 就可以重建检查点并且恢复数据流的执行。
  • TaskManager:具体执行任务
    • 执行一个或多个产生流的算子,并将其状态报告给JobManager。
    • 维护缓冲池以缓冲或物化流,并维护网络连接以在算子之间交换数据流

API&LIB层

DataSet用于处理有限数据集即批处理,而DataStream用于处理无界数据流即流处理DataStream APIDataSet API均通过单独的编译过程生成JobGraphs,即由有状态算子连接的数据流DAG,交由下层runtime层去执行。

Flink自带了一些用于特定领域的库,这些库会生成DataSet APIDataStream API程序。目前,FlinkML用于机器学习,Gelly用于图计算,而Table用于SQL操作。

数据流

数据流图

数据流图是一个由有状态的算子和可被其他算子消费的由算子产生的数据流组成。如图所示,数据流图以并行化的方式执行,所以算子会被并行化为一个或多个并行子任务,而数据流被拆分为一个或多个流分区,每个子任务对应于一个分区。

数据交换

如图所示,OP1算子接收SRC1算子的输出,并将运算结果传递到下一层的SNK1算子。这三个算子由两个数据流进行连接,分别是IS1和IS3。

IS1是一种暂时性的中间结果,称为流水线中间流,生产者算子处理一个record之后,可以直接将其传递到消费者算子继续处理,SRC1算子和OP1算子可以并行运行。流水线中间流通过中间缓冲池来补偿短期的吞吐量波动,下层消费者会向上层生产者传播背压以维持流水线机制。Flink将流水线流用于连续流以及批处理程序中,以尽可能避免中间流的持久化。

IS3是需要将流序列化到非易失性存储的数据流,称为阻塞数据流,OP1算子的输出数据序列化到磁盘中之后,SNK1算子才能启动消费,这样将生产与消费划分为了不同的执行阶段。阻塞数据流要求生产者必须生产一定量的数据之后,才能用于下层的消费,它会先将积累的records存储到内存中,如果内存不够,那就序列化到磁盘中。

延迟与吞吐量

当在生产者端的一个record准备完成时,record会被序列化放入缓冲中,算子间通过交换缓冲的方式来交换数据当缓冲已满或达到超时条件时,生产者将缓冲发送给消费者。如图所示,缓冲容量大,吞吐量就高;缓冲超时短,延迟就低

控制事件

算子产生的控制事件随着其他record在数据流分区中传输。

  • checkpoint barriers:用于容错。在流中插入checkpoint事件,会促使流将当前的状态保存下来,当发生故障后,可以直接使用上一次的checkpoint来恢复
  • watermarks:标识流分区中的事件时间
  • iteration barriers:用于类似机器学习的迭代计算

迭代数据流

增量处理和迭代对于图计算和机器学习非常重要。一般的并行处理平台通过提交新job,增加新结点或者feedback边来支持迭代处理。Flink将迭代的头部和尾部算子用feedback边隐式连接。

容错

Flink通过严格只处理一次的一致性保证检查点与分区重新执行来保证执行的可靠性。因为数据源是持久并可以重新获得的,如文件,持久的消息队列等,非持久数据源通过日志实现持久化。

如果发生程序故障,Flink将停止分布式流数据流。然后,系统重新启动算子,并将他们重置为最近检查点的状态。输入流也用状态快照重置,并保证作为重新启动的并行数据流中的的任何record都在所恢复检查点之后。

检查点

容错机制会绘制分布式数据流和算子的状态的一致快照,包含所有算子的状态和一定时间间隔内的输入数据流。

Barrier

checkpoint barrier 插入到数据流中,并和record一起流动。barrier 严格地按照直线流动并且不会超过 record。如图所示,barrier 带有一个ID,并且将数据流中的record分为当前快照的record集(右部)和下一个快照的record集(左部)。barrier 不会中断数据流的流动,因此非常轻量级。来自不同快照的多个barrier可以同时出现在数据流中,这意味着可以同时进行多个快照。

算子接收到barrier时进入对齐阶段,确保接收到每个输入流中的 barrier,才会向输出流中传递barrier。当sink(数据流DAG中的结尾)接收到barrier n时,它会向检查点协调器确认快照n。当所有的sink都接收到barrier时,快照n完成。

如图所示,当算子接收多个输入流时,需要对 barrier 进行对齐:

  1. 当算子接收到某个输入流的 barrier n 后,它就不能继续处理该数据流的后续record,直到算子接收到其余输入流的 barrier n。否则快照n和快照n+1的record将会混淆。
  2. 算子将不能处理的record放到input buffer中
  3. 当算子接收到最后一个输入流中的 barrier n 时,算子会向后传递所有等待的输出record以及 barrier n
  4. 经过以上步骤,算子恢复所有输入流数据的处理,优先处理input buffer中的record

状态

所有类型的状态都是快照的一部分。

  • 用户定义状态:由transformation函数(map(),filter())直接创建或者修改的状态
  • 系统状态:这种状态指的是数据缓存,是算子计算的一部分。例如是窗口,其中缓存一定数量的record,直到计算完成为止。

如图所示,算子在从输入流接收到所有barrier之后,向输出流发出barrier之前,对其状态进行快照。在这个时间点,barrier之前的record进行的所有状态更新已经完成,并且没有依赖于barrier之后的record。由于快照状态占用空间可能很大,因此将其存储在可配置的后端存储系统中。默认情况下,使用JobManager的内存,但对于生产用途,应配置分布式可靠存储(例如HDFS)。状态存储后,算子确认检查点,将barrier发送到输出流,然后继续处理数据。

快照包含:

  • 对于并行输入数据源:快照创建时数据流中的位置偏移
  • 对于算子:存储在快照中的状态的指针

只一次 vs. 至少一次

对齐阶段可能会导致流处理的延迟。通常,这种额外的延迟大约是几毫秒,但是出现过异常的延迟显着增加的情况。对于要求所有记录始终具有超低等待时间(几毫秒)的应用程序,Flink可以跳过对齐阶段。当算子接收到一个barrier,就立即进行状态快照。算子在检查点n创建之前,会继续处理属于检查点n+1的record,这就导致检查点n与检查点n+1之间存在数据重叠

对于map(), flatmap(), fliter()等的并行操作即使在只一次的模式中仍然会保证至少一次,因为他们没有多个输入流

异步状态快照

当快照存储在后端存储系统中时,会停止处理输入数据,这种同步操作会在每次快照创建时引入延迟。

让算子在存储状态快照时继续处理输入,从而将状态快照以后台异步进行。为了做到这一点,算子必须能够生成一个后续修改不影响之前状态的状态对象。例如RocksDB中使用的写时复制类型的数据结构。

恢复

一旦遇到故障,Flink选择最近一个完成的检查点k,重置所有算子的状态到检查点k,数据源被置为从检查点k位置读取。如果是增量快照,算子需要从最新的全量快照回复,然后对此状态进行一系列增量更新。

基于数据流的流分析

Flink的DataStream API实现了基于Flink runtime的流分析的框架,包含乱序事件处理,定义窗口以及维护和更新用户定义状态等。

时间

  • 事件时间:创建事件的时间。通常用时间戳记来描述,例如由生产传感器或生产服务附加的时间
  • 摄取时间:事件进入数据流中的source算子的时间
  • 处理时间:正在处理数据的机器上的时钟时间

流处理器需要一种衡量事件时间进度的方法,才能支持与事件时间相关的操作。watermark是一种衡量事件时间进度的机制。watermark在数据流中流动并且携带一个时间戳t,watermark(t)表示数据流中的事件时间已达到时间t,该数据流中不应再有更老的数据(事件时间早于或等于t)

如图所示,对于有序的数据流来说,watermark周期性的出现在数据流中;对于无序的数据流来说,watermark表示该点之前,所有到特定事件时间的事件都应该到达。一旦watermark到达算子,算子就可以将其内部事件时钟提前到watermark的值。

如图所示,watermark由source直接生成,每个算子并行化的子任务都维护自己的watermark。当算子接收到watermark时,它会将内部事件时钟提前到watermark的时间,并且为后序算子生成新的watermark。当算子从多个输入流获得watermark(如keyBy partition等算子)时,算子会选择watermark中的最小值更新时间

算子的状态

尽管数据流中的许多算子一次仅处理一个事件,但有些算子会记住多个事件的信息,例如窗口是有状态的算子,将record存储在内存中不断更新的桶中,作为算子状态的一部分。

Keyed and Operator State

Keyed State以键值对方式存储,并且与数据流一样严格分区。如图所示,只有在keyBy函数之后,才可以在对应keyed stream的子任务中访问到Keyed State。这种key的对齐保证了状态更新是本地运算,从而保证了一致性且没有事务开销。

Operator State绑定在一个并行算子实例(子任务)上,流入相同子任务的数据可以访问和共享Operator State

Raw and Managed State

Managed State用Flink runtime维护的数据结构表示,例如内部哈希表或RocksDB。Managed State如ValueState、ListState、MapState等。Flink runtime对state进行编码,然后将其写入检查点。

Raw State用算子维护state的数据结构。快照时,仅将字节序列写入检查点。Raw State的数据结构对Flink不透明,只能观察到原始的字节数组。

流窗口

窗口是时间驱动(如每30s)或者事件驱动(如每100个元素)的。Flink使用assigner(分配record到特定窗口),trigger(触发窗口的执行的条件),evictor(trigger触发后,处理数据前,过滤一部分record)来配置一个窗口。以下代码描述了建立一个GlobalWindow,当窗口积累了1000个事件时,保留最新的100个并触发计算。

stream
  .window(GlobalWindow.create())
  .trigger(Count.of(1000))
  .evict(Count.of(100))

基于数据流的批处理分析

有界数据集的批处理是无界数据流的流处理的特例。

  • 批处理的容错不再使用检查点,而是完全重新执行流,因为输入是有界的。这使恢复的成本更多了,但由于避免了检查点,因此使常规处理的成本降低了
  • DataSet API中的状态操作使用简化的内存数据结构,而不是键值对索引
  • DataSet API引入特殊的算子
  • DataSet可以优化调度阶段

REFERENCE

[1] CARBONE P, KATSIFODIMOS A, EWEN S, 等. Apache flink: Stream and batch processing in a single engine[J]. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, IEEE Computer Society, 2015, 36(4).
[2] flink官方文档


Comments

Content