Java-多线程-CompletionService(优先处理)

java.util.concurrent.CompletionService 是对 ExecutorService 的一个功能增强封装,优化了获取异步操作结果的接口。

假设我们要向线程池提交一批任务,并获取任务结果。一般的方式是提交任务后,从线程池得到一批 Future 对象集合,然后依次调用其 get() 方法。进行阻塞所有线程执行完毕,然后按线程执行的顺序获取到结果

这里有个问题:因为我们会要按固定的顺序来遍历 Future 元素,而 get() 方法又是阻塞的,因此如果某个 Future 对象执行时间太长,会使得我们的遍历过程阻塞在该元素上,无法及时从后面早已完成的 Future 当中取得结果。

CompletionService 解决了这个问题。下面介绍如何创建和使用 CompletionService。

CompletionService 本身不包含线程池,创建它的实例之前,先要创建一个 ExecutorService。下面是一个例子:

ExecutorService executor = Executors.newFixedThreadPool(4);CompletionService completionService = new ExecutorCompletionService<>(executor);

向 CompletionService 提交任务的方式与 ExecutorService 一样: completionService.submit(() -> "Hello");

两者的区别在于取结果的方式。有了 CompletionService,你不需要再持有 Future 集合。如果要得到最早的执行结果,只需要像下面这样:

String result = completionService.take().get(); 这个 take() 方法返回的是最早完成的任务的结果,这个就解决了一个任务被另一个任务阻塞的问题。下面是一个封装好的例子:

//不用等待所有线程执行完毕,而是谁先执行完毕,就返回谁的结果 ,以此类推,等待全部线程执行完毕    public  static void createCompletionServicesAll(String key, List callables, Consumer< Future> consumer)  {         CompletionService<?> completionService = new ExecutorCompletionService<>(getExecutor(key));        for (Callable callable : callables) {             completionService.submit(callable);        }        try {             for (int i = 0; i < callables.size(); i++) {                 consumer.accept(completionService.take());            }        } catch (InterruptedException e) {             e.printStackTrace();        }    }    //一堆线程同时执行,谁先执行完毕那么就采用谁的结果,其余线程结果不管    public  static Object createCompletionServicesOne(String key, List callables) throws ExecutionException {         CompletionService<?> completionService = new ExecutorCompletionService<>(getExecutor(key));        for (Callable callable : callables) {             completionService.submit(callable);        }        Object result=null;        try {             result=completionService.take().get();        } catch (InterruptedException e) {             e.printStackTrace();        }        return result;    }

测试

@Test    public  void show(){         List callables=new LinkedList<>();        for (int i = 0; i < 100; i++) {             int finalI = i;            callables.add(()->{ //                throw  new Exception("---");                int i1 = new Random().nextInt(100);                Thread.sleep(i1 );                System.out.println("睡眠:"+i1+"返回结果:"+finalI);                return finalI;            });        }//        Collections.shuffle(callables);        ExecutorUtils.createCompletionServicesAll("test",callables,(o)->{             try {                 System.out.println(o.get()); //如果某个线程出现异常则抛出异常我们这里可以捕捉到            } catch (InterruptedException | ExecutionException e) {                 e.printStackTrace();            }        });    }    @Test    public  void show1(){         List callables=new LinkedList<>();        for (int i = 0; i < 100; i++) {             int finalI = i;            callables.add(()->{ //                throw  new Exception("---");                int i1 = new Random().nextInt(1000);                Thread.sleep(i1);                System.out.println("睡眠:"+i1+"返回结果:"+finalI);                return finalI;            });        }        try {             //获取第一个执行完毕的结果            Integer test = (Integer)ExecutorUtils.createCompletionServicesOne("test", callables);            System.out.println("result:"+test);        } catch (ExecutionException e) {             e.printStackTrace();        }    }

点赞 -收藏-关注-便于以后复习和收到最新内容,有其他问题在评论区讨论-或者私信我-收到会在第一时间回复,内容仅供学习、交流与参考

免责声明:本文部分素材来源于网络,版权归原创者所有,如存在文章/图片/音视频等使用不当的情况,请随时私信联系我、以迅速采取适当措施,避免给双方造成不必要的经济损失。感谢,配合,希望我的努力对你有帮助^_^

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章