bloom filter

当系统设计中出现多级缓存结构时,为了防止大量不存在的key值击穿高速缓存(比如主存),去直接访问低速缓存(如本地磁盘),我们一般需要将这部分key值,直接拦截在高速缓存阶段。这里,当然可以使用普通的hash table,也可以使用bitmap,但是这两种方式都比较耗费内存,当面对海量key值时,问题会变得更加严重。这时,就该介绍我们的主角bloom filter出场了。

一般的,bloom filter用于判断一个key值是否在一个set中,拥有比hash table/bitmap更好的空间经济性。如果bloom filter指示一个key值“不在”一个set中,那么这个判断是100%准确的。这样的特性,非常适合于上述的缓存场景。

bloom filter原理

  • 首先估计要判断的set中的元素个数N,然后选定k个独立的哈希函数。根据N和k,选定一个长度为M的bit array。

  • 遍历set中的N个元素

    • 对每个元素,使用k个哈希函数,得到k个哈希值(一般为一个大整数)
    • 将上述bit array中,k个哈希值所对应的bit置1
  • 对于需要判断的key值
    • 使用k个哈希函数,得到k个哈希值
    • 如果k个哈希值所对应的bit array中的值均为1,则判断此值在set中“可能”存在;否则,判定“一定”不存在

根据上面的原理我们其实可以看到,bloom filter有以下特点:

  • 比较节省空间
  • bloom的识别准确率和数据大小,k个哈希函数有关
  • 如果bloom filter判断key不存在,那么就一定不存在,100%不存在。
  • 如果bloom filter判断key存在,那么可能存在,也可能不存在

bloom filter优缺点

优点:

  • 插入、查找都是常数时间

  • 多个hash函数之间互相独立,可以并行计算

  • 不需要存储元素本身,从而带来空间效率优势,以及一些保密上的优势

  • bloom filter的bitmap可以进行交、并、差运算

缺点:

  • 判断元素是否在集合中的结果其实是不准确的
  • bloom filter中的元素是不能删除的

bloom filter的实际使用

guava bloom filter

guava中提供了bloom filter的一种实现:com.google.common.hash.BloomFilter,可以方便我们在单机情况下使用bloom filter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
/**
* A Bloom filter for instances of {@code T}. A Bloom filter offers an approximate containment test
* with one-sided error: if it claims that an element is contained in it, this might be in error,
* but if it claims that an element is <i>not</i> contained in it, then this is definitely true.
*
* <p>If you are unfamiliar with Bloom filters, this nice
* <a href="http://llimllib.github.com/bloomfilter-tutorial/">tutorial</a> may help you understand
* how they work.
*
* <p>The false positive probability ({@code FPP}) of a bloom filter is defined as the probability
* that {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that
* has not actually been put in the {@code BloomFilter}.
*
* <p>Bloom filters are serializable. They also support a more compact serial representation via the
* {@link #writeTo} and {@link #readFrom} methods. Both serialized forms will continue to be
* supported by future versions of this library. However, serial forms generated by newer versions
* of the code may not be readable by older versions of the code (e.g., a serialized bloom filter
* generated today may <i>not</i> be readable by a binary that was compiled 6 months ago).
*
* @param <T> the type of instances that the {@code BloomFilter} accepts
* @author Dimitris Andreou
* @author Kevin Bourrillion
* @since 11.0
*/
@Beta
public final class BloomFilter<T> implements Predicate<T>, Serializable {

/** The bit set of the BloomFilter (not necessarily power of 2!) */
private final BitArray bits;

/** Number of hashes per element */
private final int numHashFunctions;

/** The funnel to translate Ts to bytes */
private final Funnel<? super T> funnel;

/**
* The strategy we employ to map an element T to {@code numHashFunctions} bit indexes.
*/
private final Strategy strategy;

/**
* Creates a BloomFilter.
*/
private BloomFilter(
BitArray bits, int numHashFunctions, Funnel<? super T> funnel, Strategy strategy) {
checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
checkArgument(
numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
this.bits = checkNotNull(bits);
this.numHashFunctions = numHashFunctions;
this.funnel = checkNotNull(funnel);
this.strategy = checkNotNull(strategy);
}


/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and
* expected false positive probability.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @param fpp the desired false positive probability (must be positive and less than 1.0)
* @return a {@code BloomFilter}
*/
public static <T> BloomFilter<T> create(
Funnel<? super T> funnel, int expectedInsertions, double fpp) {
return create(funnel, (long) expectedInsertions, fpp);
}

/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and
* expected false positive probability.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @param fpp the desired false positive probability (must be positive and less than 1.0)
* @return a {@code BloomFilter}
* @since 19.0
*/
public static <T> BloomFilter<T> create(
Funnel<? super T> funnel, long expectedInsertions, double fpp) {
return create(funnel, expectedInsertions, fpp, BloomFilterStrategies.MURMUR128_MITZ_64);
}

@VisibleForTesting
static <T> BloomFilter<T> create(
Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
checkNotNull(funnel);
checkArgument(
expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
checkNotNull(strategy);

if (expectedInsertions == 0) {
expectedInsertions = 1;
}
/*
* TODO(user): Put a warning in the javadoc about tiny fpp values, since the resulting size
* is proportional to -log(p), but there is not much of a point after all, e.g.
* optimalM(1000, 0.0000000000000001) = 76680 which is less than 10kb. Who cares!
*/
long numBits = optimalNumOfBits(expectedInsertions, fpp);
int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
try {
return new BloomFilter<T>(new BitArray(numBits), numHashFunctions, funnel, strategy);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
}
}

/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and a
* default expected false positive probability of 3%.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @return a {@code BloomFilter}
*/
public static <T> BloomFilter<T> create(Funnel<? super T> funnel, int expectedInsertions) {
return create(funnel, (long) expectedInsertions);
}

/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and a
* default expected false positive probability of 3%.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @return a {@code BloomFilter}
* @since 19.0
*/
public static <T> BloomFilter<T> create(Funnel<? super T> funnel, long expectedInsertions) {
return create(funnel, expectedInsertions, 0.03); // FYI, for 3%, we always get 5 hash functions
}

// other codes ...
}

基本使用:

1
2
3
4
5
6
7
8
9
10
BloomFilter<Integer> filter = BloomFilter.create(Funnels.integerFunnel(),500,0.01);

filter.put(1);
filter.put(2);
filter.put(3);

assertThat(filter.mightContain(1)).isTrue();
assertThat(filter.mightContain(2)).isTrue();
assertThat(filter.mightContain(3)).isTrue();
assertThat(filter.mightContain(100)).isFalse();

正确估计预期插入数量是很关键的一个参数。当插入的数量接近或高于预期值的时候,布隆过滤器将会填满,这样的话,它会产生很多无用的误报点。

不过也有文章指出,guava的bloom filter在数据量变大以后,准确性大大降低,这个虽然本身就是bloom filter的特性,但是在这篇文章中也给出了一些参考值,大家可以看看: google guava bloom filter包的坑 :

在0.0001的错误率下,插入量不到1.5亿的时候,numBits已经到达了BitArray的最大容量了,这时如果再增加插入量,哈希函数个数就开始退化。到5亿的时候,哈希函数个数退化到了只有3个,也就是说,对每一个key,只有3位来标识,这时准确率就会大大下降。
第一种当然就是减少预期插入量,1亿以内,还是可以保证理论上的准确率的。
第二种,如果你的系统很大,就是会有上亿的key,这时可以考虑拆分,将一个大的bloom filter拆分成几十个小的(比如32或64个),每个最多可以容纳1亿,这时整体就能容纳32或64亿的key了。查询的时候,先对key计算一次哈希,然后取模,查找特定的bloom filter即可。

基于redis的bloom filter

guava中的bloom filter是单机的,如果想使用分布式的的话,可以考虑基于redis 的bloom filter。

主要参考这个文章:ReBloom – Bloom Filter Datatype for Redis

redis 在 4.0 的版本中加入了 module 功能,布隆过滤器可以通过module的形式添加到 redis 中,所以使用 redis 4.0 以上的版本可以通过加载 module 来使用 redis 中的布隆过滤器。但是这不是最简单的方式,使用 docker 可以直接在 redis 中体验布隆过滤器。

1
2
> docker run -d -p 6379:6379 --name bloomfilter redislabs/rebloom
> docker exec -it bloomfilter redis-cli

redis 布隆过滤器主要就两个命令:

  • bf.add 添加元素到布隆过滤器中:bf.add urls https://jaychen.cc
  • bf.exists 判断某个元素是否在过滤器中:bf.exists urls https://jaychen.cc

上面说过布隆过滤器存在误判的情况,在 redis 中有两个值决定布隆过滤器的准确率:

  • error_rate:允许布隆过滤器的错误率,这个值越低过滤器的位数组的大小越大,占用空间也就越大。
  • initial_size:布隆过滤器可以储存的元素个数,当实际存储的元素个数超过这个值之后,过滤器的准确率会下降。

redis 中有一个命令可以来设置这两个值:

1
bf.reserve urls 0.01 100

上面三个参数的含义为:

  • 第一个值是过滤器的名字。
  • 第二个值为 error_rate 的值。
  • 第三个值为 initial_size 的值。

使用这个命令要注意一点:执行这个命令之前过滤器的名字应该不存在,如果执行之前就存在会报错:(error) ERR item exists

参考文章:

使用阿里云maven镜像加速

maven是一个好东西,但是默认情况下,maven使用的是中央仓央是:http://repo1.maven.org/maven2http://uk.maven.org/maven2。这两个镜像在国内
访问其实是比较慢的,因此我们需要尽可能使用国内同步好的镜像。

我在国内选择的是阿里云的镜像:公共代理库

maven的配置为:打开maven的配置文件(windows机器一般在maven安装目录的conf/settings.xml),在<mirrors></mirrors>标签中添加mirror子节点:

1
2
3
4
5
6
<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>

其他的如gradle的配置指南请参见公共代理库中描述的那样操作就好了。

但是一般情况下在公司开发的时候,公司也会有自己的maven镜像仓库,这个时候搞多个mirror就好了。

参考资料:

java.nio.ByteBuffer

Java NIO Buffers用于和NIO Channel交互。 我们从Channel中读取数据到buffers里,从Buffer把数据写入到Channels。Buffer本质上就是一块内存区,可以用来写入数据,并在稍后读取出来。这块内存被NIO Buffer包裹起来,对外提供一系列的读写方便开发的接口。java中java.nio.Buffer的常见实现类如下,不过我们这里只说一下ByteBuffer这个实现。

java中`java.nio.Buffer`的常见实现类

Buffer的重要属性

Buffer缓冲区实质上就是一块内存,用于写入数据,也供后续再次读取数据,为了便于理解,你可以把它理解为一个字节数组。它有有四个重要属性:

1
2
3
4
5
6
7
public abstract class Buffer {
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
}
  • capacity
    • 这个属性表示这个Buffer最多能放多少数据,在创建buffer的时候指定。int类型。
  • position
    下一个要读写的元素位置(从0开始),当使用buffer的相对位置进行读/写操作时,读/写会从这个下标进行,并在操作完成后,buffer会更新下标的值。
    • 写模式:当写入数据到Buffer的时候需要从一个确定的位置开始,默认初始化时这个位置position为0,一旦写入了数据比如一个字节,整形数据,那么position的值就会指向数据之后的一个单元,position最大可以到capacity-1.
    • 读模式:当从Buffer读取数据时,也需要从一个确定的位置开始。buffer从写入模式变为读取模式时,position会归0,每次读取后,position向后移动。
  • limit
    在Buffer上进行的读写操作都不能越过这个limit。
    • 写模式:limit的含义是我们所能写入的最大数据量,它等同于buffer的容量capacity
    • 读模式:limit则代表我们所能读取的最大数据量,他的值等同于写模式下position的位置。换句话说,您可以读取与写入数量相同的字节数。
  • mark
    • 一个临时存放的位置下标,用户选定的position的前一个位置或-1。
      • 调用mark()会将mark设为当前的position的值,以后调用reset()会将position属性设
        置为mark的值。mark的值总是小于等于position的值,如果将position的值设的比mark小,当前的mark值会被抛弃掉。

注:

  • position和limit之间的距离指示了可读/存的字节数。
  • boolean hasRemaining():当缓冲区至少还有一个元素时,返回true。
  • int remaining():position和limit之间字节个数。

这些属性总是满足以下条件:

0 <= mark <= position <= limit <= capacity

通过上面的描述可以看出,其中position和limit的具体含义取决于当前buffer的模式(读模式还是写模式)。capacity在两种模式下都表示容量。

Buffer的常见API

  • ByteBuffer allocate(int capacity)
    • 从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,实现类是HeapByteBuffer
  • ByteBuffer allocateDirect(int capacity)
    • 类似于allocate方法,不过使用的是对外内存,实现类是DirectByteBuffer
  • ByteBuffer wrap(byte[] array)
    • 把byte数组包装为ByteBuffer,bytes数组或buff缓冲区任何一方中数据的改动都会影响另一方。其实ByteBuffer底层本来就有一个bytes数组负责来保存buffer缓冲区中的数据,通过allocate方法系统会帮你构造一个byte数组,实现类是HeapByteBuffer
  • ByteBuffer wrap(byte[] array, int offset, int length)
    • 在上一个方法的基础上可以指定偏移量和长度,buffer的capacity就是array.length,这个offset也就是包装后byteBuffer的position,limit=length+position(offset),mark=-1实现类是HeapByteBuffer
  • abstract Object array()
    • 返回支持此缓冲区的数组 (可选操作)
  • abstract int arrayOffset()
    • 返回此缓冲区中的第一个元素在缓冲区的底层实现数组中的偏移量(可选操作)。调用此方法之前要调用hasarray方法,以确保此缓冲区具有可访问的底层实现数组。
  • abstract boolean hasArray()
    • 告诉这个缓冲区是否由可访问的数组支持
  • int capacity()
    • 返回此缓冲区的容量
  • Buffer clear()
    • 清除此缓存区。将position = 0;limit = capacity;mark = -1;把position设为0,一般在把数据写入Buffer前调用。
  • Buffer flip()
    • flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置。一般在从Buffer读出数据前调用。
  • abstract boolean isDirect()
    • 判断个缓冲区是否为direct, 也就是是否是对外内存
  • abstract boolean isReadOnly()
    • 判断告知这个缓冲区是否是只读的
  • int limit()
    • 返回此缓冲区的limit的属性值
  • Buffer position(int newPosition)
    • 设置这个缓冲区的位置
  • boolean hasRemaining()
    • return position < limit,返回是否还有未读内容
  • int remaining()
    • return limit - position; 返回limit和position之间相对位置差
  • Buffer rewind()
    • 把position设为0,mark设为-1,不改变limit的值,一般在把数据重写入Buffer前调用
  • Buffer mark()
    • 设置mark的值,mark=position,做个标记
  • Buffer reset()
    • 还原标记,position=mark。
  • ByteBuffer compact()
    • 该方法的作用是将 position 与 limit之间的数据复制到buffer的开始位置,与limit 之间没有数据的话发,就不会进行复制。一个例子如下:
1
2
3
4
5
6
7
8
9
10
11
12
例如:ByteBuffer.allowcate(10); 
内容:[0 ,1 ,2 ,3 4, 5, 6, 7, 8, 9]
## compact前
[0 ,1 ,2 , 3, 4, 5, 6, 7, 8, 9]
pos=4
lim=10
cap=10
## compact后
[4, 5, 6, 7, 8, 9, 6, 7, 8, 9]
pos=6
lim=10
cap=10
  • ByteBuffer slice();
    • 创建一个分片缓冲区。分片缓冲区与主缓冲区共享数据。 分配的起始位置是主缓冲区的position位置,容量为limit-position。 分片缓冲区无法看到主缓冲区positoin之前的元素。

创建buffer的注意点

创建ByteBuffer可以使用allocate或者wrap,就像下面这样:

  • ByteBuffer allocate(int capacity)
  • ByteBuffer allocateDirect(int capacity)
  • ByteBuffer wrap(int capacity)
  • ByteBuffer wrap(byte[] array,int offset,int length)

需要注意的是:创建的缓冲区都是定长的,大小无法改变。若发现刚创建的缓冲区容量太小,只能重新创建一个合适的

关于ByteBuffer wrap(byte[] array,int offset,int length)这个方法,这里再强调一下,这样创建的ByteBuffer的capacity和array的大小是一样的,buffer的position是offset, buffer的limit是offset+length,position之前和limit之后的数据依然可以访问到。例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Demo {
public static void main(String args[]){
byte arr[]=new byte[100];
ByteBuffer buffer=ByteBuffer.wrap(arr,3,25);
System.out.println("Capacity is: "+buffer.capacity());
System.out.println("Position is: "+buffer.position());
System.out.println("limit is: "+buffer.limit());
}
}
//结果:
Capacity is: 100
Position is: 3
limit is: 28

allocate和wrap的区别

wrap只是简单地创建一个具有指向被包装数组的引用的缓冲区,该数组成为后援数组。对后援数组中的数据做的任何修改都将改变缓冲区中的数据,反之亦然。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String args[]){  
byte arr[]=new byte[100];
//将arr数组全部置为1
Arrays.fill(arr, (byte)1);
ByteBuffer buffer=ByteBuffer.wrap(arr,3,25);
//对后援数组中的数据做的任何修改都将改变缓冲区中的数据
arr[0]=(byte)2;
buffer.position(0);
System.out.println(buffer.get());
//在缓冲区上调用array()方法即可获得后援数组的引用。
System.out.println(Arrays.toString(buffer.array()));
}
//运行结果:
2
[2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ....]//总共100个元素

allocate则是创建了自己的后援数组,在缓冲区上调用array()方法也可获得后援数组的引用。通过调用arrayOffset()方法,可以获取缓冲区中第一个元素在后援数组的偏移量。但是使用wrap创建的ByteBuffer,调用arrayOffset永远是0。

1
2
3
4
5
6
7
8
9
10
11
12
13
 public static void main(String args[]){  
ByteBuffer buffer=ByteBuffer.allocate(100);
//对后援素组的修改也可以反映到buffer上
byte arr[]=buffer.array();
arr[1]=(byte)'a';
buffer.getInt();

System.out.println(Arrays.toString(buffer.array()));
System.out.println(buffer.arrayOffset());
}
//运行结果:
[0, 97, 0, 0, 0, 0, 0, 0,...]//总共100个元素
0

通过ByteBuffer allocateDirect(int capacity)创建的叫直接缓冲区,使用的堆外内存。可以通过isDirect()方法查看一个缓冲区是否是直接缓冲区。由于直接缓冲区是没有后援数组的,所以在其上面调用array()或arrayOffset()都会抛出UnsupportedOperationException异常。注意有些平台或JVM可能不支持这个创建直接缓冲区。

图解

put

写模式下,往buffer里写一个字节,并把postion移动一位。写模式下,一般limit与capacity相等。

bytebuffer-put

flip

写完数据,需要开始读的时候,将postion复位到0,并将limit设为当前postion。

bytebuffer-flip

get

从buffer里读一个字节,并把postion移动一位。上限是limit,即写入数据的最后位置。

bytebuffer-get

clear

将position置为0,并不清除buffer内容。

bytebuffer-clear

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class Demo {
private static final int SIZE = 1024;

public static void main(String[] args) throws Exception {
// 获取通道,该通道允许写操作
FileChannel fc = new FileOutputStream("data.txt").getChannel();
// 将字节数组包装到缓冲区中
fc.write(ByteBuffer.wrap("Some text".getBytes()));
// 关闭通道
fc.close();

// 随机读写文件流创建的管道
fc = new RandomAccessFile("data.txt", "rw").getChannel();
// fc.position()计算从文件的开始到当前位置之间的字节数
System.out.println("此通道的文件位置:" + fc.position());
// 设置此通道的文件位置,fc.size()此通道的文件的当前大小,该条语句执行后,通道位置处于文件的末尾
fc.position(fc.size());
// 在文件末尾写入字节
fc.write(ByteBuffer.wrap("Some more".getBytes()));
fc.close();

// 用通道读取文件
fc = new FileInputStream("data.txt").getChannel();
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
// 将文件内容读到指定的缓冲区中
fc.read(buffer);
//此行语句一定要有, 如果没有,就是从文件最后开始读取的,当然读出来的都是byte=0时候的字符。通过buffer.flip();这个语句,就能把buffer的当前位置更改为buffer缓冲区的第一个位置
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char)buffer.get());
}
fc.close();
}
}

java中的zero copy

在web应用程序中,我们经常会在server和client之间传输数据。比如server发数据给client,server首先将数据从硬盘读出之后,然后原封不动的通过socket传输给client,大致原理如下:

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

下面的例子展示了传统的数据复制实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;

public class TraditionalClient {


public static void main(String[] args) {

int port = 2000;
String server = "localhost";
Socket socket = null;
String lineToBeSent;

DataOutputStream output = null;
FileInputStream inputStream = null;
int ERROR = 1;


// connect to server
try {
socket = new Socket(server, port);
System.out.println("Connected with server " +
socket.getInetAddress() +
":" + socket.getPort());
}
catch (UnknownHostException e) {
System.out.println(e);
System.exit(ERROR);
}
catch (IOException e) {
System.out.println(e);
System.exit(ERROR);
}

try {
String fname = "sendfile/NetworkInterfaces.c";
inputStream = new FileInputStream(fname);

output = new DataOutputStream(socket.getOutputStream());
long start = System.currentTimeMillis();
byte[] b = new byte[4096];
long read = 0, total = 0;
while ((read = inputStream.read(b)) >= 0) {
total = total + read;
output.write(b);
}
System.out.println("bytes send--" + total + " and totaltime--" + (System.currentTimeMillis() - start));
}
catch (IOException e) {
System.out.println(e);
}

try {
output.close();
socket.close();
inputStream.close();
}
catch (IOException e) {
System.out.println(e);
}
}
}

这种操作看起来可能不会怎么消耗CPU,但是实际上它是低效的。因为传统的 Linux 操作系统的标准 I/O 接口是基于数据拷贝操作的,即 I/O 操作会导致数据在操作系统内核地址空间的缓冲区和应用程序地址空间定义的缓冲区之间进行传输。如下图:

Traditional data copying approach

  • 数据首先被从磁盘读取到内核的read buffer
  • 然后在从内核的read buffer中复制到应用程序的buffer中
  • 然后在从应用程序的buffer中复制到内核的socket buffer
  • 最后在从内核的socket buffer中复制到网卡中

然后其中涉及了4次上下文切换:

Traditional context switches

分析上面的描述,我们可以看到kernel buffer其实在这个过程中充当了一个ahead cache。之所以引入这个kernel buffer其实是在很多的情况下是可以减少磁盘 I/O 的操作,进而提升效率的。

  • 比如对于读请求,如果我们所请求的数据的大小小于kernel buffer并且如果要读取的数据已经存放在操作系统的高速缓冲存储器中,那么就不需要再进行实际的物理磁盘 I/O 操作。直接从kernel buffer中读取就好了。
  • 对于写请求:利用这个ahead cache可以实现异步写操作

但是没有银弹,这样会带来一个问题就是,如果当我们请求的数据量的大小远大于kernel buffer的大小的话,这种情况下kernel buffer的存在反而会导致数据在kernel buffer用户缓冲区之间多次复制。

java中的zero copy

如果我们想在java中使用zero copy,我们一般会用java.nio.channels.FileChannel类中的transferTo()方法。下面是它的描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Transfers bytes from this channel's file to the given writable byte
* channel.
*
* <p> An attempt is made to read up to <tt>count</tt> bytes starting at
* the given <tt>position</tt> in this channel's file and write them to the
* target channel. An invocation of this method may or may not transfer
* all of the requested bytes; whether or not it does so depends upon the
* natures and states of the channels. Fewer than the requested number of
* bytes are transferred if this channel's file contains fewer than
* <tt>count</tt> bytes starting at the given <tt>position</tt>, or if the
* target channel is non-blocking and it has fewer than <tt>count</tt>
* bytes free in its output buffer.
*
* <p> This method does not modify this channel's position. If the given
* position is greater than the file's current size then no bytes are
* transferred. If the target channel has a position then bytes are
* written starting at that position and then the position is incremented
* by the number of bytes written.
*
* <p> This method is potentially much more efficient than a simple loop
* that reads from this channel and writes to the target channel. Many
* operating systems can transfer bytes directly from the filesystem cache
* to the target channel without actually copying them. </p>
*
* @param position
* The position within the file at which the transfer is to begin;
* must be non-negative
*
* @param count
* The maximum number of bytes to be transferred; must be
* non-negative
*
* @param target
* The target channel
*
* @return The number of bytes, possibly zero,
* that were actually transferred
*
* @throws IllegalArgumentException
* If the preconditions on the parameters do not hold
*
* @throws NonReadableChannelException
* If this channel was not opened for reading
*
* @throws NonWritableChannelException
* If the target channel was not opened for writing
*
* @throws ClosedChannelException
* If either this channel or the target channel is closed
*
* @throws AsynchronousCloseException
* If another thread closes either channel
* while the transfer is in progress
*
* @throws ClosedByInterruptException
* If another thread interrupts the current thread while the
* transfer is in progress, thereby closing both channels and
* setting the current thread's interrupt status
*
* @throws IOException
* If some other I/O error occurs
*/
public abstract long transferTo(long position, long count,
WritableByteChannel target)
throws IOException;

transferTo()方法把数据从file channel传输到指定的writable byte channel。它需要底层的操作系统支持zero copy。在UNIX和各种Linux中,会执行系统调用sendfile(),该命令把数据从一个文件描述符传输到另一个文件描述符(Linux中万物皆文件):

1
2
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

因此传统的方式中的

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

可以被transferTo()替代。下面的图展示了使用transferTo(), 也就是zero copy技术后的流程:

 Data copy with transferTo()

Context switching with transferTo()

  • transferTo()方法使文件内容被DMA引擎复制到读缓冲区中。 然后,内核将数据复制到与输出套接字关联的内核缓冲区中。
  • 第三个副本发生在DMA引擎将数据从内核套接字缓冲区传递到协议引擎时。

这是一个很明显的进步:我们把context switch的次数从4次减少到了2次,同时也把data copy的次数从4次降低到了3次(而且其中只有一次占用了CPU,另外两次由DMA完成)。但是,要做到zero copy,这还差得远。

如果网卡支持gather operation,我们可以通过kernel进一步减少数据的拷贝操作。在2.4及以上版本的linux内核中,开发者修改了socket buffer descriptor来适应这一需求。这个方法不仅减少了context switch,还消除了和CPU有关的数据拷贝。使用层面的使用方法没有变,但是内部原理却发生了变化:

Data copies when transferTo() and gather operations are used

  • transferTo()方法使文件内容被DMA引擎复制到内核缓冲区中。
  • 没有数据被复制到套接字缓冲区中。 相反,只有具有有关数据位置和长度信息的描述符才会附加到套接字缓冲区。 DMA引擎将数据直接从内核缓冲区传递到协议引擎,从而消除了剩余的最终CPU副本。

下面这个例子展示了如何使用transferTo()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class TransferToClient {

public static void main(String[] args) throws IOException{
TransferToClient sfc = new TransferToClient();
sfc.testSendfile();
}
public void testSendfile() throws IOException {
String host = "localhost";
int port = 9026;
SocketAddress sad = new InetSocketAddress(host, port);
SocketChannel sc = SocketChannel.open();
sc.connect(sad);
sc.configureBlocking(true);

String fname = "sendfile/NetworkInterfaces.c";
long fsize = 183678375L, sendzise = 4094;

// FileProposerExample.stuffFile(fname, fsize);
FileChannel fc = new FileInputStream(fname).getChannel();
long start = System.currentTimeMillis();
long nsent = 0, curnset = 0;
curnset = fc.transferTo(0, fsize, sc);
System.out.println("total bytes transferred--"+curnset+" and time taken in MS--"+(System.currentTimeMillis() - start));
//fc.close();
}
}

参考资料

Iterable和Iterator结合使用的一个小例子

这篇文章主要是记录一下使用Iterable和Iterator用作迭代处理的一个例子。基于这种模式可以很方便的实现
流式处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class Array<T> implements Iterable<T> {
T[] values; // this contains the actual elements of the array

// Constructor that takes a "raw" array and stores it
public Array(T[] values) {
this.values = values;
}

// This is a private class that implements iteration over the elements
// of the list. It is not accessed directly by the user, but is used in
// the iterator() method of the Array class. It implements the hasNext()
// and next() methods.
class ArrayIterator implements Iterator<T> {
int current = 0; // the current element we are looking at

// return whether or not there are more elements in the array that
// have not been iterated over.
public boolean hasNext() {
if (current < Array.this.values.length) {
return true;
} else {
return false;
}
}

// return the next element of the iteration and move the current
// index to the element after that.
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return values[current++];
}
}

// Return the value at a given index
public T get(int index) {
return values[index];
}

// Set the value at a given index
public void set(int index, T value) {
values[index] = value;
}

// Return the length of the array
public int length() {
return values.length;
}

// Return an iterator over the elements in the array. This is generally not
// called directly, but is called by Java when used in a "simple" for loop.
public Iterator<T> iterator() {
return new ArrayIterator();
}

// This is just a sample program that can be run to show how the Array
// class might be used.
public static void main(String[] args) {
// create an array of strings
String[] strings = new String[]{"Hello", "World"};

// create a new array to hold these strings
Array<String> array = new Array<String>(strings);

// get and print the first values (prints "Hello")
System.out.println(array.get(0));

// set the second value
array.set(1, "Javaland!");

// iterate over the array, printing "Hello\nJavaland!"
for (String s : array) {
System.out.println(s);
}
}
}

Guava CacheLoader中当load方法返回null

Guava LoadingCache在实际工作中用的还是比较频繁的。但是最近在review代码时,发现有些同学在使用CacheLoader时没有注意到
CacheLoader#load方法的注释:

1
2
3
4
5
6
7
8
9
10
11
/**
* Computes or retrieves the value corresponding to {@code key}.
*
* @param key the non-null key whose value should be loaded
* @return the value associated with {@code key}; <b>must not be null</b>
* @throws Exception if unable to load the result
* @throws InterruptedException if this method is interrupted. {@code InterruptedException} is
* treated like any other {@code Exception} in all respects except that, when it is caught,
* the thread's interrupt status is set
*/
public abstract V load(K key) throws Exception;

源码中明确指出了这个方法不能返回null。但是在review代码时发现很多同学没注意到到这个,而在部分情况下存在返回null的情况。
一般使用Optional封装一下就好了。

这篇文章主要说一下当load方法返回null时会出现什么异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.ExecutionException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;


public class Test {

public static void main(String[] args) {
LoadingCache cache = CacheBuilder.newBuilder().build(new CacheLoader<Object, Object>() {
@Override
public Object load(Object key) {
return null;
}
});

try {
cache.getUnchecked("asda");
}
catch (Exception e) {
System.out.println("本例子中这里会出现异常 这里会cache住抛出异常");
}

try {
cache.get("adsa");
}
catch (ExecutionException e) {
System.out.println("本例子中不会抛出这个异常");
}catch (Exception e) {
System.out.println("本例子中这里会出现异常 这里会cache住抛出异常");
}

System.out.println("fuck");

}
}

上面的代码分别使用了getUncheckedget方法来测试当load方法返回null的情况。

所以一般出现的问题是使用方可能仅仅cache了ExecutionException,这样会导致异常cache不住。这是一个问题,在某些
情况下会影响程序逻辑。需要注意一下。所以尽可能的使用Optional来封装结果

java日志不打印异常栈

问题描述

今天在排查一个问题的时候发现在日志输出中,只有异常的Message,并没有详细的异常堆栈。

问题解释

对于这个问题的官方解释为:

The compiler in the server VM now provides correct stack backtraces for all “cold” built-in exceptions. For performance purposes, when such an exception is thrown a few times, the method may be recompiled. After recompilation, the compiler may choose a faster tactic using preallocated exceptions that do not provide a stack trace. To disable completely the use of preallocated exceptions, use this new flag: -XX:-OmitStackTraceInFastThrow.

简单的描述就是:

它跟JDK5的一个新特性有关,对于一些频繁抛出的异常,JDK为了性能会做一个优化,即JIT重新编译后会抛出没有堆栈的异常,
而在使用-server模式时,该优化选项是开启的,因此在频繁抛出某个异常一段时间后,该优化开始起作用,即只抛出没有堆栈的异常信息

问题验证

比如下面的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestCompile {
private static final int count = 1000000;

public static void main(String[] args) throws Exception {
int index = count;
while (index-- > 0) {
try {
work();
}
catch (Exception e) {
e.printStackTrace();
}
}
}

private static void work() {
String value = null;
value.length();
}
}

编译后使用java -server -XX:-OmitStackTraceInFastThrow TestCompile运行,发现一直都是类似的stacktrace。

1
2
3
java.lang.NullPointerException
at TestCompile.work(TestCompile.java:25)
at TestCompile.main(TestCompile.java:17)

换成java -server -XX:+OmitStackTraceInFastThrow TestCompile运行一段时间后就会出现

1
2
3
4
java.lang.NullPointerException
java.lang.NullPointerException
java.lang.NullPointerException
java.lang.NullPointerException

这样的exception,说明stacktrace 该优化已经起作用。-XX:+OmitStackTraceInFastThrow选项在-server情况下默认开启。

如何解决

  • 方法1:查看很早之前的日志,那个时候jit的优化还没生效
  • 方法2:重启服务,在重启的以后的一段时间内jit的优化也暂时不会生效
  • 方法3:配置参数-XX:-OmitStackTraceInFastThrow

log4j-api-2.11.1.jar ClassFormatException

今天在部署系统的时候tomcat出现下面的异常信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Mar 08, 2019 4:18:05 PM org.apache.catalina.startup.ContextConfig processAnnotationsJar
SEVERE: Unable to process Jar entry [META-INF/versions/9/module-info.class] from Jar [jar:file:/xxxxx/webapps/ROOT/WEB-INF/lib/log4j-api-2.11.1.jar!/] for annotations
org.apache.tomcat.util.bcel.classfile.ClassFormatException: Invalid byte tag in constant pool: 19
at org.apache.tomcat.util.bcel.classfile.Constant.readConstant(Constant.java:133)
at org.apache.tomcat.util.bcel.classfile.ConstantPool.<init>(ConstantPool.java:60)
at org.apache.tomcat.util.bcel.classfile.ClassParser.readConstantPool(ClassParser.java:209)
at org.apache.tomcat.util.bcel.classfile.ClassParser.parse(ClassParser.java:119)
at org.apache.catalina.startup.ContextConfig.processAnnotationsStream(ContextConfig.java:2134)
at org.apache.catalina.startup.ContextConfig.processAnnotationsJar(ContextConfig.java:2010)
at org.apache.catalina.startup.ContextConfig.processAnnotationsUrl(ContextConfig.java:1976)
at org.apache.catalina.startup.ContextConfig.processAnnotations(ContextConfig.java:1961)
at org.apache.catalina.startup.ContextConfig.webConfig(ContextConfig.java:1319)
at org.apache.catalina.startup.ContextConfig.configureStart(ContextConfig.java:878)
at org.apache.catalina.startup.ContextConfig.lifecycleEvent(ContextConfig.java:376)
at org.apache.catalina.util.LifecycleSupport.fireLifecycleEvent(LifecycleSupport.java:119)
at org.apache.catalina.util.LifecycleBase.fireLifecycleEvent(LifecycleBase.java:90)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5322)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:901)
at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:877)
at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:633)
at org.apache.catalina.startup.HostConfig.deployDirectory(HostConfig.java:1120)
at org.apache.catalina.startup.HostConfig$DeployDirectory.run(HostConfig.java:1678)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

出现这个异常的原因是因为我们的服务器使用的是java8,而工程代码中依赖的log4j-api-2.11.1.jar有部分java9版本的字节码文件。

log4j-api-2.11.1.jar工程结构图

然后排查了发现log4j-api-2.11.1.jar是通过elasticsearch6.5.1引入的。

java Unsafe介绍

Unsafe类对于并发编程来说是个很重要的类,J.U.C里的源码到处充斥着这个类的方法调用。

这个类的最大的特点在于,它提供了硬件级别的CAS原子操作。CAS可以说是实现了最轻量级的锁,当多个线程尝试使用CAS同时更新同一个变量时,只有其中的一个线程能成功地更新变量的值,而其他的线程将失败。然而,失败的线程并不会被挂起。

CAS操作包含了三个操作数: 需要读写的内存位置,进行比较的原值,拟写入的新值

Unsafe类中,实现CAS操作的方法是: compareAndSwapXXX

例如:

1
public native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);
  • obj是我们要操作的目标对象
  • offset表示了目标对象中,对应的属性的内存偏移量
  • expect是进行比较的原值
  • update是拟写入的新值。
  • 所以该方法实现了对目标对象obj中的某个成员变量(field)进行CAS操作的功能。

那么,要怎么获得目标field的内存偏移量offset呢? Unsafe类为我们提供了一个方法:

1
public native long objectFieldOffset(Field field);

该方法的参数是我们要进行CAS操作的field对象,要怎么获得这个field对象呢?最直接的办法就是通过反射了:

1
2
Class<?> k = FutureTask.class;
Field stateField = k.getDeclaredField("state");

这样一波下来,我们就能对FutureTask的state属性进行CAS操作了o( ̄▽ ̄)o

除了compareAndSwapObject,Unsafe类还提供了更为具体的对int和long类型的CAS操作:

1
2
public native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);
public native boolean compareAndSwapLong(Object obj, long offset, long expect, long update);

从方法签名可以看出,这里只是把目标field的类型限定成int和long类型,而不是通用的Object.

最后,FutureTask还用到了一个方法:

1
public native void putOrderedInt(Object obj, long offset, int value);

可以看出,该方法只有三个参数,所以它没有比较再交换的概念,某种程度上就是一个赋值操作,即设置obj对象中offset偏移地址对应的int类型的field的值为指定值。这其实是Unsafe的另一个方法putIntVolatile的有序或者有延迟的版本,并且不保证值的改变被其他线程立即看到,只有在field被volatile修饰并且期望被意外修改的时候使用才有用。

那么putIntVolatile方法的定义是什么呢?

1
public native void putIntVolatile(Object obj, long offset, int value);

该方法设置obj对象中offset偏移地址对应的整型field的值为指定值,支持volatile store语义。由此可以看出:

当操作的int类型field本身已经被volatile修饰时,putOrderedInt和putIntVolatile是等价的

FutureTask使用和源码解析

本篇文章说一下java.util.concurrent中的FutureTaskFutureTask是一个同步工具类,它实现了Future语义,表示了一种抽象的可生成结果的计算。在包括线程池在内的许多工具类中都会用到,弄懂它的实现将有利于我们更加深入地理解Java异步操作实现。

下面是FutureTask的类图:

FutureTask类图

在分析它的源码之前, 我们需要先了解一些预备知识。本篇我们先来看看FutureTask中所使用到的接口:RunnableCallableFutureRunnableFuture以及所使用到的工具类ExecutorsUnsafe

FutureTask所使用到的接口

Runnable

创建线程最重要的是传递一个run()方法, 这个run方法定义了这个线程要做什么事情, 它被抽象成了Runnable接口:

1
2
3
4
@FunctionalInterface
public interface Runnable {
public abstract void run();
}

从这里可以看出Runnable最大的问题有下面2个:

  • 没有返回值,我们不能从里面返回相关的处理结果
  • 不能抛出checked exception

而这2个问题,导致我们很多时候使用Runnable其实都会丧失很多的灵活性。而为了解决这两个问题,JDK提供了Callable

Callable

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

对于Runnable可以知道,Callable解决了Runnable的两个最大的问题。但是Callable自己带来了一个问题:就是如何获取返回值。

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
Callable<String> myCallable = () -> "This is the results.";
try {
String result = myCallable.call();
System.out.println("Callable 执行的结果是: " + result);
} catch (Exception e) {
System.out.println("There is a exception.");
}
}

这种方式获取的返回值是在当前线程中同步获取的,这种方法确实可以, 但是它存在几个问题:

  • call方法是在当前线程中直接调用的, 无法利用多线程。
  • call方法可能是一个特别耗时的操作, 这将导致程序停在myCallable.call()调用处, 无法继续运行, 直到call方法返回。
  • 如果call方法始终不返回, 我们没办法中断它的运行。

因此, 理想的操作应当是, 我们将call方法提交给另外一个线程执行, 并在合适的时候, 判断任务是否完成, 然后获取线程的执行结果或者撤销任务, 这种思路的实现就是Future接口:

Future

Future接口被设计用来代表一个异步操作的执行结果。你可以用它来获取一个操作的执行结果、取消一个操作、判断一个操作是否已经完成或者是否被取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public interface Future<V> {

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();

/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口一共定义了5个方法:

  • get()
    • 该方法用来获取执行结果, 如果任务还在执行中, 就阻塞等待;
  • get(long timeout, TimeUnit unit)
    • 该方法同get方法类似, 所不同的是, 它最多等待指定的时间, 如果指定时间内任务没有完成, 则会抛出TimeoutException异常;
  • cancel(boolean mayInterruptIfRunning)
    • 该方法用来尝试取消一个任务的执行, 它的返回值是boolean类型, 表示取消操作是否成功。以下三种情况之一的,cancel操作一定是失败的,返回false:
      • 任务已经执行完成了
      • 任务已经被取消过了
      • 任务因为某种原因不能被取消
    • 值得注意的是,cancel操作返回true并不代表任务真的就是被取消了,这取决于发动cancel状态时任务所处的状态
      • 如果发起cancel时任务还没有开始运行,则随后任务就不会被执行;
      • 如果发起cancel时任务已经在运行了,则这时就需要看mayInterruptIfRunning参数了:
        • 如果mayInterruptIfRunning为true, 则当前在执行的任务会被中断
        • 如果mayInterruptIfRunning为false, 则可以允许正在执行的任务继续运行,直到它执行完
  • isCancelled()
    • 该方法用于判断任务是否被取消了。如果一个任务在正常执行完成之前被cancel掉了, 则返回true
  • isDone()
    • 如果一个任务已经结束, 则返回true。注意, 这里的任务结束包含了以下三种情况:
      • 任务正常执行完毕
      • 任务抛出了异常
      • 任务已经被取消

RunnableFuture

RunnableFuture接口同时实现了Runnable接口和Future接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

FutureTask实现了该接口,也就是相当于它同时实现了Runnable接口和Future接口。

FutureTask所使用到的工具类

Executors

Executors 是一个用于创建线程池的工厂类,关于线程池的概念,我们以后再说。这个类同时也提供了一些有用的静态方法。

前面我们提到了Callable接口,它是JDK1.5才引入的,而Runnable接口在JDK1.0就有了,我们有时候需要将一个已经存在Runnable对象转换成Callable对象,Executors工具类为我们提供了这一实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns the given result. This
* can be useful when applying methods requiring a
* {@code Callable} to an otherwise resultless action.
* @param task the task to run
* @param result the result to return
* @param <T> the type of the result
* @return a callable object
* @throws NullPointerException if task null
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns {@code null}.
* @param task the task to run
* @return a callable object
* @throws NullPointerException if task null
*/
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}

/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

可以明显看出来,这个方法采用了设计模式中的适配器模式,将一个Runnable类型对象适配成Callable类型。

因为Runnable接口没有返回值, 所以为了与Callable兼容, 我们额外传入了一个result参数, 使得返回的Callable对象的call方法直接执行Runnable的run方法, 然后返回传入的result参数。有的同学要说了, 你把result参数传进去, 又原封不动的返回出来, 有什么意义呀?
这样做确实没什么意义, result参数的存在只是为了将一个Runnable类型适配成Callable类型。

Unsafe

关于Java中的Unsafe,请看我的另外一篇文章:java Unsafe介绍

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

private static final ExecutorService executorService = Executors.newSingleThreadExecutor();

public FutureTask<String> getFutureTask() throws ExecutionException, InterruptedException {
return new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("hello");
return "ok";
}
});
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTaskDemo futureTaskDemo = new FutureTaskDemo();

FutureTask<String> futureTask = futureTaskDemo.getFutureTask();
executorService.execute(futureTask);

Thread.sleep(100);
if (futureTask.isDone()) {
System.out.println(futureTask.get());
}
executorService.shutdown();
}
}

源码解析

Java并发工具类的三板斧: 状态,队列,CAS。 以这三个方面为切入点来看源码,有助于我们快速的看清FutureTask的概貌:

FutureTask状态

FutureTask的源代码中定义了如下的7种状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

为确保不同线程对state修改的可见性,所以statevolatile类型。

state属性是贯穿整个FutureTask的最核心的属性,该属性的值代表了任务在运行过程中的状态,随着任务的执行,状态将不断地进行转变,从上面的定义中可以看出,总共有7种状态:包括了1个初始态,2个中间态和4个终止态。虽说状态有这么多,但是状态的转换路径却只有四种:

FutureTask状态流转

  • 任务的初始状态都是NEW, 这一点是构造函数保证的。
  • 任务的终止状态有4种:

    • NORMAL:任务正常执行完毕
    • EXCEPTIONAL:任务执行过程中发生异常
    • CANCELLED:任务被取消
    • INTERRUPTED:任务被中断
  • 任务的中间状态有2种:

    • COMPLETING 正在设置任务结果
    • INTERRUPTING 正在中断运行任务的线程

值得一提的是,任务的中间状态是一个瞬态,它非常的短暂。而且任务的中间态并不代表任务正在执行,而是任务已经执行完了,正在设置最终的返回结果,所以可以这么说:

只要state不处于 NEW 状态,就说明任务已经执行完毕

注意,这里的执行完毕是指传入的Callable对象的call方法执行完毕,或者抛出了异常。所以这里的COMPLETING的名字显得有点迷惑性,它并不意味着任务正在执行中,而意味着call方法已经执行完毕,正在设置任务执行的结果。而将一个任务的状态设置成终止态只有三种方法:

  • set
  • setException
  • cancel

接着我们来看队列,在FutureTask中,队列的实现是一个单向链表,它表示所有等待任务执行完毕的线程的集合。我们知道,FutureTask实现了Future接口,可以获取“Task”的执行结果,那么如果获取结果时,任务还没有执行完毕怎么办呢?那么获取结果的线程就会在一个等待队列中挂起,直到任务执行完毕被唤醒。

在并发编程中使用队列通常是将当前线程包装成某种类型的数据结构扔到等待队列中,我们先来看看队列中的每一个节点是怎么个结构:

1
2
3
4
5
6
7
8
9
10
/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

它只包含了一个记录线程的thread属性和指向下一个节点的next属性。FutureTask中的这个单向链表是当做栈来使用的,确切来说是当做Treiber栈来使用的,不了解Treiber栈是个啥的可以简单的把它当做是一个线程安全的栈, 它使用CAS来完成入栈出栈操作。

为啥要使用一个线程安全的栈呢,因为同一时刻可能有多个线程都在获取任务的执行结果,如果任务还在执行过程中,则这些线程就要被包装成WaitNode扔到Treiber栈的栈顶,即完成入栈操作,这样就有可能出现多个线程同时入栈的情况,因此需要使用CAS操作保证入栈的线程安全,对于出栈的情况也是同理。

由于FutureTask中的队列本质上是一个Treiber栈,那么使用这个队列就只需要一个指向栈顶节点的指针就行了,在FutureTask中,就是waiters属性:

1
2
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

事实上,它就是整个单向链表的头节点。综上,FutureTask中所使用的队列的结构如下:

FutureTask中所使用的队列的结构

CAS操作大多数是用来改变状态的,在FutureTask中也不例外。我们一般在静态代码块中初始化需要CAS操作的属性的偏移量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}

从这个静态代码块中我们也可以看出,CAS操作主要针对3个属性,包括staterunnerwaiters,说明这3个属性基本是会被多个线程同时访问的。

  • state属性代表了任务的状态
  • waiters属性代表了指向栈顶节点的指针,这两个我们上面已经分析过了。
  • runner属性代表了执行FutureTask中的“Task”的线程。

为什么需要一个属性来记录执行任务的线程呢?这是为了中断或者取消任务做准备的,只有知道了执行任务的线程是谁,我们才能去中断它。

定义完属性的偏移量之后,接下来就是CAS操作本身了。在FutureTask,CAS操作最终调用的还是Unsafe类的compareAndSwapXXX方法。

FutureTask的核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

可以看出,FutureTask的核心属性只有5个:

  • state 属性代表了任务的状态
  • callable 属性代表了要执行的任务本身,即FutureTask中的“Task”部分。这里之所以用Callable而不用Runnable是因为FutureTask实现了Future接口,需要获取任务的执行结果。
  • outcome 属性代表了任务的执行结果或者抛出的异常,为Object类型,也就是说outcome可以是任意类型的对象,所以当我们将正常的执行结果返回给调用者时,需要进行强制类型转换, 返回由Callable定义的V类型
  • runner 属性代表了执行FutureTask中的“Task”的线程,这是为了中断或者取消任务做准备。
  • waiters 属性代表了指向栈顶节点的指针

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

FutureTask共有2个构造函数,这2个构造函数一个是直接传入Callable对象, 一个是传入一个Runnable对象和一个指定的result, 然后通过Executors工具类将它适配成callable对象, 所以这两个构造函数的本质是一样的:

  • 用传入的参数初始化callable成员变量
  • 将FutureTask的状态设为NEW

FutureTask接口实现

FutureTask实现了RunnableFuture接口, 因此,它必须实现Runnable和Future接口的所有方法。

Runnable接口实现

要实现Runnable接口, 就得覆写run方法, 我们看看FutureTask的run方法干了点啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
  • 首先检查当前状态是不是New, 并且使用CAS操作将runner属性设置位当前线程,即记录执行任务的线程。可见runner属性是在运行时被初始化的。如果不满足这个条件,则直return。
  • 然后调用Callable对象的call方法来执行任务,如果任务执行成功,就使用set(result)设置结果,否则,用setException(ex)设置抛出的异常。
  • 最后在finally块中,我们将runner属性置为null,并且检查有没有遗漏的中断,如果发现s >= INTERRUPTING, 说明执行任务的线程有可能被中断了,因为s >= INTERRUPTING 只有两种可能,state状态为INTERRUPTINGINTERRUPTED

然后详细说一下其中的set(result)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

这个方法一开始通过CAS操作将state属性由原来的NEW状态修改为COMPLETING状态,我们在一开始介绍state状态的时候说过,COMPLETING是一个非常短暂的中间态,表示正在设置执行的结果。

状态设置成功后,我们就把任务执行结果赋值给outcome, 然后直接把state状态设置成NORMAL,注意,这里是直接设置,没有先比较再设置的操作,由于state属性被设置成volatile,这里putOrderedIntputIntVolatile是等价的,保证了state状态对其他线程的可见性。

在这之后,我们调用了 finishCompletion()来完成执行结果的设置。

接下来我们再来看看发生了异常的版本setException(ex)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Causes this future to report an {@link ExecutionException}
* with the given throwable as its cause, unless this future has
* already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon failure of the computation.
*
* @param t the cause of failure
*/
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

可见,除了将outcome属性赋值为异常对象,以及将state的终止状态修改为EXCEPTIONAL,其余都和set方法类似。在方法的最后,都调用了 finishCompletion()来完成执行结果的设置。那么我们就来看看 finishCompletion()干了点啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null; // to reduce footprint
}

这个方法事实上完成了一个“善后”工作。我们先来看看if条件语句中的CAS操作:

1
UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)

该方法是将waiters属性的值由原值设置为null, 我们知道,waiters属性指向了Treiber栈的栈顶节点,可以说是代表了整个Treiber栈,将该值设为null的目的就是清空整个栈。

然后又进行下一轮for循环,而下一轮for循环的判断条件又是waiters!=null ,由此我们知道,虽然最外层的for循环乍一看好像是什么遍历节点的操作,其实只是为了确保waiters属性被成功设置成null,本质上相当于一个自旋操作

waiters属性设置成null以后,接下了 for (;;)死循环才是真正的遍历节点,可以看出,循环内部就是一个普通的遍历链表的操作,我们前面讲属性的时候说过,Treiber栈里面存放的WaitNode代表了当前等待任务执行结束的线程,这个循环的作用也正是遍历链表中所有等待的线程,并唤醒他们。

Treiber栈中所有挂起的线程都唤醒后,下面就是执行done方法:

1
2
3
4
5
6
7
8
9
10
/**
* Protected method invoked when this task transitions to state
* {@code isDone} (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
* implementation of this method to determine whether this task
* has been cancelled.
*/
protected void done() { }

这个方法是一个空方法,从注释上看,它是提供给子类覆写的,以实现一些任务执行结束前的额外操作。
done方法之后就是callable属性的清理了(callable = null)。

然后我们在详细说说finally代码块中的方法。

在finally块中,我们将runner属性置为null,并且检查有没有遗漏的中断,如果发现s >= INTERRUPTING, 说明执行任务的线程有可能被中断了,因为s >= INTERRUPTING 只有两种可能,state状态为INTERRUPTINGINTERRUPTED

有的同学可能就要问了,咱前面已经执行过的set方法或者setException方法不是已经将state状态设置成NORMAL或者EXCEPTIONAL了吗?怎么会出现INTERRUPTING或者INTERRUPTED状态呢?别忘了,咱们在多线程的环境中,在当前线程执行run方法的同时,有可能其他线程取消了任务的执行,此时其他线程就可能对state状态进行改写,这也就是我们在设置终止状态的时候用putOrderedInt方法,而没有用CAS操作的原因——我们无法确信在设置state前是处于COMPLETING中间态还是INTERRUPTING中间态。

接下来我们来看看handlePossibleCancellationInterrupt方法干了点啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Ensures that any interrupt from a possible cancel(true) is only
* delivered to a task while in run or runAndReset.
*/
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt

// assert state == INTERRUPTED;

// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}

可见该方法是一个自旋操作,如果当前的state状态是INTERRUPTING,我们在原地自旋,直到state状态转换成终止态。

至此,run方法的分析就真的结束了。我们来总结一下:

run方法重点做了以下几件事:

  • runner属性设置成当前正在执行run方法的线程
  • 调用callable成员变量的call方法来执行任务
  • 设置执行结果outcome, 如果执行成功, 则outcome保存的就是执行结果;如果执行过程中发生了异常, 则outcome中保存的就是异常,设置结果之前,先将state状态设为中间态COMPLETING
  • outcome的赋值完成后,设置state状态为终止态(NORMAL或者EXCEPTIONAL)
  • 唤醒Treiber栈中所有等待的线程
  • 善后清理(waiters, callable,runner设为null)
  • 检查是否有遗漏的中断,如果有,等待中断状态完成。

这里再插一句,我们前面说“state只要不是NEW状态,就说明任务已经执行完成了”就体现在这里,因为run方法中,我们是在c.call()执行完毕或者抛出了异常之后才开始设置中间态和终止态的。

Future接口实现

Future接口一共定义了5个方法,我们一个个来看:

cancel(boolean mayInterruptIfRunning)

既然上面在分析run方法的最后,我们提到了任务可能被别的线程取消,那我们看看怎么取消一个任务的执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

在前面我们已经介绍过Future#cancel方法了:

  • cancel(boolean mayInterruptIfRunning)
    • 该方法用来尝试取消一个任务的执行, 它的返回值是boolean类型, 表示取消操作是否成功。以下三种情况之一的,cancel操作一定是失败的,返回false:
      • 任务已经执行完成了
      • 任务已经被取消过了
      • 任务因为某种原因不能被取消
    • 值得注意的是,cancel操作返回true并不代表任务真的就是被取消了,这取决于发动cancel状态时任务所处的状态
      • 如果发起cancel时任务还没有开始运行,则随后任务就不会被执行;
      • 如果发起cancel时任务已经在运行了,则这时就需要看mayInterruptIfRunning参数了:
        • 如果mayInterruptIfRunning为true, 则当前在执行的任务会被中断
        • 如果mayInterruptIfRunning为false, 则可以允许正在执行的任务继续运行,直到它执行完

我们来看看FutureTask是怎么实现cancel方法的这几个规范的:

首先,对于「任务已经执行完成了或者任务已经被取消过了,则cancel操作一定是失败的(返回false)」这两条,是通过简单的判断state值是否为NEW实现的,因为我们前面说过了,只要state不为NEW,说明任务已经执行完毕了。从代码中可以看出,只要state不为NEW,则直接返回false。

如果state还是NEW状态,我们再往下看:

1
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)

这一段是根据mayInterruptIfRunning的值将state的状态由NEW设置成INTERRUPTING或者CANCELLED,当这一操作也成功之后,就可以执行后面的try语句了,但无论怎么,该方法最后都返回了true。

我们再接着看try块干了点啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
try {    // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}

我们知道,runner属性中存放的是当前正在执行任务的线程,因此,这个try块的目的就是中断当前正在执行任务的线程,最后将state的状态设为INTERRUPTED,当然,中断操作完成后,还需要通过finishCompletion()来唤醒所有在Treiber栈中等待的线程。我们现在总结一下,cancel方法实际上完成以下两种状态转换之一:

  • NEW -> CANCELLED (对应于mayInterruptIfRunning=false)
  • NEW -> INTERRUPTING -> INTERRUPTED (对应于mayInterruptIfRunning=true)

对于第一条路径,虽说cancel方法最终返回了true,但它只是简单的把state状态设为CANCELLED,并不会中断线程的执行。但是这样带来的后果是,任务即使执行完毕了,也无法设置任务的执行结果,因为前面分析run方法的时候我们知道,设置任务结果有一个中间态,而这个中间态的设置,是以当前state状态为NEW为前提的。

对于第二条路径,则会中断执行任务的线程,我们在倒回上面的run方法看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

虽然第二条路径中断了当前正在执行的线程,但是,响不响应这个中断是由执行任务的线程自己决定的,更具体的说,这取决于c.call()方法内部是否对中断进行了响应,是否将中断异常抛出。那call方法中是怎么处理中断的呢?从上面的代码中可以看出,catch语句处理了所有的Throwable的异常,这自然也包括了中断异常。

然而,值得一提的是,即使这里进入了catch (Throwable ex){}代码块,setException(ex)的操作一定是失败的,因为在我们取消任务执行的线程中,我们已经先把state状态设为INTERRUPTING了,而setException(ex)的操作要求设置前线程的状态为NEW所以这里响应cancel方法所造成的中断最大的意义不是为了对中断进行处理,而是简单的停止任务线程的执行,节省CPU资源。

那读者可能会问了,既然这个setException(ex)的操作一定是失败的,那放在这里有什么用呢?事实上,这个setException(ex)是用来处理任务自己在正常执行过程中产生的异常的,在我们没有主动去cancel任务时,任务的state状态在执行过程中就会始终是NEW,如果任务此时自己发生了异常,则这个异常就会被setException(ex)方法成功的记录到outcome中。

反正无论如何,run方法最终都会进入finally块,而这时候它会发现s >= INTERRUPTING,如果检测发现s = INTERRUPTING,说明cancel方法还没有执行到中断当前线程的地方,那就等待它将state状态设置成INTERRUPTED。到这里,对cancel方法的分析就和上面对run方法的分析对接上了。

cancel方法到这里就分析完了,如果你一条条的去对照Future接口对于cancel方法的规范,它每一条都是实现了的,而它实现的核心机理,就是对state的当前状态的判断和设置。由此可见,state属性是贯穿整个FutureTask的最核心的属性。

isCancelled()

说完了cancel,我们再来看看 isCancelled()方法,相较而言,它就简单多了:

1
2
3
public boolean isCancelled() {
return state >= CANCELLED;
}

那么state >= CANCELLED包含了那些状态呢,它包括了: CANCELLED INTERRUPTING INTERRUPTED

Future接口对于isCancelled()方法的规范:

该方法用于判断任务是否被取消了。如果一个任务在正常执行完成之前被Cancel掉了, 则返回true

再对比state的状态图:

state的状态图

可见选取这三个状态作为判断依据是很合理的, 因为只有调用了cancel方法,才会使state状态进入这三种状态。

isDone()

与 isCancelled方法类似,isDone方法也是简单地通过state状态来判断。

1
2
3
public boolean isDone() {
return state != NEW;
}

关于这一点,其实我们之前已经说过了,只要state状态不是NEW,则任务已经执行完毕了,因为state状态不存在类似“任务正在执行中”这种状态,即使是短暂的中间态,也是发生在任务已经执行完毕,正在设置任务结果的时候。

get()

最后我们来看看获取执行结果的get方法,先来看看无参的版本:

1
2
3
4
5
6
7
8
9
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

该方法其实很简单,当任务还没有执行完毕或者正在设置执行结果时,我们就使用awaitDone方法等待任务进入终止态,注意,awaitDone的返回值是任务的状态,而不是任务的结果。任务进入终止态之后,我们就根据任务的执行结果来返回计算结果或者抛出异常。

我们先来看看等待任务完成的awaitDone方法,该方法是获取任务结果最核心的方法,它完成了获取结果,挂起线程,响应中断等诸多操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

在具体分析它的源码之前,有一点我们先特别说明一下,FutureTask中会涉及到两类线程,一类是执行任务的线程,它只有一个,FutureTask的run方法就由该线程来执行;一类是获取任务执行结果的线程,它可以有多个,这些线程可以并发执行,每一个线程都是独立的,都可以调用get方法来获取任务的执行结果。如果任务还没有执行完,则这些线程就需要进入Treiber栈中挂起,直到任务执行结束,或者等待的线程自身被中断。

理清了这一点后,我们再来详细看看awaitDone方法。可以看出,该方法的大框架是一个自旋操作,我们一段一段来看:

1
2
3
4
5
6
7
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// ...
}

首先一开始,我们先检测当前线程是否被中断了,这是因为get方法是阻塞式的,如果等待的任务还没有执行完,则调用get方法的线程会被扔到Treiber栈中挂起等待,直到任务执行完毕。但是,如果任务迟迟没有执行完毕,则我们也有可能直接中断在Treiber栈中的线程,以停止等待。

当检测到线程被中断后,我们调用了removeWaiter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Tries to unlink a timed-out or interrupted wait node to avoid
* accumulating garbage. Internal nodes are simply unspliced
* without CAS since it is harmless if they are traversed anyway
* by releasers. To avoid effects of unsplicing from already
* removed nodes, the list is retraversed in case of an apparent
* race. This is slow when there are a lot of nodes, but we don't
* expect lists to be long enough to outweigh higher-overhead
* schemes.
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

removeWaiter的作用是将参数中的node从等待队列(即Treiber栈)中移除。如果此时线程还没有进入Treiber栈,则 q=null,那么removeWaiter(q)啥也不干。在这之后,我们就直接抛出了InterruptedException异常。

接着看awaitDone中的for(;;)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
for (;;) {
/*if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}*/
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
  • 如果任务已经进入终止态(s > COMPLETING),我们就直接返回任务的状态;
  • 如果任务正在设置执行结果(s == COMPLETING),我们就让出当前线程的CPU资源继续等待
  • 否则,就说明任务还没有执行,或者任务正在执行过程中,那么这时,如果q现在还为null, 说明当前线程还没有进入等待队列,于是我们新建了一个WaitNode, WaitNode的构造函数我们之前已经看过了,就是生成了一个记录了当前线程的节点;
  • 如果q不为null,说明代表当前线程的WaitNode已经被创建出来了,则接下来如果queued=false,表示当前线程还没有入队,所以我们执行了:
1
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);

这行代码的作用是通过CAS操作将新建的q节点添加到waiters链表的头节点之前,其实就是Treiber栈的入栈操作,写的还是很简洁的,一行代码就搞定了,如果大家还是觉得晕乎,下面是它等价的伪代码:

1
2
3
4
5
q.next = waiters; //当前节点的next指向目前的栈顶元素
//如果栈顶节点在这个过程中没有变,即没有发生并发入栈的情况
if(waiters的值还是上面q.next所使用的waiters值){
waiters = q; //修改栈顶的指针,指向刚刚入栈的节点
}

这个CAS操作就是为了保证同一时刻如果有多个线程在同时入栈,则只有一个能够操作成功,也即Treiber栈的规范。

如果以上的条件都不满足,则再接下来因为现在是不带超时机制的gettimedfalse,则else if代码块跳过,然后来到最后一个else, 把当前线程挂起,此时线程就处于阻塞等待的状态。

至此,在任务没有执行完毕的情况下,获取任务执行结果的线程就会在Treiber栈中被LockSupport.park(this)挂起了。

那么这个挂起的线程什么时候会被唤醒呢?有两种情况:

  • 任务执行完毕了,在finishCompletion方法中会唤醒所有在Treiber栈中等待的线程
  • 等待的线程自身因为被中断等原因而被唤醒。

我们接下来就继续看看线程被唤醒后的情况,此时,线程将回到for(;;)循环的开头,继续下一轮循环,在复制一下上面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this); // 挂起的线程从这里被唤醒
}

首先自然还是检测中断,所不同的是,此时q已经不为null了,因此在有中断发生的情况下,在抛出中断之前,多了一步removeWaiter(q)操作,该操作是将当前线程从等待的Treiber栈中移除,相比入栈操作,这个出栈操作要复杂一点,这取决于节点是否位于栈顶。下面我们来仔细分析这个出栈操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Tries to unlink a timed-out or interrupted wait node to avoid
* accumulating garbage. Internal nodes are simply unspliced
* without CAS since it is harmless if they are traversed anyway
* by releasers. To avoid effects of unsplicing from already
* removed nodes, the list is retraversed in case of an apparent
* race. This is slow when there are a lot of nodes, but we don't
* expect lists to be long enough to outweigh higher-overhead
* schemes.
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

首先,我们把要出栈的WaitNodethread属性设置为null, 这相当于一个标记,是我们后面在waiters链表中定位该节点的依据。

  • 要移除的节点就在栈顶
    • 我们先来看看该节点就位于栈顶的情况,这说明在该节点入栈后,并没有别的线程再入栈了。由于一开始我们就将该节点的thread属性设为了null,因此,前面的q.thread != nullpred != null都不满足,我们直接进入到最后一个else if分支:
1
2
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;

这一段是栈顶节点出栈的操作,和入栈类似,采用了CAS比较,将栈顶元素设置成原栈顶节点的下一个节点。

值得注意的是,当CAS操作不成功时,程序会回到retry处重来,但即使CAS操作成功了,程序依旧会遍历完整个链表,找寻node.thread == null 的节点,并将它们一并从链表中剔除。

  • 要移除的节点不在栈顶
    • 当要移除的节点不在栈顶时,我们会一直遍历整个链表,直到找到q.thread == null的节点,找到之后,我们将进入
1
2
3
4
5
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}

这是因为节点不在栈顶,则其必然是有前驱节点pred的,这时,我们只是简单的让前驱节点指向当前节点的下一个节点,从而将目标节点从链表中剔除。

注意,后面多加的那个if判断是很有必要的,因为removeWaiter方法并没有加锁,所以可能有多个线程在同时执行,WaitNode的两个成员变量thread和next都被设置成volatile,这保证了它们的可见性,如果我们在这时发现了pred.thread == null,那就意味着它已经被另一个线程标记了,将在另一个线程中被拿出waiters链表,而我们当前目标节点的原后继节点现在是接在这个pred节点上的,因此,如果pred已经被其他线程标记为要拿出去的节点,我们现在这个线程再继续往后遍历就没有什么意义了,所以这时就调到retry处,从头再遍历。

如果pred节点没有被其他线程标记,那我们就接着往下遍历,直到整个链表遍历完。

至此,将节点从waiters链表中移除的removeWaiter操作我们就分析完了,我们总结一下该方法:

在该方法中,会传入一个需要移除的节点,我们会将这个节点的thread属性设置成null,以标记该节点。然后无论如何,我们会遍历整个链表,清除那些被标记的节点(只是简单的将节点从链表中剔除)。如果要清除的节点就位于栈顶,则还需要注意重新设置waiters的值,指向新的栈顶节点。所以可以看出,虽说removeWaiter方法传入了需要剔除的节点,但是 事实上它可能剔除的不止是传入的节点,而是所有已经被标记了的节点,这样不仅清除操作容易了些(不需要专门去定位传入的node在哪里),而且提升了效率(可以同时清除所有已经被标记的节点)。

我们再回到awaitDone方法里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q); // 刚刚分析到这里了,我们接着往下看
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

如果线程不是因为中断被唤醒,则会继续往下执行,此时会再次获取当前的state状态。所不同的是,此时q已经不为null, queued已经为true了,所以已经不需要将当前节点再入waiters栈了。

至此我们知道,除非被中断,否则get方法会在原地自旋等待(用的是Thread.yield,对应于s == COMPLETING)或者直接挂起(对应任务还没有执行完的情况),直到任务执行完成。而我们前面分析run方法和cancel方法的时候知道,在run方法结束后,或者cancel方法取消完成后,都会调用finishCompletion()来唤醒挂起的线程,使它们得以进入下一轮循环,获取任务执行结果。

最后,等awaitDone函数返回后,get方法返回了report(s),以根据任务的状态,汇报执行结果:

1
2
3
4
5
6
7
8
9
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

可见,report方法非常简单,它根据当前state状态,返回正常执行的结果,或者抛出指定的异常。

至此,get方法就分析结束了。

值得注意的是,awaitDone方法和get方法都没有加锁,这在多个线程同时执行get方法的时候会不会产生线程安全问题呢?通过查看方法内部的参数我们知道,整个方法内部用的大多数是局部变量,因此不会产生线程安全问题,对于全局的共享变量waiters的修改时,也使用了CAS操作,保证了线程安全,而state变量本身是volatile的,保证了读取时的可见性,因此整个方法调用虽然没有加锁,它仍然是线程安全的。

get(long timeout, TimeUnit unit)

最后我们来看看带超时版本的get方法:

1
2
3
4
5
6
7
8
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

它和上面不带超时时间的get方法很类似,只是在awaitDone方法中多了超时检测:

1
2
3
4
5
6
7
8
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}

即,如果指定的超时时间到了,则直接返回,如果返回时,任务还没有进入终止状态,则直接抛出TimeoutException异常,否则就像get()方法一样,正常的返回执行结果。

总结

FutureTask实现了Runnable和Future接口,它表示了一个带有任务状态和任务结果的任务,它的各种操作都是围绕着任务的状态展开的。

值得注意的是,在所有的7个任务状态中,只要不是NEW状态,就表示任务已经执行完毕或者不再执行了,并没有表示“任务正在执行中”的状态。

除了代表了任务的Callable对象、代表任务执行结果的outcome属性,FutureTask还包含了一个代表所有等待任务结束的线程的Treiber栈,这一点其实和各种锁的等待队列特别像,即如果拿不到锁,则当前线程就会被扔进等待队列中;这里则是如果任务还没有执行结束,则所有等待任务执行完毕的线程就会被扔进Treiber栈中,直到任务执行完毕了,才会被唤醒。

FutureTask虽然为我们提供了获取任务执行结果的途径,遗憾的是,在获取任务结果时,如果任务还没有执行完成,则当前线程会自旋或者挂起等待,这和我们实现异步的初衷是相违背的,我们后面将继续介绍另一个同步工具类CompletableFuture, 它解决了这个问题。

参考资料

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×