Archive for 2013年12月20日

Java EE 7 WebSocket Mail の実装

本エントリは、Java Advent Calendar 2013 の 20日目のエントリです。
昨日、19日目は 23 才のお誕生日を迎えた、 ひらおかゆみさん (id:yumix_h) の
JavaMailを手軽に使うライブラリ」でした。 yumix_h さんお誕生日おめでとうございます!!

明日は、@nagaseyasuhito さんです。

本エントリは、1つ前のエントリで投稿した「JSF + WebSocket で実装した IMAP Web メール・クライアント」の続きのエントリで、WebSocket の実装部についてご紹介します。

ただし、残念ながら今回はアンチパターンとしてのご紹介になります。
ポイントは、WebSocket のサーバ・エンドポイントの実装で別スレッドを立てて監視などを行ってはいけないという点です。

追記:2014 年 5 月 9 日 : Java Mail 1.5.2 のリリースによりアンチパターンではなくなりました。詳細は本エントリ最下部をご参照

WebSocket はリアルタイム通知を行うために多くの開発者に興味を持たれています。実際、このデモでは、IMPAの受信箱(INBOX)に入ってきたメッセージをユーザにいち早く、リアルタイムでお知らせするために、WebSocket を使用して通知しています。WebSocket を使う事でこのようなアプリケーションを作成する事もできます。

実際には、下記のように実装しています。まず、クライアント側の View の実装ですが、JSF の XHTML と JavaScript でそれぞれ実装します。
JSF のページ内で <h:outputScript> を指定し WebSocket のクライアント・エンドポイントの実装 JavaScript(ws-client-endpoint.js) を読み込んでいます。次に、PrimeFaces で用意されている、「Notification Bar」を使用します。「Notification Bar」は、下記 URL のサンプルでご参照頂ける通り、動的にパネルを表示させる事ができる、JSF のコンポーネントです。
http://www.primefaces.org/showcase/ui/notificationBar.jsf

このコンポーネントを使用して、WebSocket のサーバ・エンドポイントからメッセージを受信した際に、 JavaScript から PF(‘bar’).show() を実行してこのコンポーネントを表示させます。一方で、このコンポーネントを非表示にするためには、JavaScript で PF(‘bar’).hide()を呼び出す事で非表示にできます。

実際に、WebSocket のサーバに接続するためには、「リアルタイムチェック開始」ボタンを押下して接続します。「リアルタイムチェック開始」ボタンを押下すると JavaScript の connectServerEndpoint() が呼び出され、WebSocket のサーバ・エンドポイントに接続します。

 <h:head>
  <title>JSF-WebSocket WebMail</title>
  <f:event type="preRenderView" listener="#{messageReceiveMgdBean.onPageLoad}"/>
  <h:outputScript library="javascripts" name="ws-client-endpoint.js"/>
 </h:head>

 <h:body>
  <h:form id="form">
   <p:notificationBar position="top" effect="slide" widgetVar="bar" styleClass="top" style="background-color : #F8F8FF ; width: fit-content;">  
    <h:panelGrid columns="2" columnClasses="column" cellpadding="0">  
     <h:outputText value="新着メッセージ :" style="color: red;font-size:12px;" /> <p:commandButton value="閉じる" onclick="PF('bar').hide()" type="button" style="font-size:10px;"/>  
     <h:outputText value="Subject :" style="font-size:10px;" /><h:outputText id="wssubject" value="" style="font-size:10px;" />
     <h:outputText value="From :" style="font-size:10px;" /><h:outputText id="wsfrom" value="" style="font-size:10px;" />
     <h:outputText value="メッセージ・サマリー :" style="font-size:10px;" /><h:outputText id="wssummary" value="" style="font-size:10px;" />
    </h:panelGrid>
   </p:notificationBar>

   …中略
     <input id="connect" type="button" value="リアルタイムチェック開始" style="font-size:10px;" onClick="connectServerEndpoint();"/>
     <input id="close" type="button" value="リアルタイムチェック中止" style="font-size:10px;" onClick="closeServerEndpoint();"/>

JavaScript の実装は下記の通りです。まず、ページのロード時には、「リアルタイムチェック中止」のボタンを非表示にし「リアルタイムチェック開始」ボタンのみ表示させています。ボタンを押下すると「connectServerEndpoint()」を呼び出しますが、ここでは、WebSocket のサーバ・エンドポイントに接続し、「リアルタイムチェック開始」、「リアルタイムチェック中止」ボタンそれぞれの表示、非表示を切り替えます。
次に、サーバ・エンドポイントからメッセージを受信した場合、onMessage() 経由で writeToScreen() を呼び出しJSON のデータを展開した後、JSF のコンポーネントに対して値を代入し、 PF(‘bar’).show()で通知バー(<p:notificationBar>)を表示させます。

var websocket = null;
function init() {
 document.getElementById("close").style.display = "none";
}

function closeServerEndpoint() {
 websocket.close(4001, "Close connection from client");
 document.getElementById("connect").style.display = "block";
 document.getElementById("close").style.display = "none";
}

function connectServerEndpoint() {

 var wsUri = "ws://localhost:8080/JSF-WebSocket-Mailer/inbox-check";
 if ("WebSocket" in window) {
  websocket = new WebSocket(wsUri);
 } else if ("MozWebSocket" in window) {
  websocket = new MozWebSocket(wsUri);
 }

 websocket.onopen = function(evt) {
  onOpen(evt);
 };
 websocket.onmessage = function(evt) {
  onMessage(evt);
 };
 websocket.onerror = function(evt) {
  onError(evt);
 };
 websocket.onclose = function(evt) {
  closeServerEndpoint();
 };

 document.getElementById("connect").style.display = "none";
 document.getElementById("close").style.display = "block";
}

function onOpen(evt) {
 ;
}

function onMessage(evt) {
 writeToScreen(evt.data);
}

function onError(evt) {
 writeToScreen("ERROR: " + evt.data);
}

function writeToScreen(messages) {
 if (window.JSON)
 {
  var obj = JSON.parse(messages);
  var subject = obj.subject;
  var from = obj.address;
  var summary = obj.summary;

  document.getElementById('form:wssubject').innerHTML = subject;
  document.getElementById('form:wsfrom').innerHTML = from;
  document.getElementById('form:wssummary').innerHTML = summary;
 }
 PF('bar').show();
}
window.addEventListener("load", init, false);

次に、サーバ・エンドポイント側の実装ですが、サーバ側の実装は下記の3クラスから構成されています。

InboxCheck : WebSocket のサーバ・エンドポイントの実装
MessageEncoder : MessageからJSONを生成するエンコーダ
InboxCheckRunnableTask : INBOX を監視する並列処理用タスク

まず、InboxCheck ですが、このクラスが WebSocket のサーバ・エンドポイントの重要なクラスです。このクラスでは、クライアント・エンドポイントから接続された際に、IMAP サーバへ接続するためのユーザ名、パスワードを受け取り、IMAP サーバへ接続を行っています。
 ※ 今回、JSF のログインページで入力された、ユーザ名、パスワードをWebSocket 側でも扱えるように Session Scope に値を代入し、取り出していますが、本来 Session Scope に代入すべきではないので、別の方法 (Flash等でできれば) でユーザ名、パスワードを渡す方法を検討すべきです。今回は簡単のため、Session で扱わせて頂きました。

IMAPサーバへ接続した後、checkNewMessage() で新着メッセージを監視します。

追記:checkNewMessage() メソッドの実装は、下記の実装ではなく、Java Mail 1.5.2 でリリースされたIdleManagerクラスを使用して実装してください。詳細は本エントリの最下部に記載しています。

具体的には、下記の実装箇所で、IMAP の INBOX Folder に対して、メッセージが追加された際の処理を実装しています。

folder.addMessageCountListener(new MessageCountAdapter() {});

この内部実装では、メッセージが追加(新着メッセージが来た)された際に、最新のメッセージを取得し、その情報を WebSocket のクライアント・エンドポイントに対して配信しています。

次に、実際にメッセージが追加された事を検知するための実装を行います。これは、IMAP の IDLE (RFC 2177) 機能を使って、フォルダの変更をリアルタイムで検知します。このリアルタイム監視は、InboxCheckRunnableTask で実装し、別のスレッドで監視を行うようにします。

package jp.co.oracle.samples.websockets;

import java.io.IOException;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Resource;
import javax.annotation.security.RolesAllowed;
import javax.enterprise.concurrent.ManagedThreadFactory;
import javax.inject.Inject;
import javax.mail.Folder;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.Session;
import javax.mail.Store;
import javax.mail.event.MessageCountAdapter;
import javax.mail.event.MessageCountEvent;
import javax.websocket.EncodeException;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import javax.websocket.server.ServerEndpoint;
import jp.co.oracle.samples.mgdBean.IndexLoginMgdBean;
import jp.co.oracle.samples.tasks.InboxCheckRunnableTask;

/**
 *
 * @author Yoshio Terada
 */
@ServerEndpoint(value = "/inbox-check",
  encoders = {MessageEncoder.class})
public class InboxCheck {

 private Store store;

 private static final Logger logger = Logger.getLogger(InboxCheck.class.getPackage().getName());

 @Resource
 ManagedThreadFactory threadFactory;

 InboxCheckRunnableTask invokeCheck = null;

 @Inject
 IndexLoginMgdBean login;

 @OnOpen
 public void onOpen(javax.websocket.Session session) {
  try {
   initStore(login.getImapServer(), login.getUsername(), login.getPassword());
   checkNewMessage(session);
  } catch (MessagingException mes) {
   try {
    logger.log(Level.SEVERE, "Exception occured on monitoring INBOX ", mes);
    session.close();
   } catch (IOException ex) {
    logger.log(Level.SEVERE, "Failed to close session", ex);
   }
  }
 }

 @OnClose
 public void onClose(javax.websocket.Session session) {
  if (invokeCheck != null) {
   invokeCheck.terminateRealTimeCheck();
  }
 }

 @OnError
 public void onError(Throwable t) {
  logger.log(Level.SEVERE, "Error Occured", t);
 }
/* 本エントリの最下部に説明した IdleManager クラスを使用して実装してください
 private void checkNewMessage(final javax.websocket.Session session) throws MessagingException {

  // INBOX のフォルダを対象
  Folder folder = store.getFolder("INBOX");
  if (!folder.isOpen()) {
   folder.open(javax.mail.Folder.READ_WRITE);
  }
  // フォルダのメッセージ・カウント数を監視
  folder.addMessageCountListener(new MessageCountAdapter() {
   @Override
   public void messagesAdded(MessageCountEvent e) {
    Message[] msgs = e.getMessages();
    Message msg = msgs[msgs.length - 1];
    try {
     // WebSocket のクライアント・エンドポイントに送信
     session.getBasicRemote().sendObject(msg);
    } catch (IOException | EncodeException ioencx) {
     logger.log(Level.SEVERE, "Failed to Send Message ", ioencx);
    }
   }
  });
  // 別スレッドでメッセージの到着を監視
  newInboxCheckThreadWithRetryCount(folder);
 }

 private void newInboxCheckThreadWithRetryCount(Folder folder) {
  invokeCheck = new InboxCheckRunnableTask(folder);
  Thread runTask = threadFactory.newThread(invokeCheck);
  runTask.start();
 }
*/
 // Store の初期化(ページのロード時)
 private void initStore(String imapServer, String username, String password) throws MessagingException {
  Properties props = System.getProperties();
  props.setProperty("mail.store.protocol", "imaps");

  Session session = Session.getDefaultInstance(props, null);
  javax.mail.Store initStore = session.getStore("imaps");
  initStore.connect(imapServer, username, password);

  this.store = initStore;
 }
}

InboxCheckRunnableTask では、IDLE 機能が有効か無効かをチェックし、IDLE 機能が有効な場合、idle()で新着メッセージの到着を待ち受けます。idle() はメッセージが1通到着すると処理が終了しますので、スレッドの終了メソッドが呼び出されるか、何らかの例外が発生するまで、無限ループで繰り返し呼び出します。
一方で、IDLE 機能が無効なサーバに接続する場合は(Yahooなど)、idle() で待ち受けできませんので、自身で定期的にポーリングを行っています。

package jp.co.oracle.samples.tasks;

import com.sun.mail.imap.IMAPFolder;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.mail.Folder;
import javax.mail.MessagingException;

/**
 *
 * @author Yoshio Terada
 */
/* 本エントリの最後に記載した IdleManager で実装してください。
public class InboxCheckRunnableTask implements Runnable {

 private final static int MAIL_CHECK_IDLE_TIME = 20000;
 private Folder folder;
 private static final Logger logger = Logger.getLogger(InboxCheckRunnableTask.class.getPackage().getName());
 volatile boolean isRunnable = true;
 
 public InboxCheckRunnableTask(Folder folder) {
  this.folder = folder;
 }

 private void executeCheckForIdleDisable() throws InterruptedException, MessagingException {
  Thread.sleep(MAIL_CHECK_IDLE_TIME);
  int count = folder.getMessageCount();
 }

 public void terminateRealTimeCheck() {
  isRunnable = false;
 }

 @Override
 public void run() {

  boolean idleIsAvailable = true;
  while (isRunnable) {
   // IMAPFolder のインスタンスで isIdleEnable が true の時実行
   if (folder instanceof IMAPFolder) {
    IMAPFolder ifolder = (IMAPFolder) folder;
    if (idleIsAvailable) {
     try {
      ifolder.idle();
     } catch (javax.mail.FolderClosedException fce) {
      logger.log(Level.SEVERE, "IMAP Folder closed:", fce);
      isRunnable = false;
     } catch (MessagingException ex) {
      if (ex.getMessage().contains("IDLE not supported")) {
       idleIsAvailable = false;
      } else {
       logger.log(Level.SEVERE, "IMAP Folder & Something error occured;", ex);
       isRunnable = false;
      }
     }
    } else {
     try {
      executeCheckForIdleDisable();
     } catch (InterruptedException | MessagingException ime) {
      logger.log(Level.SEVERE, "Some error occured on executeCheckForIdleDisable() : ", ime);
      isRunnable = false;
     }
    }
   } else {
    logger.log(Level.SEVERE, "THis is not IMAP Folder.");
    isRunnable = false;
   }
  }
 }
*/
}

最後に、JavaMail の Message から JSON の文字列を生成するエンコーダを作成します。このエンコーダを作成する事で、Message オブジェクトから、JSON に変換してクライアント・エンドポイントに対してメッセージを送信できるようになります。ここでは、Java EE 7 の標準に含まれる JSON-P を使用して実装しています。

session.getBasicRemote().sendObject(msg);

package jp.co.oracle.samples.websockets;

import java.io.IOException;
import java.io.StringWriter;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.json.Json;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonWriter;
import javax.mail.Address;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.InternetAddress;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
import jp.co.oracle.samples.msgutil.MessageDumpUtil;

/**
 *
 * @author Yoshio Terada
 */
public class MessageEncoder implements Encoder.Text<Message> {

 private final static int SUMMARY_SIZE = 80;

 private static final Logger logger = Logger.getLogger(MessageEncoder.class.getPackage().getName());
 
 @Override
 public String encode(Message msg) throws EncodeException {
  try {
   // Address[] から JSon 配列を作成
   Address[] addresses = msg.getFrom();
   JsonArrayBuilder array = Json.createArrayBuilder();
   for (Address adres : addresses) {
    InternetAddress iAddress = (InternetAddress) adres;
    array.add(iAddress.toUnicodeString());
   }

   // メッセージ・サマリを取得・作成
   MessageDumpUtil dumpUtil = new MessageDumpUtil();
   String msgSummary = dumpUtil.getText(msg);

   if (!msgSummary.isEmpty() && msgSummary.length() > SUMMARY_SIZE) {
    String tmp = msgSummary.replaceAll("(\r|\n)", "");
    msgSummary = tmp.substring(0, SUMMARY_SIZE);
    msgSummary = msgSummary + "  ......";
   }

   //JSon オブジェクトを生成
   JsonObject model = Json.createObjectBuilder()
     .add("subject", msg.getSubject())
     .add("address", array)
     .add("summary", msgSummary)
     .build();

   //JSon オブジェクトから JSon 文字列を生成
   StringWriter stWriter = new StringWriter();
   try (JsonWriter jsonWriter = Json.createWriter(stWriter)) {
    jsonWriter.writeObject(model);
   }
   return stWriter.toString();
  } catch (MessagingException | IOException ex) {
   EncodeException ee = new EncodeException(msg, "Encode failed ", ex);
   logger.log(Level.SEVERE, null, ex);
   throw ee;
  }
 }

 @Override
 public void init(EndpointConfig config) {

 }

 @Override
 public void destroy() {

 }
}

上記のようにして、IMAP サーバの受信箱(INBOX)に新着メッセージが来た時点で通知を行う事ができます。しかし、このようなアプリケーションを一般公開する大規模なサービスとして実装すべきではありません。元々、WebSocket をサポートするサーバ・エンジンは大量のアクセスに対し少ないスレッドで処理を行うように、NIO で実装されている事が多いかと想定します。

GlassFish (Grizzly) の場合:
Grizzlyの概要 : C10K問題に対応するGlassFish(Grizzly)
Grizzlyの概要 2 : Java New I/Oで実装されたサーバその他、Jetty でも古くからNIOに対応しています。


今回作成した私の WebSocket で通知するアプリは、監視を行うために、新しく1つのスレッドを作成し、スレッド内で変更を監視するように実装しています。つまり、IMAP の受信箱(INBOX) の監視を行うために、ユーザ毎に新しくスレッドを生成しています。監視を行う為に別のスレッドを起こして無限ループ内で特定のイベントを監視するプログラムはよくあるかと想定しますが、そのようなアプリケーションに WebSocket は向きません。仮に ManagedExecutorService に変更し、コネクション・プールを使用しても問題は同じです。スレッド・プール数の最大までスレッドが生成されると終了です。


せっかく、サーバ・エンジンが NIO で実装して大量のアクセスに対して、少数のスレッドで処理を裁くことができるように実装されていても、個々のユーザ、もしくはリクエスト毎に、アプリケーション側でスレッドを生成すると、アプリケーション・レベルでC10K 問題が発生します。具体的には、100 人サーバに接続してきた場合に、100 スレッドが生成され、1000 人接続してきた場合、1000 スレッドが必要になります。


アクセス数が限られる環境では、こうした実装もありかもしれませんが、大規模に展開するサービスでは、リアルタイムで何らかの監視を行うために、個々にスレッドを生成するような実装はやめた方がよいと思います。WebSocket はリアルタイム監視などにもでき、簡単に実装できますと私も説明をしていますが、サーバ側の実装は十分にご検討頂いた後、作成してください。

追記:2014 年 5 月 9 日
上記の実装後、Java Mail のスペックリードに上記を解決するために、NIO を使って実装できる API を新たに作成して欲しい旨要望を出しておりました。その結果、Java Mail 1.5.2 より実験的に、IdleManager クラスが導入されました。これにより上記で実装した idle() メソッドの代わりに IdleManager で実装する事で、多くのスレッドを生成しなくてもよくなりサーバ側での C10K 問題を解決できます。詳しくは下記 IdleManager の API とそこに記載されているサンプルをご覧ください。
https://javamail.java.net/nonav/docs/api/com/sun/mail/imap/IdleManager.html

実際には微調整が必要ですが下記のような実装になります。

        @Resource
        ManagedExecutorService es;


    private void checkNewMessage(final javax.websocket.Session session) throws MessagingException, IOException {
        Properties props = session.getProperties();
        props.put("mail.event.scope", "session"); // or "application"
        props.put("mail.event.executor", es);
        //javax.mail.Session
        Session mailSession = Session.getInstance(props, null);

        // シングルトン EJB より IdleManager を取得
        final IdleManager idleManager =         
                imapIdleManager.getSingleIdleManager(mailSession, es);
        //final IdleManager idleManager = new IdleManager(mailSession, es);

        // IMAP Server へ接続
        javax.mail.Store initStore = mailSession.getStore("imaps");
        initStore.connect("imap-server.yoshio3.com", "USERNAME", "PASSWORD");

        Folder folder = store.getFolder("INBOX");
        folder.open(Folder.READ_WRITE);
        folder.addMessageCountListener(new MessageCountAdapter() {
            public void messagesAdded(MessageCountEvent ev) {
                Folder folder = (Folder)ev.getSource();
                Message[] msgs = ev.getMessages();
                System.out.println("Folder: " + folder +
                    " got " + msgs.length + " new messages");
                // process new messages
                idleManager.watch(folder); // keep watching for new messages
            }
        });
        idleManager.watch(folder);
    }

JJUG CCC の HoL で実施した WebSocket のハンズオンは、JMS を使用して単一のMDB でキューやトピックを監視しメッセージ配信を行っているため、上記のような問題はありません。

広告

2013年12月20日 at 8:45 AM 1件のコメント


Java Champion & Evangelist

ご注意

このエントリは個人の見解であり、所属する会社の公式見解ではありません

カレンダー

2013年12月
« 11月   1月 »
 1
2345678
9101112131415
16171819202122
23242526272829
3031  

カテゴリー

Twitter

clustermap

ブログ統計情報

  • 997,674 hits

Feeds