Java8 关于CompletableFuture的介绍
1. non-blocking操作
这里的asynchronous操作即non-blocking操作,非阻塞式的操作,表示主线程可以提交task给独立的线程,独立的线程可以自行运行该task,主线程则会继续运行别的业务逻辑代码。
提交的task可以是Runnable实现,也可以是Callable实现(有返回值):
2. 通过ExecutorService进行task的提交
ExecutorService
提交Callable的task后,可以通过future.get()
拿到task的执行结果,get()
方法是阻塞式的获取结果,即如果该task还没有执行完毕,此时main thread需要等待,直到task执行完毕并返回结果。
ExecutorService存在的问题是如果一次性提交了4个tasks,那么在拿结果的时候,需要依次拿(future.get()),但这个方法是阻塞式的,假如task3或4提交完成了,也需要等待main thread从task1先拿到结果:
更为复杂的例子,假如处理1个order分5个步骤,即拿到order、补齐order必要的属性、付款、发送order,最后是邮件发送:
代码如下,因为get是阻塞式的,所以for循环里order1的处理,会影响后续order的速度,因为main thread有可能会卡在order1,而这时候后续的order可能已经处理完毕,这也使得使用多线程处理变的没有必要,因为order的处理顺序依旧是线性执行的:
public void orderLifeCycle() { ExecutorService service = Executors.newFixedThreadPool(10); for (int i = 0; i < 5; i ++) { try { Future<Order> future = service.submit(getOrderTask()); Order order = future.get(); // 阻塞 Future<Order> future1 = service.submit(enrichTask(order)); order = future1.get(); // 阻塞 Future<Order> future2 = service.submit(performPaymentTask(order)); order = future2.get(); // 阻塞 Future<Order> future3 = service.submit(dispatchTask(order)); order = future3.get(); // 阻塞 Future<Order> future4 = service.submit(sendEmailTask(order)); order = future4.get(); // 阻塞 } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
而我们想要的效果是,order本身的flow是一个整体(虽然它可能分了很多个Callable步骤),但order与order之间,不应该相互等待,比如order1可以由某个线程执行并正在执行sendEmail方法,而order2可以由另外一个线程执行并正在执行payment方法,即每个order都是独立的流水线(independent flows):
3. CompletableFuture
可以使用CompletableFuture
来实现上述想要的期望:
public void orderSubmit() { for (int i =0; i < 5; i ++) { CompletableFuture.supplyAsync(() -> getOrder()) .thenApply(order -> enrich(order)) .thenApply(order -> performPayment(order)) .thenApply(order -> dispatch(order)) .thenAccept(order -> sendEmail(order)); } }
3.1 supplyAsync方法
supplyAsync()
方法是CompletableFuture
中的static
方法,从定义可以看到它接受Supplier
接口的参数,Supplier
接口位于java的java.util.function
包中,也是Java 8引入的函数式接口,与之配套的还有另外两个接口:Function
和Consumer
,具体来说:
Supplier
接口:不接收参数,但返回一个对象(用消息组件来类比,可以看作是生产者)。Function
接口:接收参数后,再返回另一个对象(用消息组件来类比,即接收一个消息,进行处理后再发送至另一个地方)。Consumer
接口:接收参数,但不会再返回对象(用消息组件来类比,可以看作是消费者)。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(ASYNC_POOL, supplier); }
在我们order的例子中,supplyAsync(() -> getOrder())
即通过线程来处理拿到一个order的业务,然后返回。
supplyAsync()
除了接收suppilier函数外,还可接收线程池,即传入自定义的线程池来处理。如果没有指定,则会创建一个:Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor()
。
3.2 thenApply方法
thenApply()
方法接收的是Function
接口,Function
接口在上面介绍过了,它能接收一个参数,并返回一个结果。在我们的order例子中,它接收了getOrder()
返回的order参数,并进行处理后(通过方法enrich(order)
)再返回order结果。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); }
与thenApply()
方法类似的还有一个方法叫thenApplyAsync()
,两者的区别是:
thenApply()
会沿用上一个方法相同的线程。thenApplyAsync()
会使用另一个线程来执行方法体内的task,也可以传入线程池。
具体来说,假设我们的getOrder()
方法是IO开销比较大的(比如需要从DB里查询),而我们的enrich(order)
方法只是一些计算相关,即CPU开销比较大,那么我们在执行的时候,可能会用两个不同的线程池来执行,IO开销大的可以设置线程数多一些。
public void orderSubmit() { ExecutorService cpuService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); ExecutorService ioService = Executors.newCachedThreadPool(); for (int i =0; i < 5; i ++) { CompletableFuture.supplyAsync(() -> getOrder(), ioService) .thenApplyAsync(order -> enrich(order), cpuService) .thenApply(order -> performPayment(order)) .thenApply(order -> dispatch(order)) .thenAccept(order -> sendEmail(order)); } }
3.3 thenAccept方法
thenAccept()
方法接收Comsumer
接口(接收一个参数,但没有返回值)。
与thenAccept()
方法相对的,有thenAcceptAsync()
方法,同样的,该方法能传入一个线程池,使得方法里的task可以在传入的线程池中进行提交执行。
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { return uniAcceptStage(null, action); }
3.4 exceptionally方法
异常处理。如果getOrder()
或enrich(order)
或performPayment(order)
中出现了一些异常,我们可以使用exceptionally()
来捕获异常,然后返回别的类,这样dispatch(order)
方法就知道在前面的步骤中出现了异常(比如拿到参数后先进行类型判断,是normal的Order
还是FailedOrder
。
CompletableFuture.supplyAsync(() -> getOrder(), ioService) .thenApplyAsync(order -> enrich(order), cpuService) .thenApply(order -> performPayment(order)) .exceptionally(e -> new FailedOrder()) .thenApply(order -> dispatch(order)) .thenAccept(order -> sendEmail(order));
3.5 其它方法,如thenCombine
这个可以连接两个stage的结果。但这个写起来比较复杂,推荐java的另一个框架,叫Reactor
框架,实现起来可能更加便捷,代码也更易读。具体看第5章。
4. Order例子的总结
CompletableFuture
中suplyAsnc()
或thenApply()
或thenAccept()
都不会阻塞main thread,如果我们有100个order flow需要处理,那么每个order的流程都不会相互阻塞。
这里强调的是这三个方法本身,而不是最终通过get()方法拿结果。因为例子中最后的sendEmail()
用的是thenAccept,这里接收的是Comsumer函数,即并没有返回值。这个例子强调的是每组CompletebleFuture之间,是不受影响的。
上述order flow的例子完整的代码如下:
@Slf4j public class OrderTest { @Test public void orderFlow() throws InterruptedException { for (int i =0; i < 5; i ++) { final int id = i; CompletableFuture.supplyAsync(() -> getOrder(id)) .thenApplyAsync(order -> enrich(order)) .thenApply(order -> performPayment(order)) .thenApply(order -> dispatch(order)) .thenAccept(order -> sendEmail(order)); } log.info("main thread done"); Thread.sleep(10000L); } public Order getOrder(int id) { Order order = new Order(id, 0); try { long sleepMs = new Random().nextLong(5000); log.info("try to sleep {} ms for order [{}].", sleepMs, order.getId()); Thread.sleep(sleepMs); } catch (InterruptedException e) { throw new RuntimeException(e); } return order; } public Order enrich(Order order) { order.setVersion(order.version + 1); return order; } public Order performPayment(Order order) { order.setVersion(order.version + 1); return order; } public Order dispatch(Order order) { order.setVersion(order.version + 1); return order; } public void sendEmail(Order order) { log.info("send email for order [{}] with version = {}", order.getId(), order.getVersion()); } @AllArgsConstructor @Data static class Order { private int id; private int version; } static class FailedOrder { } }
控制台打印:
开启了5个order flow,每个flow都不受彼此的干扰。
5. 另外的一些例子
这个例子的需求是有两个远程调用(remoteCall1和remoteCall2),彼此不受影响,在两个结束的时候通过combine操作,再用get()方法拿到结果:有评论说get()是否是阻塞,是的,这个方法是阻塞的。
@Slf4j public class TeaTest { @Test public void remoteCallTest() { log.info("main thread started..."); CompletableFuture<String> remoteCall1 = CompletableFuture.supplyAsync(() -> { sleep(2000L); log.info("remote call 1 finished..."); return "i am remote call1"; }); CompletableFuture<String> remoteCall2 = CompletableFuture.supplyAsync(() -> { sleep(2000L); log.info("remote call 2 finished..."); return "i am remote call2"; }); CompletableFuture<String> combineResult = remoteCall1.thenCombine(remoteCall2, (out1, out2) -> out1 + ", " + out2); // do some other thing here log.info("main thread started to do some other thing..."); try { log.info("Get result: {}", combineResult.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } log.info("here: need wait for getting finished..."); sleep(100000); } public static void sleep(long ms) { try { Thread.sleep(ms); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
控制台打印: