Linux内核编程–常见IO模型与select/poll/epoll编程

一,Linux系统的五种基本I/O模型

0.前置知识
套接字中的数据传输模式:
套接字上的数据传输分两步执行:第一步,等待网络中的数据送达,将送达后的数据复制到内核中的缓冲区。第二步,把数据从内核中的缓冲区拷贝到应用进程的缓冲区。整个过程的运行空间是从应用进程空间切换到内核进程空间然后再切换回应用进程空间。
缓存IO:
含义:数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。
缺点:数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

1.阻塞式I/O
该模式为最流行的I/O模式。套接字通信在默认情况下使用的就是阻塞模式。
阻塞模式下的数据报套接字通信示意图:

注意:后面的示意图都是以UDP的数据报套接字通信为例,因为TCP的流程太复杂。示意图所示的步骤为recvfrom函数的执行过程。进程调用recvfrom函数时,当有数据报到达且被复制到应用进程的缓冲区中或发生错误时才返回。

2.非阻塞式I/O

非阻塞模式和阻塞模式的区别在于,阻塞模式没有收到数据报返回的时候会一直阻塞等待,直到数据报到达或者报错产生,而非阻塞模式是不断地检查数据报的到达状态,有数据报到达就返回数据报,没有数据报到达或发生错误就报错返回。

示意图:

如图所示,前三次调用recvfrom时没有数据报到达,于是内核立即返回一个EWOULDBLOCK报错。第四次调用时,数据报已经到达,于是内核开始复制数据。

一个应用进程如果是非阻塞式地对一个函数进行循环调用并立即返回处理结果,我们称这个方式为轮询–polling。应用进程持续轮询内核,直到某个操作就绪,这个做法往往会耗费大量的CPU时间。

3.多路复用I/O

该模式一次遍历所有的文件描述符,通过非阻塞 I/O 查看其是否就绪。该模式的优点是可以在一个线程内同时处理多个I/O请求。

当用户进程调用了select时,整个进程会被阻塞。阻塞期间,内核会同时监听传入的所有的文件描述符/套接字描述符,当其中一个描述符(读操作,写操作等)就绪时,内核会把描述符传递给用户进程,用户进程开始处理。使用select可以同时等待多个操作就绪。

示意图:

4.信号驱动式I/O

当数据报到达时,内核给应用进程发送一个SIGIO信号,随后既可以在信号处理函数中调用recvfrom函数读取数据报,也可以通知主循环,让它读取数据报。这个模式的优点在于等待数据报到达期间,进程可以干别的事情且不被阻塞。

对于UDP通信,当数据报到达套接字或者套接字发生异常时,产生SIGIO信号给UDP应用处理。

示意图:

5.异步I/O

应用进程调用aio_read函数,把数据报相关信息告诉给内核,然后应用进程开始干别的。当内核把数据复制到应用进程的缓冲区时,再发送指定信号给应用进程,通知应用进程处理数据报和信号。该步骤和信号驱动式I/O模型的区别在于,直到数据被复制到应用进程的缓冲区时才发送信号。

示意图:

五种I/O模型的比较:

除了异步I/O模型,前四种I/O模型都是按顺序分步执行,且需要通过阻塞应用进程来完成数据的复制,因此前四种I/O模型被成为同步I/O模型。

二,IO模型编程

0.关于fd_set

文件描述符有个集合fd_set,对fd_set有如下操作:

include

int FD_ZERO(int fd, fd_set *fdset); //将fd_set所有位置0,清空fd_set所指向的集合
int FD_CLR(int fd, fd_set *fdset); //将fd_set某一位置0,将指定的fd从fd_set移除
int FD_SET(int fd, fd_set *fd_set); //将fd_set某一位置1,将指定的fd添加到fd_set
int FD_ISSET(int fd, fd_set *fdset); //检测fd_set某一位是否为1,检测某fd是否在fd_set中

1.select函数

select函数让内核等待多个事件中的任何一个发生,当有一个或多个事件发生或者等待时间超过设定时间后,内核唤醒应用进程开始处理。

不止是套接字描述符,任何描述符都可以使用select进行等待。

include

include

int select(int maxfdp, fd_set * readset, fd_set * writeset, fd_set * exceptset, const struct timeval * timeout);

返回:若有就绪的描述符,返回其数量;若超时,返回0;若出错,返回-1。
–maxfdp参数:

含义:指定需要等待就绪的描述符的个数。

–timeout参数:

含义:内核的最长等待时间,设为const,表示函数返回时参数不会被select修改。

timeval结构体:

struct timeval {
long tv_sec; /* second / long tv_usec; / microsecond */
};
timeout参数有三种模式:

一直等待下去(timeout设为空指针时),等待一段固定时间,立即返回不做等待(轮询模式,秒数和微秒数设为0)。

–readset/writeset/exceptset参数:

含义:可读的描述符/可写的描述符/异常状态就绪时,内核唤醒应用进程做相应处理。

读/写/异常就绪的条件:

2.poll函数

include

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

返回:若有就绪的描述符,返回其数量;若超时,返回0;若出错,返回-1。
–fds参数:

含义:指向被监听的描述符的数组

pollfd结构体:

struct pollfd {
   int fd; /* file descriptor to check /    short events; / events of interest on fd /    short revents; / events that occured on fd */
};
–ndfs参数:

含义:fds数组中的fd数

–timeout参数:

含义:poll函数返回前等待多长时间。

timeout=INFTIM:永远等待

timeout=0:立即返回,不阻塞进程

timeout>0:等待指定的毫秒数

3.epoll函数

epoll使用一个文件描述符管理多个描述符,执行步骤:

step.01:调用epoll_create,创建epoll实例。

step.02:调用epoll_ctl,往epoll对象添加需要监听的文件描述符,给被监听的文件描述符注册回调函数。

step.03:调用epoll_wait,类似于select的执行过程,等待监听事件的发生, 返回需要处理的事件数目。

epoll_create:创建epoll实例,向系统申请资源,用size告诉内核需要监听的事件数目

include

int epoll_create(int size);
一个epoll实例内部的组成:

需要被监听的文件描述符–使用红黑树结构存储

已经就绪的文件描述符–使用链表结构存储

epoll_ctl:对指定的文件描述符进行处理

include

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
epoll_ctl 会将文件描述符 fd 添加到 epoll 实例的监听列表里,并给fd配一个回调函数。然后epoll_ctl开始监听fd上有没有发生event事件,如果发生了,则调用对应的回调函数。最后将fd从被监听的列表移出,添加到就绪队列上。

event事件的结构体:

struct epoll_event {
__uint32_t events; /* Epoll events / epoll_data_t data; / User data variable */
};

events 描述事件类型,具体类型有:
EPOLLIN:表示对应的文件描述符可读
EPOLLOUT:表示对应的文件描述符可写
EPOLLPRI:表示对应的文件描述符有紧急的数据可读
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂断
EPOLLLET:将EPOLL设置为边缘触发模式
EPOLLONESHOT:只监听一次事件
–epfd参数:

含义:由epoll_create创建的epoll实例

–fd参数:

含义:需要被监听的文件描述符

–event参数:

含义:需要被监听的事件(可读,可写,发生异常等)

–op参数:

含义:对fd执行的操作

EPOLL_CTL_ADD:注册新的fd到epfd中,并指定新fd的监听事件

EPOLL_CTL_MOD:修改已经注册的fd的监听事件

EPOLL_CTL_DEL:从epfd中删除一个fd

epoll_wait:等待epfd上的io事件到来,最多返回maxevents个事件

include

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

返回:若有就绪的描述符,返回其数量;若超时,返回0;若出错,返回-1。
–events参数:

含义:是一个数组,保存就绪状态的文件描述符

–maxevents参数:

含义:指定events数组的大小

–timeout参数:

含义:最长阻塞等待的时间。如果timeout=-1,则会一直阻塞,如果timeout=0,则函数为非阻塞模式,会立即返回。

*使用完epoll后,必须调用close函数关闭描述符。

select/poll/epoll三者的比较

1.epoll函数是select和poll的增强版,具体表现在:

epoll监视的描述符数量不受限制,它所支持的描述符上限是最大可以打开的文件数目。

被监视的文件描述符列表采用了红黑树结构,IO操作的效率不会随着监视的描述符数量的增长而降低。

2.对被监听描述符集合的存储形式:

poll/select使用线性表存储,epoll使用红黑树存储。

三,水平触发&边缘触发

水平触发LT(Level Trigger):当文件描述符就绪时,会触发通知,如果用户程序没有一次性把数据读/写完,下次就绪时还会发出可读/可写信号进行通知。select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知给进程。

边缘触发ET(Edge Trigger):仅当描述符从未就绪变为就绪时,通知进程一次,之后不会再通知。

两者的区别:边缘触发效率更高,减少了事件被重复触发的次数,边缘触发时不会返回大量用户程序不需要的文件描述符。

对于epoll实现的服务器:

使用边缘触发模式时,当被监听的套接字描述符上有可读事件发生时,服务器进程只会在 epoll_wait 中被通知一次,即使用户进程没有从内核读取数据或者没有把内核中的数据一次性读取完;

使用水平触发模式时,当被监听的套接字描述符上有可读事件发生时,服务器进程会在 epoll_wait 中反复被通知,直到内核中的数据被全部读完才结束;

各种I/O模型对触发方式的支持程度统计如下:

I/O模型

水平触发

边缘触发

select/poll

支持

不支持

信号驱动I/O

不支持

支持

epoll

支持

支持

代码样例:

Demo1:select实现的多路复用

include

include

include

include

include

include

include

include

include

include

define TRUE 1

define FALSE 0

define PORT 8888

int main(int argc , char *argv[])
{
int opt = TRUE;
int master_socket , addrlen , new_socket , client_socket[30] ,
max_clients = 30 , activity, i , valread , sd;
int max_sd;
struct sockaddr_in address;

char buffer[1025];  //data buffer of 1K

//set of socket descriptors
fd_set readfds;  

//a message
char *message = "ECHO Daemon v1.0 \r\n";  

//initialise all client_socket[] to 0 so not checked
for (i = 0; i < max_clients; i++)  
{  
    client_socket[i] = 0;  
}  

//create a master socket
if( (master_socket = socket(AF_INET , SOCK_STREAM , 0)) == 0)  
{  
    perror("socket failed");  
    exit(EXIT_FAILURE);  
}  

//set master socket to allow multiple connections ,
//this is just a good habit, it will work without this
if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
      sizeof(opt)) < 0 )  
{  
    perror("setsockopt");  
    exit(EXIT_FAILURE);  
}  

//type of socket created
address.sin_family = AF_INET;  
address.sin_addr.s_addr = INADDR_ANY;  
address.sin_port = htons( PORT );  

//bind the socket to localhost port 8888
if (bind(master_socket, (struct sockaddr *)&address, sizeof(address))<0)  
{  
    perror("bind failed");  
    exit(EXIT_FAILURE);  
}  
printf("Listener on port %d \n", PORT);  

//try to specify maximum of 3 pending connections for the master socket
if (listen(master_socket, 3) < 0)  
{  
    perror("listen");  
    exit(EXIT_FAILURE);  
}  

//accept the incoming connection
addrlen = sizeof(address);  
puts("Waiting for connections ...");  

while(TRUE)  
{  
    //clear the socket set
    FD_ZERO(&readfds);  


    //add master socket to set
    FD_SET(master_socket, &readfds);  
    max_sd = master_socket;  

    //add child sockets to set
    for ( i = 0 ; i < max_clients ; i++)  
    {  
        //socket descriptor
        sd = client_socket[i];  

        //if valid socket descriptor then add to read list
        if(sd > 0)  
            FD_SET( sd , &readfds);  

        //highest file descriptor number, need it for the select function
        if(sd > max_sd)  
            max_sd = sd;  
    }  

    //wait for an activity on one of the sockets , timeout is NULL ,
    //so wait indefinitely
    activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL);  

    if ((activity < 0) && (errno!=EINTR))  
    {  
        printf("select error");  
    }  

    //If something happened on the master socket ,
    //then its an incoming connection
    if (FD_ISSET(master_socket, &readfds))  
    {  
        if ((new_socket = accept(master_socket,
                (struct sockaddr *)&address, (socklen_t*)&addrlen))<0)  
        {  
            perror("accept");  
            exit(EXIT_FAILURE);  
        }  

        //inform user of socket number - used in send and receive commands
        printf("New connection , socket fd is %d , ip is : %s , port : %d
              \n" , new_socket , inet_ntoa(address.sin_addr) , ntohs
              (address.sin_port));  

        //send new connection greeting message
        if( send(new_socket, message, strlen(message), 0) != strlen(message) )  
        {  
            perror("send");  
        }  

        puts("Welcome message sent successfully");  

        //add new socket to array of sockets
        for (i = 0; i < max_clients; i++)  
        {  
            //if position is empty
            if( client_socket[i] == 0 )  
            {  
                client_socket[i] = new_socket;  
                printf("Adding to list of sockets as %d\n" , i);  

                break;  
            }  
        }  
    }  

    //else its some IO operation on some other socket
    for (i = 0; i < max_clients; i++)  
    {  
        sd = client_socket[i];  

        if (FD_ISSET( sd , &readfds))  
        {  
            //Check if it was for closing , and also read the
            //incoming message
            if ((valread = read( sd , buffer, 1024)) == 0)  
            {  
                //Somebody disconnected , get his details and print
                getpeername(sd , (struct sockaddr*)&address , \
                    (socklen_t*)&addrlen);  
                printf("Host disconnected , ip %s , port %d \n" ,
                      inet_ntoa(address.sin_addr) , ntohs(address.sin_port));  

                //Close the socket and mark as 0 in list for reuse
                close( sd );  
                client_socket[i] = 0;  
            }  

            //Echo back the message that came in
            else
            {  
                //set the string terminating NULL byte on the end
                //of the data read
                buffer[valread] = '\0';  
                send(sd , buffer , strlen(buffer) , 0 );  
            }  
        }  
    }  
}  

return 0;  

}

Demo2:poll的使用

include

include

include

include

include

include

include

include

define SERVER_PORT 12345

define TRUE 1

define FALSE 0

main (int argc, char *argv[])
{
int len, rc, on = 1;
int listen_sd = -1, new_sd = -1;
int desc_ready, end_server = FALSE, compress_array = FALSE;
int close_conn;
char buffer[80];
struct sockaddr_in6 addr;
int timeout;
struct pollfd fds[200];
int nfds = 1, current_size = 0, i, j;

listen_sd = socket(AF_INET6, SOCK_STREAM, 0);
if (listen_sd < 0)
{
perror(“socket() failed”);
exit(-1);
}

rc = setsockopt(listen_sd, SOL_SOCKET, SO_REUSEADDR,
(char *)&on, sizeof(on));
if (rc < 0)
{
perror(“setsockopt() failed”);
close(listen_sd);
exit(-1);
}

rc = ioctl(listen_sd, FIONBIO, (char *)&on);
if (rc < 0)
{
perror(“ioctl() failed”);
close(listen_sd);
exit(-1);
}

memset(&addr, 0, sizeof(addr));
addr.sin6_family = AF_INET6;
memcpy(&addr.sin6_addr, &in6addr_any, sizeof(in6addr_any));
addr.sin6_port = htons(SERVER_PORT);
rc = bind(listen_sd,
(struct sockaddr *)&addr, sizeof(addr));
if (rc < 0)
{
perror(“bind() failed”);
close(listen_sd);
exit(-1);
}

rc = listen(listen_sd, 32);
if (rc < 0)
{
perror(“listen() failed”);
close(listen_sd);
exit(-1);
}

memset(fds, 0 , sizeof(fds));
fds[0].fd = listen_sd;
fds[0].events = POLLIN;

timeout = (3 * 60 * 1000);

do
{
printf(“Waiting on poll()…\n”);
rc = poll(fds, nfds, timeout);

if (rc < 0)
{
  perror("  poll() failed");
  break;
}
if (rc == 0)
{
  printf("  poll() timed out.  End program.\n");
  break;
}
current_size = nfds;
for (i = 0; i < current_size; i++)
{
  if(fds[i].revents == 0)
    continue;
  if(fds[i].revents != POLLIN)
  {
    printf("  Error! revents = %d\n", fds[i].revents);
    end_server = TRUE;
    break;
  }
  if (fds[i].fd == listen_sd)
  {
    printf("  Listening socket is readable\n");
    do
    {
      new_sd = accept(listen_sd, NULL, NULL);
      if (new_sd < 0)
      {
        if (errno != EWOULDBLOCK)
        {
          perror("  accept() failed");
          end_server = TRUE;
        }
        break;
      }
      printf("  New incoming connection - %d\n", new_sd);
      fds[nfds].fd = new_sd;
      fds[nfds].events = POLLIN;
      nfds++;
    } while (new_sd != -1);
  }
  else
  {
    printf("  Descriptor %d is readable\n", fds[i].fd);
    close_conn = FALSE;
    /* Receive all incoming data on this socket.*/ 
    /* before we loop back and call poll again.*/
    do
    {
      rc = recv(fds[i].fd, buffer, sizeof(buffer), 0);
      if (rc < 0)
      {
        if (errno != EWOULDBLOCK)
        {
          perror("  recv() failed");
          close_conn = TRUE;
        }
        break;
      }

      if (rc == 0)
      {
        printf("  Connection closed\n");
        close_conn = TRUE;
        break;
      }

      len = rc;
      printf("  %d bytes received\n", len);

      rc = send(fds[i].fd, buffer, len, 0);
      if (rc < 0)
      {
        perror("  send() failed");
        close_conn = TRUE;
        break;
      }
    } while(TRUE);

    if (close_conn)
    {
      close(fds[i].fd);
      fds[i].fd = -1;
      compress_array = TRUE;
    }
  }  
}

if (compress_array)
{
  compress_array = FALSE;
  for (i = 0; i < nfds; i++)
  {
    if (fds[i].fd == -1)
    {
      for(j = i; j < nfds-1; j++)
      {
        fds[j].fd = fds[j+1].fd;
      }
      i--;
      nfds--;
    }
  }
}

} while (end_server == FALSE); /* End of serving running.*/

/* Clean up all of the sockets that are open.*/
for (i = 0; i < nfds; i++) { if(fds[i].fd >= 0)
close(fds[i].fd);
}
}

Demo3:epoll的使用

include

include

include

include

include

include

include

include

include

define PORT 1500

define MAX_CON (1200)

static struct epoll_event *events;

int main(int argc, char *argv[])
{
fd_set master;
fd_set read_fds;
struct sockaddr_in serveraddr;
struct sockaddr_in clientaddr;
int fdmax;
int listener;
int newfd;
char buf[1024];
int nbytes;
int addrlen;
int yes;
int epfd = -1;
int res = -1;
struct epoll_event ev;
int i=0;
int index = 0;
int client_fd = -1;

int SnumOfConnection = 0;
time_t Sstart, Send;

if((listener = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
        perror("Server-socket() error lol!");
        exit(1);
}

if(setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1)
{
        perror("Server-setsockopt() error lol!");
        exit(1);
}
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = INADDR_ANY;
serveraddr.sin_port = htons(PORT);
memset(&(serveraddr.sin_zero), '\0', 8);
if(bind(listener, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) == -1)
{
        perror("Server-bind() error lol!");
        exit(1);
}
if(listen(listener, 10) == -1)
{
        perror("Server-listen() error lol!");
        exit(1);
}
fdmax = listener;

events = calloc(MAX_CON, sizeof(struct epoll_event));
if ((epfd = epoll_create(MAX_CON)) == -1) {
        perror("epoll_create");
        exit(1);
}
ev.events = EPOLLIN;
ev.data.fd = fdmax;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fdmax, &ev) < 0) {
        perror("epoll_ctl");
        exit(1);
}
//time(&start);
for(;;)
{
        res = epoll_wait(epfd, events, MAX_CON, -1);
        client_fd = events[index].data.fd;
        for (index = 0; index < MAX_CON; index++) {
                if(client_fd == listener)
                {
                        addrlen = sizeof(clientaddr);
                        if((newfd = accept(listener, (struct sockaddr *)&clientaddr, &addrlen)) == -1)
                        {
                                perror("Server-accept() error lol!");
                        }
                        else
                        {
                                ev.events = EPOLLIN;
                                ev.data.fd = newfd;
                                if (epoll_ctl(epfd, EPOLL_CTL_ADD, newfd, &ev) < 0) {
                                        perror("epoll_ctl");
                                        exit(1);
                                }
                        }
                        break;
                }
                else
                {
                        if (events[index].events & EPOLLHUP)
                        {
                                if (epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev) < 0) {
                                        perror("epoll_ctl");
                                }
                                close(client_fd);
                                break;
                        }
                        if (events[index].events & EPOLLIN)  {
                                if((nbytes = recv(client_fd, buf, sizeof(buf), 0)) <= 0)
                                {
                                        if(nbytes == 0) {
                                        //      printf("socket %d hung up\n", client_fd);
                                        }
                                        else {
                                                printf("recv() error lol! %d", client_fd);
                                                perror("");
                                        }


                                        if (epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &ev) < 0) {
                                                perror("epoll_ctl");
                                        }
                                        close(client_fd);
                                }
                                else
                                {
                                        if(send(client_fd, buf, nbytes, 0) == -1)
                                                perror("send() error lol!");
                                }
                                break;
                        }
                }
        }
}
return 0;

}

写到这里,Linux内核系列也到了尾声,我在写这些推文的过程中,看完了三本书,醍醐灌顶,想把它们推荐给对Linux环境下的C/C++开发感兴趣的朋友:

《UNIX环境高级编程 第3版》

《UNIX网络编程 卷1:套接字联网API 第3版》

《UNIX网络编程 卷2:进程间通信 第2版》

参考教程:

https://code-examples.net/en/q/6a6f
https://www.geeksforgeeks.org/socket-programming-in-cc-handling-multiple-clients-on-server-without-multi-threading
https://www.ibm.com/docs/en/i/7.4?topic=designs-using-poll-instead-select

The End

声明:文中观点不代表本站立场。本文传送门:http://eyangzhen.com/206840.html

(0)
联系我们
联系我们
分享本页
返回顶部