MapReduce实现Apriori算法
发布时间
阅读量:
阅读量
Apriori算法在Hadoop MapReduce上的实现
输入格式:
一行为一个Bucket
输出格式:
<item1,item2,...itemK, frequency>
代码:
package apriori;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
29 30classextends{
31privatefinalstaticnew);
32privatenew Text();
33 34//第一次pass的Mapper只要把每个item映射为1 35publicvoidthrows IOException,InterruptedException{
36 37);
38forint){
39new Text(ids[i]),one);
40 }
41 }
42}
43 44classextends{
45privatenew IntWritable();
46 47//所有Pass的job共用一个reducer,即统计一种itemset的个数,并筛选除大于s的 48publicvoidthrows IOException,InterruptedException{
49int;
50 51int);
52for(IntWritable val : values){
53 val.get();
54 }
55 result.set(sum);
56 57if minSup){
58 context.write(key,result);
59 }
60 }
61}
62 63classextends{
64privatefinalstaticnew);
65privatenew Text();
66 67privatenew();
68privatenew();
69privatenew();
70 71 72//第一个以后的pass使用该Mapper,在map函数执行前会执行setup来从k-1次pass的输出中构建候选itemsets,对应于apriori算法 73 @Override
74publicvoidthrows IOException, InterruptedException{
75int);
76);
77;
78;
79 80try{
81new Path(lastPass);
82 FileSystem.get(context.getConfiguration());
83newnew InputStreamReader(fs.open(path)));
84null;
85 86whilenull){
87 88new();
89 90];
91for)){
92 itemset.add(Integer.parseInt(itemStr));
93 }
94 95 prevItemsets.add(itemset);
96 }
97catch (Exception e){
98 e.printStackTrace();
99 }
100101//get candidate itemsets from the prev itemsets102);
103 }
104105106publicvoidthrows IOException,InterruptedException{
107);
108109new();
110for(String id : ids){
111 itemset.add(Integer.parseInt(id));
112 }
113114//遍历所有候选集合115for candidateItemset : candidateItemsets){
116//如果输入的一行中包含该候选集合,则映射1,这样来统计候选集合被包括的次数
117//子集合,消耗掉了大部分时间118if(contains(candidateItemset,itemset)){
119;
120forint){
121;
122 }
123);
124new Text(outputKey),one);
125 }
126 }
127 }
128129//返回items是否是allItems的子集130privateboolean allItems){
131132int;
133int;
134while allItems.size()){
135if items.get(i)){
136returnfalse;
137elseif items.get(i)){
138;
139;
140else{
141;
142 }
143 }
144145if items.size()){
146returnfalse;
147 }
148returntrue;
149 }
150151//获取所有候选集合,参考apriori算法152privateint passNum){
153154new();
155156//上次pass的输出中选取连个itemset构造大小为k + 1的候选集合157forint){
158forint){
159 prevItemsets.get(i);
160 prevItemsets.get(j);
161162null;
163if){
164new();
165));
166));
167 }
168else{
169int;
170int;
171forint){
172ifinnerItems.contains(outerItems.get(k))){
173;
174 k;
175 }
176 }
177178if){
179//System.out.println("inner " + innerItems + " outer : " + outerItems);180new();
181 newItems.addAll(innerItems);
182 newItems.add(outerItems.get(index));
183 }
184 }
185ifnullcontinue;}
186187 Collections.sort(newItems);
188189//候选集合必须满足所有的子集都在上次pass的输出中,调用isCandidate进行检测,通过后加入到候选子集和列表190ifcandidateItemsets.contains(newItems)){
191 candidateItemsets.add(newItems);
192//System.out.println(newItems);193 }
194 }
195 }
196197return candidateItemsets;
198 }
199200privateboolean prevItemsets){
201202 getSubsets(newItems);
203204for subset : subsets){
205ifprevItemsets.contains(subset)){
206returnfalse;
207 }
208 }
209210returntrue;
211 }
212213private items){
214215new();
216forint){
217new(items);
218 subset.remove(i);
219 subsets.add(subset);
220 }
221222return subsets;
223 }
224}
225226publicclassextendsimplements Tool{
227228publicstaticint s;
229publicstaticint k;
230231publicintthrows IOException,InterruptedException,ClassNotFoundException{
232long System.currentTimeMillis();
233234//从参数1中读取输入数据235//参数2为输出数据前缀,和第pass次组成输出目录236//阈值237//k次pass
238239//循环执行K次pass240forint){
241long System.currentTimeMillis();
242243//配置执行该job244ifrunPassKMRJob(hdfsInputDir,hdfsOutputDirPrefix,pass)){
245return;
246 }
247248long System.currentTimeMillis();
249 passStartTime));
250 }
251252long System.currentTimeMillis();
253 startTime));
254255return;
256 }
257258privatestaticbooleanint passNum)
259throws IOException,InterruptedException,ClassNotFoundException{
260261new Configuration();
262,passNum);
263,hdfsOutputDirPrefix);
264,s);
265266new passNum);
267class);
268if){
269//第一次pass的Mapper类特殊对待,不许要构造候选itemsets270class);
271 }
272else{
273//第一次之后的pass的Mapper类特殊对待,不许要构造候选itemsets274class);
275 }
276class);
277class);
278class);
279280new Path(hdfsInputDir));
281new passNum));
282283returntrue);
284 }
285286publicstaticvoidthrows Exception{
287intnew Apriori(),args);
288 System.exit(exitCode);
289 }
290
全部评论 (0)
还没有任何评论哟~
