Advertisement

6.Best Practices for Handling Big Data with Python in

阅读量:

作者:禅与计算机程序设计艺术

1.简介

企业中常用的数据分析方法之一是大数据处理。AWS 为我们提供了丰富的工具来支持用户的高效数据存储管理与分析工作。在 AWS 平台上操作大数据时会用到一些具体的方法与技巧。期待通过这些内容为读者提供有价值的信息和指导。

本文主要面向具备一定Python编程基础的专业人士阅读。如果你尚未掌握Python编程技能或对基于AWS的大数据处理尚不熟悉,则建议您先深入学习相关的基础知识后再继续阅读本文。

注意:以下所有的代码都是基于Python3+进行编写

2.基本概念术语说明

Amazon EC2(Elastic Cloud Compute)

EC2是亚马逊发布的一款弹性计算平台,在其平台中企业用户可以快速部署虚拟机或容器化应用,并通过该服务对应用程序和环境进行自动配置以确保良好的可扩展性和可靠性。

EC2支持多种硬件平台运行,涵盖基于Intel IA架构的标准服务器、用于高性能计算的ASIC设备以及GPU加速卡。该服务不仅提供全面可靠性保障,还具备灵活配置调整能力。

Amazon S3(Simple Storage Service)

S3是一种基于对象存储的技术方案...该方案通过提供高效的静态资源访问与持久化功能...向用户提供一种集成了先进技术架构的安全云存储解决方案...该平台支持多种数据类型的高效存取与管理...包括图像文件、视频流数据以及音频信息等关键业务数据的智能归档需求。

S3提供了多种存储方案以满足不同需求,在其服务中包含低延迟服务、高可靠性保障以及层次化架构设计。系统还具备数据冗余备份方案以保证数据安全,并提供异地复制功能以提升可用性的同时实现快速恢复能力。此外系统内置了版本管理模块以实现对不同版本的数据管理以及提供完善的日志记录和审核机制来保证数据质量与合规性要求。S3所提供的完整RESTful API接口集合为开发者提供了便捷的第三方应用集成接口从而降低了开发门槛并提升了系统的易用性

Apache Hadoop

该分布式系统框架由Apache基金会提供开放源代码支持,并采用Java语言构建而成;这些组件包括HDFS、MapReduce和YARN等核心功能模块,并被用于实现大规模数据处理任务;该系统架构具备高容错性、良好的可扩展性和高效的性能特征。

Hadoop与其他开源框架相比,在侧重于实时计算和离线数据处理方面具有显著优势而不包括批量处理任务

Apache Spark

Apache Spark被视为一款备受尊敬的开放源代码分布式计算框架。它提供了一系列强大的功能包如高级SQL、机器学习算法以及图形渲染工具等。这些功能使得Spark能够快速地处理海量数据并实现实时运算需求。与Hadoop相比,Spark同样采用Java虚拟机作为其运行基础,并通过Scala语言进行开发。

Spark基于强大的集群计算能力和卓越的计算效率来进行高效的数据分析,并且还支持内存中的高速交互式分析。

3.核心算法原理和具体操作步骤以及数学公式讲解

MapReduce

在Hadoop生态系统中,MapReduce被视为一种核心编程范式。它特别适用于大规模数据处理场景,并通过分布式计算框架实现高效的海量数据处理能力。其主要特点如下:

在分布式计算中:MapReduce模型中的各个计算节点之间不共享数据。每个节点仅处理自己本地的数据集的一小部分,在这种设计下便能够有效规避网络带宽带来的限制

分布式并行计算中,MapReduce模型将输入数据分割为多个独立的数据块,并对每个独立的数据块进行相应的处理,从而实现整个系统的高效运行

  1. 容错机制设计:当系统识别到某个节点发生故障时,在MapReduce模型中,并未导致整个系统停止运行;相反地, 该模型能够自动跳过已损坏的数据副本, 并重新分配剩余的任务处理工作

下面以WordCount示例进行说明:

假设有一个文本文件,其中包含大量单词.该文件的绝对路径是:s3://mybucket/inputfile

第一步, 生成MapReduce作业。我们应当生成一个名为wordcount的MapReduce作业, 并应包含两个类: Mapper类和Reducer类

复制代码
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.Mapper
    import org.apache.hadoop.mapreduce.Reducer
    
    class Tokenizer extends Mapper[Object, Text, Text, LongWritable] {
      def map(key: Object, value: Text, context: Context): Unit = {
    val line = value.toString()
    for (token <- line.split("\ W+")) {
      if (token!= "") {
        context.write(new Text(token), new LongWritable(1))
      }
    }
      }
    }
    
    class SumReducer extends Reducer[Text, LongWritable, Text, LongWritable] {
      def reduce(key: Text, values: java.lang.Iterable[LongWritable],
             context: Reducer[Text, LongWritable, Text, LongWritable]#Context): Unit = {
    var sum = 0L
    for (value <- values) {
      sum += value.get()
    }
    context.write(key, new LongWritable(sum))
      }
    }
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

该文提到了一个自定义的映射器类——Tokenizer类;它通过按空格、制表符以及换行符等分隔符将输入文件进行切分,并记录这些单词及其出现次数到context中。

SumReducer类是一个定制化的Reducer类,在其功能中负责接收Mapper生成的输出数据,并对其中每个单词的数量进行汇总后写入HDFS。

在第二步中,我们需要执行MapReduce作业。为此我们可以调用job.waitForCompletion()这一函数来启动MapReduce作业。该函数会在任务完成时进行阻塞操作,并且在发生错误时会抛出异常信息。

第三阶段的任务是验证作业的正确性。我们可以通过命令行工具查看输出结果,并利用spark-shell或pyspark与HDFS建立关联以读取结果。

复制代码
    val wordCounts = sc.textFile("s3a://mybucket/output")
                     .flatMap(_.split("\t"))
                     .filter(_!= " ").map{case x => (x.substring(0, x.length-1), x.substring(x.length).toLong)}
    println(wordCounts.collect().mkString(", "))
    
      
      
      
    
    代码解读

以上代码负责处理hdfs上生成的文件,并将其转换为RDD对象。接着使用collect()函数整合结果后进行打印。

第四个步骤是退出客户端程序。最后一步是分别使用job.close()sc.stop()这两个方法来实现。这一操作的目的在于释放系统资源。

Spark SQL

Spark SQL是Spark的一个重要功能模块,在此平台中提供了强大的数据分析能力。该功能使用户能够利用SQL语言指令进行数据查询。 spark/sql作为一种高级的数据分析工具,不仅支持从hdfs中获取各种组织形式的数据,还能够将这些数据按照特定需求进行整理与分析. 其核心作用就是将散落于hdfs中的各种类型资源转化为统一的dataframe对象,从而便于后续的数据处理与建模工作.

Spark SQL具备处理复杂SQL查询的能力,并且能够高效运行。例如,在数据筛选操作中提供良好的性能,在数据汇总和关联操作中展现出卓越的效果,在数据运算和管理流程中均表现出较高的效率。

Spark SQL的语法类似于关系数据库的SQL语法。

复制代码
    // 将数据集保存为parquet文件
    df.write.mode("overwrite").format("parquet").save("s3a://mybucket/path/to/output/")
    
    // 从parquet文件中读取数据集
    val df = spark.read.format("parquet").load("s3a://mybucket/path/to/input/")
    
    // 对数据集进行过滤、聚合、排序
    val filteredDf = df.filter($"age" > 10).groupBy($"gender").agg(sum($"amount")).orderBy($"sum(amount)" desc)
    
      
      
      
      
      
      
      
    
    代码解读

上述代码包含了Spark SQL的主要功能模块。具体来说,这些模块包括将数据存储为Parquet格式(通过调用save函数)、从Parquet文件中获取数据集(使用readTable方法)、完成对数据集的筛选(通过applyFilter函数)、汇总统计(借助groupBy和agg函数)以及排序处理(采用sort函数)。

Kafka

基于分布式架构设计的Kafka是一种消息发布订阅平台。该平台支持高效的高吞吐量和极低延迟的消息传递能力。通过Spark Streaming技术能够实现对Kafka中数据的有效读取及实时分析功能。

Spark Streaming是一个小型批次处理框架;它能够读取Kafka中的数据,并将其组织为小批量块。

复制代码
    import org.apache.kafka.clients.consumer._
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "auto.offset.reset" -> "earliest"
    )
    
    val topics = Array("topic1", "topic2")
    
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    
    stream.foreachRDD((rdd, time) => {
      // perform analysis on the RDD here
      println(s"========= $time =========")
      rdd.foreach(record => println(record.value()))
    })
    
    ssc.start()
    ssc.awaitTermination()
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

以上代码展示了如何运用Spark Streaming获取Kafka中的数据源,并实现了对Kafka实时数据的处理和分析。

Presto

Presto是一个基于开放源代码的分布式查询引擎。它能够高效地管理并分析规模庞大的多种数据集。

Presto采用了多样化的查询优化方案来增强其性能,在数据处理方面提供了详细的统计数据功能,并通过数据索引支持和智能查询规划来实现高效的业务处理。

复制代码
    SELECT * FROM hive.default.orders WHERE order_date >= DATEADD('day', -7, GETDATE())
    
    
    代码解读

以上代码展示了如何使用Presto查询Hive中的数据。

Hive

Hive是一个由Hadoop构建而成的数据库系统;它能够高效地存储大量结构化的和半结构化的数据;该系统不仅支持传统的SQL查询操作;还包括分组运算、关联操作、窗口函数以及事务处理等功能。

复制代码
    CREATE TABLE IF NOT EXISTS orders (order_id INT, customer VARCHAR, product VARCHAR, price DECIMAL(10, 2), quantity INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
    LOAD DATA INPATH's3n://mybucket/path/to/input' OVERWRITE INTO TABLE orders;
    
    SELECT customer, AVG(price*quantity) AS total_revenue
    FROM orders
    WHERE order_date >= DATEADD('day', -7, GETDATE())
    GROUP BY customer;
    
      
      
      
      
      
      
    
    代码解读

以上代码展示了如何使用Hive存储数据、加载数据、查询数据。

4.具体代码实例和解释说明

接下来, 我会展示几个详细代码片段来说明如何利用Python在AWS平台处理大数据进行说明

使用EC2快速搭建Spark集群

AWS包含一系列的产品与服务。这些资源允许用户迅速、经济地构建、管理、监控和运营大规模集群系统。在此基础上,我们采用官方文档指定的方法,在线部署一个Spark集群。

请首先访问AWS管理控制台并导航至EC2页面。单击“Launch Instance”按钮后,在选项中选择 Amazon Linux AMI 作为操作系统,并配置实例类型为 t2.micro 同时添加云存储设备。为了减少潜在的欺骗性宣传风险,请尽量避免订阅超过一年的AWS服务计划。

下一步,请单击进入配置实例详情界面。首先,请选择使用该虚拟专用网络(VPC),并设置相应的安全组。在存储管理模块中,请指定一块50GB的EBS外设存储介质作为系统盘。该系统盘将用于存储操作系统及相关应用数据。

在'Add tags'部分,在实例上标记标签以确保后续操作更加便捷。单击'Next: Configure Security Group'以进入下一步骤。在选定的安全组中设置一条规则允许TCP连接到端口22(SSH)。

单击"Review and Launch"按钮,在确认所有设置无误的情况下, 启动实例. 在随后弹出的对话框中, 选择密钥对用于配置, 并开始运行实例.

随后,在等待实例启动的过程中,请单击左侧导航栏中的'Network & Security'选项。接着,请点击'Security Groups'旁边的链接,并找到之前创建的安全组选项卡后,请单击右侧的'Edit Inbound Rules'选项卡处。然后,请选择一个空白的位置添加新的规则:选择'Custom TCP Rule'并指定SSH端口范围之后,请单击保存按钮完成配置

最后,打开实例的SSH终端,安装必要的依赖包。

复制代码
    sudo yum update -y && sudo yum install git python3 -y
    git clone https://github.com/apache/spark.git /opt/spark
    cd /opt/spark
    ./build/mvn -DskipTests clean package
    export PYSPARK_PYTHON=/usr/bin/python3
    ./bin/pyspark --packages org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION} \
              --conf "spark.jars.ivy=/root/.ivy2" \
              --master local[*] --driver-memory=1g --executor-memory=1g \
              --num-executors 2 --executor-cores 2
    
      
      
      
      
      
      
      
      
    
    代码解读

通过运行代码下载最新的Spark源码后进行编译完成后打包成可执行文件并设置环境变量参数使得系统能够正常识别和运行Spark组件。运行Spark shell客户端程序即可启动一个本地模式下的Spark会话并配置两个Executor节点和相应的计算资源以满足应用需求。其中--packages参数指定用于导入avro包的依赖项--master参数指定本地模式作为Master服务器地址而--num-executors和--executor-cores分别指定了这两个Executor节点的数量以及每个Executor所使用的核数以优化资源利用效率。

使用EMR快速搭建Spark集群

EMR(Elastic MapReduce)是一个基于云平台托管的Hadoop框架,具备极高的扩展性和自动调节资源的能力,能够根据不同的应用场景进行优化配置。它能够高效地构建、管理和监控运行多节点高性能计算集群,并实现资源按需弹性扩展,从而显著提升系统的性能和效率。

请登录AWS管理控制台,并导航至EMR服务页面。单击“开始使用”按钮,随后启动集群创建向导步骤。在步骤1中选择集群配置设置时,请指定版本标签。然后,在系统提示下输入emr-6.2.0(基于Hadoop 3.1.1与Spark 3.0.0的组合)。

在Step 2: Specify Cluster Details中,在集群名称字段填写集群名称,并在环境设置中选择"开发模式";在Step 3: Select IAM Role部分,请配置已配置的EKS(Elastic Kubernetes Service)或ECS(Elastic Container Service)IAM角色。

在步骤4中进行应用配置时,请根据需求选择需要安装的应用程序,并参考以下插件包进行操作。

  1. Hadoop
  2. Hue
  3. Livy

在步骤5(Step 5)中进行配置时

注意

在步骤6的查看阶段中,在确认所有配置无误的情况下,请单击“Create Cluster”按钮以生成集群。随后,请耐心等待集群启动完成。

单击左侧侧边菜单中的"Clusters"选项,在定位到之前创建的一个集群位置时,请单击右侧连接按钮图标以启动Web浏览器程序。随后转向左侧侧边菜单中的"Notebooks"选项卡,并新增一个笔记本文件以录入以下代码段。

复制代码
    from pyspark import SparkConf, SparkContext
    
    conf = SparkConf().setAppName("TestApp").setMaster("yarn")\
                 .set("spark.executor.instances", "2")\
                 .set("spark.executor.memoryOverhead", "1g")\
                 .set("spark.executor.cores", "2")\
                 .set("spark.driver.cores", "1")\
                 .set("spark.dynamicAllocation.enabled", "true")\
                 .set("spark.shuffle.service.enabled", "true")
    sc = SparkContext(conf=conf)
    
    textData = sc.textFile("/mnt/data/example.txt")
    print(textData.first()) # output first line of text file
    
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

该代码初始化了一个SparkContext并将其配置为以Yarn作为Master URL。随后启用了两个Executor节点,并为每个Executor分配了2个计算核心以及额外1GB的内存资源。SparkContext随后自动部署动态内存资源并开启了Shuffle服务以支持数据传输需求。

使用S3高效存储数据

S3是一种基于对象的云存储服务, 其核心功能是提供安全可靠且具有高可用性的数据存储解决方案. 该服务能够支持多种类型的文件内容, 包括图片视频音频以及数据备份等关键业务资产.

我们可以使用boto3库,通过Python上传、下载和管理S3中的数据。

复制代码
    import boto3
    
    s3 = boto3.resource('s3')
    
    s3.meta.client.upload_file('/tmp/example.csv', 'your-bucket', 'path/to/example.csv')
    
    object = s3.Bucket('your-bucket').Object('path/to/example.csv').download_file('/tmp/downloaded-example.csv')
    
      
      
      
      
      
      
    
    代码解读

该代码不仅展示了如何将本地文件上传至S3平台,并且说明了从该存储服务获取文件后将其下载至本地设备的具体操作流程。此外,它还指导您删除存储在S3平台上的特定文件。

使用Lambda与API Gateway构建Serverless应用

Lambda是一种无需管理本地服务器的云服务。该服务使开发者只需专注于业务逻辑,并无需处理底层的服务器配置、集群管理和基础设施维护。

AWS提供API Gateway服务,可以帮助我们构建、发布、维护、保护和监控API,并确保其安全运行。这些功能由Lambda函数发起请求来执行具体的业务逻辑,并返回响应。

复制代码
    AWSTemplateFormatVersion: 2010-09-09
    Transform: AWS::Serverless-2016-10-31
    Description: Example Serverless Application
    Resources:
      MyFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: index.handler
      Runtime: nodejs10.x
      CodeUri: functions/
      Description: An example Lambda function that returns a message
      MemorySize: 128
      Timeout: 3
      Events:
        ApiEvent:
          Type: Api
          Properties:
            Path: '/hello'
            Method: get
      ApiGatewayRestApi:
    Type: AWS::ApiGateway::RestApi
    Properties:
      Body: '{"swagger": "2.0","info":{"title":"My API","version":"1.0"}}'
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

该模板中包含了一个简单的API网关接口,并导致了lambda函数被调用。该调用者充当一个index.js文件,在接收HTTP请求后生成相应的JSON消息。

复制代码
    exports.handler = async (event, context) => {
      const response = {
    statusCode: 200,
    body: JSON.stringify({message: 'Hello world!'})
      };
    
      return response;
    };
    
      
      
      
      
      
      
      
    
    代码解读

以下是一个使用Python调用API Gateway的例子:

复制代码
    import requests
    response = requests.get('<api endpoint>/hello')
    if response.status_code == 200:
      print(response.json()['message'])
    else:
      print('Error:', response.text)
    
      
      
      
      
      
    
    代码解读

以上代码调用API Gateway接口,并获取返回的JSON消息。

全部评论 (0)

还没有任何评论哟~