案例介绍
使用mq发送邮件的优点在于:
- 能实现异步处理,提高系统的并发性和相应速度
- 更加灵活,只需要一个邮件系统就能和其他系统共用
- 能够确保消息可靠,提供了消息持久化消息确认机制等特性
这里我们以用户注册后需要同时发送邮件和短信这个场景做为示例,流程图如下所示。

不介绍rabbitMQ的基础信息了,直接进入代码环节。
案例实操
生产者(注册系统)
pom文件中引入相关依赖
org.springframework.boot spring-boot-starter-amqp
application.yml文件配置
# rabbitmq spring: rabbitmq: port: 5672 host: localhost username: guest password: guest virtual-host: / publisher-returns: true #开启生产者手动确认 publisher-confirm-type: correlated #消息确认类型
做完这些就已经成功将rabbitMQ引入到Springboot中了,接下来是生产者中的配置类,这里使用的推送方式是topic
topic交换器是指按照正则表达式模糊匹配:用消息的Routing Key与 Exchange和Queue 之间的BindingKey进行模糊匹配,如果匹配成功,将消息分发到该Queue。 Routing Key是一个句点号“. ”分隔的字符串(我们将被句点号“.”分隔开的每一段独立的字符串称为一个单词)。Binding Key与Routing Key一样也是句点号“. ”分隔的字符串。Binding
Key中可以存在两种特殊字符“ * ”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)
import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class LoginRabbitConfig { private static final String EXCHANGE_NAME_TOPIC = "测试用Topic交换机"; // 声明交换机 topic @Bean public TopicExchange topicExchange() { // 是否持久化、是否自动删除 return new TopicExchange(EXCHANGE_NAME_TOPIC, true, false); } }
模拟一下注册的接口,因为注册后需要发送邮件和短信提醒用户,如果按照平时的顺序调用不仅耗时长,并且一旦邮件或短信发送失败没有进行异常处理话会导致注册失败,因此采用消息队列能够很好的解决这一问题,代码中UserPOJO只是定义的一个实体类
import com.alibaba.fastjson.JSON; import gwc.mq.pojo.UserPOJO; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; @RestController public class LoginController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/register") public String register(UserPOJO userPOJO) { Object msg = JSON.toJSONString(userPOJO); // 设置ConfirmCallback rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { // 消息发送成功 System.out.println("消息发送成功, correlationData: " + correlationData.getReturnedMessage()); } else { // 消息发送失败 System.out.println("消息发送失败, cause: " + cause); } }); try { rabbitTemplate.convertAndSend("测试用Topic交换机", "*", msg, new CorrelationData(UUID.randomUUID().toString())); } catch (Exception e) { e.printStackTrace(); // 失败处理 无论出现那种情况都将错误消息存到redis中 然后用定时任务统一发送 } return "用户-" + userPOJO.getName() + "-注册成功!"; } }
在配置类中,我们打开了消息发送方的消息确认机制,因此在这里我们需要setConfirmCallback函数,其中correlationData是具体的消息,ack表示是否发送成功,cause则是失败的具体原因。
发送方发送失败的原因有三种可归为
(1)producter连接mq失败,消息没有发送到mq
(2)producter连接mq成功,但是发送到exchange失败
(3)消息发送到exchange成功,但是路由到queue失败
无论出现哪一种异常,我们都可以通过try catch来进行错误消息的处理,我采用的是捕获到错误后将消息存入db中(redis),再通过springboot的定时任务进行统一的重发,存入db代码就不再描述。
消费者(以邮件系统为例)
application.yml文件配置
#rabbitMQ spring: rabbitmq: port: 5672 host: localhost username: guest password: guest virtual-host: / listener: simple: acknowledge-mode: manual #开启手动确认机制
邮件系统的配置类
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MailRabbitConfig { private static final String EXCHANGE_NAME_TOPIC = "测试用Topic交换机"; private static final String QUEUE_NAME = "email_queue"; private static final String ERR_EXCHANGE_NAME_DIRECT = "死信交换机"; private static final String ERR_QUEUE_NAME = "err_email_queue"; // 声明交换机 topic @Bean public TopicExchange topicExchange() { // 是否持久化、是否自动删除 return new TopicExchange(EXCHANGE_NAME_TOPIC, true, false); } // 声明队列 @Bean public Queue queue() { // 是否持久化、是否当前连接对象独占、是否自动删除 return new Queue(QUEUE_NAME, true, false, false); } // 声明绑定关系 @Bean public Binding queueBinding(Queue queue, TopicExchange topicExchange) { return BindingBuilder.bind(queue).to(topicExchange).with("*.mail"); } // 声明死信交换机 direct @Bean public DirectExchange directExchange() { // 是否持久化、是否自动删除 return new DirectExchange(ERR_EXCHANGE_NAME_DIRECT, true, false); } // 声明死信队列 @Bean public Queue errQueue() { // 是否持久化、是否当前连接对象独占、是否自动删除 return new Queue(ERR_QUEUE_NAME, true, false, false); } // 声明绑定关系 @Bean public Binding errQueueBinding(@Qualifier("errQueue")Queue errQueue, @Qualifier("directExchange")DirectExchange directExchange) { return BindingBuilder.bind(errQueue).to(directExchange).with("err.mail"); } }
邮件发送服务进行监听
import com.alibaba.fastjson.JSON; import com.rabbitmq.client.Channel; import gwc.mq.pojo.UserPOJO; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.HashMap; import java.util.Map; // 死信队列 // 消息出现问题后则会进入死信交换机,然后进入死信队列 // 建立一个消费者根据routingkey监听死信队列 即可处理不同的死信队列中的数据 @Component public class MailListener { private static final int MAX_RETRY = 3; private static final String ERR_EXCHANGE_NAME_DIRECT = "死信交换机"; private static final String ERR_QUEUE_NAME = "err_email_queue"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private TopicExchange topicExchange; @Autowired private AmqpAdmin amqpAdmin; @RabbitListener(queues = "email_queue") public void sendMail(Message message, Channel channel) throws IOException { try { // 睡眠1秒 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } String s = new String(message.getBody()); UserPOJO userPOJO = JSON.parseObject(s, UserPOJO.class); int a = 1 / 0; // 造成异常 System.out.println("Mail系统收到 userPOJO=" + userPOJO); } catch (Exception e) { int retryCount = getRetryCount(message); System.out.println("Mail系统出现异常 当前retryCount =" + retryCount); if (retryCount两个方法分别监听业务队列和死信队列,如果消息消费出现异常,则重新将消息放入队列尾部,如果重试次数达到三次则将此消息放入TTL队列中,TTL队列中的消息会根据配置的过期时间、死信交换机、以及死信交换机上的routingkey对消息进行投送,进入相应的死信队列,然后再通过死信消费者进行消费处理,此时若再次发送失败,则发送邮件提醒人员进行手工发送来确保消息的有效性。
消费者确保消息的可靠性通过下示代码进行消息的确认
/* (1)channel.basicAck 用于肯定确认,RabbitMQ已经知道该消息并且成功地处理消息,可以将其丢弃了 (2)channel.basicNack 用于否定确认 (3)channel.basicReject 用于否定确认,与channel.basicNack相比少一个参数,不处理该消息了直接 拒绝,可以将其丢弃了 */ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);总结
消息发送的流程大致可化为 生产者(业务接口) -> mq -> 交换机 -> 队列 -> 消费者
生产者消息确认机制可以确保在前半部分的有效性,消费者手动确认机制可以确保在后半部分的有效性,而一旦消息连续失败多次,我们还有保底方案通过定时任务扫描DB获取失败的消息转而通过人工发送,这样就可以在全流程上确保消息的可靠性了,这里仅仅是我个人的一套保证可靠性的方案,如果有其他更为可行的方案欢迎评论区补充