隊列以一種先進先出的方式管理數據。如果你試圖向一個已經滿了的阻塞隊列中添加一個元素,或是從一個空的阻塞隊列中移除一個元素,將導致線程阻塞。在多線程進行合作時,阻塞隊列是很有用的工具。工作者線程可以定期的把中間結果存到阻塞隊列中。而其他工作者線程把中間結果取出并在將來修改它們。隊列會自動平衡負載。如果第一個線程集運行的比第二個慢,則第二個線程集在等待結果時就會阻塞。如果第一個線程集運行的快,那么它將等待第二個線程集趕上來。
下面的程序展示了如何使用阻塞隊列來控制線程集。程序在一個目錄及它的所有子目錄下搜索所有文件,打印出包含指定關鍵字的文件列表。
java.util.concurrent包提供了阻塞隊列的4個變種:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我們用的是ArrayBlockingQueue。ArrayBlockingQueue在構造時需要給定容量,并可以選擇是否需要公平性。如果公平參數被設置了,等待時間最長的線程會優先得到處理。通常,公平性會使你在性能上付出代價,只有在的確非常需要的時候再使用它。
生產者線程枚舉在所有子目錄下的所有文件并把它們放到一個阻塞隊列中。這個操作很快,如果隊列沒有設上限的話,很快它就包含了沒有找到的文件。
我們同時還啟動了大量的搜索線程。每個搜索線程從隊列中取出一個文件,打開它,打印出包含關鍵字的所有行,然后取出下一個文件。我們使用了一個小技巧來在工作結束后終止線程。為了發出完成信號,枚舉線程把一個虛擬對象放入隊列。(這類似于在行李輸送帶上放一個寫著“最后一個包”的虛擬包。)當搜索線程取到這個虛擬對象時,就將其放回并終止。
注意,這里不需要人任何顯示的線程同步。在這個程序中,我們使用隊列數據結構作為一種同步機制。
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
public class BlockingQueueTest
{
public static void main(String[] args)
{
Scanner in = new Scanner(System.in);
System.out.print("Enter base directory (e.g. /usr/local/jdk1.6.0/src): ");
String directory = in.nextLine();
System.out.print("Enter keyword (e.g. volatile): ");
String keyword = in.nextLine();
final int FILE_QUEUE_SIZE = 10;
final int SEARCH_THREADS = 100;
BlockingQueue<File> queue = new ArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
FileEnumerationTask enumerator = new FileEnumerationTask(queue, new File(directory));
new Thread(enumerator).start();
for (int i = 1; i <= SEARCH_THREADS; i++)
new Thread(new SearchTask(queue, keyword)).start();
}
}
/**
* This task enumerates all files in a directory and its subdirectories.
*/
class FileEnumerationTask implements Runnable
{
/**
* Constructs a FileEnumerationTask.
* @param queue the blocking queue to which the enumerated files are added
* @param startingDirectory the directory in which to start the enumeration
*/
public FileEnumerationTask(BlockingQueue<File> queue, File startingDirectory)
{
this.queue = queue;
this.startingDirectory = startingDirectory;
}
public void run()
{
try
{
enumerate(startingDirectory);
queue.put(DUMMY);
}
catch (InterruptedException e)
{
}
}
/**
* Recursively enumerates all files in a given directory and its subdirectories
* @param directory the directory in which to start
*/
public void enumerate(File directory) throws InterruptedException
{
File[] files = directory.listFiles();
for (File file : files)
{
if (file.isDirectory()) enumerate(file);
else queue.put(file);
}
}
public static File DUMMY = new File("");
private BlockingQueue<File> queue;
private File startingDirectory;
}
/**
* This task searches files for a given keyword.
*/
class SearchTask implements Runnable
{
/**
* Constructs a SearchTask.
* @param queue the queue from which to take files
* @param keyword the keyword to look for
*/
public SearchTask(BlockingQueue<File> queue, String keyword)
{
this.queue = queue;
this.keyword = keyword;
}
public void run()
{
try
{
boolean done = false;
while (!done)
{
File file = queue.take();
if (file == FileEnumerationTask.DUMMY)
{
queue.put(file);
done = true;
}
else search(file);
}
}
catch (IOException e)
{
e.printStackTrace();
}
catch (InterruptedException e)
{
}
}
/**
* Searches a file for a given keyword and prints all matching lines.
* @param file the file to search
*/
public void search(File file) throws IOException
{
Scanner in = new Scanner(new FileInputStream(file));
int lineNumber = 0;
while (in.hasNextLine())
{
lineNumber++;
String line = in.nextLine().trim();
if (line.contains(keyword)) System.out.printf("%s:%d %s%n", file.getPath(), lineNumber, line);
}
in.close();
}
private BlockingQueue<File> queue;
private String keyword;
}