手写一个延迟队列
发布时间
阅读量:
阅读量
延迟队列
延迟队列一般应用于周期执行或延迟执行的线程池中,在写延迟队列之前,先写一个一般线程池中应用的阻塞队列,阻塞队列有个特点,当尝试去获取元素时,如果队列为空,则阻塞当前线程,直到有元素进入再次唤醒。
/** * @author siiirius
* @since 2020-10-30 10:14
*/
public interface Queue<E> {
/** * take an element from the queue and delete
* * @return the element
*/
E take();
/** * offer an element to the queue
* * @param e the element
*/
void offer(E e);
}
为了简单,直接使用ArrayList作为容器,这里要注意,每个方法在释放锁的时候,都要检查是否需要唤醒正在等待的线程。
import java.util.ArrayList;
import java.util.List;
/** * @author siiirius
* @since 2020-10-30 10:16
*/
public class BlockQueue<E> implements Queue<E> {
private List<E> container = new ArrayList<>();
private final Object lock = new Object();
@Override
public E take() {
synchronized (lock) {
try {
while (container.isEmpty()) {
try {
lock.wait();
} catch (InterruptedException e) {
return null;
}
}
return container.remove(0);
} finally {
if (!container.isEmpty()) {
lock.notifyAll();
}
}
}
}
@Override
public void offer(E e) {
synchronized (lock) {
container.add(e);
lock.notifyAll();
}
}
}
延迟队列中的元素
/** * @author siiirius
* @since 2020-10-30 10:24
*/
public interface DelayTask extends Runnable, Comparable<DelayTask> {
/** * the time ready to run
* * @return milli time
*/
long getReadyTime();
}
/** * @author siiirius
* @since 2020-10-30 10:34
*/
public class DelayTaskImpl implements DelayTask {
private final Runnable runnable;
private final long delay;
private final long createTime;
public DelayTaskImpl(Runnable runnable, long delay) {
this.runnable = runnable;
this.delay = delay;
createTime = System.currentTimeMillis();
}
@Override
public long getReadyTime() {
return createTime + delay;
}
@Override
public int compareTo(DelayTask o) {
return (int) (this.getReadyTime() - o.getReadyTime());
}
@Override
public void run() {
runnable.run();
}
}
现在来写一个延迟队列,这里不再使用ArrayList作为容器,而是使用一个链表, 此时时间复杂度为n,使用堆可以将时间复杂度降低到logn
/** * @author siiirius
* @since 2020-10-30 10:49
*/
public class DelayQueue implements Queue<DelayTask> {
private Node head;
private Node tail;
private final Object lock = new Object();
@Override
public DelayTask take() {
synchronized (lock) {
try {
while (true) {
if (head == null) {
try {
lock.wait();
} catch (InterruptedException e) {
return null;
}
} else {
Node node = head;
long timeout = node.value.getReadyTime() - System.currentTimeMillis();
if (timeout > 0) {
try {
lock.wait(timeout);
continue;
} catch (InterruptedException e) {
return null;
}
}
head = node.next;
if (tail == node) {
tail = null;
}
return node.value;
}
}
} finally {
if (head != null) {
lock.notifyAll();
}
}
}
}
@Override
public void offer(DelayTask delayTask) {
synchronized (lock) {
Node node = new Node(delayTask);
if (head == null) {
head = node;
tail = node;
} else {
Node cur = head;
while (cur != null) {
int compare = delayTask.compareTo(cur.value);
if (compare < 0) {
Node curPre = cur.pre;
if (curPre != null) {
node.pre = curPre;
curPre.next = node;
} else {
// cur 是head节点,只有head节点的 pre == null,直接插入头部
head = node;
}
node.next = cur;
cur.pre = node;
break;
} else {
cur = cur.next;
}
}
if (cur == null) {
// 应该被放到最后一个节点上, 从尾部直接添加
Node tailPre = tail.pre;
tailPre.next = node;
node.pre = tailPre;
node.next = tail;
tail = node;
}
}
lock.notifyAll();
}
}
private static class Node {
DelayTask value;
Node next;
Node pre;
Node(DelayTask t) {
value = t;
}
}
}
官方的延迟队列是 java.util.concurrent.DelayQueue,容器是优先队列 PriorityQueue,其实就是一个堆,使用的锁是 ReentrantLock, 条件队列是lock.newCondition(),本文中使用的是链表,锁是synchronized,使用Object对象作为锁和条件队列。
全部评论 (0)
还没有任何评论哟~
