互斥、同步和信号量
0X00 环境准备
本lab基于前所有lab,但是需要对其进行一些扩展:
其实就只要修改一个中断函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static void trap_dispatch (struct trapframe *tf) { ... case IRQ_OFFSET + IRQ_TIMER: ticks++; assert(current != NULL ); run_timer_list(); break ; ... }
这里将原来的函数(调度器的时间片中断函数)修改成run_timer_list
。这是因为引入了定时器的原因,我们来看一下这个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 void run_timer_list (void ) { bool intr_flag; local_intr_save(intr_flag); { list_entry_t *le = list_next(&timer_list); if (le != &timer_list) { timer_t *timer = le2timer(le, timer_link); assert(timer->expires != 0 ); timer->expires--; while (timer->expires == 0 ) { le = list_next(le); struct proc_struct *proc = timer->proc; if (proc->wait_state != 0 ) { assert(proc->wait_state & WT_INTERRUPTED); } else { warn("process %d's wait_state == 0.\n" , proc->pid); } wakeup_proc(proc); del_timer(timer); if (le == &timer_list) { break ; } timer = le2timer(le, timer_link); } } sched_class_proc_tick(current); } local_intr_restore(intr_flag); }
注意上面只会对第一个timer--,这是由于timer的构造决定的
timer_t结构用于存储一个定时器所需要的相关数据,包括倒计时时间以及所绑定的进程。
1 2 3 4 5 6 7 8 9 10 11 typedef struct { unsigned int expires; struct proc_struct *proc ; list_entry_t timer_link; } timer_t ;
而add_timer用于将某个timer添加进timer列表中。但出于性能考虑,每个新添加的timer都会按照其expires属性的大小排列,同时减去上一个timer的expires(即每个timer都是前缀,这样每次做减法的时候性能大大提升)。在更新timer_list中的所有timer的expires时,只需递减链首的第一个timer的expire,即可间接达到所有timer的expires减一的目的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 add_timer(timer_t *timer) { bool intr_flag; local_intr_save(intr_flag); { assert(timer->expires > 0 && timer->proc != NULL ); assert(list_empty(&(timer->timer_link))); list_entry_t *le = list_next(&timer_list); while (le != &timer_list) { timer_t *next = le2timer(le, timer_link); if (timer->expires < next->expires) { next->expires -= timer->expires; break ; } timer->expires -= next->expires; le = list_next(le); } list_add_before(le, &(timer->timer_link)); } local_intr_restore(intr_flag); }
del_timer函数将timer从list中移除。将timer从timer_list中删除的操作比较简单:设置好当前待移除timer的下一个timer->expires,并将当前timer从链表中移除即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void del_timer (timer_t *timer) { bool intr_flag; local_intr_save(intr_flag); { if (!list_empty(&(timer->timer_link))) { if (timer->expires != 0 ) { list_entry_t *le = list_next(&(timer->timer_link)); if (le != &timer_list) { timer_t *next = le2timer(le, timer_link); next->expires += timer->expires; } } list_del_init(&(timer->timer_link)); } } local_intr_restore(intr_flag); }
定时器是相当重要的软件结构,为所有的线程提供了一个统一的超时机制。
0X01 内核级信号量
由于时间片中断可以发生在某个进程执行的任意位置,而时间片中断会引发进程调度,所以可能会引起竞争,这些操作系统书上会讲的比较详细,此处不再赘诉。
信号量
为了防止互斥,所以需要支持信号量 以用来加锁,但是如果进程A在临界区消耗时间过长,则其他需要进入临界区的线程就需要不断等待,为了避免自旋等待 ,所以需要支持睡眠等待 ,因此需要引入等待队列 。
我们先来看一下信号量,其位于kern/sync/sem.h
中,信号量的结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 typedef struct { int value; wait_queue_t wait_queue; } semaphore_t ; void sem_init (semaphore_t *sem, int value) ;void up (semaphore_t *sem) ;void down (semaphore_t *sem) ;bool try_down (semaphore_t *sem) ;
初始时,信号量中的值表示可以最多同时支持多少线程进入。当执行down
函数时,会尝试将value--,如果value_new
>=
0则说明当前进程可以获取锁,这些操作都是原子的 。如果value_new
<=
0,则说明当前进程需要等待,所以会将当前进程加入到等待队列中,然后调用schedule
函数进行进程调度,直到有其他进程调用up
函数,将value++,然后唤醒等待队列中的进程。
原子操作
由于时间片中断可以发生在某个进程执行的任意位置,那么自然也会发生在尝试对value--的过程中,也就是一个线程在执行down
函数时,另一个线程也在执行down
函数,这样就会导致value的值不准确,这就引出了原子操作 。
在硬件上,中断信号可以发生在任意位置,但是只有CPU执行完一条指令后才会去检查中断信号,所以在执行一条指令时,中断信号是不会发生的,如果能保证我们某一任务能在一条CPU指令内完成,那么该任务对应的操作就是原子操作,原子操作能从硬件上保证不会发生竞争
当然除了CPU对应的原子操作外,还有一些软件构造的原子操作 ,比如先关闭中断(关闭中断是硬件上的原子操作),然后执行一些可能会引起竞争的代码,最后打开中断(同样是硬件上的原子操作),这样中间部分的代码就不会被中断打断,保证了原子性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 void sem_init (semaphore_t *sem, int value) { sem->value = value; wait_queue_init(&(sem->wait_queue)); } static __noinline void __up(semaphore_t *sem, uint32_t wait_state){ bool intr_flag; local_intr_save(intr_flag); { wait_t *wait; if ((wait = wait_queue_first(&(sem->wait_queue))) == NULL ) { sem->value++; } else { assert(wait->proc->wait_state == wait_state); wakeup_wait(&(sem->wait_queue), wait, wait_state, 1 ); } } local_intr_restore(intr_flag); } static __noinline uint32_t __down(semaphore_t *sem, uint32_t wait_state){ bool intr_flag; local_intr_save(intr_flag); if (sem->value > 0 ) { sem->value--; local_intr_restore(intr_flag); return 0 ; } wait_t __wait, *wait = &__wait; wait_current_set(&(sem->wait_queue), wait, wait_state); local_intr_restore(intr_flag); schedule(); local_intr_save(intr_flag); wait_current_del(&(sem->wait_queue), wait); local_intr_restore(intr_flag); if (wait->wakeup_flags != wait_state) { return wait->wakeup_flags; } return 0 ; } void up (semaphore_t *sem) { __up(sem, WT_KSEM); } void down (semaphore_t *sem) { uint32_t flags = __down(sem, WT_KSEM); assert(flags == 0 ); } bool try_down (semaphore_t *sem) { bool intr_flag, ret = 0 ; local_intr_save(intr_flag); if (sem->value > 0 ) { sem->value--, ret = 1 ; } local_intr_restore(intr_flag); return ret; }
信号量的实现还是依赖于等待队列的实现,当信号量down时如果失败会将当前线程加入等待队列,然后调度到其他线程上,当信号量up时会从等待队列中唤醒一个等待线程。
当一个线程加入等待队列时,还需要用一个标志位来注明该线程是因为什么等待 (此处是信号量等待),且当一个因信号量等待的线程在唤醒时仍然会执行_down函数中剩余的部分(也就是把自身移除等待队列) 。
等待队列
等待队列也是另一个重要组件,它存储了所有等待中的线程,当一个线程因为某种原因等待时,会将自身加入到等待队列中,同时等待队列也会存储该线程因为什么阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 typedef struct { struct proc_struct *proc ; uint32_t wakeup_flags; wait_queue_t *wait_queue; list_entry_t wait_link; } wait_t ;
这与run_queue类似,总体结构也十分简单。我们来看一下其相关函数,不过考虑到其函数很多,就。略过简单的函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void wait_init (wait_t *wait, struct proc_struct *proc) ;void wait_queue_init (wait_queue_t *queue ) ;void wait_queue_add (wait_queue_t *queue , wait_t *wait) ;void wait_queue_del (wait_queue_t *queue , wait_t *wait) ;wait_t *wait_queue_next (wait_queue_t *queue , wait_t *wait) ;wait_t *wait_queue_prev (wait_queue_t *queue , wait_t *wait) ;wait_t *wait_queue_first (wait_queue_t *queue ) ;wait_t *wait_queue_last (wait_queue_t *queue ) ;bool wait_queue_empty (wait_queue_t *queue ) ;bool wait_in_queue (wait_t *wait) ;void wakeup_wait (wait_queue_t *queue , wait_t *wait, uint32_t wakeup_flags, bool del) ;void wakeup_first (wait_queue_t *queue , uint32_t wakeup_flags, bool del) ;void wakeup_queue (wait_queue_t *queue , uint32_t wakeup_flags, bool del) ;void wait_current_set (wait_queue_t *queue , wait_t *wait, uint32_t wait_state) ;
下面两张是调用关系图:
0X02 内核级条件变量和管程
之前实现的是信号量,通过信号量能实现简单的线程通信,但是存在以下的局限:
信号量有点类似于资源的总数,在生产者-消费者这样的模式下存在同步问题(比如消费者先获取信号量,生产者无法获取,因此无法通知消费者。当然更复杂的逻辑可以实现生-消模式,但是复杂 )
信号量一次只能唤醒一个线程,但有时我们需要广播(比如actor模式)
信号量内部定义value限制了其局限性,只能对int类型资源同步,但有时想要同步其他类型的资源(比如a队列和b队列长度相等)
但是,条件变量是基于信号量实现的,可以看成是对信号量的进一步封装。信号量=条件变量+互斥锁
管程是一种程序结构,该结构内多个子程序形成多个线程互斥访问共享资源。
条件变量和管程在kern/sync/monitor.c
中实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 typedef struct monitor monitor_t ;typedef struct condvar { semaphore_t sem; int count; monitor_t * owner; } condvar_t ; typedef struct monitor { semaphore_t mutex; semaphore_t next; int next_count; condvar_t *cv; } monitor_t ;
管程中的成员变量mutex是一个二值信号量,是实现每次只允许一个进程进入管程的关键元素,确保了互斥访问性质 。管程中的条件变量cv,某线程通过执行wait_cv,会使得等待某个条件Cond为真的进程能够离开管程并睡眠,且让其他进程进入管程继续执行 ;而进入管程的某进程设置条件Cond为真并执行signal_cv时,能够让等待某个条件Cond为真的睡眠进程被唤醒 ,从而继续进入管程中执行。
管程中的成员变量信号量next 和整型变量next_count 是配合进程对条件变量cv的操作而设置的,这是由于发出signal_cv的进程A会唤醒由于wait_cv而睡眠的进程B,由于管程中只允许一个进程运行,所以进程B执行会导致唤醒进程B的进程A睡眠,直到进程B离开管程,进程A才能继续执行,这个同步过程是通过信号量next完成的;而next_count表示了由于发出singal_cv(没写错)而睡眠的进程个数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 void monitor_init (monitor_t *mtp, size_t num_cv) { int i; assert(num_cv > 0 ); mtp->next_count = 0 ; mtp->cv = NULL ; sem_init(&(mtp->mutex), 1 ); sem_init(&(mtp->next), 0 ); mtp->cv = (condvar_t *)kmalloc(sizeof (condvar_t ) * num_cv); assert(mtp->cv != NULL ); for (i = 0 ; i < num_cv; i++) { mtp->cv[i].count = 0 ; sem_init(&(mtp->cv[i].sem), 0 ); mtp->cv[i].owner = mtp; } } void cond_signal (condvar_t *cvp) { cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n" , cvp, cvp->count, cvp->owner->next_count); if (cvp->count > 0 ) { cvp->owner->next_count++; up(&(cvp->sem)); down(&(cvp->owner->next)); cvp->owner->next_count--; } cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n" , cvp, cvp->count, cvp->owner->next_count); } void cond_wait (condvar_t *cvp) { cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n" , cvp, cvp->count, cvp->owner->next_count); cvp->count++; if (cvp->owner->next_count > 0 ) up(&(cvp->owner->next)); else up(&(cvp->owner->mutex)); down(&(cvp->sem)); cvp->count--; cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n" , cvp, cvp->count, cvp->owner->next_count); }
由于条件变量依附于管程,而管程和条件变量信号量比较多,下面一一说明:
条件变量内的信号量:用于唤醒等待该条件变量 的线程
条件变量的count:用于记录等待该条件变量的线程数
管程内的mutex信号量:用于保证管程中只有一个线程在运行
管程内的next信号量:用于快速阻塞调用signal_cv的线程,以便唤醒等待该条件变量的线程,从而保证管程只有一个线程在运行
管程内的next_count:用于记录因调用signal_cv而阻塞的线程数
信号量next的初始值为0,所以任何尝试访问该信号量的线程都会阻塞,这是一个用于快速阻塞自身的信号量。一个线程调用signal_cv时,由于会唤醒其他线程,不能保证管程一个线程的限制,所以需要next来阻塞自身。
当一个线程调用wait_cv,等不到条件变量时会阻塞自身,因此先看看管程中是否存在因调用signal_cv而阻塞线程,要是有的话,就直接唤醒一个,由于此时锁没有被释放,而是相当"转移",所以管程仍然能正常工作。当然,如果没有的话,就释放锁,把管程解放出来,然后阻塞自身,等待条件变量的信号量。
为了让整个管程正常运行,还需在管程中的每个函数的入口和出口增加相关操作,即:
1 2 3 4 5 6 7 8 9 10 11 12 13 void monitorFunc () { down(&(mtp->mutex)); if (mtp->next_count>0 ) up(&(mtp->next)); else up(&(mtp->mutex)); }
这样做的好处有两个:
只有一个进程在执行管程中的函数。
避免由于执行了cond_signal函数而睡眠的进程无法被唤醒:
管程中wait和signal函数的调用存在时间顺序。例如:当线程1先调用signal唤醒线程2并将自身线程挂起后,线程2在开始执行时将无法唤醒原先的在signal中挂起的线程1。
也就是说,只要存在线程在管程中执行了signal,那么至少存在一个线程在管程中被挂起。
此时,就只能在临界区外唤醒挂起的线程1,而这一步在代码中也得到了实现。
0X03 总结
通过这个lab7,我们学习了以下内容:
软件定时器的实现与功能
信号量的实现方式(基于等待队列)
条件变量和管程(基于信号量)