Linux-Interrupt笔记知识详解笔记

中断

中断是外围设备通知处理器的一种机制

典型的例子是:网卡从网络收到报文,把报文放到接收环,然后发送中断请求通知处理器,接着处理器响应中断请求,执行中断处理程序,从网卡的接收环取走报文;
网卡驱动程序发送报文的时候,把报文放到网卡的发送环,当网卡从发送环取出报文发送的时候,发送中断请求通知处理器发送完成。

中断控制器

外围设备不是把中断请求直接发给处理器,而是发给中断控制器,由中断控制器转发给处理器。ARM公司提供了一种标准的中断控制器,称为通用中断控制器(Generic Interrupt Controller, GIC)。目前GIC架构规范有4个版本:v1~v4。GIC v2最多支持8个处理器,GIC v3最多支持128个处理器,GIC v3和GIC v4只支持ARM64处理器。

GIC硬件的实现形态有两种。

  • 厂商研发自己的ARM处理器,向ARM公司购买GIC的授权,ARM公司提供的GIC型号有:GIC-400、GIC-500和GIC-600。GIC-400遵循GIC v2规范,GIC-500和GIC-600遵循GIC v3规范。
  • 厂商直接向ARM公司购买处理器的授权,这些处理器包含了GIC。

从软件的角度看,GIC v2控制器有两个主要的功能块。

  • 分发器(Distributor):系统中所有的中断源连接到分发器,分发器的寄存器用来控制单个中断的属性:优先级、状态、安全、转发信息(可以被发送到哪些处理器)和使能状态。分发器决定哪个中断应该通过处理器接口转发到哪个处理器。
  • 处理器接口(CPU Interface):处理器通过处理器接口接收中断。处理器接口提供的寄存器用来屏蔽和识别中断,控制中断的状态。每个处理器有一个单独的处理器接口。

软件通过中断号识别中断,每个中断号唯一对应一个中断源。中断有以下4种类型。

  • 软件生成的中断(Software Generated Interrupt, SGI):中断号0~15,通常用来实现处理器间中断(Inter-Processor Interrupt, IPI)。
    这种中断是由软件写分发器的软件生成中断寄存器(GICD_SGIR)生成的。
  • 私有外设中断(Private Peripheral Interrupt, PPI):中断号16~31。
    处理器私有的中断源,不同处理器的相同中断源没有关系,比如每个处理器的定时器。
  • 共享外设中断(Shared Peripheral Interrupt, SPI):中断号32~1020。
    这种中断可以被中断控制器转发到多个处理器。
  • 局部特定外设中断(Locality-specific Peripheral Interrupt, LPI):基于消息的中断。
    GIC v1和GIC v2不支持LPI。

中断可以是边沿触发(edge-triggered),也可以是电平触发(level-triggered)。

边沿触发是在电压变化的一瞬间触发,电压由高到低变化触发的中断称为下降沿触发,电压由低到高变化触发的中断称为上升沿触发。
电平触发是在高电压或低电压保持的时间内触发,低电压触发的中断称为低电平触发,高电压触发的中断称为高电平触发。

中断有以下4种状态。

  • Inactive:中断源没有发送中断。
  • Pending:中断源已经发送中断,等待处理器处理。
  • Active:处理器已经确认中断,正在处理。
  • Active and pending:处理器正在处理中断,相同的中断源又发送了一个中断。

中断的状态转换过程如下。

  • Inactive -> Pending:外围设备发送了中断。
  • Pending -> Active:处理器确认了中断。
  • Active -> Inactive:处理器处理完中断。

处理器可以通过中断控制器的寄存器访问中断控制器。
中断控制器的寄存器和物理内存使用统一的物理地址空间,把寄存器的物理地址映射到内核的虚拟地址空间,可以像访问内存一样访问寄存器。
所有处理器可以访问公共的分发器,但是每个处理器使用相同的地址只能访问自己私有的处理器接口

外围设备把中断发送给分发器,如果中断的状态是inactive,那么切换到pending;如果中断的状态已经是active,那么切换到active and pending。
分发器取出优先级最高的状态为pending的中断,转发到目标处理器的处理器接口,然后处理器接口把中断发送到处理器。

处理器取出中断,执行中断处理程序,中断处理程序读取处理器接口的中断确认寄存器(Interrupt Acknowledge Register),得到中断号,读取操作导致分发器里面的中断状态切换到active。
中断处理程序根据中断号可以知道中断是由哪个设备发出的,从而调用该设备的处理程序。

中断处理程序执行完的时候,把中断号写到处理器接口的中断结束寄存器(End of Interrupt Register)中, 指示中断处理完成,分发器里面的中断状态从active切换到inactive,或者从active and pending切换到pending。

不同种类的中断控制器的访问方法存在差异,为了屏蔽差异,内核定义了中断控制器描述符irq_chip,每种中断控制器自定义各种操作函数。

GIC v2控制器的描述符如下:

中断域

一个大型系统可能有多个中断控制器,这些中断控制器可以级联,一个中断控制器作为中断源连接到另一个中断控制器,但只有一个中断控制器作为根控制器直接连接到处理器。
为了把每个中断控制器本地的硬件中断号映射到全局唯一的Linux中断号(也称为虚拟中断号),内核定义了中断域irq_domain,每个中断控制器有自己的中断域。

创建中断域

一个大型系统可能有多个中断控制器,这些中断控制器可以级联,一个中断控制器作为中断源连接到另一个中断控制器,但只有一个中断控制器作为根控制器直接连接到处理器。
为了把每个中断控制器本地的硬件中断号映射到全局唯一的Linux中断号(也称为虚拟中断号),内核定义了中断域irq_domain,每个中断控制器有自己的中断域。

中断控制器的驱动程序使用分配函数irq_domain_add_*()创建和注册中断域。每种映射方法提供不同的分配函数,调用者必须给分配函数提供irq_domain_ops结构体,分配函数在执行成功的时候返回irq_domain的指针。

中断域支持以下映射方法。

  • 线性映射(linear map)
    线性映射维护一个固定大小的表,索引是硬件中断号。
    如果硬件中断号的最大数量是固定的,并且比较小(小于256),那么线性映射是好的选择。
    对于线性映射,分配中断域的函数如下
  • 树映射(tree map)。
    树映射使用基数树(radix tree)保存硬件中断号到Linux中断号的映射。
    如果硬件中断号可能非常大,那么树映射是好的选择,因为不需要根据最大硬件中断号分配一个很大的表。
    对于树映射,分配中断域的函数如下
  • 不映射(no map)
    有些中断控制器很强,硬件中断号是可以配置的,例如PowerPC架构使用的MPIC (Multi-Processor Interrupt Controller)。
    我们直接把Linux中断号写到硬件,硬件中断号就是Linux中断号,不需要映射。
    对于不映射,分配中断域的函数如下

    分配函数把主要工作委托给函数irq_domain_add()。
    函数
    irq_domain_add()的执行过程是:分配一个irq_domain结构体,初始化成员,然后把中断域添加到全局链表irq_domain_list中。

创建映射

创建中断域以后,需要向中断域添加硬件中断号到Linux中断号的映射,内核提供了函数irq_create_mapping

输入参数是中断域和硬件中断号,返回Linux中断号。
该函数首先分配Linux中断号,然后把硬件中断号到Linux中断号的映射添加到中断域。

查找映射

中断处理程序需要根据硬件中断号查找Linux中断号,内核提供了函数irq_find_mapping

输入参数是中断域和硬件中断号,返回Linux中断号。

Linux中断处理

对于中断控制器的每个中断源,向中断域添加硬件中断号到Linux中断号的映射时,内核分配一个Linux中断号和一个中断描述符irq_desc,如图所示,中断描述符有两个层次的中断处理函数。

  • 第一层处理函数是中断描述符的成员handle_irq()。
  • 第二层处理函数是设备驱动程序注册的处理函数。中断描述符有一个中断处理链表(irq_desc.action),每个中断处理描述符(irq_action)保存设备驱动程序注册的处理函数。因为多个设备可以共享同一个硬件中断号,所以中断处理链表可能挂载多个中断处理描述符。

怎么存储Linux中断号到中断描述符的映射关系?有两种实现方式。

  • 如果中断编号是稀疏的(即不连续),那么使用基数树(radix tree)存储。需要开启配置宏CONFIG_SPARSE_IRQ。
  • 如果中断编号是连续的,那么使用数组存储。

ARM64架构默认开启配置宏CONFIG_SPARSE_IRQ,使用基数树存储。

把硬件中断号映射到Linux中断号的时候,根据硬件中断的类型设置中断描述符的成员handle_irq(),以GIC v2控制器为例,函数gic_irq_domain_map所做的处理如下所示。

  • 如果硬件中断号小于32,说明是软件生成的中断或私有外设中断,那么把中断描述符的成员handle_irq()设置为函数handle_percpu_devid_irq。
  • 如果硬件中断号大于或等于32,说明是共享外设中断,那么把中断描述符的成员handle_irq()设置为函数handle_fasteoi_irq。

设备驱动程序可以使用函数request_irq()注册中断处理函数:

1
int request_irq(unsigned int irq, irq_handler_t handler, unsigned long flags, const char *name, void *dev);
  • 参数irq是Linux中断号。
  • 参数handler是处理函数。
  • 参数fags是标志位,可以是0或者以下标志位的组合。
    • IRQF_SHARED:允许多个设备共享同一个中断号。
    • __IRQF_TIMER:定时器中断。
    • IRQF_PERCPU:中断是每个处理器私有的。
    • IRQF_NOBALANCING:不允许该中断在处理器之间负载均衡。
    • IRQF_NO_THREAD:中断不能线程化。
  • 参数name是设备名称。
  • 参数dev是传给处理函数(由参数handler指定)的参数。

在ARM64架构下,在异常级别1的异常向量表中,中断的入口有3个。

  • 如果处理器处在内核模式(异常级别1),中断的入口是el1_irq。
  • 如果处理器正在用户模式(异常级别0)下执行64位应用程序,中断的入口是el0_irq。
  • 如果处理器正在用户模式(异常级别0)下执行32位应用程序,中断的入口是el0_irq_compat。

假设处理器正在用户模式(异常级别0)下执行64位应用程序,中断控制器是GIC v2控制器,Linux中断处理流程如图所示

  • 读取处理器接口的中断确认寄存器得到中断号,分发器里面的中断状态切换到active。
  • 如果硬件中断号大于15且小于1020,即中断是由外围设备发送的,处理如下。
    • 把中断号写到处理器接口的中断结束寄存器中,指示中断处理完成,分发器里面的中断状态从active切换到inactive,或者从active and pending切换到pending。
    • 调用函数irq_enter(),进入中断上下文。
    • 调用函数irq_find_mapping(),根据硬件中断号查找Linux中断号。
    • 调用中断描述符的成员handle_irq()。
    • 调用函数irq_exit(),退出中断上下文。
  • 如果硬件中断号小于16,即中断是由软件生成的,处理如下。
    • 把中断号写到处理器接口的中断结束寄存器中,指示中断处理完成。
    • 调用函数handle_IPI()进行处理。

函数el0_irq的代码如下

  • 把进程的寄存器值保存到内核栈。
  • 开启调试异常。
  • irq_handler是一个宏,执行过程如下。

    • 从进程的内核栈切换到中断栈。每个处理器有一个专用的中断栈:
      1
      2
      arch/arm64/kerne1/ irq. c
      DEFINE_PER_CPU(unsigned 1ong [IRQ_STACK_SIZE/sizeof (1ong)],irg_stack)__aligned(16) ;
  • 调用函数指针handle_arch_irq指向的函数。中断控制器在内核初始化的时候设置函数指针handle_arch_irq, GIC v2控制器把该函数指针设置为函数gic_handle_irq。

  • 从中断栈切换到进程的内核栈。
  • 使用内核栈保存的寄存器值恢复进程的寄存器,返回用户模式。

GIC v2控制器的函数gic_handle_irq的代码如下

如果是私有外设中断,那么中断描述符的成员handle_irq()是函数handle_percpu_devid_irq,其代码如下

如果是共享外设中断,那么中断描述符的成员handle_irq()是函数handle_fasteoi_irq,其代码如下

调用函数handle_irq_event,执行设备驱动程序注册的处理函数。
函数handle_irq_event把主要工作委托给函数__handle_irq_event_percpu
函数__handle_irq_event_percpu遍历中断描述符的中断处理链表,执行每个中断处理描述符的处理函数,其代码如下

中断线程化

中断线程化就是使用内核线程处理中断,目的是减少系统关中断的时间,增强系统的实时性。
内核提供的函数request_threaded_irq()用来注册线程化的中断

参数thread_fn是线程处理函数。
少数中断不能线程化,典型的例子是时钟中断,有些流氓进程不主动让出处理器,内核只能依靠周期性的时钟中断夺回处理器的控制权,时钟中断是调度器的脉搏。
对于不能线程化的中断,注册处理函数的时候必须设置标志IRQF_NO_THREAD
如果开启了强制中断线程化的配置宏CONFIG_IRQ_FORCED_THREADING,并且在引导内核的时候指定内核参数“threadirqs”,那么强制除了标记IRQF_NO_THREAD以外的所有中断线程化。
ARM64架构默认开启配置宏CONFIG_IRQ_FORCED_THREADING

每个中断处理描述符(irqaction)对应一个内核线程,成员thread指向内核线程的进程描述符,成员thread_fn指向线程处理函数,其代码如下

可以看到,中断处理线程是优先级为50、调度策略是SCHED_FIFO的实时内核线程,名称是“irq/”后面跟着Linux中断号,线程处理函数是irq_thread()。

在中断处理程序中,函数__handle_irq_event_percpu遍历中断描述符的中断处理链表,执行每个中断处理描述符的处理函数。
如果处理函数返回IRQ_WAKE_THREAD,说明是线程化的中断,那么唤醒中断处理线程。

中断处理线程的处理函数是irq_thread(),调用函数irq_thread_fn(),然后函数irq_thread_fn()调用注册的线程处理函数。

禁止/开启中断

软件可以禁止中断,使处理器不响应所有中断请求,但是不可屏蔽中断(Non Maskable Interrupt,NMI)是个例外。

禁止中断的接口如下

  • local_irq_disable()。
  • local_irq_save(fags):首先把中断状态保存在参数fags中,然后禁止中断。

这两个接口只能禁止本处理器的中断,不能禁止其他处理器的中断。禁止中断以后,处理器不会响应中断请求。

开启中断的接口如下

  • local_irq_enable()。
  • local_irq_restore(fags):恢复本处理器的中断状态。local_irq_disable()和local_irq_enable()不能嵌套使用,local_irq_save(fags)和local_irq_restore(fags)可以嵌套使用。

ARM64架构禁止中断的函数local_irq_disable()如下

把处理器状态的中断掩码位设置成1,从此以后处理器不会响应中断请求。
ARM64架构开启中断的函数local_irq_enable()如下:

把处理器状态的中断掩码位设置成0

禁止/开启单个中断

软件可以禁止某个外围设备的中断,中断控制器不会把该设备发送的中断转发给处理器。

禁止单个中断的函数是

1
void disable_irq(unsigned int irq);

参数irq是Linux中断号。

开启单个中断的函数是

1
void enable_irq(unsigned int irq);

参数irq是Linux中断号。

对于ARM64架构的GIC控制器,如果需要开启硬件中断n,那么设置分发器的寄存器GICD_ISENABLERn(Interrupt Set-Enable Register);
如果需要禁止硬件中断n,那么设置分发器的寄存器GICD_ICENABLERn(Interrupt Clear-Enable Register)。

假设某个外围设备的硬件中断号是n,当这个外围设备发送中断给分发器的时候,只有在分发器上开启了硬件中断n,分发器才会把硬件中断n转发给处理器。

中断亲和性

在多处理器系统中,管理员可以设置中断亲和性,允许中断控制器把某个中断转发给哪些处理器,有两种配置方法。

  • 写文件“/proc/irq/IRQ#/smp_affinity”,参数是位掩码。
  • 写文件“/proc/irq/IRQ#/smp_affinity_list”,参数是处理器列表。

例如,管理员想要配置允许中断控制器把Linux中断号为32的中断转发给处理器0~3,配置方法有两种。

  • echo 0f > /proc/irq/32/smp_affinity
  • echo 0-3 > /proc/irq/32/smp_affinity_list

配置完以后,可以连续执行命令“cat /proc/interrupts | grep 'CPU\|32:'”,观察是否只有处理器0~3收到了Linux中断号为32的中断。

内核提供了设置中断亲和性的函数

1
int irq_set_affinity(unsigned int irq, const struct cpumask *cpumask);

参数irq是Linux中断号,参数cpumask是处理器位掩码。

对于ARM64架构的GIC控制器,可以设置分发器的寄存器GICD_ITARGETSRn(中断目标寄存器,Interrupt Targets Register)允许把硬件中断n转发到哪些处理器,硬件中断n必须是共享外设中断。

处理器间中断

处理器间中断(Inter-Processor Interrupt, IPI)是一种特殊的中断,在多处理器系统中,一个处理器可以向其他处理器发送中断,要求目标处理器执行某件事情。常见的使用处理器间中断的函数如下。

  • 在所有其他处理器上执行一个函数。
1
int smp_call_function(smp_call_func_t func, void *info, int wait);

参数func是要执行的函数,目标处理器在中断处理程序中执行该函数;
参数info是传给函数func的参数;参数wait表示是否需要等待目标处理器执行完函数。

  • 在指定的处理器上执行一个函数
1
2
3
4
5
6
7
int smp_call_function_single(int cpu, smp_call_func_t func, void *info, int wait);
```

- 要求指定的处理器重新调度进程

```c
void smp_send_reschedule(int cpu);

对于ARM64架构的GIC控制器,把处理器间中断称为软件生成的中断,可以写分发器的寄存器GICD_SGIR(软件生成中断寄存器,Software Generated Interrupt Register)以生成处理器间中断。

假设处理器正在用户模式(异常级别0)下执行64位应用程序,中断控制器是GIC v2控制器,处理处理器间中断的执行流程如图所示。

函数handle_IPI负责处理处理器间中断,参数ipinr是硬件中断号,其代码如下


目前支持7种处理器间中断。

  • IPI_RESCHEDULE:硬件中断号是0,重新调度进程,函数smp_send_reschedule()生成的中断。
  • IPI_CALL_FUNC:硬件中断号是1,执行函数,函数smp_call_function()生成的中断。
  • IPI_CPU_STOP:硬件中断号是2,使处理器停止,函数smp_send_stop()生成的中断。
  • IPI_CPU_CRASH_STOP:硬件中断号是3,使处理器停止,函数smp_send_crash_stop()生成的中断。
  • IPI_TIMER:硬件中断号是4,广播的时钟事件,函数tick_broadcast()生成的中断。
  • IPI_IRQ_WORK:硬件中断号是5,在硬中断上下文中执行回调函数,函数irq_work_queue()生成的中断。
  • IPI_WAKEUP:硬件中断号是6,唤醒处理器,函数acpi_parking_protocol_cpu_boot()生成的中断。





中断下半部

为了避免处理复杂的中断嵌套,中断处理程序是在关闭中断的情况下执行的。可是,如果关闭中断的时间太长,可能导致中断请求丢失。例如周期时钟每隔10毫秒发送一个中断请求,如果执行某个中断处理程序花费的时间超过10毫秒,在这段时间里时钟发送了两个中断请求,但是处理器只认为收到一个时钟中断请求。

最激进的解决办法是中断线程化,但是常用的解决办法是:把中断处理程序分为两部分,上半部(top half, th)在关闭中断的情况下执行,只做对时间非常敏感、与硬件相关或者不能被其他中断打断的工作;下半部(bottom half, bh)在开启中断的情况下执行,可以被其他中断打断。

上半部称为硬中断(hardirq),下半部有3种:软中断(softirq)、小任务(tasklet)和工作队列(workqueue)。3种下半部的区别如下。

  • 软中断和小任务不允许睡眠;工作队列是使用内核线程实现的,处理函数可以睡眠。
  • 软中断的种类是编译时静态定义的,在运行时不能添加或删除;小任务可以在运行时添加或删除。
  • 同一种软中断的处理函数可以在多个处理器上同时执行,处理函数必须是可以重入的,需要使用锁保护临界区;一个小任务同一时刻只能在一个处理器上执行,不要求处理函数是可以重入的。

软中断

软中断(softirq)是中断处理程序在开启中断的情况下执行的部分,可以被硬中断抢占。
内核定义了一张软中断向量表,每种软中断有一个唯一的编号,对应一个softirq_action实例,softirq_action实例的成员action是处理函数。

软中断的种类

目前内核定义了10种软中断,各种软中断的编号如下

  • HI_SOFTIRQ:高优先级的小任务。
  • TIMER_SOFTIRQ:定时器软中断。
  • NET_TX_SOFTIRQ:网络栈发送报文的软中断。
  • NET_RX_SOFTIRQ:网络栈接收报文的软中断。
  • BLOCK_SOFTIRQ:块设备软中断。
  • IRQ_POLL_SOFTIRQ:支持I/O轮询的块设备软中断。
  • TASKLET_SOFTIRQ:低优先级的小任务。
  • SCHED_SOFTIRQ:调度软中断,用于在处理器之间负载均衡。
  • HRTIMER_SOFTIRQ:高精度定时器,这种软中断已经被废弃,目前在中断处理程序的上半部处理高精度定时器。
  • RCU_SOFTIRQ:RCU软中断。

软中断的编号形成了优先级顺序,编号小的软中断优先级高。

注册软中断的处理函数

函数open_softirq()用来注册软中断的处理函数,在软中断向量表中为指定的软中断编号设置处理函数。

同一种软中断的处理函数可以在多个处理器上同时执行,处理函数必须是可以重入的,需要使用锁保护临界区。

触发软中断

函数raise_softirq用来触发软中断,参数是软中断编号

1
void raise_softirq(unsigned int nr);

在已经禁止中断的情况下可以调用函数raise_softirq_irqoff来触发软中断。

1
void raise_softirq_irqoff(unsigned int nr);

函数raise_softirq在当前处理器的待处理软中断位图中为指定的软中断编号设置对应的位,如下所示

1
2
3
4
5
6
7
raise_softirq()->raise_softirq_irqoff()->__raise_softirq_irqoff()
kernel/softirq.c

void __raise_softirq_irqoff(unsigned int nr)
{
or_softirq_pending(1UL << nr);
}

把宏or_softirq_pending展开以后是

1
irq_stat[smp_processor_id()].__softirq_pending |= (1UL << nr);

执行软中断

内核执行软中断的地方如下。

  • 在中断处理程序的后半部分执行软中断,对执行时间有限制:不能超过2毫秒,并且最多执行10次。
  • 每个处理器有一个软中断线程,调度策略是SCHED_NORMAL,优先级是120。
  • 开启软中断的函数local_bh_enable()。

如果开启了强制中断线程化的配置宏CONFIG_IRQ_FORCED_THREADING,并且在引导内核的时候指定内核参数“threadirqs”,那么所有软中断由软中断线程执行。

中断处理程序执行软中断

在中断处理程序的后半部分,调用函数irq_exit()以退出中断上下文,处理软中断,其代码如下

1
2
3
4
5
6
7
8
9
10
kernel/softirq.c
void irq_exit(void)
{
...
preempt_count_sub(HARDIRQ_OFFSET);
if(!in_interrupt() && local_softirq_pending()){
invoke_softirq();
}
...
}

如果in_interrupt()为真,表示在不可屏蔽中断、硬中断或软中断上下文,或者禁止软中断。
如果正在处理的硬中断没有抢占正在执行的软中断,没有禁止软中断,并且当前处理器的待处理软中断位图不是空的,那么调用函数invoke_softirq()来处理软中断。

函数invoke_softirq的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
kernel/softirq.c
static inline void invoke_softirq(void)
{
if(ksoftirqd_running()){
return;
}

if(!force_irqthreads){
__do_softirq();
} else {
wakeup_softirqd();
}
}
  • 如果软中断线程处于就绪状态或运行状态,那么让软中断线程执行软中断。
  • 如果没有强制中断线程化,那么调用函数__do_softirq()执行软中断。
  • 如果强制中断线程化,那么唤醒软中断线程执行软中断。

函数__do_softirq是执行软中断的核心函数,其主要代码如下

  • 把抢占计数器的软中断计数加1。第20行代码,把当前处理器的待处理软中断位图重新设置为0。
  • 开启硬中断。
  • 从低位向高位扫描待处理软中断位图,针对每个设置了对应位的软中断编号,执行软中断的处理函数。
  • 禁止硬中断。
  • 如果软中断的处理函数又触发软中断,处理如下。
    • 如果软中断的执行时间小于2毫秒,不需要重新调度进程,并且软中断的执行次数没超过10,那么跳转到第19行代码继续执行软中断。
    • 唤醒软中断线程执行软中断。
  • 把抢占计数器的软中断计数减1。

软中断线程

每个处理器有一个软中断线程,名称是“ksoftirqd/”后面跟着处理器编号,调度策略是SCHED_NORMAL,优先级是120。
软中断线程的核心函数是run_ksoftirqd(),其代码如下

1
2
3
4
5
6
7
8
9
10
11
12
kernel/softirq.c
static void run_ksoftirqd(unsigned int cpu)
{
local_irq_disabled();
if(local_softirq_pending()){
__do_softirq();
local_irq_enable();
...
return;
}
local_irq_enable();
}

开启软中断时执行软中断

当进程调用函数local_bh_enable()开启软中断的时候,如果是开启最外层的软中断,并且当前处理器的待处理软中断位图不是空的,那么执行软中断

1
2
3
4
5
6
7
8
9
10
11
12
local_bh_enable() -> __local_bh_enable_ip()
kernel/softirq.c
void __local_bh_enable_ip(unsigned long ip, unsigned int cnt)
{
...
preempt_count_sub(cnt - 1);
if(unlikely(!in_interrupt() && local_softirq_pending())){
do_softirq();
}
preempt_count_dec();
...
}

抢占计数器

在介绍“禁止/开启软中断”之前,首先了解一下抢占计数器这个背景知识。每个进程的thread_info结构体有一个抢占计数器:int preempt_count,它用来表示当前进程能不能被抢占。
抢占是指当进程在内核模式下运行的时候可以被其他进程抢占,如果优先级更高的进程处于就绪状态,强行剥夺当前进程的处理器使用权。
但是有时候进程可能在执行一些关键操作,不能被抢占,所以内核设计了抢占计数器。如果抢占计数器为0,表示可以被抢占;如果抢占计数器不为0,表示不能被抢占。
当中断处理程序返回的时候,如果进程在被打断的时候正在内核模式下执行,就会检查抢占计数器是否为0。如果抢占计数器是0,可以让优先级更高的进程抢占当前进程。

虽然抢占计数器不为0意味着禁止抢占,但是内核进一步按照各种场景对抢占计数器的位进行了划分,如图所示。

其中第0~7位是抢占计数,第8~15位是软中断计数,第16~19位是硬中断计数,第20位是不可屏蔽中断(Non Maskable Interrupt, NMI)计数

各种场景分别利用各自的位禁止或开启抢占。

  • 普通场景(PREEMPT_MASK):对应函数preempt_disable()preempt_enable()
  • 软中断场景(SOFTIRQ_MASK):对应函数local_bh_disable()local_bh_enable()
  • 硬中断场景(HARDIRQ_MASK):对应函数 __irq_enter()__irq_exit()
  • 不可屏蔽中断场景(NMI_MASK):对应函数nmi_enter()nmi_exit()

反过来,我们可以通过抢占计数器的值判断当前处在什么场景

in_irq()表示硬中断场景,也就是正在执行硬中断。
in_softirq()表示软中断场景,包括禁止软中断和正在执行软中断。
in_interrupt()表示正在执行不可屏蔽中断、硬中断或软中断,或者禁止软中断。
in_serving_softirq()表示正在执行软中断。
in_nmi()表示不可屏蔽中断场景。
in_task()表示普通场景,也就是进程上下文。

禁止/开启软中断

如果进程和软中断可能访问同一个对象,那么进程和软中断需要互斥,进程需要禁止软中断。
禁止软中断的函数是local_bh_disable(),注意:这个函数只能禁止本处理器的软中断,不能禁止其他处理器的软中断。

该函数把抢占计数器的软中断计数加2,其代码如下

开启软中断的函数是local_bh_enable(),该函数把抢占计数器的软中断计数减2。

为什么禁止软中断的函数local_bh_disable()把抢占计数器的软中断计数加2,而不是加1呢?

目的是区分禁止软中断和正在执行软中断这两种情况。
执行软中断的函数__do_softirq()把抢占计数器的软中断计数加1。
如果软中断计数是奇数,可以确定正在执行软中断。

tasklet

tasklet是基于软中断实现的。

为什么要提供小任务(tasklet)?

因为小任务相对软中断有以下优势。

  • 软中断的种类是编译时静态定义的,在运行时不能添加或删除;小任务可以在运行时添加或删除。
  • 同一种软中断的处理函数可以在多个处理器上同时执行,处理函数必须是可以重入的,需要使用锁保护临界区;
    一个小任务同一时刻只能在一个处理器上执行,不要求处理函数是可以重入的。
    小任务根据优先级分为两种:低优先级小任务和高优先级小任务。

数据结构

小任务的数据结构如下

成员next用来把小任务添加到单向链表中。成员state是小任务的状态,取值如下。

  • 0:小任务没有被调度。
  • (1 << TASKLET_STATE_SCHED):小任务被调度,即将被执行。
  • (1 << TASKLET_STATE_RUN):只在多处理器系统中使用,表示小任务正在执行。

成员count是计数,0表示允许小任务被执行,非零值表示禁止小任务被执行。
成员func是处理函数,成员data是传给处理函数的参数。

每个处理器有两条单向链表:低优先级小任务链表和高优先级小任务链表。

编程接口

定义一个静态的小任务,并且允许小任务被执行,方法如下

1
DECLARE_TASKLET(name, func, data);

定义一个静态的小任务,并且禁止小任务被执行,方法如下

1
DECLARE_TASKLET_DISABLED(name, func, data);

在运行时动态初始化小任务,并且允许小任务被执行,方法如下

1
void tasklet_init(struct tasklet_struct *t, void(*func)(unsigned long), unsigned long data);

函数tasklet_disable()用来禁止小任务被执行,如果小任务正在被执行,该函数等待小任务执行完。

1
void tasklet_disable(struct tasklet_struct *t);

函数tasklet_disable_nosync()用来禁止小任务被执行,如果小任务正在被执行,该函数不会等待小任务执行完。

1
void tasklet_disable_nosync(struct tasklet_struct *t);

函数tasklet_enable()用来允许小任务被执行。

1
void tasklet_enable(struct tasklet_struct *t);

函数tasklet_schedule()用来调度低优先级小任务:把小任务添加到当前处理器的低优先级小任务链表中,并且触发低优先级小任务软中断。

1
void tasklet_schedule(struct tasklet_struct *t);

函数tasklet_hi_schedule()用来调度高优先级小任务:把小任务添加到当前处理器的高优先级小任务链表的尾部,并且触发高优先级小任务软中断。

1
void tasklet_hi_schedule(struct tasklet_struct *t);

函数tasklet_hi_schedule_first()用来调度高优先级小任务:把小任务添加到当前处理器的高优先级小任务链表的首部,并且触发高优先级小任务软中断。

1
void tasklet_hi_schedule_first(struct tasklet_struct *t);

函数tasklet_kill()用来杀死小任务,确保小任务不会被调度和执行。如果小任务正在被执行,该函数等待小任务执行完。通常在卸载内核模块的时候调用该函数。

1
void tasklet_kill(struct tasklet_struct *t);

技术原理

小任务是基于软中断实现的,根据优先级分为两种:低优先级小任务和高优先级小任务。软中断HI_SOFTIRQ执行高优先级小任务,软中断TASKLET_SOFTIRQ执行低优先级小任务。

调度小任务

函数tasklet_schedule()用来调度低优先级小任务,函数tasklet_hi_schedule()用来调度高优先级小任务。
以函数tasklet_schedule()为例说明,其代码如下

如果小任务没有被调度过,那么首先设置调度标志位,然后把小任务添加到当前处理器的低优先级小任务链表的尾部,最后触发软中断TASKLET_SOFTIRQ。

执行小任务

初始化的时候,把软中断TASKLET_SOFTIRQ的处理函数注册为函数tasklet_action,把软中断HI_SOFTIRQ的处理函数注册为函数tasklet_hi_action

以函数tasklet_action()为例说明,其代码如下


  • 把当前处理器的低优先级小任务链表中的所有小任务移到临时链表list中。
  • 遍历临时链表list,依次处理每个小任务,如下。
    • 尝试锁住小任务,确保一个小任务同一时刻只在一个处理器上执行。
    • 如果小任务的计数为0,表示允许小任务被执行。
    • 清除小任务的调度标志位,其他处理器可以调度这个小任务,但是不能执行这个小任务。
    • 执行小任务的处理函数。
    • 释放小任务的锁,其他处理器就可以执行这个小任务了。
    • 如果尝试锁住小任务失败(表示小任务正在其他处理器上执行),或者禁止小任务被执行,那么把小任务重新添加到当前处理器的低优先级小任务链表的尾部,然后触发软中断TASKLET_SOFTIRQ。

工作队列

工作队列(work queue)是使用内核线程异步执行函数的通用机制。

工作队列是中断处理程序的一种下半部机制,中断处理程序可以把耗时比较长并且可能睡眠的函数交给工作队列执行。
工作队列不完全是中断处理程序的下半部。内核的很多模块需要异步执行函数,这些模块可以创建一个内核线程来异步执行函数。
但是如果每个模块都创建自己的内核线程,会造成内核线程的数量过多,内存消耗比较大,影响系统性能。
所以最好的方法是提供一种通用机制,让这些模块把需要异步执行的函数交给工作队列执行,共享内核线程,节省资源。

编程接口

内核使用工作项保存需要异步执行的函数,工作项的数据类型是work_struct,需要异步执行的函数的原型如下所示

1
typedef void (*work_func_t)(struct work_struct *work);

有一类工作项称为延迟工作项,数据类型是delayed_work。把延迟工作项添加到工作队列中的时候,延迟一段时间才会真正地把工作项添加到工作队列中。延迟工作项是工作项和定时器的结合,可以避免使用者自己创建定时器。

我们可以使用内核定义的工作队列,也可以自己创建专用的工作队列。内核定义了以下工作队列

  • system_wq:如果工作项的执行时间比较短,应该使用这个工作队列。早期的内核版本只提供了这个工作队列,称为全局工作队列,函数schedule_work()和schedule_delayed_work()使用这个工作队列。
  • system_highpri_wq:高优先级的工作队列。
  • system_long_wq:如果工作项的执行时间比较长,应该使用这个工作队列。
  • system_unbound_wq:这个工作队列使用的内核线程不绑定到某个特定的处理器。
  • system_freezable_wq:这个工作队列可以冻结。
  • system_power_efficient_wq:如果开启了工作队列模块的参数“wq_power_efficient”,那么这个工作队列倾向于省电;否则和system_wq相同。
  • system_freezable_power_efficient_wq:这个工作队列和system_power_efficient_wq的区别是可以冻结。

定义工作项

定义一个静态的工作项,参数n是变量名称,参数f是工作项的处理函数。

1
DECLARE_WORK(n, f);

定义一个静态的延迟工作项,参数n是变量名称,参数f是工作项的处理函数。

1
DECLARE_DELAYED_WORK(n, f);

使用DECLARE_DEFERRABLE_WORK(n, f)也可以定义一个静态的延迟工作项,和DECLARE_DELAYED_WORK()的区别是它使用可推迟的定时器(deferrable timer)。
可推迟的定时器在系统忙的时候工作正常,但是在处理器空闲的时候不会处理可推迟的定时器。当一个不可推迟的定时器唤醒处理器的时候,才会处理可推迟的定时器。

在运行时动态初始化工作项,方法如下

  • INIT_WORK(_work, _func):初始化一个工作项,参数_work是工作项的地址,参数_func是需要异步执行的函数。
  • INIT_WORK_ONSTACK(_work, _func):初始化一个工作项,工作项是栈里面的局部变量,参数_work是工作项的地址,参数_func是需要异步执行的函数。
  • INIT_DELAYED_WORK(_work, _func):初始化一个延迟工作项,参数_work是延迟工作项的地址,参数_func是需要异步执行的函数。
  • INIT_DELAYED_WORK_ONSTACK(_work, _func):初始化一个延迟工作项,延迟工作项是栈里面的局部变量,参数_work是延迟工作项的地址,参数_func是需要异步执行的函数。
  • INIT_DEFERRABLE_WORK(_work, _func):初始化一个延迟工作项,和INIT_DELAYED_WORK()的区别是它使用可推迟的定时器。

全局工作队列

在全局工作队列中添加一个工作项

1
bool schedule_work(struct work_struct *work);

在全局工作队列中添加一个工作项,并且指定执行工作项的处理器

1
bool schedule_work_on(int cpu, struct work_struct* work);

在全局工作队列中添加一个延迟工作项,参数delay是把工作项添加到工作队列中之前等待的时间,单位是嘀嗒(tick)。

1
bool schedule_delayed_work(struct delayed_work *dwork, unsigned long delay);

在全局工作队列中添加一个延迟工作项,并且指定执行工作项的处理器。

1
bool schedule_delayed_work_on(int cpu, struct delayed_work *dwork, unsigned long delay);

冲刷全局工作队列,确保全局工作队列中的所有工作项执行完。

1
void flush_scheduled_work(void);

专用工作队列

分配工作队列的函数是

1
alloc_workqueue(fmt, flags, max_active, args...)
  • 参数fmt是工作队列名称的格式。
  • 参数fags是标志位,可以是0,也可以是下面这些标志位的组合。
    • WQ_UNBOUND:处理工作项的内核线程不绑定到任何特定的处理器。
    • WQ_FREEZABLE:在系统挂起的时候冻结。
    • WQ_MEM_RECLAIM:在内存回收的时候可能使用这个工作队列。
    • WQ_HIGHPRI:高优先级。❑ WQ_CPU_INTENSIVE:处理器密集型。
    • WQ_POWER_EFFICIENT:省电。
  • 参数max_active是每个处理器可以同时执行的工作项的最大数量,0表示使用默认值。
  • 参数args是传给参数fmt的参数。

下面的函数用来分配一个有序的工作队列。有序的工作队列在任何时刻,按照入队的顺序只执行一个工作项

1
alloc_ordered_workqueue(fmt, flags, args...)

旧版本的创建工作队列的函数create_workqueue()create_freezable_workqueue()create_singlethread_workqueue()已经被废弃。
在指定的工作队列中添加一个工作项。

1
bool queue_work(struct workqueue_struct *wq, struct work_struct *work);

在指定的工作队列中添加一个工作项,并且指定执行工作项的处理器

1
bool queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work);

在指定的工作队列中添加一个延迟工作项,参数delay是把工作项添加到工作队列中之前等待的时间,单位是嘀嗒(tick)。

1
bool queue_delayed_work(struct workqueue_struct *wq, struct delayed_work *dwork, unsigned long delay);

在指定的工作队列中添加一个延迟工作项,并且指定执行工作项的处理器。

1
bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq, struct delayed_work *work, unsigned long delay);

冲刷工作队列,确保工作队列中的所有工作项执行完。

1
void flush_workqueue(struct workqueue_struct *wq);

销毁工作队列的函数是

1
void destroy_workqueue(struct workqueue_struct *wq);

其他编程接口

取消一个工作项

1
bool cancel_work(struct work_struct *work);

取消一个工作项,并且等待取消操作执行完

1
bool cancel_work_sync(struct work_struct *work);

取消一个延迟工作项

1
bool cancel_delayed_work(struct delayed_work *dwork);

取消一个延迟工作项,并且等待取消操作执行完

1
bool cancel_delayed_work_sync(struct delayed_work *dwork);

等待一个工作项执行完

1
bool flush_work(struct work_struct *work);

等待一个延迟工作项执行完

1
bool flush_delayed_work(struct delayed_work *dwork);

技术原理

首先介绍一下工作队列使用的术语。

  • work:工作,也称为工作项。
  • work queue:工作队列,就是工作的集合,work queue和work是一对多的关系。
  • worker:工人,一个工人对应一个内核线程,我们把工人对应的内核线程称为工人线程。
  • worker_pool:工人池,就是工人的集合,工人池和工人是一对多的关系。
  • pool_workqueue:中介,负责建立工作队列和工人池之间的关系。工作队列和pool_workqueue是一对多的关系,pool_workqueue和工人池是一对一的关系。

数据结构

工作队列分为两种。

  • 绑定处理器的工作队列:默认创建绑定处理器的工作队列,每个工人线程绑定到一个处理器。
  • 不绑定处理器的工作队列:创建工作队列的时候需要指定标志位WQ_UNBOUND,工人线程不绑定到某个处理器,可以在处理器之间迁移。

绑定处理器的工作队列的数据结构如图所示,工作队列在每个处理器上有一个pool_workqueue实例,一个pool_workqueue实例对应一个工人池,一个工人池有一条工人链表,每个工人对应一个内核线程。
向工作队列中添加工作项的时候,选择当前处理器的pool_workqueue实例、工人池和工人线程。

不绑定处理器的工作队列的数据结构如图所示,工作队列在每个内存节点上有一个pool_workqueue实例,一个pool_workqueue实例对应一个工人池,一个工人池有一条工人链表,每个工人对应一个内核线程。向工作队列中添加工作项的时候,选择当前处理器所属的内存节点的pool_workqueue实例、工人池和工人线程。

不绑定处理器的工作队列还有一个默认的pool_workqueue实例(workqueue_struct.df_pwq),当某个处理器下线的时候,使用默认的pool_workqueue实例。

工作项负责保存需要异步执行的函数,数据类型是work_struct

成员func是需要异步执行的函数,成员data是传给函数func的参数。
成员entry用来把工作项添加到链表中。
延迟工作项是工作项和定时器的结合,数据类型是delayed_work。
把延迟工作项添加到工作队列中的时候,延迟一段时间才会真正地把工作项添加到工作队列中。

添加工作项

函数queue_work()用来向工作队列中添加一个工作项,把主要工作委托给函数queue_work_on(),把第一个参数“int cpu”设置为WORK_CPU_UNBOUND,意思是不绑定到任何处理器,优先选择当前处理器。

函数queue_work_on的代码如下

如果工作项没有添加过,那么给工作项设置标志位WORK_STRUCT_PENDING_BIT,然后把主要工作委托给函数queue_work()。
函数
queue_work的代码如下



  • 从工作队列中选择pool_workqueue实例。如果是绑定处理器的工作队列,那么选择当前处理器的pool_workqueue实例;如果是不绑定处理器的工作队列,那么选择当前处理器所属的内存节点的pool_workqueue实例。
  • 如果工作正在被其他pool_workqueue实例的工人执行,那么还是把工作添加到那个pool_workqueue实例。
  • 如果pool_workqueue实例的未处理工作数量小于限制,那么把工作添加到pool_workqueue实例对应的工人池的链表worklist中;如果pool_workqueue实例的未处理工作数量达到限制,那么给工作设置标志位WORK_STRUCT_DELAYED,并且把工作添加到pool_workqueue实例的链表delayed_works中。
  • 把工作添加到选择的链表中。

工人处理工作

每个工人对应一个内核线程,一个工人池对应一个或多个工人。
多个工人从工人池的未处理工作链表(worker_pool.worklist)中取工作并处理。
工人线程的处理函数是worker_thread(),调用函数process_one_work()处理一个工作项。

函数worker_thread()的代码如下



  • 如果工人太多,想要减少工人的数量,那么当前工人线程退出。
  • 工人退出空闲状态。
  • 如果不需要本工人执行工作,那么本工人进入空闲状态。
  • 如果工人池中没有空闲的工人,那么创建一些工人备用。
  • 从工人池的链表worklist中取一个工作。
  • 如果是正常工作,那么调用函数process_one_work()执行正常工作,然后执行工人的链表scheduled中的特殊工作。
  • 如果是特殊工作,那么首先把工作添加到工人的链表scheduled的尾部,然后执行工人的链表scheduled中的特殊工作。
  • 如果有工作需要处理,并且处于运行状态的工人数量不超过1,那么本工人继续执行工作。
  • 工人进入空闲状态,睡眠。

下面解释一下正常工作和特殊工作。

向工作队列中添加正常工作,是直接添加到工人池的链表worklist中。
调用函数fush_work(t)等待工作t执行完,实现方法是添加一个特殊工作:屏障工作,执行这个屏障工作的时候就可以确定工作t执行完。如果工作t正在被工人p执行,那么把屏障工作直接添加到工人p的链表scheduled中;如果工作t没有执行,那么把屏障工作添加到工人池的链表worklist中,并且给屏障工作设置标志位WORK_STRUCT_LINKED。

函数process_one_work()负责处理一个工作,其代码如下



  • 一个工作不应该被多个工人并发执行。如果一个工作正在被工人池的其他工人执行,那么把这个工作添加到这个工人的链表scheduled中延后执行。
  • 把工人添加到工人池的散列表busy_hash中。
  • 工人的成员current_work指向当前工作,成员current_func指向当前工作的处理函数,成员current_pwq指向当前pool_workqueue实例。
  • 如果工作队列是处理器密集型的,那么给工人设置标志位WORKER_CPU_INTENSIVE,工人不再被工人池动态调度。
  • 对于不绑定处理器或处理器密集型的工作队列,唤醒更多空闲的工人处理工作。
  • 执行工作的处理函数。

工人池动态管理工人

工人池可以动态增加和删除工人,算法如下

  • 工人有3种状态:空闲(idle)、运行(running)和挂起(suspend)。空闲是指没有执行工作,运行是指正在执行工作,挂起是指在执行工作的过程中睡眠。
  • 如果工人池中有工作需要处理,至少保持一个处在运行状态的工人来处理。
  • 如果处在运行状态的工人在执行工作的过程中进入挂起状态,为了保证其他工作的执行,需要唤醒空闲的工人处理工作。
  • 如果有工作需要执行,并且处在运行状态的工人数量大于1,会让多余的工人进入空闲状态。
  • 如果没有工作需要执行,会让所有工人进入空闲状态。
  • 如果创建的工人过多,工人池把空闲时间超过300秒(IDLE_WORKER_TIMEOUT)的工人删除。

为了跟踪工人的运行和挂起状态、动态调整工人的数量,工作队列使用在进程调度中加钩子函数的技巧。

  • 跟踪工人从挂起进入运行状态。唤醒工人线程的时候,如果工人线程正在执行工作的过程中,那么把工人池中处在运行状态的工人计数(nr_running)加1。
    相关代码如下
  • 跟踪工人从运行进入挂起状态。当一个工人睡眠的时候,如果工人池没有工人处于运行状态,并且工人池有工作需要执行,那么唤醒一个空闲的工人。
    相关代码如下

工人池的调度思想是如果有工作需要处理,保持一个处在运行状态的工人来处理,不多也不少。
这种做法有个问题:如果工作是处理器密集型的,虽然工人没有进入挂起状态,但是会长时间占用处理器,让后续的工作阻塞太长时间。
为了解决这个问题,可以在创建工作队列的时候设置标志位WQ_CPU_INTENSIVE,声明工作队列是处理器密集的。当一个工人执行工作的时候,让这个工人不受工人池动态调度,像是进入了挂起状态,工人池创建新的工人来执行后续的工作。

工人线程对处理器密集的特殊处理如下


可以看到,给工人设置标志位WORKER_CPU_INTENSIVE的时候,把工人池的计数nr_running减1,相当于工人进入挂起状态。





系统调用

系统调用是内核给用户程序提供的编程接口。用户程序调用系统调用,通常使用glibc库针对单个系统调用封装的函数。
如果glibc库没有针对某个系统调用封装函数,用户程序可以使用通用的封装函数syscall():

参数number是系统调用号,后面是传递给系统调用的参数。返回值0表示成功,返回值−1表示错误,错误号存储在变量errno中。

例如应用程序使用系统调用fork()创建子进程,有两种调用方法。

  • ret = fork();
  • ret = syscall(SYS_fork);

ARM64处理器提供的系统调用指令是svc,调用约定如下。

  • 64位应用程序使用寄存器x8传递系统调用号,32位应用程序使用寄存器x7传递系统调用号。
  • 使用寄存器x0~x6最多可以传递7个参数。
  • 当系统调用执行完的时候,使用寄存器x0存放返回值。

定义系统调用

Linux内核使用宏SYSCALL_DEFINE定义系统调用,以创建子进程的系统调用fork为例:

把宏“SYSCALL_DEFINE0(fork)”展开以后是:

1
asmlinkage long sys_fork(void)

“SYSCALL_DEFINE”后面的数字表示系统调用的参数个数,“SYSCALL_DEFINE0”表示系统调用没有参数,“SYSCALL_DEFINE6”表示系统调用有6个参数,如果参数超过6个,使用宏“SYSCALL_DEFINEx”。头文件“include/linux/syscalls.h”定义了这些宏。

“asmlinkage”表示这个C语言函数可以被汇编代码调用。如果使用C++编译器,“asmlinkage”被定义为extern “C”;如果使用C编译器,“asmlinkage”是空的宏。

系统调用的函数名称以“sys_”开头。
需要在系统调用表中保存系统调用号和处理函数的映射关系,ARM64架构定义的系统调用表sys_call_table如下:

arch/arm64/kernel/sys.c

对于ARM64架构,头文件“asm/unistd.h”是“arch/arm64/include/asm/unistd.h”。

执行系统调用

ARM64处理器把系统调用划分到同步异常,在异常级别1的异常向量表中,系统调用的入口有两个:

  • 如果64位应用程序执行系统调用指令svc,系统调用的入口是el0_sync。
  • 如果32位应用程序执行系统调用指令svc,系统调用的入口是el0_sync_compat。

el0_sync的代码如下

  • 把当前进程的寄存器值保存在内核栈中。
  • 读取异常症状寄存器esr_el1。
  • 解析出异常症状寄存器的异常类别字段。
  • 如果异常类别是系统调用,跳转到el0_svc。

el0_svc负责执行系统调用,其代码如下


  • 把寄存器x27设置为系统调用表sys_call_table的起始地址。
  • 把寄存器x26设置为系统调用号。64位进程使用寄存器x8传递系统调用号,w8是寄存器x8的32位形式。
  • 把寄存器x25设置为系统调用的数量,也就是(最大的系统调用号+1)。
  • 把寄存器x0和x8的值保存到内核栈中,x0存放系统调用的第一个参数,x8存放系统调用号。
  • 开启调试异常和中断。
  • 如果使用ptrace跟踪系统调用,跳转到__sys_trace处理。
  • 如果进程传递的系统调用号等于或大于系统调用的数量,即大于最大的系统调用号,那么是非法值,跳转到ni_sys处理错误。
  • 计算出系统调用号对应的表项地址(sys_call_table + 系统调用号 * 8),然后取出处理函数的地址。
  • 调用系统调用号对应的处理函数。
  • 从系统调用返回用户空间。

ret_fast_syscall从系统调用返回用户空间,其代码如下


  • 禁止中断。
  • 寄存器x0已经存放了处理函数的返回值,把保存在内核栈中的寄存器x0的值更新为返回值。
  • 如果使用ptrace跟踪系统调用,跳转到ret_fast_syscall_trace处理。
  • 如果进程的thread_info.fags设置了需要重新调度(_TIF_NEED_RESCHED)或者有信号需要处理(_TIF_SIGPENDING)等标志位,跳转到work_pending处理。
  • 如果使用系统调用ptrace设置了软件单步执行,那么开启单步执行。
  • 使用保存在内核栈中的寄存器值恢复寄存器,从内核模式返回用户模式。

work_pending调用函数do_notify_resume,函数do_notify_resume的代码如下

  • 如果当前进程的thread_info.fags设置了标志位_TIF_NEED_RESCHED,那么调度进程。
  • 如果设置了标志位_TIF_UPROBE,调用函数uprobe_notify_resume()处理。uprobes(user-space probes,用户空间探测器)可以在进程的任何指令地址插入探测器,收集调试和性能信息,发现性能问题。需要内核支持,编译内核时开启配置宏CONFIG_UPROBE_EVENTS。
  • 如果设置了标志位_TIF_SIGPENDING,调用函数do_signal()处理信号。
  • 如果设置了标志位_TIF_NOTIFY_RESUME,那么调用函数tracehook_notify_resume(),执行返回用户模式之前的回调函数。
  • 如果设置了标志位_TIF_FOREIGN_FPSTATE,那么恢复浮点寄存器。




中断细节

外部设备与中央处理器交互一般有两种手段:轮询和中断。
对于轮询要求处理器不停地查询外设的状态,在此期间处理器不能做别的事情。
而中断不要求处理器不停地查询自己的状态,而是在自己的状态满足处理器的要求时主动发送一个硬件信号给处理器,后者在接收到这一信号时,会挂起当前正在执行的任务转而去处理外设的中断信号。

现代设备绝大多数采用中断的方式与处理器进行沟通,因此设备驱动程序必须能够支持设备的中断特性。
处理器在中断到达时会根据不同的中断号找到对应的处理函数对该信号进行处理,这些处理函数称为中断处理例程ISR(Interrupt Service Routine),设备驱动程序负责为管理的设备提供中断处理例程并向系统注册。
从设备发出中断信号到处理器最终调用中断处理例程进行处理,期间会经过很多步骤,这个过程构成了中断处理框架。

不同的操作系统对中断处理框架的设计不尽相同,但是要达到的目的是一样的,那就是最终调用设备的中断处理例程。
将先描述Linux系统下的中断处理框架设计,然后在此基础上讨论设备驱动程序如何利用内核提供的接口函数向系统挂载中断处理例程,最后讨论中断上下文的相关内容,包括软中断等。

中断的硬件框架

处理器一般只有两根左右的中断引脚,而管理的外设却很多。
为了解决这个问题,现代设备的中断信号线并不是与处理器直接相连,而是与一个称为中断控制器的设备相连接,后者才跟处理器的中断引脚直接连接。
中断控制器一般可以通过处理器进行编程配置,所以常称为可编程中断控制器PIC

PIC的输出中断信号线连接到处理器的INT引脚上,这是处理器专门用来接收中断信号的pin脚。
外部设备的中断线连接到PIC的pin引脚上,这是PIC用来接收外设中断的pin脚,比如外部设备1的中断线通过P0连到PIC上。

在实际的硬件平台上,PIC有的在CPU外部,比如x86平台上的8259控制器;有的被封装到了CPU的内部,这广泛见于嵌入式领域,一颗SoC芯片内部集成了处理器和各种外部设备的控制器,其中包括PIC。
中断方面的内容常常涉及硬件平台的差别,但是这里不会纠结于某个具体的硬件设计,而是希望相关的内容可以很快被读者吸纳到自己手边的平台上。

PIC与软件中断号

实际使用中在处理器能处理外部设备的中断前,常常需要对PIC进行配置,配置工作常常作为操作系统初始化任务的一部分。
当然中断处理框架也需要提供适当的PIC配置接口函数供设备驱动程序调用,因为设备驱动所管理的设备也许并不是一开始就连接到PIC的某一中断引脚上的。
如果在系统运行起来之后,某一外设才被用户接入系统,那么它的驱动程序应该负责配置PIC的对应引脚,使该外设能正常中断处理器。

对PIC的配置工作主要包括:

  • 设定外部设备中断触发电信号的类型,常见的触发类型有水平触发和边沿触发。
  • 将外设的中断引脚编号映射到处理器可见的软件中断号irq。
  • 屏蔽掉某些外部设备的中断触发。

为了让处理器可以配置自己,PIC常常需要提供一系列的控制寄存器。这些控制寄存器可以完成上述所有配置工作,并且配置粒度可以细分到PIC的每一个中断输入引脚P。
此处一个需要明确定义的概念是软件中断号irq,它是发生设备中断时处理器从PIC中读到的中断号码,在操作系统建立的中断处理框架内,会使用这个irq号来标识一个外设的中断并调用对应的中断处理例程。
作为描述的示例,外部设备0触发的一个中断电信号被处理的大体流程。

PIC将首先接收到该信号,如果它没有被屏蔽,那么PIC应该在INT引脚上产生一个中断信号告诉处理器。
后者在接收到该信号后会从PIC那里得到一个特定的标识号码,该号码告诉中断处理框架,是设备0发生了中断。
于是中断处理框架会调用设备0的中断处理例程,此处的这个特定的标识设备0的中断号码就称为软件中断号irq或者中断号irq。

此处还有一个概念需要提一下,那就是中断向量表(vector table)。
这其实是处理器内部的一个概念,因为处理器除了会被外部设备中断,其内部也可能会产生异常等事件。
当这些事发生时,CPU必须暂停当前的工作,转而去处理中断或者异常,因此处理器需要知道到哪里去获得这些中断或异常的处理函数的目标地址。
中断向量表就用来解决这个问题,其每一项都是一个中断或异常处理函数的入口地址。
外部设备的中断常常对应向量表中的某一项,这是个通用的外部中断处理函数的入口,因此在进入通用的中断处理函数之后,系统必须要知道正在处理的中断是哪一个设备产生的,而这正是由前面提到的软件中断号irq决定的。
中断向量表中的内容由操作系统在初始化阶段来填写,对于外部中断,操作系统负责实现一个通用的外部中断处理函数,然后把这个函数的入口地址放到中断向量表中的对应位置。

通用的中断处理函数

当有外部中断发生时,预先设计好的处理器硬件逻辑往往会做一些特定的动作,为从软件层面发起的中断处理做准备工作。
不同的处理器有不同的逻辑设计,但这些动作常常包括把当前任务的上下文寄存器保存在一个特定的中断栈中,屏蔽掉处理器响应外部中断的能力等。
在这些动作的结束部分,硬件逻辑根据中断向量表中的外部中断对应的入口地址,开始调用由操作系统提供的通用中断处理函数。

不同的架构平台上通用中断处理函数的实现也不尽相同,但在开始部分都会设法从PIC中得到导致本次中断发生的外部设备对应的软件中断号irq,这部分代码通常都是用汇编语言实现,在Linux源码树中散落在各个特定架构对应的目录中。
然后通用处理函数开始调用一个C函数,大部分平台上这个C函数的名字是do_IRQ,但也有例外比如ARM平台上是asm_do_IRQ,本书采用do_IRQ来指代该C函数的名称。

中断处理的绝大部分流程都浓缩在了这个C函数当中,当这个函数返回时,通用中断处理函数余下部分的代码将完成中断现场恢复的工作,这也标志着整个中断处理流程的结束:被中断的任务开始继续执行,仿佛中断根本没有发生过一样。
通常处理器在接收到外部的中断信号时,硬件逻辑会自动屏蔽处理器响应外部中断的能力,因此如果操作系统实现的中断处理框架不主动打开中断的话,整个中断处理的流程是在中断关闭的情况下进行的。
因为各个设备的中断处理函数一般是由驱动程序实现的,内核无法保证这些中断处理函数执行时间的长短,如果某一中断处理函数执行时间过长,则将会导致系统可能很长时间无法接收中断,这可能会使某些外部设备丢失数据或者操作系统响应时间变长等。
为了解决这一问题,Linux内核为驱动程序提供的中断处理机制分成了两个部分:HARDIRQ和SOFTIRQ。
前者是在中断关闭的情况下执行,用来完成中断发生后最关键的操作,它的执行时间应该尽可能短。
后者是在中断开启的情况下进行,此时外部设备仍可以继续中断处理器,驱动程序因此可以将一些比较耗时的工作延迟到这部分执行。
在do_IRQ函数中,对irq_enter的调用可以认为是HARDIRQ部分的开始,而SOFTIRQ则在irq_exit中完成。

do_IRQ函数

do_IRQ函数从通用中断处理函数中发起,负责整个中断处理流程中实质性的中断处理任务。
虽然该函数在各个平台上的实现代码不尽相同,但是原理基本上大同小异,一个典型的实现如下:

1
2
3
4
5
6
7
8
9
void__irq_entry  do_IRQ(unsigned int irq,struct pt_regs*regs)
{
struct pt_regs *old_regs = set_irq_regs(regs);
irq_enter();
check_stack_overflow();
generic_handle_irq(irq);
irq_exit();
set_irq_regs(old_regs);
}

先看该函数的两个参数,irq是该函数的调用者——通用中断处理函数从PIC中得到的软件中断号,regs是保存下来的被中断任务的执行现场,不同的处理器有不同的执行现场,也就是有不同的寄存器。
函数首先调用set_irq_regs将一个per-CPU型的指针变量__irq_regs保存到old_regs中,然后将__irq_regs赋予了一个新值regs,这样中断处理过程中,系统中的每一个CPU都可以通过__irq_regs来访问系统保存的中断现场。
在函数的结束,调用set_irq_regs(old_regs)来恢复__irq_regs__irq_regs一般用来在调试或者诊断时打印当前栈的信息,也可以通过这些保存的中断现场寄存器判断出被中断的进程当时运行在用户态还是内核态。

接下来irq_enter会更新系统中的一些统计量,同时会把当前栈中的preempt_count变量加上HARDIRQ_OFFSET来标识一个HARDIRQ中断上下文:preempt_count() +=HARDIRQ_OFFSET
HARDIRQ是Linux下对中断处理上半部分的称谓,与之对应的是中断处理的下半部分SOFTIRQ,此处irq_enter告诉系统现在进入了中断处理的上半部分。
与irq_enter行为配对的是irq_exit,在当前中断处理完成准备退出时调用,除了更新一些系统统计量和清除中断上下文的标识外,它还有一个重要的功能是处理软中断,也就是中断处理的下半部分

check_stack_overflow()函数用来检查当前中断是否会导致栈的溢出,因为每次中断发生时系统都会做保护现场的动作,从代码的层面,就是将系统的寄存器压入中断栈中。
理想情况下一个中断处理结束时将恢复现场,也就是将之前在栈中保存的寄存器弹出堆栈,因此不会发生栈溢出的情况。
但是如果中断处理函数中打开了处理器响应外部中断的能力,那就有可能在当前中断正在被处理时,处理器又接收到了新的中断,也就是所谓的中断嵌套,这将导致系统重复地进行中断现场保护的动作,甚至发生大量的中断嵌套行为,使得栈不断增长,从而出现堆栈的溢出,影响到系统的稳定性。
为此系统使用check_stack_overflow函数来对栈是否溢出进行检查,如果发现本次中断有可能导致栈的溢出,通常会打印出当前栈的信息(dump_stack),对于某些启用了watchdog的系统,也可能会强制系统进行reset动作。

do_IRQ的核心是调用generic_handle_irq函数,后者在其函数调度链中负责对当前发生的中断进行实际的处理:

1
2
3
4
5
6
<include/linux/irq.h>
static inline void generic_handle_irq(unsigned int irq)
{
struct irq_desc *desc = &irq_desc[irq];
desc->handle_irq(irq, desc);
}

函数通过软件中断号irq来索引数组irq_desc,得到一个struct irq_desc类型的指针变量desc,然后调用其成员函数handle_irq对当前中断进行实际的处理。
irq_desc是个struct irq_desc类型的数组,在Linux的整个中断处理框架中非常重要,起着沟通从通用的中断处理函数到设备特定的中断处理例程之间的桥梁作用,图展示了该数组的组成结构:

通常其定义和默认值如下

1
2
3
4
5
6
7
8
<kernel/irq/irqdesc.c>
struct irq_desc irq_desc[NR_IRQS] __cacheline_aligned_in_smp = {
[0 ... NR_IRQS-1] = {
.handle_irq =handle_bad_irq,
.depth =1,
.lock =__RAW_SPIN_LOCK_UNLOCKED(irq_desc->lock),
}
};

NR_IRQS是个平台相关的常量,用来表示特定的平台上可以处理的外部中断的数量。
Linux操作系统初始化期间通过调用early_irq_init函数来对这个数组初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<kernel/irq/irqdesc.c>
nt __init early_irq_init(void)
{
int count, i, node = first_online_node;
struct irq_desc *desc;
init_irq_default_affinity();
printk(KERN_INFO "NR_IRQS:%d\n", NR_IRQS);
desc = irq_desc;
count = ARRAY_SIZE(irq_desc);
for (i = 0; i < count; i++) {
desc[i].irq_data.irq = i;
desc[i].irq_data.chip = &no_irq_chip;
desc[i].kstat_irqs = alloc_percpu(unsigned int);
irq_settings_clr_and_set(desc, ~0, _IRQ_DEFAULT_INIT_FLAGS);
alloc_masks(desc + i, GFP_KERNEL, node);
desc_smp_init(desc + i, node);
lockdep_set_class(&desc[i].lock, &irq_desc_lock_class);
}
return arch_early_irq_init();
}

irq_desc

数组的类型struct irq_desc是个非常重要的数据结构, 定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<include/linux/irqdesc.h>
struct irq_desc {
struct irq_data irq_data; struct timer_rand_state*timer_rand_state;
unsigned int__percpu *kstat_irqs;irq_flow_handler_t handle_irq;
#ifdef CONFIG_IRQ_PREFLOW_FASTEOI
irq_preflow_handler_t preflow_handler;
#endif
struct irqaction *action; /*IRQ action list*/
unsigned int status_use_accessors;
unsigned int istate;unsigned int depth; /*nested irq disables*/
unsigned int wake_depth; /*nested wake enables*/
unsigned int irq_count; /*For detecting broken IRQs*/
unsigned long last_unhandled; /*Aging timer for unhandled count*/
unsigned int irqs_unhandled;
raw_spinlock_t lock;
#ifdef CONFIG_SMP
const struct cpumask *affinity_hint;
struct irq_affinity_notify *affinity_notify;
#ifdef CONFIG_GENERIC_PENDING_IRQ
cpumask_var_t pending_mask;
#endif
#endif
unsigned long threads_oneshot;
atomic_t threads_active;
wait_queue_head_t wait_for_threads;
#ifdef CONFIG_PROC_FS
struct proc_dir_entry *dir;
#endif
const char *name;
} ____cacheline_internodealigned_in_smp;

主要用来保存软件中断号irq和chip相关的数据

irq_data

1
2
3
4
5
6
7
8
9
10
11
12
13
<include/linux/irqdesc.h>
struct irq_data {
unsigned int irq;
unsigned int node;
unsigned int state_use_accessors;
struct irq_chip *chip;
void *handler_data;
void *chip_data;
struct msi_desc *msi_desc;
#ifdef CONFIG_SMP
cpumask_var_t affinity;
#endif
};

成员irq代表软件中断号。
chip成员则代表着当前中断来自的PIC,chip所在的数据结构是在软件层面对PIC的一个抽象。
Linux通过封装在struct irq_data中的chip来屏蔽各种不同硬件平台上PIC的差异,给上层的软件提供统一的对PIC操作的接口。
利用PIC中封装的函数,可以屏蔽或启用当前中断,设定外部设备中断触发电信号的类型等。

kstat_irqs

1
unsigned int __percpu *kstat_irqs

一个per-CPU型成员,用于系统的中断统计计数。

handle_irq

1
irq_flow_handler_t handle_irq

这是个函数指针,一般用来指向一个跟当前设备中断触发电信号类型相关的函数。
比如如果外部设备的中断电信号是边沿触发,那么此处handle_irq将指向一个边沿触发类的处理函数;
如果是水平触发那么将指向一个水平触发类的处理函数。

如果在某一平台上边沿触发的中断和水平触发的中断处理起来完全相同,那么就没有必要如此细分,提供一个常规的处理函数就可以了。
在handle_irq指向的函数内部,才会调用设备特定的中断服务例程。
特定平台的Linux系统在初始化阶段会提供handle_irq的具体实现,这是内核设计者或者嵌入式平台BSP模块所承担的任务,设备驱动程序员在这一层面通常没有什么工作要做。

action

1
struct irqaction *action

action是针对某一具体设备的中断处理的抽象。设备驱动程序会通过request_irq来向其中挂载设备特定的中断处理函数,相对于前面提到的通用中断处理函数,本称action中的handler为设备中断服务例程ISR
通过前面讨论的handle_irq和action,可以看到从通用中断处理函数发起的对某一中断处理,实际上又被划分成了两个层次,第一层是handle_irq函数,它与软件中断号irq一一对应,代表了对IRQ line上的处理动作,而action则代表着与具体设备相关的中断处理,也是设备驱动程序员要直接与之打交道的对象,通过action成员,可以在一条IRQ line上挂载多个设备,换句话说多个设备可以通过同一条IRQ line来共享同一个软件中断号irq,形成所谓的中断链,所以可以推想到action中必然有构成链表的成员

图展示了hangle_irq与action的层次关系

通过上面的讨论,为使读者对Linux下的中断处理流程有个全局性的直观印象

从图中可以看到,Linux内核将中断的处理分成了两大部分,分别是HARDIRQ和SOFTIRQ,前者一般是在处理器屏蔽外部中断的情况下工作,而后者在工作前会启用处理器响应外部中断的能力。
通用中断处理函数是外部设备的中断到达处理器后,处理器首先进入的函数,在完成必要的工作后,调用do_IRQ来对中断进行实际的处理。后者通过引发本次中断的软件中断号来索引irq_desc数组,找到对应的处理函数并调用,而设备驱动程序等内核模块则通过修改irq_desc数组中对应项的action成员来达到安装或卸载设备中断处理服务例程ISR的目的。
设备的中断处理函数调用结束后,中断流程进入SOFTIRQ部分,在这里如果有等待的softirq需要处理,则处理之,否则返回到通用中断处理函数。

struct irq_chip

数据结构struct irq_data中的struct irq_chip *chip成员用来表示一个PIC的对象,如果系统中只有一个PIC,那么irq_desc数组的每一项中的chip都应该指向该PIC的对象。
平台的初始化函数负责实现该平台使用的PIC的对象并将其安装到irq_desc数组中。
PIC对象用来实现对PIC的配置,配置工作主要包括设定外部设备的中断触发信号的类型,屏蔽或者启用某一设备的中断信号,向发出中断请求的设备发送中断响应信号等。

struct irq_chip定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<include/linux/irq.h>
struct irq_chip {
const char *name;
unsigned int (*irq_startup)(struct irq_data *data);
void (*irq_shutdown)(struct irq_data*data);
void (*irq_enable)(struct irq_data*data);
void (*irq_disable)(struct irq_data*data);
void (*irq_ack)(struct irq_data*data);
void (*irq_mask)(struct irq_data*data);
void (*irq_mask_ack)(struct irq_data*data);
void (*irq_unmask)(struct irq_data*data);
void (*irq_eoi)(struct irq_data*data);
int (*irq_set_affinity)(struct irq_data*data,const struct cpumask*dest,bool force);
int (*irq_retrigger)(struct irq_data*data);
int (*irq_set_type)(struct irq_data*data,unsigned int flow_type);
int (*irq_set_wake)(struct irq_data*data,unsigned int on);
void (*irq_bus_lock)(struct irq_data*data);
void (*irq_bus_sync_unlock)(struct irq_data*data);
void (*irq_cpu_online)(struct irq_data*data);
void (*irq_cpu_offline)(struct irq_data*data);
void (*irq_print_chip)(struct irq_data*data,struct seq_file*p);
unsigned long flags;
/* Currently used only by UML, might disappear one day.*/
#ifdef CONFIG_IRQ_RELEASE_METHOD
void (*release)(unsigned int irq,void*dev_id);
#endif
};

其成员绝大多数是函数指针,用来指向具体平台实现的PIC控制函数。

struct irqaction

在继续下面的讨论前,有必要了解struct irqaction这个重要的数据结构。
在struct irq_desc结构中,成员变量action是一指向struct irqaction类型的指针,设备驱动程序通过这个结构将其中断处理函数挂载在action上。

以下是该数据结构的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<include/linux/interrupt.h>
struct irqaction {
irq_handler_t handler;
unsigned long flags;
void *dev_id;
struct irqaction *next;
int irq;
irq_handler_t thread_fn;
struct task_struct *thread;
unsigned long thread_flags;
unsigned long thread_mask;
const char *name;
struct proc_dir_entry *dir;
} ____cacheline_internodealigned_in_smp;

其中irq_handler_t handler指向设备特定的中断服务例程函数的指针,irq_handler_t的声明如下

1
2
<include/linux/interrupt.h>
typedef irqreturn_t (*irq_handler_t)(int, void *);

设备驱动程序负责实现该函数,然后调用request_irq函数,后者会把驱动程序实现的中断服务例程赋值给handler。

1
void *dev_id

调用handler时传给它的参数,在多个设备共享一个irq的情况下特别重要,这种链式的action中,设备驱动程序通过dev_id来标识自己。

1
struct irqaction *next

指向下一个action对象,用于多个设备共享同一个irq的情形,此时action通过next构成一个链表。

1
struct proc_dir_entry *dir

中断处理函数中用来创建在proc文件系统中的目录项。

1
irq_handler_t thread_fn、struct task_struct *threadunsigned long thread_flags

当驱动程序调用request_threaded_irq函数来安装中断处理例程时,用来实现irq_thread机制。

irq_set_handler

现在把讨论的焦点集中到irq_desc数组中被软件中断号irq索引的某一项irq_desc[irq],对于一个特定的irq_desc[irq],其上的中断处理分为两级,第一级是调用irq_desc[irq].handle_irq,第二级是设备特定的中断处理例程ISR,在handle_irq的内部通过irq_desc[irq].action->handler调用。

第一级函数在平台初始化期间被安装到irq_desc数组中,第二级函数的注册发生在设备驱动程序调用request_irq安装对应设备的中断处理例程时。
第一级函数主要面向PIC的某一中断线IRQ line,第二级函数则面向该中断线上连接的具体设备,正如我们在前面图中看到的那样。
内核通过这种两级操作的方式除了可以增加设计的灵活性外,也可以获得某些额外的好处,比如后面将看到的设备软件中断号的探测机制等。

从上节的讨论可知,irq_desc[irq].handle_irq会被do_IRQ调用到,在Linux源码中handle_irq的类型声明如下:

1
2
<include/linux/irq.h>
typedef void(*irq_flow_handler_t)(unsigned int irq,struct irq_desc*desc);

为了让平台的初始化代码能够通过handle_irq注册第一级中断处理函数,内核提供了两个接口函数:irq_set_handler和irq_set_chained_handler。

1
2
3
4
5
6
7
8
9
<include/linux/irq.h>
static inline void irq_set_handler(unsigned int irq,irq_flow_handler_t handle)
{
__irq_set_handler(irq, handle, 0, NULL);
}
static inline void irq_set_chained_handler(unsigned int irq,irq_flow_handler_t handle)
{
__irq_set_handler(irq, handle, 1, NULL);
}

参数handle就是要安装在irq_desc[irq].handle_irq上的第一级处理函数,最终的安装任务通过__irq_set_handler来完成。其原型如下:

1
2
3
<kernel/irq/chip.c>
void __irq_set_handler(unsigned int irq, irq_flow_handler_t handle, int is_chained,
const char *name)

__irq_set_handler在对传递进来的参数作一些必要的检查后,将handle安装到irq_desc[irq]上:

1
2
irq_desc[irq].handle_irq = handle;
irq_desc[irq].name = name;

参数is_chained用来表示irq_desc[irq]对应的项是否支持中断共享,如果是则将irq_desc[irq].status_use_accessors作如下设置:

1
desc->status_use_accessors |= _IRQ_NOPROBE | _IRQ_NOREQUEST;

_IRQ_NOREQUEST意味着对于irq_desc[irq]而言,无法通过request_irq来安装中断处理例程。_IRQ_NOPROBE意味着无法对irq_desc[irq]执行中断号的探测机制。因此若irq_desc[irq]对应的项支持中断的共享,那么它将不能支持自动探测中断号,这是由自动探测机制的设计原理所决定的,后面会看到这一点。
作为handle_irq的一个具体实现例子,下面来看一个用来处理边沿中断触发信号的函数handle_edge_irq的主要实现代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<kernel/irq/chip.c>
void handle_edge_irq(unsigned int irq,struct irq_desc*desc)
{
raw_spin_lock(&desc->lock);
desc->istate&=~(IRQS_REPLAY|IRQS_WAITING);
/*
* If we're currently running this IRQ, or its disabled,
* we shouldn't process the IRQ. Mark it pending, handle
* the necessary masking and go out
*/
if (unlikely(irqd_irq_disabled(&desc->irq_data) ||
irqd_irq_inprogress(&desc->irq_data) || !desc->action)) {
if (!irq_check_poll(desc)) {
desc->istate |= IRQS_PENDING;
mask_ack_irq(desc);
goto out_unlock;
}
}
kstat_incr_irqs_this_cpu(irq,desc);
/*Start handling the irq*/
desc->irq_data.chip->irq_ack(&desc->irq_data);
do{
if(unlikely(!desc->action)){
mask_irq(desc);
goto out_unlock;
}
/*
* When another irq arrived while we were handling
* one, we could have masked the irq.
* Renable it, if it was not disabled in meantime.
*/
if(unlikely(desc->istate&IRQS_PENDING)){
if (!irqd_irq_disabled(&desc->irq_data) &&
irqd_irq_masked(&desc->irq_data))
unmask_irq(desc);
}
handle_irq_event(desc);
}while((desc->istate&IRQS_PENDING)&&
!irqd_irq_disabled(&desc->irq_data));
out_unlock:
raw_spin_unlock(&desc->lock);
}

desc->istate &= ~( IRQS_REPLAY IRQS_WAITING)清除掉IRQ_REPLAY和IRQ_WAITING位,用来实现设备软件中断号的自动探测机制,稍后有专门的小节讨论如何自动探测中断号。
函数首先检查desc->irq_data中的state_use_accessors成员,确定其IRQD_IRQ_DISABLED或IRQD_IRQ_INPROGRESS位有没有被置1,这两位中的任一位被置1或者desc->action为空,handle_edge_irq函数都需要做进一步的特殊处理。
IRQD_IRQ_DISABLED表示当前的desc指向一个被禁止的中断线IRQ line,IRQD_IRQ_INPROGRESS表示当前的中断线正在处理中,同一中断irq的嵌套或者共享会出现该情况。
desc->action为空表示当前中断线上尚没有被安装特定的设备的中断ISR。
从设备驱动程序员的角度来看,这三种情况出现的概率较小,if条件中的unlikely也可说明这一点。这里的特殊处理是:

如果当前的中断线不处在正被轮询的阶段(IRQS_POLL_INPROGRESS),handle_edge_irq需要将desc->istate的IRQS_PENDING位置1,同时调用mask_ack_irq(desc)利用PIC对象的irq_mask例程将该条中断线在PIC中屏蔽掉,然后将IRQD_IRQ_MASKED位置1。
这样的处理其实很好理解:对一个正在被处理(因此没有必要作进一步处理),或者被disabled (当前的触发信号是非预期的,很可能是一种人为或者硬件线路的故障导致的“假”中断信号),或者压根儿没有安装设备中断处理例程ISR(没有设备在使用这根中断线),对于这样的中断线来说,这条正在触发中断信号的IRQ line都应该被屏蔽掉,当然为了后续的跟踪处理,IRQS_PENDING和IRQD_IRQ_MASKED位需要置1。
如果当前中断线正在被轮询,那么需要根据轮询的结果决定下一步的处理。
kstat_incr_irqs_this_cpu用来更新与中断相关的一些统计量,比如统计某一CPU上中断发生的次数。

经过上述这些步骤之后,可以正式进入下一阶段对该中断信号进行处理。handle_edge_irq首先调用desc->irq_data.chip->irq_ack(&desc->irq_data)函数,利用PIC对象的irq_ack例程向设备发出一个中断响应信号,从硬件逻辑角度,这一步通常使得当前发出中断信号的设备中产生一个信号电平的转换,防止设备在它的中断已经在设备驱动程序中处理时依然不停地发出同一中断信号。

do while循环是handle_edge_irq函数的核心部分,通过调用handle_irq_event来对本次中断进行实际的处理操作。
do while中首先对当前irq对应的desc->action指针进行判断,如果action是个空指针表明到目前为止还没有设备驱动程序在这条中断线上安装中断处理例程ISR,对于这种情况的处理是调用mask_irq函数通过PIC对象的irq_mask例程来屏蔽掉当前中断线在PIC中对应的中断位,同时将desc->irq_data.state_use_accessors的IRQD_IRQ_MASKED位置1,这样做是合理的,对于一个没有安装中断处理函数的外部中断,应该屏蔽掉它直到它的处理函数被安装上,否则该设备将不停地中断处理器。
之后再次对desc->istate进行检查,如果发现有等待的中断信号出现而且是被屏蔽掉的,同时其所对应的中断线又没有被disable掉,则通过PIC的unmask函数取消对应设备的屏蔽位,这主要是针对中断处理例程在执行过程中又产生了新的中断这种情况,对于第二次出现的中断信号,handle_edge_irq做的处理是将desc->istate上的IRQS_PENDING位和desc->irq_data.state_use_accessors上的IRQD_IRQ_MASKED位置1,同时在PIC中将对应的中断线屏蔽掉。
这样当前的中断处理例程结束后while循环条件满足,重新执行do while,在接下来的新循环中,这个处于IRQS_PENDING状态的中断线在PIC中的屏蔽将被解除, IRQD_IRQ_MASKED位也被清除掉。

handle_irq_event

函数handle_irq_event的工作比较简单,它为调用设备驱动程序安装的中断处理例程做最后的准备工作,比较容易急躁的读者此时也许还要耐心一点,不过我们很快就会看到与具体的设备中断处理例程相关的调用。
实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
<kernel/irq/handle.c>
irqreturn_t handle_irq_event(struct irq_desc *desc)
{
struct irqaction *action = desc->action;
irqreturn_t ret;
desc->istate &= ~IRQS_PENDING;
irqd_set(&desc->irq_data, IRQD_IRQ_INPROGRESS);
raw_spin_unlock(&desc->lock);
ret = handle_irq_event_percpu(desc, action);
raw_spin_lock(&desc->lock);
irqd_clear(&desc->irq_data, IRQD_IRQ_INPROGRESS);
return ret;
}

在进入正式的设备中断处理例程之前,通过desc->istate &= ~IRQS_PENDING语句清除掉IRQS_PENDING位,因为紧接下来就会调用设备的中断处理例程ISR,所以IRQS_PENDING不应再置1,同时需要将当前的中断线设置IRQD_IRQ_INPROGRESS状态,表明该中断线上一个中断正在被处理。
真正的设备驱动程序实现的中断处理函数例程的调用发生在handle_irq_event_percpu函数中,后者在源码中的实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<kernel/irq/handle.c>
irqreturn_t
handle_irq_event_percpu(struct irq_desc *desc, struct irqaction *action)
{
irqreturn_t retval = IRQ_NONE;
unsigned int random = 0, irq = desc->irq_data.irq;
do {
irqreturn_t res;
trace_irq_handler_entry(irq, action);
res = action->handler(irq, action->dev_id);
trace_irq_handler_exit(irq, action, res);
if (WARN_ONCE(!irqs_disabled(),"irq %u handler %pF enabled interrupts\n",
irq, action->handler))
local_irq_disable();
switch (res) {
case IRQ_WAKE_THREAD:
/*
* Set result to handled so the spurious check
* does not trigger.
*/
res = IRQ_HANDLED;
/*
* Catch drivers which return WAKE_THREAD but
* did not set up a thread function
*/
if (unlikely(!action->thread_fn)) {
warn_no_thread(irq, action);
break;
}
irq_wake_thread(desc, action);
/* Fall through to add to randomness */
case IRQ_HANDLED:
random |= action->flags;
break;
default:
break;
}
retval |= res;
action = action->next;
} while (action);
if (random & IRQF_SAMPLE_RANDOM)
add_interrupt_randomness(irq);
if (!noirqdebug)
note_interrupt(irq, desc, retval);
return retval;
}

函数的主体是一do while循环,用于遍历action可能形成的链表结构,当然大部分情况下,一个中断线只安装了一个设备中断处理例程,此时action对象并不构成链表,但是从代码中可以清楚地看到内核对同一中断线上多个设备共享中断的支持。
循环的一开始就通过action->handler来调用具体设备的中断处理例程(action对象中的handler由设备驱动程序通过request_irq函数进行安装,Linux下的设备驱动程序员对此应该不会陌生)。
函数接下来对action->handler调用的返回值进行处理,驱动程序中实现的中断处理例程函数绝大部分返回值IRQ_HANDLED,返回IRQ_WAKE_THREAD的情形相对比较少,如果返回的是IRQ_WAKE_THREAD,那么函数将调用irq_wake_thread来唤醒action->thread表示的一个内核线程,关于这种情形将在后续的中断安装部分予以讨论。
在结束一个具体设备的中断处理例程之后,函数通过action = action->next来获得action的下一个节点(如果节点存在的话)。

do while的循环条件是action->next不为空,这种情况表明正在处理一个共享的中断。对共享中断形成的链式结构的处理是遍历action链表,对每一个节点调用其上的handler函数。

request_irq

前面讲了Linux下处理一个外部中断的整个流程,其中大部分的工作都是由内核来完成,这里之所以用一定量的篇幅对其进行讨论,目的是希望读者对设备驱动程序提供的中断处理例程被调用时的上下文背景有个清晰的认识,这样我们才能知道如何去实现一个无安全隐患的中断处理例程。
现在开始讨论驱动程序如何与Linux的中断处理框架进行交互,向irq_desc数组中安装设备的中断处理例程。驱动程序中安装一个设备中断服务例程是通过调用request_irq函数完成的,其定义如下:

1
2
3
4
5
6
7
<include/linux/interrupt.h>
static inline int __must_check
request_irq(unsigned int irq, irq_handler_t handler, unsigned long flags,
const char *name, void *dev)
{
return request_threaded_irq(irq, handler, NULL, flags, name, dev);
}

函数的第一个参数irq是当前要安装的中断处理例程所对应的软中断号;handler就是已经多次提及的中断处理例程ISR,由设备驱动程序负责实现;flags是标志变量,可影响内核在安装ISR时的一些行为模式;name是当前安装中断ISR的设备名称,内核会在proc文件系统中生成name的一个入口点;dev是个传递到中断处理例程的指针,在中断共享的情形下,将在free_irq时被用到,以区分当前的free_irq要释放的是哪一个struct irqaction对象,因此必须确保dev参数在内核整个中断处理框架中的唯一性,由于内核在用request_irq安装一个中断处理例程时并不对dev的唯一性进行检查,因此设备驱动程序应该努力做到这一点,通常的做法是将设备驱动程序所管理的与设备相关的某一数据结构对象的指针作为dev的实参。
另外由于内核中断处理框架在调用设备驱动程序的ISR时,会将该dev参数一并传入,因此也可以借助它在被中断的进程与中断处理例程中传递数据之用。

request_irq函数的核心是通过调用request_threaded_irq完成中断处理函数的实际安装工作,可以看到request_irq在调用request_threaded_irq函数时传入的第三个参数是NULL,这个参数跟内核中一个用于中断处理的线程irq_thread有关,如果设备驱动程序通过request_irq来安装一个中断处理例程,因为对thread_fn传入的实参是NULL,所以不会涉及irq_thread部分,但是设备驱动程序也可以直接通过调用request_threaded_irq来安装中断,此时就有机会使用到irq_thread机制。request_threaded_irq函数的实现源码为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<kernel/irq/manage.c>
int request_threaded_irq(unsigned int irq, irq_handler_t handler,
irq_handler_t thread_fn, unsigned long irqflags,
const char *devname, void *dev_id)
{
struct irqaction *action;
struct irq_desc *desc;
int retval;
if ((irqflags & IRQF_SHARED) && !dev_id)
return -EINVAL;
desc = irq_to_desc(irq);
if (!desc)
return -EINVAL;
if (!irq_settings_can_request(desc))
return -EINVAL;
if (!handler) {
if (!thread_fn)
return -EINVAL;
handler = irq_default_primary_handler;
}
action = kzalloc(sizeof(struct irqaction), GFP_KERNEL);
if (!action)
return -ENOMEM;
action->handler = handler;
action->thread_fn = thread_fn;
action->flags = irqflags;
action->name = devname;
action->dev_id = dev_id;
chip_bus_lock(irq, desc);
retval = __setup_irq(irq, desc, action);
chip_bus_sync_unlock(irq, desc);
if (retval)
kfree(action);
return retval;
}

函数一开始进行了一系列的检查。
比如如果irqflags中的IRQF_SHARED位被置1,表明正在安装一个共享的中断,这种情况下驱动程序必须提供dev_id,如果dev_id为空则是非法情况,因为在free_irq中将无法确定到底卸载哪一个action。
如果desc->status_use_accessors上的_IRQ_NOREQUEST位被置1,表明irq_desc数组中的这一项禁止通过request_threaded_irq来安装中断处理函数,也是非法情况。这些检查通过之后,函数调用kzalloc分配一块类型为struct irqaction的地址空间action,然后根据函数传入的参数初始化action,并调用__setup_irq来安装中断处理函数。__setup_irq的声明如下:

1
2
static int
__setup_irq(unsigned int irq, struct irq_desc *desc, struct irqaction *new);

因为中断安装时有诸多的细节需要内核仔细处理,所以__setup_irq函数的源码实现看起来比较冗长,但其实质性的工作其实就是将desc中的action成员指针指向要安装的中断处理例程。下面按照request_irq调用时desc->action是否为空分别进行讨论(先暂不考虑irq_thread机制)。

desc->action为空

这种情况比较简单,因为此时desc->action为空,意味着当前尚无设备驱动程序正在使用这条中断线,所以只需先获得指向desc->action的指针old_ptr:struct irqaction **old_ptr =&desc->action,然后将request_threaded_irq中新分配的action指针赋值给old_ptr即可:*old_ptr =new;。
从设备驱动程序的角度而言,有几个需要注意的地方,如果设备驱动调用request_irq时,参数flags中设定了IRQF_TRIGGER_MASK标志位,表明驱动程序需要利用request_irq对irq的触发类型进行配置,因为desc->irq_data中的chip是PIC的抽象,所以此时只需调用chip中的irq_set_type成员函数就可配置PIC。系统定义的中断信号触发类型标志有:

1
2
3
4
5
6
7
8
9
10
11
12
<include/linux/interrupt.h>
//上升沿触发
#define IRQF_TRIGGER_RISING 0x00000001
//下降沿触发
#define IRQF_TRIGGER_FALLING 0x00000002
//高电平触发
#define IRQF_TRIGGER_HIGH 0x00000004
//低电平触发
#define IRQF_TRIGGER_LOW 0x00000008
//中断触发信号掩码
#define IRQF_TRIGGER_MASK (IRQF_TRIGGER_HIGH|IRQF_TRIGGER_LOW|\
IRQF_TRIGGER_RISING | IRQF_TRIGGER_FALLING)

设定中断触发信号类型的函数为__irq_set_trigger,其主要功能是通过PIC对象的irq_set_type成员函数设定当前中断线上有效的中断触发信号类型,同时将设定的类型记录到desc->irq_data.state_use_accessors中:

1
2
3
4
5
6
7
8
9
10
11
12
13
<kernel/irq/manage.c>
int __irq_set_trigger(struct irq_desc *desc, unsigned int irq,
unsigned long flags)
{
struct irq_chip *chip = desc->irq_data.chip;
int ret, unmask = 0;

ret = chip->irq_set_type(&desc->irq_data, flags);

desc->irq_data. state_use_accessors &= IRQD_TRIGGER_MASK;
desc->irq_data. state_use_accessors |= flags;

}

因为不是共享中断的情形,所以当前的request_irq调用将独占irq所对应的中断线的所有权,可以根据设备自身需要随意设置其中断触发信号类型,这在存在中断共享的情形下是不可能的。
所以如果驱动程序需要将irq的触发信号配置成下降沿触发,可以作如下调用:

1
request_irq(irq, demo_handler, IRQF_TRIGGER_FALLING, NULL, NULL);

desc->action不为空

这种情形表明当前irq所对应的中断线此前已经被安装了中断处理函数,换言之,这意味着正在安装一个共享该irq的中断处理例程。在中断共享的情况下,事情变得有些复杂,因为在此之前至少有一个设备驱动程序在当前的中断线上安装了中断处理例程,此时内核再安装一个新的中断处理例程就有了相当的限制,一个大体的原则是,新的安装不能破坏之前已有的中断工作模式。
从代码的角度,新的irqaction对象的flags成员必须与action链上已有的节点的flags成员作检查比较,如果有不一致的情形出现,安装将不会成功,函数返回一个错误码-EBUSY。
被检查的flags标志有IRQF_SHARED、IRQF_TRIGGER_MASK、IRQF_ONESHOT及IRQF_PERCPU,这些都是设备驱动程序在调用request_irq时通过参数flags传入的标志位。

在共享中断的情形下,如果新的request_irq调用去设定当前的触发信号的类型,__setup_irq函数并不会去真正调用PIC对象的irq_set_type函数,而只是检查当前要设定的中断触发信号类型是否与这条线上已经设定的类型相符,如果不符合,__setup_irq会给出一个警告信息,当前的安装虽然可以成功,但是未必能如预期的那样正常工作。
如果这些检查都成功通过,那么request_irq此时要做的是,将新分配的action加到action链的末尾。

__setup_irq函数的结束部分,如果desc->dir还是空,那么调用register_irq_proc在/proc/irq目录下创建类似/proc/irq/125这样的新目录项。最后调用的register_handler_proc在action->name不为空的情况下,会为此新action在proc文件系统中创建类似/proc/irq/125/action_name这样的目录。
内核通过这些proc文件系统的操作,可以方便开发者在用户空间查看系统中设备驱动程序的中断安装情况,例如x86平台上对应irq=45的proc文件系统节点的下列输出:

1
2
3
4
5
6
7
8
9
root@AMDLinuxFGL:/proc/irq/45# ll
total 0
dr-xr-xr-x 3 root root 0 Aug 6 17:48.
dr-xr-xr-x 27 root root 0 Aug 6 17:44..
-r-------- 1 root root 0 Aug 6 17:48affinity_hint
dr-xr-xr-x 2 root root 0 Aug 6 17:48hda_intel
-r--r--r-- 1 root root 0 Aug 6 17:48node
-rw------- 1 root root 0 Aug 6 17:48smp_affinity
-r--r--r-- 1 root root 0 Aug 6 17:48spurious

中断处理的irq_thread机制

下面再简单讨论一下内核为中断处理提供的另一种机制,这种机制在设备驱动程序通过调用request_threaded_irq函数来安装一个中断时,需要在struct irqaction对象中实现它的thread_fn成员。
request_threaded_irq函数内部会生成一个名为irq_thread的独立线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<kernel/irq/manage.c>
static int
__setup_irq(unsigned int irq, struct irq_desc *desc, struct irqaction *new)
{

if (new->thread_fn && !nested) {
struct task_struct *t;
t = kthread_create(irq_thread, new, "irq/%d-%s", irq,
new->name);
if (IS_ERR(t))
return PTR_ERR(t);
/*
* We keep the reference to the task struct even if
* the thread dies to avoid that the interrupt code
* references an already freed task_struct.
*/
get_task_struct(t);
new->thread = t;
}

}

irq_thread线程被创建出来时将以TASK_INTERRUPTIBLE的状态睡眠等待中断的发生,当中断发生时action->handler只负责唤醒睡眠的irq_thread,后者将调用action->thread_fn进行实际的中断处理工作。
因为irq_thread本质上是系统中的一个独立进程,所以采用这种机制将使实质的中断处理工作发生在进程空间,而不是中断的上下文中。

free_irq

通过request_irq安装的中断处理函数,如果不再需要的话应该调用free_irq予以释放。
free_irq完成的任务和request_irq正好相反,其声明如下:

1
2
<include/linux/interrupt.h>
extern void free_irq(unsigned int irq, void * dev_id);

根据第一个参数irq,函数在irq_desc数组中查找对应的action,遍历该action所在的链表,如果有action->dev_id == dev_id,那么就找到了要释放的action。
找到后调用kfree释放action所占的空间。如果释放的action是irq_desc[irq]中唯一的一个action节点,那么释放后还需要把desc->irq_data.state_use_accessors的IRQD_DISABLED位置1,同时调用irq_desc[irq].chip的irq_shutdown或者irq_disable/irq_mask函数在PIC中屏蔽掉irq所对应的外部设备中断线。
request_irq中建立的proc文件系统节点也将被删除。

SOFTIRQ

前面讨论了HARDIRQ的执行流程,下面再来看看Linux内核如何实现SOFTIRQ。SOFTIRQ的处理是在do_IRQ函数的irq_exit中实现的,irq_exit函数中实现SOFTIRQ调用的代码为:

1
2
3
4
5
6
7
8
9
<kernel/softirq.c>
void irq_exit(void)
{

sub_preempt_count(IRQ_EXIT_OFFSET);
if (!in_interrupt() && local_softirq_pending())
invoke_softirq();

}

函数首先把当前栈中的preempt_count变量减去IRQ_EXIT_OFFSET来标识一个HARDIRQ中断上下文的结束:preempt_count() -= IRQ_EXIT_OFFSET,这步动作对应do_IRQ中的irq_enter。
在没有配置内核可抢占的系统中,IRQ_EXIT_OFFSET=HARDIRQ_OFFSET;如果配置了可抢占,那么IRQ_EXIT_OFFSET=(HARDIRQ_OFFSET-1),意味着在HARDIRQ部分结束之后,内核已经启动可抢占性。

invoke_softirq是真正处理SOFTIRQ部分的函数,不过这个函数的调用有个前提,就是if中的两个条件:in_interrupt和local_softirq_pending。in_interrupt是个宏,展开为:

1
2
<include/linux/hardirq.h>
#define in_interrupt() (preempt_count()&(HARDIRQ_MASK|SOFTIRQ_MASK|NMI_MASK))

其主要用意是根据当前栈中的preempt_count变量,来判断当前是否在一个中断上下文中执行。
根据in_interrupt的定义来看,Linux内核认为HARDIRQ、SOFTIRQ和NMI都属于interrupt范畴。
对于HARDIRQ,前面讨论do_IRQ时可以看到在irq_enter和irq_exit之间,内核在preempt_count()上标示了HARDIRQ_OFFSET,表示这是个HARDIRQ的上下文。

Linux内核对preempt_count的使用如图所示:

由图可见,preempt_count的低8位与PREEMPT相关,8~15位留给SOFTIRQ使用,16~25位给HARDIRQ使用,NMI占据1位。local_softirq_pending也是一个宏,展开为:

1
2
<include/linux/irq_cpustat.h>
#define local_softirq_pending() (irq_stat[smp_processor_id()].__softirq_pending)

irq_stat是个数组,其具体定义取决于__ARCH_IRQ_STAT宏,在大部分体系架构中这是个per-CPU变量,比如对于x86平台:

1
2
<arch/x86/include/asm/hardirq.h>
DECLARE_PER_CPU_SHARED_ALIGNED(irq_cpustat_t, irq_stat);

如果没有定义__ARCH_IRQ_STAT,那么irq_stat定义如下:

1
2
3
4
5
<kernel/softirq.c>
#ifndef __ARCH_IRQ_STAT
irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;
EXPORT_SYMBOL(irq_stat);
#endif

基本上可以认为这是个per-CPU变量,系统中的每个CPU都拥有各自的副本。其类型irq_cpustat_t定义如下:

1
2
3
4
<include/asm-generic/hardirq.h>
typedef struct {
unsigned int __softirq_pending;
} ____cacheline_aligned irq_cpustat_t;

内核用一个无符号整型__softirq_pending来表示当前正在等待被处理的softirq,每一种softirq在__softirq_pending中占据一位,每个CPU都拥有自己的__softirq_pending变量。
回到irq_exit,现在知道invoke_softirq被调用的前提是:当前不在interrupt上下文中而且__softirq_pending中有等待的softirq。当前不在interrupt上下文中保证了如果代码正在SOFTIRQ部分执行时(此时处理器可以处理外部中断),如果发生了一个外部中断,那么在中断处理函数结束HARDIRQ部分时,将不会处理softirq,而是直接返回,这样此前被中断的SOFTIRQ部分将继续执行。

现在开始讨论softirq的处理部分,invoke_softirq是一个宏,定义如下:

1
2
3
4
5
6
<kernel/softirq.c>
#ifdef __ARCH_IRQ_EXIT_IRQS_DISABLED
#define invoke_softirq() __do_softirq()
#else
#define invoke_softirq() do_softirq()
#endif

__ARCH_IRQ_EXIT_IRQS_DISABLED是个体系架构相关的宏,用来决定在HARDIRQ部分结束时,有没有关闭处理器响应外部中断的能力。
如果定义了__ARCH_IRQ_EXIT_IRQS_DISABLED,就意味着在处理SOFTIRQ部分时,可以保证外部中断已经关闭,此时可以直接调用__do_softirq,否则调用do_softirq,后者最终会调用到__do_softirq,不过之前要做一些中断屏蔽的事情,保证__do_softirq开始执行时中断是关闭的:

1
2
3
4
5
6
7
8
9
10
11
12
13
<kernel/softirq.c>
asmlinkage void do_softirq(void)
{
__u32 pending;
unsigned long flags;
if (in_interrupt())
return;
local_irq_save(flags);
pending = local_softirq_pending();
if (pending)
__do_softirq();
local_irq_restore(flags);
}

__do_softirq的核心代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<kernel/softirq.c>
#define MAX_SOFTIRQ_RESTART 10
asmlinkage void __do_softirq(void)
{
struct softirq_action *h;
__u32 pending;
int max_restart = MAX_SOFTIRQ_RESTART;
int cpu;
pending = local_softirq_pending();
__local_bh_disable((unsigned long)__builtin_return_address(0));
cpu = smp_processor_id();
restart:
set_softirq_pending(0);
local_irq_enable();
h = softirq_vec;
do {
if (pending & 1) {
h->action(h);
}
h++;
pending >>= 1;
} while (pending);
local_irq_disable();
pending = local_softirq_pending();
if (pending && --max_restart)
goto restart;
if (pending)
wakeup_softirqd();
_local_bh_enable();
}

在具体讨论这个函数之前,先看看系统定义的几个softirq的类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
enum
{
HI_SOFTIRQ=0,
TIMER_SOFTIRQ,
NET_TX_SOFTIRQ,
NET_RX_SOFTIRQ,
BLOCK_SOFTIRQ,
BLOCK_IOPOLL_SOFTIRQ,
TASKLET_SOFTIRQ,
SCHED_SOFTIRQ,
HRTIMER_SOFTIRQ,
RCU_SOFTIRQ,
NR_SOFTIRQS
};

每个softirq对应__softirq_pending中的一个位。其中,HI_SOFTIRQ和TASKLET_SOFTIRQ用来实现tasklet,TIMER_SOFTIRQ和HRTIMER_SOFTIRQ用于定时器,NET_TX_SOFTIRQ和NET_RX_SOFTIRQ用于网络设备的发送和接收操作,BLOCK_SOFTIRQ和BLOCK_IOPOLL_SOFTIRQ用于块设备的操作,SCHED_SOFTIRQ用于调度器。内核在此基础上定义了一个struct softirq_action类型的数值softirq_vec,用来放置softirq对应的处理函数:

1
2
3
4
5
6
7
<include/linux/interrupt.h>
struct softirq_action
{
void (*action)(struct softirq_action*);
};
<kernel/softirq.c>
static struct softirq_action softirq_vec[NR_SOFTIRQS]__cacheline_aligned_in_smp;

所以__do_softirq的核心思想是:从CPU本地的softirq_pending的最低位开始,依次往高位扫描,如果发现某位为1,说明对应该位有个等待中的softirq需要处理,那么就调用softirq_vec数组中对应项的action函数。这个过程会一直持续下去,直到`softirq_pending`为0。具体的函数实现上,有以下几点需要注意:

  • __local_bh_disable_local_bh_enable用来在preempt_count()上标示SOFTIRQ的上下文,考虑到SOFTIRQ执行过程可能会被外部中断的情况,这可以防止SOFTIRQ部分的重入,因为只有在非interrupt的上下文中才可以进入到SOFTIRQ部分。
  • 在执行softirq的前后分别调用了local_irq_enable和local_irq_disable,这说明SOFTIRQ部分在执行时处理器可以响应并处理外部的中断。
  • softirq执行的先后顺序由__softirq_pending中的位决定,低位的softirq要先于高位的softirq执行。
  • 在do while循环之后,会再次检测__softirq_pending是否为0,这主要是因为SOFTIRQ在执行过程中可能被外部设备中断,其设备驱动程序在实现该中断处理函数时可能使用了一个softirq,因此在do while循环之后,需要检测有没有新加入的softirq需要处理。
  • 如果上面第3步的动作执行超过一定的次数,则需要唤醒ksoftirqd来处理。因为如果在SOFTIRQ部分耗费太多的时间,会导致一个中断处理流程迟迟无法结束,这意味着此前被中断的任务无法继续运行。为了避免这种情况,Linux系统在初始化期间生成了一个新的进程ksoftirqd,该进程运行时要完成的主要任务就是调用do_softirq来执行等待中的softirq,如果没有softirq需要处理,该进程将进入睡眠状态。

因此为了避免在一个中断的SOFTIRQ部分耗费太多时间处理softirq导致该中断流程迟迟无法结束,__do_softirq通过wakeup_softirqd唤醒ksoftirqd,让调度器来平衡当前中断在SOFTIRQ部分的工作负荷。

irq的自动探测

如果一个设备的驱动程序无法确定它所管理的设备的软件中断号irq,此时设备驱动程序可以使用irq的自动探测机制来获得其正在使用的irq。
内核为此提供了几个接口函数供驱动程序使用,需要注意的是,中断探测机制的实现需要内核和驱动程序共同努力才能完成,并且这种探测只限于非共享中断的情况,因此只有当一个设备能确定其irq不会与别的设备共享时,才可以使用这里的探测。
探测前的情形是,该设备关联到了某个irq,但是因为设备驱动程序还不清楚是哪个irq,因此不可能调用request_irq来向该irq安装中断处理例程,所以对应该irq的action为空。探测要完成的任务是找到该设备所关联的irq。

探测的原理是,调用probe_irq_on函数遍历整个irq_desc数组,对于每个action为空的元素且在该项允许自动探测的情形下,将其istate上的IRQS_WAITING位置1,然后让设备产生一次中断,irq_desc数组中与该设备irq关联的那一项的第一级中断函数handle_irq会被调用,后者将会清除IRQS_WAITING位,然后调用probe_irq_off再遍历一遍irq_desc数组,对于每个action为空的元素,查看其istate上的IRQS_WAITING位是否被清0,如果是那么该元素对应的irq就是正与目前设备关联的。

以下是一个设备驱动程序用来实现自动探测的代码序列的示例

1
2
3
4
5
6
7
8
9
10
11
12
unsigned long irqs;
//清除设备内部的中断

irqs=probe_irq_on();
/*等待5ms*/
msleep(5);
/*让设备产生一次中断 */

/*等待5ms*/
msleep(5);
/* 得到探测到的中断号 */
irq=probe_irq_off(irqs);

这段代码中用到了probe_irq_on和probe_irq_off两个函数,它们都是内核为驱动程序实现的自动探测接口函数,下面将看到这两个函数的实现原理。
probe_irq_on函数的核心代码是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
<kernel/irq/autoprobe.c>
unsigned long probe_irq_on(void)
{
struct irq_desc *desc;
unsigned long mask = 0;
int i;
/*
* quiesce the kernel, or at least the asynchronous portion
*/
async_synchronize_full();
mutex_lock(&probing_active);
/*
* something may have generated an irq long ago and we want to
* flush such a longstanding irq before considering it as spurious.
*/
for_each_irq_desc_reverse(i, desc) {
raw_spin_lock_irq(&desc->lock);
if (!desc->action && irq_settings_can_probe(desc)) {
/*
* Some chips need to know about probing in
* progress:
*/
if (desc->irq_data.chip->irq_set_type)
desc->irq_data.chip->irq_set_type(&desc->irq_data,
IRQ_TYPE_PROBE);
irq_startup(desc);
}
raw_spin_unlock_irq(&desc->lock);
}
/* Wait for longstanding interrupts to trigger. */
}
raw_spin_unlock_irq(&desc->lock);
}
msleep(20);
/*
* enable any unassigned irqs
* (we must startup again here because if a longstanding irq
* happened in the previous stage, it may have masked itself)
*/
for_each_irq_desc_reverse(i, desc) {
raw_spin_lock_irq(&desc->lock);
if (!desc->action && irq_settings_can_probe(desc)) {
desc->istate |= IRQS_AUTODETECT | IRQS_WAITING;
if (irq_startup(desc))
desc->istate |= IRQS_PENDING;
}
raw_spin_unlock_irq(&desc->lock);
}
}
raw_spin_unlock_irq(&desc->lock);
}
/*
* Wait for spurious interrupts to trigger
*/
msleep(100);
/*
* Now filter out any obviously spurious interrupts
*/
for_each_irq_desc(i, desc) {
raw_spin_lock_irq(&desc->lock);
if (desc->istate & IRQS_AUTODETECT) {
/* It triggered already - consider it spurious. */
if (!(desc->istate & IRQS_WAITING)) {
desc->istate &= ~IRQS_AUTODETECT;
irq_shutdown(desc);
} else
if (i < 32)
mask |= 1 << i;
}
raw_spin_unlock_irq(&desc->lock);
}
return mask;
}

函数的主体是三个for_each_irq_desc所引导的循环。

  • 第一个for_each_irq_desc循环从后向前遍历irq_desc数组,遍历过程中对于每一个desc,只要能满足desc->action为空并且desc->status_use_accessors没有设置_IRQ_NOPROBE位,那么就通过PIC中的irq_startup函数把对应的中断启用起来。desc->action为空说明该irq上还没有安装中断处理例程, desc->status_use_accessors没有设置_IRQ_NOPROBE位说明该desc允许被探测,设备所关联的irq只可能存在满足这两个条件的desc中。
  • 第二个for_each_irq_desc循环依旧从后向前遍历irq_desc数组,对于满足desc->action为空并且desc->status_use_accessors没有设置_IRQ_NOPROBE位的desc,将其istate重新设置为:

    1
    desc->istate |= IRQS_AUTODETECT | IRQS_WAITING;
  • 第三个for_each_irq_desc循环从前向后遍历irq_desc数组,对于满足(desc->istate &IRQS_AUTODETECT) != 0的每一个desc,说明它正是我们在探测的元素,此时检查desc->istate上的IRQS_WAITING位有没有被置1。因为根据探测的流程,在调用probe_irq_on时,驱动程序还没有让设备产生中断,因此IRQS_WAITING位不可能被清0,如果它被清0,说明该desc上的第一级函数被调用了,这意味着这个irq所对应的中断线上正在产生无意义的触发信号(不可能是由安装了ISR的正常设备所产生,因为request_irq在安装ISR时会清除掉IRQS_AUTODETECT位),对此的处理是通过PIC屏蔽该中断,然后继续查找下一个irq_desc元素。

这个函数的返回值如同probe_irq_off中的参数一样并无实际的用途。
当probe_irq_off被调用时,驱动程序已经让设备产生了一次中断,所以probe_irq_off需要使用for_each_irq_desc循环从前向后遍历irq_desc数组,试图找到这样一个desc:(desc->istate &IRQS_AUTODETECT) != 0并且desc->istate的IRQS_WAITING位被清除,这正是probe_irq_off的主要流程。
如果找到了设备所关联的irq就返回之,否则函数返回0。

中断处理例程

如果设备需要通过中断这种方式与处理器进行沟通,那么它的驱动程序就有必要实现一个中断处理例程并负责把它安装到系统中,这样当设备的中断信号来临时,处理器才可能调用到它的处理例程。
虽然中断处理例程不过是种普通的函数,但是内核作为这种游戏规则的制定者,为了确保一切尽在它的掌握之中,对于中断处理例程的实现有着特定的要求。
首先从中断处理例程的原型看,它必须与struct irqaction中handler函数指针的原型保持一致。
这很正常,因为中断处理例程的安装,本质上是让这个指针指向中断处理例程。

handler的原型前面提过,这里再重复一遍:

1
typedef irqreturn_t (*irq_handler_t)(int, void *);

因此一个实际的中断处理例程应该这样声明自己:

1
irqreturn_t demo_isr(int irq, void * dev_id);

函数的返回值是个irqreturn_t类型,该类型在内核源码中的定义如下

1
2
3
4
5
6
7
<include/linux/irqreturn.h>
enum irqreturn {
IRQ_NONE,
IRQ_HANDLED,
IRQ_WAKE_THREAD,
};
typedef enum irqreturn irqreturn_t;

因此,中断处理例程只能有三种返回值,分别是

  • IRQ_NONE中断例程发现正在处理一个不是自己的设备触发的中断,此时它唯一要做的就是返回该值。
  • IRQ_HANDLED中断处理例程成功地处理了自己设备的中断,返回该值。
  • IRQ_WAKE_THREAD中断处理例程被用来作唤醒一个等待在它的irq上的一个进程使用,此时它返回该值。

其次,中断处理例程是在中断上下文中执行,这对它的实现提出了某些限制。因为中断上下文不隶属于某个进程,在这里current指针不再有意义,它们游离在Linux进程世界的边缘,因此在这种环境下绝对禁止任何形式的进程切换。实际的代码实现中,确保中断处理上下文中不出现进程的切换并不是件容易的事,需要仔细审查中断处理例程中的每行代码,包括调用的每一个函数,确保它们不会进入睡眠状态而使调度器介入其中。
一个典型的例子是在中断处理例程中使用内存分配函数kmalloc,如果在调用这个函数时使用了GFP_KERNEL标志而不是GFP_ATOMIC,那么很小的概率下会因为内存难以满足需求而进入睡眠,虽然大部分时间都不会遇到麻烦,但是偶尔出现的睡眠将会带来真正的大麻烦。

最后,中断处理例程作为系统中的一种并发源头,可能会去访问一些共享的资源,如果不幸恰好有别的进程(最典型的是被它中断的进程)也在使用同样的共享资源,竞态将不可避免,因此需要考虑互斥的机制来保护。
内核所提供的用于互斥的各种机制,在中断处理例程中使用这些机制需要格外小心,防止出现睡眠的可能性,因此信号量和互斥锁首先就会被排除掉,绝大多数的情况你需要使用自旋锁spin lock及其变体。

中断共享

即便PIC已经提供足够多的中断引脚供外部设备使用,但也有不够用的时候,此时中断共享机制可能就会派上用场。所谓中断共享是指多个设备共享一根中断线,使用同一个irq。
对驱动程序来说,需要注意的地方在调用request_irq和中断处理例程的实现上。对于一个共享的中断,驱动程序在调用request_irq时应该使用IRQF_SHARED标志,同时提供dev_id,提供的dev_id在中断处理例程中并没有什么特别的用处,之所以要求在中断共享时提供这个参数,主要是为了在free_irq时能在action链中找到它,因此这个dev_id在中断共享的action链中应该具有唯一性,实际使用中可以像下面这样:

1
2
3
4
//pDev是一个指向设备相关的结构体的指针
struct demo_dev*pDev=…
//request_irq
int retval=request_irq(irq,demo_isr,IRQF_SHARED,"demo device",pDev);

接下来是中断共享时的中断处理例程的实现,因为当irq上的中断发生时,内核会调用irq上的每个action中的handler,因此即便不是你的设备产生的中断,你的中断处理例程ISR也会被调用到,因此共享中断时的ISR需要能判断是否是自己的设备产生的中断,这主要靠读取自己设备的中断状态寄存器来完成。如果发现你的设备没有产生中断,那么ISR只需要返回一个IRQ_NONE就好了,下面是一个共享中断下的ISR的实现:

1
2
3
4
5
6
7
8
9
10
11
12
irqreturn_t  demo_isr(int irq,void*dev_id)
{
//读取设备中断状态寄存器
status=read_intr_reg(…);
//判断自己的设备有没有产生中断,没有的话直接返回IRQ_NONE
if(status&…){
return IRQ_NONE;
}else{
//中断处理
}
return IRQ_HANDLED
}

小结

讨论了Linux下一个外部中断发生后的整个处理流程。通过内核精心设计的中断处理框架,如果一个设备驱动程序为其所管理的设备通过request_irq注册了中断处理例程,那么该设备产生的中断在中断处理框架中经过多层的调用,最终会进入到该中断处理例程中来。
可以看到这其中大部分的任务来自于内核的代码(嵌入式系统还需要BSP代码)。

驱动程序对中断的支持相对简单,只需要实现中断处理例程函数并调用request_irq向系统注册即可,在某些特定的情形下,设备驱动程序也可以使用request_threaded_irq函数来向系统注册中断处理例程,比如需要使用irq_thread机制。
了解整个中断处理流程,有助于了解中断处理例程的执行环境,避免因为不安全的中断例程实现给系统造成负面影响。
Linux内核将中断处理分成了HARDIRQ和SOFTIRQ两部分。HARDIRQ在执行时中断是关闭的,因此这部分的代码应该完成中断处理中最关键的任务,执行时间也应尽可能短。如果需要执行时间很长的操作,可以将其延迟到SOFTIRQ部分执行,因为SOFTIRQ部分在执行时处理器的中断是打开的。




中断下半部细节

inux内核将对一个外部设备中断的处理分成两大部分HARDIRQ和SOFTIRQ,因为HARDIRQ部分在执行时处理器的中断是关闭的,所以驱动程序的中断处理例程在这部分只应该完成一些关键的中断操作,而将耗时的工作延迟到SOFTIRQ部分执行。
内核为此给驱动程序提供了一个基于SOFTIRQ的任务延迟的实现机制tasklet。因为tasklet需要在中断上下文中执行,所以有些延迟的操作无法用tasklet来完成,为此内核又提供了一个基于进程的延迟操作实现机制——工作队列work queue。

本节将先描述tasklet和工作队列的内核实现机制,然后再分别讨论设备驱动程序如何使用它们来实现延迟的操作。
当然驱动程序中可以使用的延迟操作机制并非只有softirq/tasklet和工作队列workqueue这两种,比如定时器timer也可以用来实现延迟的操作。

tasklet

tasklet是内核定义的几种softirq之一,设备驱动程序的中断处理例程常常利用tasklet来完成一些延后的处理。
根据优先级的不同,内核将tasklet分成两种,在softirq中对应TASKLET_SOFTIRQ和HI_SOFTIRQ,后者的执行顺序优于前者。

Linux内核定义的softirq有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<include/linux/interrupt.h>
enum
{
HI_SOFTIRQ=0,
TIMER_SOFTIRQ,
NET_TX_SOFTIRQ,
NET_RX_SOFTIRQ,
BLOCK_SOFTIRQ,
BLOCK_IOPOLL_SOFTIRQ,
TASKLET_SOFTIRQ,
SCHED_SOFTIRQ,
HRTIMER_SOFTIRQ,
RCU_SOFTIRQ, /*Preferable RCU should always be the last softirq*/
NR_SOFTIRQS
};

其中HI_SOFTIRQ和TASKLET_SOFTIRQ就是本章要讨论的主题tasklet。

tasklet机制初始化

Linux系统初始化期间通过调用softirq_init为TASKLET_SOFTIRQ和HI_SOFTIRQ安装了执行函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<kernel/softirq.c>
void __init softirq_init(void)
{
int cpu;
for_each_possible_cpu(cpu) {
int i;
per_cpu(tasklet_vec, cpu).tail =
&per_cpu(tasklet_vec, cpu).head;
per_cpu(tasklet_hi_vec, cpu).tail =
&per_cpu(tasklet_hi_vec, cpu).head;
for (i = 0; i < NR_SOFTIRQS; i++)
INIT_LIST_HEAD(&per_cpu(softirq_work_list[i], cpu));
}
register_hotcpu_notifier(&remote_softirq_cpu_notifier);
open_softirq(TASKLET_SOFTIRQ, tasklet_action);
open_softirq(HI_SOFTIRQ, tasklet_hi_action);
}

函数中的for_each_possible_cpu循环用来初始化管理tasklet链表的变量tasklet_vec和tasklet_hi_vec,稍后会谈到这两个变量的具体用途。
open_softirq用来给TASKLET_SOFTIRQ和HI_SOFTIRQ安装对应的执行函数:

1
2
softirq_vec[TASKLET_SOFTIRQ].action = tasklet_action;
softirq_vec[HI_SOFTIRQ].action = tasklet_hi_action;

上述代码中,softirq_vec是一个struct softirq_action类型的数组,数组中的每一项都对应一个软中断处理函数指针。该数组在源码中的定义如下

1
2
<kernel/softirq.c>
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;

软中断处理函数原型则由struct softirq_action来定义:

1
2
3
4
5
<include/linux/interrupt.h>
struct softirq_action
{
void (*action)(struct softirq_action*);
};

如此,在中断处理的SOFTIRQ部分,如果发现本地CPU的softirq_pending上TASKLET_SOFTIRQ或者HI_SOFTIRQ位被置1,就将调用tasklet_action或者tasklet_hi_action。
后面会看到
softirq_pending与softirq_vec数组间的对应关系。

提交一个tasklet

本节将讨论设备驱动程序如何利用内核提供的tasklet机制来实现一个延迟的操作。内核为此定义了一个表示tasklet对象的数据结构struct tasklet_struct:

1
2
3
4
5
6
7
8
9
<include/linux/interrupt.h>
struct tasklet_struct
{
struct tasklet_struct *next;
unsigned long state;
atomic_t count;
void (*func)(unsigned long);
unsigned long data;
};

其中

1
struct tasklet_struct *next

用来将系统中的tasklet对象构建成链表。

1
unsigned long state

记录每个tasklet在系统中的状态,其值是枚举型变量TASKLET_STATE_SCHED和TASKLET_STATE_RUN两者之一。
TASKLET_STATE_SCHED表示当前tasklet已经被提交;TASKLET_STATE_RUN只用在对称多处理器系统SMP中,表示当前tasklet正在执行。

1
atomic_t count

用来实现tasklet的disable和enable操作,count.counter=0表示当前的tasklet是enabled的,可以被调度执行,否则便是个disabled的tasklet,不可以被执行。

1
void (*func)(unsigned long)

该tasklet上的执行函数或者延迟函数,当该tasklet在SOFTIRQ部分被调度执行时,该函数指针指向的函数被调用,用来完成驱动程序中实际的延迟操作任务。

1
unsigned long data

func所指向的函数被调用时,data将作为参数传给func函数。驱动程序可以利用data向tasklet上的执行函数传递特定的参数。
驱动程序为了实现基于tasklet机制的延迟操作,首先需要声明一个tasklet对象。驱动程序可以用DECLARE_TASKLET宏声明并初始化一个静态的tasklet对象:

1
2
3
<include/linux/interrupt.h>
#define DECLARE_TASKLET(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }

相对于DECLARE_TASKLET宏,另一个相似的宏DECLARE_TASKLET_DISABLED则用来声明一个处于disabled状态的tasklet对象:

1
2
#define DECLARE_TASKLET_DISABLED(name, func, data) \
struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }

以上宏定义中,name是声明的tasklet对象的名称,func是驱动程序中用来实现延迟操作的函数,data是传递给func函数的参数。如果驱动程序在运行过程中构建了一个tasklet对象,这种情况下对tasklet对象的初始化可以通过调用函数tasklet_init来完成:

1
2
3
4
5
6
7
8
9
10
<kernel/softirq.c>
void tasklet_init(struct tasklet_struct *t,
void (*func)(unsigned long), unsigned long data)
{
t->next = NULL;
t->state = 0;
atomic_set(&t->count, 0);
t->func = func;
t->data = data;
}

声明了tasklet对象之后,驱动程序需要调用tasklet_schedule来向系统提交这个tasklet。这里所谓的提交,实际上就是将一个tasklet对象加入到tasklet_vec管理的链表中。对于HI_SOFTIRQ,提交tasklet对象的函数为tasklet_hi_schedule,除了用来管理tasklet对象链表的变量为tasklet_hi_vec外,其他方面完全一样。
鉴于这种代码层面的一致性,所以接下来把讨论的主角设定为tasklet_schedule。为了方便这里的讨论,这里对tasklet_schedule函数进行了轻微的改动。

1
2
3
4
5
6
7
8
9
10
11
12
13
<include/linux/interrupt.h>
static inline void tasklet_schedule(struct tasklet_struct *t)
{
unsigned long flags;
if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)){
local_irq_save(flags);
t->next = NULL;
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);
raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_restore(flags);
}
}

函数中用到的tasklet_vec是个per-CPU型的变量,用来将系统中所有通过tasklet_schedule函数提交的tasklet对象构建成链表,如果是多处理器系统,那么每个处理器都将用各自的tasklet_vec链表管理提交到其上的tasklet。
tasklet_vec在Linux源码中具体的声明和类型如下:

1
2
3
4
5
6
7
<kernel/softirq.c>
struct tasklet_head
{
struct tasklet_struct *head;
struct tasklet_struct **tail;
};
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);

tasklet_vec变量的初始化最早发生在Linux系统初始化阶段调用的softirq_init函数中。
上一小节提到了该函数,在那里将tasklet_vec的成员head的地址赋给了tail,在tasklet_schedule函数中正是通过操作tail的方式将tasklet对象依次加入到了链表中。

函数首先检查要提交的tasklet的state上的TASKLET_STATE_SCHED位有没有置1,对一个尚未提交过的tasklet对象来说,其值应该是0,所以test_and_set_bit函数会返回0,同时把tasklet的state上的TASKLET_STATE_SCHED位置1表明这个tasklet已被提交。此后该tasklet对象的TASKLET_STATE_SCHED位一直为1直到被调度运行,因此一个tasklet对象在被成功提交进系统但尚未被调度执行时,处于TASKLET_STATE_SCHED状态。
此时即便是在多处理器系统中,运行在其他处理器上的tasklet_schedule函数也无法再次提交一个处于TASKLET_STATE_SCHED状态的tasklet对象,因此一个tasklet对象在同一时间只可能在一个处理器上运行,而不会同时有多个实例在不同的CPU上运行。

如果tasklet可以被提交,那么接下来的工作就是把它加入到当前处理器tasklet_vec管理的链表中,然后再通过raise_softirq_irqoff(TASKLET_SOFTIRQ)调用告诉SOFTIRQ部分当前处理器有个TASKLET_SOFTIRQ正等待处理。raise_softirq_irqoff用一个整型变量的位来表示该位上是否有待决的softirq等待处理,1表示有,0则是没有。

关于tasklet_vec建立链表的操作,因为接下来的讨论中还会经常见到,读者不妨通过图来建立个初步的印象:

图中,tasklet_vec的head成员总是指向所管理链表的第一个节点,tail总是保存链表最后一个节点next成员的地址。作为示例,图中的0xE2042608表示next所在的内存地址,这里笔者简单地用一根带箭头的线指向链表的最后一个节点。

这样如果要把一个新的节点t加入到链表尾部,只需如下操作即可:

1
2
3
t->next = NULL;
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);

其实像这种单向链表的操作很简单,似乎用不着通过tasklet_vec这种很绕的方式来实现,但是Linux内核这样做有它的道理,后面在讨论tasklet_action的时候会看到这点。
下面是一个设备驱动程序在其中断处理例程demo_isr中通过tasklet_schedule实现的一个延迟操作示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//设备相关的指针
static struct demo_dev * p = …;
//延迟操作函数
void demo_delay_action(unsigned long data)
{
//通过data获得设备相关指针
static struct demo_dev * pdev = (static struct demo_dev *)data;
//延迟操作

}
//用DECLARE_TASKLET(name, func, data)定义一个tasklet对象demo_tasklet
DECLARE_TASKLET(demo_tasklet, demo_delay_action, (unsigned long)p);
//中断处理例程
irqreturn_t demo_isr(int irq,void*dev_id)
{

//通过tasklet_schedule实现延迟操作
tasklet_schedule(&demo_tasklet);
}

示例中的demo_delay_action函数将延迟到中断处理的SOFTIRQ部分才会被执行到。

tasklet_action

上一小节讨论了设备驱动程序通过tasklet_schedule向系统提交一个tasklet对象执行延迟操作的实现机制,本节将会看到中断处理的SOFTIRQ部分如何去调用这些延迟的操作函数。
在“tasklet机制初始化”小节,内核为TASKLET_SOFTIRQ和HI_SOFTIRQ分别安装了执行函数tasklet_action和tasklet_hi_action,鉴于这两个执行函数的实现机制完全一样,在此只对tasklet_action的实现机制进行分析。下面是这个函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
<kernel/softirq.c>
static void tasklet_action(struct softirq_action *a)
{
struct tasklet_struct *list;
local_irq_disable();
list = __get_cpu_var(tasklet_vec).head;
__get_cpu_var(tasklet_vec).head = NULL;
__get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;
local_irq_enable();
while (list) {
struct tasklet_struct *t = list;
list = list->next;
if (tasklet_trylock(t)) {
if (!atomic_read(&t->count)) {
if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))
BUG();
t->func(t->data);
tasklet_unlock(t);
continue;
}
tasklet_unlock(t);
}
local_irq_disable();
t->next = NULL;
*__get_cpu_var(tasklet_vec).tail = t;
__get_cpu_var(tasklet_vec).tail = &(t->next);
__raise_softirq_irqoff(TASKLET_SOFTIRQ);
local_irq_enable();
}
}

函数的主体是个while循环,在进入while循环之前,需要得到tasklet链表的头指针,这需要访问per-CPU变量tasklet_vec,因为该变量用来管理tasklet链表,tasklet_vec.head指向tasklet链表的第一个节点。注意在访问tasklet_vec之前,函数用local_irq_disable关闭了处理器的中断,这是因为虽然tasklet_vec在系统的每个处理器中都有个副本,但是在单一CPU的范围里,依然存在SOFTIRQ在执行时被外部设备中断,在它的中断处理例程中使用到了tasklet的功能比如调用tasklet_schedule来提交一个tasklet对象,这样会导致两个执行路径都有操作tasklet_vec的可能性。所以此处用local_irq_disable和local_irq_enable来保护tasklet_vec不会在可能的并发访问中遭到破坏,其间的代码将tasklet_vec管理的链表的第一个节点存放在本地变量list中,然后将tasklet_vec设置成其最初的状态(空链表)。

在继续对while循环中代码的讨论之前,有两点需要注意。

首先,tasklet_action作为一个softirq执行函数,在多处理器系统中可能同时在不同的CPU上运行。
虽然一个处于TASKLET_STATE_SCHED状态的tasklet对象不能被多次提交,但是当一个tasklet对象被调度运行时,TASKLET_STATE_SCHED状态位会被清除,这样就可能导致该tasklet对象在别的处理器上被重新提交。
考虑一下如下的情形:在一个有A和B两个处理器的系统中,某设备对处理器A产生了一次中断,在它的中断处理例程中会调用tasklet_schedule函数向系统提交一个tasklet对象,假设处理器A已经进入本次中断处理的SOFTIRQ部分并且正在运行该tasklet,注意此时它的TASKLET_STATE_SCHED状态位已经被清除,此时该设备又产生了一次中断,这次的中断发送给了处理器B,处理器B在它的中断处理例程中同样会调用tasklet_schedule把同一个tasklet对象向处理器B上的tasklet_vec链表提交,因为该tasklet的TASKLET_STATE_SCHED状态位已经被清除,所以提交是可能成功的。
如此就可能出现同一tasklet对象的执行函数在不同的处理器上同时运行的情形,因此while循环需要某种机制来确保这种情况不会发生。我们不妨把这个问题称为SMP中tasklet运行冲突的问题,等下会在while循环具体的代码实现中看到内核对此给出的解决方案。

其次,在while循环中tasklet_action通过一个本地变量list来实现对tasklet链表的遍历。对于遍历过程中的每一个tasklet节点,如果不满足执行的条件,将通过操作tasklet_vec.tail指针将其重新加入tasklet_vec链表;如果它被成功执行了,那么该tasklet对象将不会再出现于tasklet_vec链表中。通过启用一个本地变量list,使得我们在调用tasklet上的执行函数时,无须再考虑list链表的互斥访问问题,因此读者可以看到tasklet上的执行函数在运行期间,中断是打开的,这也是SOFTIRQ当初的设计初衷。

如果考虑到在某个tasklet运行期间发生了中断,那么可能会有新的tasklet要被提交到当前处理器的tasklet_vec链表上,不过这不会影响到list所在链表,新的tasklet对象将会加入到tasklet_vec链表中。如此,在tasklet_action执行前后,tasklet_vec链表发生的变化是:一些新的tasklet对象可能被提交进来,只是因为还没有被运行过,所以新节点将处于TASKLET_STATE_SCHED状态,而被运行过的老节点其TASKLET_STATE_SCHED状态位将被清除,而且也不会再出现在当前处理器的tasklet_vec链表中。

现在来看看while循环实际的代码,tasklet_trylock在单处理器系统中直接返回1,在多处理器中,其定义是:

1
2
3
4
5
<include/linux/interrupt.h>
static inline int tasklet_trylock(struct tasklet_struct *t)
{
return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
}

函数将tasklet中state的TASKLET_STATE_RUN位置1,同时返回TASKLET_STATE_RUN位原来的值。因此,while循环中的if (tasklet_trylock(t))实际上就是用来解决前面提到的SMP中tasklet运行冲突的问题的。在SMP系统中一个运行中的tasklet(其TASKLET_STATE_RUN位被置1,TASKLET_STATE_SCHED位被清0)有可能被重新提交到另一个处理器的tasklet_vec链表中,为了防止该tasklet同时在不同的处理器上运行,内核在SMP系统中为tasklet对象增加了一个额外的状态位TASKLET_STATE_RUN,这个状态位只对SMP系统有效,单处理器系统不需要这个状态。

内核用tasklet对象的TASKLET_STATE_RUN位来标记对应的tasklet当前是否正在运行,如果没有,那么tasklet_trylock(t)将返回真,同时tasklet_trylock也会将state中的TASKLET_STATE_RUN位置1,这样别的CPU再运行tasklet_action时,将不会处理该tasklet直到其运行完毕清除掉TASKLET_STATE_RUN。

内核通过这种方式实现了tasklet的串行化:任一时刻tasklet只可能在一个CPU上运行。对于单处理器,不存在tasklet运行冲突的问题,所以tasklet_trylock直接返回1。

接下来通过atomic_read对tasklet的count成员进行测试,这个成员主要用来实现enable或者disable一个tasklet,如果某个tasklet对象的count为0,说明它处在enabled的状态。对于一个enabled的tasklet,需要再测试其state的TASKLET_STATE_SCHED位有没有被置1,提交tasklet的函数会设置该位,如果该位没有被设置,说明tasklet_action函数正试图调度一个没有被提交的tasklet,这是非正常状况。

如果一切顺利,当前tasklet上的函数被调用,意味着延迟的操作开始进行。
从代码中可以看到,如果一个tasklet被调度执行完之后,其state的TASKLET_STATE_SCHED位被清0,这意味着除非被再次提交,否则下次的SOFTIRQ部分将不会再调度到它,这是一种one-shot特性:提交一次,调度运行一次,运行完后就从CPU的tasklet_vec链表中消失,除非有代码再次提交该tasklet对象。

通过上面对tasklet_action的分析可以看出,一个提交的tasklet在被SOFTIRQ调度执行完后,将从当前处理器的tasklet_vec链表中消失,因此除非再次提交,否则该tasklet对象将不会有机会被再次运行。
同时内核对tasklet的实现机制确保了同一个tasklet对象不会同时在不同的处理器上运行,因此驱动程序在实现tasklet的延迟函数时,无须考虑多处理器间的并发问题。
另外tasklet运行在中断上下文环境中,因此在中断上下文中的种种限制同样适用于tasklet的延迟函数。这些都是tasklet这种机制最典型的特质。

tasklet的其他操作

前面已经讨论了tasklet的整个实现机制,下面在此基础上讲述tasklet一些其他的操作,包括如何disable和enable一个tasklet等。
tasklet_disabletasklet_disable_nosync这两个函数可以用来disable一个tasklet,使之无法被SOFTIRQ调度运行。

函数定义为:

1
2
3
4
5
6
7
8
9
10
11
12
<include/linux/interrupt.h>
static inline void tasklet_disable_nosync(struct tasklet_struct *t)
{
atomic_inc(&t->count);
smp_mb__after_atomic_inc();
}
static inline void tasklet_disable(struct tasklet_struct *t)
{
tasklet_disable_nosync(t);
tasklet_unlock_wait(t);
smp_mb();
}

disable本身的行为很简单,将要操作的tasklet对象t上的count加1就可以了。相对于tasklet_disable_nosync,tasklet_disable是个“同步”版本,它在调用tasklet_disable_nosync函数之后,会再调用tasklet_unlock_wait函数实现所谓“同步”功能,这里的术语“同步”只限于SMP系统,单处理器系统中,tasklet_unlock_wait什么也不做。
多处理器系统中,如果要disable的tasklet正在运行,那么tasklet_unlock_wait要一直忙等待到t的TASKLET_STATE_RUN状态位被清除,就是说tasklet_disable要等到t运行完毕才会返回,这意味着tasklet_disable返回之后,可以确保该tasklet不会在系统的任何地方运行。
一个处于disabled状态的tasklet可以被提交到tasklet_vec中,但是不会被调度执行。

tasklet_enable

tasklet_enable函数用来enable一个tasklet,其定义如下:

1
2
3
4
5
6
<include/linux/interrupt.h>
static inline void tasklet_enable(struct tasklet_struct *t)
{
smp_mb__before_atomic_dec();
atomic_dec(&t->count);
}

将指定的tasklet对象t上的count减1。一个tasklet对象要能被执行,count应该为0。
所以如果想enable一个先前被disable的tasklet,使之能被调度执行,tasklet_enable和tasklet_disable的调用次数要匹配。

tasklet_kill

1
2
<kernel/softirq.c>
void tasklet_kill(struct tasklet_struct *t);

该函数通过清除一个tasklet对象的TASKLET_STATE_SCHED状态位,使SOFTIRQ不再能够调度运行它。如果当前tasklet对象正在运行,那么tasklet_kill将忙等待直到tasklet运行结束,这样可以确保tasklet_kill返回后系统中不再有运行中的该tasklet对象。
如果一个tasklet对象被提交到了系统但还没有被调度执行,那么针对该tasklet对象调用tasklet_kill,后者将会睡眠直到该tasklet被执行完从tasklet_vec链表中移除,所以tasklet_kill是个可能会被阻塞的函数。

一般在设备驱动程序所在的内核模块要被移除或者是设备要被关闭时,才调用该函数,因为这种情况下虽然你可以删除tasklet对象所在的空间,但这不会影响到tasklet_vec已有的链表元素构成,所以一个可能的情况是,在你的驱动模块已经移出系统,SOFTIRQ还是调度运行了你的驱动程序提交的tasklet对象,这是一种危险情况,因为当你的模块已经从系统中移除之后,被调度运行的tasklet函数也许会使用到模块中的资源,但是现在它们已经不存在了。内核模块调用tasklet_kill可以确保不会发生这种情况。




工作队列work queue

工作队列是设备驱动程序可以使用的另一种延迟执行的方法。
为了实现这种延迟执行的机制,内核或者驱动程序需要建立一套完整的基础设施,这里的设计思想与现实中的工厂加工非常相像:基础设施就是一条加工厂的成产流水线和在流水线上工作的工人,平时没事的时候,流水线上的工人就休息。
如果某一客户想要加工一件工件,只需要把要加工的工件打个包(包里放有记载该工件应该如何加工的文档),扔到流水线上,然后客户可以继续做自己的事情。流水线上的工人发现有活要做,就结束休息,帮助客户加工工件。
这个我们所熟悉的场景对应到Linux内核代码的世界,流水线变成了worklist,工人变成了worker_thread,打成包的工件就是struct work_struct对象,把包扔到流水线的工作变成了queue_work函数的调用等等,所有这些我们都将在接下来的内容中看清它们的内部运作流程。

为了叙述上的方便,我们不妨就把这整个所谓的基础设施统称为工作队列。

内核本身提供了一套默认的工作队列,但是驱动程序自身也可以另起炉灶创建属于自己的基工作队列。
本节将先从驱动程序创建自己的工作队列谈起,讨论整个延迟处理的工作流程,然后再把讨论的范围延伸到内核自己创建的基础设施上去,最后对比工作队列与tasklet机制的区别以及各自的适用场景。

数据结构

在具体讨论创建工作队列的内核机制之前,先交代几个核心的数据结构,它们在后面的讨论中会频频出现。

1
2
3
4
5
6
<include/linux/workqueue.h>
struct work_struct {
atomic_long_t data;
struct list_head entry;
work_func_t func;
};
  • 驱动程序要通过工作队列实现延迟操作时,需要生成一个struct work_struct对象,本书称之为工作节点,然后通过queue_work函数将其提交给工作队列。
  • atomic_long_t data驱动程序可以利用data来将设备驱动程序使用的某些指针传递给延迟函数。
  • struct list_head entry双向链表对象,用来将提交的等待处理的工作节点形成链表。
  • work_func_t func工作节点的延迟函数,用来完成实际的延迟操作。其原型定义如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    typedef void (*work_func_t)(struct work_struct *work);
    <kernel/workqueue.c>
    struct cpu_workqueue_struct {
    spinlock_t lock;
    struct list_head worklist;
    wait_queue_head_t more_work;
    struct work_struct *current_work;
    struct workqueue_struct *wq;
    struct task_struct *thread;
    } ____cacheline_aligned;

    实际的代码中,struct cpu_workqueue_struct对象是个per-CPU型的变量,通过alloc_percpu函数动态创建,系统中的每个CPU都有一份,本书称struct cpu_workqueue_struct为CPU工作队列管理结构。

  • spinlock_t lock对象的自旋锁,用于对可能的并发访问该对象时提供互斥保护机制。
  • struct list_head worklist双向链表对象,用来将驱动程序提交的工作节点形成链表。驱动程序中的延迟操作以工作节点的形式存在。
  • wait_queue_head_t more_work等待队列头节点,工作队列的工人线程(worker_thread)没有工作节点需要处理时将进入睡眠状态,此时它需要进入该等待队列。
  • struct work_struct *current_work用于记录当前工人线程正在处理的工作节点。
  • struct workqueue_struct *wq指向系统工作队列管理结构,接下来有它的具体定义。
  • struct task_struct *thread指向工人线程所在的进程空间结构workqueue_struct。

    1
    2
    3
    4
    5
    6
    7
    8
    struct workqueue_struct {
    struct cpu_workqueue_struct *cpu_wq;
    struct list_head list;
    const char *name;
    int singlethread;
    int freezeable;
    int rt;
    };

    相对于上面的CPU工作队列管理结构,本书称struct workqueue_struct为工作队列管理结构,内核会为创建的每个工作队列生成一个工作队列管理结构对象。

  • struct cpu_workqueue_struct *cpu_wq指向CPU工作队列管理结构的per-CPU类型的指针。根据该指针,系统中的每个CPU都可以通过per_cpu_ptr来获得属于自己的CPU工作队列管理结构的对象。
  • struct list_head list双向链表对象,用于将工作队列管理结构加入到一个全局变量workqueues中,只对非singlethread工作队列有效。
  • const char *name工作队列的名称。
  • int singlethread标识创建的工作队列中工人线程的数量。
  • int freezeable表示进程可否处于冻结状态。
  • int rt用来调整worker_thread线程所在进程的调度策略。

create_singlethread_workqueue和create_workqueue

设备驱动程序通过这两个函数创建属于自己的基础设施,严格地说,其实它们是宏,不过这种文字上的小区别对理解整个流程的内核实现并没有什么特别的意义,所以不妨先展开来看看它们各自的定义:

1
2
3
4
<include/linux/workqueue.h>
#define create_workqueue(name) __create_workqueue_key ((name), 0, 0, 0,NULL, NULL)
#define create_singlethread_workqueue(name) \
__create_workqueue_key ((name), 1, 0, 0,NULL,NULL)

所以最终调用的函数是create_workqueue_key,这才是真正的核心函数,create_workqueue和create_singlethread_workqueue的区别在于调用`create_workqueue_key时的第二个参数,接下来讨论__create_workqueue_key函数的实现时再来看这个参数对驱动程序而言意味着什么。 内核源码中__create_workqueue_key`的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<kernel/workqueue.c>
struct workqueue_struct *__create_workqueue_key(const char *name,
int singlethread,
int freezeable,
int rt,
struct lock_class_key *key,
const char *lock_name)
{
struct workqueue_struct *wq;
struct cpu_workqueue_struct *cwq;
int err = 0, cpu;
wq = kzalloc(sizeof(*wq), GFP_KERNEL);
if (!wq)
return NULL;
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
if (!wq->cpu_wq) {
kfree(wq);
return NULL;
}
wq->name = name;
wq->singlethread = singlethread;
wq->freezeable = freezeable;
wq->rt = rt;
INIT_LIST_HEAD(&wq->list);
if (singlethread) {
cwq = init_cpu_workqueue(wq, singlethread_cpu);
err = create_workqueue_thread(cwq, singlethread_cpu);
start_workqueue_thread(cwq, -1);
} else {
cpu_maps_update_begin();
spin_lock(&workqueue_lock);
list_add(&wq->list, &workqueues);
spin_unlock(&workqueue_lock);
for_each_possible_cpu(cpu) {
cwq = init_cpu_workqueue(wq, cpu);
if (err || !cpu_online(cpu))
continue;
err = create_workqueue_thread(cwq, cpu);
start_workqueue_thread(cwq, cpu);
}
cpu_maps_update_done();
}
if (err) {
destroy_workqueue(wq);
wq = NULL;
}
return wq;
}

函数一开始便调用kzalloc生成了一个工作队列管理结构的对象wq并初始化,同时利用alloc_percpu函数生成了per-CPU类型的CPU工作队列管理结构对象:

1
wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);

接下来函数根据参数singlethread的值对单线程队列和多线程队列分别进行处理。create_singlethread_workqueue函数生成的工作队列是单线程的,singlethread=1,对这种情况,函数需要做的是:

  • 调用init_cpu_workqueue函数,在该函数中获得系统中第一个CPU(代码中的称谓是singlethread_cpu)对应的CPU工作队列管理结构的指针cwq,同时初始化cwq中的等待队列和双向链表等成员变量。
  • 调用create_workqueue_thread函数生成工人线程(worker_thread)。Linux内核中所谓的内核线程其实是一个进程,拥有独立的task_struct结构,这里的工人线程也不例外。create_workqueue_thread函数实际的操作是生成一个新的进程,将该进程task_struct中保存有进程执行现场寄存器的pc值指向worker_thread函数(本书称worker_thread函数为工人线程的线程函数),这样当该进程被调度运行时将执行worker_thread函数,传给函数的参数是系统中第一个CPU上的cwq指针。新进程的task_struct结构体指针p将保存在CPU工作队列管理结构的thread成员中:cwq->thread = p。
  • 调用start_workqueue_thread函数,后者再通过wake_up_process函数将新进程投入到系统的运行队列中:wake_up_process(p),如此之后新进程就具备了被调度器调度运行的条件。

如果singlethread不为1,那么__create_workqueue_key将对系统中的每个CPU调用singlethread中的三大步骤,这样每个CPU都将拥有自己的CPU工作队列管理结构和工作在其上的工人线程。
这种情况下工作队列管理结构对象wq还将把自己加入到workqueues管理的链表中:

1
list_add(&wq->list, &workqueues);

workqueues是一个全局型的双向链表对象,用来链接系统中所有非singlethread的工作队列:

1
2
<kernel/workqueue.c>
static LIST_HEAD(workqueues);

图中内核部分描述了通过create_singlethread_workqueue或者create_workqueue创建的工作队列及其上的工人线程worker_thread,后者的任务是操作worklist链表上的工作节点,如果worklist上面没有工作节点,那么worker_thread所在的进程将进入睡眠状态并驻留在more_work维护的等待队列中。

驱动程序部分将要延迟的操作打包进struct work_struct类型的工作节点中,然后通过queue_work向worklist上提交该工作节点,最后唤醒worker_thread线程。
图描述的是singlethread工作队列,对于非singlethread工作队列,上面的工作原理依然适用,只是此时系统中的每个CPU都拥有自己的工作队列和工人线程worker_thread。至于驱动程序提交节点时向哪个工作队列提交,在queue_work部分再讨论。

工人线程worker_thread

工人线程worker_thread用来处理驱动程序提交到工作队列中的工作节点,如果工作队列中没有节点需要处理,那么它将睡眠在cwq->more_work表示的等待队列中。
worker_thread运行在一个独立的新进程空间中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<kernel/workqueue.c>
static int worker_thread(void *__cwq){
struct cpu_workqueue_struct *cwq = __cwq;
DEFINE_WAIT(wait);
if (cwq->wq->freezeable)
set_freezable();
for (;;) {
prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
if (!freezing(current) &&
!kthread_should_stop() &&
list_empty(&cwq->worklist))
schedule();
finish_wait(&cwq->more_work, &wait);
try_to_freeze();
if (kthread_should_stop())
break;
run_workqueue(cwq);
}
return 0;
}

worker_thread的主体是一for(;;)循环,它首先用kthread_should_stop检测有没有别的函数对它调用了kthread_stop,如果有的话代表该线程的kthread对象的should_stop成员将被置1,此时worker_thread将通过break跳出循环,线程函数所在的进程将会终结。
如果worker_thread不需要stop而且cwq->worklist上也没有工作节点等待处理,工人线程将调用schedule以TASK_INTERRUPTIBLE状态睡眠在等待队列cwq->more_work中,直到驱动程序向cwq->worklist上提交了一个新的节点并唤醒worker_thread,它醒来之后将调用run_workqueue来处理cwq->worklist上的工作节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<kernel/workqueue.c>
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
spin_lock_irq(&cwq->lock);
while (!list_empty(&cwq->worklist)) {
struct work_struct *work = list_entry(cwq->worklist.next,
struct work_struct, entry);
work_func_t f = work->func;
cwq->current_work = work;
list_del_init(cwq->worklist.next);
spin_unlock_irq(&cwq->lock);
work_clear_pending(work);
f(work);
spin_lock_irq(&cwq->lock);
cwq->current_work = NULL;
}
spin_unlock_irq(&cwq->lock);
}

函数在while循环中遍历cwq->worklist链表,对于其中的每个工作节点work,先将其从cwq->worklist链表删除,然后调用工作节点上的延迟函数f(work),传递给函数的参数是延迟函数所在工作节点的指针work。从run_workqueue的代码可以看出,一个工作节点被处理完之后,将不会再出现在工作队列的cwq->worklist链表中,除非被再次提交。
函数中的work_clear_pending用来清除work->data的WORK_STRUCT_PENDING位(位0),这里内核把work->data的低2位用于记录work的状态信息,当驱动程序调用queue_work向工作队列提交节点work时,queue_work会把work->data的WORK_STRUCT_PENDING位置1,这是为了防止驱动程序将一个尚未被处理的工作节点再次向cwq->worklist上提交。

destroy_workqueue

destroy_workqueue执行与create_singlethread_workqueue/create_workqueue相反的任务,当驱动程序不再需要使用后者创建的工作队列时(比如驱动程序所在的模块要从系统中移走或者关闭设备等),需要调用destroy_workqueue来做工作队列的清理善后工作,比如释放create_workqueue分配使用的一些系统资源如内存等,还有worker_thread线程也应该被安全地终结。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<kernel/workqueue.c>
void destroy_workqueue(struct workqueue_struct *wq)
{
const struct cpumask *cpu_map = wq_cpu_map(wq);
int cpu;
cpu_maps_update_begin();
spin_lock(&workqueue_lock);
list_del(&wq->list);
spin_unlock(&workqueue_lock);
for_each_cpu(cpu, cpu_map)
cleanup_workqueue_thread(per_cpu_ptr(wq->cpu_wq, cpu));
cpu_maps_update_done();
free_percpu(wq->cpu_wq);
kfree(wq);
}

除了将wq从workqueues中移走及释放工作队列管理结构等对象所占用的内存外,主要的工作是调用cleanup_workqueue_thread来完全地终结worker_thread,因为destroy_workqueue被调用的时候,worker_thread很有可能正在处理worklist中余下的工作节点,因此函数要小心处理,避免发生不必要的麻烦。
这里将cleanup_workqueue_thread稍作改写以突出其主线:

1
2
3
4
5
6
7
8
9
<kernel/workqueue.c>
static void cleanup_workqueue_thread(struct cpu_workqueue_struct *cwq)
{
if (cwq->thread == NULL)
return;
flush_cpu_workqueue(cwq);
kthread_stop(cwq->thread);
cwq->thread = NULL;
}

函数的主要作用是通过调用kthread_stop函数来让worker_thread所在的进程终止,因为一旦进程的执行函数worker_thread结束,进程就将调用do_exit而终结,所以kthread_stop让worker_thread结束的原理就是设置should_stop=1,前面在讨论worker_thread时已看到过should_stop的这一用法。
但是终止worker_thread所在进程的一个前提条件是要确保所有提交到cwq->worklist中的工作节点都已处理完毕,这是由flush_cpu_workqueue函数完成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<kernel/workqueue.c>
static int flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
int active = 0;
struct wq_barrier barr;
WARN_ON(cwq->thread == current);
spin_lock_irq(&cwq->lock);
if (!list_empty(&cwq->worklist) || cwq->current_work != NULL) {
insert_wq_barrier(cwq, &barr, &cwq->worklist);
active = 1;
}
spin_unlock_irq(&cwq->lock);
if (active) {
wait_for_completion(&barr.done);
destroy_work_on_stack(&barr.work);
}
return active;
}

flush_cpu_workqueue确保cwq->worklist上所有工作节点都已处理完毕的设计思想是利用完成接口completion:如果cwq->worklist不为空或者cwq->current_work不为空,说明cwq_worklist上还有工作节点或者worker_thread正在处理一个工作节点,则向cwq->worklist上提交一个新的工作节点,这里不妨称之为中止节点。
当中止节点上的延迟函数被执行时,它将调用complete函数通知flush_cpu_workqueue,而后者在提交完中止节点之后将睡眠等待在wait_for_completion函数上,直到之前提交的中止节点上的延迟函数执行结束,如此可确保所有中止节点前的工作节点都会被处理完毕。

从函数的实现代码可以看到,虽然在insert_wq_barrier函数提交了中止节点之后,其他部分的代码依然可以向cwq->worklist提交新的工作节点,但是内核无法保证这些工作节点上的延迟函数有机会执行。函数中的WARN_ON(cwq->thread == current)意味着驱动程序不应该在提交的工作节点延迟函数中调用flush_cpu_workqueue。

flush_cpu_workqueue的操作范围只限于单个CPU。对于非singlethread工作队列,因为每个CPU上都有一个工作队列和worker_thread,要确保系统中所有CPU上的工作队列中的工作节点都被处理完,应该使用flush_workqueue函数:

1
2
3
4
5
6
7
<kernel/workqueue.c>
void flush_workqueue(struct workqueue_struct *wq)
{

for_each_cpu(cpu, cpu_map)
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
}

不管是flush_cpu_workqueue还是flush_workqueue,都是对工作队列worklist上所有的工作节点进行操作:函数返回后,可以确保函数调用前提交的所有工作节点都已处理完毕。
与之类似的还有另外一个函数flush_work,如果驱动程序想等待在某单个提交的工作节点上直到该节点处理完毕函数才返回,就可以使用flush_work函数,其原型如下:

1
int flush_work(struct work_struct *work);

参数work就是调用者要等待在其上的工作节点,如果该函数调用时work已处理完毕,那么函数返回0。

提交工作节点queue_work

前面几节描述了工作队列在内核内部的实现机制,本节开始讨论驱动程序如何通过向工作队列提交节点的方式来执行延迟操作。
设备驱动程序将要延迟的操作打包进一个struct work_struct对象,也就是所谓的工作节点,然后通过queue_work函数来向工作队列提交该节点。

1
2
3
4
5
6
7
8
<kernel/workqueue.c>
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
int ret;
ret = queue_work_on(get_cpu(), wq, work);
put_cpu();
return ret;
}

在之前的讨论中,驱动程序可以调用create_singlethread_workqueue和create_workqueue函数来让内核生成属于自己的工作队列,两者的区别是:create_singlethread_workqueue只在系统中的第一个CPU(singlethread_cpu)上创建工作队列和工人线程,而create_workqueue函数会在系统中的每个CPU上都创建工作队列和工人线程。
在用queue_work向工作队列提交工作节点时,如果工作队列是singlethread类型的,因为此时只有一个worklist,所以queue_work没得选择,工作节点只能提交到这唯一的一个worklist上。
反之如果队列不是singlethread类型的,那么工作节点将会提交到当前运行queue_work的CPU所在的worklist中。

1
2
3
4
5
6
7
8
9
10
11
<kernel/workqueue.c>
int queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
int ret = 0;
if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
BUG_ON(!list_empty(&work->entry));
__queue_work(wq_per_cpu(wq, cpu), work);
ret = 1;
}
return ret;
}

函数首先检测work->data的WORK_STRUCT_PENDING位有没有被置1,置1的话意味着此前该work已被提交还没有处理,内核禁止驱动程序在一个工作节点还没处理完就再次提交该节点。
此处的检测也告诉驱动程序,在构造工作节点对象work时,应该确保work->data低2位为0。如果work->data的WORK_STRUCT_PENDING位是0,那么就把该位置1表明工作节点处于等待处理的状态,然后调用__queue_work来提交节点。

__queue_work的原型为:

1
static void __queue_work(struct cpu_workqueue_struct *cwq, struct work_struct *work);

第一个参数是CPU工作队列管理结构,第二个参数是待提交的节点指针。queue_work_on在调用__queue_work时传递的第一个参数是wq_per_cpu(wq, cpu),为了搞清向哪个cwq提交节点,不妨看看wq_per_cpu的实现:

1
2
3
4
5
6
7
<kernel/workqueue.c>
static struct cpu_workqueue_struct *wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
if (unlikely(is_wq_single_threaded(wq)))
cpu = singlethread_cpu;
return per_cpu_ptr(wq->cpu_wq, cpu);
}

函数的实现很简单,如果是singlethread类型的工作队列,那么工作节点就提交到第一个CPU的cwq上,否则哪个CPU调用queue_work,工作节点就提交到哪个CPU的cwq上。

下面继续看__queue_work,其内部通过调用insert_work(cwq, work, &cwq->worklist)来完成节点的提交。insert_work的定义如下:

1
2
3
4
5
6
7
8
9
<kernel/workqueue.c>
static void insert_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work, struct list_head *head)
{
set_wq_data(work, cwq);
smp_wmb();
list_add_tail(&work->entry, head);
wake_up(&cwq->more_work);
}

函数的主体和我们的预期完全一样:将工作节点加到cwq->worklist链表的尾部,然后调用wake_up唤醒在等待队列cwq->more_work上睡眠的worker_thread,如果worker_thread正在运行,那么wake_up就什么也不做。
在把驱动程序向工作队列提交节点的queue_work函数搞清楚之后,再回过头来看看实际的驱动程序代码中如何动态初始化一个工作队列节点work_struct的对象。内核为此提供了两个宏PREPARE_WORK和INIT_WORK,展开后如下:

1
2
3
4
5
6
7
8
9
10
11
<include/linux/workqueue.h>
#define PREPARE_WORK(_work,_func) \
do{ \
(_work)->func=(_func); \
} while (0)
#define INIT_WORK(_work,_func) \
do{ \
(_work)->data = (atomic_long_t) ATOMIC_LONG_INIT(0); \
INIT_LIST_HEAD(&(_work)->entry); \
PREPARE_WORK((_work),(_func)); \
} while (0)

INIT_WORK初始化struct work_struct中的每个成员,而PREPARE_WORK只是重新设置struct work_struct中的func指针。在实际的驱动程序中,使用INIT_WORK的机会要比PREPARE_WORK大得多。

除了这种动态初始化,内核还提供了另外一个宏DECLARE_WORK,可以让驱动程序静态定义一个struct work_struct对象同时初始化:

1
2
3
4
5
6
7
<include/linux/workqueue.h>
#define DECLARE_WORK(n,f) struct work_struct n={ \
.data=WORK_DATA_STATIC_INIT(), \
.entry={&(n).entry,&(n).entry}, \
.func=(f), \
__WORK_INIT_LOCKDEP_MAP(#n,&(n)) \
}

除queue_work之外,内核还提供了另外一个提交节点的函数queue_delayed_work:

1
2
3
4
5
6
7
8
<kernel/workqueue.c>
int queue_delayed_work(struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay)
{
if (delay == 0)
return queue_work(wq, &dwork->work);
return queue_delayed_work_on(-1, wq, dwork, delay);
}

相对于queue_work,queue_delayed_work多了个参数delay,不过在delay=0的情况下,对queue_delayed_work就蜕变成了queue_work,上面的代码很明显地展示了这一点。
如果delay不等于0,那么它代表一个延迟的时间,换句话说调用queue_delayed_work之后,工作节点work需要等到delay指定的时间过后才会被真正提交到队列wq上。这种延迟提交的工作在queue_delayed_work_on函数中完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<kernel/workqueue.c>
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *dwork, unsigned long delay)
{
int ret = 0;
struct timer_list *timer = &dwork->timer;
struct work_struct *work = &dwork->work;
if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
timer_stats_timer_set_start_info(&dwork->timer);
set_wq_data(work, wq_per_cpu(wq, raw_smp_processor_id()));
timer->expires = jiffies + delay;
timer->data = (unsigned long)dwork;
timer->function = delayed_work_timer_fn;
if (unlikely(cpu >= 0))
add_timer_on(timer, cpu);
else
add_timer(timer);
ret = 1;
}
return ret;
}

函数的设计思想比较直白,利用定时器timer来实现延迟提交的工作,timer->expires = jiffies+delay,这样当delay时间到期后,timer->function = delayed_work_timer_fn将被调用,delayed_work_timer_fn会把queue_delayed_work_on要提交的节点提交到工作队列中。
所以驱动程序如果要使用queue_delayed_work,要先生成一个struct delayed_work对象。struct delayed_work定义为:

1
2
3
4
5
<include/linux/workqueue.h>
struct delayed_work {
struct work_struct work;
struct timer_list timer;
};

延迟函数所在的工作节点在struct delayed_work结构体的work成员中,其另一个成员是个timer对象,用来实现时间上的延迟操作,queue_delayed_work中的dalay参数将用来给timer中的延时成员赋值。
至此我们已经完整地讨论了工作队列整个框架的实现机制,包括内核部分如何建立队列和工人线程,以及驱动程序部分如何利用内核提供的接口向工作队列中提交工作节点实现延迟的操作。
为了加深读者对这部分内容的理解,下面用一个使用了工作队列执行延迟操作的代码片段作为具体的范例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//定义全局性的struct workqueue_struct指针demo_dev_wq
static struct workqueue_struct*demo_dev_wq;
//设备特定的数据结构,实际使用中大部分struct work_struct结构都内嵌在这个数据结构中struct demo_device{

struct work_struct work;

}
static struct demo_device *demo_dev;
//定义延迟的操作函数
void demo_work_func(struct work_struct*work)
{

}
//驱动程序模块初始化代码调用create_singlethread_workqueue创建工作队列
static int__init demo_dev_init(void)
{

demo_dev = kzalloc(sizeof * demo_dev, GFP_KERNEL);
demo_dev_wq = create_singlethread_workqueue("demo_dev_workqueue");
INIT_WORK(&demo_dev->work, demo_work_func);

}
//模块退出函数
static void demo_dev_exit(void)
{

flush_workqueue(demo_dev_wq);
destroy_workqueue(demo_dev_wq );

}
//中断处理函数
irqreturn_t demo_isr(int irq,void*dev_id)
{

queue_work(demo_dev_wq, &demo_dev-> work);

}

内核创建的工作队列

Linux系统在初始化阶段的init_workqueues函数中通过调用create_workqueue创建了一个名为events的工作队列

1
2
3
4
5
6
7
8
<kernel/workqueue.c>
static struct workqueue_struct *keventd_wq __read_mostly;
void __init init_workqueues(void)
{

keventd_wq = create_workqueue("events");

}

前面已经仔细讨论了create_workqueue函数,内核在初始化阶段创建的工作队列与驱动程序自己调用create_workqueue函数创建的在本质上没有任何不同。
设备驱动程序就算不创建自己的工作队列,也可以利用内核创建的工作队列来实现延迟操作,在提交工作节点时只需要调用queue_work(keventd_wq, work)即可。
不过内核已经用另一个函数schedule_work包装了queue_work(keventd_wq, work)调用

1
2
3
4
5
<kernel/workqueue.c>
int schedule_work(struct work_struct *work)
{
return queue_work(keventd_wq, work);
}

可以看到,驱动程序如果使用内核创建的工作队列,在提交工作节点时只需调用schedule_work函数就可以了。对应queue_delayed_work,对于内核创建的工作队列而言,延迟提交函数就变成了schedule_delayed_work。

使用内核提供的工作队列的好处是,驱动程序无须创建自己的工作队列就可以提交节点来实现延迟操作。但是不好的地方也很明显:我们正在与系统中其他模块共享一个工作队列以及该队列上的worker_thread,所以队列上的工作节点的多少我们无法预期,意味着我们无法确定在提交一个工作节点之后,需要多长时间才有机会被调度执行。同时,我们也应该注意避免在提交的延迟函数中执行很耗时的任务影响到他人。所以对使用内核创建的工作队列总结起来就是:虽然少了创建队列销毁队列这些麻烦事,但是使用起来的灵活性就下降了。这里没有一成不变的规则,设备驱动程序需要根据实际情况做出适合自己的选择。

小结

深入讨论了设备驱动程序中经常使用的两种延迟操作的实现机制,分别是tasklet和workqueue。
tasklet的实现基于softirq机制,内核在初始化期间就初始化了HI_SOFTIRQ和TASKLET_SOFTIRQ两个softirq所对应的action函数:tasklet_hi_action和tasklet_action。

驱动程序在可以使用tasklet机制实现延迟操作前,需要定义一个tasklet对象,将要延迟的函数封装到该对象中,之后需要调用tasklet_schedule函数向系统提交该对象。
中断处理的SOFTIRQ部分发现有等待的softirq需要处理时,就去处理被驱动程序提交的tasklet对象,在那里被打包进tasklet对象的驱动程序中的实际延迟函数将被调用。

当一个tasklet对象在SOFTIRQ部分处理完之后,除非再次提交,否则将不再会被执行。
由此可见,通过tasklet实现的延迟操作,是运行在中断处理的上下文环境中,因此它不应该引入睡眠。tasklet是严格串行化的:在任一时刻,同一tasklet只能有一个实例在运行,即使是多处理器系统也是如此。
tasklet的另一个特性是:哪个处理器调用tasklet_schedule提交的tasklet,只能在该处理器上运行。

相对于tasklet,工作队列的延迟函数是在一个独立的进程环境下运行的。
系统中可能有两种形式的工作队列:singlethread的和非singlethread的。对于多处理器系统而言,前者只在系统中的第一个CPU上产生工作队列和工人线程,而后者则为系统中的每个处理器都产生一个工作队列和工人线程。
内核在初始化阶段创建了一个非singlethread的工作队列,驱动程序可以使用该队列,也可以调用create_workqueue或者create_singlethread_workqueue创建属于自己的工作队列。

为了实现延迟操作,驱动程序需要生成一个类型为struct work_struct的工作队列对象,将要延迟执行的函数打包到该对象中,然后通过queue_work函数向工作队列提交该节点。
同tasklet对象一样,当work对象被处理完毕后除非被再次提交,否则将不再有执行的机会。与tasklet不同的是,基于工作队列的延迟操作是运行在进程的上下文中,所以允许睡眠。