阻塞队列是Java并发中重要的并发容器,而且这个并发容器首先是队列,提供队列FIFO的特性,而且是这个队列是具有阻塞特性的,也就是说在队列满和队列空的情况下,会对添加线程和获取线程进行阻塞,以等待相应的同步状态释放,方法才得以返回。这种特性大有用武之地,比如典型的并发问题,生产者消费者问题,就可以将这种阻塞队列抽象出来,实现一种具有并发控制的容器,作为一种新的数据结构提供给应用层使用。
JDK提供了多种阻塞队列的实现,常用的如ArrayBlockingQueue,LinkedBlockingQueue等等,本文致力于研究JDK中的阻塞队列的具体实现,并且学习其设计思路。
ArrayBlockingQueue
ArrayBlockingQueue中有三对方法:
- boolean add(E e),E remove(),这两个方法分别是添加元素到队尾,删除队头的元素,如果队列已满或者队列空的情况下抛出异常,这两个方法继承自父类Queue的实现。
- boolean offer(E e),E poll(),这两个方法也是实现添加元素到队尾,删除队头的元素,但是在队列已满或者空的情况下不抛出异常,而是返回false,在添加或者删除的时候进行了加锁控制
- void put(E e),E take(),功能同以上,但是在队列满的时候,put方法会阻塞,直至有take方法将其唤醒,同样的,当队列为空的时候,take方法也会阻塞,直到put方法将其唤醒。
我们重点来看put和take方法的阻塞实现,他们使用了可重入锁ReentrantLock,并且使用Condition来控制入队和出队线程。如图,ArrayBlockingQueue的构造方法会初始化ReentrantLock,并且会初始化两个Condition对象。
put方法在插入的时候加锁,并且尝试判断当前队列是否已满,如果满了,则调用notFull Condition的await方法阻塞,使用while循环的原因是当返回的时候还要检查一遍。最后进行入队操作,并且唤醒notEmpty Condition,解锁返回。
take方法的过程是相似的,加锁后判断队列是否为空,如果为空的话,使用notEmpty阻塞,等待被唤醒,被唤醒后如果队列不为空,调用dequeue方法出队,在出队后唤醒notFull Condition。
LinkedBlockingQueue
LinkedBlockingQueue同样使用ReentrantLock进行并发控制,但是队列使用双向链表进行保存,并且持有首节点和尾节点的引用,并发控制逻辑和ArrayBlockingList类似
PriorityBlockingQueue
PriorityBlocingQueue是一个按照优先级排序的无界队列,所谓无界队列是指默认容量无限大,进行put的时候不会进行阻塞,只在take的时候如果队列为空进行阻塞。
priorityBlockingQueue也是使用数组作为存储容器,并且容器内的元素按照堆的顺序进行排序,所有容器内的元素需要实现Comparable接口或者传入Comparator对象。
put方法直接调用了offer方法,其有几个关键步骤,首先判断是否需要扩容,使用tryGrow方法进行扩容,插入阶段调用siftUpComparable或者siftUpUsingComparator进行堆的上滤调整,最后唤醒notEmpty Conditon。
take方法依旧需要阻塞,和ArrayBlockingQueue的方法类似,不过出队后要进行堆调整。
PriorityBlockingQueue适合需要按照一定的排序规则获取队列中元素的任务。
SynchronousQueue
SynchronousQueue是一个不存储元素的阻塞队列,每个put必须等待一个take操作,否则不能继续添加元素,实现上使用TransferQueue的transfer放来完成传递,适合有大量消费者的情况使用,这样保证一直有消费者进行take操作,比如Excutors的newCachedThreadPool就使用了SynchronousQueue作为阻塞队列。
声明
本文首发表于我的博客,欢迎关注!转载须注明文章出处,作者保留文章所有权。