Advertisement

java延迟队列

阅读量:

在使用队列系统之前,默认情况下需要配置复杂的逻辑才能实现定时执行的功能。然而,在面对类似订餐业务/购物等场景时会遇到困难:例如,在购物功能中,在你的订单管理系统中有N个等待支付的订单。当任何一个订单超过十分钟未支付时会自动释放购物篮中的商品,并标记该订单为失效状态。频繁的高频率延迟任务若再采用基于定时(定时任务)的方式进行调度将会导致效率低下甚至崩溃的情况。

推荐使用java 延迟队里来实现:

前提先了解线程,队列。

DelayQueue是java.util.concurrent中提供的一个类。

了解DelayQueue

DelayQueue是什么?

DelayQueue是一种无界且基于BlockingQueue的数据结构,在满足Delayed接口的前提下存储相关对象。这些对象只有当它们到达指定的时间才会被移除。具有优先级特征的DelayQueue表明其优先级最高的元素是那些延迟时间最长的对象。特别提示:在这种队列中不允许存入null值

DelayQueue能做什么?

在我们的业务中常见功能模块如下:

那么这类业务我们可以提炼出其核心特征:涉及延迟处理机制。由此而产生的问题就是我们 DelayQueue 应用需求的形成。

怎么用DelayQueue来解决这类的问题

1:声明Delayed类

复制代码
 import java.util.concurrent.Delayed;

    
 import java.util.concurrent.TimeUnit;
    
 import java.util.concurrent.atomic.AtomicLong;
    
  
    
 /** * 任务线程 实现Delayed接口
    
  */
    
 public class DelayItem<T extends Runnable> implements Delayed
    
 {
    
     
    
     /** * 到期时间
    
      */
    
     private final long time;
    
     
    
     /** * 任务对象
    
      */
    
     private final T task;
    
     
    
     /** * 原子类
    
      */
    
     private static final AtomicLong atomic = new AtomicLong(0);
    
     
    
     private final long n;
    
     
    
     public DelayItem(long timeout, T t)
    
     {
    
     this.time = System.nanoTime() + timeout;
    
     this.task = t;
    
     this.n = atomic.getAndIncrement();
    
     }
    
     
    
     /** * 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
    
      */
    
     public long getDelay(TimeUnit unit)
    
     {
    
     return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
    
     }
    
     
    
     public int compareTo(Delayed other)
    
     {
    
     if (other == this)
    
         return 0;
    
     if (other instanceof DelayItem)
    
     {
    
         DelayItem<?> x = (DelayItem<?>)other;
    
         long diff = time - x.time;
    
         if (diff < 0)
    
             return -1;
    
         else if (diff > 0)
    
             return 1;
    
         else if (n < x.n)
    
             return -1;
    
         else
    
             return 1;
    
     }
    
     long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
    
     return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    
     }
    
     
    
     public T getTask()
    
     {
    
     return this.task;
    
     }
    
     
    
     @Override
    
     public int hashCode()
    
     {
    
     return task.hashCode();
    
     }
    
     
    
     @Override
    
     public boolean equals(Object object)
    
     {
    
     if (object instanceof DelayItem)
    
     {
    
         return object.hashCode() == hashCode() ? true : false;
    
     }
    
     return false;
    
     }
    
 }

2:再实现一个管理延迟任务的类

复制代码
 import java.util.Map;

    
 import java.util.concurrent.DelayQueue;
    
 import java.util.concurrent.ExecutorService;
    
 import java.util.concurrent.Executors;
    
 import java.util.concurrent.TimeUnit;
    
  
    
 import org.apache.log4j.Logger;
    
  
    
 /** * 延迟队列 存放有效期对象
    
  * */
    
 public class ItemQueueThread
    
 {
    
     private static final Logger logger = Logger.getLogger(ItemQueueThread.class);
    
     
    
     private ItemQueueThread()
    
     {
    
     }
    
     
    
     /** * 延迟加载(线程安全)
    
      * */
    
     private static class LazyHolder
    
     {
    
     private static ItemQueueThread itemQueueThread = new ItemQueueThread();
    
     }
    
     
    
     public static ItemQueueThread getInstance()
    
     {
    
     return LazyHolder.itemQueueThread;
    
     }
    
     
    
     /** * 缓存线程池
    
      */
    
     ExecutorService executor = Executors.newCachedThreadPool();
    
     
    
     /** * 线程
    
      */
    
     private Thread daemonThread;
    
     
    
     /** * 初始化线程
    
      */
    
     public void init()
    
     {
    
     daemonThread = new Thread(() -> {
    
         try
    
         {
    
             execute();
    
         }
    
         catch (InterruptedException e)
    
         {
    
             e.printStackTrace();
    
             logger.info(e.getMessage());
    
         }
    
     });
    
     System.out.println("init------------------start");
    
     daemonThread.start();
    
     }
    
     
    
     private void execute()
    
     throws InterruptedException
    
     {
    
     while (true)
    
     {
    
         Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
    
         System.out.println("线程数--------------" + map.size());
    
         System.out.println(System.currentTimeMillis());
    
         System.out.println(item.size());
    
         System.out.println("线程状态---------" + Thread.currentThread().getState());
    
         try
    
         {
    
             // 从延迟队列中取值,如果没有对象过期则队列一直等待,
    
             DelayItem<?> t1 = item.take();
    
             if (t1 != null)
    
             {
    
                 Runnable task = t1.getTask();
    
                 if (task == null)
    
                 {
    
                     continue;
    
                 }
    
                 executor.execute(task);
    
             }
    
         }
    
         catch (Exception e)
    
         {
    
             e.printStackTrace();
    
             logger.info(e.getMessage());
    
         }
    
     }
    
     }
    
     
    
     /** * 创建空的延迟队列
    
      */
    
     private DelayQueue<DelayItem<?>> item = new DelayQueue<>();
    
     
    
     /** * 往队列中添加任务
    
      * * @param time 延迟时间
    
      * @param task 任务
    
      * @param timeUnit 时间单位
    
      * */
    
     @SuppressWarnings({"unchecked", "rawtypes"})
    
     public void put(long time, Runnable task, TimeUnit timeUnit)
    
     {
    
     // 转换成ns
    
     long nanoTime = TimeUnit.NANOSECONDS.convert(time, timeUnit);
    
     DelayItem<?> k = new DelayItem(nanoTime, task);
    
     item.put(k);
    
     }
    
     
    
     /** * 结束任务
    
      * * @param task
    
      */
    
     public boolean endTask(DelayItem<Runnable> task)
    
     {
    
     return item.remove(task);
    
     }
    
 }

将需要延迟的功能代码单独提取出来,并作为一类具有特定行为的实体进行处理

复制代码
 public class DataDemo implements Runnable{

    
  
    
 	int a=-1;
    
 	public DataDemo(int a){
    
 		this.a=a;
    
 	}
    
 	@Override
    
 	public void run() {
    
 			
    
 		System.out.println("超时,我要撤销订单啦~~~~~~~"+"##########################"+a);
    
 	}
    
  
    
 }

4:编写一个测试类

复制代码
 import java.util.Random;

    
 import java.util.concurrent.TimeUnit;
    
  
    
 public class DelayTest {
    
  
    
 	public static void main(String[] args) {
    
 		ItemQueueThread ith=ItemQueueThread.getInstance();
    
 		ith.init();
    
 		Random r=new Random();
    
 		for(int i=0;i<5;i++)
    
 		{
    
 			int a=r.nextInt(20);//创建一个随机秒数
    
 			System.out.println("预先知道等待时间:"+a);
    
 			DataDemo dd=new DataDemo(a);//创建一个任务对象
    
 			ith.put(a, dd,TimeUnit.SECONDS);//将任务对象添加到队列中
    
 		}
    
 		
    
 	}
    
 }

需注意ItemQueueThread的初始化方法init()应在容器初始化时即被启动,在第一次将延迟对象放入队列前就已完成初始化。一旦指定的时间超限,则会启动其中的任务执行。

end。。。。。。。

全部评论 (0)

还没有任何评论哟~