引入

BLER目前有两个业务应用到了邮件发送的功能:

  • 新用户注册时的邮箱验证
  • 企业处理简历时自动向应聘者发送的邮件

例如,在用户注册模块,使用了下面的代码简单地发送了一个邮件:

1
2
3
4
5
6
7
8
9
10
11
@Override
@Transactional(rollbackFor = Exception.class)
public int sendEmailToUser(User user,String identifyCode) {
try {
SendMail sendMail=new SendMail();
sendMail.sendMail(user.getUEmail(), "伯乐网提醒,亲爱的用户 ["+user.getUName()+"] ,您的邮箱验证码是:[" + identifyCode + "]\n您正在通过您的邮箱重置登录密码,如非本人操作,请忽略该短信!");
return ReturnValues.SUCCESS;
}catch (Exception e){
return ReturnValues.FAILED;
}
}

其中的邮箱发送工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 实现邮箱发送的工具类
*
* @author AaronWu
*/
public class SendMail {
/*发件人的邮箱账号
public static String sendEmailAccount = "xxx@qq.com";
/**发件人的邮箱的授权码(自己在邮箱服务器中开启并设置)*/
public static String sendEmailPassword = "xxx";

/**发件人邮箱的SMTP服务器地址*/
public static String sendEmailSMTPHost = "smtp.qq.com";
/**收件人的邮箱账号*/
public static String receiveMailAccount = "";


/**
* 把发送邮件封装为函数,参数为收件人的邮箱账号和要发送的内容
* @param receiveMailAccount 收邮件的人的邮件
* @param mailContent 邮件内容
*/
public void sendMail(String receiveMailAccount, String mailContent) {
// 创建用于连接邮件服务器的参数配置
Properties props = new Properties();
// 设置使用SMTP协议
props.setProperty("mail.transport.protocol", "smtp");
// 设置发件人的SMTP服务器地址
props.setProperty("mail.smtp.host", sendEmailSMTPHost);
// 设置需要验证
props.setProperty("mail.smtp.auth", "true");

// 根据配置创建会话对象, 用于和邮件服务器交互
Session session = Session.getInstance(props);
// 设置debug模式,便于查看发送过程所产生的日志
session.setDebug(true);

try {
// 创建一封邮件
MimeMessage message = createMimeMessage(session, sendEmailAccount, receiveMailAccount, mailContent);

// 根据 Session 获取邮件传输对象
Transport transport = session.getTransport();

transport.connect(sendEmailAccount, sendEmailPassword);

// 发送邮件, 发到所有的收件地址, 通过message.getAllRecipients() 可以获取到在创建邮件对象时添加的所有收件人
transport.sendMessage(message, message.getAllRecipients());

// 关闭连接
transport.close();
} catch (Exception e) {
e.printStackTrace();
}
}

/**
*
* @param session 和服务器交互的会话
* @param sendMail 发件人邮箱
* @param receiveMail 收件人邮箱
* @return
* @throws Exception
*/
public static MimeMessage createMimeMessage(Session session, String sendMail, String receiveMail,
String mailContent) throws Exception {
// 创建一封邮件
MimeMessage message = new MimeMessage(session);
// 设置发件人姓名和编码格式
message.setFrom(new InternetAddress(sendMail, "伯乐网-邮箱验证", "UTF-8"));
// 收件人
message.setRecipient(MimeMessage.RecipientType.TO, new InternetAddress(receiveMail, "尊敬的用户", "UTF-8"));
// 设置邮件主题
message.setSubject("找回密码提醒", "UTF-8");
// 设置邮件正文
message.setContent(mailContent, "text/html;charset=UTF-8");
// 设置发件时间
message.setSentDate(new Date());
// 保存设置
message.saveChanges();
return message;
}
}

然而,这种方式在多个线程并发执行邮件发送功能时,可能存在许多问题:

  1. 性能瓶颈: SMTP服务器通常有限制同一时间内来自同一IP的并发连接数或者发送速率。大量并发线程可能超出SMTP服务器允许的并发处理能力,导致部分连接被拒绝或延迟响应。
  2. 网络带宽限制: 大量并发发送邮件会消耗大量的网络带宽,尤其是在高并发场景下,可能导致网络拥塞,降低整体系统性能。
  3. 邮箱服务商反垃圾邮件策略: 部分邮件服务商会对短时间内从同一IP地址发送大量邮件的行为进行监控,并可能触发反垃圾邮件机制,从而将您的服务器IP标记为可疑或黑名单,影响正常邮件送达。

为了解决上面的问题,可以使用消息队列中间件RabbitMQ实现异步的处理,来解决上述的问题。

RabbitMQ简介

RabbitMQ是最受欢迎的开源消息中间件之一,在全球范围内被广泛应用。RabbitMQ是轻量级且易于部署的,能支持多种消息协议。RabbitMQ可以部署在分布式系统中,以满足大规模、高可用的要求。

RabbitMQ的5种消息模式

简单模式

简单模式包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。

在我们目前讨论的场景(邮件异步发送)中,就是应用了这种工作模式。

代码实现:

1.创建配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class SimpleRabbitConfig {

@Bean
// 名为simple.hello的队列
public Queue hello() {
return new Queue("simple.hello");
}

@Bean
//生产者
public SimpleSender simpleSender(){
return new SimpleSender();
}

@Bean
//消费者
public SimpleReceiver simpleReceiver(){
return new SimpleReceiver();
}

}

2.创建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class SimpleSender {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSender.class);

@Autowired
private RabbitTemplate template;

private static final String queueName="simple.hello";

public void send() {
String message = "Hello World!";
this.template.convertAndSend(queueName, message);
LOGGER.info(" [x] Sent '{}'", message);
}

}

3.创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
//指定其绑定的队列
@RabbitListener(queues = "simple.hello")
public class SimpleReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);

@RabbitHandler
public void receive(String in) {
LOGGER.info(" [x] Received '{}'", in);
}

}

4.在需要执行功能的地方,直接调用生产者的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Controller
@Api(tags = "RabbitController")
@Tag(name = "RabbitController", description = "RabbitMQ功能测试")
@RequestMapping("/rabbit")
public class RabbitController {

@Autowired
private SimpleSender simpleSender;

@ApiOperation("简单模式")
@RequestMapping(value = "/simple", method = RequestMethod.GET)
@ResponseBody
public CommonResult simpleTest() {
for(int i=0;i<10;i++){
simpleSender.send();
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

工作模式

工作模式是指向多个互相竞争的消费者发送消息的模式,它包含一个生产者、多消费者和一个队列。两个消费者同时绑定到一个队列上去,当消费者获取消息处理耗时任务时,空闲的消费者从队列中获取并消费消息。

代码实现:
1.创建配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Configuration
public class WorkRabbitConfig {

@Bean
//名为work.hello的队列
public Queue workQueue() {
return new Queue("work.hello");
}

@Bean
//第一个消费者
public WorkReceiver workReceiver1() {
return new WorkReceiver(1);
}

@Bean
//第二个消费者
public WorkReceiver workReceiver2() {
return new WorkReceiver(2);
}

@Bean
//生产者
public WorkSender workSender() {
return new WorkSender();
}

}

2.定义生产者,生产者通过send方法向队列work.hello中发送消息,消息中包含一定数量的.号;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class WorkSender {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);

@Autowired
private RabbitTemplate template;

private static final String queueName = "work.hello";

public void send(int index) {
StringBuilder builder = new StringBuilder("Hello");
int limitIndex = index % 3+1;
for (int i = 0; i < limitIndex; i++) {
builder.append('.');
}
builder.append(index+1);
String message = builder.toString();
template.convertAndSend(queueName, message);
LOGGER.info(" [x] Sent '{}'", message);
}

}

3.定义消费者,两个消费者从队列work.hello中获取消息,名称分别为instance 1instance 2,消息中包含.号越多,耗时越长;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@RabbitListener(queues = "work.hello")
public class WorkReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkReceiver.class);

private final int instance;

public WorkReceiver(int i) {
this.instance = i;
}

@RabbitHandler
public void receive(String in) {
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", this.instance, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", this.instance, watch.getTotalTimeSeconds());
}

private void doWork(String in) {
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}

}

4.调用生产者方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Controller
@Api(tags = "RabbitController")
@Tag(name = "RabbitController", description = "RabbitMQ功能测试")
@RequestMapping("/rabbit")
public class RabbitController {

@Autowired
private WorkSender workSender;

@ApiOperation("工作模式")
@RequestMapping(value = "/work", method = RequestMethod.GET)
@ResponseBody
public CommonResult workTest() {
for(int i=0;i<10;i++){
workSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

发布/订阅模式

发布/订阅模式是指同时向多个消费者发送消息的模式(类似广播的形式),它包含一个生产者、多消费者、多队列和一个交换机。消费者同时绑定到不同的队列上去,队列绑定到交换机上去,生产者通过发送消息到交换机,所有消费者接收并消费消息。

代码实现

1.创建配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@Configuration
public class FanoutRabbitConfig {

@Bean
//交换机
public FanoutExchange fanout() {
return new FanoutExchange("exchange.fanout");
}

@Bean
//第一个队列
public Queue fanoutQueue1() {
return new AnonymousQueue();
}

@Bean
//第二个队列
public Queue fanoutQueue2() {
return new AnonymousQueue();
}

@Bean
//绑定队列与交换机
public Binding fanoutBinding1(FanoutExchange fanout, Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanout);
}

@Bean
//绑定队列与交换机
public Binding fanoutBinding2(FanoutExchange fanout, Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanout);
}

@Bean
//定义消费者1
public FanoutReceiver fanoutReceiver() {
return new FanoutReceiver();
}

@Bean
//定义消费者2
public FanoutSender fanoutSender() {
return new FanoutSender();
}

}

2.创建生产者,注意这次发送的对象是交换机而不是队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class FanoutSender {
private static final Logger LOGGER = LoggerFactory.getLogger(FanoutSender.class);
@Autowired
private RabbitTemplate template;

private static final String exchangeName = "exchange.fanout";

public void send(int index) {
StringBuilder builder = new StringBuilder("Hello");
int limitIndex = index % 3 + 1;
for (int i = 0; i < limitIndex; i++) {
builder.append('.');
}
builder.append(index + 1);
String message = builder.toString();
template.convertAndSend(exchangeName, "", message);
LOGGER.info(" [x] Sent '{}'", message);
}

}

3.定义消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class FanoutReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(FanoutReceiver.class);

@RabbitListener(queues = "#{fanoutQueue1.name}")
public void receive1(String in) {
receive(in, 1);
}

@RabbitListener(queues = "#{fanoutQueue2.name}")
public void receive2(String in) {
receive(in, 2);
}

private void receive(String in, int receiver) {
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", receiver, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
}

private void doWork(String in) {
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}

}

4.调用生产者方法,不再赘述

路由模式

路由模式是可以根据路由键选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键转发到不同队列,队列绑定的消费者接收并消费消息。

代码实现

1.创建配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Configuration
public class DirectRabbitConfig {

@Bean
//交换机
public DirectExchange direct() {
return new DirectExchange("exchange.direct");
}

@Bean
//第一个队列
public Queue directQueue1() {
return new AnonymousQueue();
}

@Bean
//第二个队列
public Queue directQueue2() {
return new AnonymousQueue();
}

@Bean
//绑定第一个队列与交换机,其路由键为orange
public Binding directBinding1a(DirectExchange direct, Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(direct).with("orange");
}

@Bean
//绑定第二个队列与交换机,其路由键为green
public Binding directBinding2a(DirectExchange direct, Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(direct).with("green");
}

@Bean
//绑定第二个队列与交换机,其路由键为black
public Binding directBinding2b(DirectExchange direct, Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(direct).with("black");
}

@Bean
//消费者1
public DirectReceiver receiver() {
return new DirectReceiver();
}


@Bean
//消费者2
public DirectSender directSender() {
return new DirectSender();
}

}

2.定义生产者,生产者通过send方法向交换机exchange.direct中发送消息,发送时使用不同的路由键,根据路由键会被转发到不同的队列;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class DirectSender {

@Autowired
private RabbitTemplate template;

private static final String exchangeName = "exchange.direct";

private final String[] keys = {"orange", "black", "green"};

private static final Logger LOGGER = LoggerFactory.getLogger(DirectSender.class);

public void send(int index) {
StringBuilder builder = new StringBuilder("Hello to ");
int limitIndex = index % 3;
String key = keys[limitIndex];
builder.append(key).append(' ');
builder.append(index+1);
for (int i = 0; i <= limitIndex; i++) {
builder.append('.');
}
String message = builder.toString();
template.convertAndSend(exchangeName, key, message);
LOGGER.info(" [x] Sent '{}'", message);
}

}

3.定义消费者,消费者从自己绑定的匿名队列中获取消息,由于该消费者可以从两个队列中获取并消费消息,可以看做两个消费者,名称分别为instance 1instance 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class DirectReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(DirectReceiver.class);

@RabbitListener(queues = "#{directQueue1.name}")
public void receive1(String in){
receive(in, 1);
}

@RabbitListener(queues = "#{directQueue2.name}")
public void receive2(String in){
receive(in, 2);
}

private void receive(String in, int receiver){
StopWatch watch = new StopWatch();
watch.start();
LOGGER.info("instance {} [x] Received '{}'", receiver, in);
doWork(in);
watch.stop();
LOGGER.info("instance {} [x] Done in {}s", receiver, watch.getTotalTimeSeconds());
}

private void doWork(String in){
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}

}

4.调用生产者方法

通配符模式

通配符模式是可以根据路由键匹配规则选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键匹配规则绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键匹配规则转发到不同队列,队列绑定的消费者接收并消费消息。

与发布/订阅模式相比,通配符模式更加灵活自由了,其区别仅在于配置类中绑定方法的路由键设置,可以使用通配符。

代码实现

配置文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Configuration
public class TopicRabbitConfig {

@Bean
public TopicExchange topic() {
return new TopicExchange("exchange.topic");
}

@Bean
public Queue topicQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue topicQueue2() {
return new AnonymousQueue();
}

@Bean
public Binding topicBinding1a(TopicExchange topic, Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");
}

@Bean
public Binding topicBinding2a(TopicExchange topic, Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topic).with("*.*.rabbit");
}

@Bean
public Binding topicBinding2b(TopicExchange topic, Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");
}

@Bean
public TopicReceiver topicReceiver() {
return new TopicReceiver();
}

@Bean
public TopicSender topicSender() {
return new TopicSender();
}

}

RabbitMQ的应用

1.配置依赖

  • pom.xml中添加AMQP相关依赖;
1
2
3
4
5
<!--Spring AMQP相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 修改application.yml文件,在spring节点下添加RabbitMQ相关配置。
1
2
3
4
5
6
7
8
9
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /bler
username: bler
password: bler
publisher-returns: true #消息发送到队列确认
publisher-confirm-type: simple #消息发送到交换器确认

2.实现简单模式的消息队列

定义配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class SimpleRabbitConfig {

@Bean
// 名为simple.email-queue的队列
public Queue emailQueue() {
return new Queue("simple.email-queue");
}

@Bean
//生产者
public SimpleSender simpleSender(){
return new SimpleSender();
}

@Bean
//消费者
public SimpleReceiver simpleReceiver(){
return new SimpleReceiver();
}

}

定义生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Service
public class SimpleSender {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkSender.class);

@Autowired
private RabbitTemplate template;

private static final String queueName = "simple.mail-queue";

public void send(User user,String identifyCode) {
template.convertAndSend(queueName,EmailTask(user, identifyCode));
}
}

定义消费者

在这里实际进行邮件发送操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Component
public class SimpleReceiver {

private static final Logger LOGGER = LoggerFactory.getLogger(SimpleReceiver.class);

@RabbitListener(queues = "simple.email-queue")
public void receive(@Payload EmailTask emailTask) {
try {
User user = emailTask.getUser();
String identifyCode = emailTask.getIdentifyCode();

SendMail sendMail = new SendMail();
sendMail.sendMail(user.getUEmail(), "伯乐网提醒,亲爱的用户 [" + user.getUName() + "] ,您的邮箱验证码是:[" + identifyCode + "]");
} catch (Exception e) {
LOGGER.error("Failed to send email", e);
// 可能需要实现重试、死信队列等错误处理机制
}
}
}

// 创建EmailTask类
public class EmailTask {
private User user;
private String identifyCode;

// 构造函数、getters和setters...
}

修改Service中的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service
public class YourUserService {

@Autowired
private SimpleSender simpleSender;

@Override
@Transactional(rollbackFor = Exception.class)
public boolean sendEmailToUser(User user, String identifyCode) {
try {
simpleSender.send(user, identifyCode);
return true; // 或根据实际情况返回成功标识
} catch (Exception e) {
LOGGER.error("Failed to send email via message queue", e);
return false;
}
}
}

总结

通过上面实现的异步邮件发送系统,避免了多线程并发情况下邮件发送功能可能出现的问题。

不过,这样的设计会导致并发的压力给到了下面这一行代码:

1
template.convertAndSend(queueName,EmailTask(user, identifyCode));

这会不会导致类似的问题呢?

经过查阅资料,得到了下面的答案:

  1. 消息队列的并发控制: 当多个线程同时调用 convertAndSend() 方法将邮件任务放入RabbitMQ队列时,RabbitMQ服务器会确保每个消息的入队操作是线程安全的,不会因为并发而丢失或破坏消息。

  2. 生产者端的并发处理: 在生产者(即您的服务)这一侧,即使有多个线程并发调用 convertAndSend(),它们实际上是并行地将任务推送到同一个队列中,而不是真正并发执行邮件发送动作。因此,在生产者端并没有实际意义上的邮件并发发送问题。

    也就是说,对于请求提交和消息入队列的操作,RabbitMQ是允许并行进行的,而实际的插入队列操作,则是串行地按顺序逐一进行的。这保证了线程安全性。

  3. 消费者端的任务处理: 并发问题更多可能出现在消费者端。然而,RabbitMQ通过其内部机制(如prefetch count设置等)可以实现对消费者的并发消费进行控制,例如确保同一时刻只有一个邮件任务被单个消费者消费和处理。这样就避免了邮件的实际并发发送,同时也实现了负载均衡。

所以,尽管convertAndSend()方法在生产者端可能存在并发调用,但在使用RabbitMQ工作队列模式下,我们通过队列的机制有效管理和控制了任务的并发处理,并且避免了直接并发发送邮件可能导致的各种问题。