Advertisement

Kafka 的基本介绍与适用场景

阅读量:

作者:禅与计算机程序设计艺术

1.简介

Apache Kafka 是一种分布式的发布订阅消息系统。它能够通过消费者实现大规模数据的实时传输、存储与处理功能,并具备低延迟性和高效的处理能力;其广泛应用于数据管道线和事件驱动的应用程序中。

Apache Kafka 的主要特性包括:

  1. 可扩展能力:基于分区方案和副本机制实现了系统的可扩展能力。
  2. 持久特性:复制机制确保了信息存储的持久特性。
  3. 消息有序特性:通过设置消息偏移量以及分区等方式实现了消息的有序特性。
  4. 处理能力达到每秒数万条以上:支持超高的处理能力。
  5. 数据丢失问题:采用分区方案及副本机制解决了数据丢失问题。
  6. 兼容多种编程语言库:支持多种编程语言编写的客户端库模块。

2.基本概念术语说明

2.1 分布式消息队列模型

Apache Kafka 是 Apache Hadoop 项目的分支之一。它旨在创建一种高容量且能灵活扩展的数据传输网络,并通过其独特的四层架构实现对大规模实时数据流的有效管理。该系统架构主要包括生产者用于接收和发送消息、消费者用于处理消息以及持久化和协调机制以确保数据的一致性和可用性。

  1. 发布者的角色是将信息传递至消息队列。
  2. Kafka集群中的代理节点负责接收并转发来自其他节点的消息,并根据需求划分多个数据分区以提高处理效率。
  3. 消费器的任务是从的消息队列定期获取并处理 incoming的消息内容。
  4. 主题作为信息分类管理的核心概念,在Kafka系统中相当于一个大容量的信箱,在每个主题下可创建多份分区用于高效的数据分布与管理。

由消息发布者负责将消息发送至指定的Topic中。基于Partitioning策略,Kafka能够将同一Topic内的消息分配至不同的Partitions中.消费者可以选择所需的Topic,无需关注Partitions的具体分布情况.这种架构显著提升了Kafka在水平方向上的扩展能力.

2.2 名词解释

Broker(代理节点)

Kafka集群内的服务器用于接收来自各个生产者的消息,并将它们按主题分类后发送给对应的消费者。一个Broker节点通常能够管理多个消息分区(partitions),每个分区对应特定的主题。

Topic(主题)

消息的集合,可以理解成邮箱。一个Topic下可以创建多个Partitions。

Partitions(分区)

每个Topic会被划分为多个Partition,并将这些Partition分布在多个Brokers上。每个Partition都是一个有序排列且不可变的序列,在每一个这样的Sequence中存储的消息都会被按一定的Order进行Sorting,并以偏移量(Offset)来记录每一条Message在其Sequence中的Position。

Producer(生产者)

The message posting entity submits the message to the message queue. The producer allocates messages to corresponding Partitions following a load balancing strategy.

Consumer(消费者)

消息的接收者从消息队列中读取消息并进行处理。系统通过指定Offset来消费特定Topic下的消息。

Offset(偏移量)

标记Consumer所处的位置。每个Partition都与一个偏移量相关联。偏移量用于追踪Consumer的消费进度。

Replication(复制)

一个Topic可能包含多个Replicas,每个Replica都相当于一个完整的Kafka服务器实例,在某一个Broker发生故障或其磁盘出现故障时(即该Broker无法正常运行),另一个Replica就能够顶替它继续提供服务。

Leader(领导者)

当一个Partition被划分给多个Consumer时,在系统中扮演重要角色的Leader会负责协调这些Consumer之间的同步机制,并确保每个消息都被正确地处理一次。

Follower(追随者)

当Leader发生故障时,Follower会接管该Partition,继续提供服务。

2.3 存储机制

Kafka系统的存储架构由多个Partition单元构成, 每个单元均为有序且不可变的数据序列. 具体而言, 在每一个Partition中, 消息被组织并存储在一个磁盘文件中, 其中包含了消息键值对以及相关的元数据记录. 其分区划分依据主题属性, 因此同一主题下相关的信息会整合至同一个Partition中. 但值得注意的是, 不同的主题可能会对应至不同的Partition结构上.

2.4 消息传递方式

Kafka 为了确保消息的不丢失,采用如下两种消息传递的方式:

  1. At least once delivery(至少一次交付):Producer采用逐个发送的方式将数据块传递给相应的Partition(分区)。当Leader发生故障时(失效),系统会重新选举并指定新的Leader以继续传输数据块(信息)。然而这种方法并不能保证完全没有丢失的数据块的情况发生(可能性)。例如,在新的Leader就职后(上任),若Follower不可用时(失效),则可能导致部分数据块无法正确接收和处理(接收)。
  2. Exactly once delivery(精准一次交付):Kafka系统内置了严格的事务管理机制(机制)。每一个消息都会被封装成一个完整的交易请求,并且只有当所有操作都确认完成才会被视为交易成功完成(达成)。如果在执行过程中发生故障或错误,则会立即终止该交易并进行回滚操作以恢复到之前的完整状态(恢复)。这种设计确保了所有传输的消息都能得到可靠地接收和处理。

3.核心算法原理和具体操作步骤

3.1 生产者角色(Producer)

该系统中的生产者角色主要承担生成和投递消息的任务至Kafka集群。该系统允许生产者灵活配置其发布行为,在满足一定约束条件下能够自由选择目标主题,并且该系统具备智能的负载均衡能力以实现资源的有效利用。在处理过程中,默认情况下无需等待服务器确认应答,在需要时可以通过同步操作机制来实现对事务性的支持;此外该系统还提供多种设计选项供开发者灵活配置例如可以通过自定义回调函数来实现特定业务逻辑或通过异步任务队列机制来提升吞吐量。

3.2 消费者角色(Consumer)

Kafka 的消费者角色主要负责接收并处理来自 Kafka 集群的消息。为了实现这一功能,在配置阶段需要指定特定的主题、集群 ID 和用户 ID 以确定其处理数据的方式。一旦启动后,则会借助协调器(Coordinator)定位对应的分区区间,并依据用户的偏移值从中读取数据。此外,用户还能通过新增关注数量或缩减关注范围来调节其数据读取范围。

3.3 消息路由和多播机制

在Kafka中,每个分区都是一个按顺序排列且固定不变的序列.当生产者发送一条新消息时,系统会自动将其分配到某个特定的分区.具体来说,Kafka采用两步机制完成这一过程.首先,系统会计算该消息的哈希码值,然后根据该哈希码值与分区总数进行取模运算,最终确定该消息应被路由至对应的指定分区.

该系统采用了基于消息路由的技术框架,在这种架构下每个消息仅能分配至指定分区,并采用即通常所说的"广播机制"来实现信息传播。这样的设计确保了事件处理的一致性和可追溯性。然而,在实际应用中存在一些挑战需要克服以保证系统的稳定运行和高效响应能力

3.4 存储机制

Kafka 的存储架构极具灵活性,在运行过程中允许用户根据实际需求动态配置Partition数目、副本数量以及索引类型等参数设置。基于复制机制设计的系统能够将关键数据分布在多个Broker上并进行多份备份,并非局部存储而是分散式的冗余存储方案;这种架构设计不仅有效规避了单点故障可能导致的数据丢失问题;此外该系统内置磁盘缓存模块;会将高频率访问的数据直接缓存至内存区域以减少I/O操作次数;同时支持TTL(Time-to-Live)功能;该功能可设定消息的有效生存时长;从而实现对过时信息的自动清理和优化管理

3.5 消息确认机制

生产者与消费者均可通过等待服务器的应答确认来实现通信。当选择等待服务器的应答确认时,生产者将会被阻塞直至接收到响应;若不等到服务器的应答确认,则生产者将立即发送下一条消息,并可能导致消费者错过了刚处理的那条消息。

Kafka 为生产者和消费者提供了丰富多样的 ACKs(确认机制),即为两类不同的传输模式——单播传输和幂等传输(Idempotent)。

单播(Unicast)ACK:生产者发送的消息仅限于一个特定的分区。当 leader 分区成功接收消息并完成本地日志记录后,此时生产者将能够收到确认信息。如果 leader 分区发生故障或记录操作失败,则可能导致数据丢失。该机制允许在某些高吞吐量场景下运行;然而,在对数据持久化要求较高的应用场景中,则会产生显著的性能消耗。

幂等(idempotent)ACKs机制:生产者发送的消息会被所有in-SYNC replicas副本自动转发。这些in-SYNC replicas是指当前保持正常通信状态下的备份副本。只有当in-SYNC replica接收到生产者的确认信息并完成本地日志记录后,生产者才会收到一个确认反馈。值得注意的是,在in-SYNC replica出现故障的情况下也不会影响消息的安全性,因为其他复制体会继承其功能。这种ACK机制虽然对数据持久化要求较高但相比单播ACKs具有更强的消息可靠性保障

3.6 消费者容错

Kafka系统的消费者容错机制是基于消费群体协作机制实现的。每个主题下不同分片的消息由该系统中的所有消费者共同处理。

首先任何归属于同一消费群体的消费者都注册订阅同一个主题但不一定位于同一细分区域

此外,在同一群组中的每个消费者都与同一个协调者共享。这个协调者的职责是管理整个工作流程。每当有新的用户加入该群组或原有的用户退出时,这个协调者都会向剩余的参与者发出通知。

在处理某个特定的分区内,在协调器那里就会自动分配出当前具备‘可分配’特征的一个分块。每当新用户加入到这个集群的时候,在协调器那里就会立即从所有当前具备‘可分配’特征的一组数据中选择一组并将其发送给新的连接请求。与此同时,在现有条件下,并行处理多个这样的查询请求也是完全可行的,并不会导致性能上的明显下降。

当某个分区不可用时,
coordinator 会监控消费者在日志跟踪延迟方面的状态。
如果消费者与该分区最早的消息之间相差一个消息,
coordinator 将指示消费者消费该消息。
这种LAG监控机制帮助消费者了解当前是否在该分区内有积压的消息。

当消费者完成消费任务后,系统会通知 coordinator 他们已用完该部分资源,并指示 coordinator 将剩余资源分配给其他用户。

总体而言,在Kafka系统中,在生产者和消费者之间共享数据分区,并通过分布式架构中的协调机制来保证系统的容错能力

4.代码实例和解释说明

为了更好地说明 Kafka 的工作原理,我们采用了一个基础的 Producer、Consumer 代码示例。

4.1 准备环境

首先,下载 Kafka 压缩包,并解压到任意目录。

复制代码
    $ wget https://www.apache.org/dyn/closer.lua?path=/kafka/2.7.0/kafka_2.13-2.7.0.tgz -O kafka_2.13-2.7.0.tgz
    $ tar xzf kafka_2.13-2.7.0.tgz
    
      
    
    代码解读

然后,启动 Zookeeper 服务端。

复制代码
    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    
    
    代码解读

启动一个 Broker 。

复制代码
    $ bin/kafka-server-start.sh config/server.properties
    
    
    代码解读

创建一个测试用的 Topic 。

复制代码
    $ bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
    
    
    代码解读

4.2 编写 Producer 代码

编写 Producer 代码非常简单。以下是 Java 版本的代码:

复制代码
    public class SimpleProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
    
        Producer<String, String> producer = new KafkaProducer<>(properties);
    
        for (int i = 0; i < 10; i++) {
            System.out.println("Producing message : " + i);
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("test", Integer.toString(i),
                            "Hello World" + Integer.toString(i));
    
            RecordMetadata metadata = producer.send(record).get();
    
            System.out.println(metadata.topic());
            System.out.println(metadata.partition());
            System.out.println(metadata.offset());
        }
    
        producer.flush();
        producer.close();
    }
    }
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

4.3 编写 Consumer 代码

编写 Consumer 代码也非常简单。以下是 Java 版本的代码:

复制代码
    public class SimpleConsumer {
    
    public static void main(String[] args) {
        // Create a Kafka consumer for topic `test` with group id `my-group`.
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(consumerProperties());
    
        // Subscribe the consumer to the topic `test`.
        consumer.subscribe(Collections.singletonList("test"));
    
        while (true) {
            // Poll records from Kafka until some data is available or timeout reached.
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
            if (!records.isEmpty()) {
                System.out.printf("%d records received.\n", records.count());
    
                // Process each record.
                for (final ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received record: %s\n", record);
    
                    // Commit offset so that next time we will start from where we left off last time.
                    try {
                        consumer.commitAsync();
                    } catch (CommitFailedException e) {
                        log.error("Unable to commit offset for partition {}.",
                                e.partition(), e);
                    }
                }
            } else {
                // There are no more records in this poll() operation, so let's wait and see if there is any change later on.
                Thread.sleep(100);
            }
        } finally {
            // Close down the Kafka consumer gracefully.
            consumer.close();
        }
    }
    
    private static Map<String, Object> consumerProperties() {
        return Collections.<String, Object>singletonMap(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
               .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
               .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
               .put(ConsumerConfig.GROUP_ID_CONFIG, "my-group")
               .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }
    }
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    
    代码解读

4.4 测试运行结果

首先,启动 Kafka Server:

复制代码
    $ bin/kafka-server-start.sh config/server.properties
    
    
    代码解读

然后,运行 Producer 程序:

复制代码
    $ java -cp target/simple-kafka-client-example-0.1.jar com.github.charlemaznable.examples.SimpleProducer
    Producing message : 0
    Producing message : 1
    Producing message : 2
    Producing message : 3
    Producing message : 4
    Producing message : 5
    Producing message : 6
    Producing message : 7
    Producing message : 8
    Producing message : 9
    
      
      
      
      
      
      
      
      
      
      
    
    代码解读

接着,运行 Consumer 程序:

复制代码
    $ java -cp target/simple-kafka-client-example-0.1.jar com.github.charlemaznable.examples.SimpleConsumer
    Received record: ConsumerRecord(topic=test, partition=0, leaderEpoch=null, offset=0, CreateTime=1625950445998, serializedKeySize=-1, serializedValueSize=12, key=null, value=Hello World0)
    Received record: ConsumerRecord(topic=test, partition=0, leaderEpoch=null, offset=1, CreateTime=1625950445999, serializedKeySize=-1, serializedValueSize=12, key=null, value=Hello World1)
    Received record: ConsumerRecord(topic=test, partition=0, leaderEpoch=null, offset=2, CreateTime=1625950446000, serializedKeySize=-1, serializedValueSize=12, key=null, value=Hello World2)
    ......
    
      
      
      
      
    
    代码解读

可以看到,Consumer 从 Kafka Topic 中消费到了 Producer 生产的数据。

全部评论 (0)

还没有任何评论哟~