清单 12.例子 thread_atomic.cc1 2 3 4 5 6 7 8 9 10 11 12 13 14 | static void
inc(atomic< int > *p ){
for ( int i = 0; i < COUNT; i++){
(*p)++;
}
}
void threadDataRacing( void ){
atomic< int > a(0) ;
thread ta( inc, &a);
thread tb( inc, &a);
ta.join();
tb.join();
cout << "a=" << a << endl;
}
|
我们也可以使用 lock_guard,lock_guard 是一个范围锁,本质是 RAII(Resource Acquire Is Initialization),在构建的时候自动加锁,在析构的时候自动解锁,这保证了每一次加锁都会得到解锁。即使是调用函数发生了异常,在清理栈帧的时候也会调用它的析构函数得到解锁,从而保证每次加锁都会解锁,但是我们不能手工调用加锁方法或者解锁方法来进行更加精细的资源占用管理,使用 lock_guard 示例如下: 清单 13.例子 thread_lock_guard.cc1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | static mutex g_mutex;
static void
inc( int *p ){
for ( int i = 0; i < COUNT; i++){
lock_guard<mutex> _(g_mutex);
(*p)++;
}
}
void threadLockGuard( void ){
int a = 0;
thread ta( inc, &a);
thread tb( inc, &a);
ta.join();
tb.join();
cout << "a=" << a << endl;
}
|
如果要支持手工加锁,可以考虑使用 unique_lock 或者直接使用 mutex。unique_lock 也支持 RAII,它也可以一次性将多个锁加锁;如果使用 mutex 则直接调用 mutex 类的 lock, unlock, trylock 等方法进行更加精细的锁管理: 清单 14.例子 thread_mutex.cc1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | static mutex g_mutex;
static void
inc( int *p ){
thread_local int i;
for (; i < COUNT; i++){
g_mutex.lock();
(*p)++;
g_mutex.unlock();
}
}
void threadMutex( void ){
int a = 0;
thread ta( inc, &a);
thread tb( inc, &a);
ta.join();
tb.join();
cout << "a=" << a << endl;
}
|
在上例中,我们还使用了线程本地存储 (TLS) 变量,我们只需要在变量前面声明它是 thread_local 即可。TLS 变量在线程栈内分配,线程栈只有在线程创建之后才生效,在线程退出的时候销毁,需要注意不同系统的线程栈的大小是不同的,如果 TLS 变量占用空间比较大,需要注意这个问题。TLS 变量一般不能跨线程,其初始化在调用线程第一次使用这个变量时进行,默认初始化为 0。 对于线程间的事件通知,C++11 提供了条件变量类 condition_variable,可视为 pthread_cond_t 的封装,使用条件变量可以让一个线程等待其它线程的通知 (wait,wait_for,wait_until),也可以给其它线程发送通知 (notify_one,notify_all),条件变量必须和锁配合使用,在等待时因为有解锁和重新加锁,所以,在等待时必须使用可以手工解锁和加锁的锁,比如 unique_lock,而不能使用 lock_guard,示例如下:
清单 15.例子 thread_cond_var.cc1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | #include <thread>
#include <iostream>
#include <condition_variable>
using namespace std;
mutex m;
condition_variable cv;
void threadCondVar( void ){
# define THREAD_COUNT 10
thread ** t = new thread *[THREAD_COUNT];
int i;
for (i = 0; i < THREAD_COUNT; i++){
t[i] = new thread ( []( int index){
unique_lock<mutex> lck(m);
cv.wait_for(lck, chrono::hours(1000));
cout << index << endl;
}, i );
this_thread::sleep_for( chrono::milliseconds(50));
}
for (i = 0; i < THREAD_COUNT; i++){
lock_guard<mutex> _(m);
cv.notify_one();
}
for (i = 0; i < THREAD_COUNT; i++){
t[i]->join();
delete t[i];
}
delete t;
}
|
从上例的运行结果也可以看到,条件变量是不保证次序的,即首先调用 wait 的不一定首先被唤醒。
几个高级概念 C++11 提供了若干多线程编程的高级概念:promise/future, packaged_task, async,来简化多线程编程,尤其是线程之间的数据交互比较简单的情况下,让我们可以将注意力更多地放在业务处理上。 promise/future 可以用来在线程之间进行简单的数据交互,而不需要考虑锁的问题,线程 A 将数据保存在一个 promise 变量中,另外一个线程 B 可以通过这个 promise 变量的 get_future() 获取其值,当线程 A 尚未在 promise 变量中赋值时,线程 B 也可以等待这个 promise 变量的赋值: 清单 16.例子 thread_promise_future.cc1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | promise<string> val;
static void
threadPromiseFuture(){
thread ta([](){
future<string> fu = val.get_future();
cout << "waiting promise->future" << endl;
cout << fu.get() << endl;
});
thread tb([](){
this_thread::sleep_for( chrono::milliseconds(100) );
val.set_value( "promise is set" );
});
ta.join();
tb.join();
}
|
一个 future 变量只能调用一次 get(),如果需要多次调用 get(),可以使用 shared_future,通过 promise/future 还可以在线程之间传递异常。 如果将一个 callable 对象和一个 promise 组合,那就是 packaged_task,它可以进一步简化操作:
清单 17.例子 thread_packaged_task.cc1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | static mutex g_mutex;
static void
threadPackagedTask(){
auto run = [=]( int index){
{
lock_guard<mutex> _(g_mutex);
cout << "tasklet " << index << endl;
}
this_thread::sleep_for( chrono::seconds(10) );
return index * 1000;
};
packaged_task< int ( int )> pt1(run);
packaged_task< int ( int )> pt2(run);
thread t1([&](){pt1(2);} );
thread t2([&](){pt2(3);} );
int f1 = pt1.get_future().get();
int f2 = pt2.get_future().get();
cout << "task result=" << f1 << endl;
cout << "task result=" << f2 << endl;
t1.join();
t2.join();
}
|
我们还可以试图将一个 packaged_task 和一个线程组合,那就是 async() 函数。使用 async() 函数启动执行代码,返回一个 future 对象来保存代码返回值,不需要我们显式地创建和销毁线程等,而是由 C++11 库的实现决定何时创建和销毁线程,以及创建几个线程等,示例如下:
清单 18.例子 thread_async.cc1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 | static long
do_sum(vector< long > *arr, size_t start, size_t count){
static mutex _m;
long sum = 0;
for ( size_t i = 0; i < count; i++){
sum += (*arr)[start + i];
}
{
lock_guard<mutex> _(_m);
cout << "thread " << this_thread::get_id()
<< ", count=" << count
<< ", sum=" << sum << endl;
}
return sum;
}
static void
threadAsync(){
# define COUNT 1000000
vector< long > data(COUNT);
for ( size_t i = 0; i < COUNT; i++){
data[i] = random() & 0xff;
}
vector< future< long > > result;
size_t ptc = thread ::hardware_concurrency() * 2;
for ( size_t batch = 0; batch < ptc; batch++){
size_t batch_each = COUNT / ptc;
if (batch == ptc - 1){
batch_each = COUNT - (COUNT / ptc * batch);
}
result.push_back(async(do_sum, &data, batch * batch_each, batch_each));
}
long total = 0;
for ( size_t batch = 0; batch < ptc; batch++){
total += result[batch].get();
}
cout << "total=" << total << endl;
}
|
如果是在多核或者多 CPU 的环境上面运行上述例子,仔细观察输出结果,可能会发现有些线程 ID 是重复的,这说明重复使用了线程,也就是说,通过使用 async() 还可达到一些线程池的功能。
几个需要注意的地方 thread 同时也是棉线、毛线、丝线等意思,我想大家都能体会面对一团乱麻不知从何处查找头绪的感受,不要忘了,线程不是静态的,它是不断变化的,请想像一下面对一团会动态变化的乱麻的情景。所以,使用多线程技术的首要准则是我们自己要十分清楚我们的线程在哪里?线头(线程入口和出口)在哪里?先安排好线程的运行,注意不同线程的交叉点(访问或者修改同一个资源,包括内存、I/O 设备等),尽量减少线程的交叉点,要知道几条线堆在一起最怕的是互相打结。 当我们的确需要不同线程访问一个共同的资源时,一般都需要进行加锁保护,否则很可能会出现数据不一致的情况,从而出现各种时现时不现的莫名其妙的问题,加锁保护时有几个问题需要特别注意:一是一个线程内连续多次调用非递归锁 (non-recursive lock) 的加锁动作,这很可能会导致异常;二是加锁的粒度;三是出现死锁 (deadlock),多个线程互相等待对方释放锁导致这些线程全部处于罢工状态。 第一个问题只要根据场景调用合适的锁即可,当我们可能会在某个线程内重复调用某个锁的加锁动作时,我们应该使用递归锁 (recursive lock),在 C++11 中,可以根据需要来使用 recursive_mutex,或者 recursive_timed_mutex。 第二个问题,即锁的粒度,原则上应该是粒度越小越好,那意味着阻塞的时间越少,效率更高,比如一个数据库,给一个数据行 (data row) 加锁当然比给一个表 (table) 加锁要高效,但是同时复杂度也会越大,越容易出错,比如死锁等。
对于第三个问题我们需要先看下出现死锁的条件: - 资源互斥,某个资源在某一时刻只能被一个线程持有 (hold);
- 吃着碗里的还看着锅里的,持有一个以上的互斥资源的线程在等待被其它进程持有的互斥资源;
- 不可抢占,只有在某互斥资源的持有线程释放了该资源之后,其它线程才能去持有该资源;
- 环形等待,有两个或者两个以上的线程各自持有某些互斥资源,并且各自在等待其它线程所持有的互斥资源。
我们只要不让上述四个条件中的任意一个不成立即可。在设计的时候,非常有必要先分析一下会否出现满足四个条件的情况,特别是检查有无试图去同时保持两个或者两个以上的锁,当我们发现试图去同时保持两个或者两个以上的锁的时候,就需要特别警惕了。
|