基于Spark的Apriori算法

2020/01/31 Spark 共 5396 字,约 16 分钟

Spark实现了频繁项集挖掘算法,记录自己实践中的思考

Apriori

定义与性质

定义

  • 项集中项目(属性)的个数称为项集的维数或长度,若项集的长度为k,称为k-项集
  • 项集的出现频率是包含项集的事务个数,称为项集的支持度计数
  • 如果一个项集的支持度大于或等于预先设定的最小支持度min_sup,则称该项集为频繁k项集,记为 $L_k$

性质

  • 频繁项集的所有子项集都是频繁的,非频繁项集的所有超集都是非频繁的
    • 频繁k项集只能由频繁k-1项集组合而成
    • 频繁k项集的子项集都是频繁的

算法流程

  1. 扫描事务数据库,生成所有候选1项集记为 $C_1$
  2. 扫描事务数据库,计算 $C_1$ 中各项集的支持度,并根据最小支持度min_sup过滤,得到频繁1项集 $L_1$
  3. 由 $L_1$ 中的频繁1项集相互连接生成候选2项集 $C_2$
  4. 对于 $C_2$ 中的每一个项集x,x的每一个子项集都应该存在于 $L_1$ 中(每一个子集都是频繁项集),依据这个性质对 $C_2$ 剪枝
  5. 扫描事务数据库,计算 $C_2$ 中各项集的支持度,并根据最小支持度min_sup过滤,得到频繁2项集 $L_2$
  6. 重复以上流程,直至不能生成非空的频繁项集

优化

事务压缩

计算支持度计数时,存在扫描事务数据库的次数过多,每次更新支持度的时候都需要重新扫描事务数据库的问题。如果事务不包含任何频繁k项集,那么这个事务就不会包含任何频繁k+1项集。所以我们可以将该事务删去,提高之后的迭代中,扫描事务数据库的效率。

具体地,在算法流程中的第5步,计算候选集支持度时,增加判断该事务是否使用。

布尔矩阵

将事务数据库转化为布尔矩阵,如图所示,矩阵中的行代表每个事务,列代表每个事务项的存在性。这样做的好处是,可以非常方便地计算项集的支持度计数。如果要计算 $I_2I_4$ 的支持度计数,只需把 $I_2$ 和 $I_4$ 两列的向量做按位与运算运算结果中为1的个数即为该项集的支持度计数

算法实现

算法可以主要划分为初始化,生成候选集,候选集计数以及主体迭代计算这几个模块。

初始化

这一阶段主要工作是,简化事务项,过滤重复的事务,生成布尔矩阵和频繁1项集。

transactions为读取的事务数据库。为简化transactions中每个事务的事务项,将其转化为特定的index数字。根据transactions生成频繁1项集l1l1就规定了事务项到index的映射。item2Rank规定了index到事务项的映射

val temp = transactions.flatMap(_.map((_, 1)))
  .reduceByKey(_ + _)
  .filter { case (_, count) => count >= minCount }
  // 根据支持度计数由大到小排列
  .sortBy(-_._2)
  .cache()
// 频繁1项集,用作索引映射元素值
val l1 = temp.map(_._1).collect()
// 事务集中的元素值映射索引
val item2Rank = l1.zipWithIndex.toMap

过滤重复事务,由cntMap记录事务的个数。事务压缩,过滤掉不包含频繁1项集的事务以及事务项。

val l1BC = sc.broadcast(l1)
val tmp = transactions
  // 事务压缩,过滤掉不包含频繁1项集的事务项
  .filter(_.exists(l1BC.value.contains))
  // 过滤掉不包含频繁1项集的元素
  .map(_.filter(l1BC.value.contains))
  // 统计重复事务项
  .map(x => (x.toSet, 1)).reduceByKey(_ + _)
// 事务计数表
var cntMap = tmp.map(_._2).collect()

构造布尔矩阵并collect回本地

// 构造布尔矩阵
var boolMatrix = tmp.map(_._1)
  .map { transaction =>
    val item2Rank = item2RankBC.value
    val boolRow = new Array[Boolean](l1.length)
    transaction.foreach { x =>
      boolRow(item2Rank(x)) = true
    }

    boolRow
  }.collect()

生成候选集

freqItems为频繁k项集,是一个RDD。为避免生成的候选集过多,采用以下方法生成候选集:当两个频繁k项集中有 k-2 个元素相同时,便连接这两个项集,恰好生成一个候选k+1项集。所以在生成候选集时,必须先把频繁项集collect到本地。

当然,还有另外一种方法生成候选集而不用把频繁项集collect到本地。对于每个频繁k项集,只要加上一个不在频繁k项集的事务项,就可以组成候选k+1项集。但是经过测试,这种方法生成的候选集太多,导致支持度计数的计算量增加,事务压缩的效果也减弱,在面对大量数据时,效率不高。

val kItem = freqItems.collect()
val candidates = collection.mutable.ListBuffer.empty[Set[Int]]
kItem.indices.foreach { i =>
    Range(i + 1, kItem.length).foreach { j =>
        if ((kItem(i) & kItem(j)).size == k - 2) {
            candidates.append(kItem(i) | kItem(j))
        }
    }
}
// 剪枝,频繁项集的子集也是频繁项集
sc.parallelize(candidates.distinct.filter(x => kItem.exists(_.subsetOf(x))))

支持度计数

此阶段除了计算候选集的支持度计数还要进行事务压缩。

由于已经构建布尔矩阵,相比事务数据库已经压缩了很多,所以将本地的布尔矩阵广播到各个worker,计算worker上的候选集的支持度计数。计算支持度计数时,对列向量间做按位与运算,统计true的个数;事务压缩时,对列向量间做位或运算,过滤编号为false的事务(布尔矩阵的行)

val boolMatrixBC = sc.broadcast(boolMatrix)
val cntMapBC = sc.broadcast(cntMap)
// 统计候选项对应事务的布尔向量
val tmp = candidates.map { item =>
    val boolMatrix = boolMatrixBC.value
    val boolVector = item.map(x => boolMatrix.map(_ (x)))
        // 布尔向量间做按位与
        .reduce(_.zip(_).map(x => x._1 & x._2))

    (item, boolVector)
}.cache()
// 支持度计数
val candidatesWithCnt = tmp.map { case (item, vector) =>
    val cntMap = cntMapBC.value
    val count = vector.zip(cntMap).map { case (flag, count) =>
        if (flag) count else 0
    }.sum

(item, count)
}
// 事务的使用表
val usedMap = tmp.map(_._2)
     // 布尔向量间做按位或
     .reduce(_.zip(_).map(x => x._1 | x._2))
// 事务压缩
val newBoolMatrix = boolMatrix.zip(usedMap).filter(_._2).map(_._1)
val newCntMap = cntMap.zip(usedMap).filter(_._2).map(_._1)

迭代计算

依次执行 生成候选集->计算支持度->支持度过滤,直至不能生成非空的频繁项集和候选集为止。

freqItemsWithCnt通过不断合并新的频繁项集来保存结果。由于union是transformation算子,而在genCandidates方法中,lkWithCnt会collect到本地而计算一次,所以lkWithCnt持久化以防止重复计算

val loop = new Breaks
loop.breakable {
    while (!lkWithCnt.isEmpty) {
        // k阶候选集
        val candidates = genCandidates(sc, lkWithCnt.map(_._1), k, minCount)
          .persist(StorageLevel.MEMORY_AND_DISK)
        if (candidates.isEmpty) {
          loop.break()
        }
        // 事务压缩,计算支持度计数
        val (candidatesWithCnt, newBoolMatrix, newCntMap) = calCount(sc, candidates, boolMatrix, cntMap)
        boolMatrix = newBoolMatrix
        cntMap = newCntMap
        // 支持度过滤
        lkWithCnt = candidatesWithCnt.filter { case (_, count) => count >= minCount }
          .persist(StorageLevel.MEMORY_AND_DISK)
        freqItemsWithCnt = freqItemsWithCnt.union(lkWithCnt)
        candidates.unpersist()
        k += 1
    }
}

实验

数据集采用fimi.ua.ac.be/data下的蘑菇数据集。集群采用本地虚拟机,一个master和两个slave。

在最小支持度为0.4的情况下,对是否采用事务压缩的 Apriori 算法进行对比。

        原始                                   事务压缩
-----transaction : 8124-----		-----transaction : 8124-----															
-----L1          : 21-----			-----L1          : 21-----														

-----C2          : 210-----			-----C2          : 210-----														
-----transaction : 141-----			-----transaction : 141-----														
-----L2          : 97-----			-----L2          : 97-----														

-----C3          : 689-----			-----C3          : 689-----														
-----transaction : 141-----			-----transaction : 141-----														
-----L3          : 185-----			-----L3          : 185-----														

-----C4          : 1012-----		-----C4          : 1012-----															
-----transaction : 141-----			-----transaction : 141-----														
-----L4          : 170-----			-----L4          : 170-----														

-----C5          : 727-----			-----C5          : 727-----														
-----transaction : 141-----			-----transaction : 141-----														
-----L5          : 76-----			-----L5          : 76-----														

-----C6          : 242-----			-----C6          : 242-----														
-----transaction : 141-----			-----transaction : 130-----														
-----L6          : 15-----			-----L6          : 15-----														

-----C7          : 28-----			-----C7          : 28-----														
-----transaction : 141-----			-----transaction : 88-----														
-----L7          : 1-----			-----L7          : 1-----														

-----C8          : 0-----			-----C8          : 0-----														
-----time        : 18059ms-----		-----time        : 14323ms-----

在生成频繁6项集之后,事务压缩的特性才体现出来。

布尔矩阵的优化方法针对大量候选集时,才能体现出优势。在最小支持度为0.4的情况下,是否使用布尔矩阵优化对时间并无太大影响。但在最小支持度为0.2的情况下,使用了布尔矩阵优化的算法执行时间为100800ms,而不使用布尔矩阵优化的时间为251127ms

代码

wzx140/OptimizedApriori - GitHub

REFERENCE

[1]一步步教你轻松学关联规则Apriori算法.白宁超的官网[EB/OL].
[2]廖纪勇,吴晟,刘爱莲.基于布尔矩阵约简的Apriori算法改进研究[J].计算机工程与科学

文档信息

Search

    Table of Contents