Archive for 2012年12月18日
Concurrency Update (jsr166e)のご紹介
この記事はJava Advent Calendar 2012 の 18 日目の記事です。
昨日は @torutk さんによる「Javaで地図表示ーGeoToolsを使って」でした。
明日は @Akira Koyasu さんです。
Java に関して何を記載しようかと悩んでいたのですが、やはり Java SE 8 のネタが良いかと思い、Java SE 8 に含まれる予定の「Concurrency Update (jsr166e)」について、かんたんにご紹介したいと思います。
Java SE 8 に含まれる予定の機能一覧は下記に記載されております。
JEP (JDK Enhancement-Proposal)
上記をご覧頂くと様々な機能拡張が施される事をご確認頂けますが、並行処理についても下記の JEP として更新される予定のようですので、少しだけご紹介したいと思います。
JEP : 155 Concurrency Updates (jsr166e)
Java SE に含まれる並行処理用の API は Java SE 5 に JSR 166: Concurrency Utilities として、java.util.concurrent パッケージで提供されました。その後、Java SE 6, 7 でもこれらのライブラリはアップデートされ、直近では Java SE 7 で jsr-166y として Fork/Join フレームワークが追加された事は記憶に新しいかと思います。
さて、Java SE 8 では jsr-166 はさらに進化して jsr-166e として機能拡張が施されようとしています。ちなみに「jsr-166e」の「e」は「Eight」つまり「8」を意味しております。jsr-166e で提供される API の一覧は下記をご参照ください。
JSR-166e の主な拡張ポイント:
● ConcurrentHashMap に対するキャッシュ指向の機能拡張
● スケーラブルで更新可能な変数
● ForkJoinPool に対する改良
これらを簡単にご紹介します。
「ConcurrentHashMap」について
ConcurrentHashMap は Java SE 5 に導入されて以降、様々な所で使われてきました。しかし ConcurrentHashMap は必要以上にメモリを消費するという問題もあり、Java SE 8 に導入予定の ConcurrentHashMap は少ないメモリで効率的に利用できるようです。またシーケンシャル処理も、並行処理もサポートし大量のコンテンツを扱う場合に有効に働きます。
ConcurrentHashMapV8 の API リファレンス
ConcurrentHashMapV8 のソースコード
「スケーラブルで更新可能な変数」
Adder(加算用の機能) について
Adder として追加される、DoubleAdder, LongAdder は Java SE 8 では java.util.concurrent.atomic パッケージ内に含まれる予定です。ここで追加される Adder (加算用の機能)はとても高性能な並行処理対応のプリミティブの加算用の機能を提供します。それぞれサンプルのコードが下記より入手できます。
DoubleAdder
LongAdder
LongAdderTable
ソースコードはコチラから 参照できます。
DoubleAdderDemo のサンプル
LongAdderDemo のサンプル
上記の、LongAdderDemo を私の環境で実行した所、複数のスレッドで AtomicLong で 10,000,000 まで加算していく場合と、LongAdder を使用して加算していく場合で下記の違いがありました。この結果を見てもわかる通り、並列で加算を行う場合、AtomicLong よりも、LongAdder を使用した方が高速に動作する事がお分かりいただけるかと思います。

Doug Lea によって書かれた LongAdder のコード:
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
import java.util.concurrent.Phaser;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import jsr166e.LongAdder;
public class LongAdderDemo {
static final int INCS_PER_THREAD = 10000000;
static final int NCPU = Runtime.getRuntime().availableProcessors();
static final ExecutorService pool = Executors.newCachedThreadPool();
public static void main(String[] args) {
System.out.println("Warmup...");
int half = NCPU > 1 ? NCPU / 2 : 1;
casTest(half, 1000);
adderTest(half, 1000);
for (int reps = 0; reps < 2; ++reps) {
System.out.println("Running...");
for (int i = 1; i <= NCPU * 2; i <<= 1) {
casTest(i, INCS_PER_THREAD);
adderTest(i, INCS_PER_THREAD);
}
}
pool.shutdown();
}
static void casTest(int nthreads, int incs) {
System.out.print("AtomicLong ");
Phaser phaser = new Phaser(nthreads + 1);
AtomicLong a = new AtomicLong();
for (int i = 0; i < nthreads; ++i)
pool.execute(new CasTask(a, phaser, incs));
report(nthreads, incs, timeTasks(phaser), a.get());
}
static void adderTest(int nthreads, int incs) {
System.out.print("LongAdder ");
Phaser phaser = new Phaser(nthreads + 1);
LongAdder a = new LongAdder();
for (int i = 0; i < nthreads; ++i)
pool.execute(new AdderTask(a, phaser, incs));
report(nthreads, incs, timeTasks(phaser), a.sum());
}
static void report(int nthreads, int incs, long time, long sum) {
long total = (long)nthreads * incs;
if (sum != total)
throw new Error(sum + " != " + total);
double secs = (double)time / (1000L * 1000 * 1000);
long rate = total * (1000L) / time;
System.out.printf("threads:%3d Time: %7.3fsec Incs per microsec: %4d\n",
nthreads, secs, rate);
}
static long timeTasks(Phaser phaser) {
phaser.arriveAndAwaitAdvance();
long start = System.nanoTime();
phaser.arriveAndAwaitAdvance();
phaser.arriveAndAwaitAdvance();
return System.nanoTime() - start;
}
static final class AdderTask implements Runnable {
final LongAdder adder;
final Phaser phaser;
final int incs;
volatile long result;
AdderTask(LongAdder adder, Phaser phaser, int incs) {
this.adder = adder;
this.phaser = phaser;
this.incs = incs;
}
public void run() {
phaser.arriveAndAwaitAdvance();
phaser.arriveAndAwaitAdvance();
LongAdder a = adder;
for (int i = 0; i < incs; ++i)
a.increment();
result = a.sum();
phaser.arrive();
}
}
static final class CasTask implements Runnable {
final AtomicLong adder;
final Phaser phaser;
final int incs;
volatile long result;
CasTask(AtomicLong adder, Phaser phaser, int incs) {
this.adder = adder;
this.phaser = phaser;
this.incs = incs;
}
public void run() {
phaser.arriveAndAwaitAdvance();
phaser.arriveAndAwaitAdvance();
AtomicLong a = adder;
for (int i = 0; i < incs; ++i)
a.getAndIncrement();
result = a.get();
phaser.arrive();
}
}
}
MaxUpdater
また、Adder (加算器)と同様に、並行処理の中でハイ・パフォーマンスに最大値を取得する API も追加されます。
DoubleMaxUpdater
LongMaxUpdater
ForkJoinPool について
新機能の追加の他、パフォーマンスが大幅に改善されています。これによって、今まで扱う場所が限定されていた ForkJoinPool が、開発場面の色々な場面で利用できるようになるようです。
最後に、
Java SE 8 では ConcurrentHashMap 等の並行処理機能も拡張・追加されていますので、ご興味ある方は是非チェックしてみてください。

