跳至主要內容

SpringBoot集成RabbitMq

xw大约 3 分钟mqmq

1、添加Maven依赖

  <dependencies>
        <!-- Spring AMQP / RabbitMQ integration -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--
          spring-boot-starter-test is declared exactly once and without an explicit
          version, like the other Spring Boot starters in this list. The previous
          second declaration used the "RELEASE" meta-version, which makes the build
          non-reproducible and triggers Maven's duplicate-declaration warning.
          NOTE(review): the sample tests use the JUnit 4 runner
          (@RunWith(SpringRunner.class)), so junit-vintage-engine is not excluded
          here; re-add the exclusion if the tests are migrated to JUnit 5.
        -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2、添加配置文件

# RabbitMQ broker address
spring.rabbitmq.host=192.168.0.199
spring.rabbitmq.port=5672
# Credentials used to connect to the broker
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin

Exchange几种模式介绍

Fanout模式

fanoutExchange模式是广播模式,所有绑定交换机的队列都收到通知,具体的实现看代码。

package club.xwzzy.springbootrabbitmq.fanoutExchange;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Declares the fanout demo topology: three queues, one fanout exchange, and one
 * binding per queue.
 *
 * <p>NOTE(review): the {@code @Bean} methods are hosted in a {@code @Component}
 * class; a {@code @Configuration} class is the more conventional home for them.
 *
 * @author by xw
 */
@Component
public class FanoutRabbitConfig {

    private static final String QUEUE_A = "fanout.A";
    private static final String QUEUE_B = "fanout.B";
    private static final String QUEUE_C = "fanout.C";
    private static final String EXCHANGE_NAME = "fanoutExchange";

    /** @return the queue named {@code fanout.A} */
    @Bean
    public Queue AMessage() {
        return new Queue(QUEUE_A);
    }

    /** @return the queue named {@code fanout.B} */
    @Bean
    public Queue BMessage() {
        return new Queue(QUEUE_B);
    }

    /** @return the queue named {@code fanout.C} */
    @Bean
    public Queue CMessage() {
        return new Queue(QUEUE_C);
    }

    /** @return the fanout exchange named {@code fanoutExchange} */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME);
    }

    // Several Queue beans exist, so each binding method's queue parameter keeps the
    // name of the queue bean method it refers to. Do not rename these parameters.

    /** Binds the {@code AMessage} queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return attach(AMessage, fanoutExchange);
    }

    /** Binds the {@code BMessage} queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return attach(BMessage, fanoutExchange);
    }

    /** Binds the {@code CMessage} queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return attach(CMessage, fanoutExchange);
    }

    /** Builds the binding of {@code queue} to {@code exchange}. */
    private static Binding attach(Queue queue, FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}
/**
 * Listener for the three fanout demo queues; prints every String payload it
 * receives to standard output.
 */
@Component
@RabbitListener(queues = {"fanout.A", "fanout.B", "fanout.C"})
public class FanoutReceiver {

    private static final String OUTPUT_PREFIX = "fanout Receiver  : ";

    /**
     * Handles one message from any of the listened queues.
     *
     * @param message the received payload
     */
    @RabbitHandler
    public void process(String message) {
        final String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}

/**
 * Publishes a fixed demo message to the {@code fanoutExchange} exchange.
 */
@Component
public class FanoutSender {

    private static final String EXCHANGE = "fanoutExchange";
    private static final String EMPTY_ROUTING_KEY = "";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Prints the demo text to standard output, then sends it to the exchange
     * with an empty routing key.
     */
    public void send() {
        final String payload = "hi, fanout msg ";
        System.out.println("Sender : " + payload);
        rabbitTemplate.convertAndSend(EXCHANGE, EMPTY_ROUTING_KEY, payload);
    }
}

测试
/**
 * Manual smoke test for the fanout example: sends one message and then keeps the
 * test alive for a while.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootRabbitmqApplication.class)
public class FanoutTest {

    /** How long the test sleeps after sending, in seconds. */
    private static final long WAIT_SECONDS = 100;

    @Autowired
    private FanoutSender fanoutSender;

    /**
     * Sends the demo message and sleeps.
     * NOTE(review): the sleep presumably gives the listener time to print the
     * message before the context shuts down; there are no assertions.
     *
     * @throws InterruptedException if the sleep is interrupted
     */
    @Test
    public void test() throws InterruptedException {
        fanoutSender.send();
        TimeUnit.SECONDS.sleep(WAIT_SECONDS);
    }
}

HeadersExchange

HeadersExchange 根据消息头部(headers)中的键值对进行匹配,可以选择部分匹配(任意一项匹配即可)或者全部匹配,只有匹配通过后才会将消息转发至队列中。具体实现看代码。

/**
 * Declares the headers-exchange demo topology: one queue, two headers exchanges,
 * and a match-all binding between the queue and {@code headersExchange1}.
 *
 * <p>NOTE(review): {@code headersExchange2} is declared, but no binding in this
 * class refers to it.
 *
 * @author by xw
 */
@Component
public class HeadersConfig {

    private static final String QUEUE_NAME = "test.headers.queue";
    private static final String FIRST_EXCHANGE = "headersExchange1";
    private static final String SECOND_EXCHANGE = "headersExchange2";

    /** @return the queue named {@code test.headers.queue} */
    @Bean
    public Queue testQueue() {
        return new Queue(QUEUE_NAME);
    }

    /** @return the headers exchange named {@code headersExchange1} */
    @Bean
    public HeadersExchange headersExchange1() {
        return new HeadersExchange(FIRST_EXCHANGE);
    }

    /** @return the headers exchange named {@code headersExchange2} */
    @Bean
    public HeadersExchange headersExchange2() {
        return new HeadersExchange(SECOND_EXCHANGE);
    }

    /**
     * Binds the test queue to {@code headersExchange1} using {@code whereAll}, so
     * both headers listed below are part of the match condition.
     *
     * @param testQueue the queue to bind
     * @param headersExchange1 the exchange to bind to; two HeadersExchange beans
     *                         exist, so this parameter keeps the bean's name
     * @return the headers binding
     */
    @Bean
    public Binding bindingHeadersExchange1(Queue testQueue, HeadersExchange headersExchange1) {
        final Map<String, Object> requiredHeaders = new HashMap<>();
        requiredHeaders.put("type", "test");
        requiredHeaders.put("type2", "全部匹配");
        return BindingBuilder
                .bind(testQueue)
                .to(headersExchange1)
                .whereAll(requiredHeaders)
                .match();
    }
}

/**
 * Listener for {@code test.headers.queue}; prints every String payload it
 * receives to standard output.
 */
@Component
@RabbitListener(queues = {"test.headers.queue"})
public class HeadersReceiver {

    private static final String OUTPUT_PREFIX = "接受消息:";

    /**
     * Handles one message from the queue.
     *
     * @param msg the received payload
     */
    @RabbitHandler
    public void receiver(String msg) {
        final String line = OUTPUT_PREFIX + msg;
        System.out.println(line);
    }
}

/**
 * Publishes messages carrying custom headers to the {@code headersExchange1}
 * exchange.
 */
@Component
public class HeadersSend {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Prints the payload, wraps it in a message carrying the given headers, and
     * sends it to {@code headersExchange1}.
     *
     * @param head header names and values to attach to the message
     * @param msg  the payload
     */
    public void sendTest(Map<String,Object> head,String msg) {
        System.out.println("send message" + msg);
        final Message outbound = getMessage(head, msg);
        rabbitTemplate.convertAndSend("headersExchange1", "test.headers.queue", outbound);
    }

    /**
     * Builds a message whose properties contain every entry of {@code head} as a
     * header and whose body is produced by a {@code SimpleMessageConverter}.
     */
    private Message getMessage(Map<String, Object> head, Object msg) {
        final MessageProperties props = new MessageProperties();
        head.forEach(props::setHeader);
        return new SimpleMessageConverter().toMessage(msg, props);
    }
}


测试:

/**
 * Manual smoke test for the headers-exchange example: sends one message whose
 * headers satisfy the binding declared in {@code HeadersConfig}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootRabbitmqApplication.class)
public class HeadersTest {

    @Autowired
    private HeadersSend headersSend;

    /**
     * Sends "hello world" with matching headers, then sleeps.
     * NOTE(review): the sleep presumably gives the listener time to print the
     * message before the context shuts down; there are no assertions.
     *
     * @throws InterruptedException if the sleep is interrupted
     */
    @Test
    public void test() throws InterruptedException {
        // HeadersConfig binds the queue with whereAll on type=test and
        // type2=全部匹配. The message must carry both headers with exactly these
        // values; otherwise the exchange does not route it to the queue and the
        // receiver prints nothing.
        Map<String,Object> map = new HashMap<>();
        map.put("type","test");
        map.put("type2","全部匹配");
        headersSend.sendTest(map,"hello world");
        TimeUnit.SECONDS.sleep(10);
    }
}

TopicExchange

topicExchange模式会根据路由键(routing key)与绑定键的模式匹配结果,将消息转发到相应的队列。路由键由以“.”分隔的单词组成,其中 # 匹配零个或多个单词,* 匹配恰好一个单词。具体实现看代码。

/**
 * Declares the topic demo topology: two queues, one topic exchange, and one
 * binding per queue.
 *
 * <p>Topic binding keys are matched against dot-separated words of the routing
 * key: {@code *} matches exactly one word and {@code #} matches zero or more
 * words.
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the first queue; also the exact routing key it is bound with. */
    static final String message = "topic.message";
    /** Name of the second queue, which is bound with the {@code topic.#} pattern. */
    static final String messages = "topic.messages";

    /** @return the queue named {@code topic.message} */
    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    /** @return the queue named {@code topic.messages} */
    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    /**
     * Creates the topic exchange (this method does not bind anything).
     *
     * @return the topic exchange named {@code topicExchange}
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    /**
     * Binds the {@code topic.message} queue with the exact routing key
     * {@code topic.message}.
     *
     * @param queueMessage the queue to bind
     * @param exchange the topic exchange
     * @return the binding
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        // Reuse the constant instead of repeating the literal so the queue name
        // and its routing key cannot drift apart.
        return BindingBuilder.bind(queueMessage).to(exchange).with(TopicRabbitConfig.message);
    }

    /**
     * Binds the {@code topic.messages} queue with the pattern {@code topic.#}.
     * Because {@code #} matches zero or more words, this queue receives messages
     * sent with either {@code topic.message} or {@code topic.messages}.
     *
     * @param queueMessages the queue to bind
     * @param exchange the topic exchange
     * @return the binding
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

/**
 * Listener for the two topic demo queues; prints every String payload it
 * receives to standard output.
 */
@Component
@RabbitListener(queues = {"topic.message", "topic.messages"})
public class TopicReceiver {

    private static final String OUTPUT_PREFIX = "Topic Receiver  : ";

    /**
     * Handles one message from either listened queue.
     *
     * @param message the received payload
     */
    @RabbitHandler
    public void process(String message) {
        final String line = OUTPUT_PREFIX + message;
        System.out.println(line);
    }
}
/**
 * Publishes fixed demo messages to the {@code topicExchange} exchange.
 */
@Component
public class TopicSender {

    private static final String EXCHANGE = "topicExchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Sends the first demo text with routing key {@code topic.message}. */
    public void send1() {
        publish("topic.message", "hi, i am message 1");
    }

    /** Sends the second demo text with routing key {@code topic.messages}. */
    public void send2() {
        publish("topic.messages", "hi, i am messages 2");
    }

    /** Prints the text to standard output, then sends it with the given routing key. */
    private void publish(String routingKey, String text) {
        System.out.println("Sender : " + text);
        this.rabbitTemplate.convertAndSend(EXCHANGE, routingKey, text);
    }
}

测试:

/**
 * Manual smoke tests for the topic example: each test sends one message and then
 * keeps the test alive for a while.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootRabbitmqApplication.class)
public class TopicTest {

    /** How long each test sleeps after sending, in milliseconds. */
    private static final long WAIT_MILLIS = 30000L;

    @Autowired
    private TopicSender topicSender;

    /** Sends via {@code send1()} and sleeps. */
    @Test
    public void send1() throws InterruptedException {
        topicSender.send1();
        pause();
    }

    /** Sends via {@code send2()} and sleeps. */
    @Test
    public void send2() throws InterruptedException {
        topicSender.send2();
        pause();
    }

    // NOTE(review): presumably gives the listener time to print the message
    // before the context shuts down; there are no assertions.
    private static void pause() throws InterruptedException {
        Thread.sleep(WAIT_MILLIS);
    }
}