42 | Kafka Streams在金融领域的应用
今天要和你分享的主题是:Kafka Streams 在金融领域的应用。
背景
金融领域涵盖内容广泛,在我的分享中主要探讨的是如何借助大数据技术——尤其是Kafka Streams实时计算框架——以便于我们更有效地进行企业用户分析。
众所周知,在金融领域中获取客户(尤其是高净值人群)的成本非常高。尤其是在一线城市市场中,在获得高净值人群方面面临的挑战往往会导致高昂的成本——单个客户可能需要数千元左右的成本投入才能实现其价值目标。面对这一巨大挑战,在当前环境下无论是通过优化广告投放策略还是实施精准客户服务策略都需要采取相应的措施以确保业务运营效率与客户的长期价值最大化——即 Custom Lifecycle Value(CLV)。
透彻地进行用户洞察成为一个实现价值最大化的关键环节。这意味着你需要透彻地了解并满足客户的个性化需求。即所谓的Know Your Customer(KYC),它要求你不断优化服务细节以提升客户满意度。
通过实施KYC流程,过去的做法通常是投入大量时间和精力与客户进行面对面交流,并进行直接的沟通以获取客户的相关信息.然而,通过这种方式收集到的数据往往不够准确,因为客户内心存在自我保护机制,短期内难以全面把握客户的实际需求.
反过来说,在人们的日常生活的方方面面中深入渗透的大数据分析结果,则准确地反映了客户的实际需求情况。例如客户经常访问哪些网站、购买过哪些商品、最喜欢观看的视频类型是什么等信息都被纳入分析范围。这些看似随意的数据实际上隐藏着客户最真实的意愿与偏好,在经过系统整合处理后便能够全面展现客户的特征与行为模式这就是所谓的用户画像技术
用户画像
用户画像听起来有些抽象化的感觉,在实际应用中却非常贴近我们的日常生活。你提供的各项基本信息信息包括如性别、年龄、职业类别、收入水平以及兴趣爱好等维度上都有所体现。比如我们可以举一个具体的例子比如说某个人物他是男性身份年仅28岁未婚状态收入水平大约在1.5万至2万元之间从事数据开发工程师职位类别生活居于北京市通州区天通苑小区经常加班工作状态并对动漫和游戏类活动表现出浓厚的兴趣
实际上,这一系列特征构成了典型的用户画像模型。其主要任务就是对客户或用户进行分类和分群(Tagging)。在用户的系统中,这些特征组合起来形成了典型的标签集合。通过打标签的形式将客户或用户的详细信息传递给业务人员使用,从而实现精准营销活动。
ID 映射(ID Mapping)
用户画像的好处显而易见,并且打得多不仅显得更丰富,还能让系统更精准地刻画一个人的各个方面。然而,在具体给每个用户贴上一个又一个具体的标签之前,首先要明确"你是谁",这也是所有用户画像系统必须首先解决的核心问题,这也被称为ID识别问题。
所指的 ID 是 Identification标识用户身份信息的方法有五种
身份证号:这是最能表征身份的 ID 信息,每个身份证号只会对应一个人。
手机号码:手机号码一般都能有效地体现一个人的身份信息。然而,在实际应用中...
设定了:在移动互联网时代段内,默认指的是手机或Mac、iPad等移动终端硬件配置标识符。特别强调的是,在这一时代背景下,默认指的是手机或Mac、iPad等移动终端硬件配置标识符。其中值得注意的是,在许多应用场景中,默认具备定位与识别用户的能力。典型的默认配置包括iOS端应用商店使用的IDFA以及Android端应用商店使用的IMEI号码
应用注册账号:这类ID属于具有较弱关联性的ID,在不同应用场景中可能被赋予不同的身份名称。尽管存在大量采用通用身份名称的情况(如用户名),但这种做法仍能体现出一定的关联性和识别特征。
在PC时代 era, 浏览器端的 Cookie 信息 data 是一项关键指标, 它是衡量网络上用户行为 metric 的重要指标之一。然而随着移动互联网 era 的兴起, Cookie 已经变得力不从心, 它作为身份标识 identifier 的作用 value 已经不复存在, 其价值 coefficient 已经达到了前所未有的低谷 level. 个人认为,在构建新一代用户画像的过程中, 随着移动互联网的深入发展, Cookie 可能已经难以承担起其原有的角色 responsibility, 或许它已经被彻底取代了。
在开发用户画像系统时, 我们会在多个数据源上持续不断获取多样化的个人用户数据. 一般而言, 这些数据不包含上述提到的这些 ID 信息. 比如说, 当我们解析浏览器的浏览历史记录时, 能获取到的是 Cookie 数据. 另一方面, 解析用户的访问行为数据时, 我们会获得用户的设备 ID 和注册账号信息.
如果这些数据反映每个用户的信息,请问我们的用户画像系统是如何识别出来的?换句话说,在这种情况下,请问你需要一种手段或技术来实现各个 ID 的打通与映射。
实时 ID Mapping
举一个简短的例子。假设有一位金融理财用户的张三先生。他首先通过苹果手机访问了一款理财产品。接着,在安卓手机上注册了一个账户。最后,在电脑上登录这个账户,并完成了对该理财产品的购买。ID Mapping 的目标是整合不同终端或设备上的用户数据,并建立统一的标识符系统
对于实时进行 ID 映射的需求更为严格,在较短时间内就需要实现这一目标
在实时计算或流处理的视角下,在实时ID映射的情况下,相当于将该问题转化为Stream-Table Join这一类问题;这表明我们能够在实际操作中同时处理数据流与数据库表之间的关联。
在消息流中每个事件或每条消息都对应着一个未知用户的相关信息它可能包括用户浏览网站的历史记录也可能是用户的消费行为记录这些信息通常会涉及我们之前提到的一些具体标识符如设备唯一标识码也有可能包括注册登录后的账号信息
系统提供的所有ID记录在另一方表中存储。随着深度逐步推进,在目标表中会积累越来越多的数据类型。实时输入的数据会被持续补充到该表中,并最终完成全部用户的ID信息整合。
Kafka Streams 实现
我们现在就来探讨如何利用 Kafka Streams 实现特定场景下的实时 ID 映射。为了便于理解,在此我们假设 ID 映射仅涉及身份证号、手机号以及设备 ID 三个字段。以下是按照 Avro 格式书写的详细 Schema 定义:
{
"namespace": "kafkalearn.userprofile.idmapping",
"type": "record",
"name": "IDMapping",
"fields": [
{"name": "deviceId", "type": "string"},
{"name": "idCard", "type": "string"},
{"name": "phone", "type": "string"}
]
}
作为补充说明,《Avro》作为Java或大数据生态系统中的常用序列化编码方案,在实际应用中,默认情况下会采用JSON或XML格式进行对象存储。该方案能够显著降低数据存储占用空间并减少网络I/O通信开销,在处理大规模数据传输时展现出显著优势。
在该情况下,在Kafka生态系统中我们需要两台主题:其中一台用于构建表、另一台用于搭建流。其消息格式均为上述提到的IDMapping对象。
在注册App时需要先提供手机号信息以便系统完成验证流程。当新用户完成注册操作后系统会立即向指定主题发送一条确认消息以确认手机号的有效性。同时该系统会持续监控新用户的各项行为记录并将其同步至第二个主题中以便后续的数据处理工作能够顺利进行。值得注意的是接收方的消息可能会包含额外的信息如手机号码或设备唯一标识符等这些数据有助于确保系统的完整性和可靠性运行效率也能够得到显著提升。
以这一设计理念为基础,在此之前先提供完整的Kafka Streams源码;之后将对其中的关键内容进行深入解析。
package kafkalearn.userprofile.idmapping;
// omit imports……
public class IDMappingStreams {
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new IllegalArgumentException("Must specify the path for a configuration file.");
}
IDMappingStreams instance = new IDMappingStreams();
Properties envProps = instance.loadProperties(args[0]);
Properties streamProps = instance.buildStreamsProperties(envProps);
Topology topology = instance.buildTopology(envProps);
instance.createTopics(envProps);
final KafkaStreams streams = new KafkaStreams(topology, streamProps);
final CountDownLatch latch = new CountDownLatch(1);
// Attach shutdown handler to catch Control-C.
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(1);
}
System.exit(0);
}
private Properties loadProperties(String propertyFilePath) throws IOException {
Properties envProps = new Properties();
try (FileInputStream input = new FileInputStream(propertyFilePath)) {
envProps.load(input);
return envProps;
}
}
private Properties buildStreamsProperties(Properties envProps) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return props;
}
private void createTopics(Properties envProps) {
Map<String, Object> config = new HashMap<>();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
try (AdminClient client = AdminClient.create(config)) {
List<NewTopic> topics = new ArrayList<>();
topics.add(new NewTopic(
envProps.getProperty("stream.topic.name"),
Integer.parseInt(envProps.getProperty("stream.topic.partitions")),
Short.parseShort(envProps.getProperty("stream.topic.replication.factor"))));
topics.add(new NewTopic(
envProps.getProperty("table.topic.name"),
Integer.parseInt(envProps.getProperty("table.topic.partitions")),
Short.parseShort(envProps.getProperty("table.topic.replication.factor"))));
client.createTopics(topics);
}
}
private Topology buildTopology(Properties envProps) {
final StreamsBuilder builder = new StreamsBuilder();
final String streamTopic = envProps.getProperty("stream.topic.name");
final String rekeyedTopic = envProps.getProperty("rekeyed.topic.name");
final String tableTopic = envProps.getProperty("table.topic.name");
final String outputTopic = envProps.getProperty("output.topic.name");
final Gson gson = new Gson();
// 1. 构造表
KStream<String, IDMapping> rekeyed = builder.<String, String>stream(tableTopic)
.mapValues(json -> gson.fromJson(json, IDMapping.class))
.filter((noKey, idMapping) -> !Objects.isNull(idMapping.getPhone()))
.map((noKey, idMapping) -> new KeyValue<>(idMapping.getPhone(), idMapping));
rekeyed.to(rekeyedTopic);
KTable<String, IDMapping> table = builder.table(rekeyedTopic);
// 2. 流-表连接
KStream<String, String> joinedStream = builder.<String, String>stream(streamTopic)
.mapValues(json -> gson.fromJson(json, IDMapping.class))
.map((noKey, idMapping) -> new KeyValue<>(idMapping.getPhone(), idMapping))
.leftJoin(table, (value1, value2) -> IDMapping.newBuilder()
.setPhone(value2.getPhone() == null ? value1.getPhone() : value2.getPhone())
.setDeviceId(value2.getDeviceId() == null ? value1.getDeviceId() : value2.getDeviceId())
.setIdCard(value2.getIdCard() == null ? value1.getIdCard() : value2.getIdCard())
.build())
.mapValues(v -> gson.toJson(v));
joinedStream.to(outputTopic);
return builder.build();
}
}
这个 Java 类代码中的核心功能是 buildTopology 函数 ,该函数实现了对 ID Mapping 整合的完整流程。
在该方法中, 首先创建了一个StreamsBuilder对象实例, 这是构建任何Kafka Streams应用的基础步骤. 随后我们从配置文件中提取所需信息, 以确定将要处理的所有Kafka主题. 在此案例中, 我们总共涉及到了4个主题, 它们各自承担着不同的功能角色:
streamTopic用于记录用户登录App后所触发的各种行为数据。其格式为IDMapping对象转换成的JSON字符串。你可能会有这样的疑问:为什么要选择这种存储方式?原因在于:由于社区版本的Kafka尚未提供Avro序列化/反序列化功能,默认情况下无法直接使用Avro进行数据持久化存储。因此为了实现这一功能不得不转而采用Confluent公司提供的Kafka版本以满足需求。这与我们专栏介绍Apache Kafka的核心初衷相悖因此我们仍然选择JSON作为数据存储格式并利用Avro Code Generator工具来生成相应的set和get方法供开发者操作使用同时Lombok也能达到类似效果
rekeyedTopic:该主题作为一个中间节点,在 streamTopic 中收集手机号并用于生成消息的 Key 同时保持消息内容不变
tableTopic:记录用户在注册App时所输入的手机号。我们要使用这个主题构建用于建立连接所需的表中的数据。
outputTopic
buildTopology的第一步则是构建数据存储结构(KTable对象)。为了确保系统的稳定性与可扩展性需求,在原有消息流的基础上进行重新配置(即对原有的消息流网络进行调整),生成了一个新的中间数据流量(该流量以用户的手机号作为唯一标识符)。随后将该临时流量直接路由至rekeyedTopic进行额外处理,并最终通过builder.table方法实现了KTable对象的具体构建。
有了表之后,在建立好表后我们继续构造消息流来封装用户的登录行为数据,并同时提取了手机号作为连接键用于后续关联
在进行关联操作时, 我们会综合获取双方相关信息, 尽最大努力填充到最后生成的IDMapping对象里, 然后将这个IDMapping实例传递给新建的数据流. 最后, 我们将其存储到outputTopic字段中以供后续使用.
至此,我们仅用了不足 200 行的Java代码便完成了基于真实场景下的实时ID映射系统的构建工作。理论上而言,在这个案例的基础上你可以将其扩展应用到任意数量的一对多或多对多的ID映射场景中,并且还可以处理包含其他附加标记的数据集。其工作原理具有通用性,在我的个人项目实践中得到了良好的验证与应用效果。其中一个是实现用户画像系统的关键模块
小结
今天,在金融领域展示了一个实际应用案例来探讨 Kafka Streams 的功能与用途。重点讲解了如何通过连接函数实现流与表的实时关联,并详细说明了其操作流程和优势所在。实际上,Kafka Streams 所提供的功能远超这些,我建议您参考官网 的相关指南,然后建议您将自己的一些轻量级实时计算任务迁移至 Kafka Streams 中进行处理

开放讨论
让我们深入探讨一个关键问题,在上文中所展示的例子中你认为我选择leftJoin方法而非join方法的原因是什么?(进一步了解可参考SQL中的left join与inner join的区别)

Kafka Streams在金融领域的应用案例充分突显了其在实时计算方面的强大功能。借助实时ID Mapping技术, 金融企业可实现客户生命周期价值的最大化, 同时有效降低获客成本并推动精细化运营。本文详细阐述了使用Kafka Streams实现实时ID Mapping的具体实施步骤, 包括构建数据表和数据流, 实现流与表之间的连接等关键步骤。通过简洁而高效的Java代码, 清晰展示了如何利用Kafka Streams快速实现实时ID Mapping任务的过程。综上所述, Kafka Streams在金融领域的应用为企业提供了一种强大的实时计算解决方案, 帮助企业在精准识别客户需求方面取得显著成效, 并实现了客户的长期价值最大化
