如何在未来取消时终止 Callable 中的 CXF Web 服务调用

发布于 2024-07-26 09:00:18 字数 2840 浏览 6 评论 0原文

编辑

这个问题现在已经经历了几次迭代,所以请随意浏览修订版本,以了解有关历史和尝试过的事情的一些背景信息。


我将 CompletionService 与 ExecutorService 和 Callable 一起使用,通过 CXF 生成的代码同时调用几个不同 Web 服务上的多个函数。这些服务都向我使用的一组信息提供不同的信息我的项目。 然而,服务可能会在很长一段时间内无法响应而不抛出异常,从而延长了对组合信息集的等待时间。

为了解决这个问题,我同时运行所有服务调用,几分钟后想终止任何尚未完成的调用,最好从可调用内部或通过抛出来记录哪些尚未完成详细的异常。

下面是一些高度简化的代码来说明我已经在做什么:

private Callable<List<Feature>> getXXXFeatures(final WiwsPortType port, 
final String accessionCode) {
    return new Callable<List<Feature>>() {
        @Override
        public List<Feature> call() throws Exception {
            List<Feature> features = new ArrayList<Feature>();
            //getXXXFeatures are methods of the WS Proxy
            //that can take anywhere from second to never to return
            for (RawFeature raw : port.getXXXFeatures(accessionCode)) {
                Feature ft = convertFeature(raw);
                features.add(ft);
            }
            if (Thread.currentThread().isInterrupted())
                log.error("XXX was interrupted");
            return features;
        }
    };
}

以及同时启动 WS 调用的代码:

WiwsPortType port = new Wiws().getWiws();
List<Future<List<Feature>>> ftList = new ArrayList<Future<List<Feature>>>();
//Counting wrapper around CompletionService, 
    //so I could implement ccs.hasRemaining()
CountingCompletionService<List<Feature>> ccs = 
        new CountingCompletionService<List<Feature>>(threadpool);
ftList.add(ccs.submit(getXXXFeatures(port, accessionCode)));
ftList.add(ccs.submit(getYYYFeatures(port accessionCode)));
ftList.add(ccs.submit(getZZZFeatures(port, accessionCode)));

List<Feature> allFeatures = new ArrayList<Feature>();
while (ccs.hasRemaining()) {
            //Low for testing, eventually a little more lenient
    Future<List<Feature>> polled = ccs.poll(5, TimeUnit.SECONDS);
    if (polled != null)
        allFeatures.addAll(polled.get());
    else {
        //Still jobs remaining, but unresponsive: Cancel them all
        int jobsCanceled = 0;
        for (Future<List<Feature>> job : ftList)
            if (job.cancel(true))
                jobsCanceled++;
        log.error("Canceled {} feature jobs because they took too long",
                        jobsCanceled);
        break;
    }
}

我在这段代码中遇到的问题是,在等待 port.getXXXFeatures(.) 时,Callables 实际上并未被取消。 ..) 返回,但以某种方式继续运行。 正如您从 if (Thread.currentThread().isInterrupted()) log.error("XXX was Interrupted"); 语句中看到的,中断标志在之后设置的port.getFeatures 返回,只有在 Webservice 调用正常完成后才可用,而不是在我调用 Cancel 时被中断。

谁能告诉我我做错了什么以及如何在给定时间段后停止正在运行的 CXF Webservice 调用,并在我的应用程序中注册此信息?

最好的问候,蒂姆

Edit

This question has gone through a few iterations by now, so feel free to look through the revisions to see some background information on the history and things tried.


I'm using a CompletionService together with an ExecutorService and a Callable, to concurrently call the a number of functions on a few different webservices through CXF generated code.. These services all contribute different information towards a single set of information I'm using for my project. The services however can fail to respond for a prolonged period of time without throwing an exception, prolonging the wait for the combined set of information.

To counter this I'm running all the service calls concurrently, and after a few minutes would like to terminate any of the calls that have not yet finished, and preferably log which ones weren't done yet either from within the callable or by throwing an detailed Exception.

Here's some highly simplified code to illustrate what I'm doing already:

private Callable<List<Feature>> getXXXFeatures(final WiwsPortType port, 
final String accessionCode) {
    return new Callable<List<Feature>>() {
        @Override
        public List<Feature> call() throws Exception {
            List<Feature> features = new ArrayList<Feature>();
            //getXXXFeatures are methods of the WS Proxy
            //that can take anywhere from second to never to return
            for (RawFeature raw : port.getXXXFeatures(accessionCode)) {
                Feature ft = convertFeature(raw);
                features.add(ft);
            }
            if (Thread.currentThread().isInterrupted())
                log.error("XXX was interrupted");
            return features;
        }
    };
}

And the code that concurrently starts the WS calls:

WiwsPortType port = new Wiws().getWiws();
List<Future<List<Feature>>> ftList = new ArrayList<Future<List<Feature>>>();
//Counting wrapper around CompletionService, 
    //so I could implement ccs.hasRemaining()
CountingCompletionService<List<Feature>> ccs = 
        new CountingCompletionService<List<Feature>>(threadpool);
ftList.add(ccs.submit(getXXXFeatures(port, accessionCode)));
ftList.add(ccs.submit(getYYYFeatures(port accessionCode)));
ftList.add(ccs.submit(getZZZFeatures(port, accessionCode)));

List<Feature> allFeatures = new ArrayList<Feature>();
while (ccs.hasRemaining()) {
            //Low for testing, eventually a little more lenient
    Future<List<Feature>> polled = ccs.poll(5, TimeUnit.SECONDS);
    if (polled != null)
        allFeatures.addAll(polled.get());
    else {
        //Still jobs remaining, but unresponsive: Cancel them all
        int jobsCanceled = 0;
        for (Future<List<Feature>> job : ftList)
            if (job.cancel(true))
                jobsCanceled++;
        log.error("Canceled {} feature jobs because they took too long",
                        jobsCanceled);
        break;
    }
}

The problem I'm having with this code is that the Callables aren't actually canceled when waiting for port.getXXXFeatures(...) to return, but somehow keep running. As you can see from the if (Thread.currentThread().isInterrupted()) log.error("XXX was interrupted"); statements the interrupted flag is set after port.getFeatures returns, this is only available after the Webservice call completes normally, instead of it having been interrupted when I called Cancel.

Can anyone tell me what I am doing wrong and how I can stop the running CXF Webservice call after a given time period, and register this information in my application?

Best regards, Tim

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

我是男神闪亮亮 2024-08-02 09:00:18

编辑 3 新答案。

我看到以下选项:

  • 将您的问题作为功能请求发布在 Apache CXF 上
  • 自行修复 ACXF 并公开一些功能。
  • 在 Apache CXF 中查找异步 WS 调用支持的选项
  • 考虑切换到不同的 WS 提供程序(JAX-WS?)
  • 如果服务支持 RESTful API,您的 WS 是否使用 RESTful API 调用自己(例如带有参数的纯 HTTP 请求)
  • For仅限超级专家:使用真正的线程/线程组并使用非正统方法杀死线程。

Edit 3 New answer.

I see these options:

  • Post your problem on the Apache CXF as feature request
  • Fix ACXF yourself and expose some features.
  • Look for options for asynchronous WS call support within the Apache CXF
  • Consider switching to a different WS provider (JAX-WS?)
  • Do your WS call yourself using RESTful API if the service supports it (e.g. plain HTTP request with parameters)
  • For über experts only: use true threads/thread group and kill the threads with unorthodox methods.
过潦 2024-08-02 09:00:18

CXF 文档有一些关于设置 HTTPURLConnection 读取超时的说明:
http://cwiki.apache.org/CXF20DOC/ client-http-transport-include-ssl-support.html

这可能会满足您的需求。 如果服务器没有及时响应,则会引发异常,并且可调用函数将收到异常。 (除了有一个错误可能会挂起。我不记得这个问题是否已在 2.2.2 中修复,或者是否仅存在于快照中。)

The CXF docs have some instructions for setting the read timeout on the HTTPURLConnection:
http://cwiki.apache.org/CXF20DOC/client-http-transport-including-ssl-support.html

That would probably meet your needs. If the server doesn't respond in time, an exception is raised and the callable would get the exception. (except there is a bug where is MAY hang instead. I cannot remember if that was fixed for 2.2.2 or if it's just in the SNAPSHOTS right now.)

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文