一、概述
Java 的Streams API 是一种功能强大且用途广泛的数据处理工具。根据定义,流式操作是对一组数据的单次迭代。
但是,有时我们希望以不同的方式处理流的某些部分并获得不止一组结果。
在本教程中,我们将学习如何将流拆分为多个组并独立处理它们。
2. 使用收集器
一个Stream 应该被操作一次并且有一个终端操作。它可以有多个中间操作,但数据只能在关闭之前收集一次。
这意味着Streams API 规范明确禁止分叉流并为每个分叉提供不同的中间操作。这将导致多个终端操作。但是,我们可以在终端操作里面拆分流。这会创建一个分为两个或更多组的结果。
2.1。使用partitioningBy
进行二进制拆分
如果我们想将一个流一分为二,我们可以使用Collectors
类中的partitioningBy
。它接受一个Predicate
并返回一个Map
,它将满足Boolean
true
键下的谓词和false
下的其余元素分组。
假设我们有一个文章列表,其中包含有关它们应该发布到的目标站点以及它们是否应该被推荐的信息。
List<Article> articles = Lists.newArrayList(
new Article("Baeldung", true),
new Article("Baeldung", false),
new Article("Programming Daily", false),
new Article("The Code", false));
我们将它分为两组,一组仅包含Baeldung 文章,第二组包含其余文章:
Map<Boolean, List<Article>> groupedArticles = articles.stream()
.collect(Collectors.partitioningBy(a -> a.target.equals("Baeldung")));
我们看看地图中的true
键下分别归档false
哪些文章:
assertThat(groupedArticles.get(true)).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get(false)).containsExactly(
new Article("Programming Daily", false),
new Article("The Code", false));
2.2.使用groupingBy
拆分
如果我们想要更多的类别,那么我们需要使用groupingBy
方法。它需要一个函数,将每个元素分类到一个组中。然后它返回一个将每个组分类器链接到其元素集合的Map
。
假设我们要按目标站点对文章进行分组。返回的Map
将具有包含站点名称的键和包含与给定站点关联的文章集合的值:
Map<String, List<Article>> groupedArticles = articles.stream()
.collect(Collectors.groupingBy(a -> a.target));
assertThat(groupedArticles.get("Baeldung")).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get("Programming Daily")).containsExactly(new Article("Programming Daily", false));
assertThat(groupedArticles.get("The Code")).containsExactly(new Article("The Code", false));
3. 使用teeing
从Java 12 开始,我们为二进制拆分提供了另一种选择。我们可以使用teeing
收集器。teeing
将两个收集器组合成一个复合材料。每个元素都由它们处理,然后使用提供的合并函数合并为单个返回值。
3.1。与Predicate
teeing
teeing
收集器与Collectors
类中的另一个收集器很好地配对,称为filtering
。它接受一个谓词并使用它来过滤处理过的元素,然后将它们传递给另一个收集器。
让我们将文章分为Baeldung 和非Baeldung 的组并计算它们。我们还将使用List
构造函数作为合并函数:
List<Long> countedArticles = articles.stream().collect(Collectors.teeing(
Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.counting()),
Collectors.filtering(article -> !article.target.equals("Baeldung"), Collectors.counting()),
List::of));
assertThat(countedArticles.get(0)).isEqualTo(2);
assertThat(countedArticles.get(1)).isEqualTo(2);
3.2.处理eeing
结果
此解决方案与以前的解决方案之间存在一个重要区别。我们之前创建的组没有重叠,源流中的每个元素最多属于一个组。使用teeing,
我们不再受此限制的约束,因为每个收集器都可能处理整个流。让我们看看如何利用它。
我们可能希望将文章分为两组,一组仅包含特色文章,第二组仅包含Baeldung 文章。生成的文章集可能会重叠,因为一篇文章可以同时成为Baeldung 的特色和目标。
这次我们将它们收集到列表中,而不是计数:
List<List<Article>> groupedArticles = articles.stream().collect(Collectors.teeing(
Collectors.filtering(article -> article.target.equals("Baeldung"), Collectors.toList()),
Collectors.filtering(article -> article.featured, Collectors.toList()),
List::of));
assertThat(groupedArticles.get(0)).hasSize(2);
assertThat(groupedArticles.get(1)).hasSize(1);
assertThat(groupedArticles.get(0)).containsExactly(
new Article("Baeldung", true),
new Article("Baeldung", false));
assertThat(groupedArticles.get(1)).containsExactly(new Article("Baeldung", true));
4. 使用RxJava
虽然Java 的Streams API 是一个有用的工具,但有时它还不够。其他解决方案,例如RxJava 提供的响应式流,可能能够帮助我们。让我们看一个简短的示例,说明如何使用Observable
和多个Subscribers
来实现与Stream
示例相同的结果。
4.1。创建一个Observable
首先,我们需要从文章列表中创建一个Observable
实例。我们可以使用Observable
类的from
factory 方法:
Observable<Article> observableArticles = Observable.from(articles);
4.2.过滤Observables
接下来,我们需要创建将过滤文章的Observables
。为此,我们将使用Observable
类中的filter
方法:
Observable<Article> baeldungObservable = observableArticles.filter(
article -> article.target.equals("Baeldung"));
Observable<Article> featuredObservable = observableArticles.filter(
article -> article.featured);
4.3.创建多个Subscribers
最后,我们需要订阅Observables
并提供一个Action
来描述我们想要对文章做什么。一个真实的例子是将它们保存在数据库中或将它们发送给客户端,但我们将满足于将它们添加到列表中:
List<Article> baeldungArticles = new ArrayList<>();
List<Article> featuredArticles = new ArrayList<>();
baeldungObservable.subscribe(baeldungArticles::add);
featuredObservable.subscribe(featuredArticles::add);
5. 结论
在本教程中,我们学习了如何将流拆分为组并分别处理它们。首先,我们查看了较旧的Streams API 方法:groupingBy
和partitionBy
。接下来,我们使用了一种更新的方法,利用了Java 12 中引入的teeing
方法。最后,我们研究了如何使用RxJava 来获得具有更大弹性的类似结果。
0 评论