Using fork together with setjmp/longjmp

Posted on 2024-12-13 12:21:02

I am trying to implement a fork-based checkpointing scheme for a multithreaded program, combining fork with setjmp/longjmp. I hoped it would work but, perhaps unsurprisingly, it does not. The code is shown below, with an example use of checkpoint and rollback.

The main idea is to allocate the thread stacks myself, as done in pthread_create_with_stack, and then simply call fork from the main thread. The forked process (the checkpoint) is suspended at the start. When it is woken up (rollback), its main thread recreates the other threads by calling pthread_create, reusing the same stacks as the threads in the original process. Each recreated thread also does a longjmp at the beginning of the thread routine, so that it jumps to the point in the code where it was when the process was forked as a checkpoint. Note that all setjmp calls are made inside my_pthread_barrier_wait, so that no thread holds a lock at that point.

I think the problem here is setjmp/longjmp. Would getcontext/setcontext/makecontext help here, or anything else? Can setjmp/longjmp even be used in a way that works here? Any solution would be greatly appreciated.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <unistd.h>
#include <semaphore.h>
#include <signal.h>
#include <sys/types.h>
#include <setjmp.h>

#define PERFORM_JMP

#define NUM_THREADS 4

void *stackAddr[NUM_THREADS];
pthread_t thread[NUM_THREADS];
jmp_buf buf[NUM_THREADS];
pthread_attr_t attr[NUM_THREADS];
pthread_barrier_t bar;
sem_t sem;
pid_t cp_pid;
int rollbacked;
int iter;
long thread_id[NUM_THREADS];

void *BusyWork(void *t);

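void sig_handler(int signum)
{
    /* printf() is not async-signal-safe; write() and sem_post() are. */
    static const char msg[] = "signal_handler posting sem!\n";
    (void)signum;
    write( STDOUT_FILENO, msg, sizeof msg - 1 );
    sem_post( &sem );
}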

int pthread_create_with_stack( void *(*start_routine) (void *), int tid )
{
    const size_t STACKSIZE = 0xC00000; //12582912
    int rc;

    printf( "tid = %d\n", tid );

    pthread_attr_init( &attr[tid] );
    stackAddr[tid] = malloc(STACKSIZE);
    pthread_attr_setstack( &attr[tid], stackAddr[tid], STACKSIZE );

    thread_id[tid] = tid;
    rc = pthread_create( &thread[tid], &attr[tid], start_routine, (void*)&thread_id[tid] );

    if (rc) 
    {
        printf("ERROR; return code from pthread_create() is %d\n", rc);
        exit(-1);
    }

    return rc;
}

pid_t checkpoint()
{
    pid_t pid;
    int t, rc;

    switch (pid=fork()) 
    {
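    case -1: 
        perror("fork"); 
        exit(EXIT_FAILURE);  // no checkpoint was taken; the original fell out of the switch without returning a value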
    case 0:         // child process starts
        sem_wait( &sem );
        rollbacked = 1;
        printf( "case 0: rollbacked = 1, my pid is %d\n", getpid() );
        for( t = 1; t < NUM_THREADS; t++ ) 
        {
            printf( "checkpoint: creating thread %d again\n", t );
            rc = pthread_create( &thread[t], &attr[t], BusyWork, (void*)&thread_id[t] );
            if (rc) 
            {
                printf("ERROR; return code from pthread_create() is %d\n", rc);
                exit(-1);
            }
        }
        return 1;  // child process ends
    default:        // parent process starts
        return pid;
    }
}

void restart_from_checkpoint( pid_t pid )
{
    printf( "Restart_from_checkpoint, sending signal to %d!\n", pid );
    kill( pid, SIGUSR1 );
    exit( 0 );
}

void take_checkpoint_or_rollback( int sig_diff )
{
    if ( cp_pid )
    {
        if ( sig_diff )
        {
            printf( "rollbacking\n" );
            if ( !rollbacked )
                restart_from_checkpoint( cp_pid );
        }
        else
        {
            kill( cp_pid, SIGKILL );
            cp_pid = checkpoint();
            printf( "%d: cp_pid = %d!\n", getpid(), cp_pid );
        }
    }
    else
        cp_pid = checkpoint();
}

void my_pthread_barrier_wait( int tid, pthread_barrier_t *pbar )
{
    pthread_barrier_wait( pbar );
#ifdef PERFORM_JMP   
    if ( tid == 0 )
    {
        if ( !rollbacked )
        {
            take_checkpoint_or_rollback( ++iter == 4 );
        }
    }
    if ( setjmp( buf[tid] ) != 0 ) {}
    else {}
    printf( "%d: %d is waiting at the second barrier!\n", getpid(), tid );
#endif
    pthread_barrier_wait( pbar );
}

void *BusyWork(void *t)
{
   volatile int i;
   volatile long tid = *((long*)t);
   volatile double result = 0.0;

   printf( "thread %ld in BusyWork!\n", tid );
#ifdef PERFORM_JMP   
   if ( rollbacked )
   {
    printf( "hmm, thread %ld is now doing a longjmp, goodluck!\n", tid );
    longjmp( buf[tid], 1 );
   }
#endif
   printf("Thread %ld starting...\n",tid);
   for ( i = 0; i < 10; i++)
   {
      result += (tid+1) * i;
      printf( "%d: tid %ld: result = %g\n", getpid(), tid, result );
      my_pthread_barrier_wait(tid, &bar);
   }
   printf("Thread %ld done. Result = %g\n", tid, result);
   //pthread_exit((void*) t);
   return NULL;  /* a function returning void* must return a value */
}

int main (int argc, char *argv[])
{
   int rc;
   long t;
   void *status;

   /* Initialize the barrier shared by all NUM_THREADS threads */
   pthread_barrier_init(&bar, NULL, NUM_THREADS);
#ifdef PERFORM_JMP   
   signal(SIGUSR1, sig_handler);
   sem_init( &sem, 0, 0 );
#endif
   for( t = 1; t < NUM_THREADS; t++ ) 
   {
      printf( "Main: creating thread %ld\n", t );
      rc = pthread_create_with_stack( BusyWork, t );
      if (rc) 
      {
         printf("ERROR; return code from pthread_create() is %d\n", rc);
         exit(-1);
      }
   }

   thread_id[0] = 0;
   BusyWork( &thread_id[0] );

   /* Wait for the other threads */
   for(t=1; t<NUM_THREADS; t++) 
   {
      rc = pthread_join(thread[t], &status);
      if (rc) 
      {
         printf("ERROR; return code from pthread_join() is %d\n", rc);
         exit(-1);
      }
      printf("Main: completed join with thread %ld having a status "
            "of %ld\n",t,(long)status);
    }

    printf("Main: program completed. Exiting.\n");
    pthread_exit(NULL);
}


Comments (1)

找回味觉 2024-12-20 12:21:02

What you're trying to do is simply impossible. fork is fundamentally incompatible with synchronization. Even if you could recreate threads reliably in the child process, they would have new thread ids, and thus they would not be the owners of the locks they're supposed to own.
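
The lock-ownership problem can be seen in a small experiment. The following is only a sketch, not part of the original answer: the helper name child_sees_orphaned_lock and the two semaphores are made up for illustration. A worker thread takes a mutex, the main thread forks, and the child (which contains only a copy of the forking thread) tries to take the same mutex. POSIX only promises async-signal-safe calls in the child of a multithreaded process, so calling pthread_mutex_trylock there is itself an assumption about the platform; on Linux with glibc it is expected to report EBUSY.

```c
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static sem_t locked;   /* posted once the worker holds the mutex */
static sem_t release;  /* posted when the worker may unlock it */

/* Worker: take the mutex and hold it until told to release it. */
static void *hold_lock(void *arg)
{
    (void)arg;
    pthread_mutex_lock(&lock);
    sem_post(&locked);
    sem_wait(&release);
    pthread_mutex_unlock(&lock);
    return NULL;
}

/* Hypothetical helper: returns 1 if the forked child found the mutex
 * still locked (its owner thread was not copied), 0 if the child could
 * lock it, and -1 on error. */
int child_sees_orphaned_lock(void)
{
    pthread_t worker;
    int status = 0;
    int result = -1;

    if (sem_init(&locked, 0, 0) != 0 || sem_init(&release, 0, 0) != 0)
        return -1;
    if (pthread_create(&worker, NULL, hold_lock, NULL) != 0)
        return -1;
    sem_wait(&locked);               /* the worker now owns the mutex */

    pid_t pid = fork();              /* the child gets only the calling thread */
    if (pid == 0) {
        /* The worker does not exist here, so nobody can ever unlock. */
        int rc = pthread_mutex_trylock(&lock);
        _exit(rc == EBUSY ? 1 : 0);
    }
    if (pid > 0 && waitpid(pid, &status, 0) == pid && WIFEXITED(status))
        result = WEXITSTATUS(status);

    sem_post(&release);              /* let the parent's worker finish */
    pthread_join(worker, NULL);
    sem_destroy(&locked);
    sem_destroy(&release);
    return result;
}
```

Calling child_sees_orphaned_lock() from main and printing the result shows what the child observed. A thread recreated in the child with pthread_create would be a different thread from the one that locked the mutex, which is the ownership mismatch described above.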

The only way to do checkpointing is with advanced operating system support for it. This would have to include separate pid namespaces, so that the checkpointed copy of the program would have the same pid, and all its threads would have the same thread ids. Even then, if the program is communicating with other processes or the outside world, it won't work. I believe there are some tools for doing this on Linux, but I'm not familiar with them, and at this point you're getting to the level of hacks where it's appropriate to ask if there's a better way to achieve what you're trying to do.
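
On the setjmp/longjmp part of the question: the C standard only defines longjmp when the function that called setjmp has not yet returned, and POSIX leaves the result undefined when the jmp_buf was filled in by a different thread. A recreated thread that jumps from a fresh BusyWork frame into a buffer saved by a thread of the original process falls outside both conditions. For contrast, here is a minimal sketch of the supported pattern, where the jump stays within one thread and targets a frame that is still live. The names rollback_point, failure_code and run_with_rollback are made up for illustration.

```c
#include <setjmp.h>

static jmp_buf rollback_point;
static volatile int failure_code;

/* Deepest callee: abandon the work and unwind to the setjmp site. */
static void fail_deep(int code)
{
    failure_code = code;
    /* Valid: run_with_rollback() is still active on this thread's stack. */
    longjmp(rollback_point, 1);
}

static void do_work(int code)
{
    fail_deep(code);
}

/* Hypothetical helper: returns 0 if the work completed, otherwise the
 * code recorded by fail_deep() before it jumped back. */
int run_with_rollback(int code)
{
    if (setjmp(rollback_point) != 0)
        return failure_code;   /* arrived here via longjmp */
    if (code != 0)
        do_work(code);
    return 0;
}
```

For example, run_with_rollback(7) unwinds through do_work and fail_deep and returns 7, while run_with_rollback(0) never jumps. getcontext/setcontext/makecontext offer a similar facility, but they do not address the thread-identity problem described above either.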
