MapReduce Programming: Maximum, Minimum, Average, Count, Median, and Standard Deviation
In MapReduce programming, the most basic and classic example is the WordCount program. The core task here, however, is numerical summarization: computing the maximum, minimum, average, and standard deviation of comment data. The textbook *MapReduce Design Patterns* covers these core algorithms in detail in its first chapter. The input data is provided as an .xml file. Before the mapper can work with it, each XML row must be converted into key-value pairs: transformXmlToMap(value.toString());
<row Id="1" PostId="35314" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2018-09-06T08:07:10.730" UserId="1" />
<row Id="1" PostId="35315" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2007-09-06T08:05:33.730" UserId="1" />
<row Id="1" PostId="35316" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-09-06T08:07:10.730" UserId="1" />
<row Id="1" PostId="35317" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-08-06T08:07:26.730" UserId="1" />
<row Id="2" PostId="35318" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-05-06T08:11:10.730" UserId="1" />
<row Id="2" PostId="35319" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-09-06T08:12:10.730" UserId="1" />
<row Id="2" PostId="35320" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-06-06T08:03:10.730" UserId="1" />
<row Id="2" PostId="35321" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-09-06T08:07:10.880" UserId="1" />
<row Id="2" PostId="35322" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2016-09-06T08:07:39.730" UserId="1" />
<row Id="2" PostId="35323" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2008-03-06T08:07:10.730" UserId="1" />
<row Id="3" PostId="35324" Score="39" Text="not sure why this is getting downvoted -- it is correct! Double check it in your compiler if you don't believe him!" CreationDate="2007-09-06T08:00:22.730" UserId="1" />
The textbook does not show the implementation of this function, but it is a basic and important utility, and we can write it ourselves. Its main job is to extract the attributes of each row and organize them into a Map.
// This helper function parses a Stack Overflow comment row into a Map for us.
public static Map<String, String> transformXmlToMap(String xml) {
Map<String, String> map = new HashMap<String, String>();
try {
// strip the leading "<row " and the trailing " />", then split on the quotes
String[] tokens = xml.trim().substring(5, xml.trim().length() - 3)
.split("\"");
for (int i = 0; i < tokens.length - 1; i += 2) {
String key = tokens[i].trim();// e.g. "Id="
String val = tokens[i + 1];// the quoted attribute value
map.put(key.substring(0, key.length() - 1), val);// drop the trailing '='
}
} catch (StringIndexOutOfBoundsException e) {
System.err.println(xml);
}
return map;
}
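As a quick sanity check, the helper can be exercised on one of the sample rows above. This is a standalone sketch (the class name XmlMapDemo is ours, not from the textbook) that replicates the parsing logic:

```java
import java.util.HashMap;
import java.util.Map;

public class XmlMapDemo {
    // Same parsing logic as the helper above
    public static Map<String, String> transformXmlToMap(String xml) {
        Map<String, String> map = new HashMap<String, String>();
        String[] tokens = xml.trim().substring(5, xml.trim().length() - 3).split("\"");
        for (int i = 0; i < tokens.length - 1; i += 2) {
            String key = tokens[i].trim();
            map.put(key.substring(0, key.length() - 1), tokens[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        String row = "<row Id=\"1\" PostId=\"35314\" Score=\"39\" "
                + "CreationDate=\"2018-09-06T08:07:10.730\" UserId=\"1\" />";
        Map<String, String> m = transformXmlToMap(row);
        System.out.println(m.get("UserId") + " " + m.get("CreationDate"));
        // prints: 1 2018-09-06T08:07:10.730
    }
}
```

Each attribute name (with its trailing `=` removed) becomes a key, and the quoted text becomes the value.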
Next comes the min/max class; this code is taken directly from the textbook. For reasons of space, the book does not fully code the average and standard-deviation computations: only the core mapper and reducer modules are given. When writing the driver class for those, however, you can follow the same structure and patterns as the min/max class to complete the implementation.
Looking at the fields of the min/max class, it is designed to summarize multiple comments per user, with particular attention to two key metrics: the earliest and the latest comment time (corresponding to the minimum and maximum, respectively). It also contains a count field for the total number of comments by that user.
Note that toString() is overridden so that the output format is uniform.
Also note that the MapReduce framework sorts intermediate data by key by default. Whether or not the final result needs to be sorted, the framework sorts the mapper output during the shuffle phase. Because this generally improves efficiency and consistency, it is part of the framework's core design.
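The effect of that map-side sort can be simulated in plain Java with a TreeMap, which delivers keys in sorted order the way a reducer sees them. This is a toy sketch, not Hadoop code (the class name ShuffleSortDemo is ours):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class ShuffleSortDemo {
    // Group (key, value) pairs the way the shuffle does: values are collected
    // per key, and keys reach the reducer in sorted order.
    public static TreeMap<String, List<Integer>> shuffle(String[][] pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (String[] kv : pairs) {
            grouped.computeIfAbsent(kv[0], k -> new ArrayList<Integer>())
                    .add(Integer.parseInt(kv[1]));
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Mapper output in arbitrary emit order
        String[][] mapperOutput = { {"3", "10"}, {"1", "7"}, {"2", "5"}, {"1", "9"} };
        // Keys come out sorted (1, 2, 3) regardless of the order they were emitted
        System.out.println(shuffle(mapperOutput)); // prints: {1=[7, 9], 2=[5], 3=[10]}
    }
}
```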
package mapreduce_2019;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
public class MinMaxCountTuple implements Writable{
private Date min=new Date();///earliest comment time
private Date max=new Date();///latest comment time
private long count = 0;///total number of comments
private final static SimpleDateFormat frmt =new SimpleDateFormat(
"yyyy-MM-dd'T'HH:mm:ss.SSS");
public Date getMin(){
return min;
}
public void setMin(Date min){
this.min=min;
}
public Date getMax(){
return max;
}
public void setMax(Date max){
this.max=max;
}
public long getCount(){
return count;
}
public void setCount(long count){
this.count=count;
}
public void readFields(DataInput in) throws IOException{
min=new Date(in.readLong());
max=new Date(in.readLong());
count=in.readLong();
}
public void write(DataOutput out) throws IOException {
out.writeLong(min.getTime());
out.writeLong(max.getTime());
out.writeLong(count);
}
public String toString(){
return frmt.format(min)+"\t"+frmt.format(max)+"\t"+count;
}
// This helper function parses a Stack Overflow comment row into a Map for us.
public static Map<String, String> transformXmlToMap(String xml) {
Map<String, String> map = new HashMap<String, String>();
try {
String[] tokens = xml.trim().substring(5, xml.trim().length() - 3)
.split("\"");
for (int i = 0; i < tokens.length - 1; i += 2) {
String key = tokens[i].trim();
String val = tokens[i + 1];
map.put(key.substring(0, key.length() - 1), val);
}
} catch (StringIndexOutOfBoundsException e) {
System.err.println(xml);
}
return map;
}
public static class MinMaxCountMapper extends Mapper <Object,Text,Text,MinMaxCountTuple>{
private Text outUserId = new Text();
private MinMaxCountTuple outTuple = new MinMaxCountTuple();
private final static SimpleDateFormat frmt= new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
Map <String,String> parsed=transformXmlToMap(value.toString());
String strDate = parsed.get("CreationDate");
String userId = parsed.get("UserId");
Date creationDate = null;
try {
creationDate = frmt.parse(strDate);
} catch (ParseException e) {
e.printStackTrace();
return;///skip records whose date cannot be parsed
}
outTuple.setMin(creationDate);
outTuple.setMax(creationDate);
outTuple.setCount(1);
outUserId.set(userId);
context.write(outUserId, outTuple);
}
}
public static class MinMaxCountReducer
extends Reducer<Text,MinMaxCountTuple,Text,MinMaxCountTuple>{
private MinMaxCountTuple result = new MinMaxCountTuple();
public void reduce(Text key,Iterable <MinMaxCountTuple> values,Context context) throws IOException,InterruptedException{
result.setMin(null);
result.setMax(null);
result.setCount(0);
int sum=0;
for(MinMaxCountTuple val:values){
if(result.getMin()==null||val.getMin().compareTo(result.getMin())<0){
result.setMin(val.getMin());
}
if(result.getMax()==null||val.getMax().compareTo(result.getMax())>0){
result.setMax(val.getMax());
}
sum+=val.getCount();
}
result.setCount(sum);
context.write(key, result);
}
}
/*============================================================================================================*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance();
job.setJarByClass(MinMaxCountTuple.class);
job.setMapperClass(MinMaxCountMapper.class);
job.setReducerClass(MinMaxCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MinMaxCountTuple.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
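Because min, max, and count all combine associatively, the reducer can merge per-record tuples in any grouping and still arrive at the correct answer. The merge step can be checked in isolation with this plain-Java sketch (using epoch-millisecond longs instead of Date; the class name MinMaxCountDemo is ours):

```java
public class MinMaxCountDemo {
    // Build a (min, max, count) summary from raw timestamps (epoch millis)
    public static long[] summarize(long[] times) {
        long min = Long.MAX_VALUE, max = Long.MIN_VALUE;
        for (long t : times) {
            if (t < min) min = t;
            if (t > max) max = t;
        }
        return new long[]{min, max, times.length};
    }

    // The reducer's merge step: element-wise min, max, and summed count
    public static long[] merge(long[] a, long[] b) {
        return new long[]{Math.min(a[0], b[0]), Math.max(a[1], b[1]), a[2] + b[2]};
    }

    public static void main(String[] args) {
        long[] part1 = summarize(new long[]{100, 400});
        long[] part2 = summarize(new long[]{50, 300});
        long[] merged = merge(part1, part2);
        System.out.println(merged[0] + " " + merged[1] + " " + merged[2]); // prints: 50 400 4
    }
}
```

This associativity is also what makes MinMaxCountTuple safe to use in a combiner, should you add one.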
This program takes two arguments: the input file path and the output path.
Export it as a .jar and run it on the cluster against HDFS.
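One possible way to launch it looks roughly like this. The jar name minmax.jar and the HDFS paths are placeholders; adjust them to your own cluster layout:

```shell
# Upload the input, run the job, then inspect the reducer output
hadoop fs -put Comments.xml /input/Comments.xml
hadoop jar minmax.jar mapreduce_2019.MinMaxCountTuple /input/Comments.xml /output1
hadoop fs -cat /output1/part-r-00000
```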
This section covers the average-and-count module, whose key components are the mapper and the reducer. Note that the quantity averaged here is the length of the comment text, keyed by the hour of day at which the comment was posted.
package mapreduce_2019;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
public class CountAverageTuple implements Writable{
private float count =0;///total count
private float average=0;///average comment length
public float getCount(){
return count;
}
public void setCount(float count2){
this.count=count2;
}
public float getAverage(){
return average;
}
public void setAverage(float f){
this.average=f;
}
public void readFields(DataInput in) throws IOException{
count=in.readFloat();
average=in.readFloat();
}
public void write(DataOutput out) throws IOException {
out.writeFloat(count);
out.writeFloat(average);
}
public String toString(){///override toString to define the output format
return count+"\t"+average;
}
public static Map<String, String> transformXmlToMap(String xml) {
Map<String, String> map = new HashMap<String, String>();
try {
String[] tokens = xml.trim().substring(5, xml.trim().length() - 3).split("\"");
for (int i = 0; i < tokens.length - 1; i += 2) {
String key = tokens[i].trim();
String val = tokens[i + 1];
map.put(key.substring(0, key.length() - 1), val);
}
} catch (StringIndexOutOfBoundsException e) {
System.err.println(xml);
}
return map;
}
public static class AverageMapper extends Mapper <Object, Text, IntWritable, CountAverageTuple> {
private IntWritable outHour = new IntWritable();
private CountAverageTuple outCountAverage = new CountAverageTuple();
private final static SimpleDateFormat frmt = new SimpleDateFormat ("yyyy-MM-dd'T'HH:mm:ss.SSS");
public void map (Object key, Text value, Context context)
throws IOException, InterruptedException {
Map <String, String> parsed = transformXmlToMap (value.toString());
String strDate = parsed.get("CreationDate");
String text = parsed.get("Text");
Date creationDate = null;
try {
creationDate = frmt.parse(strDate);
} catch (ParseException e) {
e.printStackTrace();
return;///skip records whose date cannot be parsed
}
outHour.set(creationDate.getHours());///key: the hour of day the comment was posted
outCountAverage.setCount(1);
outCountAverage.setAverage (text.length());
context.write(outHour, outCountAverage);
}
}
public static class AverageReducer
extends Reducer <IntWritable, CountAverageTuple,IntWritable, CountAverageTuple> {
private CountAverageTuple result = new CountAverageTuple();
public void reduce (IntWritable key, Iterable<CountAverageTuple> values, Context context) throws IOException, InterruptedException {
float sum = 0;
float count = 0;
for (CountAverageTuple val : values) {
sum+=val.getCount() * val.getAverage();
count +=val.getCount();
}
result.setCount (count) ;
result.setAverage (sum / count);///count-weighted average
context.write(key,result);
}
}
/*============================================================================================================*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
///paths are hardcoded here; read them from args as in the min/max driver to parameterize
String[] otherargs = new String[]{"hdfs://master:9000/input/Comments.xml", "hdfs://master:9000/output2"};
Job job = Job.getInstance();
job.setJarByClass(CountAverageTuple.class);
job.setMapperClass(AverageMapper.class);
job.setReducerClass(AverageReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(CountAverageTuple.class);
FileInputFormat.addInputPath(job, new Path(otherargs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherargs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
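A subtle point in AverageReducer is that averages cannot simply be averaged: each partial average must be weighted by its count (sum += count * average) before dividing by the total count. The arithmetic can be checked standalone (a sketch; the class name WeightedAverageDemo is ours):

```java
public class WeightedAverageDemo {
    // Combine partial (count, average) pairs the way AverageReducer does:
    // sum of count*average divided by the total count.
    public static float[] combine(float[][] partials) {
        float sum = 0, count = 0;
        for (float[] p : partials) {
            sum += p[0] * p[1];   // p[0] = count, p[1] = average
            count += p[0];
        }
        return new float[]{count, sum / count};
    }

    public static void main(String[] args) {
        // Three partials: 2 comments averaging 10 chars, 1 averaging 40, 1 averaging 20
        float[] result = combine(new float[][]{{2, 10}, {1, 40}, {1, 20}});
        System.out.println(result[0] + " " + result[1]); // prints: 4.0 20.0
    }
}
```

An unweighted mean of the three partial averages would give (10 + 40 + 20) / 3 ≈ 23.3, which is wrong; the weighting recovers the true per-comment mean of 20.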
For the median and standard deviation we use the simplest version, without memory optimization: the reducer buffers every comment length in a list. A memory-optimized variant would instead store each distinct comment length together with its count as key-value pairs in a sorted map, which greatly reduces the space required. Rather than iterating over every individual comment, you then walk the counts in sorted order, skipping whole buckets, until you reach the key-value pair that contains the median position.
package mapreduce_2019;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MedianStdDevTuple implements Writable{
private float median =0;///median
private float stddev =0;///standard deviation
public float getMedian(){
return median;
}
public void setMedian(float median){
this.median=median;
}
public float getStdDev(){
return stddev;
}
public void setStdDev(float f){
this.stddev=f;
}
public String toString(){///override toString to define the output format
return median+"\t"+stddev;
}
@Override
public void readFields(DataInput in) throws IOException {
median = in.readFloat();
stddev = in.readFloat();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeFloat(median);
out.writeFloat(stddev);
}
public static Map<String, String> transformXmlToMap(String xml) {
Map<String, String> map = new HashMap<String, String>();
try {
String[] tokens = xml.trim().substring(5, xml.trim().length() - 3).split("\"");
for (int i = 0; i < tokens.length - 1; i += 2) {
String key = tokens[i].trim();
String val = tokens[i + 1];
map.put(key.substring(0, key.length() - 1), val);
}
} catch (StringIndexOutOfBoundsException e) {
System.err.println(xml);
}
return map;
}
public static class MedianStdDevMapper extends Mapper <Object,Text,IntWritable,IntWritable>{
private IntWritable outHour = new IntWritable();
private IntWritable outCommentLength = new IntWritable();
private final static SimpleDateFormat frmt= new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
Map <String,String> parsed=transformXmlToMap(value.toString());
String strDate = parsed.get("CreationDate");
String text = parsed.get("Text");
Date creationDate = null;
try {
creationDate = frmt.parse(strDate);
} catch (ParseException e) {
e.printStackTrace();
return;///skip records whose date cannot be parsed
}
outHour.set(creationDate.getHours());
outCommentLength.set(text.length());
context.write(outHour,outCommentLength);
}
}
public static class MedianStdDevReducer extends Reducer <IntWritable,IntWritable,IntWritable,MedianStdDevTuple>{
private MedianStdDevTuple result = new MedianStdDevTuple();
private ArrayList <Float> commentLengths = new ArrayList <Float>();
public void reduce (IntWritable key,Iterable <IntWritable> values,Context context) throws IOException,InterruptedException{
float sum= 0 ;
float count = 0;
commentLengths.clear();
result.setStdDev(0);
for(IntWritable val:values){
commentLengths.add((float) val.get());
sum+=val.get();
++count;
}
Collections.sort(commentLengths);///sort the lengths so the middle element(s) can be read off
if(count % 2==0){//even count: the median is the mean of the two middle values
result.setMedian((commentLengths.get((int)count/2-1)+commentLengths.get((int)count/2))/2.0f);
}
else{
result.setMedian(commentLengths.get((int)count/2));
}
float mean=sum/count;
float sumOfSquares = 0.0f;
for(Float f: commentLengths){///sum of squared deviations from the mean
sumOfSquares+=(f-mean)*(f-mean);
}
result.setStdDev((float)Math.sqrt(sumOfSquares/(count-1)));///sample standard deviation
context.write(key, result);
}
}
/*============================================================================================================*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
///paths are hardcoded here; read them from args as in the min/max driver to parameterize
String[] otherargs = new String[]{"hdfs://master:9000/input/Comments.xml", "hdfs://master:9000/output3"};
Job job = Job.getInstance();
job.setJarByClass(MedianStdDevTuple.class);
job.setMapperClass(MedianStdDevMapper.class);
job.setReducerClass(MedianStdDevReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherargs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherargs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
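The memory-optimized median mentioned earlier, keeping a sorted map of (comment length -> count) instead of buffering every length, can be sketched in plain Java (class and method names here are ours, not from the textbook):

```java
import java.util.Map;
import java.util.TreeMap;

public class MedianFromCountsDemo {
    // Median over (length -> count) buckets: walk the counts in sorted order
    // and skip whole buckets until the middle position(s) are reached.
    public static float median(TreeMap<Integer, Long> counts) {
        long total = 0;
        for (long c : counts.values()) total += c;
        long lowIdx = (total - 1) / 2; // lower middle index (0-based)
        long highIdx = total / 2;      // upper middle index; equals lowIdx when total is odd
        long seen = 0;
        Integer low = null, high = null;
        for (Map.Entry<Integer, Long> e : counts.entrySet()) {
            long first = seen, last = seen + e.getValue() - 1;
            if (low == null && lowIdx >= first && lowIdx <= last) low = e.getKey();
            if (high == null && highIdx >= first && highIdx <= last) high = e.getKey();
            seen += e.getValue();
            if (low != null && high != null) break;
        }
        return (low + high) / 2.0f;
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> counts = new TreeMap<Integer, Long>();
        counts.put(1, 2L); // two comments of length 1
        counts.put(5, 1L);
        counts.put(9, 1L);
        System.out.println(median(counts)); // prints: 3.0
    }
}
```

The map needs one entry per distinct length rather than one per comment, so for millions of comments whose lengths fall in a bounded range the memory saving is substantial.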