多核机器上并行能力差的常见原因就是锁争用问题,提高并行能力需要修改数据结构和加锁策略。本实验是为了提高内存分配器和block buffer的并行能力,设计思想是分段加锁。
重新设计物理内存分配器,现在是单一内存页链表(freelist)被所有CPU共享,需要设计成每个CPU一个freelist来减少锁争用。但是存在某个CPU的freelist为空,但是其他CPU的freelist不为空,每个CPU的freelist不均匀情况,所以需要增加窃取逻辑,从其他CPU窃取一半空闲内存页。
在原来的基础创建NCPU个kmem,每个CPU各有一个私有freelist。
struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU];
void
kinit()
{
for (int i = 0; i < NCPU; i++){
char buf[10];
snprintf(buf,10,"kmem_%d",i);
initlock(&kmem[i].lock,buf);
}
freerange(end, (void*)PHYSTOP);
}
//均衡分配物理页,此时处于初始化阶段,由0-CPU执行,不需要考虑争用问题
void
freerange(void *pa_start, void *pa_end)
{
char *p;
int id=0, total=0;
struct run *r;
p = (char*)PGROUNDUP((uint64)pa_start);
//均匀分配物理页
for(; p + PGSIZE <= (char*)pa_end; p += PGSIZE){
r=(struct run*)p;
r->next = kmem[id].freelist;
kmem[id].freelist = r;
id=(id+1)%NCPU;
total++;
}
}
在申请内存时需要通过cpuid()获取CPU核编号hartid,这个函数是没有锁保护的,所以获取cpuid时需要关中断,避免被切换出去而破坏了临界区。如果该CPU的freelist为空,则释放锁,然后遍历其他CPU的freelist来steal物理页,每次只获取一个lock,不会出现互相等待的情况。
// Allocate one 4096-byte page of physical memory.
// Returns a pointer that the kernel can use.
// Returns 0 if the memory cannot be allocated.
void *
kalloc(void)
{
struct run *r;
push_off(); //关中断,避免被切换出去,破坏临界区
int id=cpuid();
acquire(&kmem[id].lock);
r = kmem[id].freelist;
if(r)
kmem[id].freelist = r->next;
release(&kmem[id].lock);
if(!r){
//steal from others
for (int i = 0; i < NCPU; i++){
if(i!=id){
acquire(&kmem[i].lock);
r = kmem[i].freelist;
if(r){
kmem[i].freelist = r->next;
release(&kmem[i].lock);
break;
}
release(&kmem[i].lock);
}
}
}
pop_off(); //在此之前不能够被切换出去
if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}
释放逻辑比较简单,直接放回该CPU的freelist即可
void
kfree(void *pa)
{
struct run *r;
if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");
// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);
r = (struct run*)pa;
push_off(); //关中断,避免破坏临界区
int id=cpuid();
acquire(&kmem[id].lock);
r->next = kmem[id].freelist;
kmem[id].freelist = r;
release(&kmem[id].lock);
pop_off();
}
内核为磁盘维护了block cache来加速读写,进程读写文件时就需要线程安全的操作block cache,本实验是为了重新设计block cache来减少锁争用问题。bcache采用一个bcache.lock保护,需要改成分段锁来支持并行读写。
原先bcache是维护一条双链表且全局锁,并发能力差,因此对blockno哈希分桶,来减少争用提高并行能力。
buf的blockno、refcnt、prev、next等字段是通过bcache.lockshash(blockno)来保护,这样才能移动buf,buf.lock保护的是data。
struct {
struct spinlock evictlock; //置换
struct spinlock locks[NBUCKET];
struct buf buf[NBUF];
// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
// struct buf head; //dummy node
struct buf heads[NBUCKET];
} bcache;
struct buf {
//受bcache.locks[hash(b.blockno)]保护
int disk; // does disk "own" buf?
uint dev;
uint blockno;
uint refcnt;
struct buf *prev; // LRU cache list
struct buf *next;
int valid; // has data been read from disk?
struct sleeplock lock; //只能保护data、valid、lastaccesstick
uint64 lastaccesstick;
uchar data[BSIZE];
};
int hash(uint blockno){
return blockno%NBUCKET;
}
void
binit(void)
{
struct buf *b;
initlock(&bcache.evictlock,"bcache_evict");
for (int i = 0; i < NBUCKET; i++){
char buf[10];
snprintf(buf,10,"bcache_%d",i);
initlock(&bcache.locks[i],buf);
bcache.heads[i].prev = &bcache.heads[i];
bcache.heads[i].next = &bcache.heads[i];
}
//初始化block并添加到0-桶
for(b = bcache.buf; b < bcache.buf+NBUF; b++){
b->lastaccesstick=ticks;
int id=hash(b->blockno);
b->next=bcache.heads[id].next;
b->prev=&bcache.heads[id];
initsleeplock(&b->lock, "buffer");
bcache.heads[id].next->prev=b;
bcache.heads[id].next=b;
// printf("buffer, blockno: %d, dev: %d, disk: %d, refcnt: %d, valid: %d\n",b->blockno,b->dev,b->disk,b->refcnt,b->valid);
}
}
bread函数返回指定blockno的已加锁的buf,获取流程如下:
为什么要第二次遍历该桶?
答:因为第一次遍历时如果没命中也是要释放锁的,有可能有多个访问同一个block的进程同时经过第一次遍历,都没有命中,那么只能有一个进程真正能够执行置换逻辑,其他的进程第二次遍历时就能够命中。
既然对每个桶都要加锁,那为什么要加全局锁?
答:有多个访问同一个block的进程,就必须加全局锁才能够保证只有一个进程经过第二次遍历后继续往下
执行置换逻辑。其他进程被拦在全局锁位置,等第一个进程置换成功后,其他进程在第二次遍历时就能够命中。
对两个桶加锁策略:
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;
int id=hash(blockno);
acquire(&bcache.locks[id]);
// Is the block already cached?
for(b = bcache.heads[id].next; b != &bcache.heads[id]; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.locks[id]);
//持有锁期间可能正在io,所以用sleeplock
acquiresleep(&b->lock);
// printf("buffer, blockno: %d, dev: %d, disk: %d, refcnt: %d, valid: %d\n",b->blockno,b->dev,b->disk,b->refcnt,b->valid);
return b;
}
}
release(&bcache.locks[id]);
// Not cached.
// Recycle the least recently used (LRU) unused buffer.
//寻找一个buffer用于置换,只有加了该锁才能改变各bucket的buffer数量
//可能有多个访问同一block的进程到达此处,加全局锁,确保只有一个置换成功,其他的重新遍历时会命中
acquire(&bcache.evictlock);
acquire(&bcache.locks[id]);
// Is the block already cached?
for(b = bcache.heads[id].next; b != &bcache.heads[id]; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.locks[id]);
release(&bcache.evictlock);
//持有锁期间可能正在io,所以用sleeplock
acquiresleep(&b->lock);
// printf("buffer, blockno: %d, dev: %d, disk: %d, refcnt: %d, valid: %d\n",b->blockno,b->dev,b->disk,b->refcnt,b->valid);
return b;
}
}
release(&bcache.locks[id]);
uint lastaccesstick=~0;
int index=-1;
struct buf *selectedbuf=0;
for(int i=0;i< NBUCKET ;i++){
int find=0;
acquire(&bcache.locks[i]);
for(b = bcache.heads[i].prev; b != &bcache.heads[i]; b = b->prev){
if(b->refcnt>0){
continue;
}
if(b->lastaccesstick<lastaccesstick) {
//被选中的block的桶锁不释放,没选中的就释放掉
//会同时加两个桶锁,但是加锁顺序一样,破坏了环路等待条件
if(index!=-1 && index!=i){
release(&bcache.locks[index]);
}
lastaccesstick=b->lastaccesstick;
index=i;
selectedbuf=b;
find=1;
}
}
//没找到就释放锁
if(!find){
release(&bcache.locks[i]);
}
}
if(selectedbuf==0){
panic("bget: no buffers");
}
selectedbuf->dev = dev;
selectedbuf->blockno = blockno;
selectedbuf->valid = 0;
selectedbuf->refcnt = 1;
//移除选中的buffer
if(index!=id){
selectedbuf->prev->next=selectedbuf->next;
selectedbuf->next->prev=selectedbuf->prev;
}
release(&bcache.locks[index]);
//添加选中的buffer到目标桶
if(index!=id){
acquire(&bcache.locks[id]);
selectedbuf->next=bcache.heads[id].next;
selectedbuf->prev=&bcache.heads[id];
selectedbuf->next->prev=selectedbuf;
bcache.heads[id].next=selectedbuf;
release(&bcache.locks[id]);
}
release(&bcache.evictlock);
acquiresleep(&selectedbuf->lock);
return selectedbuf;
}
// Return a locked buf with the contents of the indicated block.
//返回一个加锁的buffer
struct buf*
bread(uint dev, uint blockno)
{
struct buf *b;
b = bget(dev, blockno);
//如果这个block不在cache中,则从磁盘上加载
if(!b->valid) {
virtio_disk_rw(b, 0);
b->valid = 1;
}
// printf("read, blockno: %d, dev: %d, disk: %d, refcnt: %d, valid: %d\n",b->blockno,b->dev,b->disk,b->refcnt,b->valid);
return b;
}
// Release a locked buffer.
// Move to the head of the most-recently-used list.
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
//为啥id不在持有锁时获取呢?都可以,反正该buf的引用大于0,不会被置换出去
int id=hash(b->blockno);
acquire(&bcache.locks[id]);
b->refcnt--;
if (b->refcnt == 0) {
// no one is waiting for it.
b->lastaccesstick=ticks;
}
release(&bcache.locks[id]);
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。