Advertisement

Kafka客户端(2.1.0)

阅读量:

Kafka的客户端程序

  • 准备工作

  • 生产者

    • 普通生产者

    • 带回调的生产者

    • 自定义分区策略

      • 实现Partitioner 接口
      • 在配置项中指定Partitioner类
    • 同步生产者

  • 消费者

    • 简单消费者

    • 重置offset

      • 设置offset重置策略为earliest
      • 设置offset重置策略为latest
      • 设置offset重置策略为none
    • 手动提交offset

准备工作

要使用kafka的客户端,首先要引入Kafka客户端的依赖,下面是2.1.0版本的依赖

复制代码
    <dependency>
    	<groupId>org.apache.kafka</groupId>
    	<artifactId>kafka-clients</artifactId>
    	<version>2.1.0</version>
    </dependency>
    
    
      
      
      
      
      
    

Kafka的生产者,需要一个properties对象用以指定broker-list以及对应的key-value的序列号类

复制代码
    //创建配置对象
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
    
      
      
      
      
      
    

生产者

先要创建生产者对象

复制代码
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    
    
      
    

别忘了把生产者关闭

复制代码
    producer.close();
    
    
      
    

因为生产者是异步发送的,因此在生产者关闭之前最好等待一段时间,避免遗漏消息

复制代码
    Thread.sleep(1000);
    
    
      
    

普通生产者

普通生产者最简单,直接发送消息即可
完整例子如下

复制代码
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    
    long b0 = System.currentTimeMillis();
    for (int i = 0; i < 10000; i++) {
    	ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.format("k%08d", i), String.format("v%08d", i));
    	producer.send(record);
    }
    long b1 = System.currentTimeMillis();
    try {
    	Thread.sleep(1000);
    } catch (InterruptedException e) {
    	e.printStackTrace();
    }
    System.out.println(b1 - b0);
    producer.close();
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

经验证,当前配置下,异步发送一万条消息,300毫秒不到就发完了(因需延迟1秒关闭生产者)

带回调的生产者

普通生产者能单纯的发送,但如果发送失败了,主线程是无法感知到的
那么,我们就可以通过带回调的生产者来处理消息的发送状态、统计各消息分配的分区等待

下面是我的一个实例

复制代码
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    
    final AtomicInteger successCount = new AtomicInteger(0);
    final AtomicInteger failureCount = new AtomicInteger(0);
    final Map<String, Integer> partitionMaps = Collections.synchronizedMap(new HashMap<String, Integer>());
    long b0 = System.currentTimeMillis();
    for (int i = 0; i < 10000; i++) {
    	final ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.format("k%08d", i), String.format("v%08d", i));
    	producer.send(record, new Callback() {
    		public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    			if (e != null) {
    				failureCount.incrementAndGet();
    			} else {
    				successCount.incrementAndGet();
    				partitionMaps.put(record.key(), recordMetadata.partition());
    			}
    		}
    	});
    }
    long b1 = System.currentTimeMillis();
    try {
    	Thread.sleep(1000);
    } catch (InterruptedException e) {
    	e.printStackTrace();
    }
    
    Map<Integer, Integer> partitionCountMap = new HashMap<Integer, Integer>();
    for (Map.Entry<String, Integer> entry : partitionMaps.entrySet()) {
    	String key = entry.getKey();
    	int partition = entry.getValue();
    	Integer old = partitionCountMap.get(partition);
    	if (old != null) {
    		partitionCountMap.put(partition, old + 1);
    	} else {
    		partitionCountMap.put(partition, 1);
    	}
    }
    
    System.out.println("Time   :" + (b1 - b0));
    System.out.println("Failure:" + failureCount.get());
    System.out.println("Success:" + successCount.get());
    System.out.println();
    for (Map.Entry<Integer, Integer> entry : partitionCountMap.entrySet()) {
    	int partition = entry.getKey();
    	int count = entry.getValue();
    	System.out.println("partition " + partition + " count:" + count);
    }
    producer.close();
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

结果输出:

Time :327
Failure:0
Success:10000

partition 0 count:3299
partition 1 count:3372
partition 2 count:3329

可以看到,消息都发送成功了,异步发送还是很快的,1秒中的等待也足够让所有的消息都发送成功并统计其分配的分区

自定义分区策略

我们可以在生产消息的时候指定分区
为了指定分区,我们只需要做两步操作即可:

实现Partitioner 接口

复制代码
    public class MyPartitioner implements Partitioner {
    /** *
     * @param topic 消息的主题
     * @param key 消息的key对象
     * @param keyBytes 消息的key对象通过Serializer序列号后的byte数组
     * @param value 消息的value对象
     * @param valueBytes 消息的value对象通过Serializer序列号后的byte数组
     * @param cluster 集群信息,可以获取集群的节点数等信息,更多信息可以参考DefaultPartitioner的实现
     * @return 此消息分配的分区
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //具体的分区策略,这里把所有的消息都扔到0分区去
        return 0;
    }
    
    public void close() {
        System.out.println("MyPartitioner close");
    }
    
    public void configure(Map<String, ?> map) {
        System.out.println("MyPartitioner configure");
    }
    }
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

在配置项中指定Partitioner类

复制代码
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "your.package.MyPartitioner");
    
    
      
    

在刚刚带回调的生产者中把自定义分区策略配置上去,输出结果如下:

MyPartitioner configure
Time :315
Failure:0
Success:10000

partition 0 count:10000
MyPartitioner close

可以看到,所有的消息都发送到0分区去了

同步生产者

之前所有的生产者都是异步生产者,异步生产者效率高,但却不能马上获取到发送的结果
Kafka的Producer.send方法返回了一个Future对象,只要在Future对象上等待,即可实现同步发送的效果
因为是同步发送,因此在发送完成后,无需等待生产者发送完消息,可以直接关闭
代码如下

复制代码
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    
    long b0 = System.currentTimeMillis();
    for (int i = 0; i < 10000; i++) {
    	ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.format("k%08d", i), String.format("v%08d", i));
    	Future<RecordMetadata> future = producer.send(record);
    	RecordMetadata result = future.get();
    	result.partition();
    	//可以获取到发送的结果,包括分区等信息
    }
    producer.close();
    long b1 = System.currentTimeMillis();
    //无需等待,直接关闭
    System.out.println("Time: " + (b1 - b0));
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

运行输出如下:

Time: 18156

可以看到,同步发送比异步发送还是慢了很多的,算上异步发送等待的1秒中,跟同步发送还是有数量级的优势

消费者

和生产者一样,Kafka的消费者也要有一个properties文件用以指定相应的broker-list和反序列化类,以及消费者独有的 消费者组

复制代码
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-consumer");
    
    
      
      
      
      
    

简单消费者

创建properties对象后,还要指定监听的topic

复制代码
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Collections.singletonList("test"));
    
    
      
      
    

完整的消费者代码如下

复制代码
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-consumer");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Collections.singletonList("test"));
    try {
    	while (true) {
    		ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    		for (ConsumerRecord record : records) {
    			System.out.println(record.key() + "-->" + record.value());
    		}
    	}
    } catch (Exception e) {
    	e.printStackTrace();
    }
    consumer.close();
    
    
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
      
    

启动一个普通生产者,观察程序输出,在控制台能看到消费者消费数据,数据总量是对的,然而顺序和生产顺序不一样,这个问题会在别的地方讨论

重置offset

我们知道,Kafka在0.9版本后将消费者的offset保存到了一个特定的topic中,那么当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时(例如因为该数据已被删除或是心消费者组),该怎么办呢?
这是由一个配置项来控制的:auto.offset.reset ,它有三个可选项

  • earliest 将偏移量自动重置为最古老的偏移量,能消费当前topic中保存的所有数据
  • latest 自动将偏移量重置为最新偏移量,消费者组启动前的数据都不会消费,这也是Kafka的默认值
  • none 如果未找到消费者组的先前偏移量,则向消费者抛出异常(org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions)
  • 如果是其它值,则会在消费者创建时就抛出异常(org.apache.kafka.common.config.ConfigException: Invalid value nonex for configuration auto.offset.reset)

设置offset重置策略为earliest

复制代码
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    
    
      
    

启动消费者,可以看到消费者启动前发往Kafka的消息也被消费了

设置offset重置策略为latest

复制代码
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
    
    
      
    

启动消费者,可以看到消费者启动前发往Kafka的消息没有被消费,新消息才能被消费

设置offset重置策略为none

复制代码
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
    
    
      
    

此时,无法启动一个新的消费者组,因为Kafka中没有此消费者组的offset信息,因此会抛出

org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test-1, test-0, test-2]

手动提交offset

默认情况下,Kafka的offset是自动提交的,每隔AUTO_COMMIT_INTERVAL_MS_CONFIG 指定的毫秒数就提交一次offset
Kafka也支持手动提交offset
只需要将ENABLE_AUTO_COMMIT_CONFIG设置为false,Consumer就不会在后台自动提交offset,此时就需要手动提交offset

若不提交offset,则此消费者下次启动时,还会从最后提交的offset开始提取数据
手动提交offset有两种方式:

  • 同步提交,consumer.commitSync(),会等待broker更新offset,有重试机制
  • 异步提交,consumer.commitAsync(),异步更新offset,不等待broker返回,无重试机制

Q:为什么异步提交没有重试机制?
A:因为消费者不会等待,若异步提交也有重试,可能offset=10的提交失败了,offset=20提交成功了,offset=10重试提交成功,那就会造成消息重复

Q:那异步提交有没有办法知道提交失败了?
A:有一个void commitAsync(OffsetCommitCallback callback) ,可以通过回调来感知offset提交成功还是失败

全部评论 (0)

还没有任何评论哟~