2018年10月1日

[C++] A Simple Wrapper on A Non-Thread Safe Container for Thread Safety

大部分的 container 像是 STL 的所有 container 通常都不是 thread safe 的,因為如果要做到 thread safe 的設計多半都要犧牲一些效能,但是大多數情況其實我們不會需要這種設計,因此不考慮 thread safe 對於這類考量泛用類型的架構是合理的。當然如果一開始就有考慮做成 multi-thread 的架構,也有許多 thread-safe container 可以使用,像是 Boost 或是 Intel Thread Building Block (TBB)。

但是假設今天用了 STL 的 container 後一段時間才想要改用 multi-thread 架構的話要怎麼辦?方法大致上就 2 種:
  1. 換掉 container:不過多半連既有程式的使用方式也要跟著做修改
  2. 想辦法把原本的 container 改成 thread-safe
第 1 點當然會是比較保險的,但是換 library 是大事,特別是如果想用的 library 沒有對應的 API 可以對回去原本 library 的用法那就很麻煩了。所以這邊提供第二種方式:增加一個 wrapper,在盡量不改變 usage 的情況下提供 thread safety。

概念說來其實很簡單 (請參考最後面的範例程式):
  1. MyVector 是原先設計的沒有 thread safe 的 container
  2. 外部所有的操作 (不論讀或寫) 都無法直接對 MyVector 內的資料做修改,而是必須透過中介的 MyVector<T>::DataWrapper 做處理
  3. MyVector<T>::DataWrapper 提供兩種操作方式
    • assignment operator:專門處理資料寫入
    • casting operator:專門處理資料讀取
  4. MyVector<T>::DataWrapper 中控處理 concurrency,確保讀或寫能 thread-safe

只是這種作法依舊有其限制就是,會受限於 compiler 對於 casting operator 的使用規範,所以對於無法採用 casting operator 的情境下寫法上就會變成必須要先使用 static_cast 做轉型後才能使用 (請參考範例程式中 read function)。不過這種方式應該能對既有的程式達到最小的修改幅度。


範例程式:

#include <cstdio>
#include <iostream>
#include <string>
#include <vector>
#include <mutex>
#include <thread>
#include <chrono>
using namespace std;
using namespace std::chrono_literals;
template <typename T>
class MyVector
{
private:
struct DataWrapper {
vector<T>& data_ref;
mutex& data_mutex_ref;
const size_t index;
DataWrapper(vector<T>& dr, mutex& dmr, const size_t idx) : data_ref(dr), data_mutex_ref(dmr), index(idx) {}
DataWrapper(const DataWrapper&) = default;
~DataWrapper() = default;
DataWrapper& operator= (const DataWrapper&) = default;
DataWrapper& operator= (const T& rhs);
operator T () const;
};
public:
DataWrapper operator[] (const size_t index);
private:
typedef lock_guard<mutex> lock_type;
vector<T> data;
mutex data_mutex;
};
template <typename T>
typename MyVector<T>::DataWrapper MyVector<T>::operator[] (const size_t index)
{
if (index >= this->data.size()) {
lock_type lock(this->data_mutex);
if (0 != index) { this->data.resize(index << 1); }
else { this->data.resize(1); }
}
return DataWrapper(this->data, this->data_mutex, index);
}
template <typename T>
typename MyVector<T>::DataWrapper& MyVector<T>::DataWrapper::operator= (const T& rhs)
{
lock_type lock(this->data_mutex_ref);
this->data_ref.at(this->index) = rhs;
return *this;
}
template <typename T>
MyVector<T>::DataWrapper::operator T () const
{
lock_type lock(this->data_mutex_ref);
return this->data_ref.at(this->index);
}
struct MyData
{
string name;
size_t data;
MyData() = default;
MyData(const char* n, const size_t d) : name(n), data(d) {}
string tostr() const { return ("name: " + name + "\ndata: " + to_string(data)); }
};
MyVector<MyData> buffer;
void read(const size_t total)
{
for (size_t i = 0; i < total; ++i) {
std::this_thread::sleep_for(50ms);
MyData x = buffer[i]; // valid, calling casting operator
// invalid, it will try to find a member function 'tostr' in MyVector<T>::DataWrapper but not in MyData
// after casting the return type MyVector<T>::DataWrapper to MyData&
//cout << buffer[i].tostr() << '\n';
cout << static_cast<MyData&>(buffer[i]).tostr() << '\n';
}
}
void write(const size_t total)
{
for (size_t i = 0; i < total; ++i) {
buffer[i] = MyData("MyData", i + 1);
std::this_thread::sleep_for(50ms);
}
}
int main()
{
constexpr size_t TOTAL = 15;
thread tw(write, TOTAL);
thread tr(read, TOTAL);
tw.join();
tr.join();
return 0;
}

沒有留言:

張貼留言