行为参数化

  • 行为参数化,就是一个方法接受多个不同的行为作为参数,并在内部使用它们,完成不同行为的能力。
  • 行为参数化让代码更好地适应不断变化的要求,减轻未来的工作量。
  • 传递代码,就是将新行为作为参数传递给方法。Java API中包括排序、线程和GUI处理。

Lambda

在哪里使用Lambda?

函数式接口

函数式接口即只定义一个抽象方法的接口。

Lambda表达式允许你直接以内联的形式为函数式接口的抽象方法提供实现,并把整个表达式作为函数式接口的实例(具体说来,是函数式接口一个具体实现的实例)。

使用:

1
2
3
4
@FunctionalInterface
public interface BufferedReaderProcessor {
String process(BufferedReader b) throws IOException;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class BufferTest {
public static String processFile(BufferedReaderProcessor p) throws IOException {
try (BufferedReader br =
new BufferedReader(new FileReader("data.txt"))) {
return p.process(br);
}
}

public static void main(String[] args) throws IOException {
String oneLine = processFile((BufferedReader br) -> br.readLine());
// String oneLine = processFile(BufferedReader::readLine);
String twoLines = processFile(
(BufferedReader br) -> br.readLine() + br.readLine());
}
}

使用函数式接口

函数式接口很有用,因为抽象方法的签名可以描述Lambda表达式的签名。函数式接口的抽象方法的签名称为函数描述符。

Java API中已经有了几个函数式接口,比如 Comparable 、 Runnable 和Callable 。

Java 8 中有Predicate 、 Consumer 和 Function。

1
2
3
4
5
6
@FunctionalInterface
public interface Predicate<T>{
boolean test(T t);
}

Predicate<String> nonEmptyStringPredicate = (String s) -> !s.isEmpty();
1
2
3
4
5
6
7
8
9
10
11
12
13
@FunctionalInterface
public interface Consumer<T>{
void accept(T t);
}

public static <T> void forEach(List<T> list, Consumer<T> c){
for(T i: list){
c.accept(i);
}
}

forEach(
Arrays.asList(1,2,3,4,5),(Integer i) -> System.out.println(i));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@FunctionalInterface
public interface Function<T, R>{
R apply(T t);
}

public static <T, R> List<R> map(List<T> list,Function<T, R> f){
List<R> result = new ArrayList<>();
for(T s: list){
result.add(f.apply(s));
}
return result;
}

// [7, 2, 6]
List<Integer> l = map(Arrays.asList("lambdas","in","action"),(String s) -> s.length());

原始类型特化

比如,在下面的代码中,使用 IntPredicate 就避免了对值 1000 进行装箱操作,但要是用 Predicate<Integer> 就会把参数 1000 装箱到一个 Integer 对象中:

1
2
3
4
5
6
7
8
public interface IntPredicate{
boolean test(int t);
}
IntPredicate evenNumbers = (int i) -> i % 2 == 0;
evenNumbers.test(1000);

Predicate<Integer> oddNumbers = (Integer i) -> i % 2 == 1;
oddNumbers.test(1000);

funinter.PNG

funinter2.png

因函数式接口都不允许抛出受检异常。如果需要Lambda表达式抛出异常,有两种方法:定义一个自己的函数式接口,并声明受检异常,或把Lambda包在一个try/catch块中。

类型检查、类型推断以及限制

类型推断

Java编译器会从上下文(目标类型)推断出用什么函数式接口来配合Lambda表达式,这意味着它也可以推断出适合Lambda的签名,因为函数描述符可以通过目标类型来得到。这样做的好处在于,编译器可以了解Lambda表达式的参数类型,这样就可以在Lambda语法中省去标注参数类型。

方法引用

可以视为某些Lambda的快捷写法。可以重复使用现有的方法定义,并像Lambda一样传递它们。

如果一个Lambda表达式只是“直接调用这个方法”,那最好还是用名称来调用它,而不是去描述如何调用它。

复合Lambda表达式的有用方法

比较器复合

1
inventory.sort(comparing(Apple::getWeight).reversed());

比较器链

1
2
3
inventory.sort(comparing(Apple::getWeight)
.reversed()
.thenComparing(Apple::getCountry));

谓词复合

1
2
3
Predicate<Apple> redAndHeavyAppleOrGreen =
redApple.and(a -> a.getWeight() > 150)
.or(a -> "green".equals(a.getColor()));

函数复合

1
2
3
4
Function<Integer, Integer> f = x -> x + 1;
Function<Integer, Integer> g = x -> x * 2;
Function<Integer, Integer> h = f.andThen(g);
int result = h.apply(1);

它允许以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。

只能遍历一次。

内部迭代与外部迭代

Java Collection外部迭代,Stream内部迭代。

内部迭代优点:可以透明地并行处理, 或者用更优化的顺序进行处理。

中间操作

诸如 filter 或 sorted 等中间操作会返回另一个流。这让多个操作可以连接起来形成一个查询。重要的是,除非流水线上触发一个终端操作,否则中间操作不会执行任何处理——它们很懒。

终端操作

终端操作会从流的流水线生成结果。其结果是任何不是流的值,比如 List 、 Integer ,甚至 void 。

使用流

三件事:一数据源、二中间操作链、三终端操作。

流的流水线理念类似于构建器模式。

stream.PNG

筛选和切片

用谓词筛选

1
2
3
List<Dish> vegetarianMenu = menu.stream()
.filter(Dish::isVegetarian)
.collect(toList());

流还支持一个叫作 distinct 的方法,它会返回一个元素各异(根据流所生成元素的hashCode 和 equals 方法实现)的流。

截短流

流支持 limit(n) 方法,该方法会返回一个不超过给定长度的流。

跳过元素

流还支持 skip(n) 方法,返回一个扔掉了前 n 个元素的流。

映射

流的扁平化

Arrays.stream() 的方法可以接受一个数组并产生一个流

1
2
3
4
5
6
List<String> uniqueCharacters =
words.stream()
.map(w -> w.split(""))
.flatMap(Arrays::stream)
.distinct()
.collect(Collectors.toList());

flatMap合并为一个流。

查找和匹配

allMatch、anyMatch、noneMatch、findFirst、findAny

短路求值

对于流而言,某些操作(例如 allMatch 、 anyMatch 、 noneMatch 、 findFirst 和 findAny )不用处理整个流就能得到结果。只要找到一个元素,就可以有结果了。同样, limit 也是一个短路操作:它只需要创建一个给定大小的流,而用不着处理流中所有的元素。在碰到无限大小的流的时候,这种操作就有用了:它们可以把无限流变成有限流。

归约

算术
1
2
3
int sum = numbers.stream().reduce(0, (a, b) -> a + b);

int sum = numbers.stream().reduce(0, Integer::sum);
最值
1
Optional<Integer> max = numbers.stream().reduce(Integer::max);

原始类型流特化

IntStream、DoubleStream、LongStream,分别将流中的元素特化为int、long和double,从而避免了暗含的装箱成本。还带来了进行常用数值归约的新方法,如sum、max。

IntStream 还支持其他的方便方法,如max 、 min 、 average 等。

映射到数值流
1
2
3
int calories = menu.stream()
.mapToInt(Dish::getCalories)
.sum();
转换回对象流

IntStream 上的操作只能产生原始整数: IntStream 的 map 操作接受的Lambda必须接受 int 并返回 int。

1
Stream<Integer> stream = intStream.boxed();
数值范围

Java 8引入了两个可以用于 IntStream 和 LongStream 的静态方法,帮助生成这种范围:range 和 rangeClosed 。这两个方法都是第一个参数受起始值,第二个参数接受结束值。但range 是不包含结束值的,而 rangeClosed 则包含结束值。

构建流

由值创建流

静态方法Stream.of,通过显示值创建流。可接受任意数量的参数。

1
Stream<String> stream = Stream.of("Java 8 ", "Lambdas ", "In ", "Action");

可使用empty得到一个空流。

1
Stream<String> emptyStream = Stream.empty();

由数组创建流

Arrays.stream接受数组作为参数,例如,可将一个原始类型int转换成IntStream。

1
2
int[] numbers = {2, 3, 5, 7, 11, 13};
int sum = Arrays.stream(numbers).sum();

由文件生成流

Java NIO(非阻塞IO)

由函数生成流:创建无限流

Stream API提供了两个静态方法来从函数生成流: Stream.iterate 和 Stream.generate 。

1
2
3
Stream.iterate(0, n -> n + 2)
.limit(10)
.forEach(System.out::println);

都是按需生成,但generate不是依次对每个新生成的值应用函数。它接受一个Supplier<T>类型的Lambda提供新的值。

1
2
3
Stream.generate(Math::random)
.limit(5)
.forEach(System.out::println);

用流<收集>数据

函数式编程相对于指令式编程一个主要的优势:只需指出希望的结果——“做什么”,而不用操心执行的步骤——“如何做”。

预定义收集器

三大功能:

  • 将流元素归约和汇总为一个值
  • 元素分组
  • 元素分区

归约和汇总

1
2
long howManyDishes = menu.stream().count();
int howManyDishes = menu.size();
查找流中的最大值和最小值

Collectors.maxBy 和Collectors.minBy ,用来计算流中的最大或最小值。这两个收集器接收一个 Comparator 参数来比较流中的元素。

1
2
3
4
5
6
7
8
9
10
Comparator<Dish> dishCaloriesComparator =
Comparator.comparingInt(Dish::getCalories);

Optional<Dish> mostCalorieDish =
menu.stream()
.collect(maxBy(dishCaloriesComparator));

Optional<Dish> mostCalorieDish =
menu.stream()
.max(dishCaloriesComp);
汇总
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 求和
int totalCalories =
menu.stream().collect(summingInt(Dish::getCalories));

int totalCalories =
menu.stream()
.mapToInt(Dish::getCalories).sum();

// 求平均值
double avgCalories =
menu.stream().collect(averagingInt(Dish::getCalories));

OptionalDouble avgCalories2 =
menu.stream()
.mapToDouble(Dish::getCalories)
.average();

有时候,我们可能希望得到两个或更多这样的结果,而且只需要一次操作就可以完成。此时,可以使用summarizingInt工厂方法返回收集器。例如,通过一次summarinzing操作,即可得到菜肴热量的总和、平均值、最大值、最小值:

1
2
IntSummaryStatistics menuStatistics =
menu.stream().collect(summarizingInt(Dish::getCalories));

收集到一个IntSummaryStatisics中,它提供了一个取值方法来访问结果。

连接字符串

joining工厂方法返回一个收集器会把对流中的每一个对象应用toString方法得到的字符串连接成一个字符串。

1
String shortMeu = menu.stream().map(Dish::getName).collect(joining());

joining内部使用StringBuilder。如果Dish类有一个toString方法来返回菜肴的名称,那无需用提取每一道菜名称的函数来对原流做映射,就能得到相同的结果:

1
String shortMenu = menu.stream().collect(joining());

joining()工厂方法有一个重载版本,可接受分界符,

1
String shortMenu = 		menu.stream().map(Dish::getName).collect(joining(", "));

广义的归约汇总

以上所有的收集器,都可以用reducing方法定义归约过程。以上方法只是方便程序员而已。

collect与reduce: collect()适合表达可变容器上的归约,更关键的是它适合并行操作。

根据情况选择最佳方案

以下开始发挥collect的作用

分组

Collections.groupingBy(Function)

Function称为分类函数。分组操作的结果是一个Map。

1
2
Map<Dish.Type, List<Dish>> dishesByType =
menu.stream().collect(groupingBy(Dish::getType));

可用Lambda编写复杂的分类函数。

Collections.groupingBy(Function,Collector)

接受collector类型的第二个参数,进行二级分组。可把一个内层的groupingBy传递给外层的groupingBy,作为二级分类标准。

这里的collector可以是任意类型,例如counting(),maxBy()。

maxBy(Comparator)

1
2
3
4
Map<Dish.Type, Optional<Dish>> mostCaloricByType =
menu.stream()
.collect(groupingBy(Dish::getType,
maxBy(comparingInt(Dish::getCalories))));

groupingBy 收集器只有在应用分组条件后,第一次在流中找到某个键对应的元素时才会把键加入分组 Map 中。这意味着 Optional 包装器在这里不是很有用。

把收集器返回的结果转换为另一种类型,你可以使用Collectors.collectingAndThen 工厂方法返回的收集器。

1
2
3
4
5
6
Map<Dish.Type, Dish> mostCaloricByType =
menu.stream()
.collect(groupingBy(Dish::getType,
collectingAndThen(
maxBy(comparingInt(Dish::getCalories)),
Optional::get)));
1
2
3
Map<Dish.Type, Integer> totalCaloriesByType =
menu.stream().collect(groupingBy(Dish::getType,
summingInt(Dish::getCalories)));
1
2
3
4
5
6
7
Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType =
menu.stream().collect(
groupingBy(Dish::getType, mapping(
dish -> { if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT; },
toSet() )));

注意在上一个示例中,对于返回的 Set 是什么类型并没有任何保证。但通过使用 toCollection ,你就可以有更多的控制。

1
2
3
4
5
6
7
Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType =
menu.stream().collect(
groupingBy(Dish::getType, mapping(
dish -> { if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT; },
toCollection(HashSet::new) )));
分区

它是分组的特殊情况:由一个谓词(返回布尔值的函数)作为分类函数,称为分区函数。

1
2
3
4
5
Map<Boolean, List<Dish>> partitionedMenu =
menu.stream()
.collect(partitioningBy(Dish::isVegetarian));

List<Dish> vegetarianDishes = partitionedMenu.get(true);

分区的好处在于保留了分区函数返回 true 或 false 的两套流元素列表。

1
2
3
4
5
6
Map<Boolean, Dish> mostCaloricPartitionedByVegetarian =
menu.stream().collect(
partitioningBy(Dish::isVegetarian,
collectingAndThen(
maxBy(comparingInt(Dish::getCalories)),
Optional::get)));

可以仿造groupingBy多级分区。


收集器接口

可为collector接口提供自己的实现,从而自由地创建自定义归约操作。

1
2
3
4
5
6
7
public interface Collector<T, A, R> {
Supplier<A> supplier();
BiConsumer<A, T> accumulator();
Function<A, R> finisher();
BinaryOperator<A> combiner();
Set<Characteristics> characteristics();
}

并行流

对顺序流调用parallel方法,并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个 boolean 标志,表示你想让调用 parallel 之后进行的所有操作都并行执行。

并行流内部使用了默认的ForkJoinPool,它默认的线程数量是处理器的数量,可由Runtime.getRuntime().availableProcessors()得到。

避免共享可变状态,确保并行 Stream 得到正确的结果。

并行流无需显式地处理线程和同步问题。

自定义Spliterator

因为原始的 String 在任意位置拆分,所以有时一个词会被分为两个词,然后数了两次。这就说明,拆分流会影响结果,而把顺序流换成并行流就可能使结果出错。

如何解决这个问题呢?解决方案就是要确保 String 不是在随机位置拆开的,而只能在尾
拆开。要做到这一点,你必须为 Character 实现一个 Spliterator ,它只能在两个词之间拆开String (如下所示),然后由此创建并行流。

Optional替代Null

如何为缺失的对象建模?

Optional:当变量存在时,Optional类只是对类的简单封装。当变量不存在时,缺失的值被建模成一个“空”的Optional对象,由方法Optional.empty返回。 Optional.empty() 方法是一个静态工厂方法,它返回 Optional 类的特定单一实例。

Optional.empty()用处:

应用Optional

创建Optional对象:
  1. 声明一个空的Optional

    Optional<Car> optCar= Optional.empty();

  2. 依据一个非空值创建Optional

    Optional<Car> optCar = Optional.of(car);

    若car是null,抛出NPE。

  3. 可接受null的Optional

    Optional<Car> optCar = Optional.ofNullable(car);

Optional提供的get方法在遭遇空的Optional对象时,也会抛出异常。

怎么办?

使用map从Optional对象中提取和转换值
1
2
3
4
5
6
public String getCarInsuranceName(Optional<Person> person) {
return person.flatMap(Person::getCar)
.flatMap(Car::getInsurance)
.map(Insurance::getName)
.orElse("Unknown"); // Optional为空时,设置默认值
}

由于 Optional 类设计时就没特别考虑将其作为类的字段使用,所以它也并未实现
Serializable 接口。

若要实现序列化的域模型,作为替代方案,提供一个能访问声明为 Optional 、变量值可能缺失的接口。

1
2
3
4
5
6
public class Person {
private Car car;
public Optional<Car> getCarAsOptional() {
return Optional.ofNullable(car);
}
}

CompleptableFuture:组合式异步编程

Future接口

它设计初衷是为将来某个时刻会发生的结果进行建模。

这种编程方式让你的线程可以在 ExecutorService 以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。接着,如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的 get 方法去获取操作的结果。如果操
作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。

Future 接口的局限性:很难表述 Future 结果之间的依赖性。

于是CompletableFuture,CompletableFuture 和 Future 的关系就跟 Stream 和 Collection 的关系一样。

使用 CompletableFuture 构建异步应用

1
2
3
4
5
6
7
8
9
10
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread( () -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();

// 无需等待尚未结束的计算,直接返回Future对象
return futurePrice;
}
使用工厂方法 supplyAsync 创建 CompletableFuture
1
2
3
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

supplyAsync 方法接受一个生产者( Supplier )作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。

生产者方法会交由 ForkJoinPool池中的某个执行线程( Executor )运行,但是你也可以使用 supplyAsync 方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。一般而言,向 CompletableFuture 的工厂方法传递可选参数,指定生产者方法的执行线程是可行的。

使用 CompletableFuture 发起异步请求

这里使用两个不同的Stream流水线的原因是:流操作之间存在延迟。如果在单一的流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。

寻找更好的方案

当任务数超过四个时,定制执行器。

N(threads) = N(CPU) U(CPU) (1 + W/C)
其中:

  • N CPU 是处理器的核的数目,可以通过 Runtime.getRuntime().availableProce-
    ssors() 得到
  • U CPU 是期望的CPU利用率(该值应该介于0和1之间)
  • W/C是等待时间与计算时间的比率
1
2
3
4
5
6
7
8
9
private final Executor executor =
Executors.newFixedThreadPool(Math.min(shops.size(), 100),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});

Java程序无法终止或者退出一个正在运行中的线程,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。与此相反,如果将线程标记为守护进程,意味着程序退出时它也会被回收。这二者之间没有性能上的差异。

并行——使用流还是 CompletableFutures ?

  • 如果进行的是计算密集型的操作,并且没有IO,推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有线程都是计算密集型,根据以上估算公式,就没有必要创建比处理器核数更多的线程)。
  • 反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture 灵活性更好,你可以像前文讨论的那样,依据等待/计算,或者W/C的比率设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

对多个异步任务进行流水线操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public List<String> findDiscountFuture(String product) {
List<CompletableFuture<String>> futureList = shops.stream()
.map(shop ->
CompletableFuture.supplyAsync(
() -> shop.getName() + " price is " + shop.getPrice(product),executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)))
.collect(toList());

return futureList.stream()
.map(CompletableFuture::join)
.collect(toList());
}

thenCompose方法允许对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。换句话说,即创建两个CompletableFuture对象调用thenCompose,并向其传递一个Function。当第一个CompletableFuture执行完毕后,它的结果结果将作为该函数的参数, 这个函数返回值是以第一个 CompletableFuture 的返回做输入计算出的第二个 CompletableFuture 对象。

thenCompose 方法像 CompletableFuture 类中的其他方法一
样,也提供了一个以 Async 后缀结尾的版本 thenComposeAsync 。通常而言,名称中不带 Async的方法和它的前一个任务一样,在同一个线程中运行;而名称以 Async 结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。就这个例子而言,第二个CompletableFuture 对象的结果取决于第一个CompletableFuture ,所以无论你使用哪个版本的方法来处理 CompletableFuture 对象,对于最终的结果,或者大致的时间而言都没有多少差别。我们选择 thenCompose 方法的原因是因为它更高效一些,因为少了很多线程切换的开销。

主线程还能执行其它重要的操作,如响应UI。

另一种比较常见的情况是,你需要将两个完全不相干的 CompletableFuture 对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二项任务。

1
2
3
4
5
6
7
Future<Double> futurePriceInUSD =
CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(
() -> exchangeService.getRate(Money.EUR, Money.USD)),
(price, rate) -> price * rate
);

这里thenCombine方法,它接受BiFunction作为第二个参数,这个参数定义了两个CompletableFuture 对象完成计算后,如何合并结果。它的Async版本是:导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。其中的两个CompletableFuture 对象是在不同的线程执行的。

CompletableFuture 利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。

响应 CompletableFuture 的 completion 事件

避免的首要的问题是,等待创建一个包含了所有价格的List创建完成。应该直接处理CompletableFuture。这样每个 CompletableFuture 都在为某个商店执行必要的操
作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Stream<CompletableFuture<String>> findDiscountStream(String product) {
return shops.stream()
.map(shop ->
CompletableFuture.supplyAsync(
() -> shop.getName() + " price is " + shop.getPrice(product),
executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)));

}

findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));

thenAccept方法也有Async版本。异步版本会对处理结果的消费者进行调度,从线程池中选择一个新的线程继续执行,不再由同一个线程完成CompletableFuture的所有任务。

如果想避免不必要的上下文切换,避免在等待线程上浪费时间,尽快响应CompletableFuture的completion事件,可以不使用异步版本。

由于 thenAccept 方法已经定义了如何处理 CompletableFuture 返回的结果,一旦
CompletableFuture 计算得到结果,它就返回一个CompletableFuture<Void>

1
2
3
4
CompletableFuture[] futures = findPricesStream("myPhone")
.map(f -> f.thenAccept(System.out::println))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();

allOf 工厂方法接收一个由 CompletableFuture 构成的数组,数组中的所有CompletableFuture 对象执行完成之后,它返回一个CompletableFuture<Void> 对象。这意味着,如果你需要等待最初 Stream 中的所有 CompletableFuture 对象执行完毕,对 allOf 方法返回的CompletableFuture 执行 join 操作是个不错的主意。

然而在另一些场景中,你可能希望只要 CompletableFuture 对象数组中有任何一个执行完毕就不再等待,比如,你正在查询两个汇率服务器,任何一个返回了结果都能满足你的需求。在这种情况下,你可以使用一个类似的工厂方法 anyOf 。该方法接收一个 CompletableFuture 对象构成的数组,返回由第一个执行完毕的 CompletableFuture 对象的返回值构成的CompletableFuture<Object>

新的日期和时间API