主题模式
577字约2分钟
2024-12-01
- Topic类型和Direct类型相比,都是可以根据Routing key把消息路由绑定到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符。
- Routing key 一般都是由一个或者多个单词组成,多个单词之间使用 "." 分割,例如 item.name
- 通配符规则:
- #: 匹配零个或多个单词
- *: 匹配一个单词
一、生产者代码
package com.atguigu.rabbitmq.topic;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
// routing key 常用格式:系统的名称.日志的级别。
// 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
channel.queueBind(queue2Name,exchangeName,"*.*");
// 分别发送消息到队列:order.info、goods.info、goods.error
String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());
body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());
channel.close();
connection.close();
}
}
二、消费者代码
1、消费者1号
消费者1监听队列1:
package com.atguigu.rabbitmq.topic;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue1";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
2、消费者2号
消费者2监听队列2:
package com.atguigu.rabbitmq.topic;
import com.atguigu.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue2";
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}
三、运行效果
队列1:

队列2:
