java延迟队列
发布时间
阅读量:
阅读量
在使用队列系统之前,默认情况下需要配置复杂的逻辑才能实现定时执行的功能。然而,在面对类似订餐业务/购物等场景时会遇到困难:例如,在购物功能中,在你的订单管理系统中有N个等待支付的订单。当任何一个订单超过十分钟未支付时会自动释放购物篮中的商品,并标记该订单为失效状态。频繁的高频率延迟任务若再采用基于定时(定时任务)的方式进行调度将会导致效率低下甚至崩溃的情况。
推荐使用java 延迟队里来实现:
前提先了解线程,队列。
DelayQueue是java.util.concurrent中提供的一个类。
了解DelayQueue
DelayQueue是什么?
DelayQueue是一种无界且基于BlockingQueue的数据结构,在满足Delayed接口的前提下存储相关对象。这些对象只有当它们到达指定的时间才会被移除。具有优先级特征的DelayQueue表明其优先级最高的元素是那些延迟时间最长的对象。特别提示:在这种队列中不允许存入null值
DelayQueue能做什么?
在我们的业务中常见功能模块如下:
那么这类业务我们可以提炼出其核心特征:涉及延迟处理机制。由此而产生的问题就是我们 DelayQueue 应用需求的形成。
怎么用DelayQueue来解决这类的问题
1:声明Delayed类
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/** * 任务线程 实现Delayed接口
*/
public class DelayItem<T extends Runnable> implements Delayed
{
/** * 到期时间
*/
private final long time;
/** * 任务对象
*/
private final T task;
/** * 原子类
*/
private static final AtomicLong atomic = new AtomicLong(0);
private final long n;
public DelayItem(long timeout, T t)
{
this.time = System.nanoTime() + timeout;
this.task = t;
this.n = atomic.getAndIncrement();
}
/** * 返回与此对象相关的剩余延迟时间,以给定的时间单位表示
*/
public long getDelay(TimeUnit unit)
{
return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS);
}
public int compareTo(Delayed other)
{
if (other == this)
return 0;
if (other instanceof DelayItem)
{
DelayItem<?> x = (DelayItem<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (n < x.n)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
public T getTask()
{
return this.task;
}
@Override
public int hashCode()
{
return task.hashCode();
}
@Override
public boolean equals(Object object)
{
if (object instanceof DelayItem)
{
return object.hashCode() == hashCode() ? true : false;
}
return false;
}
}
2:再实现一个管理延迟任务的类
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
/** * 延迟队列 存放有效期对象
* */
public class ItemQueueThread
{
private static final Logger logger = Logger.getLogger(ItemQueueThread.class);
private ItemQueueThread()
{
}
/** * 延迟加载(线程安全)
* */
private static class LazyHolder
{
private static ItemQueueThread itemQueueThread = new ItemQueueThread();
}
public static ItemQueueThread getInstance()
{
return LazyHolder.itemQueueThread;
}
/** * 缓存线程池
*/
ExecutorService executor = Executors.newCachedThreadPool();
/** * 线程
*/
private Thread daemonThread;
/** * 初始化线程
*/
public void init()
{
daemonThread = new Thread(() -> {
try
{
execute();
}
catch (InterruptedException e)
{
e.printStackTrace();
logger.info(e.getMessage());
}
});
System.out.println("init------------------start");
daemonThread.start();
}
private void execute()
throws InterruptedException
{
while (true)
{
Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
System.out.println("线程数--------------" + map.size());
System.out.println(System.currentTimeMillis());
System.out.println(item.size());
System.out.println("线程状态---------" + Thread.currentThread().getState());
try
{
// 从延迟队列中取值,如果没有对象过期则队列一直等待,
DelayItem<?> t1 = item.take();
if (t1 != null)
{
Runnable task = t1.getTask();
if (task == null)
{
continue;
}
executor.execute(task);
}
}
catch (Exception e)
{
e.printStackTrace();
logger.info(e.getMessage());
}
}
}
/** * 创建空的延迟队列
*/
private DelayQueue<DelayItem<?>> item = new DelayQueue<>();
/** * 往队列中添加任务
* * @param time 延迟时间
* @param task 任务
* @param timeUnit 时间单位
* */
@SuppressWarnings({"unchecked", "rawtypes"})
public void put(long time, Runnable task, TimeUnit timeUnit)
{
// 转换成ns
long nanoTime = TimeUnit.NANOSECONDS.convert(time, timeUnit);
DelayItem<?> k = new DelayItem(nanoTime, task);
item.put(k);
}
/** * 结束任务
* * @param task
*/
public boolean endTask(DelayItem<Runnable> task)
{
return item.remove(task);
}
}
将需要延迟的功能代码单独提取出来,并作为一类具有特定行为的实体进行处理
public class DataDemo implements Runnable{
int a=-1;
public DataDemo(int a){
this.a=a;
}
@Override
public void run() {
System.out.println("超时,我要撤销订单啦~~~~~~~"+"##########################"+a);
}
}
4:编写一个测试类
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class DelayTest {
public static void main(String[] args) {
ItemQueueThread ith=ItemQueueThread.getInstance();
ith.init();
Random r=new Random();
for(int i=0;i<5;i++)
{
int a=r.nextInt(20);//创建一个随机秒数
System.out.println("预先知道等待时间:"+a);
DataDemo dd=new DataDemo(a);//创建一个任务对象
ith.put(a, dd,TimeUnit.SECONDS);//将任务对象添加到队列中
}
}
}
需注意ItemQueueThread的初始化方法init()应在容器初始化时即被启动,在第一次将延迟对象放入队列前就已完成初始化。一旦指定的时间超限,则会启动其中的任务执行。
end。。。。。。。
全部评论 (0)
还没有任何评论哟~
