Project Reactor源码阅读-flatMap
功能介绍
public final <V> Flux<V> flatMap(Function<? super T,? extends Publisher<? extends V>> mapper, int concurrency, int prefetch)

将外部Publisher发射出来的元素转换成内部Publisher,然后将这些内部的Publisher合并到一个Flux中,允许元素交错。
- mapper:转换函数,将外部Publisher发射出来的元素转换成一个新的Publisher。
- concurrency:针对外部Publisher的最大请求数量,同时也是scalarQueue队列大小。可通过reactor.bufferSize.x属性配置,默认32。
- prefetch:针对内部Publisher的最大请求数量,同时也是innerQueue队列大小。可通过reactor.bufferSize.small属性配置,默认256。
代码示例
public Flux<Integer> flat(int delayMillis, int i) { return delayPublishFlux(delayMillis, i * 10, i * 10 + 5); } @Test public void test() { delayPublishFlux(100, 1, 6) .doOnRequest(r -> logLong(r, "main-request")) .flatMap((i) -> flat(1000, i).doOnRequest(r -> logLong(r, "inner-request")) .subscribeOn(Schedulers.newElastic("inner")), 3, 2) .subscribe(i -> logInt(i, "消费")); sleep(10000); }


可以看到mian-request最大是3,inner-request最大是2。证明了concurrency和prefetch的作用。
源码分析
首先看一下flatMap()操作符在装配阶段做了什么。
Flux#flatMap()

创建FluxFlatMap对象,它既是Subscriber,又是Subscription。
另外还创建了2个队列Supplier,第一个是以concurrency为大小创建的mainQueueSupplier(创建scalarQueue),另一个是以prefetch为大小的创建的innerQueueSupplier。接下来查看订阅阶段发生了什么。
Flux#subscribe()

因为FluxFlatMap实现了OptimizableOperator接口,实际的Subscriber是通过调用subscribeOrReturn()返回的。
FlatMapMain#onSubscribe()

这里最关键的是调用request()向上游请求数据,请求数量是maxConcurrency。这正是flatMap()方法传入的concurrency。当数据下发时,肯定会调用onNext()。
FlatMapMain#onNext()

- 将当前元素转换成一个Publisher。
- 如果转换后的Publisher是Callable,则直接获取元素调用tryEmitScalar()下发。
- 否则创建FlatMapInner对象,用它来订阅Publisher。
在前面的方法定义中提到过:flatMap()支持并发处理,允许元素交错。
看到这里,我们能得出一个推论:flatMap()是否真的会并发处理,取决转换后的Publisher是否支持异步订阅,即p.subscribe(innner)是否异步执行。
代码验证
首先回顾一下前面讲过的publishOn()和subScribeOn()工作机制。
publishOn():在onNext()、onComplete()、onError()方法进行线程切换,publishOn()使得它下游的消费阶段异步执行。 subScribeOn():在subscribe的时候进行线程切换,subscribeOn()使得它上游的订阅阶段以及整个消费阶段异步执行。
同步执行(不会发生交错)
@Test public void testSync() { delayPublishFlux(100, 1, 6) .flatMap((i) -> flat(1000, i), 3, 2) .subscribe(i -> logInt(i, "消费")); sleep(10000); }
重点是去掉了flat()的subscribeOn()调用。

同步执行,元素没有发生交错。
subscribeOn()异步执行
@Test public void testSubscribeOn() { delayPublishFlux(100, 1, 6) .flatMap((i) -> flat(1000, i).subscribeOn(Schedulers.newElastic("inner")), 3, 2) .subscribe(i -> logInt(i, "消费")); sleep(10000); }

异步执行,并且此时最大并发是3。
publishOn()异步执行
@Test public void testPublishOn() { delayPublishFlux(100, 1, 6) .flatMap((i) -> flat(10, i) .publishOn(Schedulers.newElastic("inner")) // 故意让下游执行慢一点 .doOnNext(x -> sleep(1000)), 3, 2) .subscribe(i -> logInt(i, "消费")); sleep(10000); }

异步执行,并且此时最大并发是3。
通过以上代码可以得知,如果内部Publisher生产数据慢,推荐使用subscribeOn()。如果只是内部Publisher消费速度慢,推荐使用publishOn()。如果生产都消费都慢的话,两个操作符一起使用。
在调用内部Publisher的subscribe()方法之后,后续肯定会执行FlatMapInner#onSubscribe()。
FlatMapInner#onSubscribe()

与publishOn类似,FlatMapInner也支持同步队列融合、异步队列融合以及非融合三种处理方式。
如果上游的Subscription是QueueSubscription类型,则会进行队列融合。具体采用同步还是异步,取决于该QueueSubscription#requestFusion()实现。
- 同步队列融合:复用当前队列,然后直接调用FlatMapMain#drain()排空队列。
- 异步队列融合:复用当前队列,然后调用上游s.request()请求数据,请求数量是prefetch。
- 非融合:直接调用上游s.request()请求数据,请求数量也是prefetch。
非融合
以下代码会非融合方式执行。(和SubscribeOn()异步执行逻辑是一样的)
@Test public void testNoFused() { delayPublishFlux(100, 1, 6) .flatMap((i) -> flat(100, i) .subscribeOn(Schedulers.newElastic("inner")), 3, 2) .subscribe(i -> { // 消费慢一点,innerQueue更容易有数据积压 sleep(1000); logInt(i, "消费"); }); sleep(10000); }
FlatMapInner#onNext()

此时onNext()方法入参就是内部Publisher实际下发的元素,继续调用FlatMapMain#tryEmit()下发。
FlatMapMain#tryEmit()

上面代码逻辑其实主要由两种情况组成:(后面两大块逻辑是差不多的)
- 可能直接调用下游Subscriber#onNext()继续下发元素。
- 创建队列,然后将元素加入队列,视情况调用drainLoop()排空队列。
具体取决于数据生产以及消费的速度: 1、如果消费速度大于生产速度,没有数据积压,则直接调用下游Subscriber#onNext()进行下发。 2、如果消费速度跟不上生产速度,元素会直接先保存到innerQueue中,然后在wip==0时调用drainLoop()排空队列。
FlatMapMain#tryEmitScalar()
前面提到过,如果转换后的Publisher是Callable,会执行tryEmitScalar()方法。该方法做的事情跟tryEmit()处理逻辑基本一致,主要差别就是处理的元素和使用的队列不同。
tryEmitScalar()处理的元素是内部Publisher直接调用call()获取的,而tryEmit()是内部Publisher向下游发送的。tryEmitScalar()使用scalarQueue缓存元素,而tryEmit()使用innerQueue。
FlatMapMain#drainLoop()
drainLoop()逻辑非常多,截取部分关键代码:



- 排空innerQueue中的元素,下发给下游。
- 请求内部Publisher下发元素,请求数量就是本次innerQueue排出数量。
- 请求外部Publisher下发元素,请求数量是可补充数量,不会超过concurrency。
另外还有一些完成和取消控制。
同步队列融合
以下代码会以同步队列融合方式执行。
@Test public void testSyncFused() { delayPublishFlux(100, 1, 6) .flatMap((i) -> flatRange(i), 3, 2) .subscribe(i -> logInt(i, "消费")); sleep(10000); }
复用当前队列,然后直接调用FlatMapMain.drain()排空队列。
FlatMapMain#drain()

依然是调用drainLoop()排空队列中的元素。注意,同步队列融合没有request()过程,直接在onSubscribe()阶段进行元素下发。
异步队列融合
以下代码会以异步队列融合方式执行。
@Test public void testAsyncFused() { delayPublishFlux(100, 1, 6) .flatMap((i) -> flatRange(i).publishOn(Schedulers.newElastic("inner")), 3, 2) .subscribe(i -> logInt(i, "消费")); sleep(10000); }
复用当前队列,然后调用上游s.request()请求数据,请求数量是prefetch。后续肯定会调用FlatMapInner#onNext()。
FlatMapInner#onNext()

此时入参为null,然后调用了FlatMapMain.drain(),排空队列元素。异步队列融合会复用队列,上游实际发送是null,可以将其理解成一个信号,告知下游排空队列中的元素。
#Java##程序员#