互斥、同步和信号量 
 
0X00 环境准备 
本lab基于前所有lab,但是需要对其进行一些扩展:
其实就只要修改一个中断函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static  void  trap_dispatch (struct  trapframe *tf) {     ...      case  IRQ_OFFSET + IRQ_TIMER:     ticks++;     assert(current != NULL );     run_timer_list();     break ;     ... } 
 
这里将原来的函数(调度器的时间片中断函数)修改成run_timer_list。这是因为引入了定时器的原因,我们来看一下这个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 void  run_timer_list (void ) {     bool  intr_flag;          local_intr_save(intr_flag);     {         list_entry_t  *le = list_next(&timer_list);                           if  (le != &timer_list)         {             timer_t  *timer = le2timer(le, timer_link);             assert(timer->expires != 0 );                          timer->expires--;             while  (timer->expires == 0 )             {                 le = list_next(le);                 struct  proc_struct  *proc  =  timer->proc;                 if  (proc->wait_state != 0 )                 {                     assert(proc->wait_state & WT_INTERRUPTED);                 }                 else                  {                     warn("process %d's wait_state == 0.\n" , proc->pid);                 }                 wakeup_proc(proc);                 del_timer(timer);                 if  (le == &timer_list)                 {                     break ;                 }                 timer = le2timer(le, timer_link);             }         }                  sched_class_proc_tick(current);     }     local_intr_restore(intr_flag); } 
 
            注意上面只会对第一个timer--,这是由于timer的构造决定的 
timer_t结构用于存储一个定时器所需要的相关数据,包括倒计时时间以及所绑定的进程。
1 2 3 4 5 6 7 8 9 10 11 typedef  struct  {          unsigned  int  expires;                 struct  proc_struct  *proc ;              list_entry_t  timer_link;     } timer_t ; 
 
而add_timer用于将某个timer添加进timer列表中。但出于性能考虑,每个新添加的timer都会按照其expires属性的大小排列,同时减去上一个timer的expires(即每个timer都是前缀,这样每次做减法的时候性能大大提升)。在更新timer_list中的所有timer的expires时,只需递减链首的第一个timer的expire,即可间接达到所有timer的expires减一的目的。 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 add_timer(timer_t  *timer) {     bool  intr_flag;     local_intr_save(intr_flag);     {         assert(timer->expires > 0  && timer->proc != NULL );         assert(list_empty(&(timer->timer_link)));         list_entry_t  *le = list_next(&timer_list);                  while  (le != &timer_list)         {             timer_t  *next = le2timer(le, timer_link);             if  (timer->expires < next->expires)             {                 next->expires -= timer->expires;                 break ;             }             timer->expires -= next->expires;             le = list_next(le);         }                  list_add_before(le, &(timer->timer_link));     }     local_intr_restore(intr_flag); } 
 
del_timer函数将timer从list中移除。将timer从timer_list中删除的操作比较简单:设置好当前待移除timer的下一个timer->expires,并将当前timer从链表中移除即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void  del_timer (timer_t  *timer) {     bool  intr_flag;     local_intr_save(intr_flag);     {         if  (!list_empty(&(timer->timer_link)))         {                                       if  (timer->expires != 0 )             {                 list_entry_t  *le = list_next(&(timer->timer_link));                 if  (le != &timer_list)                 {                     timer_t  *next = le2timer(le, timer_link);                     next->expires += timer->expires;                 }             }                          list_del_init(&(timer->timer_link));         }     }     local_intr_restore(intr_flag); } 
 
定时器是相当重要的软件结构,为所有的线程提供了一个统一的超时机制。 
           
0X01 内核级信号量 
由于时间片中断可以发生在某个进程执行的任意位置,而时间片中断会引发进程调度,所以可能会引起竞争,这些操作系统书上会讲的比较详细,此处不再赘诉。
信号量 
为了防止互斥,所以需要支持信号量 以用来加锁,但是如果进程A在临界区消耗时间过长,则其他需要进入临界区的线程就需要不断等待,为了避免自旋等待 ,所以需要支持睡眠等待 ,因此需要引入等待队列 。
我们先来看一下信号量,其位于kern/sync/sem.h中,信号量的结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 typedef  struct  {          int  value;          wait_queue_t  wait_queue; } semaphore_t ; void  sem_init (semaphore_t  *sem, int  value) ;void  up (semaphore_t  *sem) ;void  down (semaphore_t  *sem) ;bool  try_down (semaphore_t  *sem) ;
 
初始时,信号量中的值表示可以最多同时支持多少线程进入。当执行down函数时,会尝试将value--,如果value_new
>=
0则说明当前进程可以获取锁,这些操作都是原子的 。如果value_new
<=
0,则说明当前进程需要等待,所以会将当前进程加入到等待队列中,然后调用schedule函数进行进程调度,直到有其他进程调用up函数,将value++,然后唤醒等待队列中的进程。
            原子操作 
由于时间片中断可以发生在某个进程执行的任意位置,那么自然也会发生在尝试对value--的过程中,也就是一个线程在执行down函数时,另一个线程也在执行down函数,这样就会导致value的值不准确,这就引出了原子操作 。
在硬件上,中断信号可以发生在任意位置,但是只有CPU执行完一条指令后才会去检查中断信号,所以在执行一条指令时,中断信号是不会发生的,如果能保证我们某一任务能在一条CPU指令内完成,那么该任务对应的操作就是原子操作,原子操作能从硬件上保证不会发生竞争 
当然除了CPU对应的原子操作外,还有一些软件构造的原子操作 ,比如先关闭中断(关闭中断是硬件上的原子操作),然后执行一些可能会引起竞争的代码,最后打开中断(同样是硬件上的原子操作),这样中间部分的代码就不会被中断打断,保证了原子性。
           
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 void  sem_init (semaphore_t  *sem, int  value) {     sem->value = value;     wait_queue_init(&(sem->wait_queue)); } static  __noinline void  __up(semaphore_t  *sem, uint32_t  wait_state){     bool  intr_flag;          local_intr_save(intr_flag);     {         wait_t  *wait;                  if  ((wait = wait_queue_first(&(sem->wait_queue))) == NULL )         {             sem->value++;         }                  else          {                                       assert(wait->proc->wait_state == wait_state);             wakeup_wait(&(sem->wait_queue), wait, wait_state, 1 );         }     }     local_intr_restore(intr_flag); } static  __noinline uint32_t  __down(semaphore_t  *sem, uint32_t  wait_state){     bool  intr_flag;     local_intr_save(intr_flag);          if  (sem->value > 0 )     {         sem->value--;         local_intr_restore(intr_flag);         return  0 ;     }          wait_t  __wait, *wait = &__wait;     wait_current_set(&(sem->wait_queue), wait, wait_state);     local_intr_restore(intr_flag);          schedule();               local_intr_save(intr_flag);     wait_current_del(&(sem->wait_queue), wait);     local_intr_restore(intr_flag);          if  (wait->wakeup_flags != wait_state)     {         return  wait->wakeup_flags;     }     return  0 ; } void  up (semaphore_t  *sem) {     __up(sem, WT_KSEM); } void  down (semaphore_t  *sem) {     uint32_t  flags = __down(sem, WT_KSEM);     assert(flags == 0 ); } bool  try_down (semaphore_t  *sem) {     bool  intr_flag, ret = 0 ;     local_intr_save(intr_flag);     if  (sem->value > 0 )     {         sem->value--, ret = 1 ;     }     local_intr_restore(intr_flag);     return  ret; } 
 
信号量的实现还是依赖于等待队列的实现,当信号量down时如果失败会将当前线程加入等待队列,然后调度到其他线程上,当信号量up时会从等待队列中唤醒一个等待线程。
当一个线程加入等待队列时,还需要用一个标志位来注明该线程是因为什么等待 (此处是信号量等待),且当一个因信号量等待的线程在唤醒时仍然会执行_down函数中剩余的部分(也就是把自身移除等待队列) 。
等待队列 
等待队列也是另一个重要组件,它存储了所有等待中的线程,当一个线程因为某种原因等待时,会将自身加入到等待队列中,同时等待队列也会存储该线程因为什么阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 typedef   struct  {          struct  proc_struct  *proc ;              uint32_t  wakeup_flags;                  wait_queue_t  *wait_queue;               list_entry_t  wait_link;        } wait_t ; 
 
这与run_queue类似,总体结构也十分简单。我们来看一下其相关函数,不过考虑到其函数很多,就。略过简单的函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 void  wait_init (wait_t  *wait, struct  proc_struct *proc) ;void  wait_queue_init (wait_queue_t  *queue ) ;void  wait_queue_add (wait_queue_t  *queue , wait_t  *wait) ;void  wait_queue_del (wait_queue_t  *queue , wait_t  *wait) ;wait_t  *wait_queue_next (wait_queue_t  *queue , wait_t  *wait) ;wait_t  *wait_queue_prev (wait_queue_t  *queue , wait_t  *wait) ;wait_t  *wait_queue_first (wait_queue_t  *queue ) ;wait_t  *wait_queue_last (wait_queue_t  *queue ) ;bool  wait_queue_empty (wait_queue_t  *queue ) ;bool  wait_in_queue (wait_t  *wait) ;void  wakeup_wait (wait_queue_t  *queue , wait_t  *wait, uint32_t  wakeup_flags, bool  del) ;void  wakeup_first (wait_queue_t  *queue , uint32_t  wakeup_flags, bool  del) ;void  wakeup_queue (wait_queue_t  *queue , uint32_t  wakeup_flags, bool  del) ;void  wait_current_set (wait_queue_t  *queue , wait_t  *wait, uint32_t  wait_state) ;
 
下面两张是调用关系图:
 
 
0X02 内核级条件变量和管程 
之前实现的是信号量,通过信号量能实现简单的线程通信,但是存在以下的局限:
信号量有点类似于资源的总数,在生产者-消费者这样的模式下存在同步问题(比如消费者先获取信号量,生产者无法获取,因此无法通知消费者。当然更复杂的逻辑可以实现生-消模式,但是复杂 ) 
信号量一次只能唤醒一个线程,但有时我们需要广播(比如actor模式) 
信号量内部定义value限制了其局限性,只能对int类型资源同步,但有时想要同步其他类型的资源(比如a队列和b队列长度相等) 
 
但是,条件变量是基于信号量实现的,可以看成是对信号量的进一步封装。信号量=条件变量+互斥锁 
管程是一种程序结构,该结构内多个子程序形成多个线程互斥访问共享资源。
条件变量和管程在kern/sync/monitor.c中实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 typedef  struct  monitor  monitor_t ;typedef  struct  condvar {          semaphore_t  sem;                  int  count;                        monitor_t  * owner;       } condvar_t ; typedef  struct  monitor {          semaphore_t  mutex;                semaphore_t  next;                 int  next_count;                   condvar_t  *cv;           } monitor_t ; 
 
管程中的成员变量mutex是一个二值信号量,是实现每次只允许一个进程进入管程的关键元素,确保了互斥访问性质 。管程中的条件变量cv,某线程通过执行wait_cv,会使得等待某个条件Cond为真的进程能够离开管程并睡眠,且让其他进程进入管程继续执行 ;而进入管程的某进程设置条件Cond为真并执行signal_cv时,能够让等待某个条件Cond为真的睡眠进程被唤醒 ,从而继续进入管程中执行。
管程中的成员变量信号量next 和整型变量next_count 是配合进程对条件变量cv的操作而设置的,这是由于发出signal_cv的进程A会唤醒由于wait_cv而睡眠的进程B,由于管程中只允许一个进程运行,所以进程B执行会导致唤醒进程B的进程A睡眠,直到进程B离开管程,进程A才能继续执行,这个同步过程是通过信号量next完成的;而next_count表示了由于发出singal_cv(没写错)而睡眠的进程个数。 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 void  monitor_init (monitor_t  *mtp, size_t  num_cv) {     int  i;     assert(num_cv > 0 );     mtp->next_count = 0 ;     mtp->cv = NULL ;          sem_init(&(mtp->mutex), 1 );      sem_init(&(mtp->next), 0 );            mtp->cv = (condvar_t  *)kmalloc(sizeof (condvar_t ) * num_cv);     assert(mtp->cv != NULL );          for  (i = 0 ; i < num_cv; i++)     {         mtp->cv[i].count = 0 ;                  sem_init(&(mtp->cv[i].sem), 0 );         mtp->cv[i].owner = mtp;     } } void  cond_signal (condvar_t  *cvp) {          cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d\n" , cvp, cvp->count, cvp->owner->next_count);               if  (cvp->count > 0 )     {                  cvp->owner->next_count++;                  up(&(cvp->sem));                                    down(&(cvp->owner->next));                  cvp->owner->next_count--;     }     cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d\n" , cvp, cvp->count, cvp->owner->next_count); } void  cond_wait (condvar_t  *cvp) {          cprintf("cond_wait begin:  cvp %x, cvp->count %d, cvp->owner->next_count %d\n" , cvp, cvp->count, cvp->owner->next_count);               cvp->count++;          if  (cvp->owner->next_count > 0 )                  up(&(cvp->owner->next));     else                   up(&(cvp->owner->mutex));          down(&(cvp->sem));          cvp->count--;     cprintf("cond_wait end:  cvp %x, cvp->count %d, cvp->owner->next_count %d\n" , cvp, cvp->count, cvp->owner->next_count); } 
 
由于条件变量依附于管程,而管程和条件变量信号量比较多,下面一一说明:
条件变量内的信号量:用于唤醒等待该条件变量 的线程 
条件变量的count:用于记录等待该条件变量的线程数  
管程内的mutex信号量:用于保证管程中只有一个线程在运行  
管程内的next信号量:用于快速阻塞调用signal_cv的线程,以便唤醒等待该条件变量的线程,从而保证管程只有一个线程在运行  
管程内的next_count:用于记录因调用signal_cv而阻塞的线程数  
 
信号量next的初始值为0,所以任何尝试访问该信号量的线程都会阻塞,这是一个用于快速阻塞自身的信号量。一个线程调用signal_cv时,由于会唤醒其他线程,不能保证管程一个线程的限制,所以需要next来阻塞自身。
当一个线程调用wait_cv,等不到条件变量时会阻塞自身,因此先看看管程中是否存在因调用signal_cv而阻塞线程,要是有的话,就直接唤醒一个,由于此时锁没有被释放,而是相当"转移",所以管程仍然能正常工作。当然,如果没有的话,就释放锁,把管程解放出来,然后阻塞自身,等待条件变量的信号量。
为了让整个管程正常运行,还需在管程中的每个函数的入口和出口增加相关操作,即:
1 2 3 4 5 6 7 8 9 10 11 12 13 void  monitorFunc ()  {      down(&(mtp->mutex));              if (mtp->next_count>0 )          up(&(mtp->next));       else           up(&(mtp->mutex)); } 
 
这样做的好处有两个:
只有一个进程在执行管程中的函数。 
避免由于执行了cond_signal函数而睡眠的进程无法被唤醒:
管程中wait和signal函数的调用存在时间顺序。例如:当线程1先调用signal唤醒线程2并将自身线程挂起后,线程2在开始执行时将无法唤醒原先的在signal中挂起的线程1。 
也就是说,只要存在线程在管程中执行了signal,那么至少存在一个线程在管程中被挂起。 
此时,就只能在临界区外唤醒挂起的线程1,而这一步在代码中也得到了实现。 
  
 
0X03 总结 
通过这个lab7,我们学习了以下内容:
软件定时器的实现与功能 
信号量的实现方式(基于等待队列) 
条件变量和管程(基于信号量)