一、简介
Guava 为我们提供了ListenableFuture
一个基于默认 JavaFuture.
让我们看看我们如何利用它来发挥我们的优势。
2. Future
,ListenableFuture
和Futures
让我们简要地看看这些不同的类是什么以及它们是如何相互关联的。
2.1. Future
从Java 5,
我们可以使用java.util.concurrent.Future
来表示异步任务。
Future
允许我们访问已经完成或将来可能完成的任务的结果,并支持取消它们。
2.2. ListenableFuture
java.util.concurrent.Future
删除时的一个功能是添加侦听器以在完成时运行,这是大多数流行的异步框架提供的常见功能。
番石榴通过允许我们将侦听器附加到它的[com.google.common.util.concurrent.ListenableFuture](https://guava.dev/releases/29.0-jre/api/docs/com/google/common/util/concurrent/ListenableFuture.html "可聽未來") .
2.3. Futures
Guava 为我们提供了便利类com.google.common.util.concurrent.Futures
以使其更容易使用ListenableFuture.
该类提供了多种与ListenableFuture,
交互的方式,其中包括支持添加成功/失败的会话,并可以通过我们聚合或转换来协商多个未来。
3. 简单使用
现在让我们看看如何以最简单的方式ListenableFuture
创建和添加头像。
3.1. 创造ListenableFuture
**ListenableFuture
的最简单方法是向ListeningExecutorService
**提交任务(很像我们如何使用普通的ExecutorService
来获取普通的Future
):
ExecutorService execService = Executors.newSingleThreadExecutor(); ListeningExecutorService lExecService = MoreExecutors.listeningDecorator(execService); ListenableFuture<Integer> asyncTask = lExecService.submit(() -> { TimeUnit.MILLISECONDS.sleep(500); // long running task return 5; });
请注意我们如何使用MoreExecutors
类将ExecutorService
装饰为ListeningExecutorService.
我们可以参考线程池在 Guava 中的实现来了解更多MoreExecutors
。
如果我们已经有一个返回Future
的 API 并且我们需要将它转化为ListenableFuture
,这很容易通过初始化它的具体实现来完成ListenableFutureTask:
// old api public FutureTask<String> fetchConfigTask(String configKey) { return new FutureTask<>(() -> { TimeUnit.MILLISECONDS.sleep(500); return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE)); }); } // new api public ListenableFutureTask<String> fetchConfigListenableTask(String configKey) { return ListenableFutureTask.create(() -> { TimeUnit.MILLISECONDS.sleep(500); return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE)); }); }
我们需要注意,我们可以自己将它们提交给直接Executor.
与ListenableFutureTask
交互不会经常使用,只有在极端情况下完成(例如:实现我们的ExecutorService
)。实际使用请参考 Guava 的[AbstractListeningExecutorService](https://github.com/google/guava/blob/v18.0/guava/src/com/google/common/util/concurrent/AbstractListeningExecutorService.java "AbstractListeningExecutorService")
如果我们的不太实用的任务无法使用ListeningExecutorService
或提供的Futures
实用程序方法,我们也可以使用com.google.common.util.concurrent.SettableFuture
,并且我们需要手动设置未来值。对于更复杂的合适的,我们还可以考虑com.google.common.util.concurrent.AbstractFuture.
3.添加侦听器/截图2
我们可以**ListenableFuture
添加侦听器的一种Futures.addCallback(),
注册角色,使我们可以在成功或失败发生时访问结果或异常:**
Executor listeningExecutor = Executors.newSingleThreadExecutor(); ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask() Futures.addCallback(asyncTask, new FutureCallback<Integer>() { @Override public void onSuccess(Integer result) { // do on success } @Override public void onFailure(Throwable t) { // do on failure } }, listeningExecutor);
我们**可以通过直接将侦听器添加到ListenableFuture.
**请注意,此侦听器将在未来或异常完成时运行。 另请注意,我们无权访问还异步任务的结果:
Executor listeningExecutor = Executors.newSingleThreadExecutor(); int nextTask = 1; Set<Integer> runningTasks = ConcurrentHashMap.newKeySet(); runningTasks.add(nextTask); ListenableFuture<Integer> asyncTask = new ListenableFutureService().succeedingTask() asyncTask.addListener(() -> runningTasks.remove(nextTask), listeningExecutor);
4.复杂的用法
现在让我们看看如何在更复杂的场景中使用这些期货。
4.1.扇入
我们有时可能需要一些异步任务并收集它们的结果,通常是个别扇入操作。
Guava 为我们提供了两种确定的方法。但是,我们应该谨慎地选择我们要求的正确方法。假设我们需要协调以下几点任务:
ListenableFuture<String> task1 = service.fetchConfig("config.0"); ListenableFuture<String> task2 = service.fetchConfig("config.1"); ListenableFuture<String> task3 = service.fetchConfig("config.2");
入扇多个期货的一种方法的英文使用Futures.allAsList()
方法。如果所有期货都成功,网求允许这我们按照提供的期货的顺序收集所有期货的查询查询结果。如果这些未来的中的任何一个失败,那么整个结果就是一个失败的未来:
ListenableFuture<List<String>> configsTask = Futures.allAsList(task1, task2, task3); Futures.addCallback(configsTask, new FutureCallback<List<String>>() { @Override public void onSuccess(@Nullable List<String> configResults) { // do on all futures success } @Override public void onFailure(Throwable t) { // handle on at least one failure } }, someExecutor);
如果我们需要收集所有异步任务的结果,不管它们是否失败,都可以使用Futures.successfulAsList()
。这将返回一个列表,其结果将与传递给参数的任务具有相同的顺序,失败的任务将在列表中的单独位置分配null
ListenableFuture<List<String>> configsTask = Futures.successfulAsList(task1, task2, task3); Futures.addCallback(configsTask, new FutureCallback<List<String>>() { @Override public void onSuccess(@Nullable List<String> configResults) { // handle results. If task2 failed, then configResults.get(1) == null } @Override public void onFailure(Throwable t) { // handle failure } }, listeningExecutor);
在上面的情况下,我们应该小心未来的任务,通常,如果null
面对失败的任务(也将结果设置为null
)无法解决。
4.2.用组合器扇入
如果我们需要协商一些返回不同结果的期货,上面的解决方案可能还很神奇。
与简单的扇入操作类似,番石榴为我们提供了两种变体;一种在所有任务成功完成时成功,一种在使用Futures.whenAllSucceed()
和Futures.whenAllComplete()
方法时部分任务失败也成功。
让我们看看如何使用Futures.whenAllSucceed()
组合来自多种期货的不同结果类型:
ListenableFuture<Integer> cartIdTask = service.getCartId(); ListenableFuture<String> customerNameTask = service.getCustomerName(); ListenableFuture<List<String>> cartItemsTask = service.getCartItems(); ListenableFuture<CartInfo> cartInfoTask = Futures.whenAllSucceed(cartIdTask, customerNameTask, cartItemsTask) .call(() -> { int cartId = Futures.getDone(cartIdTask); String customerName = Futures.getDone(customerNameTask); List<String> cartItems = Futures.getDone(cartItemsTask); return new CartInfo(cartId, customerName, cartItems); }, someExecutor); Futures.addCallback(cartInfoTask, new FutureCallback<CartInfo>() { @Override public void onSuccess(@Nullable CartInfo result) { //handle on all success and combination success } @Override public void onFailure(Throwable t) { //handle on either task fail or combination failed } }, listeningExecService);
如果我们需要允许部分失败,我们可以使用Futures.whenAllComplete()
。Futures.getDone()
ExecutionException
4.3. 转型
有时我们需要转换成功后的结果。Futures.transform()
和Futures.lazyTransform()
为我们提供了两种方法。
我们让看看如何使用Futures.transform()
来转换未来的结果只要转换计算量不大,就可以使用它:
ListenableFuture<List<String>> cartItemsTask = service.getCartItems(); Function<List<String>, Integer> itemCountFunc = cartItems -> { assertNotNull(cartItems); return cartItems.size(); }; ListenableFuture<Integer> itemCountTask = Futures.transform(cartItemsTask, itemCountFunc, listenExecService);
**我们还可以使用Futures.lazyTransform()
**将转换函数转化为java.util.concurrent.Future.
我们需要记住的,这个选项不会返回一个可能ListenableFuture
的java.util.concurrent.Future
get()
结果未来上调用时普遍应用转换函数。
4.4.链式期货
我们可能会遇到我们的期货调用其他期货的情况。在这种情况下,番石榴为我们提供了async()
变体来安全地链接这些期货以一个接一个地执行。
让我们看看如何使用Futures.submitAsync()
从提交Callable
内部调用未来:
AsyncCallable<String> asyncConfigTask = () -> { ListenableFuture<String> configTask = service.fetchConfig("config.a"); TimeUnit.MILLISECONDS.sleep(500); //some long running task return configTask; }; ListenableFuture<String> configTask = Futures.submitAsync(asyncConfigTask, executor);
如果我们想要真正的链接,其中一个未来的结果被赋予了一个未来的计算中,我们可以使用Futures.transformAsync()
:
ListenableFuture<String> usernameTask = service.generateUsername("john"); AsyncFunction<String, String> passwordFunc = username -> { ListenableFuture<String> generatePasswordTask = service.generatePassword(username); TimeUnit.MILLISECONDS.sleep(500); // some long running task return generatePasswordTask; }; ListenableFuture<String> passwordTask = Futures.transformAsync(usernameTask, passwordFunc, executor);
Guava 还为我们提供了Futures.scheduleAsync()
和Futures.catchingAsync()
来分别提交计划任务并提供错误恢复的回退任务async()
。
五、使用注意事项
现在让我们研究一下在使用期货时可能遇到的一些常见陷阱以及如何避免它们。
5.1. 工作与聆听执行者
在使用 Guava 期货时,了解工作执行者和监听执行者之间的区别很重要。例如,如果获取我们有一个真实任务来配置:
public ListenableFuture<String> fetchConfig(String configKey) { return lExecService.submit(() -> { TimeUnit.MILLISECONDS.sleep(500); return String.format("%s.%d", configKey, new Random().nextInt(Integer.MAX_VALUE)); }); }
假设我们想为上述未来附加一个侦听器:
ListenableFuture<String> configsTask = service.fetchConfig("config.0"); Futures.addCallback(configsTask, someListener, listeningExecutor);
请注意,lExecService
是运行异步任务的执行器,而listeningExecutor
器的执行器。
如上所示,我们应该始终考虑将这两个执行程序分开,以避免出现我们的侦听器和工作线程争用相同线程池资源的情况。共享同一个执行器可能会导致我们的繁重任务使听者执行饿死了。或者一个写得很惨的重量级演讲最终被排除在了我们的重要任务上。
5.2. 小心使用directExecutor()
我们MoreExecutors.directExecutor()
和MoreExecutors.newDirectExecutorService()
来更容易地处理异步执行,但在生产代码中我们应该小心使用它们。
当我们从上述方法中获取到执行或执行时,我们都提交给它的任何任务,不管是重量级的还是监听器,都在当前线程上执行。很危险。
比如使用一个directExecutor
,在 UI 线程中向它提交一个重量级的任务,就会自动呈现我们的 UI 线程。
我们也可能循环这样一种情况:我们的听众最终会减慢我们所有其他侦听器(即使是那些与那些人directExecutor
)。因为 Guava 在各自的Executors,
while
中的所有侦听器,但directExecutor
会导致侦听器运行在与while
循环相同的线程中。
5.3. 钩词不好
在使用链式期货时,我们应该注意不要以创建期货的方式从另一个期货内部调用一个期货:
public ListenableFuture<String> generatePassword(String username) { return lExecService.submit(() -> { TimeUnit.MILLISECONDS.sleep(500); return username + "123"; }); } String firstName = "john"; ListenableFuture<ListenableFuture<String>> badTask = lExecService.submit(() -> { final String username = firstName.replaceAll("[^a-zA-Z]+", "") .concat("@service.com"); return generatePassword(username); });
如果我们曾经遇到过这种情况,ListenableFuture<ListenableFuture<V>>,
我们应该知道这是一个真实的不可能的未来,因为外部未来的取消和完成有可能发生的事情,并且取消可能不会传播到自己的未来。
如果我们看到上述情况,我们应该总是使用Futures.async()
变体以连接的方式安全地解开这些期货。
5.4. 小心JdkFutureAdapters.listenInPoolThread()
Guava 建议我们利用其ListenableFuture
最佳方式是将所有使用Future
的代码转换为ListenableFuture.
如果在某些情况下这种转换不可行,番石榴我们为提供了适配器来使用JdkFutureAdapters.listenInPoolThread()
覆盖来来到来到lái这一点一。虽然这看起来很有帮助,但番石榴警告我们这些是重量级的适配器,应该尽可能避免。
六、结论
在本文中,我们看到了如何使用 Guava 的ListenableFuture
来我们对期货的使用,以及如何使用Futures
API 来轻松地使用这些期货。
我们还看到了在使用这些期货和提供的执行时可能会犯的一些常见程序错误。
0 评论