3つのアルゴリズムを並列化して、並列化によってどのように処理効率が向上するのか、検証します。
はじめに
CPUは、コンピューターの頭脳です。単純に考えると、頭脳が2つあれば、1つの時と比べて、同じ時間でたくさんのことを考えることができそうです。10年ほど前、Intel製プロセッサーPentium IIの頃から、1枚のボードに複数のCPUを載せて高速化する試みがなされました。Intel製CPUでは2ユニットまででしたが、SUN Microsystems製のコンピューターでは、もっとたくさんのユニットを載せることができていました。今では、CPU自体に複数の「コア」を載せ、1ユニットで同時に別々のことを実行できるようになっています。
ハードウェアが、同時に複数のことを実行できるようになったため、ソフトウェアもそれに対応する必要が出てきました。本記事では、昨今あちらこちらで声高に繰り返される、「並列処理化すれば速くなる」に疑問を持ったため、それを検証することにします。
対象読者
なんといっても、並列処理化に興味のある方が対象です。特に、「並列化すれば速くなる」と、単純に思い込んでいる方に、読んでもらいたいと思います。
本記事では、Visual C++ 2008を元に、基本的にC言語によるコードにより、検証を行います。よって、ある程度のC言語の知識があるとよいでしょう。しかし、処理は日本語で説明をするようにしますので、C言語の文法を知っている必要はありません。
必要な環境
本記事を読む分には、特別な環境を必要としません。
本記事で紹介するコード例を試すには、Visual Studio C++ 2008を使用します。また、実際に差異を見るためには、タスクマネージャーで複数のCPU使用履歴が表示されるPCが必要です。
並列処理化の有用性を検証する
どうやって検証するか
まず、検証方法を設計します。どういう仮説があって、その仮説が正しいことを証明するためには、どの様な実験から、どの様な結果が出ればよいのか。これらを明らかにしておかなければ「検証」ではなく、「観察」になってしまいます。
本記事は「並列化すれば速くなる」という仮定を検証します。このために、いくつかのアルゴリズムについてコードを作成します。そのコードを、Open MPによってマルチスレッド化します。シングル、マルチスレッドで実行完了までにかかる時間を計測し、それぞれマルチスレッドによってどれくらい実行時間が短縮できたかを算出します。短縮時間を比べることで、マルチスレッド化がどのようなな時にも有効なのか、調査することとします。
アルゴリズム1:素数を求める
素数とはなんでしょうか。1と、その数以外の数では割り切ることができない数です。では、どうやってこれを求めましょうか。2からその数の手前までの数で順番に割ってみて、割りきれる数があるかどうか調べます。つまり、11が素数かどうか調べるには、2、3、4、5、6、7、8、9、10で割り切れるかどうかを調べます。コードで示すと次のようになります。
素数を判別するコード例
// 素数かどうか、判別する
// 戻り値:0…素数ではない / 0以外…素数
int IsPrimeNumber(const int 調べる数)
{
for (int i = 2; i < 調べる数; ++i) {
if (調べる数 % i == 0) {
return 0;
}
}
return 1;
}
素数を求めたい範囲について、for
文でループします。それぞれの数についてこの判別関数を通すことで、素数かどうかを調べます。とりあえず、この関数では「素数の数」だけ返すようにします。処理を並列化することの効率を測定するので、コンソールへの出力は行いません。コンソールへの出力がシリアル化される、コンソール出力は遅い処理である、というのが理由です。求めた素数は配列へ格納します。配列を動的に拡張すると、メモリを確保するという並列化できない要素が出てくるため、呼び出し側で確保しておくこととします。
ある数までの素数を求めるコード例
int GetPrimeNumbers(const int この数まで, int *素数配列 = NULL, const int 配列数 = 0)
{
int count = 0;
for (int i = 2; i <= この数まで; ++i) {
if (IsPrimeNumber(i) != 0) {
++count;
if (素数配列 != NULL && index < 配列数) {
素数配列[i] = i;
}
}
}
return count;
}
これで「ある数までの素数の数を求めるコード」ができました。では、このコードが正しいかどうか、検証します。コードによって、1~20までの素数の数を求めさせます。これくらいなら、自分で計算もできるでしょう。その結果とつきあわせます。
素数(の数)を求めるコードを検証するコード
#include "stdafx.h"
#include <iostream>
#include <omp.h>
int IsPrimeNumber(const int 調べる数) をここに
int GetPrimeNumbers(const int この数まで, int *素数配列 = NULL, const int 配列数 = 0) をここに
int _tmain(int argc, _TCHAR* argv[])
{
std::cout << GetPrimeNumbers(2) << "個/2…1\n";
std::cout << GetPrimeNumbers(3) << "個/3…2\n";
std::cout << GetPrimeNumbers(4) << "個/4…2\n";
std::cout << GetPrimeNumbers(5) << "個/5…3\n";
std::cout << GetPrimeNumbers(6) << "個/6…3\n";
std::cout << GetPrimeNumbers(7) << "個/7…4\n";
std::cout << GetPrimeNumbers(8) << "個/8…4\n";
std::cout << GetPrimeNumbers(9) << "個/9…4\n";
std::cout << GetPrimeNumbers(10) << "個/10…4\n";
std::cout << GetPrimeNumbers(11) << "個/11…5\n";
std::cout << GetPrimeNumbers(12) << "個/12…5\n";
std::cout << GetPrimeNumbers(13) << "個/13…6\n";
std::cout << GetPrimeNumbers(14) << "個/14…6\n";
std::cout << GetPrimeNumbers(15) << "個/15…6\n";
std::cout << GetPrimeNumbers(16) << "個/16…6\n";
std::cout << GetPrimeNumbers(17) << "個/17…7\n";
std::cout << GetPrimeNumbers(18) << "個/18…7\n";
std::cout << GetPrimeNumbers(19) << "個/19…8\n";
std::cout << GetPrimeNumbers(20) << "個/20…8\n";
return 0;
}
実行結果
1個/2…1
2個/3…2
2個/4…2
3個/5…3
3個/6…3
4個/7…4
4個/8…4
4個/9…4
4個/10…4
5個/11…5
5個/12…5
6個/13…6
6個/14…6
6個/15…6
6個/16…6
7個/17…7
7個/18…7
8個/19…8
8個/20…8
コードが正しいことが検証できました。これを基に、マルチスレッド化のためのコードを追加します。
GetPrimeNumbersをマルチスレッド化する
int GetPrimeNumbersParallel(const int この数まで, int *素数配列 = NULL, const int 配列数 = 0)
{
int count = 0;
#pragma omp parallel for reduction(+:count)
for (int i = 2; i <= この数まで; ++i) {
if (IsPrimeNumber(i) != 0) {
++count;
if (素数配列 != NULL && i < 配列数) {
素数配列[i] = i;
}
}
}
return count;
}
これも、検証しておきます。GetPrimeNumbers
がシングルスレッドで動作することと、GetPrimeNumbersParallel
がマルチスレッドで動作しているかを確認します。確認を行う土台が、確認を行う目的にとって正しいことを確認しておかないと、何を確認しているのかわからなくなります。
次のように_tmain
関数を書き換え、実行します。実行中、タスクマネージャーの[プロセス]タブで実行しているプロセスのスレッド数と、CPU使用率を見ます。スレッド数は、初期状態では表示されていないので、[表示]メニューから[列の選択]を選び、一覧から[スレッド数]を探してチェックします。シングルスレッドの方は、スレッド数が“1”、CPU使用率は“(100/CPU数)%”であることを確認します。また、マルチスレッドの方は、スレッド数が“CPU数”、CPU使用率が“100%”であることを確認します。もし、マルチスレッド側のスレッド数が“1”の場合、プロジェクトのプロパティで、[構成プロパティ]→[C/C++]→[言語]を開き、[OpenMPサポート]が「はい(/openmp)」になっていることを確認してください。
シングルスレッドであることを確認する
int _tmain(int argc, _TCHAR* argv[])
{
GetPrimeNumbers(300000);
return 0;
}
マルチスレッドであることを確認する
int _tmain(int argc, _TCHAR* argv[])
{
GetPrimeNumbersParallel(300000);
return 0;
}
アルゴリズム2:FizzBuzz問題
FizzBuzz問題はご存知ですね。1から任意の数までについて出力します。しかし、3の倍数の時は「Fizz」、5の倍数の時は「Buzz」と出力します。これを単純にコード化すると、次のようになります。
FizzBuzz問題のコード
void FizzBuzz(const int この数まで)
{
for (int i = 1; i <= この数まで; ++i) {
if (i % 15 == 0) {
std::cout << "FizzBuzz ";
} else if (i % 3 == 0) {
std::cout << "Fizz ";
} else if (i % 5 == 0) {
std::cout << "Buzz ";
} else {
std::cout << i << " ";
}
}
}
しかし、こうすると、出力結果でコンソールが大変なことになります。また、このように出力すると並列化したときに「実行順」が問題になります。そのため、ここでは文字列領域を確保し、そこに入れていくことにします。これも動作の検証をして、並列化したコードも作ります。ここでは動作検証は省きますが、実際にやってみる場合は、しっかり検証してください。
FizzBuzz問題のコード(文字列格納型)
#define FIZZBUZZMAX 5000000
char FB結果[FIZZBUZZMAX][12];
void FizzBuzz(const int この数まで)
{
for (int i = 1; i <= この数まで; ++i) {
if (i % 15 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "FizzBuzz");
} else if (i % 3 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Fizz");
} else if (i % 5 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Buzz");
} else {
sprintf_s(FB結果[i], sizeof(FB結果[i]), "%d", i);
}
}
}
void FizzBuzzParallel(const int この数まで)
{
#pragma omp parallel for
for (int i = 1; i <= この数まで; ++i) {
if (i % 15 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "FizzBuzz");
} else if (i % 3 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Fizz");
} else if (i % 5 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Buzz");
} else {
sprintf_s(FB結果[i], sizeof(FB結果[i]), "%d", i);
}
}
}
アルゴリズム3:バブルソート
3つ目のアルゴリズムは、バブルソートです。
バブルソートのアルゴリズムをおさらいしましょう。配列に数値が並んでいます。この数値を、小さい順に並べるとします。まず、配列の1番後ろと、その1つ手前を比べます。後ろにある方が大きければ、入れ替えます。次に、後ろから2番目と、その1つ手前を比べます。そして、1つ手前の方が大きければ入れ替えます。これを、最初の要素まで繰り返します。最初までくると、「1番小さい数値」が決定します。そこで今度は、「2番目に小さい数値」を決定すべく、また1番後ろから比較を繰り返します。
バブルソートのコード
#define BSORTNUM 100000
int BS配列[BSORTNUM];
void BubbleSort(int* 対象配列, const int 配列数)
{
for (int 決定済み数 = 0; 決定済み数 < 配列数; ++決定済み数) {
for (int y = 配列数 - 1; y > 決定済み数; --y) {
if (対象配列[y] < 対象配列[y - 1]) {
int tmp = 対象配列[y];
対象配列[y] = 対象配列[y - 1];
対象配列[y - 1] = tmp;
}
}
}
}
void BubbleSortParallel(int* 対象配列, const int 配列数)
{
for (int 決定済み数 = 0; 決定済み数 < 配列数; ++決定済み数) {
#pragma omp parallel for
for (int y = 配列数 - 1; y > 決定済み数; --y) {
if (対象配列[y] < 対象配列[y - 1]) {
int tmp = 対象配列[y];
対象配列[y] = 対象配列[y - 1];
対象配列[y - 1] = tmp;
}
}
}
}
さて、これも検証しておきます。すると、並列化した方で、結果がおかしいことが分かります。以下の結果は、BSORTNUMを100として検証したときの結果です。「BS配列」は、BSORTNUM~1で初期化するようにしました。
バブルソートの検証をするコード
void Shuffle(int *対象配列, const int 配列数)
{
for (int i = 配列数 - 1; i >= 0; --i) {
対象配列[i] = 配列数 - i;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
for (int i = 0; i < 1; ++i) {
Shuffle(BS配列, BSORTNUM);
std::cout << "ソート前\n";
for (int i = 0; i < BSORTNUM; ++i) {
std::cout << BS配列[i] << "\t";
}
std::cout << "\n----------\nソート後\n";
BubbleSortParallel(BS配列, BSORTNUM);
}
for (int i = 0; i < BSORTNUM; ++i) {
std::cout << BS配列[i] << "\t";
}
return 0;
}
実行結果(一例)
ソート前
88 25 18 83 54 90 79 2 49 73
94 92 13 14 97 52 96 75 27 38
100 39 61 40 59 24 7 26 29 30
42 31 23 55 53 36 37 91 11 98
62 74 43 66 71 46 47 95 58 16
51 45 76 6 99 17 19 12 82 60
33 41 35 64 84 5 67 22 68 70
50 72 10 65 80 86 77 44 9 78
81 85 93 32 87 69 8 1 89 3
20 15 4 63 48 56 28 21 34 57
----------
ソート後
1 2 3 4 4 5 6 7 8 9
10 11 12 13 14 15 16 17 18 19
20 21 21 22 23 24 25 26 27 28
29 30 31 33 34 35 36 37 38 39
40 41 42 43 44 45 46 47 48 49
88 50 51 83 54 90 79 52 53 73
94 92 55 56 97 57 96 75 58 59
100 60 61 62 63 64 65 66 67 68
69 70 71 72 74 76 77 91 78 98
80 81 82 84 86 87 89 95 93 99
検証するクセを付けておいてよかったでしょ?
実は、バブルソートのアルゴリズムは、並列化に向いていません。複数の並列に行われる処理が、同じ領域を書き換えようとするためです。上の実行結果では、「4」が2回現れています。代わりに、32がなくなっています。「4」のことを除くと50までは正しいように思われますが、51以上は並び方がおかしいです。これは「対象配列[y]
」と「対象配列[y-1]
」の比較入れ替えで、複数のスレッドが、y
の値が同じところを操作しようとしたためです。
これを防ぐために、バブルソートのアルゴリズムについて、調べなければなりません。先に書いているように、バブルソートでは、検査をする場所を、順番に調べなければなりません。単純に並列化すると、実行する順序は保障されません。実行順序が保障されない、すなわち検査する順序が前後すると、並びがおかしくなるケースが発生します。
OpenMPでは、ordered
句とordered
ディレクティブを使うことで、順番に実行される事を保障します。まず、for
ディレクティブにordered
句を指定して、「順番に実行されるブロックがある」ことを指示します。次に、ordered
ディレクティブによって、順番に実行されるブロックを明示します。
バブルソートのコード(順序づけとマーク)
void BubbleSortParallel(int* 対象配列, const int 配列数)
{
for (int 決定済み数 = 0; 決定済み数 < 配列数; ++決定済み数) {
#pragma omp parallel for ordered
for (int y = 配列数 - 1; y > 決定済み数; --y) {
#pragma omp ordered
if (対象配列[y] < 対象配列[y - 1]) {
int tmp = 対象配列[y];
対象配列[y] = 対象配列[y - 1];
対象配列[y - 1] = tmp;
}
}
}
}
実行結果(一例)
ソート前
63 2 3 100 5 81 16 82 86 53
11 1 13 14 34 35 58 74 19 40
8 22 23 24 73 18 27 28 9 30
55 56 33 76 99 12 98 4 26 90
68 37 57 44 67 95 47 48 32 69
51 52 78 54 80 38 94 17 59 60
72 49 43 64 65 66 45 41 50 70
88 7 25 84 77 15 62 10 36 92
6 21 83 61 85 29 87 42 89 96
91 20 93 79 46 31 97 71 39 75
----------
ソート後
1 2 3 4 5 6 7 8 9 10
11 12 13 14 15 16 17 18 19 20
21 22 23 24 25 26 27 28 29 30
31 32 33 34 35 36 37 38 39 40
41 42 43 44 45 46 47 48 49 50
51 52 53 54 55 56 57 58 59 60
61 62 63 64 65 66 67 68 69 70
71 72 73 74 75 76 77 78 79 80
81 82 83 84 85 86 87 88 89 90
91 92 93 94 95 96 97 98 99 100
さて、一通りできました。ここまでのコードを、もう一度掲示しておきます。
検証コード(全体)
#include "stdafx.h"
#include <Windows.h>
#include <iostream>
#include <math.h>
#include <omp.h>
#include <sys/types.h>
#include <sys/timeb.h>
#include <time.h>
#include <stdlib.h>
#define PRIMENUMBERS 500000
#define FIZZBUZZMAX 30000000
#define BSORTNUM 30000
#define CHECKNUM 5
char FB結果[FIZZBUZZMAX][12];
int BS配列[BSORTNUM];
int PR配列[PRIMENUMBERS];
#define TmToSec(X) ((X).time + (X).millitm / 1000.0)
#define DiffTmSec(START, STOP) (TmToSec(STOP) - TmToSec(START))
// 素数かどうか、判別する
// 戻り値:0…素数ではない / 0以外…素数
int IsPrimeNumber(const int 調べる数)
{
for (int i = 2; i < 調べる数; ++i) {
if (調べる数 % i == 0) {
return 0;
}
}
return 1;
}
int IsPrimeNumberFast(const int 調べる数)
{
if (調べる数 == 2) { return 1;}
if (調べる数 % 2 == 0) { return 0;}
int limit = (int) sqrt((double)調べる数);
for (int i = 3; i < limit; i += 2) {
if (調べる数 % i == 0) {
return 0;
}
}
return 1;
}
int GetPrimeNumbers(const int この数まで,
int *素数配列 = NULL, const int 配列数 = 0)
{
int count = 0;
for (int i = 2; i <= この数まで; ++i) {
if (IsPrimeNumber(i) != 0) {
++count;
if (素数配列 != NULL && i < 配列数) {
素数配列[i] = i;
}
}
}
return count;
}
int GetPrimeNumbersParallel(const int この数まで,
int *素数配列 = NULL, const int 配列数 = 0)
{
int count = 0;
#pragma omp parallel for reduction(+:count) //schedule(guided)
for (int i = 2; i <= この数まで; ++i) {
if (IsPrimeNumber(i) != 0) {
++count;
if (素数配列 != NULL && i < 配列数) {
素数配列[i] = i;
}
}
}
return count;
}
void FizzBuzz(const int この数まで)
{
for (int i = 1; i <= この数まで; ++i) {
if (i % 15 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "FizzBuzz");
} else if (i % 3 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Fizz");
} else if (i % 5 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Buzz");
} else {
sprintf_s(FB結果[i], sizeof(FB結果[i]), "%d", i);
}
}
}
void FizzBuzzParallel(const int この数まで)
{
#pragma omp parallel for
for (int i = 1; i <= この数まで; ++i) {
if (i % 15 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "FizzBuzz");
} else if (i % 3 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Fizz");
} else if (i % 5 == 0) {
strcpy_s(FB結果[i], sizeof(FB結果[i]), "Buzz");
} else {
sprintf_s(FB結果[i], sizeof(FB結果[i]), "%d", i);
}
}
}
void BubbleSort(int* 対象配列, const int 配列数)
{
for (int 決定済み数 = 0; 決定済み数 < 配列数; ++決定済み数) {
for (int y = 配列数 - 1; y > 決定済み数; --y) {
if (対象配列[y] < 対象配列[y - 1]) {
int tmp = 対象配列[y];
対象配列[y] = 対象配列[y - 1];
対象配列[y - 1] = tmp;
}
}
}
}
void BubbleSortParallel(int* 対象配列, const int 配列数)
{
for (int 決定済み数 = 0; 決定済み数 < 配列数; ++決定済み数) {
#pragma omp parallel for ordered
for (int y = 配列数 - 1; y > 0; --y) {
#pragma omp ordered
if (対象配列[y] < 対象配列[y - 1]) {
int tmp = 対象配列[y];
対象配列[y] = 対象配列[y - 1];
対象配列[y - 1] = tmp;
}
}
}
}
void Shuffle(int *対象配列, const int 配列数)
{
for (int i = 配列数 - 1; i >= 0; --i) {
対象配列[i] = 配列数 - i;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
struct _timeb startTm, stopTm; // ストップウォッチ用
std::cout << "素数の数を求める範囲\t" << PRIMENUMBERS << "\n";
std::cout << "FizzBuzz を求める範囲\t" << FIZZBUZZMAX << "\n";
std::cout << "Bubble Sort の要素数\t" << BSORTNUM << "\n";
std::cout << "繰り返し回数\t" << CHECKNUM << "\n\n";
/**/
double bsSeri = 0.0;
for (int i = 0; i < CHECKNUM; ++i) {
Shuffle(BS配列, BSORTNUM);
_ftime_s(&startTm);
BubbleSort(BS配列, BSORTNUM);
_ftime_s(&stopTm);
std::cout << "BubbleSort シリアル\t"
<< DiffTmSec(startTm, stopTm) << " sec.\n";
bsSeri += DiffTmSec(startTm, stopTm);
}
/**/
double bsPall = 0.0;
for (int i = 0; i < CHECKNUM; ++i) {
Shuffle(BS配列, BSORTNUM);
_ftime_s(&startTm);
BubbleSortParallel(BS配列, BSORTNUM);
_ftime_s(&stopTm);
std::cout << "BubbleSort パラレル\t"
<< DiffTmSec(startTm, stopTm) << " sec.\n";
bsPall += DiffTmSec(startTm, stopTm);
for (int j = 0; j < BSORTNUM - 1; ++j) {
if (BS配列[j] + 1 != BS配列[j+1]) {
std::cout << "wrong:" << j << "\n";
}
}
}
/**/
double fbSeri = 0.0;
for (int i = 0; i < CHECKNUM; ++i) {
_ftime_s(&startTm);
FizzBuzz(FIZZBUZZMAX);
_ftime_s(&stopTm);
std::cout << "FizzBuzz シリアル\t"
<< DiffTmSec(startTm, stopTm) << " sec.\n";
fbSeri += DiffTmSec(startTm, stopTm);
}
double fbPall = 0.0;
for (int i = 0; i < CHECKNUM; ++i) {
_ftime_s(&startTm);
FizzBuzzParallel(FIZZBUZZMAX);
_ftime_s(&stopTm);
std::cout << "FizzBuzz パラレル\t"
<< DiffTmSec(startTm, stopTm) << " sec.\n";
fbPall += DiffTmSec(startTm, stopTm);
}
/**/
int cnt;
double pnSeri = 0.0;
for (int i = 0; i < CHECKNUM; ++i) {
_ftime_s(&startTm);
cnt = GetPrimeNumbers(PRIMENUMBERS);
_ftime_s(&stopTm);
std::cout << "素数 シリアル " << cnt << "個\t"
<< DiffTmSec(startTm, stopTm) << " sec.\n";
pnSeri += DiffTmSec(startTm, stopTm);
}
double pnPall = 0.0;
for (int i = 0; i < CHECKNUM; ++i) {
_ftime_s(&startTm);
cnt = GetPrimeNumbersParallel(PRIMENUMBERS);
_ftime_s(&stopTm);
std::cout << "素数 パラレル " << cnt << "個\t"
<< DiffTmSec(startTm, stopTm) << " sec.\n";
pnPall += DiffTmSec(startTm, stopTm);
}
std::cout << "\n";
std::cout << "BubbleSort シリアル平均 "
<< (bsSeri / (float)CHECKNUM) << " sec.\n";
std::cout << "BubbleSort パラレル平均 "
<< (bsPall / (float)CHECKNUM) << " sec.\n";
std::cout << "FizzBuzz シリアル平均 "
<< (fbSeri / (float)CHECKNUM) << " sec.\n";
std::cout << "FizzBuzz パラレル平均 "
<< (fbPall / (float)CHECKNUM) << " sec.\n";
std::cout << "素数 シリアル平均 "
<< (pnSeri / (float)CHECKNUM) << " sec.\n";
std::cout << "素数 パラレル平均 "
<< (pnPall / (float)CHECKNUM) << " sec.\n";
std::cout << "\n";
std::cout << "BubbleSort 処理効率 " << (bsSeri / bsPall) << "\n";
std::cout << "FizzBuzz 処理効率 " << (fbSeri / fbPall) << "\n";
std::cout << "素数 処理効率 " << (pnSeri / pnPall) << "\n";
/**/
return 0;
}
実行結果(一例)
素数の数を求める範囲 500000
FizzBuzz を求める範囲 30000000
Bubble Sort の要素数 30000
繰り返し回数 5
BubbleSort シリアル 1.326 sec.
BubbleSort シリアル 1.201 sec.
BubbleSort シリアル 1.497 sec.
BubbleSort シリアル 1.28 sec.
BubbleSort シリアル 1.216 sec.
BubbleSort パラレル 31.934 sec.
BubbleSort パラレル 33.521 sec.
BubbleSort パラレル 34.381 sec.
BubbleSort パラレル 33.757 sec.
BubbleSort パラレル 33.679 sec.
FizzBuzz シリアル 5.556 sec.
FizzBuzz シリアル 5.291 sec.
FizzBuzz シリアル 5.322 sec.
FizzBuzz シリアル 5.337 sec.
FizzBuzz シリアル 5.291 sec.
FizzBuzz パラレル 3.106 sec.
FizzBuzz パラレル 3.09 sec.
FizzBuzz パラレル 3.106 sec.
FizzBuzz パラレル 3.09 sec.
FizzBuzz パラレル 3.121 sec.
素数 シリアル 41538個 52.922 sec.
素数 シリアル 41538個 52.735 sec.
素数 シリアル 41538個 52.804 sec.
素数 シリアル 41538個 52.737 sec.
素数 シリアル 41538個 52.939 sec.
素数 パラレル 41538個 39.115 sec.
素数 パラレル 41538個 40.582 sec.
素数 パラレル 41538個 40.519 sec.
素数 パラレル 41538個 39.583 sec.
素数 パラレル 41538個 40.921 sec.
BubbleSort シリアル平均 1.304 sec.
BubbleSort パラレル平均 33.4544 sec.
FizzBuzz シリアル平均 5.3594 sec.
FizzBuzz パラレル平均 3.1026 sec.
素数 シリアル平均 52.8274 sec.
素数 パラレル平均 40.144 sec.
BubbleSort 処理効率 0.0389784
FizzBuzz 処理効率 1.72739
素数 処理効率 1.31595
検証
さあ、結果を検証しましょう。それぞれのアルゴリズムの主要な要素数を変更して、実行しました。結果の一例を、表形式で表します。
Bubble Sortの計測結果
要素数 |
10000 |
20000 |
30000 |
1回目 |
0.4060 |
6.1150 |
1.6380 |
24.0090 |
3.6510 |
54.057 |
2回目 |
0.4210 |
6.2410 |
1.6530 |
25.3200 |
3.8530 |
56.287 |
3回目 |
0.4060 |
6.2560 |
1.7010 |
25.3660 |
3.9000 |
56.474 |
4回目 |
0.4050 |
6.2250 |
1.6380 |
25.1940 |
3.6820 |
56.443 |
5回目 |
0.4210 |
6.3180 |
1.6690 |
25.3040 |
3.8070 |
56.459 |
FizzBuzzの計測結果
要素数 |
10000000 |
20000000 |
30000000 |
1回目 |
3.4790 |
1.9350 |
7.7220 |
3.8070 |
11.384 |
5.9430 |
2回目 |
3.4330 |
1.8870 |
7.9250 |
3.8370 |
10.920 |
5.8820 |
3回目 |
3.4320 |
1.8570 |
7.3910 |
3.8380 |
10.921 |
5.8190 |
4回目 |
3.4320 |
1.9190 |
7.3950 |
3.8220 |
10.874 |
5.8190 |
5回目 |
3.4480 |
1.9030 |
7.5290 |
3.8220 |
10.921 |
5.8040 |
素数(の数)を求めるの計算結果
要素数 |
100000 |
300000 |
500000 |
1回目 |
2.6990 |
2.3400 |
22.7450 |
15.5390 |
52.4810 |
38.5330 |
2回目 |
2.6990 |
2.3240 |
21.2480 |
15.4930 |
52.4970 |
38.7670 |
3回目 |
2.7140 |
2.3250 |
20.9510 |
15.4150 |
52.6060 |
39.0010 |
4回目 |
2.7150 |
2.3870 |
21.0290 |
15.4920 |
52.4500 |
38.9240 |
5回目 |
2.6980 |
2.4960 |
20.9980 |
15.8210 |
52.5430 |
38.8140 |
バブルソートについては、並列化をする方が遅い結果となりました。これは、並列化して実行する処理のほとんどを順次実行としたため、順番を守るために待ち合わせが発生しているためと考えられます。なお、この例ではループを単純に並列化しようとしたため、この様な結果になっています。パイプライン並列化する等、並列化の方法を変更すると、並列化の効率が良くなります。
素数(の数)を求めると、FizzBuzzで、並列化による効果が異なります。FizzBuzzでは1.8倍の効果が出ていますが、素数(の数)を求めるでは、1.3倍にとどまっています。並列化で実行されるループの中の処理が、ループの後ろほど数が多いことが原因です。
素数(の数)を求める処理では、「2以下の素数の数」「3以下の素数の数」…「100以下の素数の数」と数が進むにつれて、だんだんと調べる数が多くなります。Open MPのfor
ディレクティブでは、1つのスレッドが実行するループの回数を制御します。例えば、全体で千回のループであれば、(1000/論理CPU数)回のループを(論理CPU数)回繰り返すような動作をします。この(論理CPU数)回の繰り返しを、並列に処理するわけです。今、50万以下の素数を求めるわけですが、論理CPU数が2の環境では、「25万未満」と、「25万以上50万以下」の2つに分けて実行します。すると、「25万未満」を担当するスレッドは、「25万以上50万以下」を担当するスレッドよりも、処理の時間が圧倒的に短くなります。すなわち、いずれかのスレッドの負荷だけが上がり、負荷の平準化がされないために、効率が上がらないのです。これを、「100ずつ」に区切って5,000回のタスクに分けると、1つめのタスクと5,000個目のタスクでは処理にかかる時間が大きく異なりますが、1つめのタスクと2つめのタスクでは、わずかな差しかありません。2つのスレッドに負荷を平準化して掛けることにより、処理効率を上げることができます。今回のコードでは、こういう処理をしていないため、1.3倍の効率しか出ていません。schedule句によって、「ループの回数を小さくする(schedule(static)
)」か、「ループの後ろの方ほどループ回数を小さくする(schedule(guided)
)」ことで、実行効率を上げることが出来ます。
まとめ
この結果から、何でもかんでも並列化すれば、等しく速くなるわけではない、ということが分かります。並列化に向いているアルゴリズム、向いていないアルゴリズムがあります。また、並列化にむいているアルゴリズムであっても、十分に検討しなければ思わぬバグを作り込んだり、思ったほど効率が上がらないことになります。事前にしっかりと調査し、コードの設計をすることが必要です。
並列化する前に
- パフォーマンスを計測し、高速化しなければならないか、検討する
- 10ミリ秒で終わる処理を5ミリ秒にしたところで、どれほどの意味があるでしょうか。1分かかるところを40秒で処理できるようにするのは、意味があるでしょう。すなわち、「何倍」という倍率よりも、体感できる時間を短くする方が大事です。
- より効率化された書き方がないか、調査する
- 例示した「素数か判定する」ルーチンは、判定に使う数は判定したい数の平方根を超えない整数まででかまいません(
IsPrimeNumberFast
関数を参照)。並列化ではたかだか論理CPU数倍にしかなりませんが、IsPrimeNumberFast
関数を使うと、実行効率は100倍以上になります。
- ここでは、「並列化に向かないアルゴリズム」の説明として、複数のスレッドが同時に同じ領域を変更する例として、バブルソートを選びました。ソートをするなら、クイックソートやマージソートなど、より効率的なアルゴリズムがあります。
- 処理を、より単純化する
- 並列化できる箇所を明確にする
- 本文では、いくつかの並列化できない箇所を、予め廃止するか並列化しない箇所へ退避させています。コンソール出力をやめる、メモリの確保を前もって行うなどです。
- 並列に動作すると不具合を引き起こす箇所がないか、検証する
- 処理順序が一定であることに依存しないか、検証します。FizzBuzz問題を、ループ内でコンソールに出力すると、処理順序が一定ではないことが問題になります。そのため、決められた位置へ格納するというアルゴリズムに変更しています。
- 複数のスレッドで、同じ領域を書き換えることがないか、検証します(「生産者と消費者問題」で検索して下さい)。
- 不具合を起こす箇所を一か所にまとめ、クリティカルブロックとする
- デッドロックしないように、慎重に設計します(「哲学者の食事問題」で検索して下さい)。
- クリティカルブロックの処理時間と、並列化している箇所の処理時間を比較する
- クリティカルブロックの処理時間が長い場合、並列化をあきらめる
謝辞
本記事は、一旦ブログにて公開し、コミュニティの皆さまよりご意見をいただいて、より読みやすいように修正しました。ご協力くださった方々に、この場を借りてお礼申し上げます。
投稿日時 : 2010年2月5日 23:42