同步与互斥

互斥与同步

当有多个进程或多个线程对共享资源进行访问时,由于进程或线程的执行顺序有随机性,资源访问的结果也会具有随机性,为了保证结果的正确,需要互斥和同步机制。

互斥:一个资源一次只能被一个进程或线程访问,也就是互斥访问资源。

同步:多个进程或线程之间需要合作,相互之间交换信息,实现同步运作。

下面统一采用进程作为调度单元。

代码实现类似于C语言的风格,但是会包含/* */包围起来的描述性过程。

硬件支持

禁用中断

实现

1
2
3
4
5
6
for(;;){
/* 禁用中断 */
/* 临界区 */
/* 启用中断 */
/* 剩余部分 */
}

在进入临界区之前禁用中断,离开临界区之后再启用中断。这样进程在临界区中运行时就不会被中断(成为原子操作)。

不足

  1. 代价高,执行效率低
  2. 如果是有多个处理器的计算机,那么就算禁用了中断也不能保证互斥,因为可能同时有多个进程在临界区运行。

专用指令

两种实现

用一个标志lock来表明临界区是否正在被访问,使用原子操作compare_and_swapexchange来访问lock

  1. 比较和交换

    如果原来word地址的值是testval,就把值设为newval:

    1
    2
    3
    4
    5
    6
    7
    int compare_and_swap(int* word, int testval, int newval){
    int oldval = *word;
    if (oldval == testval) {
    *word = newval;
    }
    return oldval;
    }

    用法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    int lock;

    void P() {
    for (;;) {
    while (compare_and_swap(&lock, 0, 1) == 1)
    /* 忙等待 */;
    /* 临界区 */
    lock = 0;
    /* 剩余部分 */
    }
    }
  2. 交换

    直接交换,在外部进行比较:

    1
    2
    3
    4
    5
    void exchange(int* reg, int* mem){
    int tmp = *mem;
    *mem = *reg;
    *reg = tmp;
    }

    用法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    int lock;

    void P() {
    for (;;) {
    int key = 1;
    do
    {
    exchange(&key, &lock);
    } while (key == 1);
    /* 临界区 */
    lock = 0;
    /* 剩余部分 */
    }
    }

    不足

忙等待、可能饥饿、可能死锁。

软件支持

信号量 Semaphore

semWaitsemSignal都是原子操作(可以通过禁用中断实现)

实现

  1. 计数信号量

    用整数值记录资源数,semWait相当于请求资源,资源不够时阻塞等待semSignal相当于释放资源,并唤醒一个等待的进程。

    禁用中断实现信号量:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    // 一种简单的实现
    typedef struct semaphore{
    int count; // 计数
    Queue* queue; // 等待中的进程队列
    }semaphore;

    void semWait(semaphore* s){
    /* 禁用中断 */
    s->count--;
    if(s->count < 0){
    /* 当前进程加入队列 */
    /* 阻塞当前进程并启用中断 */
    }else{
    /* 启用中断 */
    }
    }

    void semSignal(semaphore* s){
    /* 禁用中断 */
    s->count++;
    if(s->count <= 0){
    /* 从队列中取出一个进程 */
    /* 放入就绪队列 */
    }
    /* 启用中断 */
    }

    另一种等价的实现是count永远为非负数。semWait中先判断count是否为0,是则直接阻塞,醒来后才减小countsemSignal中先判断队列是否为空,然后才增加count

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    void semWait(semaphore* s){
    /* 禁用中断 */
    while(s->count == 0){
    /* 当前进程加入队列 */
    /* 阻塞当前进程并启用中断 */
    }
    s->count--;
    /* 启用中断 */
    }

    void semSignal(semaphore* s){
    /* 禁用中断 */
    if(!s->queue->IsEmpty()){
    /* 从队列中取出一个进程 */
    /* 放入就绪队列 */
    }
    s->count++;
    /* 启用中断 */
    }
  2. 二元信号量 BinarySemaphore

    也就是在计数信号量中count只取01

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    typedef struct binary_semaphore{
    enum{zero, one} value;
    Queue* queue; // 等待中的进程队列
    }binary_semaphore;

    void semWaitB(binary_semaphore* s){
    /* 禁用中断 */
    if(s->value == one){
    s->value = zero;
    /* 启用中断 */
    }else{
    /* 当前进程加入队列 */
    /* 阻塞当前进程并启用中断 */
    }
    }

    void semSignalB(binary_semaphore* s){
    /* 禁用中断 */
    s->count++;
    if(s->queue->IsEmpty()){
    s->value = one;
    }else{
    /* 从队列中取出一个进程 */
    /* 放入就绪队列 */
    }
    /* 启用中断 */
    }

    用法

1
2
3
4
5
6
7
8
9
10
semaphore s = 1;

void P(){
for(;;){
semWait(&s);
/* 临界区 */
semSignal(&s);
/* 剩余部分 */
}
}

互斥锁 Mutex

用一个标志记录资源是否被锁定,访问临界区之前要获得锁,如果已经被上锁(有进程正在访问)就要等待其解锁,访问之后要释放锁。

用开关中断实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void LockAcquire(){
/* 禁用中断 */
/* 如果已经被当前进程获取则抛出异常 */
while(isLocked){
/* 将当前进程加入等待队列 */
/* 阻塞当前进程 */
}
/* 获得锁 */
/* 启用中断 */
}

void LockRelease(){
/* 禁用中断 */
/* 如果未被当前进程获取则抛出异常 */
if(等待队列不空){
/* 从等待队列中取出一个进程 */
/* 加入就绪队列 */
}
/* 释放锁 */
/* 启用中断 */
}

这种实现中存在一点问题,在阻塞当前进程后,中断还是处于关闭状态。但是如果提前开启中断又会有新的问题:

  1. 如果在加入等待队列之前开启中断。这时可能会切换到拥有锁的进程调用LockRelease,就会检测到队列是空的,然后结束LockRelease,于是当前进程阻塞后就会失去这次被唤醒的机会。
  2. 如果在加入等待队列之后,阻塞之前开启中断。这时可能会切换到拥有锁的进程调用LockRelease,发现队列不空,然后把这个进程放入就绪队列,但是切换到这个进程后又会马上进入阻塞,所以同样失去了这次唤醒的机会。如果LockAcquire获得锁这个操作是放在LockRelease里的,也就是放入就绪队列之后就认为这把锁由这个就绪进程获得,那么就不只是失去一次唤醒机会,同时还会发生死锁(已经获得了锁却又在阻塞中)

所以这个实现是在LockAcquire最后才开启中断,在阻塞当前进程之后切换到新进程时就需要系统立即开启中断。

用信号量实现

锁中有一个初值为1的信号量lock,相当于是用二元信号量实现了锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void LockAcquire(){
/* 如果已经被当前进程获取则抛出异常 */
lock->P();
/* 获得锁 */
}

void LockRelease(){
/* 如果未被当前进程获取则抛出异常 */
/* 释放锁 */
if(等待队列不空){
/* 从等待队列中取出一个进程 */
/* 加入就绪队列 */
}
lock->V();
}

用法

1
2
3
4
5
6
for(;;){
lock->Acquire();
/* 临界区 */
lock->Release();
/* 剩余部分 */
}

与二元信号量的对比

  1. 相同:二元信号量的semWaitB可以看成是加锁,semSignal可以看成是解锁
  2. 不同:为互斥量加锁和解锁的必须是同一个进程,也就是获得了锁之后才能解锁。而二元信号量则不一定要先semWait后才能semSignal

条件变量 ConditionVariable

条件变量用来等待一个特定的条件为真。一个进程调用ConditionWait并传入一个已经获取了的锁,然后进入阻塞,等待另一个进程完成它等待的条件并调用ConditionSignalConditionBroadcast唤醒这个进程。

Mesa语义:阻塞中的进程被唤醒之后不会立即执行,而是加入就绪队列,等待调用Signal的那个进程结束之后才可能运行。

用开关中断实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void ConditionWait(Lock* lock){
/* 禁用中断 */
/* 如果lock未被当前进程获取则抛出异常 */
/* 将当前进程加入等待队列 */
/* 释放lock */
/* 阻塞当前进程 */
/* 获取lock */
/* 启用中断 */
}

void ConditionSignal(){
/* 禁用中断 */
if(等待队列不空){
/* 从等待队列中取出一个进程 */
/* 加入就绪队列 */
}
/* 启用中断 */
}

void ConditionBroadcast(){
/* 禁用中断 */
while(等待队列不空){
/* 从等待队列中取出一个进程 */
/* 加入就绪队列 */
}
/* 启用中断 */
}

用信号量实现

被阻塞的进程都在一个初值为0的信号量semWaiting上等待,用一个整数nWaiting记录等待中的进程数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void ConditionWait(Lock* lock){
/* 如果lock未被当前进程获取则抛出异常 */
nWaiting++;
// 释放锁之后,其他进程才有机会改变条件
lock->Release();
// 睡眠,等待被唤醒
semWaiting->P();
lock->Acquire();
}

void ConditionSignal(){
if(nWaiting > 0){
semWaiting->V();
nWaiting--;
}
}

void ConditionBroadcast(){
while(nWaiting > 0){
semWaiting->V();
nWaiting--;
}
}

用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* 一个进程 */
{
lock->Acquire();
while(条件不满足){
condition->Wait(lock);
}
lock->Release();
}

/* 另一个进程 */
{
lock->Acquire();
/* 完成条件 */
condition->Signal(); // 或Broadcast
lock->Release();
}

事件栅栏EventBarrier

用来实现一组进程都到达某个地点之后再同步向前推进,而栅栏就是充当路障的作用,进程没有全部到齐时,栅栏都是关闭的。进程全部到达之后,栅栏开启。

实现

要同步的线程先调用Wait进入阻塞,等到栅栏进程调用Signal才会醒来,然后调用Complete。栅栏的Signal并不直接开启栅栏,而是等到所有进程都调用了Complete才放行。

采用一把锁lock和三个信号量waitCond,unsignaledCond,uncompleteCond

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
void EventBarrierWait(){
lock->Acquire();

nWaiting++;
while(barrierState == UNSIGNALED){
unsignaledCond->Wait(lock);
}

lock->Release();
}

void EventBarrierComplete(){
lock->Acquire();

if(--nWaiting == 0){ // 如果是最后一个调用Complete的,则唤醒栅栏
uncompleteCond->Broadcast(lock);
}
waitCond->Wait(lock); // 等待一起离开

lock->Release();
}

void EventBarrierSignal(){
if(nWaiting == 0){ // 没有在等待的,直接返回
return;
}

lock->Acquire();

// 设置状态为SIGNALED
state = SIGNALED;
// 唤醒所有阻塞于Signal的线程
unsignaledCond->Broadcast(lock);
// 阻塞于Complete,等待全部完成
uncompleteCond->Wait(lock);

// 恢复状态为UNSIGNALED
state = UNSIGNALED;
// 全部放行
waitCond->Broadcast(lock);

lock->Release();
}

用法

1
2
3
4
5
6
7
8
9
10
11
/* 同步线程 */
{
barrier->Wait();
barrier->Complete();
}

/* 栅栏线程 */
{
/* 等待一段时间 */
barrier->Signal();
}