C++并发编程教程

This commit is contained in:
Estom
2021-09-07 10:14:04 +08:00
parent 32e2f87c5e
commit 651a2fe58e
335 changed files with 29689 additions and 10 deletions

View File

@@ -0,0 +1,352 @@
# multi_threading
《C++并发编程实战》的读书笔记,供以后工作中查阅。
## 第一章
- 何谓并发和多线程
并发:单个系统里同时执行多个独立的活动。
多线程:每个线程相互独立运行,且每个线程可以运行不同的指令序列。但进程中所有线程都共享相同的地址空间,并且从所有的线程中访问到大部分数据。
- 为什么要在应用程序中使用并发和多线程
关注点分离DVD程序逻辑分离和性能加快程序运行速度
- 一个简单的C++多线程程序是怎么样的
[清单1.1 一个简单的Hello,Cuncurrent World程序](https://github.com/xuyicpp/multi_threading/blob/master/chapter01/example1_1.cpp)
## 第二章
- 启动线程,以及让各种代码在新线程上运行的方法
多线程在分离detach的时候离开局部函数后会在后台持续运行直到程序结束。如果仍然需要访问局部函数的变量就会造成悬空引用的错误
[清单2.1 当线程仍然访问局部变量时返回的函数](https://github.com/xuyicpp/multi_threading/blob/master/chapter02/example2_1.cpp)
解决上述错误的一个常见的方式,使函数自包含,并且把数据复制到该线程中而不是共享数据。
std::thread是支持移动的如同std::unique_ptr是可移动的而非可复制的。以下是两个转移thread控制权的例子
[清单2.5 从函数中返回std::thread,控制权从函数中转移出](https://github.com/xuyicpp/multi_threading/blob/master/chapter02/example2_5.cpp)、[清单2.6 scoped_thread和示例用法,一旦所有权转移到该对象其他线程就不就可以动它了,保证退出一个作用域线程完成](https://github.com/xuyicpp/multi_threading/blob/master/chapter02/example2_6.cpp)
- 等待线程完成并让它自动运行
在当前线程的执行到达f末尾时局部对象会按照构造函数的逆序被销毁因此thread_guard对象g首先被销毁。所以使用thread_guard类可以保证std::thread对象被销毁前在thread_guard析构函数中调用join。
[清单2.3 使用RAII等待线程完成](https://github.com/xuyicpp/multi_threading/blob/master/chapter02/example2_3.cpp)
- 唯一地标识线程
线程标识符是std::thread::id类型的
1.通过与之相关联的std::thread对象中调用get_id()。
2.当前线程的标识符可以调用std::this_thread::get_id()获得。
## 第三章
- 线程间共享数据的问题
所有线程间共享数据的问题,都是修改数据导致的(竞争条件)。如果所有的共享数据都是只读的,就没问题,因为一个线程所读取的数据不受另一个线程是否正在读取相同的数据而影响。
避免有问题的竞争条件
1.用保护机制封装你的数据结构,以确保只有实际执行修改的线程能够在不变量损坏的地方看到中间数据。
2.修改数据结构的设计及其不变量,从而令修改作为一系列不可分割的变更来完成,每个修改均保留其不变量。者通常被称为无锁编程,且难以尽善尽美。
- 用互斥元保护数据
在[清单3.1 用互斥元保护列表](https://github.com/xuyicpp/multi_threading/blob/master/chapter03/example3_1.cpp)中有一个全局变量它被相应的std::mutex的全局实例保护。在add_to_list()以及list_contains()中对std::lock_guard<std::mutex>的使用意味着这些函数中的访问是互斥的list_contains()将无法再add_to_list()进行修改的半途看到该表。
注意:一个迷路的指针或引用,所有的保护都将白费。在[清单3.2 意外地传出对受保护数据的引用](https://github.com/xuyicpp/multi_threading/blob/master/chapter03/example3_2.cpp)展示了这一个错误的做法。
发现接口中固有的竞争条件这是一个粒度锁定的问题就是说锁定从语句上升到接口了书中用一个stack类做了一个扩展详见[清单3.5 一个线程安全栈的详细类定义](https://github.com/xuyicpp/multi_threading/blob/master/chapter03/example3_5.cpp)
死锁:问题和解决方案:为了避免死锁,常见的建议是始终使用相同的顺序锁定者两个互斥元。
std::lock函数可以同时锁定两个或更多的互斥元而没有死锁的风险。
常见的思路:
- 避免嵌套锁
- 在持有锁时,避免调用用户提供的代码
- 以固定顺序获取锁
这里有几个简单的事例:[清单3.7 使用锁层次来避免死锁](https://github.com/xuyicpp/multi_threading/blob/master/chapter03/example3_7.cpp)、[清单3.9 用std::unique_lock灵活锁定](https://github.com/xuyicpp/multi_threading/blob/master/chapter03/example3_9.cpp)
锁定在恰当的粒度
特别的在持有锁时不要做任何耗时的活动比如文件的I/O。
一般情况下只应该以执行要求的操作所需的最小可能时间而去持有锁。这也意味着耗时的操作比如获取获取另一个锁即便你知道它不会死锁或是等待I/O完成都不应该在持有锁的时候去做除非绝对必要。
在[清单3.10 在比较运算符中每次锁定一个互斥元](https://github.com/xuyicpp/multi_threading/blob/master/chapter03/example3_10.cpp)虽然减少了持有锁的时间,但是也暴露在竞争条件中去了。
- 用于保护共享数据的替代工具
二次检测锁定模式,注意这个和单例模式中的饱汉模式不一样,它后面有对数据的使用
```
void undefined_behaviour_with_double_checked_locking()
{
if(!resource_ptr)
{
std::lock_guard<std::mutex> lk(resource_mutex);
if(!resource_ptr)
{
resoutce_ptr.reset(new some_resource);
}
}
resource_ptr->do_something();
}
```
它有可能产生恶劣的竞争条件,因为在锁外部的读取与锁内部由另一线程完成的写入不同步。这就因此创建了一个竞争条件,不仅涵盖了指针本身,还涵盖了指向的对象。
C++标准库提供了std::once_flag和std::call_once来处理这种情况。使用std::call_once比显示使用互斥元通常会由更低的开销特别是初始化已经完成的时候应优先使用。[清单3.12 使用std::call_once的线程安全的类成员延迟初始化](https://github.com/xuyicpp/multi_threading/blob/master/chapter03/example3_12.cpp)
保护很少更新的数据结构例如DNS缓存使用读写互斥元单个“写”线程独占访问或共享由多个“读”线程并发访问。
[清单3.13 使用boost::share_mutex保护数据结构](https://github.com/xuyicpp/multi_threading/blob/master/chapter03/example3_13.cpp)
## 第4章 同步并发操作
- 等待事件
使用C++标准库提供的工具来等待事件本身。std::condition_variable的std::condition_variable_any后者可以与任何互斥元一起工作所以有额外代价的可能。
std::condition_variable可以调用notify_one()和notify_all()。然后std::condition_variable还可以wait(lk,[this]{return !data_queue.empty();}),这里的lk是unique_lock方便后面条件不满足的时候解锁满足时开锁。
[清单4.1 使用std::condition_variable等待数据](https://github.com/xuyicpp/multi_threading/blob/master/chapter04/example4_01.cpp)
使用条件变量建立一个线程安全队列:[清单4.2 std::queue接口](https://github.com/xuyicpp/multi_threading/blob/master/chapter04/example4_02.cpp)、[清单4.4 从清单4.1中提取push()和wait_and_pop()](https://github.com/xuyicpp/multi_threading/blob/master/chapter04/example4_04.cpp)。
- 使用future来等待一次性事件
在一个线程不需要立刻得到结果的时候你可以使用std::async来启动一个异步任务。std::async返回一个std::future对象而不是给你一个std::thread对象让你在上面等待std::future对象最终将持有函数的返回值当你需要这个值时只要在future上调用get(),线程就会阻塞知道future就绪然后返回该值。
[清单4.6 使用std::future获取异步任务的返回值](https://github.com/xuyicpp/multi_threading/blob/master/chapter04/example4_06.cpp)
std::async允许你通过将额外的参数添加到调用中来将附加参数传递给函数这与std::thread是同样的方式。
[清单4.7 使用std::async来将参数传递给函数](https://github.com/xuyicpp/multi_threading/blob/master/chapter04/example4_07.cpp)
std::packaged_task<>将一个future绑定到一个函数或可调用对象上。当std::packaged_task<>对象被调用时它就调用相关联的函数或可调用对象并且让future就绪将返回值作为关联数据存储。
[清单4.9 使用std::packaged_task在GUI线程上运行代码](https://github.com/xuyicpp/multi_threading/blob/master/chapter04/example4_09.cpp)
std::promise<T>提供一种设置值类型T方式它可以在这之后通过相关联的std::future<T>对象进行读取。
[清单4.10 使用promise在单个线程中处理多个链接](https://github.com/xuyicpp/multi_threading/blob/master/chapter04/example4_10.cpp)这个有点像select,或者poll。
同时还要为future保存异常以及使用share_future等待来自多个线程。
- 有时间限制的等待
1.基于时间段的超时。2.基于时间点的超时。
[清单4.11 等待一个具有超时的条件变量](https://github.com/xuyicpp/multi_threading/blob/master/chapter04/example4_11.cpp)
- 使用操作的同步来简化代码
解决同步问题的范式函数式编程其中每个任务产生的结果完全依赖于它的输入而不是外部环境以及消息传递ATM状态机线程通信通过状态发送一部消息来实现的。
[清单4.13 使用future的并行快速排序](https://github.com/xuyicpp/multi_threading/blob/master/chapter04/example4_13.cpp)、
[清单4.15 ATM逻辑类的简单实现](https://github.com/xuyicpp/multi_threading/blob/master/chapter04/example4_15.cpp)。
## 第5章 C++内存模型和原子类型上操作
本章介绍了C++11内存模型的底层细节以及在线程间提供同步基础的原子操作。这包括了由std::atomic<>类模板的特化提供的基本原子类型由std::atomic<>主模板提供的泛型原子接口,在这些类型上的操作,以及各种内存顺序选项的复杂细节。
我们还看了屏障,以及它们如何通过原子类型上的操作配对,以强制顺序。最后,我们回到开头,看了看原子操作是如何用来在独立线程上的非原子操作之间强制顺序的。
在原子类型上的每一个操作均具有一个可选的内存顺序参数,它可以用来指定所需的内存顺序语义。
- 存储(store)操作可以包括memory_order_relaxed、memory_order_release或memory_order_seq_cst顺序。
- 载入(load)操作可以包括memory_order_relaxed、memory_order_consume、memory_order_acquire或memory_order_seq_cst顺序。
- 读-修改-写(read-modify-write)操作可以包括memory_order_relaxed、memory_order_consume、memory_order_acquire、memory_order_release、memory_order_acq_rel或memory_order_seq_cst顺序。
所有操作的默认顺序为memory_order_seq_cst。
原子操作的内存顺序的三种模型:
- 顺序一致顺序(sequentially consistent):(memory_order_seq_cst):[清单5.4 顺序一致隐含着总体顺序](https://github.com/xuyicpp/multi_threading/blob/master/chapter05/example5_04.cpp)。
- 松散顺序(relaxed):(memory_order_relaxed):[清单5.6 多线程的松散操作](https://github.com/xuyicpp/multi_threading/blob/master/chapter05/example5_06.cpp)。
- 获取-释放顺序(acquire-release):(memory_order_consume、memory_order_acquire、memory_order_release和memory_order_acq_rel):[清单5.9 使用获取和释放顺序的传递性同步](https://github.com/xuyicpp/multi_threading/blob/master/chapter05/example5_09.cpp)、[清单5.10 使用std::memory_order_consume同步数据(原子载入操作指向某数据的指针)](https://github.com/xuyicpp/multi_threading/blob/master/chapter05/example5_10.cpp)
synchronizes-with(与同步):
- 在原子变量的载入和来自另一个线程的对该原子变量的载入之间建立一个synchronizes-with关系[清单5.11 使用原子操作从队列中读取值](https://github.com/xuyicpp/multi_threading/blob/master/chapter05/example5_11.cpp)
- 在一个线程中释放屏障在另一个线程中获取屏障从而实现synchronizes-with关系[清单5.12 松散操作可以使用屏障来排序](https://github.com/xuyicpp/multi_threading/blob/master/chapter05/example5_12.cpp)
happens-before(发生于之前):传递性如果A线程发生于B线程之前并且B线程发生于C之前则A线程间发生于C之前。
- [清单5.8 获取-释放操作可以在松散操作中施加顺序](https://github.com/xuyicpp/multi_threading/blob/master/chapter05/example5_08.cpp)
- [清单5.13 在非原子操作上强制顺序](https://github.com/xuyicpp/multi_threading/blob/master/chapter05/example5_13.cpp)
## 第六章 设计基于锁的并发数据结构
为并发存取设计数据结构时,需要考虑两方面:
1、保证存取是安全的
- 保证当数据结构不变性被别的线程破坏时的状态不被任何别的线程看到。
- 注意避免数据结构接口所固有的竞争现象,通过为完整操作提供函数,而不是提供操作步骤。
- 注意当出现例外时,数据结构是怎样来保证不变性不被破坏的。
- 当使用数据结构时,通过限制锁的范围和避免使用嵌套锁,来降低产生死锁的机会。
2、实现真正的并发存取
- 锁的范围能否被限定,使得一个操作的一部分可以在锁外被执行?
- 数据结构的不同部分能否被不同的互斥元保护?
- 是否所有操作需要同样级别的保护?
- 数据结构的一个小改变能否在不影响操作语义情况下提高并发性的机会?
一些通用的数据结构(栈、队列、哈希映射以及链表),考虑了如何在设计并发存取的时候应用上述设计准则来实现他们,使用锁来保护数据并阻止数据竞争。
- 使用锁的线程安全栈
[清单6.1 线程安全栈的类定义](https://github.com/xuyicpp/multi_threading/blob/master/chapter06/example6_01.cpp)
- 使用细粒度锁和条件变量的线程安全队列
[清单6.7 使用锁和等待的线程安全队列:内部与接口](https://github.com/xuyicpp/multi_threading/blob/master/chapter06/example6_07.cpp)
- 一个使用锁的线程安全查找表
[清单6.11 线程安全查找表](https://github.com/xuyicpp/multi_threading/blob/master/chapter06/example6_11.cpp)
- 一个使用锁的线程安全链表
[清单6.13 支持迭代的线程安全链表](https://github.com/xuyicpp/multi_threading/blob/master/chapter06/example6_13.cpp)
## 第七章 设计无锁的并发数据结构
- 为无需使用锁的并发而设计的数据结构的实现
- 在无锁数据结构中管理内存的技术
- 有助于编写无锁数据结构的简单准则
### 定义
使用互斥元条件变量以及future来同步数据的算法和数据结构被称为阻塞(blocking)的算法和数据结构。不使用阻塞库函数的数据结构和算法被称为非阻塞(nonblocking)的。但是,并不是所有的数据结构都是无锁(lock-free)的。
[清单7.1 使用std::atomic_flag的自旋锁互斥元的实现](https://github.com/xuyicpp/multi_threading/blob/master/chapter07/example7_01.cpp)这段代码,没有阻塞调用。然而,它并非无锁的。它仍然是一个互斥元,并且一次仍然只能被一个线程锁定。
对于有资格称为无锁的数据结构,就必须能够让多余一个线程可以并发地访问次数据结构。
无等待的数据结构是一种无锁的数据结构,并且有着额外的特性,每个访问数据结构的线程都可以在有限数量的步骤内完成它的操作,而不用管别的线程的行为。
### 无锁数据结构的优点与缺点
优点:
- 1.实现最大程度的并发。
- 2.健壮性:当一个线程在持有锁的时候终止,那个数据结构就永远被破坏了。但是如果一个线程在操作无锁数据结构时终止了,就不会丢失任何数据,除了此线程的数据之外,其他线程可以继续正常执行。
缺点:
- 1.无锁数据结构时不会发生死锁的,尽管有可能存在活锁。活锁会降低性能而不会导致长期的问题,但是也是需要注意的事情。根据定义,无等待的代码无法忍受活锁,因为它执行操作的步骤数通常是有上限的。另一方面,这种算法比别的算法更复杂,并且即使当没有线程存取数据结构的时候也需要执行更多的步骤。
- 2.它可能降低整体的性能。1、原子操作可能比非原子操作要慢很多。2、与基于锁数据结构的互斥元锁代码相比无锁数据结构中需要更多的原子操作。3、硬件必须在存取同样的原子变量相关的乒乓缓存可能会成为一种显著的性能消耗。
总结:
- 选择有锁无锁的数据结构之前,比较,是否为最坏等待时间,平均等待时间,总的执行时间......是很重要的。
### 无锁数据结构的例子
从清单7.2-清单7.12不用锁的线程安全栈从清单7.13-清单7.21无锁线程安全队列。
(这里不是看的很懂以后有机会再补充)。另外哑元结点是一个和有意思的概念。
### 编写无锁数据结构的准则
- 使用std::memory_order_seq_cst作为原型(先用顺序一致顺序跑通,再来其他的骚操作)
- 使用无锁内存回收模式(1.等待直到没有线程访问该数据结构并且删除所有等待删除的对象。2.使用风险指针来确定线程正在访问一个特定的对象。3.引用计数对象,只有直到没有显著的引用时才删除它们。)
- 当心ABA问题就是线程1比较/交换操作原子x发现它的值是A然后阻塞然后线程2改成B然后线程3改回了A(并且恰好使用了相同的地址),线程1比较/交换成功。破坏了数据结构。
- 解决ABA问题的方法就是在变量x使用一个ABA计数器。使用空闲表或者回收结点而不是将它返回给分配器使ABA常见。
- 识别忙于等待的循环以及辅助其他线程(数据成员变原子,并使用比较/交换操作设置它)
## 第8章 设计并发代码
### 在线程间划分工作的技术
- 处理开始前在线程间划分数据
- 递归地划分数据
- 以任务类型划分工作
### 影响并发代码性能的因素
- 有多少个处理器
- 数据竞争和乒乓缓存处理器很多需要互相等待称为高竞争。在如下的循环中counter的数据在各处理器的缓存间来回传递。这被称为乒乓缓存(cache ping-pong),而且会严重影响性能。
```
std::atomic<unsigned long> counter(0);
void processing_loop()
{
while(counter.fetch_add(1,std::memory_order_relaxed)<100000000)
{
do_something();
}
}
```
- 假共享:处理器缓存的最小单位通常不是一个内存地址,而是一小块缓存线(cache line)的内存。这些内存块一般大小为32 ~ 64字节取决于具体的处理器。这个缓存线是两者共享的然而其中的数据并不共享因此称为假共享(false sharing)。
- 数据应该多紧密
- 过度订阅和过多的任务切换
### 为多线程性能设计数据结构
为多线程性能设计你的数据结构时:竞争、假共享以及数据接近。
- 为复杂操作划分数组元素
- 其他数据结构中的数据访问方式
### 为并发设计时的额外考虑
- 并行算法中的异常安全1.用对象的析构函数中检查2.STD::ASYNC()的异常安全
- 可扩展性和阿姆达尔定律:简单来说就是设计最大化并发
- 用多线程隐藏延迟
- 用并发提高响应性
### 在实践中设计并发代码
- std::for_each的并行实现:[清单8.7 std::for_each的并行版本](https://github.com/xuyicpp/multi_threading/blob/master/chapter08/example8_07.cpp)、[清单8.8 使用std::async的std::for_each的并行版本](https://github.com/xuyicpp/multi_threading/blob/master/chapter08/example8_08.cpp)
- std::find的并行实现:[清单8.9 并行find算法的一种实现](https://github.com/xuyicpp/multi_threading/blob/master/chapter08/example8_09.cpp)、[清单8.10 使用std::async的并行查找算法的实现](https://github.com/xuyicpp/multi_threading/blob/master/chapter08/example8_10.cpp)
- std::partial_sum的并行实现:[清单8.11 通过划分问题来并行计算分段的和](https://github.com/xuyicpp/multi_threading/blob/master/chapter08/example8_11.cpp)、[清单8.13 通过成对更新的partial_sum的并行实现](https://github.com/xuyicpp/multi_threading/blob/master/chapter08/example8_13.cpp)
- 屏障(barrier):一种同步方法使得线程等待直到要求的线程已经到达了屏障。[清单8.12 一个简单的屏障类](https://github.com/xuyicpp/multi_threading/blob/master/chapter08/example8_12.cpp)
## 第9章 高级线程管理
本章,我们考虑了许多“高级的“线程管理方法:线程池和中断线程。
- [清单9.1 简单的线程池](https://github.com/xuyicpp/multi_threading/blob/master/chapter09/example9_01.cpp)
- [清单9.9 interruptible_thread的基本实现](https://github.com/xuyicpp/multi_threading/blob/master/chapter09/example9_09.cpp)
你已经看到使用本地工作队列如何减少同步管理以及潜在提高线程池的吞吐量,
- [清单9.6 使用本地线程工作队列的线程池](https://github.com/xuyicpp/multi_threading/blob/master/chapter09/example9_06.cpp)
并且看到当等待子任务完成时如何运行队列中别的任务来减少发生死锁的可能性。
- [清单9.8 使用工作窃取的线程池](https://github.com/xuyicpp/multi_threading/blob/master/chapter09/example9_08.cpp)
我们也考虑了许多方法来允许一个线程中断另一个线程的处理,例如使用特殊中断点
```
void interruption_point()
{
if(this_thread_interrupt_flag.is_set())
{
throw thread_interrupted();
}
}
```
和如何将原本会被中断阻塞的函数变得可以被中断。
```
template<typename T>
void interruptible_wait(std::future<T>& uf)
{
//这会一直等到要么中断标志被设置要么future已经准备好了但是每次在future上执行阻塞要等待1ms。
while(!this_thread_interrupt_flag.is_set())
{
if(uf.wait_for(lk.std::chrono::miliseconds(1)==std::future_status::ready))
break;
}
interruption_point();
}
```
## 第10章 多线程应用的测试与调试
### 并发相关错误的类型
不必要的阻塞
- 死锁
- 活锁
- 在I/O或外部输入上的阻塞
竞争条件
- 数据竞争
- 破坏不变量
- 生存期问题
### 定位并发相关的错误的技巧
#### 审阅代码以定位潜在的错误
- 该线程载入的数据是否有效?该数据是够已经被其他线程修改了?
- 如果你假设其他线程可能正在修改该数据,那么可能会导致什么样的后果以及如何保证这样的事情永不发生?
#### 通过测试定位并发相关的错误
#### 可测试性设计
- 每个函数功能和类的划分清晰明确
- 函数扼要简洁
- 你的测试代码可以完全控制你的被测试代码的周围的环境
- 被测试的需要特定操作的代码应该集中在一块而不是分散在整个系统中。
- 在你写测试代码之前你要先考虑如何测试代码
#### 多线程测试技术
- 暴力测试(穷举法)
- 组合仿真测试
- 使用特殊的库函数来检测测试暴露出的问题

View File

@@ -0,0 +1,15 @@
#include <iostream>
#include <thread>
//join的作用是让主线程等待直到该子线程执行结束示例
//需要注意的是线程对象执行了join后就不再joinable了所以只能调用join一次。
void hello()
{
std::cout<<"Hello Concurrent World\n";
}
int main()
{
std::thread t(hello);
t.join();
}

View File

@@ -0,0 +1,20 @@
struct func
{
int& i;
func(int& i_):i(i_) {}
void operator() ()
{
for(unsigned j = 0; j < 1000000; ++j)
{
do_something(i); //对悬空引用可能的访问
}
}
};
void oops()
{
int some_local_state = 0;
func my_function(some_local_state);
std::thread my_thread(my_func);
my_thread.detach(); //不等待线程完成
} //新的线程可能仍在运行

View File

@@ -0,0 +1,29 @@
struct func
{
int& i;
func(int& i_):i(i_) {}
void operator() ()
{
for(unsigned j = 0; j < 1000000; ++j)
{
do_something(i); //对悬空引用可能的访问
}
}
};
void f()
{
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
try
{
do_something_in_current_thread();
}
catch(...)
{
t.join; //异常中断,局部函数的线程在函数退出前结束
throw;
}
t.join; //正常结束,局部函数的线程在函数退出前结束
}

View File

@@ -0,0 +1,30 @@
//使用RAII等待线程完成
class thread_guard
{
std::thread& t;
public:
explicit thread_guard(std::thread& t_):
t(t_)
{}
~thread_guard()
{
if(t.joinable())
{
t.join();
}
}
thread_guard(thread_guard const&)=delete;
thread_guard& operator=(thread_guard const&)=delete;
};
struct func; //2_1
void f()
{
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
do_something_in_current_thread();
} //在当前线程的执行到达f末尾时局部对象会按照构造函数的逆序被销毁因此thread_guard对象g首先被销毁。

View File

@@ -0,0 +1,19 @@
//分离线程以处理其他文档
void edit_document(std::string const& filename)
{
open_document_and_display_gui(filename);
while(!done_editing())
{
user_command cmd = get_user_input();
if(cmd.type == open_new_document)
{
std::string const new_name=get_filename_from_user();
std::thread t(edit_document,new_name);
t.detach();
}
else
{
process_user_input(cmd);
}
}
}

View File

@@ -0,0 +1,22 @@
//从函数中返回std::thread,控制权从函数中转移出
std::thread f()
{
void some_function();
return std::thread(some_function);
}
std::thread g()
{
void some_other_function(int);
std::thread t(some_other_function,42);
return t;
}
//控制权从函数中转移进
void f(std::thread t);
void g()
{
void some_function();
f(std::thread(some_function));
std::thread t(some_function);
f(std::move(t));
}

View File

@@ -0,0 +1,28 @@
//scoped_thread和示例用法,一旦所有权转移到该对象其他线程就不就可以动它了,保证退出一个作用域线程完成
class scoped_thread
{
std::thread t;
public:
explicit scoped_thread(std::thread t_):
t(std::move(t_))
{
if(!t.joinable())
throw std::logic_error("No thread");
}
~scoped_thread()
{
t.join();
}
scoped_thread(scoped_thread const&)=delete;
scoped_thread& operator=(scoped_thread const&)=delete;
};
struct func;
void f()
{
int some_local_state;
scoped_thread t(std::thread(func(some_local_state)));
do_something_in_current_thread();
}

View File

@@ -0,0 +1,12 @@
//生成一批线程并等待它们完成
void do_work(unsigned id);
void f()
{
std::vector<std::thread> threads;
for(unsigned i = 0; i < 20; ++i)
{
threads.push_back(std::thread(do_work,i)); //生成线程
}
std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join)); //轮流在每个线程上调用join()
}

View File

@@ -0,0 +1,40 @@
//使得每个线程具有最小数目的元素以避免过多的线程开销
template<typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumlate(first,last,result);
}
};
template<typename Iterator,typename T>
T parallel_accumlate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length)
return init;
unsigned long const min_per_thread=25;
unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=std::thread::hardware_concurrency();
unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
std::vector<T> results(num_threads);
std::vector<std::thread> thread(num_threads-1);
Iterator block_start=first;
for(unsigned long i = 0; i < (num_threads-1);++i)
{
Iterator block_end = block_start;
std::advance(block_end,block_size);
threads[i]=std::thread(accumulate_block<Iterator,T>(),block_start,block_end,std::ref(results[i]));
block_start=block_end;
}
accumulate_block<Iterator,T>()(block_start,last,results[num_threads-1]);
std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));
return std::accumulate(results.begin(),results.end(),init);
}

View File

@@ -0,0 +1,19 @@
//用互斥元保护列表
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> some_list;
std::mutex some_mutex;
void add_to_list(int new_value)
{
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
}
bool list_contains(int value_to_find)
{
std::lock_guard<std::mutex> guard(some_mutex);
return std::find(some_list.begin(),some_list.end(),value_to_find) != some_list.end();
}

View File

@@ -0,0 +1,25 @@
//在比较运算符中每次锁定一个互斥元
class Y
{
private:
int some_detail;
//被mutable修饰的变量(mutable只能由于修饰类的非静态数据成员),将永远处于可变的状态,即使在一个const函数中。
mutable std::mutex m;
int get_detail() const
{
std::lock_guard<std::mutex> lock_a(m);
return some_detail;
}
public:
Y(int sd):some_detail(sd){}
freind bool operator==(Y const& lhs, Y const& rhs)
{
if(&lhs==&rhs)
return true;
int const lhs_value=lhs.get_detail();
int const rhs_value=rhs.get_detail();
return lhs_value==rhs_value;
}
};

View File

@@ -0,0 +1,13 @@
//使用互斥元进行线程安全的延迟初始化
std:shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo()
{
std::unique_lock<std::mutex> lk(resource_mutex);
if(!resource_ptr)
{
resource_ptr.reset(new some_resource);
}
lk.unlock();
resource_ptr->do_something();
}

View File

@@ -0,0 +1,27 @@
//使用std::call_once的线程安全的类成员延迟初始化
class X
{
private:
connection_info connection_details;
connection_handle connection;
std::once_flag connection_init_flag;
void open_connection()
{
connection=connection_manager.open(connection_details);
}
public:
X(connection_info const& connection_details_):
connection_details(connection_details_)
{}
void send_data(data_packet const& data)
{
std::call_once(connection_init_flag,&X::open_connection,this);
connection_send_data(data);
}
data_packet receive_data()
{
std::call_once(connection_init_flag,&X::open_connection,this);
return connection.receive_data();
}
};

View File

@@ -0,0 +1,25 @@
//使用boost::share_mutex保护数据结构
#include <map>
#include <string>
#include <mutex>
#include <boost/thread/shared_mutex.hpp>
class dns_entry;
class dns_cache
{
std::map<std::string,dns_entry> entries;
mutable boost::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const& domain) const
{
boost::shared_lock<boost::shared_mutex> lk(entry_mutex);
std::map<std::string,dns_entry>::const_iterator const it = entries.find(domain);
return (it==entries.end())?dns_entry():it->second;
}
void update_or_add_entry(std::string const& domain,dns_entry const& dns_details)
{
std::lock_guard<boost::shared_mutex> lk(entry_mutex);
entries[domain]=dns_details;
}
};

View File

@@ -0,0 +1,36 @@
//意外的传出对受保护数据的引用
class some_data
{
int a;
std::string b;
public:
void do_something();
};
class data_warpper
{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func)
{
std::lock_guard<std::mutex> l(m);
func(data); //传递“受保护的”数据到用户提供的函数
}
};
some_data* unprotected;
void malicious_function(some_data& protected_data)
{
unprotected = &protected_data;
}
data_warpper x;
void foo()
{
x.protected_data(malicious_function); //传入一个恶意函数
unprotected->do_something(); //对受保护的数据进行未受保护的访问
}

View File

@@ -0,0 +1,21 @@
//std::stack 容器适配器的接口
template<typename T,typename Container=std::deque<T> >
class stack
{
public:
explicit stack(const Container&);
explicit stack(const Container&& = Container());
template <class Alloc> explicit stack(const Alloc&);
template <class Alloc> stack(const Container&, const Alloc&);
template <class Alloc> stack(Container&&, const Alloc&);
template <class Alloc> stack(stack&&, const Alloc&); //这里应该移动构造函数
bool empty() const;
size_t size() const;
T& top();
T const& top() const;
void push(T const&);
void push(T&&);
void pop();
void swap(stack&&);
}
//对于共享的stack对象这个调用序列不再安全

View File

@@ -0,0 +1,22 @@
//一个线程安全栈的概要类定义
#include <exception>
#include <memory> //For std::shared_ptr<>
struct empty_stack: std::exception
{
const char* waht() const throw();
};
template<typename T>
class threadsafe_stack
{
public:
threadsafe_stack();
threadsafe_stack(const threadsafe_stack&);
threadsafe_stack& operator=(const threadsafe_stack&) = delete; //赋值运算符被删除了
void push(T new_value);
std::shared_ptr<T> pop();
void pop(T& value);
bool empty() const;
};

View File

@@ -0,0 +1,56 @@
//一个线程安全栈的详细类定义
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
struct empty_stack: std::exception
{
const char* what() const throw();
};
template <typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
//mutalbe的中文意思是“可变的易变的”跟constant既C++中的const是反义词。
//在C++中mutable也是为了突破const的限制而设置的。被mutable修饰的变量(mutable只能由于修饰类的非静态数据成员)
//将永远处于可变的状态即使在一个const函数中。
mutable std::mutex m;
public:
threadsafe_stack(){}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data=other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}
std::share_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack(); //在试着出栈值的时候检查是否为空
std::share_ptr<T> const res(std::make_shared<T>(data.top())); //在修改栈之前分配返回值
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

View File

@@ -0,0 +1,23 @@
//在交换操作中使用std::lock()和std::lock_guard
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs == &rhs)
return;
std::lock(lhs.m,rhs.m); //std::lock函数可以同时锁定两个或更多的互斥元而没有死锁的风险。
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);
//额外提供一个adopt_lock给互斥元,沿用互斥元上已有锁的所有权
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);
swap(lhs.some_detail,rhs.some_detail);
}
}

View File

@@ -0,0 +1,40 @@
//使用锁层次来避免死锁
hierarchical_mutex high_level_mutex(10000);
hierarchical_mutex low_level_mutex(5000);
int do_low_level_stuff();
int low_level_func()
{
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_low_level_stuff();
}
void high_level_stuff(int some_param);
void high_level_func()
{
std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
high_level_stuff(low_level_func());
}
void thread_a() //遵守规则,运行良好
{
high_level_func();
}
hierarchical_mutex other_mutex(100);
void do_other_stuff();
void other_stuff()
{
high_level_func();
do_other_stuff();
}
//违反层次100<1000
void thread_b()
{
std::lock_guard<hierarchical_mutex> lk(other_mutex);
other_stuff();
}

View File

@@ -0,0 +1,48 @@
//简单的分层次互斥元
class hierarchical_mutex
{
std::mutex internal_mutex;
unsigned long const hierarchical_value;
unsigned long previous_hierarchical_value;
//线程局部变量可以在程序中让你为每个线程拥有独立的变量实例用thread_local关键字标记
static thread_local unsigned long this_thread_hierarchical_value;
void check_for_hierarchy_violation()
{
if(this_thread_hierarchical_value <= hierarchical_value)
{
throw std::logic_error("mutex hierarchy violated");
}
}
void update_hierarchy_value()
{
previous_hierarchical_value = this_thread_hierarchical_value;
this_thread_hierarchical_value = hierarchical_value;
}
public:
explicit hierarchical_mutex(unsigned long value):
hierarchical_value(value),
previous_hierarchical_value(0)
{}
void lock()
{
check_for_hierarchy_violation();
internal_mutex.lock();
update_hierarchy_value();
}
void unlock()
{
this_thread_hierarchical_value = previous_hierarchical_value;
internal_mutex.unlock();
}
bool try_lock()
{
check_for_hierarchy_violation();
if(!internal_mutex.try_lock())
return false;
update_hierarchy_value();
return true;
}
};
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchical_value(ULONG_MAX);

View File

@@ -0,0 +1,22 @@
//在交换操作中使用std::lock()和std::unique_lock
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_big_object(sd){};
friend void swap(X& lhs, X& rhs)
{
if(& lhs==& rhs)
return;
std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock);
std::lock(lock_a,lock_b);
swap(lhs.some_detail,rhs.some_detail);
}
};

Binary file not shown.

View File

@@ -0,0 +1,31 @@
//清单4.1 使用std::condition_variable等待数据
std::mutex mut;
std::queue<data_chunk> data_queue;
std::condition_variable data_cond;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data=prepare_data();
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
}
void data_processing_thread()
{
while(true)
{
std::unique_lock<std::mutex> lk(mut);
//lambda函数编写一个匿名函数作为表达式的一部分[]作为其引导符
data_cond.wait(lk,[]{return !data_queue.empty();});
data_chunk data=data_queue.front();
data_queue.pop();
lk.unlock();
process(data);
if(is_last_chunk(data))
break;
}
}

View File

@@ -0,0 +1,29 @@
//清单4.2 std::queue接口
template<class T, class Container = std::deque<T> >
class queue
{
public:
explicit queue(const Container&);
explicit queue(Container&& = Container());
template <class Alloc> explicit queue(const Alloc&);
template <class Alloc> queue(const Container&, const Alloc&);
template <class Alloc> queue(Container&&, const Alloc&);
template <class Alloc> queue(queue&&, const Alloc&);
void swap(queue& q);
bool empty() const;
size_type size() const;
T& front();
const T& front() const;
T& back();
const T& back() const;
void push(const T& x);
void push(T&& x);
void pop();
template <class... Args> void emplace(Args&&... args);
};

View File

@@ -0,0 +1,21 @@
//清单4.3 threadsafe_queue的接口
#include <memory> //为了std::shared_ptr
template <typename T>
class threadsafe_queue
{
public:
threadsafe_queue();
threadsafe_queue(const threadsafe_queue&);
threadsafe_queue& operator=(const threadsafe_queue&) = delete; //为了简单起见不允许复制
void push(T new_value);
bool try_pop(T& value);
std::shared_ptr<T> try_pop();
void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();
bool empty() const;
};

View File

@@ -0,0 +1,52 @@
//从清单4.1中提取push()和wait_and_pop()
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class threadsafe_queue
{
private:
std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
//这里在[]加入this可以从lambda中访问类成员
data_cond.wait(lk, [this] {return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
};
threadsafe_queue<data_chunk> data_queue;
void data_preparation_thread()
{
while(more_data_to_prepare())
{
data_chunk const data = prepare_data();
data_queue.push(data);
}
}
void data_processing_thread()
{
while(true)
{
data_chunk data;
data_queue.wait_and_pop(data);
process(data);
if(is_last_chunk(data))
break;
}
}

View File

@@ -0,0 +1,73 @@
//使用条件变量的线程安全队列的完整类定义
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template <typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut; //互斥元必须是可变的
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
threadsafe_queue(threadsafe_queue const& other)
{
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
//这里在[]加入this可以从lambda中访问类成员
data_cond.wait(lk, [this] {return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
//模板函数 std::make_shared 可以返回一个指定类型的 std::shared_ptr
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
//模板函数 std::make_shared 可以返回一个指定类型的 std::shared_ptr
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

View File

@@ -0,0 +1,23 @@
//使用std::future获取异步任务的返回值
#include <future>
#include <iostream>
int find_the_answer_to_ltuae();
void do_other_stuff();
int main()
{
std::future<int> the_anwer=std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout<<"The answer is "<<the_anwer.get()<<std::endl;
}
int find_the_answer_to_ltuae()
{
return (1+1);
}
void do_other_stuff()
{
std::cout<<"do_other_stuff"<<std::endl;
}

View File

@@ -0,0 +1,32 @@
//使用std::async来将参数传递给函数
#include <string>
#include <future>
struct X
{
void foo(int,std::string const&);
std::string bar(std::string const&);
};
X x;
auto f1=std::async(&X::foo,&x,42,"hello"); //调用p->foo(42,"hello"),其中p是&x
auto f2=std::async(&X::bar,x,"goodbye"); //调用tmpx.bar("goodbye"),其中tmpx是x的副本
struct Y
{
double operator()(double);
};
Y y;
auto f3=std::async(Y(),3.141); //调用tmpy(3.131),其中tm
auto f4=std::async(std::ref(y),2.718); //调用y(2.718),
X baz(X&);
std::async(baz,std::ref(x)); //调用baz(x)
class move_only
{
public:
move_only();
move_only(move_only&&);
move_only(move_only const&) = delete;
move_only& operator=(move_only&&);
move_only& operator=(move_only const&) = delete;
void operator()();
};//这个类将拷贝构造函数和赋值构造函数都取消了,只留下移动构造函数
auto f5=std::async(move_only()); //调用tmp(),其中tmp是从std::move(move_only())构造的

View File

@@ -0,0 +1,10 @@
//std::packaged_task<>特化的部分类定义
template<>
class packaged_task<std::string(std::vector<char>*,int)>
{
public:
template<typename Callable>
explicit packaged_task(Callable&& f);
std::future<std::string> get_future();
void operator()(std::vector<char>*,int);
};

View File

@@ -0,0 +1,41 @@
//使用std::packaged_task在GUI线程上运行代码
#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility> //这里有move函数
std::mutex m;
std::deque<std::packaged_task<void()> > tasks;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
void gui_thread()
{
while(!gui_shutdown_message_received())
{
get_and_process_gui_message();
std::packaged_task<void()> task;
{
std::lock_guard<std::mutex> lk(m);
if(tasks.empty())
continue;
task=std::move(tasks.front()); //move将对象的状态或对象转移到另一个对象原来那个对象就为空了。
tasks.pop_front();
}
task();
}
}
std::thread gui_bg_thread(gui_thread);
template <typename Func>
std::future<void> post_task_for_gui_thread(Func f)
{
std::packaged_task<void()> task(f);
std::future<void> res=task.get_future();
std::lock_guard<std::mutex> lk(m);
tasks.push_back(std::move(task));
return res;
}

View File

@@ -0,0 +1,26 @@
//使用promise在单个线程中处理多个连接
#include <future>
void process_connections(connection_set& connections)
{
while(!done(connections))
{
for(connection_iterator connection=connections.begin(),end=connections.end();
connection!=end;
++connection)
{
if(connection->has_incoming_data())
{
data_packet data=connection->incoming();
std::promise<payload_type>& p=connection->get_promise(data.id);
p.set_value(data.payload);
}
if(connection->has_outcoming_data())
{
outgoing_packet data=connection->top_of_outgoing_queue();
connection->send(data.payload);
data.promise.set_value(true);
}
}
}
}

View File

@@ -0,0 +1,19 @@
//等待一个具有超时的条件变量
#include <condition_variable>
#include <mutex>
#include <chrono>
std::condition_variable cv;
bool done;
std::mutex m;
bool wait_loop()
{
auto const timeout=std::chrono::steady_clock::now()+std::chrono::milliseconds(500);
std::unique_lock<std::mutex> lk(m);
while(!done)
{
if(cv.wait_until(lk,timeout)==std::cv_status::timeout)
break;
}
return done;
}

View File

@@ -0,0 +1,24 @@
//快速排序的顺序实现
template<typename T>
std::list<T> sequential_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin());
T const& pivot=*result.begin();
auto divide_point=std::partition(input.begin(),input.end(),[&](T const& t){return t<pivot;});
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),divide_point);
auto new_lower(sequential_quick_sort(std::move(lower_part)));
auto new_higher(sequential_quick_sort(std::move(input)));
result.splice(result.end(),new_higher);
result.splice(result.begin(),new_lower);
return result;
}

View File

@@ -0,0 +1,25 @@
//使用future的并行快速排序
template<typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
std::list<T> result;
result.splice(result.begin(),input,input.begin());
T const& pivot=*result.begin();
auto divide_point=std::partition(input.begin(),input.end(),[&](T const& t){return t<pivot;});
std::list<T> lower_part;
lower_part.splice(lower_part.end(),input,input.begin(),divide_point);
std::future<std::list<T> > new_lower(std::async(&parallel_quick_sort<T>,std::move(lower_part)));
auto new_higher(parallel_quick_sort(std::move(input)));
//splice剪切加粘贴
result.splice(result.end(),new_higher);
result.splice(result.begin(),new_lower.get());
return result;
}

View File

@@ -0,0 +1,12 @@
//一个简单的spawn_task的实现
//相比于使用std::async只有在你确实知道将要做什么并且希望想要通过线程池建立的方式进行完全掌控和执行任务的时候
//才值得首选这种方法
template <typename F,typename A>
std::future<std::result_of<F(A&&)>::type> spawn_task(F&& f,A&& a)
{
typedef std::result_of<F(A&&)>::type result_type;
std::packaged_task<result_type> res(task.get_future());
std::thread t(std::move(task),std::move(a));
t.detach();
return res;
}

View File

@@ -0,0 +1,46 @@
//ATM逻辑类的简单实现
struct card_inserted
{
std::string account;
};
class atm
{
messaging::receiver incoming;
messaging::sender bank;
messaging::sender interface_hardware;
void (atm::*state)();
std::string account;
std::string pin;
void waiting_for_card()
{
interface_hardware.send(display_enter_card());
//wait如果接收到消息不匹配指定的类型他将被丢弃。
incoming.wait()
.handle<card_inserted>([&](card_inserted const& msg)
{
account=msg.account;
pin="";
interface_hardware.send(display_enter_pin());
state=&atm::getting_pin;
}
);
}
void getting_pin();
public:
void run()
{
state=&atm::waiting_for_card;
try
{
for(;;)
{
(this->*state)();
}
}
catch(messaging::close_queue const&)
{
}
}
};

View File

@@ -0,0 +1,36 @@
//简单ATM实现的getting_pin状态函数
void atm::getting_pin()
{
incoming.wait()
//每一次对handle()的调用将消息类型指定为模板参数,然后
//接受特定消息类型作为参数的Lambda函数。
.handle<digit_pressed>(
[&](digit_pressed const& msg)
{
unsigned const pin_length=4;
pin+=msg.digit;
if(pin.length()==pin_length)
{
bank.send(verify_pin(account,pin,incoming));
state=&atm::verifying_pin;
}
}
)
.handle<clear_last_prossed>(
[&](clear_last_prossed const& msg)
{
if(!pin.empty())
{
pin.resize(pin.length()-1);
}
}
)
.handle<cancel_pressed>(
[&](cancel_pressed const& msg)
{
state=&atm::done_processing;
}
);
}

Binary file not shown.

View File

@@ -0,0 +1,18 @@
//使用std::atomic_flag的自旋锁互斥实现
class spinlock_mutex
{
std::atomic_flag flag;
public:
spinlock_mutex():
flag(ATOMIC_FLAG_INIT)
{}
void lock()
{
while(flag.test_and_set(std::memory_order_acquire));
}
void unlock()
{
flag.clear(std::memory_order_release);
}
};
//为了锁定互斥元循环执行test_and_set()知道旧值为false指示这个线程将值设为true。解锁互斥元就是简单地清除标志。

View File

@@ -0,0 +1,21 @@
//从不同的线程中读取和写入变量
#include <vector>
#include <atomic>
#include <iostream>
std::vector<int> data;
std::atomic<bool> data_ready(false);
void reader_thread()
{
while(!data_ready.load())
{
std::this_thread::sleep(std::milliseconds(1));
}
std::cout<<"The answer="<<data[0]<<"\n";
}
void writer_thread()
{
data.push_back(42);
data_ready=true;
}

View File

@@ -0,0 +1,18 @@
//一个函数调用的参数的估计顺序是未指定的
#include <iostream>
void foo(int a,int b)
{
std::cout<<a<<","<<b<<std::endl;
}
int get_num()
{
static int i = 0;
return ++i;
}
int main()
{
foo(get_num(),get_num()); //对get_num()的调用是无序的
}

View File

@@ -0,0 +1,47 @@
//顺序一直隐含着总体顺序
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x()
{
x.store(true,std::memory_order_seq_cst);
}
void write_y()
{
y.store(true,std::memory_order_seq_cst);
}
void read_x_then_y()
{
while(!x.load(std::memory_order_seq_cst));
if(y.load(std::memory_order_seq_cst))
++z;
}
void read_y_then_x()
{
while(!y.load(std::memory_order_seq_cst));
if(x.load(std::memory_order_seq_cst))
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join();
b.join();
c.join();
d.join();
assert(z.load()!=0);
}

View File

@@ -0,0 +1,32 @@
//放松操作有极少数的排序要求
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
x.store(true,std::memory_order_relaxed);
y.store(true,std::memory_order_relaxed);
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed));
if(x.load(std::memory_order_relaxed))
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0);
}

View File

@@ -0,0 +1,83 @@
//多线程的松散操作
#include <thread>
#include <atomic>
#include <iostream>
std::atomic<int> x(0),y(0),z(0);
std::atomic<bool> go(false);
unsigned const loop_count=10;
struct read_values
{
int x,y,z;
};
read_values values1[loop_count];
read_values values2[loop_count];
read_values values3[loop_count];
read_values values4[loop_count];
read_values values5[loop_count];
void increment(std::atomic<int>* var_to_inc,read_values* values)
{
while(!go) //旋转等待信号
std::this_thread::yield();
for (unsigned i = 0; i < loop_count; ++i)
{
values[i].x=x.load(std::memory_order_relaxed);
values[i].y=y.load(std::memory_order_relaxed);
values[i].z=z.load(std::memory_order_relaxed);
var_to_inc->store(i+1,std::memory_order_relaxed);
std::this_thread::yield();
//std::this_thread::yield() 是让当前线程让渡出自己的CPU时间片(给其他线程使用)
//std::this_thread::sleep_for() 是让当前休眠”指定的一段”时间.
}
}
void read_vals(read_values* values)
{
while(!go) //旋转等待信号
std::this_thread::yield();
for (unsigned i = 0; i < loop_count; ++i)
{
values[i].x=x.load(std::memory_order_relaxed);
values[i].y=y.load(std::memory_order_relaxed);
values[i].z=z.load(std::memory_order_relaxed);
std::this_thread::yield();
}
}
void print(read_values* v)
{
for (unsigned i = 0; i < loop_count; ++i)
{
if(i)
std::cout<<",";
std::cout<<"("<<v[i].x<<","<<v[i].y<<","<<v[i].z<<")";
}
std::cout<<std::endl;
}
int main()
{
std::thread t1(increment,&x,values1);
std::thread t2(increment,&y,values2);
std::thread t3(increment,&z,values3);
std::thread t4(read_vals,values4);
std::thread t5(read_vals,values5);
go=true; //开始执行主循环的信号
t5.join();
t4.join();
t3.join();
t2.join();
t1.join();
print(values1); //打印最终的值
print(values2);
print(values3);
print(values4);
print(values5);
}

View File

@@ -0,0 +1,47 @@
//获取-释放并不意味着总体排序
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x()
{
x.store(true,std::memory_order_release);
}
void write_y()
{
y.store(true,std::memory_order_release);
}
void read_x_then_y()
{
while(!x.load(std::memory_order_acquire));
if(y.load(std::memory_order_acquire))
++z;
}
void read_y_then_x()
{
while(!y.load(std::memory_order_acquire));
if(x.load(std::memory_order_acquire))
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join();
b.join();
c.join();
d.join();
assert(z.load()!=0);
}

View File

@@ -0,0 +1,36 @@
//获取-释放操作可以在松散操作中施加顺序
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
//对x的存储发生在对y的存储之前因为他们在同一个线程
x.store(true,std::memory_order_relaxed); //旋转等待y被设为true
//relaxed松散顺序release获得-释放顺序
y.store(true,std::memory_order_release);
}
void read_y_then_x()
{
//对y的加载将会看到由存储写下的true。因为存储使用memory_order_release并且载入
//使用memory_order_acquire,存储与载入同步。
while(!y.load(std::memory_order_aquire));
if(x.load(std::memory_order_relaxed))
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0);
}

View File

@@ -0,0 +1,25 @@
//使用获取和释放顺序的传递性同步(利用了线程间happen-before的定义)
std::atomic<int> data[5];
std::atomic<bool> sync1(false),sync2(false);
void thread_1()
{
data[0].store(42,std::memory_order_relaxed);
data[1].store(97,std::memory_order_relaxed);
data[2].store(17,std::memory_order_relaxed);
sync1.store(true,std::memory_order_release); //设置sync1
}
void thread_2()
{
while(!sync1.load(std::memory_order_acquire)); //循环直到sync1被设置
sync2.store(true,std::memory_order_release); //设置sync2
}
void thread_3()
{
while(!sync2.load(std::memory_order_acquire)); //循环直到sync2被设置
assert(data[0].load(std::memory_order_relaxed)==42);
assert(data[1].load(std::memory_order_relaxed)==97);
assert(data[2].load(std::memory_order_relaxed)==17);
}

View File

@@ -0,0 +1,36 @@
//使用std::memory_order_consume同步数据用于在原子操作载入指向某数据的指针的场合
struct X
{
int i;
std::string s;
};
std::atomic<X*> p;
std::atomic<int> a;
void create_x()
{
X* x=new X;
x->i=42;
x->s="hello";
a.store(99,std::memory_order_relaxed);
p.store(x,std::memory_order_release);
}
void use_x()
{
X* x;
while(!(x=p.load(std::memory_order_consume))) //对p的存储只发生在依赖p的载入值得表达式之前
std::this_thread::sleep(std::chrono::microseconds(1));
assert(x->i==42);
assert(x->s=="hello");
assert(a.load(std::memory_order_relaxed)==99);
}
int main()
{
std::thread t1(create_x);
std::thread t2(use_x);
t1.join();
t2.join();
}

View File

@@ -0,0 +1,43 @@
//使用原子操作从队列中读取值
#include <atomic>
#include <thread>
std::vector<int> queue_data;
std::atomic<atomic> count;
void populate_queue()
{
unsigned const number_of_items=20;
queue_data.clear();
for (unsigned i = 0; i < number_of_items; ++i)
{
queue_data.push_back(i);
}
count.store(number_of_items,std::memory_order_release); //最初的存储
}
void consume_queue_items()
{
while(true)
{
int item_index;
//fetch_sub()是一个具有memory_order_acquire语义的读取并且存储具有memory_order_release语义所以存储与载入同步
if((item_index=count.fetch_sub(1,std::memory_order_acquire))<=0) //一个读-修改-写操作
{
wait_for_more_items(); //等待更多的项目
continue;
}
process(queue_data[item_index-1]); //读取queue_data是安全的
}
}
int main()
{
std::thread a(populate_queue);
std::thread b(consume_queue_items);
std::thread c(consume_queue_items);
a.join();
b.join();
c.join();
}

View File

@@ -0,0 +1,35 @@
//松散操作可以使用屏障来排序
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
x.store(true,std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release); //释放屏障
y.store(true,std::memory_order_relaxed);
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed));
std::atomic_thread_fence(std::memory_order_acquire); //获取屏障
//对x的存储发生在从x的载入之前所以读取的值必然是true
if(x.load(std::memory_order_relaxed))
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0);
}

View File

@@ -0,0 +1,35 @@
//在非原子操作上强制顺序
#include <atomic>
#include <thread>
#include <assert.h>
bool x=false; //x现在是一个普通的非原子变量
std::atomic<bool> y;
std::atomic<int> z;
void write_x_then_y()
{
x=true; //1.在屏障前存储x
std::atomic_thread_fence(std::memory_order_release);
y.store(true,std::memory_order_relaxed); //2.在屏障后存储y
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); //等待到你看见来自2的写入
std::atomic_thread_fence(std::memory_order_acquire);
if(x) //将读取1写入的值
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0); //此断言不会触发
}

View File

@@ -0,0 +1,42 @@
//线程安全栈的类定义
#include <exception>
struct empty_stack: std::exception
{
const char* what() const throw();
};
template <typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack(){}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data=other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
std::shared_ptr<T> pop()
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value=std::move(data.top());
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};

View File

@@ -0,0 +1,62 @@
//使用条件变量的线程安全队列的完整定义
template <typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(data));
data_cond.notify_one();
}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=std::move(data_queue.front());
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=std::move(data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
retrun data_queue.empty();
}
};

View File

@@ -0,0 +1,63 @@
//包含std::shared_ptr<>实例的线程安全队列
template <typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T> > data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue();
{}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=std::move(*data_queue.front());
data_queue.pop();
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=std::move(*data_queue.front());
data_queue.pop();
return true;
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res=data_queue.front();
data_queue.pop();
return res;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res=data_queue.front();
data_queue.pop();
return res;
}
void push(T new_value)
{
std::shared_ptr<T> data(std::make_shared<T>(std::move(new_value)));
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};

View File

@@ -0,0 +1,50 @@
//一种简单的单线程队列实现
template <typename T>
class queue
{
private:
struct node
{
T data;
std::unique_ptr<node> next;
node(T data_);
data(std::move(data_))
{}
};
std::unique_ptr<node> head;
node* tail;
public:
queue()
{}
queue(const queue& other)=delete;
queue& operator=(const queue& other)=delete;
std::share_ptr<T> try_pop()
{
if(!head)
{
return std::share_ptr<T>();
}
std::share_ptr<T> const res(std::make_shared<T>(std::move(head->data)));
std::unique_ptr<node> const old_head=std::move(head);
head=std::move(old_head->next);
return res;
}
void push(T new_value)
{
std::unique_ptr<node> p(new node(std::move(new_value)));
node* const new_tail=p.get();
if(tail)
{
tail->next=std::move(p);
}
else
{
head=std::move(p);
}
tail=new_tail;
}
};

View File

@@ -0,0 +1,44 @@
//使用傀儡结点的简单队列
template <typename T>
class queue
{
private:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
std::unique_ptr<node> head;
node* tail;
public:
queue():
head(new node),tail(gead.get())
{}
queue(const queue& other)=delete;
queue& operator=(const queue& other)=delete;
std::shared_ptr<T> try_pop()
{
if(head.get()==tail)
{
return std::shared_ptr<T>();
}
std::shared_ptr<T> const res(head->data);
std::unique_ptr<node> old_head=std::move(head);
head=std::move(old_head->next);
return res;
}
void push(T new_value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node);
tail->data=new_data;
node* const new_tail=p.get();
tail->next=std::move(p);
tail=new_tail;
}
};

View File

@@ -0,0 +1,60 @@
//使用细粒度锁的线程安全队列
template <typename T>
class threadsafe_queue
{
private:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node* tail;
node* get_tail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<node> pop_head()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get()==get_tail())
{
return nullptr;
}
std::unique_ptr<node> old_head=std::move(head);
head=std::move(old_head->next);
return old_head;
}
public:
threadsafe_queue():
head(new node),tail(head.get())
{}
threadsafe_queue(const threadsafe_queue& other)=delete;
threadsafe_queue& operator=(const threadsafe_queue& other)=delete;
std::shared_ptr<T> try_pop()
{
std::unique_ptr<node> old_head=pop_head();
return old_head?old_head->data:std::shared_ptr<T>();
}
void push(T new_value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node);
node* const new_tail=p.get();
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data=new_data;
tail->next=std::move(p);
tail=new_tail;
}
};

View File

@@ -0,0 +1,30 @@
//使用锁和等待的线程安全队列:内部与接口
template <typename T>
class threadsafe_queue
{
private:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node* tail;
std::condition_variable data_cond;
public:
threadsafe_queue():
head(new node),tail(head.get())
{}
threadsafe_queue(const threadsafe_queue& other)=delete;
threadsafe_queue& operator=(const threadsafe_queue& other)=delete;
std::shared_ptr<T> try_pop();
bool try_pop(T& value);
std::shared_ptr<T> wait_and_pop();
void wait_and_pop(T& value);
void push(T new_value);
void empty();
};

View File

@@ -0,0 +1,15 @@
//使用锁和等待的线程安全队列:push新值
template <typename T>
void threadsafe_queue<T>::push(T new_value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node);
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data=new_data;
node* const new_tail=p.get();
tail->next=std::move(p);
tail=new_tail;
}
data_cond.notify_one();
}

View File

@@ -0,0 +1,49 @@
//使用锁和等待的线程安全队列:wait_and_pop()
template <typename T>
class threadsafe_queue
{
private:
node* get_tail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<node> pop_head()
{
std::unique_ptr<node> old_head=std::move(head);
head=std::move(old_head->next);
return old_head;
}
std::unique_ptr<std::mutex> wait_for_data()
{
std::unique_lock<std::mutex> head_lock(head_mutex);
data_cond.wait(head_lock,[&]{return head.get()!=get_tail();});
return std::move(head_lock);
}
std::unique_ptr<node> wait_pop_head()
{
std::unique_lock<std::mutex> head_lock(wait_for_data());
return pop_head();
}
std::unique_ptr<node> wait_pop_head(T& value)
{
std::unique_lock<std::mutex> head_lock(wait_for_data());
value=std::move(*head->data);
return pop_head();
}
public:
std::shared_ptr<T> wait_and_pop()
{
std::unique_ptr<node> const old_head=wait_pop_head();
retun old_head->data;
}
void wait_and_pop(T& value)
{
std::unique_ptr<node> const old_head=wait_pop_head(value);
}
};

View File

@@ -0,0 +1,45 @@
//使用锁和等待的线程安全队列try_pop()和empty()
template <typename T>
class threadsafe_queue
{
private:
std::unique_ptr<node> try_pop_head()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get()==get_tail())
{
return std::unique_ptr<node>();
}
return pop_head();
}
std::unique_ptr<node> try_pop_head(T& value)
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get()==get_tail())
{
return std::unique_ptr<node>();
}
value=std::move(*head->data);
return pop_head();
}
public:
std::shared_ptr<T> try_pop()
{
std::unique_ptr<node> old_head=try_pop_head();
return old_head?old_head->data:std::shared_ptr<T>();
}
bool try_pop(T& value)
{
std::unique_ptr<node> const old_head=try_pop_head(value);
return old_head; //不知道这里是不是隐式转换
}
bool empty()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
return (head.get()==get_tail());
}
};

View File

@@ -0,0 +1,93 @@
//线程安全查找表
template <typename Key,typename Value,typename Hash=std::hash<Key> >
class threadsafe_lookup_table
{
private:
class bucket_type
{
private:
typedef std::pair<Key,Value> bucket_value;
typedef std::list<bucket_value> bucket_data;
typedef typename bucket_data::iterator bucket_iterator;
bucket_data data;
mutable boost::shared_mutex mutex;
bucket_iterator find_entry_for(Key const& key) const
{
return std::find_if(data.begin(),data.end()
,[&](bucket_value const& item){return item.first==key;});
}
public:
Value value_for(Key const& key,Value const& default_value) const
{
boost::shared_lock<boost::shared_mutex> lock(mutex);
bucket_iterator const found_entry=find_entry_for(key);
return (found_entry==data.end())?default_value:found_entry->second;
}
void add_or_update_mapping(Key const& key,Value const& value)
{
std::unique_lock<boost::shared_mutex> lock(mutex);
bucket_iterator const found_entry=find_entry_for(key);
if(found_entry==data.end())
{
data.push_back(bucket_value(key,value));
}
else
{
found_entry->second=value;
}
}
void remove_mapping(Key const& key)
{
std::unique_lock<boost::shared_mutex> lock(mutex);
bucket_iterator const found_entry=find_entry_for(key);
if(found_entry!=data.end())
{
data.erase(found_entry);
}
}
};
std::vector<std::unique_ptr<bucket_type> > buckets;
Hash hasher;
bucket_type& get_bucket(Key const& key) const
{
std::size_t const bucket_index=hasher(key)%buckets.size();
return *buckets[bucket_index];
}
public:
typedef Key key_type;
typedef Value mapped_type;
typedef Hash hash_type;
threadsafe_lookup_table(unsigned num_buckets=19,Hash const& hasher_=Hash()):
buckets(num_buckets),hasher(hasher_)
{
for(unsigned i=0;i<num_buckets;++i)
{
buckets[i].reset(new bucket_type);
}
}
threadsafe_lookup_table(threadsafe_lookup_table const& other)=delete;
threadsafe_lookup_table& operator=(threadsafe_lookup_table const& other)=delete;
Value value_for(Key const& key,Value const& default_value=Value()) const
{
return get_bucket(key).value_for(key,default_value);
}
void add_or_update_mapping(Key const& key,Value const& value)
{
get_bucket(key).add_or_update_mapping(key,value);
}
void remove_mapping(Key const& key)
{
get_bucket(key).remove_mapping(key);
}
};

View File

@@ -0,0 +1,18 @@
//获取threadsafe_lookup_table的内容作为一个std::map<>
std::map<Key,Value> threadsafe_lookup_table::get_map() const
{
std::vector<std::unique_lock<boost::shared_mutex> > locks;
for(unsigned i=0;i<buckets.size();++i)
{
locks.push_back(std::unique_lock<boost::shared_mutex>(buckets[i].mutex));
}
std::map<Key,Value> res;
for (unsigned i = 0; i < buckets.size(); ++i)
{
for(bucket_iterator it=buckets[i].data.begin(); it!=buckets[i].data.end(); ++it)
{
res.insert(*it);
}
}
return res;
}

View File

@@ -0,0 +1,97 @@
//支持迭代的线程安全链表
template <typename T>
class threadsafe_list
{
struct node
{
std::mutex m;
std::shared_ptr<T> data;
std::unique_ptr<node> next;
node():
next()
{}
node(T const& value):
data(std::make_shared<T>(value))
{}
};
node head;
public:
threadsafe_list()
{}
~threadsafe_list()
{
remove_if([](node const&){return true;}); // remove node
}
threadsafe_list(threadsafe_list const& other)=delete;
threadsafe_list& operator=(threadsafe_list const& other)=delete;
void push_front(T const& value)
{
std::unique_ptr<node> new_node(new node(value));
std::lock_guard<std::mutex> lk(head.m);
new_node->next=std::move(head.next);
head.next=std::move(new_node);
}
template <typename Function>
void for_each(Function f)
{
node* current=&head;
std::unique_lock<std::mutex> lk(head.m);
while(node* const next=current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
f(*next->data);
current=next;
lk=std::move(next_lk);
}
}
template <typename Predicate>
std::shared_ptr<T> find_first_if(Predicate p)
{
node* current=&head;
std::unique_lock<std::mutex> lk(head.m);
while(node* const next=current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
lk.unlock();
if(p(*next->data))
{
return next->data;
}
current=next;
lk=std::move(next_lk);
}
return std::shared_ptr<T>();
}
template <typename Predicate>
void remove_if(Predicate p)
{
node* current=&head;
std::unoque_lock<std::mutex> lk(head.m);
while(node* const next=current->next.get())
{
std::unique_lock<std::mutex> next_lk(next->m);
if(p(*next->data))
{
std::unique_ptr<node> old_next=std::move(current->next);
current->next=std::move(next->next);
next_lk.unlock();
}
else
{
lk.unlock();
current=next;
lk=std::move(next_lk);
}
}
}
};

View File

@@ -0,0 +1,17 @@
//使用std::atomic_flag的自旋锁互斥元的实现
class spinlock_mutex
{
std::atomic_flag flag;
public:
spinlock_mutex():
flag(ATOMIC_FLAG_INIT)
{}
void lock()
{
while(flag.test_and_set(std::memory_order_acquire));
}
void unlock()
{
flag.clear(std::memory_order_release);
}
};

View File

@@ -0,0 +1,25 @@
//实现不使用锁的线程安全push()
template <typename T>
class lock_free_stack
{
private:
struct node
{
T data;
node* next;
node(T const& data_):
data(data_)
{}
};
public:
void push(T const& data)
{
node* const new_node=new node(data);
new_node->next=head.load();
while(!head.compare_exchange_weak(new_node->next,new_node));
//如果这两个值是一样的那么将head指向new_node。这段代码中使用了比较/交换函数的一部分,
//如果它返回false则表明此次比较没有成功(例如因为另一个线程修改了head)。此时,第一个参数(new_node->next)的值
//将被更新为head当前的值。
}
};

View File

@@ -0,0 +1,30 @@
//缺少结点的无锁栈
template <typename T>
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data; //data现在由指针持有
node* next;
node(T const& data_):
data(std::make_shared<T>(data_)) //为新分配的T创建std::shared_ptr
{}
};
std::atomic<node*> head;
public:
void push(T const& data)
{
node* const new_node=new node(data);
new_node->next=head.load();
while(!head.compare_exchange_weak(new_node->next,new_node));
}
std::shared_ptr<T> pop()
{
node* old_head=head.load();
while(old_head && !head.compare_exchange_weak(old_head,old_head->next)); //在解引用之前检查old_head不是一个空指针
return old_head ? old_head->data : std::shared_ptr<T>();
}
};

View File

@@ -0,0 +1,22 @@
//当pop()中没有线程时回收结点
template <typename T>
class lock_free_stack
{
private:
std::atomic<unsigned> threads_in_pop; //原子变量
void try_reclaim(node* old_head); //试着回收内存 7.5有详细的实现
public:
std::shared_ptr<T> pop()
{
++threads_in_pop; //在做任何其他事情前增加计数
node* old_head=head.load();
while(old_head && !head.compare_exchange_weak(old_head,old_head->next));
std::shared_ptr<T> res;
if(old_head)
{
res.swap(old_head->data); //如果可能,回收删除的结点
}
try_reclaim(old_head); //从结点中提取数据,而不是复制指针
return res;
}
};

View File

@@ -0,0 +1,59 @@
//引用计数的回收机制try_reclaim
template <typename T>
class lock_free_stack
{
private:
std::atomic<node*> to_be_deleted;
static void delete_nodes(node* nodes)
{
while(nodes)
{
node* next=nodes->next;
delete nodes;
nodes=next;
}
}
void try_reclaim(node* old_head)
{
if(threads_in_pop==1)
{
node* nodes_to_delete=to_be_deleted.exchange(nullptr); //列出将要被删除的结点清单
if(!--threads_in_pop) //是pop()中唯一的线程吗?
{
delete_nodes(nodes_to_delete);
}
else if(nodes_to_delete) //有等待的节点
{
chain_pending_nodes(nodes_to_delete); //将此结点插入到等待删除结点列表的尾部
}
delete old_head; //安全删除刚移动出来的结点
}
else
{
chain_pending_node(old_head);
--threads_in_pop;
}
}
void chain_pending_nodes(node* nodes)
{
node* last=nodes;
while(node* const next=last->next) //跟随下一个指针,链至末尾
{
last=next;
}
chain_pending_nodes(nodes,last);
}
void chain_pending_nodes(nodes* first,node* last)
{
last->next=to_be_deleted;
while(!to_be_deleted.compare_exchange_weak(last->next,first)); //循环以保证last->next正确
}
void chain_pending_node(node* n)
{
chain_pending_nodes(n,n);
}
};

View File

@@ -0,0 +1,37 @@
//使用风险指针的pop()实现
std::shared_ptr<T> pop()
{
std::atomic<void*>& hp=get_hazard_pointer_for_current_thread();
node* old_head=head.load();
do
{
node* temp;
//一直循环到你将风险指针设置到head上
do
{
temp=old_head;
hp.store(old_head);
old_head=head.load();
}while(old_head!=temp);
//设置风险指针放到外部循环,如果比较/交换失败则重载old_head。
}
while(old_head && !head.compare_exchange_strong(old_head,old_head->next));
//因为在这个while循环中确实有效使用weak()会导致不必要地重置风险指针
hp.store(nullptr); //当你完成时清除风险指针
std::shared_ptr<T> res;
if(old_head)
{
res.swap(old_head->data);
//在你删除一个结点前检查风险指针是否引用它
if(outstanding_hazard_pointers_for(old_head))
{
reclaim_later(old_head); //放在稍后回收的列表中
}
else
{
delete old_head; //立刻删除
}
delete_nodes_with_no_hazards();
}
return res;
}

View File

@@ -0,0 +1,53 @@
//get_hazard_pointer_for_current_thread()的简单实现
unsigned const max_hazard_pointers=100;
struct hazard_pointer
{
std::atomic<std::thread::id> id;
std::atomic<void*> pointer;
};
hazard_pointer hazard_pointers[max_hazard_pointers];
class hp_owner
{
hazard_pointer* hp;
public:
hp_owner(hp_owner const&)=delete;
hp_owner& operator=(hp_owner const&)=delete;
hp_owner():
hp(nullptr)
{
for (unsigned i = 0; i < max_hazard_pointers; ++i)
{
std::thread::id old_id;
//试着获取风险指针的所有权
if(hazard_pointers[i].id.compare_exchange_strong(old_id,std::this_thread::get_id()))
{
hp=&hazard_pointers[i];
break;
}
}
if(!hp)
{
throw std::runtime_error("No hazard pointers available");
}
}
std::atomic<void*>& get_pointer()
{
return hp->pointer;
}
//线程退出hp_owner实例就被销毁了
~hp_owner()
{
hp->pointer.store(nullptr);
hp->id.store(std::thread::id());
}
};
std::atomic<void*>& get_hazard_pointer_for_current_thread()
{
thread_local static hp_owner hazard; //每个线程有自己的风险指针
return hazard.get_pointer();
}

View File

@@ -0,0 +1,68 @@
//回收函数的简单实现
template <typename T>
void do_delete(void* p)
{
delete static_cast<T*>(p);
}
struct data_to_reclaim
{
void* data;
std::function<void(void*)> deleter;
data_to_reclaim* next;
template <typename T>
data_to_reclaim(T* p):
data(p),
deleter(&do_delete<T>),
next(0)
{}
~data_to_reclaim()
{
deleter(data);
}
};
std::atomic<data_to_reclaim*> nodes_to_reclaim;
void add_to_reclaim_list(data_to_reclaim* node)
{
node->next=nodes_to_reclaim.load();
while(!nodes_to_reclaim.compare_exchange_weak(node->next,node));
}
template <typename T>
void reclaim_later(T* data)
{
add_to_reclaim_list(new data_to_reclaim(data));
}
bool outstanding_hazard_pointers_for(void* p)
{
for(unsigned i=0;i<max_hazard_pointers;++i)
{
if(hazard_pointers[i].pointer.load()==p)
{
return true;
}
}
return false;
}
void delete_nodes_with_no_hazards()
{
data_to_reclaim* current=nodes_to_reclaim.exchange(nullptr);
while(current)
{
data_to_reclaim* const next=current->next;
if(!outstanding_hazard_pointers_for(current->data))
{
delete current;
}
else
{
add_to_reclaim_list(current);
}
current=next;
}
}

View File

@@ -0,0 +1,31 @@
//使用无锁的std::shared_ptr<>的无锁栈实现
template <typename T>
class lock_free_stack
{
private:
struct node
{
std::shared_ptr<T> data;
std::shared_ptr<node> next;
node(T const& data_):
data(std::make_shared<T>(data_))
{}
};
std::shared_ptr<node> head;
public:
void push(T const& data)
{
std::shared_ptr<node> const new_node=std::make_shared<node>(data);
new_node->next=head.load();
while(!std::atomic_compare_exchange_weak(&head,&new_node->next,new_node))
}
std::shared_ptr<T> pop()
{
std::shared_ptr<node> old_head=std::atomic_load(&head);
while(old_head && !std::atomic_compare_exchange_weak(&head,&old_head,old_head->next));
return old_head ? old_head->data : std::shared_ptr<T>();
}
};

View File

@@ -0,0 +1,43 @@
//在使用两个引用计数的无锁栈中入栈结点
template <typename T>
class lock_free_stack
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
struct node
{
std::shared_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_):
data(std::make_shared<T>(data_)),
internal_count(0)
{}
};
std::atomic<counted_node_ptr> head;
public:
~lock_free_stack()
{
while(pop());
}
void push(T const& data)
{
counted_node_ptr new_node;
new_node.ptr=new node(data);
new_node.external_count=1;
new_node.ptr->next.ptr->next=head.load();
while(!head.compare_exchange_weak(new_node.ptr->next,new_node));
}
};

View File

@@ -0,0 +1,51 @@
//使用两个引用计数从无锁栈中出栈一个结点
template <typename T>
class lock_free_stack
{
private:
void increase_head_count(counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter=old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter));
old_counter.external_count=new_counter.external_count;
}
public:
std::shared_ptr<T> pop()
{
counted_node_ptr old_head=head.load();
for(;;)
{
increase_head_count(old_head);
node* const ptr=old_head.ptr;
if(!ptr)
{
return std::shared_ptr<T>();
}
if(head.compare_exchange_strong(old_head,ptr->next))
{
std::shared_ptr<T> res;
res.swap(ptr->data);
//你增加的值比外部计数的值减少2
int const count_increase=old_head.external_count-2;
// 如果当前引用计数的值为0那么先前你增加的值(即fetch_add的返回值)就是负数,此时可以删除这个结点
if(ptr->internal_count.fetch_add(count_increase)==(-count_increase))
{
delete ptr;
}
return res;
}
else if(ptr->internal_count.fetch_sub(1)==1)
{
delete ptr;
}
}
}
};

View File

@@ -0,0 +1,90 @@
//使用引用计数和放松原子操作的无锁栈
template <typename T>
class lock_free_stack
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
struct node
{
std::share_ptr<T> data;
std::atomic<int> internal_count;
counted_node_ptr next;
node(T const& data_):
data(std::make_shared<T>(data_)),
internal_count(0)
{}
};
std::atomic<counted_node_ptr> head;
void increase_head_count(counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter=old_counter;
++new_counter.external_count;
}
while(!head.compare_exchange_strong(old_counter,new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count=new_counter.external_count;
}
public:
~lock_free_stack()
{
while(pop());
}
void push(T const& data)
{
counted_node_ptr new_node;
new_node.ptr=new node(data);
new_node.external_count=1;
new_node.ptr->next=head.load(std::memory_order_relaxed);
while(!head.compare_exchange_weak(new_node.ptr->next,new_node,
std::memory_order_release,
std::memory_order_relaxed));
}
std::share_ptr<T> pop()
{
counted_node_ptr old_head=head.load(std::memory_order_relaxed);
for(;;)
{
increase_head_count(old_head);
node* const ptr=old_head.ptr;
if(!ptr)
{
return std::share_ptr<T>();
}
if(head.compare_exchange_strong(old_head,ptr->next,std::memory_order_relaxed))
{
std::share_ptr<T> res;
res.swap(ptr->data);
int const count_increase=old_head.external_count-2;
if(ptr->internal_count.fetch_add(count_increase,std::memory_order_release)==(-count_increase))
{
delete ptr;
}
return res;
}
else if(ptr->internal_count.fetch_add(-1,std::memory_order_relaxed)==1)
{
ptr->internal_count.load(std::memory_order_acquire);
delete ptr;
}
}
}
};

View File

@@ -0,0 +1,67 @@
//单生产者单消费者的无锁队列
template <typename T>
class lock_free_queue
{
private:
struct node
{
std::shared_ptr<T> data;
node* next;
node():
next(nullptr)
{}
};
std::atomic<node*> head;
std::atomic<node*> tail;
node* pop_head()
{
node* const old_head=head.load();
if(old_head==tail.load())
{
return nullptr;
}
head.store(old_head->next);
return old_head;
}
public:
lock_free_queue():
head(new node),tail(head.load())
{}
lock_free_queue(const lock_free_queue& other)=delete;
lock_free_queue& operator=(const lock_free_queue& other)=delete;
~lock_free_queue()
{
while(node* const old_head=head.load())
{
head.store(old_head->next);
delete old_head;
}
}
std::shared_ptr<T> pop()
{
node* old_head=pop_head();
if(!old_head)
{
return std::shared_ptr<T>();
}
std::shared_ptr<T> const res(old_head->data);
delete old_head;
return res;
}
void push(T new_value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(new_value));
node* p=new node;
node* const old_tail=tail.load();
old_tail->data.swap(new_data);
old_tail->next=p;
tail.store(p);
}
};

View File

@@ -0,0 +1,20 @@
//首次很逊的尝试修订push()
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr=new node;
new_next.external_count=1;
for(;;)
{
node* const old_tail=tail.load(); //加载一个原子指针
T* old_data=nullptr;
if(old_tail->data.compare_exchange_strong(old_data,new_data.get())) //解引用那个指针
{
old_tail->next=new_next;
tail.store(new_next.ptr); //更新那个指针
new_data.release();
break;
}
}
}

View File

@@ -0,0 +1,68 @@
//在无锁队列中引用计数tail来实现push()
template <typename T>
class lock_free_queue
{
private:
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
std::atomic<counted_node_ptr> head;
std::atomic<counted_node_ptr> tail;
//将此结构体保存在一个机器字中在许多平台中使原子操作更容易是无锁的
struct node_counter
{
unsigned internal_count:30;
unsigned external_counters:2; //这里的external_counters只包含两个比特因为最多只有两个计数器
};
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count;
counted_node_ptr next;
node()
{
node_counter new_count;
new_count.internal_count=0;
new_count.external_counters=2;
count.store(new_count);
next.ptr=nullptr;
next.external_count=0;
}
};
public:
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr=new node;
new_next.external_count=1;
counted_node_ptr old_tail=tail.load();
for(;;)
{
increase_external_count(tail,old_tail); //增加计数
T* old_data=nullptr;
if(old_tail.ptr->data.compare_exchange_strong(old_data,new_data.get())) //解引用
{
old_tail.ptr->next=new_next;
old_tail=tail.exchange(new_next);
free_external_counter(old_tail);
new_data.release();
break;
}
old_tail.ptr->release_ref();
}
}
};

View File

@@ -0,0 +1,32 @@
//从使用引用计数tail的无锁队列中将结点出队列
template <typename T>
class lock_free_queue
{
private:
struct node
{
void release_ref();
};
public:
std::unique_ptr<T> pop()
{
counted_node_ptr old_head=head.load(std::memory_order_relaxed);
for(;;)
{
increase_external_count(head,old_head);
node* const ptr=old_head.ptr;
if(ptr==tail.load().ptr)
{
ptr->release_ref();
return std::unique_ptr<T>();
}
if(head.compare_exchange_strong(old_head,ptr->next))
{
T* const res=ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
};

View File

@@ -0,0 +1,26 @@
//释放无锁队列的结点引用
template <typename T>
class lock_free_queue
{
private:
struct node
{
void release_ref()
{
node_counter old_counter=count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter=old_counter;
--new_counter.internal_count;
}
while(!count.compare_exchange_strong(old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
if(!new_counter.internal_count && !new_counter.external_counters)
{
delete this;
}
}
};
};

View File

@@ -0,0 +1,21 @@
//在无锁队列中获得结点的新引用
template <typename T>
class lock_free_queue
{
private:
static void increase_external_count(std::atomic<counted_node_ptr>& counter, counted_node_ptr& old_counter)
{
counted_node_ptr& new_counter;
do
{
new_counter = old_counter;
++new_counter.external_count;
}
while(!counter.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
old_counter.external_count=new_counter.external_count;
}
};

View File

@@ -0,0 +1,29 @@
//在无锁队列中释放结点的外部计数
template <typename T>
class lock_free_queue
{
private:
static void free_external_counter(counted_node_ptr& old_node_ptr)
{
node* const ptr=old_node_ptr.ptr;
int const count_increase=old_node_ptr.external_count-2;
node_counter old_counter=ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter=old_counter;
--new_counter.external_counters;
new_counter.internal_count+=count_increase;
}
while(!ptr->count.compare_exchange_strong(
old_counter,new_counter,
std::memory_order_acquire,std::memory_order_relaxed));
if(!new_counter.internal_count && !new_counter.external_counters)
{
delete ptr;
}
}
};

View File

@@ -0,0 +1,34 @@
//修改pop()来允许帮助push()
template <typename T>
class lock_free_queue
{
private:
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count;
std::atomic<counted_node_ptr> next;
};
public:
std::unique<T> pop()
{
counted_node_ptr old_head=head.load(std::memory_order_relaxed);
for(;;)
{
increase_external_count(head,old_head);
node* const ptr=old_head.ptr;
if(ptr==tail.load().ptr)
{
return std::unique_ptr<T>();
}
counted_node_ptr next=ptr->next.load();
if(head.compare_exchange_strong(old_head,next))
{
T* const res=ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
};

View File

@@ -0,0 +1,53 @@
//无锁队列中使用帮助的push()
template <typename T>
class lock_free_queue
{
private:
void set_new_tail(counted_node_ptr& old_tail,counted_node_ptr const& new_tail)
{
node* const current_tail_ptr=old_tail.ptr;
while(!tail.compare_exchange_weak(old_tail,new_tail) && old_tail.ptr==current_tail_ptr);
if(old_tail.ptr==current_tail_ptr)
free_external_counter(old_tail);
else
current_tail_ptr->release_ref();
}
public:
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count=1;
counted_node_ptr old_tail=tail.load();
for(;;)
{
increase_external_count(tail,old_tail);
T* old_data=nullptr;
if(old_tail.ptr->data.compare_exchange_strong(old_data,new_data.get()))
{
counted_node_ptr old_next={0};
if(!old_tail.ptr->next.compare_exchange_strong(old_next,new_next))
{
delete new_next.ptr;
new_next=old_next;
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
}
else
{
counted_node_ptr old_next={0};
if(old_tail.ptr->next.compare_exchange_strong(old_next,new_next))
{
old_next=new_next;
new_next.ptr=new_next
}
set_new_tail(old_tail, old_next);
}
}
}
};

View File

@@ -0,0 +1,105 @@
//使用待排序块栈的并行快速排序
template <typename T>
struct sorter
{
struct chunk_to_sort
{
std::list<T> data;
std::promise<std::list<T> > promise;
};
thread_safe_stack<chunk_to_sort> chunks; //未排序块
std::vector<std::thread> threads; //线程集
unsigned const max_thread_count;
std::atomic<bool> end_of_data;
sorter():
max_thread_count(std::thread::hardware_concurrency()-1),
end_of_data(false)
{}
~sorter()
{
end_of_data=true;
for(unsigned i=0;i<threads.size();++i)
{
threads[i].join();
}
}
void try_sort_chunk()
{
boost::shared_ptr<chunk_to_sort> chunk=chunks.pop();
if(chunk)
{
sort_chunk(chunk);
}
}
//完成排序并压入栈
std::list<T> do_sort(std::List<T>& chunk_data)
{
if(chunk_data.empty())
{
return chunk_data;
}
std::list<T> result;
result.splice(result.begin(),chunk_data,chunk_data.begin());
T const& partition_val=*result.begin();
typename std::list<T>::iterator divide_point=
std::partition(chunk_data.begin(),chunk_data.end(),[&](T const& val){
return val < partition_val;
});
chunk_to_sort new_lower_chunk;
new_lower_chunk.data.splice(new_lower_chunk.data.end(),
chunk_data,chunk_data.begin(),divide_point);
std::future<std::list<T> > new_lower=new_lower_chunk.promise.get_future();
chunks.push(std::move(new_lower_chunk));
if(threads.size()<max_thread_count)
{
threads.push_back(std::thread(&sorter<T>::sort_thread,this));
}
std::list<T> new_higher(do_sort(chunk_data));
result.splice(result.end(),new_higher);
while(new_lower.wait_for(std::chrono::second(0)) != std::future_status::ready)
{
try_sort_chunk();
}
result.splice(result.begin(),new_lower.get());
return result;
}
void sort_chunk(boost::shared_ptr<chunk_to_sort> const& chunk)
{
chunk->promise.set_value(do_sort(chunk->data));
}
void sort_thread()
{
while(!end_of_data)
{
try_sort_chunk();
std::this_threads::yield();
}
}
};
template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input)
{
if(input.empty())
{
return input;
}
sorter<T> s;
return s.do_sort(input);
}

View File

@@ -0,0 +1,48 @@
//std::accumulate的并行版本(来自清单2.8)
template <typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
}
};
template <typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length) //如果输入的范围为空只返回初始值init
return init;
unsigned long const min_per_thread=25; //最小块的大小
unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread; //处理的元素数量除以最小块的大小,获取线程的最大数量
unsigned long const hardware_threads=std::thread::hardware_concurrency(); //真正并发运行的线程数量的指示
//要运行的线程数是你计算出的最大值的硬件线程数量的较小值。
unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
//如果hardware_concurrency返回0我们就替换成2运行过多的线程会在单核机器上变慢过少会错过可用的并发
unsigned long const block_size=length/num_threads; //待处理的线程的条目数量是范围的长度除以线程的数量
std::vector<T> results(num_threads); //保存中间结果
std::vector<std::thread> threads(num_threads-1); //因为有一个线程(本线程)了所以少创建一个文档
//循环1.递进block_end到当前块的结尾2.并启动一个新的线程来累计此块的结果。3.下一个块的开始是这一个的结束
Iterator block_start=first;
for(unsigned long i = 0; i < (num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size); ...1
threads[i]=std::thread(accumulate_block<Iterator,T>(),block_start,block_end,std::ref(results[i])); ...2
block_start=block_end; ...3
}
//这里是处理上面没有整除的掉block_size的剩下的部分
accumulate_block()(block_start,last,results[num_threads-1]);
//通过join等待所有计算的线程
std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));
//一旦累计计算出最后一个块的结果调用accumulate将结果计算出来
return std::accumulate(results.begin(),results.end(),init);
}

View File

@@ -0,0 +1,55 @@
//使用std::packaged_task的std::accumulate的并行版本来解决新线程上抛出异常的问题
template <typename Iterator,typename T>
struct accumulate_block
{
//直接返回结果
T operator()(Iterator first,Iterator last)
{
return std::accumulate(first,last,T());
}
};
template <typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length)
return init;
//2.8有讲
unsigned long const min_per_thread=25;
unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=std::thread::hardware_concurrency();
unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
std::vector<std::future<T> > futures(num_threads-1); //与8.2不同使用future变量
std::vector<std::thread> threads(num_threads-1);
Iterator block_start=first;
for(unsigned long i = 0; i < (num_threads-1); ++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
//为accumulate_block创造一个任务
std::packaged_task<T(Iterator,Iterator)> task(accumulate_block<Iterator,T>());
futures[i]=task.get_future();
threads[i]=std::thread(std::move(task),block_start,block_end); //允许任务的时候将在future中捕捉结果也会捕捉任何抛出的异常
block_start=block_end;
}
T last_result=accumulate_block()(block_start,last);
std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));
T result=init;
for(unsigned long i=0;i<(num_threads-1);++i)
{
result+=futures[i].get();
}
result += last_result;
return result;
}

View File

@@ -0,0 +1,61 @@
//std::accumulate的异常安全并行版本
//使用future最简单的方法就是捕获所有异常并且将它们融合到调用joinable()的线程中,然后再次抛出异常。
//try-catch块令人讨厌我们在一个对象的析构函数中检查它
class join_threads
{
std::vector<std::thread>& threads;
public:
explicit join_threads(std::vector<std::threads>& threads_):
threads(threads_)
{}
~join_threads();
{
for(unsigned long i=0;i<threads.size();++i)
{
if(threads[i].joinable())
threads[i].join();
}
}
};
template <typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length)
return init;
unsigned long const min_per_thread=25;
unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=std::thread::hardware_concurrency();
unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
std::vector<std::future<T> > futures(num_threads-1);
std::vector<std::thread> threads(num_threads-1);
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
std::packaged_task<T(Iterator,Iterator)> task(accumulate_block<Iterator,T>());
futures[i]=task.get_future();
threads[i]=std::thread(std::move(task),block_start,block_end);
block_start=block_end;
}
T last_result=accumulate_block()(block_start,last);
T result=init;
for(unsigned long i=0;i<(num_threads-1);++i)
{
result+=futures[i].get(); //将被阻塞直到结果出来
}
result += last_result;
return result;
}

View File

@@ -0,0 +1,21 @@
//使用std::async的std::accumulate的异常安全并行版本
template <typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
unsigned long const max_chunk_size=25;
if(length<=max_chunk_size)
{
return std::accumulate(first,last,init);
}
else
{
Iterator mid_point=first;
std::advance(mid_point,length/2);
std::future<T> first_half_result=std::async(parallel_accumulate<Iterator,T>,first,mid_point,init);
T second_half_result=parallel_accumulate(mid_point,last,T());
return first_half_result.get()+second_half_result; //出异常时future会销毁get()会再次抛出异常
}
}
//这个版本使用递归将数据划分为块而不是重新计算将数据划分为块,但是它比之前的版本要简单一些,并且是异常安全的

View File

@@ -0,0 +1,51 @@
//从任务线程中分离GUI线程
std::thread task_thread;
std::atomic<bool> task_cancelled(false);
void gui_thread()
{
while(true)
{
event_data event=get_event();
if(event.type == quit)
break;
process(event);
}
void task()
{
while(!task_complete() && !task_cancelled)
{
do_next_operation();
}
if(task_cancelled)
{
perform_cleanup();
}
else
{
post_gui_event(task_complete);
}
}
void process(event_data const& event)
{
switch(event.type)
{
case start_task:
task_cancelled=false;
task_thread=std::thread(task);
break;
case stop_task:
task_cancelled=true;
task_thread.join();
break;
case task_complete:
task_thread.join();
display_results();
break;
default:
//...
}
}
}

View File

@@ -0,0 +1,41 @@
//std::for_each的并行版本
template <typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
unsigned long const length=std::distance(first,last);
if(!length)
return;
unsigned long const min_per_thread=25;
unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=std::thread::hardware_concurrency();
unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
std::vector<std::future<void> > futures(num_threads-1);
std::vector<std::thread> threads(num_threads-1);
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0; i<(num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
std::packaged_task<void(void)> task([=](){
std::for_each(block_start,block_end,f);
});
futures[i]=task.get_future();
threads[i]=std::thread(std::move(task));
block_start=block_end;
}
std::for_each(block_start,last,f);
for (unsigned long i = 0; i < (num_threads-1); ++i)
{
//只提供取回工作线程抛出的异常的方法,如果你不希望传递异常,那么你就可以省略它。
futures[i].get();
}
}

View File

@@ -0,0 +1,25 @@
//使用std::async的std::for_each的并行版本
template <typename Iterator,typename Func>
void parallel_for_each(Iterator first,Iterator last,Func f)
{
unsigned long const length=std::distance(first,last);
if(!length)
return;
unsigned long const min_per_thread=25;
if(length < (2*min_per_thread))
{
std::for_each(first,last,f);
}
else
{
Iterator const mid_point=first+length/2;
//异步运行前半部分
std::future<void> first_half=std::async(&parallel_for_each<Iterator,Func>,first,mid_point,f);
parallel_for_each(mid_point,last,f);
//使用std::async和get()成员函数std::future提供了异常传播语义
first_half.get();
}
}

View File

@@ -0,0 +1,74 @@
//并行find算法的一种实现
template <typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
struct find_element
{
void operator()(Iterator begin,Iterator end,MatchType match,
std::promise<Iterator>* result,
std::atomic<bool>* done_flag)
{
try
{
for(;(begin!=end) && !done_flag->load();++begin)
{
if(*begin == match)
{
result->set_value(begin);
done_flag->store(true);
return;
}
}
}
catch(...)
{
try
{
result->set_exception(std::current_exception());
done_flag->store(true);
}
catch(...)
{}
}
}
};
unsigned long const length=std::distance(first,last);
if(!length)
return last;
unsigned long const min_per_thread=25;
unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=std::thread::hardware_concurrency();
unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
std::promise<Iterator> result;
std::atomic<bool> done_flag(false);
std::vector<std::thread> threads(num_threads-1);
//你通过在块中附入线程链接的代码,使得检查结构之前需要等待所有线程结束。
{
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size);
threads[i]=std::thread(find_element(),block_start,block_end,match,&result,&done_flag);
block_start=block_end;
}
find_element()(block_start,last,match,&result,&done_flag);
}
//检查是否有匹配项的时候,所有线程都被联合起来了
if(!done_flag.load())
{
return last;
}
return result.get_future().get();
}

View File

@@ -0,0 +1,43 @@
//使用std::async的并行查找算法的实现
template <typename Iterator,typename MatchType>
Iterator parallel_find_impl(Iterator first,Iterator last,MatchType match,std::atomic<bool>& done)
{
try
{
unsigned long const length=std::distance(first,second);
unsigned long const min_per_thread=25;
if(length<(2*min_per_thread))
{
for(;(first!=last) && !done.load();++first)
{
if(*first==match)
{
done=true;
return first;
}
}
return last;
}
else
{
Iterator const mid_point=first+(length/2);
std::future<Iterator> async_result=std::async(&parallel_find_impl<Iterator,MatchType>,
mid_point,last,match,std::ref(done));
Iterator const direct_result=parallel_find_impl(first,mid_point,match,done);
return (direct_result==mid_point)?async_result.get():direct_result;
}
}
catch(...)
{
done=true;
throw;
}
}
template <typename Iterator,typename MatchType>
Iterator parallel_find(Iterator first,Iterator last,MatchType match)
{
std::atomic<bool> done(false);
//线程间共享的标志,需要传递给所有递归调用。从主入口点传递进来的。
return parallel_find_impl(first,last,match,done);
}

View File

@@ -0,0 +1,91 @@
//通过划分问题来并行计算分段的和
//简单来说还是分块累加
template <typename Iterator>
void parallel_partial_sum(Iterator first,Iterator last)
{
typedef typename Iterator::value_type value_type;
struct process_chunk
{
void operator()(Iterator begin,Iterator last,
std::future<value_type>* previous_end_value,
std::promise<value_type>* end_value)
{
try
{
Iterator end=last;
++end;
std::partial_sum(begin,end,begin);
if(previous_end_value) //这是否为第一个块
{
value_type& addend=previous_end_value->get();
*last+=addend;
if(end_value)
{
end_value->set_value(*last);
}
std::for_each(begin,last,[addend](value_type& item)
{
item+=addend;
});
}
else if(end_value)
{
end_value->set_value(*last);
}
}
catch(...)
{
if(end_value)
{
end_value->set_exception(std::current_exception());
}
else
{
throw;
}
}
}
};
unsigned long const length=std::distance(first,last);
if(!length)
return last;
unsigned long const min_per_thread=25;
unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread;
unsigned long const hardware_threads=std::thread::hardware_concurrency();
unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
unsigned long const block_size=length/num_threads;
typedef typename Iterator::value_type value_type;
std::vector<std::thread> threads(num_threads-1);
std::vector<std::promise<value_type> > end_values(num_threads-1);
std::vector<std::future<value_type> > previous_end_values;
previous_end_values.reserve(num_threads-1);
join_threads joiner(threads);
Iterator block_start=first;
for(unsigned long i=0;i<(num_threads-1);++i)
{
Iterator block_last=block_start;
std::advance(block_last,block_size-1);
threads[i]=std::thread(process_chunk(),
block_start,block_last,
(i!=0)?&previous_end_values[i-1]:0,
&end_values[i]);
block_start=block_last;
++block_start;
previous_end_values.push_back(end_values[i].get_future());
}
Iterator final_element=block_start;
std::advance(final_element,std::distance(block_start,last)-1);
process_chunk()(block_start,final_element,
(num_threads-1)?&previous_end_values.back():0,
0);
}

Some files were not shown because too many files have changed in this diff Show More