用Spark
实现了频繁项集挖掘算法,记录自己实践中的思考
Apriori
定义与性质
定义
- 项集中项目(属性)的个数称为项集的维数或长度,若项集的长度为k,称为k-项集。
- 项集的出现频率是包含项集的事务个数,称为项集的支持度计数。
- 如果一个项集的支持度大于或等于预先设定的最小支持度
min_sup
,则称该项集为频繁k项集,记为 $L_k$
性质
- 频繁项集的所有子项集都是频繁的,非频繁项集的所有超集都是非频繁的
- 频繁k项集只能由频繁k-1项集组合而成
- 频繁k项集的子项集都是频繁的
算法流程
- 扫描事务数据库,生成所有候选1项集记为 $C_1$
- 扫描事务数据库,计算 $C_1$ 中各项集的支持度,并根据最小支持度
min_sup
过滤,得到频繁1项集 $L_1$ - 由 $L_1$ 中的频繁1项集相互连接生成候选2项集 $C_2$
- 对于 $C_2$ 中的每一个项集x,x的每一个子项集都应该存在于 $L_1$ 中(每一个子集都是频繁项集),依据这个性质对 $C_2$ 剪枝
- 扫描事务数据库,计算 $C_2$ 中各项集的支持度,并根据最小支持度
min_sup
过滤,得到频繁2项集 $L_2$ - 重复以上流程,直至不能生成非空的频繁项集
优化
事务压缩
计算支持度计数时,存在扫描事务数据库的次数过多,每次更新支持度的时候都需要重新扫描事务数据库的问题。如果事务不包含任何频繁k项集,那么这个事务就不会包含任何频繁k+1项集。所以我们可以将该事务删去,提高之后的迭代中,扫描事务数据库的效率。
具体地,在算法流程中的第5步,计算候选集支持度时,增加判断该事务是否使用。
布尔矩阵
将事务数据库转化为布尔矩阵,如图所示,矩阵中的行代表每个事务,列代表每个事务项的存在性。这样做的好处是,可以非常方便地计算项集的支持度计数。如果要计算 $I_2I_4$ 的支持度计数,只需把 $I_2$ 和 $I_4$ 两列的向量做按位与运算,运算结果中为1
的个数即为该项集的支持度计数
算法实现
算法可以主要划分为初始化,生成候选集,候选集计数以及主体迭代计算这几个模块。
初始化
这一阶段主要工作是,简化事务项,过滤重复的事务,生成布尔矩阵和频繁1项集。
transactions
为读取的事务数据库。为简化transactions
中每个事务的事务项,将其转化为特定的index数字。根据transactions
生成频繁1项集l1
,l1
就规定了事务项到index的映射。item2Rank
规定了index到事务项的映射
val temp = transactions.flatMap(_.map((_, 1)))
.reduceByKey(_ + _)
.filter { case (_, count) => count >= minCount }
// 根据支持度计数由大到小排列
.sortBy(-_._2)
.cache()
// 频繁1项集,用作索引映射元素值
val l1 = temp.map(_._1).collect()
// 事务集中的元素值映射索引
val item2Rank = l1.zipWithIndex.toMap
过滤重复事务,由cntMap
记录事务的个数。事务压缩,过滤掉不包含频繁1项集的事务以及事务项。
val l1BC = sc.broadcast(l1)
val tmp = transactions
// 事务压缩,过滤掉不包含频繁1项集的事务项
.filter(_.exists(l1BC.value.contains))
// 过滤掉不包含频繁1项集的元素
.map(_.filter(l1BC.value.contains))
// 统计重复事务项
.map(x => (x.toSet, 1)).reduceByKey(_ + _)
// 事务计数表
var cntMap = tmp.map(_._2).collect()
构造布尔矩阵并collect回本地
// 构造布尔矩阵
var boolMatrix = tmp.map(_._1)
.map { transaction =>
val item2Rank = item2RankBC.value
val boolRow = new Array[Boolean](l1.length)
transaction.foreach { x =>
boolRow(item2Rank(x)) = true
}
boolRow
}.collect()
生成候选集
freqItems
为频繁k项集,是一个RDD。为避免生成的候选集过多,采用以下方法生成候选集:当两个频繁k项集中有 k-2 个元素相同时,便连接这两个项集,恰好生成一个候选k+1项集。所以在生成候选集时,必须先把频繁项集collect到本地。
当然,还有另外一种方法生成候选集而不用把频繁项集collect到本地。对于每个频繁k项集,只要加上一个不在频繁k项集的事务项,就可以组成候选k+1项集。但是经过测试,这种方法生成的候选集太多,导致支持度计数的计算量增加,事务压缩的效果也减弱,在面对大量数据时,效率不高。
val kItem = freqItems.collect()
val candidates = collection.mutable.ListBuffer.empty[Set[Int]]
kItem.indices.foreach { i =>
Range(i + 1, kItem.length).foreach { j =>
if ((kItem(i) & kItem(j)).size == k - 2) {
candidates.append(kItem(i) | kItem(j))
}
}
}
// 剪枝,频繁项集的子集也是频繁项集
sc.parallelize(candidates.distinct.filter(x => kItem.exists(_.subsetOf(x))))
支持度计数
此阶段除了计算候选集的支持度计数还要进行事务压缩。
由于已经构建布尔矩阵,相比事务数据库已经压缩了很多,所以将本地的布尔矩阵广播到各个worker,计算worker上的候选集的支持度计数。计算支持度计数时,对列向量间做按位与运算,统计true的个数;事务压缩时,对列向量间做位或运算,过滤编号为false的事务(布尔矩阵的行)
val boolMatrixBC = sc.broadcast(boolMatrix)
val cntMapBC = sc.broadcast(cntMap)
// 统计候选项对应事务的布尔向量
val tmp = candidates.map { item =>
val boolMatrix = boolMatrixBC.value
val boolVector = item.map(x => boolMatrix.map(_ (x)))
// 布尔向量间做按位与
.reduce(_.zip(_).map(x => x._1 & x._2))
(item, boolVector)
}.cache()
// 支持度计数
val candidatesWithCnt = tmp.map { case (item, vector) =>
val cntMap = cntMapBC.value
val count = vector.zip(cntMap).map { case (flag, count) =>
if (flag) count else 0
}.sum
(item, count)
}
// 事务的使用表
val usedMap = tmp.map(_._2)
// 布尔向量间做按位或
.reduce(_.zip(_).map(x => x._1 | x._2))
// 事务压缩
val newBoolMatrix = boolMatrix.zip(usedMap).filter(_._2).map(_._1)
val newCntMap = cntMap.zip(usedMap).filter(_._2).map(_._1)
迭代计算
依次执行 生成候选集->计算支持度->支持度过滤,直至不能生成非空的频繁项集和候选集为止。
freqItemsWithCnt
通过不断合并新的频繁项集来保存结果。由于union
是transformation算子,而在genCandidates
方法中,lkWithCnt
会collect到本地而计算一次,所以将lkWithCnt
持久化以防止重复计算。
val loop = new Breaks
loop.breakable {
while (!lkWithCnt.isEmpty) {
// k阶候选集
val candidates = genCandidates(sc, lkWithCnt.map(_._1), k, minCount)
.persist(StorageLevel.MEMORY_AND_DISK)
if (candidates.isEmpty) {
loop.break()
}
// 事务压缩,计算支持度计数
val (candidatesWithCnt, newBoolMatrix, newCntMap) = calCount(sc, candidates, boolMatrix, cntMap)
boolMatrix = newBoolMatrix
cntMap = newCntMap
// 支持度过滤
lkWithCnt = candidatesWithCnt.filter { case (_, count) => count >= minCount }
.persist(StorageLevel.MEMORY_AND_DISK)
freqItemsWithCnt = freqItemsWithCnt.union(lkWithCnt)
candidates.unpersist()
k += 1
}
}
实验
数据集采用fimi.ua.ac.be/data下的蘑菇数据集。集群采用本地虚拟机,一个master和两个slave。
在最小支持度为0.4的情况下,对是否采用事务压缩的 Apriori
算法进行对比。
原始 事务压缩
-----transaction : 8124----- -----transaction : 8124-----
-----L1 : 21----- -----L1 : 21-----
-----C2 : 210----- -----C2 : 210-----
-----transaction : 141----- -----transaction : 141-----
-----L2 : 97----- -----L2 : 97-----
-----C3 : 689----- -----C3 : 689-----
-----transaction : 141----- -----transaction : 141-----
-----L3 : 185----- -----L3 : 185-----
-----C4 : 1012----- -----C4 : 1012-----
-----transaction : 141----- -----transaction : 141-----
-----L4 : 170----- -----L4 : 170-----
-----C5 : 727----- -----C5 : 727-----
-----transaction : 141----- -----transaction : 141-----
-----L5 : 76----- -----L5 : 76-----
-----C6 : 242----- -----C6 : 242-----
-----transaction : 141----- -----transaction : 130-----
-----L6 : 15----- -----L6 : 15-----
-----C7 : 28----- -----C7 : 28-----
-----transaction : 141----- -----transaction : 88-----
-----L7 : 1----- -----L7 : 1-----
-----C8 : 0----- -----C8 : 0-----
-----time : 18059ms----- -----time : 14323ms-----
在生成频繁6项集之后,事务压缩的特性才体现出来。
布尔矩阵的优化方法针对大量候选集时,才能体现出优势。在最小支持度为0.4的情况下,是否使用布尔矩阵优化对时间并无太大影响。但在最小支持度为0.2的情况下,使用了布尔矩阵优化的算法执行时间为100800ms,而不使用布尔矩阵优化的时间为251127ms
代码
REFERENCE
[1]一步步教你轻松学关联规则Apriori算法.白宁超的官网[EB/OL].
[2]廖纪勇,吴晟,刘爱莲.基于布尔矩阵约简的Apriori算法改进研究[J].计算机工程与科学
文档信息
- 本文作者:wzx
- 本文链接:https://masterwangzx.com/2020/01/31/apriori/
- 版权声明:自由转载-非商用-非衍生-保持署名(创意共享3.0许可证)