Advertisement

spring boot-使用redis的Keyspace Notifications实现定时任务队列

阅读量:

前言:

最近项目中出现了一个需求:需要将那些在执行过程中出现错误的任务记录下来,并每隔五分钟重新处理这些指令。一见到这个需求就联想到了Redis其特点之一就是能够自动通知出现问题的服务及其对应的事务队列这种机制非常适合当前场景。一个替代方案是采用Java JDK自带的DelayQueue组件但可能无法达到预期性能提升的效果因此决定尝试通过自定义实现的方式解决这个问题。鉴于系统架构采用了分布式设计因此选择在Redis层面上进行处理以保证系统的可靠性和可扩展性

该机制涉及Redis的Keyspace Notifications功能,请参阅:http://redisdoc.com/topic/notification.htm

工作原理:从Redis 2.8版本开始,在应用Redis缓存功能时(即把<key, value>对存储到Redis缓存池中并设置其过期时间),系统会触发Redis的键事件通知机制(即所谓的"key event")。客户端订阅该特定键事件类型的通知(即当某个键过期时会触发此类型的通知),那么服务端就会将对应的通知信息发送给所有订阅该事件类型的客户端。随后客户端就会根据接收到的相关信息进行相应的后续处理操作(例如:键过期时间通知对应的topic为:"keyevent@0:expired")

由于启动键空间通知功能可能会占用一些资源管理中的CPU时间,在默认配置下,该功能被设置为关闭状态。

支持通过修改 redis.conf 文件的操作来实现开启或关闭键空间通知功能;也可直接采用CONFIGSET 命令来完成该操作。

配置文件修改方式如下:

复制代码
    notify-keyspace-events Ex  // 打开此配置,其中Ex表示键事件通知里面的key过期事件,每当有过期键被删除时,会发送通知
  • notify-keyspace-events 选项的参数设置为空字符串时(功能被关闭)。
  • 相反地,在这种情况下(即参数不为空字符串),功能被启用。

notify-keyspace-events 的参数表示为指定字符的任意组合,并指定了服务器应该发送哪些类型的通知。

字符 发送的通知
K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀
E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀
g DEL EXPIRERENAME 等类型无关的通用命令的通知
$ 字符串命令的通知
l 列表命令的通知
s 集合命令的通知
h 哈希命令的通知
z 有序集合命令的通知
x 过期事件:每当有过期键被删除时发送
e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
A 参数 g$lshzxe 的别名

如果在参数中没有包含 KE ,则无论其他参数如何设置,都不会有任何通知被发送出去。

举例来说,在键空间中关注与列表相关的通知之前,请确保您将参数配置为 Kl 之类的设置。

将参数设为字符串 "AKE" 表示发送所有类型的通知。

1、编写监听器

复制代码
 package com.chhliu.springboot.redis.config;

    
  
    
  
    
 import org.springframework.data.redis.connection.Message;
    
 import org.springframework.data.redis.connection.MessageListener;
    
 import org.springframework.stereotype.Component;
    
  
    
 @Component
    
 public class TopicMessageListener implements MessageListener {
    
  
    
 	@Override
    
 	public void onMessage(Message message, byte[] pattern) {// 客户端监听订阅的topic,当有消息的时候,会触发该方法
    
     	byte[] body = message.getBody();// 请使用valueSerializer
    
     	byte[] channel = message.getChannel();
    
     	String topic = new String(channel);
    
     	String itemValue = new String(body);
    
     	// 请参考配置文件,本例中key,value的序列化方式均为string。
    
     	System.out.println("topic:"+topic);
    
     	System.out.println("itemValue:"+itemValue);
    
 	}
    
 }

2、配置RedisMessageListenerContainer监听容器

复制代码
 package com.chhliu.springboot.redis.config;

    
  
    
 import java.util.concurrent.Executor;
    
 import java.util.concurrent.ThreadPoolExecutor;
    
  
    
 import org.springframework.beans.factory.annotation.Autowired;
    
 import org.springframework.beans.factory.annotation.Value;
    
 import org.springframework.context.annotation.Bean;
    
 import org.springframework.context.annotation.Configuration;
    
 import org.springframework.data.redis.core.RedisTemplate;
    
 import org.springframework.data.redis.listener.ChannelTopic;
    
 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
  
    
 @Configuration
    
 public class RedisMessageListenerContainerConfig {
    
 	
    
 	@Autowired
    
 	private RedisTemplate<Object,Object> redisTemplate;
    
 	
    
 	@Autowired
    
 	private TopicMessageListener messageListener;
    
 	
    
 	@Autowired
    
 	private TaskThreadPoolConfig config;
    
 	
    
 	@Value("spring.redis.topic")
    
 	private String topic;
    
 	
    
 	@Bean
    
 	public RedisMessageListenerContainer configRedisMessageListenerContainer(Executor executor){
    
 		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    
 		// 设置Redis的连接工厂
    
 		container.setConnectionFactory(redisTemplate.getConnectionFactory());
    
 		// 设置监听使用的线程池
    
 		container.setTaskExecutor(executor);
    
 		// 设置监听的Topic
    
 		ChannelTopic channelTopic = new ChannelTopic("__keyevent@0__:expired");
    
 		// 设置监听器
    
 		container.addMessageListener(messageListener, channelTopic);
    
 		return container;
    
 	}
    
 	
    
 	@Bean // 配置线程池
    
 	public Executor myTaskAsyncPool() {
    
 		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    
 		executor.setCorePoolSize(config.getCorePoolSize());
    
 		executor.setMaxPoolSize(config.getMaxPoolSize());
    
 		executor.setQueueCapacity(config.getQueueCapacity());
    
 		executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
    
 		executor.setThreadNamePrefix(config.getThreadNamePrefix());
    
  
    
 		// rejection-policy:当pool已经达到max size的时候,如何处理新任务
    
 		// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
    
 		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    
 		executor.initialize();
    
 		return executor;
    
 	}
    
 }

注释:详细说明了线程池的配置方案。读者若想进一步了解相关内容,请参考我的另一篇博客:<>

特别提示:在Redis客户端中查看变量keyevent@0_:expired时,请特别注意其中的字段名"_"并非英文字符下的下划线符号,请直接从客户端复制该字段名以避免导致无法正确订阅指定的事件Topic

3、配置文件

复制代码
 server.port=9999

    
 ########################################################
    
 ###REDIS (RedisProperties) redis基本配置;
    
 ########################################################
    
 # database name
    
 spring.redis.database=0
    
 # server host1 单机使用,对应服务器ip
    
 #spring.redis.host=127.0.0.1  
    
 # server password 密码,如果没有设置可不配
    
 #spring.redis.password=
    
 #connection port  单机使用,对应端口号
    
 #spring.redis.port=6379
    
 # pool settings ...池配置
    
 spring.redis.pool.max-idle=8
    
 spring.redis.pool.min-idle=0
    
 spring.redis.pool.max-active=8
    
 spring.redis.pool.max-wait=-1
    
 # name of Redis server  哨兵监听的Redis server的名称
    
 spring.redis.sentinel.master=mymaster
    
 # comma-separated list of host:port pairs  哨兵的配置列表
    
 spring.redis.sentinel.nodes=192.168.1.108:26379,192.168.1.108:26479,192.168.1.108:26579
    
  
    
 spring.task.pool.corePoolSize=10
    
 spring.task.pool.maxPoolSize=20
    
 spring.task.pool.keepAliveSeconds=60
    
 spring.task.pool.queueCapacity=100
    
 spring.task.pool.threadNamePrefix=myThreadPool
    
  
    
 spring.redis.topic=__keyevent@0__:expired

4、启动程序

复制代码
 package com.chhliu.springboot.redis;

    
  
    
 import org.springframework.boot.SpringApplication;
    
 import org.springframework.boot.autoconfigure.SpringBootApplication;
    
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
    
  
    
 import com.chhliu.springboot.redis.config.TaskThreadPoolConfig;
    
  
    
 @SpringBootApplication
    
 @EnableConfigurationProperties({TaskThreadPoolConfig.class} ) // 开启配置属性支持
    
 public class SpringbootRedisApplication {
    
 	
    
 	public static void main(String[] args) {
    
 		SpringApplication.run(SpringbootRedisApplication.class, args);
    
 	}
    
 }

5、在客户端输入命令

复制代码
 set myname chhliu

    
 expire myname 2
    
 或者如下:
    
 set myname chhliu px 2000

请配置键值,并同时设置超时时间为2秒。约2秒后, 您就能查看到console端提供的输出信息了.

复制代码
 2017-04-12 20:23:16.367  INFO 12464 --- [  myThreadPool2] c.c.s.redis.config.TopicMessageListener  : 是否获取到锁:true

    
 topic:__keyevent@0__:expired
    
 itemValue:myname
    
 2017-04-12 20:23:16.369  INFO 12464 --- [  myThreadPool2] c.c.s.redis.config.TopicMessageListener  : 任务结束,释放锁!

注意:

上面的_keyevent@0_:expired 是 key 过期事件关联的 topic。服务端会将该 overlapping event push 到相应的 topic 中,并且客户端会关注并监听这个 topic。

2、当key发生过期时,在topic中推送的内容仅包含关键信息而没有对应的值域(value),这是因为一旦key过期后相应的值域就不再包含与该key相关的信息。

注意事项

Redis 使用以下两种方式删除过期的键:

  • 每当一个键被访问时, 程序会对该键进行审查, 如果发现已失效, 该会将其删除.
  • 该系统会逐步地在后台进行查找和清除已失效的键, 并处理那些虽然已失效但不常访问到的键.

当上述两个程序中的任何一个检测到过期键时,并将其从数据库中删除,则Redis会发送一个expired的通知。

Redis 不保证 TTL 为零的键会在它们过期后立即被移除。如果程序未访问该键或该键已存在存活时间,则在 TTL 达到 0 之前,即使某个键未被访问过,在 TTL 有效期内也可能会有较长时间未被移除。或者当大量带存活时间的键存在时,在 TTL 达到 0 之前也会有较长的一段时间无法完全清理干净。

因为Redis会在过期键被删除时触发expired通知,并非基于键的生存时间归零这一时刻触发。若业务对从过期至删除之间的时间间隔有严格要求,则必须采用其他方法。

全部评论 (0)

还没有任何评论哟~