開始
在本文中,我將展示如何使用各種不同的 Java 技術構建一些簡單的 Comet 風格的 Web 應用程序。讀者對 Java Servlet、Ajax 和 JavaScript 應該有一定的了解。我們將考察 Tomcat 和 Jetty 中一些支持 Comet 的特性,因此需要使用這兩個產品的最新版本。本文使用 Tomcat 6.0.14 和 Jetty 6.1.14.另外還需要一個支持 Java 5 或更高版本的 JDK.本文使用 JDK 1.5.0-16.此外還需要看看 Jetty 7 的預發布版,因為它實現了 Servlet 3.0 規范,我們將在本文中研究該規范。
理解 Comet
您可能已經聽說過 Comet,因為它最近受到了一定的關注。Comet 有時也稱反向 Ajax 或服務器端推技術(server-side push)。其思想很簡單:將數據直接從服務器推到瀏覽器,而不必等到瀏覽器請求數據。聽起來簡單,但是如果熟悉 Web 應用程序,尤其是 HTTP 協議,那么您就會知道,這絕不簡單。實現 Comet 風格的 Web 應用程序,同時保證在瀏覽器和服務器上的可伸縮性,這只是在最近幾年才成為可能。在本文的后面,我們將看看一些流行的 Java Web 服務器如何支持可伸縮的 Comet 架構,但首先我們來看看為什么要創建 Comet 應用程序,以及用于實現它們的常見設計模式。
使用 Comet 的動機
HTTP 協議的成功毋庸置疑。它是 Internet 上大部分信息交換的基礎。然而,它也有一些局限性。特別是,它是無狀態、單向的協議。請求被發送到 Web 服務器,服務器處理請求并發回一個響應 — 僅此而已。請求必須由客戶機發出,而服務器則只能在對請求的響應中發送數據。這至少會影響很多類型的 Web 應用程序的實用性。典型的例子就是聊天程序。另外還有一些例子,例如比賽的比分、股票行情或電子郵件程序。
HTTP 的這些局限性也是它取得一定成功的原因。請求/響應周期使它成為了經典的模型,即每個連接使用一個線程。只要能夠快速為請求提供服務,這種方法就有巨大的可伸縮性。每秒鐘可以處理大量的請求,只需使用少量的服務器就可以處理很大數量的用戶。對于很多經典的 Web 應用程序,例如內容管理系統、搜索應用程序和電子商務站點等等而言,這非常適合。在以上任何一種 Web 應用程序中,服務器提供用戶請求的數據,然后關閉連接,并釋放那個線程,使之可以為其他請求服務。如果提供初始數據之后仍可能存在交互,那么將連接保持為打開狀態,因此線程就不能釋放出來,服務器也就不能為很多用戶服務。
但是,如果想在對請求做出響應并發送初始數據之后,仍然保持與用戶的交互呢?在 Web 早期,這一點常使用 meta 刷新實現。這將自動指示瀏覽器在指定秒數之后重新裝載頁面,從而支持簡陋的輪詢(polling)。這不僅是一種糟糕的用戶體驗,而且通常效率非常低下。如果沒有新的數據要顯示在頁面上呢?這時不得不重新呈現同樣的頁面。如果對頁面的更改很少,并且頁面的大部分沒有變化呢?同樣,不管是否有必要,都得重新請求和獲取頁面上的一切內容。
Ajax 的發明和流行改變了上述狀況?,F在,服務器可以異步通信,因此不必重新請求整個頁面?,F在可以進行增量式的更新。只需使用 XMLHttpRequest 輪詢服務器。這項技術通常被稱作 Comet.這項技術存在一些變體,每種變體具有不同的性能和可伸縮性。我們來看看這些不同風格的 Comet.
Comet 風格
Ajax 的出現使 Comet 成為可能。HTTP 的單向性質可以有效地加以規避。實際上有一些不同的方法可以繞過這一點。您可能已經猜到,支持 Comet 的最容易的方式是輪詢(poll)。使用 XMLHttpRequest 向服務器發出調用,返回后,等待一段固定的時間(通常使用 JavaScript 的 setTimeout 函數),然后再次調用。這是一項非常常見的技術。例如,大多數 webmail 應用程序就是通過這種技術在電子郵件到達時顯示電子郵件的。
這項技術有優點也有缺點。在這種情況下,您期望快速返回響應,就像任何其他 Ajax 請求一樣。在請求之間必須有一段暫停。否則,連續不斷的請求會沖垮服務器,并且這種情況下顯然不具有可伸縮性。這段暫停使應用程序產生一個延時。暫停的時間越長,服務器上的新數據就需要越多的時間才能到達客戶機。如果縮短暫停時間,又將重新面臨沖垮服務器的風險。但是另一方面,這顯然是最簡單的實現 Comet 的方式。
現在應該指出,很多人認為輪詢并不屬于 Comet.相反,他們認為 Comet 是對輪詢的局限性的一個解決方案。最常見的 “真正的” Comet 技術是輪詢的一種變體,即長輪詢(long polling)。輪詢與長輪詢之間的主要區別在于服務器花多長的時間作出響應。長輪詢通常將連接保持一段較長的時間 — 通常是數秒鐘,但是也可能是一分鐘甚至更長。當服務器上發生某個事件時,響應被發送并隨即關閉,輪詢立即重新開始。
長輪詢相對于一般輪詢的優點在于,數據一旦可用,便立即從服務器發送到客戶機。請求可能等待較長的時間,期間沒有任何數據返回,但是一旦有了新的數據,它將立即被發送到客戶機。因此沒有延時。如果您使用過基于 Web 的聊天程序,或者聲稱 “實時” 的任何程序,那么它很可能就是使用了這種技術。
長輪詢有一種變體,這是第三種風格的 Comet.這通常被稱為流(streaming)。按照這種風格,服務器將數據推回客戶機,但是不關閉連接。連接將一直保持開啟,直到過期,并導致重新發出請求。XMLHttpRequest 規范表明,可以檢查 readyState 的值是否為 3 或 Receiving(而不是 4 或 Loaded),并獲取正從服務器 “流出” 的數據。和長輪詢一樣,這種方式也沒有延時。當服務器上的數據就緒時,該數據被發送到客戶機。這種方式的另一個優點是可以大大減少發送到服務器的請求,從而避免了與設置服務器連接相關的開銷和延時。不幸的是,XMLHttpRequest 在不同的瀏覽器中有很多不同的實現。這項技術只能在較新版本的 Mozilla Firefox 中可靠地使用。對于 Internet Explorer 或 Safari,仍需使用長輪詢。
至此,您可能會想,長輪詢和流都有一個很大的問題。請求需要在服務器上存在一段較長的時間。這打破了每個請求使用一個線程的模型,因為用于一個請求的線程一直沒有被釋放。更糟糕的是,除非要發回數據,否則該線程一直處于空閑狀態。這顯然不具有可伸縮性。幸運的是,現代 Java Web 服務器有很多方式可以解決這個問題。
Java 中的 Comet
現在有很多 Web 服務器是用 Java 構建的。一個原因是 Java 有一個豐富的本地線程模型。因此實現典型的每個連接一個線程的模型便非常簡單。該模型對于 Comet 不大適用,但是,Java 對此同樣有解決的辦法。為了有效地處理 Comet,需要非阻塞 IO,Java 通過它的 NIO 庫提供非阻塞 IO.兩種最流行的開源服務器 Apache Tomcat 和 Jetty 都利用 NIO 增加非阻塞 IO,從而支持 Comet.然而,這兩種服務器中的實現卻各不相同。我們來看看 Tomcat 和 Jetty 對 Comet 的支持。
Tomcat 和 Comet
對于 Apache Tomcat,要使用 Comet,主要需要做兩件事。首先,需要對 Tomcat 的配置文件 server.XML 稍作修改。默認情況下啟用的是更典型的同步 IO 連接器。現在只需將它切換成異步版本,如清單 1 所示。
清單 1. 修改 Tomcat 的 server.xml
1
2
3
4
5
|
<!-- This is the usual Connector, comment it out and add the NIO one --> <!-- Connector URIEncoding="utf-8" connectionTimeout="20000" port="8084" protocol="HTTP/1.1" redirectPort="8443"/ --> < Connector connectionTimeout = "20000" port = "8080" protocol="org.apache. coyote.http11.Http11NioProtocol" redirectPort = "8443" /> |
Servlet.這顯然是 Tomcat 特有的一個接口。清單 2 顯示了一個這樣的例子。
清單 2. Tomcat Comet servlet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
public class TomcatWeatherServlet extends HttpServlet implements CometProcessor { private MessageSender messageSender = null ; private static final Integer TIMEOUT = 60 * 1000 ; @Override public void destroy() { messageSender.stop(); messageSender = null ; } @Override public void init() throws ServletException { messageSender = new MessageSender(); Thread messageSenderThread = new Thread(messageSender, "MessageSender[" + getServletContext() .getContextPath() + "]" ); messageSenderThread.setDaemon( true ); messageSenderThread.start(); } public void event( final CometEvent event) throws IOException, ServletException { HttpServletRequest request = event.getHttpServletRequest(); HttpServletResponse response = event.getHttpServletResponse(); if (event.getEventType() == CometEvent.EventType.BEGIN) { request.setAttribute( "org.apache.tomcat.comet.timeout" , TIMEOUT); log( "Begin for session: " + request.getSession( true ).getId()); messageSender.setConnection(response); Weatherman weatherman = new Weatherman( 95118 , 32408 ); new Thread(weatherman).start(); } else if (event.getEventType() == CometEvent.EventType.ERROR) { log( "Error for session: " + request.getSession( true ).getId()); event.close(); } else if (event.getEventType() == CometEvent.EventType.END) { log( "End for session: " + request.getSession( true ).getId()); event.close(); } else if (event.getEventType() == CometEvent.EventType.READ) { throw new UnsupportedOperationException("This servlet does not accept data"); } } } |
CometProcessor 接口要求實現 event 方法。這是用于 Comet 交互的一個生命周期方法。Tomcat 將使用不同的 CometEvent 實例調用。通過檢查 CometEvent 的 eventType,可以判斷正處在生命周期的哪個階段。當請求第一次傳入時,即發生 BEGIN 事件。READ 事件表明數據正在被發送,只有當請求為 POST 時才需要該事件。遇到 END 或 ERROR 事件時,請求終止。
在清單 2 的例子中,Servlet 使用一個 MessageSender 類發送數據。這個類的實例是在 servlet 的 init 方法中在其自身的線程中創建,并在 servlet 的 destroy 方法中銷毀的。清單 3 顯示了 MessageSender.
清單 3. MessageSender
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
private class MessageSender implements Runnable { protected boolean running = true ; protected final ArrayList messages = new ArrayList(); private ServletResponse connection; private synchronized void setConnection(ServletResponse connection){ this .connection = connection; notify(); } public void send(String message) { synchronized (messages) { messages.add(message); log( "Message added #messages=" + messages.size()); messages.notify(); } } public void run() { while (running) { if (messages.size() == 0 ) { try { synchronized (messages) { messages.wait(); } } catch (InterruptedException e) { // Ignore } } String[] pendingMessages = null ; synchronized (messages) { pendingMessages = messages.toArray( new String[ 0 ]); messages.clear(); } try { if (connection == null ){ try { synchronized ( this ){ wait(); } } catch (InterruptedException e){ // Ignore } } PrintWriter writer = connection.getWriter(); for ( int j = 0 ; j < pendingMessages.length; j++) { final String forecast = pendingMessages[j] + " "; writer.println(forecast); log( "Writing:" + forecast); } writer.flush(); writer.close(); connection = null ; log( "Closing connection" ); } catch (IOException e) { log( "IOExeption sending message" , e); } } } } |
這個類基本上是樣板代碼,與 Comet 沒有直接的關系。但是,有兩點要注意。這個類含有一個 ServletResponse 對象?;仡^看看清單 2 中的 event 方法,當事件為 BEGIN 時,response 對象被傳入到 MessageSender 中。在 MessageSender 的 run 方法中,它使用 ServletResponse 將數據發送回客戶機。注意,一旦發送完所有排隊等待的消息后,它將關閉連接。這樣就實現了長輪詢。如果要實現流風格的 Comet,那么需要使連接保持開啟,但是仍然刷新數據。
回頭看清單 2 可以發現,其中創建了一個 Weatherman 類。正是這個類使用 MessageSender 將數據發送回客戶機。這個類使用 Yahoo RSS feed 獲得不同地區的天氣信息,并將該信息發送到客戶機。這是一個特別設計的例子,用于模擬以異步方式發送數據的數據源。清單 4 顯示了它的代碼。
清單 4. Weatherman
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
private class Weatherman implements Runnable{ private final List zipCodes; private final String YAHOO_WEATHER = "http://weather.yahooapis.com/forecastrss?p=" ; public Weatherman(Integer... zips) { zipCodes = new ArrayList(zips.length); for (Integer zip : zips) { try { zipCodes.add( new URL(YAHOO_WEATHER + zip)); } catch (Exception e) { // dont add it if it sucks } } } public void run() { int i = 0 ; while (i >= 0 ) { int j = i % zipCodes.size(); SyndFeedInput input = new SyndFeedInput(); try { SyndFeed feed = input.build( new InputStreamReader(zipCodes.get(j) .openStream())); SyndEntry entry = (SyndEntry) feed.getEntries().get( 0 ); messageSender.send(entryToHtml(entry)); Thread.sleep(30000L); } catch (Exception e) { // just eat it, eat it } i++; } } private String entryToHtml(SyndEntry entry){ StringBuilder html = new StringBuilder(" "); html.append(entry.getTitle()); html.append(" "); html.append(entry.getDescription().getValue()); return html.toString(); } } |
這個類使用 Project Rome 庫解析來自 Yahoo Weather 的 RSS feed.如果需要生成或使用 RSS 或 Atom feed,這是一個非常有用的庫。此外,這個代碼中只有一個地方值得注意,那就是它產生另一個線程,用于每過 30 秒鐘發送一次天氣數據。最后,我們再看一個地方:使用該 Servlet 的客戶機代碼。在這種情況下,一個簡單的 JSP 加上少量的 JavaScript 就足夠了。清單 5 顯示了該代碼。
清單 5. 客戶機 Comet 代碼
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
"http://www.w3.org/TR/html4/loose.dtd" > var request = new XMLHttpRequest(); request.open( "GET" , url, true ); request.setRequestHeader( "Content-Type" , "application/x-javascript; " ); request.onreadystatechange = function () { if (request.readyState == 4) { if (request.status == 200){ if (request.responseText) { document.getElementById( "forecasts" ).innerHTML = request.responseText; } } go(); } }; request.send( null ); } |
該代碼只是在用戶單擊 Go 按鈕時開始長輪詢。注意,它直接使用 XMLHttpRequest 對象,所以這在 Internet Explorer 6 中將不能工作。您可能需要使用一個 Ajax 庫解決瀏覽器差異問題。除此之外,惟一需要注意的是回調函數,或者為請求的 onreadystatechange 函數創建的閉包。該函數粘貼來自服務器的新的數據,然后重新調用 go 函數。
現在,我們看過了一個簡單的 Comet 應用程序在 Tomcat 上是什么樣的。有兩件與 Tomcat 密切相關的事情要做:一是配置它的連接器,二是在 Servlet 中實現一個特定于 Tomcat 的接口。您可能想知道,將該代碼 “移植” 到 Jetty 有多大難度。接下來我們就來看看這個問題。
Jetty 和 Comet
Jetty 服務器使用稍微不同的技術來支持 Comet 的可伸縮的實現。Jetty 支持被稱作 continuations 的編程結構。其思想很簡單。請求先被暫停,然后在將來的某個時間點再繼續。規定時間到期,或者某種有意義的事件發生,都可能導致請求繼續。當請求被暫停時,它的線程被釋放。
可以使用 Jetty 的 org.mortbay.util.ajax.ContinuationSupport 類為任何 HttpServletRequest 創建 org.mortbay.util.ajax.Continuation 的一個實例。這種方法與 Comet 有很大的不同。但是,continuations 可用于實現邏輯上等效的 Comet.清單 6 顯示清單 2 中的 weather servlet “移植” 到 Jetty 后的代碼。
清單 6. Jetty Comet servlet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
public class JettyWeatherServlet extends HttpServlet { private MessageSender messageSender = null ; private static final Integer TIMEOUT = 5 * 1000 ; public void begin(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { request.setAttribute( "org.apache.tomcat.comet" , Boolean.TRUE); request.setAttribute( "org.apache.tomcat.comet.timeout" , TIMEOUT); messageSender.setConnection(response); Weatherman weatherman = new Weatherman( 95118 , 32408 ); new Thread(weatherman).start(); } public void end(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { synchronized (request) { request.removeAttribute( "org.apache.tomcat.comet" ); Continuation continuation = ContinuationSupport.getContinuation (request, request); if (continuation.isPending()) { continuation.resume(); } } } public void error(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { end(request, response); } public boolean read(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { throw new UnsupportedOperationException(); } @Override protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { synchronized (request) { Continuation continuation = ContinuationSupport.getContinuation (request, request); if (!continuation.isPending()) { begin(request, response); } Integer timeout = (Integer) request.getAttribute ( "org.apache.tomcat.comet.timeout" ); boolean resumed = continuation.suspend(timeout == null ? 10000 : timeout.intValue()); if (!resumed) { error(request, response); } } } public void setTimeout(HttpServletRequest request, HttpServletResponse response, int timeout) throws IOException, ServletException, UnsupportedOperationException { request.setAttribute( "org.apache.tomcat.comet.timeout" , new Integer(timeout)); } } |
這里最需要注意的是,該結構與 Tomcat 版本的代碼非常類似。begin、read、end 和 error 方法都與 Tomcat 中相同的事件匹配。該 Servlet 的 service 方法被覆蓋為在請求第一次進入時創建一個 continuation 并暫停該請求,直到超時時間已到,或者發生導致它重新開始的事件。上面沒有顯示 init 和 destroy 方法,因為它們與 Tomcat 版本是一樣的。該 servlet 使用與 Tomcat 相同的 MessageSender.因此不需要修改。注意 begin 方法如何創建 Weatherman 實例。對這個類的使用與 Tomcat 版本中也是完全相同的。甚至客戶機代碼也是一樣的。只有 servlet 有更改。雖然 servlet 的變化比較大,但是與 Tomcat 中的事件模型仍是一一對應的。
希望這足以鼓舞人心。雖然完全相同的代碼不能同時在 Tomcat 和 Jetty 中運行,但是它是非常相似的。當然,JavaEE 吸引人的一點是可移植性。大多數在 Tomcat 中運行的代碼,無需修改就可以在 Jetty 中運行,反之亦然。因此,毫不奇怪,下一個版本的 Java Servlet 規范包括異步請求處理(即 Comet 背后的底層技術)的標準化。 我們來看看這個規范:Servlet 3.0 規范。
Servlet 3.0 規范
在此,我們不深究 Servlet 3.0 規范的全部細節,只看看 Comet servlet 如果在 Servlet 3.0 容器中運行,可能會是什么樣子。注意 “可能” 二字。該規范已經發布公共預覽版,但在撰寫本文之際,還沒有最終版。因此,清單 7 顯示的是遵從公共預覽規范的一個實現。
清單 7. Servlet 3.0 Comet
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@WebServlet (asyncSupported= true , asyncTimeout= 5000 ) public class WeatherServlet extends HttpServlet { private MessageSender messageSender; // init and destroy are the same as other @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { AsyncContext async = request.startAsync(request, response); messageSender.setConnection(async); Weatherman weatherman = new Weatherman( 95118 , 32444 ); async.start(weatherman); ; } } |
值得高興的是,這個版本要簡單得多。平心而論,如果不遵從 Tomcat 的事件模型,在 Jetty 中可以有類似的實現。這種事件模型似乎比較合理,很容易在 Tomcat 以外的容器(例如 Jetty)中實現,只是沒有相關的標準。
回頭看看清單 7,注意它的標注聲明它支持異步處理,并設置了超時時間。startAsync 方法是 HttpServletRequest 上的一個新方法,它返回新的 javax.servlet.AsyncContext 類的一個實例。注意,MessageSender 現在傳遞 AsynContext 的引用,而不是 ServletResponse 的引用。在這里,不應該關閉響應,而是調用 AsyncContext 實例上的 complete 方法。還應注意,Weatherman 被直接傳遞到 AsyncContext 實例的 start 方法。這樣將在當前 ServletContext 中開始一個新線程。
而且,盡管與 Tomcat 或 Jetty 相比都有較大的不同,但是修改相同風格的編程來處理 Servlet 3.0 規范提議的 API 并不是太難。還應注意,Jetty 7 是為實現 Servlet 3.0 而設計的,目前處于 beta 狀態。但是,在撰寫本文之際,它還沒有實現該規范的最新版本。
結束語
Comet 風格的 Web 應用程序可以為 Web 帶來全新的交互性。它為大規模地實現這些特性帶來一些復雜的挑戰。但是,領先的 Java Web 服務器正在為實現 Comet 提供成熟、穩定的技術。在本文中,您看到了 Tomcat 和 Jetty 上當前風格的 Comet 的不同點和相似點,以及正在進行的 Servlet 3.0 規范的標準化。Tomcat 和 Jetty 使如今構建可伸縮的 Comet 應用程序成為可能,并且明確了未來面向 Servlet 3.0 標準化的升級路線。