1、任何的高并發,請求總是會有一個順序的
2、java的隊列的數據結構是先進先出的取值順序
3、BlockingQueue類(線程安全)(使用方法可以百度)
一般使用LinkedBlockingQueue
利用以上幾點,我們可以把高并發時候的請求放入一個隊列,隊列的大小可以自己定義,比如隊列容量為1000個數據,那么可以利用過濾器或者攔截器把當前的請求放入隊列,如果隊列的容量滿了,其余的請求可以丟掉或者作出相應回復
具體實施:
利用生產者、消費者模型:
將隊列的請求一一處理完。
上代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
/** * @author fuguangli * @description 前沿消費者類 * @Create date: 2017/3/7 * @using EXAMPLE */ public class Customer implements Runnable{ /** * 拋出異常 特殊值 阻塞 超時 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 檢查 element() peek() 不可用 不可用 */ private BlockingQueue blockingQueue; private AtomicInteger count = new AtomicInteger(); public Customer(BlockingQueue blockingQueue) { this .blockingQueue = blockingQueue; } /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p/> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { System.out.println( "消費者線程啟動..." ); LockFlag.setCustomerRunningFlag( true ); try { while (LockFlag.getProducerRunningFlag()){ System.out.println(Thread.currentThread().getId()+ "I'm Customer.Queue current size=" +blockingQueue.size()); String data = (String) blockingQueue.poll( 10 , TimeUnit.SECONDS); if (data!= null ){ System.out.println(Thread.currentThread().getId()+ "*************正在消費數據 data=" +data); } else { //表示超過取值時間,視為生產者不再生產數據 System.out.println(Thread.currentThread().getId()+ "隊列為空無數據,請檢查生產者是否阻塞" ); } Thread.sleep( 50 ); } System.err.println( "消費者程序執行完畢" ); } catch (InterruptedException e) { e.printStackTrace(); System.err.println( "消費者程序退出" ); LockFlag.setCustomerRunningFlag( false ); //異常退出線程 Thread.currentThread().interrupt(); } } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
package com.qysxy.framework.queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author fuguangli * @description 隊列生產者類 * @Create date: 2017/3/7 * @using EXAMPLE */ public class Producer implements Runnable{ /** * 拋出異常 特殊值 阻塞 超時 插入 add(e) offer(e) put(e) offer(e, time, unit) 移除 remove() poll() take() poll(time, unit) 檢查 element() peek() 不可用 不可用 */ private BlockingQueue blockingQueue; private AtomicInteger count = new AtomicInteger(); public Producer(BlockingQueue blockingQueue) { this .blockingQueue = blockingQueue; } /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p/> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @Override public void run() { System.out.println( "生產者線程啟動..." ); LockFlag.setProducerRunningFlag( true ); try { while (LockFlag.getProducerRunningFlag()){ String data = "data:" +count.incrementAndGet(); if (blockingQueue.offer(data, 10 , TimeUnit.SECONDS)){ //返回true表示生產數據正確 System.out.println( "^^^^^^^^^^^^^^正在生產數據 data=" +data); } else { //表示阻塞時間內還沒有生產者生產數據 System.out.println( "生產者異常,無法生產數據" ); } Thread.sleep( 50 ); } } catch (InterruptedException e) { e.printStackTrace(); System.err.println( "生產者程序退出" ); LockFlag.setProducerRunningFlag( false ); //異常退出線程 Thread.currentThread().interrupt(); } } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.qysxy.framework.queue; /** * @author fuguangli * @description 前沿生產者消費者模型的鎖類 * @Create date: 2017/3/7 */ public class LockFlag { /** * 生產者互斥鎖 */ private static Boolean producerRunningFlag = false ; /** * 消費者互斥鎖 */ private static Boolean customerRunningFlag = false ; public static Boolean getProducerRunningFlag() { return producerRunningFlag; } public static void setProducerRunningFlag(Boolean producerRunningFlag) { LockFlag.producerRunningFlag = producerRunningFlag; } public static Boolean getCustomerRunningFlag() { return customerRunningFlag; } public static void setCustomerRunningFlag(Boolean customerRunningFlag) { LockFlag.customerRunningFlag = customerRunningFlag; } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
package com.qysxy.framework.queue; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.util.Queue; import java.util.concurrent.*; /** * @author fuguangli * @description 前沿隊列實用類,用于大量并發用戶 * @Create date: 2017/3/7 */ public class BlockingQueueHelper { private static final Integer maxQueueSize = 1000 ; private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize); private static ExecutorService threadPool = Executors.newCachedThreadPool(); public static BlockingQueue getBlockingQueue() { if (blockingQueue == null ) { blockingQueue = new LinkedBlockingQueue(maxQueueSize); } return blockingQueue; } /** * @param o 隊列處理對象(包含request,response,data) */ public static void requestQueue(Object o) { //檢測當前的隊列大小 if (blockingQueue != null && blockingQueue.size() < maxQueueSize) { //可以正常進入隊列 if (blockingQueue.offer(o)) { //添加成功,檢測數據處理線程是否正常 if (LockFlag.getCustomerRunningFlag()) { //說明處理線程類正常運行 } else { //說明處理線程類停止,此時,應重新啟動線程進行數據處理 LockFlag.setCustomerRunningFlag( true ); //example:run Customer customer = new Customer(blockingQueue); threadPool.execute(customer); } } else { //進入隊列失敗,做出相應的處理,或者嘗試重新進入隊列 } } else { //隊列不正常,或隊列大小已達上限,做出相應處理 } } } |
好了,這時候,利用過濾器或者攔截器將每個請求封裝成隊列元素進行處理就行。
當然了,對于多應用服務器的部署架構來說,數據庫也需要加鎖,數據庫隔離級別下篇再說。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。