当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。immediate
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
exchange:交换机名称 routingkey:路由键 props:消息属性字段,比如消息头部信息等等 body:消息主体部分本节主要讲述mandatory, 下面我们写一个demo,在RabbitMQ broker中有:
exchange : exchange.mandatory.test queue: queue.mandatory.test exchange路由到queue的routingkey是mandatory 这里先不讲当前的exchange绑定到queue中,即:channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
package com.vms.test.zzh.rabbitmq.self;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;/** * Created by hidden on 2017/2/7. */public class RBmandatoryTest { public static final String ip = ""; public static final int port = 5672; public static final String username = "root"; public static final String password = "root"; public static final String queueName = "queue.mandatory.test"; public static final String exchangeName = "exchange.mandatory.test"; public static final String routingKey = "mandatory"; public static final Boolean mandatory = true; public static final Boolean immediate = false; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());// channel.close();// connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }}
try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes()); channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException { String message = new String(body); System.out.println("Basic.return返回的结果是:"+message); } });// channel.close();// connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }
channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());
通过wireshark抓包如下: 可以看到并不会有basic.return方法被调用。查看RabbitMQ管理界面发现消息已经到达了队列。在RabbitMQ3.0以后的版本里,去掉了immediate参数的支持,发送带immediate标记的publish会返回如下错误:
Removal of “immediate” flag What changed? We removed support for the rarely-used “immediate” flag on AMQP’s basic.publish. Why on earth did you do that? Support for “immediate” made many parts of the codebase more complex, particularly around mirrored queues. It also stood in the way of our being able to deliver substantial performance improvements in mirrored queues. What do I need to do? If you just want to be able to publish messages that will be dropped if they are not consumed immediately, you can publish to a queue with a TTL of 0. If you also need your publisher to be able to determine that this has happened, you can also use the DLX feature to route such messages to another queue, from which the publisher can consume them. 这段解释的大概意思是:immediate标记会影响镜像队列性能,增加代码复杂性,并建议采用“TTL”和“DLX”等方式替代。