Advertisement

手写一个延迟队列

阅读量:

延迟队列

延迟队列一般应用于周期执行或延迟执行的线程池中,在写延迟队列之前,先写一个一般线程池中应用的阻塞队列,阻塞队列有个特点,当尝试去获取元素时,如果队列为空,则阻塞当前线程,直到有元素进入再次唤醒。

复制代码
    /** * @author siiirius
     * @since 2020-10-30 10:14
     */
    public interface Queue<E> {
    
    /** * take an element from the queue and delete
     * * @return the element
     */
    E take();
    
    /** * offer an element to the queue
     * * @param e the element
     */
    void offer(E e);
    }

为了简单,直接使用ArrayList作为容器,这里要注意,每个方法在释放锁的时候,都要检查是否需要唤醒正在等待的线程。

复制代码
    import java.util.ArrayList;
    import java.util.List;
    
    /** * @author siiirius
     * @since 2020-10-30 10:16
     */
    public class BlockQueue<E> implements Queue<E> {
    
    private List<E> container = new ArrayList<>();
    
    private final Object lock = new Object();
    
    @Override
    public E take() {
        synchronized (lock) {
            try {
                while (container.isEmpty()) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        return null;
                    }
                }
                return container.remove(0);
            } finally {
                if (!container.isEmpty()) {
                    lock.notifyAll();
                }
            }
        }
    }
    
    @Override
    public void offer(E e) {
        synchronized (lock) {
            container.add(e);
    
            lock.notifyAll();
        }
    }
    }

延迟队列中的元素

复制代码
    /** * @author siiirius
     * @since 2020-10-30 10:24
     */
    public interface DelayTask extends Runnable, Comparable<DelayTask> {
    
    
    /** * the time ready to run
     * * @return milli time
     */
    long getReadyTime();
    }
复制代码
    /** * @author siiirius
     * @since 2020-10-30 10:34
     */
    public class DelayTaskImpl implements DelayTask {
    
    private final Runnable runnable;
    private final long delay;
    private final long createTime;
    
    public DelayTaskImpl(Runnable runnable, long delay) {
        this.runnable = runnable;
        this.delay = delay;
        createTime = System.currentTimeMillis();
    }
    
    @Override
    public long getReadyTime() {
        return createTime + delay;
    }
    
    @Override
    public int compareTo(DelayTask o) {
        return (int) (this.getReadyTime() - o.getReadyTime());
    }
    
    @Override
    public void run() {
        runnable.run();
    }
    }

现在来写一个延迟队列,这里不再使用ArrayList作为容器,而是使用一个链表, 此时时间复杂度为n,使用堆可以将时间复杂度降低到logn

复制代码
    /** * @author siiirius
     * @since 2020-10-30 10:49
     */
    public class DelayQueue implements Queue<DelayTask> {
    
    private Node head;
    private Node tail;
    
    private final Object lock = new Object();
    
    @Override
    public DelayTask take() {
        synchronized (lock) {
            try {
                while (true) {
                    if (head == null) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            return null;
                        }
                    } else {
                        Node node = head;
                        long timeout = node.value.getReadyTime() - System.currentTimeMillis();
                        if (timeout > 0) {
                            try {
                                lock.wait(timeout);
                                continue;
                            } catch (InterruptedException e) {
                                return null;
                            }
                        }
    
                        head = node.next;
    
                        if (tail == node) {
                            tail = null;
                        }
    
                        return node.value;
                    }
    
                }
            } finally {
                if (head != null) {
                    lock.notifyAll();
                }
            }
        }
    }
    
    @Override
    public void offer(DelayTask delayTask) {
        synchronized (lock) {
            Node node = new Node(delayTask);
            if (head == null) {
                head = node;
                tail = node;
            } else {
                Node cur = head;
                while (cur != null) {
                    int compare = delayTask.compareTo(cur.value);
                    if (compare < 0) {
                        Node curPre = cur.pre;
                        if (curPre != null) {
                            node.pre = curPre;
                            curPre.next = node;
                        } else {
                            // cur 是head节点,只有head节点的 pre == null,直接插入头部
                            head = node;
                        }
                        node.next = cur;
                        cur.pre = node;
                        break;
                    } else {
                        cur = cur.next;
                    }
                }
                if (cur == null) {
                    // 应该被放到最后一个节点上, 从尾部直接添加
                    Node tailPre = tail.pre;
                    tailPre.next = node;
                    node.pre = tailPre;
                    node.next = tail;
                    tail = node;
                }
            }
            lock.notifyAll();
        }
    }
    
    
    private static class Node {
        DelayTask value;
        Node next;
        Node pre;
    
        Node(DelayTask t) {
            value = t;
        }
    }
    }

官方的延迟队列是 java.util.concurrent.DelayQueue,容器是优先队列 PriorityQueue,其实就是一个堆,使用的锁是 ReentrantLock, 条件队列是lock.newCondition(),本文中使用的是链表,锁是synchronized,使用Object对象作为锁和条件队列。

全部评论 (0)

还没有任何评论哟~