发布于 

实习笔记-Redis(队列)

因为Redis做缓存的用法以及比较熟悉,所以这里只学习用Redis实现队列的方法。

一、Redis作消息队列原理

现在各种开源的 MQ 已经足够使用了,为什么需要用 Redis 实现 MQ 呢?

  • 有些简单的业务场景,可能不需要重量级的 MQ 组件(相比 Redis 来说,Kafka 和 RabbitMQ 都算是重量级的消息队列);

主要是利用Redis中的List实现队列;

主要是利用Redis中的ZSet来实现延迟队列,Zset中存储数据结构也是K-V的数据,其中V中包含memmber和score,可以通过score可以排序。

Redis 的有序集合保留了集合的特性(元素不能重复),而且在此基础上的元素是可以排序的(分数可以重复)。 但是,redis 的有序集合的排序和列表的排序不同,有序集合并非使用索引下标来排序,而是使用每个元素设置了一个分数(score),来做为排序的依据。

二、RedisQueue实现方法

2.1 rightPush方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public boolean rightPush(String s) {
try {
// 调用 opsForList().rightPush(key, s) 方法,将s推到列表key的尾部
// 返回一个long类型的值,表示入队之后列表的新长度,如果 > 0则表示推入成功
return redisQueueTemplate.opsForList().rightPush(key, s) > 0;
} catch (Exception e) {
// 获取redisQueueTemplate的RedisConnectionFactory
RedisConnectionFactory redisConnectionFactory = redisQueueTemplate.getConnectionFactory();
// 判断这个连接工厂是否是JedisConnectionFactory实例
if (!(redisConnectionFactory instanceof JedisConnectionFactory)) {
throw e;
}
JedisConnectionFactory connectionFactory = (JedisConnectionFactory) redisConnectionFactory;
// 获取主机名
String host = connectionFactory.getHostName();
// 获取端口号
int port = connectionFactory.getPort();
// 将主机名、端口号、数据库索引创建RedisErrorLog对象,记录错误信息
RedisErrorLog errorLog = new RedisErrorLog("", host, port, connectionFactory.getDatabase());
// 调用 addLpushOp 方法向 errorLog 添加一个关于 LPUSH 操作的日志。
errorLog.addLpushOp(key, s);
// 使用 RedisLogUtils.redisLog(errorLog) 方法记录错误日志
RedisLogUtils.redisLog(errorLog);
throw e;
}
}

2.2 leftPop方法

1
2
3
4
public String leftPop() {
// 将key从list的队头取出
return redisQueueTemplate.opsForList().leftPop(key);
}

三、RedisDelayQueue实现方法

定义IDelayQueue接口,其中有add、get和rem三个方法,然后写一个RedisDelayQueue类实现这个接口。

3.1 add方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public boolean add(long score, String data) {
try {
// 调用ZSet中的add方法,将传入的data和score存到ZSet中;
Boolean result = this.redisQueueTemplate.opsForZSet().add(this.key, data, (double)score);
// 返回true or false,判断是否成功
return result != null && result;
} catch (Exception var10) { // 如果存在异常,则捕获异常
// 获取redisQueueTemplate的链接工厂
RedisConnectionFactory redisConnectionFactory = this.redisQueueTemplate.getConnectionFactory();
// 判断当前链接工厂是否是JedisConnectionFactory类型的
if (!(redisConnectionFactory instanceof JedisConnectionFactory)) {
// 如果不是,则抛出异常
throw var10;
} else {
// 如果是
JedisConnectionFactory connectionFactory = (JedisConnectionFactory)redisConnectionFactory;
// 获取主机名
String host = connectionFactory.getHostName();
// 获取端口号
int port = connectionFactory.getPort();
// 创建RedisErrorLog实例,记录错误信息
RedisErrorLog errorLog = new RedisErrorLog("", host, port, connectionFactory.getDatabase());
// 记录错误日志
errorLog.addZaddOp(this.key, score, data);
RedisLogUtils.redisLog(errorLog);
throw var10;
}
}
}

3.2 get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public String get() {
// 获取当前时间
double now = (double)System.currentTimeMillis();
double min = Double.MIN_VALUE;
Set res;
// 如果random为true
if (this.random) {
// 使用rangeByScore方法,以最小得分min和当前时间now作为范围,获取ZSet中前60个key成员
res = this.redisQueueTemplate.opsForZSet().rangeByScore(this.key, min, now, 0L, 60L);
// 如果res集合中的元素不为空
if (CollectionUtils.isNotEmpty(res)) {
ArrayList<String> list = new ArrayList(res);
// 尝试随机获取集合中的某个元素最多两次,每次随机一个0-59之间的随机索引
for(int j = 0; j < 2; ++j) {
int i = (int)(Math.random() * 60.0);
String s = (String)list.get(i % res.size());
// 如果s不为空,则删除这个元素
if (StringUtils.isNotBlank(s)) {
if (this.rem(s)) {
return s;
}
}
}
}
} else {
// 使用rangeByScore方法,以最小得分min和当前时间now作为范围,获取ZSet中前10个key成员
res = this.redisQueueTemplate.opsForZSet().rangeByScore(this.key, min, now, 0L, 10L);
// 遍历res,删除每个元素
if (CollectionUtils.isNotEmpty(res)) {
Iterator var10 = res.iterator();
while(var10.hasNext()) {
String data = (String)var10.next();
if (this.rem(data)) {
return data;
}
}
}
}
return null;
}

3.3 rem方法

1
2
3
4
5
public boolean rem(String data) {
// 调用remove方法直接删除
Long result = this.redisQueueTemplate.opsForZSet().remove(this.key, new Object[]{data});
return result != null && result > 0L;
}

3.4 业务中具体实现

以金币任务队列为例:

  1. 在dj_task里的GoldTaskServiceImpl中实现添加用户金币的逻辑,调用comSaveGold方法保存用户信息和金币,并且传入goldTaskQueue队列;

    1
    2
    3
    4
    5
    6
    private void saveUserGold(String user, int coin) {
    GoldOrder goldOrder = new GoldOrder();
    goldOrder.setUser(user);
    goldOrder.setAmount(String.valueOf(coin));
    comSaveGold(goldOrder, goldTaskQueue);
    }
  2. 在comSaveGold方法中,通过rpc调用dj_vc中saveCoin方法保存金币;

    1
    2
    3
    4
    5
    6
    7
    8
    private void comSaveGold(GoldOrder goldOrder, IDelayQueue<String> goldOrderQueue) {
    boolean flag = vcRpcService.saveCoin(goldOrder);
    if (!flag) {
    // 保存失败重试
    goldOrderQueue.add(System.currentTimeMillis(), GsonUtils.toJson(goldOrder, new TypeToken<GoldOrder>() {
    }.getType()));
    }
    }
  3. 如果保存失败,利用队列的失败重试机制,调用goldOrderQueue的add方法,获取系统当前时间作为score;

  4. 在dj_task中的RedisConfig中设置队列和消费;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // 队列
    @Bean(name = "goldOrderQueue")
    public IDelayQueue<String> goldOrderRedisQueue(RedisTemplate<String, String> redisTemplate) {
    return new RedisDelayQueue(goldOrderQueue, redisTemplate);
    }

    // 队列消费
    @Bean(name = "goldOrderService", destroyMethod = "shutdown")
    @Conditional(ConditionalConfig.class)
    public GoldOrderQueueMonitor goldOrderService(IDelayQueue<String> goldOrderQueue) {
    if (goldOrderQueueNum > 0) {
    return new GoldOrderQueueMonitor(goldOrderQueue, goldOrderQueueName, goldOrderQueueNum, goldOrderQueueSleepTime);
    }
    return null;
    }
  5. 调用GoldOrderQueueMonitor中的GoldOrderQueueMonitor()方法消费;

  6. 在dj_task的GoldOrderQueueMonitor中,进行监听,再继续通过rpc调用dj_vc中的saveCoin方法保存金币;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public boolean execute(String value) {
    LOG.info("GoldOrderQueue execute value: {}", value);
    GoldOrder goldOrder = GsonUtils.fromJson(value, new TypeToken<GoldOrder>() {
    }.getType());
    boolean flag = vcRpcService.saveCoin(goldOrder);
    if (!flag) {
    // 保存失败重试
    goldOrderQueue.add(System.currentTimeMillis() + Constant.GOLD_QUEUE_DELAY_TIME,
    GsonUtils.toJson(goldOrder, new TypeToken<GoldOrder>() {
    }.getType()));
    }
    return true;
    }
  7. 如果仍然保存失败,继续添加进goldOrderQueue队列,并以当前时间 + 延迟时间(5分钟)作为score;

  8. 在dj_vc中的RedisConfig配置队名和消费;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    /**
    * 队列,task入队列,vc消费
    *
    * @return queue
    */
    @Bean(name = "goldTaskQueue")
    public IDelayQueue<String> goldTaskRedisQueue(RedisTemplate<String, String> redisTemplate) {
    return new RedisDelayQueue(goldTaskQueue, redisTemplate, true);
    }

    /**
    * 队列消费,task入队列,vc消费
    *
    * @param goldTaskQueue queue
    * @return queue
    */
    @Bean(name = "goldTaskService", destroyMethod = "shutdown")
    @Conditional(ConditionalConfig.class)
    public GoldTaskQueueMonitor goldTaskService(IDelayQueue<String> goldTaskQueue) {
    if (goldTaskQueueNum > 0) {
    return new GoldTaskQueueMonitor(goldTaskQueue, goldTaskQueueName, goldTaskQueueNum, goldTaskQueueSleepTime);
    }
    return null;
    }
  9. 到GoldTaskQueueMonitor里面,继续监听,调用billing方法处理业务,如果处理失败,继续以当前时间+5分钟设置score。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public boolean execute(String value) {
    GoldOrder goldOrder = GsonUtils.fromJson(value, new TypeToken<GoldOrder>() {
    }.getType());
    if (null == goldOrder) {
    return true;
    }
    try {
    billingService.billing(goldOrder);
    } catch (Exception e) {
    // 保存失败重试
    goldTaskQueue.add(System.currentTimeMillis() + VcConstant.GOLD_QUEUE_DELAY_TIME,
    GsonUtils.toJson(goldOrder, new TypeToken<List<GoldOrder>>() {
    }.getType()));
    }
    return true;
    }

本站由 Cccccpg 使用 Stellar 主题创建。
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。