一、 什么是eventfd eventfd是一种linux上的线程通信方式,和信号量等其他线程通信不同的是eventfd可以用于进程间的通信,还可以用于内核发信号给用户态的进程,eventfd是linux上的系统调用,本质上是用于事件通知。
二、 eventfd特点 eventfd被设计为一个简单轻量级的文件描述符,支持在用户态作为事件进行wait和dispatch和在内核中作为事件进行dispatch,并且eventfd还可以在所有仅仅需要事件通知场景中替换pipe,并且eventfd相比于pipe在内核中开销更低,不需要消耗两个fd,当在kernel中,可以提供对fd-bridge的启用,类似 KAIO or syslets/threadlets这些功能一样通知一个fd完成某些操作,eventfd还支持epoll/poll/select的方式进行事件通知
三、 eventfd原理分析 用户态进程通过eventfd()创建一个内核对象,这个内核对象包含一个由内核保持的无符号64位整型计数器。这个计数器由参数initval说明的值来初始化。用户态进程可以通过write/read去读写这个eventfd()返回的文件描述符,然后使用poll或select检测文件描述符的变化,继而达到事件的通知/响应机制。
1. int eventfd(unsigned int initval, int flags); 这个API的作用是用于产生一个文件描述符(致敬:Linux下一切皆文件),可以对这个文件描述符进行read、write、poll、select等操作,接下来看下这个api的内核实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 SYSCALL_DEFINE2(eventfd2, unsigned int , count, int , flags) { int fd, error; struct file *file ; error = get_unused_fd_flags(flags & EFD_SHARED_FCNTL_FLAGS); if (error < 0 ) return error; fd = error; file = eventfd_file_create(count, flags); if (IS_ERR(file)) { error = PTR_ERR(file); goto err_put_unused_fd; } fd_install(fd, file); return fd; err_put_unused_fd: put_unused_fd(fd); return error; } SYSCALL_DEFINE1(eventfd, unsigned int , count) { return sys_eventfd2(count, 0 ); }
可以看到代码本身就是创建一个文件描述符,这里说明下eventfd_file_create这个API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 struct file *eventfd_file_create (unsigned int count, int flags) { struct file *file ; struct eventfd_ctx *ctx ; BUILD_BUG_ON(EFD_CLOEXEC != O_CLOEXEC); BUILD_BUG_ON(EFD_NONBLOCK != O_NONBLOCK); if (flags & ~EFD_FLAGS_SET) return ERR_PTR(-EINVAL); ctx = kmalloc(sizeof (*ctx), GFP_KERNEL); if (!ctx) return ERR_PTR(-ENOMEM); kref_init(&ctx->kref); init_waitqueue_head(&ctx->wqh); ctx->count = count; ctx->flags = flags; file = anon_inode_getfile("[eventfd]" , &eventfd_fops, ctx, O_RDWR | (flags & EFD_SHARED_FCNTL_FLAGS)); if (IS_ERR(file)) eventfd_free_ctx(ctx); return file; }
这里创建了eventfd_ctx结构,这个结构就是用来存储可以被read/write的计数器count以及后面会用到的flags,同时ctx里还有个wqh变量,这个就是等待队列,等待队列的作用就是当进程需要阻塞的时候挂在对应evnetfd的等待队列,然后调用anon_inode_getfile创建一个file结构(anon_inode_getfile的实现这里暂不多讲),同时把eventfd_ctx作为file结构的private_data,同时关联eventfd自身的文件操作结构体eventfd_fops。
2. static ssize_t eventfd_read(struct file *file, char __user *buf, size_t count, loff_t *ppos) 这个API用于读取count值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 static ssize_t eventfd_read (struct file *file, char __user *buf, size_t count, loff_t *ppos) { struct eventfd_ctx *ctx = file ->private_data ; ssize_t res; __u64 cnt; if (count < sizeof (cnt)) return -EINVAL; res = eventfd_ctx_read(ctx, file->f_flags & O_NONBLOCK, &cnt); if (res < 0 ) return res; return put_user(cnt, (__u64 __user *) buf) ? -EFAULT : sizeof (cnt); } ssize_t eventfd_ctx_read(struct eventfd_ctx *ctx, int no_wait, __u64 *cnt){ ssize_t res; DECLARE_WAITQUEUE(wait, current); spin_lock_irq(&ctx->wqh.lock); *cnt = 0 ; res = -EAGAIN; if (ctx->count > 0 ) res = 0 ; else if (!no_wait) { __add_wait_queue(&ctx->wqh, &wait); for (;;) { set_current_state(TASK_INTERRUPTIBLE); if (ctx->count > 0 ) { res = 0 ; break ; } if (signal_pending(current)) { res = -ERESTARTSYS; break ; } spin_unlock_irq(&ctx->wqh.lock); schedule(); spin_lock_irq(&ctx->wqh.lock); } __remove_wait_queue(&ctx->wqh, &wait); __set_current_state(TASK_RUNNING); } if (likely(res == 0 )) { eventfd_ctx_do_read(ctx, cnt); if (waitqueue_active(&ctx->wqh)) wake_up_locked_poll(&ctx->wqh, POLLOUT); } spin_unlock_irq(&ctx->wqh.lock); return res; } static void eventfd_ctx_do_read (struct eventfd_ctx *ctx, __u64 *cnt) { *cnt = (ctx->flags & EFD_SEMAPHORE) ? 1 : ctx->count; ctx->count -= *cnt; }
在eventfd_read里先判断请求buf的size是否满足条件,这里count是64位即8个字节,所以最小读取8个字节。然后调用eventfd_ctx_read获取eventfd_ctx中的count计数,并清零,如果读取有问题则返回,否则把值写入到用户空间。
这里着重说一下eventfd_ctx_read:首先调用DECLARE_WAITQUEUE(wait, current)初始化一个wait_queue_t对象wait,之后对eventfd_ctx加锁(由于可能存在并发读取)。往下走会判断count是否大于0,如果大于0则res=0,否则返回res(默认值为-EAGAIN),接下来看看传递进来的参数标志,如果设置了O_NONBLOCK,则就不需要等待,直接返回res(这里返回的res是小于0的情况)。如果没有指定O_NONBLOCK标志,此时由于读取不到count值(count值为0),就阻塞。然后调用__add_wait_queue把把自己加入到等待队列中,该队列会在进程等待的条件满足时唤醒它。加入到队列后进入一个死循环,设置当前进程状态为TASK_INTERRUPTIBLE,并不断检查count值,如果count大于0了,意味着有信号了,就设置res=0,然后break,然后把进程从等待队列去掉,然后设置状态TASK_RUNNING。如果count值为0,则检查是否有挂起的信号,如果有信号,同样需要先对信号进行处理,不过这次读应该就是返回失败了。之后调用调度器进行调度。如果发生了write的动作,count值会大于0,这是就会置res=0并break出去,然后会对count值进行读取。具体读取通过eventfd_ctx_do_read函数,该函数很简单, 先判断有没有指定EFD_SEMAPHORE标志,如果指定了则一次只返回1,,没有就返回count值。然后对count做减法,如果没有指定EFD_SEMAPHORE标志,实际上减去之后就为0了,有的话就减去1。之前如果有在该eventfd上阻塞的write进程,现在就可以唤醒了,所以这里检查了下,如果等待队列还有进程,则调用wake_up_locked_poll对对应的进程进行唤醒。
3.static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count, loff_t *ppos) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 static ssize_t eventfd_write (struct file *file, const char __user *buf, size_t count, loff_t *ppos) { struct eventfd_ctx *ctx = file ->private_data ; ssize_t res; __u64 ucnt; DECLARE_WAITQUEUE(wait, current); if (count < sizeof (ucnt)) return -EINVAL; if (copy_from_user(&ucnt, buf, sizeof (ucnt))) return -EFAULT; if (ucnt == ULLONG_MAX) return -EINVAL; spin_lock_irq(&ctx->wqh.lock); res = -EAGAIN; if (ULLONG_MAX - ctx->count > ucnt) res = sizeof (ucnt); else if (!(file->f_flags & O_NONBLOCK)) { __add_wait_queue(&ctx->wqh, &wait); for (res = 0 ;;) { set_current_state(TASK_INTERRUPTIBLE); if (ULLONG_MAX - ctx->count > ucnt) { res = sizeof (ucnt); break ; } if (signal_pending(current)) { res = -ERESTARTSYS; break ; } spin_unlock_irq(&ctx->wqh.lock); schedule(); spin_lock_irq(&ctx->wqh.lock); } __remove_wait_queue(&ctx->wqh, &wait); __set_current_state(TASK_RUNNING); } if (likely(res > 0 )) { ctx->count += ucnt; if (waitqueue_active(&ctx->wqh)) wake_up_locked_poll(&ctx->wqh, POLLIN); } spin_unlock_irq(&ctx->wqh.lock); return res; }
这个API和上面的eventfd_ctx_read大致类型,先创建wait,然后判断count是不是小于8位 ,之后copy用户态传过来的值到ucnt,并判断ucnt是否等于ULLONG_MAX, 初始化res为-EAGAIN,再往下判断ucnt+ctx->count是否小于ULLONG_MAX,小于的话设置res=sizeof(ucnt),并且把ucnt加到ctx->count上,然后唤醒read进程,并返回,如果ULLONG_MAX小于ucnt+ctx->count,则加入等待队列,设置当前进程状态为TASK_INTERRUPTIBLE,并不断检查ucnt+ctx->count是否小于ULLONG_MAX,如果ucnt+ctx->count小于ULLONG_MAX(即eventfd_read被调用了),则设置res=sizeof(ucnt)并break出去,并且把ucnt加到ctx->count上,然后唤醒等待队列的进程,如果等待期间有信号进来,同样需要先对信号进行处理。
4.__u64 eventfd_signal(struct eventfd_ctx *ctx, __u64 n) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 __u64 eventfd_signal (struct eventfd_ctx *ctx, __u64 n) { unsigned long flags; spin_lock_irqsave(&ctx->wqh.lock, flags); if (ULLONG_MAX - ctx->count < n) n = ULLONG_MAX - ctx->count; ctx->count += n; if (waitqueue_active(&ctx->wqh)) wake_up_locked_poll(&ctx->wqh, POLLIN); spin_unlock_irqrestore(&ctx->wqh.lock, flags); return n; }
该函数一般在内核中经常用到(例如vhost),用户内核通知用户态,和eventf_write函数类似,只不过没有阻塞,这里有点不同的是如果传进来的n+ctx->count大于ULLONG_MAX,则置n=ULLONG_MAX-ctx->count,之后设置ctx->count+=n,之后就是唤醒等待队列的进程了