前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >无锁队列实现

无锁队列实现

原创
作者头像
ge3m0r
发布2024-06-01 21:15:10
570
发布2024-06-01 21:15:10

无锁

开发过程中,对于多线程多进程的并发和并行的几乎是编程不可避免的事情,特别在涉及对于数据进行修改或者添加的时候。这个时候就需要锁的出现,锁有多种类型,互斥锁,自旋锁。除了锁之外,我们还定义了原子操作,当然如果探究本质的话,原子操作也是有锁的,只不过是对汇编的操作锁。

内联汇编

内联汇编是 GNU 的规定一种在 C 语言中嵌入汇编语言的方式。当然不必把他想的太过高深,稍微了解编译原理的知道,无论我们写什么格式的语言,都会经过编译器对我们的代码进行词法分析,语义分析生成有限自动机,最后转化为汇编,汇编转化为二进制然后执行。

内联汇编在 Windows 平台和 Linux 平台上边规则不太相同,本质是因为两个平台的汇编风格不同,Linux 平台上的使用的 AT&T 风格的汇编,而 Windows 平台使用的是 nasm 风格的汇编,另外也因为编译器不同导致两者内联汇编风格也不太相同。这里只谈 Linux 平台的汇编风格。

代码语言:asm
复制
__asm__ [volatile] (
"asm opration\n\t"
:"=r"(output1), "=m"(output2)
:"0"(input1) ,"m"(input2)
:"cc", "memory"
);

上述 Linux 平台上一般的内联汇编,其中 asm 是标志,是必须的内容。

volatile 表示是否要对后边汇编进行优化,编译器都会代码有优化的方案,因此如果嵌入汇编会对其进行优化,而有时这样的优化策略会影响我们代码本来的逻辑,因此一般都加上。

后边第一个行内容是汇编的代码,表示我们想要嵌入的汇编逻辑。

第二行表示输出操作数,“=r" 表示输出到寄存器上,”=m" 表示输出在内存上。

第三行表示输入操作树,表示输入我们 c 语言的变量。

最后一行较为简单,“cc" 表示在寄存器上进行操作, ”memory“ 表示在内存上有操作。

我们本次定义的简单操作逻辑如下:

代码语言:c
复制
static int lxx_atomic_add(int * ptr, int increment)
{
    int old_value = *ptr;
   __asm__ volatile("lock; xadd %0, %1\n\t"
   	                : "=r"(old_value), "=m"(*ptr) //输出操作数
   	                : "0"(increament), "m"(*ptr) //输入操作数
   	                : "cc", "memory");
   return *ptr;
}

上述就是一个简单的原子加法操作,其中 lock 操作是加锁,另外因为高并发下,old_value 会有可能在原子操作过程中被获取,因此需要在原子操作中更新 old_value。

无锁队列实现

下边是一个无锁队列一个简单类的实现。

代码语言:c
复制
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <mutex>
#include <time.h>
#include <atomic>
#include <list>
#include <memory>

#define MAX_THREAD_NUM 1
#define FOR_LOOP_COUNT 100000
static int counter = 0;
static std::mutex s_mutex;
static int s_count_push = 0;
static int s_count_pop = 0;

static std::list<int> s_list;

typedef void *(thread_func_t)(void *argv);

template <typename ElemType>
struct qnode   //是用链表来维护队列
{
   struct qnode *next;
   ElemType _data;
};

class Queue
{
private:
	struct qnode<ElemType> *volatile_head = NULL;  //队列的头部
	struct qnod<ElemType> *volatild_tail = NULL;   //队列的尾部
public:
	Queue()  //初始化构造队列
	{
	  _head = _tail = new qnode<ElemType>;
	  _head->next = NULL;
	  _tail->next = NULL;
	  printf("Queue _head:%p\n", _head);
	}

	void push(const ElemType &e)  //不考虑并发情况下在队列中添加节点
	{
	  struct qnode<ElemType> *p = new qnode<ElemType>;
	  p->_data = e;
	  p->next = NULL;

	  struct qnode<ElemType> *t = _tail;
	  t->next = p;
	  _tail = p;
	}

	void push2(const ElemType &e) //考虑并发的无锁添加操作
	{
	   struct qnode<ElemType> *p = new qnode<ElemType>;
	   p->_data = e;
	   p->next = NULL;

	   struct qnode<ElemType> *t = _tail;
	   struct qnode<ElemType> *old_t = _tail;

	   int count = 0;
	   do{
	   	while(t->next != NULL)
			t = t->next;
		if(count++ >= 1){
			printf("push count: %d, t->next:%p\n", count, t->next);
		}
	   }while(!__sync_bool_compare_and_swap(&t->next, NULL, p));

	   __sync_bool_compare_and_swap(&_tail, &old_t, p);
	}

	bool pop(ElemType &e){  //从队列移除元素
	   struct qnode<ElemType> *p = _head;
	   struct qnode<ElemType> *np = _head->next;
	   if(!np){
	   	return false;
	   }

	   e = np->_data;
	   _head->next = np->next;
	   delete np;
	   return true;
	}

	bool pop2(ElemType &e){  //无锁并发移除队列
	   struct qnode<ElemType> *p = NULL;
	   struct qnode<ElemType> *np = NULL;
	   int count = 0;

	   do{
	   	p = _head;
		np = p->next;
	   if(p->next == NULL){
	   	return false;
	   }

	   if(count++ >= 1){
	   	printf("pop count:%d, p->next:%p\n", count, p->next);
	   	}
	  }while(__sync_bool_compare_and_swap(&head, p, p->next));

	  e = p->next->_data;
	  delete p;
	  return true;
	}

	~Queue(){ //析构函数
	  struct qnode<ElemType> *volatile, tmp;
	  while(_head){
	  	tmp = _head->next;
		printf("_head:%p\n", _head);
	    delete _head;
		_head = tmp;
	  }
	}

};

上述无锁队列的实现比较常见,使用链表维护元素,一个头指针,一个尾指针,分别用来取元素和移除元素。

主要的实现看点是 push2 和 pop2 的操作。

我们首先需要确定并发情况下,可能会有多个线程同时向这个队列中插入元素的可能,因此需要通过一个循环来对其进行插入和删除操作。

这里简单对 push2 进行分析

代码语言:c
复制
void push2(const ElemType &e) //考虑并发的无锁添加操作
{
	struct qnode<ElemType> *p = new qnode<ElemType>;  //分配一个新的节点
	p->_data = e;
	p->next = NULL;

	 struct qnode<ElemType> *t = _tail;  //记录尾节点
	struct qnode<ElemType> *old_t = _tail;

	int count = 0;
	do{
	while(t->next != NULL)  //该操作是为了允许多线程的操作,并发情况下多个节点共同操作,在上边逻辑获取的尾节点在该步操作中可能已经改变
		t = t->next;
	if(count++ >= 1){ //表示线程进行了多次操作
		printf("push count: %d, t->next:%p\n", count, t->next);
	}
	}while(!__sync_bool_compare_and_swap(&t->next, NULL, p)); //原子操作保证获取的必然是尾节点

	 __sync_bool_compare_and_swap(&_tail, &old_t, p); //将尾节点插入其中
}
// __sync_bool_compare_and_swap(&_tail, &old_t, p); 
//上述是原子操作,如果 _tail = old_t 那么将 p 赋值给 _tail

上述代码进行了说明,而 pop 的逻辑也基本类似,保证并发场景下取到的是头节点。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 无锁
  • 内联汇编
  • 无锁队列实现
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档