
Spark-Zeppelin: Big Data Visualization and Analysis


Introduction from the official website

Multi-purpose Notebook

The Notebook is the place for all your needs

  • Data Ingestion
  • Data Discovery
  • Data Analytics
  • Data Visualization & Collaboration

Multiple language backend

Zeppelin's interpreter concept allows any language/data-processing backend to be plugged into the platform. Currently, Zeppelin supports many interpreters, such as Scala (with Apache Spark), Python (with Apache Spark), SparkSQL, Hive, Markdown and Shell.

Adding a new language backend is straightforward: learn how to write a Zeppelin interpreter.

Apache Spark integration

Zeppelin offers pre-integrated Apache Spark support. It is not necessary to create an additional module, plugin, or library for this.

Zeppelin's Spark integration provides

  • Automatic SparkContext and SQLContext injection (see the sketch after this list).
  • Runtime jar dependency loading from the local filesystem or a Maven repository. For further details, refer to the dependency loader documentation.
  • Canceling a job and displaying its progress.
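
As a minimal sketch of the first point (the numbers and the commented-out query are purely illustrative), a Scala paragraph can use the injected sc and sqlContext directly, without constructing them:

    // sc (SparkContext) and sqlContext (SQLContext) are injected by Zeppelin automatically
    val nums = sc.parallelize(1 to 100)
    println(nums.sum())   // prints 5050.0

    // the injected sqlContext works the same way, e.g. over a registered temporary table:
    // sqlContext.sql("select count(*) from bank").show()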

Data visualization

Some basic charts are already included in Zeppelin. Visualization is not limited to SparkSQL queries: output from any language backend can be recognized and visualized.

Pivot chart

A simple drag-and-drop interface allows Zeppelin to aggregate values and display them in a pivot chart. It is straightforward to create a chart that displays multiple aggregated values such as sum, count, average, minimum, and maximum.

Learn more about Zeppelin's display system: text display, HTML, table, and Angular. ( [ https://zeppelin.incubator.apache.org/docs/0.5.6-incubating/displaysystem/display.html ] , [ https://zeppelin.incubator.apache.org/docs/0.5.6-incubating/displaysystem-display.html#html ] , [ https://zeppelin.incubator.apache.org/docs/0.5.6-incubating/displaysystem-table.html ] , [ https://zeppelin.incubator.apache.org/docs/0.5.6-incubating/displaysystem-angular.html ] )
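
As a rough sketch of the table part of the display system (the column names and values are made up), any interpreter output that begins with %table and uses tab-separated columns is picked up by the same chart controls:

    // println output starting with %table is rendered by Zeppelin's table/chart display
    println("%table product\tamount\n" +
            "apple\t10\n" +
            "banana\t7")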

Dynamic forms

Zeppelin can dynamically create input forms in your notebook.

See the Dynamic Form documentation for more details.
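
Besides the ${...} templates used in SQL paragraphs later in this tutorial, a form can also be created programmatically from Scala through the ZeppelinContext object z. A minimal sketch, with an arbitrary form name and default value:

    // z.input renders a "maxAge" text box above the paragraph and returns its current value
    val maxAge = z.input("maxAge", "30").toString.toInt
    println(s"Rows will be filtered with age < $maxAge")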

Collaboration

Your notebook URL can be shared among collaborators; Zeppelin then broadcasts any changes in real time, just like Google Docs.

Publish

Zeppelin provides a URL that displays only the result, without Zeppelin's menu and buttons. This way you can easily embed it as an iframe in your own website.

100% Opensource

Apache Zeppelin is in Beta and is Apache 2.0 licensed software. See the source repository and How to contribute for the source code and contribution guidelines.

Zeppelin has a very active development community. Join the Mailing list to report issues and follow the project's progress.

Undergoing Incubation

Apache Zeppelin is an effort undergoing incubation at The Apache Software Foundation (ASF), sponsored by the Incubator. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision-making process have stabilized in a manner consistent with other successful ASF projects. While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.

Installation

From binary package

Get the latest binary package from the Download page.

Unpack
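
A typical unpack step looks like the following; the archive name depends on the release you downloaded, so the 0.5.6-incubating name below is only an example:

    tar -zxvf zeppelin-0.5.6-incubating-bin-all.tgz
    cd zeppelin-0.5.6-incubating-bin-all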

Build from source

Refer to the instructions in the [ README ] file for how to build from source.

Configure

Configuration can be done through either environment variables (conf/zeppelin-env.sh) or Java properties (conf/zeppelin-site.xml). If both are defined, the environment variable takes precedence.
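
For example, the server port from the table below can be set in either file; a sketch using the default port value:

    # conf/zeppelin-env.sh (environment variable form; this one wins if both are set)
    export ZEPPELIN_PORT=8080

    <!-- conf/zeppelin-site.xml (Java property form) -->
    <property>
      <name>zeppelin.server.port</name>
      <value>8080</value>
    </property>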

| zeppelin-env.sh | zeppelin-site.xml | Default value | Description |
|---|---|---|---|
| ZEPPELIN_PORT | zeppelin.server.port | 8080 | Zeppelin server port |
| ZEPPELIN_MEM | N/A | -Xmx1024m -XX:MaxPermSize=512m | JVM mem options |
| ZEPPELIN_INTP_MEM | N/A | ZEPPELIN_MEM | JVM mem options for the interpreter process |
| ZEPPELIN_JAVA_OPTS | N/A | | JVM options |
| ZEPPELIN_ALLOWED_ORIGINS | zeppelin.server.allowed.origins | * | Comma-separated list of allowed origins for REST and websockets, e.g. http://localhost:8080 |
| ZEPPELIN_SERVER_CONTEXT_PATH | zeppelin.server.context.path | / | Context path of the web application |
| ZEPPELIN_SSL | zeppelin.ssl | false | |
| ZEPPELIN_SSL_CLIENT_AUTH | zeppelin.ssl.client.auth | false | |
| ZEPPELIN_SSL_KEYSTORE_PATH | zeppelin.ssl.keystore.path | keystore | |
| ZEPPELIN_SSL_KEYSTORE_TYPE | zeppelin.ssl.keystore.type | JKS | |
| ZEPPELIN_SSL_KEYSTORE_PASSWORD | zeppelin.ssl.keystore.password | | |
| ZEPPELIN_SSL_KEY_MANAGER_PASSWORD | zeppelin.ssl.key.manager.password | | |
| ZEPPELIN_SSL_TRUSTSTORE_PATH | zeppelin.ssl.truststore.path | | |
| ZEPPELIN_SSL_TRUSTSTORE_TYPE | zeppelin.ssl.truststore.type | | |
| ZEPPELIN_SSL_TRUSTSTORE_PASSWORD | zeppelin.ssl.truststore.password | | |
| ZEPPELIN_NOTEBOOK_HOMESCREEN | zeppelin.notebook.homescreen | | Id of the notebook to display on the homescreen, e.g. 2A94M5J1Z |
| ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE | zeppelin.notebook.homescreen.hide | false | Hide the homescreen notebook from the list when set to "true" |
| ZEPPELIN_WAR_TEMPDIR | zeppelin.war.tempdir | webapps | Location of the Jetty temporary directory |
| ZEPPELIN_NOTEBOOK_DIR | zeppelin.notebook.dir | notebook | Directory where notebook files are saved |
| ZEPPELIN_NOTEBOOK_S3_BUCKET | zeppelin.notebook.s3.bucket | zeppelin | S3 bucket where notebooks are saved |
| ZEPPELIN_NOTEBOOK_S3_USER | zeppelin.notebook.s3.user | user | User prefix in the bucket where notebooks are saved, e.g. bucket/user/notebook/2A94M5J1Z/note.json |
| ZEPPELIN_NOTEBOOK_STORAGE | zeppelin.notebook.storage | org.apache.zeppelin.notebook.repo.VFSNotebookRepo | Comma-separated list of notebook storage backends |
| ZEPPELIN_INTERPRETERS | zeppelin.interpreters | org.apache.zeppelin.spark.SparkInterpreter, org.apache.zeppelin.spark.PySparkInterpreter, org.apache.zeppelin.spark.SparkSqlInterpreter, org.apache.zeppelin.spark.DepInterpreter, org.apache.zeppelin.markdown.Markdown, org.apache.zeppelin.shell.ShellInterpreter, org.apache.zeppelin.hive.HiveInterpreter ... | Comma-separated interpreter class names; the first interpreter becomes the default |
| ZEPPELIN_INTERPRETER_DIR | zeppelin.interpreter.dir | interpreter | Zeppelin interpreter directory |

You will also have to set up each interpreter individually. Detailed information is provided within the 'Interpreter' section of this documentation.

Spark, for example, the open-source distributed computing framework for large-scale data processing (see its official website), has its own interpreter settings.
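
For example, a common way to point the Spark interpreter at an existing Spark installation is to set SPARK_HOME (and optionally MASTER) in conf/zeppelin-env.sh; if they are left unset, Zeppelin falls back to its bundled local Spark. The path and master URL below are placeholders:

    # conf/zeppelin-env.sh
    export SPARK_HOME=/path/to/your/spark
    export MASTER=local[*]        # or e.g. spark://host:7077, yarn-client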

Start/Stop

Start Zeppelin
    bin/zeppelin-daemon.sh start

After successful start, visit http://localhost:8080 with your web browser.

Stop Zeppelin
    bin/zeppelin-daemon.sh stop

Hands-on example

Zeppelin Tutorial

This tutorial assumes you already have Zeppelin installed. If not, please follow the installation guide first.

Zeppelin's current main backend processing engine is Apache Spark. If you are new to it, it helps to understand how Spark works before diving into Zeppelin.

Tutorial with Local File

Data Refine

Before starting the Zeppelin tutorial, you first need to download bank.zip.

First, transform the CSV-formatted data into an RDD of Bank objects. The code below also drops the header row using the filter method.

    val bankText = sc.textFile("yourPath/bank/bank-full.csv")

    case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)

    // split each line, filter out the header (starts with "age"), and map it into the Bank case class
    val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(
        s => Bank(s(0).toInt,
            s(1).replaceAll("\"", ""),
            s(2).replaceAll("\"", ""),
            s(3).replaceAll("\"", ""),
            s(5).replaceAll("\"", "").toInt
        )
    )

    // convert to DataFrame and create a temporary table
    bank.toDF().registerTempTable("bank")

Data Retrieval

Suppose we want to see age distribution from bank. To do this, run:

    %sql select age, count(1) from bank where age < 30 group by age order by age

You can create an age constraint input field by replacing 30 with ${maxAge=30}.

    %sql select age, count(1) from bank where age < ${maxAge=30} group by age order by age

Now let's look at the age distribution by marital status, with a drop-down menu for selecting the marital status. Run:

    %sql select age, count(1) from bank where marital="${marital=single,single|divorced|married}" group by age order by age

Tutorial with Streaming Data

Data Refine

This tutorial is based on Twitter's sample tweet stream, so you need a Twitter account for authentication. Follow the Twitter Credential Setup guide, then fill your credential values (apiKey, apiSecret, accessToken, accessTokenSecret) into the script below.

An RDD of Tweet objects will be created, and the streaming data will be stored in a table.

    import org.apache.spark.streaming._
    import org.apache.spark.streaming.twitter._
    import org.apache.spark.storage.StorageLevel
    import scala.io.Source
    import scala.collection.mutable.HashMap
    import java.io.File
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    import sys.process.stringSeqToProcess

    /** Configures the OAuth credentials for accessing Twitter */
    def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
      val configs = new HashMap[String, String] ++= Seq(
        "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
      println("Configuring Twitter OAuth")
      configs.foreach { case (key, value) =>
        if (value.trim.isEmpty) {
          throw new Exception("Error setting authentication - value for " + key + " not set")
        }
        val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
        System.setProperty(fullKey, value.trim)
        println("\tProperty " + fullKey + " set as [" + value.trim + "]")
      }
      println()
    }

    // Configure Twitter credentials
    val apiKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
    val apiSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    val accessToken = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    val accessTokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)

    // create a streaming context with a 2-second batch interval and a 60-second window
    val ssc = new StreamingContext(sc, Seconds(2))
    val tweets = TwitterUtils.createStream(ssc, None)
    val twt = tweets.window(Seconds(60))

    case class Tweet(createdAt: Long, text: String)
    twt.map(status =>
      Tweet(status.getCreatedAt().getTime() / 1000, status.getText())
    ).foreachRDD(rdd =>
      // Below line works only in spark 1.3.0.
      // For spark 1.1.x and spark 1.2.x,
      // use rdd.registerTempTable("tweets") instead.
      rdd.toDF().registerTempTable("tweets")
    )

    twt.print

    ssc.start()

Data Retrieval

Every time you run the following queries, the results will differ, because they operate on live data.

Let's start by extracting at most 10 tweets that contain the word "girl".

    %sql select * from tweets where text like '%girl%' limit 10

Next, let's see how many tweets were created per second during the last 60 seconds. To do this, run:

    %sql select createdAt, count(1) from tweets group by createdAt order by createdAt

You can also create a user-defined function and use it in Spark SQL. Let's try this by writing a simple sentiment analysis function named sentiment, which classifies its input as positive, negative, or neutral.

    def sentiment(s: String): String = {
        val positive = Array("like", "love", "good", "great", "happy", "cool", "the", "one", "that")
        val negative = Array("hate", "bad", "stupid", "is")

        var st = 0

        val words = s.split(" ")
        positive.foreach(p =>
            words.foreach(w =>
                if (p == w) st = st + 1
            )
        )

        negative.foreach(p =>
            words.foreach(w =>
                if (p == w) st = st - 1
            )
        )

        if (st > 0)
            "positive"
        else if (st < 0)
            "negative"
        else
            "neutral"
    }

    // Below line works only in spark 1.3.0.
    // For spark 1.1.x and spark 1.2.x,
    // use sqlc.registerFunction("sentiment", sentiment _) instead.
    sqlc.udf.register("sentiment", sentiment _)

To check how people feel about girls using the sentiment function we just created, run:

    %sql select sentiment(text), count(1) from tweets where text like '%girl%' group by sentiment(text)
