在上篇【消息中间件RabbitMQ(一)之初见】中已经了解了 RabbitMQ 的安装、相关概念以及图形化管理界面的使用,接下来学习一下使用 Java 客户端来操作 RabbitMQ。
入门程序
依赖
1、使用 Maven 创建生产者工程和消费者 Java 工程,分别引入 RabbitMQ 客户端依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version> <!-- 此版本与 Spring Boot 1.5.9 版本匹配 -->
</dependency>
生产者
2、编写生产者程序:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* RabbitMQ 入门程序-生产者
*/
public class Producer01 {
/**
* 队列名称
*/
private static final String QUEUE_NAME = "helloworld";
public static void main(String[] args) throws Exception {
// 通过连接工厂创建和 mq 服务的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
connectionFactory.setPort(5672); // mq 服务暴露的端口
connectionFactory.setUsername("guest"); // 用户名
connectionFactory.setPassword("guest"); // 密码
connectionFactory.setVirtualHost("/"); // 设置虚拟机路径
// 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
Connection connection = null;
Channel channel = null;
try {
// 创建新连接
connection = connectionFactory.newConnection();
// 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
channel = connection.createChannel();
// 声明队列,如果队列不存在,则创建
/**
* 参数列表:
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue:队列名称
* durable:是否持久化,mq 重启后队列依旧存在
* exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
* autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
* arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发布消息
/**
* 参数列表:
* String exchange, String routingKey, BasicProperties props, byte[] body
* exchange:交换机,如果不指定则使用默认的交换机
* routingKey:路由键,交换机按路由键来将消息转发到指定的队列,如果使用默认交换机,那么 routingKey 要设为队列的名称
* props:消息属性
* body:消息体,消息内容
*/
String msg = "hello world";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.printf("Send msg " + msg + " to " + QUEUE_NAME);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) channel.close();
if (connection != null) connection.close();
}
}
}
执行该生产者程序,执行完毕后可通过管理界面查看到队列及消息信息:
生产者发送消息流程小结:
- 创建连接;
- 创建通道;
- 声明队列;
- 发送消息;
消费者
3、编写消费者程序:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RabbitMQ 入门程序-消费者
*/
public class Consumer01 {
private static final String QUEUE_NAME = "helloworld";
public static void main(String[] args) throws Exception {
// 通过连接工厂创建和 mq 服务的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
connectionFactory.setPort(5672); // mq 服务暴露的端口
connectionFactory.setUsername("guest"); // 用户名
connectionFactory.setPassword("guest"); // 密码
connectionFactory.setVirtualHost("/"); // 设置虚拟机路径
// 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
Connection connection = null;
Channel channel = null;
try {
// 创建新连接
connection = connectionFactory.newConnection();
// 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
channel = connection.createChannel();
// 声明队列,如果队列不存在,则创建
/**
* 参数列表:
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue:队列名称
* durable:是否持久化,mq 重启后队列依旧存在
* exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
* autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
* arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 监听队列接收(消费)消息
/**
* 参数列表:
* String queue, boolean autoAck, String consumerTag, Consumer callback
* queue:监听的队列名称
* autoAck:自动回复,当消费者接收到消息后要告诉 mq 消息已接收,如果将此参数设置为 true 那么将会自动回复 mq,设置为 false 时需要手动编程回复 mq
* consumerTag:消费者标签,消费者的标识,可选
* callback:接收到消息时的回调对象
*/
channel.basicConsume(QUEUE_NAME, true,"Consumer01", new DefaultConsumer(channel){
/**
* 接收到消息时该方法将被调用
* @param consumerTag 消费者标签,消费者标识,使用 basicConsume 方法时使用的标签名称
* @param envelope 信封,存放了消息的相关信息,如发送该消息的交换机、该消息在 mq 中的标识
* @param properties 消息属性,生产者使用 channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 发送消息时传递的 props 属性信息
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String exchange = envelope.getExchange(); // 获取发送消息的交换机
long deliveryTag = envelope.getDeliveryTag(); // 消息在 mq 中的标识 id,可用于消息接收确认
System.out.println("receive msg from "+ consumerTag + ":"+ new String(body, "utf-8"));
}
});
} catch (Exception e) {
e.printStackTrace();
}
// 消费者不关闭连接则持续监听接收(消费)队列中的消息
//finally {
// if (channel != null) channel.close();
// if (connection != null) connection.close();
//}
}
}
执行该生产者程序,执行完毕后可通过管理界面查看到队列中的消息已被消费(接收),且之后生产者每发送一条消息到队列,消费者也随之消费一条消息;
消费者接收消息流程小结:
- 创建连接;
- 创建通道;
- 声明队列(如果能够确定队列存在,则可忽略);
- 监听队列接收消息;
- 回复 mq(如果设置了自动回复可忽略);
工作模式
RabbitMQ 有如下几种工作模式:
- Simple:简单模式;
- Work queues:工作队列模式;
- Publish/Subscribe:发布订阅模式;
- Routing:路由模式;
- Topics:通配符模式;
- Header:Header 转发器模式;
- RPC:远程过程调用模式;
各个工作模式使用的交换机类型可能是不一样的,如下:
- Fanout Exchange 对应发布订阅模式;
- Direct Exchange 对应路由模式;
- Topic Exchange 对应通配符模式;
- Headers Exchange 对应 Header 转发器模式;
上述交换机类型都定义在 com.rabbitmq.client.BuiltinExchangeType
枚举类中。
Simple 简单模式
生产者将消息放入队列, 消费者(Consumer) 监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,消息自动从队列中删除。
隐患:消息可能没有被消费者正确处理,但该消息已经从队列中消失了,造成消息的丢失。
应用场景:聊天。
示例
上面入门程序使用的就是简单模式。
Work queues 工作队列模式
生产者将消息放入队列,消费者可以有多个。如消费者 C1 、消费者 C2 同时监听同一个队列。
C1 和 C2 共同消费当前的消息队列内容,Rabbit MQ 默认使用轮询的方式将消息平均发送给各个消费者。
上图中虽然没有出现交换机,但实际上是使用了默认的交换机。
隐患:高并发情况下,可能会产生某一个消息被多个消费者共同使用,此时可以设置同步,保证一条消息只能被一个消费者使用。
应用场景:抢红包,大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)。
示例
将入门程序中的消费者程序启动多个,则就是工作队列模式。
publish/subscribe 发布订阅模式
X 代表交换机,是 RabbitMQ 内部组件,生产者将消息放入交换机,交换机负责把消息发送到所有绑定了该交换机的队列,对应消息队列的消费者拿到消息进行消费。
应用场景:邮件群发、群聊天、广播、广告。
示例
编写生产者程序:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* RabbitMQ 发布订阅模式-生产者
*/
public class Producer02 {
private static final String FANOUT_EXCHANGE = "test_fanout_exchange";
private static final String QUEUE_1 = "queue_1";
private static final String QUEUE_2 = "queue_2";
public static void main(String[] args) throws Exception {
// 通过连接工厂创建和 mq 服务的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
connectionFactory.setPort(5672); // mq 服务暴露的端口
connectionFactory.setUsername("guest"); // 用户名
connectionFactory.setPassword("guest"); // 密码
connectionFactory.setVirtualHost("/"); // 设置虚拟机路径
// 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
Connection connection = null;
Channel channel = null;
try {
// 创建新连接
connection = connectionFactory.newConnection();
// 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
channel = connection.createChannel();
// 声明队列,如果队列不存在,则创建
/**
* 参数列表:
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue:队列名称
* durable:是否持久化,mq 重启后队列依旧存在
* exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
* autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
* arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
*/
channel.queueDeclare(QUEUE_1, true, false, false, null);
channel.queueDeclare(QUEUE_2, true, false, false, null);
// 声明交换机
/**
* 参数列表:
* String exchange, String type
* exchange:交换机名称
* type:交换机类型,具体信息可参考上篇文章中的 Exchange 类型
*/
channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
// 绑定交换机和队列
/**
* 参数列表:
* String queue, String exchange, String routingKey
* queue:队列名称
* exchange:交换机名称
* routingKey:路由 key,在发布订阅模式中无用,设为 "" 即可
*/
channel.queueBind(QUEUE_1, FANOUT_EXCHANGE, "");
channel.queueBind(QUEUE_2, FANOUT_EXCHANGE, "");
// 发布消息给交换机
/**
* 参数列表:
* String exchange, String routingKey, BasicProperties props, byte[] body
* exchange:交换机,如果不指定则使用默认的交换机
* routingKey:路由键,交换机按路由键来将消息转发到指定的队列,如果使用默认交换机,那么 routingKey 要设为队列的名称
* props:消息属性
* body:消息体,消息内容
*/
String msg = "hello world";
channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());
System.out.printf("Send msg " + msg + " to " + FANOUT_EXCHANGE);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) channel.close();
if (connection != null) connection.close();
}
}
}
发布订阅模式对应的交换机类型为 Fanout,所以该生产者程序在入门程序的基础上添加了手动定义的 Fanout 类型的交换机,并且声明了 2 个队列绑定到了该交换机。
执行该程序,查看会发现两个队列都接受到了消息:
Routing 路由模式
生产者将消息携带一个路由 key(字符串) 发送给交换机,交换机根据路由 key,匹配到与该路由 key 对应的消息队列,对应消息队列的消费者消费消息。
应用场景:消息分类通知(利用路由 key 可将消息分类)。
示例
编写生产者程序:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* RabbitMQ 路由模式-生产者
*/
public class Producer03 {
private static final String DIRECT_EXCHANGE = "test_direct_exchange";
private static final String QUEUE_3 = "queue_3";
private static final String QUEUE_4 = "queue_4";
/**
* queue_3 使用的路由 key
*/
private static final String QUEUE_3_ROUTING_KEY = "queue_3_key";
/**
* queue_4 使用的路由 key
*/
private static final String QUEUE_4_ROUTING_KEY = "queue_4_key";
public static void main(String[] args) throws Exception {
// 通过连接工厂创建和 mq 服务的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
connectionFactory.setPort(5672); // mq 服务暴露的端口
connectionFactory.setUsername("guest"); // 用户名
connectionFactory.setPassword("guest"); // 密码
connectionFactory.setVirtualHost("/"); // 设置虚拟机路径
// 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
Connection connection = null;
Channel channel = null;
try {
// 创建新连接
connection = connectionFactory.newConnection();
// 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
channel = connection.createChannel();
// 声明队列,如果队列不存在,则创建
/**
* 参数列表:
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue:队列名称
* durable:是否持久化,mq 重启后队列依旧存在
* exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
* autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
* arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
*/
channel.queueDeclare(QUEUE_3, true, false, false, null);
channel.queueDeclare(QUEUE_4, true, false, false, null);
// 声明交换机
/**
* 参数列表:
* String exchange, String type
* exchange:交换机名称
* type:交换机类型,具体信息可参考上篇文章中的 Exchange 类型
*/
channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
// 绑定交换机和队列
/**
* 参数列表:
* String queue, String exchange, String routingKey
* queue:队列名称
* exchange:交换机名称
* routingKey:路由 key
*/
channel.queueBind(QUEUE_3, DIRECT_EXCHANGE, QUEUE_3_ROUTING_KEY);
channel.queueBind(QUEUE_4, DIRECT_EXCHANGE, QUEUE_4_ROUTING_KEY);
// 发布消息给交换机
/**
* 参数列表:
* String exchange, String routingKey, BasicProperties props, byte[] body
* exchange:交换机,如果不指定则使用默认的交换机
* routingKey:路由键,交换机按路由键来将消息转发到指定的队列,如果使用默认交换机,那么 routingKey 要设为队列的名称
* props:消息属性
* body:消息体,消息内容
*/
String routingKey = "";
for (int i = 0; i < 10; i++) {
String msg = "hello world" + i;
routingKey = i % 2 == 0 ? QUEUE_3_ROUTING_KEY : QUEUE_4_ROUTING_KEY;
channel.basicPublish(DIRECT_EXCHANGE, routingKey, null, msg.getBytes());
System.out.println("Send msg " + msg + " to " + DIRECT_EXCHANGE);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) channel.close();
if (connection != null) connection.close();
}
}
}
路由模式对应的交换机类型为 Direct,所以该生产者程序在入门程序的基础上添加了手动定义的 Direct 类型的交换机,并且声明了 2 个队列绑定到了该交换机,在发布消息时也是直接发送给交换机,但发送消息的同时指定了路由 key 让消息分发到两个队列。
执行该程序,查看会发现两个队列都接受到了消息:
Topics 通配符模式
符号 *
和 #
都是通配符,#
可匹配一个或多个词,*
匹配单个词,每个单词间使用 .
分隔。
实际上就是在路由模式的基础上添加了模糊匹配功能,生产者把消息携带一个带通配符的路由 key 交给交换机,交换机根据路由 key 的规则模糊匹配到对应的队列,由对应队列的消费者消费消息;
示例
编写生产者程序:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* RabbitMQ 通配符模式-生产者
*/
public class Producer04 {
private static final String TOPIC_EXCHANGE = "test_topic_exchange";
private static final String QUEUE_5 = "queue_5";
private static final String QUEUE_6 = "queue_6";
/**
* queue_5 使用的路由 key
*/
private static final String QUEUE_5_ROUTING_KEY = "msg.#.queue_5.#";
/**
* queue_6 使用的路由 key
*/
private static final String QUEUE_6_ROUTING_KEY = "msg.#.queue_6.#";
public static void main(String[] args) throws Exception {
// 通过连接工厂创建和 mq 服务的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
connectionFactory.setPort(5672); // mq 服务暴露的端口
connectionFactory.setUsername("guest"); // 用户名
connectionFactory.setPassword("guest"); // 密码
connectionFactory.setVirtualHost("/"); // 设置虚拟机路径
// 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
Connection connection = null;
Channel channel = null;
try {
// 创建新连接
connection = connectionFactory.newConnection();
// 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
channel = connection.createChannel();
// 声明队列,如果队列不存在,则创建
/**
* 参数列表:
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue:队列名称
* durable:是否持久化,mq 重启后队列依旧存在
* exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
* autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
* arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
*/
channel.queueDeclare(QUEUE_5, true, false, false, null);
channel.queueDeclare(QUEUE_6, true, false, false, null);
// 声明交换机
/**
* 参数列表:
* String exchange, String type
* exchange:交换机名称
* type:交换机类型,具体信息可参考上篇文章中的 Exchange 类型
*/
channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
// 绑定交换机和队列
/**
* 参数列表:
* String queue, String exchange, String routingKey
* queue:队列名称
* exchange:交换机名称
* routingKey:路由 key
*/
channel.queueBind(QUEUE_5, TOPIC_EXCHANGE, QUEUE_5_ROUTING_KEY);
channel.queueBind(QUEUE_6, TOPIC_EXCHANGE, QUEUE_6_ROUTING_KEY);
// 发布消息给交换机
/**
* 参数列表:
* String exchange, String routingKey, BasicProperties props, byte[] body
* exchange:交换机,如果不指定则使用默认的交换机
* routingKey:路由键,交换机按路由键来将消息转发到指定的队列,如果使用默认交换机,那么 routingKey 要设为队列的名称
* props:消息属性
* body:消息体,消息内容
*/
String routingKey = "";
for (int i = 0; i < 10; i++) {
String msg = "hello world" + i;
routingKey = i % 2 == 0 ? "msg.queue_5" : "msg.queue_6";
channel.basicPublish(TOPIC_EXCHANGE, routingKey, null, msg.getBytes());
System.out.println("Send msg " + msg + " to " + TOPIC_EXCHANGE);
}
channel.basicPublish(TOPIC_EXCHANGE, "msg.queue_5.queue_6", null, "hello world to all queues".getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) channel.close();
if (connection != null) connection.close();
}
}
}
通配符模式对应的交换机类型为 Topic,所以该生产者程序在入门程序的基础上添加了手动定义的 Topic 类型的交换机,并且声明了 2 个队列绑定到了该交换机,绑定时的路由 key 分别为
msg.#.queue_5.#
和msg.#.queue_6.#
,之后分别发送了 5 条消息携带的路由 key 分别为msg.queue_5
和msg.queue_6
,且在最后还发送了一条消息路由 key 为msg.queue_5.queue_6
,路由 keymsg.queue_5
和msg.queue_6
分别能匹配msg.#.queue_5.#
和msg.#.queue_6.#
绑定的队列,而msg.queue_5.queue_6
既能匹配到msg.#.queue_5.#
绑定的队列,也能匹配到msg.#.queue_6.#
绑定的队列,所以两个队列都应该接收到 6 条消息。
执行该程序,查看会发现两个队列都接受到了消息:
Header 转发器模式
Header 模式与 Routing 模式类似,不同的地方在于 Header 模式不使用路由 key,而是使用 Header 中的 key/value(键值对)来匹配队列。
示例
import com.rabbitmq.client.*;
import java.util.HashMap;
/**
* RabbitMQ Header 模式-生产者
*/
public class Producer05 {
private static final String HEADER_EXCHANGE = "test_header_exchange";
private static final String QUEUE_7 = "queue_7";
private static final String QUEUE_8 = "queue_8";
/**
* queue_5 使用的路由 key
*/
private static final String QUEUE_7_ROUTING_KEY = "queue_7_key";
/**
* queue_6 使用的路由 key
*/
private static final String QUEUE_8_ROUTING_KEY = "queue_8_key";
public static void main(String[] args) throws Exception {
// 通过连接工厂创建和 mq 服务的连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
connectionFactory.setPort(5672); // mq 服务暴露的端口
connectionFactory.setUsername("guest"); // 用户名
connectionFactory.setPassword("guest"); // 密码
connectionFactory.setVirtualHost("/"); // 设置虚拟机路径
// 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
Connection connection = null;
Channel channel = null;
try {
// 创建新连接
connection = connectionFactory.newConnection();
// 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
channel = connection.createChannel();
// 声明队列,如果队列不存在,则创建
/**
* 参数列表:
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue:队列名称
* durable:是否持久化,mq 重启后队列依旧存在
* exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
* autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
* arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
*/
channel.queueDeclare(QUEUE_7, true, false, false, null);
channel.queueDeclare(QUEUE_8, true, false, false, null);
// 声明交换机
/**
* 参数列表:
* String exchange, String type
* exchange:交换机名称
* type:交换机类型,具体信息可参考上篇文章中的 Exchange 类型
*/
channel.exchangeDeclare(HEADER_EXCHANGE, BuiltinExchangeType.HEADERS);
// 绑定交换机和队列
/**
* 参数列表:
* String queue, String exchange, String routingKey, Map<String, Object> arguments
* queue:队列名称
* exchange:交换机名称
* routingKey:路由 key
* arguments:Headers 参数
*/
HashMap<String, Object> queue7Map = new HashMap<>();
queue7Map.put("routing_key",QUEUE_7_ROUTING_KEY);
HashMap<String, Object> queue8Map = new HashMap<>();
queue8Map.put("routing_key",QUEUE_8_ROUTING_KEY);
channel.queueBind(QUEUE_7, HEADER_EXCHANGE, "",queue7Map);
channel.queueBind(QUEUE_8, HEADER_EXCHANGE, "",queue8Map);
// 发布消息给交换机
/**
* 参数列表:
* String exchange, String routingKey, BasicProperties props, byte[] body
* exchange:交换机,如果不指定则使用默认的交换机
* routingKey:路由键,交换机按路由键来将消息转发到指定的队列,如果使用默认交换机,那么 routingKey 要设为队列的名称
* props:消息属性
* body:消息体,消息内容
*/
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.headers(queue7Map);
channel.basicPublish(HEADER_EXCHANGE, "", builder.build(), "hello queue_7".getBytes());
builder.headers(queue8Map);
channel.basicPublish(HEADER_EXCHANGE, "", builder.build(), "hello queue_8".getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null) channel.close();
if (connection != null) connection.close();
}
}
}
通配符模式对应的交换机类型为 Headers,所以该生产者程序在入门程序的基础上添加了手动定义的 Headers 类型的交换机,并且声明了 2 个队列绑定到了该交换机。
绑定时没有设置路由,而是设置了 map 属性,之后发送消息时也指定一个 map,交换机会根据 map 的键值匹配并将消息转发到对应的消息队列。
执行该程序,查看会发现两个队列分别接受到了 1 条消息:
RPC 远程过程调用模式
RPC 即客户端远程调用服务端的方法 ,使用 MQ 可以实现 RPC 的异步调用,基于 Direct 交换机实现,流程如下:
- 客户端既是生产者就是消费者,向 RPC 请求队列发送 RPC 调用消息,同时监听 RPC 响应队列;
- 服务端监听 RPC 请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果;
- 服务端将 RPC 方法的结果发送到 RPC 响应队列;
- 客户端(RPC 调用方)监听 RPC 响应队列,接收到 RPC 调用结果;
评论区