jBatch(JSR-352) on Java SE 環境
2014年2月18日 at 12:51 午後 2件のコメント
先日のデブサミの発表後、jBatch (JSR-352) についてご質問を頂き、また別件でも同じ質問を頂きましたので、その内容を共有致します。
jBatch を cron 等で実行したいのだが、jBatch は Java EE 環境でしか実行できないのか?とのご質問を頂きました。
答えは、jBatch の仕様上、Java SE 環境上でも動作するように実装されております。
ただし、Java EE 環境上で実装する方がとても簡単に実装・運用ができますので個人的にはJava EE 環境上での動作をお薦めします。仮に Java SE 環境上で jBatch (jBatch の RI)を実行したい場合は下記をご参照ください。
1. 準備
Java SE 環境上で jBatch を稼働させるためには、JavaDB(Derby) が必要です。
また、jBatch の RI を使って Java SE 環境上で動作させるためには、
jBatch RI の実行に必要なライブラリ一式を下記より入手します。
https://java.net/projects/jbatch/downloads/download/jsr352-SE-RI-1.0.zip
zip を展開すると下記のファイルが含まれています。下記全ファイルを lib 配下にコピーしてください。
- derby.jar
- javax.inject.jar
- jsr352-SE-RI-javadoc.jar
- javax.batch.api.jar
- jsr352-RI-spi.jar
- jsr352-SE-RI-runtime.jar
2. Batch コンテナを実行するための設定
次に Batch コンテナを稼働させるためのプロパティの設定を行います。
META-INF ディレクトリ配下に services ディレクトリを作成して
それぞれ下記のファイルを作成してください。
src/META-INF/services/batch-config.properties
JDBC_DRIVER=org.apache.derby.jdbc.EmbeddedDriver # JDBC_URL=jdbc:derby://localhost:1527/batchdb;create=true JDBC_URL=jdbc:derby://localhost:1527/batchdb
src/META-INF/services/batch-services.properties
J2SE_MODE=true
以上で基本的には Java SE 環境上で動作させるために必要な設定は完了です。
3. 動作確認
それでは、バッチであるファイルの内容を別のファイルに書き出すサンプルを作成します。(※ 以降は Java EE 環境での実装と同じです。)
下記に、本 jBatch プロジェクトのディレクトリ構成を下記に示します。
まず、メイン・メソッドから Batch の JOB: “my-batch-job” を起動します。
package com.yoshio3.main; import java.util.Properties; import javax.batch.operations.JobOperator; import javax.batch.runtime.BatchRuntime; /** * * @author Yoshio Terada */ public class StandAloneBatchMain { public static void main(String... args) { JobOperator job = BatchRuntime.getJobOperator(); long id = job.start("my-batch-job", new Properties()); } }
この、”my-batch-job” の処理内容は、META-INF/batch-jobs ディレクトリ配下に、”my-batch-job.xml” として定義します。
“my-batch-job” の内容を下記に示します。プロパティを2つ input_file,output_file 定義し、それぞれ /tmp/input.txt, /tmp/output.txt を示します。また、JOB の step としてチャンク形式の step を1つ定義し、データの読み込み用(reader)、処理用(processor)、書き込み(writer)用の処理を、それぞれ、MyItemReader, MyItemProcessor, MyItemWriter に実装します。
<job id="my-batch-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <properties> <property name="input_file" value="/tmp/input.txt"/> <property name="output_file" value="/tmp/output.txt"/> </properties> <step id="first-step"> <chunk item-count="5"> <reader ref="com.yoshio3.chunks.MyItemReader"/> <processor ref="com.yoshio3.chunks.MyItemProcessor"/> <writer ref="com.yoshio3.chunks.MyItemWriter"/> </chunk> </step> </job>
読み込み用の処理は、ItemReader を実装したクラスを作成します。ここでは、input_file で指定されたプロパティのファイル(/tmp/input.txt)ファイルを読み込み、1行読み込んでその値を返します。
package com.yoshio3.chunks; import java.io.BufferedReader; import java.io.Serializable; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; import javax.batch.api.chunk.ItemReader; import javax.batch.runtime.context.JobContext; import javax.inject.Inject; public class MyItemReader implements ItemReader { @Inject JobContext jobCtx; BufferedReader bufReader; @Override public void open(Serializable checkpoint) throws Exception { String fileName = jobCtx.getProperties() .getProperty("input_file"); bufReader = Files.newBufferedReader(Paths.get(fileName),Charset.forName("UTF-8")); } @Override public void close() throws Exception { bufReader.close(); } @Override public Object readItem() throws Exception { String data = bufReader.readLine(); System.out.println("Reader readItem : " + data); return data; } @Override public Serializable checkpointInfo() throws Exception { return null; } }
次に、読み込んだデータの加工処理部分は、ItemProsessor を実装したクラスに記述します。ここでは、読み込んだデータ(文字列)に対して、文字列を付加して返しています。
package com.yoshio3.chunks; import javax.batch.api.chunk.ItemProcessor; /** * * @author Yoshio Terada */ public class MyItemProcessor implements ItemProcessor { @Override public Object processItem(Object item) throws Exception { String line = (String)item ; StringBuilder sBuilder = new StringBuilder(); sBuilder.append("Processor processItem : "); sBuilder.append(line); String returnValue = sBuilder.toString(); System.out.println(returnValue); return returnValue; } }
最後に書き出し部分を ItemWriter を実装したクラスに記述します。ここでは、oputput_file のプロパティを取得して書き出すファイル名を取得しています。次にファイルに対して取得したデータを書き出しています。
package com.yoshio3.chunks; import java.io.BufferedWriter; import java.io.Serializable; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; import javax.batch.api.chunk.ItemWriter; import javax.batch.runtime.context.JobContext; import javax.inject.Inject; /** * * @author Yoshio Terada */ public class MyItemWriter implements ItemWriter { @Inject JobContext jobCtx; String fileName; BufferedWriter bufWriter; @Override public void open(Serializable checkpoint) throws Exception { fileName = jobCtx.getProperties() .getProperty("output_file"); bufWriter = Files.newBufferedWriter(Paths.get(fileName), Charset.forName("UTF-8")); } @Override public void close() throws Exception { bufWriter.close(); } @Override public void writeItems(List<Object> items) throws Exception { for (Object obj : items) { String data = (String) obj; System.out.println("Writer writeItems : " + data); bufWriter.write(data); bufWriter.newLine(); } } @Override public Serializable checkpointInfo() throws Exception { return null; } }
上記を実装した後、コンパイルをしてください。
# java -classpath
lib/jsr352-ri-1.0/javax.inject.jar:
lib/jsr352-ri-1.0/derby.jar:
lib/jsr352-ri-1.0/jsr352-RI-spi.jar:
lib/jsr352-ri-1.0/javax.batch.api.jar:
lib/jsr352-ri-1.0/jsr352-SE-RI-javadoc.jar:
lib/jsr352-ri-1.0/jsr352-SE-RI-runtime.jar:
build/classes com.yoshio3.main.StandAloneBatchMain
実行すると下記のようなログを確認できます。chunk 形式ではデフォルトで 10 件まとめて読み込み&処理を実施し、
まとめて 10 件書き込むという動作を下記からも確認できるかと思います。
2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createSchema 情報: JBATCH schema does not exists. Trying to create it. 2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists 情報: CHECKPOINTDATA table does not exists. Trying to create it. 2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists 情報: JOBINSTANCEDATA table does not exists. Trying to create it. 2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists 情報: EXECUTIONINSTANCEDATA table does not exists. Trying to create it. 2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists 情報: STEPEXECUTIONINSTANCEDATA table does not exists. Trying to create it. 2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists 情報: JOBSTATUS table does not exists. Trying to create it. 2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists 情報: STEPSTATUS table does not exists. Trying to create it. Reader readItem : hogehoge1 Processor processItem : hogehoge1 Reader readItem : hogehoge2 Processor processItem : hogehoge2 Reader readItem : hogehoge3 Processor processItem : hogehoge3 Reader readItem : hogehoge4 Processor processItem : hogehoge4 Reader readItem : hogehoge5 Processor processItem : hogehoge5 Reader readItem : hogehoge6 Processor processItem : hogehoge6 Reader readItem : hogehoge7 Processor processItem : hogehoge7 Reader readItem : hogehoge8 Processor processItem : hogehoge8 Reader readItem : hogehoge9 Processor processItem : hogehoge9 Reader readItem : hogehoge10 Processor processItem : hogehoge10 Writer writeItems : Processor processItem : hogehoge1 Writer writeItems : Processor processItem : hogehoge2 Writer writeItems : Processor processItem : hogehoge3 Writer writeItems : Processor processItem : hogehoge4 Writer writeItems : Processor processItem : hogehoge5 Writer writeItems : Processor processItem : hogehoge6 Writer writeItems : Processor processItem : hogehoge7 Writer writeItems : Processor processItem : hogehoge8 Writer writeItems : Processor processItem : hogehoge9 Writer writeItems : Processor processItem : hogehoge10 Reader readItem : hogehoge11 Processor processItem : hogehoge11 Reader readItem : hogehoge12 Processor processItem : hogehoge12 Reader readItem : hogehoge13 Processor processItem : hogehoge13 Reader readItem : hogehoge14 Processor processItem : hogehoge14 Reader readItem : hogehoge15 Processor processItem : hogehoge15 Reader readItem : hogehoge16 Processor processItem : hogehoge16 Reader readItem : hogehoge17 Processor processItem : hogehoge17 Reader readItem : hogehoge18 Processor processItem : hogehoge18 Reader readItem : hogehoge19 Processor processItem : hogehoge19 Reader readItem : hogehoge20 Processor processItem : hogehoge20 Writer writeItems : Processor processItem : hogehoge11 Writer writeItems : Processor processItem : hogehoge12 Writer writeItems : Processor processItem : hogehoge13 Writer writeItems : Processor processItem : hogehoge14 Writer writeItems : Processor processItem : hogehoge15 Writer writeItems : Processor processItem : hogehoge16 Writer writeItems : Processor processItem : hogehoge17 Writer writeItems : Processor processItem : hogehoge18 Writer writeItems : Processor processItem : hogehoge19 Writer writeItems : Processor processItem : hogehoge20 Reader readItem : null
書き込む間隔を変更したい場合は、Job XML の設定を変更し<chunk item-count=”5″>を設定します。
<job id="my-batch-job" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"> <properties> <property name="input_file" value="/tmp/input.txt"/> <property name="output_file" value="/tmp/output.txt"/> </properties> <step id="first-step"> <chunk item-count="5"> <reader ref="com.yoshio3.chunks.MyItemReader"/> <processor ref="com.yoshio3.chunks.MyItemProcessor"/> <writer ref="com.yoshio3.chunks.MyItemWriter"/> </chunk> </step> </job>
<chunk item-count=”5″>を設定した後、実行すると下記のような結果が得られます。
run: Reader readItem : hogehoge1 Processor processItem : hogehoge1 Reader readItem : hogehoge2 Processor processItem : hogehoge2 Reader readItem : hogehoge3 Processor processItem : hogehoge3 Reader readItem : hogehoge4 Processor processItem : hogehoge4 Reader readItem : hogehoge5 Processor processItem : hogehoge5 Writer writeItems : Processor processItem : hogehoge1 Writer writeItems : Processor processItem : hogehoge2 Writer writeItems : Processor processItem : hogehoge3 Writer writeItems : Processor processItem : hogehoge4 Writer writeItems : Processor processItem : hogehoge5 Reader readItem : hogehoge6 Processor processItem : hogehoge6 Reader readItem : hogehoge7 Processor processItem : hogehoge7 Reader readItem : hogehoge8 Processor processItem : hogehoge8 Reader readItem : hogehoge9 Processor processItem : hogehoge9 Reader readItem : hogehoge10 Processor processItem : hogehoge10 Writer writeItems : Processor processItem : hogehoge6 Writer writeItems : Processor processItem : hogehoge7 Writer writeItems : Processor processItem : hogehoge8 Writer writeItems : Processor processItem : hogehoge9 Writer writeItems : Processor processItem : hogehoge10 Reader readItem : hogehoge11 Processor processItem : hogehoge11 Reader readItem : hogehoge12 Processor processItem : hogehoge12 Reader readItem : hogehoge13 Processor processItem : hogehoge13 Reader readItem : hogehoge14 Processor processItem : hogehoge14 Reader readItem : hogehoge15 Processor processItem : hogehoge15 Writer writeItems : Processor processItem : hogehoge11 Writer writeItems : Processor processItem : hogehoge12 Writer writeItems : Processor processItem : hogehoge13 Writer writeItems : Processor processItem : hogehoge14 Writer writeItems : Processor processItem : hogehoge15 Reader readItem : hogehoge16 Processor processItem : hogehoge16 Reader readItem : hogehoge17 Processor processItem : hogehoge17 Reader readItem : hogehoge18 Processor processItem : hogehoge18 Reader readItem : hogehoge19 Processor processItem : hogehoge19 Reader readItem : hogehoge20 Processor processItem : hogehoge20 Writer writeItems : Processor processItem : hogehoge16 Writer writeItems : Processor processItem : hogehoge17 Writer writeItems : Processor processItem : hogehoge18 Writer writeItems : Processor processItem : hogehoge19 Writer writeItems : Processor processItem : hogehoge20 Reader readItem : null
以上のように、Java SE の環境でも jBatch (JSR-352) を実行する事ができます。今回は参考のため jBatch の RI を使用しましたが、各 Java EE 7 準拠のアプリケーション・サーバで必要なライブラリはそれぞれ異なるかと想定します。必要なライブラリは各アプリケーション・サーバでお調べください。
1.
Frank.Jiao | 2014年2月19日 7:18 午後
jbatchを利用しようとしていますが、ロジックとしてはcsvに数万件の伝票データを整合性チェックしDBへ挿入したいですが、チェックロジックはかなり複雑と想定しています。Multithreadをやろうとしたら、JavaEEサーバはMultithreadをやらないほうがいいという記事をみました。 JMS/MDBを利用したらいいかと思ってましたが、
数万件の伝票データをメッセージキュー経由、クラスタを作成して
一気に処理しDBに入れることは可能でしょうか
最近JavaEEを興味が持ち初めて、寺田さんのプレゼンテーション資料はかなり参考になりました。ありがとうございます
2.
Yoshio Terada | 2014年2月20日 6:33 午後
ご質問の内容は、幾つかの内容が混ざってご質問頂いていますので、整理させていただきます。
● まず、Java EE 7 からは、コンカレンシー・ユーティリティが入っていますので、Java EE 7 環境では並列処理タスクを実装する事ができます。また GlassFish v4 の場合ではございますが、GlassFish に含まれる jBatch は内部的に Concurrency Utilities を使用して動作しています。
● 数万件のデータを CSV ファイルから読み込み1件ずつ JMS でメッセージプロバイダに送信し、MDB 側で受信した後 JPA で DB へにコミットする事も可能です。
基本的に JMS/MDB の場合は、キューに入っているデータを1件ずつ処理(送受信)する必要があります。
一方、jBatch は書き込むタイミングを設定できます。(デフォルト:10件づつ書き込み)
その他、DB の保存に失敗した場合のリトライ・スキップの実装などの実装もJMS/MDB と jBatch では異なります。
できるか、できないかという話であれば、どちらでもできますが、今回のような件は、恐らく jBatch の方が向いているのではないでしょうか?
● jBatch で CSV からデータを読み込み DB へ書き込むサンプルは下記にございますので、下記をお試しいただければ幸いです。
https://github.com/javaee-samples/javaee7-samples/tree/master/batch/chunk-csv-database
また、実際にパフォーマンスが向上するか否かは、実際のデータや処理内容(読み込んだデータの検証、もしくはデータの加工内容)に応じて変わるかと想定しますが、その部分の処理が比較的重いのであれば、CSV ファイルを複数に分割し、jBatch の split で並列に処理をさせる事で、並列にデータ処理(検証、加工等)を行う事も可能で効率的になるかと想定します。
(※ あまり処理内容が重くないのであれば、並列にしても、DB へのコミット部分でボトルネックになる可能性もあるのでご注意ください。)
バッチの並列化は下記 URL の P84 辺りをご参照ください。
http://www.slideshare.net/OracleMiddleJP/java-ee-7-45-features