I/O Completion Port Part2

GetQueuedCompletionStatusで待機中のスレッドを解放するのは

I/O処理が完了しただけじゃなくPostQueuedCompletionStatusを呼び出すことでも可能なのです。

PostQueuedCompletionStatusはAPIですから好きなときに呼び出せます。

I/Oとはぜんぜん関係ないよないよ。

これやばくないですか・・・?

だって・・I/O関係なしにコアの数だけスレッドががんがん仕事してるシステムってことですよね。

ある意味最高の並列処理ができるフレームワークじゃないかな?


という訳で、早速つくってみました♪

1万の配列の合計を並列計算するだけなのですが活用方法はいろいろありそうです

問題は作るのが面倒ってことですけどね・・

そんなわけで計算処理の終了の同期はしてません

もちろんエラー処理もしてません(キリッ

参考までに



追記

計算処理同期版にしてみたけどおかしいので戻しました^p^

ただの足し算程度じゃsingleのほうがよっぽど早いみたいです^w^

あうあう・・

#include <vector>
#include <functional>
#include <algorithm>
#include <numeric>

#include <boost/function.hpp>
#include <boost/bind.hpp>

#include <windows.h>
#include <process.h>

struct ParallelOperation
{
	OVERLAPPED overlapped_;
	boost::function<void ()> op_;

	ParallelOperation()
		:overlapped_()
		,op_()
	{
		memset(&overlapped_,0,sizeof(overlapped_));
	}
	~ParallelOperation()
	{
	}
};

struct CompletionPort
{
	HANDLE completionPort_;
	std::vector<HANDLE> threads_;

	CompletionPort()
	{
		completionPort_=CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);

		SYSTEM_INFO systemInfo;
		GetSystemInfo(&systemInfo);
		for(unsigned long i=0;i<systemInfo.dwNumberOfProcessors*2;++i)
		{
			unsigned int id;
			uintptr_t ret=_beginthreadex(0,0,&PrallelWorkingPool,this,0,&id);
			threads_.push_back((HANDLE)ret);
		}
	}
	~CompletionPort()
	{
		CloseHandle(completionPort_);
	}

	void StartParallelOperation(std::vector<ParallelOperation>& vec)
	{
		for(size_t i=0;i<vec.size();++i)
			PostQueuedCompletionStatus(completionPort_,0,0,(LPOVERLAPPED)&vec[i]);
	}

	void WaitForExit()
	{
		for(size_t i=0;i<threads_.size();++i)
			PostQueuedCompletionStatus(completionPort_,0,0,0);
		WaitForMultipleObjects((unsigned long)threads_.size()
			,&threads_[0],TRUE,INFINITE);
		for(size_t i=0;i<threads_.size();++i)
			CloseHandle(threads_[i]);
	}

	static unsigned int __stdcall PrallelWorkingPool(void* arguments)
	{
		CompletionPort* This=(CompletionPort*)arguments;
		OVERLAPPED* lpOverlapped;
		while(1)
		{
			unsigned long numberOfBytesTransferred=0;
			ULONG_PTR CompletionKey;
			GetQueuedCompletionStatus(This->completionPort_,
				&numberOfBytesTransferred,&CompletionKey,&lpOverlapped,INFINITE);
			if(lpOverlapped==0)
				break;

			((ParallelOperation*)lpOverlapped)->op_();
		}
		_endthreadex(0);
		return 0;
	}
};

struct Answer
{
	volatile LONG value;
	Answer():value(0){}
	void Add(LONG v)
	{
		InterlockedExchangeAdd(&value,v);
	}
};

void Calc(Answer& answer,LONG* begin,LONG* end)
{
	std::for_each(begin,end,boost::bind(&Answer::Add,boost::ref(answer),_1));
}

int _tmain(int argc, _TCHAR* argv[])
{
	const int maxElementSize=100000000;
	const int parallel=10;
	CompletionPort completionPort;

	std::vector<LONG> vec;
	std::generate_n(std::back_inserter(vec),maxElementSize,boost::bind(rand));

	Answer ans;
	std::vector<ParallelOperation> addOp(parallel);
	for(int i=0;i<parallel;++i)
	{
		addOp[i].op_=boost::bind(&Calc,boost::ref(ans)
			,&vec[0]+(maxElementSize/parallel)*i
			,&vec[0]+(maxElementSize/parallel)*(i+1));
	}

	completionPort.StartParallelOperation(addOp);
	completionPort.WaitForExit();

	printf("parallel answer=%d \n",ans.value);

	LONG answer=std::accumulate(vec.begin(),vec.end(),0);
	printf("single answer=%d \n",answer);

	return 0;
}