第十一章:Flink CEP 复杂事件处理
各位同学好!今天我们将深入探讨 Flink 提供的复杂事件处理功能(CEP)这一非常关键的技术点。
背景
The Complex Event Processing (CEP) feature in Flink stands out as a particularly notable capability. With respect to the explanation of CEP, we have referenced a segment from Wikipedia.
CEP, is event processing that combines data from multiple sources to infer events or patterns that suggest more complicated circumstances. The goal of complex event processing is to identify meaningful events (such as opportunities or threats) and respond to them as quickly as possible.
AI助手
在我们的实际生产环境中,在伴随对数据实时性的需求日益提高的同时,处理的数据量也呈现持续攀升的趋势。在特定业务领域里,我们需要通过分析连续不断更新的数据流来识别其中具有战略意义的关键事件。
说到底,Flink 的 CEP 到底解决了什么样的问题呢?
例如,在规模庞大的订单流转过程中筛选出一批异常订单;通过分析网站访问日志数据,识别出可能存在恶意登录操作的行为特征;对快递物流过程进行监控和追踪服务时,则能够及时发现可能出现的问题案例。
如若你对 CEP 的理论基础有浓厚的兴趣,则可参考论文 Efficient Pattern Matching over Event Streams。
Flink 对 CEP 的支持高度集成,并且能够提供高度复杂模式匹配服务;该平台在吞吐量和延迟表现上均表现出优异性能。
程序结构
Flink CEP 的程序结构主要分为两个步骤:
- 定义模式
- 匹配结果
我们在官网中可以找到一个 Flink 提供的案例:
DataStream<Event> input = ...
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getId() == 42;
}
}
).next("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
public boolean filter(SubEvent subEvent) {
return subEvent.getVolume() >= 10.0;
}
}
).followedBy("end").where(
new SimpleCondition<Event>() {
@Override
public boolean filter(Event event) {
return event.getName().equals("end");
}
}
);
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
DataStream<Alert> result = patternStream.process(
new PatternProcessFunction<Event, Alert>() {
@Override
public void processMatch(
Map<String, List<Event>> pattern,
Context ctx,
Collector<Alert> out) throws Exception {
out.collect(createAlertFrom(pattern));
}
});
AI助手
在这个案例中可以看到程序结构分别是:
在以下步骤中首先创建一个名为Pattern的模式。该模式将筛选出那些其id字段值等于42的事件记录。随后筛选出volume字段数值超过10.0的事件,并接着筛选出名称字段值为end的事件。最后将这些符合条件的事件进行处理。
第二步是在输入流上基于预设的 pattern 进行扫描,在识别到与之匹配的内容时触发警报装置。
模式定义
Flink 提供了高度多样化的模式定义功能集作为基础模块,这些功能为我们实现复杂的业务逻辑提供了有力支撑.我们对支持的功能进行了细致分类,完整的API接口文档可供进一步查阅了解
简单模式


联合模式

匹配后的忽略模式

源码解析
我们在上面的官网案例中可以发现,Flink CEP 的整个过程是:
- 从一个数据源(数据源)作为输入流。
- 经过应用一个模式算子将其转换为PatternStream。
- 经过先进行选择操作再进行处理操作生成DataStream。
我们来看一下 select 和 process 算子都做了什么?

可以看到最终的逻辑都是在 PatternStream 这个类中进行的。
public <R> SingleOutputStreamOperator<R> process(
final PatternProcessFunction<T, R> patternProcessFunction,
final TypeInformation<R> outTypeInfo) {
return builder.build(
outTypeInfo,
builder.clean(patternProcessFunction));
}
AI助手
通过PatternStreamBuilder的build方法生成一个具体的SingleOutputStreamOperator实例,并且该类继承自DataStream

最终的处理计算逻辑都集成在CepOperator类中,并且该类中的processElement方法负责对每一条数据进行处理。

同时因为 CepOperator遵循Triggerable接口而导致触发定时器的行为发生。该核心处理逻辑均包含在 updateNFA 方法内。

入口在这里:
private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
Collection<Map<String, List<IN>>> patterns =
nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy, cepTimerService);
processMatchedSequences(patterns, timestamp);
}
}
AI助手
The full name of NFA is Non-deterministic Finite Automaton (NFA), which encompasses various states and their transition relationships within pattern matching.
在NFA这个类中,核心功能主要由两个关键方法——process和advanceTime来实现。这两个关键方法的实现相对复杂,在一定程度上可以将其概括为:每当一条新的输入数据到达时,会引起整个状态机的状态转换。
实战案例
我们模仿电商网站用户的搜索行为作为数据输入的基础来源,在系统中识别出浏览相同产品的用户群体,并触发相应的报警信息。
代码如下:
public static void main(String[] args) throws Exception{
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource source = env.fromElements(
//浏览记录
Tuple3.of("Marry", "外套", 1L),
Tuple3.of("Marry", "帽子",1L),
Tuple3.of("Marry", "帽子",2L),
Tuple3.of("Marry", "帽子",3L),
Tuple3.of("Ming", "衣服",1L),
Tuple3.of("Marry", "鞋子",1L),
Tuple3.of("Marry", "鞋子",2L),
Tuple3.of("LiLei", "帽子",1L),
Tuple3.of("LiLei", "帽子",2L),
Tuple3.of("LiLei", "帽子",3L)
);
//定义Pattern,寻找连续搜索帽子的用户
Pattern<Tuple3<String, String, Long>, Tuple3<String, String, Long>> pattern = Pattern
.<Tuple3<String, String, Long>>begin("start")
.where(new SimpleCondition<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
return value.f1.equals("帽子");
}
}) //.timesOrMore(3);
.next("middle")
.where(new SimpleCondition<Tuple3<String, String, Long>>() {
@Override
public boolean filter(Tuple3<String, String, Long> value) throws Exception {
return value.f1.equals("帽子");
}
});
KeyedStream keyedStream = source.keyBy(0);
PatternStream patternStream = CEP.pattern(keyedStream, pattern);
SingleOutputStreamOperator matchStream = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Long>, String>() {
@Override
public String select(Map<String, List<Tuple3<String, String, Long>>> pattern) throws Exception {
List<Tuple3<String, String, Long>> middle = pattern.get("middle");
return middle.get(0).f0 + ":" + middle.get(0).f2 + ":" + "连续搜索两次帽子!";
}
});
matchStream.printToErr();
env.execute("execute cep");
}
AI助手
上述代码的逻辑我们可以分解如下。
首先构建了一个数据源,并模拟了用户的行为数据。接着创建了自定义的Pattern以分析用户行为特征。该模式的核心特征是连续两次搜索商品"帽子"后进行关联匹配,并当匹配成功时会立即生成并显示一条提示信息至控制台界面。

可以看到,提示信息已经打印在了控制台上。
总结
本节课程主要阐述了Flink CEP的支持机制及其实现方式,并通过一个简化的电商搜索场景演示了实时搜索结果提示功能的实现方法。此外还介绍了模式匹配技术在实际应用中的拓展价值包括但不限于网络攻击检测运维监控日志分析等功能建议读者深入研究Flink官方文档以掌握更多实用技巧
我们下一节课将进入到“Flink 常用的 Source 和 Connector”的学习。
