本次的实验总体都不是很难,第一个练习让我们在用户态模拟了线程的切换,这里重要的就是进程/线程上下文的保存与恢复;第二三个练习则是让我们跳出了xv6,去熟悉pthread库和线程的同步互斥。
简单分析xv6中的进程切换
init是用户态最先启动的进程,其启动后会创建sh进程,sh进程又会fork并exec其他命令:
- 内核会在
userinit()
中准备init进程,这是最先启动的用户态进程,将其运行状态设置为RUNNABLE
- 内核启动scheduler进行进程调度(这个函数会一直执行),当前只有init进程,且其状态为
RUNNABLE
,故运行 - init进程会fork一个子进程,并使用exec执行sh进程,其用于处理用户在控制台的输入
- 用户输入命令后,sh解析命令后fork子进程并用exec来将命令对应的进程替换掉fork的子进程
init
- sh
- ls
- cat
- ...
xv6内部采用时间片轮转来进行进程调度,当进程时间片到了后,触发时钟中断,执行yield,在yield中,会将当前进程的状态从运行态RUNNING设置为RUNNABLE,并切换至scheduler调度器,由其来寻找下一个可运行的进程进行切换。
那么xv6是怎么切换当前进程和scheduler调度器的呢?其实现是根据swtch
函数来实现的:
// Switch to scheduler. Must hold only p->lock
// and have changed proc->state. Saves and restores
// intena because intena is a property of this
// kernel thread, not this CPU. It should
// be proc->intena and proc->noff, but that would
// break in the few places where a lock is held but
// there's no process.
void
sched(void)
{
int intena;
struct proc *p = myproc();
if(!holding(&p->lock))
panic("sched p->lock");
if(mycpu()->noff != 1)
panic("sched locks");
if(p->state == RUNNING)
panic("sched running");
if(intr_get())
panic("sched interruptible");
intena = mycpu()->intena;
swtch(&p->context, &mycpu()->context);
mycpu()->intena = intena;
}
// Give up the CPU for one scheduling round.
void
yield(void)
{
struct proc *p = myproc();
acquire(&p->lock);
p->state = RUNNABLE;
sched();
release(&p->lock);
}
swtch(&p->context, &mycpu()->context)
函数会将当前寄存器中的值保存到p->context中,并将当前cpu中context的内容恢复到寄存器中,其中最重要的就是ra
寄存器,当恢复了这个后,ra的值对应的地址在调度器scheduler()
中:
(这里ra的16进制为0x8000141e)
这样,xv6就从:执行当前进程 -> yield -> shed -> scheduler;在scheduler调度器中,内核会找到下一个就绪态的进程,并使用swtch进行进程切换:
// Per-CPU process scheduler.
// Each CPU calls scheduler() after setting itself up.
// Scheduler never returns. It loops, doing:
// - choose a process to run.
// - swtch to start running that process.
// - eventually that process transfers control
// via swtch back to the scheduler.
void
scheduler(void)
{
struct proc *p;
struct cpu *c = mycpu();
c->proc = 0;
for(;;){
// The most recent process to run may have had interrupts
// turned off; enable them to avoid a deadlock if all
// processes are waiting.
intr_on();
for(p = proc; p < &proc[NPROC]; p++) {
acquire(&p->lock);
if(p->state == RUNNABLE) {
// Switch to chosen process. It is the process's job
// to release its lock and then reacquire it
// before jumping back to us.
p->state = RUNNING;
c->proc = p;
swtch(&c->context, &p->context);
// Process is done running for now.
// It should have changed its p->state before coming back.
c->proc = 0;
}
release(&p->lock);
}
}
}
这样,xv6又从调度器切换到了下一个进程去执行了。
Uthread: switching between threads
在uthread.c
中,我们需要在用户态下模拟了线程的运行与切换。可以简单理解,运行一个线程就是执行了一个函数。
源代码中已经给出要运行的线程(函数),如何启动它们并对他们进行切换呢?
前置准备
线程的切换同进程的切换相似,需要保存通用寄存器的值,所以我们可以对内核照猫画虎般在用户线程struct thread
中添加一个struct context
变量:
struct context {
uint64 ra;
uint64 sp;
// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};
struct thread {
char stack[STACK_SIZE]; /* the thread's stack */
int state; /* FREE, RUNNING, RUNNABLE */
struct context t_context;
};
启动
risc-v中的ra寄存器的作用是保存函数返回地址。具体来说,当一个函数被调用时,调用指令会将返回地址,即调用该函数的下一条指令的地址,保存在ra
寄存器中。
在实验中,我们通过thread_create
来创建线程,那么,在执行玩thread_create之后并进行了调度后,是不是就该运行线程函数了?没错,那么我们需要保存创建的线程对应的函数的地址到ra寄存器中,同时,还需要设置线程的栈指针:
void
thread_create(void (*func)())
{
struct thread *t;
for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
if (t->state == FREE) break;
}
t->state = RUNNABLE;
// YOUR CODE HERE
t->t_context.ra = (uint64)func;
t->t_context.sp = (uint64)t->stack + STACK_SIZE;
}
调度
即切换线程,我们需要保存当前线程的通用寄存器,并将下一个要运行的线程的通用寄存器值恢复到硬件上,如同进程切换那样:
# uthread_switch.S
thread_switch:
/* YOUR CODE HERE */
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
sd s2, 32(a0)
sd s3, 40(a0)
sd s4, 48(a0)
sd s5, 56(a0)
sd s6, 64(a0)
sd s7, 72(a0)
sd s8, 80(a0)
sd s9, 88(a0)
sd s10, 96(a0)
sd s11, 104(a0)
ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
ld s2, 32(a1)
ld s3, 40(a1)
ld s4, 48(a1)
ld s5, 56(a1)
ld s6, 64(a1)
ld s7, 72(a1)
ld s8, 80(a1)
ld s9, 88(a1)
ld s10, 96(a1)
ld s11, 104(a1)
ret /* return to ra */
// uthread.c
void
thread_schedule(void)
{
...
if (current_thread != next_thread) { /* switch threads? */
next_thread->state = RUNNING;
t = current_thread;
current_thread = next_thread;
/* YOUR CODE HERE
* Invoke thread_switch to switch from t to next_thread:
* thread_switch(??, ??);
*/
thread_switch((uint64)&t->t_context, (uint64)&next_thread->t_context);
} else
next_thread = 0;
}
Using threads
如果只是简单在get和put操作中加上锁,那么这样虽然能够保证对共享资源的互斥访问,当时无法达到双线程所带来的性能提升。
应该实现的是两个线程,你放你的,我放我的,现在有5个entry table,我们可以不止使用一把锁来锁住5个table,而是使用5把锁,哪一个table需要互斥操作时只锁它一个就行,这样就细化了锁的粒度。
这里需要把table改为一个数组,实现代码比较简单,这里就不放了。
Barrier
实验的要求是每来一个线程运行到barrier就阻塞,直到所有线程都运行到了barrier才释放。这里是想让我们熟悉pthread库中关于锁和条件变量的使用,更加了解线程的同步关系。
我们可以给全局的bstate中添加一个变量count
用于记录当前轮次有多少个线程已经到达了barrier,只要有线程到达,执行++bstate.count
,需要注意的是,这个全局变量会被多个线程使用,故需要使用锁来保证互斥访问。
接下来判断count的值是否等于线程的数量,如果不相等,就wait阻塞当前线程,并释放锁;否则,说明一轮结束,将count置0,轮次+1,并使用broadcaset唤醒被阻塞的线程。
尝试1
按刚刚说的思路,如果这么写代码,将会造成系统程序死锁。
static void
barrier()
{
pthread_mutex_lock(&bstate.barrier_mutex);
++bstate.count;
pthread_mutex_unlock(&bstate.barrier_mutex);
if (bstate.count == bstate.nthread) {
bstate.count = 0;
++bstate.round;
pthread_cond_broadcast(&bstate.barrier_cond);
return;
}
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
}
当条件变量 cond
变为真且该线程被唤醒时,pthread_cond_wait
会自动重新获取互斥锁 mutex
,然后返回。
- 线程1第0轮获得锁,++count后释放锁,然后在wait处阻塞,并释放锁
- 接着线程2在第0轮获得锁,++count后进入if判断,使用broadcast唤醒线程1
- 线程1被唤醒,并自动获取锁
- 线程2进入第1轮,尝试获得锁,但是此时线程1正持有锁,故线程2阻塞
- 线程1进入第1轮,但之前自己已经获得锁了,此时又请求锁,故阻塞
- 两个线程都阻塞,程序出现死锁
尝试2
既然pthread_cond_wait
会自动重新获取互斥锁,那就在调用之后使用pthread_mutex_unlock
释放锁不久好了吗?
static void
barrier()
{
pthread_mutex_lock(&bstate.barrier_mutex);
if (++bstate.count == bstate.nthread) {
bstate.count = 0;
++bstate.round;
pthread_cond_broadcast(&bstate.barrier_cond);
return;
}
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
pthread_mutex_unlock(&bstate.barrier_mutex);
}
这样还是不行:原因是pthread_cond_broadcast
不会释放锁。
- 在第0轮线程1拿到锁后++count后进入wait状态,此时线程1阻塞并释放锁
- 在第0轮线程2拿到锁,执行++count操作后使用broadcast尝试唤醒线程1,但是线程2并没有释放锁
- 线程1抢到CPU,但是拿不到锁,继续阻塞
- 线程2抢到了CPU的执行权进入第1轮,企图获得锁,但是之前自己没有释放锁,也阻塞
- 两个线程都阻塞,程序出现死锁
正确做法
所以正确的做法是在broadcast后也使用unlock释放锁:
static void
barrier()
{
pthread_mutex_lock(&bstate.barrier_mutex);
if (++bstate.count == bstate.nthread) {
bstate.count = 0;
++bstate.round;
pthread_cond_broadcast(&bstate.barrier_cond);
pthread_mutex_unlock(&bstate.barrier_mutex);
return;
}
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
pthread_mutex_unlock(&bstate.barrier_mutex);
}
代码实现详情
请见Github:https://github.com/kerolt/xv6-labs-2023/tree/thread
Reference
- https://www.cnblogs.com/looking-for-zihuatanejo/p/17682582.html
- https://mit-public-courses-cn-translatio.gitbook.io/mit6-s081/lec11-thread-switching-robert/11.7-xv6-switch-function
- https://www.bilibili.com/video/BV1bZ421U75W/?spm_id_from=333.999.0.0&vd_source=e7bb0cfb7224c8d6671fa62c0e80c832
- https://pdos.csail.mit.edu/6.S081/2023/labs/thread.html