父进程listen,子进程epoll监听事件,客户端会超时,请高手指教。

发布于 2022-10-15 08:47:20 字数 13813 浏览 25 评论 0

设计的一个server,开始是单进程模式的,2w/s的访问量没有问题。但为了充分利用多cpu,想把它改成多进程方式。

改动不大,主要就是父进程先创建listen对象,监听端口,然后fork子进程,每个子进程创建自己的epoll,监听listen_fd、accept_fd,以及跟后台各server交互的fd。

问题来了:比如我们起4个子进程(每个cpu一个),然后用10个线程客户端去压力测试,在开始测试的时候,客户端总会有少量的接收超时,没有收到应答包。但是过一段事件之后,就会跑的比较平稳,没有超时,压力也很固定。

调试的时候,关闭了到后台各Server的交互,收到包之后,本地处理就回包,也会有少量的超时。不知道这是怎么回事?

静待高手解答。不要告诉我换多线程,或者父进程epoll监听listen_fd的实现,哥懂那个,只是想明白这种设计范式下的问题可能是什么原因。

另外,据说linux 2.6内核版本已经解决了此设计模式下的惊群问题,但是否会有消息乱序,即A子进程的fd加入到了B子进程的epoll事件里去。

简化的实例代码如下:

  1. /* 单进程可以监听本地多端口,每个端口创建一个对象,用vector保存起来,用于传递到子进程里,每个子进程去创建epoll监听listen_fd */
  2. vector<CPollerObject*> g_listen_obj;
  3. int InitGlobalInstance(){
  4.         pid_t pid = getpid ();
  5.         int iRet;
  6.        
  7.         /* 启动epoll */
  8.         iRet = CPollerUnit::Ins()->Create(g_conf.m_epoll._maxfd, g_conf.m_epoll._timeout);
  9.         if( E_OK!=iRet ){
  10.                 fprintf(stderr, "Create Epoll Error:%d\n", iRet);
  11.                 return E_FAIL;
  12.         }
  13.         DEBUG_LOG("CPollerUnit Create Succ: %p", CPollerUnit::Ins());
  14.        
  15.         for( vector<CPollerObject*>::iterator it=g_listen_obj.begin(); it!=g_listen_obj.end(); it++){
  16.                 (*it)->EnableInput();
  17.                 (*it)->AttachPoller();
  18.         }
  19.        
  20.         /* 初始化内存池管理 */
  21.         iRet = CMemPool::Ins()->Create(g_conf.m_session._bufSize);
  22.         if( E_OK!=iRet ){
  23.                 fprintf(stderr, "Create Epoll Error:%d\n", iRet);
  24.                 return E_FAIL;
  25.         }
  26.         DEBUG_LOG("CMemPool Create Succ: %p", CMemPool::Ins());
  27.         /* 初始化连接池 */
  28.         iRet = CTcpPoll::Ins()->Init(g_conf.m_session._max, g_conf.m_errReport._failCount, g_conf.m_errReport._timeInterval);
  29.         if( E_OK!=iRet ){
  30.                 fprintf(stderr, "Create CTcpPoll Error:%d\n", iRet);
  31.                 return E_FAIL;
  32.         }
  33.         DEBUG_LOG("CTcpPoll Create Succ: %p", CTcpPoll::Ins());
  34.         /* 初始化统计对象。 */
  35.         char pathWithPid[512] = {'\0'};
  36.         snprintf(pathWithPid, sizeof(pathWithPid)-1, "%s_%d", g_conf.m_stat._prefix, pid);
  37.         iRet = CProxyStat::Instance().Init(g_conf.m_stat._path, pathWithPid);
  38.         if( E_OK!=iRet ){
  39.                 fprintf(stderr, "Create CProxyStat Error:%d\n", iRet);
  40.                 return E_FAIL;               
  41.         }
  42.         DEBUG_LOG("CProxyStat Create Succ: %p", &(CProxyStat::Instance()));
  43.         return 0;
  44. }
  45. int MainThread()
  46. {
  47.         InitGlobalInstance();
  48.         time_t tNow = time(NULL);
  49.         while(true){
  50.                 _g_child_flag = 1;
  51.                 /* 处理连接任务 */
  52.                 CPollerUnit::Ins()->ProcessAllEvents();
  53.                        
  54.                 /* 超时检查*/
  55.                 CSession::Ins()->CheckTimeOut();
  56.                 /* 清理空闲的tcp连接。 */
  57.                 CTcpPoll::Ins()->CheckIdle(g_conf.m_session._idle, g_conf.m_session._timeInterval, g_conf.m_session._emptyRatio);
  58.                 /* 检查是否到了统计间隔 */
  59.                 if( int(time(NULL) - tNow) < g_conf.m_stat._interval) continue;
  60.                 CProxyStat::Instance().ProcessStat();
  61.                 tNow = time(NULL);
  62.         }
  63.        
  64.         return -1;
  65. };
  66. int fork_process_group()
  67. {
  68.         //process group配置
  69.         int cpu_process_num = sysconf(_SC_NPROCESSORS_CONF);
  70.         int process_group_num        = g_conf.m_iProcessNum;       
  71.        
  72.         /* 不指定进程个数,或者指定进程数比cpu个数还多,修改为cpu的个数。 */
  73.         if( 0 == process_group_num || process_group_num > cpu_process_num )
  74.                 process_group_num = cpu_process_num;
  75.         ERROR_LOG( "cpu num: %d, config process num: %d, real process num: %d", cpu_process_num, g_conf.m_iProcessNum, process_group_num);
  76.         int pid;
  77.         for(int i = 0; i < process_group_num ; i++)
  78.         {
  79.                 pid = fork();
  80.                 switch(pid)
  81.                 {
  82.                         case -1:
  83.                                 ERROR_LOG("fork error!");
  84.                                 return -1;
  85.                         case 0:
  86.                                 SetRunCpu( i );
  87.                                 MainThread();
  88.                                 return 0;
  89.                         default:
  90.                                 g_processcpu.insert(make_pair(pid, i));
  91.                 };
  92.         }
  93.         return 0;
  94. }
  95. int main(int argc,char** argv)
  96. {
  97.         if( argc <2 ){
  98.                 fprintf(stderr, "usage:%s etc\n", argv[0]);
  99.                 return -1;
  100.         }
  101.         /* 显示系统说明。 */
  102.         if( 0==strcasecmp(argv[1], "-v") ){
  103.                 PrintVersion();
  104.                 return 0;
  105.         }
  106.         PrintVersion();
  107.         /* 初始化,读取配置文件。 */
  108.         int iRet = g_conf.Init(argv[1]);
  109.         if( E_OK != iRet ){
  110.                 fprintf(stderr, "Init Conf Error!:%d\n", iRet);
  111.                 return E_FAIL;
  112.         }
  113.         /* 初始化log */
  114.         log_init(g_conf.m_log._path, g_conf.m_log._level, g_conf.m_log._size, g_conf.m_log._prefix);
  115.         /* 设置信号 */
  116.         SetUser1Handle();
  117.         /* 绑定cpu */
  118.         //if ( g_conf.m_iCpuBind>= 0 )        SetRunCpu(g_conf.m_iCpuBind);       
  119.         /* 监听端口 */
  120.         if ( CreateClientSocket() < 0 )
  121.         {
  122.                 fprintf(stderr, "CreateClientSocket\n");
  123.                 return E_FAIL;
  124.         }
  125.         DEBUG_LOG("CreateClientSocket Succ!");
  126.         /* fork进程,设置为守护进程。 */
  127.         daemon_init();
  128.         fork_process_group();
  129. #ifndef DEBUG       
  130.         pid_t pid;
  131.         for(;;)
  132.         {
  133.                 pid = waitpid(-1,NULL,0);
  134.                 if ( pid <= 0 )
  135.                 {
  136.                         ERROR_LOG("waitpid error:%s\n", strerror(errno));
  137.                         sleep(1);
  138.                         continue;
  139.                 }
  140.                 fflush(NULL);
  141.                
  142.                 ERROR_LOG("WorkMain[%d] restart...................................\n", pid);
  143.                 usleep(100000);
  144.                
  145.                 int cpunum = g_processcpu.find(pid) == g_processcpu.end()?  0 : g_processcpu[pid];
  146.                 int newpid = fork();
  147.                 switch(newpid){
  148.                         case -1:
  149.                                 ERROR_LOG("fork error\n");
  150.                                 break;
  151.                         case 0:
  152.                                 SetRunCpu(cpunum);
  153.                                 MainThread();
  154.                                 break;
  155.                         default:
  156.                                 g_processcpu[newpid] = cpunum;
  157.                                 break;
  158.                 }
  159.         }
  160. #endif
  161.        
  162.         return 0;
  163. }

复制代码

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(9

浮世清欢 2022-10-22 08:47:21

我没仔细看你的代码, 能说下主进程listen accept后怎么将socket fd传给子进程的?
我估计是这块引起的超时.

假设一个客户端被主进程accept后, 子进程没去epoll进行读写的监听,可能会引起超时.

桃扇骨 2022-10-22 08:47:20

这个我之前也是一直不懂 等人指教

贱贱哒 2022-10-22 08:47:20

嗯。。没有人来指教一下么。。

哑剧 2022-10-22 08:47:20

梦里寻她 2022-10-22 08:47:20

加日志查看了一下,消息并没有乱掉,但是epoll已经通知不上来事件。一个发送,或者接收,3s以上都没有触发epoll。

还是不明白这个问题到底是怎么产生的。

ps:修改了代码框架,主进程里增加一个线程进行listen,accept到的fd发给子进程,终于没有问题了,性能达到6w/s

凉城 2022-10-22 08:47:20

期待内核级人物指点,呵呵。

旧人哭 2022-10-22 08:47:20

看了你的描述和代码,说两点:
1 消息乱序理论上是不会的,每个fd只在一个epollfd中管理
2 超时现象比较奇怪,看了你的代码,没看出什么问题,照理说应该不会有这个情况,lighthttp用的进程模型和你是一样的。

夜声 2022-10-22 08:47:20

是epoll的同一个listenFD吗
会不会取等待队列里的连接的时候出现竞争
好像不对  listenFD的引用计数增加而已

仙女 2022-10-22 08:47:20

mark
如楼主所说,我们公司框架就是在主进程中进行监听,收到后发子进程。不过性能还远远不及楼主的。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文