RisingWave分布式SQL流处理数据库调研
概述

RisingWave是一款分布式流式数据库平台 ,专为云计算环境下的大规模数据流处理设计。它旨在减轻用户在开发实时应用时的成本负担。该系统提供类似PostgreSQL易用性的功能模块,并声称其吞吐量较Flink提升了10倍以上的同时运营成本显著降低。在RisingWave平台中进行SQL操作无需额外配置网络参数设置等细节工作
*RisingWave在功能上与Flink有显著区别:不仅支持流处理功能还能实现数据持久化;相比之下,Flink仅专注于流处理功能,在结果生成后需经由外部数据库完成数据存储工作。官方资料显示该系统可全面替代现有的FlinkSQL功能。两者架构对比图如下所示:

- RisingWave与传统批次型数据库的主要区别在于其支持流水线型数据处理,并能够根据预先设定的逻辑进行实时数据分析。官方表示实现了流水线与批量处理的无缝衔接,并且传统批次型数据库仅支持离线批量处理.*
使用场景
RisingWave的核心优势在于其强大的流处理能力,并采用行存架构设计,在处理高并发的行数据查询时表现尤为出色。然而这种架构并不适用于执行全表扫描任务。
该平台的主要应用场景涵盖了监控系统建设与部署以及报警信息管理两大核心模块,并可快速生成实时动态报表;同时支持构建高效的流式ETL管道以及开展机器学习相关的特征工程;已在金融交易监控系统建设与部署以及制造过程自动化管理等方面取得显著成效。
值得注意的是:
- 虽然 RisingWave 不适用于执行复杂的数据分析任务;
- 若需进行复杂的数据分析,则需要将数据同步至实时数据分析数据库;
- 不少企业会采用 RisingWave 与 ClickHouse 等实时数据分析平台的结合配置:
- 在 RisingWave 上完成高效的流计算
- 同时在实盘数据分析平台上开展复杂的数据挖掘工作
- 此外还支持通过RisingWate Sink接口接入OLAP引擎
- 更多详细信息可参考官方文档RisingWave Sink
注意事项: 按照系统设计原则,在实际应用环境中建议遵循以下操作规范配置RisingWave服务组件。该系统组件不直接参与传统的全功能事务处理职责体系(即不支持Read/Write型事务处理),但在具体实现层面仅限于提供单向可持久化(只读)服务功能模块的支持。建议在实际应用中将RisingWave配置为置于基于交易的数据库架构下游的最佳实践方案。该系统组件主要通过基于事件驱动的数据一致性控制机制(CCM,CDC)实现对已序列化的历史数据进行按需访问功能扩展需求。
RisingWave 应用
部署
RisingWave 单机试玩模式
docker run -itd \
-p 4566:4566 \
-p 5691:5691 \
--privileged \
--name=risingwave \
risingwavelabs/risingwave:latest playground
bash
RisingWave的单机Docker Compose部署方案(建议在测试中采用此方案进行部署;后续的所有测试工作均以该方案为基础展开)
clone the risingwave repository.
进入docker目录
cd docker
启动RisingWave集群
#使用MinIO存储状态后端,standalone模式启动
export RW_IMAGE=risingwavelabs/risingwave:latest
export ENABLE_TELEMETRY=true
docker compose up -d
#使用MinIO存储状态后端,distributed模式启动
export RW_IMAGE=risingwavelabs/risingwave:latest
export ENABLE_TELEMETRY=true
docker compose -f docker-compose-distributed.yml up
bash

使用MinIO存储状态后端,distributed模式启动,相关服务如下:

安装postgresql客户端
基于RisingWave与PostgreSQL协议的兼容性,在使用PostgreSQL客户端时可以方便地操作RisingWave系统。为了获取该软件包,请访问[此处]并下载相应的工具包
yum install -y postgresql
使用 psql 连接
psql -h localhost -p 4566 -d dev -U root
启动mysql并开启binlog
- 启动mysql
# 查看详细默认配置
docker run -it --rm mysql:5.7 --verbose --help
#启动mysql server
docker run -d \
--name mysql5.7 \
--restart=always \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=123456 \
-v /data/mysql5.7/data:/var/lib/mysql \#数据文件
-v /data/mysql5.7/conf:/etc/mysql/conf.d \#配置文件
-v /data/mysql5.7/log:/var/log \#日志文件
mysql:5.7 \
--character-set-server=utf8mb4 \
--collation-server=utf8mb4_unicode_ci \
--log-bin=/var/lib/mysql/mysql-bin \#开启binlog配置
--server-id=2 #开启binlog配置
bash

- 链接mysql
docker exec -it mysql5.7 mysql -h127.0.0.1 -P3306 -p’123456’
- 验证是否开启 binlog
show variables like ‘%log_bin%’;
- 授权
--授权RisingWave作为slave访问mysql binlog
grant RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT on *.* to 'root'@'%' IDENTIFIED BY '123456';
--grant ALL PRIVILEGES on db01.* to 'root'@'%' IDENTIFIED BY '123456';
flush privileges;
--取消授权,如有需要
REVOKE GRANT OPTION on *.* FROM 'root'@'%';
REVOKE ALL PRIVILEGES on *.* FROM 'root'@'%';
REVOKE ALL PRIVILEGES on db01.* FROM 'root'@'%';
flush privileges;
--查看授权
show grants for root@'%';
sql

部署kafka
- 启动kafka
# step-1
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper:latest
# step-2
# 启动Kafka,将以下的俩个192.168.1.100换为本身的IP地址bash
docker run -d \
--name kafka \
--restart=always \
-p 8092:8092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.1.100:2181/kafka \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:8092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:8092 \
-t wurstmeister/kafka
bash

- 与kafka交互
#list
docker run -it --rm wurstmeister/kafka kafka-topics.sh --bootstrap-server 192.168.1.100:8092 --list
#create topic
docker run -it --rm wurstmeister/kafka kafka-topics.sh --bootstrap-server 192.168.1.100:8092 --create --replication-factor 1 --partitions 1 --topic test2
#producer
docker run -it --rm wurstmeister/kafka kafka-console-producer.sh --bootstrap-server 192.168.1.100:8092 --topic test1
#consumer
docker run -it --rm wurstmeister/kafka kafka-console-consumer.sh --bootstrap-server 192.168.1.100:8092 --topic test1
bash

- 或通过kcat与kafka交互
docker pull edenhill/kcat:1.7.1
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C -J
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C
bash
RisingWave 使用demo
- 数据导出sink demo
-- create table
CREATE TABLE t1 (v1 int, v2 int)
WITH (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
-- create sink
CREATE SINK test_sink_1
FROM t1
WITH (
properties.bootstrap.server = '192.168.1.100:8092',
topic = 'test_sink_topic',
connector = 'kafka',
primary_key = 'v1'
)
FORMAT UPSERT ENCODE JSON;
sql

查看kafka sink 结果
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t test_sink_topic -C -J
- 连接器 source
--source 连接器
CREATE SOURCE IF NOT EXISTS source_1 (
v1 integer,
v2 integer,
)
WITH (
connector='kafka',
topic='test_sink_topic',
properties.bootstrap.server='192.168.1.100:8092',
scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;
-- table连接器
CREATE TABLE IF NOT EXISTS table_1 (
v1 integer,
v2 integer,
)
WITH (
connector='kafka',
topic='test_sink_topic',
properties.bootstrap.server='192.168.1.100:8092',
scan.startup.mode='latest',
) FORMAT PLAIN ENCODE JSON;
sql

- Change Data Capture (CDC) 直连 MySQL CDC
--mysql ddl:
create database db01;
use db01;
CREATE TABLE orders (
order_id int(11) NOT NULL AUTO_INCREMENT,
price decimal(11),
PRIMARY KEY (order_id)
);
-- risingwave ddl
CREATE TABLE orders (
order_id int,
price decimal,
PRIMARY KEY (order_id)
) WITH (
connector = 'mysql-cdc',
hostname = '192.168.1.100',
port = '3306',
username = 'root',
password = '123456',
database.name = 'db01',
table.name = 'orders',
);
--mysql dml
insert into orders(price) values(12),(10),(23);
insert into orders(price) values(12),(10);
update orders set price=100 where order_id=1;
delete from orders where order_id=3;
-- risingwave验证数据
select * from orders ;
sql

- 直接导出物化视图/表数据 (CREATE SINK FROM)
CREATE TABLE t11 (v1 int, v2 int)
WITH (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
create materialized view mv_t11 as select count(*) from t11;
CREATE SINK sink1 FROM mv_t11
WITH (
connector='kafka',
properties.bootstrap.server='192.168.1.100:8092',
topic='t_sink1'
)
FORMAT PLAIN ENCODE JSON(
force_append_only='true'
);
sql

check结果
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink1 -C -J
- 导出 Query 的数据(CREATE SINK AS)
CREATE TABLE t11 (v1 int, v2 int)
WITH (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
CREATE SINK sink2 AS
SELECT
avg(v1) as avg_v1,
avg(v2) as avg_v2
FROM t1
WITH (
connector='kafka',
properties.bootstrap.server='192.168.1.100:8092',
topic='t_sink2'
)
FORMAT PLAIN ENCODE JSON(
force_append_only='true'
);
sql

check结果
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink1 -C -J
CREATE SINK INTO
使用CREATE SINK INTO命令,创建一个SINK,数据插入在RisingWave的表中
CREATE TABLE t1 (v1 int, v2 int)
WITH (
connector = 'datagen',
fields.v1.kind = 'sequence',
fields.v1.start = '1',
fields.v2.kind = 'random',
fields.v2.min = '-10',
fields.v2.max = '10',
fields.v2.seed = '1',
datagen.rows.per.second = '10'
) ROW FORMAT JSON;
create materialized view mv_t1 as select count(*) from t1;
DROP TABLE t11;
CREATE TABLE t11 (
--id int primary key,
v1 int,
v2 int
) APPEND ONLY;
CREATE SINK if not exists t1_sink INTO t11 AS
SELECT v1,v2
FROM t1
WITH
(
type='append-only',
force_append_only='true'
);
sql

总结
RisingWave 支持与 PostgreSQL 兼容的标准SQL接口,并允许用户以 PostgreSQL 的方式管理数据流。通过避免涉及实时处理的技术挑战(如状态存储、数据一致性及分布式扩展问题)来简化开发流程。为企业迅速构建实时数据分析系统提供了便捷的方法,并支持高效的数据抽取和转换流程。该产品具有以下特性:具备同步实时性(确保数据的即时更新),提供强一致性(仅保证最终一致性),同时具备高可用性和高并发能力;此外还支持流处理语义和资源隔离机制;最后能够应用于一些特定的数据看板监控场景以及实时指标分析领域。
相关文章
存储空间:github 仓库
正式指导文件:官方文档
简明教程:中文文档
个人简介:创始人知乎主页
协作平台链接:Slack
