侧边栏壁纸
博主头像
张种恩的技术小栈博主等级

行动起来,活在当下

  • 累计撰写 747 篇文章
  • 累计创建 65 个标签
  • 累计收到 39 条评论

目 录CONTENT

文章目录

SpringBoot(31)之整合RabbitMQ

zze
zze
2018-08-07 / 0 评论 / 0 点赞 / 695 阅读 / 9373 字

不定期更新相关视频,抖音点击左上角加号后扫一扫右方侧边栏二维码关注我~正在更新《Shell其实很简单》系列

准备

1、使用 Maven 新建 SpringBoot 项目,引入 Rabbit 、Web 场景启动器。

2、配置 RabbitMQ 连接信息:

# application.properties
spring.rabbitmq.host=192.168.202.136
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

3、注解配置启用 RabbitMQ:

// com.springboot.config.MyAmqpConfig
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit // 启用 Rabbit
public class MyAmqpConfig {

}

4、新建测试 JavaBean:

// com.springboot.bean.User
import java.io.Serializable;
import java.util.Date;

public class User implements Serializable {

    private Integer id;
    private String name;
    private Date birthday;
    private String city;

    public User() {
    }

    public User(Integer id, String name, Date birthday, String city) {
        this.id = id;
        this.name = name;
        this.birthday = birthday;
        this.city = city;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Date getBirthday() {
        return birthday;
    }

    public void setBirthday(Date birthday) {
        this.birthday = birthday;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", birthday=" + birthday +
                ", city='" + city + '\'' +
                '}';
    }
}

RabbitTemplate 使用

下面通过 RabbitTemplate 来完成上篇文章 RabbitMQ 在可视化界面中的几个测试操作:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTemplateTests {

    // org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration 自动配置类中注册了 RabbitTemplate 的 bean
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1(){
        // 通过 direct 交换器给 “张三.msg” 队列发送消息

        // send 方法的 message 参数中需要自己定义消息头和消息体
        // rabbitTemplate.send(exchange,routingkey,message);

        rabbitTemplate.convertAndSend("my.direct","zhangsan.msg","你好 张三");
    }

    @Test
    public void test2(){
        // 接收 “张三.msg” 队列的消息
        Object o = rabbitTemplate.receiveAndConvert("张三.msg");
        System.out.println(o.toString());

        /*
        你好 张三
         */
    }

    @Test
    public void test3(){
        // 通过 fanout 交换器给所有队列发送消息

        rabbitTemplate.convertAndSend("my.fanout", "zhangsan.msg", "大家好");
    }

    @Test
    public void test4(){
        // 所有队列接收消息
        Object msg1 = rabbitTemplate.receiveAndConvert("张三.msg");
        System.out.println(msg1.toString());
        Object msg2 = rabbitTemplate.receiveAndConvert("张四.msg");
        System.out.println(msg2.toString());
        Object msg3 = rabbitTemplate.receiveAndConvert("李三.msg");
        System.out.println(msg3.toString());
        Object msg4 = rabbitTemplate.receiveAndConvert("李四.msg");
        System.out.println(msg4.toString());

        /*
        大家好
        大家好
        大家好
        大家好
         */
    }

    @Test
    public void test5(){
        // 通过 topic 交换器给所有“姓张”的队列发送消息
        rabbitTemplate.convertAndSend("my.topic", "zhang.hello", "张先生 你好");
    }

    @Test
    public void test6(){
        // 所有“姓张”的队列接收消息
        Object msg1 = rabbitTemplate.receiveAndConvert("张三.msg");
        Object msg2 = rabbitTemplate.receiveAndConvert("张四.msg");

        System.out.println(msg1);
        System.out.println(msg2);

        /*
        张先生 你好
        张先生 你好
         */
    }
}

在上述的操作中操作的都是字符串,而通过 RabbitTemplate 是可以直接操作对象的,RabbitTemplate 内部的 Converter 会自动帮我们完成对象的序列化与反序列化:

import com.springboot.bean.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.ParseException;
import java.text.SimpleDateFormat;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTemplateTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test1() throws ParseException {
        // 直接发送一个对象
        User user = new User(1, "张三", new SimpleDateFormat("yyyy-MM-dd").parse("1998-6-5"), "深圳");
        rabbitTemplate.convertAndSend("my.direct","zhangsan.msg",user);
     
    }

    @Test
    public void test2(){
        Object o = rabbitTemplate.receiveAndConvert("张三.msg");
        System.out.println(o.getClass());
        System.out.println(o);
        
        /*
        class com.springboot.bean.User
        User{id=1, name='张三', birthday=Fri Jun 05 00:00:00 CST 1998, city='深圳'}
         */
        
        // 根据输出结果可以看到,获取的消息自动完成了反序列化转换为 java 对象
    }
}

查看 RabbitMQ 服务器中存储的对象,会发现存储的值为 RabbitMQ 以默认消息转换器 org.springframework.amqp.support.converter.SimpleMessageConverter 序列化后的值,如果我们需要存储的消息为 Json 格式,只需要自己注册一个 Json 格式消息转换器到容器即可,而 Spring 已经给我们提供了这个转换器:

// com.springboot.config.MyAmqpConfig
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class MyAmqpConfig {
    @Bean
    public MessageConverter messageConverter(){
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        return jackson2JsonMessageConverter;
    }
}

此时再次执行上述操作,查看服务器中存储消息:

image.png

监听队列注解

Spring 也为我们提供了监听队列支持的注解 @RabbitListener,它能够帮我们很简便的创建一个监听服务,只需要标注在一个存放在 IoC 容器中实例的方法上。看如下示例:

1、创建一个服务类,注册到 IoC 容器,使用 @RabbitListener 注解标注在方法上:

// com.springboot.service.UserService
import com.springboot.bean.User;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class UserService {

    @RabbitListener(queues = {"张三.msg"})  // 监听指定队列消息
    public void receiveUserMsg(User user) {
        // 接收自动反序列化后的对象
        System.out.println(user);
    }

    @RabbitListener(queues = {"李四.msg"})
    public void receiveMessage(Message message){
        // 接收源消息信息
        
        // 获得消息体
        System.out.println(message.getBody());
        // 获得消息属性信息
        System.out.println(message.getMessageProperties());
    }
}

2、启动程序,运行单元测试中发送 User 对象方法,监听程序输出如下:

User{id=1, name='张三', birthday=Fri Jun 05 00:00:00 CST 1998, city='深圳'}

AmqpAdmin 组件

Spring 自动注册了一个 AmqpAdmin 组件,它的作用类似于数据库中的 DDL 语句,可以用来帮我们定义(创建)交换器、队列。如下:

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpAdminTests {

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Test
    public void testDeclareExchange(){
        // 创建一个交换器
        Exchange exchange = new DirectExchange("my.directNew");
        amqpAdmin.declareExchange(exchange);
                  

    }

    @Test
    public void testDeclareQueue(){
        // 创建 Queue
        Queue queue = new Queue("myQueue");
        amqpAdmin.declareQueue(queue);
                  

    }

    @Test
    public void testBinding(){
        // 创建一个 binding ,绑定交换器与队列
        amqpAdmin.declareBinding(new Binding("myQueue", Binding.DestinationType.QUEUE,"my.directNew","myQueue",null));
                  

    }
}
0

评论区