SpringBoot集成RabbitMq
大约 3 分钟mqmq
1、添加Maven依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
2、添加配置文件
spring.rabbitmq.port=5672
spring.rabbitmq.host=192.168.0.199
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
Exchange几种模式介绍
Fanout模式
fanoutExchange模式是广播模式,所有绑定交换机的队列都收到通知,具体的实现看代码。
package club.xwzzy.springbootrabbitmq.fanoutExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @author by xw
* @Description TODO
*/
@Component
public class FanoutRabbitConfig {
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}
@Component
@RabbitListener(queues = {"fanout.A","fanout.B","fanout.C"})
public class FanoutReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver : " + message);
}
}
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}
}
测试
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootRabbitmqApplication.class)
public class FanoutTest {
@Autowired
private FanoutSender fanoutSender;
@Test
public void test() throws InterruptedException {
fanoutSender.send();
TimeUnit.SECONDS.sleep(100);
}
}
HeadersExchange
这种是在头部加上信息去匹配,可以选择部分匹配或者全部匹配,只有匹配通过后才会将消息转发值队列中。具体实现看代码。
/**
* @author by xw
* @Description TODO
*/
@Component
public class HeadersConfig {
@Bean
public Queue testQueue() {
return new Queue("test.headers.queue");
}
@Bean
public HeadersExchange headersExchange1() {
return new HeadersExchange("headersExchange1");
}
@Bean
public HeadersExchange headersExchange2() {
return new HeadersExchange("headersExchange2");
}
@Bean
public Binding bindingHeadersExchange1(Queue testQueue,HeadersExchange headersExchange1)
{
Map<String,Object> map = new HashMap<>();
map.put("type","test");
map.put("type2","全部匹配");
return BindingBuilder.bind(testQueue).to(headersExchange1).whereAll(map).match();
}
}
@Component
@RabbitListener(queues={"test.headers.queue"})
public class HeadersReceiver {
@RabbitHandler
public void receiver(String msg)
{
System.out.println("接受消息:"+msg);
}
}
@Component
public class HeadersSend {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendTest(Map<String,Object> head,String msg)
{
System.out.println("send message" + msg);
rabbitTemplate.convertAndSend("headersExchange1","test.headers.queue",getMessage(head,msg));
}
private Message getMessage(Map<String, Object> head, Object msg){
MessageProperties messageProperties = new MessageProperties();
for (Map.Entry<String, Object> entry : head.entrySet()) {
messageProperties.setHeader(entry.getKey(), entry.getValue());
}
MessageConverter messageConverter = new SimpleMessageConverter();
return messageConverter.toMessage(msg, messageProperties);
}
}
测试:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootRabbitmqApplication.class)
public class HeadersTest {
@Autowired
private HeadersSend headersSend;
@Test
public void test() throws InterruptedException {
Map<String,Object> map = new HashMap<>();
map.put("type","test");
map.put("type1","test");
headersSend.sendTest(map,"hello world");
TimeUnit.SECONDS.sleep(10);
}
}
TopicExchange
topicExchange模式会将消息转发到指定的队列。其中 #代表一个或多个字符,*代表一个字符。具体实现看代码。
@Configuration
public class TopicRabbitConfig {
final static String message = "topic.message";
final static String messages = "topic.messages";
// 创建队列
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
// 创建队列
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
/**
* 将队列绑定到交换机
* @return
*/
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
/**
* 说明 #匹配多个字符 *匹配一个字符
* @param queueMessages
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
@Component
@RabbitListener(queues = {"topic.message","topic.messages"})
public class TopicReceiver {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver : " + message);
}
}
@Component
public class TopicSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
}
}
测试:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootRabbitmqApplication.class)
public class TopicTest {
@Autowired
private TopicSender topicSender;
@Test
public void send1() throws InterruptedException {
topicSender.send1();
Thread.sleep(30000);
}
@Test
public void send2() throws InterruptedException {
topicSender.send2();
Thread.sleep(30000);
}
}