本文共 10168 字,大约阅读时间需要 33 分钟。
epoll 是linux下特有的IO复用的接口,相比于poll 与select 的特点,epoll不需要每次调用监听都要向内核空间写入要监听的文件描述符以及相应的事件,减少了用户空间向内核空间的数据的拷贝,epoll 内部维护这一个内核事件表,采用红黑树的数据结构,将需要监听的事件与文件描述符存储再该结构上面。当有事件产生的时候,epoll 通过注册的回调函数来唤醒正在睡眠等待的线程,而不用去循环检测文件描述符上是否有事件产生。
epoll_create(int size)
该接口用于创建一个epoll专用的文件描述符。epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epfd: 用epfd 创建的epoll专用的文件描述符;
op: 对文件描述符操作方法EPOLL_CTL_ADD | 添加一个监听的文件描述符 |
---|---|
EPOLL_CTL_MOD | 修改文件描述符上的监听的事件 |
EPOLL_CTL_DEL | 删除一个监听的文件描述符 |
fd: 传入需要监听的文件描述符
event: 需要监听的事件EPOLLIN: | 触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭); |
---|---|
EPOLLOUT: | 触发该事件,表示对应的文件描述符上可以写数据; |
EPOLLPRI: | 表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); |
EPOLLERR: | 表示对应的文件描述符发生错误; |
EPOLLHUP: | 表示对应的文件描述符被挂断; |
EPOLLET: | 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。 |
EPOLLONESHOT: | 只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把 |
epoll_create 函数系统调用通过sys_poll_create来实现
asmlinkage long sys_epoll_create(int size){ int error, fd; struct inode *inode; struct file *file; DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d)\n", current, size)); /* Sanity check on the size parameter */ error = -EINVAL; if (size <= 0) goto eexit_1; /* * Creates all the items needed to setup an eventpoll file. That is, * a file structure, and inode and a free file descriptor. */ //创建文件描述 并将新建的文件描述符添加到当前进程的文件描述符列表中 //这里不仅创建了一个epoll专用的文件描述符,相应的文件系统的节点,与相应文件 //系统的file结构体,而且将三者建立起了关联 error = ep_getfd(&fd, &inode, &file); if (error) goto eexit_1; /* Setup the file internal data structure ( "struct eventpoll" ) */ error = ep_file_init(file); //创建内核时间表, if (error) goto eexit_2; DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %d\n", current, size, fd)); return fd;eexit_2: sys_close(fd);eexit_1: DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %d\n", current, size, error)); return error;}
在ep_getfd(&fd, &inode, &file)中
//创建了相应的结构体,建立了相应的对应的关系,并且初始化了结构体中的相应的内容 //获取一个file结构体 file = get_empty_filp(); //获取一个eventpoll文件系统的 inode 节点 inode = ep_eventpoll_inode(); //获取一个未使用的fd 文件描述符 error = get_unused_fd(); //最后看file 结构体与fd的对应关系 //当前的所有打开的文件队列上,添加新申请的这个文件描述符 fd_install(fd, file);
void fastcall fd_install(unsigned int fd, struct file * file){ //获取当前进程pcb块中files_struct结构体, //在该结构体中的fd存放当前进程所打开的文件描述符的列表 //该操作将新产生的文件描述符放到当前进程的文件描述的的列表中 struct files_struct *files = current->files; spin_lock(&files->file_lock); if (unlikely(files->fd[fd] != NULL)) BUG(); files->fd[fd] = file; spin_unlock(&files->file_lock);}
上述的过程对应的结构体的关系为
epoll_ctl 在内核中通过sys_epoll_ctl 实现。
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event){ ... /* 想要更新epoll 文件描述符上监听的文件描述符或事件,首先要获取相应的epoll专用的文件描述符,需要更新事件的文件描述符,还要获取相应的的内核事件表。其中内核事件表存放在eventpoll结构体中。,下面的代码中就是获取相应的资源。 */ //获取epoll专用文件描述符 file = fget(epfd); if (!file) goto eexit_1; /* Get the "struct file *" for the target file */ //获取需要监听的文件描述符 struct file tfile = fget(fd); if (!tfile) goto eexit_2; /* The target file descriptor must support poll */ error = -EPERM; if (!tfile->f_op || !tfile->f_op->poll) goto eexit_3; /* * We have to check that the file structure underneath the file descriptor * the user passed to us _is_ an eventpoll file. And also we do not permit * adding an epoll file descriptor inside itself. */ //需要监听的文件描述符不能是epoll文件系统的文件描述符 error = -EINVAL; if (file == tfile || !IS_FILE_EPOLL(file)) goto eexit_3; /* * At this point it is safe to assume that the "private_data" contains * our own data structure. */ //获取eventpool,通过eventpool可以获取内核时间表 ep = file->private_data; ...}
通过获取到的信息,查看参数中的文件描述符是否在内核事件表中已经存在,因为更新内核事件表的方式有很多,插入一个文件描述符、删除一个文件描述符、更新文件描述符的事件。所以这里要先查找一下需要操作的文件描述符是否存在,毕竟我们不能允许往内核事件表中插入一个已经存在的文件描述符。
//遍历红黑树,判断是否fd所对应的文件描述符tfile是否在红黑树中已经存在了 //如果存在,返回所处的节点,否则返回NULL epi = ep_find(ep, tfile, fd); switch (op) { case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; //如果文件描述符不存在的情况下,允许插入 error = ep_insert(ep, &epds, tfile, fd); } else error = -EEXIST; break; case EPOLL_CTL_DEL: if (epi) //文件描述符存在的情况下允许删除 error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { epds.events |= POLLERR | POLLHUP; //文件描述符存在的情况下允许修改一个文件描述符上的事件 error = ep_modify(ep, epi, &epds); } else error = -ENOENT; break; }
这里是插入一个文件描述符的过程
static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd){ int error, revents, pwake = 0; unsigned long flags; //红黑树的节点, struct epitem *epi; struct ep_pqueue epq; error = -ENOMEM; if (!(epi = EPI_MEM_ALLOC())) goto eexit_1; /* Item initialization follow here ... */ //初始化新申请的红黑树的节点 EP_RB_INITNODE(&epi->rbn); INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->txlink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; EP_SET_FFD(&epi->ffd, tfile, fd); epi->event = *event; atomic_set(&epi->usecnt, 1); epi->nwait = 0; /* Initialize the poll table using the queue callback */ epq.epi = epi; //设置回调函数,当有事件产生的时候会调用回调函数,将有事件产生的文件描述符放到 //就绪队列上,并唤醒正在睡眠状态的线程。 init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); /* * Attach the item to the poll hooks and get current event bits. * We can safely use the file* here because its usage count has * been increased by the caller of this function. */ //先获取需要监听的文件描述符上的事件 revents = tfile->f_op->poll(tfile, &epq.pt); /* * We have to check if something went wrong during the poll wait queue * install process. Namely an allocation for a wait queue failed due * high memory pressure. */ if (epi->nwait < 0) goto eexit_2; /* Add the current item to the list of active epoll hook for this file */ spin_lock(&tfile->f_ep_lock); //监听队列中添加文件描述符 list_add_tail(&epi->fllink, &tfile->f_ep_links); spin_unlock(&tfile->f_ep_lock); /* We have to drop the new item inside our item list to keep track of it */ write_lock_irqsave(&ep->lock, flags); /* Add the current item to the rb-tree */ //向红黑树中添加元素(内核事件表中) ep_rbtree_insert(ep, epi); /* If the file is already "ready" we drop it inside the ready list */ //如果已经存在就绪事件,将文件描述符添加到就绪队列 if ((revents & event->events) && !EP_IS_LINKED(&epi->rdllink)) { list_add_tail(&epi->rdllink, &ep->rdllist); /* Notify waiting tasks that events are available */ if (waitqueue_active(&ep->wq)) wake_up(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; } write_unlock_irqrestore(&ep->lock, flags); /* We have to call this outside the lock */ //如果pwake 大于0说明有就绪的文件描述符 //唤醒正在等待的线程区执行 if (pwake) ep_poll_safewake(&psw, &ep->poll_wait); DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_insert(%p, %p, %d)\n", current, ep, tfile, fd)); return 0;eexit_2: ep_unregister_pollwait(ep, epi); /* * We need to do this because an event could have been arrived on some * allocated wait queue. */ write_lock_irqsave(&ep->lock, flags); if (EP_IS_LINKED(&epi->rdllink)) EP_LIST_DEL(&epi->rdllink); write_unlock_irqrestore(&ep->lock, flags); EPI_MEM_FREE(epi);eexit_1: return error;}
这里给出其中结构体的部分的对应的关系,以便于理解
内核中调用sys_epoll_wait()
//这部分与sys_epoll_ctl 类似,需要先获取相应的文件描述符与内核事件表 file = fget(epfd); if (!file) goto eexit_1; /* * We have to check that the file structure underneath the fd * the user passed to us _is_ an eventpoll file. */ error = -EINVAL; if (!IS_FILE_EPOLL(file)) goto eexit_2; /* * At this point it is safe to assume that the "private_data" contains * our own data structure. */ ep = file->private_data;
//开始监听error = ep_poll(ep, events, maxevents, timeout);
进入ep_poll函数
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout){ int res, eavail; unsigned long flags; //超时时间 long jtimeout; //等待队列 wait_queue_t wait; /* * Calculate the timeout by checking for the "infinite" value ( -1 ) * and the overflow condition. The passed timeout is in milliseconds, * that why (t * HZ) / 1000. */ jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ? MAX_SCHEDULE_TIMEOUT: (timeout * HZ + 999) / 1000;retry: write_lock_irqsave(&ep->lock, flags); res = 0; //首先判断一次就绪队列是否为空 if (list_empty(&ep->rdllist)) { /* * We don't have any available event to return to the caller. * We need to sleep here, and we will be wake up by * ep_poll_callback() when events will become available. */ init_waitqueue_entry(&wait, current); add_wait_queue(&ep->wq, &wait); for (;;) { /* * We don't want to sleep if the ep_poll_callback() sends us * a wakeup in between. That's why we set the task state * to TASK_INTERRUPTIBLE before doing the checks. */ set_current_state(TASK_INTERRUPTIBLE); //如果监听的队列不为空,或者到达了超时的事件,就退出循环 if (!list_empty(&ep->rdllist) || !jtimeout) break; //如果当前进程有信号产生,也要退出 if (signal_pending(current)) { res = -EINTR; break; } write_unlock_irqrestore(&ep->lock, flags); //进入睡眠状态 jtimeout = schedule_timeout(jtimeout); write_lock_irqsave(&ep->lock, flags); } remove_wait_queue(&ep->wq, &wait); set_current_state(TASK_RUNNING); } /* Is it worth to try to dig for events ? */ //判断退出循环的条件是有事件产生还是到了超时的时间 eavail = !list_empty(&ep->rdllist); write_unlock_irqrestore(&ep->lock, flags); /* * Try to transfer events to user space. In case we get 0 events and * there's still timeout left over, we go trying again in search of * more luck. */ //如果有事件产生,就将就绪的文件描述符拷贝用户空间 if (!res && eavail && //往用户空间拷贝有事件就绪的文件描述符 !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout) goto retry; //返回就绪事件的个数 return res;}
深入ep_events_transfer函数可以看一下
static int ep_events_transfer(struct eventpoll *ep, struct epoll_event __user *events, int maxevents){ int eventcnt = 0; struct list_head txlist; INIT_LIST_HEAD(&txlist); /* * We need to lock this because we could be hit by * eventpoll_release_file() and epoll_ctl(EPOLL_CTL_DEL). */ down_read(&ep->sem); /* Collect/extract ready items */ //收集就绪的事件的文件描述符,放到txlist链表中 if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) { /* Build result set in userspace */ //将就绪的事件拷贝到用户空间 eventcnt = ep_send_events(ep, &txlist, events); /* Reinject ready items into the ready list */ ep_reinject_items(ep, &txlist); } up_read(&ep->sem); return eventcnt;}
如果有兴趣可以跟踪函数,看一下到底怎样实现内核空间到用户空间的数据的拷贝的,以及怎样通过回调函数唤醒一个正在睡眠状态的线程的,这里不再往下跟踪。
转载地址:http://xanwi.baihongyu.com/