事情是這樣的,最近接到要把原本的架構套用 producer-consumer 的方式:一條 thread 產生資料、另一條 thread 處理資料 (當然,這邊只是為了方便討論所以都先假定各一條 thread),以此增加平行度。不過雖然這種方式很常見,要在原本沒有規劃的程式上套用起來還是會改動不少地方,而且為了讓處理邏輯比較獨立、清晰,就想了個方法讓這些改動可以儘可能的集中、且只需要儘可能少的修改現有邏輯。最後最重要的是:想要撤銷 producer-consumer 變回原本 sequential 的架構時也儘可能的簡單,換句話說:能輕易套用不同的資料處理模式。
好的,最後解法簡單來說就是利用私有繼承 (private inheritance):在 base class 定義好想要的 execution policy (以這邊的例子來說就是 producer-consumer 的架構),derived class 利用 private inheritance 取得 policy 的同時就能藉由重新定義特定的 member function 去處理拿到的資料。上例子:
class MyClass
{
public:
void newData(int d) {
std::cout << d << '\n';
}
};
假設上面這個 class 是原本的程式邏輯:外部的程式會呼叫 newData(),而 MyClass 則會把丟進來的值直接印出來。如果我們想套用 producer-consumer 的架構,也就是讓外部呼叫 newData() 後,把值丟給 MyClass 後就撒手不管去生出新的值 (producer)、MyClass 則是接手後交給另一條 thread 去消化這個值 (consumer),則我們可以這樣做:
struct Data { int data; bool is_last; }; class ProduceConsume { public:
ProduceConsume
() { activate(); } virtual ~
ProduceConsume
() { wait(); } void activate() { // at least one async thread is activated if (this->future_ret.valid()) { return; } this->future_ret = std::async(&ProduceConsume::consume, this); } void wait() { // this condition is important since a std::future object // may raise error/exception when it does not have a shared status if (this->future_ret.valid() == false) { return; } const Data last{0, true}; // used to indicate the last element this->future_ret.wait(); } template <typename Args ...> void produce(Args&& ... args) { this->buffer.push( T{std::forward<Args>(args)...} ); } private: void consume() { Data element; while (true) { if (buffer.try_pop(element)) { this->execute(element); } if (element.is_last) { break; } } } // override this function when derived class has its own logic // to process data virtual void execute(Data&) { return; } tbb::concurrent_queue<Data> buffer; std::future<void> future_ret; }; class MyClass : private
ProduceConsume
{ public: void newData(int d) { this->produce(d, false); } private: void execute(Data& data) { if (data.is_last) { return; } std::cout << data.data << '\n'; } };
可以發現 MyClass 整體架構基本上是差不多的,差別只是現在 newData() 只有把資料透過 produce() 塞進 ProduceConsume class 的 buffer,而有另一個 function execute() 會負責把 buffer 內的值印出來。所有 producer-consumer 相關的邏輯都寫在 ProduceConsume class 內:一個 concurrent queue (這邊是利用 intel TBB - thread building block);constructor 時會啟動一條 thread 負責從 buffer 內拿資料並且呼叫 execute() 處理。
想切換成 sequential 版本也不難,提供 sequential 的 base class 就好:
class Sequential
{
public:
Sequential() = default;
virtual ~Sequential() = default;
template <typename Args ...>
void produce(Args&& ... args) {
this->execute( T{std::forward<Args>(args)...} );
}
private:
virtual void execute(Data&) { return; }
};
class MyClass : private Sequential
{
public:
void newData(int d) {
this->produce(d, false);
}
private:
void execute(Data& data) {
if (data.is_last) { return; }
std::cout << data.data << '\n';
}
};
那如果想要把這套機製做成 template 版本讓這個運作機制可以跟 type 無關要怎麼辦? 這點可以這樣改:
template<typename T, typename Derived>
class ProduceConsume
{
public:
virtual ~ProduceConsume() { ... }
void activate() { ... }
void wait() { ... }
template <typename Args ...>
void produce(Args&& ... args) { ... }
private:
// move constructor to private can avoid misuse and make compiler complain
// for example:
// class AnotherClass : private ProduceConsume<Data, MyClass> -> compile error
friend Derived;
ProduceConsume() { ... }
void consume() {
T element;
while (true) {
if (buffer.try_pop(element)) {
// CRTP here to avoid using virtual function in a template class
static_cast<Derived*>(this)->execute(element);
}
if (this->isLast(element)) { break; }
}
}
void setLast(T& element);
void isLast(const T& element) const;
void execute(Data&) { return; }
tbb::concurrent_queue<Data> buffer;
std::future<void> future_ret;
};
class MyClass : private ProduceConsume<Data, MyClass>
{
public:
void newData(int d) { ... }
private:
friend ProduceConsume<Data, MyClass>;
void execute(Data& data) { ... }
};
這邊有幾個重點:
- 利用 CRTP (curiously recurring template pattern) 讓 template class 不會有 virtual member function
- 改用 setLast() / isLast() 去判斷資料是不是最後一筆
setLast() / isLast() 可能要更好的做法,像是利用 type_traits 或是直接假設 type 一定有類似的 member function ... 等等;又或者是採用其他 condition variable 來判斷是不是要終止 thread 等等,有非常多樣的選擇可以調整。但是 CRTP 是目前我想不到其他既要做成 template class 又要保有彈性的作法,要以一句話簡單瞭解 CRTP 的精神的話,相對於 virtual function 是動態多型、CRTP 則是靜態多型。
當然這個機制可以做得更好啦,像是 producer/cosumer 各有 N/M 條 thread、bounded size buffer 下的運作模式、thread pool 且可動態調整 producer / consumer 的 thread 數量 ... 等等,但是不論是哪一種方式,大抵上都能套用上面的方式簡單的套用在既有的邏輯上
你可以讓produceconsume的生命週期就當作是thread的退出條件啊
回覆刪除