Files
linux-insides-zh/interrupts/interrupts-9.md
Yangjing Zhang f528fd0e90 add more
2016-05-10 10:23:43 +08:00

23 KiB
Raw Blame History

中断和中断处理。 第九部分。

延后中断(软中断Tasklets和工作队列)介绍

这是linux内核揭密中断部分的第九小节,在之前章节我们了解了源文件arch/x86/kernel/irqinit.cinit_IRQ的实现。接下来的这一节我们将继续深入学习和外部硬件中断相关的初始化。

init/main.c中我们可以看到在init_IRQ函数后面调用了softirq_init函数。这个函数在源文件kernel/softirq.c中定义,从名字我们可以看出,它的作用是初始化软中断或者也可以说是初始化延后中断。那么什么是延后中断?在讲解内核初始化过程的部分第九小结我们已经对他有了一些了解Linux内核中一共有三种'延后中断'

  • 软中断;
  • tasklets;
  • 工作队列;

在这一小节我们将详细介绍这三种实现。就像我说的,我们对这个主题有一些了解。那么,现在是时间深入了解一下了。

延后中断

对中断处理有一些严格的要求,总的来说有两种:

  • 中断处理必须快速执行完毕
  • 有时中断处理必须做很多冗长的事情

就像你所想到的,我们几乎不可能同时做到这两点,之前的中断被分为两部分:

  • 前半部
  • 后半部

后半部曾经是Linux内核延后中断执行的一种方式但现在的实际情况已经不是这样了。这种遗留称谓现在作为名词代表所有延后中断执行的方式。伴随着内核对并行处理的支持出于性能考虑所有新的下半部实现方案都基于被称之为ksoftirqd(稍后将详细讨论)的内核线程。ksoftirqd中断处理方式几乎和硬件中断处理一样重要。中断延后处理会在系统负载较低的时候才执行一个中断的具体实现行为。如你所知,中断处理代码运行于禁止响应后续中断的中断处理上下文中,所以要避免长时间执行。但有时中断处理却又有很多的工作需要执行,所以中断处理有时会被分为两部分。第一部分中,中断处理先只做少量的最重要工作,接下来提交第二部分到内核调度,然后就结束运行。当系统比较空闲并且处理器上下文允许处理中断时,第二部分就会开始执行被延后的剩余中断任务。以上是对延后中断处理的简要介绍。

就像上面说的,延后中断(或者叫软中断)和tasklets是由一些内核线程(每个处理器一个线程)来执行的。每个处理器都有自己的内核线程,名字叫做ksoftirqd/nn是处理器的编号。我们可以通过系统命令systemd-cgls看到它们:

$ systemd-cgls -k | grep ksoft
├─   3 [ksoftirqd/0]
├─  13 [ksoftirqd/1]
├─  18 [ksoftirqd/2]
├─  23 [ksoftirqd/3]
├─  28 [ksoftirqd/4]
├─  33 [ksoftirqd/5]
├─  38 [ksoftirqd/6]
├─  43 [ksoftirqd/7]

spawn_ksoftirqd函数启动这些线程。就像我们看到的,这个函数在早期的initcall被调用。

early_initcall(spawn_ksoftirqd);

延后中断在Linux内核编译时就静态的确定了open_softirq函数负责softirq初始化。open_softirqkernel/softirq.c中定义:

void open_softirq(int nr, void (*action)(struct softirq_action *))
{
	softirq_vec[nr].action = action;
}

这个函数有两个参数:

  • softirq_vec数组的索引序号
  • 一个指向软中断处理函数的指针

我们首先来看softirq_vec数组:

static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;

它在同一个源文件中定义。softirq_vec数组包含了NR_SOFTIRQS(其值为10)个不同softirq类型的softirq_action。当前版本的Linux内核定义了十种软中断向量。其中两个tasklet相关两个网络相关两个块处理层相关两个定时器相关另外调度器和RCU也各占一个。所有这些都在一个枚举中定义

enum
{
        HI_SOFTIRQ=0,
        TIMER_SOFTIRQ,
        NET_TX_SOFTIRQ,
        NET_RX_SOFTIRQ,
        BLOCK_SOFTIRQ,
        BLOCK_IOPOLL_SOFTIRQ,
        TASKLET_SOFTIRQ,
        SCHED_SOFTIRQ,
        HRTIMER_SOFTIRQ,
        RCU_SOFTIRQ,
        NR_SOFTIRQS
};

以上软中断的名字在如下的数组中定义:

const char * const softirq_to_name[NR_SOFTIRQS] = {
        "HI", "TIMER", "NET_TX", "NET_RX", "BLOCK", "BLOCK_IOPOLL",
        "TASKLET", "SCHED", "HRTIMER", "RCU"
};

我们也可以在'/proc/softirqs'的输出中看到他们:

~$ cat /proc/softirqs
                    CPU0       CPU1       CPU2       CPU3       CPU4       CPU5       CPU6       CPU7
          HI:          5          0          0          0          0          0          0          0
       TIMER:     332519     310498     289555     272913     282535     279467     282895     270979
      NET_TX:       2320          0          0          2          1          1          0          0
      NET_RX:     270221        225        338        281        311        262        430        265
       BLOCK:     134282         32         40         10         12          7          8          8
BLOCK_IOPOLL:          0          0          0          0          0          0          0          0
     TASKLET:     196835          2          3          0          0          0          0          0
       SCHED:     161852     146745     129539     126064     127998     128014     120243     117391
     HRTIMER:          0          0          0          0          0          0          0          0
         RCU:     337707     289397     251874     239796     254377     254898     267497     256624

可以看到softirq_vec数组的元素类型为softirq_action。这是软中断机制里一个重要的数据结构,它只有一个指向中断处理函数的成员:

struct softirq_action
{
         void    (*action)(struct softirq_action *);
};

现在我们可以理解到open_softirq函数实际上用softirq_action参数填充了softirq_vec数组。由open_softirq注册的延后中断处理函数会由raise_softirq调用。这个函数只有一个参数 -- 软中断序号nr。来看下它的实现:

void raise_softirq(unsigned int nr)
{
        unsigned long flags;

        local_irq_save(flags);
        raise_softirq_irqoff(nr);
        local_irq_restore(flags);
}

可以看到在local_irq_savelocal_irq_restore两个宏中间调用了raise_softirq_irqoff函数。local_irq_save的定义位于include/linux/irqflags.h头文件,它保存了eflags寄存器中的IF标志位并且禁用了当前处理器的中断。local_irq_restore宏定义于同一头文件中,它做了完全相反的事情:装回之前保存的中断标志位然后允许中断。这里之所以要禁用中断是因为softirq中断运行于中断上下文中,并且????????????????????

raise_softirq_irqoff函数设置当前处理器上和nr参数对应的软中断标志位(__softirq_pending)。这是通过以下代码做到的:

__raise_softirq_irqoff(nr);

然后,通过in_interrupt函数获得irq_count值。我们在这一章的第一小节已经知道它是用来检测一个cpu是否已经有软中断需要处理。如果我们不处于中断上下文中我们就退出raise_softirq_irqoff函数,装回IF标志位并允许当前处理器的中断。如果在中断上下文中,就会调用wakeup_softirqd函数:

if (!in_interrupt())
	wakeup_softirqd();

wakeup_softirqd函数会激活当前处理器上的ksoftirqd内核线程:

static void wakeup_softirqd(void)
{
	struct task_struct *tsk = __this_cpu_read(ksoftirqd);

    if (tsk && tsk->state != TASK_RUNNING)
        wake_up_process(tsk);
}

每个ksoftirqd内核线程都运行run_ksoftirqd函数来检测是否有延后中断需要处理,如果有的话就会调用__do_softirq函数。__do_softirq读取当前处理器对应的__softirq_pending软中断标记,并调用所有已被标记中断对应的处理函数。在执行一个延后函数的同时,可能会发生新的软中断。这会导致用户态代码由于__do_softirq要处理很多延后中断而很长时间不能返回。为了解决这个问题,系统限制了延后中断处理的最大耗时:

unsigned long end = jiffies + MAX_SOFTIRQ_TIME;
...
...
...
restart:
while ((softirq_bit = ffs(pending))) {
	...
	h->action(h);
	...
}
...
...
...
pending = local_softirq_pending();
if (pending) {
	if (time_before(jiffies, end) && !need_resched() &&
		--max_restart)
            goto restart;
}
...

除周期性检测是否有延后中断需要执行之外,系统还会在一些关键时间点上检测。一个主要的检测时间点就是当定义在arch/x86/kernel/irq.c的函数do_IRQ被调用时这是Linux内核中执行延后中断的主要时机。在这个函数要完成中断处理时它会调用arch/x86/include/asm/apic.h中定义的exiting_irq函数,exiting_irq又调用了irq_exitirq_exit函数会检测当前处理器上下文是否有延后中断,有的话就会调用invoke_softirq

if (!in_interrupt() && local_softirq_pending())
    invoke_softirq();

这样就调用到了我们上面提到的__do_softirq。每个softirq都有如下的阶段:通过open_softirq函数注册一个软中断,通过raise_softirq函数标记一个延后中断来激活它然后所有被标记的软中断将会在Linux内核下一次执行周期性延后中断检测时得以调度对应此类型中断的处理函数也就得以执行。

如上所讲,软中断是静态分配的,但这对于后期加载的内核模块是一个问题。基于软中断的tasklets解决了这个问题。

Tasklets

如果你阅读Linux内核源码中软中断相关的代码你会发现它很少会被用到。内核中实现延后中断的主要途径是tasklets。正如上面说的,tasklets是构建于softirq中断之上,他是基于下面两个软中断实现的:

  • TASKLET_SOFTIRQ;
  • HI_SOFTIRQ.

简而言之,tasklets是运行时分配和初始化的软中断。和软中断不同的是,同一类型的tasklets可以同一时间运行在不同的处理器上。我们已经了解到一些软中断的知识,当然上面的文字并不能详细讲解所有的细节,但我们现在可以通过直接阅读代码一步步的更深入了解软中断。我们返回到开始部分讨论的softirq_init函数实现,这个函数在kernel/softirq.c中定义如下:

void __init softirq_init(void)
{
        int cpu;

        for_each_possible_cpu(cpu) {
                per_cpu(tasklet_vec, cpu).tail =
                        &per_cpu(tasklet_vec, cpu).head;
                per_cpu(tasklet_hi_vec, cpu).tail =
                        &per_cpu(tasklet_hi_vec, cpu).head;
        }

        open_softirq(TASKLET_SOFTIRQ, tasklet_action);
        open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}

可以看到在函数开头定义了一个integer类型的变量cpu。接下来他会作为参数传递给宏for_each_possible_cpu来获得系统中所有的处理器。如果possible_cpu对你来说是一个新的术语,你可以阅读CPU masks章节来了解更多知识。简单的说,possible_cpu是系统运行期间随时插入的处理器集合。所有的possible processor存储在cpu_possible_bits位图中,你可以在kernel/cpu.c中找到他的定义:

static DECLARE_BITMAP(cpu_possible_bits, CONFIG_NR_CPUS) __read_mostly;
...
...
...
const struct cpumask *const cpu_possible_mask = to_cpumask(cpu_possible_bits);

好了我们定义了integer类型变量cpu并且通过for_each_possible_cpu宏遍历了所有处理器,初始化了两个per-cpu变量:

  • tasklet_vec;
  • tasklet_hi_vec;

这两个per-cpu变量和softirq_init函数都定义在code中,他们被定义为tasklet_head类型:

static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);

tasklet_head结构代表一组Tasklets它包含两个成员head和tail

struct tasklet_head {
        struct tasklet_struct *head;
        struct tasklet_struct **tail;
};

tasklet_struct数据类型在include/linux/interrupt.h中定义,它代表一个Tasklet。这本书之前部分我们没有见过这个单词,那我们先试着理解一下Tasklet究竟为何物。实际上,Tasklet是处理延后中断的一种机制,来看一下tasklet_struct的具体定义:

struct tasklet_struct
{
        struct tasklet_struct *next;
        unsigned long state;
        atomic_t count;
        void (*func)(unsigned long);
        unsigned long data;
};

这个数据结构包含有下面5个成员

  • 调度队列中的下一个Tasklet;
  • 当前这个Tasklet的状态;
  • 代表这个Tasklet是否处于活动状态;
  • Tasklet的回调函数;
  • 回调的参数.

上面代码中,在softirq_init函数中初始化了两个tasklets数组tasklet_vectasklet_hi_vec。Tasklets和高优先级Tasklets分别存储于这两个数组中。初始化完成后我们看到代码kernel/softirq.csoftirq_init函数的最后又两次调用了open_softirq

open_softirq(TASKLET_SOFTIRQ, tasklet_action);
open_softirq(HI_SOFTIRQ, tasklet_hi_action);

open_softirq函数的主要作用是用软中断处理函数初始化软中断接下来让我们看看它是怎么做的。和Tasklets相关的软中断处理函数有两个分别是tasklet_actiontasklet_hi_action。其中tasklet_hi_actionHI_SOFTIRQ关联在一起,tasklet_actionTASKLET_SOFTIRQ关联在一起。

Linux内核提供一些API供操作Tasklets之用。首先是tasklet_init函数,它接受一个task_struct数据结构,一个处理函数,和另外一个参数,并利用这些参数来初始化所给的task_struct结构:

void tasklet_init(struct tasklet_struct *t,
                  void (*func)(unsigned long), unsigned long data)
{
    t->next = NULL;
    t->state = 0;
    atomic_set(&t->count, 0);
    t->func = func;
    t->data = data;
}

另外还有如下两个宏可以静态的初始化一个tasklet

DECLARE_TASKLET(name, func, data);
DECLARE_TASKLET_DISABLED(name, func, data);

Linux内核提供三个函数标记一个tasklet已经准备就绪

void tasklet_schedule(struct tasklet_struct *t);
void tasklet_hi_schedule(struct tasklet_struct *t);
void tasklet_hi_schedule_first(struct tasklet_struct *t);

第一个函数使用普通优先级调度一个tasklet第二个使用高优先级第三个则用更高优先级。所有这三个函数的实现都很类似所以我们只看一下第一个tasklet_schedule的实现:

static inline void tasklet_schedule(struct tasklet_struct *t)
{
    if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
        __tasklet_schedule(t);
}

void __tasklet_schedule(struct tasklet_struct *t)
{
        unsigned long flags;

        local_irq_save(flags);
        t->next = NULL;
        *__this_cpu_read(tasklet_vec.tail) = t;
        __this_cpu_write(tasklet_vec.tail, &(t->next));
        raise_softirq_irqoff(TASKLET_SOFTIRQ);
        local_irq_restore(flags);
}

我们看到它检测并设置所给的tasklet为TASKLET_STATE_SCHED状态然后以所给tasklet为参数执行了__tasklet_schedule函数。__tasklet_schedule看起来和前面见到的raise_softirq很像。一开始它保存中断标志并禁用中断继而将新的tasklet添加到tasklet_vec,然后调用了我们前面见过的raise_softirq_irqoff函数。当Linux内核调度器决定去运行一个延后函数tasklet_action函数会被被作为和TASKLET_SOFTIRQ相关联的延后函数调用。同样的,tasklet_hi_action会被作为和HI_SOFTIRQ相关联的延后函数调用。这些函数之所以如此相似是因为他们之间只有一个地方不同 --- tasklet_action使用tasklet_vectasklet_hi_action使用tasklet_hi_vec

让我们看下tasklet_action函数的实现:

static void tasklet_action(struct softirq_action *a)
{
    local_irq_disable();
    list = __this_cpu_read(tasklet_vec.head);
    __this_cpu_write(tasklet_vec.head, NULL);
    __this_cpu_write(tasklet_vec.tail, this_cpu_ptr(&tasklet_vec.head));
    local_irq_enable();

    while (list) {
		if (tasklet_trylock(t)) {
	        t->func(t->data);
            tasklet_unlock(t);
	    }
		...
		...
		...
    }
}

tasklet_action开始时利用local_irq_disable宏禁用了当前处理器的中断(你可以阅读本书第二部分了解更多关于此宏的信息)。接下来获取到当前处理器对应的普通优先级tasklet列表并把它设置为NULL这是因为所有的tasklet都将被执行。然后使能当前处理器的中断循环遍历tasklet列表每一次遍历都会对当前tasklet调用tasklet_trylock函数来更新它的状态为TASKLET_STATE_RUN

static inline int tasklet_trylock(struct tasklet_struct *t)
{
    return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}

如果这个操作成功了就会执行此tasklet的处理函数(我们在tasklet_init中所设置的),然后调用tasklet_unlock函数清除他的TASKLET_STATE_RUN状态。

通常情况下,这就是tasklet的所有概念。当然这些还不足以覆盖所有的tasklets,但是我想这是一个继续学习下去的很好的切入点。

tasklets在Linux内核中是一个广泛使用的概念,但就像我在本章开头所写的,还有第三个延后延后函数 -- 工作队列。接下来我们将会看看它又是怎样一种机制。

工作队列

工作队列是另外一个处理延后函数的概念,它大体上和tasklets类似。工作队列运行于内核进程上下文,而tasklets运行于软中断上下文。这意味着工作队列函数不必像tasklets一样必须是原子性的。Tasklets总是运行于它提交自的那个处理器工作队列在默认情况下也是这样。工作队列在Linux内核代码kernel/workqueue.c中由如下的数据结构表示:

struct worker_pool {
    spinlock_t              lock;
    int                     cpu;
    int                     node;
    int                     id;
    unsigned int            flags;

    struct list_head        worklist;
    int                     nr_workers;
...
...
...

因为这个结构有非常多的成员,这里就不把它们全部罗列出来,下面只讨论上面列出的这几个。

工作队列最基础的用法,是作为创建内核线程的接口来处理提交到队列里的工作任务。所有这些内核线程称之为worker thread。工作队列内的任务是由代码include/linux/workqueue.h中定义的work_struct表示的,起定义如下:

struct work_struct {
    atomic_long_t data;
    struct list_head entry;
    work_func_t func;
#ifdef CONFIG_LOCKDEP
    struct lockdep_map lockdep_map;
#endif
};

这里有两个字段比较有意思:func--将被工作队列调度执行的函数,data--这个函数的参数。Linux内核提供了称之为kworker的特定于每个cpu的内核线程

systemd-cgls -k | grep kworker
├─    5 [kworker/0:0H]
├─   15 [kworker/1:0H]
├─   20 [kworker/2:0H]
├─   25 [kworker/3:0H]
├─   30 [kworker/4:0H]
...
...
...

这些线程会被用来调度执行工作队列的延后函数(就像ksoftirqd之于软中断)。除此之外我们还可以为一个工作队列创建一个新的工作线程。Linux内核提供了如下宏静态创建一个队列

#define DECLARE_WORK(n, f) \
    struct work_struct n = __WORK_INITIALIZER(n, f)

它需要两个参数:工作队列的名字和工作队列的函数。我们还可以在运行时动态创建:

#define INIT_WORK(_work, _func)       \
    __INIT_WORK((_work), (_func), 0)

#define __INIT_WORK(_work, _func, _onstack)                     \
    do {                                                        \
            __init_work((_work), _onstack);                     \
            (_work)->data = (atomic_long_t) WORK_DATA_INIT();   \
            INIT_LIST_HEAD(&(_work)->entry);                    \
             (_work)->func = (_func);                           \
    } while (0)

这个宏需要一个work_struct数据结构作为将要创建的工作队列,和一个将在这个队列里调度运行的函数。通过这其中一个宏穿件一个work后,我们需要把它放到工作队列中去。可以通过queue_work或者queue_delayed_work来做到这一点:

static inline bool queue_work(struct workqueue_struct *wq,
                              struct work_struct *work)
{
    return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}

queue_work只是调用了queue_work_on函数指定相应的处理器。注意这里给queue_work_on函数传递了WORK_CPU_UNBOUND参数,它作为代表工作队列要绑定到哪一个处理器的枚举一员,定义于include/linux/workqueue.hqueue_work_on函数测试并设置所给任务WORK_STRUCT_PENDING_BIT标志位,然后以所给的工作队列和工作任务为参数执行__queue_work函数:

bool queue_work_on(int cpu, struct workqueue_struct *wq,
           struct work_struct *work)
{
    bool ret = false;
    ...
    if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
        __queue_work(cpu, wq, work);
        ret = true;
    }
    ...
    return ret;
}