Advertisement

Java netty发送接收(TCP、UDP)

阅读量:

最下方附项目地址

依赖

复制代码
 <dependency>

    
     <groupId>io.netty</groupId>
    
     <artifactId>netty-all</artifactId>
    
 </dependency>

yml配置

复制代码
 gps:

    
   netty:
    
     tcp:
    
       port: 8888
    
       read-timeout: 15 #读超时 15分钟
    
     udp:
    
       port: 7777
    
     threads:
    
       boss: 1
    
       worker: 4
    
       business:
    
     num: 1 #业务线程数量
    
     max-pending: 100000

配置类

复制代码
 @Configuration

    
 public class EventLoopGroupConfig {
    
  
    
     @Value("${gps.netty.threads.boss}")
    
     private int bossNum;
    
  
    
     @Value("${gps.netty.threads.worker}")
    
     private int workerNum;
    
  
    
     @Value("${gps.netty.threads.business.num}")
    
     private int businessNum;
    
  
    
 	@Value("${gps.netty.threads.business.max-pending}")
    
     private int maxPending;
    
  
    
  
    
     /** * TCP连接处理
    
      * @return
    
      */
    
     @Bean(name = "bossGroup")
    
     public NioEventLoopGroup bossGroup() {
    
     return new NioEventLoopGroup(bossNum);
    
     }
    
  
    
     /** * Socket数据读写
    
      * @return
    
      */
    
     @Bean(name = "workerGroup")
    
     public NioEventLoopGroup workerGroup() {
    
     return new NioEventLoopGroup(workerNum);
    
     }
    
  
    
     /** * Handler业务处理
    
      * @return
    
      */
    
     @Bean(name = "businessGroup")
    
     public EventExecutorGroup businessGroup() {
    
     return new DefaultEventExecutorGroup(businessNum,new BusinessThreadFactory(),maxPending, RejectedExecutionHandlers.reject());
    
     }
    
     
    
     static class BusinessThreadFactory implements ThreadFactory {
    
     private final ThreadGroup group;
    
     private final AtomicInteger threadNumber = new AtomicInteger(1);
    
     private final String namePrefix;
    
  
    
     BusinessThreadFactory() {
    
         SecurityManager s = System.getSecurityManager();
    
         group = (s != null) ? s.getThreadGroup() :
    
                               Thread.currentThread().getThreadGroup();
    
         namePrefix = "business-thread-";
    
     }
    
  
    
     @Override
    
     public Thread newThread(Runnable r) {
    
         Thread t = new Thread(group, r,
    
                               namePrefix + threadNumber.getAndIncrement(),
    
                               0);
    
         if (t.isDaemon()){
    
             t.setDaemon(false);
    
         }
    
  
    
         if (t.getPriority() != Thread.NORM_PRIORITY){
    
             t.setPriority(Thread.NORM_PRIORITY);
    
         }
    
         return t;
    
     }
    
     }
    
  
    
 }

TcpServer

复制代码
 @Slf4j

    
 @Component
    
 public class NettyTcpServer implements ApplicationListener<ApplicationStartedEvent> {
    
  
    
     @Value("${gps.netty.tcp.port}")
    
     private int port;
    
     
    
     @Value("${gps.netty.tcp.read-timeout}")
    
     private int readTimeOut;
    
  
    
     @Autowired
    
     @Qualifier("bossGroup")
    
     private NioEventLoopGroup bossGroup;
    
  
    
     @Autowired
    
     @Qualifier("workerGroup")
    
     private NioEventLoopGroup workerGroup;
    
     
    
     @Autowired
    
     @Qualifier("businessGroup")
    
     private EventExecutorGroup businessGroup;
    
  
    
     @Autowired
    
     private TcpServerHandler tcpServerHandler;
    
  
    
     /** * 启动Server
    
      * */
    
     @Override
    
 	public void onApplicationEvent(ApplicationStartedEvent event) {
    
     	try {
    
 	        ServerBootstrap serverBootstrap = new ServerBootstrap();
    
 	        serverBootstrap.group(bossGroup, workerGroup)
    
 	                .channel(NioServerSocketChannel.class)
    
 	                .childHandler(new ChannelInitializer<SocketChannel>() { //
    
 						@Override
    
 						public void initChannel(SocketChannel ch) throws Exception {
    
 							ch.pipeline().addLast(new IdleStateHandler(readTimeOut, 0, 0, TimeUnit.MINUTES));
    
 							// 1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常
    
 							ch.pipeline().addLast(
    
 									new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { MsgUtil.DELIMITER }),
    
 											Unpooled.copiedBuffer(new byte[] { MsgUtil.DELIMITER, MsgUtil.DELIMITER })));
    
 							ch.pipeline().addLast(businessGroup,tcpServerHandler);
    
 						}
    
 					})
    
 	                .option(ChannelOption.SO_BACKLOG, 1024) //服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
    
 	                .childOption(ChannelOption.TCP_NODELAY, true)//立即写出
    
 	                .childOption(ChannelOption.SO_KEEPALIVE, true);//长连接
    
 	        ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
    
 	        if (channelFuture.isSuccess()) {
    
 				log.info("TCP服务启动完毕,port={}", port);
    
 	        }
    
     	}catch(Exception e) {
    
 			log.info("TCP服务启动失败", e);
    
     	}
    
     }
    
  
    
     /** * 销毁资源
    
      */
    
     @PreDestroy
    
     public void destroy() {
    
     bossGroup.shutdownGracefully().syncUninterruptibly();
    
     workerGroup.shutdownGracefully().syncUninterruptibly();
    
     log.info("TCP服务关闭成功");
    
     }
    
 }

UdpServer

复制代码
 @Slf4j

    
 @Configuration
    
 public class NettyUdpServer implements ApplicationListener<ApplicationStartedEvent> {
    
  
    
     @Value("${gps.netty.udp.port}")
    
     private int port;
    
  
    
     @Resource
    
     private UdpServerHandler udpServerHandler;
    
  
    
     private EventLoopGroup group = null;
    
  
    
     @Override
    
     public void onApplicationEvent(@NonNull ApplicationStartedEvent event) {
    
  
    
     try {
    
         Bootstrap b = new Bootstrap();
    
         String osName= SystemPropertyUtil.get("os.name").toLowerCase();
    
         if("linux".equals(osName)) {
    
             group = new EpollEventLoopGroup();
    
             b.group(group)
    
                     .channel(EpollDatagramChannel.class);
    
         }else {
    
             group = new NioEventLoopGroup();
    
             b.group(group)
    
                     .channel(NioDatagramChannel.class);
    
         }
    
         //广播
    
         b.option(ChannelOption.SO_BROADCAST, true)
    
                 //接收缓存区  10M
    
                 .option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10 )
    
                 //发送缓存区  10M
    
                 .option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10 )
    
                 .handler(udpServerHandler);
    
  
    
         ChannelFuture channelFuture = b.bind(port).sync();
    
         if (channelFuture.isSuccess()) {
    
             log.info("UDP服务启动完毕,port={}", port);
    
         }
    
  
    
     } catch (InterruptedException e) {
    
         log.info("UDP服务启动失败", e);
    
     }
    
  
    
     }
    
  
    
     /** * 销毁资源
    
      */
    
     @PreDestroy
    
     public void destroy() {
    
     if(group!=null) {
    
         group.shutdownGracefully();
    
     }
    
     log.info("UDP服务关闭成功");
    
     }
    
 }

TcpHandler

复制代码
 @Slf4j

    
 @ChannelHandler.Sharable
    
 @Component
    
 public class TcpServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
  
    
 	@Autowired
    
 	private KafkaSender kafkaSender;
    
  
    
 	@Autowired
    
 	@Qualifier("businessGroup")
    
 	private EventExecutorGroup businessGroup;
    
  
    
     /** * 使用
    
      * @param ctx
    
      * @param byteBuf
    
      * @throws Exception
    
      */
    
     @Override
    
     protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) {
    
 		String content = byteBuf.toString(StandardCharsets.UTF_8);
    
  
    
 		log.info("TCP服务端接收到消息:{}",  content);
    
  
    
  
    
 		ByteBuf buf = Unpooled.copiedBuffer("TCP已经接收到消息:".getBytes(StandardCharsets.UTF_8));
    
  
    
 		businessGroup.execute(()->{
    
 			try {
    
 				kafkaSender.sendMessage("hello", content);
    
 				send2client(ctx,buf.array());
    
 			}catch(Throwable e) {
    
 				log.error("TCP数据接收处理出错",e);
    
 				ByteBuf err = Unpooled.copiedBuffer("系统错误:".getBytes(StandardCharsets.UTF_8));
    
 				send2client(ctx,err.array());
    
 			}
    
 		});
    
  
    
     }
    
     @Override
    
     public void channelReadComplete(ChannelHandlerContext ctx) {
    
     ctx.flush();
    
     }
    
  
    
     @Override
    
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    
     	log.error("TCP数据接收处理出错:",cause);
    
     }
    
  
    
     /** * 返回消息给客户端
    
      * @param ctx
    
      * @param msg
    
      */
    
     void send2client(ChannelHandlerContext ctx, byte[] msg) {
    
     	ByteBuf buf= Unpooled.buffer(msg.length+1);
    
     	buf.writeBytes(msg);
    
     	buf.writeByte(MsgUtil.DELIMITER);
    
     	ctx.writeAndFlush(buf).addListener(future->{
    
     		if(!future.isSuccess()) {
    
 				log.error("TCP发送给客户端消息失败");
    
     		}
    
     	});
    
     }
    
 }

UdpHandler

复制代码
 @ChannelHandler.Sharable

    
 @Component
    
 @Slf4j
    
 public class UdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
    
  
    
     @Autowired
    
     @Qualifier("businessGroup")
    
     private EventExecutorGroup businessGroup;
    
  
    
    @Autowired
    
    private KafkaSender kafkaSender;
    
  
    
     @Override
    
     protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
    
     String content = packet.content().toString(StandardCharsets.UTF_8);
    
  
    
     log.info("UDP服务端接收到消息:{}",  content);
    
  
    
  
    
     ByteBuf buf = Unpooled.copiedBuffer("UDP已经接收到消息:".getBytes(StandardCharsets.UTF_8));
    
  
    
  
    
     businessGroup.execute(()->{
    
         try {
    
             kafkaSender.sendMessage("hello", content);
    
             ctx.writeAndFlush(new DatagramPacket(buf, packet.sender()));
    
         }catch(Throwable e) {
    
             log.info("UDP数据接收处理出错{}",  e);
    
         }
    
     });
    
     }

Tcp消息发送

复制代码
 @Slf4j

    
 @RestController
    
 public class TcpClientController {
    
  
    
     @Value("${gps.netty.tcp.port}")
    
     private int port;
    
  
    
     @PostMapping("sendTcp")
    
     public String send(String msg){
    
     EventLoopGroup group = new NioEventLoopGroup();
    
     Bootstrap b = new Bootstrap();
    
     Channel ch =null;
    
     TcpServerHandler handler=new TcpServerHandler();
    
  
    
     try {
    
         b.group(group)
    
                 .channel(NioSocketChannel.class)
    
                 .handler(new ChannelInitializer<NioSocketChannel>() {
    
                     @Override
    
                     protected void initChannel(NioSocketChannel ch) throws Exception {
    
                         ch.pipeline().addLast(
    
                                 new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }),
    
                                         Unpooled.copiedBuffer(new byte[] { 0x7e, 0x7e })));
    
                         ch.pipeline().addLast(handler);
    
                     }});
    
         ch =b.connect("localhost", port).sync().channel();
    
  
    
     } catch (Exception e) {
    
         e.printStackTrace();
    
     }
    
  
    
     ByteBuf tcpMsg = Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8));
    
  
    
     ByteBuf buf= Unpooled.buffer(msg.length() + 1);
    
     try{
    
         log.info("TCP客户端发送消息:{}", msg);
    
         buf.writeBytes(tcpMsg);//消息体
    
         buf.writeByte(MsgUtil.DELIMITER);//消息分割符
    
         ch.writeAndFlush(buf).sync();
    
     }catch (Exception e){
    
         log.info("TCP客户端发送消息失败:{}", e);
    
     }
    
  
    
     //关闭链接
    
     group.shutdownGracefully();
    
     return "success";
    
     }

Udp消息发送

复制代码
 @Slf4j

    
 @RestController
    
 public class UdpClientController {
    
  
    
     @Value("${gps.netty.udp.port}")
    
     private int port;
    
  
    
     @PostMapping("sendUdp")
    
     public String send(String msg){
    
     EventLoopGroup group = new NioEventLoopGroup();
    
     Bootstrap b = new Bootstrap();
    
     Channel ch =null;
    
     UdpServerHandler handler=new UdpServerHandler();
    
  
    
     try {
    
         b.group(group)
    
                 .channel(NioDatagramChannel.class)
    
                 .option(ChannelOption.SO_BROADCAST, true)
    
                 .handler(new ChannelInitializer<NioDatagramChannel>() {
    
  
    
                     @Override
    
                     protected void initChannel(NioDatagramChannel ch) throws Exception {
    
                         ch.pipeline().addLast(handler.getClass().getSimpleName(),handler);
    
                     }});
    
         ch =b.bind(0).sync().channel();
    
     } catch (Exception e) {
    
         e.printStackTrace();
    
     }
    
  
    
     ByteBuf buf = Unpooled.copiedBuffer(msg.getBytes(StandardCharsets.UTF_8));
    
  
    
     try{
    
         log.info("UDP客户端发送消息:{}", msg);
    
         ch.writeAndFlush(new DatagramPacket(
    
                 Unpooled.copiedBuffer(buf.array()),
    
                 SocketUtils.socketAddress("localhost", port))).sync();
    
     }catch (Exception e){
    
         log.info("UDP客户端发送消息失败:{}", e);
    
     }
    
  
    
     //关闭链接
    
     group.shutdownGracefully();
    
     return "success";
    
     }
    
 }

[项目地址

https://gitee.com/x-n/m-g/netty_kafka](https://gitee.com/x-n/m-g/netty_kafka "此项目具体位置")

全部评论 (0)

还没有任何评论哟~