Establishing a RealTime Big Data Platform for Transport
作者:禅与计算机程序设计艺术
1.简介
Apache Kafka是一个开源分布式流处理平台最初开发于LinkedIn公司主要用于实时数据传输及流动计算随着时间推移现已成为最受欢迎的开源消息代理工具它还是一种快速可靠的分布式存储系统可作为消息队列使用而MongoDB则是一种基于分布式文件存储的技术具有高性能易于扩展等特点那么如何将这两种技术有机融合构建一个适用于交通管理的实时大数据平台呢本文将深入探讨相关技术要点并展示构建方案
2.基本概念
2.1 Apache Kafka
Apache Kafka是一种基于开源理念的流处理平台。该系统旨在提供高效可靠的数据采集、处理与传输能力。该技术采用分布式集群架构,并通过复制与容错机制确保系统的可靠性。该架构允许消费者与生产者之间能够通过可扩展的方式实现异步通信。Apache Kafka的主要特点包括以下几个方面:
- 发布/订阅机制:消息按主题分类后发送出去, 消费者可以根据兴趣选择关注特定主题.
- 持久化存储机制: 系统根据设定策略自动管理数据存储至硬盘的日志记录, 保障数据的安全性和完整性.
- 分布式协调组件: 各个节点维持一致的状态状态, 通过一致性协议协调各节点的状态更新.
- 可扩展性特性: 允许集群内的分区数量动态调整以提升处理能力, 提高处理速率并增强系统的容错机制.
- 超高的处理效率: 利用AQPS算法优化网络连接性能, 每秒可达数百万条消息的传输速度.
Apache Kafka中的一些重要概念如下图所示:
如上文所示,在Kafka系统中生产者负责管理生成与发布流程并发送消息至指定主题中;该集群由多个节点构成,在其中每个节点支持多个主题分区;而每个主题分区通常备份到多台节点上;一旦消息被发布到指定分区中,则其内容将不再发生变更;只有当消费者确认已成功消费该消息时才会触发其删除操作;其中Broker和Zookeeper是该系统的两大核心组件
2.2 MongoDB
MongoDB采用了分布式文件系统作为其存储基础。其核心优势在于能够自动处理数据复制、负载均衡以及故障转移等问题。因此,在处理实时大数据方面具有显著优势。其主要特征如下:
- 基于文档:存储的数据形式类似于JSON对象。
- 动态的查询功能:提供多样化的Query表达式,并能灵活支持各种数据检索需求。
- 高度可扩展性:通过分片集群架构实现数据的水平扩展。
- 在传统关系型数据库中常会遇到的问题MongoDB通过其设计得以有效规避这些挑战
MongoDB中的一些重要概念如下:
数据库是一个 MongoDB 实例可以承载多个独立的存储单元,在每个存储单元中可容纳多个集合(Collection)。
集合是数据库中的一个存储容器,在此容器中存放着大量数据对象。
每个文档都是一个由 BSON 标识符唯一标识的数据对象。
属性是文档中的字段名。
索引是一种优化查询性能的数据结构。
3.核心算法
为了解决开发一个实时的交通管理平台并将其集成到Apache Kafka和MongoDB之间的问题,我们需要明确如何利用这两个数据库系统的优势。接下来我们将详细探讨这三种方案:
3.1 直接写入MongoDB
这种处理方式包括Apache Kafka生产者通过接口将消息传递给MongoDB数据库。在实际应用中,可能存在效率上的瓶颈因素可能导致处理速度较慢.由此可见,在实时场景中应用这种方案并不理想.
3.2 使用Kafka Connect将数据从Kafka导入MongoDB
基于两者均为开源系统的特点,在此可借助开源工具Kafka Connect实现异源数据与目标数据之间的同步。该方式要求搭建一个Connector插件,在其中至少包括以下三个组件:
- Data Source Port: 用于获取原始数据源的通道口(如Kafka主题)。
- Export Port: 汇出端口用于将信息发送到目标系统的存储层(如MongoDB数据库)。
- Data Converter: 用于将信息格式转换为所需结构的组件。
该方法能够实现实时性,但是需要配置Connector,增加维护难度。
3.3 使用Storm实时处理数据,将结果导入MongoDB
Storm是一种基于流处理的平台;该平台能够接收并处理来自Kafka的消息流;完成数据实时分析与决策;并将处理结果存储在MongoDB数据库中。
该方案无需额外配置,并且也不必编写Connector。然而其缺点在于不够实时仅能进行批量导入操作。
综上所述,在搭建实时的交通管理平台时,建议采用Storm方案,并结合MongoDB系统进行整合与优化。具体的实施步骤如下:Storm将对实时采集的数据进行处理,并将处理后的数据导入MongoDB系统中。
- 安装与启动MongoDB:按照安装指南完成MongoDB的安装与启动流程,并创建以"traffic"命名的数据库以及以"trajectories"命名的集合。
- 建立一个Storm拓扑结构:在storm集群中启动一个topology实例,在该实例上配置从Kafka topic "trajectories"读取轨迹信息,并对数据执行聚合统计等处理操作后将结果输出至MongoDB数据库中的集合 traffic.trajactories。
- 验证 Storm 拓扑结构:通过命令行界面或 storm UI工具来检查topology实例的状态,并监测数据写入MongoDB的速度。
- 部署 storm 拓扑结构:将 storm 拓扑结构部署至生产环境服务器上,在实际负载下调整spout和bolt组件的并发数以充分释放计算资源。
- 配置 storm 集群监控告警规则:根据实际需求设置 storm 集群监控告警策略,在检测到异常情况时自动触发响应机制。
该方法具备了实时处理能力,并且具有较高的稳定性和可靠性。它基于Storm集群架构设计,在本地环境下进行测试,并在生产环境中部署运行。
4.具体代码实例
为了展示上述方法,请提供一个具体的代码示例。在此假设有一个名为trajectories_topic的Kafka主题接收的消息格式如下:
{
"id": "trajectory_1",
"start": {
"lat": 39.99821,
"lng": 116.31143
},
"end": {
"lat": 39.90662,
"lng": 116.40314
},
"points": [
{"lat": 39.99991,"lng": 116.30649},
{"lat": 39.99965,"lng": 116.30541}
]
}
代码解读
此消息中存在一条轨迹数据记录内容,在具体细节上则涵盖了起始位置、终点位置以及轨迹上的关键点位置信息。
为了即时处理此信息,并将其存储于MongoDB数据库中,则可以通过构建一个Storm架构来完成这一功能。
- Spout会从trajectories_topic获取数据,并随后发送给Bolt。
- Bolt负责解析数据,并将它们汇总分析之后被写入MongoDB数据库。
- Storm UI实时跟踪Storm topology的状态,并分析数据输出频率等信息。
下面以Java语言为例,介绍Storm topology的具体代码实现。
首先,我们需要添加Storm的依赖项:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
</dependency>
<!-- MongoDB Java Driver -->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>${mongo.driver.version}</version>
</dependency>
<!-- SLF4J Logging API -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
代码解读
然后,我们定义Storm topology类TrajectoryStatisticsTopology:
import com.mongodb.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.bson.Document;
import java.util.Map;
public class TrajectoryStatisticsTopology extends BaseTopology {
private static final String TRAJECTORIES = "trajectories";
@Override
public void initialize() {
builder.setSpout("trajectory-spout", new TrajctorySpout(), spoutConfig);
builder.setBolt("trajectory-statistics", new TrajctoryStatisticsBolt()).shuffleGrouping("trajectory-spout");
addClickhouseBolts();
addToDashboard("Trajectory Statistics");
}
/** * Creates the ClickHouse bolts which inserts processed data into ClickHouse tables.
*/
protected void addClickhouseBolts() {
// TODO Implement ClickHouse bolts here.
}
}
代码解读
这个类继承自BaseTopology,主要做以下工作:
通过读取Storm配置文件获取相关参数后,并将其用于注册Spout和Bolt。
在系统中加入了clickhouse bolt节点后,并将处理后的数据输出至ClickHouse。
改写说明
import kafka.consumer.ConsumerIterator;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.json.simple.JSONObject;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class TrajctorySpout implements IRichSpout {
private OpaqueTridentKafkaSpout opaqueTridentKafkaSpout;
private SpoutOutputCollector collector;
@SuppressWarnings("rawtypes")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
Map<String, Object> spoutConfig = Utils.readStormConfig();
spoutConfig.putAll(conf);
opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout().setSpoutConfig(spoutConfig).setTopic(TRAJECTORIES);
}
@Override
public void activate() {
opaqueTridentKafkaSpout.activate();
}
@Override
public void deactivate() {
opaqueTridentKafkaSpout.deactivate();
}
@Override
public void close() {
opaqueTridentKafkaSpout.close();
}
@Override
public void nextTuple() {
ConsumerIterator<byte[]> iterator = opaqueTridentKafkaSpout.getOpaquePartitionedTridentSpout().getConsumer().poll(100);
if (iterator!= null && iterator.hasNext()) {
byte[] messageBytes = iterator.next().value();
JSONObject jsonObj = JSONValue.parseWithException(new String(messageBytes, StandardCharsets.UTF_8));
Document document = createMongoDoc((JSONObject) jsonObj);
collector.emit(new Values(document));
Utils.sleep(100); // Limit emit frequency to avoid overloading Kafka or MongoDb with too many messages at once.
} else {
Utils.sleep(500); // Sleep when no messages available in Kafka topic to save resources.
}
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("document"));
}
/** * Converts a JSONObject object from the Kafka message format to a MongoDB document object.
*/
private static Document createMongoDoc(JSONObject jsonObj) {
Document doc = new Document("_id", jsonObj.get("id"))
.append("startLat", ((JSONObject) jsonObj.get("start")).get("lat"))
.append("startLng", ((JSONObject) jsonObj.get("start")).get("lng"))
.append("endLat", ((JSONObject) jsonObj.get("end")).get("lat"))
.append("endLng", ((JSONArray) jsonObj.get("end")).get(1)) // assuming lng is always the second element of end point array.
.append("points", createPointsArray((JSONArray) jsonObj.get("points")));
return doc;
}
/** * Helper method that converts an JSONArray of point objects to a Points array in the MongoDB document format.
*/
private static Document createPointsArray(JSONArray pointsJsonArr) {
Document pointsDoc = new Document();
int i=0;
while (i < pointsJsonArr.size()) {
JSONObject pointObj = (JSONObject) pointsJsonArr.get(i);
double lat = (double) pointObj.get("lat");
double lng = (double) pointObj.get("lng");
Point point = new Point(lat, lng);
pointsDoc.append("" + i++, point);
}
return pointsDoc;
}
}
代码解读
该类实现了open方法。它初始化了OpaqueTridentKafkaSpout实例,并通过调用其setSpoutConfig方法设置了相关配置参数。随后,该类打开了OpaqueTridentKafkaSpout实例,并声明了一个实现本类的output collector对象。
接下来我们将该系统命名为TrajectoryStatisticsBolt它主要负责接收并处理 incoming消息并对收集到的数据执行聚合统计操作并将计算得出的结果整合后存入MongoDB数据库
import com.mongodb.*;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.bson.Document;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class TrajctoryStatisticsBolt extends BaseBasicBolt {
private DB mongoClient;
private DBCollection trajectoryColl;
@Override
public void prepare(Map conf, TopologyContext context) {
try {
String hostName = System.getProperty("MONGODB_HOST", "localhost");
Integer portNumber = Integer.getInteger("MONGODB_PORT", 27017);
MongoCredential credential = MongoCredential.createScramSha1Credential("", "", "");
ServerAddress serverAddr = new ServerAddress(hostName, portNumber);
mongoClient = new MongoClient(serverAddr, List.of(credential));
DB db = mongoClient.getDatabase("traffic");
trajectoryColl = db.getCollection("trajectories");
} catch (UnknownHostException e) {
throw new FailedException(e);
}
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
Document document = (Document) input.getValueByField("document");
Long timestamp = System.currentTimeMillis();
Date date = new Date(timestamp);
SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmmssSSS");
String timeStr = formatter.format(date);
String id = "stats_" + timeStr;
Double startLatitude = document.getDouble("startLat");
Double startLongitude = document.getDouble("startLng");
Double endLatitude = document.getDouble("endLat");
Double endLongitude = document.getDouble("endLng");
Double distance = HaversineDistanceCalculator.calculateHaversineDistance(startLatitude, startLongitude, endLatitude, endLongitude);
Double duration = 0.0; // TODO Calculate trip duration based on points list.
insertStatisticsToMongo(id, startLatitude, startLongitude, endLatitude, endLongitude, distance, duration);
}
@Override
public void cleanup() {
mongoClient.close();
}
/** * Inserts statistics record into MongoDB database.
*/
private void insertStatisticsToMongo(String id, Double startLatitude, Double startLongitude, Double endLatitude,
Double endLongitude, Double distance, Double duration) {
BasicDBObject obj = new BasicDBObject()
.append("id", id)
.append("startTime", timestampToString(System.currentTimeMillis()))
.append("endTime", "")
.append("startPoint", new Point(startLatitude, startLongitude))
.append("endPoint", new Point(endLatitude, endLongitude))
.append("distance", distance)
.append("duration", duration);
Document statsDoc = new Document(obj);
Document query = new Document().append("$and", List.of(
new Document().append("_id", id),
new Document().append("startTime", ""),
new Document().append("endTime", "").ne("")
));
UpdateResult updateRes = trajectoryColl.updateOne(query, new Document().append("$setOnInsert", statsDoc), true);
if (!updateRes.wasAcknowledged()) {
throw new RuntimeException("Failed to insert statistics record.");
}
}
/** * Helper method that converts current timestamp to string representation suitable as a MongoDB ObjectId.
*/
private static String timestampToString(long timestamp) {
long seconds = (timestamp / 1000L) % 60;
long minutes = (timestamp / (1000L*60)) % 60;
long hours = (timestamp / (1000L*60*60)) % 24;
StringBuilder sb = new StringBuilder(Long.toString(hours)).append(":").append(Long.toString(minutes)).append(":").append(Long.toString(seconds));
return sb.toString();
}
}
代码解读
该类实现了prepare方法。该类试图建立与MongoDB数据库的连接,并成功获取了MongoDB数据库及其集合的句柄。随后,该类实现了execute方法。该方法从输入tuple中提取数据,并对其执行聚合统计;然后将统计结果存储于MongoDB数据库中。
最后,我们定义Point类,它封装了经纬度坐标,并提供计算距离的方法:
import com.mongodb.client.model.geojson.Point;
/** * Represents a geographic coordinate in latitude/longitude degrees.
*/
public class Point {
private final double latitude;
private final double longitude;
public Point(double latitude, double longitude) {
this.latitude = latitude;
this.longitude = longitude;
}
public double getLatitude() {
return latitude;
}
public double getLongitude() {
return longitude;
}
public double calculateDistance(Point otherPoint) {
double earthRadius = 6371; // km
double dLat = Math.toRadians(otherPoint.getLatitude() - latitude);
double dLon = Math.toRadians(otherPoint.getLongitude() - longitude);
double a = Math.sin(dLat/2)*Math.sin(dLat/2)+
Math.cos(Math.toRadians(latitude))*Math.cos(Math.toRadians(otherPoint.getLatitude()))*
Math.sin(dLon/2)*Math.sin(dLon/2);
double c = 2*Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
double dist = earthRadius*c;
return dist;
}
public Point projectAlongBearingAndDistance(double bearingDegrees, double distanceMeters) {
double angularDist = distanceMeters/(111.32*Math.cos(Math.toRadians(latitude))); // convert meters to kilometers
double brngRads = Math.toRadians(bearingDegrees);
double dr = angularDist*(Math.sin(brngRads)/Math.tan(Math.PI/4+Math.toRadians(latitude)/2));
double destLat = Math.asin(Math.sin(Math.toRadians(latitude))*Math.cos(dr))+
Math.PI/2; // round up to nearest radian
double centralAngleHalfPi = Math.acos(Math.cos(angularDist)*(Math.cos(brngRads))/Math.sin(Math.toRadians(latitude)));
double destLon = Math.toRadians(longitude)+(centralAngleHalfPi-Math.PI)<-(Math.PI)?
Math.toRadians(-180):Math.toRadians(180)-(centralAngleHalfPi-Math.PI)>Math.PI?
Math.toRadians(180):Math.toRadians(bearingDegrees)-((centralAngleHalfPi-Math.PI)/(Math.PI*2))*360;
return new Point(Math.toDegrees(destLat), Math.toDegrees(destLon));
}
}
代码解读
该类提供两种计算距离的方法:其中之一是经典的haversine距离公式;另一个是球面均匀切线法;此外该类还提供了名为projectAlongBearingAndDistance的方法;根据给定起始点、方向角以及指定的距离参数,则可确定目标点的位置
我们可以通过将这些类整合到Storm topology中进行开发,最终实现了我们的实时交通管理平台
