Linux内核同步

Linux内核中有很多同步机制,这篇文章主要总结一下在《Linux Kernel Development》看到的部分内核同步机制,依旧是备忘。
内核同步机制和用户空间的同步机制并不是一一对应的,但是基本的思想都是相同的:保护临界区,只是内核同步机制更适合于在解决内核中的同步问题。先思考下自己的Nanos内核中使用了什么同步机制?Nanos中使用了关中断和信号量机制。
Nanos中的信号量主要用来实现消息传递机制;lock()方法封装了基本的关中断操作,即通过关闭CPU中断(设置IF位)使得不会有线程切换发生,也就保护了临界区。但这在支持多核环境的内核中明显是不适用的,因为每个CPU都有自己的控制寄存器(eflags),关中断仅能保证当前CPU不会发生线程切换,而不能保证其他CPU上运行的线程不会进入临界区,因此,在Linux的SMP环境中需要更多粒度不同、开销不同的同步手段。
注:文中引用的Linux内核代码版本为2.6.32.63
原子操作(Atomic Operations)
原子操作是所有内核同步机制中粒度最细,开销最小的一种。也是其他同步机制的基础,例如如果没有原子操作,则无法保证自旋锁获取操作的原子性,自旋锁也就无法工作了。
首先要理解为什么要保证单独一个操作的原子性?难道一条语句还能再分么?这里有一个误区,虽然单条语句在高级语言层次上看上去是原子的,但是实际上我们所使用的变量都存放在内存中,对变量的单条操作实际上涉及到读取内存到寄存器、修改寄存器、写会内存中三个步骤,在这些机器指令之间是有可能发生线程切换的。
Linux中原子操作的操作对象的定义如下(<linux/types.h>):
typedef struct {
volatile int counter;
} atomic_t;
一个实际使用的例子:
atomic_t v;
atomic_t u = ATOMIC_INIT(0);
atomic_set(&v, 4);
atomic_add(2, &v);
atomic_inc(&v);
实际上原子操作的实现并不复杂,但大部分的原子操作依赖于硬件指令集,所以不同的平台都有不同的实现,以x86平台中的整数原子操作为例,实现在<asm/atomic_32.h>
/**
* atomic_read - read atomic variable
* @v: pointer of type atomic_t
*
* Atomically reads the value of @v.
*/
static inline int atomic_read(const atomic_t *v)
{
return v->counter;
}
/**
* atomic_set - set atomic variable
* @v: pointer of type atomic_t
* @i: required value
*
* Atomically sets the value of @v to @i.
*/
static inline void atomic_set(atomic_t *v, int i)
{
v->counter = i;
}
/**
* atomic_add - add integer to atomic variable
* @i: integer value to add
* @v: pointer of type atomic_t
*
* Atomically adds @i to @v.
*/
static inline void atomic_add(int i, atomic_t *v)
{
asm volatile(LOCK_PREFIX "addl %1,%0"
: "+m" (v->counter)
: "ir" (i));
}
/**
* atomic_sub - subtract integer from atomic variable
* @i: integer value to subtract
* @v: pointer of type atomic_t
*
* Atomically subtracts @i from @v.
*/
static inline void atomic_sub(int i, atomic_t *v)
{
asm volatile(LOCK_PREFIX "subl %1,%0"
: "+m" (v->counter)
: "ir" (i));
}
/**
* atomic_sub_and_test - subtract value from variable and test result
* @i: integer value to subtract
* @v: pointer of type atomic_t
*
* Atomically subtracts @i from @v and returns
* true if the result is zero, or false for all
* other cases.
*/
static inline int atomic_sub_and_test(int i, atomic_t *v)
{
unsigned char c;
asm volatile(LOCK_PREFIX "subl %2,%0; sete %1"
: "+m" (v->counter), "=qm" (c)
: "ir" (i) : "memory");
return c;
}
/**
* atomic_inc - increment atomic variable
* @v: pointer of type atomic_t
*
* Atomically increments @v by 1.
*/
static inline void atomic_inc(atomic_t *v)
{
asm volatile(LOCK_PREFIX "incl %0"
: "+m" (v->counter));
}
自旋锁(Spin Locks)
自旋锁即忙等待,当发现无法获取锁时,内核执行路径不断循环检查锁直到锁被释放。
# <linux/spinlock_types.h>
typedef struct {
raw_spinlock_t raw_lock;
} spinlock_t;
# <asm/spinlock_types.h>
typedef struct raw_spinlock {
unsigned int slock;
} raw_spinlock_t;
锁结构的定义分为两部分,第一部分为实际使用的结构,定义在<linux/spinlock_types.h>中,另一部分为体系结构相关的具体实现,定义在<asm/spinlock_types.h>中,来个例子:
DEFINE_SPINLOCK(mr_lock);
spin_lock(&mr_lock);
/* critical region ... */
spin_unlock(&mr_lock);
除了简单的lock和unlock,内核还提供了其他的自旋锁操作,比如下面两个:
spin_lock_irqsave()
spin_unlock_irqrestore()
这两个方法除了自旋之外,还附加了关/开中断的操作:当lock时记录当前IF位状态并关闭中断,unlock时恢复IF位。(尼玛好像Nanos中的lock、unlock有木有)
x86的实现中slock字段标记该锁是否已被获取,如果已被获取为1,反之为0。 说到自旋锁就不得不提到互斥锁,其在用户空间的使用分析在之前的文章中已经有了分析,在内核空间中,自旋锁和互斥锁以及信号量有一个更为重要的不同:由于自旋锁不会导致阻塞(sleep),因此自旋锁是可以用在中断处理路径中的,而互斥锁和信号量都不行(中断上下文不允许sleep)。
读写自旋锁(Reader-Writer Spin Locks)
读写自旋锁和自旋锁一样以忙等待方式工作。但是读写自旋锁针对读写者问题提供更好的并发性:读写自旋锁允许读者并发执行,而对写者互斥,且保证在写者占有锁的情况下不会有读者进入临界区。
# <linux/spinlock_types.h>
typedef struct {
raw_rwlock_t raw_lock;
} rwlock_t;
# <asm/spinlock_types.h>
typedef struct {
unsigned int lock;
} raw_rwlock_t;
锁结构的定义和自旋锁非常类似,不过lock字段的含义并不仅仅是锁的状态了。lock字段分为两个部分:
- 0-23位:计数器,临界区内读者的数量
- 24位:锁标记位,当有写者进入临界区时置1,离开时置为0
通过计数器和标记的组合,Linux可以很方便的实现读写者问题。但是有额外的一点要注意:读写者自旋锁是读者亲和(favor)的,也就是说只要有读者不断的获取锁,写者只能一直保持在忙等待状态,有些时候这可能不是我们需要的行为,在Cocoa中有一个写者亲和的读写锁实现,之后有机会具体分析一下。
差点忘了例子:
DEFINE_RWLOCK(&mr_rwlock);
# readers
read_lock(&mr_rwlock);
/* critical section (read only) ... */
read_unlock(&mr_rwlock);
# writers
write_lock(&mr_rwlock);
/* critical section (read and write) ... */
write_unlock(&mr_rwlock);
信号量(Semaphores)
信号量的概念在操作系统中课程中非常重要,还记得当时用信号量来解决各种同步问题…在Nanos中也实现了信号量的基本PV操作,看看Linux是如何定义Semaphore的:
/* Please don't access any members of this structure directly */
struct semaphore {
spinlock_t lock;
unsigned int count;
struct list_head wait_list;
};
结构体包含三个变量,count和wati_list都和Nanos中的实现类似:一个计数器以及在该信号量上挂起的进程task_struct队列,不同的是Linux内核中还增加了自旋锁来保护这个条件变量。另外,Linux对task_struct队列进行了封装,包装进了semaphore_waiter结构中:
struct semaphore_waiter {
struct list_head list;
struct task_struct *task;
int up;
};
down和up方法中包含了很多进程调度方面的细节和内核中的黑魔法:
/**
* up - release the semaphore
* @sem: the semaphore to release
*
* Release the semaphore. Unlike mutexes, up() may be called from any
* context and even by tasks which have never called down().
*/
void up(struct semaphore *sem)
{
unsigned long flags;
spin_lock_irqsave(&sem->lock, flags);
if (likely(list_empty(&sem->wait_list)))
sem->count++;
else
__up(sem);
spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __up(struct semaphore *sem)
{
struct semaphore_waiter *waiter = list_first_entry(&sem->wait_list,
struct semaphore_waiter, list);
list_del(&waiter->list);
waiter->up = 1;
wake_up_process(waiter->task);
}
/**
* down - acquire the semaphore
* @sem: the semaphore to be acquired
*
* Acquires the semaphore. If no more tasks are allowed to acquire the
* semaphore, calling this function will put the task to sleep until the
* semaphore is released.
*
* Use of this function is deprecated, please use down_interruptible() or
* down_killable() instead.
*/
void down(struct semaphore *sem)
{
unsigned long flags;
spin_lock_irqsave(&sem->lock, flags);
if (likely(sem->count > 0))
sem->count--;
else
__down(sem);
spin_unlock_irqrestore(&sem->lock, flags);
}
static noinline void __sched __down(struct semaphore *sem)
{
__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
/*
* Because this function is inlined, the 'state' parameter will be
* constant, and thus optimised away by the compiler. Likewise the
* 'timeout' parameter for the cases without timeouts.
*/
static inline int __sched __down_common(struct semaphore *sem, long state,
long timeout)
{
struct task_struct *task = current;
struct semaphore_waiter waiter;
list_add_tail(&waiter.list, &sem->wait_list);
waiter.task = task;
waiter.up = 0;
for (;;) {
if (signal_pending_state(state, task))
goto interrupted;
if (timeout <= 0)
goto timed_out;
__set_task_state(task, state);
spin_unlock_irq(&sem->lock);
timeout = schedule_timeout(timeout);
spin_lock_irq(&sem->lock);
if (waiter.up)
return 0;
}
timed_out:
list_del(&waiter.list);
return -ETIME;
interrupted:
list_del(&waiter.list);
return -EINTR;
}
likely是内核中非常常见的黑魔法,用来对条件判断语句进行编译优化的,简单说就是告诉编译器这个条件很可能为真,还有一个对应的宏unlike,你懂的。除此以外代码相当直观,就不多解释了。
读写信号量(Reader-Writer Semaphores)
读写信号量是信号量的好兄弟,和读写自旋锁一样,是为解决读写者问题编写的特化版本。
/*
* the semaphore definition
*/
struct rw_semaphore {
signed long count;
spinlock_t wait_lock;
struct list_head wait_list;
};
结构体的定义非常相似,不过wait_list队列中存放的结构变成了rwsem_waiter:
struct rwsem_waiter {
struct list_head list;
struct task_struct *task;
unsigned int flags;
#define RWSEM_WAITING_FOR_READ 0x00000001
#define RWSEM_WAITING_FOR_WRITE 0x00000002
};
当信号量已经被写者占用时,新来的读者和写者被添加到wait_list队列中,并标明是读事件还是写事件。当信号量被释放时,按顺序唤醒队列首部的所有事件,直到唤醒一个写事件。
来个例子:
static DECLARE_RWSEM(mr_rwsem);
# readers
down_read(&mr_rwsem);
/* critical section (read only) ... */
up_read(&mr_rwsem);
# writers
down_write(&mr_rwsem);
/* critical section (read and write) ... */
up_write(&mr_rwsem);
互斥锁(Mutexes)
互斥锁即”sleeping lock”,和用户空间的互斥锁一样,会导致获取锁失败的进程阻塞。因此如上文所分析的,互斥锁不能够用在中断上下文中。
struct mutex {
/* 1: unlocked, 0: locked, negative: locked, possible waiters */
atomic_t count;
spinlock_t wait_lock;
struct list_head wait_list;
};
/*
* This is the control structure for tasks blocked on mutex,
* which resides on the blocked task's kernel stack:
*/
struct mutex_waiter {
struct list_head list;
struct task_struct *task;
#endif
};
可以看到自旋锁的定义和信号量的定义多么相似,只是count的作用变为了记录锁状态。
大内核锁(The Big Kernel Lock)
大内核锁可以说是Linux版本过渡时期的产物(2.0-2.2),为了适应SMP的变更,大内核锁提供了一种内核中全局的自旋锁机制,但是有一些有趣的特性:
- 持有大内核锁的进程可以sleep,这和普通的自旋锁很不相同。这是因为大内核的实现在持有大内核锁的进程阻塞时会自动丢弃大内核锁,而在恢复运行时又获得大内核锁
- 大内核锁是递归锁
- 大内核锁不能在中断上下文中使用
- 不要再使用新的大内核锁。如前文提到的,大内核锁是Linux版本过渡时期的产物,随着Linux的发展,大内核锁必然会越来越少,取而代之的是其他更加优化、粒度更为合适的锁
屏障(Barriers)
屏障是一种特殊的同步机制,和其他几种同步机制不同,屏障所保护的不是程序中存在的数据临界区,而是程序没有按照我们想象的顺序执行所产生的同步问题。
那么为什么程序会不按照我们想象的顺序执行呢?
- 编译器:在编译时期,编译器为了提高执行效率会对程序中没有明显关联的(至少在编译器眼中是没有关联的)语句进行重排以提高性能,这一过程是静态的,生成固定的、重新排序的机器指令
- CPU:在运行时处理器会动态的对机器指令的执行进行重排(为了高效的利用流水线)
知道了原因,解决方法也就简单了:明确告诉编译器或者处理器:“嘿!这个地方不是你想的那样的!不要乱动我的代码!”。
注意:由于屏障操作严重依赖于使用的编译器和运行的处理器体系结构,下面的例子中选取gcc编译器和x86体系结构的处理器为例。
对于编译器重排,Linux内核提供了barrier()方法:
/* Optimization barrier */
/* The "volatile" is due to gcc bugs */
#define barrier() __asm__ __volatile__("": : :"memory")
gcc编译器在遇到该内嵌汇编语句时将其作为一条屏障,重排序内存操作:即此语句之前的各种编译优化将不会持续到此语句之后。这样的操作作用于编译时期,对机器指令的执行无任何负担。
而对于处理器造成的乱序,则需要使用体系结构提供的机器指令直接阻止机器对指令进行重排:
/*
* Force strict CPU ordering.
* And yes, this is required on UP too when we're talking
* to devices.
*/
#ifdef CONFIG_X86_32
/*
* Some non-Intel clones support out of order store. wmb() ceases to be a
* nop for these.
*/
#define mb() alternative("lock; addl $0,0(%%esp)", "mfence", X86_FEATURE_XMM2)
#define rmb() alternative("lock; addl $0,0(%%esp)", "lfence", X86_FEATURE_XMM2)
#define wmb() alternative("lock; addl $0,0(%%esp)", "sfence", X86_FEATURE_XMM)
#else
#define mb() asm volatile("mfence":::"memory")
#define rmb() asm volatile("lfence":::"memory")
#define wmb() asm volatile("sfence" ::: "memory")
#endif
lfence、sfence和mfence都是x86体系结构的指令。sfence是store fence,对其前后的store指令起屏障作用;lfence是load fence,对其前后的load指令其屏障作用,而mfence是两者的结合,
这一部分的更多内容参见这里。
其他
除了上述提到的内核同步工具,Linux内核中还提供了其他的几种内核同步机制,如Completion Variables、顺序锁(Sequential locks)等,鉴于作者能力有限,仅对感兴趣的几种同步方式做了探索,你可以在各种Linux图书和源码中探索其他同步方式。
参考资料
- 《Linux Kernel Development(Third Edition)》
- 《深入理解Linux内核》
Comments