An Introduction to Hadoop Streaming API in Big Data
作者:禅与计算机程序设计艺术
1.简介
Hadoop Streaming 是 Hadoop 的一个模块,在实际应用中能够支持用户在 Hadoop 环境下执行离线批处理任务或实时流处理任务。其基本工作流程是从标准输入获取数据进行加工后发送到标准输出。该组件的核心架构基于MapReduce模式,在实际应用中实现了高效的并行计算能力。
Hadoop Streaming的主要特点是简单性、可靠性和高效率;遵循MapReduce模型的并行计算模式能够有效提升数据处理的速度与准确性;然而,并没有提供与MapReduce或Spark类似的高级分析功能;因此,在实现这些高级功能时通常需要依赖额外组件的支持。
Big Data Analytics主要涉及使用海量数据执行复杂的分析与决策过程。因为处理能力不足, 传统数据仓库及分析工具难以应对这类庞大的数据量。而Hadoop在大数据领域发挥着越来越关键的作用, 它通过分布式计算与存储架构能够迅速响应海量数据, 并提供了丰富的工具与平台支持Hadoop的大数据分析。
HadoopStreamingAPI支持用户通过其提供的接口对Hadoop进行批量任务和流式任务的集成开发。使用Stream APIs来编写Java或Python程序,并在命令行界面环境中运行以实现功能。该API不仅能够开发相应的应用程序还能够灵活支持多种类型的作业包括批量任务和实时流式作业。
本文旨在为读者提供深入解析 Hadoop Streaming API 特性及其实现方法。我们的目标是协助读者更清晰地掌握 Hadoop Streaming API 的工作原理及其在大数据分析中的实际应用。
2.基本概念和术语
2.1 MapReduce 概念
MapReduce 是 Google 在2004年发明的一种基于分布式计算平台的编程范式。该技术通过将一个大规模的任务划分为多个阶段,并在每个阶段执行特定操作的过程来实现大规模数据处理。具体操作包括:将一个巨大的任务分解为许多小任务,并在每一个小任务中进行数据映射和分片存储等操作步骤。
- Map 函数通过将每份输入文件拆分为独立的键值对来操作;
- Shuffle 过程用于重新排序 map 产生的结果,并为合并相同键值的结果集提供基础;
- Reduce 函数能够计算出 shuffle 过程产生的所有键值对,并最终确定输出数据。
整个流程可以表示为以下的伪代码形式:
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount.class);
// input and output paths
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}
代码解读
这个例子阐述了WordCount作业遵循MapReduce规范的操作流程。第一步是需要配置必要的参数信息并初始化作业对象。随后,在随后的步骤中设置了jar文件以及输入与输出的路径,并初始化了Mapper类和Reducer类对象。最后,在最后一步指定了处理的数据类型与格式。
2.2 Hadoop Streaming 术语
2.2.1 InputSplit
在 Hadoop 系统中,InputSplit 被定义为划分数据集的基本单元。它指定了某个数据集中的一组子数据,并可通过指定的 InputFormat 获取所有 InputSplit 集合的信息。从功能角度来看,它只是一个简单的接口,在实际应用中却有多种具体的实现方式,在此基础之上,默认情况下最常用的方式就是 FileInputFormat 该接口实现了基于文件的方式来划分数据分片。
2.2.2 RecordReader
RecordReader 负责从输入源中读取记录,并将其反序列化为键值对;通常情况下,默认情况下或一般而言, key 会被视为记录的标识符, value 为记录的值;当所有输入数据被完全读取完成后, recordreader 将返回空值。
2.2.3 Mapper
作为 Hadoop Streaming 体系中的核心组件之一,Mapper 负责接收并处理来自 recordreader 的输入数据。通过 map 方法将这些输入转换为中间键值对(即 (k1, v1)),随后经由 Shuffle 过程传递给下一个阶段的任务处理环节。其通常采用基于函数式的编程范式,在此框架下定义了一个关键操作:void map(Object key, Object value, OutputCollector<K, V> collector, Reporter reporter)。该操作接受键-值对作为输入,并通过指定的收集器将结果返回给任务处理器以跟踪进度管理。其中 key 和 value 分别代表输入记录中的键与值,在此过程中由相应的收集器负责整合并输出最终结果
2.2.4 Partitioner
Partitioner 配置了数据在不同 mappers 之间的分布情况。该组件基于 org.apache.hadoop.mapred.JobConf 参数对象运行,并提供了一个名为 getNumPartitions() 的方法来获取总分区数。
2.2.5 Combiner
作为一个关键组件,在Hadoop Streaming生态系统中发挥着重要作用,Combiner会整合来自同一键值的输入数据,并产生一个中间结果,在将其发送给reducer前能够有效减少网络IO操作。Combiner采用了类似于mapper的函数式架构。
2.2.6 Shuffle
Shuffle 是 Hadoop Streaming 中的重要环节之一;该过程接收自 mapper 产生的中间结果,并同时将这些数据重新排列组合以生成最终输出;Shuffle 包括两个主要组成部分:第一阶段被称为 merge sort ,负责对数据进行排序与合并;第二阶段被称为 spill 到磁盘 ,用于处理超出内存容量的数据块;merge sort 是一种基于归并排序算法的过程;它负责对 mapper 生成的所有中间数据进行排序和合并处理;spill 到磁盘 则是一个流程,在内存不足以存储所有中间数据时会将多余的记录转移至磁盘
2.2.7 Sort
该方法是一种提升Hadoop Streaming性能的技术手段,在特定场景下Shuffle阶段的内存消耗可能导致效率下降因此Hadoop推荐将输入数据提前进行排序然后再传递给Shuffle机制其中Sort过程采用归并排序算法实现
2.2.8 Reducer
Reducer 是 Hadoop Streaming 的重要组成部分之一。它接收来自 shuffler 的中间结果,并整合相同键值的数据。Reducer 具有两种运行模式:合并模式与非合并模式。合并模式是一种性能优化策略,通过 combiner 进行数据预聚合,从而降低网络 I/O 开销。非合并模式是一种完整的 reducer ,它整合所有 mappers 的输入并生成单一输出。
2.2.9 JobConf
在Hadoop配置体系中,JobConf扮演着关键角色。它不仅整合了多种配置参数,并且在构建Job的过程中通常被传递进去作为必要的配置。其中包括了多种配置参数:如任务名称、输入路径以及输出路径等信息。
2.2.10 TaskAttemptContext
TaskAttemptContext充当了Hadoop系统中一个关键组件的角色;它负责提供一个操作环境;该环境包含了任务ID标识符、进度报告器以及状态报告器等多个要素。
3.MapReduce 相关原理及操作步骤
3.1 MapReduce 框架概览
基于 MapReduce 抽取了 Hadoop 的核心组件,并具体包括以下几个关键部分:Job Tracker、Task Tracker、HDFS 和 YARN 等功能模块。其中 Job Tracker 主要负责协调并管理整个集群的资源分配;负责完成各节点的具体计算作业的是 Task Tracker;而 HDFS 则负责存储与管理完整的数据集合;最后 YARN 则作为一个用于配置资源管理和调度的核心组件存在。
MapReduce采用了分块处理的方法来解决大规模的数据集问题。利用InputFormat将输入数据集切分为多个小数据块,并将其进一步分割为更细的小区域,这些区域分布在不同的计算节点上. MapReduce的运行流程如图所示:
在提交作业的过程中,客户端首先登录了Application Master,并向其请求资源分配。随后,在上传JAR包时,客户端不仅上传了必要的依赖项如map()和reduce()函数,并且还对这些函数进行了详细的注释说明。
当作业启动时, Job Tracker 根据 map() 和 reduce() 的逻辑计算每个分区的任务数量,并将任务提交给 Task Trackers. Job Tracker 根据集群的空闲资源状况,决定每个任务应运行的位置. Task Trackers 接受任务后启动相应进程,从 HDFS 下载文件并执行 map() 或 reduce() 操作. 当所有 map() 任务完成后,Job Tracker 发布 Combine 任务到各节点,用于合并 map() 的输出.完成所有 reduce() 任务后,Application Master 结束作业并通知用户作业完成.
在 happen when a node fails, the system automatically redirects failed tasks to available nodes using a failover mechanism.
3.2 MapReduce 编程模型
该MapReduce编程模型是用于在Hadoop平台中执行分布式应用的一种核心框架。该框架提供了相应的编程接口,使得开发者只需通过指定输入数据、输出结果以及对数据进行映射和计算操作即可构建并行化的分布式应用程序。其基本运行流程通常由四个明确的步骤构成。
- 数据分片:原始数据集被划分为多个数据块,并分别存储在不同的节点中。
- 映射:每个数据块会被转换为一系列键-值对。
- 排序和规约:如果存在相同的键-值对,则会合并为一个键-值对。
- 输出:生成最终的结果。
MapReduce编程模型支持开发人员通过定义Mapper函数和Reducer函数来进行数据处理。Mapper函数接收单个键-值对并生成零个或多个键-值对作为输出;Reducer函数会对Mapper产生的键-值对进行汇总并输出结果
MapReduce编程模型主要依靠其关键组件——数据分片、排序与归约等技术手段——来实现对系统可靠性和扩展性的增强。当单个节点出现故障时,该模型不会干扰整个集群运行。随着新增节点的引入,系统能够自动优化任务分配,从而实现对作业并行度的有效控制。
3.3 基本操作步骤
3.3.1 准备数据
通过按分布的方式将数据集放置于多台机器上,便于检索与访问.采用 HDFS(即 Hadoop Distributed File System)作为数据存储与处理的技术手段.
3.3.2 创建作业配置文件
生成一个 xml 参数文件,并包含必要的字段信息。这些字段包括作业名称字段名、输入路径字段名、输出路径字段名以及调用的映射类和约简类名称。
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>mapred.job.name</name>
<value>wordcount</value>
</property>
<!-- 输入路径 -->
<property>
<name>mapreduce.input.fileinputformat.inputdir</name>
<value>/user/hduser/wordcount</value>
</property>
<!-- 输出路径 -->
<property>
<name>mapreduce.output.fileoutputformat.outputdir</name>
<value>/user/hduser/result</value>
</property>
<!-- Mapper 和 Reducer 类所在的 Jar 文件路径 -->
<property>
<name>mapreduce.job.jar</name>
<value>./wordcount.jar</value>
</property>
<!-- 设置使用的 Mapper 类 -->
<property>
<name>mapreduce.mapper.class</name>
<value>org.mycompany.MyMapper</value>
</property>
<!-- 设置使用的 Reducer 类 -->
<property>
<name>mapreduce.reducer.class</name>
<value>org.mycompany.MyReducer</value>
</property>
</configuration>
代码解读
3.3.3 编写 Map 函数
创建一个继承自 org.apache.hadoop.mapred.Mapper 的 Java 类,并实现或覆盖其映射方法。
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString().toLowerCase();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
代码解读
该Map()方法接收一种由两个字段组成的键值对作为输入:第一个字段为LongWritable类型的数据类型变量(即长整型偏移量),第二个字段为Text类型(即文本数据)。该方法生成的结果同样是键值对形式:其中第一个字段为Text类型(即文本数据),表示词语信息;第二个字段为IntWritable类型(即整型词频)。
3.3.4 编写 Reduce 函数
实现一个继承自 org.apache.hadoop.mapred.Reducer 的Java类,并重写或覆盖该类的reduce方法。
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class MyReducer extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
代码解读
该 reduce 方法接收的是一个键值对序列的形式,在结果中生成一个新的键值对序列对象。其中每个计数器字段存储的是相应词语出现位置信息累积的结果集合。值得注意的是, 该方法返回的结果同样是键值对形式, 但与 map() 方法的行为存在差异, 其中每个计数器字段存储的是相应词语出现次数累计总和
3.3.5 执行作业
在命令行下执行以下命令:
$ hadoop jar /path/to/your/jarfile myjob.xml
代码解读
其中,“/path/to/your/jarfile”是您的JAR文件位置,“myjob.xml”是您最近创建的XML配置文件
作业完成后,您可以在指定的输出目录 (/user/hduser/result) 下查看结果。
4.代码实例与解释
4.1 Map 函数示例
假设有如下文本文件,名为 “words.txt”:
apple orange banana apple kiwi cherry
banana guava lemon lemon
代码解读
统计每个单词的出现频率可以通过MapReduce计算框架来完成。以下详细描述了Map阶段的具体实现:
import java.io.*;
import java.util.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class WordCountMap extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString().toLowerCase();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
代码解读
这个 Map 函数主要做两件事情:
- 从输入的 value (文本行) 中解析出每个单词
- 发射出 (单词, 1) 键值对
需要注意的是,在本实现中没有考虑空格、大小写等因素。此外, 还定义了一个 IntWritable 实例来表示单字词.
4.2 Reduce 函数示例
假设经过 Map 阶段得到的结果如下:
(apple, 1), (orange, 1), (banana, 2), (kiwi, 1), (cherry, 1),
(banana, 1), (guava, 1), (lemon, 2)
代码解读
统计每个单词出现的总次数时,请参考 ... 指令的具体应用方法
请通过 ... 命令来完成对各个单词频率值的累加计算过程
具体而言,在执行 ... 运算时,请确保输入数据中包含完整的单词字段信息
在实际操作中,请注意区分不同类型的文本数据字段
建议在初始化运算前对输入数据进行必要的预处理工作
import java.io.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
public class WordCountReduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
代码解读
该函数仅用于求和运算;它依次累计每个出现次数。
4.3 编译和打包
为执行MapReduce作业需将开发完成的Map函数、Reduce函数及其依赖项形成一个JAR文件以下指令用于编译及打包该JAR文件
$ javac -classpath.:/usr/lib/hadoop/* WordCount*.java
$ jar cf wc.jar *.class
代码解读
以上命令将生成一个包含三份Java文件的JAR文件。JAR文件名称为wc.jar。其中位于/usr/lib/hadoop/的Hadoop相关文件旨在支持运行MapReduce任务。
4.4 执行作业
以下命令用于运行 MapReduce 作业:
$ hadoop jar wc.jar WordCount /path/to/input/words.txt /path/to/output/directory
代码解读
上述指令明确设置了MapReduce任务名称设为"WordCount",并指定其输入数据位于"/path/to/input/words.txt"路径下,默认处理后结果将存放在"/path/to/output/directory"目录中。
运行成功后, 该作业的输出目录中将出现多个文件, 这些文件将包含每一行单词及其出现次数的信息
