本次的实验总体都不是很难,第一个练习让我们在用户态模拟了线程的切换,这里重要的就是进程/线程上下文的保存与恢复;第二三个练习则是让我们跳出了xv6,去熟悉pthread库和线程的同步互斥。

简单分析xv6中的进程切换

init是用户态最先启动的进程,其启动后会创建sh进程,sh进程又会fork并exec其他命令:

  1. 内核会在userinit()中准备init进程,这是最先启动的用户态进程,将其运行状态设置为RUNNABLE
  2. 内核启动scheduler进行进程调度(这个函数会一直执行),当前只有init进程,且其状态为RUNNABLE,故运行
  3. init进程会fork一个子进程,并使用exec执行sh进程,其用于处理用户在控制台的输入
  4. 用户输入命令后,sh解析命令后fork子进程并用exec来将命令对应的进程替换掉fork的子进程
init
- sh
	- ls
	- cat
	- ...

xv6内部采用时间片轮转来进行进程调度,当进程时间片到了后,触发时钟中断,执行yield,在yield中,会将当前进程的状态从运行态RUNNING设置为RUNNABLE,并切换至scheduler调度器,由其来寻找下一个可运行的进程进行切换。

那么xv6是怎么切换当前进程和scheduler调度器的呢?其实现是根据swtch函数来实现的:

// Switch to scheduler.  Must hold only p->lock
// and have changed proc->state. Saves and restores
// intena because intena is a property of this
// kernel thread, not this CPU. It should
// be proc->intena and proc->noff, but that would
// break in the few places where a lock is held but
// there's no process.
void
sched(void)
{
  int intena;
  struct proc *p = myproc();

  if(!holding(&p->lock))
    panic("sched p->lock");
  if(mycpu()->noff != 1)
    panic("sched locks");
  if(p->state == RUNNING)
    panic("sched running");
  if(intr_get())
    panic("sched interruptible");

  intena = mycpu()->intena;
  swtch(&p->context, &mycpu()->context);
  mycpu()->intena = intena;
}

// Give up the CPU for one scheduling round.
void
yield(void)
{
  struct proc *p = myproc();
  acquire(&p->lock);
  p->state = RUNNABLE;
  sched();
  release(&p->lock);
}

swtch(&p->context, &mycpu()->context)函数会将当前寄存器中的值保存到p->context中,并将当前cpu中context的内容恢复到寄存器中,其中最重要的就是ra寄存器,当恢复了这个后,ra的值对应的地址在调度器scheduler()中:

(这里ra的16进制为0x8000141e)

这样,xv6就从:执行当前进程 -> yield -> shed -> scheduler;在scheduler调度器中,内核会找到下一个就绪态的进程,并使用swtch进行进程切换:

// Per-CPU process scheduler.
// Each CPU calls scheduler() after setting itself up.
// Scheduler never returns.  It loops, doing:
//  - choose a process to run.
//  - swtch to start running that process.
//  - eventually that process transfers control
//    via swtch back to the scheduler.
void
scheduler(void)
{
  struct proc *p;
  struct cpu *c = mycpu();

  c->proc = 0;
  for(;;){
    // The most recent process to run may have had interrupts
    // turned off; enable them to avoid a deadlock if all
    // processes are waiting.
    intr_on();

    for(p = proc; p < &proc[NPROC]; p++) {
      acquire(&p->lock);
      if(p->state == RUNNABLE) {
        // Switch to chosen process.  It is the process's job
        // to release its lock and then reacquire it
        // before jumping back to us.
        p->state = RUNNING;
        c->proc = p;
        swtch(&c->context, &p->context);

        // Process is done running for now.
        // It should have changed its p->state before coming back.
        c->proc = 0;
      }
      release(&p->lock);
    }
  }
}

这样,xv6又从调度器切换到了下一个进程去执行了。

Uthread: switching between threads

uthread.c中,我们需要在用户态下模拟了线程的运行与切换。可以简单理解,运行一个线程就是执行了一个函数。

源代码中已经给出要运行的线程(函数),如何启动它们并对他们进行切换呢?

前置准备

线程的切换同进程的切换相似,需要保存通用寄存器的值,所以我们可以对内核照猫画虎般在用户线程struct thread中添加一个struct context变量:

struct context {
  uint64 ra;
  uint64 sp;

  // callee-saved
  uint64 s0;
  uint64 s1;
  uint64 s2;
  uint64 s3;
  uint64 s4;
  uint64 s5;
  uint64 s6;
  uint64 s7;
  uint64 s8;
  uint64 s9;
  uint64 s10;
  uint64 s11;
};

struct thread {
  char       stack[STACK_SIZE]; /* the thread's stack */
  int        state;             /* FREE, RUNNING, RUNNABLE */
  struct context t_context;
};

启动

risc-v中的ra寄存器的作用是保存函数返回地址。具体来说,当一个函数被调用时,调用指令会将返回地址,即调用该函数的下一条指令的地址,保存在ra寄存器中。

在实验中,我们通过thread_create来创建线程,那么,在执行玩thread_create之后并进行了调度后,是不是就该运行线程函数了?没错,那么我们需要保存创建的线程对应的函数的地址到ra寄存器中,同时,还需要设置线程的栈指针:

void 
thread_create(void (*func)())
{
  struct thread *t;

  for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
    if (t->state == FREE) break;
  }
  t->state = RUNNABLE;
  // YOUR CODE HERE
  t->t_context.ra = (uint64)func;
  t->t_context.sp = (uint64)t->stack + STACK_SIZE;
}

调度

即切换线程,我们需要保存当前线程的通用寄存器,并将下一个要运行的线程的通用寄存器值恢复到硬件上,如同进程切换那样:

# uthread_switch.S

thread_switch:
	/* YOUR CODE HERE */
	sd ra, 0(a0)
	sd sp, 8(a0)
	sd s0, 16(a0)
	sd s1, 24(a0)
	sd s2, 32(a0)
	sd s3, 40(a0)
	sd s4, 48(a0)
	sd s5, 56(a0)
	sd s6, 64(a0)
	sd s7, 72(a0)
	sd s8, 80(a0)
	sd s9, 88(a0)
	sd s10, 96(a0)
	sd s11, 104(a0)

	ld ra, 0(a1)
	ld sp, 8(a1)
	ld s0, 16(a1)
	ld s1, 24(a1)
	ld s2, 32(a1)
	ld s3, 40(a1)
	ld s4, 48(a1)
	ld s5, 56(a1)
	ld s6, 64(a1)
	ld s7, 72(a1)
	ld s8, 80(a1)
	ld s9, 88(a1)
	ld s10, 96(a1)
	ld s11, 104(a1)

	ret    /* return to ra */
// uthread.c

void 
thread_schedule(void)
{
  ...
  if (current_thread != next_thread) {         /* switch threads?  */
    next_thread->state = RUNNING;
    t = current_thread;
    current_thread = next_thread;
    /* YOUR CODE HERE
     * Invoke thread_switch to switch from t to next_thread:
     * thread_switch(??, ??);
     */
    thread_switch((uint64)&t->t_context, (uint64)&next_thread->t_context);
  } else
    next_thread = 0;
}

Using threads

如果只是简单在get和put操作中加上锁,那么这样虽然能够保证对共享资源的互斥访问,当时无法达到双线程所带来的性能提升。

应该实现的是两个线程,你放你的,我放我的,现在有5个entry table,我们可以不止使用一把锁来锁住5个table,而是使用5把锁,哪一个table需要互斥操作时只锁它一个就行,这样就细化了锁的粒度。

这里需要把table改为一个数组,实现代码比较简单,这里就不放了。

Barrier

实验的要求是每来一个线程运行到barrier就阻塞,直到所有线程都运行到了barrier才释放。这里是想让我们熟悉pthread库中关于锁和条件变量的使用,更加了解线程的同步关系。

我们可以给全局的bstate中添加一个变量count用于记录当前轮次有多少个线程已经到达了barrier,只要有线程到达,执行++bstate.count,需要注意的是,这个全局变量会被多个线程使用,故需要使用锁来保证互斥访问。

接下来判断count的值是否等于线程的数量,如果不相等,就wait阻塞当前线程,并释放锁;否则,说明一轮结束,将count置0,轮次+1,并使用broadcaset唤醒被阻塞的线程。

尝试1

按刚刚说的思路,如果这么写代码,将会造成系统程序死锁。

static void 
barrier()
{
  pthread_mutex_lock(&bstate.barrier_mutex);
  ++bstate.count;
  pthread_mutex_unlock(&bstate.barrier_mutex);
  if (bstate.count == bstate.nthread) {
    bstate.count = 0;
    ++bstate.round;
    pthread_cond_broadcast(&bstate.barrier_cond);
    return;
  }
  pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
}

当条件变量 cond 变为真且该线程被唤醒时,pthread_cond_wait 会自动重新获取互斥锁 mutex,然后返回。

  • 线程1第0轮获得锁,++count后释放锁,然后在wait处阻塞,并释放锁
  • 接着线程2在第0轮获得锁,++count后进入if判断,使用broadcast唤醒线程1
  • 线程1被唤醒,并自动获取锁
  • 线程2进入第1轮,尝试获得锁,但是此时线程1正持有锁,故线程2阻塞
  • 线程1进入第1轮,但之前自己已经获得锁了,此时又请求锁,故阻塞
  • 两个线程都阻塞,程序出现死锁

尝试2

既然pthread_cond_wait会自动重新获取互斥锁,那就在调用之后使用pthread_mutex_unlock释放锁不久好了吗?

static void 
barrier()
{
  pthread_mutex_lock(&bstate.barrier_mutex);
  if (++bstate.count == bstate.nthread) {
    bstate.count = 0;
    ++bstate.round;
    pthread_cond_broadcast(&bstate.barrier_cond);
    return;
  }
  pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
  pthread_mutex_unlock(&bstate.barrier_mutex);
}

这样还是不行:原因是pthread_cond_broadcast不会释放锁

  • 在第0轮线程1拿到锁后++count后进入wait状态,此时线程1阻塞并释放锁
  • 在第0轮线程2拿到锁,执行++count操作后使用broadcast尝试唤醒线程1,但是线程2并没有释放锁
  • 线程1抢到CPU,但是拿不到锁,继续阻塞
  • 线程2抢到了CPU的执行权进入第1轮,企图获得锁,但是之前自己没有释放锁,也阻塞
  • 两个线程都阻塞,程序出现死锁

正确做法

所以正确的做法是在broadcast后也使用unlock释放锁:

static void 
barrier()
{
  pthread_mutex_lock(&bstate.barrier_mutex);
  if (++bstate.count == bstate.nthread) {
    bstate.count = 0;
    ++bstate.round;
    pthread_cond_broadcast(&bstate.barrier_cond);
    pthread_mutex_unlock(&bstate.barrier_mutex);
    return;
  }
  pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
  pthread_mutex_unlock(&bstate.barrier_mutex);
}

代码实现详情

请见Github:https://github.com/kerolt/xv6-labs-2023/tree/thread

Reference

  1. https://www.cnblogs.com/looking-for-zihuatanejo/p/17682582.html
  2. https://mit-public-courses-cn-translatio.gitbook.io/mit6-s081/lec11-thread-switching-robert/11.7-xv6-switch-function
  3. https://www.bilibili.com/video/BV1bZ421U75W/?spm_id_from=333.999.0.0&vd_source=e7bb0cfb7224c8d6671fa62c0e80c832
  4. https://pdos.csail.mit.edu/6.S081/2023/labs/thread.html