0%

从ucore来总结操作系统(7)----互斥、同步和信号量

互斥、同步和信号量

0X00 环境准备

本lab基于前所有lab,但是需要对其进行一些扩展:

其实就只要修改一个中断函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

// kern/trap/trap.c

static void trap_dispatch(struct trapframe *tf)
{

...
case IRQ_OFFSET + IRQ_TIMER:
ticks++;
assert(current != NULL);
run_timer_list();
break;
...
}

这里将原来的函数(调度器的时间片中断函数)修改成run_timer_list。这是因为引入了定时器的原因,我们来看一下这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

// call scheduler to update tick related info, and check the timer is expired? If expired, then wakup proc
// 检查时间片中断时,是否存在timer过期,如果存在,就唤醒对应线程
void run_timer_list(void)
{
bool intr_flag;
// 关闭中断
local_intr_save(intr_flag);
{
list_entry_t *le = list_next(&timer_list);
// 找到所有的定时器,并将其递减
// 如果存在定时器过期,就调度对应的线程
if (le != &timer_list)
{
timer_t *timer = le2timer(le, timer_link);
assert(timer->expires != 0);
// 注意,只有第一个timer会--,其余的不会,这里后面会说明
timer->expires--;
while (timer->expires == 0)
{
le = list_next(le);
struct proc_struct *proc = timer->proc;
if (proc->wait_state != 0)
{
assert(proc->wait_state & WT_INTERRUPTED);
}
else
{
warn("process %d's wait_state == 0.\n", proc->pid);
}
wakeup_proc(proc);
del_timer(timer);
if (le == &timer_list)
{
break;
}
timer = le2timer(le, timer_link);
}
}
// 然后执行时间片中断函数
sched_class_proc_tick(current);
}
local_intr_restore(intr_flag);
}

注意上面只会对第一个timer--,这是由于timer的构造决定的

timer_t结构用于存储一个定时器所需要的相关数据,包括倒计时时间以及所绑定的进程。

1
2
3
4
5
6
7
8
9
10
11

typedef struct
{
// 过期时间
unsigned int expires; //the expire time
// timer对应的进程
struct proc_struct *proc; //the proc wait in this timer. If the expire time is end, then this proc will be scheduled
// timer的链表入口
list_entry_t timer_link; //the timer list
} timer_t;

而add_timer用于将某个timer添加进timer列表中。但出于性能考虑,每个新添加的timer都会按照其expires属性的大小排列,同时减去上一个timer的expires(即每个timer都是前缀,这样每次做减法的时候性能大大提升)。在更新timer_list中的所有timer的expires时,只需递减链首的第一个timer的expire,即可间接达到所有timer的expires减一的目的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

add_timer(timer_t *timer)
{
bool intr_flag;
local_intr_save(intr_flag);
{
assert(timer->expires > 0 && timer->proc != NULL);
assert(list_empty(&(timer->timer_link)));
list_entry_t *le = list_next(&timer_list);
// 先找到正确的位置
while (le != &timer_list)
{
timer_t *next = le2timer(le, timer_link);
if (timer->expires < next->expires)
{
next->expires -= timer->expires;
break;
}
timer->expires -= next->expires;
le = list_next(le);
}
// 插入
list_add_before(le, &(timer->timer_link));
}
local_intr_restore(intr_flag);
}

del_timer函数将timer从list中移除。将timer从timer_list中删除的操作比较简单:设置好当前待移除timer的下一个timer->expires,并将当前timer从链表中移除即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

void del_timer(timer_t *timer)
{
bool intr_flag;
local_intr_save(intr_flag);
{
if (!list_empty(&(timer->timer_link)))
{
// 当移除一个未过期的定时器时
// 需要这个未过期的定时的值添加到后面的定时器上
if (timer->expires != 0)
{
list_entry_t *le = list_next(&(timer->timer_link));
if (le != &timer_list)
{
timer_t *next = le2timer(le, timer_link);
next->expires += timer->expires;
}
}
// 否则如果是超时过期,就直接移除就行
list_del_init(&(timer->timer_link));
}
}
local_intr_restore(intr_flag);
}

定时器是相当重要的软件结构,为所有的线程提供了一个统一的超时机制。

0X01 内核级信号量

由于时间片中断可以发生在某个进程执行的任意位置,而时间片中断会引发进程调度,所以可能会引起竞争,这些操作系统书上会讲的比较详细,此处不再赘诉。

信号量

为了防止互斥,所以需要支持信号量以用来加锁,但是如果进程A在临界区消耗时间过长,则其他需要进入临界区的线程就需要不断等待,为了避免自旋等待,所以需要支持睡眠等待,因此需要引入等待队列

我们先来看一下信号量,其位于kern/sync/sem.h中,信号量的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

typedef struct
{
// 信号量的值
int value;
// 信号量对应的等待队列
wait_queue_t wait_queue;
} semaphore_t;

// 初始化一个信号量
void sem_init(semaphore_t *sem, int value);
// 释放一个信号量
void up(semaphore_t *sem);
// 获取一个信号量
void down(semaphore_t *sem);
// 测试并获取一个信号量
bool try_down(semaphore_t *sem);

初始时,信号量中的值表示可以最多同时支持多少线程进入。当执行down函数时,会尝试将value--,如果value_new >= 0则说明当前进程可以获取锁,这些操作都是原子的。如果value_new <= 0,则说明当前进程需要等待,所以会将当前进程加入到等待队列中,然后调用schedule函数进行进程调度,直到有其他进程调用up函数,将value++,然后唤醒等待队列中的进程。

原子操作

由于时间片中断可以发生在某个进程执行的任意位置,那么自然也会发生在尝试对value--的过程中,也就是一个线程在执行down函数时,另一个线程也在执行down函数,这样就会导致value的值不准确,这就引出了原子操作

在硬件上,中断信号可以发生在任意位置,但是只有CPU执行完一条指令后才会去检查中断信号,所以在执行一条指令时,中断信号是不会发生的,如果能保证我们某一任务能在一条CPU指令内完成,那么该任务对应的操作就是原子操作,原子操作能从硬件上保证不会发生竞争

当然除了CPU对应的原子操作外,还有一些软件构造的原子操作,比如先关闭中断(关闭中断是硬件上的原子操作),然后执行一些可能会引起竞争的代码,最后打开中断(同样是硬件上的原子操作),这样中间部分的代码就不会被中断打断,保证了原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92

// 初始化信号量
void sem_init(semaphore_t *sem, int value)
{
sem->value = value;
wait_queue_init(&(sem->wait_queue));
}

// 信号量的up操作
static __noinline void __up(semaphore_t *sem, uint32_t wait_state)
{
bool intr_flag;
// 保证原子性
local_intr_save(intr_flag);
{
wait_t *wait;
// 如果不存在其他需要占用信号量的进程,就对value++
if ((wait = wait_queue_first(&(sem->wait_queue))) == NULL)
{
sem->value++;
}
// 否则,需要调度出这个等待队列中的进程,然后唤醒执行
else
{
// 保证等待队列中的对应线程的状态是wait_state
// wait_state是线程的等待原因
assert(wait->proc->wait_state == wait_state);
wakeup_wait(&(sem->wait_queue), wait, wait_state, 1);
}
}
local_intr_restore(intr_flag);
}

// 信号量的down操作
static __noinline uint32_t __down(semaphore_t *sem, uint32_t wait_state)
{
bool intr_flag;
local_intr_save(intr_flag);
// 大于0直接--,然后返回
if (sem->value > 0)
{
sem->value--;
local_intr_restore(intr_flag);
return 0;
}

// 否则,需要将当前进程加入到等待队列中
wait_t __wait, *wait = &__wait;
wait_current_set(&(sem->wait_queue), wait, wait_state);
local_intr_restore(intr_flag);

// 然后调度到其他线程上
schedule();

// 如果被V操作唤醒,则把自身关联的wait从等待队列中删除
// (此过程需要先关中断,完成后开中断)
local_intr_save(intr_flag);
wait_current_del(&(sem->wait_queue), wait);
local_intr_restore(intr_flag);

// 如果等待线程的状态与预期不符,就返回(最终其实会报错)
if (wait->wakeup_flags != wait_state)
{
return wait->wakeup_flags;
}
return 0;
}

void up(semaphore_t *sem)
{
__up(sem, WT_KSEM);
}

void down(semaphore_t *sem)
{
uint32_t flags = __down(sem, WT_KSEM);
assert(flags == 0);
}

// 尝试获取信号量,不会直接阻塞
bool try_down(semaphore_t *sem)
{
bool intr_flag, ret = 0;
local_intr_save(intr_flag);
if (sem->value > 0)
{
sem->value--, ret = 1;
}
local_intr_restore(intr_flag);
return ret;
}

信号量的实现还是依赖于等待队列的实现,当信号量down时如果失败会将当前线程加入等待队列,然后调度到其他线程上,当信号量up时会从等待队列中唤醒一个等待线程。

当一个线程加入等待队列时,还需要用一个标志位来注明该线程是因为什么等待(此处是信号量等待),且当一个因信号量等待的线程在唤醒时仍然会执行_down函数中剩余的部分(也就是把自身移除等待队列)

等待队列

等待队列也是另一个重要组件,它存储了所有等待中的线程,当一个线程因为某种原因等待时,会将自身加入到等待队列中,同时等待队列也会存储该线程因为什么阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13

typedef struct
{
// 等待进程的指针
struct proc_struct *proc;
//进程被放入等待队列的原因标记
uint32_t wakeup_flags;
//指向此wait结构所属于的wait_queue
wait_queue_t *wait_queue;
//用来组织wait_queue中wait节点的连接
list_entry_t wait_link;
} wait_t;

这与run_queue类似,总体结构也十分简单。我们来看一下其相关函数,不过考虑到其函数很多,就。略过简单的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

// 初始化wait结构体
void wait_init(wait_t *wait, struct proc_struct *proc);
// 初始化wq中的队列
void wait_queue_init(wait_queue_t *queue);
// wq添加
void wait_queue_add(wait_queue_t *queue, wait_t *wait);
// wq删除
void wait_queue_del(wait_queue_t *queue, wait_t *wait);

// wq查找操作
wait_t *wait_queue_next(wait_queue_t *queue, wait_t *wait);
wait_t *wait_queue_prev(wait_queue_t *queue, wait_t *wait);
wait_t *wait_queue_first(wait_queue_t *queue);
wait_t *wait_queue_last(wait_queue_t *queue);

// wq判断操作
bool wait_queue_empty(wait_queue_t *queue);
bool wait_in_queue(wait_t *wait);

// 唤醒与wait关联的进程
void wakeup_wait(wait_queue_t *queue, wait_t *wait, uint32_t wakeup_flags, bool del);
// 唤醒wq第一个线程
void wakeup_first(wait_queue_t *queue, uint32_t wakeup_flags, bool del);
// 唤醒wq所有线程
void wakeup_queue(wait_queue_t *queue, uint32_t wakeup_flags, bool del);

//让 wait与进程关联,且让当前进程关联的wait进入等待队列queue,当前进程睡眠
void wait_current_set(wait_queue_t *queue, wait_t *wait, uint32_t wait_state);

下面两张是调用关系图:

0X02 内核级条件变量和管程

之前实现的是信号量,通过信号量能实现简单的线程通信,但是存在以下的局限:

  • 信号量有点类似于资源的总数,在生产者-消费者这样的模式下存在同步问题(比如消费者先获取信号量,生产者无法获取,因此无法通知消费者。当然更复杂的逻辑可以实现生-消模式,但是复杂)
  • 信号量一次只能唤醒一个线程,但有时我们需要广播(比如actor模式)
  • 信号量内部定义value限制了其局限性,只能对int类型资源同步,但有时想要同步其他类型的资源(比如a队列和b队列长度相等)

但是,条件变量是基于信号量实现的,可以看成是对信号量的进一步封装。信号量=条件变量+互斥锁

管程是一种程序结构,该结构内多个子程序形成多个线程互斥访问共享资源。

条件变量和管程在kern/sync/monitor.c中实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

typedef struct monitor monitor_t;

// 条件变量
typedef struct condvar
{
// 信号量,用于等待和唤醒
semaphore_t sem; // the sem semaphore is used to down the waiting proc, and the signaling proc should up the waiting proc
// 条件变量中等待的线程数
int count; // the number of waiters on condvar
// 指向拥有该条件变量的管程
monitor_t * owner; // the owner(monitor) of this condvar
} condvar_t;

// 管程
typedef struct monitor
{
// 管程锁,值应为1
semaphore_t mutex; // the mutex lock for going into the routines in monitor, should be initialized to 1
// 比较复杂,下面说明
semaphore_t next; // the next semaphore is used to down the signaling proc itself, and the other OR wakeuped waiting proc should wake up the sleeped signaling proc.
// 和next一起用
int next_count; // the number of of sleeped signaling proc
// 条件变量(数组)
condvar_t *cv; // the condvars in monitor
} monitor_t;

管程中的成员变量mutex是一个二值信号量,是实现每次只允许一个进程进入管程的关键元素,确保了互斥访问性质。管程中的条件变量cv,某线程通过执行wait_cv,会使得等待某个条件Cond为真的进程能够离开管程并睡眠,且让其他进程进入管程继续执行;而进入管程的某进程设置条件Cond为真并执行signal_cv时,能够让等待某个条件Cond为真的睡眠进程被唤醒,从而继续进入管程中执行。

管程中的成员变量信号量next和整型变量next_count是配合进程对条件变量cv的操作而设置的,这是由于发出signal_cv的进程A会唤醒由于wait_cv而睡眠的进程B,由于管程中只允许一个进程运行,所以进程B执行会导致唤醒进程B的进程A睡眠,直到进程B离开管程,进程A才能继续执行,这个同步过程是通过信号量next完成的;而next_count表示了由于发出singal_cv(没写错)而睡眠的进程个数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91

// Initialize monitor.
// 初始化管程
void monitor_init(monitor_t *mtp, size_t num_cv)
{
int i;
assert(num_cv > 0);
mtp->next_count = 0;
mtp->cv = NULL;
// 初始化管程信号量
sem_init(&(mtp->mutex), 1); // unlocked
sem_init(&(mtp->next), 0); // 注意next为0,表示next资源为0
// 分配当前管程内的条件变量
mtp->cv = (condvar_t *)kmalloc(sizeof(condvar_t) * num_cv);
assert(mtp->cv != NULL);
// 初始化条件变量
for (i = 0; i < num_cv; i++)
{
mtp->cv[i].count = 0;
// 初始化条件变量信号量
sem_init(&(mtp->cv[i].sem), 0);
mtp->cv[i].owner = mtp;
}
}

// Unlock one of threads waiting on the condition variable.
// 当某个线程准备离开临界区、准备释放对应的条件变量时
// 线程会执行函数cond_signal
void cond_signal(condvar_t *cvp)
{
// LAB7 EXERCISE1: YOUR CODE
cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
/*
* cond_signal(cv) {
* if(cv.count>0) {
* mt.next_count ++;
* signal(cv.sem);
* wait(mt.next);
* mt.next_count--;
* }
* }
*/
// 如果该条件变量中等待的线程数大于0
if (cvp->count > 0)
{
// cvp->owner是管程,管程中等待的线程数+1(这个+1表示自身将进入等待)
cvp->owner->next_count++;
// 条件变量中信号量+1,表示解锁一个等待的线程(可能会唤醒一个线程)
up(&(cvp->sem));
// 管程中next信号量-1,由于next初始值为0,所以当前线程会立刻阻塞
// 这是由于上一步唤醒了一个线程,而管程不允许两个线程运行
// 所以需要挂起当前线程
down(&(cvp->owner->next));
// 管程中等待的线程数-1(表示自身已经离开等待)
cvp->owner->next_count--;
}
cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
}

// Suspend calling thread on a condition variable waiting for condition Atomically unlocks
// mutex and suspends calling thread on conditional variable after waking up locks mutex. Notice: mp is mutex semaphore for monitor's procedures
// 当某个线程需要等待锁时,则会执行cond_wait函数
void cond_wait(condvar_t *cvp)
{
// LAB7 EXERCISE1: YOUR CODE
cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
/*
* cv.count ++;
* if(mt.next_count>0)
* signal(mt.next)
* else
* signal(mt.mutex);
* wait(cv.sem);
* cv.count --;
*/
// 表示自身因等待条件变量而阻塞,因此记录在条件变量中
cvp->count++;
// 如果管程中等待的线程数大于0
if (cvp->owner->next_count > 0)
// 那么就唤醒一个等待的线程
up(&(cvp->owner->next));
else
// 否则释放互斥锁
up(&(cvp->owner->mutex));
// 阻塞自身
down(&(cvp->sem));
// 至此说明阻塞结束了,因此条件变量中等待的线程数-1
cvp->count--;
cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n", cvp, cvp->count, cvp->owner->next_count);
}

由于条件变量依附于管程,而管程和条件变量信号量比较多,下面一一说明:

  • 条件变量内的信号量:用于唤醒等待该条件变量的线程
  • 条件变量的count:用于记录等待该条件变量的线程数
  • 管程内的mutex信号量:用于保证管程中只有一个线程在运行
  • 管程内的next信号量:用于快速阻塞调用signal_cv的线程,以便唤醒等待该条件变量的线程,从而保证管程只有一个线程在运行
  • 管程内的next_count:用于记录因调用signal_cv而阻塞的线程数

信号量next的初始值为0,所以任何尝试访问该信号量的线程都会阻塞,这是一个用于快速阻塞自身的信号量。一个线程调用signal_cv时,由于会唤醒其他线程,不能保证管程一个线程的限制,所以需要next来阻塞自身。

当一个线程调用wait_cv,等不到条件变量时会阻塞自身,因此先看看管程中是否存在因调用signal_cv而阻塞线程,要是有的话,就直接唤醒一个,由于此时锁没有被释放,而是相当"转移",所以管程仍然能正常工作。当然,如果没有的话,就释放锁,把管程解放出来,然后阻塞自身,等待条件变量的信号量。

为了让整个管程正常运行,还需在管程中的每个函数的入口和出口增加相关操作,即:

1
2
3
4
5
6
7
8
9
10
11
12
13

void monitorFunc()
{
down(&(mtp->mutex));
//--------into routine in monitor--------------
// ...
//--------leave routine in monitor--------------
if(mtp->next_count>0)
up(&(mtp->next));
else
up(&(mtp->mutex));
}

这样做的好处有两个:

  • 只有一个进程在执行管程中的函数。
  • 避免由于执行了cond_signal函数而睡眠的进程无法被唤醒:
    • 管程中wait和signal函数的调用存在时间顺序。例如:当线程1先调用signal唤醒线程2并将自身线程挂起后,线程2在开始执行时将无法唤醒原先的在signal中挂起的线程1。
    • 也就是说,只要存在线程在管程中执行了signal,那么至少存在一个线程在管程中被挂起。
    • 此时,就只能在临界区外唤醒挂起的线程1,而这一步在代码中也得到了实现。

0X03 总结

通过这个lab7,我们学习了以下内容:

  • 软件定时器的实现与功能
  • 信号量的实现方式(基于等待队列)
  • 条件变量和管程(基于信号量)