在使用disruptor的过程中,disruptor队列偶尔发现会报错,就是报的:
_FatalExceptionHandler这种错误,其实:
在disruptor中,它把异常给封装了:可以去看源码,我们知道
在disruptor中工作线程是com.lmax.disruptor.WorkProcessor#run 在这个方法中,
可以看到有个try ,catch,可以看到捕获到异常以后,
这里用exceptionHandler.handleEventException这里处理的,而:handleEventException这个方法
调用的最终实际上是,exceptionHandler的实现类中的方法:
catch (final Throwable ex)
{
// handle, mark as processed, unless the exception handler threw an exception
exceptionHandler.handleEventException(ex, nextSequence, event);
processedSequence = true;
}可以看到源码中:ExceptionHandler的实现类com.lmax.disruptor.FatalExceptionHandler,它的com.lmax.disruptor.FatalExceptionHandler#handleEventException方法如下:
@Override
public void handleEventException(final Throwable ex, final long sequence, final Object event)
{
logger.log(Level.SEVERE, "Exception processing: " + sequence + " " + event, ex);
throw new RuntimeException(ex);
}可以看到它仅仅做了个异常的抛出.打印了一下异常.
抛出异常,就会造成执行线程com.lmax.disruptor.WorkProcessor执行失败,如果消费消息异常比较多的话,基本上消费线程会很快被干掉,最终导致没有消费线程。
这样QPS就会不断下降,直到最后QPS能力降为零.
怎么办呢?很简单其实:
就是在消费的逻辑中,用try catch把逻辑保护起来,有异常,打印不要抛出就可以了..
另外还需要注意:
YieldingWaitStrategy一般都是这个,无锁的相当于,而BlockingWaitStrategy 这个会阻塞,慢一些,但是
在一些特殊场景,比如你希望数据一批一批处理的时候可以用.
然后再来看一下:Disruptor中的异常的结构源码就更清晰了:
Disruptor
1.可以看到它封装了一个ExceptionHandler 接口
public interface ExceptionHandler
{
/**
* Strategy for handling uncaught exceptions when processing an event.
*
* If the strategy wishes to terminate further processing by the {@link BatchEventProcessor}
* then it should throw a {@link RuntimeException}.
*
* @param ex the exception that propagated from the {@link EventHandler}.
* @param sequence of the event which cause the exception.
* @param event being processed when the exception occurred. This can be null.
*/
void handleEventException(Throwable ex, long sequence, T event);
/**
* Callback to notify of an exception during {@link LifecycleAware#onStart()}
*
* @param ex throw during the starting process.
*/
void handleOnStartException(Throwable ex);
/**
* Callback to notify of an exception during {@link LifecycleAware#onShutdown()}
*
* @param ex throw during the shutdown process.
*/
void handleOnShutdownException(Throwable ex);
}
它是个接口,定义了异常抛出时回调的方法。
2. Disruptor中默认使用ExceptionHandler的ExceptionHandlerWrapper实现,3所示,使用了代理,委托内部的delete来处理。
2.
public class Disruptor
{
private final RingBuffer ringBuffer;
private final Executor executor;
private final ConsumerRepository consumerRepository = new ConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>();
...
3.
public class ExceptionHandlerWrapper implements ExceptionHandler
{
private ExceptionHandler<? super T> delegate = new FatalExceptionHandler();
public void switchTo(final ExceptionHandler<? super T> exceptionHandler)
{
this.delegate = exceptionHandler;
}
...
除了ExceptionHandlerWrapper这个实现,还有其它FatalExceptionHandler、IgnoreExceptionHandler,IgnoreExceptionHandler只是将异常打印下,但是FatalExceptionHandler则不同,正如其名字所示,它内部将异常再次封装到RuntimeException再次抛出,如下4
4
public final class FatalExceptionHandler implements ExceptionHandler
| 留言与评论(共有 0 条评论) “” |