这是我自学 MIT6.S081 操作系统课程的 lab 代码笔记第八篇:Locks。此 lab 大致耗时:14小时。 课程地址:https://pdos.csail.mit.edu/6.S081/2020/schedule.html Lab 地址:https://pdos.csail.mit.edu/6.S081/2020/labs/lock.html 我的代码地址:https://github.com/Miigon/my-xv6-labs-2020/tree/lock Commits: https://github.com/Miigon/my-xv6-labs-2020/commits/lock 本文中代码注释是编写博客的时候加入的,原仓库中的代码可能缺乏注释或代码不完全相同。
重新设计代码以降低锁竞争,提高多核机器上系统的并行性。
通过拆分 kmem 中的空闲内存链表,降低 kalloc 实现中的 kmem 锁竞争。
kalloc 原本的实现中,使用 freelist 链表,将空闲物理页本身直接用作链表项(这样可以不使用额外空间)连接成一个链表,在分配的时候,将物理页从链表中移除,回收时将物理页放回链表中。
// kernel/kalloc.c
struct {
struct spinlock lock;
struct run *freelist;
} kmem;
分配物理页的实现(原版):
// kernel/kalloc.c
void *
kalloc(void)
{
struct run *r;
acquire(&kmem.lock);
r = kmem.freelist; // 取出一个物理页。页表项本身就是物理页。
if(r)
kmem.freelist = r->next;
release(&kmem.lock);
if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}
在这里无论是分配物理页或释放物理页,都需要修改 freelist 链表。由于修改是多步操作,为了保持多线程一致性,必须加锁。但这样的设计也使得多线程无法并发申请内存,限制了并发效率。
证据是 kmem 锁上频繁的锁竞争:
$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 83375 #acquire() 433015
lock: bcache: #fetch-and-add 0 #acquire() 1260
--- top 5 contended locks:
lock: kmem: #fetch-and-add 83375 #acquire() 433015 // kmem 是整个系统中竞争最激烈的锁
lock: proc: #fetch-and-add 23737 #acquire() 130718
lock: virtio_disk: #fetch-and-add 11159 #acquire() 114
lock: proc: #fetch-and-add 5937 #acquire() 130786
lock: proc: #fetch-and-add 4080 #acquire() 130786
tot= 83375
test1 FAIL
这里体现了一个先 profile 再进行优化的思路。如果一个大锁并不会引起明显的性能问题,有时候大锁就足够了。只有在万分确定性能热点是在该锁的时候才进行优化,「过早优化是万恶之源」。
这里解决性能热点的思路是「将共享资源变为不共享资源」。锁竞争优化一般有几个思路:
该 lab 的实验目标,即是为每个 CPU 分配独立的 freelist,这样多个 CPU 并发分配物理页就不再会互相排斥了,提高了并行性。
但由于在一个 CPU freelist 中空闲页不足的情况下,仍需要从其他 CPU 的 freelist 中“偷”内存页,所以一个 CPU 的 freelist 并不是只会被其对应 CPU 访问,还可能在“偷”内存页的时候被其他 CPU 访问,故仍然需要使用单独的锁来保护每个 CPU 的 freelist。但一个 CPU freelist 中空闲页不足的情况相对来说是比较稀有的,所以总体性能依然比单独 kmem 大锁要快。在最佳情况下,也就是没有发生跨 CPU “偷”页的情况下,这些小锁不会发生任何锁竞争。
// kernel/kalloc.c
struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU]; // 为每个 CPU 分配独立的 freelist,并用独立的锁保护它。
char *kmem_lock_names[] = {
"kmem_cpu_0",
"kmem_cpu_1",
"kmem_cpu_2",
"kmem_cpu_3",
"kmem_cpu_4",
"kmem_cpu_5",
"kmem_cpu_6",
"kmem_cpu_7",
};
void
kinit()
{
for(int i=0;i<NCPU;i++) { // 初始化所有锁
initlock(&kmem[i].lock, kmem_lock_names[i]);
}
freerange(end, (void*)PHYSTOP);
}
// kernel/kalloc.c
void
kfree(void *pa)
{
struct run *r;
if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
r = (struct run*)pa;
push_off();
int cpu = cpuid();
acquire(&kmem[cpu].lock);
r->next = kmem[cpu].freelist;
kmem[cpu].freelist = r;
release(&kmem[cpu].lock);
pop_off();
}
void *
kalloc(void)
{
struct run *r;
push_off();
int cpu = cpuid();
acquire(&kmem[cpu].lock);
if(!kmem[cpu].freelist) { // no page left for this cpu
int steal_left = 64; // steal 64 pages from other cpu(s)
for(int i=0;i<NCPU;i++) {
if(i == cpu) continue; // no self-robbery
acquire(&kmem[i].lock);
struct run *rr = kmem[i].freelist;
while(rr && steal_left) {
kmem[i].freelist = rr->next;
rr->next = kmem[cpu].freelist;
kmem[cpu].freelist = rr;
rr = kmem[i].freelist;
steal_left--;
}
release(&kmem[i].lock);
if(steal_left == 0) break; // done stealing
}
}
r = kmem[cpu].freelist;
if(r)
kmem[cpu].freelist = r->next;
release(&kmem[cpu].lock);
pop_off();
if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}
这里选择在内存页不足的时候,从其他的 CPU “偷” 64 个页,这里的数值是随意取的,在现实场景中,最好进行测量后选取合适的数值,尽量使得“偷”页频率低。
UPDATE 2022-04-14: 上述代码可能产生死锁(cpu_a 尝试偷 cpu_b,cpu_b 尝试偷 cpu_a),可能的解决方案看本文评论区或 https://github.com/Miigon/blog/issues/8。
再次运行 kalloctest:
$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem_cpu_0: #fetch-and-add 0 #acquire() 35979
lock: kmem_cpu_1: #fetch-and-add 0 #acquire() 195945
lock: kmem_cpu_2: #fetch-and-add 0 #acquire() 201094
lock: bcache: #fetch-and-add 0 #acquire() 1248
--- top 5 contended locks:
lock: proc: #fetch-and-add 22486 #acquire() 132299
lock: virtio_disk: #fetch-and-add 16002 #acquire() 114
lock: proc: #fetch-and-add 11199 #acquire() 132301
lock: proc: #fetch-and-add 5330 #acquire() 132322
lock: proc: #fetch-and-add 4874 #acquire() 132345
tot= 0
test1 OK
start test2
total free number of pages: 32499 (out of 32768)
.....
test2 OK
可以看到,kmem 带来的锁竞争降低到了 0(从原本的 ~83375)。
If multiple processes use the file system intensively, they will likely contend for bcache.lock, which protects the disk block cache in kernel/bio.c. bcachetest creates several processes that repeatedly read different files in order to generate contention on bcache.lock;
多个进程同时使用文件系统的时候,bcache.lock 上会发生严重的锁竞争。bcache.lock 锁用于保护磁盘区块缓存,在原本的设计中,由于该锁的存在,多个进程不能同时操作(申请、释放)磁盘缓存。
因为不像 kalloc 中一个物理页分配后就只归单个进程所管,bcache 中的区块缓存是会被多个进程(进一步地,被多个 CPU)共享的(由于多个进程可以同时访问同一个区块)。所以 kmem 中为每个 CPU 预先分割一部分专属的页的方法在这里是行不通的。
前面提到的:
锁竞争优化一般有几个思路:
在这里, bcache 属于“必须共享”的情况,所以需要用到第二个思路,降低锁的粒度,用更精细的锁 scheme 来降低出现竞争的概率。
// kernel/bio.c
struct {
struct spinlock lock;
struct buf buf[NBUF];
// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
struct buf head;
} bcache;
原版 xv6 的设计中,使用双向链表存储所有的区块缓存,每次尝试获取一个区块 blockno 的时候,会遍历链表,如果目标区块已经存在缓存中则直接返回,如果不存在则选取一个最近最久未使用的,且引用计数为 0 的 buf 块作为其区块缓存,并返回。
新的改进方案,可以建立一个从 blockno 到 buf 的哈希表,并为每个桶单独加锁。这样,仅有在两个进程同时访问的区块同时哈希到同一个桶的时候,才会发生锁竞争。当桶中的空闲 buf 不足的时候,从其他的桶中获取 buf。
思路上是很简单的,但是具体实现的时候,需要注意死锁问题。这里的许多死锁问题比较隐晦,而且 bcachetest 测试不出来,但是在实际运行的系统中,是有可能触发死锁的。网上看过许多其他通过了的同学的博客,代码中都没有注意到这一点。
考虑一下我们的新设计,首先在 bcache 中定义哈希表 bufmap,并为每个桶设置锁:
// kernel/bio.h
struct {
struct buf buf[NBUF];
struct spinlock eviction_lock;
// Hash map: dev and blockno to buf
struct buf bufmap[NBUFMAP_BUCKET];
struct spinlock bufmap_locks[NBUFMAP_BUCKET];
} bcache;
bget(uint dev, uint blockno) 中,首先在 blockno 对应桶中扫描缓存是否存在,如果不存在,则在所有桶中寻找一个最近最久未使用的无引用 buf,进行缓存驱逐,然后将其重新移动到 blockno 对应的桶中(rehash),作为 blockno 的缓存返回。
这里很容易就会写出这样的代码:
bget(dev, blockno) {
key := hash(dev, blockno);
acquire(bufmap_locks[key]); // 获取 key 桶的锁
// 查找 blockno 的缓存是否存在,若是直接返回,若否继续执行
if(b := look_for_blockno_in(bufmap[key])) {
b->refcnt++
release(bufmap_locks[key]);
return b;
}
// 查找可驱逐缓存 b
least_recently := NULL;
for i := [0, NBUFMAP_BUCKET) { // 遍历所有的桶
acquire(bufmap_locks[i]); // 获取第 i 桶的锁
b := look_for_least_recently_used_with_no_ref(bufmap[key]);
// 如果找到未使用时间更长的空闲块
if(b.last_use < least_recently.last_use) {
least_recently := b;
}
release(bufmap_locks[i]); // 查找结束后,释放第 i 桶的锁
}
b := least_recently;
// 驱逐 b 原本存储的缓存(将其从原来的桶删除)
evict(b);
// 将 b 加入到新的桶
append(bucket[key], b);
release(bufmap_locks[key]); // 释放 key 桶的锁
// 设置 b 的各个属性
setup(b);
return b;
}
上面的代码看起来很合理,但是却有两个问题,一个导致运行结果出错,一个导致死锁。
第一个问题比较显而易见,后面进行缓存驱逐的时候,每扫描一个桶前会获取该桶的锁,但是每扫描完一个桶后又释放了该桶的锁。从释放锁的那一瞬间,获取出来的最近最久未使用的空闲 buf 就不再可靠了。因为在我们释放 b 原来所在的桶的锁后(release(bufmap_locks[i]);
后),但是从原桶删除 b 之前(evict(b);
前),另一个 CPU 完全可能会调用 bget 请求 b,使得 b 的引用计数变为不为零。此时我们对 b 进行驱逐就是不安全的了。
解决方法也并不复杂,只需要在扫描桶的时候,确保找到最近最久未使用的空闲 buf 后,不释放桶锁,继续持有其对应的桶的锁直到驱逐完成即可。
这里维护的不变量(invariant)是:「扫描到的 buf 在驱逐完成前保持可驱逐」,以及「桶中若存在某个块的 buf,则这个 buf 可用,bget可以直接返回这个 buf」。
bget(dev, blockno) {
acquire(bufmap_locks[key]); // 获取 key 桶锁
// 查找 blockno 的缓存是否存在,若是直接返回,若否继续执行
if(b := look_for_blockno_in(bufmap[key])) {
b->refcnt++
release(bufmap_locks[key]);
return b;
}
// 缓存不存在,查找可驱逐缓存 b
least_recently := NULL;
holding_bucket := -1;
for i := [0, NBUFMAP_BUCKET) { // 遍历所有的桶
acquire(bufmap_locks[i]); // 获取第 i 桶的锁
b := look_for_least_recently_used_with_no_ref(bufmap[key]);
// 如果找到未使用时间更长的空闲块(新的 least_recently)
if(b.last_use >= least_recently.last_use) {
release(bufmap_locks[i]); // 该桶中没有找到新的 least_recently,释放该桶的锁
} else {
// b.last_use < least_recently.last_use
least_recently := b;
// 释放原本 holding 的锁(holding_bucket < i)
if(holding_bucket != -1 && holding_bucket != key) release(bufmap_locks[holding_bucket]);
// 保持第 i 桶的锁不释放......
holding_bucket := i;
}
}
b := least_recently;
// 此时,仍然持有 b 所在的桶的锁 bufmap_locks[holding_bucket]
// 驱逐 b 原本存储的缓存(将其从原来的桶删除)
evict(b);
release(bufmap_locks[holding_bucket]); // 驱逐后再释放 b 原本所在桶的锁
// 将 b 加入到新的桶
append(bucket[key], b);
release(bufmap_locks[key]); // 释放 key 桶锁
// 设置 b 的各个属性
setup(b);
return b;
}
这里出现的第二个问题就是,一开始我们在 blockno 对应的桶中遍历检查缓存是否存在时,获取了它的锁。而在我们发现 blockno 不存在缓存中之后,需要在拿着 key 桶锁的同时,遍历所有的桶并依次获取它们每个的锁,考虑这种情况:
假设块号 b1 的哈希值是 2,块号 b2 的哈希值是 5
并且两个块在运行前都没有被缓存
----------------------------------------
CPU1 CPU2
----------------------------------------
bget(dev, b1) bget(dev,b2)
| |
V V
获取桶 2 的锁 获取桶 5 的锁
| |
V V
缓存不存在,遍历所有桶 缓存不存在,遍历所有桶
| |
V V
...... 遍历到桶 2
| 尝试获取桶 2 的锁
| |
V V
遍历到桶 5 桶 2 的锁由 CPU1 持有,等待释放
尝试获取桶 5 的锁
|
V
桶 5 的锁由 CPU2 持有,等待释放
!此时 CPU1 等待 CPU2,而 CPU2 在等待 CPU1,陷入死锁!
这里,由于 CPU1 持有锁 2 的情况下去申请锁 5,而 CPU2 持有锁 5 的情况下申请锁 2,造成了环路等待。
复习一下死锁的四个条件:
只要破坏了四个条件中的任何一个,就能破坏死锁。为了尝试解决这个死锁问题,我们考虑破坏每一个条件的可行性:
从「互斥」、「不剥夺」和「环路等待」条件入手都无法解决这个死锁问题,那只能考虑「请求保持」了。
这里死锁出现的原因是我们在拿着一个锁的情况下,去尝试申请另一个锁,并且请求顺序出现了环路。既然带环路的请求顺序是不可避免的,那唯一的选项就是在申请任何其他桶锁之前,先放弃之前持有的 key 的桶锁,在找到并驱逐最近最久未使用的空闲块 b 后,再重新获取 key 的桶锁,将 b 加入桶。
大致代码是这样:
bget(dev, blockno) {
acquire(bufmap_locks[key]); // 获取 key 桶锁
// 查找 blockno 的缓存是否存在,若是直接返回,若否继续执行
if(b := look_for_blockno_in(bufmap[key])) {
b->refcnt++
release(bufmap_locks[key]);
return b;
}
release(bufmap_locks[key]); // 先释放 key 桶锁,防止查找驱逐时出现环路等待
// 缓存不存在,查找可驱逐缓存 b
holding_bucket := -1;
for i := [0, NBUFMAP_BUCKET) {
acquire(bufmap_locks[i]); // 请求时不持有 key 桶锁,不会出现环路等待
if(b := look_for_least_recently_used_with_no_ref(bufmap[key])) {
if(holding_bucket != -1) release(bufmap_locks[holding_bucket]);
holding_bucket := i;
// 如果找到新的未使用时间更长的空闲块,则将原来的块所属桶的锁释放掉,保持新块所属桶的锁...
} else {
release(bufmap_locks[holding_bucket]);
}
}
// 驱逐 b 原本存储的缓存(将其从原来的桶删除)
evict(b);
release(bufmap_locks[holding_bucket]); // 释放 b 原所在桶的锁
acquire(bufmap_locks[key]); // 再次获取 key 桶锁
append(b, bucket[key]); // 将 b 加入到新的桶
release(bufmap_locks[key]); // 释放 key 桶锁
// 设置 b 的各个属性
setup(b);
return b;
}
这样以来,bget 中无论任何位置,获取桶锁的时候都要么没拿其他锁,要么只拿了其左侧的桶锁(遍历所有桶查找可驱逐缓存 b 的过程中,对桶的遍历固定从小到大访问),所以永远不会出现环路,死锁得到了避免。但是这样的方案又会带来新的问题。
注意到我们开始「搜索所有桶寻找可驱逐的 buf」这个过程前,为了防止环路等待,而释放了 key 的桶锁(key 为请求的 blockno 的哈希值),直到遍历所有桶并驱逐最近最久未使用的空闲 buf 的过程完成后才重新获取 key 桶锁。问题在于,在释放掉 key 桶锁之后,第一块关键区(“查找 blockno 的缓存是否存在,若是直接返回,若否继续执行”的区域)就得不到锁保护了。这意味着在「释放掉 key 桶锁后」到「重新获取 key 桶锁前」的这个阶段,也就是我们进行驱逐+重分配时,另外一个 CPU 完全有可能访问同一个 blockno,获取到 key 的桶锁,通过了一开始「缓存不存在」的测试,然后也进入到驱逐+重分配中,导致「一个区块有多份缓存」的错误情况出现。
怎么保障同一个区块不会有两个缓存呢?
这个问题相对比较棘手,我们目前知道的限制条件有:
这里不得不承认,我并没有想到什么特别好的方法,只想到了一个牺牲一点效率,但是能保证极端情况下安全的方案:
这样以来,即使有多个线程同时请求同一个 blockno,并且所有线程都碰巧通过了一开始的「blockno 的缓存是否存在」的判断且结果都为「缓存不存在」,则进入受 eviction_lock 保护的驱逐+重分配区代码后,能够实际进行驱逐+重分配的,也只有第一个进入的线程。
第一个线程进入并驱逐+重分配完毕后才释放 eviction_lock,此时 blockno 的缓存已经由不存在变为存在了,后续的所有线程此时进入后都会被第二次「blockno 缓存是否存在」的判断代码拦住,并直接返回已分配好的缓存 buf,而不会重复对同一个 blockno 进行驱逐+重分配。
这么做的好处是,保证了查找过程中不会出现死锁,并且不会出现极端情况下一个块产生多个缓存的情况。而坏处是,引入了全局 eviction_lock,使得原本可并发的遍历驱逐过程的并行性降低了。并且每一次 cache miss 的时候,都会多一次额外的桶遍历开销。
然而,cache miss 本身(hopefully)为比较稀有事件,并且对于 cache miss 的块,由于后续需要从磁盘中读入其数据,磁盘读入的耗时将比一次桶遍历的耗时多好几个数量级,所以我认为这样的方案的开销还是可以接受的。
ps. 这样的设计,有一个名词称为「乐观锁(optimistic locking)」,即在冲突发生概率很小的关键区内,不使用独占的互斥锁,而是在提交操作前,检查一下操作的数据是否被其他线程修改(在这里,检测的是 blockno 的缓存是否已被加入),如果是,则代表冲突发生,需要特殊处理(在这里的特殊处理即为直接返回已加入的 buf)。这样的设计,相比较「悲观锁(pessimistic locking)」而言,可以在冲突概率较低的场景下(例如 bget),降低锁开销以及不必要的线性化,提升并行性(例如在 bget 中允许「缓存是否存在」的判断并行化)。有时候还能用于避免死锁。
bget(dev, blockno) {
acquire(bufmap_locks[key]); // 获取 key 桶锁
// 查找 blockno 的缓存是否存在,若是直接返回,若否继续执行
if(b := look_for_blockno_in(bufmap[key])) {
b->refcnt++
release(bufmap_locks[key]);
return b;
}
// 注意这里的 acquire 和 release 的顺序
release(bufmap_locks[key]); // 先释放 key 桶锁,防止查找驱逐时出现环路死锁
acquire(eviction_lock); // 获得驱逐锁,防止多个 CPU 同时驱逐影响后续判断
// **再次查找 blockno 的缓存是否存在**,若是直接返回,若否继续执行
// 这里由于持有 eviction_lock,没有任何其他线程能够进行驱逐操作,所以
// 没有任何其他线程能够改变 bufmap[key] 桶链表的结构,所以这里不事先获取
// 其相应桶锁而直接开始遍历是安全的。
if(b := look_for_blockno_in(bufmap[key])) {
acquire(bufmap_locks[key]); // 必须获取,保护非原子操作 `refcnt++`
b->refcnt++
release(bufmap_locks[key]);
release(eviction_lock);
return b;
}
// 缓存不存在,查找可驱逐缓存 b
holding_bucket := -1; // 当前持有的桶锁
for i := [0, NBUFMAP_BUCKET) {
acquire(bufmap_locks[i]); // 请求时不持有 key 桶锁,不会出现环路等待
if(b := look_for_least_recently_used_with_no_ref(bufmap[key])) {
if(holding_bucket != -1) release(bufmap_locks[holding_bucket]);
holding_bucket := i;
// 如果找到新的未使用时间更长的空闲块,则将原来的块所属桶的锁释放掉,保持新块所属桶的锁...
} else {
release(bufmap_locks[holding_bucket]);
}
}
acquire(bufmap_locks[key]); // 再次获取 key 桶锁
append(b, bucket[key]); // 将 b 加入到新的桶
release(bufmap_locks[key]); // 释放 key 桶锁
release(eviction_lock); // 释放驱逐锁
// 设置 b 的各个属性
setup(b);
return b;
}
struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
uint lastuse; // *newly added, used to keep track of the least-recently-used buf
struct buf *next;
uchar data[BSIZE];
};
// kernel/bio.c
// bucket number for bufmap
#define NBUFMAP_BUCKET 13
// hash function for bufmap
#define BUFMAP_HASH(dev, blockno) ((((dev)<<27)|(blockno))%NBUFMAP_BUCKET)
struct {
struct buf buf[NBUF];
struct spinlock eviction_lock;
// Hash map: dev and blockno to buf
struct buf bufmap[NBUFMAP_BUCKET];
struct spinlock bufmap_locks[NBUFMAP_BUCKET];
} bcache;
void
binit(void)
{
// Initialize bufmap
for(int i=0;i<NBUFMAP_BUCKET;i++) {
initlock(&bcache.bufmap_locks[i], "bcache_bufmap");
bcache.bufmap[i].next = 0;
}
// Initialize buffers
for(int i=0;i<NBUF;i++){
struct buf *b = &bcache.buf[i];
initsleeplock(&b->lock, "buffer");
b->lastuse = 0;
b->refcnt = 0;
// put all the buffers into bufmap[0]
b->next = bcache.bufmap[0].next;
bcache.bufmap[0].next = b;
}
initlock(&bcache.eviction_lock, "bcache_eviction");
}
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;
uint key = BUFMAP_HASH(dev, blockno);
acquire(&bcache.bufmap_locks[key]);
// Is the block already cached?
for(b = bcache.bufmap[key].next; b; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.bufmap_locks[key]);
acquiresleep(&b->lock);
return b;
}
}
// Not cached.
// to get a suitable block to reuse, we need to search for one in all the buckets,
// which means acquiring their bucket locks.
// but it's not safe to try to acquire every single bucket lock while holding one.
// it can easily lead to circular wait, which produces deadlock.
release(&bcache.bufmap_locks[key]);
// we need to release our bucket lock so that iterating through all the buckets won't
// lead to circular wait and deadlock. however, as a side effect of releasing our bucket
// lock, other cpus might request the same blockno at the same time and the cache buf for
// blockno might be created multiple times in the worst case. since multiple concurrent
// bget requests might pass the "Is the block already cached?" test and start the
// eviction & reuse process multiple times for the same blockno.
//
// so, after acquiring eviction_lock, we check "whether cache for blockno is present"
// once more, to be sure that we don't create duplicate cache bufs.
acquire(&bcache.eviction_lock);
// Check again, is the block already cached?
// no other eviction & reuse will happen while we are holding eviction_lock,
// which means no link list structure of any bucket can change.
// so it's ok here to iterate through `bcache.bufmap[key]` without holding
// it's cooresponding bucket lock, since we are holding a much stronger eviction_lock.
for(b = bcache.bufmap[key].next; b; b = b->next){
if(b->dev == dev && b->blockno == blockno){
acquire(&bcache.bufmap_locks[key]); // must do, for `refcnt++`
b->refcnt++;
release(&bcache.bufmap_locks[key]);
release(&bcache.eviction_lock);
acquiresleep(&b->lock);
return b;
}
}
// Still not cached.
// we are now only holding eviction lock, none of the bucket locks are held by us.
// so it's now safe to acquire any bucket's lock without risking circular wait and deadlock.
// find the one least-recently-used buf among all buckets.
// finish with it's corresponding bucket's lock held.
struct buf *before_least = 0;
uint holding_bucket = -1;
for(int i = 0; i < NBUFMAP_BUCKET; i++){
// before acquiring, we are either holding nothing, or only holding locks of
// buckets that are *on the left side* of the current bucket
// so no circular wait can ever happen here. (safe from deadlock)
acquire(&bcache.bufmap_locks[i]);
int newfound = 0; // new least-recently-used buf found in this bucket
for(b = &bcache.bufmap[i]; b->next; b = b->next) {
if(b->next->refcnt == 0 && (!before_least || b->next->lastuse < before_least->next->lastuse)) {
before_least = b;
newfound = 1;
}
}
if(!newfound) {
release(&bcache.bufmap_locks[i]);
} else {
if(holding_bucket != -1) release(&bcache.bufmap_locks[holding_bucket]);
holding_bucket = i;
// keep holding this bucket's lock....
}
}
if(!before_least) {
panic("bget: no buffers");
}
b = before_least->next;
if(holding_bucket != key) {
// remove the buf from it's original bucket
before_least->next = b->next;
release(&bcache.bufmap_locks[holding_bucket]);
// rehash and add it to the target bucket
acquire(&bcache.bufmap_locks[key]);
b->next = bcache.bufmap[key].next;
bcache.bufmap[key].next = b;
}
b->dev = dev;
b->blockno = blockno;
b->refcnt = 1;
b->valid = 0;
release(&bcache.bufmap_locks[key]);
release(&bcache.eviction_lock);
acquiresleep(&b->lock);
return b;
}
// ......
// Release a locked buffer.
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
uint key = BUFMAP_HASH(b->dev, b->blockno);
acquire(&bcache.bufmap_locks[key]);
b->refcnt--;
if (b->refcnt == 0) {
b->lastuse = ticks;
}
release(&bcache.bufmap_locks[key]);
}
void
bpin(struct buf *b) {
uint key = BUFMAP_HASH(b->dev, b->blockno);
acquire(&bcache.bufmap_locks[key]);
b->refcnt++;
release(&bcache.bufmap_locks[key]);
}
void
bunpin(struct buf *b) {
uint key = BUFMAP_HASH(b->dev, b->blockno);
acquire(&bcache.bufmap_locks[key]);
b->refcnt--;
release(&bcache.bufmap_locks[key]);
}
$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmem_cpu_0: #fetch-and-add 0 #acquire() 32897
lock: kmem_cpu_1: #fetch-and-add 0 #acquire() 77
lock: kmem_cpu_2: #fetch-and-add 0 #acquire() 61
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 6400
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 6685
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 6696
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 7018
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 6266
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 4206
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 4206
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 2193
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 4202
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 2196
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 4359
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 4409
lock: bcache_bufmap: #fetch-and-add 0 #acquire() 6411
lock: bcache_eviction: #fetch-and-add 0 #acquire() 83
--- top 5 contended locks:
lock: proc: #fetch-and-add 397110 #acquire() 70988
lock: proc: #fetch-and-add 262715 #acquire() 70988
lock: proc: #fetch-and-add 222165 #acquire() 70987
lock: virtio_disk: #fetch-and-add 161088 #acquire() 1098
lock: proc: #fetch-and-add 45459 #acquire() 71331
tot= 0
test0: OK
start test1
test1 OK
$
多线程问题往往不如单线程程序中的问题那样容易发现,并且需要对底层指令层面以及 CPU 运行原理层面有足够的认知,才能有效地发现并解决多线程问题。引用 lecture 中的几个建议作为结尾:
don't share if you don't have to
start with a few coarse-grained locks
instrument your code -- which locks are preventing parallelism?
use fine-grained locks only as needed for parallel performance
use an automated race detector
最后我自己的话:
multithreading is a pain😭 , only worth it if there is non-insignificant performance increase.
maybe try multi-process architecture for your next project, so you don't have to deal with all the multithreading hassles. you get the bonus of being able to scale horizontally (and almost infinitely) as well :)
补充一下各个锁的作用:
更简短地讲:
驱逐过程中,首先需要拿 eviction_lock,使得可以遍历所有桶的链表结构。然后遍历链表结构寻找可驱逐块的时候,由于在某个桶i中判断是否有可驱逐块的过程需要读取 refcnt,所以需要再拿该桶的 bufmap_locks[i]。
Tricky的地方就是,bget 方法一开始判断块是否在缓存中时也获取了一个桶的 bufmap_locks[key],此时如果遍历获取所有桶的 bufmap_locks[i] 的话,很容易引起环路等待而触发死锁。若在判断是否存在后立刻释放掉 bufmap_locks[key] 再拿 eviction_lock 的话,又会导致在释放桶锁和拿 eviction_lock 这两个操作中间的微小间隙,其他线程可能会对同一个块号进行 bget 访问,导致最终同一个块被插入两次。
博客后半部分都是在讲我是如何(尝试)解决这一问题的。
最终方案是在释放 bufmap_locks[key],获取 eviction_lock 之后,再判断一次目标块号是否已经插入。这意味着依然会出现 bget 尝试对同一个 blockno 进行驱逐并插入两次的情况,但是能够保证除了第一个驱逐+插入的尝试能成功外,后续的尝试都不会导致重复驱逐+重复插入,而是能正确返回第一个成功的驱逐+插入产生的结果。也就是允许竞态条件的发生,但是对其进行检测,可以理解为一种乐观锁 optimistic locking。