Advertisement

MapReduce实现Apriori算法

阅读量:

Apriori算法在Hadoop MapReduce上的实现

输入格式:

一行为一个Bucket

复制代码

输出格式:

<item1,item2,...itemK, frequency>

复制代码

代码:

复制代码
      1package apriori;
      2  3import java.io.IOException;
      4import java.util.Iterator;
      5import java.util.StringTokenizer;
      6import java.util.List;
      7import java.util.ArrayList;
      8import java.util.Collections;
      9import java.util.Map;
     10import java.util.HashMap;
     11import;
     12 13import org.apache.hadoop.conf.Configuration;
     14import org.apache.hadoop.conf.Configured;
     15import org.apache.hadoop.fs.Path;
     16import org.apache.hadoop.fs.FileSystem;
     17import org.apache.hadoop.io.Text;
     18import org.apache.hadoop.io.IntWritable;
     19import org.apache.hadoop.mapreduce.Job;
     20import org.apache.hadoop.mapreduce.Mapper;
     21import org.apache.hadoop.mapreduce.Mapper.Context;
     22import org.apache.hadoop.mapreduce.Reducer;
     23import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
     24import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     25import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
     26import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
     27import org.apache.hadoop.util.Tool;
     28import org.apache.hadoop.util.ToolRunner;
     // Mapper used by the first pass.
     // NOTE(review): this listing was damaged during extraction - the class name, type names,
     // identifiers and several expressions are missing, and the original line numbers are fused
     // into the text - so it does not compile as shown. Restore it from the original post.
     29 30classextends{
     31privatefinalstaticnew);
     32privatenew Text();
     // Loops over ids and, for each element, pairs new Text(ids[i]) with the constant `one`
     // (presumably through context.write - the call target is missing here; confirm).
     33 34//The first-pass Mapper only needs to map each item to 1 35publicvoidthrows IOException,InterruptedException{
     36 37);
     38forint){
     39new Text(ids[i]),one);
     40        }
     41    }
     42}
     // Reducer shared by the jobs of every pass.
     // NOTE(review): this listing was damaged during extraction (missing identifiers/operators,
     // fused line numbers); it does not compile as shown.
     43 44classextends{
     45privatenew IntWritable();
     // Iterates over `values` using val.get(), stores `sum` in `result`, and writes (key, result)
     // only when a condition involving minSup holds (the comparison operator is missing; per the
     // comment below it is presumably "greater than the threshold" - confirm).
     46 47//The jobs of all passes share one reducer: it counts the occurrences of one itemset and selects those greater than s 48publicvoidthrows IOException,InterruptedException{
     49int;
     50 51int);
     52for(IntWritable val : values){
     53 val.get();
     54        }
     55        result.set(sum);
     56 57if minSup){
     58            context.write(key,result);
     59        }
     60    }
     61}
     // Mapper used by every pass after the first.
     // NOTE(review): this listing was damaged during extraction - the class name, generic types,
     // identifiers, operators and string literals are missing, and the original line numbers are
     // fused into the text - so it does not compile as shown. Restore it from the original post.
     62 63classextends{
     64privatefinalstaticnew);
     65privatenew Text();
     66 67privatenew();
     68privatenew();
     69privatenew();
     // setup: opens Path(lastPass) through FileSystem.get(context.getConfiguration()), reads it
     // via an InputStreamReader, parses the integers of each line into an itemset and appends it
     // to prevItemsets; the candidate itemsets are then derived from prevItemsets.
     // NOTE(review): any Exception raised while reading is only printed with printStackTrace(),
     // after which setup carries on with whatever was read so far.
     70 71 72//Passes after the first use this Mapper; before the map function runs, setup is executed to build the candidate itemsets from the output of pass k-1, corresponding to the apriori algorithm 73    @Override
     74publicvoidthrows IOException, InterruptedException{
     75int);
     76);
     77;
     78;
     79 80try{
     81new Path(lastPass);
     82 FileSystem.get(context.getConfiguration());
     83newnew InputStreamReader(fs.open(path)));
     84null;
     85 86whilenull){
     87 88new();
     89 90];
     91for)){
     92                    itemset.add(Integer.parseInt(itemStr));
     93                }
     94 95                prevItemsets.add(itemset);
     96            }
     97catch (Exception e){
     98            e.printStackTrace();
     99        }
    100101//get candidate itemsets from the prev itemsets102);
    103    }
    // map: parses every id in ids with Integer.parseInt into itemset, then, for each
    // candidateItemset accepted by contains(candidateItemset, itemset), builds outputKey and
    // pairs new Text(outputKey) with `one` (presumably through context.write - confirm).
    104105106publicvoidthrows IOException,InterruptedException{
    107);
    108109new();
    110for(String id : ids){ 
    111            itemset.add(Integer.parseInt(id));
    112        }
    113114//Iterate over all candidate itemsets115for candidateItemset : candidateItemsets){
    116//If the input line contains this candidate itemset, map it to 1, so as to count how many times the candidate itemset is contained 
    117//The subset check consumes most of the time118if(contains(candidateItemset,itemset)){
    119;
    120forint){
    121;
    122                }
    123);
    124new Text(outputKey),one);
    125            }
    126        }
    127    }
    // Subset check: walks both lists with index variables and returns false or true.
    // NOTE(review): the comparison operators are missing here; the walk presumably relies on
    // both lists being sorted in ascending order - confirm against the original.
    128129//Returns whether items is a subset of allItems130privateboolean allItems){
    131132int;
    133int;
    134while allItems.size()){
    135if items.get(i)){
    136returnfalse;
    137elseif items.get(i)){
    138;
    139;
    140else{
    141;
    142            }    
    143        }
    144145if items.size()){
    146returnfalse;
    147        }
    148returntrue;
    149    }
    // Candidate generation: pairs prevItemsets.get(i) with prevItemsets.get(j), builds newItems
    // from the pair, sorts it with Collections.sort, and adds it to candidateItemsets subject to
    // a condition involving candidateItemsets.contains(newItems); returns candidateItemsets.
    150151//Get all candidate itemsets, see the apriori algorithm152privateint passNum){
    153154new();
    155156//Pick two itemsets from the previous pass's output to construct a candidate itemset of size k + 1157forint){
    158forint){
    159 prevItemsets.get(i);
    160 prevItemsets.get(j);
    161162null;
    163if){
    164new();
    165));        
    166));        
    167                }
    168else{    
    169int;
    170int;
    171forint){
    172ifinnerItems.contains(outerItems.get(k))){
    173;
    174 k;
    175                        }
    176                    }
    177178if){
    179//System.out.println("inner " + innerItems + " outer : " + outerItems);180new();
    181                        newItems.addAll(innerItems);
    182                        newItems.add(outerItems.get(index));
    183                    }
    184                }
    185ifnullcontinue;}
    186187                Collections.sort(newItems);
    188189//A candidate itemset must have all of its subsets in the previous pass's output; isCandidate is called to check this, and the itemset is added to the candidate list once it passes190ifcandidateItemsets.contains(newItems)){
    191                    candidateItemsets.add(newItems);    
    192//System.out.println(newItems);193                }
    194            }
    195        }
    196197return candidateItemsets;
    198    }
    // Checks the subsets returned by getSubsets(newItems) against prevItemsets: returns false
    // from inside the loop when the prevItemsets.contains(subset) test fires (a negation is
    // presumably missing - confirm), otherwise true.
    199200privateboolean prevItemsets){
    201202 getSubsets(newItems);     
    203204for subset : subsets){
    205ifprevItemsets.contains(subset)){
    206returnfalse;
    207            }
    208        }
    209210returntrue;
    211    }
    // Builds the list of subsets obtained by copying items and removing the element at index i,
    // one copy per loop iteration, and returns that list.
    212213private items){
    214215new();
    216forint){
    217new(items);
    218            subset.remove(i);
    219            subsets.add(subset);
    220        }
    221222return subsets;
    223    }
    224}
    225226publicclassextendsimplements Tool{
    227228publicstaticint s;
    229publicstaticint k;
    // Driver entry point: takes timestamps with System.currentTimeMillis(), loops over the
    // passes, and calls runPassKMRJob(hdfsInputDir, hdfsOutputDirPrefix, pass) for each one,
    // returning early when the check on that call fires.
    // NOTE(review): this listing was damaged during extraction (method name, argument parsing,
    // loop bounds and return values are missing); it does not compile as shown.
    230231publicintthrows IOException,InterruptedException,ClassNotFoundException{
    232long System.currentTimeMillis();
    233234//Read the input data from argument 1235//Argument 2 is the output data prefix; together with the pass number it forms the output directory236//Threshold237//k passes
    238239//Run the K passes in a loop240forint){
    241long System.currentTimeMillis();
    242243//Configure and run the job244ifrunPassKMRJob(hdfsInputDir,hdfsOutputDirPrefix,pass)){
    245return;    
    246            }
    247248long System.currentTimeMillis();
    249 passStartTime));
    250        }
    251252long System.currentTimeMillis();
    253 startTime));
    254255return;
    256    }
    // Configures and runs the job for one pass: creates a Configuration, stores three values in
    // it (passNum, hdfsOutputDirPrefix and s), picks one of two classes depending on a condition,
    // and uses Path(hdfsInputDir) plus a passNum-derived value for the job's paths.
    // NOTE(review): this listing was damaged during extraction (configuration keys, class
    // literals and call targets are missing); it does not compile as shown.
    257258privatestaticbooleanint passNum)
    259throws IOException,InterruptedException,ClassNotFoundException{
    260261new Configuration();
    262,passNum);
    263,hdfsOutputDirPrefix);
    264,s);
    265266new passNum);
    267class);
    268if){
    269//The first pass's Mapper class is treated specially; it does not need to construct candidate itemsets270class);
    271            }
    272else{
    // NOTE(review): the comment below looks like a copy-paste of the first-pass comment; the
    // pass-K mapper's own comment says it does build candidate itemsets in setup - confirm.
    273//The Mapper class for passes after the first is treated specially; it does not need to construct candidate itemsets274class);
    275            }
    276class);
    277class);
    278class);
    279280new Path(hdfsInputDir));
    281new passNum));
    282283returntrue);
    284    }
    // Program entry point: hands a new Apriori instance and args to a runner (presumably
    // ToolRunner.run - the call target is missing here; confirm) and exits with its exit code.
    285286publicstaticvoidthrows Exception{
    287intnew Apriori(),args);
    288        System.exit(exitCode);
    289    }
    290

转载于:https://www.cnblogs.com/codingMozart/p/5914945.html

全部评论 (0)

还没有任何评论哟~