如何在未来取消时终止 Callable 中的 CXF Web 服务调用
编辑
这个问题现在已经经历了几次迭代,所以请随意浏览修订版本,以了解有关历史和尝试过的事情的一些背景信息。
我将 CompletionService 与 ExecutorService 和 Callable 一起使用,通过 CXF 生成的代码同时调用几个不同 Web 服务上的多个函数。这些服务都向我使用的一组信息提供不同的信息我的项目。 然而,服务可能会在很长一段时间内无法响应而不抛出异常,从而延长了对组合信息集的等待时间。
为了解决这个问题,我同时运行所有服务调用,几分钟后想终止任何尚未完成的调用,最好从可调用内部或通过抛出来记录哪些尚未完成详细的异常。
下面是一些高度简化的代码来说明我已经在做什么:
private Callable<List<Feature>> getXXXFeatures(final WiwsPortType port,
final String accessionCode) {
return new Callable<List<Feature>>() {
@Override
public List<Feature> call() throws Exception {
List<Feature> features = new ArrayList<Feature>();
//getXXXFeatures are methods of the WS Proxy
//that can take anywhere from second to never to return
for (RawFeature raw : port.getXXXFeatures(accessionCode)) {
Feature ft = convertFeature(raw);
features.add(ft);
}
if (Thread.currentThread().isInterrupted())
log.error("XXX was interrupted");
return features;
}
};
}
以及同时启动 WS 调用的代码:
WiwsPortType port = new Wiws().getWiws();
List<Future<List<Feature>>> ftList = new ArrayList<Future<List<Feature>>>();
//Counting wrapper around CompletionService,
//so I could implement ccs.hasRemaining()
CountingCompletionService<List<Feature>> ccs =
new CountingCompletionService<List<Feature>>(threadpool);
ftList.add(ccs.submit(getXXXFeatures(port, accessionCode)));
ftList.add(ccs.submit(getYYYFeatures(port accessionCode)));
ftList.add(ccs.submit(getZZZFeatures(port, accessionCode)));
List<Feature> allFeatures = new ArrayList<Feature>();
while (ccs.hasRemaining()) {
//Low for testing, eventually a little more lenient
Future<List<Feature>> polled = ccs.poll(5, TimeUnit.SECONDS);
if (polled != null)
allFeatures.addAll(polled.get());
else {
//Still jobs remaining, but unresponsive: Cancel them all
int jobsCanceled = 0;
for (Future<List<Feature>> job : ftList)
if (job.cancel(true))
jobsCanceled++;
log.error("Canceled {} feature jobs because they took too long",
jobsCanceled);
break;
}
}
我在这段代码中遇到的问题是,在等待 port.getXXXFeatures(.) 时,Callables 实际上并未被取消。 ..) 返回,但以某种方式继续运行。 正如您从 if (Thread.currentThread().isInterrupted()) log.error("XXX was Interrupted");
语句中看到的,中断标志是在之后设置的port.getFeatures 返回,只有在 Webservice 调用正常完成后才可用,而不是在我调用 Cancel 时被中断。
谁能告诉我我做错了什么以及如何在给定时间段后停止正在运行的 CXF Webservice 调用,并在我的应用程序中注册此信息?
最好的问候,蒂姆
Edit
This question has gone through a few iterations by now, so feel free to look through the revisions to see some background information on the history and things tried.
I'm using a CompletionService together with an ExecutorService and a Callable, to concurrently call the a number of functions on a few different webservices through CXF generated code.. These services all contribute different information towards a single set of information I'm using for my project. The services however can fail to respond for a prolonged period of time without throwing an exception, prolonging the wait for the combined set of information.
To counter this I'm running all the service calls concurrently, and after a few minutes would like to terminate any of the calls that have not yet finished, and preferably log which ones weren't done yet either from within the callable or by throwing an detailed Exception.
Here's some highly simplified code to illustrate what I'm doing already:
private Callable<List<Feature>> getXXXFeatures(final WiwsPortType port,
final String accessionCode) {
return new Callable<List<Feature>>() {
@Override
public List<Feature> call() throws Exception {
List<Feature> features = new ArrayList<Feature>();
//getXXXFeatures are methods of the WS Proxy
//that can take anywhere from second to never to return
for (RawFeature raw : port.getXXXFeatures(accessionCode)) {
Feature ft = convertFeature(raw);
features.add(ft);
}
if (Thread.currentThread().isInterrupted())
log.error("XXX was interrupted");
return features;
}
};
}
And the code that concurrently starts the WS calls:
WiwsPortType port = new Wiws().getWiws();
List<Future<List<Feature>>> ftList = new ArrayList<Future<List<Feature>>>();
//Counting wrapper around CompletionService,
//so I could implement ccs.hasRemaining()
CountingCompletionService<List<Feature>> ccs =
new CountingCompletionService<List<Feature>>(threadpool);
ftList.add(ccs.submit(getXXXFeatures(port, accessionCode)));
ftList.add(ccs.submit(getYYYFeatures(port accessionCode)));
ftList.add(ccs.submit(getZZZFeatures(port, accessionCode)));
List<Feature> allFeatures = new ArrayList<Feature>();
while (ccs.hasRemaining()) {
//Low for testing, eventually a little more lenient
Future<List<Feature>> polled = ccs.poll(5, TimeUnit.SECONDS);
if (polled != null)
allFeatures.addAll(polled.get());
else {
//Still jobs remaining, but unresponsive: Cancel them all
int jobsCanceled = 0;
for (Future<List<Feature>> job : ftList)
if (job.cancel(true))
jobsCanceled++;
log.error("Canceled {} feature jobs because they took too long",
jobsCanceled);
break;
}
}
The problem I'm having with this code is that the Callables aren't actually canceled when waiting for port.getXXXFeatures(...) to return, but somehow keep running. As you can see from the if (Thread.currentThread().isInterrupted()) log.error("XXX was interrupted");
statements the interrupted flag is set after port.getFeatures returns, this is only available after the Webservice call completes normally, instead of it having been interrupted when I called Cancel.
Can anyone tell me what I am doing wrong and how I can stop the running CXF Webservice call after a given time period, and register this information in my application?
Best regards, Tim
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
编辑 3 新答案。
我看到以下选项:
For仅限超级专家:使用真正的线程/线程组并使用非正统方法杀死线程。Edit 3 New answer.
I see these options:
For über experts only: use true threads/thread group and kill the threads with unorthodox methods.CXF 文档有一些关于设置 HTTPURLConnection 读取超时的说明:
http://cwiki.apache.org/CXF20DOC/ client-http-transport-include-ssl-support.html
这可能会满足您的需求。 如果服务器没有及时响应,则会引发异常,并且可调用函数将收到异常。 (除了有一个错误可能会挂起。我不记得这个问题是否已在 2.2.2 中修复,或者是否仅存在于快照中。)
The CXF docs have some instructions for setting the read timeout on the HTTPURLConnection:
http://cwiki.apache.org/CXF20DOC/client-http-transport-including-ssl-support.html
That would probably meet your needs. If the server doesn't respond in time, an exception is raised and the callable would get the exception. (except there is a bug where is MAY hang instead. I cannot remember if that was fixed for 2.2.2 or if it's just in the SNAPSHOTS right now.)