Intro
这个实验个人感觉挺难的,需要我们重新设计数据结构,还要考虑在并发(并行)情况下对于锁的操作,以减少多核情况下对于锁的竞争。其中主要涉及内存分配和IO缓冲块分配,在这个lab之前,xv6对于这两个分配都是使用的全局对象,并只有一把全局锁进行操作,这样的话在并行情况下锁的竞争是很激烈的,我们的任务就是重新设计这两个分配器,它们的重构思路并不完全一致,需要具体问题具体分析。
Memory Allocator
在xv6中,内存通过kalloc()
来分配,在其内部,会使用一个kmem
的结构体变量:
1 2 3 4 5 6 7 8 9 10 struct run { struct run *next ; }; struct { struct spinlock lock ; struct run *freelist ; } kmem;
kmem.freelist
保存着未使用的内存块(每块大小为4KB),在内核初始化时,会通过kinit()
将地址KERNBASE ~ PHYSTOP
的物理内存(一共128MB)放入freelist中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void kinit () { initlock(&kmem.lock, "kmem" ); freerange(end, (void *)PHYSTOP); } void freerange (void *pa_start, void *pa_end) { char *p; p = (char *)PGROUNDUP((uint64)pa_start); for (; p + PGSIZE <= (char *)pa_end; p += PGSIZE) kfree(p); }
通过上面的源码分析我们可以看到,整个xv6内核都是通过一个全局的kmem
来分配和回收内存块,那么如果当多个CPU(多个core)需要操作内存块时,就必须得用锁才能保证整体的稳定和正确。
但是这样做又会有一个大问题,那就是一个cpu在操作kmem
时,另一个CPU即便想获取内存块或释放内存块,因为锁的缘故也只能等待。故我们的解决方案是为每个CPU都设置一个kmem
,这样,哪个CPU需要操作内存块时就可以只锁上它自己的kmem,其他CPU受到的干扰会大大减少。
为什么说是大大减少而不是完全减少呢?这是因为当某个CPU的kmem.freelist
中没有可分配的内存块时,需要去其他CPU的kmem.freelist
中去拿一个过来,这时就需要处理好这两个CPU的锁的处理了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 struct { struct spinlock lock ; struct run *freelist ; char lock_name[8 ]; } kmem[NCPU]; void kinit () { for (int i = 0 ; i < NCPU; i++) { initlock(&kmem[i].lock, "kmem" ); snprintf (kmem[i].lock_name, sizeof (kmem[i].lock_name), "kmem%d" , i); } freerange(end, (void *)PHYSTOP); } void kfree (void *pa) { struct run *r ; if (((uint64)pa % PGSIZE) != 0 || (char *)pa < end || (uint64)pa >= PHYSTOP) panic("kfree" ); memset (pa, 1 , PGSIZE); r = (struct run*)pa; push_off(); int cid = cpuid(); acquire(&kmem[cid].lock); r->next = kmem[cid].freelist; kmem[cid].freelist = r; release(&kmem[cid].lock); pop_off(); } void *kalloc (void ) { struct run *r ; push_off(); int cid = cpuid(); acquire(&kmem[cid].lock); r = kmem[cid].freelist; if (r) { kmem[cid].freelist = r->next; } else { for (int next_cid = 0 ; next_cid < NCPU; next_cid++) { if (next_cid == cid) continue ; acquire(&kmem[next_cid].lock); r = kmem[next_cid].freelist; if (r) { kmem[next_cid].freelist = r->next; release(&kmem[next_cid].lock); break ; } release(&kmem[next_cid].lock); } } release(&kmem[cid].lock); pop_off(); if (r) memset ((char *)r, 5 , PGSIZE); return (void *)r; }
Buffer Cache
先介绍下实验初始的bcache:
1 2 3 4 5 6 7 8 9 struct { struct spinlock lock ; struct buf buf [NBUF ]; struct buf head ; } bcache;
struct spinlock lock
:bcache的全局锁
struct buf buf[NBUF]
:缓冲块池,即包含了所有的buffer cache
struct buf head
:一个LRU链表,用于操作缓冲块,使用head.next获取的是最近刚使用过的buffer,head.prev获取的是最近未使用时间最久的buffer(或者说是未被使用的buffer,即引用计数为0)
原来的bcache中的buf数组在binit时就给LRU链表初始化用了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void binit (void ) { struct buf *b ; initlock(&bcache.lock, "bcache" ); bcache.head.prev = &bcache.head; bcache.head.next = &bcache.head; for (b = bcache.buf; b < bcache.buf+NBUF; b++){ b->next = bcache.head.next; b->prev = &bcache.head; initsleeplock(&b->lock, "buffer" ); bcache.head.next->prev = b; bcache.head.next = b; } }
而这样设计的buffer cache有一个问题,由于buffer cache只有一个全局锁,当在高并发的情况下,进程需要并发访问bcache时,无法达到多进程带来的优势,一个进程必须要先等前一个进程释放锁后才可以操作。
考虑前一个实验中的kalloc,我们能否也使用那样的策略?
Reducing contention in the block cache is more tricky than for kalloc, because bcache buffers are truly shared among processes (and thus CPUs). For kalloc, one could eliminate most contention by giving each CPU its own allocator; that won’t work for the block cache.
为什么这么说呢?是因为block cache并不像内存页那样具有通用性。block cache对应着真实的物理外存块,每一个CPU都可能使用同一个block cache,所以无法像kalloc中那样为每个CPU分配其对应的cache。
既然我们无法分别为每个CPU分配,那么我们可以换一个角度,对所有的block块进行分组:我们可以创建一个哈希桶hash buckets(这里使用的容量为13),那么对每个block块的块号进行取余操作(mod 13)即可将它们映射到其中一个哈希桶中。这样一来,我们需要使用标号为blockno的块时,到其映射的哈希桶中去寻找即可,并且,在并发情况下,我们一般上锁的单位从整个bcache变为了其中的一个桶,锁的粒度大大减小了。
那么我们修改一下bcache的数据结构:
1 2 3 4 5 6 7 8 9 10 11 #define NBUCKET 13 #define BLOCK_HASH(blockno) (blockno % NBUCKET) struct { struct spinlock g_lock ; struct buf buf [NBUF ]; struct spinlock bk_lock [NBUCKET ]; struct buf bucket [NBUCKET ]; int size; } bcache;
我们在重构的方案中,不再使用LRU链表,但还是使用了LRU算法的思想,只是用时间戳来代替LRU链表:
1 2 3 4 5 6 7 8 9 10 11 12 struct buf { int valid; int disk; uint dev; uint blockno; struct sleeplock lock ; uint refcnt; struct buf *next ; uchar data[BSIZE]; uint timestamp; };
这个时间戳通过一个全局变量ticks
(trap.c)来获取:
那么在初始化bcache时,我们只需要初始化其中的锁和bcache.size即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void binit (void ) { struct buf *b ; bcache.size = 0 ; initlock(&bcache.g_lock, "bcache" ); for (int i = 0 ; i < NBUCKET; i++) { initlock(&bcache.bk_lock[i], "bk_lock" ); } for (b = bcache.buf; b < bcache.buf+NBUF; b++){ initsleeplock(&b->lock, "buffer" ); } }
由于我们对于锁的操作单位变为了bucket,所以下面两个函数也需要修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void bpin (struct buf *b) { int idx = BLOCK_HASH(b->blockno); acquire(&bcache.bk_lock[idx]); b->refcnt++; release(&bcache.bk_lock[idx]); } void bunpin (struct buf *b) { int idx = BLOCK_HASH(b->blockno); acquire(&bcache.bk_lock[idx]); b->refcnt--; release(&bcache.bk_lock[idx]); }
在原先的释放buffer块的brelse(struct buf *b)
函数中,会先对b
的引用计数-1,当其值为0时,将其转移至LRU链表的head->prev
位置,说明这个buffer没有被任何进程使用了。而在重构方案中,我们使用时间戳代替了LRU链表,每个bucket中我们并没有维护一个LRU链表,而是认为引用计数为0且时间戳最小的的buffer是没有被任何进程使用的,故在引用计数为0时,只需要更新buffer的时间戳即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void brelse (struct buf *b) { if (!holdingsleep(&b->lock)) panic("brelse" ); releasesleep(&b->lock); int idx = BLOCK_HASH(b->blockno); acquire(&bcache.bk_lock[idx]); b->refcnt--; if (b->refcnt == 0 ) { b->timestamp = ticks; } release(&bcache.bk_lock[idx]); }
对于bcache最重要的bget()
函数,我们重构的思路为:
检查标号为blockno的块的缓存是否在cache中,如果在,则其引用计数+1,返回该缓存块
在缓存中没有找到,先在缓冲块池中寻找还未分配给bucket的缓存块
如果缓冲块池中都分配出去了,就到每个bucket中去找。如果当前遍历的bucket中有buffer的引用计数为0,拿到其中时间戳最小的buffer(这也就是说这个buffer是距离现在最久没有使用的合法块)
如果这个buffer在原先的bucket中,返回这个buffer
否则需要将这个buffer从当前bucket中转移到目标bucket中
这是一个大致的思路,具体细节参考如下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 static struct buf*bget (uint dev, uint blockno) { struct buf *b ; int bucket_idx = BLOCK_HASH(blockno); acquire(&bcache.bk_lock[bucket_idx]); for (b = &bcache.bucket[bucket_idx]; b; b = b->next){ if (b->dev == dev && b->blockno == blockno){ b->refcnt++; release(&bcache.bk_lock[bucket_idx]); acquiresleep(&b->lock); return b; } } acquire(&bcache.g_lock); if (bcache.size < NBUF) { struct buf *b = &bcache.buf[bcache.size++]; b->next = bcache.bucket[bucket_idx].next; bcache.bucket[bucket_idx].next = b; b->dev = dev; b->blockno = blockno; b->valid = 0 ; b->refcnt = 1 ; release(&bcache.g_lock); acquiresleep(&b->lock); return b; } release(&bcache.g_lock); release(&bcache.bk_lock[bucket_idx]); for (int i = 0 ; i < NBUCKET; i++) { struct buf *cur_buf , *pre_buf , *min_buf , *min_pre_buf ; uint min_timestamp = -1 ; acquire(&bcache.bk_lock[bucket_idx]); pre_buf = &bcache.bucket[bucket_idx]; cur_buf = pre_buf->next; while (cur_buf) { if (bucket_idx == BLOCK_HASH(blockno) && cur_buf->blockno == blockno && cur_buf->dev == dev) { cur_buf->refcnt++; release(&bcache.bk_lock[bucket_idx]); acquiresleep(&cur_buf->lock); return cur_buf; } if (cur_buf->refcnt == 0 && cur_buf->timestamp < min_timestamp) { min_pre_buf = pre_buf; min_buf = cur_buf; min_timestamp = cur_buf->timestamp; } pre_buf = cur_buf; cur_buf = cur_buf->next; } if (min_buf) { min_buf->dev = dev; min_buf->blockno = blockno; min_buf->valid = 0 ; min_buf->refcnt = 1 ; if (bucket_idx == BLOCK_HASH(blockno)) { release(&bcache.bk_lock[bucket_idx]); acquiresleep(&min_buf->lock); return min_buf; } min_pre_buf->next = min_buf->next; release(&bcache.bk_lock[bucket_idx]); bucket_idx = BLOCK_HASH(blockno); acquire(&bcache.bk_lock[bucket_idx]); min_buf->next = bcache.bucket[bucket_idx].next; bcache.bucket[bucket_idx].next = min_buf; release(&bcache.bk_lock[bucket_idx]); acquiresleep(&min_buf->lock); return min_buf; } release(&bcache.bk_lock[bucket_idx]); if (++bucket_idx == NBUCKET) { bucket_idx = 0 ; } } panic("bget: no buffers" ); }
bget是这个lab中最复杂的地方,其处理逻辑虽然较好理解,但是多个锁的操作顺序很让人头疼,需要考虑到多种并发情况。
Code Details
代码实现详情请见Github:https://github.com/kerolt/xv6-labs-2023/commit/ccac48e8ae6b6dde3e7c747a77f4149232420901
Reference
https://github.com/whileskies/xv6-labs-2020/blob/main/doc/Lab8-locks.md
https://blog.csdn.net/LostUnravel/article/details/121430900