jBatch(JSR-352) on Java SE 環境

2014年2月18日 at 12:51 午後 2件のコメント

先日のデブサミの発表後、jBatch (JSR-352) についてご質問を頂き、また別件でも同じ質問を頂きましたので、その内容を共有致します。

jBatch を cron 等で実行したいのだが、jBatch は Java EE 環境でしか実行できないのか?とのご質問を頂きました。
答えは、jBatch の仕様上、Java SE 環境上でも動作するように実装されております。

ただし、Java EE 環境上で実装する方がとても簡単に実装・運用ができますので個人的にはJava EE 環境上での動作をお薦めします。仮に Java SE 環境上で jBatch (jBatch の RI)を実行したい場合は下記をご参照ください。

1. 準備

Java SE 環境上で jBatch を稼働させるためには、JavaDB(Derby) が必要です。
また、jBatch の RI を使って Java SE 環境上で動作させるためには、
jBatch RI の実行に必要なライブラリ一式を下記より入手します。

https://java.net/projects/jbatch/downloads/download/jsr352-SE-RI-1.0.zip

zip を展開すると下記のファイルが含まれています。下記全ファイルを lib 配下にコピーしてください。

  • derby.jar
  • javax.inject.jar
  • jsr352-SE-RI-javadoc.jar
  • javax.batch.api.jar
  • jsr352-RI-spi.jar
  • jsr352-SE-RI-runtime.jar

2. Batch コンテナを実行するための設定

次に Batch コンテナを稼働させるためのプロパティの設定を行います。
META-INF ディレクトリ配下に services ディレクトリを作成して
それぞれ下記のファイルを作成してください。

src/META-INF/services/batch-config.properties

JDBC_DRIVER=org.apache.derby.jdbc.EmbeddedDriver
# JDBC_URL=jdbc:derby://localhost:1527/batchdb;create=true
JDBC_URL=jdbc:derby://localhost:1527/batchdb

src/META-INF/services/batch-services.properties

J2SE_MODE=true

以上で基本的には Java SE 環境上で動作させるために必要な設定は完了です。

3. 動作確認

それでは、バッチであるファイルの内容を別のファイルに書き出すサンプルを作成します。(※ 以降は Java EE 環境での実装と同じです。)
下記に、本 jBatch プロジェクトのディレクトリ構成を下記に示します。

まず、メイン・メソッドから Batch の JOB: “my-batch-job” を起動します。

package com.yoshio3.main;

import java.util.Properties;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;

/**
 *
 * @author Yoshio Terada
 */
public class StandAloneBatchMain {

    public static void main(String... args) {
        JobOperator job = BatchRuntime.getJobOperator();
        long id = job.start("my-batch-job", new Properties());
    }    
}

この、”my-batch-job” の処理内容は、META-INF/batch-jobs ディレクトリ配下に、”my-batch-job.xml” として定義します。

“my-batch-job” の内容を下記に示します。プロパティを2つ input_file,output_file 定義し、それぞれ /tmp/input.txt, /tmp/output.txt を示します。また、JOB の step としてチャンク形式の step を1つ定義し、データの読み込み用(reader)、処理用(processor)、書き込み(writer)用の処理を、それぞれ、MyItemReader, MyItemProcessor, MyItemWriter に実装します。

<job id="my-batch-job"
     xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <properties>
        <property name="input_file" value="/tmp/input.txt"/>
        <property name="output_file" value="/tmp/output.txt"/>
    </properties>

    <step id="first-step">
        <chunk item-count="5">
            <reader    ref="com.yoshio3.chunks.MyItemReader"/>
            <processor ref="com.yoshio3.chunks.MyItemProcessor"/>
            <writer    ref="com.yoshio3.chunks.MyItemWriter"/>
        </chunk>
    </step>
</job>

読み込み用の処理は、ItemReader を実装したクラスを作成します。ここでは、input_file で指定されたプロパティのファイル(/tmp/input.txt)ファイルを読み込み、1行読み込んでその値を返します。

package com.yoshio3.chunks;

import java.io.BufferedReader;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.batch.api.chunk.ItemReader;
import javax.batch.runtime.context.JobContext;
import javax.inject.Inject;

public class MyItemReader implements ItemReader {

    @Inject
    JobContext jobCtx;

    BufferedReader bufReader;

    @Override
    public void open(Serializable checkpoint) throws Exception {        
        String fileName = jobCtx.getProperties()
                .getProperty("input_file");
        bufReader = Files.newBufferedReader(Paths.get(fileName),Charset.forName("UTF-8"));
    }

    @Override
    public void close() throws Exception {
        bufReader.close();
    }

    @Override
    public Object readItem() throws Exception {
        String data = bufReader.readLine();
        System.out.println("Reader readItem : " + data);
        return data;
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }
}

次に、読み込んだデータの加工処理部分は、ItemProsessor を実装したクラスに記述します。ここでは、読み込んだデータ(文字列)に対して、文字列を付加して返しています。

package com.yoshio3.chunks;

import javax.batch.api.chunk.ItemProcessor;

/**
 *
 * @author Yoshio Terada
 */
public class MyItemProcessor implements ItemProcessor {

    @Override
    public Object processItem(Object item) throws Exception {
        String line = (String)item ;
        StringBuilder sBuilder = new StringBuilder();
        sBuilder.append("Processor processItem : ");
        sBuilder.append(line);
        String returnValue = sBuilder.toString();
        System.out.println(returnValue);
        return returnValue;
    }    
}

最後に書き出し部分を ItemWriter を実装したクラスに記述します。ここでは、oputput_file のプロパティを取得して書き出すファイル名を取得しています。次にファイルに対して取得したデータを書き出しています。

package com.yoshio3.chunks;

import java.io.BufferedWriter;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import javax.batch.api.chunk.ItemWriter;
import javax.batch.runtime.context.JobContext;
import javax.inject.Inject;

/**
 *
 * @author Yoshio Terada
 */
public class MyItemWriter implements ItemWriter {

    @Inject
    JobContext jobCtx;

    String fileName;

    BufferedWriter bufWriter;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        fileName = jobCtx.getProperties()
                .getProperty("output_file");
        bufWriter = Files.newBufferedWriter(Paths.get(fileName), Charset.forName("UTF-8"));
    }

    @Override
    public void close() throws Exception {
        bufWriter.close();
    }

    @Override
    public void writeItems(List<Object> items) throws Exception {
        for (Object obj : items) {
            String data = (String) obj;
            System.out.println("Writer writeItems : " + data);

            bufWriter.write(data);
            bufWriter.newLine();
        }
    }

    @Override
    public Serializable checkpointInfo() throws Exception {
        return null;
    }
}

上記を実装した後、コンパイルをしてください。


# java -classpath
  lib/jsr352-ri-1.0/javax.inject.jar:
  lib/jsr352-ri-1.0/derby.jar:
  lib/jsr352-ri-1.0/jsr352-RI-spi.jar:
  lib/jsr352-ri-1.0/javax.batch.api.jar:
  lib/jsr352-ri-1.0/jsr352-SE-RI-javadoc.jar:
  lib/jsr352-ri-1.0/jsr352-SE-RI-runtime.jar:
  build/classes com.yoshio3.main.StandAloneBatchMain

実行すると下記のようなログを確認できます。chunk 形式ではデフォルトで 10 件まとめて読み込み&処理を実施し、
まとめて 10 件書き込むという動作を下記からも確認できるかと思います。

2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createSchema
情報: JBATCH schema does not exists. Trying to create it.
2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists
情報: CHECKPOINTDATA table does not exists. Trying to create it.
2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists
情報: JOBINSTANCEDATA table does not exists. Trying to create it.
2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists
情報: EXECUTIONINSTANCEDATA table does not exists. Trying to create it.
2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists
情報: STEPEXECUTIONINSTANCEDATA table does not exists. Trying to create it.
2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists
情報: JOBSTATUS table does not exists. Trying to create it.
2 18, 2014 12:10:50 午後 com.ibm.jbatch.container.services.impl.JDBCPersistenceManagerImpl createIfNotExists
情報: STEPSTATUS table does not exists. Trying to create it.
Reader readItem : hogehoge1
Processor processItem : hogehoge1
Reader readItem : hogehoge2
Processor processItem : hogehoge2
Reader readItem : hogehoge3
Processor processItem : hogehoge3
Reader readItem : hogehoge4
Processor processItem : hogehoge4
Reader readItem : hogehoge5
Processor processItem : hogehoge5
Reader readItem : hogehoge6
Processor processItem : hogehoge6
Reader readItem : hogehoge7
Processor processItem : hogehoge7
Reader readItem : hogehoge8
Processor processItem : hogehoge8
Reader readItem : hogehoge9
Processor processItem : hogehoge9
Reader readItem : hogehoge10
Processor processItem : hogehoge10
Writer writeItems : Processor processItem : hogehoge1
Writer writeItems : Processor processItem : hogehoge2
Writer writeItems : Processor processItem : hogehoge3
Writer writeItems : Processor processItem : hogehoge4
Writer writeItems : Processor processItem : hogehoge5
Writer writeItems : Processor processItem : hogehoge6
Writer writeItems : Processor processItem : hogehoge7
Writer writeItems : Processor processItem : hogehoge8
Writer writeItems : Processor processItem : hogehoge9
Writer writeItems : Processor processItem : hogehoge10
Reader readItem : hogehoge11
Processor processItem : hogehoge11
Reader readItem : hogehoge12
Processor processItem : hogehoge12
Reader readItem : hogehoge13
Processor processItem : hogehoge13
Reader readItem : hogehoge14
Processor processItem : hogehoge14
Reader readItem : hogehoge15
Processor processItem : hogehoge15
Reader readItem : hogehoge16
Processor processItem : hogehoge16
Reader readItem : hogehoge17
Processor processItem : hogehoge17
Reader readItem : hogehoge18
Processor processItem : hogehoge18
Reader readItem : hogehoge19
Processor processItem : hogehoge19
Reader readItem : hogehoge20
Processor processItem : hogehoge20
Writer writeItems : Processor processItem : hogehoge11
Writer writeItems : Processor processItem : hogehoge12
Writer writeItems : Processor processItem : hogehoge13
Writer writeItems : Processor processItem : hogehoge14
Writer writeItems : Processor processItem : hogehoge15
Writer writeItems : Processor processItem : hogehoge16
Writer writeItems : Processor processItem : hogehoge17
Writer writeItems : Processor processItem : hogehoge18
Writer writeItems : Processor processItem : hogehoge19
Writer writeItems : Processor processItem : hogehoge20
Reader readItem : null

書き込む間隔を変更したい場合は、Job XML の設定を変更し<chunk item-count=”5″>を設定します。

<job id="my-batch-job"
     xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
    <properties>
        <property name="input_file" value="/tmp/input.txt"/>
        <property name="output_file" value="/tmp/output.txt"/>
    </properties>

    <step id="first-step">
        <chunk item-count="5">
            <reader    ref="com.yoshio3.chunks.MyItemReader"/>
            <processor ref="com.yoshio3.chunks.MyItemProcessor"/>
            <writer    ref="com.yoshio3.chunks.MyItemWriter"/>
        </chunk>
    </step>
</job>

<chunk item-count=”5″>を設定した後、実行すると下記のような結果が得られます。

run:
Reader readItem : hogehoge1
Processor processItem : hogehoge1
Reader readItem : hogehoge2
Processor processItem : hogehoge2
Reader readItem : hogehoge3
Processor processItem : hogehoge3
Reader readItem : hogehoge4
Processor processItem : hogehoge4
Reader readItem : hogehoge5
Processor processItem : hogehoge5
Writer writeItems : Processor processItem : hogehoge1
Writer writeItems : Processor processItem : hogehoge2
Writer writeItems : Processor processItem : hogehoge3
Writer writeItems : Processor processItem : hogehoge4
Writer writeItems : Processor processItem : hogehoge5
Reader readItem : hogehoge6
Processor processItem : hogehoge6
Reader readItem : hogehoge7
Processor processItem : hogehoge7
Reader readItem : hogehoge8
Processor processItem : hogehoge8
Reader readItem : hogehoge9
Processor processItem : hogehoge9
Reader readItem : hogehoge10
Processor processItem : hogehoge10
Writer writeItems : Processor processItem : hogehoge6
Writer writeItems : Processor processItem : hogehoge7
Writer writeItems : Processor processItem : hogehoge8
Writer writeItems : Processor processItem : hogehoge9
Writer writeItems : Processor processItem : hogehoge10
Reader readItem : hogehoge11
Processor processItem : hogehoge11
Reader readItem : hogehoge12
Processor processItem : hogehoge12
Reader readItem : hogehoge13
Processor processItem : hogehoge13
Reader readItem : hogehoge14
Processor processItem : hogehoge14
Reader readItem : hogehoge15
Processor processItem : hogehoge15
Writer writeItems : Processor processItem : hogehoge11
Writer writeItems : Processor processItem : hogehoge12
Writer writeItems : Processor processItem : hogehoge13
Writer writeItems : Processor processItem : hogehoge14
Writer writeItems : Processor processItem : hogehoge15
Reader readItem : hogehoge16
Processor processItem : hogehoge16
Reader readItem : hogehoge17
Processor processItem : hogehoge17
Reader readItem : hogehoge18
Processor processItem : hogehoge18
Reader readItem : hogehoge19
Processor processItem : hogehoge19
Reader readItem : hogehoge20
Processor processItem : hogehoge20
Writer writeItems : Processor processItem : hogehoge16
Writer writeItems : Processor processItem : hogehoge17
Writer writeItems : Processor processItem : hogehoge18
Writer writeItems : Processor processItem : hogehoge19
Writer writeItems : Processor processItem : hogehoge20
Reader readItem : null

以上のように、Java SE の環境でも jBatch (JSR-352) を実行する事ができます。今回は参考のため jBatch の RI を使用しましたが、各 Java EE 7 準拠のアプリケーション・サーバで必要なライブラリはそれぞれ異なるかと想定します。必要なライブラリは各アプリケーション・サーバでお調べください。

Entry filed under: Java. Tags: , , .

Java EE 7 の新機能紹介と Java のイベントのご紹介 Java SE 8 で Charset の改良

2件のコメント

  • 1. Frank.Jiao  |  2014年2月19日 7:18 午後

    jbatchを利用しようとしていますが、ロジックとしてはcsvに数万件の伝票データを整合性チェックしDBへ挿入したいですが、チェックロジックはかなり複雑と想定しています。Multithreadをやろうとしたら、JavaEEサーバはMultithreadをやらないほうがいいという記事をみました。 JMS/MDBを利用したらいいかと思ってましたが、
    数万件の伝票データをメッセージキュー経由、クラスタを作成して
    一気に処理しDBに入れることは可能でしょうか
    最近JavaEEを興味が持ち初めて、寺田さんのプレゼンテーション資料はかなり参考になりました。ありがとうございます

    • 2. Yoshio Terada  |  2014年2月20日 6:33 午後

      ご質問の内容は、幾つかの内容が混ざってご質問頂いていますので、整理させていただきます。

      ● まず、Java EE 7 からは、コンカレンシー・ユーティリティが入っていますので、Java EE 7 環境では並列処理タスクを実装する事ができます。また GlassFish v4 の場合ではございますが、GlassFish に含まれる jBatch は内部的に Concurrency Utilities を使用して動作しています。

      ● 数万件のデータを CSV ファイルから読み込み1件ずつ JMS でメッセージプロバイダに送信し、MDB 側で受信した後 JPA で DB へにコミットする事も可能です。

      基本的に JMS/MDB の場合は、キューに入っているデータを1件ずつ処理(送受信)する必要があります。
      一方、jBatch は書き込むタイミングを設定できます。(デフォルト:10件づつ書き込み)

      その他、DB の保存に失敗した場合のリトライ・スキップの実装などの実装もJMS/MDB と jBatch では異なります。

      できるか、できないかという話であれば、どちらでもできますが、今回のような件は、恐らく jBatch の方が向いているのではないでしょうか?

      ● jBatch で CSV からデータを読み込み DB へ書き込むサンプルは下記にございますので、下記をお試しいただければ幸いです。

      https://github.com/javaee-samples/javaee7-samples/tree/master/batch/chunk-csv-database

      また、実際にパフォーマンスが向上するか否かは、実際のデータや処理内容(読み込んだデータの検証、もしくはデータの加工内容)に応じて変わるかと想定しますが、その部分の処理が比較的重いのであれば、CSV ファイルを複数に分割し、jBatch の split で並列に処理をさせる事で、並列にデータ処理(検証、加工等)を行う事も可能で効率的になるかと想定します。
      (※ あまり処理内容が重くないのであれば、並列にしても、DB へのコミット部分でボトルネックになる可能性もあるのでご注意ください。)
      バッチの並列化は下記 URL の P84 辺りをご参照ください。


Java Champion & Evangelist

Translate

ご注意

このエントリは個人の見解であり、所属する会社の公式見解ではありません

カレンダー

2014年2月
 12
3456789
10111213141516
17181920212223
2425262728  

カテゴリー

clustermap

ブログ統計情報

  • 1,288,555 hits

Feeds

アーカイブ