队列作为最常用的基础数据结构之一,相信大家都已经非常非常熟悉了,这里省略关于队列的介绍。在平时开发中队列的出现频率非常非常高,因此我们也会很关心队列的性能问题。当并发访问队列时,队列的性能往往受到同步手段的制约,最简单的方式是使用互斥锁对整个队列加锁,但其并发性能却惨不忍睹。
因此,有了各式各样的无锁队列实现,本文介绍其中的一种实现。还是老样子,实现基于x86体系结构,Linux环境。
初始化
和大部分无锁数据结构的实现一样,这个无锁队列的实现也是基于链表的,声明如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
#ifndef _QUEUE_H_ #define _QUEUE_H_ #define CACHE_ALIGN_SIZE 64 #define CACHE_ALIGNED __attribute__((aligned(CACHE_ALIGN_SIZE))) template <typename T> class Queue { public: Queue() : head_(NULL), tail_(NULL) { head_ = tail_ = new qnode(); head_->next = NULL; } virtual ~Queue() { T tmp; while (dequeue(tmp)) { } delete head_; } void enqueue(const T &data); bool dequeue(T &data); private: class qnode { public: T data; qnode *next; }; qnode * head_ CACHE_ALIGNED; qnode * tail_ CACHE_ALIGNED; }; #endif /* _QUEUE_H_ */ |
初始化后队列的头尾都指向一个dummy节点,并且头尾指针都按照cache line对齐,以避免产生false sharing问题。
进队(enqueue)
进队操作分三步:
- 创建新节点
- 将当前尾节点的 next指针指向新节点
- 修改尾指针指向新创建的节点
其中第二步和第三步在常规的队列实现中往往需要借助于锁来保证原子性,如果不使用锁会怎么样?
除了第一步是没有竞争的,第二步和第三步在并发执行的时候都会有竞争(都是对共享变量的read-modify-write操作),这要求我们需要使用原子操作来实现这两步。然而仅仅这样也还是不够的,这不能保证第二步和第三步的原子性,因此需要一些特殊的处理,看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
template <typename T> void Queue<T>::enqueue(const T &data) { qnode *node = new qnode(); node->data = data; node->next = NULL; qnode *t = NULL; qnode *next = NULL; while (true) { t = tail_; next = t->next; asm volatile("" ::: "memory"); if (tail_ != t) { continue; } if (next) { __sync_bool_compare_and_swap(&tail_, t, next); continue; } if (__sync_bool_compare_and_swap(&t->next, NULL, node)) { break; } } __sync_bool_compare_and_swap(&tail_, t, node); } |
注意代码17-20行,这段逻辑就是处理两个操作不原子的关键所在。当并发执行进队时,通过循环检查并向前推进尾指针来保证拿到最新的尾指针。
另一段有意思的代码是13-16行,这一行对变量 t进行检查,目的在于确认 t和 next的值是一致的,但是在这个特定的情景中我理解实际上可以删去而不影响正确性,因为后续的CAS操作同样会检查这一条件。
出队(dequeue)
出队操作相对入队要简单不少,只要把头指针向后移并拿出数据就可以了。但是在无锁并发的情况下仍有不少细节需要考虑,直接看代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
template <typename T> bool Queue<T>::dequeue(T &data) { qnode *t = NULL; qnode *h = NULL; qnode *next = NULL; while (true) { h = head_; t = tail_; next = h->next; asm volatile("" ::: "memory"); if (head_ != h) { continue; } if (!next) { return false; } if (h == t) { __sync_bool_compare_and_swap(&tail_, t, next); continue; } data = next->data; if (__sync_bool_compare_and_swap(&head_, h, next)) { break; } } h->next = (qnode *)1; // bad address, It's a trap! /* delete h; */ return true; } |
需要解释的地方不多,只有一个比较有意思的地方,在代码的12-15行有一个检查,这个检查和之前提到的一样,是用来保证 h和 next变量是一致的。和进队过程中的那个不同的是,这段逻辑不是可有可无的,因为在23行对 next指针的访问发生在CAS操作之前,如果访问了已经出队的节点则会导致程序崩溃。
What’s next?
到这里看上去这个无锁队列已经完成了,并且可以正确的运行。但你一定会发现一个严重的问题:这个队列中的内存从来没有被释放所以在不断的泄露。
没错,这就是无锁数据结构普遍存在的问题,因为没有使用互斥的同步机制,所以很难找到一个安全的释放内存的时机。你可能会认为在完成出队后释放掉内存不就可以了么?但是由于没有使用同步机制,我们无法保证这个已经出队的节点没有被其他并发线程持有,如果此时释放内存,Boom…如果之后这个内存又被重用,那又可能会遇到著名的ABA问题。
那这个无锁队列岂不是没用了么?当然不是,只是我们需要一种与之配套的内存生命周期管理机制,比如Linux内核中广泛使用的RCU,又比如我之后会介绍的Hazard Pointer。
一点解释
你可能注意到了我在出队逻辑的29-30行写了两行略有些奇怪的代码,这里我来做一些解释。实际上这里本应该执行的操作是释放 h指向的节点占用的内存,但是我在论文1原本算法的基础上做了一点改动,删除了modify count的部分(一种常用的避免ABA问题的方法),这就导致如果真的执行了内存释放就会产生ABA问题。因此我不对内存进行释放,但是修改节点保存的内容为一个非法指针,让内存非法访问的问题暴露出来(比如你可以尝试去掉出队代码12-15行的检查,这一段真的非常精妙,值得细细品味)。
我之所以做这样的修改,并不是因为论文中提出的算法不好,恰恰相反,论文中的算法非常精妙,但通用性不足,我只是单纯的为了引出一种更为通用的内存生命周期管理机制:Hazard Pointer。
参考资料
- Michael M M, Scott M L. Simple, fast, and practical non-blocking and blocking concurrent queue algorithms[C]//Proceedings of the fifteenth annual ACM symposium on Principles of distributed computing. ACM, 1996: 267-275. ↩