Advertisement

第二章 kafka进阶笔记

阅读量:

一、第一个Kafka程序

******1、**创建我们的主题

生成主题命令如下:kafka-topics.bat --zookeeper localhost:2181/kafka --启动主题 --topic hello-kafka --设置副本数为1 --partitions 4

******2、**生产者发送消息

我们目前采用Kafka内置的客户端API来构建kafka应用程序。作为Java程序员,在这种情况下我们将Maven作为工具,并且选择最新的版本来实现项目。

生产者代码示例如下:

复制代码
 public class HelloKafkaProducer {

    
     public static void main(String[] args) {
    
         //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
    
         Properties properties = new Properties();
    
         properties.put("bootstrap.servers","127.0.0.1:9092");
    
         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
         KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
    
         try {
    
             ProducerRecord<String,String> record;
    
             try {
    
                 //TODO 发送4条消息
    
                 for(int i=0;i<4;i++){
    
                     record = new ProducerRecord<String,String>(BusiConst.HELLO_TOPIC, String.valueOf(i),"lison");
    
                     producer.send(record);
    
                     System.out.println(i+",message is sent");
    
                 }
    
             } catch (Exception e) {
    
                 e.printStackTrace();
    
             }
    
         } finally {
    
             producer.close();
    
         }
    
     }
    
 }

必选属性**:******

创建生产者对象时有三个属性必须指定。

bootstrap.servers****:****

该属性配置了Broker地址列表,其中每个地址遵循host:port格式.清单中未包含所有Broker的详细信息;相反地,生产者将从指定的Broker中检索其他Broker所需的信息.

至少需要提供两个broker的信息(以逗号分隔列出具体信息,例如:127.0.0.1:9092, 192.168.0.13:9092),若其中一个出现故障,则生产者仍可通过集群正常工作

key.serializer****:****

生产者接口支持参数化的数据类型,在将Java对象用作键值对传递给Broker时,请注意Broker接收的消息中键值都是字节数组。然而,在使用Kafka Broker时,请确保已配置了能够将Java对象序列化为字节数组的serializer字段。key.serializer字段必须设置为满足org.apache.kafka.commonerialization.Serializer接口的要求。系统默认提供了ByteArraySerializer、IntegerSerializer、StringSerializer三种标准序列化器,并且支持自定义序列化器以满足特定需求

value.serializer****:****

同key.serializer,参见代码模块kafka-no-spring下包hellokafka中

properties.set((String)(("key ser ilizer"), (("org.apache.kafka.commonserialization.StringSerializer"))));
properties.set((String)(("value ser ilizer"), (("org.apache.kafka.commonserialization.StringSerializer"))));

******3、**消费者接受消息

消费者代码示例如下(Kafka 只提供拉取的方式)

复制代码
 public class HelloKafkaConsumer {

    
     public static void main(String[] args) {
    
         //TODO 消费者三个属性必须指定(broker地址清单、key和value的反序列化器)
    
         Properties properties = new Properties();
    
         properties.put("bootstrap.servers","127.0.0.1:9092");
    
         properties.put("key.deserializer", StringDeserializer.class);
    
         properties.put("value.deserializer", StringDeserializer.class);
    
         //TODO 群组并非完全必须
    
         properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
    
         KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    
         try {
    
             //TODO 消费者订阅主题(可以多个)
    
             consumer.subscribe(Collections.singletonList(BusiConst.HELLO_TOPIC));
    
             while(true){
    
                 //TODO 拉取(新版本)
    
                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    
                 for(ConsumerRecord<String, String> record:records){
    
                     System.out.println(String.format("topic:%s,分区:%d,偏移量:%d," + "key:%s,value:%s",record.topic(),record.partition(),
    
                             record.offset(),record.key(),record.value()));
    
                     //do my work
    
                     //打包任务投入线程池
    
                 }
    
             }
    
         } finally {
    
             consumer.close();
    
         }
    
     }
    
 }

必选参数**:******

bootstrap.servers、key.serializer、value.serializer含义同生产者

group.id****:****

并非绝对必要的一点是它明确指定了消费者的归属群组然而在无需为任何特定群组创建用户的前提下即使未将消费者分配至任何特定群组也不会出现问题

参考代码中,在kafka-no-spring模块下的hellokafka包中新增功能亮点:优化了这一设计流程,在该版本中使得整个超时时间计算中包含元数据获取环节,并且这一改进使得整体处理更加合理。

复制代码
 public static void main(String[] args) {

    
     /*消费配置的实例*/
    
  Properties properties = KafkaConst.consumerConfig("groupC",StringDeserializer.class, StringDeserializer.class);
    
     /*消息消费者*/
    
     consumer = new KafkaConsumer<String, String>(properties);
    
     try {
    
         consumer.subscribe(Collections.singletonList(BusiConst.CONSUMER_GROUP_TOPIC));
    
         consumer.poll(0);
    
         while(true){
    
             ConsumerRecords<String, String> records = consumer.poll(500);//5秒  50000
    
             for(ConsumerRecord<String, String> record:records){
    
                 System.out.println(String.format( "主题:%s,分区:%d,偏移量:%d,key:%s,value:%s",
    
                         record.topic(),record.partition(),record.offset(), record.key(),record.value()));
    
                 //do our work
    
             }
    
         }
    
     } finally {
    
         consumer.close();
    
     }
    
 }

******4、**演示示例

1)默认创建主题,只有一个分区时,演示生产者和消费者情况。

2)修改主题分区为2(使用管理命令),再重新演示生产者和消费者情况。

******二、**Kafka的生产者

******1、**生产者发送消息的基本流程

为建立一个ProducerRecord对象作为起点,在该对象中应包含目标主题以及将要传输的内容信息。此外,在向系统发送ProducerRecord对象时,则需要预先将键和对应的值对象进行序列化处理,并将其转换为字节数组的形式以实现数据在网络中的传输需求。

随后数据将传递给分割器。如果在ProducerRecord对象中设置了特定的区域,则分割器将不再执行任何操作并返回该区域。如果没有设置特定区域,则分割器将根据ProducerRecord对象中的键值来确定目标区域。确定目标区域后, 生产者就能够知道如何向对应的区域和地区发布这一条目了。随后, 这一条目会被加入一个批处理队列(双端队列, 以尾部追加的方式)中。这个批处理队列中的所有条目将会被发送到同一个主题和地区内。系统将通过独立线程来处理这些批处理队列并将其转发至相应的中间件节点。

当服务器接收到这些消息时会返回一个响应。如果消息成功地被写入到Kafka,则生成并返回一个RecordMetaData对象;该对象记录了主题以及对应的分区信息,并包含记录在其分区中的偏移量数据。若写入出现失败,则会返回相应的错误信息;接收到来自生产者的错误信息后将尝试重新发送该消息多次,在经过几次尝试后仍未成功的情况下,则最终将返回错误信息

生产者发送消息一般会发生两类错误:

一类是可修复的错误,例如连接错误可以通过重新尝试建立连接来解决;而无主情况(即没有领导者的情况),可以通过将该区域重新分组并选举新的首领来处理。

存在无法通过重试解决的情况**,** 例如当消息超出传输容量限制时,如"消息太大"异常,具体信息可参考message.max.bytes字段,此类消息将不会发起任何重试请求并直接触发异常处理流程.

2、使用Kafka生产者

采用生成者提供的send方法进行消息发送。send方法将返回一个Future对象,并包含RecordMetadata的信息。其内容包括目标主题、分区信息以及消息偏移量。

******2.1、**发送并忘记

不考虑send方法返回的值。
通常情况下消息能够成功送达。
当遇到问题时生产者会自动进行重试。
偶尔会有个别消息未能成功送达。

******2.2、**同步发送

通过调用send方法可以获得一个Future对象,并在适当的时候调用该对象的get方法以获取结果。参考代码,在kafka-no-spring模块下的sendtype包中有相关实现。

复制代码
 public class KafkaFutureProducer {

    
     private static KafkaProducer<String,String> producer = null;
    
     public static void main(String[] args) {
    
         // 消息生产者
    
         producer = new KafkaProducer<String, String>(KafkaConst.producerConfig(StringSerializer.class, StringSerializer.class));
    
         try {/*待发送的消息实例*/
    
             ProducerRecord<String,String> record;
    
             try {
    
                 record =  new ProducerRecord<String,String>(BusiConst.HELLO_TOPIC,"test msg","chj");
    
                 Future<RecordMetadata> future = producer.send(record);
    
                 System.out.println("do other sth");
    
                 RecordMetadata recordMetadata = future.get();
    
                 if(null!=recordMetadata){
    
                     System.out.println("offset:"+recordMetadata.offset()+"-" +"partition:"+recordMetadata.partition());
    
                 }
    
             } catch (Exception e) {
    
                 e.printStackTrace();
    
             }
    
         } finally {
    
             producer.close();
    
         }
    
     }
    
 }

******2.3、**异步发送

为该组织架构中的组织者提供Callback接口的解决方案;随后将该类的实例传递至 send 方法;参考相关代码示例,请查看位于 kafka-no-spring 模块下的 sendtype 包中的相关内容

复制代码
 public class KafkaAsynProducer {

    
     private static KafkaProducer<String,String> producer = null;
    
     public static void main(String[] args) {
    
         // 消息生产者
    
         producer = new KafkaProducer<String, String>(KafkaConst.producerConfig(StringSerializer.class,StringSerializer.class));
    
         /*待发送的消息实例*/
    
         ProducerRecord<String,String> record;
    
         try {
    
             record = new ProducerRecord<String,String>(BusiConst.HELLO_TOPIC,"mesage02","hankin");
    
             producer.send(record, new Callback() {
    
                 public void onCompletion(RecordMetadata metadata,Exception exception) {
    
                     if(null!=exception){
    
                         exception.printStackTrace();
    
                     }
    
                     if(null!=metadata){
    
                         System.out.println("offset:"+metadata.offset()+"-" +"partition:"+metadata.partition());
    
                     }
    
                 }
    
             });
    
         } finally {
    
             producer.close();
    
         }
    
     }
    
 }

******2.4、**多线程下的生产者

该类实现了线程安全机制,并因此可以在多个线程环境中正确地利用KafkaProducer实例。为如何有效地节省资源呢?参考代码如下:在kafka-no-spring模块下的concurrent包中:

多线程下使用生产者:

复制代码
 public class KafkaConProducer {

    
     //发送消息的个数
    
     private static final int MSG_SIZE = 1000;
    
     //负责发送消息的线程池
    
     private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
     private static CountDownLatch countDownLatch = new CountDownLatch(MSG_SIZE);
    
     private static DemoUser makeUser(int id){
    
         DemoUser demoUser = new DemoUser(id);
    
         String userName = "xiangxue_"+id;
    
         demoUser.setName(userName);
    
         return demoUser;
    
     }
    
     /** * 发送消息的任务
    
      */
    
     private static class ProduceWorker implements Runnable{
    
         private ProducerRecord<String,String> record;
    
         private KafkaProducer<String,String> producer;
    
         public ProduceWorker(ProducerRecord<String, String> record,KafkaProducer<String, String> producer) {
    
             this.record = record;
    
             this.producer = producer;
    
         }
    
         public void run() {
    
             final String id = Thread.currentThread().getId() +"-"+System.identityHashCode(producer);
    
             try {
    
                 producer.send(record, new Callback() {
    
                     public void onCompletion(RecordMetadata metadata,Exception exception) {
    
                         if(null!=exception){
    
                             exception.printStackTrace();
    
                         }
    
                         if(null!=metadata){
    
                             System.out.println(id+"|" +String.format("偏移量:%s,分区:%s",
    
                                     metadata.offset(),metadata.partition()));
    
                         }
    
                     }
    
                 });
    
                 System.out.println(id+":数据["+record+"]已发送。");
    
                 countDownLatch.countDown();
    
             } catch (Exception e) {
    
                 e.printStackTrace();
    
             }
    
         }
    
     }
    
     public static void main(String[] args) {
    
         KafkaProducer<String,String> producer = new KafkaProducer<String, String>(
    
                 KafkaConst.producerConfig(StringSerializer.class, StringSerializer.class));
    
         try { //循环发送,通过线程池的方式
    
             for(int i=0;i<MSG_SIZE;i++){
    
                 DemoUser demoUser = makeUser(i);
    
                 ProducerRecord<String,String> record = new ProducerRecord<String,String>(
    
 BusiConst.CONCURRENT_USER_INFO_TOPIC,null,
    
 System.currentTimeMillis(),demoUser.getId()+"", demoUser.toString());
    
                 executorService.submit(new ProduceWorker(record,producer));
    
             }
    
             countDownLatch.await();
    
         } catch (Exception e) {
    
             e.printStackTrace();
    
         } finally {
    
             producer.close();
    
             executorService.shutdown();
    
         }
    
     }
    
 }

******3、**更多发送配置

生產者具備多種可配置的屬性, 大都數配备有合乎常理的預設值, 然後无需調整或修改即可正常運作。
某些參數可能会影响記憶體使用情況、运算性能以及系统的可靠性。
建议參閱org.apache.kafka.clients.producer包下的 ProducerConfig class相關 resource 或文档。
具體代码可見於模块 kafka-no-spring 下包 ProducerConfig 中 ConfigKafkaProducer class 的位置

复制代码
 public class ConfigKafkaProducer {

    
     public static void main(String[] args) {
    
         //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
    
         Properties properties = new Properties();
    
         properties.put("bootstrap.servers","127.0.0.1:9092");
    
         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
         //TODO 更多发送配置(重要的)
    
         properties.put("acks","1"); //ack 0,1,all
    
         // 一个批次可以使用的内存大小 缺省16384(16k)
    
         properties.put("batch.size",16384);
    
         // 指定了生产者在发送批次前等待更多消息加入批次的时间,  缺省0  50ms
    
         properties.put("linger.ms",0L);
    
         // 控制生产者发送请求最大大小,默认1M (这个参数和Kafka主机的message.max.bytes 参数有关系)
    
         properties.put("max.request.size",1 * 1024 * 1024);
    
         //TODO 更多发送配置(非重要的)
    
         properties.put("buffer.memory",32 * 1024 * 1024L);//生产者内存缓冲区大小
    
         properties.put("retries",0); //重发消息次数
    
         //客户端将等待请求的响应的最大时间 默认30秒
    
         properties.put("request.timeout.ms",30 * 1000);
    
         //最大阻塞时间,超过则抛出异常 缺省60000ms
    
         properties.put("max.block.ms",60*1000);
    
         // 于压缩数据的压缩类型。默认是无压缩 ,none、gzip、snappy
    
         properties.put("compression.type","none");
    
         KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
    
         try {
    
             ProducerRecord<String,String> record;
    
             try {
    
                 //TODO 发送4条消息
    
                 for(int i=0;i<4;i++){
    
                     record = new ProducerRecord<String,String>(
    
                             BusiConst.HELLO_TOPIC, String.valueOf(i),"hankin");
    
                     producer.send(record);
    
                     System.out.println(i+",message is sent");
    
                 }
    
             } catch (Exception e) {
    
                 e.printStackTrace();
    
             }
    
         } finally {
    
             producer.close();
    
         }
    
     }
    
 }

OrderKafkaProducer:

复制代码
 public class OrderKafkaProducer {

    
     public static void main(String[] args) {
    
         //TODO 生产者三个属性必须指定(broker地址清单、key和value的序列化器)
    
         Properties properties = new Properties();
    
         properties.put("bootstrap.servers","127.0.0.1:9092");
    
         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
         //TODO 顺序消息的保证(只有一个分区、)
    
         //properties.put("retries",0); //重发消息次数(设置为0)
    
         //在阻塞之前,客户端将在单个连接上发送的未确认请求的最大数目
    
         //max.in.flight.request.per.connection 设为1,这样在生产者尝试发送第一批消息时,
    
  
    
 就不会有其他的消息发送给broker
    
         //这个值默认是5
    
         properties.put("max.in.flight.requests.per.connection",1);
    
         KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
    
         try {
    
             ProducerRecord<String,String> record;
    
             try {
    
                 //TODO发送4条消息
    
                 for(int i=0;i<4;i++){
    
                     record = new ProducerRecord<String,String>(
    
                             BusiConst.HELLO_TOPIC, String.valueOf(i),"hankin");
    
                     producer.send(record);
    
                     System.out.println(i+",message is sent");
    
                 }
    
             } catch (Exception e) {
    
                 e.printStackTrace();
    
             }
    
         } finally {
    
             producer.close();
    
         }
    
     }
    
 }

******1)**acks

Kafk内部的复制机制相当复杂,在后续章节中将对这一部分进行详细讲解)。我们专注于讨论生产者发送消息时与副本的关系)。指定必须要有多少个分区副本收到消息后,生产者才会确认写入成功)。这一参数起到关键作用,在一定程度上影响了数据丢失的可能性)。

acks=0:当生产者在写入消息之前没有等待任何来自服务器的反馈时,可能会导致消息丢失,并且其吞吐量较高。

acks参数设为1时的集群机制如下:若集群的首领节点接收到该消息,则生产者将接收到服务器的正常响应;若因首领节点故障或新首领未被选举而未能接收到该信息,则生产者将收到来自服务器的异常响应以指示信息丢失;为了避免数据丢失问题,在此情况下生产者仍会重发该信息;然而,在某些特殊情况下(如未成功接收该信息而意外获得主节点权限),相关数据可能会丢失,默认应采用此配置设置。

acks=all:仅在所有复制参与节点均接收到消息时, 生产者将仅接收到来自服务器的一个成功响应. 运行效率较低.

在金融业务领域中采用主从架构,并结合异地备份策略。这使得许多高可用性场景并非仅依赖于两个复制本即可实现目标;反而可能通过在多个机架以及异地节点各自部署一组备份实例的方式来增强系统的容错能力。其中,“buffer.memory”参数用于指定生产者端的消息临时缓存容量(基于生产者的消息发送机制)。当数据产生速率超过每秒 thousands of messages的速度时;系统可能会出现资源耗尽导致 producer出现阻塞或异常退出的情况。缺省值设定为 33554432 (即 32M字节)

******2)**max.block.ms

明确要求在被调用send()方法或采用partitionsFor()方法来获取元数据的信息期间设定生产的阻塞时间。一旦生产者的发送缓冲区达到上限或缺乏必要的元数据信息,则这些操作将导致阻塞。当阻塞时间超过最大允许值(即max.block.ms),则系统将触发超时异常并设定默认值为60秒。

******3)**retries

当消息发送失败时,默认情况下指定生产者可将消息重新发送的次数设置为Integer.MAX_VALUE(缺省值)。默认情况下,在每次重试期间会延迟100毫秒,并可通过参数retry.backoff.ms 来调整这一延迟时间。

4)receive.buffer.bytes和********send.buffer.bytes

请配置TCP socket接收与发送数据包的缓存空间大小。若要将它们设定为-1,则会采用操作系统的默认值。当生产者或消费者位于不同的数据中心时,请相应地增加这些数值以适应跨数据中心网络的特点:由于跨数据中心之间的网络通常具有较高的延迟和较低的带宽

******5)**batch.size

若将多个消息投向同一分区,则生产者将这些消息整合至同一批中进行处理。该参数配置了每一批次可占用的内存容量(单位为字节)。当一批次的内存已满时,则该批中的所有消息随后会被发布出去。然而生产者并非总是等到一批次填满才会触发发布;有时即使是一半填充或仅包含一条记录的消息批次也会被及时发布出来。缺省值设为16384字节(相当于16KB)。若单条消息体积超过该批次容量,则无法存入。

******6)**linger.ms

生产者被指示在发送批次之前等待更多消息加入该批次所需的时间,并遵循先到先服务原则。具体而言,在我们接收的消息数量达到batch.size的数量后(而不是当总数据量达到时),该机制将立即触发并启动发送过程而不受此参数的影响。然而,在我们接收的消息总字节数远低于batch.size设置的情况下,则需要设定一个特定时长以便接收更多消息。这个设置默认为0(无延迟)。例如设定linger.ms=5将减少请求数量的同时增加5ms延迟并提升吞吐量。

******7)**compression.type

producer主要用于对数据进行压缩操作,并且默认情况下未启用任何额外的 compression;允许选择的 compression 类别包括 none 选项表示无 compression;建议优先考虑将 compression 应用于批量处理场景;当处理的数据量越大时;效果越显著;其中 snappy 压缩算法具有较低的 CPU 开销;对于性能与网络带宽更为关键的应用场景建议选用 snappy 压缩算法;而当带宽成为主要瓶颈时则应考虑采用 gzip 选项;正确选项值为 none, gzip, snappy。

******8)**client.id

每当向server发送请求时,该字符串会被发送给server,其主要目的是为了追踪请求源.这项应用可自由设置任意字符串,并出于仅记录和跟踪的目的.

******9)**max.in.flight.requests.per.connection

允许指定的生产者在其接收服务器响应前发送多个消息;数值越高则占用的内存越大;当然还可以提高吞吐量。当发生错误时可能会影响数据发送顺序;默认设置为5(可选)。若需确保消息在一个分区内的严格顺序,则应将其设为1;不过这样做会显著降低生产者的吞吐能力。

******10)**request.timeout.ms

客户端将在请求的响应到来前指定的时间内保持等待状态,若在规定时间内未接收到回应,客户端会立即执行重新发送请求的操作;当达到预先设置的重试次数上限后会触发异常抛出机制,默认情况下允许30秒的时间间隔进行重试。

******11)**metadata.fetch.timeout.ms

表示我们获取的一些元数据中的第一个时间信息,在这些元数据中包含三个字段:topic、host以及partitions这三个部分。该配置用于等待元数据完成获取所需的时间;若此过程未完成,则会向客户端抛出异常。

******12)**max.request.size

限定生产者的每次发送请求的最大数据量。默认设置为1兆字节(MB)。若一个请求仅包含单个消息,则该消息的数据量不得超过1M;若一个请求涉及一批次数据,并包含1,000条 msg,则每条 msg 的数据量不得多于1千字节(KB)。特别注意的是:Broker节点自身会设定对 msg 存储尺寸的上限

无需关注其他参数。通常建议仅需记住acks、batch.size、linger.ms和max.request.size这几个关键参数。

******13)**顺序保证

该系统确保了同一主题内消息的有序性。当向系统发送消息时,主题仅限于一个特定分区,生产者会按预定顺序依次发送消息,Broker将按该序列将这些消息存储到对应的分区中,接收端同样会按输入序列读取这些信息。在某些业务场景中对处理顺序的要求并不严格,例如,转账操作中的金额变化具有明确的时间先后关系,而资金交易中的先后次序则相对不敏感

设置retries为非零整数,并将max.in.flight.requests.per.connection设定为大于1的数值,则当第一批消息提交失败而第二批能够成功提交时, broker将尝试重传第一批消息.

通常情况下当消息需要有序时 消息是否成功提交同样重要

******4、**序列化

要创建一个生产者对象,则需要先指定一个序列化器;然而,默认情况下提供的序列化器可能无法满足所有应用场景的需求。因此我们完全有权限根据具体需求来定制适合自己的序列化器方案;只要求实现org.apache.kafka.commonerialization.Serializer这一接口即可完成自定义序列化的开发工作

如何实现,看模块kafka-no-spring下包selfserial 中代码。

序列化代码如下:

复制代码
 public class SelfSerializer implements Serializer<DemoUser> {

    
     public void configure(Map<String, ?> configs, boolean isKey) {
    
         //do nothing
    
     }
    
     public byte[] serialize(String topic, DemoUser data) {
    
         try {
    
             byte[] name;
    
             int nameSize;
    
             if(data==null){
    
                 return null;
    
             }
    
             if(data.getName()!=null){
    
                 name = data.getName().getBytes("UTF-8");
    
                 //字符串的长度
    
                 nameSize = data.getName().length();
    
             }else{
    
                 name = new byte[0];
    
                 nameSize = 0;
    
             }
    
             /*id的长度4个字节,字符串的长度描述4个字节,
    
             字符串本身的长度nameSize个字节*/
    
             ByteBuffer buffer = ByteBuffer.allocate(4+4+nameSize);
    
             buffer.putInt(data.getId());//4
    
             buffer.putInt(nameSize);//4
    
             buffer.put(name);//nameSize
    
             return buffer.array();
    
         } catch (Exception e) {
    
             throw new SerializationException("Error serialize DemoUser:"+e);
    
         }
    
     }
    
     public void close() {
    
         //do nothing
    
     }
    
 }

反序列化代码:

复制代码
 public class SelfDeserializer implements Deserializer<DemoUser> {

    
     public void configure(Map<String, ?> configs, boolean isKey) {
    
         //do nothing
    
     }
    
     public DemoUser deserialize(String topic, byte[] data) {
    
         try {
    
             if(data==null){
    
                 return null;
    
             }
    
             if(data.length<8){
    
                 throw new SerializationException("Error data size.");
    
             }
    
             ByteBuffer buffer = ByteBuffer.wrap(data);
    
             int id;
    
             String name;
    
             int nameSize;
    
             id = buffer.getInt();
    
             nameSize = buffer.getInt();
    
             byte[] nameByte = new byte[nameSize];
    
             buffer.get(nameByte);
    
             name = new String(nameByte,"UTF-8");
    
             return new DemoUser(id,name);
    
         } catch (Exception e) {
    
             throw new SerializationException("Error Deserializer DemoUser."+e);
    
         }
    
     }
    
     public void close() {
    
         //do nothing
    
     }
    
 }

发送消息--未来某个时候get发送结果

复制代码
 public class SelfSerialProducer {

    
     private static KafkaProducer<String, DemoUser> producer = null;
    
     public static void main(String[] args){
    
         producer = new KafkaProducer<String, DemoUser>(KafkaConst.producerConfig(StringSerializer.class,SelfSerializer.class));
    
         try {
    
             // 待发送的消息实例
    
             ProducerRecord<String,DemoUser> record;
    
             try{
    
                 record = new ProducerRecord<String,DemoUser>( BusiConst.SELF_SERIAL_TOPIC,
    
                         "user001",new DemoUser(1,"hankinn01"));
    
                 producer.send(record);
    
                 System.out.println("sent: "+record);
    
             }catch (Exception e){
    
                 e.printStackTrace();
    
             }
    
         } finally {
    
             producer.close();
    
         }
    
     }
    
 }

自定义序列化需要考虑的问题****:****

自定义序列化可能带来程序的脆弱性。例如,在我们的实现中,我们采用了不同类型的消费者来处理实体字段的不同需求。其中一些消费者会将字段转换为long型的数据类型,而另一些则会新增字段。这会导致前后消息之间的兼容性问题,在实际应用中尤其需要注意。特别在系统升级时,通常会遇到部分系统升级而其余系统被迫跟随升级的情况。

应对这一问题, 可以考虑采用内置格式支持并具备不受语言影响的数据序列化能力的语言无关的序列化框架, 如Apache Avro被广泛采用. 其通过一个JSON文件定义了data schema用于描述数据结构, 可将其内置于数据文件中以确保一致性和规范性. 从而确保即使遇到不同版本的数据格式也能有效处理

但是内置的消息实例,默认会携带格式信息这一特性可能会导致消息体积出现不必要的增长,并从而造成资源消耗的问题。为了优化这一情况 我们可以通过采用schema注册表机制来实现数据管理上的优化方案具体而言 可以将所有待存入的数据与其所属的schema进行绑定 并将其存储于系统预定义的注册表中 这样在后续的信息传递过程中 只需在消息传递过程中引用对应的schema标识符 由系统自动从该注册表中提取相应的chema来进行解码和解析 这不仅能够有效减少数据传输过程中的开销 同时还能提高数据处理的整体效率

注意:Kafka本身不支持schema注册表,在实际应用中通常依赖第三方工具或平台以实现功能需求。目前已有众多开源解决方案可供选择,并且许多解决方案都可以通过GitHub获取。

请参考以下链接:https://cloud.tencent.com/developer/article/1336568

5、kafka分区****

我们新增了一个ProducerRecord对象,在其中可以看到该对象包含目标主题、key和value以及Kafka中的消息(以键值对的形式存在)。该key可设为null。该key有两种主要用途:首先用于确定消息分配至主题中的哪个分区;其次也可作为附加信息携带

当键值设为null并采用默认设置划分区域时,在轮转算法下消息将均匀分配至各个区域中。

如果键字段非空且配置了默认分区器,则 Kafka 会对此键执行哈希运算(基于 Kafka 内置定制哈希算法的具体实现尚不清楚)。该操作会将消息依据哈希值分配至对应的 partitions 中。显而易见的是同一个 key 始终会被分配至同一 partitions 中。然而仅当在创建主题时就规划好 partitions 并且不再增加新的 partitions 时才能保证这种一致性;否则一旦新增 partition 就无法保证这种一致性了;因此为了利用 key 对 partition 进行映射必须提前在主题创建阶段规划好 partition 的数量与分布并且坚决避免以后再添加新的 partition

自定义分区器****:** **

在某些情况下(如电商场景),由于特定地区的交易特征(如北京地区的交易量占比高达20%),需要对高并发区域进行专门化的处理以提升性能)。例如,在电商系统中,默认采用散列分区内存在不足(因为会导致资源浪费),我们可以自行设计专门针对其优化的分区内(如为北京地区单独划分特定区域)。另一种情况是通过值域来进行分区内(如根据订单金额大小划分)。具体实现时,请先创建一个4区间的主题索引结构;然后观察模块kafka-no-spring下的selfpartition包中的相关代码实现细节。

定义分区器,以value值进行分区代码

public class SelfPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//拿到
__ List partitionInfos = cluster.partitionsForTopic(topic);
//_TODO 分区数_
** ****int num = partitionInfos.size();
//
TODO 根据value与分区数求余的方式得到分区ID**
** __**int parId = ((String)value).hashCode()%num;
return parId;
}
public void close() {
//do nothing
__}
public void configure(Map<String, ?> configs) {
//do nothing
__}
}

可以和KafkaFutureProducer比较分区结果:

复制代码
 public class SelfPartitionProducer {

    
     private static KafkaProducer<String,String> producer = null;
    
     public static void main(String[] args) {
    
         /*消息生产者*/
    
         Properties properties = KafkaConst.producerConfig(StringSerializer.class,StringSerializer.class);
    
         /*使用自定义的分区器*/
    
         properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.chj.selfpartition.SelfPartitioner");
    
         producer = new KafkaProducer<String, String>(properties);
    
         try {
    
             /*待发送的消息实例*/
    
             ProducerRecord<String,String> record;
    
             try {
    
                 record = new ProducerRecord<String,String>( BusiConst.SELF_PARTITION_TOPIC,"teacher01","hankin");
    
                 Future<RecordMetadata> future = producer.send(record);
    
                 System.out.println("Do other something");
    
                 RecordMetadata recordMetadata = future.get();
    
                 if(null!=recordMetadata){
    
                     System.out.println(String.format("偏移量:%s,分区:%s", recordMetadata.offset(),recordMetadata.partition()));
    
                 }
    
             } catch (Exception e) {
    
                 e.printStackTrace();
    
             }
    
         } finally {
    
             producer.close();
    
         }
    
     }
    
 }
    
  
    
 /** * 类说明:可以和KafkaFutureProducer比较分区结果
    
  */
    
  
    
 public class SysPartitionProducer {
    
     private static KafkaProducer<String,String> producer = null;
    
     public static void main(String[] args) {
    
         // 消息生产者
    
         Properties properties = KafkaConst.producerConfig(StringSerializer.class,StringSerializer.class);
    
         producer = new KafkaProducer<String, String>(properties);
    
         try {
    
             // 待发送的消息实例
    
             ProducerRecord<String,String> record;
    
             try {
    
                 record = new ProducerRecord<String,String>(
    
                         BusiConst.SELF_PARTITION_TOPIC,"teacher01","hankin");
    
                 Future<RecordMetadata> future = producer.send(record);
    
                 System.out.println("Do other something");
    
                 RecordMetadata recordMetadata = future.get();
    
                 if(null!=recordMetadata){
    
                     System.out.println(String.format("偏移量:%s,分区:%s",recordMetadata.offset(),recordMetadata.partition()));
    
                 }
    
             } catch (Exception e) {
    
                 e.printStackTrace();
    
             }
    
         } finally {
    
             producer.close();
    
         }
    
     }

全部评论 (0)

还没有任何评论哟~