Graceful shutdown: lock around a shutdown check on asynchronous calls, or handle the exceptions?

Posted 2024-11-27 19:31:33

I'm developing a Java app that, for much of the time, including the point of shutdown, has to deal with a flood of incoming asynchronous calls from a foreign framework. During normal operation these incoming calls then need to be dispatched to another framework, again asynchronously.

At the moment I'm having my module be a "good" citizen and do some locking around a shutdown flag which, once set, will gracefully cease the dispatch of any further outgoing calls.

The troubling thing is that, because both incoming and outgoing calls are asynchronous, each "worker" task has to perform two sets of locking (see below) to do the same shutdown flag check (EDIT: it's been pointed out to me in another question that with a Semaphore only one acquire/release is needed per worker). It works, but there are many of these worker tasks to handle and I worry about the cumulative slowdown. Profiling will come soon, once the framework is expanded a little, but regardless of the result it'd be good to follow best practices.

An alternative is to do no locking around the shutdown flag check at all and handle the anticipated exceptions that are generated when the external frameworks are shut down before the async calls have finished processing. I should add that there are no detrimental operational effects if this approach is taken. Both methods will result in a clean shutdown.

Your ideas on which is the better practice, please? Heavy-handed locking with no exceptions, versus no locking but a barrage of exceptions.

With locks, the worker task code looks something like this:

import java.util.concurrent.locks.ReentrantReadWriteLock;

private final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private boolean isShutdown;

private void workerTask()
{
   // Take the lock before entering try, so finally never unlocks a lock that was not acquired.
   shutdownLock.readLock().lock();
   try
   {
      if (isShutdown)
         return;

      executeAsynchronously(new Runnable()
      {
         @Override
         public final void run()
         {
            // Second check: shutdown may have started while this task was queued.
            shutdownLock.readLock().lock();
            try
            {
               if (isShutdown)
                  return;

               // Do something here.
            }
            finally
            {
               shutdownLock.readLock().unlock();
            }
         }
      });
   }
   finally
   {
      shutdownLock.readLock().unlock();
   }
}

The shutdown method requests the shutdownLock.writeLock(), then sets the isShutdown flag.
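
The shutdown side described here could look roughly like the following. This is only a minimal sketch: the wrapper class `ShutdownGate` and the helper `runIfActive` are hypothetical names introduced for illustration, and only `shutdownLock` and `isShutdown` come from the question's code.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical wrapper class; only shutdownLock and isShutdown come from the question.
class ShutdownGate {
    private final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
    private boolean isShutdown; // guarded by shutdownLock

    /** Blocks until every worker currently holding the read lock has released it. */
    void shutdown() {
        shutdownLock.writeLock().lock();
        try {
            isShutdown = true;
        } finally {
            shutdownLock.writeLock().unlock();
        }
    }

    /** Runs the work under the read lock; returns false if shutdown has already begun. */
    boolean runIfActive(Runnable work) {
        shutdownLock.readLock().lock();
        try {
            if (isShutdown) {
                return false;
            }
            work.run();
            return true;
        } finally {
            shutdownLock.readLock().unlock();
        }
    }
}
```

Because the write lock is exclusive, `shutdown()` returns only after all in-flight read-locked sections have finished, and every later check sees the flag set.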

An alternative that does no locking and instead anticipates the shutdown-generated exceptions looks something like this:

volatile private boolean isShutdown;

private void workerTask()
{
   try
   {
      executeAsynchronously(new Runnable()
      {
         @Override
         final public void run()
         {
            try
            {
               // Do something here.
            }
            catch (final FrameworkRuntimeException exception)
            {
               if ((! isShutdown) || (exception.type != 
                                      FrameworkRuntimeException.FrameworkIsDisposed))
                  throw exception;
            }
         }
      });
   }
   catch (final FrameworkRuntimeException exception)
   {
      if ((! isShutdown) || (exception.type != 
                             FrameworkRuntimeException.FrameworkIsDisposed))
         throw exception;
   }
}

The shutdown method for this implementation sets the volatile isShutdown flag to true.

Thanks in advance for any feedback,

Russ

EDIT: It's been helpfully pointed out to me in another question that I could use a Semaphore to avoid the double locking in the first approach, so it wouldn't be so heavy-handed after all, but the question still stands.
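
For reference, one way the Semaphore idea might be arranged is sketched below. This is an assumption about what was suggested, not the code from the other question: the class `SemaphoreShutdownGate`, its fields, and the use of a plain `Executor` in place of `executeAsynchronously` are all hypothetical. The point it illustrates is that a permit, unlike a read lock, can be acquired on the incoming thread and released on the thread that finishes the asynchronous work, so each worker needs a single acquire/release pair.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: class, field, and method names are illustrative, not from the question.
class SemaphoreShutdownGate {
    private static final int MAX_IN_FLIGHT = Integer.MAX_VALUE;
    private final Semaphore permits = new Semaphore(MAX_IN_FLIGHT);
    private volatile boolean shuttingDown;
    private final Executor executor; // stands in for executeAsynchronously

    SemaphoreShutdownGate(Executor executor) {
        this.executor = executor;
    }

    /** Returns false if the work was rejected because shutdown has started. */
    boolean workerTask(Runnable work) {
        // One permit per worker, taken on the incoming thread.
        if (shuttingDown || !permits.tryAcquire()) {
            return false;
        }
        try {
            executor.execute(() -> {
                try {
                    work.run();
                } finally {
                    permits.release(); // released on whichever thread completes the work
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            // Assumes a rejected task never runs, so the permit is returned here instead.
            permits.release();
            return false;
        }
    }

    /** Stops new work, then blocks until every in-flight worker has released its permit. */
    void shutdown() {
        shuttingDown = true;
        // The permits are never given back, so later tryAcquire() calls fail.
        permits.acquireUninterruptibly(MAX_IN_FLIGHT);
    }
}
```

The volatile flag is there so that a steady stream of new arrivals cannot keep the semaphore from ever draining; whether that matters depends on the actual call rate.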


Comments (3)

青巷忧颜 2024-12-04 19:31:33

In general I would favour the approach where you check for shutdown, then execute the task. If you optimistically execute and then throw away exceptions that you 'know' are due to shutdown, you run the risk of misclassifying an error and missing a real problem.

As far as simplifying the code goes, you can get rid of all the locks and just make sure that your executeAsynchronously method uses an ExecutorService. Your shutdown method then just calls shutdown on the service, task creation can be skipped if isShutdown returns true, and if you need to wait for tasks to finish before returning you can use the helpful awaitTermination method.
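
A minimal sketch of that arrangement follows. The class name `ExecutorDispatcher`, the pool size, and the method shapes are assumptions made for illustration; `shutdown`, `isShutdown`, `awaitTermination`, and `RejectedExecutionException` are the standard `java.util.concurrent` API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical dispatcher; the pool size of 4 is an arbitrary choice for the sketch.
class ExecutorDispatcher {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);

    /** Returns false if the task was skipped because shutdown has started. */
    boolean workerTask(Runnable work) {
        if (executor.isShutdown()) {
            return false; // skip task creation entirely
        }
        try {
            executor.execute(work);
            return true;
        } catch (RejectedExecutionException e) {
            // shutdown() was called between the isShutdown() check and execute().
            return false;
        }
    }

    /** Stops accepting tasks and waits for queued ones; returns true if all of them finished in time. */
    boolean shutdown(long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

Tasks already submitted still run to completion after `shutdown()`, so the external framework should only be disposed once `shutdown(...)` has returned true.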

掩耳倾听 2024-12-04 19:31:33

OK, so I think this should work without imposing too much runtime overhead (note it's 4:30 in the morning here, so better double-check ;) ). Also note that throwing in some good try{} finally{} blocks would be a pretty good idea; they are omitted for clarity.

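import java.util.concurrent.atomic.AtomicInteger;

public static final AtomicInteger activeConnections = new AtomicInteger();
public static volatile boolean shutdown = false;

public static void shutdown() {
    shutdown = true;
    boolean interrupted = false;
    // Check the counter while holding the monitor, so a notifyAll() from leave()
    // cannot slip in between the check and the wait() and be lost.
    synchronized(activeConnections) {
        while (activeConnections.get() > 0) {
            try {
                activeConnections.wait();
            }
            catch(InterruptedException e) {
                interrupted = true; // keep waiting; the flag is restored below
            }
        }
    }
    if (interrupted) Thread.currentThread().interrupt();
    // proceed shutdown
}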

public static void run() {
    if (shutdown) return;
    activeConnections.incrementAndGet();
    if (shutdown) {
        leave();
        return;
    }
    // do stuff
    leave();
}

private static void leave() {
    int outstandingConnections = activeConnections.decrementAndGet();
    if (shutdown && outstandingConnections == 0) {
        synchronized(activeConnections) {
            activeConnections.notifyAll();
        }
    }       
}

As soon as the shutdown flag is set, no new thread starts working. Every thread increments an integer while communicating with the external framework and decrements it when it's finished. The shutdown proceeds only once no thread is communicating anymore - note that since the shutdown flag is set first, no new thread will start after that point.

That way you get the pretty lightweight AtomicInteger (it's implemented as a CAS loop, and you can't get much lower overhead than that) and the volatile memory barrier.

Now I'm still standing by my first comment and say that it's simpler, more efficient and shorter to catch the exceptions, but I liked the problem :)

单身狗的梦 2024-12-04 19:31:33

If you use Spring, the code below performs a graceful shutdown. You can change the retry counts.

package com.avea.vpspg.test.schedulers;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

import com.avea.vpspg.core.VasProvLogger;

@Component
class ContextClosedHandler implements ApplicationListener<ContextClosedEvent> , ApplicationContextAware,BeanPostProcessor{


private ApplicationContext context;

public Logger logger = XProvLogger.getInstance().x;

public void onApplicationEvent(ContextClosedEvent event) {


    Map<String, ThreadPoolTaskScheduler> schedulers = context.getBeansOfType(ThreadPoolTaskScheduler.class);

    for (ThreadPoolTaskScheduler scheduler : schedulers.values()) {
        scheduler.getScheduledExecutor().shutdown();
        try {
            // awaitTermination returns true only if all tasks finished within the timeout.
            // isShutdown() is always true after shutdown(), so it cannot be used for this check.
            boolean terminated = scheduler.getScheduledExecutor().awaitTermination(20000, TimeUnit.MILLISECONDS);
            if(terminated)
                logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has stopped");
            else{
                logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has not stopped normally and will be shut down immediately");
                scheduler.getScheduledExecutor().shutdownNow();
                logger.info("Scheduler "+scheduler.getThreadNamePrefix() + " has shut down immediately");
            }
        } catch (IllegalStateException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    Map<String, ThreadPoolTaskExecutor> executers = context.getBeansOfType(ThreadPoolTaskExecutor.class);

    for (ThreadPoolTaskExecutor executor: executers.values()) {
        int retryCount = 0;
        while(executor.getActiveCount()>0 && ++retryCount<51){
            try {
                logger.info("Executer "+executor.getThreadNamePrefix()+" is still working with active " + executor.getActiveCount()+" work. Retry count is "+retryCount);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        if(retryCount>=51)
            logger.info("Executer "+executor.getThreadNamePrefix()+" is still working. Since retry count exceeded max value "+retryCount+", it will be shut down now");
        executor.shutdown();
        logger.info("Executer "+executor.getThreadNamePrefix()+" with " + executor.getActiveCount()+" active tasks has been shut down");
    }
}


@Override
public void setApplicationContext(ApplicationContext context)
        throws BeansException {
    this.context = context;

}


@Override
public Object postProcessAfterInitialization(Object object, String arg1)
        throws BeansException {
    return object;
}


@Override
public Object postProcessBeforeInitialization(Object object, String arg1)
        throws BeansException {
    if(object instanceof ThreadPoolTaskScheduler)
        ((ThreadPoolTaskScheduler)object).setWaitForTasksToCompleteOnShutdown(true);
    if(object instanceof ThreadPoolTaskExecutor)
        ((ThreadPoolTaskExecutor)object).setWaitForTasksToCompleteOnShutdown(true);
    return object;
}
}