父进程listen,子进程epoll监听事件,客户端会超时,请高手指教。
设计的一个server,开始是单进程模式的,2w/s的访问量没有问题。但为了充分利用多cpu,想把它改成多进程方式。
改动不大,主要就是父进程先创建listen对象,监听端口,然后fork子进程,每个子进程创建自己的epoll,监听listen_fd、accept_fd,以及跟后台各server交互的fd。
问题来了:比如我们起4个子进程(每个cpu一个),然后用10个线程客户端去压力测试,在开始测试的时候,客户端总会有少量的接收超时,没有收到应答包。但是过一段事件之后,就会跑的比较平稳,没有超时,压力也很固定。
调试的时候,关闭了到后台各Server的交互,收到包之后,本地处理就回包,也会有少量的超时。不知道这是怎么回事?
静待高手解答。不要告诉我换多线程,或者父进程epoll监听listen_fd的实现,哥懂那个,只是想明白这种设计范式下的问题可能是什么原因。
另外,据说linux 2.6内核版本已经解决了此设计模式下的惊群问题,但是否会有消息乱序,即A子进程的fd加入到了B子进程的epoll事件里去。
简化的实例代码如下:
- /* 单进程可以监听本地多端口,每个端口创建一个对象,用vector保存起来,用于传递到子进程里,每个子进程去创建epoll监听listen_fd */
- vector<CPollerObject*> g_listen_obj;
- int InitGlobalInstance(){
- pid_t pid = getpid ();
- int iRet;
- /* 启动epoll */
- iRet = CPollerUnit::Ins()->Create(g_conf.m_epoll._maxfd, g_conf.m_epoll._timeout);
- if( E_OK!=iRet ){
- fprintf(stderr, "Create Epoll Error:%d\n", iRet);
- return E_FAIL;
- }
- DEBUG_LOG("CPollerUnit Create Succ: %p", CPollerUnit::Ins());
- for( vector<CPollerObject*>::iterator it=g_listen_obj.begin(); it!=g_listen_obj.end(); it++){
- (*it)->EnableInput();
- (*it)->AttachPoller();
- }
- /* 初始化内存池管理 */
- iRet = CMemPool::Ins()->Create(g_conf.m_session._bufSize);
- if( E_OK!=iRet ){
- fprintf(stderr, "Create Epoll Error:%d\n", iRet);
- return E_FAIL;
- }
- DEBUG_LOG("CMemPool Create Succ: %p", CMemPool::Ins());
- /* 初始化连接池 */
- iRet = CTcpPoll::Ins()->Init(g_conf.m_session._max, g_conf.m_errReport._failCount, g_conf.m_errReport._timeInterval);
- if( E_OK!=iRet ){
- fprintf(stderr, "Create CTcpPoll Error:%d\n", iRet);
- return E_FAIL;
- }
- DEBUG_LOG("CTcpPoll Create Succ: %p", CTcpPoll::Ins());
- /* 初始化统计对象。 */
- char pathWithPid[512] = {'\0'};
- snprintf(pathWithPid, sizeof(pathWithPid)-1, "%s_%d", g_conf.m_stat._prefix, pid);
- iRet = CProxyStat::Instance().Init(g_conf.m_stat._path, pathWithPid);
- if( E_OK!=iRet ){
- fprintf(stderr, "Create CProxyStat Error:%d\n", iRet);
- return E_FAIL;
- }
- DEBUG_LOG("CProxyStat Create Succ: %p", &(CProxyStat::Instance()));
- return 0;
- }
- int MainThread()
- {
- InitGlobalInstance();
- time_t tNow = time(NULL);
- while(true){
- _g_child_flag = 1;
- /* 处理连接任务 */
- CPollerUnit::Ins()->ProcessAllEvents();
- /* 超时检查*/
- CSession::Ins()->CheckTimeOut();
- /* 清理空闲的tcp连接。 */
- CTcpPoll::Ins()->CheckIdle(g_conf.m_session._idle, g_conf.m_session._timeInterval, g_conf.m_session._emptyRatio);
- /* 检查是否到了统计间隔 */
- if( int(time(NULL) - tNow) < g_conf.m_stat._interval) continue;
- CProxyStat::Instance().ProcessStat();
- tNow = time(NULL);
- }
- return -1;
- };
- int fork_process_group()
- {
- //process group配置
- int cpu_process_num = sysconf(_SC_NPROCESSORS_CONF);
- int process_group_num = g_conf.m_iProcessNum;
- /* 不指定进程个数,或者指定进程数比cpu个数还多,修改为cpu的个数。 */
- if( 0 == process_group_num || process_group_num > cpu_process_num )
- process_group_num = cpu_process_num;
- ERROR_LOG( "cpu num: %d, config process num: %d, real process num: %d", cpu_process_num, g_conf.m_iProcessNum, process_group_num);
- int pid;
- for(int i = 0; i < process_group_num ; i++)
- {
- pid = fork();
- switch(pid)
- {
- case -1:
- ERROR_LOG("fork error!");
- return -1;
- case 0:
- SetRunCpu( i );
- MainThread();
- return 0;
- default:
- g_processcpu.insert(make_pair(pid, i));
- };
- }
- return 0;
- }
- int main(int argc,char** argv)
- {
- if( argc <2 ){
- fprintf(stderr, "usage:%s etc\n", argv[0]);
- return -1;
- }
- /* 显示系统说明。 */
- if( 0==strcasecmp(argv[1], "-v") ){
- PrintVersion();
- return 0;
- }
- PrintVersion();
- /* 初始化,读取配置文件。 */
- int iRet = g_conf.Init(argv[1]);
- if( E_OK != iRet ){
- fprintf(stderr, "Init Conf Error!:%d\n", iRet);
- return E_FAIL;
- }
- /* 初始化log */
- log_init(g_conf.m_log._path, g_conf.m_log._level, g_conf.m_log._size, g_conf.m_log._prefix);
- /* 设置信号 */
- SetUser1Handle();
- /* 绑定cpu */
- //if ( g_conf.m_iCpuBind>= 0 ) SetRunCpu(g_conf.m_iCpuBind);
- /* 监听端口 */
- if ( CreateClientSocket() < 0 )
- {
- fprintf(stderr, "CreateClientSocket\n");
- return E_FAIL;
- }
- DEBUG_LOG("CreateClientSocket Succ!");
- /* fork进程,设置为守护进程。 */
- daemon_init();
- fork_process_group();
- #ifndef DEBUG
- pid_t pid;
- for(;;)
- {
- pid = waitpid(-1,NULL,0);
- if ( pid <= 0 )
- {
- ERROR_LOG("waitpid error:%s\n", strerror(errno));
- sleep(1);
- continue;
- }
- fflush(NULL);
- ERROR_LOG("WorkMain[%d] restart...................................\n", pid);
- usleep(100000);
- int cpunum = g_processcpu.find(pid) == g_processcpu.end()? 0 : g_processcpu[pid];
- int newpid = fork();
- switch(newpid){
- case -1:
- ERROR_LOG("fork error\n");
- break;
- case 0:
- SetRunCpu(cpunum);
- MainThread();
- break;
- default:
- g_processcpu[newpid] = cpunum;
- break;
- }
- }
- #endif
- return 0;
- }
复制代码
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(9)
我没仔细看你的代码, 能说下主进程listen accept后怎么将socket fd传给子进程的?
我估计是这块引起的超时.
假设一个客户端被主进程accept后, 子进程没去epoll进行读写的监听,可能会引起超时.
这个我之前也是一直不懂 等人指教
嗯。。没有人来指教一下么。。
加日志查看了一下,消息并没有乱掉,但是epoll已经通知不上来事件。一个发送,或者接收,3s以上都没有触发epoll。
还是不明白这个问题到底是怎么产生的。
ps:修改了代码框架,主进程里增加一个线程进行listen,accept到的fd发给子进程,终于没有问题了,性能达到6w/s
期待内核级人物指点,呵呵。
看了你的描述和代码,说两点:
1 消息乱序理论上是不会的,每个fd只在一个epollfd中管理
2 超时现象比较奇怪,看了你的代码,没看出什么问题,照理说应该不会有这个情况,lighthttp用的进程模型和你是一样的。
是epoll的同一个listenFD吗
会不会取等待队列里的连接的时候出现竞争
好像不对 listenFD的引用计数增加而已
mark
如楼主所说,我们公司框架就是在主进程中进行监听,收到后发子进程。不过性能还远远不及楼主的。