Advertisement

SpringBoot集成Kafka并发送消息

阅读量:

SpringBoot集成kafka

一、下载依赖

maven仓库官方地址:https://mvnrepository.com/

找到与自己项目的 SpringBoot 版本相匹配的 spring-kafka 版本
(此处为 1.3.9.RELEASE)

复制代码
    <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
       <version>1.3.9.RELEASE</version> <!-- the corresponding Spring Kafka version -->
    </dependency>
    
    
    java

在项目配置文件(如 application.yml)中添加以下配置:

复制代码
    # Kafka settings. Everything must be nested under spring.kafka so that
    # ${spring.kafka.topic} and the spring.kafka.* producer properties resolve.
    spring:
      kafka:
        topic: app_event_track_data  # custom property: topic the producer publishes to
        bootstrap-servers: 127.0.0.1:9092  # broker address (host:port); replace with your own server
        producer:
          retries: 0  # number of retries
          acks: 1  # ack level: how many replicas must confirm before the producer gets an ack (0, 1, all/-1)
          batch-size: 16384  # batch size
          buffer-memory: 33554432  # size of the producer-side buffer
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    
    
    java
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-07-13/qZGESm9hN6OxUg3M7i4r5e10TPRb.png)

至此,SpringBoot 集成 Kafka 完成。下面通过代码实现使用 Kafka 发送消息。

二、Kafka生产者发送消息

实体类:

复制代码
    /**
     * Request object describing a single app tracking ("hidden point") event.
     *
     * <p>Accessors are generated by Lombok's {@code @Data}.
     */
    @Data
    public class HiddenPointReq extends CommonParam implements Serializable {

        private static final long serialVersionUID = -5344192250139833209L;

        /** Page the user navigated from. */
        @ApiModelProperty("跳转前页面")
        private String fromPage;

        /** Page the user is currently on. */
        @ApiModelProperty("当前页面")
        private String currentPage;

        /** Name of the event. */
        @ApiModelProperty("事件名称")
        private String eventName;

        /** Extra parameters attached to the event. */
        @ApiModelProperty("额外参数")
        private String extra;

        /** Time the event was reported. */
        private Date recordTime;

        // Populated from request headers (per the original author's note).

        /** Channel code. */
        private String channelCode;

        /** Channel name. */
        private String channelName;

        /** Device ID. */
        private String deviceId;

        /** Time at which the event occurred. */
        private long eventTime;

        /** Event type. */
        private String eventType;

        /** Optional value associated with the event. */
        private String eventValue;
    }
    
    
    java
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-07-13/pHbqOAQgEtkeT2BJcLrDyvzndYXa.png)

HiddenPointController层:

复制代码
    /**
     * REST controller that accepts app tracking events and hands them to
     * {@link IAppEventService#sendKafkaMsg(HiddenPointReq)}.
     */
    @Slf4j
    @RestController
    @RequestMapping("/{version}/app/event")
    public class HiddenPointController {

        @Autowired
        private IAppEventService appEventService;

        /**
         * Records one tracking event.
         *
         * <p>The request body is enriched with the {@code deviceId}, {@code version} and
         * (when present) {@code param-userId} request headers before being passed on.
         *
         * @param reqDTO  event sent in the request body
         * @param request current HTTP request, used only to read headers
         * @return whatever the service returns for the send operation
         */
        @PostMapping("/record")
        public CommResult hiddenPointRecord(@RequestBody HiddenPointReq reqDTO, HttpServletRequest request) {
            final String received = reqDTO.toString();
            log.info("HiddenPointReq:{}", received);

            reqDTO.setDeviceId(request.getHeader("deviceId"));
            reqDTO.setAppVersion(request.getHeader("version"));

            // NOTE(review): a non-numeric header value makes Long.valueOf throw
            // NumberFormatException, which is not handled here.
            final String userIdHeader = request.getHeader("param-userId");
            if (userIdHeader != null) {
                reqDTO.setUserId(Long.valueOf(userIdHeader));
            }

            final String enriched = JSON.toJSONString(reqDTO);
            log.info("finish HiddenPointReq:{}", enriched);
            return appEventService.sendKafkaMsg(reqDTO);
        }
    }
    
    
    java
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-07-13/gXfw4tMRh29SPp8oj1qKLA3ErI7F.png)

IAppEventService层:

复制代码
    /**
     * Service contract for publishing app tracking events.
     */
    public interface IAppEventService {

        /**
         * Sends the given tracking event as a Kafka message.
         *
         * @param req event to send
         * @return result object describing the outcome of the call
         */
        CommResult sendKafkaMsg(HiddenPointReq req);
    }
    
    
    
    java

AppEventServiceImpl层:

复制代码
    @Slf4j
    @Service
    public class AppEventServiceImpl implements IAppEventService {
    
    	@Autowired
    private KafkaTemplate kafkaTemplate;
    
    	@Value("${spring.kafka.topic}")
    private String topic;
    
    	@Override
    public CommResult sendKafkaMsg(HiddenPointReq req) {
        log.info("senKafkaMsg req:{}", JSON.toJSONString(req));
        //发送消息
        try {
            req.setRecordTime(new Date());
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, JSON.toJSONString(req));
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    //发送失败的处理
                    log.error("senKafkaMsg fail_" + topic + "_provider message:" + throwable.getMessage());
                }
    
                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    //成功的处理
                    log.info("senKafkaMsg success_" + topic + "_provider message:" + stringObjectSendResult.toString());
                }
            });
        } catch (Exception e) {
            log.error("senKafkaMsg kafka send message:{}, error:{}", e.getMessage(), e);
        }
        return CommResult.success();
    }
    }
    
    
    java
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-07-13/2xHhPz8yOZRQraFk5Yg4LuUdTt0N.png)

全部评论 (0)

还没有任何评论哟~