发布于 

实习笔记-RokectMQ

一、核心概念

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息Consumer 负责消费消息Broker 负责存储消息
Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。

  • NameServer:可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。
  • Broker:核心的一个角色,主要是用来保存topic的信息,接受生产者产生的消息,持久化消息。在一个Broker集群中,相同的BrokerName可以称为一个Broker组,一个Broker组中,BrokerId为0的为主节点,其它的为从节点。BrokerName和BrokerId是可以在Broker启动时通过配置文件配置的。每个Broker组只存放一部分消息。
  • 生产者:生产消息的一方就是生产者
  • 生产者组:一个生产者组可以有很多生产者,只需要在创建生产者的时候指定生产者组,那么这个生产者就在那个生产者组
  • 消费者:用来消费生产者消息的一方
  • 消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组可以有很多的消费者,不同的消费者组消费消息是互不影响的。
  • topic(主题) :可以理解为一个消息的集合的名字,生产者在发送消息的时候需要指定发到哪个topic下,消费者消费消息的时候也需要知道自己消费的是哪些topic底下的消息。
  • Tag(子主题) :比topic低一级,可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。

二、工作流程

RocketMQ工作流程
RocketMQ工作流程
  • Broker启动的时候,会往每台NameServer(因为NameServer之间不通信,所以每台都得注册)注册自己的信息,这些信息包括自己的ip和端口号,自己这台Broker有哪些topic等信息。
  • Producer在启动之后会跟NameServer建立连接,定期从NameServer中获取Broker的信息,当发送消息的时候,会根据消息需要发送到哪个topic去找对应的Broker地址,如果有的话,就向这台Broker发送请求;没有找到的话,就看根据是否允许自动创建topic来决定是否发送消息。
  • Broker在接收到Producer的消息之后,会将消息存起来,持久化,如果有从节点的话,也会主动同步给从节点,实现数据的备份。
  • Consumer启动之后也会跟会NameServer建立连接,定期从NameServer中获取Broker和对应topic的信息,然后根据自己需要订阅的topic信息找到对应的Broker的地址,然后跟Broker建立连接,获取消息,进行消费。

三、SpringBoot集成RocketMQ

3.1 引入依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>

3.2 yml配置

1
2
3
4
rocketmq:
producer:
group: sanyouProducer
name-server: 192.168.200.143:9876

3.3 创建消费者

只需要实现RocketMQListener接口,然后加上@RocketMQMessageListener注解即可

1
2
3
4
5
6
7
@Component@RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouTopic")
public class SanYouTopicListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("处理消息:" + msg);
}
}

@RocketMQMessageListener需要指定消费者属于哪个消费者组,消费哪个topic,NameServer的地址已经通过yml配置文件配置类.

3.4 原理

  1. 在构造RocketMQTemplate的时候,传入了一个DefaultMQProducer,所以最终RocketMQTemplate发送消息也是通过DefaultMQProducer发送的。
  2. 会为每一个加了@RocketMQMessageListener注解的对象创建一个DefaultMQPushConsumer,所以最终也是通过DefaultMQPushConsumer消费消息的。
  3. 至于监听器,遍历每条消息,然后调用handleMessage,最终会调用实现了RocketMQListener的对象处理消息。

四、业务中实现

RocketMQ业务结构
RocketMQ业务结构

4.1 RocketMQClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class RocketMQClient implements IRocketMQClient {

public Producer getProducer(ClientConfiguration clientConfiguration, String... rocketMQTopic) throws ClientException {
return getProvider()
.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(rocketMQTopic)
.build();
}

public Producer getProducer(RocketMQProducerConfig rocketMQConfig) throws ClientException {
SessionCredentialsProvider sessionCredentialsProvider =
new StaticSessionCredentialsProvider(rocketMQConfig.getAccessKey(), rocketMQConfig.getSecretKey());
ClientConfiguration clientConfiguration = ClientConfiguration
.newBuilder()
.setEndpoints(rocketMQConfig.getEndpoint())
.setCredentialProvider(sessionCredentialsProvider)
.build();
return getProvider()
.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(rocketMQConfig.getTopic())
.build();
}

public SimpleConsumer getSimpleConsumer(RocketMQConsumerConfig rocketMQConfig) throws ClientException {
//订阅消息的过滤规则,表示订阅所有Tag的消息。
FilterExpression filterExpression = getFilterExpression(rocketMQConfig.getTag());
ClientConfiguration clientConfiguration =
getClientConfiguration(rocketMQConfig.getAccessKey(), rocketMQConfig.getSecretKey(), rocketMQConfig.getEndpoint());
SimpleConsumerBuilder simpleBuild = getSimpleConsumerBuilder(rocketMQConfig.getGroupName(),
rocketMQConfig.getTopic(), clientConfiguration, filterExpression,
Duration.ofSeconds(rocketMQConfig.getAwaitDuration()));
return simpleBuild.build();
}

private ClientServiceProvider getProvider() {
return ClientServiceProvider.loadService();
}

/**
* @param userName 用户名:accessKey
* @param password 密码:accessSecret
* @param endpoints 专有网络地址
* @return 客户端配置
*/
private ClientConfiguration getClientConfiguration(String userName, String password, String endpoints) {
SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(userName, password);
return ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.setCredentialProvider(sessionCredentialsProvider)
.build();
}

/**
* @param tag 生产时定义的tag
* @return 过滤表达式
*/
private FilterExpression getFilterExpression(String tag) {
return new FilterExpression(tag, FilterExpressionType.TAG);
}

/**
* @param group 消费分组
* @param topic 监听的topic
* @param clientConfiguration 客户端配置
* @param filterExpression 过滤表达式
* @param awaitDuration 长轮询时间
* @return SimpleConsumerBuilder
*/
private SimpleConsumerBuilder getSimpleConsumerBuilder(String group, String topic, ClientConfiguration clientConfiguration, FilterExpression filterExpression, Duration awaitDuration) {
return getProvider().newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//设置消费者分组。
.setConsumerGroup(group)
.setAwaitDuration(awaitDuration)
//设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression));

}

4.2 RocketMQProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void sendMsg(String value, String keys, String tag, Long times) {
// 加载一个服务提供者实例,提供消息构建和发送的功能
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// topic表示消息将发送到目标队列
String topic = config.getTopic();
// 将消息内容value转换成字节数组,使用utf-8编码
byte[] body = value.getBytes(StandardCharsets.UTF_8);
// 构建消息,用服务提供者提供的 newMessageBuilder() 方法来创建一个 MessageBuilder 实例。
MessageBuilder messageBuilder = provider.newMessageBuilder()
.setTopic(topic) // 设置消息的主题
.setBody(body); // 设置消息的内容
// 如果keys不为null,则设置消息的keys
if (keys != null) {
messageBuilder.setKeys(keys);
}
// 如果tag不为null,则设置消息的tag
if (tag != null) {
messageBuilder.setTag(tag);
}
// 如果times不为null,计算消息的延迟发送时间
if (times != null) {
// 当前时间 + times * 1000
long deliverTime = System.currentTimeMillis() + times * 1000;
// 设置为消息的时间戳
messageBuilder.setDeliveryTimestamp(deliverTime);
}
try {
// 尝试发送消息,通过producer.send()方法将构建好的实例传递
final SendReceipt sendReceipt = producer.send(messageBuilder.build());
} catch (Exception e) {
}
}

4.3 RocketMQConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* 使用线程池(由 executorService 提供)来并行执行某些任务,并对这些任务进行监控。
*/
public void run(Function<String, Exception> executor) {
for (int i = 0; i < rocketMQConsumerConfig.getMaxWorkerNum(); i++) {
final int num = i;
executorService.execute(() -> {
Thread.currentThread().setName(String.format(threadName, num));
monitorThread(executor);
countDownLatch.countDown();
});
}
}

/**
* 运行在一个线程中,以监控和处理从RocketMQ中接受到的消息
*/
private void monitorThread(Function<String, Exception> executor) {
while (!monitorShutDowned) {
try {
final List<MessageView> messages = simpleConsumer.receive(rocketMQConsumerConfig.getMaxMessageNum(), Duration.ofSeconds(rocketMQConsumerConfig.getInvisibleDuration()));
processMessages(executor, messages);
} catch (Exception e) {

}
}
}

/**
* 处理一系列从RocketMQ中接收到的消息
*/
private void processMessages(Function<String, Exception> executor, List<MessageView> messages) {
// 循环遍历 messages 中的每一个 MessageView 对象。
for (MessageView message : messages) {
// 首先获取当前消息的MessageId
final MessageId messageId = message.getMessageId();
// 将消息体 message.getBody()使用 UTF-8 字符集解码成字符串 body
String body = StandardCharsets.UTF_8.decode(message.getBody()).toString();
try {
// 使用 executor.apply(body) 来处理解码后的消息体
Exception exception = executor.apply(body);
// 如果没有发生异常,则调用 simpleConsumer.ack(message) 来确认消息已经被成功处理
if (null == exception) {
simpleConsumer.ack(message);
}
} catch (Throwable t) {

}
}
}

/**
* 关闭消息消费的监控线程,并终止相关的线程池
*/
public void shutdown() {
monitorShutDowned = true;
try {
simpleConsumer.close();
} catch (IOException e) {
e.printStackTrace();
}
executorService.shutdown();
if (countDownLatch != null) {
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

本站由 Cccccpg 使用 Stellar 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。