目錄
- 一、PriorityBlockingQueue概述
- 二、PriorityBlockingQueue源碼解析
- 1.容器
- 2.比較器
- 3.構(gòu)造函數(shù)
- 4.添加元素
- 5.獲取元素
- 6.維護(hù)堆性質(zhì)
- 總結(jié)
PriorityBlockingQueue是Java中實(shí)現(xiàn)了堆數(shù)據(jù)結(jié)構(gòu)的線程安全的有界阻塞隊(duì)列。它可以在多線程場(chǎng)景下安全地進(jìn)行元素添加、刪除和獲取操作,而且可以根據(jù)元素的優(yōu)先級(jí)進(jìn)行排序。本篇博客將會(huì)深入解讀PriorityBlockingQueue的源碼實(shí)現(xiàn)。
一、PriorityBlockingQueue概述
PriorityBlockingQueue類實(shí)現(xiàn)了BlockingQueue接口,它是一個(gè)線程安全的隊(duì)列,繼承自AbstractQueue類,而AbstractQueue類又實(shí)現(xiàn)了Queue接口。PriorityBlockingQueue是一個(gè)有界的隊(duì)列,其容量可以在構(gòu)造函數(shù)中進(jìn)行指定,若不指定則默認(rèn)大小為Integer.MAX_VALUE。同時(shí),PriorityBlockingQueue也支持根據(jù)元素的優(yōu)先級(jí)進(jìn)行排序,這是由于PriorityBlockingQueue內(nèi)部實(shí)現(xiàn)了一個(gè)堆數(shù)據(jù)結(jié)構(gòu)。
二、PriorityBlockingQueue源碼解析
1.容器
PriorityBlockingQueue內(nèi)部使用了一個(gè)Object類型的數(shù)組queue來(lái)存儲(chǔ)元素,同時(shí)使用了一個(gè)int類型的變量size來(lái)記錄元素的數(shù)量。下面是PriorityBlockingQueue類中的定義:
private transient Object[] queue; private transient int size;
2.比較器
PriorityBlockingQueue可以根據(jù)元素的優(yōu)先級(jí)進(jìn)行排序,這是由于PriorityBlockingQueue內(nèi)部維護(hù)了一個(gè)小根堆或大根堆。在構(gòu)造函數(shù)中,我們可以選擇使用元素自身的比較方式或是自定義比較器來(lái)進(jìn)行元素的排序。若未指定比較器,則PriorityBlockingQueue將使用元素自身的比較方式進(jìn)行排序。
private final Comparator<? super E> comparator;
3.構(gòu)造函數(shù)
PriorityBlockingQueue提供了多個(gè)構(gòu)造函數(shù),我們可以選擇使用無(wú)參構(gòu)造函數(shù)來(lái)創(chuàng)建一個(gè)默認(rèn)容量為Integer.MAX_VALUE的PriorityBlockingQueue,或是使用帶有初始容量或自定義比較器的構(gòu)造函數(shù)。下面是PriorityBlockingQueue類的兩個(gè)構(gòu)造函數(shù):
public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.queue = new Object[initialCapacity]; this.comparator = comparator; }
4.添加元素
PriorityBlockingQueue中添加元素的方法為offer()方法,它會(huì)首先檢查容量是否足夠,如果容量不足則會(huì)進(jìn)行擴(kuò)容操作,擴(kuò)容的方式是將原數(shù)組長(zhǎng)度增加一半。接著,它會(huì)將新元素添加到隊(duì)列的末尾,并通過(guò)siftUp()方法將元素上濾到合適的位置,以維護(hù)堆的性質(zhì)。
public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (n == 0) { array[0] = e; } else { siftUp(n, e, array, cmp); } size = n + 1; notEmpty.signal(); } finally { lock.unlock(); } return true; }
5.獲取元素
PriorityBlockingQueue中獲取元素的方法為take()方法,它會(huì)首先檢查隊(duì)列是否為空,如果隊(duì)列為空則會(huì)將當(dāng)前線程阻塞,直到有元素被添加到隊(duì)列中。接著,它會(huì)獲取隊(duì)列的頭部元素,并通過(guò)siftDown()方法將隊(duì)列的末尾元素移動(dòng)到頭部,以維護(hù)堆的性質(zhì)。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while (size == 0) notEmpty.await(); result = extract(); } finally { lock.unlock(); } return result; } private E extract() { final Object[] array = queue; final E result = (E) array[0]; final int n = --size; final E x = (E) array[n]; array[n] = null; if (n != 0) siftDown(0, x, array, comparator); return result; }
6.維護(hù)堆性質(zhì)
PriorityBlockingQueue使用小根堆或大根堆來(lái)維護(hù)元素的優(yōu)先級(jí),這里我們以小根堆為例。小根堆的特點(diǎn)是父節(jié)點(diǎn)的值小于等于左右子節(jié)點(diǎn)的值,PriorityBlockingQueue中的堆是通過(guò)數(shù)組來(lái)實(shí)現(xiàn)的。當(dāng)添加元素時(shí),會(huì)將新元素添加到隊(duì)列的末尾,并通過(guò)siftUp()方法將元素上濾到合適的位置,以維護(hù)堆的性質(zhì)。當(dāng)獲取元素時(shí),會(huì)獲取隊(duì)列的頭部元素,并通過(guò)siftDown()方法將隊(duì)列的末尾元素移動(dòng)到頭部,以維護(hù)堆的性質(zhì)。下面是siftUp()和siftDown()方法的代碼實(shí)現(xiàn):
private static <T> void siftUp(int k, T x, Object[] array, Comparator<? super T> cmp) { if (cmp != null) siftUpUsingComparator(k, x, array, cmp); else siftUpComparable(k, x, array); } @SuppressWarnings("unchecked") private static <T> void siftUpUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (cmp.compare(x, (T) e) >= 0) break; array[k] = e; k = parent; } array[k] = x; } @SuppressWarnings("unchecked") private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = array[parent]; if (key.compareTo((T) e) >= 0) break; array[k] = e; k = parent; } array[k] = key; } private static <T> void siftDown(int k, T x, Object[] array, Comparator<? super T> cmp) { if (cmp != null) siftDownUsingComparator(k, x, array, cmp); else siftDownComparable(k, x, array); } @SuppressWarnings("unchecked") private static <T> void siftDownUsingComparator(int k, T x, Object[] array, Comparator<? super T> cmp) { int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < size && cmp.compare((T) c, (T) array[right]) > 0) c = array[child = right]; if (cmp.compare(x, (T) c) <= 0) break; array[k] = c; k = child; } array[k] = x; } @SuppressWarnings("unchecked") private static <T> void siftDownComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; int half = size >>> 1; while (k < half) { int child = (k << 1) + 1; Object c = array[child]; int right = child + 1; if (right < size && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; if (key.compareTo((T) c) <= 0) break; array[k] = c; k = child; } array[k] = key; }
siftUp()方法和siftDown()方法都使用了siftUpUsingComparator()方法和siftDownUsingComparator()方法,它們是使用Comparator來(lái)實(shí)現(xiàn)堆的上濾和下濾的。當(dāng)PriorityBlockingQueue沒(méi)有指定Comparator時(shí),會(huì)使用元素自身的Comparable來(lái)實(shí)現(xiàn)堆的上濾和下濾。
總結(jié)
PriorityBlockingQueue是Java并發(fā)包中的一個(gè)線程安全的、支持優(yōu)先級(jí)隊(duì)列的類,它使用小根堆或大根堆來(lái)維護(hù)元素的優(yōu)先級(jí)。PriorityBlockingQueue的內(nèi)部實(shí)現(xiàn)使用了ReentrantLock和Condition來(lái)實(shí)現(xiàn)線程安全的操作,同時(shí)使用了數(shù)組來(lái)實(shí)現(xiàn)堆。PriorityBlockingQueue的核心方法包括添加元素的offer()方法,獲取元素的take()方法,以及維護(hù)堆性質(zhì)的siftUp()方法和siftDown()方法。PriorityBlockingQueue的使用方式與普通隊(duì)列類似,但可以根據(jù)元素的優(yōu)先級(jí)進(jìn)行排序和處理。
以上就是詳解Java并發(fā)編程中的優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue的詳細(xì)內(nèi)容,更多關(guān)于Java PriorityBlockingQueue的資料請(qǐng)關(guān)注其它相關(guān)文章!
原文地址:https://juejin.cn/post/7229492628873674807