Disruptor线程队列_WorkProcessor异常_FatalExceptionHandler

在使用disruptor的过程中,disruptor队列偶尔发现会报错,就是报的:


_FatalExceptionHandler这种错误,其实:


在disruptor中,它把异常给封装了:可以去看源码,我们知道


在disruptor中工作线程是com.lmax.disruptor.WorkProcessor#run 在这个方法中,


可以看到有个try ,catch,可以看到捕获到异常以后,


这里用exceptionHandler.handleEventException这里处理的,而:handleEventException这个方法


调用的最终实际上是,exceptionHandler的实现类中的方法:


            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }


Disruptor线程队列_WorkProcessor异常_FatalExceptionHandler


可以看到源码中:ExceptionHandler的实现类com.lmax.disruptor.FatalExceptionHandler,它的com.lmax.disruptor.FatalExceptionHandler#handleEventException方法如下:


    @Override
    public void handleEventException(final Throwable ex, final long sequence, final Object event)
    {
        logger.log(Level.SEVERE, "Exception processing: " + sequence + " " + event, ex);

        throw new RuntimeException(ex);
    }


Disruptor线程队列_WorkProcessor异常_FatalExceptionHandler


可以看到它仅仅做了个异常的抛出.打印了一下异常.


抛出异常,就会造成执行线程com.lmax.disruptor.WorkProcessor执行失败,如果消费消息异常比较多的话,基本上消费线程会很快被干掉,最终导致没有消费线程。


这样QPS就会不断下降,直到最后QPS能力降为零.


怎么办呢?很简单其实:


就是在消费的逻辑中,用try catch把逻辑保护起来,有异常,打印不要抛出就可以了..


另外还需要注意:


  1. BlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,被唤醒后,再循环检查依赖的sequence是否已经消费。
  2. BusySpinWaitStrategy:线程一直自旋等待,可能比较耗cpu
  3. YieldingWaitStrategy:尝试100次,然后Thread.yield()让出cpu


YieldingWaitStrategy一般都是这个,无锁的相当于,而BlockingWaitStrategy 这个会阻塞,慢一些,但是


在一些特殊场景,比如你希望数据一批一批处理的时候可以用.


然后再来看一下:Disruptor中的异常的结构源码就更清晰了:


Disruptor
1.可以看到它封装了一个ExceptionHandler 接口
public interface ExceptionHandler
{
    /**
     * 

Strategy for handling uncaught exceptions when processing an event.

* *

If the strategy wishes to terminate further processing by the {@link BatchEventProcessor} * then it should throw a {@link RuntimeException}.

* * @param ex the exception that propagated from the {@link EventHandler}. * @param sequence of the event which cause the exception. * @param event being processed when the exception occurred. This can be null. */ void handleEventException(Throwable ex, long sequence, T event); /** * Callback to notify of an exception during {@link LifecycleAware#onStart()} * * @param ex throw during the starting process. */ void handleOnStartException(Throwable ex); /** * Callback to notify of an exception during {@link LifecycleAware#onShutdown()} * * @param ex throw during the shutdown process. */ void handleOnShutdownException(Throwable ex); } 它是个接口,定义了异常抛出时回调的方法。 2. Disruptor中默认使用ExceptionHandler的ExceptionHandlerWrapper实现,3所示,使用了代理,委托内部的delete来处理。 2. public class Disruptor { private final RingBuffer ringBuffer; private final Executor executor; private final ConsumerRepository consumerRepository = new ConsumerRepository<>(); private final AtomicBoolean started = new AtomicBoolean(false); private ExceptionHandler<? super T> exceptionHandler = new ExceptionHandlerWrapper<>(); ... 3. public class ExceptionHandlerWrapper implements ExceptionHandler { private ExceptionHandler<? super T> delegate = new FatalExceptionHandler(); public void switchTo(final ExceptionHandler<? super T> exceptionHandler) { this.delegate = exceptionHandler; } ... 除了ExceptionHandlerWrapper这个实现,还有其它FatalExceptionHandler、IgnoreExceptionHandler,IgnoreExceptionHandler只是将异常打印下,但是FatalExceptionHandler则不同,正如其名字所示,它内部将异常再次封装到RuntimeException再次抛出,如下4 4 public final class FatalExceptionHandler implements ExceptionHandler { private static final Logger LOGGER = Logger.getLogger(FatalExceptionHandler.class.getName()); private final Logger logger; public FatalExceptionHandler() { this.logger = LOGGER; } public FatalExceptionHandler(final Logger logger) { this.logger = logger; } @Override public void handleEventException(final Throwable ex, final long sequence, final Object event) { logger.log(Level.SEVERE, "Exception processing: " + sequence + " " + event, ex); throw new RuntimeException(ex); } ... 如果使用了FatalExceptionHandler,运行中抛出异常,那么会时Disruptor线程阻塞的。ExceptionHandlerWrapper和BatchEventProcessor中exceptionHandler都默认使用了FatalExceptionHandler,所以要特别注意,如果异常抛出多了就会导致消费停止.


Disruptor线程队列_WorkProcessor异常_FatalExceptionHandler



发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章