LinkedBlockingQueue1.8源码详解

[TOC]

创新互联公司是一家专业提供尚志企业网站建设,专注与成都网站制作、成都网站设计、H5场景定制、小程序制作等业务。10年已为尚志众多企业、政府机构等服务。创新互联专业的建站公司优惠进行中。

LinkedBlockingQueue 1.8 源码详解

一,简介

LinkedBlockingQueue 是一个用链表实现的有界阻塞队列;此队列的默认和最大长度为Integer.MAX_VALUE;此队列按照先进先出的原则对元素就行排序;队列有两个锁,生成和消费各一把锁,都是默认的非公平锁。

二,类UML图

LinkedBlockingQueue 1.8 源码详解

三,基本成员
    static class Node<E> {
        // 我们插入的值
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        // 下一个node
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** 队列容量 */
    private final int capacity;

    /** 两个锁,需要使用AtomicInteger保证原子性 */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    // 头结点
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    // 尾节点
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    /** take, poll, etc 的锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    /** 等待在队列空 */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    /** put, offer, etc的锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    /** 等待在队列满 */
    private final Condition notFull = putLock.newCondition();
四,常用方法
构造方法
    // 无参构造
    public LinkedBlockingQueue() {
        // 默认Integer.MAX_VALUE
        this(Integer.MAX_VALUE);
    }
    // 有参构造
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        // 创建一个item为null的节点
        last = head = new Node<E>(null);
    }
offer 方法
public boolean offer(E e) {
        // e不能为null
        if (e == null) throw new NullPointerException();
        // 总数
        final AtomicInteger count = this.count;
        // 总数等于了容量 返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        // 创建一个node
        Node<E> node = new Node<E>(e);
        // 获取锁
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                // 插入链表
                enqueue(node);
                // 加1返回旧值
                c = count.getAndIncrement();
                // c是增加之前的值,然后加1,再判断有没有可以存储的容量
                if (c + 1 < capacity)
                    // 有唤醒下一个线程
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        // 队列有一个元素了,证明之前队列为空,可能已经有元素来消费了,所以就需要唤醒一个等待消费的线程
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

注意:offer 还有一个重载方法,支持中断,带有超时时间的限制offer(E e, long timeout, TimeUnit unit)。

put 方法
    public void put(E e) throws InterruptedException {
        // 不可以为null
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        // 构建一个节点
        Node<E> node = new Node<E>(e);
        // 获取put锁
        final ReentrantLock putLock = this.putLock;
        // 获取count
        final AtomicInteger count = this.count;
        // 调用获取锁的方法,支持中断
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            // 等于了队列的容量
            while (count.get() == capacity) {
                // 进入阻塞队列
                notFull.await();
            }
            // 入队
            enqueue(node);
            // 返回的是自增前的值
            c = count.getAndIncrement();
            // 如果这个元素入队以后,还有多于的空间,唤醒等待队列的线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // c==0,证明之前队列是空的,唤醒一个获取线程
        if (c == 0)
            signalNotEmpty();
    }
poll 方法

这次我们看个带超时时间的poll方法。

    // 带超时时间的消费一个元素
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 支持中断的获取锁
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            // count-- 返回旧值
            c = count.getAndDecrement();
            // 还有元素,唤醒一个等待获取的线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 队列还有一个位置,唤醒一个入队线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC // 自引用
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
take 方法
    // 获取元素
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 队列为null 就阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // 队列消费一个元素,可以唤醒一个生产线程了
        if (c == capacity)
            signalNotFull();
        return x;
    }
peek 方法
    // 获取第一个元素
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
size 方法
    public int size() {
        return count.get();
    }
五,总结

LinkedBlockingQueue 可以看做是一个×××队列,因为最大容量是Integer.MAX_VALUE,这已经很大了,所以使用时一定注意容量问题,避免内存溢出,但是好处就是可以不用我们去初始容量;队列在入队和出队使用了两把锁,提高了并发性,相对于一把锁来说;我们可以发现队列的底层数据结构采用的是链表,对比ArrayBlockingQueue的数组数据结构,在处理数据的同时,节点本身也需要处理垃圾回收,所以相对于数组来的数据来说增加了垃圾回收,可能影响性能;LinkedBlockingQueue 和ArrayBlockingQueue 两个可以对比学习,追求系统稳定性,性能就使用ArrayBlockingQueue ,追求并发性,可能发生大量请求时(系统不是很稳定)要注意内存溢出就使用LinkedBlockingQueue ,使用场景属于个人理解,欢迎指正。

《参考 Java 并发编程的艺术》

文章标题:LinkedBlockingQueue1.8源码详解
标题路径:https://www.cdcxhl.com/article16/ghscdg.html

成都网站建设公司_创新互联,为您提供搜索引擎优化网站设计品牌网站建设移动网站建设网站建设品牌网站制作

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

小程序开发