【MIT6.S081】Lab6 multithreading

本次的实验总体都不是很难,第一个练习让我们在用户态模拟了线程的切换,这里重要的就是进程/线程上下文的保存与恢复;第二三个练习则是让我们跳出了xv6,去熟悉pthread库和线程的同步互斥。

简单分析xv6中的进程切换

init是用户态最先启动的进程,其启动后会创建sh进程,sh进程又会fork并exec其他命令:

  1. 内核会在userinit()中准备init进程,这是最先启动的用户态进程,将其运行状态设置为RUNNABLE
  2. 内核启动scheduler进行进程调度(这个函数会一直执行),当前只有init进程,且其状态为RUNNABLE,故运行
  3. init进程会fork一个子进程,并使用exec执行sh进程,其用于处理用户在控制台的输入
  4. 用户输入命令后,sh解析命令后fork子进程并用exec来将命令对应的进程替换掉fork的子进程
1
2
3
4
5
init
- sh
- ls
- cat
- ...

xv6内部采用时间片轮转来进行进程调度,当进程时间片到了后,触发时钟中断,执行yield,在yield中,会将当前进程的状态从运行态RUNNING设置为RUNNABLE,并切换至scheduler调度器,由其来寻找下一个可运行的进程进行切换。

那么xv6是怎么切换当前进程和scheduler调度器的呢?其实现是根据swtch函数来实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Switch to scheduler.  Must hold only p->lock
// and have changed proc->state. Saves and restores
// intena because intena is a property of this
// kernel thread, not this CPU. It should
// be proc->intena and proc->noff, but that would
// break in the few places where a lock is held but
// there's no process.
void
sched(void)
{
int intena;
struct proc *p = myproc();

if(!holding(&p->lock))
panic("sched p->lock");
if(mycpu()->noff != 1)
panic("sched locks");
if(p->state == RUNNING)
panic("sched running");
if(intr_get())
panic("sched interruptible");

intena = mycpu()->intena;
swtch(&p->context, &mycpu()->context);
mycpu()->intena = intena;
}

// Give up the CPU for one scheduling round.
void
yield(void)
{
struct proc *p = myproc();
acquire(&p->lock);
p->state = RUNNABLE;
sched();
release(&p->lock);
}

swtch(&p->context, &mycpu()->context)函数会将当前寄存器中的值保存到p->context中,并将当前cpu中context的内容恢复到寄存器中,其中最重要的就是ra寄存器,当恢复了这个后,ra的值对应的地址在调度器scheduler()中:

(这里ra的16进制为0x8000141e)

这样,xv6就从:执行当前进程 -> yield -> shed -> scheduler;在scheduler调度器中,内核会找到下一个就绪态的进程,并使用swtch进行进程切换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Per-CPU process scheduler.
// Each CPU calls scheduler() after setting itself up.
// Scheduler never returns. It loops, doing:
// - choose a process to run.
// - swtch to start running that process.
// - eventually that process transfers control
// via swtch back to the scheduler.
void
scheduler(void)
{
struct proc *p;
struct cpu *c = mycpu();

c->proc = 0;
for(;;){
// The most recent process to run may have had interrupts
// turned off; enable them to avoid a deadlock if all
// processes are waiting.
intr_on();

for(p = proc; p < &proc[NPROC]; p++) {
acquire(&p->lock);
if(p->state == RUNNABLE) {
// Switch to chosen process. It is the process's job
// to release its lock and then reacquire it
// before jumping back to us.
p->state = RUNNING;
c->proc = p;
swtch(&c->context, &p->context);

// Process is done running for now.
// It should have changed its p->state before coming back.
c->proc = 0;
}
release(&p->lock);
}
}
}

这样,xv6又从调度器切换到了下一个进程去执行了。

Uthread: switching between threads

uthread.c中,我们需要在用户态下模拟了线程的运行与切换。可以简单理解,运行一个线程就是执行了一个函数。

源代码中已经给出要运行的线程(函数),如何启动它们并对他们进行切换呢?

前置准备

线程的切换同进程的切换相似,需要保存通用寄存器的值,所以我们可以对内核照猫画虎般在用户线程struct thread中添加一个struct context变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct context {
uint64 ra;
uint64 sp;

// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};

struct thread {
char stack[STACK_SIZE]; /* the thread's stack */
int state; /* FREE, RUNNING, RUNNABLE */
struct context t_context;
};

启动

risc-v中的ra寄存器的作用是保存函数返回地址。具体来说,当一个函数被调用时,调用指令会将返回地址,即调用该函数的下一条指令的地址,保存在ra寄存器中。

在实验中,我们通过thread_create来创建线程,那么,在执行玩thread_create之后并进行了调度后,是不是就该运行线程函数了?没错,那么我们需要保存创建的线程对应的函数的地址到ra寄存器中,同时,还需要设置线程的栈指针:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void 
thread_create(void (*func)())
{
struct thread *t;

for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
if (t->state == FREE) break;
}
t->state = RUNNABLE;
// YOUR CODE HERE
t->t_context.ra = (uint64)func;
t->t_context.sp = (uint64)t->stack + STACK_SIZE;
}

调度

即切换线程,我们需要保存当前线程的通用寄存器,并将下一个要运行的线程的通用寄存器值恢复到硬件上,如同进程切换那样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# uthread_switch.S

thread_switch:
/* YOUR CODE HERE */
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
sd s2, 32(a0)
sd s3, 40(a0)
sd s4, 48(a0)
sd s5, 56(a0)
sd s6, 64(a0)
sd s7, 72(a0)
sd s8, 80(a0)
sd s9, 88(a0)
sd s10, 96(a0)
sd s11, 104(a0)

ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
ld s2, 32(a1)
ld s3, 40(a1)
ld s4, 48(a1)
ld s5, 56(a1)
ld s6, 64(a1)
ld s7, 72(a1)
ld s8, 80(a1)
ld s9, 88(a1)
ld s10, 96(a1)
ld s11, 104(a1)

ret /* return to ra */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// uthread.c

void
thread_schedule(void)
{
...
if (current_thread != next_thread) { /* switch threads? */
next_thread->state = RUNNING;
t = current_thread;
current_thread = next_thread;
/* YOUR CODE HERE
* Invoke thread_switch to switch from t to next_thread:
* thread_switch(??, ??);
*/
thread_switch((uint64)&t->t_context, (uint64)&next_thread->t_context);
} else
next_thread = 0;
}

Using threads

如果只是简单在get和put操作中加上锁,那么这样虽然能够保证对共享资源的互斥访问,当时无法达到双线程所带来的性能提升。

应该实现的是两个线程,你放你的,我放我的,现在有5个entry table,我们可以不止使用一把锁来锁住5个table,而是使用5把锁,哪一个table需要互斥操作时只锁它一个就行,这样就细化了锁的粒度。

这里需要把table改为一个数组,实现代码比较简单,这里就不放了。

Barrier

实验的要求是每来一个线程运行到barrier就阻塞,直到所有线程都运行到了barrier才释放。这里是想让我们熟悉pthread库中关于锁和条件变量的使用,更加了解线程的同步关系。

我们可以给全局的bstate中添加一个变量count用于记录当前轮次有多少个线程已经到达了barrier,只要有线程到达,执行++bstate.count,需要注意的是,这个全局变量会被多个线程使用,故需要使用锁来保证互斥访问。

接下来判断count的值是否等于线程的数量,如果不相等,就wait阻塞当前线程,并释放锁;否则,说明一轮结束,将count置0,轮次+1,并使用broadcaset唤醒被阻塞的线程。

尝试1

按刚刚说的思路,如果这么写代码,将会造成系统程序死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void 
barrier()
{
pthread_mutex_lock(&bstate.barrier_mutex);
++bstate.count;
pthread_mutex_unlock(&bstate.barrier_mutex);
if (bstate.count == bstate.nthread) {
bstate.count = 0;
++bstate.round;
pthread_cond_broadcast(&bstate.barrier_cond);
return;
}
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
}

当条件变量 cond 变为真且该线程被唤醒时,pthread_cond_wait 会自动重新获取互斥锁 mutex,然后返回。

  • 线程1第0轮获得锁,++count后释放锁,然后在wait处阻塞,并释放锁
  • 接着线程2在第0轮获得锁,++count后进入if判断,使用broadcast唤醒线程1
  • 线程1被唤醒,并自动获取锁
  • 线程2进入第1轮,尝试获得锁,但是此时线程1正持有锁,故线程2阻塞
  • 线程1进入第1轮,但之前自己已经获得锁了,此时又请求锁,故阻塞
  • 两个线程都阻塞,程序出现死锁

尝试2

既然pthread_cond_wait会自动重新获取互斥锁,那就在调用之后使用pthread_mutex_unlock释放锁不久好了吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
static void 
barrier()
{
pthread_mutex_lock(&bstate.barrier_mutex);
if (++bstate.count == bstate.nthread) {
bstate.count = 0;
++bstate.round;
pthread_cond_broadcast(&bstate.barrier_cond);
return;
}
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
pthread_mutex_unlock(&bstate.barrier_mutex);
}

这样还是不行:原因是pthread_cond_broadcast不会释放锁

  • 在第0轮线程1拿到锁后++count后进入wait状态,此时线程1阻塞并释放锁
  • 在第0轮线程2拿到锁,执行++count操作后使用broadcast尝试唤醒线程1,但是线程2并没有释放锁
  • 线程1抢到CPU,但是拿不到锁,继续阻塞
  • 线程2抢到了CPU的执行权进入第1轮,企图获得锁,但是之前自己没有释放锁,也阻塞
  • 两个线程都阻塞,程序出现死锁

正确做法

所以正确的做法是在broadcast后也使用unlock释放锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void 
barrier()
{
pthread_mutex_lock(&bstate.barrier_mutex);
if (++bstate.count == bstate.nthread) {
bstate.count = 0;
++bstate.round;
pthread_cond_broadcast(&bstate.barrier_cond);
pthread_mutex_unlock(&bstate.barrier_mutex);
return;
}
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
pthread_mutex_unlock(&bstate.barrier_mutex);
}

代码实现详情

请见Github:https://github.com/kerolt/xv6-labs-2023/tree/thread

Reference

  1. https://www.cnblogs.com/looking-for-zihuatanejo/p/17682582.html
  2. https://mit-public-courses-cn-translatio.gitbook.io/mit6-s081/lec11-thread-switching-robert/11.7-xv6-switch-function
  3. https://www.bilibili.com/video/BV1bZ421U75W/?spm_id_from=333.999.0.0&vd_source=e7bb0cfb7224c8d6671fa62c0e80c832
  4. https://pdos.csail.mit.edu/6.S081/2023/labs/thread.html