ご注意(NOTE): WebSocket with Concurrency for EE
Java EE 7 のリリースが控えておりますが、先日 Java Day Tokyo で私のセッションの中でデモした WebSocket (JSR-356) と Concurrency Utilities for EE (JSR-236) を組み合わせたデモのコードについてご紹介すると共に、実装時の注意点をご報告いたします。
下記は、実際に私が WebSocket (JSR-356) と Concurrency Utilities for EE (JSR-236) を組み合わせたコードを実装する際にハマった内容をお届けします。
コードは下記のようなコードを記載しています。
package jp.co.oracle.websocket; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Resource; import javax.enterprise.concurrent.ManagedExecutorService; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import jp.co.oracle.tasks.WebSocketAIRSearchTask; import jp.co.oracle.tasks.WebSocketHotelSearchTask; @ServerEndpoint(value = "/asyncResult") public class AsyncResultWebSocketEndpoint { private static final Logger logger = Logger.getLogger(AsyncResultWebSocketEndpoint.class.getPackage().getName()); // Concurrency Utilities for EE の ManagedExecutorService をインジェクト @Resource(name = "concurrent/DefaultManagedExecutorService") ManagedExecutorService managedExecsvc; // WebSocket のコネクションがオープンした際の処理 @OnOpen public void initOpen(Session session) { executeTasks(session); } // WebSocket クライアントからメッセージを受信した際の処理 @OnMessage public void receivedMessage(String message, Session session) { if (!message.equals("re-execute")) { return; } executeTasks(session); } // 実際の処理内容 private void executeTasks(Session session) { // 複数タスクの実行の際に終わった順に処理結果を取り出す ExecutorCompletionService<String> execCompService = new ExecutorCompletionService<>(managedExecsvc); // 複数タスクの登録 List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < 100; i++) { WebSocketHotelSearchTask task = new WebSocketHotelSearchTask(i); futures.add(execCompService.submit(task)); } try { // 終了したタスクの順番に処理結果を取得し // 処理結果を WebSocket のクライアント・エンドポイント // に対して処理結果を送信 for (Future<String> results : futures) { String resultString = execCompService.take().get(); session.getBasicRemote().sendText(resultString); } } catch (IOException | InterruptedException | ExecutionException ex) { logger.log(Level.SEVERE, null, ex); } } }
ダミーのタスク
package jp.co.oracle.tasks; import java.util.concurrent.Callable; import java.util.logging.Level; import java.util.logging.Logger; public class WebSocketHotelSearchTask implements Callable<String> { private static final Logger logger = Logger.getLogger(WebSocketHotelSearchTask.class.getPackage().getName()); private int counter; public WebSocketHotelSearchTask(int counter) { this.counter = counter; } // タスクの処理内容によっては時間のかかるタスクもあるため // 半分のタスクをわざと 4 秒待たせ、タスクの登録順にタスクが // 完了しないように作成 @Override public String call() { String result = ""; if (counter % 2 == 1) { Thread.sleep(4000); } result = "ホテル検索タスク完了 : 終了タスクの ID" + counter;; } catch (InterruptedException ex) { logger.log(Level.SEVERE, null, ex); } return result; } }
今回、なぜ下記のようにタスクの一括登録を行った後に、タスクの処理が終わった順に WebSocket のクライアント・エンドポイントに対してメッセージ配信を行うコードを実装したかというと(つまり、タスクの処理コード中から WebSocket のクライアントに対してメッセージ配信をしていない)、WebSocket 側のスレッドの制限があったためです。
// 複数タスクの登録 List<Future<String>> futures = new ArrayList<>(); for (int i = 0; i < 100; i++) { WebSocketHotelSearchTask task = new WebSocketHotelSearchTask(i); futures.add(execCompService.submit(task)); } try { // 終了したタスクの順番に処理結果を取得し // 処理結果を WebSocket のクライアント・エンドポイント // に対して処理結果を送信 for (Future<String> results : futures) { String resultString = execCompService.take().get(); session.getBasicRemote().sendText(resultString); }
当初、下記のように Runnable (Callable の call() 中で実装も可)のインタフェースを実装したタスクを作成し、タスクの処理の中で WebSocket のエンドポイントに対してメッセージを配信するコードを記載しました。例えば、下記のような感じです。
public class SomeMyTask implements Runnable{ Session session; public SomeMyTask(Session session){ this.session = session; } @Override void run(){ // 何らかの処理を実施 // タスクの処理の最後に、WebSocket のクライアント・エンドポイント // に対してメッセージを配信 session.getBasicRemote().sendText(resultString); } }
そして、下記のコードを書いてタスクを実行しました。
for( int i = 0 ; i < 10 ; i++ ){ SomeMyTask task = new SomeMyTask(session); managedExecsvc.submit(task); }
すると下記の例外が発生しました。
例外の出力内容:
java.lang.IllegalStateException: HeapBuffer has already been disposed at org.glassfish.grizzly.memory.HeapBuffer.checkDispose(HeapBuffer.java:802) at org.glassfish.grizzly.memory.HeapBuffer.position(HeapBuffer.java:188) at org.glassfish.grizzly.nio.transport.TCPNIOAsyncQueueWriter.fillByteBuffer(TCPNIOAsyncQueueWriter.java:194) at org.glassfish.grizzly.nio.transport.TCPNIOAsyncQueueWriter.writeComposite0(TCPNIOAsyncQueueWriter.java:151) at org.glassfish.grizzly.nio.transport.TCPNIOAsyncQueueWriter.write0(TCPNIOAsyncQueueWriter.java:80) at org.glassfish.grizzly.nio.AbstractNIOAsyncQueueWriter.processAsync(AbstractNIOAsyncQueueWriter.java:458) at org.glassfish.grizzly.filterchain.DefaultFilterChain.process(DefaultFilterChain.java:110) at org.glassfish.grizzly.ProcessorExecutor.execute(ProcessorExecutor.java:77) at org.glassfish.grizzly.nio.transport.TCPNIOTransport.fireIOEvent(TCPNIOTransport.java:838) at org.glassfish.grizzly.strategies.AbstractIOStrategy.fireIOEvent(AbstractIOStrategy.java:113) at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy.run0(WorkerThreadIOStrategy.java:115) at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy.access$100(WorkerThreadIOStrategy.java:55) at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy$WorkerThreadRunnable.run(WorkerThreadIOStrategy.java:135) at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:564) at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:544) at java.lang.Thread.run(Thread.java:722) |
なぜだろうと、Grizzly のソースコードを追ってみると、既にヒープが開放されてしまっているようです。
800 protected final void [More ...] checkDispose() { 801 if (heap == null) { 802 throw new IllegalStateException( 803 "HeapBuffer has already been disposed", 804 disposeStackTrace); 805 } 806 }
当初バグかとも思ったのですが念のため、WebSocket (JSR-356) 仕様のスレッド関連部分をチェックしてみました。すると下記の 5.1 に WebSocket に関するスレッドの注意書きが記載されておりました。
5.1 Threading Considerations Implementations of the WebSocket API may employ a variety of threading strategies in order to provide a scalable implementation. The specification aims to allow a range of strategies. However, the implementation must fulfill certain threading requirements in order to provide the developer a consistent threading environment for their applications. Unless backed by a Java EE component with a different lifecycle (See Chapter 7), the container must use a unique instance of the endpoint per peer. [WSC-5.1-1] In all cases, the implementation must not invoke an endpoint instance with more than one thread per peer at a time. [WSC-5.1-2] The implementation may not invoke the close method on an endpoint until after the open method has completed. [WSC-5.1-3] This guarantees that a websocket endpoint instance is never called by more than one container thread at a time per peer. [WSC-5.1-4] If the implementation decides to process an incoming message in parts, it must ensure that the corresponding onMessage() calls are called sequentially, and do not interleave either with parts of the same message or with other messages [WSC-5.1.5]. |
つまり、タスクの実行自身は複数のスレッドで実行できるのですが、WebSocket のクライアント・エンドポイントへのメッセージ送信は1箇所にまとめて実装しなければならない事に気付き上記のようなコードを書いています。
個人的には、マルチスレッドから WebSocket のクライアント・エンドポイントにメッセージ送信ができるようになるとより便利になるのではないかと思いますが、皆様如何でしょう? もちろん、既に仕様は FIX して Java EE 7 のリリース時点では無理ですし、Grizzly 等サーバ側の実装も今のままだと難しい部分があるかもしれません。しかし、皆様で声を上げていけば、時期 Java EE 8 の WebSocket 1.1 当たりで、マルチスレッドからのメッセージ送信もできるかも?!しれないので、賛同して頂ける方がいらっしゃったら、まとめて報告したいなと思っております。
(仕様上ダメって断られる可能性ももちろんありますが。(^_^;))
でも、昔と違って今の Java はこういった事がスペック・リードやエキスパート・グループメンバーに伝えやすい環境なんですよ!!
JJUG として Adopt-A-JSR プログラムに参加し、日本から改善要望なども出していけるといいですね。
Entry filed under: GlassFish, Java. Tags: Concurrency, GlassFish, Java EE 7, WebSocket.