多进程/线程并发模型,为每个socket分配一个进程/线程。

IO多路复用:通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

应用:适用于针对大量的io请求的情况,对于服务器必须在同时处理来自客户端的大量的io操作的时候,就非常适合

与多进程和多线程技术相比,I/O多路复用技术的最大优势就是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

目前支持I/O多路复用的系统调用有selectpselectpollepoll, 但他们本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

selectpselectpollepoll 都是属于IO设计模式Reactor的IO策略。

IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合:

  • 当客户处理多个描述符时(一般是交互式输入和网络套接口),必须使用I/O复用。

  • 当一个客户同时处理多个套接口时,这种情况是可能的,但很少出现。

  • 如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。

  • 如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。

  • 如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。

select基本原理

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以通过遍历fdset,来找到就绪的描述符。

select基本流程

select函数原型

该函数准许进程指示内核等待多个事件中的任何一个发送,并只在有一个或多个事件发生或经历一段指定的时间后才唤醒自己。函数原型如下:

#include <sys/select.h>
#include <sys/time.h>

int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);

// 返回值:就绪描述符的数目,超时返回0,出错返回-1
// 函数参数介绍如下:
//(1)第一个参数maxfdp1指定待测试的描述字个数,它的值是待测试的最大描述字加1(因此把该参数命名为maxfdp1)描述字0、1、2...(maxfdp1-1)均将被测试(文件描述符是从0开始的)。
//(2)中间的三个参数readset、writeset和exceptset指定我们要让内核测试读、写和异常条件的描述字。如果对某一个的条件不感兴趣,就可以把它设为空指针。
// writeset的write会阻塞,但是阻塞时间是非常短的,所以一般需要监听,设置为空
struct fd_set;   //可以理解为一个集合,这个集合中存放的是文件描述符,可通过以下四个宏进行设置:

void FD_ZERO(fd_set *fdset);           //清空集合
void FD_SET(int fd, fd_set *fdset);    //将一个给定的文件描述符加入集合之中
void FD_CLR(int fd, fd_set *fdset);    //将一个给定的文件描述符从集合中删除
int FD_ISSET(int fd, fd_set *fdset);   // 检查集合中指定的文件描述符是否可以读写 

//(3)timeout指定等待的时间,告知内核等待所指定描述字中的任何一个就绪可花多少时间。其timeval结构用于指定这段时间的秒数和微秒数。
struct timeval {
    long tv_sec;   //seconds
    long tv_usec;  //microseconds
};
/*
 这个参数有三种可能:
(1)永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,把该参数设置为空指针NULL。
(2)等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
(3)根本不等待:检查描述字后立即返回,这称为轮询。为此,该参数必须指向一个timeval结构,而且其中的定时器值必须为0。
*/

位图Bitmap的原理

select优点

  • 跨平台。(几乎所有的平台都支持)

  • 时间精度高。(ns级别)

select缺点

  • 最大限制:单个进程能够监视的文件描述符的数量存在最大限制。(基于数组存储的赶脚)一般来说这个数目和系统内存关系很大,具体数目可以cat /proc/sys/fs/file-max察看。它由FD_SETSIZE设置,32位机默认是1024个。64位机默认是2048.

  • 时间复杂度: 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低,时间复杂度O(n)。 当套接字比较多的时候,每次select() 都要通过遍历FD_SETSIZE个Socket来完成调度,不管哪个Socket是活跃的,都遍历一遍。这会浪费很多CPU时间。
    它仅仅知道有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度 ,同时处理的流越多,无差别轮询时间就越长。

  • 内存拷贝:需要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。

Select的超时机制

  • int maxfdp 是指集合中所有描述符的最大值加1

  • fd_set *readfds 监视是否有新的socket连接,或现有的描述符是否有数据可读。

  • fd_set *writefds 监视是否可以向描述符中写入数据,只要缓存没满,所监视的描述符都可以写,select立即返回。

  • fd_set *exceptfds 监视描述符中的异常,从未使用过

  • struct timeval *timeout 超时机制。

select模型会丢失事件和数据吗?

答:不会。select采用水平触发的方式,如果报告fd后事件没有被处理或者数据没有被完全读取,那么下次select时会再次报告该id,也就是说select不会丢失事件和数据。

select的其它用途

在Unix(Linux)世界里,一切皆文件,文件就是一串二进制流,不管socket、管道、终端、设备等都是文件,一切都是流,在信息交换的过程中, 都是对这些流进行数据的收发操作,简称为I/O操作(input and output), 往流中读出数据,系统调用read,写入数据,系统调用write。

select是I/O复用函数,除了用于网络通信,还可以用于文件、管道、终端、设备等操作,但开发场景比较少。

示例代码

tcpselect.cpp

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/fcntl.h>

// 初始化服务端的监听端口。
int initserver(int port);

int main(int argc, char *argv[]) {
    if (argc != 2) {
        printf("usage: ./tcpselect port\n");
        return -1;
    }

    // 初始化服务端用于监听的socket。
    int listensock = initserver(atoi(argv[1]));
    printf("listensock=%d\n", listensock);

    if (listensock < 0) {
        printf("initserver() failed.\n");
        return -1;
    }

    fd_set readfdset;  // 读事件的集合,包括监听socket和客户端连接上来的socket。
    int maxfd;  // readfdset中socket的最大值。

    // 初始化结构体,把listensock添加到集合中。
    FD_ZERO(&readfdset);

    FD_SET(listensock, &readfdset);
    maxfd = listensock;

    while (1) {
        // 调用select函数时,会改变socket集合的内容,所以要把socket集合保存下来,传一个临时的给select。
        fd_set tmpfdset = readfdset;

        int infds = select(maxfd + 1, &tmpfdset, NULL, NULL, NULL);
        // printf("select infds=%d\n",infds);

        // 返回失败。
        if (infds < 0) {
            printf("select() failed.\n");
            perror("select()");
            break;
        }

        // 超时,在本程序中,select函数最后一个参数为空,不存在超时的情况,但以下代码还是留着。
        if (infds == 0) {
            printf("select() timeout.\n");
            continue;
        }

        // 检查有事情发生的socket,包括监听和客户端连接的socket。
        // 这里是客户端的socket事件,每次都要遍历整个集合,因为可能有多个socket有事件。
        for (int eventfd = 0; eventfd <= maxfd; eventfd++) {
            if (FD_ISSET(eventfd, &tmpfdset) <= 0) continue;

            if (eventfd == listensock) {
                // 如果发生事件的是listensock,表示有新的客户端连上来。
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int clientsock = accept(listensock, (struct sockaddr *) &client, &len);
                if (clientsock < 0) {
                    printf("accept() failed.\n");
                    continue;
                }

                printf("client(socket=%d) connected ok.\n", clientsock);

                // 把新的客户端socket加入集合。
                FD_SET(clientsock, &readfdset);

                if (maxfd < clientsock) maxfd = clientsock;

                continue;
            } else {
                // 客户端有数据过来或客户端的socket连接被断开。
                char buffer[1024];
                memset(buffer, 0, sizeof(buffer));

                // 读取客户端的数据。
                ssize_t isize = read(eventfd, buffer, sizeof(buffer));

                // 发生了错误或socket被对方关闭。
                if (isize <= 0) {
                    printf("client(eventfd=%d) disconnected.\n", eventfd);

                    close(eventfd);  // 关闭客户端的socket。

                    FD_CLR(eventfd, &readfdset);  // 从集合中移去客户端的socket。

                    // 重新计算maxfd的值,注意,只有当eventfd==maxfd时才需要计算。
                    if (eventfd == maxfd) {
                        for (int ii = maxfd; ii > 0; ii--) {
                            if (FD_ISSET(ii, &readfdset)) {
                                maxfd = ii;
                                break;
                            }
                        }

                        printf("maxfd=%d\n", maxfd);
                    }

                    continue;
                }

                printf("recv(eventfd=%d,size=%d):%s\n", eventfd, isize, buffer);

                // 把收到的报文发回给客户端。
                write(eventfd, buffer, strlen(buffer));
            }
        }
    }

    return 0;
}

// 初始化服务端的监听端口。
int initserver(int port) {
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        printf("socket() failed.\n");
        return -1;
    }

    // Linux如下
    int opt = 1;
    unsigned int len = sizeof(opt);
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, len);
    setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &opt, len);

    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);

    if (bind(sock, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
        printf("bind() failed.\n");
        close(sock);
        return -1;
    }

    if (listen(sock, 5) != 0) {
        printf("listen() failed.\n");
        close(sock);
        return -1;
    }

    return sock;
}

client.cpp

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>

int main(int argc, char *argv[]) {
    if (argc != 3) {
        printf("usage:./tcpclient ip port\n");
        return -1;
    }

    int sockfd;
    struct sockaddr_in servaddr;
    char buf[1024];

    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        printf("socket() failed.\n");
        return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(atoi(argv[2]));
    servaddr.sin_addr.s_addr = inet_addr(argv[1]);

    if (connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) != 0) {
        printf("connect(%s:%s) failed.\n", argv[1], argv[2]);
        close(sockfd);
        return -1;
    }

    printf("connect ok.\n");

    for (int ii = 0; ii < 10000; ii++) {
        // 从命令行输入内容。
        memset(buf, 0, sizeof(buf));
        printf("please input:");
        scanf("%s", buf);
        // sprintf(buf,"1111111111111111111111ii=%08d",ii);

        if (write(sockfd, buf, strlen(buf)) <= 0) {
            printf("write() failed.\n");
            close(sockfd);
            return -1;
        }

        memset(buf, 0, sizeof(buf));
        if (read(sockfd, buf, sizeof(buf)) <= 0) {
            printf("read() failed.\n");
            close(sockfd);
            return -1;
        }

        printf("recv:%s\n", buf);

        // close(sockfd); break;
    }
}

poll基本原理

poll和select在本质上没有差别,管理多个描述符也是送行轮询,根据描述符的状态进行处理,但是poll没有最大文件描述符数量的限制,它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。

  • select采用fdse煤用bitmap , poll采用了数组.

  • poll和select同样存在一个缺点就是,文件描述符的数组植整体复制于 用户态和内核态的地址空间之间,而不论这些文件描述符是否有事件, 它的开销随着文件描述符数量的增加而线性增大

poll基本流程

类似select

poll函数原型

#include <poll.h>
#include <arpa/inet.h>

int poll(struct pollfd *fds, unsigned int nfds, int timeout);

// (1)pollfd结构体定义如下:
struct pollfd {
    int fd;              /* 文件描述符 */
    short events;        /* 等待的事件 */
    short revents;       /* 实际发生了的事件 */
};
/*
  每一个pollfd结构体指定了一个被监视的文件描述符。因此可以传递多个结构体,指示poll()监视多个文件描述符。
 (2)events域是监视该文件描述符的事件掩码,由用户来设置这个域。
    POLLIN         有数据可读。
    POLLRDNORM      有普通数据可读。
    POLLRDBAND      有优先数据可读。
    POLLPRI        有紧迫数据可读。
    POLLOUT        写数据不会导致阻塞。
    POLLWRNORM      写普通数据不会导致阻塞。
    POLLWRBAND      写优先数据不会导致阻塞。
    POLLMSGSIGPOLL    消息可用。
(3)revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。events域中请求的任何事件都可能在revents域中返回。
   此外,revents域中还可能返回下列事件:   
    POLLER    指定的文件描述符发生错误。
    POLLHUP   指定的文件描述符挂起事件。
    POLLNVAL  指定的文件描述符非法。
   这些事件在events域中无意义,因为它们在合适的时候总是会从revents中返回。   
(4)举个栗子:要同时监视一个文件描述符是否可读和可写,
    我们可以设置 events 为POLLIN | POLLOUT。
    在poll返回时,我们可以检查revents中的标志,对应于文件描述符请求的events结构体。
    如果POLLIN事件被设置,则文件描述符可以被读取而不阻塞。
    如果POLLOUT被设置,则文件描述符可以写入而不导致阻塞。
    这些标志并不是互斥的:它们可能被同时设置,表示这个文件描述符的读取和写入操作都会正常返回而不阻塞。
  
(5)nfds参数是数组fds元素的个数。
(6)timeout参数指定等待的毫秒数,无论I/O是否准备好,poll都会返回。
    timeout指定为负数值表示无限超时,使poll()一直挂起直到一个指定事件发生;
    timeout为0指示poll调用立即返回并列出准备好I/O的文件描述符,但并不等待其它的事件。
 
(7)返回值和错误代码   
  成功时,poll()返回结构体中revents域不为0的文件描述符个数;
  如果在超时前没有任何事件发生,poll()返回0;
  失败时,poll()返回-1,
    并设置errno为下列值之一:   
    EBADF        一个或多个结构体中指定的文件描述符无效。   
    EFAULTfds    指针指向的地址超出进程的地址空间。   
    EINTR       请求的事件之前产生一个信号,调用可以重新发起。   
    EINVALnfds   参数超出PLIMIT_NOFILE值。   
    ENOMEM       可用内存不足,无法完成请求。
*/

poll优点

没有最大连接数的限制。(基于链表来存储的)

poll缺点

  • 时间复杂度: 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低,时间复杂度O(n)。 它将用户传入的数组拷贝到内核空间,然后查询每个fd对应的设备状态,如果设备就绪则在设备等待队列中加入一项并继续遍历,如果遍历完所有fd后没有发现就绪设备,则挂起当前进程,直到设备就绪或者主动超时,被唤醒后它又要再次遍历fd。这个过程经历了多次无谓的遍历。

  • 内存拷贝:大量的fd数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。

  • 水平触发:如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。

注意:select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。 事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

示例代码

tcpselect.cpp

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <poll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/fcntl.h>

// ulimit -n
#define MAXNFDS  1024

// 初始化服务端的监听端口。
int initserver(int port);

int main(int argc, char *argv[]) {
    if (argc != 2) {
        printf("usage: ./tcppoll port\n");
        return -1;
    }

    // 初始化服务端用于监听的socket。
    int listensock = initserver(atoi(argv[1]));
    printf("listensock=%d\n", listensock);

    if (listensock < 0) {
        printf("initserver() failed.\n");
        return -1;
    }

    int maxfd;   // fds数组中需要监视的socket的大小。
    struct pollfd fds[MAXNFDS];  // fds存放需要监视的socket。

    for (int ii = 0; ii < MAXNFDS; ii++) fds[ii].fd = -1; // 初始化数组,把全部的fd设置为-1。

    // 把listensock添加到数组中。
    fds[listensock].fd = listensock;
    fds[listensock].events = POLLIN;  // 有数据可读事件,包括新客户端的连接、客户端socket有数据可读和客户端socket断开三种情况。
    maxfd = listensock;

    while (1) {
        int infds = poll(fds, maxfd + 1, 5000);
        // printf("poll infds=%d\n",infds);

        // 返回失败。
        if (infds < 0) {
            printf("poll() failed.\n");
            perror("poll():");
            break;
        }

        // 超时。
        if (infds == 0) {
            printf("poll() timeout.\n");
            continue;
        }

        // 检查有事情发生的socket,包括监听和客户端连接的socket。
        // 这里是客户端的socket事件,每次都要遍历整个集合,因为可能有多个socket有事件。
        for (int eventfd = 0; eventfd <= maxfd; eventfd++) {
            if (fds[eventfd].fd < 0) continue;

            if ((fds[eventfd].revents & POLLIN) == 0) continue;

            fds[eventfd].revents = 0;  // 先把revents清空。

            if (eventfd == listensock) {
                // 如果发生事件的是listensock,表示有新的客户端连上来。
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int clientsock = accept(listensock, (struct sockaddr *) &client, &len);
                if (clientsock < 0) {
                    printf("accept() failed.\n");
                    continue;
                }

                printf("client(socket=%d) connected ok.\n", clientsock);

                if (clientsock > MAXNFDS) {
                    printf("clientsock(%d)>MAXNFDS(%d)\n", clientsock, MAXNFDS);
                    close(clientsock);
                    continue;
                }

                fds[clientsock].fd = clientsock;
                fds[clientsock].events = POLLIN;
                fds[clientsock].revents = 0;
                if (maxfd < clientsock) maxfd = clientsock;

                printf("maxfd=%d\n", maxfd);
                continue;
            } else {
                // 客户端有数据过来或客户端的socket连接被断开。
                char buffer[1024];
                memset(buffer, 0, sizeof(buffer));

                // 读取客户端的数据。
                ssize_t isize = read(eventfd, buffer, sizeof(buffer));

                // 发生了错误或socket被对方关闭。
                if (isize <= 0) {
                    printf("client(eventfd=%d) disconnected.\n", eventfd);

                    close(eventfd);  // 关闭客户端的socket。

                    fds[eventfd].fd = -1;

                    // 重新计算maxfd的值,注意,只有当eventfd==maxfd时才需要计算。
                    if (eventfd == maxfd) {
                        for (int ii = maxfd; ii > 0; ii--) {
                            if (fds[ii].fd != -1) {
                                maxfd = ii;
                                break;
                            }
                        }

                        printf("maxfd=%d\n", maxfd);
                    }

                    continue;
                }

                printf("recv(eventfd=%d,size=%d):%s\n", eventfd, isize, buffer);

                // 把收到的报文发回给客户端。
                write(eventfd, buffer, strlen(buffer));
            }
        }
    }

    return 0;
}

// 初始化服务端的监听端口。
int initserver(int port) {
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        printf("socket() failed.\n");
        return -1;
    }

    // Linux如下
    int opt = 1;
    unsigned int len = sizeof(opt);
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, len);
    setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &opt, len);

    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);

    if (bind(sock, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
        printf("bind() failed.\n");
        close(sock);
        return -1;
    }

    if (listen(sock, 5) != 0) {
        printf("listen() failed.\n");
        close(sock);
        return -1;
    }

    return sock;
}

client.cpp

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>

int main(int argc, char *argv[]) {
    if (argc != 3) {
        printf("usage:./tcpclient ip port\n");
        return -1;
    }

    int sockfd;
    struct sockaddr_in servaddr;
    char buf[1024];

    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        printf("socket() failed.\n");
        return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(atoi(argv[2]));
    servaddr.sin_addr.s_addr = inet_addr(argv[1]);

    if (connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) != 0) {
        printf("connect(%s:%s) failed.\n", argv[1], argv[2]);
        close(sockfd);
        return -1;
    }

    printf("connect ok.\n");

    for (int ii = 0; ii < 10000; ii++) {
        // 从命令行输入内容。
        memset(buf, 0, sizeof(buf));
        printf("please input:");
        scanf("%s", buf);
        // sprintf(buf,"1111111111111111111111ii=%08d",ii);

        if (write(sockfd, buf, strlen(buf)) <= 0) {
            printf("write() failed.\n");
            close(sockfd);
            return -1;
        }

        memset(buf, 0, sizeof(buf));
        if (read(sockfd, buf, sizeof(buf)) <= 0) {
            printf("read() failed.\n");
            close(sockfd);
            return -1;
        }

        printf("recv:%s\n", buf);

        // close(sockfd); break;
    }
}

epoll

epoll是在2.6内核中提出的,是之前的select和poll的增强版本。是为处理大批量句柄而作了改进的poll。 epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的拷贝只需要一次。

epoll基本原理

epoll有两大特点:

  • 边缘触发,它只告诉进程哪些fd刚刚变为就绪态,并且只会通知一次。

  • 事件驱动,每个事件关联上fd,使用事件就绪通知方式,通过 epoll_ctl 注册 fd,一旦该fd就绪,内核就会采用 callback 的回调机制来激活该fd,epoll_wait 便可以收到通知。

epoll基本流程

一棵红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。

  • 执行 epoll_create
    内核在epoll文件系统中建了个file结点,(使用完,必须调用close()关闭,否则导致fd被耗尽) 在内核cache里建了红黑树存储epoll_ctl传来的socket, 在内核cache里建了rdllist双向链表存储准备就绪的事件。

  • 执行 epoll_ctl
    如果增加socket句柄,检查红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,告诉内核如果这个句柄的中断到了,就把它放到准备就绪list链表里。 ps:所有添加到epoll中的事件都会与设备(如网卡)驱动程序建立回调关系,相应的事件发生时,会调用回调方法。

  • 执行 epoll_wait
    立刻返回准备就绪表里的数据即可(将内核cache里双向列表中存储的准备就绪的事件 复制到用户态内存) 当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。 如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。

epoll函数原型

#include <sys/epoll.h>

int epoll_create(int size); // 创建epoll的句柄,它本身就是一个fd
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  //  注册需要监视fd和事件
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); // 等待事件发生

int epoll_create(int size);  /*创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。*/
// 这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值。
// 需要注意的是: 当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,    
// 所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
/*
epoll的事件注册函数: 它不同与select()是在监听事件时告诉内核要监听什么类型的事件epoll的事件注册函数,而是在这里先注册要监听的事件类型。 
第一个参数 epfd 是epoll_create()的返回值,  
第二个参数 op 表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;   
第三个参数是需要监听的fd,  
第四个参数是告诉内核需要监听什么事,      
struct epoll_event结构如下:
*/
struct epoll_event {
    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
};

/*
events可以是以下几个宏的集合:       
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);      
EPOLLOUT:表示对应的文件描述符可以写;       
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来); 
EPOLLERR:表示对应的文件描述符发生错误;       
EPOLLHUP:表示对应的文件描述符被挂断;       
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。       
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
*/

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
/*
等待事件的产生    
类似于select()调用。    
参数 events用来从内核得到事件的集合,    
参数 maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,   
参数 timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。    
该函数返回需要处理的事件数目,如返回0表示已超时。
*/

epoll优点

  • 没有最大连接数的限制。(基于 红黑树+双链表 来存储的:1G的内存上能监听约10万个端口)

  • 时间复杂度低: 边缘触发和事件驱动,监听回调,时间复杂度O(1)。 只有活跃可用的fd才会调用callback函数;即epoll最大的优点就在于它只管“活跃”的连接,而跟连接总数无关,因此实际网络环境中,Epoll的效率就会远远高于select和poll。

  • 内存拷贝:利用mmap()文件映射内存加速与内核空间的消息传递,减少拷贝开销。

epoll缺点

依赖于操作系统:Lunix

epoll应用场景

  • 适合用epoll的应用场景

    • 对于连接特别多,活跃的连接特别少

    • 典型的应用场景为一个需要处理上万的连接服务器,例如各种app的入口服务器,例如qq

  • 不适合epoll的场景

    • 连接比较少,数据量比较大,例如ssh

    • epoll 的惊群问题: 因为epoll 多用于多个连接,只有少数活跃的场景,但是万一某一时刻,epoll 等的上千个文件描述符都就绪了,这时候epoll 要进行大量的I/O,此时压力太大。

epoll两种模式

epoll对文件描述符的操作有两种模式:LT(level trigger) 和 ET(edge trigger)。LT是默认的模式,ET是“高速”模式。

  • LT(水平触发)模式下,只要有数据就触发,缓冲区剩余未读尽的数据会导致 epoll_wait都会返回它的事件;

  • ET(边缘触发)模式下,只有新数据到来才触发,不管缓存区中是否还有数据,缓冲区剩余未读尽的数据不会导致epoll_wait返回。

LT模式

LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket

在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。

如果你不作任何操作,内核还是会继续通知你的,只要这个文件描述符还有数据可读,每次 epoll_wait都会返回它的事件,提醒用户程序去操作

ET模式

ET(edge-triggered)是高速工作方式,只支持no-block socket,在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。 然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了 (比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。 但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once)。

在它检测到有 I/O 事件时,通过 epoll_wait 调用会得到有事件通知的文件描述符,对于每一个被通知的文件描述符,如可读,则必须将该文件描述符一直读到空, 让 errno 返回 EAGAIN (提示你的应用程序现在没有数据可读请稍后再试)为止,否则下次的 epoll_wait 不会返回余下的数据,会丢掉事件。

ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

注意:

  • 1、在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符, 一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。 此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。

  • 2、如果没有大量的idle-connection或者dead-connection,epoll的效率并不会比select/poll高很多, 但是当遇到大量的idle-connection,就会发现epoll的效率大大高于select/poll。

示例代码

tcpselect.cpp

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>

#define MAXEVENTS 100

// 把socket设置为非阻塞的方式。
int setnonblocking(int sockfd);

// 初始化服务端的监听端口。
int initserver(int port);

int main(int argc, char *argv[]) {
    if (argc != 2) {
        printf("usage:./tcpepoll port\n");
        return -1;
    }

    // 初始化服务端用于监听的socket。
    int listensock = initserver(atoi(argv[1]));
    printf("listensock=%d\n", listensock);

    if (listensock < 0) {
        printf("initserver() failed.\n");
        return -1;
    }

    int epollfd;

    char buffer[1024];
    memset(buffer, 0, sizeof(buffer));

    // 创建一个描述符
    epollfd = epoll_create(1);

    // 添加监听描述符事件
    struct epoll_event ev;
    ev.data.fd = listensock;
    ev.events = EPOLLIN;             // 默认水平触发
    // ev.events = EPOLLIN|EPOLLET;  // 设置边缘触发
    epoll_ctl(epollfd, EPOLL_CTL_ADD, listensock, &ev);

    while (1) {
        struct epoll_event events[MAXEVENTS]; // 存放有事件发生的结构数组。

        // 等待监视的socket有事件发生。
        int infds = epoll_wait(epollfd, events, MAXEVENTS, -1);
        // printf("epoll_wait infds=%d\n",infds);

        // 返回失败。
        if (infds < 0) {
            printf("epoll_wait() failed.\n");
            perror("epoll_wait()");
            break;
        }

        // 超时。
        if (infds == 0) {
            printf("epoll_wait() timeout.\n");
            continue;
        }

        // 遍历有事件发生的结构数组。
        for (int ii = 0; ii < infds; ii++) {
            if ((events[ii].data.fd == listensock) && (events[ii].events & EPOLLIN)) {
                // 如果发生事件的是listensock,表示有新的客户端连上来。
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int clientsock = accept(listensock, (struct sockaddr *) &client, &len);
                if (clientsock < 0) {
                    printf("accept() failed.\n");
                    continue;
                }

                // 把新的客户端添加到epoll中。
                memset(&ev, 0, sizeof(struct epoll_event));
                ev.data.fd = clientsock;
                ev.events = EPOLLIN;
                epoll_ctl(epollfd, EPOLL_CTL_ADD, clientsock, &ev);

                printf("client(socket=%d) connected ok.\n", clientsock);

                continue;
            } else if (events[ii].events & EPOLLIN) {
                // 客户端有数据过来或客户端的socket连接被断开。
                char buffer[1024];
                memset(buffer, 0, sizeof(buffer));

                // 读取客户端的数据。
                ssize_t isize = read(events[ii].data.fd, buffer, sizeof(buffer));

                // 发生了错误或socket被对方关闭。
                if (isize <= 0) {
                    printf("client(eventfd=%d) disconnected.\n", events[ii].data.fd);

                    // 把已断开的客户端从epoll中删除。
                    memset(&ev, 0, sizeof(struct epoll_event));
                    ev.events = EPOLLIN;
                    ev.data.fd = events[ii].data.fd;
                    epoll_ctl(epollfd, EPOLL_CTL_DEL, events[ii].data.fd, &ev);
                    close(events[ii].data.fd);
                    continue;
                }

                printf("recv(eventfd=%d,size=%d):%s\n", events[ii].data.fd, isize, buffer);

                // 把收到的报文发回给客户端。
                write(events[ii].data.fd, buffer, strlen(buffer));
            }
        }
    }

    close(epollfd);

    return 0;
}

// 初始化服务端的监听端口。
int initserver(int port) {
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        printf("socket() failed.\n");
        return -1;
    }

    // Linux如下
    int opt = 1;
    unsigned int len = sizeof(opt);
    setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, len);
    setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &opt, len);

    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);

    if (bind(sock, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0) {
        printf("bind() failed.\n");
        close(sock);
        return -1;
    }

    if (listen(sock, 5) != 0) {
        printf("listen() failed.\n");
        close(sock);
        return -1;
    }

    return sock;
}

// 把socket设置为非阻塞的方式。
int setnonblocking(int sockfd) {
    if (fcntl(sockfd, F_SETFL, fcntl(sockfd, F_GETFD, 0) | O_NONBLOCK) == -1) return -1;

    return 0;
}

client.cpp

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>

int main(int argc, char *argv[]) {
    if (argc != 3) {
        printf("usage:./tcpclient ip port\n");
        return -1;
    }

    int sockfd;
    struct sockaddr_in servaddr;
    char buf[1024];

    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        printf("socket() failed.\n");
        return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(atoi(argv[2]));
    servaddr.sin_addr.s_addr = inet_addr(argv[1]);

    if (connect(sockfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) != 0) {
        printf("connect(%s:%s) failed.\n", argv[1], argv[2]);
        close(sockfd);
        return -1;
    }

    printf("connect ok.\n");

    for (int ii = 0; ii < 10000; ii++) {
        // 从命令行输入内容。
        memset(buf, 0, sizeof(buf));
        printf("please input:");
        scanf("%s", buf);
        // sprintf(buf,"1111111111111111111111ii=%08d",ii);

        if (write(sockfd, buf, strlen(buf)) <= 0) {
            printf("write() failed.\n");
            close(sockfd);
            return -1;
        }

        memset(buf, 0, sizeof(buf));
        if (read(sockfd, buf, sizeof(buf)) <= 0) {
            printf("read() failed.\n");
            close(sockfd);
            return -1;
        }

        printf("recv:%s\n", buf);

        // close(sockfd); break;
    }
}

select、poll、epoll区别

  • 1、支持一个进程所能打开的最大连接数

select poll epoll
单个进程所能打开的最大连接数有FD_SETSIZE宏定义,其大小是32个整数的大小(在32位的机器上,大小就是3232,同理64位机器上FD_SETSIZE为3264),当然我们可以对进行修改,然后重新编译内核,但是性能可能会受到影响,这需要进一步的测试。 poll本质上和select没有区别,但是它没有最大连接数的限制,原因是它是基于链表来存储的 虽然连接数有上限,但是很大,1G内存的机器上可以打开10万左右的连接,2G内存的机器可以打开20万左右的连接
  • 2、FD剧增后带来的IO效率问题

select poll epoll
因为每次调用时都会对连接进行线性遍历,所以随着FD的增加会造成遍历速度慢的“线性下降性能问题”。 同select 因为epoll内核中实现是根据每个fd上的callback函数来实现的,只有活跃的socket才会主动调用callback,所以在活跃socket较少的情况下,使用epoll没有前面两者的线性下降的性能问题,但是所有socket都很活跃的情况下,可能会有性能问题。
  • 3、消息传递方式

select poll epoll
内核需要将消息传递到用户空间,都需要内核拷贝动作 同select epoll通过mmap把对应设备文件片断映射到用户空间上, 消息传递不通过内核, 内存与设备文件同步数据.
  • 总结:

    • 1、表面上看epoll的性能最好,但是在连接数少并且连接都十分活跃的情况下,select和poll的性能可能比epoll好,毕竟epoll的通知机制需要很多函数回调。

    • 2、select低效是因为每次它都需要轮询。但低效也是相对的,视情况而定,也可通过良好的设计改善