2020年6月5日

[C++] Easy Way to Apply Producer-Consumer Parallelism

事情是這樣的,最近接到要把原本的架構套用 producer-consumer 的方式:一條 thread 產生資料、另一條 thread 處理資料 (當然,這邊只是為了方便討論所以都先假定各一條 thread),以此增加平行度。不過雖然這種方式很常見,要在原本沒有規劃的程式上套用起來還是會改動不少地方,而且為了讓處理邏輯比較獨立、清晰,就想了個方法讓這些改動可以儘可能的集中、且只需要儘可能少的修改現有邏輯。最後最重要的是:想要撤銷 producer-consumer 變回原本 sequential 的架構時也儘可能的簡單,換句話說:能輕易套用不同的資料處理模式
好的,最後解法簡單來說就是利用私有繼承 (private inheritance):在 base class 定義好想要的 execution policy (以這邊的例子來說就是 producer-consumer 的架構),derived class 利用 private inheritance 取得 policy 的同時就能藉由重新定義特定的 member function 去處理拿到的資料。上例子:
class MyClass
{
public:
    void newData(int d) {
        std::cout << d << '\n';
    }
};
假設上面這個 class 是原本的程式邏輯:外部的程式會呼叫 newData(),而 MyClass 則會把丟進來的值直接印出來。如果我們想套用 producer-consumer 的架構,也就是讓外部呼叫 newData() 後,把值丟給 MyClass 後就撒手不管去生出新的值 (producer)、MyClass 則是接手後交給另一條 thread 去消化這個值 (consumer),則我們可以這樣做:
struct Data {
    int data;
    bool is_last;
};

class ProduceConsume
{
public:
    ProduceConsume()  { activate(); }
    virtual ~ProduceConsume() { wait(); }
	
    void activate() {
        // at least one async thread is activated
        if (this->future_ret.valid()) { return; }
        this->future_ret = std::async(&ProduceConsume::consume, this);
    }
	
    void wait() {
        // this condition is important since a std::future object
        // may raise error/exception when it does not have a shared status
        if (this->future_ret.valid() == false) { return; }
        const Data last{0, true}; // used to indicate the last element
        this->future_ret.wait();
    }
	
    template <typename Args ...>
    void produce(Args&& ... args) {
        this->buffer.push( T{std::forward<Args>(args)...} );
    }
	
private:
    void consume() {
        Data element;
        while (true) {
            if (buffer.try_pop(element)) {
                this->execute(element);
            }
            if (element.is_last) { break; }
        }
    }

    // override this function when derived class has its own logic
    // to process data
    virtual void execute(Data&) { return; }

    tbb::concurrent_queue<Data> buffer;
    std::future<void> future_ret;
};

class MyClass : private ProduceConsume
{
public:
    void newData(int d) {
        this->produce(d, false);
    }

private:
    void execute(Data& data) {
        if (data.is_last) { return; }
        std::cout << data.data << '\n';
    }
};
可以發現 MyClass 整體架構基本上是差不多的,差別只是現在 newData() 只有把資料透過 produce() 塞進 ProduceConsume class 的 buffer,而有另一個 function execute() 會負責把 buffer 內的值印出來。所有 producer-consumer 相關的邏輯都寫在 ProduceConsume class 內:一個 concurrent queue (這邊是利用 intel TBB - thread building block);constructor 時會啟動一條 thread 負責從 buffer 內拿資料並且呼叫 execute() 處理。
想切換成 sequential 版本也不難,提供 sequential 的 base class 就好
class Sequential
{
public:
    Sequential() = default;
    virtual ~Sequential() = default;
	
    template <typename Args ...>
    void produce(Args&& ... args) {
        this->execute( T{std::forward<Args>(args)...} );
    }
	
private:
    virtual void execute(Data&) { return; }
};

class MyClass : private Sequential
{
public:
    void newData(int d) {
        this->produce(d, false);
    }

private:
    void execute(Data& data) {
        if (data.is_last) { return; }
        std::cout << data.data << '\n';
    }
};
那如果想要把這套機製做成 template 版本讓這個運作機制可以跟 type 無關要怎麼辦? 這點可以這樣改:
template<typename T, typename Derived>
class ProduceConsume
{
public:
    virtual ~ProduceConsume() { ... }	
    void activate() { ... }
    void wait() { ... }
	
    template <typename Args ...>
    void produce(Args&& ... args) { ... }
	
private:
    // move constructor to private can avoid misuse and make compiler complain
    // for example:
    // class AnotherClass : private ProduceConsume<Data, MyClass> -> compile error
    friend Derived;
    ProduceConsume()  { ... }

    void consume() {
        T element;
        while (true) {
            if (buffer.try_pop(element)) {
                // CRTP here to avoid using virtual function in a template class
                static_cast<Derived*>(this)->execute(element);
            }
            if (this->isLast(element)) { break; }
        }
    }
	
    void setLast(T& element);
    void isLast(const T& element) const;
    void execute(Data&) { return; }

    tbb::concurrent_queue<Data> buffer;
    std::future<void> future_ret;
};

class MyClass : private ProduceConsume<Data, MyClass>
{
public:
    void newData(int d) { ... }

private:
    friend ProduceConsume<Data, MyClass>;

    void execute(Data& data) { ... }
};
這邊有幾個重點:
  1. 利用 CRTP (curiously recurring template pattern) 讓 template class 不會有 virtual member function
  2. 改用 setLast() / isLast() 去判斷資料是不是最後一筆
setLast() / isLast() 可能要更好的做法,像是利用 type_traits 或是直接假設 type 一定有類似的 member function ... 等等;又或者是採用其他 condition variable 來判斷是不是要終止 thread 等等,有非常多樣的選擇可以調整。但是 CRTP 是目前我想不到其他既要做成 template class 又要保有彈性的作法,要以一句話簡單瞭解 CRTP 的精神的話,相對於 virtual function 是動態多型、CRTP 則是靜態多型。

當然這個機制可以做得更好啦,像是 producer/cosumer 各有 N/M 條 thread、bounded size buffer 下的運作模式、thread pool 且可動態調整 producer / consumer 的 thread 數量 ... 等等,但是不論是哪一種方式,大抵上都能套用上面的方式簡單的套用在既有的邏輯上

1 則留言:

  1. 你可以讓produceconsume的生命週期就當作是thread的退出條件啊

    回覆刪除