【MIT6.S081】Lab8 lock

Intro

这个实验个人感觉挺难的,需要我们重新设计数据结构,还要考虑在并发(并行)情况下对于锁的操作,以减少多核情况下对于锁的竞争。其中主要涉及内存分配和IO缓冲块分配,在这个lab之前,xv6对于这两个分配都是使用的全局对象,并只有一把全局锁进行操作,这样的话在并行情况下锁的竞争是很激烈的,我们的任务就是重新设计这两个分配器,它们的重构思路并不完全一致,需要具体问题具体分析。

Memory Allocator

在xv6中,内存通过kalloc()来分配,在其内部,会使用一个kmem的结构体变量:

1
2
3
4
5
6
7
8
9
10
// kernel/kalloc.c

struct run {
struct run *next;
};

struct {
struct spinlock lock;
struct run *freelist;
} kmem;

kmem.freelist保存着未使用的内存块(每块大小为4KB),在内核初始化时,会通过kinit()将地址KERNBASE ~ PHYSTOP的物理内存(一共128MB)放入freelist中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void
kinit()
{
initlock(&kmem.lock, "kmem");
freerange(end, (void*)PHYSTOP); // end为kernel.ld中定义的,值为0x80000000,即KERNBASE
}

void
freerange(void *pa_start, void *pa_end)
{
char *p;
p = (char*)PGROUNDUP((uint64)pa_start);
for(; p + PGSIZE <= (char*)pa_end; p += PGSIZE)
kfree(p); // 使用kfree将内存块放入freelist中
}

通过上面的源码分析我们可以看到,整个xv6内核都是通过一个全局的kmem来分配和回收内存块,那么如果当多个CPU(多个core)需要操作内存块时,就必须得用锁才能保证整体的稳定和正确。

但是这样做又会有一个大问题,那就是一个cpu在操作kmem时,另一个CPU即便想获取内存块或释放内存块,因为锁的缘故也只能等待。故我们的解决方案是为每个CPU都设置一个kmem,这样,哪个CPU需要操作内存块时就可以只锁上它自己的kmem,其他CPU受到的干扰会大大减少。

为什么说是大大减少而不是完全减少呢?这是因为当某个CPU的kmem.freelist中没有可分配的内存块时,需要去其他CPU的kmem.freelist中去拿一个过来,这时就需要处理好这两个CPU的锁的处理了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
struct {
struct spinlock lock;
struct run *freelist;
char lock_name[8]; // 每个kmem的锁的名称
} kmem[NCPU]; // 为每个CPU都设置一个kmem

void
kinit()
{
// 初始化时要对每个kmem都初始锁
for (int i = 0; i < NCPU; i++) {
initlock(&kmem[i].lock, "kmem");
snprintf(kmem[i].lock_name, sizeof(kmem[i].lock_name), "kmem%d", i);
}
// 这里会先将所有内存块都分配给kmem[0],因为内核启动时是cpus[0]在做初始化操作
// 之后其他CPU需要内存块时,从cpus[0]这里拿
freerange(end, (void*)PHYSTOP);
}

void
kfree(void *pa)
{
struct run *r;

if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");

// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);

r = (struct run*)pa;

// ----------------------------------------
// kfree时只需要将内存块放到自己所属cpu的kmem.freelist中
push_off();

int cid = cpuid();
acquire(&kmem[cid].lock);
r->next = kmem[cid].freelist;
kmem[cid].freelist = r;
release(&kmem[cid].lock);

pop_off();
// ----------------------------------------
}

void *
kalloc(void)
{
struct run *r;

// ----------------------------------------
push_off();

int cid = cpuid();
acquire(&kmem[cid].lock);
r = kmem[cid].freelist;
if (r) {
// 如果当前的freelist中还有内存块,则直接用
kmem[cid].freelist = r->next;
} else {
// 没有?那就拿!
// 遍历一下其他CPU的kmem,如果找到的freelist中还有内存块,就拿它的
for (int next_cid = 0; next_cid < NCPU; next_cid++) {
// 不找自己
if (next_cid == cid)
continue;

acquire(&kmem[next_cid].lock);
r = kmem[next_cid].freelist;
if (r) {
kmem[next_cid].freelist = r->next;
release(&kmem[next_cid].lock);
break;
}
release(&kmem[next_cid].lock);
}
}
release(&kmem[cid].lock);

pop_off();
// ----------------------------------------

if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}

Buffer Cache

先介绍下实验初始的bcache:

1
2
3
4
5
6
7
8
9
struct {
struct spinlock lock;
struct buf buf[NBUF];

// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
struct buf head;
} bcache;
  • struct spinlock lock:bcache的全局锁
  • struct buf buf[NBUF]:缓冲块池,即包含了所有的buffer cache
  • struct buf head:一个LRU链表,用于操作缓冲块,使用head.next获取的是最近刚使用过的buffer,head.prev获取的是最近未使用时间最久的buffer(或者说是未被使用的buffer,即引用计数为0)

原来的bcache中的buf数组在binit时就给LRU链表初始化用了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void
binit(void)
{
struct buf *b;

initlock(&bcache.lock, "bcache");

// Create linked list of buffers
bcache.head.prev = &bcache.head;
bcache.head.next = &bcache.head;
for(b = bcache.buf; b < bcache.buf+NBUF; b++){
b->next = bcache.head.next;
b->prev = &bcache.head;
initsleeplock(&b->lock, "buffer");
bcache.head.next->prev = b;
bcache.head.next = b;
}
}

而这样设计的buffer cache有一个问题,由于buffer cache只有一个全局锁,当在高并发的情况下,进程需要并发访问bcache时,无法达到多进程带来的优势,一个进程必须要先等前一个进程释放锁后才可以操作。

考虑前一个实验中的kalloc,我们能否也使用那样的策略?

Reducing contention in the block cache is more tricky than for kalloc, because bcache buffers are truly shared among processes (and thus CPUs). For kalloc, one could eliminate most contention by giving each CPU its own allocator; that won’t work for the block cache.

为什么这么说呢?是因为block cache并不像内存页那样具有通用性。block cache对应着真实的物理外存块,每一个CPU都可能使用同一个block cache,所以无法像kalloc中那样为每个CPU分配其对应的cache。

既然我们无法分别为每个CPU分配,那么我们可以换一个角度,对所有的block块进行分组:我们可以创建一个哈希桶hash buckets(这里使用的容量为13),那么对每个block块的块号进行取余操作(mod 13)即可将它们映射到其中一个哈希桶中。这样一来,我们需要使用标号为blockno的块时,到其映射的哈希桶中去寻找即可,并且,在并发情况下,我们一般上锁的单位从整个bcache变为了其中的一个桶,锁的粒度大大减小了。

那么我们修改一下bcache的数据结构:

1
2
3
4
5
6
7
8
9
10
11
#define NBUCKET 13
#define BLOCK_HASH(blockno) (blockno % NBUCKET)

struct {
struct spinlock g_lock; // 全局锁,这里相较之前只是改了个名字
struct buf buf[NBUF];

struct spinlock bk_lock[NBUCKET]; // 每个hash bucket都对应有一把锁
struct buf bucket[NBUCKET]; // hash bucket
int size; // 缓冲块池(buf[NBUF])中已使用的块数
} bcache;

我们在重构的方案中,不再使用LRU链表,但还是使用了LRU算法的思想,只是用时间戳来代替LRU链表:

1
2
3
4
5
6
7
8
9
10
11
12
struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
// struct buf *prev; // LRU cache list
struct buf *next;
uchar data[BSIZE];
uint timestamp;
};

这个时间戳通过一个全局变量ticks(trap.c)来获取:

1
extern uint ticks;

那么在初始化bcache时,我们只需要初始化其中的锁和bcache.size即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void
binit(void)
{
struct buf *b;

bcache.size = 0;
initlock(&bcache.g_lock, "bcache");

for (int i = 0; i < NBUCKET; i++) {
initlock(&bcache.bk_lock[i], "bk_lock");
}

for(b = bcache.buf; b < bcache.buf+NBUF; b++){
initsleeplock(&b->lock, "buffer");
}
}

由于我们对于锁的操作单位变为了bucket,所以下面两个函数也需要修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void
bpin(struct buf *b) {
int idx = BLOCK_HASH(b->blockno);
acquire(&bcache.bk_lock[idx]);
b->refcnt++;
release(&bcache.bk_lock[idx]);
}

void
bunpin(struct buf *b) {
int idx = BLOCK_HASH(b->blockno);
acquire(&bcache.bk_lock[idx]);
b->refcnt--;
release(&bcache.bk_lock[idx]);
}

在原先的释放buffer块的brelse(struct buf *b)函数中,会先对b的引用计数-1,当其值为0时,将其转移至LRU链表的head->prev位置,说明这个buffer没有被任何进程使用了。而在重构方案中,我们使用时间戳代替了LRU链表,每个bucket中我们并没有维护一个LRU链表,而是认为引用计数为0且时间戳最小的的buffer是没有被任何进程使用的,故在引用计数为0时,只需要更新buffer的时间戳即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");

releasesleep(&b->lock);

int idx = BLOCK_HASH(b->blockno);
acquire(&bcache.bk_lock[idx]);
b->refcnt--;
if (b->refcnt == 0) {
// no one is waiting for it.
b->timestamp = ticks; // 未使用的buffer的时间戳一定是最小的
}

release(&bcache.bk_lock[idx]);
}

对于bcache最重要的bget()函数,我们重构的思路为:

  1. 检查标号为blockno的块的缓存是否在cache中,如果在,则其引用计数+1,返回该缓存块
  2. 在缓存中没有找到,先在缓冲块池中寻找还未分配给bucket的缓存块
  3. 如果缓冲块池中都分配出去了,就到每个bucket中去找。如果当前遍历的bucket中有buffer的引用计数为0,拿到其中时间戳最小的buffer(这也就是说这个buffer是距离现在最久没有使用的合法块)
    • 如果这个buffer在原先的bucket中,返回这个buffer
    • 否则需要将这个buffer从当前bucket中转移到目标bucket中

这是一个大致的思路,具体细节参考如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;
int bucket_idx = BLOCK_HASH(blockno);

// 先对blockno对应的bucket上锁即可
acquire(&bcache.bk_lock[bucket_idx]);

// Is the block already cached?
for(b = &bcache.bucket[bucket_idx]; b; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.bk_lock[bucket_idx]);
acquiresleep(&b->lock);
return b;
}
}

// 在缓存中没有找到,先在缓冲块池中寻找还未分配给bucket的缓存块
// 需要使用全局锁来保证bcache.size++的原子性
// 这里也是为什么buffer时间戳不需要初始化的原因:每一次需要使用buffer时(会调用bget),
// 如果buffer池中还有空闲buffer,则会直接使用这个buffer,完成操作后会调用relese释放buffer,此时就会更新buffer的时间戳
acquire(&bcache.g_lock);
if (bcache.size < NBUF) {
struct buf *b = &bcache.buf[bcache.size++];
b->next = bcache.bucket[bucket_idx].next;
bcache.bucket[bucket_idx].next = b;
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
release(&bcache.g_lock);
acquiresleep(&b->lock);
return b;
}
release(&bcache.g_lock);

// 在这时才能释放该bucket的锁,假设在检查缓存是否存在后释放:
// 如果有两个进程1和2,此时缓冲块池还有多个未分配的块,进程1检查bucket,发现没有缓存,释放锁,准备去缓冲块池中拿
// 此时切换到进程2,进程2检查bucket,发现没有缓存,也去缓冲块池中拿
// 这样就会导致bucket会添加两个blockno的缓冲块
release(&bcache.bk_lock[bucket_idx]);

// 在每个bucket中去找可用的buffer cache
for (int i = 0; i < NBUCKET; i++) {
struct buf *cur_buf, *pre_buf, *min_buf, *min_pre_buf;
uint min_timestamp = -1;

acquire(&bcache.bk_lock[bucket_idx]);
pre_buf = &bcache.bucket[bucket_idx];
cur_buf = pre_buf->next;

// 遍历bcache.bucket[bucket_idx]
while (cur_buf) {
// 为什么这里需要重新检查?考虑这样一种情况:
// 假设缓冲块池中还有一个未分配的,此时有两个进程,进程1和2都需要访问同一个标号blockno的块
// 进程1先拿到这个未分配的,并将其放入了对应的bucket中
// 之后进程2发现池中没有未分配的了,开始遍历所有bucket,
// 如果这时不重新检查一下blockno对应的bucket,则会导致一个bucket中有两个blockno的块
if (bucket_idx == BLOCK_HASH(blockno) && cur_buf->blockno == blockno &&
cur_buf->dev == dev) {
cur_buf->refcnt++;
release(&bcache.bk_lock[bucket_idx]);
acquiresleep(&cur_buf->lock);
return cur_buf;
}

// 只有引用计数为0,并且时间戳最小的缓冲块才可被重新分配
if (cur_buf->refcnt == 0 && cur_buf->timestamp < min_timestamp) {
min_pre_buf = pre_buf;
min_buf = cur_buf;
min_timestamp = cur_buf->timestamp;
}

pre_buf = cur_buf;
cur_buf = cur_buf->next;
}

// 在本轮中找到了可重新分配的缓冲块
if (min_buf) {
min_buf->dev = dev;
min_buf->blockno = blockno;
min_buf->valid = 0;
min_buf->refcnt = 1;

// 是自身bucket中的,不用做转移操作
if (bucket_idx == BLOCK_HASH(blockno)) {
// release(&bcache.hash_lock);
release(&bcache.bk_lock[bucket_idx]);
acquiresleep(&min_buf->lock);
return min_buf;
}

// 是其他bucket中的,需要转移
// 先将目标bucket中的buffer移除,然后释放锁
min_pre_buf->next = min_buf->next;
release(&bcache.bk_lock[bucket_idx]);

// 接着获取blockno对应的锁,并将从目标bucket中移除的buffer放至blockno对应的bucket,返回该buffer
bucket_idx = BLOCK_HASH(blockno);
acquire(&bcache.bk_lock[bucket_idx]);
min_buf->next = bcache.bucket[bucket_idx].next;
bcache.bucket[bucket_idx].next = min_buf;
release(&bcache.bk_lock[bucket_idx]);
acquiresleep(&min_buf->lock);
return min_buf;
}

release(&bcache.bk_lock[bucket_idx]);
// 如果到底了,从头来,保证遍历每一个bucket
if (++bucket_idx == NBUCKET) {
bucket_idx = 0;
}
}

panic("bget: no buffers");
}

bget是这个lab中最复杂的地方,其处理逻辑虽然较好理解,但是多个锁的操作顺序很让人头疼,需要考虑到多种并发情况。

Code Details

代码实现详情请见Github:https://github.com/kerolt/xv6-labs-2023/commit/ccac48e8ae6b6dde3e7c747a77f4149232420901

Reference

  • https://github.com/whileskies/xv6-labs-2020/blob/main/doc/Lab8-locks.md
  • https://blog.csdn.net/LostUnravel/article/details/121430900