Worker Thread パターンは、スレッドをあらかじめ起動しておくことにより、スレッドの起動するコストを削減することと、スレッドの数を制御することが出来るパターンです。
Boost.Function を使ってある程度汎用的にするとこんな感じになりました。
class thread_pool
{
private:
std::queue<boost::function0<void> > queue_;
boost::thread_group group_;
boost::mutex mutex_;
boost::condition condition_;
public:
thread_pool(int size)
{
for (int i = 0; i < size; i++)
{
group_.create_thread(boost::bind(&thread_pool::run, this));
}
}
void put(boost::function0<void> f)
{
boost::mutex::scoped_lock lock(mutex_);
queue_.push(f);
condition_.notify_all();
}
private:
boost::function0<void> take()
{
boost::mutex::scoped_lock lock(mutex_);
while (queue_.empty())
{
condition_.wait(lock);
}
boost::function0<void> f = queue_.front();
queue_.pop();
return f;
}
void run()
{
while (true)
{
take()();
}
}
};
boost::mutex g_mutex;
void func(const std::string& str, int num, int count)
{
int n = 0;
for (int i = 0; i < num; i++)
{
n += count;
{
boost::mutex::scoped_lock lock(g_mutex);
std::cout << (boost::format("%1% %2%/%3% (%4%)") % str % i % num % ::GetCurrentThreadId()) << std::endl;
}
::Sleep(1000);
}
{
boost::mutex::scoped_lock lock(g_mutex);
std::cout << (boost::format("%1% %2%/%2% (%3%) result: %4%") % str % num % ::GetCurrentThreadId() % n) << std::endl;
}
}
class hoge
{
private:
int num_;
int count_;
public:
hoge(int num, int count) : num_(num), count_(count) { }
void func(const std::string& str) { ::func(str, num_, count_); }
};
void main()
{
thread_pool tp(3);
tp.put(boost::bind(func, "request1:", 5, 10));
tp.put(boost::bind(func, "request2:", 5, 10));
::Sleep(3000);
hoge h(3, 5);
tp.put(boost::bind(&hoge::func, &h, "request3:"));
tp.put(boost::bind(&hoge::func, &h, "request4:"));
::Sleep(7000);
}
// 実行結果
request1: 0/5 (3756)
request2: 0/5 (2708)
request1: 1/5 (3756)
request2: 1/5 (2708)
request1: 2/5 (3756)
request2: 2/5 (2708)
request3: 0/3 (1960) // スレッドプールの空きがないので request4 は開始しない
request1: 3/5 (3756)
request2: 3/5 (2708)
request3: 1/3 (1960)
request1: 4/5 (3756)
request2: 4/5 (2708)
request3: 2/3 (1960)
request1: 5/5 (3756) result: 50
request2: 5/5 (2708) result: 50
request4: 0/3 (3756) // スレッドプールの空きが出来たので request4 が開始された
request3: 3/3 (1960) result: 15
request4: 1/3 (3756)
request4: 2/3 (3756)
request4: 3/3 (3756) result: 15
ほんとは thread_pool クラスのスレッドを安全に終了するための処理が必要なのですが、難しそうなのでとりあえず保留しています。
発展した考えとしては、スレッドの数を動的に変更したり、キューにためられる数に制限を付けたり、キューに制限がある場合に Guarded Suspension パターンで待つか Balking で待つかを変更したりと、いろんな考えがあると思います。
これらをうまく汎用的に作りたいのですが、なかなかうまい方法が思いつきません。
やっぱり用途に応じて作り直した方がいいのかなぁ……って Future の時と同じことを考えてるおいらガイル。