一、 什么是eventfd

eventfd是一种linux上的线程通信方式,和信号量等其他线程通信不同的是eventfd可以用于进程间的通信,还可以用于内核发信号给用户态的进程,eventfd是linux上的系统调用,本质上是用于事件通知。

二、 eventfd特点

eventfd被设计为一个简单轻量级的文件描述符,支持在用户态作为事件进行wait和dispatch和在内核中作为事件进行dispatch,并且eventfd还可以在所有仅仅需要事件通知场景中替换pipe,并且eventfd相比于pipe在内核中开销更低,不需要消耗两个fd,当在kernel中,可以提供对fd-bridge的启用,类似 KAIO or syslets/threadlets这些功能一样通知一个fd完成某些操作,eventfd还支持epoll/poll/select的方式进行事件通知

三、 eventfd原理分析

用户态进程通过eventfd()创建一个内核对象,这个内核对象包含一个由内核保持的无符号64位整型计数器。这个计数器由参数initval说明的值来初始化。用户态进程可以通过write/read去读写这个eventfd()返回的文件描述符,然后使用poll或select检测文件描述符的变化,继而达到事件的通知/响应机制。

1. int eventfd(unsigned int initval, int flags);

这个API的作用是用于产生一个文件描述符(致敬:Linux下一切皆文件),可以对这个文件描述符进行read、write、poll、select等操作,接下来看下这个api的内核实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags)
{
int fd, error;
struct file *file;
error = get_unused_fd_flags(flags & EFD_SHARED_FCNTL_FLAGS);
if (error < 0)
return error;
fd = error;

file = eventfd_file_create(count, flags);
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto err_put_unused_fd;
}
fd_install(fd, file);

return fd;

err_put_unused_fd:
put_unused_fd(fd);

return error;
}

SYSCALL_DEFINE1(eventfd, unsigned int, count)
{
return sys_eventfd2(count, 0);
}

可以看到代码本身就是创建一个文件描述符,这里说明下eventfd_file_create这个API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
struct file *eventfd_file_create(unsigned int count, int flags)
{
struct file *file;
struct eventfd_ctx *ctx;

/* Check the EFD_* constants for consistency. */
BUILD_BUG_ON(EFD_CLOEXEC != O_CLOEXEC);
BUILD_BUG_ON(EFD_NONBLOCK != O_NONBLOCK);

if (flags & ~EFD_FLAGS_SET)
return ERR_PTR(-EINVAL);

ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
if (!ctx)
return ERR_PTR(-ENOMEM);

kref_init(&ctx->kref);
init_waitqueue_head(&ctx->wqh);
ctx->count = count;
ctx->flags = flags;

file = anon_inode_getfile("[eventfd]", &eventfd_fops, ctx,
O_RDWR | (flags & EFD_SHARED_FCNTL_FLAGS));
if (IS_ERR(file))
eventfd_free_ctx(ctx);

return file;
}

这里创建了eventfd_ctx结构,这个结构就是用来存储可以被read/write的计数器count以及后面会用到的flags,同时ctx里还有个wqh变量,这个就是等待队列,等待队列的作用就是当进程需要阻塞的时候挂在对应evnetfd的等待队列,然后调用anon_inode_getfile创建一个file结构(anon_inode_getfile的实现这里暂不多讲),同时把eventfd_ctx作为file结构的private_data,同时关联eventfd自身的文件操作结构体eventfd_fops。

2. static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count, loff_t *ppos)

这个API用于读取count值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count,
loff_t *ppos)
{
struct eventfd_ctx *ctx = file->private_data;
ssize_t res;
__u64 cnt;

if (count < sizeof(cnt))
return -EINVAL;
res = eventfd_ctx_read(ctx, file->f_flags & O_NONBLOCK, &cnt);
if (res < 0)
return res;

return put_user(cnt, (__u64 __user *) buf) ? -EFAULT : sizeof(cnt);
}
ssize_t eventfd_ctx_read(struct eventfd_ctx *ctx, int no_wait, __u64 *cnt)
{
ssize_t res;
DECLARE_WAITQUEUE(wait, current);

spin_lock_irq(&ctx->wqh.lock);
*cnt = 0;
res = -EAGAIN;
if (ctx->count > 0)
res = 0;
else if (!no_wait) {
__add_wait_queue(&ctx->wqh, &wait);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (ctx->count > 0) {
res = 0;
break;
}
if (signal_pending(current)) {
res = -ERESTARTSYS;
break;
}
spin_unlock_irq(&ctx->wqh.lock);
schedule();
spin_lock_irq(&ctx->wqh.lock);
}
__remove_wait_queue(&ctx->wqh, &wait);
__set_current_state(TASK_RUNNING);
}
if (likely(res == 0)) {
eventfd_ctx_do_read(ctx, cnt);
if (waitqueue_active(&ctx->wqh))
wake_up_locked_poll(&ctx->wqh, POLLOUT);
}
spin_unlock_irq(&ctx->wqh.lock);

return res;
}
static void eventfd_ctx_do_read(struct eventfd_ctx *ctx, __u64 *cnt)
{
*cnt = (ctx->flags & EFD_SEMAPHORE) ? 1 : ctx->count;
ctx->count -= *cnt;
}

在eventfd_read里先判断请求buf的size是否满足条件,这里count是64位即8个字节,所以最小读取8个字节。然后调用eventfd_ctx_read获取eventfd_ctx中的count计数,并清零,如果读取有问题则返回,否则把值写入到用户空间。

这里着重说一下eventfd_ctx_read:首先调用DECLARE_WAITQUEUE(wait, current)初始化一个wait_queue_t对象wait,之后对eventfd_ctx加锁(由于可能存在并发读取)。往下走会判断count是否大于0,如果大于0则res=0,否则返回res(默认值为-EAGAIN),接下来看看传递进来的参数标志,如果设置了O_NONBLOCK,则就不需要等待,直接返回res(这里返回的res是小于0的情况)。如果没有指定O_NONBLOCK标志,此时由于读取不到count值(count值为0),就阻塞。然后调用__add_wait_queue把把自己加入到等待队列中,该队列会在进程等待的条件满足时唤醒它。加入到队列后进入一个死循环,设置当前进程状态为TASK_INTERRUPTIBLE,并不断检查count值,如果count大于0了,意味着有信号了,就设置res=0,然后break,然后把进程从等待队列去掉,然后设置状态TASK_RUNNING。如果count值为0,则检查是否有挂起的信号,如果有信号,同样需要先对信号进行处理,不过这次读应该就是返回失败了。之后调用调度器进行调度。如果发生了write的动作,count值会大于0,这是就会置res=0并break出去,然后会对count值进行读取。具体读取通过eventfd_ctx_do_read函数,该函数很简单, 先判断有没有指定EFD_SEMAPHORE标志,如果指定了则一次只返回1,,没有就返回count值。然后对count做减法,如果没有指定EFD_SEMAPHORE标志,实际上减去之后就为0了,有的话就减去1。之前如果有在该eventfd上阻塞的write进程,现在就可以唤醒了,所以这里检查了下,如果等待队列还有进程,则调用wake_up_locked_poll对对应的进程进行唤醒。

3.static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count, loff_t *ppos)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count,
loff_t *ppos)
{
struct eventfd_ctx *ctx = file->private_data;
ssize_t res;
__u64 ucnt;
DECLARE_WAITQUEUE(wait, current);

if (count < sizeof(ucnt))
return -EINVAL;
if (copy_from_user(&ucnt, buf, sizeof(ucnt)))
return -EFAULT;
if (ucnt == ULLONG_MAX)
return -EINVAL;
spin_lock_irq(&ctx->wqh.lock);
res = -EAGAIN;
if (ULLONG_MAX - ctx->count > ucnt)
res = sizeof(ucnt);
else if (!(file->f_flags & O_NONBLOCK)) {
__add_wait_queue(&ctx->wqh, &wait);
for (res = 0;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (ULLONG_MAX - ctx->count > ucnt) {
res = sizeof(ucnt);
break;
}
if (signal_pending(current)) {
res = -ERESTARTSYS;
break;
}
spin_unlock_irq(&ctx->wqh.lock);
schedule();
spin_lock_irq(&ctx->wqh.lock);
}
__remove_wait_queue(&ctx->wqh, &wait);
__set_current_state(TASK_RUNNING);
}
if (likely(res > 0)) {
ctx->count += ucnt;
if (waitqueue_active(&ctx->wqh))
wake_up_locked_poll(&ctx->wqh, POLLIN);
}
spin_unlock_irq(&ctx->wqh.lock);

return res;
}

这个API和上面的eventfd_ctx_read大致类型,先创建wait,然后判断count是不是小于8位 ,之后copy用户态传过来的值到ucnt,并判断ucnt是否等于ULLONG_MAX, 初始化res为-EAGAIN,再往下判断ucnt+ctx->count是否小于ULLONG_MAX,小于的话设置res=sizeof(ucnt),并且把ucnt加到ctx->count上,然后唤醒read进程,并返回,如果ULLONG_MAX小于ucnt+ctx->count,则加入等待队列,设置当前进程状态为TASK_INTERRUPTIBLE,并不断检查ucnt+ctx->count是否小于ULLONG_MAX,如果ucnt+ctx->count小于ULLONG_MAX(即eventfd_read被调用了),则设置res=sizeof(ucnt)并break出去,并且把ucnt加到ctx->count上,然后唤醒等待队列的进程,如果等待期间有信号进来,同样需要先对信号进行处理。

4.__u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
__u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n)
{
unsigned long flags;

spin_lock_irqsave(&ctx->wqh.lock, flags);
if (ULLONG_MAX - ctx->count < n)
n = ULLONG_MAX - ctx->count;
ctx->count += n;
if (waitqueue_active(&ctx->wqh))
wake_up_locked_poll(&ctx->wqh, POLLIN);
spin_unlock_irqrestore(&ctx->wqh.lock, flags);

return n;
}

该函数一般在内核中经常用到(例如vhost),用户内核通知用户态,和eventf_write函数类似,只不过没有阻塞,这里有点不同的是如果传进来的n+ctx->count大于ULLONG_MAX,则置n=ULLONG_MAX-ctx->count,之后设置ctx->count+=n,之后就是唤醒等待队列的进程了