一、简介
Project Reactor 为JVM 提供了一个完全非阻塞的编程基础。它提供了Reactive Streams 规范的实现,并提供了可组合的异步API,例如Flux。Flux 是具有多个反应式运算符的反应式流发布者。它发出0 到N 个元素,然后成功或错误地完成。它可以根据我们的需要以几种不同的方式创建。
2. 理解通量
Flux 是一个Reactive Stream 发布者,可以发出0 到N 个元素。它有几个运算符用于生成、编排和转换Flux 序列。Flux 可以成功完成,也可以完成但有错误。
Flux API 在Flux 上提供了几个静态工厂方法来创建源或从多个回调类型生成。它还提供实例方法和运算符来构建异步处理管道。该管道产生一个异步序列。
在接下来的部分中,让我们看看Fluxgenerate()
和create()
方法的一些用法。
3. Maven依赖
我们需要[reactor-core](https://search.maven.org/search?q=g:io.projectreactor%20AND%20a:reactor-core&core=gav)
和reactor-test
Maven 依赖项:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.17</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.17</version>
<scope>test</scope>
</dependency>
4.通量生成
Flux API 的generate()
方法提供了一种简单直接的编程方法来创建Flux。generate()
方法采用生成器函数来生成一系列项目。
生成方法有三种变体:
generate(Consumer<SynchronousSink<T>> generator)
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
generate 方法根据需要计算并发出值。最好在计算下游可能不使用的元素的成本太高的情况下使用。如果发出的事件受应用程序状态的影响,也可以使用它。
4.1。例子
在这个例子中,让我们使用generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
来生成一个Flux
:
public class CharacterGenerator {
public Flux<Character> generateCharacters() {
return Flux.generate(() -> 97, (state, sink) -> {
char value = (char) state.intValue();
sink.next(value);
if (value == 'z') {
sink.complete();
}
return state + 1;
});
}
}
在generate()
方法中,我们提供了两个函数作为参数:
第一个是
Callable
函数。此函数定义生成器的初始状态,值为97第二个是
BiFunction.
这是一个使用SynchronousSink.
每当调用接收器的next
方法时,此SynchronousSink 都会返回一个项目
根据其名称,SynchronousSink
实例同步工作。但是,我们不能在每个生成器调用中多次调用SynchronousSink
对象的next
方法。
让我们使用StepVerifier
验证生成的序列:
@Test
public void whenGeneratingCharacters_thenCharactersAreProduced() {
CharacterGenerator characterGenerator = new CharacterGenerator();
Flux<Character> characterFlux = characterGenerator.generateCharacters().take(3);
StepVerifier.create(characterFlux)
.expectNext('a', 'b', 'c')
.expectComplete()
.verify();
}
在此示例中,订阅者仅请求三个项目。因此,生成的序列以发出三个字符——a、b 和c 结束。expectNext()
期望从Flux 中得到我们期望的元素。expectComplete
() 表示从Flux 发射元素的完成。
5.通量创建
当我们想要create()
**不受应用程序状态影响的多个(0 到无穷大)值**时,使用Flux 中的create() 方法。这是因为Fluxcreate()
方法的底层方法不断计算元素。
此外,下游系统决定了它需要多少元素。因此,如果下游系统无法跟上,已经发出的元素要么被缓冲要么被移除。
默认情况下,发出的元素会被缓冲,直到下游系统请求更多元素。
5.1。例子
现在让我们演示create()
方法的示例:
public class CharacterCreator {
public Consumer<List<Character>> consumer;
public Flux<Character> createCharacterSequence() {
return Flux.create(sink -> CharacterCreator.this.consumer = items -> items.forEach(sink::next));
}
}
我们可以注意到create
运算符要求我们使用FluxSink
而不是generate
() 中使用的SynchronousSink
。在这种情况下,我们将为items
列表中的每个项目调用next()
,逐个发出。
现在让我们使用带有两个字符序列的CharacterCreator
:
@Test
public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException {
CharacterGenerator characterGenerator = new CharacterGenerator();
List<Character> sequence1 = characterGenerator.generateCharacters().take(3).collectList().block();
List<Character> sequence2 = characterGenerator.generateCharacters().take(2).collectList().block();
}
我们在上面的代码片段中创建了两个序列——sequence1
和sequence2
。这些序列用作字符项的来源。请注意,我们使用CharacterGenerator
实例来获取字符序列。
现在让我们定义一个characterCreator
实例和两个线程实例:
CharacterCreator characterCreator = new CharacterCreator();
Thread producerThread1 = new Thread(() -> characterCreator.consumer.accept(sequence1));
Thread producerThread2 = new Thread(() -> characterCreator.consumer.accept(sequence2));
我们现在正在创建两个线程实例,它们将为发布者提供元素。当调用接受运算符时,字符元素开始流入序列源。接下来,我们subscribe
新的合并序列:
List<Character> consolidated = new ArrayList<>();
characterCreator.createCharacterSequence().subscribe(consolidated::add);
请注意,createCharacterSequence
返回一个我们订阅的Flux 并使用consolidated
列表中的元素。接下来,让我们触发查看项目在两个不同线程上移动的整个过程:
producerThread1.start();
producerThread2.start();
producerThread1.join();
producerThread2.join();
最后,让我们验证一下操作的结果:
assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b');
接收到的序列中的前三个字符来自sequence1.
最后两个字符来自sequence2
。由于这是一个异步操作,因此无法保证这些序列中元素的顺序。
6. Flux Create vs. Flux Generate
以下是create 和generate 操作之间的一些区别:
通量创建 | 通量生成 |
---|---|
此方法接受Consumer<FluxSink> 的实例 | 此方法接受Consumer<SynchronousSink> 的实例 |
Create 方法只调用消费者一次 | 生成方法根据下游应用的需要多次调用消费者方法 |
消费者可以立即发出0..N 个元素 | 只能发射一种元素 |
发布者不知道下游状态。因此create 接受溢出策略作为流量控制的附加参数 | 发布者根据下游应用需求生成元素 |
FluxSink 允许我们在需要时使用多个线程发出元素 | 对多线程没有用,因为它一次只发出一个元素 |
7. 结论
在本文中,我们讨论了Flux API 的create 和generate 方法之间的区别。
首先,我们介绍了反应式编程的概念并讨论了Flux API。然后我们讨论了Flux API 的create 和generate 方法。最后,我们提供了Flux API 的create 和generate 方法之间的差异列表。
0 评论