实习笔记-RokectMQ
一、核心概念
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。
Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。
- NameServer:可以理解为是一个注册中心,主要是用来保存topic路由信息,管理Broker。在NameServer的集群中,NameServer与NameServer之间是没有任何通信的。
- Broker:核心的一个角色,主要是用来保存topic的信息,接受生产者产生的消息,持久化消息。在一个Broker集群中,相同的BrokerName可以称为一个Broker组,一个Broker组中,BrokerId为0的为主节点,其它的为从节点。BrokerName和BrokerId是可以在Broker启动时通过配置文件配置的。每个Broker组只存放一部分消息。
- 生产者:生产消息的一方就是生产者
- 生产者组:一个生产者组可以有很多生产者,只需要在创建生产者的时候指定生产者组,那么这个生产者就在那个生产者组
- 消费者:用来消费生产者消息的一方
- 消费者组:跟生产者一样,每个消费者都有所在的消费者组,一个消费者组可以有很多的消费者,不同的消费者组消费消息是互不影响的。
- topic(主题) :可以理解为一个消息的集合的名字,生产者在发送消息的时候需要指定发到哪个topic下,消费者消费消息的时候也需要知道自己消费的是哪些topic底下的消息。
- Tag(子主题) :比topic低一级,可以用来区分同一topic下的不同业务类型的消息,发送消息的时候也需要指定。
二、工作流程
- Broker启动的时候,会往每台NameServer(因为NameServer之间不通信,所以每台都得注册)注册自己的信息,这些信息包括自己的ip和端口号,自己这台Broker有哪些topic等信息。
- Producer在启动之后会跟NameServer建立连接,定期从NameServer中获取Broker的信息,当发送消息的时候,会根据消息需要发送到哪个topic去找对应的Broker地址,如果有的话,就向这台Broker发送请求;没有找到的话,就看根据是否允许自动创建topic来决定是否发送消息。
- Broker在接收到Producer的消息之后,会将消息存起来,持久化,如果有从节点的话,也会主动同步给从节点,实现数据的备份。
- Consumer启动之后也会跟会NameServer建立连接,定期从NameServer中获取Broker和对应topic的信息,然后根据自己需要订阅的topic信息找到对应的Broker的地址,然后跟Broker建立连接,获取消息,进行消费。
三、SpringBoot集成RocketMQ
3.1 引入依赖
1 2 3 4 5 6 7 8 9 10 11
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>2.1.1.RELEASE</version> </dependency>
|
3.2 yml配置
1 2 3 4
| rocketmq: producer: group: sanyouProducer name-server: 192.168.200.143:9876
|
3.3 创建消费者
只需要实现RocketMQListener接口,然后加上@RocketMQMessageListener注解即可
1 2 3 4 5 6 7
| @Component@RocketMQMessageListener(consumerGroup = "sanyouConsumer", topic = "sanyouTopic") public class SanYouTopicListener implements RocketMQListener<String> { @Override public void onMessage(String msg) { System.out.println("处理消息:" + msg); } }
|
@RocketMQMessageListener需要指定消费者属于哪个消费者组,消费哪个topic,NameServer的地址已经通过yml配置文件配置类.
3.4 原理
- 在构造RocketMQTemplate的时候,传入了一个DefaultMQProducer,所以最终RocketMQTemplate发送消息也是通过DefaultMQProducer发送的。
- 会为每一个加了@RocketMQMessageListener注解的对象创建一个DefaultMQPushConsumer,所以最终也是通过DefaultMQPushConsumer消费消息的。
- 至于监听器,遍历每条消息,然后调用handleMessage,最终会调用实现了RocketMQListener的对象处理消息。
四、业务中实现
4.1 RocketMQClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| public class RocketMQClient implements IRocketMQClient {
public Producer getProducer(ClientConfiguration clientConfiguration, String... rocketMQTopic) throws ClientException { return getProvider() .newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(rocketMQTopic) .build(); }
public Producer getProducer(RocketMQProducerConfig rocketMQConfig) throws ClientException { SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(rocketMQConfig.getAccessKey(), rocketMQConfig.getSecretKey()); ClientConfiguration clientConfiguration = ClientConfiguration .newBuilder() .setEndpoints(rocketMQConfig.getEndpoint()) .setCredentialProvider(sessionCredentialsProvider) .build(); return getProvider() .newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(rocketMQConfig.getTopic()) .build(); }
public SimpleConsumer getSimpleConsumer(RocketMQConsumerConfig rocketMQConfig) throws ClientException { FilterExpression filterExpression = getFilterExpression(rocketMQConfig.getTag()); ClientConfiguration clientConfiguration = getClientConfiguration(rocketMQConfig.getAccessKey(), rocketMQConfig.getSecretKey(), rocketMQConfig.getEndpoint()); SimpleConsumerBuilder simpleBuild = getSimpleConsumerBuilder(rocketMQConfig.getGroupName(), rocketMQConfig.getTopic(), clientConfiguration, filterExpression, Duration.ofSeconds(rocketMQConfig.getAwaitDuration())); return simpleBuild.build(); }
private ClientServiceProvider getProvider() { return ClientServiceProvider.loadService(); }
private ClientConfiguration getClientConfiguration(String userName, String password, String endpoints) { SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider(userName, password); return ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setCredentialProvider(sessionCredentialsProvider) .build(); }
private FilterExpression getFilterExpression(String tag) { return new FilterExpression(tag, FilterExpressionType.TAG); }
private SimpleConsumerBuilder getSimpleConsumerBuilder(String group, String topic, ClientConfiguration clientConfiguration, FilterExpression filterExpression, Duration awaitDuration) { return getProvider().newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(group) .setAwaitDuration(awaitDuration) .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression));
}
|
4.2 RocketMQProducer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| private void sendMsg(String value, String keys, String tag, Long times) { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String topic = config.getTopic(); byte[] body = value.getBytes(StandardCharsets.UTF_8); MessageBuilder messageBuilder = provider.newMessageBuilder() .setTopic(topic) .setBody(body); if (keys != null) { messageBuilder.setKeys(keys); } if (tag != null) { messageBuilder.setTag(tag); } if (times != null) { long deliverTime = System.currentTimeMillis() + times * 1000; messageBuilder.setDeliveryTimestamp(deliverTime); } try { final SendReceipt sendReceipt = producer.send(messageBuilder.build()); } catch (Exception e) { } }
|
4.3 RocketMQConsumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
public void run(Function<String, Exception> executor) { for (int i = 0; i < rocketMQConsumerConfig.getMaxWorkerNum(); i++) { final int num = i; executorService.execute(() -> { Thread.currentThread().setName(String.format(threadName, num)); monitorThread(executor); countDownLatch.countDown(); }); } }
private void monitorThread(Function<String, Exception> executor) { while (!monitorShutDowned) { try { final List<MessageView> messages = simpleConsumer.receive(rocketMQConsumerConfig.getMaxMessageNum(), Duration.ofSeconds(rocketMQConsumerConfig.getInvisibleDuration())); processMessages(executor, messages); } catch (Exception e) {
} } }
private void processMessages(Function<String, Exception> executor, List<MessageView> messages) { for (MessageView message : messages) { final MessageId messageId = message.getMessageId(); String body = StandardCharsets.UTF_8.decode(message.getBody()).toString(); try { Exception exception = executor.apply(body); if (null == exception) { simpleConsumer.ack(message); } } catch (Throwable t) {
} } }
public void shutdown() { monitorShutDowned = true; try { simpleConsumer.close(); } catch (IOException e) { e.printStackTrace(); } executorService.shutdown(); if (countDownLatch != null) { try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
|