《Java8实战》读书笔记

第一章 为什么要关心 Java 8

从有点修正主义的角度来看,在 Java 8 中加入 Streams 可以看作把另外两项扩充加入 Java 8 的直接原因:把代码传递给方法的简洁方式(方法引用、Lambda)和接口中的默认方法。

多核 CPU 的每个处理器内核都有独立的高速缓存。加锁需要这些高速缓存同步运行,然而这又需要在内核间进行较慢的缓存一致性协议通信。

流是一系列数据项,一次只生成一项。程序可以从输入流中一个一个读取数据项,然后以同样的方式将数据项写入输出流。一个程序的输出流很可能是另一个程序的输入流。

编程语言中的函数一词通常是指方法,尤其是静态方法;这是在数学函数,也就是没有副作用的函数之外的新含义。

在 Java 8 里写下 File::isHidden 的时候,你就创建了一个方法引用,你同样可以传递它。

Collection 主要是为了存储和访问数据,而 Stream 则主要用于描述对数据的计算。

第二章 通过行为参数化传递代码

行为参数化,就是一个方法接受多个不同的行为作为参数,并在内部使用它们,完成不同行为的能力。

行为参数化可让代码更好地适应不断变化的要求,减轻未来的工作量。

第三章 Lambda 表达式

只有在接受函数式接口的地方才可以使用 Lambda 表达式。

函数式接口

只定义一个抽象方法(不包括默认方法)的接口,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 函数式接口
@FunctionalInterface
public interface Runnable {
void run();
}

// 使用Lambda
Runnable R1 = () -> System.out.println("hello world");
// 使用匿名类
Runnable R2 = new Runnable() {
public void run() {
System.out.println("hello world");
}
}

函数式接口的抽象方法的签名称为函数描述符。

JDK 中已设计好的一些函数式接口(大多数在 java.util.function 包里):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface Comparable<T> {
public int compareTo(T o);
}

@FunctionalInterface
public interface Runnable {
public abstract void run();
}

@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}

@FunctionalInterface
public interface Predicate<T> {
boolean test(T t);
}

@FunctionalInterface
public interface Consumer<T>{
void accept(T t);
}

@FunctionalInterface
public interface Function<T, R> {
R apply(T t);
}

泛型只能绑定到引用类型,因此 Java 中有自动拆装箱机制,但是自动拆装箱会有内存消耗和性能损失,所以 Java8 给上述函数式接口扩展了许多专门的版本。例如:

1
2
3
IntPridicate LongPredicate DoublePredicate
IntConsumer LongConsumer DoubleConsumer
IntFunction<R> IntToDoubleFunction ToDoubleFunction<T>

任何 Lambda 表达式都不允许抛出受检异常(checked exception),如果需要 Lambda 表达式来抛出异常,有两种办法:

定义一个自己的函数式接口,并声明受检异常:

1
2
3
public interface BufferedReaderProcessor {
String process(BufferedReader b) throws IOException;
}

显式捕捉受检异常:

1
2
3
4
5
6
7
() -> {
try {
...
}catch (Exception e){
...
}
}

类型检查、类型推断以及限制

Lambda 的类型是从使用 Lambda 表达式的上下文中推断出来的,与对应函数式接口的函数描述符保持兼容。

Lambda 参数列表中的类型可以不写,Java 编译器会自动推断出来,但有时写上会增加可读性。

Lambda 表达式只能捕获外层局部变量一次(捕获实例变量可以被看做捕获最终局部变量 this),下面的代码就无法通过编译:

1
2
3
4
int portNumber = 8000;
Runnable r = () -> System.out.println(portNumber);
portNumber = 8080;
// 将会编译错误,Lambda表达式引用的局部变量必须是最终的(final)或事实上是最终的。

为什么会存在该限制?

因为局部变量保存在栈上,并且依附于栈所在的线程上。如果允许捕获可改变的局部变量且 Lambda 是在一个单独的线程中使用的,那么就会造成数据一致性问题,所以引用的局部变量必须是 final 的。

但实例变量不受影响,因为其保存在堆上,而堆是在线程之间共享的。

方法引用

1
2
(Apple a) -> a.getWeight()
Apple::getWeight // 方法引用写法

是 Lambda 表达式的快捷写法,显式地指明方法名称会使代码的可读性更好。

主要有四类:

  1. 指向静态方法的引用:Integer::parseInt
  2. 指向任意类型实例方法的引用:String::length
  3. 指向现有对象实例方法的引用:obj::getValue
  4. 指向构造函数的引用:String::new
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Supplier<Apple> c1 = Apple::new;
Apple a1 = c1.get();
// 等价于
Supplier<Apple> c1 = () -> new Apple();
Apple a1 = c1.get();

Function<Integer, Apple> c2 = Apple::new;
Apple a2 = c2.apply(100);
// 等价于
Function<Integer, Apple> c2 = weight -> new Apple(weight);
Apple a2 = c2.apply(100);

BiFunction<String, Integer, Apple> c3 = Apple::new;
Apple a3 = c3.apply("Red", 100);
// 等价于
BiFunction<String, Integer, Apple> c3 = (color, weight) -> new Apple(color, weight);
Apple a3 = c3.apply("Red", 100);

// 四个参数则需要自己定义函数式接口

复合 Lambda 表达式

函数式接口中的默认方法(comparing、reserved、and、or、compose、andThen)可以把多个 Lambda 表达式复合成为更复杂的表达式。

比较器复合

1
2
3
4
5
6
7
// 逆序
inventory.sort(Comparator.comparing(Apple::getWeight).reserved());

// 比较器链
inventory.sort(Comparator.comparing(Apple::getWeight)
.reserved() // 按重量递减
.thenComparing(Apple::getColor)); // 一样重时进一步按颜色排序

谓词复合

1
2
3
4
5
6
7
8
9
Predicate<Apple> notRedApple = redApple.negate(); // 产生现用predicate对象的非

Predicate<Apple> redAndHeavyApple =
redApple.and(a -> a.getWeight() > 150); // 链接两个谓词产生另一个predicate对象

Predicate<Apple> redAndHeavyAppleOrGreen =
redApple.and(a -> a.getWeight() > 150)
.or(a -> "green".equals(a.getColor()));
// 链接多个谓词构造更复杂的predicate对象

函数复合

1
2
3
4
5
6
7
8
Function<Integer, Integer> f = x -> x + 1;
Function<Integer, Integer> g = x -> x * 2;

Function<Integer, Integer> h = f.andThen(g); // 数学上为g(f(x))
int result = h.apply(1) // => 4

Function<Integer, Integer> h = f.compose(g); // 数学上为f(g(x))
int result = h.apply(1) // => 3

第四章 引入流

流是 Java API 的新成员,允许你使用声明式方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。可以把它们看作是遍历数据集的高级迭代器,流可以透明地并行处理,无需编写额外的多线程代码,充分利用了计算机多核特性。此外,你可以在流中把几个基础操作链接起来,来表达复杂的数据处理流水线

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class Dish {
private final String name;
private final boolean vegetarian;
private final int calories;
private final Type type;

public enum Type {
MEAT, FISH, OTHER
}

public Dish(String name, boolean vegetarian, int calories, Type type) {
this.name = name;
this.vegetarian = vegetarian;
this.calories = calories;
this.type = type;
}

public String getName() {
return name;
}

public int getCalories() {
return calories;
}

public Type getType() {
return type;
}

@Override
public String toString() {
return name;
}

public boolean isVegetarian() {
return vegetarian;
}
}

public class Main {
private static List<Dish> menuList = Arrays.asList(
new Dish("pork",false, 800, Dish.Type.MEAT),
new Dish("beef", false, 700, Dish.Type.MEAT),
new Dish("chicken", false, 400, Dish.Type.MEAT),
new Dish("french fries", true, 530, Dish.Type.OTHER),
new Dish("rice", true, 350, Dish.Type.OTHER),
new Dish("season fruit", true, 120, Dish.Type.OTHER),
new Dish("pizza", true, 550, Dish.Type.OTHER),
new Dish("prawns", false, 300, Dish.Type.FISH),
new Dish("salmon", false, 300, Dish.Type.FISH)
);

public static void main(String[] args) {
List<String> lowCaloricDishesName =
Main.menuList.stream()
.filter(d -> d.getCalories() < 400) // 选出小于400卡的菜肴
.sorted(Comparator.comparing(Dish::getCalories)) // 依据热量排序
.map(Dish::getName) // 提取菜肴的名称
.collect(Collectors.toList()); // 讲所有名称保存在List中

lowCaloricDishesName.forEach(System.out::println));
}
}

Java 8 的集合新增一个stream()方法,它会返回一个流。集合代表数据,流代表操作。

创建流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 由值创建流
Stream<String> stream = Stream.of("Java 8", "Lambdas ", "In");

// 由数组创建流
int sum = Arrays.stream(new int[]{1, 2, 3, 4}).sum();

// 由文件生成流
// java.nio.file.Files中有许多静态方法都会返回一个流。
// Files.lines()返回一个由指定文件中的各行构成的字符串流
Stream<String> lines = Files.lines(Paths.get("data.txt"), Charset.defaultCharset());

// 或者直接由文件创建流
InputStream is = new FileInputStream(operatorForm.getFile());
Reader reader = new InputStreamReader(is, StandardCharsets.UTF_8);
BufferedReader buffReader = new BufferedReader(reader);
buffReader.lines().onClose(() -> {
try {
buffReader.close();
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
}).map()

// 由函数生成流:创建无限流
// Stream.iterate和Stream.generate这两个静态方法可以创建无限流
// 生成斐波那契数列前10项:
Stream.iterate(new int[]{0, 1}, d -> new int[]{d[1], d[0] + d[1]})
.limit(10)
.map(d -> d[0])
.forEach(d -> System.out.printf("%d ", d));
// 生成10个随机数
Stream.generate(Math::random).limit(5).forEach(System.out::println);

流的操作

java.util.stream.Stream中的Stream接口定义了许多操作,可以分为两大类:中间操作、终端操作。

中间操作:会返回一个流,除非流水线上触发一个终端操作,否则中间操作不会执行任何处理。这是因为中间操作一般都可以合并起来在终端操作中一次性全部处理。

终端操作:会从流水线中产生结果,结果是任何不是流的值。

流与集合

  1. 流中的值是按需生成的,是生产者和消费者的关系,类似流媒体。
  2. 集合是急切创建的,只有包含了所有的结果才能使用,类似非流媒体。
  3. 流和迭代器类似,只能被遍历一次,遍历完成后就可以说这个流被消费掉了。
  4. 流是内部迭代的,集合则需要用户进行外部迭代。

第五章 使用流

Stream 接口方法

方法 说明
filter() 接受一个谓词(一个返回 boolean 的函数)作为参数,并返回一个包括所有符合谓词的元素的流。
distinct() 返回一个不存在相同元素的流。
limit() 返回一个不超过指定长度的流。
skip() 返回一个跳过前 n 个元素的流。如果流中元素不足 n 个,则返回空流。
map() 接受一个函数作为参数,这个函数会被应用到每个元素上,并将其映射为一个新的元素。例如提取名称:map(Dish::getName)
flatmap() 把一个流中的每个值都换成另一个流,然后把所有的流连接起来成为一个流。
anyMatch() 流中是否有一个元素能匹配给定的谓词。该方法返回 boolean,因此为终端操作。
allMatch() 检查谓词是否匹配所有元素。该方法返回 boolean,因此为终端操作。
noneMatch() 和 allMatch()相对。该方法返回 boolean,因此为终端操作。
findAny() 将返回当前流中的任意元素。返回后立即停止(短路、终端操作)
findFirst() 类似 findAny()找到第一个元素。(有并行限制)终端操作。
reduce() 规约、终端操作,例如数组元素求和。int sum = numbers.stream().reduce(0, Integer::sum) 或 Optional sum = numbers.stream().reduce(Integer::sum)
count() 获取元素个数,终端操作
collect() 终端操作(有状态、有界)

原始类型流特化

Java 8 引入了三个原始类型流特化接口来避免暗含的装箱成本:IntStreamDoubleStreamLongStream。分别将流中的元素特化为 Int、Double、Long。每个接口还有一些辅助方法比如:sum、max、min、average。

将流转换为特化版本的常用方法是:mapToIntmapToDoublemapToLong。还原方法为:boxed()

1
2
3
4
5
6
7
8
9
10
11
12
13
IntStream intStream = menu.stream().mapToInt(Dish::getCalories);
// 卡路里求和
int calories = intStream.sum();
// 最大值
OptionalInt maxCalories = intStream.max();
// 如果没有最大值,则使用默认值
int max = maxCalories.orElse(1);
// 卡路里范围区间(左闭右开)
Long count = intStream.range(1, 100).count();
// 卡路里范围区间(闭区间)
count = intStream.rangeClosed(1, 100).count();
// 还原为Steam流
Stream<Integer> stream = intStream.boxed();

第六章 用流收集数据

收集器

对流调用collect()将对流中的元素触发一个规约操作(由 Collector 来参数化)。规约操作遍历流中的每个元素,并让 Collector 进行处理。Collector 会对元素应用一个转换函数,并将结果累积在一个数据结构中,从而产生这一过程的最终输出。

Collectors 实用类中提供了许多静态工厂方法,可以方便的创建收集器(Collector)的实例。

工厂方法 说明
toList 把流中所有项目收集到一个 List
toSet 把流中所有项目收集到一个 Set, 不存在重复项
toCollection 把流中所有项目收集到给定的供应源创建的集合
counting 计算流中元素个数
summingInt 对流中项目的一个整数属性求和
averagingInt 对流中项目的一个整数属性求平均值
summarizingInt 收集关于流中项目 Integer 属性的统计值,例如最大、最小、总和和平均值
joining 连接对于流中的每个项目调用 toString 方法所生成的字符串
maxBy 选出按照给定比较器算出的最大元素(Optional)
minBy
reducing 从一个作为累加器的初始值开始,利用 BinaryOperator 与流中的元素逐个结合,从而将流规约为单个值。
collectingAndThen 包裹另外一个收集器,对其结果应用转换函数
groupingBy 根据项目的一个属性值进行分组,并将属性值作为结果的 Map 键。
partitionBy 对流中的每个项目应用谓词的结果来对项目进行分区。

自定义一个收集器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
* 将Stream<T>中的所有元素收集到一个List<T>中
* public interface Collector<T, A, R> {...}
* T 就是流中要收集的项目的泛型
* A 是累加器类型,累加器在收集过程中用于累积部分结果的对象
* R 是收集操作得到的对象的类型
* @param <T>
*/
public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
/**
* 创建一个空的累加器实例
* @return 空的List<T>类型构造函数
*/
public Supplier<List<T>> supplier() {
return ArrayList::new;
}

/**
* 将元素添加到结果容器
* @return
*/
public BiConsumer<List<T>, T> accumulator() {
return List::add;
}

/**
* 合并两个结果容器
* @return
*/
public BinaryOperator<List<T>> combiner() {
return (listDst, listSrc) -> {
listDst.addAll(listSrc);
return listDst;
};
}

/**
* 对结果容器应用最终转换
* 在此处累加器对象恰好符合预期的最终结果,因此无需转换
* @return
*/
public Function<List<T>, List<T>> finisher() {
return Function.identity();
}

/**
* 定义收集器的行为
* @return 返回一个不可变的Characteristics集合
* Characteristics是一个包含三个项目的枚举
* UNORDERED 表示规约项目不受流中项目的遍历和累积顺序的影响
* CONCURRENT 表示该收集器可以并行规约流,如果没有必要标示,那它
* 仅在用于无序数据源时才可以进行规约
* IDENTITY_FINISH 表示完成器方法返回的函数是一个恒等函数,可以跳过。
* 这种情况表示累加器对象将会直接用作规约过程的最终结果
* 意味着累加器A不加检查地转换为结果R是安全的
*/
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, CONCURRENT));
}
}

第七章 并行数据处理与性能

并行流

可以通过对收集源调用parallelStream()方法来把集合转换为并行流。

并行流就是一个把内容分为多个数据块,并用不同线程分别处理每个数据块的流。并行流背后使用的基础架构是 Java 7 引入的分支/合并框架。

1
2
3
4
5
6
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> 1 + i)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}

对并行流调用sequential()方法就可以把它变为顺序流。

并行流所使用的线程池

并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是处理器的数量,这个值由Runtime.getRuntime().availableProcessors()得到的。

通过设置java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小:

1
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");

流性能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class Main {
public static LongStream generateLongStream() {
return LongStream.iterate(1L, i -> 1 + i);
}

public static long parallelSum(long n) {
return generateLongStream()
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}

public static long sequentialSum(long n) {
return generateLongStream()
.limit(n)
.reduce(0L, Long::sum);
}

public static long measureSumPref(Function<Long, Long> adder, long n) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
long sum = adder.apply(n);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Result: " + sum);
if (duration < fastest) fastest = duration;
}
return fastest;
}

public static void main(String[] args) {
System.out.println("ParallelSum done in: " +
measureSumPref(Main::parallelSum, 10_000_000) +
" msecs"); // => 198 msecs

System.out.println("SequentialSum done in: " +
measureSumPref(Main::sequentialSum, 10_000_000) +
" msecs"); // => 17 msecs
}
}

为什么求和方法的并行版本比顺序版本慢很多?

  1. 这里如果我们不使用LongStream而使用Stream的话会更慢,因为iterate生成的是装箱对象,必须拆箱成数字才能求和。
  2. 这里很难将iterate分解为多个独立块来并行执行。因为每次应用这个函数都会依赖前一次的结果。

这就说明并行编程可能很复杂,有时候甚至有点违反直觉。用的不对会使程序性能很差。

解决方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static LongStream generateLongStream(long n) {
// rangeClosed 直接产生原始类型的long数字,没有装箱拆箱
// 它会生成数字范围,很容易拆分为独立的小块。
return LongStream.rangeClosed(1, n);
}

public static long parallelSum(long n) {
return generateLongStream(n)
.parallel()
.reduce(0L, Long::sum);
}

public static long sequentialSum(long n) {
return generateLongStream(n)
.reduce(0L, Long::sum);
}

public static void main(String[] args) {
System.out.println("ParallelSum done in: " +
measureSumPref(Main::parallelSum, 10_000_000) +
" msecs"); // => 3 msecs

System.out.println("SequentialSum done in: " +
measureSumPref(Main::sequentialSum, 10_000_000) +
" msecs"); // => 6 msecs
}

正确的数据结构可以使并行保持最佳性能。

并行化是有代价的:计算过程需要进行额外的划分和创建线程、多个内核间的数据移动代价也很大。因此,要保证在内核中并行执行的工作时间比在内核之间传输数据的时间长。

并行的一些陷阱:

  1. 使用的算法改变了某些共享状态。
  2. 处理的数据量太小。
  3. 依赖于元素顺序的操作。
  4. 流背后的数据结构不易于分解。
  5. 每个组合操作的代价很大。

分支/合并框架

分支/合并框架的目的是以地柜方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体的结果。

它是ExecutorService接口的一个实现,它把子任务分配给线程池(ForkJoinPool)中的工作线程。

使用

要把任务提交到池,必须创建RecursiveTask<R>的一个子类,其中 R 是并行化任务(以及所有子任务)产生的结果类型。如果任务不返回结果则是RecursiveAction类型。

要定义RecursiveTask则要实现它唯一的抽象方法compute

1
2
3
4
5
6
7
8
9
10
11
// 该方法定义了将任务拆分为子任务的逻辑,以及无法再拆分
// 或不方便再拆分是,生成单个子任务结果的逻辑。
protected abstract R compute();

// if (任务足够小或不可分) {
// 顺序计算该任务
// } else {
// 将每个任务分成两个子任务
// 递归调用本方法,拆分每个子任务,等待所有子任务完成
// 合并每个子任务结果
// }

使用分支/合并框架求和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
// 要求和的数组
private final long[] numbers;
// 子任务处理的数组的开始和结束位置
private final int start, end;
// 不再将任务分解为子任务的数组大小
public static final long THRESHOLD = 10_000;

public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}

private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}

private long computeSequentially() {
long sum = 0;
for (int i = 0; i < numbers.length; i++) {
sum += numbers[i];
}
return sum;
}

@Override
protected Long compute() {
int length = end - start;
// 如果小于阈值,顺序执行求和
if (length < THRESHOLD) {
return computeSequentially();
}
// 创建一个子任务来为数组的前半部分求和
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
// 利用另一个ForkJoinPool线程异步执行新创建的子任务
leftTask.fork();
// 创建一个任务为数组的后半部分求和
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
// 同步执行第二个子任务(有可能允许进一步递归划分)
Long rightResult = rightTask.compute();
// 读取第一个子任务的结果,如果尚未完成就等待
Long leftResult = leftTask.join();
// 合并结果
return leftResult + rightResult;
}
}

public static void main(String[] args) {
long[] numbers = LongStream.rangeClosed(1, 100).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);

System.out.println(new ForkJoinPool().invoke(task));
}

关于阈值大小问题:

分出大量的小任务一般来说都是一个好的选择。我们需要让所有的 CPU 内核都有效的运作起来,但实际中每个子任务所花费的时间可能天差地别,这时需要给每个工作线程做一下负载均衡,防止某些线程偷懒。

分支/合并框架用一种称为工作窃取(work stealing)的技术来实现负载均衡。在实际应用中,每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出一个任务开始执行。如果某个线程的队列已经空了,那么它会随机选择一个另外线程的工作队列,然后从该队列的尾巴上偷取一个任务。这个过程一直持续到所有的任务都被完成。 这也就是为什么要划分成许多的小任务而不是少数几个大任务的原因。

在并行流中,Spliterator(可分迭代器)接口用来自动拆分流。

第八章 重构、测试和调试

使用 Lambda 重构面向对象的设计模式

策略模式

策略模式代表了解决一类算法的通用解决方案,你可以在运行时选择使用哪种方案。

策略模式包含三个部分:

  1. 一个代表某个算法的接口(它是策略模式的接口)。
  2. 一个或多个该接口的具体实现,它们代表了算法的多种实现。
  3. 一个或多个使用策略对象的客户。

Lambda 重构后,避免了采用策略模式时僵化的模板代码(多个接口的实现类):

1
2
Validator numericValidator = new Validator((String s) -> s.matches("[a-z]+"));
Validator lowerCaseValidator = new Validator((String s) -> s.matches("d+"));

模板方法模式

模板方法模式在你“希望使用某个算法,但是需要对其中的某些行进行改进,才能达到希望的效果”时是非常有用的。

Lambda 创建算法框架,让具体的实现插入某些部分。你想要插入的不同算法组件可以通过 Lambda 表达式或者方法引用的方式实现。

1
2
3
4
5
6
public void processCustomer(int id, Consumer<Customer> makeCustomerHappy) {
Customer c = Database.getCustomerWithID(id);
makeCustomerHappy.accept(c);
}

new OnlineBankingLambda().processCustomer(1337, (Customer c) -> System.out.println("Hello " + c.getName());

观察者模式

某些事件发生时(比如状态改变),如果一个对象(通常我们称之为主题)需要自动地通知其他多个对象(称为观察者),就会采用该方案。例如:在图像用户界面上的按钮点击事件绑定多个处理逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
// 观察者接口
interface Observer {
void notify(String msg);
}

// 消息接口
interface Subject {
// 用来注册观察者
void registerObserver(Observer o);
// 遍历观察者列表,通知观察者有新消息
void notifyObservers(String tweet);
}

由于观察者接口只包含一个方法notify()。使用 Lambda 后,无需显式的实例化观察者对象,直接传递需要执行的行为即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Feed implements Subject {...}

Feed f = new Feed();

f.registerObservers((String tweet) -> {
if (tweet != null && tweet.contains("money")){
...
}
});

f.registerObserver((String tweet) -> {
if (tweet != null && tweet.contains("queen")) {
...
}
});

责任链模式

是一种创建处理对象序列(比如操作序列)的通用方案。一个处理对象可能需要在完成一些工作之后,将结果传递给另一个对象,这个对象接着做一些工作,再转交给下一个对象,以此类推。

这个模式看起来像在链接函数,为了链接这些函数,可以使用 Lambda 的函数复合形式:

1
2
3
4
5
6
7
8
UnaryOperator<String> headerProcessing =
(String text) -> "From Raoll, Mario and Alan: " + text;
UnaryOperator<String> spellCheckerProcessing =
(String text) -> text.replaceAll("labda", "lambda");

Function<String, String> pipeline = headerProcessing.andThen(spellCheckerProcessing);

String result = pipeline.apply("Aren''t labdas really sexy?");

工厂模式

使用工厂模式,你无须向客户暴露实例化的逻辑就能完成对象的创建。

1
2
3
4
5
6
7
8
9
10
11
public class productFactory {
public static Product createProduct(String name) {
switch (name) {
case "iron": return Iron();
case "steel": return Steel();
default: throw new RuntimeException("No such product " + name);
}
}
}

Product p = productFactory.createProduct("iron");

使用 Lambda 表达式:

1
2
3
4
5
6
7
8
9
10
11
final static Map<String, Supplier<Product>> map = new HashMap<>();
static {
map.put("iron", Iron::new);
map.put("steel", Steel::new);
}

public static Product createProduct(String name) {
Supplier<Product> p = map.get(name);
if (p != null) return p.get();
throw new IllegalArgumentException("No such product " + name);
}

第九章 默认方法

Java 8 引入了一个新的功能,叫默认方法,通过默认方法你可以指定接口方法的默认实现。因此,实现接口的类如果不显式地提供该方法的具体实现,就会自动继承默认的实现。

1
2
3
4
// 使用default修饰
default void sort(Comparator<? super E> c) {
Collections.sort(this, c);
}

默认方法的主要目标用户是类库的设计者,默认方法的引入就是为了以兼容的方式解决像 Java API 这样的类库的演进问题的。

同时定义接口以及辅助工具类是常用的一种模式,工具类定义了与接口实例协作的很多静态方法。由于静态方法可以存在于接口内部,你代码中的辅助类就没有了存在的必要,你可以把这些静态方法转移到接口内部。但是,为了保持向后的兼容性,这些类依然会存在于 Java 应用程序的接口之中。

向接口添加新方法是二进制兼容的,这意味着如果不重新编译该类,即使不实现新的方法,现有的类依旧可以运行。

默认方法的使用模式

可选方法:

1
2
3
4
5
6
7
interface Iterator<T> {
boolean hasNext();
T next();
default void remove() {
throw UnsupportedOperationException();
}
}

行为多继承:

1
2
3
4
public class ArrayList<E> extends AbstractList<E>
implements List<E>, RandomAccess, Cloneable, Serializable, Iterable<E>, Collection<E> {
...
}

注意:继承不应该成为代码复用的万金油,如果你从一个拥有 100 个或更多方法及字段的类进行继承,那么就引入了不必要的复杂性。这时,你完全可以使用代理即创建一个方法通过该类的成员变量直接调用该类的方法。这就是为什么有的类刻意声明为final的一个意图。

这种思想也适用于使用了默认方法的接口。通过精简接口,我们能获得最有效的组合。

解决冲突

如果一个类使用相同的函数签名从多个地方(比如另一类或接口)继承了方法,通过三条规则可以判断该方法的归属:

  1. 类中方法优先级最高
  2. 接下来,子接口优先级最高
  3. 接下来,显式地选择使用哪一个默认方法的实现

显式选择方法:

1
2
3
4
5
public class C implements B, A {
void hello() {
B.super.hello();
}
}

菱形继承问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface A {
default void hello() {
System.out.println("Hello from A");
}
}

public interface B extends A {}
public interface C extends A {}

public class D implements B, C {
public static void main(String[] args) {
new D().hello(); // =>?
}
}
  1. 只有 A 声明了一个默认方法,所以打印:“Hello from A”
  2. 如果 B 也通用声明了一个同样的默认方法,则 B 比 A 更具体,打印“Hello from B”
  3. 如果 B、C 都声明了同样的默认方法,则需要显式的指定,否则编译错误。

第十章 用 Optional 取代 null

为了解决访问空对象(null)的属性或方法从而产生NullPointerException的问题。

null 带来的种种问题

  1. 它是错误之源:NullPointerException 是目前 Java 程序开发中最典型的异常。
  2. 它会使你的代码膨胀:代码中充斥着深度嵌套的 null 检查。
  3. 它自身是毫无意义的:null 没有任何语义。
  4. 它破坏了 Java 的哲学:Java 一直试图避免让程序员意识到指针的存在,唯一的例外是:null 指针。
  5. 它在 Java 的类型系统上开了个口子:null 并不属于任何类型,这意味着它可以被赋值给任意引用类型的变量。当这个变量被传递到系统中的另外一个部分后,你讲无法获知这个 null 变量最初的赋值到底是什么类型。

Optional 类入门

Java 8 从 Scala 的optional值的想法中吸取了灵感,引入了一个名为java.util.Optional<T>的新类。

变量存在时,Optional 类只是对类的简单封装。变量不存在时,缺失的值会被建模成一个“空”的 Optional 对象,有Optional.empty()返回。Optional.empty()是一个静态工厂方法,它返回 Optional 类的特定单一实例。

如果你尝试解引用一个 null,一定会触发 NullPointerException,但是使用 Optional.empty()就没事,因为它是 Optional 类的一个有效对象。

在语义上面,也清楚的说明变量缺失是被允许的。它帮助你更好地设计 API,让程序员看到方法签名,就能了解它是否接受一个 Optional 的值。

创建 Optional

1
2
3
4
5
6
// 声明一个空的Optional
Optional<Car> optCar = Optional.empty();
// 依据非空值创建Optional
Optional<Car> optCar = Optional.of(car);
// 可接受null的Optional
Optional<Car> optCar = Optional.ofNullable(car);

获取 Optional 值

1
2
3
// 使用map()从Optional对象中提取和转换值
Optional<Insurance> optInsurance = Optional.ofNullable(insurance);
Optional<String> name = optInsurance.map(Insurance::getName);
1
2
3
4
5
6
7
8
9
10
11
12
13
Optional<Person> optPerson = Optional.of(person);
Optional<String> name = optPerson.map(Person::getCar)
.map(Car::getInsurance)
.map(Insurance::getName);
// 无法通过编译,因为getCar()返回的是一个
// Optional<Car>类型的对象,意味着map操作的结果是一个
// Optional<Optional<Car>>类型的对象。它对getInsurance的调用是非法的。

// 使用flatmap()链接Optional对象
String name = optPerson.flatMap(Person::getCar)
.flatMap(Car::getInsurance)
.map(Insurance::getName)
.orElse("Unknown");

Java 语言的架构师 Brian Goetz 曾经非常明确地陈述过,Optional 的设计初衷仅仅是要支持能返回 Optional 对象的语法。在设计之初就没特别考虑将其作为类的字段使用,所以它也并未实现 Serializable 接口。

Optional 类方法

方法 说明
empty() 返回一个空的 Optional 实例
of() 将指定值用 Optional 封装后返回,如果该值为 null 则抛 NullPointerException 异常
ofNullable() 将指定值用 Optional 封装后返回,如果该值为 null 则返回一个空的 Optional 对象
map() 如果值存在,就对该值执行提供的 mapping 函数
flatMap() 如果值存在,就对该值执行提供的 mapping 函数,返回一个 Optional 类型的值,否则返回空的 Optional 对象
get() 如果变量存在,直接返回封装的变量值,否则抛 NoSuchElementException 异常。除非你确定 Optional 一定包含值,否则使用该方法是个相当糟糕的注意
orElse() 允许在 Optional 不包含值时提供默认值
orElseGet() 是 orElse()的延迟调用版本。如果创建默认值是一件费事的工作,你应该考虑此种方法提高程序性能
orElseThrow() 和 get()类似但可以自定义抛出的异常
ifPresent() 在变量值存在时执行传入的操作,否则不进行任何操作
isPresent() 变量值存在时返回 true,否则返回 false
filter() 如果 Optional 对象的值存在且符合传入的谓词条件,则该方法就返回其值,否则返回一个空的 Optional 对象

第十一章 组合式异步编程

Future 接口

在 Java 5 中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 创建Executor service 通过它你可以向线程池提交任务
ExecutorService executor = Executors.newCachedThreadPool();
// 向Executor service提交一个Callable对象
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
// 以异步方式在新的线程中执行耗时的操作
return doSomethingComputation();
}
});

// 异步操作进行的同时,你可以做其他的事情
doSomethingElse();

try {
// 获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟后退出。
Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
// 计算抛出了一个异常
} catch (InterruptedException ie) {
// 当前线程在等待过程中被中断
} catch (TimeoutException te) {
// 在Future对象完成之前超过已过期
}

这种编程方式让你的线程可以在ExecutorService以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。

如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。

实现异步 API

将同步方法转换为异步方法:

1
public Future<Double> getPriceAsync(String product) {...}

Future接口是一个暂时还不知道值的处理器,这个值在计算完成后,可以通过调用它的get方法取得。

因为这样的设计,上述异步方法getPriceAsync能够在调用后立即返回,让调用线程去执行其他的任务。

新的CompletableFuture类提供了大量的方法,让我们有机会以多种可能的方式轻松地实现这个方法,下面为一段实现的实例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();

// 在另一个线程中以异步方式执行计算
new Thread(() -> {
double price = calculatePrice(product);
// 需长时间计算的任务结束并得出结果时,
// 设置Future的返回值。
futurePrice.complete(price);
}).start();
// 无需等待还没有结束的计算,直接返回Future对象。
return futurePrice;
}

或者使用CompletableFutre类的工厂方法supplyAsync创建CompletableFuture对象:

1
2
3
4
5
6
7
public Future<Double> getPriceAsync(String product) {
// supplyAsync()接受一个生产者作为参数,返回一个CompletableFuture对象
// 生产者方法会交由ForkJoinPool池中的某个执行线程执行
// 也可以使用该方法的重载版本,指定执行生产者的线程
// 该方法创建的CompletableFuture对象和上面的是等价的,并具有错误处理机制
return CompletableFuture.supplyAsync(() -> calculatePrice());
}

使用异步 API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Shop shop = new Shop("BestSop");
long start = System.nanoTime();
Future<Double> futurePrice = shop.getPriceAsync("my favourite product");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime + "msecs");

// 执行更多任务,比如查询其他商店
doSomethingElse();

try {
// 从Future对象中读取价格,如果价格未知,会发生阻塞。
double price = futurePrice.get();
System.out.printf("Price is %.2f%n", price);
} catch (Exception e) {
throw new RuntimeException(e);
}

long retrievalTime = ((System.nanoTIme() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");

错误处理

如果在异步线程中发生错误,用于提示错误的异常会被限制在异步线程的范围内,最终会杀死该线程,而这会导致等待get方法(无超时设置)返回结果的客户端永久地被阻塞。

如果客户端可以使用重载版本的get方法,它使用一个超时参数来避免发生这样的情况,这是一种被推荐的做法。使用这种方法至少能防止程序永久地等待下去。超时发生时,程序会得到通知发生了TimeoutException。但这种方式并不能了解异步线程失败的原因。

为了能让客户端知道异步线程为何执行失败,我们可以使用CompletableFuturecompleteExceptionally方法将问题抛出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new Completable<>();

new Thread(() -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception e) {
futurePrice.completeException(e);
}
}).start();

return futurePrice;
}

使用定制的执行器

线程池中的线程数目大小取决于你的应用需要处理的负荷大小,在《Java 并发编程实战》一书中,Brian Goetz 建议线程池大小与处理器的利用率之比可以使用下面的公式进行估算:

1
2
3
4
5
6
T = C * R * (1 + W/C)

T : 线程数目
C : 处理器核的数目
R : 期望的CPU利用率(0~1)
W/C : 等待时间与计算时间的比率

实际操作中,是按照任务量的多少,大致定义线程池的大小范围。

定制执行器:

1
2
3
4
5
6
7
8
9
10
private final Executor executor =
Executor.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
// 使用守护线程
// 这种方式不会阻止程序的关闭
t.setDaemon(true);
return t;
}
});

使用定制执行器:

1
CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor);

并行还是 CompletableFuture

目前,我们已经知道对集合进行并行计算有两种方式:

  1. 将其转换为并行流,利用 map 这样的操作展开工作。
  2. 枚举出集合中的每一个元素,创建新的线程,在 CompletableFuture 内对其进行操作。

后者提供了更多的灵活性,可以调整线程池的大小,这样能帮助我们确保整体的计算不会因为线程都在等待 I/O 而发生阻塞。

具体使用哪种方式建议如下:

  • 计算密集型,没有 I/O,推荐使用流接口。
  • 有等待 I/O 的操作,那么使用 CompletableFuture 更灵活。不使用流的另一个原因是:处理流水线中如果发生了 I/O 等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

对多个异步任务进行流水线操作

同步顺序流:

1
2
3
4
5
6
7
8
// 耗时10s
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> shop.getPrice(product))
.map(Quote::parse)
.map(Discount::applyDiscount)
.collect(toList());
}

构造同步和异步操作:

1
2
3
4
5
6
7
8
9
10
11
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.applyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote -> CompletableFuture.applyAsync(() -> Discount.applyDiscount(quote), executor)))
.collect(toList());

return pricesFutures.stream()
.map(CompletableFuture::join)
.collect(toList());
}
作者

m3m0ry

发布于

2020-04-11

更新于

2020-09-20

许可协议

评论