同步与互斥
互斥与同步
当有多个进程或多个线程对共享资源进行访问时,由于进程或线程的执行顺序有随机性,资源访问的结果也会具有随机性,为了保证结果的正确,需要互斥和同步机制。
互斥:一个资源一次只能被一个进程或线程访问,也就是互斥访问资源。
同步:多个进程或线程之间需要合作,相互之间交换信息,实现同步运作。
下面统一采用进程作为调度单元。
代码实现类似于C语言的风格,但是会包含
/* */
包围起来的描述性过程。
硬件支持
禁用中断
实现
1 | for(;;){ |
在进入临界区之前禁用中断,离开临界区之后再启用中断。这样进程在临界区中运行时就不会被中断(成为原子操作)。
不足
- 代价高,执行效率低
- 如果是有多个处理器的计算机,那么就算禁用了中断也不能保证互斥,因为可能同时有多个进程在临界区运行。
专用指令
两种实现
用一个标志lock
来表明临界区是否正在被访问,使用原子操作compare_and_swap
和exchange
来访问lock
。
比较和交换
如果原来
word
地址的值是testval
,就把值设为newval
:1
2
3
4
5
6
7int compare_and_swap(int* word, int testval, int newval){
int oldval = *word;
if (oldval == testval) {
*word = newval;
}
return oldval;
}用法:
1
2
3
4
5
6
7
8
9
10
11int lock;
void P() {
for (;;) {
while (compare_and_swap(&lock, 0, 1) == 1)
/* 忙等待 */;
/* 临界区 */
lock = 0;
/* 剩余部分 */
}
}交换
直接交换,在外部进行比较:
1
2
3
4
5void exchange(int* reg, int* mem){
int tmp = *mem;
*mem = *reg;
*reg = tmp;
}用法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14int lock;
void P() {
for (;;) {
int key = 1;
do
{
exchange(&key, &lock);
} while (key == 1);
/* 临界区 */
lock = 0;
/* 剩余部分 */
}
}不足
忙等待、可能饥饿、可能死锁。
软件支持
信号量 Semaphore
semWait
和semSignal
都是原子操作(可以通过禁用中断实现)
实现
计数信号量
用整数值记录资源数,
semWait
相当于请求资源,资源不够时阻塞等待,semSignal
相当于释放资源,并唤醒一个等待的进程。禁用中断实现信号量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26// 一种简单的实现
typedef struct semaphore{
int count; // 计数
Queue* queue; // 等待中的进程队列
}semaphore;
void semWait(semaphore* s){
/* 禁用中断 */
s->count--;
if(s->count < 0){
/* 当前进程加入队列 */
/* 阻塞当前进程并启用中断 */
}else{
/* 启用中断 */
}
}
void semSignal(semaphore* s){
/* 禁用中断 */
s->count++;
if(s->count <= 0){
/* 从队列中取出一个进程 */
/* 放入就绪队列 */
}
/* 启用中断 */
}另一种等价的实现是
count
永远为非负数。semWait
中先判断count
是否为0,是则直接阻塞,醒来后才减小count
,semSignal
中先判断队列是否为空,然后才增加count
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19void semWait(semaphore* s){
/* 禁用中断 */
while(s->count == 0){
/* 当前进程加入队列 */
/* 阻塞当前进程并启用中断 */
}
s->count--;
/* 启用中断 */
}
void semSignal(semaphore* s){
/* 禁用中断 */
if(!s->queue->IsEmpty()){
/* 从队列中取出一个进程 */
/* 放入就绪队列 */
}
s->count++;
/* 启用中断 */
}二元信号量 BinarySemaphore
也就是在计数信号量中
count
只取0
和1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27typedef struct binary_semaphore{
enum{zero, one} value;
Queue* queue; // 等待中的进程队列
}binary_semaphore;
void semWaitB(binary_semaphore* s){
/* 禁用中断 */
if(s->value == one){
s->value = zero;
/* 启用中断 */
}else{
/* 当前进程加入队列 */
/* 阻塞当前进程并启用中断 */
}
}
void semSignalB(binary_semaphore* s){
/* 禁用中断 */
s->count++;
if(s->queue->IsEmpty()){
s->value = one;
}else{
/* 从队列中取出一个进程 */
/* 放入就绪队列 */
}
/* 启用中断 */
}用法
1 | semaphore s = 1; |
互斥锁 Mutex
用一个标志记录资源是否被锁定,访问临界区之前要获得锁,如果已经被上锁(有进程正在访问)就要等待其解锁,访问之后要释放锁。
用开关中断实现
1 | void LockAcquire(){ |
这种实现中存在一点问题,在阻塞当前进程后,中断还是处于关闭状态。但是如果提前开启中断又会有新的问题:
- 如果在加入等待队列之前开启中断。这时可能会切换到拥有锁的进程调用
LockRelease
,就会检测到队列是空的,然后结束LockRelease
,于是当前进程阻塞后就会失去这次被唤醒的机会。 - 如果在加入等待队列之后,阻塞之前开启中断。这时可能会切换到拥有锁的进程调用
LockRelease
,发现队列不空,然后把这个进程放入就绪队列,但是切换到这个进程后又会马上进入阻塞,所以同样失去了这次唤醒的机会。如果LockAcquire
里获得锁
这个操作是放在LockRelease
里的,也就是放入就绪队列之后就认为这把锁由这个就绪进程获得,那么就不只是失去一次唤醒机会,同时还会发生死锁(已经获得了锁却又在阻塞中)
所以这个实现是在LockAcquire
最后才开启中断,在阻塞当前进程之后切换到新进程时就需要系统立即开启中断。
用信号量实现
锁中有一个初值为1
的信号量lock
,相当于是用二元信号量实现了锁。
1 | void LockAcquire(){ |
用法
1 | for(;;){ |
与二元信号量的对比
- 相同:二元信号量的
semWaitB
可以看成是加锁,semSignal
可以看成是解锁 - 不同:为互斥量加锁和解锁的必须是同一个进程,也就是获得了锁之后才能解锁。而二元信号量则不一定要先
semWait
后才能semSignal
。
条件变量 ConditionVariable
条件变量用来等待一个特定的条件为真。一个进程调用ConditionWait
并传入一个已经获取了的锁
,然后进入阻塞,等待另一个进程完成它等待的条件并调用ConditionSignal
或ConditionBroadcast
唤醒这个进程。
Mesa语义:阻塞中的进程被唤醒之后不会立即执行,而是加入就绪队列,等待调用Signal
的那个进程结束之后才可能运行。
用开关中断实现
1 | void ConditionWait(Lock* lock){ |
用信号量实现
被阻塞的进程都在一个初值为0的信号量semWaiting
上等待,用一个整数nWaiting
记录等待中的进程数量。
1 | void ConditionWait(Lock* lock){ |
用法
1 | /* 一个进程 */ |
事件栅栏EventBarrier
用来实现一组进程都到达某个地点之后再同步向前推进,而栅栏就是充当路障的作用,进程没有全部到齐时,栅栏都是关闭的。进程全部到达之后,栅栏开启。
实现
要同步的线程先调用Wait进入阻塞,等到栅栏进程调用Signal才会醒来,然后调用Complete。栅栏的Signal并不直接开启栅栏,而是等到所有进程都调用了Complete才放行。
采用一把锁lock和三个信号量waitCond,unsignaledCond,uncompleteCond
1 | void EventBarrierWait(){ |
用法
1 | /* 同步线程 */ |