Files
linux-insides-zh/SyncPrim/linux-sync-4.md
2024-08-13 19:42:31 +08:00

27 KiB
Raw Permalink Blame History

内核同步原语. 第四部分.

引言

这是本章的第四部分 chapter ,本章描述了内核中的同步原语,且在之前的部分我们介绍完了 自旋锁信号量 两种不同的同步原语。我们将在这个章节持续学习 同步原语,并考虑另一简称 互斥锁 (mutex) 的同步原语,全名 MUTual EXclusion

本书 前面所有章节一样我们将先尝试从理论面来探究此同步原语在此基础上再探究Linux内核所提供用来操作 互斥锁API

让我们开始吧。

互斥锁 的概念

从前一 部分,我们已经很熟悉同步原语 信号量。其表示为:

struct semaphore {
	raw_spinlock_t		lock;
	unsigned int		count;
	struct list_head	wait_list;
};

此结构体包含 的状态和一由等待者所构成的列表。根据 count 字段的值,semaphore 可以向希望访问该资源的多个进程提供对该资源的访问。互斥锁 的概念与 信号量 相似,却有着一些差异。信号量互斥锁 同步原语的主要差异在于 互斥锁 具有更严格的语义。不像 信号量,单一时间一 互斥锁 只能由一个 进程 持有,并且只有该 互斥锁持有者 能够对其进行释放或解锁的动作。另外的差异是 API 实现,信号量 同步原语强置重新调度 (重新调度) 在等待列表中的进程。互斥锁 API 的实现则允许避免这种状况,进而减少这种昂贵的 上下文切换 操作。

互斥锁 同步原语由如下结构呈现在Linux内核中

struct mutex {
        atomic_t                count;
        spinlock_t              wait_lock;
        struct list_head        wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
        struct task_struct      *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        struct optimistic_spin_queue osq;
#endif
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map      dep_map;
#endif
};

此结构被定义在 include/linux/mutex.h 头文件并且包含着与 信号量 结构体类似的字段。互斥锁 结构的第一个字段是 - count。这个字段的值代表着该 互斥锁 的状态。在 count 字段为 1 的情况下,说明该 互斥锁 处于 无锁 状态。当 count 字段值处于 ,表示该 互斥锁 处于 上锁 状态。此外,count 字段的值可能是 的,在这种情况下表该 互斥锁 处于 上锁 状态且可能有其他的等待者。

互斥锁 结构的下面两个字段 - wait_lockwait_list 分别是用来保护 等待队列自旋锁,以及由某个锁的等待者们所构成的 等待队列 列表。你可能注意到了,互斥锁信号量 结构体的相似处到此结束。剩余的 互斥锁 结构体字段则如同我们所见取决于Linux内核的不同配置选项。

第一个字段 - owner 代表持有该锁的 进程。正如我们看到的,此字段是否存在于 mutex 结构体中取决于 CONFIG_DEBUG_MUTEXESCONFIG_MUTEX_SPIN_ON_OWNER 的内核配置选项。这个字段和下个 osq 字段的主要用途在于支援我们之后会介绍的乐观自旋 (optimistic spinning) 功能。最后两个字段 - magicdep_map 仅被用于 调试 模式。magic 字段用来存储跟 互斥锁 用来调试的相关资讯,而第二个字段 - lockdep_map 则在Linux内核的 锁验证器 (lock validator) 中被使用。

我们已探究完 互斥锁 的结构现在我们可以开始思考此同步原语是如何在Linux内核中运作的。你可能会猜一个想获取锁的进程在允许的情况下就直接对 mutex->count 字段做递减的操作来试图获取锁。而如果进程希望释放一个锁则递增该字段的值即可。大致上没错但也正如你可能猜到的那样在Linux内核中这可能不会那么简单。

事实上,当某进程试图获取 互斥锁 时,三条可能路径的选择策略要根据当前 mutex 的状态而定。

  • fastpath;
  • midpath;
  • slowpath.

第一条路径或 fastpath 是最快的,正如你从名称中可以领会到的那样。这种情况下的所有事情都很简单。因 互斥锁 还没被任何人获取,其 mutex 结构中的count字段可以直接被进行递减操作。在释放该 互斥锁 的情况下,算法是类似的,进程对该 互斥锁 结构中的 count 字段进行递增的操作即可。当然,所有的这些操作都必须是 原子 的。

是的,这看起来很简单。但如果一个进程想要获取一个已经被其他进程持有的 互斥锁 时,会发生什么事?在这种情况下,控制流程将被转由第二条路径决定 - midpathmidpath乐观自旋 会在锁的持有者仍在运行时,对我们已经熟悉的 MCS lock 进行 循环 操作。这条路径只有在没有其他更高优先级的进程准备运行时执行。这条路径被称作 乐观 是因为等待的进程不会被睡眠或是调度。这可以避免掉昂贵的 上下文切换.

在最后一种情况,当 fastpathmidpath 都不被执行时,就会执行最后一条路径 - slowpath。这条路径的行为跟 信号量 的锁操作相似。如果该锁无法被进程获取,那么此进程将被以如下的结构表示加入 等待队列

struct mutex_waiter {
        struct list_head        list;
        struct task_struct      *task;
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
};

此结构定义在include/linux/mutex.h 头文件且使用上可能会被睡眠。在研究Linux内核提供用来操作 互斥锁API 前,让我们先研究 mutex_waiter 结构。如果你有阅读此章节的 前一部分,你可以能会注意到 mutex_waiter 结构与 kernel/locking/semaphore.c 源代码中的 semaphore_waiter 结构相似:

struct semaphore_waiter {
        struct list_head list;
        struct task_struct *task;
        bool up;
};

它也包含 listtask 字段,用来表示该互斥锁的等待队列。这之间有一个差异是 mutex_waiter 没有 up 字段,但却有一个可以根据内核设定CONFIG_DEBUG_MUTEXES 选项而存在的 magic 字段,它能用来存储一些在调试互斥锁 问题时有用的资讯。

现在我们知道了什么是 互斥锁 以及他是如何在Linux内核中呈现的。在这种情况下我们可以开始一窥Linux内核所提供用来操作 互斥锁API

互斥锁 API

至此,在前面的章节我们已经了解什么是 互斥锁 原语而且看过了Linux内核中用来呈现 互斥锁互斥锁 结构。现在是时后开始研究用来操弄互斥锁的 API 了。详细的 互斥锁 API 被记录在 include/linux/mutex.h 头文件。一如既往,在考虑如何获取和释放一 互斥锁 之前,我们需要知道如何初始化它。

初始化 互斥锁 有两种方法第一种是静态初始化。为此Linux内核提供以下宏

#define DEFINE_MUTEX(mutexname) \
        struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

让我们来研究此宏的实现。正如我们所见,DEFINE_MUTEX 宏接受新定义的 互斥锁 名称,并将其扩展成一个新的 互斥锁 解构。此外新 mutex 结构由 __MUTEX_INITIALIZER 宏初始化。让我们看看 __MUTEX_INITIALIZER的实现:

#define __MUTEX_INITIALIZER(lockname)         \
{                                                             \
       .count = ATOMIC_INIT(1),                               \
       .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock), \
       .wait_list = LIST_HEAD_INIT(lockname.wait_list)        \
}

这个宏被定义在 相同的 头文件,并且我们可以了解到它初始化了 mutex 解构体中的字段。count 被初始化为 1 ,这代表该互斥锁状态为 无锁wait_lock 自旋锁 被初始化为无锁状态,而最后的栏位 wait_list 被初始化为空的 双向列表

第二种做法允许我们动态初始化一个 互斥锁。为此我们需要调用在 kernel/locking/mutex.c 源码文件的 __mutex_init 函数。事实上,大家很少直接调用 __mutex_init 函数。取而代之,我们使用下面的 mutex_init :

# define mutex_init(mutex)                \
do {                                                    \
        static struct lock_class_key __key;             \
                                                        \
        __mutex_init((mutex), #mutex, &__key);          \
} while (0)

我们可以看到 mutex_init 宏定义了 lock_class_key 并且调用 __mutex_init 函数。让我们看看这个函数的实现:

void
__mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
        atomic_set(&lock->count, 1);
        spin_lock_init(&lock->wait_lock);
        INIT_LIST_HEAD(&lock->wait_list);
        mutex_clear_owner(lock);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        osq_lock_init(&lock->osq);
#endif
        debug_mutex_init(lock, name, key);
}

如我们所见,__mutex_init 函数接收三个参数:

  • lock - 互斥锁本身;
  • name - 调试用的互斥锁名称;
  • key - 锁验证器用的key.

__mutex_init 函数的一开始,我们可以看到 互斥锁 状态被初始化。我们通过 atomic_set 函数原子地将给定的变量赋予给定的值来将 互斥锁 的状态设为 无锁。之后我们可以看到 自旋锁 被初始化为解锁状态,这将保护 互斥锁等待队列,之后始化 互斥锁等待队列。 之后我们清除该 的持有者并且通过调用 include/linux/osq_lock.h 头文件中的 osq_lock_init 函数来初始化乐观队列(optimistic queue)。 而此函数也只是将乐观队列的的tail设定为无锁状态:

static inline bool osq_is_locked(struct optimistic_spin_queue *lock)
{
        return atomic_read(&lock->tail) != OSQ_UNLOCKED_VAL;
}

__mutex_init 函数的结尾阶段,我们可以看到它调用了 debug_mutex_init 函数,不过就如同我在此 章节 前面提到的那样,我们并不会在此章节讨论调试相关的内容。

互斥锁 结构被初始化后,我们可以继续研究 互斥锁 同步原语的 上锁解锁 API。mutex_lockmutex_unlock 函数的实现位于 kernel/locking/mutex.c 源码文件。首先,让我们从 mutex_lock 的实现开始吧。代码如下:

void __sched mutex_lock(struct mutex *lock)
{
        might_sleep();
        __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
        mutex_set_owner(lock);
}

我们可以从 include/linux/kernel.h 头文件的 mutex_lock 函数开头看到 might_sleep 宏被调用。该宏的实现取决于 CONFIG_DEBUG_ATOMIC_SLEEP 内核配置选项,如果这个选项被启用,该宏会在 原子 上下文中执行时打印栈追踪。此宏为调试用途上的帮手,除此之外,这个宏什么都没做。

might_sleep 宏之后,我们可以看到 __mutex_fastpath_lock 函数被调用。此函数是体系结构相关的,并且因为我们此书探讨的是 x86_64 体系结构, \ __mutex_fastpath_lock 的实现部分位于 arch/x86/include/asm/mutex_64.h 头文件。正如我们从__mutex_fastpath_lock 函数名称可以理解的那样,此函数将尝试通过 fast path 试图获取一个锁,或者换句话说此函数试图递减一个给定互斥锁的 count 字段。

__mutex_fastpath_lock 函数的实现由两部分组成。第一部分是 内联汇编 。让我们看看:

asm_volatile_goto(LOCK_PREFIX "   decl %0\n"
                              "   jns %l[exit]\n"
                              : : "m" (v->counter)
                              : "memory", "cc"
                              : exit);

首先,让我们将注意力放在 asm_volatile_goto。这个宏被定义在 include/linux/compiler-gcc.h 头文件,并且它只是扩展成两个内联汇编:

#define asm_volatile_goto(x...) do { asm goto(x); asm (""); } while (0)

第一个汇编包含 goto 特性,而第二个空的内联汇编是 屏障 (barrier)。现在回去看看我们的内联汇编。正如我们所看到的,它从被 LOCK前缀 的宏定义开始,该宏仅是扩展 lock 指令:

#define LOCK_PREFIX LOCK_PREFIX_HERE "\n\tlock; "

正如我们从前面部分已经知道的那样,该指令允许 原子地 执行被它前缀修饰的指令 。所以,在我们的汇编语句的第一步中,我们尝试将给定的 mutex->counter 的字段递减。再下一步,如 mutex->counter 在被递减完后的值为非负数,那么 jns 指令将会执行跳转到 exit 标签所在处。exit 标签是 __mutex_fastpath_lock 函数的第二部分,而它仅仅是指到该函数的出口:

exit:
        return;

就目前为止, __mutex_fastpath_lock 函数的实现看起来还挺简单。但 mutex->counter 的字段有可能在递减后是负的,在这种情况下:

fail_fn(v);

将在内联汇编后被调用。fail_fn__mutex_fastpath_lock 函数的第二个参数,其为一指针指到用来获取给定锁的 midpath/slowpath 路径函数。我们的这个例子中fail_fn__mutex_lock_slowpath 函数。在我们一窥 __mutex_lock_slowpath 函数的实现前,我们先把 mutex_lock 函数的实现看完。在最简单的情况下,锁会被某进程通过执行 __mutex_fastpath_lock 路径后成功被获取。这种情况下,我们仅在 mutex_lock 结尾调用即可。

mutex_set_owner(lock);

mutex_set_owner 函数被定义在 kernel/locking/mutex.h 头文件,这将一个锁的持有者设为当前进程:

static inline void mutex_set_owner(struct mutex *lock)
{
        lock->owner = current;
}

另外一种情况,让我们研究一进程因为锁已被其它进程持有,而无法顺利获得的情况,我们已经知道在这种情境下 __mutex_lock_slowpath 函数会被调用,让我们开始研究这个函数。此函数被定义在 kernel/locking/mutex.c 源码文件,并且这个函数由 container_of 宏开头,意图通过 __mutex_fastpath_lock 给的互斥锁状态变量来获得互斥锁本身:

__visible void __sched
__mutex_lock_slowpath(atomic_t *lock_count)
{
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
                            NULL, _RET_IP_, NULL, 0);
}

之后将获得的 互斥锁 带入函数 __mutex_lock_common 进行调用。 __mutex_lock_common 函数一开始会关闭 抢占 直到下次的重新调度:

preempt_disable();

之后我们进到乐观自旋阶段。如同我们知道的,这个阶段取决于CONFIG_MUTEX_SPIN_ON_OWNER 内核配置选项。如该选项被禁用,我们就跳过这个阶段并迈入最后一条路径 - 获取 互斥锁slowpath

if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
        preempt_enable();
        return 0;
}

首先 mutex_optimistic_spin 函数检查我们需不需要被重新调度,或者换句话说,没有其他优先级更高的任务可以运行。 如果这个检查成立我们就将当前 循环者(spinner) 更新至 MCS 锁的等待队列。这种情况下互斥锁的获取在某个时间只会由一个循环者获得:

osq_lock(&lock->osq)

之后下个步奏,我们在下面的迭代中循环:

while (true) {
    owner = READ_ONCE(lock->owner);

    if (owner && !mutex_spin_on_owner(lock, owner))
        break;

    if (mutex_try_to_acquire(lock)) {
        lock_acquired(&lock->dep_map, ip);

        mutex_set_owner(lock);
        osq_unlock(&lock->osq);
        return true;
    }
}

并试图获取该锁。首先我们尝试获取该锁的持有者资讯,如果持有者存在 (在进程已释放互斥锁的情况下可能不存在),我们就在持有者释放锁前于 mutex_spin_on_owner 函数中等待。如果在等待锁持有者的过程中遭遇了高优先级任務,我们就离开循环进入睡眠。在另外一种情况下,该锁被进程释放,那我们就试图通过 mutex_try_to_acquired 来获取该锁。如果这个操作顺利完成,我们就为该互斥锁设定新的持有者,将我们自身从MCS 等待队列中移除,并从 mutex_optimistic_spin 函数中离开。至此锁就被一进程获取完成,我们接着开启 抢占 并从 __mutex_lock_common 函数中离开:

if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
    preempt_enable();
    return 0;
}

以上就是这种情况的全部了。

在其他情况下,一切可能都不那么顺利。例如,在我们于 mutex_optimistic_spin 迭代中循环的同时可能有新任务出现,更甚我们可能在进入 mutex_optimistic_spin循环前,系统就存在着更高优先级别的任务了。又或者 CONFIG_MUTEX_SPIN_ON_OWNER 内核选项根本是禁用的,在这种情况下 mutex_optimistic_spin 什么都不会做:

#ifndef CONFIG_MUTEX_SPIN_ON_OWNER
static bool mutex_optimistic_spin(struct mutex *lock,
                                  struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
    return false;
}
#endif

在所有这种情况下,__mutex_lock_common 函数的行为就像 信号量。我们就再次尝试获取锁,因为锁的持有者在此之前可能已经将它释放:

if (!mutex_is_locked(lock) &&
   (atomic_xchg_acquire(&lock->count, 0) == 1))
      goto skip_wait;

在失败的情况下,希望获取锁的进程将被添加到等待者列表中

list_add_tail(&waiter.list, &lock->wait_list);
waiter.task = task;

在成功的情况下,我们更新该锁的持有者、允许抢占,并从__mutex_lock_common 函数离开:

skip_wait:
        mutex_set_owner(lock);
        preempt_enable();
        return 0;

在这个情况下锁将被获取。如果到这边都无法获取锁,我们将进入下面的迭代中:

for (;;) {

    if (atomic_read(&lock->count) >= 0 && (atomic_xchg_acquire(&lock->count, -1) == 1))
        break;

    if (unlikely(signal_pending_state(state, task))) {
        ret = -EINTR;
        goto err;
    }

    __set_task_state(task, state);

     schedule_preempt_disabled();
}

这边我们再尝试获取一次锁,如果成功的话就离开。是的,我们会在循环前的那次尝试失败后立再一次的尝试获取锁。我们需要这样做,以确保一旦锁在之后被解锁时,我们会被唤醒。 除此之外它还允许我们在睡眠后获得锁。在另一种情况下我们检查当前进程是否有pending 信号,如果进程在等待锁获取期间被 信号 中断,则退出。在迭代的结尾因为我们未能成功获取锁,所以我们将任务的状态设为 TASK_UNINTERRUPTIBLE 并通过调用 schedule_preempt_disabled 函数去睡眠。

这就是全部,我们已经考虑了进程获取锁时可能通过的所有三条路径。现在让我们来研究下 mutex_unlock 是如何被实现的。将被一个希望释放锁的进程调用 mutex_unlock __mutex_fastpath_unlock 也将被从 arch/x86/include/asm/mutex_64.h 头文件中调用:

void __sched mutex_unlock(struct mutex *lock)
{
    __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}

__mutex_fastpath_unlock 函数的实现与__mutex_fastpath_lock 函数非常相似:

static inline void __mutex_fastpath_unlock(atomic_t *v,
                                           void (*fail_fn)(atomic_t *))
{
       asm_volatile_goto(LOCK_PREFIX "   incl %0\n"
                         "   jg %l[exit]\n"
                         : : "m" (v->counter)
                         : "memory", "cc"
                         : exit);
       fail_fn(v);
exit:
       return;
}

事实上, 其中只有一个区别,我们递增 mutex->count 字段。这个操作过后锁将呈现 无锁 状态。随着 互斥锁 释放,如等待队列有条目的话我们必须将其更新。在这种情况下,fail_fn 函数会被调用,也就是 __mutex_unlock_slowpath__mutex_unlock_slowpath 函数仅是从给定的mutex->count 来获取 mutex 实例,并调用 __mutex_unlock_common_slowpath 函数:

__mutex_unlock_slowpath(atomic_t *lock_count)
{
      struct mutex *lock = container_of(lock_count, struct mutex, count);

      __mutex_unlock_common_slowpath(lock, 1);
}

__mutex_unlock_common_slowpath 函数中,如果等待队列非空,我们将从中获取第一个条目,并唤醒相关的进程:

if (!list_empty(&lock->wait_list)) {
    struct mutex_waiter *waiter =
           list_entry(lock->wait_list.next, struct mutex_waiter, list);
                wake_up_process(waiter->task);
}

在此之后,前一个进程将释放互斥锁,并由另一个在等待队列中的进程获取。

这就是全部了. 我们已经研究完两个主要用来操作 互斥锁 的 API mutex_lockmutex_unlock。除此之外Linux内核还提供以下 API

  • mutex_lock_interruptible;
  • mutex_lock_killable;
  • mutex_trylock.

以及对应相同前缀的 unlock 函数. 我们就不在此解释这些 API 了, 因为他们跟 信号量 所提供的 API 类似。你可以通过阅读 前一部分 来了解更多。

总结

至此我们结束了Linux内核 同步原语 章节的第四部分。在这部分我们见到了新的同步原语 - 互斥锁。就理论上来说,此同步原语与 信号量 非常相似。事实上,互斥锁 代表着二进制信号量。但它的实现与Linux内核中 信号量 实现并不同。在下一部分中我们将继续深入研究Linux内核中的同步原语。

如果你有问题或者建议请在twitter 0xAX上联系我,通过 email 联系我,或者创建一个issue

Please note that English is not my first language and I am really sorry for any inconvenience. If you found any mistakes please send me PR to linux-insides.

链接