Spark-Zeppelin: Big Data Visualization and Analysis
Introduction (from the official website)
Multi-purpose Notebook
The Notebook is the place for all your needs:
- Data ingestion / collection
- Data discovery / exploration
- Data analytics
- Data visualization & collaboration

Multiple language backend
Zeppelin's interpreter concept allows a variety of language and data-processing backends to plug seamlessly into the platform. Currently, Zeppelin supports interpreters such as Scala (with Apache Spark), Python (with Apache Spark), SparkSQL, Hive, Markdown, and Shell.
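Within a single note, each paragraph selects its backend with a leading % directive. For illustration (which interpreters are available depends on your installation):

%md This paragraph is rendered by the **Markdown** interpreter.

%sh echo "and this paragraph runs in the Shell interpreter"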

Adding a new language backend is quite simple: you only need to learn how to write a Zeppelin interpreter.
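To give a feel for the shape of the API, here is a rough, hypothetical sketch of an interpreter that simply echoes the paragraph text back. The method names follow the 0.5.x interpreter interface, but check the interpreter documentation before relying on any exact signature:

import java.util.Properties
import org.apache.zeppelin.interpreter.{Interpreter, InterpreterContext, InterpreterResult}

// Hypothetical example, not shipped with Zeppelin
class EchoInterpreter(props: Properties) extends Interpreter(props) {
  override def open(): Unit = {}   // acquire resources (connections, processes, ...)
  override def close(): Unit = {}  // release them
  override def interpret(st: String, context: InterpreterContext): InterpreterResult =
    new InterpreterResult(InterpreterResult.Code.SUCCESS, st)  // echo the input back
  override def cancel(context: InterpreterContext): Unit = {}
  override def getProgress(context: InterpreterContext): Int = 0
  override def getFormType(): Interpreter.FormType = Interpreter.FormType.SIMPLE
  override def completion(buf: String, cursor: Int): java.util.List[String] =
    java.util.Collections.emptyList[String]()
}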
Apache Spark integration
Zeppelin ships with Apache Spark support built in; you do not need a separate module, plugin, or library for it.

Zeppelin's Spark integration provides:
- Automatic SparkContext and SQLContext injection (see the sketch after this list).
- Runtime jar dependency loading from the local filesystem or a Maven repository (see the dependency loader documentation for details).
- Job cancellation and progress tracking.
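As a quick illustration of the first point, a Spark paragraph can use the injected variables directly, with no setup code (a minimal sketch):

// sc (SparkContext) and sqlContext (SQLContext) are already injected
val numbers = sc.parallelize(1 to 100)
println(numbers.reduce(_ + _))  // 5050, computed as a Spark job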
Data visualization
Some basic chart types are included natively. Moreover, visualization is not limited to SparkSQL queries: output from any language backend can be recognized and visualized.


Pivot chart
With a simple drag-and-drop interface, Zeppelin aggregates values and displays them in a pivot chart. This makes it easy to build a chart with multiple aggregated values, such as sum, count, average, minimum, and maximum.

Learn more about Zeppelin's display system: ( [ https://zeppelin.incubator.apache.org/docs/0.5.6-incubating/displaysystem/display.html ] , [ https://zeppelin.incubator.apache.org/docs/0.5.6-incubating/displaysystem-display.html#html ] , [ https://zeppelin.incubator.apache.org/docs/0.5.6-incubating/displaysystem-table.html ] , [ https://zeppelin.incubator.apache.org/docs/0.5.6-incubating/displaysystem-angular.html ] )
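For example, any paragraph output that begins with a display directive is rendered accordingly. A small sketch of the table and HTML displays described in the links above:

// Output starting with %table (tab-separated values) is rendered as a table and can be charted
println("%table name\tvalue\nsun\t100\nmoon\t50")
// Output starting with %html is rendered as HTML
println("%html <h3>Hello from the <em>display system</em></h3>")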
Dynamic forms
Zeppelin can dynamically create input forms in your notebook.

See the Dynamic Forms documentation for the full feature set.
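For instance, besides the ${...} template syntax used later in this tutorial, Scala paragraphs can create forms programmatically through the ZeppelinContext (a sketch; z is the ZeppelinContext instance available in Spark paragraphs):

// Renders a text input form in the paragraph and returns its value
val maxAge = z.input("maxAge", "30")
// Renders a dropdown; each option is (value, displayName)
val marital = z.select("marital", Seq(("single", "single"), ("divorced", "divorced"), ("married", "married")))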
Collaboration
A notebook URL can be shared among collaborators, and Zeppelin broadcasts any changes in real time, just like Google Docs.

Publish
Zeppelin provides a URL that displays only the result; that page does not include Zeppelin's menu and buttons. This way, you can easily embed it as an iframe inside your own website.

100% Open Source
Apache Zeppelin is currently in Beta and is Apache 2.0 licensed software. Please check the source repository and How to contribute for the source code and detailed contribution guidelines.
Zeppelin has a very active development community. Join the Mailing list to report bugs and follow the project's progress.
Undergoing Incubation
Apache Zeppelin is an effort undergoing incubation at The Apache Software Foundation (ASF), sponsored by the Apache Incubator. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision-making processes have stabilized in a manner consistent with other successful ASF projects. While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.
Installation
From binary package
Get the latest binary package from the Download page.
Unpack the downloaded package.
Build from source
Refer to the instructions in the [ README ] file for how to build from source.
Configure
Configuration can be done through either environment variables (conf/zeppelin-env.sh) or Java properties (conf/zeppelin-site.xml). If both are defined, the environment variable takes precedence.
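For example, the server port can be set in either place (a sketch; 8080 is already the default):

# conf/zeppelin-env.sh
export ZEPPELIN_PORT=8080

or, equivalently, in conf/zeppelin-site.xml:

<property>
  <name>zeppelin.server.port</name>
  <value>8080</value>
</property>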
| zeppelin-env.sh | zeppelin-site.xml | Default value | Description |
|---|---|---|---|
| ZEPPELIN_PORT | zeppelin.server.port | 8080 | Zeppelin server port. |
| ZEPPELIN_MEM | N/A | -Xmx1024m -XX:MaxPermSize=512m | JVM mem options |
| ZEPPELIN_INTP_MEM | N/A | ZEPPELIN_MEM | JVM mem options for interpreter process |
| ZEPPELIN_JAVA_OPTS | N/A | | JVM options |
| ZEPPELIN_ALLOWED_ORIGINS | zeppelin.server.allowed.origins | * | Comma-separated list of allowed origins for REST and WebSockets, e.g. http://localhost:8080 |
| ZEPPELIN_SERVER_CONTEXT_PATH | zeppelin.server.context.path | / | Context Path of the Web Application |
| ZEPPELIN_SSL | zeppelin.ssl | false | |
| ZEPPELIN_SSL_CLIENT_AUTH | zeppelin.ssl.client.auth | false | |
| ZEPPELIN_SSL_KEYSTORE_PATH | zeppelin.ssl.keystore.path | keystore | |
| ZEPPELIN_SSL_KEYSTORE_TYPE | zeppelin.ssl.keystore.type | JKS | |
| ZEPPELIN_SSL_KEYSTORE_PASSWORD | zeppelin.ssl.keystore.password | | |
| ZEPPELIN_SSL_KEY_MANAGER_PASSWORD | zeppelin.ssl.key.manager.password | | |
| ZEPPELIN_SSL_TRUSTSTORE_PATH | zeppelin.ssl.truststore.path | | |
| ZEPPELIN_SSL_TRUSTSTORE_TYPE | zeppelin.ssl.truststore.type | | |
| ZEPPELIN_SSL_TRUSTSTORE_PASSWORD | zeppelin.ssl.truststore.password | | |
| ZEPPELIN_NOTEBOOK_HOMESCREEN | zeppelin.notebook.homescreen | | Id of the notebook to be displayed on the homescreen, e.g. 2A94M5J1Z |
| ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE | zeppelin.notebook.homescreen.hide | false | Hide the homescreen notebook from the list when set to "true" |
| ZEPPELIN_WAR_TEMPDIR | zeppelin.war.tempdir | webapps | The location of jetty temporary directory. |
| ZEPPELIN_NOTEBOOK_DIR | zeppelin.notebook.dir | notebook | Where notebook file is saved |
| ZEPPELIN_NOTEBOOK_S3_BUCKET | zeppelin.notebook.s3.bucket | zeppelin | Bucket where notebook saved |
| ZEPPELIN_NOTEBOOK_S3_USER | zeppelin.notebook.s3.user | user | User in bucket where notebook saved. For example bucket/user/notebook/2A94M5J1Z/note.json |
| ZEPPELIN_NOTEBOOK_STORAGE | zeppelin.notebook.storage | org.apache.zeppelin.notebook.repo.VFSNotebookRepo | Comma separated list of notebook storage |
| ZEPPELIN_INTERPRETERS | zeppelin.interpreters | org.apache.zeppelin.spark.SparkInterpreter, org.apache.zeppelin.spark.PySparkInterpreter, org.apache.zeppelin.spark.SparkSqlInterpreter, org.apache.zeppelin.spark.DepInterpreter, org.apache.zeppelin.markdown.Markdown, org.apache.zeppelin.shell.ShellInterpreter, org.apache.zeppelin.hive.HiveInterpreter ... | Comma-separated interpreter configurations [Class]. The first interpreter becomes the default |
| ZEPPELIN_INTERPRETER_DIR | zeppelin.interpreter.dir | interpreter | Zeppelin interpreter directory |
You will also have to configure each interpreter individually; detailed information is provided in the 'Interpreter' section of this documentation. For example, the Spark interpreter targets Apache Spark, the open-source distributed computing framework for large-scale data processing (see its official website).
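For instance, to run against an existing Spark cluster instead of the default local mode, you can export the master URL in conf/zeppelin-env.sh (a hedged example; the host and port are placeholders):

# conf/zeppelin-env.sh (leave MASTER empty to use local mode)
export MASTER=spark://your-spark-master:7077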
Start/Stop
Start Zeppelin
bin/zeppelin-daemon.sh start
After a successful start, visit http://localhost:8080 in your web browser.
Stop Zeppelin
bin/zeppelin-daemon.sh stop
Practical Examples:
Zeppelin Tutorial
This tutorial assumes that you already have Zeppelin installed. If not, please see the installation guide above for instructions.
Zeppelin's main backend processing engine is currently Apache Spark. If you are new to it, we recommend getting familiar with how Spark works first, so you can make the most of Zeppelin.
Tutorial with Local File
Data Refine
Before starting the Zeppelin tutorial, you need to download bank.zip.
The data is transformed as follows: first, the CSV-formatted data is turned into an RDD of Bank objects; the header row is dropped using the filter method, as shown in the code snippet below.
val bankText = sc.textFile("yourPath/bank/bank-full.csv")
case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)
// split each line, filter out the header (starts with "age"), and map it into the Bank case class
val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(
  s => Bank(s(0).toInt,
            s(1).replaceAll("\"", ""),
            s(2).replaceAll("\"", ""),
            s(3).replaceAll("\"", ""),
            s(5).replaceAll("\"", "").toInt
  )
)
// convert to DataFrame and create a temporary table
bank.toDF().registerTempTable("bank")
Data Retrieval
Suppose we want to see the age distribution from the bank data. To do this, run:
%sql select age, count(1) from bank where age < 30 group by age order by age
You can make the age threshold an input field by replacing 30 with ${maxAge=30}.
%sql select age, count(1) from bank where age < ${maxAge=30} group by age order by age
Now, suppose we want to see the age distribution for each marital status and add a dropdown to select the marital status. Run:
%sql select age, count(1) from bank where marital="${marital=single,single|divorced|married}" group by age order by age
Tutorial with Streaming Data
Data Refine
Since this tutorial is based on Twitter's sample tweet stream, you must authenticate with a Twitter account. To do so, follow the Twitter Credential Setup guide. Once you have your API keys, enter the credential values (apiKey, apiSecret, accessToken, and accessTokenSecret) into the script below.
The script creates an RDD of Tweet objects and stores the stream data in a table.
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.storage.StorageLevel
import scala.io.Source
import scala.collection.mutable.HashMap
import java.io.File
import org.apache.log4j.Logger
import org.apache.log4j.Level
import sys.process.stringSeqToProcess
/** Configures the OAuth credentials for accessing Twitter */
def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  val configs = new HashMap[String, String] ++= Seq(
    "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  println("Configuring Twitter OAuth")
  configs.foreach { case (key, value) =>
    if (value.trim.isEmpty) {
      throw new Exception("Error setting authentication - value for " + key + " not set")
    }
    val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
    System.setProperty(fullKey, value.trim)
    println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  }
  println()
}
// Configure Twitter credentials
val apiKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
val apiSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
val accessToken = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
val accessTokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
import org.apache.spark.streaming.twitter._
val ssc = new StreamingContext(sc, Seconds(2))
val tweets = TwitterUtils.createStream(ssc, None)
val twt = tweets.window(Seconds(60))
case class Tweet(createdAt:Long, text:String)
twt.map(status =>
  Tweet(status.getCreatedAt().getTime()/1000, status.getText())
).foreachRDD(rdd =>
  // Below line works only in spark 1.3.0.
  // For spark 1.1.x and spark 1.2.x,
  // use rdd.registerTempTable("tweets") instead.
  rdd.toDF().registerAsTable("tweets")
)
twt.print
ssc.start()
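A side note, not part of the original tutorial: once you are done with the streaming portion, you may want to stop the streaming computation without shutting down the SparkContext that the rest of the note still uses.

// Stop the StreamingContext but keep the shared SparkContext alive
ssc.stop(stopSparkContext = false)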
Data Retrieval
Every time you run the following scripts, you will see different results, since they are based on real-time data.
Let's begin by extracting at most 10 tweets that contain the word "girl":
%sql select * from tweets where text like '%girl%' limit 10
This time, let's see how many tweets were created per second during the last 60 seconds. To do so, run:
%sql select createdAt, count(1) from tweets group by createdAt order by createdAt
You can also create a user-defined function and use it in Spark SQL. Let's try this by making a simple sentiment analysis function named sentiment, which classifies the attitude of the input text as positive, negative, or neutral.
def sentiment(s: String): String = {
  val positive = Array("like", "love", "good", "great", "happy", "cool", "the", "one", "that")
  val negative = Array("hate", "bad", "stupid", "is")
  var st = 0
  val words = s.split(" ")
  positive.foreach(p =>
    words.foreach(w =>
      if (p == w) st = st + 1
    )
  )
  negative.foreach(p =>
    words.foreach(w =>
      if (p == w) st = st - 1
    )
  )
  if (st > 0)
    "positive"
  else if (st < 0)
    "negative"
  else
    "neutral"
}
// Below line works only in spark 1.3.0.
// For spark 1.1.x and spark 1.2.x,
// use sqlc.registerFunction("sentiment", sentiment _) instead.
sqlc.udf.register("sentiment", sentiment _)
To see how people feel about girls, using the sentiment function we just implemented, run:
%sql select sentiment(text), count(1) from tweets where text like '%girl%' group by sentiment(text)
