TBBの並列アルゴリズム parallel_なんちゃら の中から使用頻度の高そげなparallel_reduce をご紹介。
まぁキホン parallel_for と同じく繰り返し(loop)の並列化なんだけども parallel_reduce には繰り返しの合間に reduction(リダクション:還元/縮約/要は"おまとめ")が割り込みます。
ふたつのベクトル x[0..N-1] と y[0..N-1] の内積を求めてみんとす。内積ちゅーのは:
x・y = ∑x[i]*y[i] (i = 0..N-1)
ですな。こいつはインデクスを 0..M-1 と M..N-1 の二つに分けて
∑x[i]*y[i] (i = 0..M-1) と
∑x[i]*y[i] (i = M..N-1) との
和を求めても同じ結果。いくつに分割してもかまわんから並列処理にはうってつけ。
ただし、それぞれの部分和を積算(おまとめ)するとこ(ココがreduction)では各スレッドが勝手にやらんようにガードせんならんです。
parallel_reduceにはloop範囲と初期値、処理本体およびreduction処理を与えます。
#include <iostream>
#include <array>
#include <numeric>
#include <tbb/tbb.h>
using namespace std;
// 作為的に遅い掛け算
double mult(double x, double y) {
const int N = 100000;
double sum = 0.0;
for ( int i = 0; i < N; ++i ) {
sum += x*y;
}
return sum / (double)N;
};
// フツーに内積
tbb::tick_count::interval_t
serial_inner_product(double x[], double y[], double& z, int n) {
tbb::tick_count t = tbb::tick_count::now();
double result = 0.0;
for ( int i = 0; i < n; ++i ) {
result += mult(*x,*y);
++x; ++y;
}
z = result;
return tbb::tick_count::now() - t;
}
// 並列に内積
tbb::tick_count::interval_t
parallel_inner_product(double x[], double y[], double& z, int n) {
tbb::tick_count t = tbb::tick_count::now();
z = tbb::parallel_reduce( tbb::blocked_range<int>(0,n), 0.0, // [0,n) の範囲で、初期値0.0
// 切り分けられた範囲での内積を求め
[&](const tbb::blocked_range<int>& range, const double& value) -> double {
double result = value;
for ( int i = range.begin(); i != range.end(); ++i ) {
result += mult(x[i],y[i]);
}
return result;
},
// リダクション汁
[](const double& x, const double& y) { return x + y; });
return tbb::tick_count::now() - t;
}
int main() {
// 1*1 + 2*2 + 3*3 + ... + N*N を求めるよ
const int N = 100;
array<double,N> x;
array<double,N> y;
iota(x.begin(),x.end(),1.0);
iota(y.begin(),y.end(),1.0);
double sz, pz;
std::cout << "serial: " << serial_inner_product(&x[0],&y[0],sz,N).seconds() << "[sec.]\n";
std::cout << "parallel: " << parallel_inner_product(&x[0],&y[0],pz,N).seconds() << "[sec.]\n";
std::cout << sz << '/' << pz << std::endl;
}
実行結果(dual-core)
serial: 0.0807083[sec.]
parallel: 0.0465302[sec.]
338350/338350
おー、いぃ感じにcore数なりのスピードアップなりねー
R-value reference のつづき。
先だってのエントリ、正確さに欠ける言い回しがあったかな。
R-value reference / move semantics は、右辺値からそのナカミをmoveする機能じゃありません。
右辺値であればそれは一時オブジェクトであり、いずれ捨てられる運命。
↓
ならば右辺値からナカミをひっぺがして使える。
↓
そのためには、もらった引数が右辺値であることが識別できるからくりが言語レベルで必要。
↓
右辺値を T&& にmatchさせて関数を呼び分けてあげよう。
ってゆースンポーです。実際にナカミをひっぺがすのはプログラマのお仕事。言語サイドではナカミをひっぺがす、つまり移動(move)可能ってゆー意味(semantics)を伴って呼び出されたことをプログラマに教えてあげるカラクリを用意してくれた。ってーのが右辺値参照の意味となりますか。
んでもって右辺値参照のサンプル、ショボい文字列 shobostr をこしらえてみんとす。
#include <iostream> // cout,err
#include <utility> // move
#include <cstring> // strほげほげ()
using namespace std;
// ショボい文字列
class shobostr {
private:
char* body;
public:
// デフォルト コンストラクタ
shobostr() : body(nullptr) { cerr << "ctor()\n"; }
// C文字列を食うコンストラクタ
shobostr(const char* cstr) {
cerr << "ctor(const char*)\n";
body = new char[strlen(cstr)+1];
strcpy(body,cstr);
}
// コピー コンストラクタ
shobostr(const shobostr& ss) {
cerr << "ctor(const shobostr&)\n";
body = new char[strlen(ss.body)+1];
strcpy(body,ss.body);
}
#ifdef RVR
// 右辺値参照 コンストラクタ
shobostr(shobostr&& rvr) {
cerr << "ctor(shobostr&&)\n";
body = rvr.body; // 引数から引き剥がすよ
rvr.body = nullptr;
}
#endif
// デストラクタ
~shobostr() {
cerr << "dtor() for " << (body ? body : "(null)") << '\n';
delete[] body;
}
// C文字列を代入
shobostr& operator=(const char* cstr) {
cerr << "operator=(const char*)\n";
delete[] body;
body = new char[strlen(cstr)+1];
strcpy(body,cstr);
return *this;
}
// コピー
shobostr& operator=(const shobostr& ss) {
cerr << "operator=(const shobostr&)\n";
if ( &ss != this ) {
delete[] body;
body = new char[strlen(ss.body)+1];
strcpy(body,ss.body);
}
return *this;
}
#ifdef RVR
// 右辺値参照 コピー
shobostr& operator=(shobostr&& rvr) {
cerr << "operator=(const shobostr&)\n";
if ( &rvr != this ) {
delete[] body;
body = rvr.body; // 右辺から引き剥がすよ
rvr.body = nullptr;
}
return *this;
}
#endif
// 連結
friend shobostr operator+(const shobostr& lhs, const shobostr& rhs) {
cerr << "operator+(const shobostr&,const shobostr&)\n";
shobostr result;
result.body = new char[strlen(lhs.body)+strlen(rhs.body)+1];
strcpy(result.body, lhs.body);
strcat(result.body, rhs.body);
return result;
}
};
// こっからおためし
int main() {
shobostr s0("Hello,");
shobostr s1("world");
shobostr s2(s0+s1);
}
実行結果:
--- RVR disabled ---
ctor(const char*)
ctor(const char*)
operator+(const shobostr&,const shobostr&)
ctor()
ctor(const shobostr&) ← s2:コピーコンストラクタ
dtor() for Hello,world ← +の結果を捨ててるMOTTAINAIトコ
dtor() for Hello,world
dtor() for world
dtor() for Hello,
--- RVR enabled ---
ctor(const char*)
ctor(const char*)
operator+(const shobostr&,const shobostr&)
ctor()
ctor(shobostr&&) ← s2: 右辺値参照版 コピーコンストラクタ
dtor() for (null) ← operator+の戻り値からbodyが移動してゆー
dtor() for Hello,world
dtor() for world
dtor() for Hello,