Kafka客户端(2.1.0)
Kafka的客户端程序
-
准备工作
-
生产者
-
-
普通生产者
-
带回调的生产者
-
自定义分区策略
-
- 实现Partitioner 接口
- 在配置项中指定Partitioner类
-
同步生产者
-
-
消费者
-
-
简单消费者
-
重置offset
-
- 设置offset重置策略为earliest
- 设置offset重置策略为latest
- 设置offset重置策略为none
-
手动提交offset
-
准备工作
要使用kafka的客户端,首先要引入Kafka客户端的依赖,下面是2.1.0版本的依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
Kafka的生产者,需要一个properties对象用以指定broker-list以及对应的key-value的序列号类
//创建配置对象
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
生产者
先要创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
别忘了把生产者关闭
producer.close();
因为生产者是异步发送的,因此在生产者关闭之前最好等待一段时间,避免遗漏消息
Thread.sleep(1000);
普通生产者
普通生产者最简单,直接发送消息即可
完整例子如下
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
long b0 = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.format("k%08d", i), String.format("v%08d", i));
producer.send(record);
}
long b1 = System.currentTimeMillis();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(b1 - b0);
producer.close();
经验证,当前配置下,异步发送一万条消息,300毫秒不到就发完了(因需延迟1秒关闭生产者)
带回调的生产者
普通生产者能单纯的发送,但如果发送失败了,主线程是无法感知到的
那么,我们就可以通过带回调的生产者来处理消息的发送状态、统计各消息分配的分区等待
下面是我的一个实例
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
final AtomicInteger successCount = new AtomicInteger(0);
final AtomicInteger failureCount = new AtomicInteger(0);
final Map<String, Integer> partitionMaps = Collections.synchronizedMap(new HashMap<String, Integer>());
long b0 = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
final ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.format("k%08d", i), String.format("v%08d", i));
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
failureCount.incrementAndGet();
} else {
successCount.incrementAndGet();
partitionMaps.put(record.key(), recordMetadata.partition());
}
}
});
}
long b1 = System.currentTimeMillis();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Map<Integer, Integer> partitionCountMap = new HashMap<Integer, Integer>();
for (Map.Entry<String, Integer> entry : partitionMaps.entrySet()) {
String key = entry.getKey();
int partition = entry.getValue();
Integer old = partitionCountMap.get(partition);
if (old != null) {
partitionCountMap.put(partition, old + 1);
} else {
partitionCountMap.put(partition, 1);
}
}
System.out.println("Time :" + (b1 - b0));
System.out.println("Failure:" + failureCount.get());
System.out.println("Success:" + successCount.get());
System.out.println();
for (Map.Entry<Integer, Integer> entry : partitionCountMap.entrySet()) {
int partition = entry.getKey();
int count = entry.getValue();
System.out.println("partition " + partition + " count:" + count);
}
producer.close();
结果输出:
Time :327
Failure:0
Success:10000partition 0 count:3299
partition 1 count:3372
partition 2 count:3329
可以看到,消息都发送成功了,异步发送还是很快的,1秒中的等待也足够让所有的消息都发送成功并统计其分配的分区
自定义分区策略
我们可以在生产消息的时候指定分区
为了指定分区,我们只需要做两步操作即可:
实现Partitioner 接口
public class MyPartitioner implements Partitioner {
/** *
* @param topic 消息的主题
* @param key 消息的key对象
* @param keyBytes 消息的key对象通过Serializer序列号后的byte数组
* @param value 消息的value对象
* @param valueBytes 消息的value对象通过Serializer序列号后的byte数组
* @param cluster 集群信息,可以获取集群的节点数等信息,更多信息可以参考DefaultPartitioner的实现
* @return 此消息分配的分区
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//具体的分区策略,这里把所有的消息都扔到0分区去
return 0;
}
public void close() {
System.out.println("MyPartitioner close");
}
public void configure(Map<String, ?> map) {
System.out.println("MyPartitioner configure");
}
}
在配置项中指定Partitioner类
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "your.package.MyPartitioner");
在刚刚带回调的生产者中把自定义分区策略配置上去,输出结果如下:
MyPartitioner configure
Time :315
Failure:0
Success:10000partition 0 count:10000
MyPartitioner close
可以看到,所有的消息都发送到0分区去了
同步生产者
之前所有的生产者都是异步生产者,异步生产者效率高,但却不能马上获取到发送的结果
Kafka的Producer.send方法返回了一个Future对象,只要在Future对象上等待,即可实现同步发送的效果
因为是同步发送,因此在发送完成后,无需等待生产者发送完消息,可以直接关闭
代码如下
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
long b0 = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
ProducerRecord<String, String> record = new ProducerRecord<String, String>("test", String.format("k%08d", i), String.format("v%08d", i));
Future<RecordMetadata> future = producer.send(record);
RecordMetadata result = future.get();
result.partition();
//可以获取到发送的结果,包括分区等信息
}
producer.close();
long b1 = System.currentTimeMillis();
//无需等待,直接关闭
System.out.println("Time: " + (b1 - b0));
运行输出如下:
Time: 18156
可以看到,同步发送比异步发送还是慢了很多的,算上异步发送等待的1秒中,跟同步发送还是有数量级的优势
消费者
和生产者一样,Kafka的消费者也要有一个properties文件用以指定相应的broker-list和反序列化类,以及消费者独有的 消费者组
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-consumer");
简单消费者
创建properties对象后,还要指定监听的topic
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList("test"));
完整的消费者代码如下
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.115:9092,192.168.1.116:9092,192.168.1.117:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-consumer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList("test"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord record : records) {
System.out.println(record.key() + "-->" + record.value());
}
}
} catch (Exception e) {
e.printStackTrace();
}
consumer.close();
启动一个普通生产者,观察程序输出,在控制台能看到消费者消费数据,数据总量是对的,然而顺序和生产顺序不一样,这个问题会在别的地方讨论
重置offset
我们知道,Kafka在0.9版本后将消费者的offset保存到了一个特定的topic中,那么当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时(例如因为该数据已被删除或是心消费者组),该怎么办呢?
这是由一个配置项来控制的:auto.offset.reset ,它有三个可选项
- earliest 将偏移量自动重置为最古老的偏移量,能消费当前topic中保存的所有数据
- latest 自动将偏移量重置为最新偏移量,消费者组启动前的数据都不会消费,这也是Kafka的默认值
- none 如果未找到消费者组的先前偏移量,则向消费者抛出异常(org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions)
- 如果是其它值,则会在消费者创建时就抛出异常(org.apache.kafka.common.config.ConfigException: Invalid value nonex for configuration auto.offset.reset)
设置offset重置策略为earliest
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
启动消费者,可以看到消费者启动前发往Kafka的消息也被消费了
设置offset重置策略为latest
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
启动消费者,可以看到消费者启动前发往Kafka的消息没有被消费,新消息才能被消费
设置offset重置策略为none
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
此时,无法启动一个新的消费者组,因为Kafka中没有此消费者组的offset信息,因此会抛出
org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [test-1, test-0, test-2]
手动提交offset
默认情况下,Kafka的offset是自动提交的,每隔AUTO_COMMIT_INTERVAL_MS_CONFIG 指定的毫秒数就提交一次offset
Kafka也支持手动提交offset
只需要将ENABLE_AUTO_COMMIT_CONFIG设置为false,Consumer就不会在后台自动提交offset,此时就需要手动提交offset
若不提交offset,则此消费者下次启动时,还会从最后提交的offset开始提取数据
手动提交offset有两种方式:
- 同步提交,consumer.commitSync(),会等待broker更新offset,有重试机制
- 异步提交,consumer.commitAsync(),异步更新offset,不等待broker返回,无重试机制
Q:为什么异步提交没有重试机制?
A:因为消费者不会等待,若异步提交也有重试,可能offset=10的提交失败了,offset=20提交成功了,offset=10重试提交成功,那就会造成消息重复
Q:那异步提交有没有办法知道提交失败了?
A:有一个void commitAsync(OffsetCommitCallback callback) ,可以通过回调来感知offset提交成功还是失败
