ネタ元 → 並列プログラミングの効率的なデバッグを実現する「Parallel Inspector」
# 記事の本筋とは離れるのでネタ元にコメントするのは差し控えるけども。
このサンプル、ひとつの変数に複数のスレッドからアクセスするときの
問題を示しているわけだが、いわゆる「生産者/消費者(producer/consumer)」の
イディオムにはなってねぇ!
生産者/消費者パターンちうのは:
- 生産者は製品を製造し棚に並べる。棚が一杯なら空くまで待つ。
- 消費者は製品を棚から取る。棚が空なら品出しを待つ。
生産者スレッドの生産能力 と 消費者スレッドの購買能力 には
バラつき/ムラがあり、そのバラつきを吸収するのが棚(在庫)なんだが、
棚の容量には限りがあるのでたまにどちらかが待つことになります。
MSDN漁ってみたらばまさにドンピシャなサンプルがありました。
方法 : producer スレッドと consumer スレッドを同期する (C# プログラミング ガイド)
ここんとこC++/CLIから離れてたのでリハビリがてらportしてみた。
※ C++/CLI には lock 構文を持ち合わせてないんで、MonitorのEnter/Exitで代用
using namespace System;
using namespace System::Threading;
using namespace System::Collections::Generic;
// 同期イベント: 「アイテムが追加された」と「終了しろ」
public ref class SyncEvents {
public:
SyncEvents() {
eventArray_ = gcnew array<EventWaitHandle^>(2);
eventArray_[0] = gcnew AutoResetEvent(false);
eventArray_[1] = gcnew ManualResetEvent(false);
}
property EventWaitHandle^ ExitThreadEvent // 「終了しろ」
{ EventWaitHandle^ get() { return eventArray_[1]; } }
property EventWaitHandle^ NewItemEvent // 「アイテムが追加された」
{ EventWaitHandle^ get() { return eventArray_[0]; } }
property array<EventWaitHandle^>^ EventArray // ふたつまとめて
{ array<EventWaitHandle^>^ get() { return eventArray_; } }
private:
// eventArray_[0]:「アイテムが追加された」
// eventArray_[1]:「終了しろ」
array<EventWaitHandle^>^ eventArray_;
};
/*
* 生産者
*/
public ref class Producer {
public:
Producer(Queue<int>^ q, SyncEvents^ e) : queue_(q), syncEvents_(e) {}
// Producer.ThreadRun
void ThreadRun() {
int count = 0;
Random r;
// 「終了しろ」でない間、Queueにアイテムを追加する
while (!syncEvents_->ExitThreadEvent->WaitOne(0, false)) {
Object^ sync = ((System::Collections::ICollection^)queue_)->SyncRoot;
Monitor::Enter(sync);
try {
while ( queue_->Count < 20 ) { // 倉庫に余裕がある間
queue_->Enqueue(r.Next(0,100)); // アイテムを追加して
syncEvents_->NewItemEvent->Set(); // 「アイテムが追加された」イベント発行
count++;
}
} finally {
Monitor::Exit(sync);
}
}
Console::WriteLine(L"Producer thread: produced {0} items", count);
}
private:
Queue<int>^ queue_;
SyncEvents^ syncEvents_;
};
/*
* 消費者
*/
public ref class Consumer {
public:
Consumer(Queue<int>^ q, SyncEvents^ e) : queue_(q), syncEvents_(e) {}
// Consumer.ThreadRun
void ThreadRun() {
int count = 0;
// イベント:「終了しろ」「アイテムが追加された」のいずれかを待ち、
// そのイベントが「終了しろ」でない間
while (WaitHandle::WaitAny(syncEvents_->EventArray) != 1) {
Object^ sync = ((System::Collections::ICollection^)queue_)->SyncRoot;
Monitor::Enter(sync);
try {
int item = queue_->Dequeue(); // アイテムを取り出す
} finally {
Monitor::Exit(sync);
}
count++;
}
Console::WriteLine(L"Consumer Thread: consumed {0} items", count);
}
private:
Queue<int>^ queue_;
SyncEvents^ syncEvents_;
};
/*
* 生産者/消費者を起こすメイン・スレッド
*/
public ref class ThreadSyncSample {
private:
// 在庫(Queue)の内容をダンプ
static void ShowQueueContents(Queue<int>^ q) {
Object^ sync = ((System::Collections::ICollection^)q)->SyncRoot;
Monitor::Enter(sync);
try {
for each (int item in q ) {
Console::Write(L"{0} ", item);
}
} finally {
Monitor::Exit(sync);
}
Console::WriteLine();
}
public:
static void Main() {
Queue<int> queue = gcnew Queue<int>();
SyncEvents syncEvents;
// 生産者/消費者を生成し、それぞれのエントリでスレッドを用意
Console::WriteLine(L"Configuring worker threads...");
Producer producer(%queue, %syncEvents);
Consumer consumer(%queue, %syncEvents);
Thread producerThread(gcnew ThreadStart(%producer, &Producer::ThreadRun));
Thread consumerThread(gcnew ThreadStart(%consumer, &Consumer::ThreadRun));
// 両スレッドを開始
Console::WriteLine(L"Launching producer and consumer threads...");
producerThread.Start();
consumerThread.Start();
// たまにQueueをダンプ
for (int i=0; i<4; i++ ) {
Thread::Sleep(2500);
ShowQueueContents(%queue);
}
// 「終了しろ」イベントを起こす
Console::WriteLine(L"Signaling threads to terminate...");
syncEvents.ExitThreadEvent->Set();
// 両スレッドの終了を待つ
producerThread.Join();
consumerThread.Join();
}
};
int main() {
ThreadSyncSample::Main();
}