Disruptor - 未调用事件处理程序
我正在使用 Disruptor 框架,发现我的事件处理程序没有被调用。
这是我的设置代码:
private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
private void initializeDisruptor() {
if (disruptor != null)
return;
disruptor =
new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
new SingleThreadedClaimStrategy(BUFFER_SIZE),
new SleepingWaitStrategy());
disruptor.handleEventsWith(searchTermMatchingHandler)
.then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);
this.ringBuffer = disruptor.start();
}
在其他地方,我发布事件。我尝试了以下两种方法:
事件发布方法 A:
private void handleStatus(final Status status)
{
long sequence = ringBuffer.next();
TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
event.setStatus(status);
event.setSearchInstruments(searchInstruments);
ringBuffer.publish(sequence);
}
在这种情况下,我发现第一个 EventHandler
被调用,但除此之外没有任何其他方法。
事件发布方法 B:
private void handleStatus(final Status status)
{
disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {
@Override
public TwitterStatusReceivedEvent translateTo(
TwitterStatusReceivedEvent event, long sequence) {
event.setStatus(status);
event.setSearchInstruments(searchInstruments);
return event;
}
});
}
在这种情况下,我发现根本没有调用任何事件处理程序。
我做错了什么?
更新
这是我的完整事件处理程序。我应该如何表示处理已完成?
public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {
@Override
public void onEvent(TwitterStatusReceivedEvent event, long sequence,
boolean endOfBatch) throws Exception {
String statusText = event.getStatus().getText();
for (Instrument instrument : event.getSearchInstruments())
{
if (statusText.contains(instrument.getSearchTerm()))
{
event.setMatchedInstrument(instrument);
break;
}
}
}
}
I'm playing around with the Disruptor framework, and am finding that my event handlers are not being invoked.
Here's my setup code:
private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();
private void initializeDisruptor() {
if (disruptor != null)
return;
disruptor =
new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
new SingleThreadedClaimStrategy(BUFFER_SIZE),
new SleepingWaitStrategy());
disruptor.handleEventsWith(searchTermMatchingHandler)
.then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);
this.ringBuffer = disruptor.start();
}
Elsewhere, I publish events. I've tried each of the following two approaches:
Event Publishing Approach A:
private void handleStatus(final Status status)
{
long sequence = ringBuffer.next();
TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
event.setStatus(status);
event.setSearchInstruments(searchInstruments);
ringBuffer.publish(sequence);
}
In this scenario, I find the the first EventHandler
gets invoked, but never anything beyond that.
Event Publishing Approach B:
private void handleStatus(final Status status)
{
disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {
@Override
public TwitterStatusReceivedEvent translateTo(
TwitterStatusReceivedEvent event, long sequence) {
event.setStatus(status);
event.setSearchInstruments(searchInstruments);
return event;
}
});
}
In this scenario, I find that none of the event handlers get invoked at all.
What am I doing wrong?
Update
Here's my EventHandler in it's entirety. How should I be signalling that processing is complete?
public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {
@Override
public void onEvent(TwitterStatusReceivedEvent event, long sequence,
boolean endOfBatch) throws Exception {
String statusText = event.getStatus().getText();
for (Instrument instrument : event.getSearchInstruments())
{
if (statusText.contains(instrument.getSearchTerm()))
{
event.setMatchedInstrument(instrument);
break;
}
}
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(3)
每个事件处理程序都需要在自己的线程中运行,直到您关闭干扰器为止,该线程不会退出。由于您使用的是单线程执行程序,因此只有恰好执行的第一个事件处理程序才会运行。 (Disruptor 类将每个处理程序存储在哈希图中,因此最终运行的处理程序会有所不同)
如果您切换到cachedThreadPool,您应该会发现它全部开始运行。您不需要对序列号进行任何管理,因为这一切都是由 Disruptor 类为您设置和管理的 EventProcessor 处理的。仅处理您收到的每个事件是完全正确的。
Each event handler needs to run in its own thread which wont exit until you shutdown the disruptor. Since you're using a single threaded executor, only the first event handler that happens to execute will ever run. (The Disruptor class stores each handler in a hashmap so which handler winds up running will vary)
If you switch to a cachedThreadPool you should find it all starts running. You won't need to do any management of the sequence numbers because that's all handled by the EventProcessor that the Disruptor class sets up and manages for you. Just processing each event you get is exactly right.
您需要确保您的 searchTermMatchingHandler 在处理事件后更新其序列号。更下游的事件处理程序(appendStatusHandler、updatePriceHandler、persistUpdatesHandler)将检查 searchTermMatchingHandler 序列号,以查看它们可以从环形缓冲区中获取哪些事件。
You need to make sure your searchTermMatchingHandler is updating its sequence number after it processes the event. The EventHandlers further downstream (appendStatusHandler, updatePriceHandler, persistUpdatesHandler) will be inspecting the searchTermMatchingHandler sequence number to see which events they can pick up off the ring buffer.
我遇到了同样的问题,但这是因为我使用 Spring(Java 配置)实例化 Disruptor,并使用与 Disruptor 相同的 @Bean 方法实例化 Executor。
我通过在单独的 @Bean 方法中实例化 Executor 解决了该问题。
I had the same problem, but it was because I was instantiating the Disruptor using Spring (Java config) and was instantiating the Executor in the same @Bean method as the Disruptor.
I fixed the problem by instantiating the Executor in a separate @Bean method.