概述
- 网络程序需要处理的第三类事件是定时器事件,比如定期检测一个客户连接的活动状态。
- 服务器程序通常管理着众多定时事件,因此有效地组织这些定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑,对于服务器的性能有着至关重要的影响。
为此,需要将每个定时事件分别封装成定时器,并使用某种容器类数据结构,比如链表、排序表和时间轮,将所有定时器串联起来,以实现对定时事件的统一管理。 - 定时是指在一段时间之后触发某段代码的机制,我们可以在这段代码中依次处理所有到期的定时器。
换言之,定时机制是定时器得以被处理的原动力。 - Linux提供了3种定时方法:
- socket选项SO_RCVTIMEO和SO_SNDTIMEO
- SIGALRM信号
- I/O复用系统调用的超时参数
SO_RCVTIMEO和SO_SNDTIMEO
- 这两个socket选项分别用来设置socket接收数据超时时间和发送数据超时时间。
因此,这两个选项仅对与数据接收和发送相关的socket专用系统调用(socketAPI)有效,包括:- send、sendmsg、recv、recvmsg、accept和connect
- 设置connect超时时间:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdlib.h> #include <assert.h> #include <stdio.h> #include <errno.h> #include <fcntl.h> #include <unistd.h> #include <string.h> int timeout_connect(const char* ip, int port, int time) { int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int sockfd = socket(PF_INET, SOCK_STREAM, 0); assert(sockfd >= 0); // 通过选项SO_RCVTIMEO和SO_SNDTIMEO所设置的超时时间的类型是timeval // 这和select系统调用的超时参数类型相同 struct timeval timeout; timeout.tv_sec = time; timeout.tv_usec = 0; socklen_t len = sizeof(timeout); ret = setsockop(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len); assert(ret != -1); ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address)); if (ret == -1) { // 超时对应的错误号EINPROGRESS,下面这个条件如果成立,就可以处理定时任务 if (errno == EINPROGRESS) { printf("connecting timeout, please timeout logic\n"); return -1; } printf("error occur when connecting to server\n"); return -1; } return sockfd; } int main(int argc, char* argv[]) { if (argc <= 2) { printf("usage: %s ip_address port_number\n", basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(argv[2]); int sockfd = timeout_connect(ip, port, 10); if (sockfd < 0) { return 1; } return 0; } |
SIGALRM信号
- 由alarm和setitimer函数设置的实时闹钟一旦超时,将触发SIGALRM信号。
因此,可以利用该信号处理函数来处理定时器任务。 - 但是要处理多个定时任务,就需要不断地触发SIGALRM信号,并在其信号处理函数中执行到期的任务。
- 一般而言,SIGALRM信号按照固定的频率生成,即由alarm或setitimer函数设置的定时周期T保持不变。如果某个定时任务的超时时间不是T的整数倍,那么它实际被执行的时间和预期的时间将略有偏差。因此定时周期T反映了定时的精度。
基于升序链表的定时器
- 定时器通常至少要包含两个成员:
- 一个超时时间(相对时间或绝对时间)
- 一个任务回调函数
- 有的时候还可能包含回调函数被执行时需要传入的参数,以及是否重启定时器等信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
#ifndef LST_TIMER #define LST_TIMER #include <time.h> #define BUFFER_SIZE 64 class util_timer; // 用户数据结构 struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; util_timer* timer; }; // 定时器类 class util_timer { public: util_timer():prev(NULL),next(NULL) {} public: time_t expire; // 任务的超时时间 void (*cb_func)(client_data*); // 任务回调函数 // 回调函数处理的客户数据,由定时器的执行者传递给回调函数 client_data* user_data; util_timer* prev; // 指向前一个定时器 util_timer* next; // 指向下一个定时器 }; class sort_timer_lst { public: sort_timer_lst():head(NULL),tail(NULL){} ~sort_timer_lst() { util_timer* tmp = head; while(tmp) { head = tmp->next; delete tmp; tmp = head; } } // 将目标定时器timer添加到链表中 void add_timer(util_timer* timer) { if (!timer) { return; } if(!head) { head = tail = timer; return; } // 如果目标定时器的超时时间小于当前链表中所有定时器的超时时间 // 则把该定时器插入链表头部作为链表新的头节点 // 否则就需要调用重载函数add_timer(util_timer* timer, util_timer* lst_head), // 把它插入链表中合适的位置,以保证链表的升序特性 if (timer->expire < head->expire) { timer->next = head; head->prev = timer; head = timer; return; } add_timer(timer, head); } // 当某个定时任务发生变化时,调整对应的定时器在链表中的位置 // 这个函数只有考虑被调整的定时器的超时时间延长的情况,即该定时器需要往链表的尾部移动 void adjust_timer(util_timer* timer) { if (!timer) { return; } util_timer* tmp = timer->next; // 如果被调整的目标定时器处在链表尾部,或者该定时器新的超时值仍然小于其下一个定时器的超时值,则不用调整 if (!tmp || (timer->expire < tmp->expire)) { return; } // 如果目标定时器是链表的头节点,则该定时器从链表中取出并重新插入链表 if (timer = head) { head = head->next; head->prev = NULL; timer->next = NULL; add_timer(timer, head); } else { // 如果定时器不是链表的头节点,则将该定时器从链表中取出,然后插入其原来所在位置之后的部分链表中 timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer(timer, timer->next); } } // 将目标定时器timer从链表中删除 void del_timer(util_timer* timer) { if (!timer) { return; } // 下面成立表示链表中只有一个定时器,即目标定时器 if ((timer == head) && (timer == tail)) { delete timer; head = NULL; tail = NULL; return; } // 如果链表中至少有两个定时器,且目标定时器是链表的头结点 // 则链表的头结点重置为原头结点的下一个节点,然后删除目标定时器 if (timer == head) { head = head->next; tail->next = NULL; delete timer; return; } // 如果目标定时器位于链表的中间,则把它前后的定时器串联起来,然后删除目标定时器 timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer; } // SIGALRM信号每次被触发就在其信号处理函数(如果使用统一信号源,则是主函数)中 // 执行一次tick函数,以处理链表上到期的任务 void tick() { if (!head) { return; } printf("timer tick\n"); time_t cur = time(NULL); // 获得系统当前时间 util_timer* tmp = head; // 从头结点开始依次处理每个定时器,直到遇到一个尚未到期的定时器 // 这就是定时器的核心逻辑 while(tmp) { // 因为每个定时器都使用绝对时间作为超时值 // 所以我们可以把定时器的超时值和系统当前时间,比较以判断定时器是否到期 if (cur < tmp->expire) { break; } // 调用定时器的回调函数,以执行定时任务 tmp->cb_func(tmp->user_data); // 执行完定时器中的定时任务之后,就讲它从链表中删除,并重置链表头结点 head = tmp->next; if (head) { head->prev = NULL; } delete tmp; tmp = head; } } private: // 一个重载的辅助函数,它被公有的add_timer函数和adjust_timer函数调用, // 该函数表示讲目标定时器timer添加到节点lst_head之后的部分链表中 void add_timer(util_timer* timer, util_timer* lst_head) { util_timer* prev = lst_head; util_timer* tmp = prev->next; // 遍历lst_head节点之后的部分链表,直到直到一个超时时间大于目标定时器的超时时间的 // 节点,并将目标定时器插入该节点之前 while(tmp) { if (timer->expire < tmp->expire) { prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break; } prev = tmp; tmp = tmp->next; } // 如果遍历lst_head节点之后的部分链表,仍未找到超时时间大于目标定时器的超时时间节点, // 则将目标定时器插入链表尾部,并将它设置为链表新的尾节点 if (!tmp) { prev->next = timer; timer->prev = prev; timer->next = NULL; tail = timer; } } private: util_timer* head; util_timer* tail; }; #endif |
处理非活动连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <signal.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #inlcude <sys/epoll.h> #include <pthread.h> #include "lst_timer.h" #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define TIMESLOT 5 static int pipefd[2]; static sort_timer_lst timer_lst; static int epollfd = 0; int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } void addfd(int epollfd, int fd) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } void sig_handler(int sig) { int save_errno = errno; int msg = sig; send(pipefd[1], (char*)&msg, 1, 0); errno = save_errno; } void addsig(int sig) { struct sigaction sa; memset(&sa, '\0', sizeof(sa)); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL) != -1); } void timer_handler() { // 定时处理任务,实际上就是调用tick函数 timer_lst.tick(); // 因为一次alarm调用只会引起一次SIGALRM信号,所以要重新定时 // 以不断触发SIGALRM信号 alarm(TIMESLOT); } // 定时器回调函数,它删除非活动连接socket上的注册事件,并关闭之 void cb_func(client_data* user_data) { epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); assert(user_data); close(user_data->sockfd); printf("close fd %d\n", user_data->sockfd); } int main(int argc, char* argv[]) { if (argc <= 2) { printf("usage %s ip_address port_number\n", basename(argv[0])); return 1; } const char* ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd, listenfd); ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); assert(ret != -1); setnonblocking(pepefd[1]); addfd(epollfd, pipefd[0]); // 设置信号处理函数 addsig(SIGALRM); addsig(SIGTERM); bool stop_server = false; client_data* users = new client_data[FD_LIMIT]; bool timeout = false; alarm(TIMESLOT); // 定时 while(!stop_server) { int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if ((number < 0) && (errno != EINTR)) { printf("epoll failure\n"); break; } for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd; // 处理新到的客户连接 if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); addfd(epollfd, connfd); users[connfd].address = client_address; users[connfd].sockfd = connfd; // 创建定时器,设置其回调函数与超时时间,然后绑定定时器与用户数据 // 最后将定时器添加到链表timer_lst中 util_timer* timer = new util_timer; timer_user_data = &users[connfd]; timer->cb_func = cb_func; time_t cur = time(NULL); tiemr->expire = cur + 3*TIMESLOT; users[connfd].timer = timer; timer_lst.add_timer(timer); } else if((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) { // 处理信号 int sig; char signals[1024]; ret = recv(pipefd[0], signals, sizeof(signals), 0); if (ret == -1) { // handle the error continue; } else if(ret == 0) { continue; } else { for(inti = 0; i < ret; ++i) { switch(signals[i]) { case SIGALRM: // 用timeout变量标记有定时任务需要处理,但不立即处理定时任务 // 这是因为定时任务的优先级不是很高,优先处理其他更重要的任务 timeout = true; break; case SIGTERM: stop_server = true; } } } } else if(events[i].events & EPOLLIN) { // 处理客户连接上接收到的数据 memset(users[sockfd].buf, '\0', BUFFER_SIZE); ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0); printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd); util_timer* timer = users[sockfd].timer; if (ret < 0) { // 如果发生读错误,则关闭连接,并移除其对应的定时器 if (errno != EAGAIN) { cb_func(&users[sockfd]); if (timer) { timer_lst.del_timer(timer); } } } else if (ret == 0) { // 如果对方已经关闭连接,则我们也关闭连接,并移除对应的定时器 cb_func(&users[sockfd]); if (timer) { timer_lst.del_timer(timer); } } else { // 如果某个客户上有数据可读 // 则我们需要调整该连接对应的定时器,以延迟该连接关闭的时间 if (timer) { time_t cur = time(NULL); tiemr->expire = cur + 3*TIMESLOT; printf("adjust timer once\n"); tiemr_lst.adjust_timer(timer); } } } else { // others } } if (timeout) { timer_handler(); timeout = false; } } close(listenfd); close(pipefd[1]); close(pipefd[0]); delete[] users; return 0; } |
I/O复用系统调用的超时参数
- Linux下的3组I/O复用系统调用都带有超时参数,因此它们不仅能统一处理信号和I/O事件,也能统一处理定时事件.
- 但是由于I/O复用系统调用可能在超时时间到期之前就返回(比如有I/O事件发生),所以我们要利用它们来定时,就需要不断更新定时参数以反映剩余的时间.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
#define TIMEOUT 5000 int timeout = TIMEOUT; time_t start = time(NULL); time_t end = time(NULL); while (1) { printf("the timeout is now %d mil-seconds\n", timeout); start = time(NULL); int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, timeout); if ((number < 0) && (errno != EINTR)) { printf("epoll failure\n"); break; } // 如果epoll_wait成功返回0,就说明超时时间到,此时便可处理定时任务,并重置定时时间 if (number == 0) { timeout = TIMEOUT; continue; } end = time(NULL); // 如果epoll_wait的返回值大于0,则本次epoll_wait调用持续时间是(end - start) // 我们需要特定时间timeout减去这段时间,以获得下次epoll_wait调用的超时参数 timeout = (end - start) * 1000; // 重新计算之后的timeout值有可能等于0,说明本次epoll_wait调用返回时,不仅有文件描述符就绪 // 而且其超时时间也刚好到达,此时我们也要处理定时任务,并重置定时时间 if (timeout <= 0) { timeout = TIMEOUT; } // handle connection } |
本文为原创文章,版权归Aet所有,欢迎分享本文,转载请保留出处!
你可能也喜欢
- ♥ Linux高性能服务器编程:TCP/IP协议族09/02
- ♥ Linux 基于文件指针的文件操作(缓冲)03/23
- ♥ vim编辑器的配置03/18
- ♥ 51CTO:Linux C++网络编程二08/14
- ♥ x86_64汇编学习记述二08/07
- ♥ Linux_ 命令大全 文件管理03/16