文章目录
- 1、发布确认
- 1.1、发布确认的引出
- 1.2、发布确认的策略
- 1.2.1、开启发布确认的方法
- 1.2.2、单个确认发布
- 1.2.3、批量确认发布
- 1.2.4、异步确认发布
- 1.2.5、如何处理异步未确认讯息
- 1.2.6、以上3种发布确认的速度对比
- 2、交换机
- 2.1、Exchanges
- 2.1.1、概念
- 2.1.2、型别
- 2.1.3、无名exchange
- 2.2、临时队列
- 2.3、系结(binding)
- 2.4、Fanout(发布订阅模式)
- 2.4.1、介绍
- 2.4.2、实战
- 2.5、Direct(路由模式)
- 2.5.1、介绍
- 2.5.2、实战
- 2.6、Topic
- 2.6.1、介绍
- 2.6.2、Topic的要求
- 2.6.3、Topic的匹配案例
- 2.6.4、实战
1、发布确认
1.1、发布确认的引出
一个讯息的持久化需要经历的步骤:
- 设定要求队列持久化,
- 设定要求队列中的讯息必须持久化,
- 发布确认
- 如果缺少了发布确认的话,那么讯息在磁盘上持久化之前会发生丢失,从而不能满足讯息持久化的目的,
1.2、发布确认的策略
1.2.1、开启发布确认的方法
Channel channel = RabbitmqUtil.getChannel();
//开启发布确认
channel.confirmSelect();
- 发布确认默认是没有开启的,如果需要开启需要呼叫
confirmSelect
,每当需要使用发布确认的时候,都需要呼叫该方法,
1.2.2、单个确认发布
- 单个确认发布是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个讯息之后只有它被确认发布,后续的讯息才能继续发布,
- 该确认方式主要通过
waitForConfirms
方法实作,这个方法只有在讯息被确认的时候才会回传,如果在指定时间范围内这个讯息没有被确认那么它将会抛出例外, - 这种确认方式的最大的缺点就是:发布速度特别慢,
public static void ConfirmMessageIndividually() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 进行单个发布确认
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("讯息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("单个确认发送" + MESSAGE_COUNT + "条讯息所消耗的时间是" + (end - begin) + "ms");
}
1.2.3、批量确认发布
- 先发布一批讯息然后一起确认,
- 缺点:当发生故障导致发布出现问题时,不知道那个讯息出现了问题,我们必须将整个批处理保存在存储器中,以记录重要的讯息而后重新发布讯息,
public static void ConfirmMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
// 批量处理讯息的个数
int batchSize = 100;
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 进行批量发布确认
if(i % batchSize == 0){
channel.waitForConfirms();
System.out.println("批量处理讯息成功");
}
}
long end = System.currentTimeMillis();
System.out.println("批量确认发送" + MESSAGE_COUNT + "条讯息所消耗的时间是" + (end - begin) + "ms");
}
1.2.4、异步确认发布
原理
- 异步确认发布是利用
回呼函式
来达到讯息可靠性传递的,这个中间件也是通过函式回呼来保证是否投递成功,
代码
public static void ConfirmMessageAsync() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
ConfirmCallback ackCallback = (var1,var2)->{
System.out.println("已确认的讯息" + var1);
};
ConfirmCallback nackCallback = (var1,var2)->{
System.out.println("未确认的讯息" + var1);
};
/**
* 1. 确认收到讯息的回呼函式
* 2. 未确认收到讯息的回呼函式
*/
channel.addConfirmListener(ackCallback,nackCallback);
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("异步确认发送" + MESSAGE_COUNT + "条讯息所消耗的时间是" + (end - begin) + "ms");
}
1.2.5、如何处理异步未确认讯息
- 最好的解决方案就是把未确认的讯息放到一个基于存储器的能被发布执行绪访问的队列,比如说
ConcurrentSkipListMap
public static void ConfirmMessageAsync() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
/**
* 1. 讯息标识
* 2. 是否批量处理
*/
ConfirmCallback ackCallback = (var1,var2)->{
if(var2){
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
longStringConcurrentNavigableMap.clear();
}else{
map.remove(var1);
}
String message = map.get(var1);
System.out.println("已确认的讯息是:" + message + " 已确认的讯息tag:" + var1);
};
ConfirmCallback nackCallback = (var1,var2)->{
// 未确认的讯息
String s = map.get(var1);
System.out.println(s);
System.out.println("未确认的讯息" + var1);
};
/**
* 1. 确认收到讯息的回呼函式
* 2. 未确认收到讯息的回呼函式
*/
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 1. 将讯息保存到一个执行绪安全地队列中
map.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("异步确认发送" + MESSAGE_COUNT + "条讯息所消耗的时间是" + (end - begin) + "ms");
}
1.2.6、以上3种发布确认的速度对比
- 单个发布确认:同步等待确认,简单,但吞吐量非常有限,
- 批量确认发布:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪一条讯息出现了问题,
- 异步确认发布:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实作起来稍微难些,
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.xiao.utils.RabbitmqUtil;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class ConfirmMessage {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
// 单个发布确认
ConfirmMessageIndividually(); // 单个确认发送1000条讯息所消耗的时间是680ms
// 批量发布确认
ConfirmMessageBatch(); //批量确认发送1000条讯息所消耗的时间是112ms
//异步发布确认
ConfirmMessageAsync(); // 异步确认发送1000条讯息所消耗的时间是41ms
// 异步确认发送1000条讯息所消耗的时间是33ms
}
public static void ConfirmMessageIndividually() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 进行单个发布确认
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("讯息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("单个确认发送" + MESSAGE_COUNT + "条讯息所消耗的时间是" + (end - begin) + "ms");
}
public static void ConfirmMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
// 批量处理讯息的个数
int batchSize = 100;
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 进行批量发布确认
if(i % batchSize == 0){
channel.waitForConfirms();
System.out.println("批量处理讯息成功");
}
}
long end = System.currentTimeMillis();
System.out.println("批量确认发送" + MESSAGE_COUNT + "条讯息所消耗的时间是" + (end - begin) + "ms");
}
public static void ConfirmMessageAsync() throws Exception{
Channel channel = RabbitmqUtil.getChannel();
String QUEUE_NAME = UUID.randomUUID().toString();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.confirmSelect();
ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
/**
* 1. 讯息标识
* 2. 是否批量处理
*/
ConfirmCallback ackCallback = (var1,var2)->{
if(var2){
ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap = map.headMap(var1);
longStringConcurrentNavigableMap.clear();
}else{
map.remove(var1);
}
String message = map.get(var1);
System.out.println("已确认的讯息是:" + message + " 已确认的讯息tag:" + var1);
};
ConfirmCallback nackCallback = (var1,var2)->{
// 未确认的讯息
String s = map.get(var1);
System.out.println(s);
System.out.println("未确认的讯息" + var1);
};
/**
* 1. 确认收到讯息的回呼函式
* 2. 未确认收到讯息的回呼函式
*/
channel.addConfirmListener(ackCallback,nackCallback);
long begin = System.currentTimeMillis();
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
// 1. 将讯息保存到一个执行绪安全地队列中
map.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("异步确认发送" + MESSAGE_COUNT + "条讯息所消耗的时间是" + (end - begin) + "ms");
}
}
2、交换机
- 在这一部分中,我们将做一些完全不同的事情 – 我们将讯息传达给多个消费者,这种模式成为“发布/订阅模式”,这里需要使用到交换机,
2.1、Exchanges
2.1.1、概念
- RabbitMQ讯息传递模型的核心思想是:生产者生产的讯息从不会直接发送到队列,
- 相反,生产者只能将讯息发送到交换机
标签:
发表评论
您的电子邮件地址不会被公开。 必填的字段已做标记 *
0 评论