实习笔记-Redis(队列)
因为Redis做缓存的用法以及比较熟悉,所以这里只学习用Redis实现队列的方法。
一、Redis作消息队列原理
现在各种开源的 MQ 已经足够使用了,为什么需要用 Redis 实现 MQ 呢?
- 有些简单的业务场景,可能不需要重量级的 MQ 组件(相比 Redis 来说,Kafka 和 RabbitMQ 都算是重量级的消息队列);
主要是利用Redis中的List实现队列;
主要是利用Redis中的ZSet来实现延迟队列,Zset中存储数据结构也是K-V的数据,其中V中包含memmber和score,可以通过score可以排序。
Redis 的有序集合保留了集合的特性(元素不能重复),而且在此基础上的元素是可以排序的(分数可以重复)。 但是,redis 的有序集合的排序和列表的排序不同,有序集合并非使用索引下标来排序,而是使用每个元素设置了一个分数(score),来做为排序的依据。
二、RedisQueue实现方法
2.1 rightPush方法
1 | public boolean rightPush(String s) { |
2.2 leftPop方法
1 | public String leftPop() { |
三、RedisDelayQueue实现方法
定义IDelayQueue接口,其中有add、get和rem三个方法,然后写一个RedisDelayQueue类实现这个接口。
3.1 add方法
1 | public boolean add(long score, String data) { |
3.2 get方法
1 | public String get() { |
3.3 rem方法
1 | public boolean rem(String data) { |
3.4 业务中具体实现
以金币任务队列为例:
在dj_task里的GoldTaskServiceImpl中实现添加用户金币的逻辑,调用comSaveGold方法保存用户信息和金币,并且传入goldTaskQueue队列;
1
2
3
4
5
6private void saveUserGold(String user, int coin) {
GoldOrder goldOrder = new GoldOrder();
goldOrder.setUser(user);
goldOrder.setAmount(String.valueOf(coin));
comSaveGold(goldOrder, goldTaskQueue);
}在comSaveGold方法中,通过rpc调用dj_vc中saveCoin方法保存金币;
1
2
3
4
5
6
7
8private void comSaveGold(GoldOrder goldOrder, IDelayQueue<String> goldOrderQueue) {
boolean flag = vcRpcService.saveCoin(goldOrder);
if (!flag) {
// 保存失败重试
goldOrderQueue.add(System.currentTimeMillis(), GsonUtils.toJson(goldOrder, new TypeToken<GoldOrder>() {
}.getType()));
}
}如果保存失败,利用队列的失败重试机制,调用goldOrderQueue的add方法,获取系统当前时间作为score;
在dj_task中的RedisConfig中设置队列和消费;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15// 队列
public IDelayQueue<String> goldOrderRedisQueue(RedisTemplate<String, String> redisTemplate) {
return new RedisDelayQueue(goldOrderQueue, redisTemplate);
}
// 队列消费
public GoldOrderQueueMonitor goldOrderService(IDelayQueue<String> goldOrderQueue) {
if (goldOrderQueueNum > 0) {
return new GoldOrderQueueMonitor(goldOrderQueue, goldOrderQueueName, goldOrderQueueNum, goldOrderQueueSleepTime);
}
return null;
}调用GoldOrderQueueMonitor中的GoldOrderQueueMonitor()方法消费;
在dj_task的GoldOrderQueueMonitor中,进行监听,再继续通过rpc调用dj_vc中的saveCoin方法保存金币;
1
2
3
4
5
6
7
8
9
10
11
12
13public boolean execute(String value) {
LOG.info("GoldOrderQueue execute value: {}", value);
GoldOrder goldOrder = GsonUtils.fromJson(value, new TypeToken<GoldOrder>() {
}.getType());
boolean flag = vcRpcService.saveCoin(goldOrder);
if (!flag) {
// 保存失败重试
goldOrderQueue.add(System.currentTimeMillis() + Constant.GOLD_QUEUE_DELAY_TIME,
GsonUtils.toJson(goldOrder, new TypeToken<GoldOrder>() {
}.getType()));
}
return true;
}如果仍然保存失败,继续添加进goldOrderQueue队列,并以当前时间 + 延迟时间(5分钟)作为score;
在dj_vc中的RedisConfig配置队名和消费;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24/**
* 队列,task入队列,vc消费
*
* @return queue
*/
public IDelayQueue<String> goldTaskRedisQueue(RedisTemplate<String, String> redisTemplate) {
return new RedisDelayQueue(goldTaskQueue, redisTemplate, true);
}
/**
* 队列消费,task入队列,vc消费
*
* @param goldTaskQueue queue
* @return queue
*/
public GoldTaskQueueMonitor goldTaskService(IDelayQueue<String> goldTaskQueue) {
if (goldTaskQueueNum > 0) {
return new GoldTaskQueueMonitor(goldTaskQueue, goldTaskQueueName, goldTaskQueueNum, goldTaskQueueSleepTime);
}
return null;
}到GoldTaskQueueMonitor里面,继续监听,调用billing方法处理业务,如果处理失败,继续以当前时间+5分钟设置score。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public boolean execute(String value) {
GoldOrder goldOrder = GsonUtils.fromJson(value, new TypeToken<GoldOrder>() {
}.getType());
if (null == goldOrder) {
return true;
}
try {
billingService.billing(goldOrder);
} catch (Exception e) {
// 保存失败重试
goldTaskQueue.add(System.currentTimeMillis() + VcConstant.GOLD_QUEUE_DELAY_TIME,
GsonUtils.toJson(goldOrder, new TypeToken<List<GoldOrder>>() {
}.getType()));
}
return true;
}