Advertisement

Introduction to Big Data Technologies

阅读量:

作者:禅与计算机程序设计艺术

1.简介

“Big data”这个词很容易被提起,但是它背后真正的含义却并不太清楚。究竟什么是“big data”,它为什么如此重要?许多公司、组织和政府都已经在实施大数据解决方案,但却始终没有得到广泛认同。那么,什么才是真正的“big data”呢?又有哪些技术可以帮助企业实现“big data”的价值?本文试图通过阐述这些问题,以及提供一些相关的知识点和案例,帮助读者更加全面地理解和掌握大数据技术。

2.基本概念与术语

2.1 大数据的定义

“Big data”的中文是指数据量巨大的海量数据集,从字面上看,“big”和“data”显然是相互关联的两个字。那么,到底什么是“big data”呢?“Big data”最早由麻省理工学院(MIT)的皮亚杰(Pajoe)教授在1996年提出,他把“big data”定义为三种类型的数据集合:

  1. Volume: 数据的体积足够大,能够容纳整个网络甚至互联网。
  2. Variety: 数据类型丰富多样,包括图像、文本、视频、音频等各种形式的数据。
  3. Velocity: 数据的产生速度快,实时生成、实时流动、高速移动。

随着互联网、通讯网络和社会媒体的发展,当代互联网产品和服务必将产生海量的数据,这些数据构成了“big data”。这种数据不仅数量巨大,而且随着时间推移呈现出越来越复杂、动态的特征,因而使得传统的数据处理方法根本无法有效应对这些数据。

2.2 Hadoop的定义

Hadoop是一个开源框架,它允许用户存储和处理超大型数据集。Hadoop通常用于大数据分析、实时计算和机器学习等领域。由于Hadoop框架高度可扩展性、灵活性及弹性,使其在云计算、大数据分析、金融交易、物联网等诸多领域均有广泛应用。

2.3 MapReduce的定义

MapReduce是一种编程模型,它可以将海量数据分割成多个块,并通过分布式计算集群并行处理。MapReduce分为两个阶段:映射阶段和减少阶段。

  1. 映射阶段:它将输入文件分割成一系列键值对。
  2. 归约阶段:它根据映射阶段的输出计算最终结果。

Apache Hadoop 使用 MapReduce 来处理大规模数据集。基于此模型的系统一般具有以下特征:

  1. 分布式计算:处理任务跨越多个节点,充分利用集群资源,提升性能。
  2. 内存计算:利用集群中各个节点的内存进行快速运算。
  3. 可靠性:系统具备容错能力,保证计算结果正确无误。
  4. 自动调度:系统能够自动管理资源分配和任务调度。

2.4 NoSQL数据库的定义

NoSQL是非关系型数据库的统称。它是一种新型的数据库技术,旨在通过扩展关系型数据库的限制来克服这一缺陷。NoSQL数据库中的数据模型和关系型数据库非常不同,采用不同的方式来表示数据。典型的NoSQL数据库有 Cassandra、MongoDB、Redis和Neo4j。

2.5 图计算的定义

图计算是一种用来处理复杂网络结构的数据分析方法。图结构通常包含节点(Vertex)和边(Edge),节点代表实体或对象,边代表关系或联系。图计算可以用来发现隐藏在数据之间的关系、预测潜在的风险以及寻找最佳路径。目前,图计算技术已成为大数据发展的一个重要趋势。

2.6 列式数据库的定义

列式数据库是一种分布式数据库技术,其中表按列存储,而不是按行存储。它可以降低数据重复和冗余,通过压缩数据节省空间。Apache Hive 是 Apache Hadoop 的一个子项目,它支持类似于 SQL 的查询语言。它的优点是能够快速执行复杂的分析,并且易于管理。Hive 可以连接多个数据源并提供统一的视图。

2.7 流计算的定义

流计算是一种新兴的分布式计算技术,它以连续的方式处理大量数据,包括事件、日志、以及实时数据。它可以实时处理来自各种数据源的海量数据,并在几秒内返回结果。Apache Storm 是 Apache Hadoop 的一个子项目,它是一个开源的分布式实时计算引擎,适用于实时分析、报告、实时监控等场景。Storm 提供了一个简单的编程模型,开发人员只需声明执行某种操作即可,而不需要考虑容错、恢复、或者容量规划。

2.8 OLAP的定义

Online Analytical Processing(OLAP)是一种多维分析处理方法,它主要用于处理大量、复杂的数据集。OLAP通过对数据进行切片、汇总、聚合等操作,生成多维数据集,用于进行复杂查询和分析。通过OLAP技术,企业就可以获取决策支持所需的信息,从而实现决策支撑。

3.核心算法原理与操作步骤

3.1 分布式文件系统HDFS

(1)分布式文件系统

HDFS(Hadoop Distributed File System)是Hadoop生态系统中关键组件之一。HDFS主要用于海量数据的存储和处理,主要由两部分组成:NameNode和DataNode。

  1. NameNode:NameNode负责管理文件系统的命名空间和数据块分布,同时协调客户端对文件的访问。NameNode的职责就是管理数据块的布局,它维护一个FSImage文件,该文件记录了HDFS文件系统的静态信息,如文件列表,目录结构等。NameNode还会向其它NameNode发送心跳信号,保持高可用状态。
  2. DataNode:DataNode负责存储实际的数据,每个DataNode都有一定数量的磁盘空间,用于存储HDFS中的数据块。

HDFS具备以下特性:

  1. 高容错性:HDFS采用主-备模式部署,具有良好的容错性。如果NameNode或者DataNode宕机,另一台服务器会接管相应工作。
  2. 高吞吐量:HDFS支持大量数据读写,因此可以作为高性能的分布式文件系统使用。
  3. 弹性扩缩容:HDFS可以方便的动态添加或删除DataNodes,方便集群容量的调整。

HDFS提供了以下命令,可以操作HDFS:

  1. fsck:检查文件系统的一致性。
  2. balancer:平衡HDFS集群。
  3. dfsadmin:管理HDFS。

(2)HDFS的访问模式

HDFS可以采用如下两种访问模式:

  1. 直接访问模式:访问HDFS的文件和目录,需要指定DataNode地址。例如:hdfs://host1:port/file_path
  2. 间接访问模式:访问HDFS的文件和目录,不需要指定DataNode地址。HDFS会根据文件的长度和块大小,决定访问哪些DataNode。

(3)HDFS的文件读写流程

当客户端要读取或者写入HDFS中的文件时,它首先会请求NameNode获取元数据信息,然后再根据元数据信息访问对应的DataNode读取文件。下图展示了读取文件的流程:

  1. Client端:客户端连接到NameNode,发送读文件或者写文件的请求。
  2. NameNode:NameNode从FSImage文件中读取元数据信息,并确定应该去哪个DataNode读取或者写入文件。
  3. Datanode:Datanode接收客户端请求,并从磁盘上读取数据或者写入数据。

(4)HDFS的读写效率

HDFS的文件读写效率非常高,因为它采用的是分块存放的方式,并且提供了快照机制,可以提高数据安全性。快照可以把当前HDFS的文件系统状态保存下来,这样就可以随时回滚到之前的某个版本。除此之外,HDFS采用了副本机制,默认情况下,每一个文件都会有3个副本。HDFS的读写效率可以达到PB级别。

(5)HDFS的访问控制

HDFS支持用户访问权限控制,可以通过以下命令设置:

  1. setfacl -m user:hadoop:rwx file_path:授权给用户hadoop对某个目录拥有读写权限;
  2. setfacl -x user:hadoop file_path:收回对某个目录的读写权限;
  3. getfacl file_path:查看某个目录的权限配置。

HDFS的权限控制粒度比较细,可以针对目录和文件分别进行权限控制。

3.2 分布式计算框架YARN

(1)YARN概述

YARN(Yet Another Resource Negotiator)是一个资源管理系统,它是一个集群资源管理器,能够为上层的应用程序和框架提供资源管理和调度服务。YARN构建在HDFS之上,与HDFS一样,也有一个NameNode和DataNodes。但是,YARN和HDFS之间存在一些差别:

  1. YARN中的ResourceManager:它是资源管理器,负责整个集群资源的管理和分配。ResourceManager的职责包括:

    • 对应用提交的要求进行优先级排序;
    • 针对队列管理系统资源;
    • 为应用分配可用资源;
    • 监视集群上应用程序的运行状态;
    • 管理生命周期管理、安全性等方面的功能。
  2. NodeManager:每个结点(NodeManager)是一个单独的进程,它运行在每个结点上,负责管理和监视所在结点上的资源使用情况。NodeManager的职责包括:

    • 启动和停止容器;
    • 监视和管理容器的资源使用情况;
    • 将运行状况报告给ResourceManager;
    • 将请求发送给ApplicationMaster。

(2)YARN调度过程

YARN的调度过程由以下几个阶段组成:

  1. 申请资源阶段:RM向NM请求资源,NM向资源申请者提供所需资源,资源申请者将资源封装成Container。
  2. 容器分配阶段:将Container分配到RM上的不同NodeManager上。
  3. 初始化阶段:Container启动之后,NM向AM发送初始化指令。
  4. 启动任务阶段:AM启动所有的任务。
  5. 监控阶段:监控AM和NM的运行状态。

(3)YARN的HA架构

为了保证YARN集群的高可用性,可以部署多个NameNode和 ResourceManager,以及多个NodeManager。其中,Active NameNode和 Active ResourceManager为主备模式,Standby NameNode和 Standby ResourceManager为失效模式。

  1. Active NameNode:NameNode处于活动状态,提供NameNode服务,处理客户端读写请求,处理NameNode状态信息的更新等。
  2. Standby NameNode:NameNode处于备用状态,当主NameNode出现故障时,指向备用NameNode,以便提供NameNode服务。
  3. Active ResourceManager:ResourceManager处于活动状态,接受客户端的Job请求,向各个NodeManager分配Container,并对Container进行重启和失败处理等。
  4. Standby ResourceManager:ResourceManager处于备用状态,当主ResourceManager出现故障时,指向备用ResourceManager,以便提供ResourceManager服务。
  5. NodeManager:NodeManager是YARN集群的工作节点,承载着AM和Containers的资源管理工作。任何节点都可以作为NodeManager加入集群。

3.3 分布式计算框架Spark

(1)Spark概述

Spark是一个快速、可扩展、可弹性的大数据分析引擎,它支持Java、Scala、Python、R等多种编程语言。Spark是用Scala编写的,既支持批处理,也支持交互式查询。Spark可以利用磁盘持久化或内存中缓存的方式来提升查询的响应速度。

Spark的基本组件包括:

  1. Spark Core:Spark Core包含了共用的计算抽象API、序列化工具、DAG(有向无环图)优化、内存管理和分布式运行时等模块。
  2. Spark SQL:Spark SQL支持SQL语法,能够轻松地处理结构化数据。
  3. Spark Streaming:Spark Streaming支持流式数据处理,能够快速处理实时数据。
  4. MLib:MLib是Spark的机器学习库,支持各种机器学习算法。

Spark的三个主要特性:

  1. 快速处理能力:Spark能够快速处理大量的数据,支持TB级的数据量,每秒钟处理超过100亿条数据。
  2. 可扩展性:Spark支持弹性扩展,可以自动增加或减少集群的计算资源。
  3. 高容错性:Spark具有高容错性,可以在节点失败时自动切换。

(2)Spark驱动程序

Spark驱动程序是用户编写并提交到集群上执行的程序,它包含driver main()函数。Spark驱动程序包括以下几个部分:

  1. SparkSession:SparkSession是Spark程序的入口点,创建DataFrame,注册临时表等。
  2. Resilient Distributed Datasets (RDDs):RDD是Spark中不可变的、分区的数据集,表示离散的、并行的、元素集合。
  3. Transformations:transformations操作用于转换RDDs,生成新的RDDs。
  4. Actions:actions操作用于触发计算,并返回结果。

SparkContext是Spark驱动程序的入口点,它代表了Spark的上下文环境,包含Spark的一些配置和全局信息。

(3)Spark执行流程

Spark的执行流程可以分为以下四个阶段:

  1. 创建RDDs:通过外部数据源创建RDDs。
  2. 数据局部化(Locality Sensitive Hashing,LSH):Spark会尝试将数据集按照hash partition分区,以便尽可能本地化处理。
  3. 作业调度:Spark调度程序会根据RDD依赖关系计算每个RDD的分区位置。
  4. 执行任务:Spark执行程序将作业转换为运行任务,并在集群中执行它们。

(4)Spark调优参数

Spark的调优参数有很多,这里只是简单地介绍一下常用的参数。

  1. spark.executor.memory:设置每个Executor使用的内存大小。
  2. spark.num.executors:设置Spark集群中Executor的数量。
  3. spark.default.parallelism:设置每个Stage的默认分区数。
  4. spark.shuffle.partitions:设置RDD shuffle时使用的partition数目。
  5. spark.dynamicAllocation.enabled:设置是否开启Spark Executor动态分配功能。

4.具体代码实例及解释说明

4.1 分布式文件系统HDFS的代码示例

(1)下载并安装HDFS

复制代码
    sudo vi /etc/profile
    export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
    source /etc/profile
    
    cd /opt/hadoop-<version>/bin
    ./start-dfs.sh
    ./start-yarn.sh
    
      
      
      
      
      
      
    
    代码解读

(2)创建一个文件夹并上传文件

使用SSH登录到主节点(NameNode)。创建一个文件夹,如 /user/test/ ,并切换到该目录:

复制代码
    mkdir /user/test/
    cd /user/test/
    
      
    
    代码解读

上传文件到HDFS中,可以使用 hdfs dfs -put 命令:

复制代码
    hdfs dfs -put <local_file> <hdfs_directory>
    
    
    代码解读

例如,上传当前目录下的所有文件到HDFS:

复制代码
    hdfs dfs -put. hdfs:///user/test
    
    
    代码解读

(3)下载HDFS文件到本地

下载HDFS文件到本地,可以使用 hdfs dfs -get 命令:

复制代码
    hdfs dfs -get <hdfs_file> <local_directory>
    
    
    代码解读

例如,下载 HDFS 中的 /user/test/input.txt 文件到本地:

复制代码
    hdfs dfs -get hdfs:///user/test/input.txt./
    
    
    代码解读

(4)查看HDFS文件属性

可以使用 hdfs dfs -stat 命令查看HDFS文件属性:

复制代码
    hdfs dfs -stat <hdfs_file>
    
    
    代码解读

例如,查看 /user/test/output.txt 文件的详细信息:

复制代码
    hdfs dfs -stat output.txt
    
    
    代码解读

(5)删除HDFS文件

可以使用 hdfs dfs -rm 命令删除HDFS文件:

复制代码
    hdfs dfs -rm <hdfs_file>
    
    
    代码解读

例如,删除 /user/test/output.txt 文件:

复制代码
    hdfs dfs -rm output.txt
    
    
    代码解读

(6)关闭HDFS集群

关闭HDFS集群需要先停止NameNode和DataNode,然后再停止YARN ResourceManager。

复制代码
    cd /opt/hadoop-<version>/sbin
    ./stop-dfs.sh
    ./stop-yarn.sh
    
      
      
    
    代码解读

4.2 分布式计算框架YARN的代码示例

(1)下载并安装YARN

复制代码
    sudo vi /etc/profile
    export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
    source /etc/profile
    
    cd /opt/hadoop-<version>/bin
    ./start-yarn.sh
    
      
      
      
      
      
    
    代码解读

(2)运行一个简单的WordCount示例

创建一个名为wordcount.py的文件,内容如下:

复制代码
    from operator import add
    import sys
    from pyspark import SparkConf, SparkContext
    
    if __name__ == "__main__":
    conf = SparkConf().setAppName("Word Count").setMaster("local[*]") # 配置Spark应用名称和运行模式
    sc = SparkContext(conf=conf)
    
    lines = sc.textFile(sys.argv[1]) # 从输入文件中读取文本
    words = lines.flatMap(lambda line: line.split(" ")) # 以空格为分隔符拆分单词
    wordCounts = words.map(lambda word: (word, 1)).reduceByKey(add) # 计数单词数量
    
    wordCounts.saveAsTextFile(sys.argv[2]) # 保存结果文件
    sc.stop() # 关闭Spark Context
    
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

然后,运行 WordCount 程序,注意把 <input_file><output_dir> 替换成实际的值:

复制代码
    spark-submit --master yarn \
      --deploy-mode client \
      --num-executors 2 \
      --executor-memory 2g \
      wordcount.py <input_file> <output_dir>
    
      
      
      
      
    
    代码解读

(3)查看YARN Web UI

YARN Web UI的默认端口号为8088,打开浏览器,输入http:// :8088,即可看到集群中所有节点的资源信息。

(4)关闭YARN集群

关闭YARN集群需要先停止ResourceManager,然后再停止NameNode和DataNode。

复制代码
    cd /opt/hadoop-<version>/sbin
    ./stop-yarn.sh
    
      
    
    代码解读

4.3 分布式计算框架Spark的代码示例

(1)下载并安装Spark

复制代码
    cat >> conf/slaves << EOF
    slave1
    slave2
    EOF
    
      
      
      
    
    代码解读

(2)运行一个简单的WordCount示例

创建一个名为wordcount.scala的文件,内容如下:

复制代码
    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount {
    
      def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Word Count")
     .setMaster("spark://localhost:7077") // 设置运行模式
    val sc = new SparkContext(conf)
    
    val input = "some text here" // 待统计文字
    val rdd = sc.parallelize(input.split("\ s+")) // 以空白字符为分隔符分割文字
    val counts = rdd.countByValue() // 统计次数
    
    for ((word, count) <- counts) {
      println("%s : %d".format(word, count)) // 打印结果
    }
    
    sc.stop() // 关闭Spark Context
      }
    }
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

编译并打包WordCount程序,创建运行脚本 run-wordcount.sh 内容如下:

复制代码
    #!/bin/bash
    
    $SPARK_HOME/bin/spark-submit \
      --class WordCount \
      --master spark://localhost:7077 \
      target/scala-*/wordcount_*.jar \
      some\ text\ here myresult.txt
    
      
      
      
      
      
      
    
    代码解读

在运行脚本中替换 $SPARK_HOME 为实际的Spark安装目录,并运行 run-wordcount.sh 脚本。

(3)查看Spark Web UI

Spark Web UI的默认端口号为8080,打开浏览器,输入http:// :8080,即可看到Spark应用程序的详细信息。

(4)关闭Spark集群

关闭Spark集群需要先停止所有Spark应用程序,然后再停止Spark Master。

复制代码
    ./sbin/stop-all.sh
    
    
    代码解读

全部评论 (0)

还没有任何评论哟~