[[PageNavi(NavigationList)]]
==== パイプライン処理の並列化 ====
並列処理の実装の1つに、行うべき処理を複数のステージに分割し、それぞれのステージを並列に実行するというものがある。これはパイプライン処理などといわれている。
パイプライン処理の例として、HTTPサーバーのアクセスログ整形を挙げてみよう。たとえばテキスト形式で1行に1件のログが記述されているログファイルに対し、「アクセス元IPアドレスからそのホスト名を取得、整形して出力する」というような処理を行う例を考えてみよう。この処理は、データに対して次のような操作を次々と適用するパイプライン処理で実装できる。
* ファイルから1行読み出し
* データのパース
* IPアドレスの変換
* 整形して出力
このとき、データのパースおよびIPアドレスの変換処理については複数のスレッドで同時に実行できる。一方、行番号の付加と出力については読み出した順に行う必要がある。このように順序付きでデータをパイプライン処理するために、TBBには「pipeline」というアルゴリズムが用意されている。pipelineは「filter」クラスの派生クラスとして実装された処理を、入力データに対して次々と行っていくものだ。
上記のアクセスログ整形処理をTBBを実装した例が、次の'''リスト6'''になる。
'''リスト6 TBBでアクセスログ整形処理を実装する例(抜粋)'''
{{{
#includestring
#includeiostream
#includefstream
#include "tbb/task_scheduler_init.h"
#include "tbb/pipeline.h"
#include "boost/regex.hpp"
using namespace std;
:
:
// アクセスログ1件を表すクラス
class AccessLogItem {
// 元データ
string buffer;
boost::regex regex;
// パース後のデータ
string ipAddr;
string hostname;
string date;
string request;
string resultCode;
string bites;
public:
string parsed;
AccessLogItem();
// 入力ストリームから1行分のデータを読み込む
// 成功すればtrue、失敗すればfalseを返す
bool ReadFromIStream(ifstream ifs);
// 元データをパースする
// 成功すればtrue、失敗すればfalseを返す
bool Parse();
// IPアドレスをホスト名に変換
// 成功すればtrue、失敗すればfalseを返す
bool IPaddrToHostname();
// 出力用に整形したstringを生成
string Format();
};
:
:
// 各ステージでの処理を定義したフィルタを記述
// アクセスログをパースする
class ParseFilter: public tbb::filter {
public:
// 派生元クラスのコンストラクタを呼ぶ
// このステージは並列実行が可能なので、引数にfalseを指定
ParseFilter() :
filter(false)
{}
void* operator()( void* item ) {
AccessLogItem a = *static_castAccessLogItem*(item);
a.Parse();
returna;
}
};
// 入力ストリームから1行分のアクセスログを読み込む
class InputFilter: public tbb::filter {
private:
ifstream ifs;
AccessLogItem* buffer;
int bufSize;
int nextBuffer;
public:
// 派生元クラスのコンストラクタを呼ぶ
// このステージは並列実行が不可なので、引数にtrueを指定
InputFilter( ifstream inputStream, int size_of_buffer );
~InputFilter();
void* operator()(void*);
};
InputFilter::InputFilter( ifstream inputStream, int size_of_buffer ):
filter(true),
ifs(inputStream),
bufSize(size_of_buffer),
buffer(NULL),
nextBuffer(0)
{
buffer = new AccessLogItem[bufSize];
}
InputFilter::~InputFilter() {
delete[] buffer;
}
void* InputFilter::operator()(void*) {
if( nextBuffer == bufSize - 1 ) {
nextBuffer = 0;
} else {
nextBuffer++;
}
if( buffer[nextBuffer].ReadFromIStream(ifs) ) {
return(buffer[nextBuffer]);
} else {
return NULL;
}
}
// IPアドレスをホスト名に変換
class IPaddrToHostnameFilter: public tbb::filter {
public:
// 派生元クラスのコンストラクタを呼ぶ
// このステージは並列実行が可能なので、引数にfalseを指定
IPaddrToHostnameFilter() :
filter(false)
{}
void* operator()( void* item ) {
AccessLogItem a = *static_castAccessLogItem*(item);
a.IPaddrToHostname();
returna;
}
};
// 整形したログを出力
class OutputFilter: public tbb::filter {
private:
ostream os;
public:
// 派生元クラスのコンストラクタを呼ぶ
// このステージは並列実行が不可なので、引数にtrueを指定
OutputFilter( ostream outputStream ) :
filter(true),
os(outputStream)
{}
void* operator()( void* item ) {
AccessLogItem a = *static_castAccessLogItem*(item);
os a.Format() endl;
return NULL;
}
};
int _tmain(int argc, _TCHAR* argv[])
{
tbb::task_scheduler_init init;
:
:
// パイプラインを作成
tbb::pipeline p;
int n_buffer = 10;
// 入力ストリームから読み込むパイプラインを作成、登録
ifstream ifs;
:
:
InputFilter inputFilter(ifs, n_buffer);
p.add_filter(inputFilter);
// アクセスログをパースするパイプラインを作成、登録
ParseFilter parseFilter;
p.add_filter(parseFilter);
// IPアドレスをホスト名に変換するパイプラインを作成、登録
IPaddrToHostnameFilter ip2hostFilter;
p.add_filter(ip2hostFilter);
// 整形したログを出力するパイプラインを作成、登録
ofstream ofs;
:
:
OutputFilter outputFilter(ofs);
p.add_filter(outputFilter);
:
:
// パイプラインを実行
// 引数には処理に使用するバッファ数を指定
:
:
p.run( n_buffer );
:
:
return 0;
}
}}}
[[PageNavi(NavigationList)]]