Advertisement

RabbitMQ--高级特性

阅读量:

生产端可靠性投递

  • 保证消息成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认应答
  • 完善消息进行补偿机制

大厂处理方案

  • 消息落库,对消息状态打标
  • 消息延迟投递,做二次确认,回调检查

冥等性概念

就是类似原子性,高并发下,不出现重复消费。

方案:乐观锁、唯一id+指纹锁机制,利用数据库去重、redis原子性

一般在消费端处理

Confirm 确认消息

  • 消息投递后,如果Broker收到消息,则会给我们一个生产者应答。
  • channel.confirmSelect(); 开启确认消息
  • channel.addConfirmListener(new ConfirmListener() {}) 添加消息接收对象

(producer)

复制代码
  public static void main(String[] args) throws Exception {

    
 		//1 创建ConnectionFactory
    
 		ConnectionFactory connectionFactory = new ConnectionFactory();
    
 		connectionFactory.setHost("192.168.11.76");
    
 		connectionFactory.setPort(5672);
    
 		connectionFactory.setVirtualHost("/");
    
 		
    
 		//2 获取C	onnection
    
 		Connection connection = connectionFactory.newConnection();
    
 		//3 通过Connection创建一个新的Channel
    
 		Channel channel = connection.createChannel();
    
 		
    
 		//4 指定我们的消息投递模式: 消息的确认模式 
    
 		channel.confirmSelect();
    
 		String exchangeName = "test_confirm_exchange";
    
 		String routingKey = "confirm.save";
    
 		
    
 		//5 发送一条消息
    
 		String msg = "Hello RabbitMQ Send confirm message!";
    
 		channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
    
 		
    
 		//6 添加一个确认监听
    
 		channel.addConfirmListener(new ConfirmListener() {
    
 			@Override
    
 			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    
 				System.err.println("-------no ack!--返回失败---------");
    
 			}
    
 			@Override
    
 			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    
 				System.err.println("-------ack!---成功返回--------");
    
 			}
    
 		});
    
 	}

Return 消息机制

  • Return Listener 用于处理一下不可路由的消息
  • 如果发送的消息指定的 Exchange或者路由Key找不到,这种不可达的消息,需要Return Listener
  • Mandatory :如果设置成True才会接收不可达消息,false的话,那么broker会自动删除该消息!

(producer)

复制代码
  public static void main(String[] args) throws Exception {

    
 		ConnectionFactory connectionFactory = new ConnectionFactory();
    
 		connectionFactory.setHost("192.168.11.76");
    
 		connectionFactory.setPort(5672);
    
 		connectionFactory.setVirtualHost("/");
    
 		
    
 		Connection connection = connectionFactory.newConnection();
    
 		Channel channel = connection.createChannel();
    
 		
    
 		String exchange = "test_return_exchange";
    
 		String routingKey = "return.save";
    
 		String msg = "Hello RabbitMQ Return Message";
    
  
    
 		channel.addReturnListener(new ReturnListener() {
    
 			@Override
    
 			public void handleReturn(int replyCode, String replyText, String exchange,
    
 					String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
 				System.err.println("---------handle  return----------");
    
 				System.err.println("replyCode: " + replyCode);
    
 				System.err.println("replyText: " + replyText);
    
 				System.err.println("exchange: " + exchange);
    
 				System.err.println("routingKey: " + routingKey);
    
 				System.err.println("properties: " + properties);
    
 				System.err.println("body: " + new String(body));
    
 			}
    
 		});
    
 		//这里的 true 就是 Mandatory
    
 		channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
    
 	}

自定义消费监听

消息接收类

复制代码
 public class MyConsumer extends DefaultConsumer {

    
 	
    
 	public MyConsumer(Channel channel) {
    
 		super(channel);
    
 	}
    
  
    
 	@Override
    
 	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
 		System.err.println("-----------consume message----------");
    
 		System.err.println("consumerTag: " + consumerTag);
    
 		System.err.println("envelope: " + envelope);
    
 		System.err.println("properties: " + properties);
    
 		System.err.println("body: " + new String(body));
    
 	}
    
 }

添加绑定(consumer)

复制代码
  public static void main(String[] args) throws Exception {

    
 		ConnectionFactory connectionFactory = new ConnectionFactory();
    
 		connectionFactory.setHost("192.168.11.76");
    
 		connectionFactory.setPort(5672);
    
 		connectionFactory.setVirtualHost("/");
    
 		
    
 		Connection connection = connectionFactory.newConnection();
    
 		Channel channel = connection.createChannel();
    
 		String exchangeName = "test_consumer_exchange";
    
 		String routingKey = "consumer.#";
    
 		String queueName = "test_consumer_queue";
    
 		
    
 		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    
 		channel.queueDeclare(queueName, true, false, false, null);
    
 		channel.queueBind(queueName, exchangeName, routingKey);
    
 		
    
 		channel.basicConsume(queueName, true, new MyConsumer(channel));
    
 	}

消费限流

  • prefetchSize:0
  • prefetchCount : 消费者接收消息上限,一旦有N个消息还没ack(处理完成),该consumer将block(阻塞)掉,直到有ack
  • global : 是否将上面设置应用于channel。就是上面限制是channel级别还是consumer级别
  • prefetchSize 和 global这两个,rabbitmq没实现。prefetchCount 在 no_ask=false情况下生效,自动应答的情况下,这两个值不生效的。
复制代码
 public class MyConsumer extends DefaultConsumer {

    
 	private Channel channel ;
    
 	
    
 	public MyConsumer(Channel channel) {
    
 		super(channel);
    
 		this.channel = channel;
    
 	}
    
  
    
 	@Override
    
 	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
 		System.err.println("-----------consume message----------");
    
 		System.err.println("consumerTag: " + consumerTag);
    
 		System.err.println("envelope: " + envelope);
    
 		System.err.println("properties: " + properties);
    
 		System.err.println("body: " + new String(body));
    
 		//手动ack ,false不支持多条签收
    
 		channel.basicAck(envelope.getDeliveryTag(), false);
    
 	}
    
 }

添加绑定(consumer)

复制代码
  public static void main(String[] args) throws Exception {

    
 		ConnectionFactory connectionFactory = new ConnectionFactory();
    
 		connectionFactory.setHost("192.168.11.76");
    
 		connectionFactory.setPort(5672);
    
 		connectionFactory.setVirtualHost("/");
    
 		
    
 		Connection connection = connectionFactory.newConnection();
    
 		Channel channel = connection.createChannel();
    
 		
    
 		
    
 		String exchangeName = "test_qos_exchange";
    
 		String queueName = "test_qos_queue";
    
 		String routingKey = "qos.#";
    
 		
    
 		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    
 		channel.queueDeclare(queueName, true, false, false, null);
    
 		channel.queueBind(queueName, exchangeName, routingKey);
    
 		
    
 		//1 限流方式  第一件事就是 autoAck设置为 false
    
 		//一次只接受1条消息
    
 		channel.basicQos(0, 1, false);
    
     //关闭自动签收
    
 		channel.basicConsume(queueName, false, new MyConsumer(channel));
    
 	}

消费端ACK与重回队列

  • 消费端重回队列是为了对没有处理成功的消息,把消息传递给Broker!
  • 实际应用中,一般都会关闭重回队列。设置成false

添加绑定(consumer)

复制代码
      // 手工签收 必须要关闭 autoAck = false

    
 		channel.basicConsume(queueName, false, new MyConsumer(channel));
复制代码
 public class MyConsumer extends DefaultConsumer {

    
 	private Channel channel ;
    
 	
    
 	public MyConsumer(Channel channel) {
    
 		super(channel);
    
 		this.channel = channel;
    
 	}
    
  
    
 	@Override
    
 	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
 		System.err.println("-----------consume message----------");
    
 		System.err.println("body: " + new String(body));
    
 		try {
    
 			Thread.sleep(2000);
    
 		} catch (InterruptedException e) {
    
 			e.printStackTrace();
    
 		}
    
 		if((Integer)properties.getHeaders().get("num") == 0) {
    
         //第一个false 是否支持多条签收,第二个true是否重回队列
    
 			channel.basicNack(envelope.getDeliveryTag(), false, true);
    
 		} else {
    
 			channel.basicAck(envelope.getDeliveryTag(), false);
    
 		}
    
 	}
    
 }

TTL队列/消息

  • 就是队列/消息 的生命周期
  • 设置最大数量和过期时间

Arguments 设置queue

  • x-max-length 3000 (最大长度3000)
  • x-message-ttl 1000 (过期时间10秒)

(producer)

复制代码
  public static void main(String[] args) throws Exception {

    
 		ConnectionFactory connectionFactory = new ConnectionFactory();
    
 		connectionFactory.setHost("192.168.11.76");
    
 		connectionFactory.setPort(5672);
    
 		connectionFactory.setVirtualHost("/");
    
 		Connection connection = connectionFactory.newConnection();
    
 		Channel channel = connection.createChannel();
    
 		
    
 		String exchange = "test_dlx_exchange";
    
 		String routingKey = "dlx.save";
    
 		String msg = "Hello RabbitMQ DLX Message";
    
 		
    
 		for(int i =0; i<1; i ++){
    
 			AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    
 					.deliveryMode(2)
    
 					.contentEncoding("UTF-8")
    
 					.expiration("10000")//过期时间
    
 					.build();
    
 			channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
    
 		}
    
 	}

死信队列

  • DLX,Dead-Letter-Exchange
  • 当消息在一个队列中变成死信之后,他会被重新publish到另一个Exchange,这个Exchange就是DLX
  • 变成死信队列的几种情况
    • 消息被拒绝(basic.reject/basic.nack) 并且requeue=false (手动签收)
    • TTL过期
    • 队列达到最大长度
  • 设置死信队列
    • 就是绑定一个死信队列
    • Exchange: dlx.exchange
    • Queue: dlx.queue
    • RoutingKey: #
    • arguments.put("x-dead-letter-exchange","dlx.exchange") 其实上面名称都可以随便取,只要这里绑定对应交换机就行。

(consumer)

复制代码
  public static void main(String[] args) throws Exception {

    
 		ConnectionFactory connectionFactory = new ConnectionFactory();
    
 		connectionFactory.setHost("192.168.11.76");
    
 		connectionFactory.setPort(5672);
    
 		connectionFactory.setVirtualHost("/");
    
 		
    
 		Connection connection = connectionFactory.newConnection();
    
 		Channel channel = connection.createChannel();
    
 		
    
 		// 这就是一个普通的交换机 和 队列 以及路由
    
 		String exchangeName = "test_dlx_exchange";
    
 		String routingKey = "dlx.#";
    
 		String queueName = "test_dlx_queue";
    
 		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
    
 		
    
 		Map<String, Object> agruments = new HashMap<String, Object>();
    
 		agruments.put("x-dead-letter-exchange", "dlx.exchange");
    
 		//这个agruments属性,要设置到声明队列上
    
 		channel.queueDeclare(queueName, true, false, false, agruments);
    
 		channel.queueBind(queueName, exchangeName, routingKey);
    
 		
    
 		//要进行死信队列的声明:
    
 		channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
    
 		channel.queueDeclare("dlx.queue", true, false, false, null);
    
 		channel.queueBind("dlx.queue", "dlx.exchange", "#");
    
 		channel.basicConsume(queueName, true, new MyConsumer(channel));
    
 	}

全部评论 (0)

还没有任何评论哟~