The event bus in Spark receives events and delivers them to the corresponding listeners.
ListenerBus
`ListenerBus[L <: AnyRef, E]` is the base trait of the event bus. It supports listeners of any reference type `L` and events of type `E`: it receives events and posts each one to the listeners registered for it.
The main fields are:
- `listeners`, `listenersPlusTimers`: hold all registered listeners together with their optional timers. The underlying data structure is the thread-safe `CopyOnWriteArrayList`, which fits read-heavy, rarely-written workloads and offers eventual consistency to readers.

```scala
private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

// Marked `private[spark]` for access in tests.
private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava
```
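The read-mostly trade-off is easy to see with `CopyOnWriteArrayList` directly (a minimal standalone sketch, unrelated to Spark's code): iterators operate on a snapshot of the backing array, so a concurrent `add` never disturbs an in-flight `postToAll`-style traversal, at the cost of copying the whole array on every write.

```java
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;

public class SnapshotDemo {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> listeners = new CopyOnWriteArrayList<>();
        listeners.add("jobListener");
        listeners.add("stageListener");

        // The iterator captures a snapshot of the array at creation time...
        Iterator<String> it = listeners.iterator();
        // ...so a later add is invisible to it (and never throws
        // ConcurrentModificationException, unlike ArrayList).
        listeners.add("taskListener");

        int seen = 0;
        while (it.hasNext()) {
            it.next();
            seen++;
        }
        System.out.println(seen);             // 2: snapshot taken before the add
        System.out.println(listeners.size()); // 3: the list itself has the new element
    }
}
```

This is why `postToAll()` can iterate safely while `addListener()`/`removeListener()` run concurrently.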
The main methods are:
- `addListener()`, `removeListener()`: add a listener (with its timer) to, or remove it from, `listenersPlusTimers`.
- `postToAll()`: iterates over `listenersPlusTimers` and calls the abstract `doPostEvent()` method to deliver the event.
As the figure shows, each concrete subclass implements `doPostEvent()`, using pattern matching to route each specific event to the matching listener type. Taking `SparkListenerBus` as an example, its `doPostEvent()` implementation delivers case-class events extending `SparkListenerEvent` to listeners extending `SparkListenerInterface`. The `SparkListenerInterface` trait declares unimplemented callbacks for handling the corresponding events.
```scala
// Listener
private[spark] trait SparkListenerInterface {
  /**
   * Called when a stage completes successfully or fails, with information on the completed stage.
   */
  def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit
  ...
}

// Event
@DeveloperApi
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent {
  /* Whether output this event to the event log */
  protected[spark] def logEvent: Boolean = true
}
...
@DeveloperApi
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
...

// Event bus
private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      ...
    }
  }
}
```
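The same bus/dispatch shape can be sketched outside Spark (all names here — `MiniBus`, `StageCompleted`, `JobStart` — are hypothetical stand-ins, not Spark's API): the bus stores listeners, `postToAll` iterates over them, and `doPostEvent` branches on the concrete event type, which is what the Scala pattern match above does.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical mini event bus mirroring ListenerBus / SparkListenerBus.
interface Event {}
class StageCompleted implements Event { final int stageId; StageCompleted(int id) { stageId = id; } }
class JobStart implements Event { final int jobId; JobStart(int id) { jobId = id; } }

// Mirrors SparkListenerInterface: one callback per event type.
interface Listener {
    void onStageCompleted(StageCompleted e);
    void onJobStart(JobStart e);
}

class MiniBus {
    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void addListener(Listener l) { listeners.add(l); }

    // postToAll: deliver one event to every registered listener.
    void postToAll(Event e) {
        for (Listener l : listeners) doPostEvent(l, e);
    }

    // doPostEvent: route by concrete event type (Scala pattern-matches here).
    private void doPostEvent(Listener l, Event e) {
        if (e instanceof StageCompleted) l.onStageCompleted((StageCompleted) e);
        else if (e instanceof JobStart) l.onJobStart((JobStart) e);
        // unrecognized events are silently ignored
    }
}
```

Posting a `StageCompleted` through `postToAll` thus invokes only `onStageCompleted` on each listener.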
AsyncEventQueue
`AsyncEventQueue` extends `SparkListenerBus` and is an asynchronous event queue: each queue dispatches events on its own dedicated thread, so that when listeners and events are numerous, synchronous delivery cannot cause events to pile up.
Some important fields:
- `eventQueue`: a `LinkedBlockingQueue` holding the case-class events, described above, that extend `SparkListenerEvent`. Its capacity is set by the `spark.scheduler.listenerbus.eventqueue.capacity` property and defaults to 10000. A `LinkedBlockingQueue` created without a capacity is effectively unbounded; bounding it means a full queue surfaces as an explicit failure (dropped events) rather than an OOM error.

```scala
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
  conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))

// org.apache.spark.internal.config
private[spark] val LISTENER_BUS_EVENT_QUEUE_CAPACITY =
  ConfigBuilder("spark.scheduler.listenerbus.eventqueue.capacity")
    .intConf
    .checkValue(_ > 0, "The capacity of listener bus event queue must be positive")
    .createWithDefault(10000)
```
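The bounded-queue behavior can be tried directly with `java.util.concurrent` (a minimal sketch, separate from Spark's code; capacity 2 stands in for the configured 10000): on a full bounded queue, `offer` fails fast and returns `false` instead of blocking or growing without limit, which is exactly what lets `post()` drop the event.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedQueueDemo {
    public static void main(String[] args) {
        // Bounded queue; an unbounded LinkedBlockingQueue would accept
        // events until the JVM runs out of memory.
        LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>(2);

        System.out.println(eventQueue.offer("event-1")); // true
        System.out.println(eventQueue.offer("event-2")); // true
        // Queue is full: offer returns false immediately (put() would block).
        System.out.println(eventQueue.offer("event-3")); // false -> post() drops the event
    }
}
```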
- `eventCount`: the number of unprocessed events. An event popped from `eventQueue` is not necessarily fully processed yet, so the queue size cannot serve as the count of outstanding events. An `AtomicLong` keeps updates atomic.
- `droppedEventsCounter`: counts dropped events; once `eventQueue` is full, newly produced events are dropped. Also an `AtomicLong`, so updates are atomic.
- `started`, `stopped`: record whether the queue has started or stopped, using `AtomicBoolean` to keep updates atomic.
- `dispatchThread`: the thread that calls `dispatch()` in a loop; wrapping it in `tryOrStopSparkContext(sc)` ensures the `SparkContext` can exit cleanly when an unrecoverable exception occurs.
Some important methods:
- `dispatch()`: takes events off the queue in a loop and delivers each one to the registered listeners via `postToAll()`, implemented in the `ListenerBus` trait, stopping once it takes the `POISON_PILL` sentinel placed into the queue.

```scala
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
  var next: SparkListenerEvent = eventQueue.take()
  while (next != POISON_PILL) {
    val ctx = processingTime.time()
    try {
      super.postToAll(next)
    } finally {
      ctx.stop()
    }
    eventCount.decrementAndGet()
    next = eventQueue.take()
  }
  eventCount.decrementAndGet()
}
```
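The poison-pill shutdown pattern can be reproduced with a tiny consumer loop (a standalone sketch; Spark's `POISON_PILL` is likewise a private sentinel event compared by reference):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class PoisonPillDemo {
    // Dedicated sentinel compared by reference, so it can never collide with a real event.
    private static final Object POISON_PILL = new Object();

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Object> eventQueue = new LinkedBlockingQueue<>();
        AtomicLong processed = new AtomicLong();

        // Mirror of dispatchThread: take() blocks until an event (or the pill) arrives.
        Thread dispatchThread = new Thread(() -> {
            try {
                Object next = eventQueue.take();
                while (next != POISON_PILL) {
                    processed.incrementAndGet(); // stand-in for super.postToAll(next)
                    next = eventQueue.take();
                }
            } catch (InterruptedException ignored) { }
        });
        dispatchThread.start();

        eventQueue.put("event-1");
        eventQueue.put("event-2");
        eventQueue.put(POISON_PILL); // like stop(): wake the consumer and end its loop
        dispatchThread.join();       // like stop(): wait for dispatching to finish

        System.out.println(processed.get()); // prints 2
    }
}
```

Because `take()` blocks when the queue is empty, the sentinel doubles as a wake-up signal: `stop()` never needs to interrupt the dispatch thread.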
- `post()`: adds an event to the queue; if the queue is full, the event is dropped and logged. This is a producer-consumer model: when the queue is full the producer drops events, and when the queue is empty the consumer waits for the producer.

```scala
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  eventCount.incrementAndGet()
  if (eventQueue.offer(event)) {
    return
  }

  eventCount.decrementAndGet()
  droppedEvents.inc()
  droppedEventsCounter.incrementAndGet()
  if (logDroppedEvent.compareAndSet(false, true)) {
    // Only log the following message once to avoid duplicated annoying logs.
    logError(s"Dropping event from queue $name. " +
      "This likely means one of the listeners is too slow and cannot keep up with " +
      "the rate at which tasks are being started by the scheduler.")
  }
  logTrace(s"Dropping event $event")

  val droppedCount = droppedEventsCounter.get
  if (droppedCount > 0) {
    // Don't log too frequently
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        val previous = new java.util.Date(prevLastReportTimestamp)
        logWarning(s"Dropped $droppedCount events from $name since $previous.")
      }
    }
  }
}
```
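The "only one thread wins" reporting step relies on `AtomicLong.compareAndSet`: read the current drop count, and only the thread whose CAS succeeds resets it and logs the summary. A minimal sketch of just that step (hypothetical names, without Spark's time-based throttling):

```java
import java.util.concurrent.atomic.AtomicLong;

public class DropCounterDemo {
    private static final AtomicLong droppedEventsCounter = new AtomicLong();

    // Mirrors the reporting step in post(): only the thread whose
    // compareAndSet wins resets the counter and reports the total.
    static boolean reportAndReset() {
        long droppedCount = droppedEventsCounter.get();
        if (droppedCount > 0 && droppedEventsCounter.compareAndSet(droppedCount, 0)) {
            System.out.println("Dropped " + droppedCount + " events");
            return true;
        }
        // Either nothing was dropped, or another thread changed the counter
        // between our get() and compareAndSet() and will report instead.
        return false;
    }

    public static void main(String[] args) {
        droppedEventsCounter.incrementAndGet();
        droppedEventsCounter.incrementAndGet();
        System.out.println(reportAndReset());           // true: this call wins, counter reset
        System.out.println(droppedEventsCounter.get()); // 0
        System.out.println(reportAndReset());           // false: nothing left to report
    }
}
```

If another thread increments the counter concurrently, the CAS fails and the drop is simply reported by a later call, so no drop count is ever lost.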
- `start()`: starts `dispatchThread`, which begins consuming and dispatching events.
- `stop()`: inserts the `POISON_PILL` sentinel; when `dispatchThread` reads it, event dispatching stops.

```scala
private[scheduler] def stop(): Unit = {
  if (!started.get()) {
    throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
  }
  if (stopped.compareAndSet(false, true)) {
    eventCount.incrementAndGet()
    eventQueue.put(POISON_PILL)
  }
  // this thread might be trying to stop itself as part of error handling -- we can't join
  // in that case.
  if (Thread.currentThread() != dispatchThread) {
    dispatchThread.join()
  }
}
```
LiveListenerBus
`LiveListenerBus` uses `AsyncEventQueue` as its core, asynchronously delivering `SparkListenerEvent`s to the registered `SparkListener`s.
Its fields largely mirror those of `AsyncEventQueue`; the important ones are:

- `queues`: a `CopyOnWriteArrayList[AsyncEventQueue]` (thread-safe) maintaining the list of `AsyncEventQueue`s.
- `queuedEvents`: a `mutable.ListBuffer[SparkListenerEvent]` that buffers events received before the bus has started.
The important methods mainly cover starting, stopping, and listener registration:
- `addToQueue()` registers a listener with the queue of the given name, and the `addToXxx()` helpers (such as `addToEventLogQueue()`) wrap `addToQueue()` with a fixed queue name. It first checks whether the named queue exists: if so, the listener is registered with it; if not, a new queue is created, the listener is registered, and the queue is added.

```scala
private[spark] def addToQueue(
    listener: SparkListenerInterface,
    queue: String): Unit = synchronized {
  if (stopped.get()) {
    throw new IllegalStateException("LiveListenerBus is stopped.")
  }

  queues.asScala.find(_.name == queue) match {
    case Some(queue) =>
      queue.addListener(listener)

    case None =>
      val newQueue = new AsyncEventQueue(queue, conf, metrics, this)
      newQueue.addListener(listener)
      if (started.get()) {
        newQueue.start(sparkContext)
      }
      queues.add(newQueue)
  }
}

def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
  addToQueue(listener, EVENT_LOG_QUEUE)
}
```
- `removeListener()`: removes the given listener from every queue in `queues`; any queue left empty afterwards is itself removed.
- `post()`: posts an event to all queues. Before the bus has started, events are buffered in `queuedEvents` instead; once the bus is started, events go straight to the queues.

```scala
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  metrics.numEventsPosted.inc()

  // If the event buffer is null, it means the bus has been started and we can avoid
  // synchronization and post events directly to the queues. This should be the most
  // common case during the life of the bus.
  if (queuedEvents == null) {
    postToQueues(event)
    return
  }

  // Otherwise, need to synchronize to check whether the bus is started, to make sure the thread
  // calling start() picks up the new event.
  synchronized {
    if (!started.get()) {
      queuedEvents += event
      return
    }
  }

  // If the bus was already started when the check above was made, just post directly to the
  // queues.
  postToQueues(event)
}

private def postToQueues(event: SparkListenerEvent): Unit = {
  val it = queues.iterator()
  while (it.hasNext()) {
    it.next().post(event)
  }
}
```
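The "buffer until started, then flush" pattern can be sketched on its own (`BufferingBus` is a hypothetical class, not Spark's API; the `delivered` list stands in for the real queues, and everything is synchronized for brevity, whereas Spark skips the lock once the buffer is null):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of LiveListenerBus's "buffer until started" behavior.
public class BufferingBus {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private List<String> queuedEvents = new ArrayList<>();      // set to null once started
    private final List<String> delivered = new ArrayList<>();   // stand-in for the queues

    public synchronized void post(String event) {
        if (queuedEvents == null) {  // bus already started: deliver directly
            delivered.add(event);
            return;
        }
        if (!started.get()) {        // not started yet: buffer the event
            queuedEvents.add(event);
            return;
        }
        delivered.add(event);
    }

    public synchronized void start() {
        started.set(true);
        queuedEvents.forEach(delivered::add); // flush buffered events in arrival order
        queuedEvents = null;                  // from now on post() delivers directly
    }

    public synchronized List<String> delivered() { return delivered; }
}
```

Posting "a" and "b" before `start()` delivers nothing; after `start()` they are flushed in order, and a subsequent post of "c" is delivered directly, giving `[a, b, c]`.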
- `start()`: starts every queue and posts the events buffered in `queuedEvents`; each queue then begins consuming previously posted events and calling `postToAll()` to deliver them to its listeners.
- `stop()`: stops all queues and clears `queues`.
Summary
Document info
- Author: wzx
- Link: https://masterwangzx.com/2020/07/22/listener-bus/
- Copyright: free to repost for non-commercial use, no derivatives, with attribution (Creative Commons 3.0 license)