博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
7、RabbitMQ-主题模式
阅读量:6330 次
发布时间:2019-06-22

本文共 5306 字,大约阅读时间需要 17 分钟。

 

1、模式图

 

 

发送到主题交换的消息不能具有任意的 routing_key - 它必须是由点分隔的单词列表。
单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由键示例:“ stock.usd.nyse ”,“ nyse.vmw”,“ quick.orange.rabbit ”。路由密钥中可以包
含任意数量的单词,最多可达255个字节。
 
绑定密钥也必须采用相同的形式。主题交换背后的逻辑 类似于直接交换- 使用特定路
由密钥发送的消息将被传递到与匹配绑定密钥绑定的所有队列。但是绑定键有两个重
要的特殊情况:
*(星号)可以替代一个单词。
#(hash)可以替换零个或多个单词。

 

 类型是topic

 

2、实践 

 生产者

 

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.util.ConnectionUtils;public class Send {        private static final String EXCHANGE_NAME = "exchange_topic";        public static void main(String[] args) throws IOException, TimeoutException {                Connection conn = ConnectionUtils.getConnection();                Channel channel = conn.createChannel();                //exchange        channel.exchangeDeclare(EXCHANGE_NAME, "topic");                String msg = "商品.....";                //绑定路由        String routingKey = "goods.add";        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());                channel.close();        conn.close();            }}

 

 

 消费者1:

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.util.ConnectionUtils;public class Receive {        private static final String QUEUE_NAME="test_topic1";    private static final String EXCHANGE_NAME = "exchange_topic";        public static void main(String[] args) throws IOException, TimeoutException {                Connection conn = ConnectionUtils.getConnection();                Channel channel = conn.createChannel();                //队列声明        channel.queueDeclare(QUEUE_NAME, false, false, false, null);                channel.basicQos(1);                //绑定队列到交换机转发器        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");                //定义一个消费者                Consumer consumer = new DefaultConsumer(channel){                    //收到消息就会触发这个方法                    @Override                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)                            throws IOException {                        String msg = new String(body,"utf-8");                        System.out.println("消费者1接收到的消息" + msg);                                                try {                            Thread.sleep(1500);                        } catch (InterruptedException e) {                            e.printStackTrace();                        }finally{                            System.out.println("消费者1处理完成!");                            //手动回执                            channel.basicAck(envelope.getDeliveryTag(), false);                        }                    }                };                //监听队列                //自动应答false                boolean autoAck = false;                channel.basicConsume(QUEUE_NAME, autoAck, consumer);    }}

 

 

 消费者2:

 

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.util.ConnectionUtils;public class Receive2 {        private static final String QUEUE_NAME="test_topic2";    private static final String EXCHANGE_NAME = "exchange_topic";    public static void main(String[] args) throws IOException, TimeoutException {        Connection conn = ConnectionUtils.getConnection();        Channel channel = conn.createChannel();                //队列声明        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        channel.basicQos(1);                //绑定队列到交换机转发器        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");                        //定义一个消费者                Consumer consumer = new DefaultConsumer(channel){                    //收到消息就会触发这个方法                    @Override                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)                            throws IOException {                        String msg = new String(body,"utf-8");                        System.out.println("消费者2接收到的消息" + msg);                                                try {                            Thread.sleep(1500);                        } catch (InterruptedException e) {                            e.printStackTrace();                        }finally{                            System.out.println("消费者2处理完成!");                            //手动回执                            channel.basicAck(envelope.getDeliveryTag(), false);                        }                    }                };                //监听队列                //自动应答false                boolean autoAck = false;                channel.basicConsume(QUEUE_NAME, autoAck, consumer);    }}

 

 

此时如果生产者的String routingKey = "
goods.add";
此时2个消费者都可以收到消息
若:String routingKey = "
goods.del";
此时只有消费者2收到消息

 

转载于:https://www.cnblogs.com/Mrchengs/p/10531483.html

你可能感兴趣的文章
js作用域链
查看>>
java中如何选择Collection Class--java线程(第3版)
查看>>
ASP.NET页面之间传递值的几种方式
查看>>
Linux系统权限
查看>>
TinyTemplate模板引擎火热出炉,正式开源了~~~
查看>>
android开发之GPS定位详解
查看>>
Mac OS X如何重装 苹果电脑重装操作系统
查看>>
集算器读写EXCEL文件的代码示例
查看>>
Ubuntu Server上搭建可用于生产环境的ASP.NET服务器
查看>>
php---PHP使用GD库实现截屏
查看>>
华为交换机802.1x动态下发vlan配置
查看>>
spring boot websocket + thy模版
查看>>
查看文件的真实路径
查看>>
如何开发一个自己的 RubyGem?
查看>>
职工系统150206308
查看>>
『中级篇』K8S最小调度单位Pod(62)
查看>>
ACE网络编程思考(一)
查看>>
数据结构的几种存储方式
查看>>
React源码学习——ReactClass
查看>>
JavaScript中几个相似方法对比
查看>>