Advertisement

MapReduce重点知识

阅读量:

MapReduce

提示:建议您首先阅读与Yarn相关的知识yarn知识,以深入理解其核心概念和应用方法


文章目录

  • MapReduce

  • 前言

  • 一、MapReduce 优缺点

      1. 优点
      1. 缺点
  • 二 MapReduce核心思想

      1. MapReduce流程
      1. 序列化类型
  • 三、MapReduce API操作

    • 1.环境搭建
  • Hadoop序列化技术

  • MapReduce框架及其工作原理

    1. 切分机制
      1. Shuffle过程(重点考察)
      1. Map任务的工作流程
      1. Reduce任务的作用
    • 五 Join 应用

      • 1)需求
      • 2)代码实现
  • 六 数据清洗(ETL)

  • 总结

  • 输入数据接口采用InputFormat格式

  • 逻辑处理模块由Mapper负责

  • 数据分区采用Partitioner组件完成

  • 支持Comparable接口的排序功能

  • 合并操作由Combiner组件实现

  • 数据缩减操作由Reducer组件执行

  • 输出数据遵循OutputFormat规范


前言

MapReduce 是一个分布式运算工具框架,在 Hadoop 环境中被广泛应用于开发数据分析应用。其主要功能是将用户的业务逻辑代码与系统自带组件集成在一起,在 Hadoop 集群上构建并运行完整的分布式计算流程。

一、MapReduce 优缺点

1) 优点

(1)MapReduce 简单易学
该算法只需实现几个核心接口即可完成一个高效的分布式计算框架,在众多廉价个人计算机上即可运行。与编写传统串行程序相比,在这种分布式架构下实现同样功能的过程非常直观简便。正是由于这一特点使得 MapReduce 成为分布式计算领域的热门算法之一。
(2)具有良好的扩展能力
当现有计算资源无法满足需求时, 通过简单的增加节点数量即可显著提升系统的计算性能, 使资源利用率得到最大化利用。
(3)具备高度容错性
该算法的设计初衷就是能够在廉价的个人计算机上实现, 因此必须具备极高的容错能力。即使其中某一台节点出现故障, 系统也能自动将对应的计算任务转移到其他节点继续执行, 这一过程完全由 Hadoop 自行管理, 不需要人工干预即可完成任务重分配机制。
(4)适用于大规模离线数据处理
该系统能够支持数千台服务器同时工作, 提供强大的数据处理能力。

2) 缺点

(1)不擅长实时计算MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回结果。
(2)不擅长流式计算
流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。
这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
(3)不擅长 DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,
MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,
会造成大量的磁盘 IO,导致性能非常的低下。

二 MapReduce核心思想

1) MapReduce流程

在这里插入图片描述

(1)分布式的运算程序往往需要分成至少 2 个阶段。
(2)第一个阶段的 MapTask 并发实例,完全并行运行,互不相干。
(3)第二个阶段的 ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段
的所有 MapTask 并发实例的输出。
(4)MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业
务逻辑非常复杂,那就只能多个 MapReduce 程序,串行运行。

通过反编译工具对源码进行反编译后发现,在WordCount案例中包含了Map类、Reduce类以及相关的驱动类;此外的数据类型采用了Hadoop自身封装的序列化类型。

用户的程序被划分为三个主要组件:Map、Reduce 和 Driver模块。

2) 序列化类型

在这里插入图片描述

三、MapReduce API操作

1.环境搭建

需求
在给定的文本文件中统计输出每一个单词出现的总次数
(1)输入数据

hadoop
lalalal
ssssss
dieawedas
hadooop
hadoop

(2)期望输出数据
hadoop 2
lalalal 1
ssssss 1
dieawedas 1
hadooop 1

(3) 创建一个maven 并加入下列maven

复制代码
     <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.9.1</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>

项目指定的src/main/resources目录中创建一个新的文件夹或存储位置,并命名为log4j.properties,并向其中配置内容

复制代码
    log4j.rootLogger=INFO, stdout 
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n 
    log4j.appender.logfile=org.apache.log4j.FileAppender 
    log4j.appender.logfile.File=target/spring.log 
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout 
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

(5)编写 Mapper 类

复制代码
    package com.example.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // 1 获取一行
        String line = value.toString();
    // 2 切割
        String[] words = line.split(" ");
    // 3 输出
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
    }

** (6)编写 Reducer 类**

复制代码
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    public class WordCountReducer extends Reducer<Text, IntWritable, Text, 
    IntWritable>{
    int sum;
    IntWritable v = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,Context 
    context) throws IOException, InterruptedException {
    // 1 累加求和
    sum = 0;
    for (IntWritable count : values) {
    sum += count.get();
    }
    // 2 输出
     v.set(sum);
    context.write(key,v);
    } }

(7)编写 Driver 驱动类

复制代码
    package com.example.mapreduce;
    
    import java.io.IOException;
    
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    // 1 获取配置信息以及获取 job 对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
    // 2 关联本 Driver 程序的 jar
        job.setJarByClass(WordCountDriver.class);
    // 3 关联 Mapper 和 Reducer 的 jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
    // 4 设置 Mapper 输出的 kv 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
    // 5 设置最终输出 kv 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    // 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // 7 提交 job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
    }

打成jar包去linux上运行

在这里插入图片描述
复制代码
    hadoop jar 包名 /work /outfile

按照yarn-size-xml配置文件进行设置,在此基础之上详细列出实现HA高可用性的相关参数设置。对于分布式但非HA场景下的参数调整,请自行进行相应的修改。

复制代码
    <property>
    <!-- RM HTTP访问地址 默认:${yarn.resourcemanager.hostname}:8088-->
    <name>yarn.resourcemanager.webapp.address.rm2</name>
    <value>hadoop2:8088</value>
    </property>
    <property>
    <!-- RM HTTP访问地址 默认:${yarn.resourcemanager.hostname}:8088-->
    <name>yarn.resourcemanager.webapp.address.rm3</name>
    <value>hadoop3:8088</value>
    </property>

mapreduce-size.xml

复制代码
    <property>
                <name>yarn.application.classpath</name>
                <value>
                #hadoop classpath 下的值
    /exper/ha/hadoop-3.1.3/etc/hadoop,/exper/ha/hadoop-3.1.3/share/hadoop/common/lib/*,/exper/ha/hadoop-3.1.3/share/hadoop/common/*,/exper/ha/hadoop-3.1.3/share/hadoop/hdfs,/exper/ha/hadoop-3.1.3/share/hadoop/hdfs/lib/*,/exper/ha/hadoop-3.1.3/share/hadoop/hdfs/*,/exper/ha/hadoop-3.1.3/share/hadoop/mapreduce/lib/*,/exper/ha/hadoop-3.1.3/share/hadoop/mapreduce/*,/exper/ha/hadoop-3.1.3/share/hadoop/yarn,/exper/ha/hadoop-3.1.3/share/hadoop/yarn/lib/*,/exper/ha/hadoop-3.1.3/share/hadoop/yarn/*
    
                </value>
        </property>
    
        <property>
                <name>mapreduce.application.classpath</name>
                <value>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*:$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*</value>
         </property>
        <property>
                <name>yarn.app.mapreduce.am.env</name>
                <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
        </property>
        <property>
                <name>mapreduce.map.env</name>
                <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
        </property>
        <property>
                <name>mapreduce.reduce.env</name>
                <value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
        </property>

如果代码出错可以试试官网的示范

复制代码
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCount {
    
      public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
      }
    
      public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();
    
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
      }
    
      public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
复制代码
     bin/hadoop jar jar包地址 WordCount /user/joe/wordcount/input /user/joe/wordcount/output

注意:jdk要和hadoop里的jdk相同

三 Hadoop 序列化

1)什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁
盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换
成内存中的对象。
2)为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能
由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”
对象,可以将“活的”对象发送到远程计算机。
3)为什么不用 Java 的序列化
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带
很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,
Hadoop 自己开发了一套序列化机制(Writable)。
4)Hadoop 序列化特点:
(1)紧凑 :高效使用存储空间。
(2)快速:读写数据的额外开销小。
(3)互操作:支持多语言的交互
简单说就是当我们需要处理的数据写成一个bean对象时,我们的类型和hadoop的类型不同,所以hadoop有了一套序列化的过程。

复制代码
    import org.apache.hadoop.io.Writable;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    //1 继承 Writable 接口
    public class FlowBean implements Writable {
     private long upFlow; //上行流量
     private long downFlow; //下行流量
     private long sumFlow; //总流量
     //2 提供无参构造
     public FlowBean() {
     }
     //3 提供三个参数的 getter 和 setter 方法
     public long getUpFlow() {
     return upFlow;
     }
     public void setUpFlow(long upFlow) {
     this.upFlow = upFlow;
     }
     public long getDownFlow() {
     return downFlow;
     }
     public void setDownFlow(long downFlow) {
     this.downFlow = downFlow;
     }
     public long getSumFlow() {
     return sumFlow;
      }
     public void setSumFlow(long sumFlow) {
     this.sumFlow = sumFlow;
     }
     public void setSumFlow() {
     this.sumFlow = this.upFlow + this.downFlow;
     }
     //4 实现序列化和反序列化方法,注意顺序一定要保持一致
     @Override
     public void write(DataOutput dataOutput) throws IOException {
     dataOutput.writeLong(upFlow);
     dataOutput.writeLong(downFlow);
     dataOutput.writeLong(sumFlow);
     }
     @Override
     public void readFields(DataInput dataInput) throws IOException {
     this.upFlow = dataInput.readLong();
     this.downFlow = dataInput.readLong();
     this.sumFlow = dataInput.readLong();
     }
     //5 重写 ToString
     @Override
     public String toString() {
     return upFlow + "\t" + downFlow + "\t" + sumFlow;
     } }

四 MapReduce 框架原理

1)切块原理

在这里插入图片描述

Block 是 HDFS 在物理层面上将数据分割成独立的小块进行管理与存储的基本单位。HDFS 将其存储的基本单位定义为一个 Block。从技术角度来看,
Block 的概念使得大规模分布式系统能够实现高效的数据管理和分布式读取操作。
在 MapReduce 框架中,
MapTask 的分配依据的是 DataChunk 的划分结果,
每一个 DataChunk 对应一个独立的 MapTask 作业。
值得注意的是,
DefaultTextInputFormat 切片策略会按照文件规划的方式自动划分 DataChunk,
即使输入文件非常小也会被单独处理为一个切片。
这种设计虽然简化了任务调度过程,
但在实际应用中会导致大量小规模的任务产生,
从而显著增加任务执行的数量和资源消耗,
最终影响系统的整体性能表现。

(a)评估虚拟存储文件的容量是否超过setMaxInputSplitSize值?如果达到或超过,则单独创建一个切片。
(b)如果不满足,则将当前文件与下一个虚拟存储文件被合并到同一个切片中。
源码流程:

在这里插入图片描述

2)Shuffle 机制(重点面试)

Map 方法之后,Reduce 方法之前的数据处理过程称之为 Shuffle。

在这里插入图片描述

1) 分区机制
默认情况下,默认分区机制是基于每个键的hashCode值与ReduceTasks数量取模来分配数据的。普通用户无法自行决定数据将被存储在哪些分区内。
因此我们可以开发自定义的分区策略。(1)我们建议创建一个继承自Partitioner的新类,并重新实现其核心逻辑

复制代码
    public class CustomPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
    // 控制分区代码逻辑
    … …
    //partition返回数字几就分到哪个分区
    return partition;
     }
    }

(2)在Job驱动中,设置自定义Partitioner

复制代码
    job.setPartitionerClass(CustomPartitioner.class);

在设置自定义Partition之后,需根据自定义Partitioner的逻辑来配置对应数量的ReduceTask

复制代码
    job.setNumReduceTasks(5);

区 分 注意

WritableComparable 排序

复制代码
    package com.atguigu.mapreduce.partitionercompable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {
     @Override
     public int getPartition(FlowBean flowBean, Text text, int numPartitions) 
    {
     //获取手机号前三位
     String phone = text.toString();
     String prePhone = phone.substring(0, 3);
     //定义一个分区号变量 partition,根据 prePhone 设置分区号
     int partition;
     if("136".equals(prePhone)){
     partition = 0;
     }else if("137".equals(prePhone)){
     partition = 1;
     }else if("138".equals(prePhone)){
     partition = 2;
     }else if("139".equals(prePhone)){
     partition = 3;
     }else {
     partition = 4;
     }
     //最后返回分区号 partition
     return partition;
     } }

(2)在驱动类中添加分区类
// 设置自定义分区器

复制代码
    job.setPartitionerClass(ProvincePartitioner2.class);

3) Combiner合并
(1)在MapReduce程序中,Combiner是一种独立于Mapper和Reducer的组件。
(2)其父类即为Reducer组件。
(3)二者的区别主要体现在运行位置上。
具体来说,在每个MapTask执行时的节点上运行。
(4)其作用在于对每个MapTask产生的中间结果进行本地汇总处理,并以此减少数据在网络之间的传输量。
(5)建议自定义一个自实现的Combiner类,并使其继承自Reducer类,并重新实现其中的Reduce方法。

复制代码
    public class WordCountCombiner extends Reducer<Text, IntWritable, Text, 
    IntWritable> {
     private IntWritable outV = new IntWritable();
     @Override
     protected void reduce(Text key, Iterable<IntWritable> values, Context 
    context) throws IOException, InterruptedException {
     int sum = 0;
     for (IntWritable value : values) {
     sum += value.get();
     }
     
     outV.set(sum);
     
     context.write(key,outV);
     } }

(b)在 Job 驱动类中设置:

复制代码
    job.setCombinerClass(WordCountCombiner.class);

由于 WordcountReducer和Combiner基本上相同的原因,因此可以直接使用WordcountReducer进行处理

复制代码
    job.setCombinerClass(WordCountReducer.class);

(3)自定义一个 LogOutputFormat 类

复制代码
    package com.atguigu.mapreduce.outputformat;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> 
    {
     @Override
     public RecordWriter<Text, NullWritable> 
    getRecordWriter(TaskAttemptContext job) throws IOException, 
    InterruptedException {
     //创建一个自定义的 RecordWriter 返回
     LogRecordWriter logRecordWriter = new LogRecordWriter(job);
     return logRecordWriter;
    
    
     } }

(4)编写 LogRecordWriter 类

复制代码
    package com.atguigu.mapreduce.outputformat;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import java.io.IOException;
    public class LogRecordWriter extends RecordWriter<Text, NullWritable> {
     private FSDataOutputStream atguiguOut;
     private FSDataOutputStream otherOut;
     public LogRecordWriter(TaskAttemptContext job) {
     try {
     //获取文件系统对象
     FileSystem fs = FileSystem.get(job.getConfiguration());
     //用文件系统对象创建两个输出流对应不同的目录
     atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log"));
     otherOut = fs.create(new Path("d:/hadoop/other.log"));
     } catch (IOException e) {
     e.printStackTrace();
     }
     }
     @Override
     public void write(Text key, NullWritable value) throws IOException, 
    InterruptedException {
     String log = key.toString();
     //根据一行的 log 数据是否包含 atguigu,判断两条输出流输出的内容
     if (log.contains("atguigu")) {
     atguiguOut.writeBytes(log + "\n");
     } else {
     otherOut.writeBytes(log + "\n");
     }
     }
     @Override
     public void close(TaskAttemptContext context) throws IOException, 
    InterruptedException {
     //关流
     IOUtils.closeStream(atguiguOut);
     IOUtils.closeStream(otherOut);
     } }

(5)编写 LogDriver 类

复制代码
    package com.atguigu.mapreduce.outputformat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    public class LogDriver {
     public static void main(String[] args) throws IOException, 
    ClassNotFoundException, InterruptedException {
     Configuration conf = new Configuration();
     Job job = Job.getInstance(conf);
     job.setJarByClass(LogDriver.class);
     job.setMapperClass(LogMapper.class);
     job.setReducerClass(LogReducer.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(NullWritable.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(NullWritable.class);
     //设置自定义的 outputformat
     job.setOutputFormatClass(LogOutputFormat.class);
     FileInputFormat.setInputPaths(job, new Path("D:\ input"));
     // 虽 然 我 们 自 定 义 了 outputformat , 但 是 因 为 我 们 的 outputformat 继承自
    fileoutputformat
     //而 fileoutputformat 要输出一个_SUCCESS 文件,所以在这还得指定一个输出目录
     FileOutputFormat.setOutputPath(job, new Path("D:\ logoutput"));
     boolean b = job.waitForCompletion(true);
     System.exit(b ? 0 : 1);
     } }

3)MapTask工作机制

在这里插入图片描述

(1)读取阶段:MapTask通过指定的InputFormat获取记录读取器,并从输入的InputSplit中按分区读取一系列键值对。
(2)映射阶段:该节点主要负责将读取到的键值对分配给用户定义映射函数处理,并生成一系列新的键值对。
(3)归并收集阶段:在用户定义映射函数处理完成后,默认会调用OutputCollector.collect()方法输出结果。在该操作内部会将生成的数据按照分区编号进行划分(调用Partitioner进行分区),并将这些数据存储在一个环形内存缓冲区中。
(4)溢出阶段:即"溢出"操作,在环形缓冲区满时会触发数据持久化操作。具体流程如下:
步骤1:采用快速排序算法对缓存区内的数据进行排序,默认先按照分区编号Partition排序后按照键值key排序的方式排列。
步骤2:按照分区编号从小到大的顺序依次将每个分区中的数据持久化到任务工作目录下的临时文件output/spillN.out(N表示当前溢出次数)。如果系统配置有Combiner参数,则会在持久化前对每个分区的数据执行一次归并操作。
步骤3:将各分区的数据元信息存储到内存中的索引数据结构SpillRecord中。其中每个分区的数据元信息包括以下三部分:

  1. 临时文件中的偏移量信息;
  2. 压缩前原始数据大小;
  3. 压缩后压缩后大小。
    如果当前内存中的索引大小超过1MB,则会将内存索引持久化存储到文件output/spillN.out.index中。
    (5)合并阶段:当所有Map任务完成并发送所有临时文件到主节点后,默认会执行一次全局合并操作以确保最终只会生成一个完整的文件。

4)ReduceTask

在这里插入图片描述

(1)Copy 阶段

(2)Sort 阶段

(3)Reduce 阶段

五 Join 应用

1)需求

在这里插入图片描述

2)代码实现

(1)创建商品和订单合并后的 TableBean 类

复制代码
    package com.atguigu.mapreduce.reducejoin;
    import org.apache.hadoop.io.Writable;
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    public class TableBean implements Writable {
     private String id; //订单 id
     private String pid; //产品 id
     private int amount; //产品数量
      private String pname; //产品名称
     private String flag; //判断是 order 表还是 pd 表的标志字段
     public TableBean() {
     }
     public String getId() {
     return id;
     }
     public void setId(String id) {
     this.id = id;
     }
     public String getPid() {
     return pid;
     }
     public void setPid(String pid) {
     this.pid = pid;
     }
     public int getAmount() {
     return amount;
     }
     public void setAmount(int amount) {
     this.amount = amount;
     }
     public String getPname() {
     return pname;
     }
     public void setPname(String pname) {
     this.pname = pname;
     }
     public String getFlag() {
     return flag;
     }
     public void setFlag(String flag) {
     this.flag = flag;
     }
     @Override
     public String toString() {
     return id + "\t" + pname + "\t" + amount;
     }
     @Override
     public void write(DataOutput out) throws IOException {
     out.writeUTF(id);
     out.writeUTF(pid);
     out.writeInt(amount);
     out.writeUTF(pname);
     out.writeUTF(flag);
     }
     @Override
     public void readFields(DataInput in) throws IOException {
     this.id = in.readUTF();
     this.pid = in.readUTF();
     this.amount = in.readInt();
     this.pname = in.readUTF();
     this.flag = in.readUTF();
     } }

(2)编写 TableMapper 类

复制代码
    package com.atguigu.mapreduce.reducejoin;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import java.io.IOException;
    public class TableMapper extends Mapper<LongWritable,Text,Text,TableBean> 
    {
     private String filename;
     private Text outK = new Text();
     private TableBean outV = new TableBean();
     @Override
     protected void setup(Context context) throws IOException, 
    InterruptedException {
     //获取对应文件名称
     InputSplit split = context.getInputSplit();
     FileSplit fileSplit = (FileSplit) split;
     filename = fileSplit.getPath().getName();
     }
     @Override
     protected void map(LongWritable key, Text value, Context context) 
    throws IOException, InterruptedException {
     //获取一行
     String line = value.toString();
     //判断是哪个文件,然后针对文件进行不同的操作
     if(filename.contains("order")){ //订单表的处理
     String[] split = line.split("\t");
     //封装 outK
     outK.set(split[1]);
     //封装 outV
     outV.setId(split[0]);
     outV.setPid(split[1]);
     outV.setAmount(Integer.parseInt(split[2]));
     outV.setPname("");
     outV.setFlag("order");
     }else { //商品表的处理
     String[] split = line.split("\t");
     //封装 outK
     outK.set(split[0]);
     //封装 outV
     outV.setId("");
     outV.setPid(split[0]);
     outV.setAmount(0);
     outV.setPname(split[1]);
     outV.setFlag("pd");
     }
     //写出 KV
     context.write(outK,outV);
     } }

(3)编写 TableReducer 类

复制代码
    package com.atguigu.mapreduce.reducejoin;
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.util.ArrayList;
    public class TableReducer extends Reducer<Text,TableBean,TableBean, 
    NullWritable> {
     @Override
     protected void reduce(Text key, Iterable<TableBean> values, Context 
    context) throws IOException, InterruptedException {
     ArrayList<TableBean> orderBeans = new ArrayList<>();
     TableBean pdBean = new TableBean();
     for (TableBean value : values) {
     //判断数据来自哪个表
     if("order".equals(value.getFlag())){ //订单表
     //创建一个临时 TableBean 对象接收 value
     TableBean tmpOrderBean = new TableBean();
     try {
     BeanUtils.copyProperties(tmpOrderBean,value);
     } catch (IllegalAccessException e) {
     e.printStackTrace();
     } catch (InvocationTargetException e) {
     e.printStackTrace();
     }
     //将临时 TableBean 对象添加到集合 orderBeans
     orderBeans.add(tmpOrderBean);
     }else { //商品表
     try {
     BeanUtils.copyProperties(pdBean,value);
     } catch (IllegalAccessException e) {
     e.printStackTrace();
     } catch (InvocationTargetException e) {
     e.printStackTrace();
     }
     }
     }
     //遍历集合 orderBeans,替换掉每个 orderBean 的 pid 为 pname,然后写出
     for (TableBean orderBean : orderBeans) {
     orderBean.setPname(pdBean.getPname());
     //写出修改后的 orderBean 对象
     context.write(orderBean,NullWritable.get());
     }
     } }

(4)编写 TableDriver 类

复制代码
    package com.atguigu.mapreduce.reducejoin;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException;
    public class TableDriver {
     public static void main(String[] args) throws IOException, 
    ClassNotFoundException, InterruptedException {
     Job job = Job.getInstance(new Configuration());
     job.setJarByClass(TableDriver.class);
     job.setMapperClass(TableMapper.class);
     job.setReducerClass(TableReducer.class);
     job.setMapOutputKeyClass(Text.class);
     job.setMapOutputValueClass(TableBean.class);
     job.setOutputKeyClass(TableBean.class);
     job.setOutputValueClass(NullWritable.class);
     FileInputFormat.setInputPaths(job, new Path("D:\ input"));
     FileOutputFormat.setOutputPath(job, new Path("D:\ output"));
     boolean b = job.waitForCompletion(true);
     System.exit(b ? 0 : 1);
     } }

六 数据清洗(ETL)

ETL作为Extract-Transform-Load技术的简称,在数据分析流程中被广泛采用以实现对原始数据到目标环境的完整转换过程。该术语常应用于数据分析领域,并非局限于特定的数据存储设施。在大数据处理系统中,在启动核心业务流程MapReduce之前通常会部署专门的数据清洗组件以完成剔除不符合条件记录的任务。值得注意的是,在这一阶段通常只需部署Mapper组件而不必涉及Reducer部分

复制代码
    package com.atguigu.mapreduce.weblog;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    public class WebLogMapper extends Mapper<LongWritable, Text, Text, 
    NullWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    // 1 获取 1 行数据
    String line = value.toString();
    // 2 解析日志
    boolean result = parseLog(line,context);
    // 3 日志不合法退出
    if (!result) {
    return;
    }
    // 4 日志合法就直接写出
    context.write(value, NullWritable.get());
    }
    // 2 封装解析日志的方法
    private boolean parseLog(String line, Context context) {
    // 1 截取
    String[] fields = line.split(" ");
    // 2 日志长度大于 11 的为合法
    if (fields.length > 11) {
    return true;
    }else {
    return false;
    } } }

(2)编写 WebLogDriver 类

复制代码
    package com.atguigu.mapreduce.weblog;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    public class WebLogDriver {
    public static void main(String[] args) throws Exception {
    // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
     args = new String[] { "D:/input/inputlog", "D:/output1" };
    // 1 获取 job 信息
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    // 2 加载 jar 包
    job.setJarByClass(LogDriver.class);
    // 3 关联 map
    job.setMapperClass(WebLogMapper.class);
    // 4 设置最终输出类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    // 设置 reducetask 个数为 0
    job.setNumReduceTasks(0);
    // 5 设置输入和输出路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // 6 提交
     boolean b = job.waitForCompletion(true);
     System.exit(b ? 0 : 1);
    } }

总结

1)输入数据接口:InputFormat

(1)采用的是:TextInputFormat
(2)TextInputFormat 的工作原理是逐行读取文本内容,并将每行的起始偏移量设为键值对中的键值,则每行的内容则设为对应的值返回。
(3)CombineTextInputFormat 可以将多个小文件合并成一个大文件进行分块处理。

2)逻辑处理接口:Mapper

用户根据业务需求实现其中三个方法:map() setup() cleanup ()

3)Partitioner 分区

(1)采用基于哈希值的HashPartitioner,默认实现将根据key的哈希值以及numReduces参数计算出一个合适的分区号;其核心逻辑体现在以下代码片段:key.hashCode() & Integer.MAX_VALUE % numReduces
(2)若业务方存在特殊的分区需求,则可自定义分区策略。

4)Comparable 排序

(1)当我们需要使用自定义对象作为键值进行输出时,则必须实现WritableComparable接口并重写其中的compareTo()方法。(2)部分排序操作涉及对每个最终文件内部数据进行有序排列。(3)全排序操作会对所有数据进行全局排列,并通常仅执行一次归约操作。(4)二次排序基于两个不同的条件来进行比较或排列。

5)Combiner 合并

Combiner 合合并能够提升程序执行效率,并降低 IO 传输次数。在应用时需要注意的是要确保原有业务处理不受影响。

6)逻辑处理接口:Reducer

用户根据业务需求实现其中三个方法:reduce() setup() cleanup ()

7)输出数据接口:OutputFormat

(1)默认实现的类是 TextOutputFormat。其功能逻辑如下:该类将每一个 KV 对进行处理,并将处理结果写入目标文本文件中作为单独的一行内容。
(2)用户还可以自行定制 OutputFormat。

全部评论 (0)

还没有任何评论哟~