java.nio.ByteBuffer

Java NIO Buffers用于和NIO Channel交互。 我们从Channel中读取数据到buffers里,从Buffer把数据写入到Channels。Buffer本质上就是一块内存区,可以用来写入数据,并在稍后读取出来。这块内存被NIO Buffer包裹起来,对外提供一系列的读写方便开发的接口。java中java.nio.Buffer的常见实现类如下,不过我们这里只说一下ByteBuffer这个实现。

java中`java.nio.Buffer`的常见实现类

Buffer的重要属性

Buffer缓冲区实质上就是一块内存,用于写入数据,也供后续再次读取数据,为了便于理解,你可以把它理解为一个字节数组。它有有四个重要属性:

1
2
3
4
5
6
7
public abstract class Buffer {
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
}
  • capacity
    • 这个属性表示这个Buffer最多能放多少数据,在创建buffer的时候指定。int类型。
  • position
    下一个要读写的元素位置(从0开始),当使用buffer的相对位置进行读/写操作时,读/写会从这个下标进行,并在操作完成后,buffer会更新下标的值。
    • 写模式:当写入数据到Buffer的时候需要从一个确定的位置开始,默认初始化时这个位置position为0,一旦写入了数据比如一个字节,整形数据,那么position的值就会指向数据之后的一个单元,position最大可以到capacity-1.
    • 读模式:当从Buffer读取数据时,也需要从一个确定的位置开始。buffer从写入模式变为读取模式时,position会归0,每次读取后,position向后移动。
  • limit
    在Buffer上进行的读写操作都不能越过这个limit。
    • 写模式:limit的含义是我们所能写入的最大数据量,它等同于buffer的容量capacity
    • 读模式:limit则代表我们所能读取的最大数据量,他的值等同于写模式下position的位置。换句话说,您可以读取与写入数量相同的字节数。
  • mark
    • 一个临时存放的位置下标,用户选定的position的前一个位置或-1。
      • 调用mark()会将mark设为当前的position的值,以后调用reset()会将position属性设
        置为mark的值。mark的值总是小于等于position的值,如果将position的值设的比mark小,当前的mark值会被抛弃掉。

注:

  • position和limit之间的距离指示了可读/存的字节数。
  • boolean hasRemaining():当缓冲区至少还有一个元素时,返回true。
  • int remaining():position和limit之间字节个数。

这些属性总是满足以下条件:

0 <= mark <= position <= limit <= capacity

通过上面的描述可以看出,其中position和limit的具体含义取决于当前buffer的模式(读模式还是写模式)。capacity在两种模式下都表示容量。

Buffer的常见API

  • ByteBuffer allocate(int capacity)
    • 从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,实现类是HeapByteBuffer
  • ByteBuffer allocateDirect(int capacity)
    • 类似于allocate方法,不过使用的是对外内存,实现类是DirectByteBuffer
  • ByteBuffer wrap(byte[] array)
    • 把byte数组包装为ByteBuffer,bytes数组或buff缓冲区任何一方中数据的改动都会影响另一方。其实ByteBuffer底层本来就有一个bytes数组负责来保存buffer缓冲区中的数据,通过allocate方法系统会帮你构造一个byte数组,实现类是HeapByteBuffer
  • ByteBuffer wrap(byte[] array, int offset, int length)
    • 在上一个方法的基础上可以指定偏移量和长度,buffer的capacity就是array.length,这个offset也就是包装后byteBuffer的position,limit=length+position(offset),mark=-1实现类是HeapByteBuffer
  • abstract Object array()
    • 返回支持此缓冲区的数组 (可选操作)
  • abstract int arrayOffset()
    • 返回此缓冲区中的第一个元素在缓冲区的底层实现数组中的偏移量(可选操作)。调用此方法之前要调用hasarray方法,以确保此缓冲区具有可访问的底层实现数组。
  • abstract boolean hasArray()
    • 告诉这个缓冲区是否由可访问的数组支持
  • int capacity()
    • 返回此缓冲区的容量
  • Buffer clear()
    • 清除此缓存区。将position = 0;limit = capacity;mark = -1;把position设为0,一般在把数据写入Buffer前调用。
  • Buffer flip()
    • flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置。一般在从Buffer读出数据前调用。
  • abstract boolean isDirect()
    • 判断个缓冲区是否为direct, 也就是是否是对外内存
  • abstract boolean isReadOnly()
    • 判断告知这个缓冲区是否是只读的
  • int limit()
    • 返回此缓冲区的limit的属性值
  • Buffer position(int newPosition)
    • 设置这个缓冲区的位置
  • boolean hasRemaining()
    • return position < limit,返回是否还有未读内容
  • int remaining()
    • return limit - position; 返回limit和position之间相对位置差
  • Buffer rewind()
    • 把position设为0,mark设为-1,不改变limit的值,一般在把数据重写入Buffer前调用
  • Buffer mark()
    • 设置mark的值,mark=position,做个标记
  • Buffer reset()
    • 还原标记,position=mark。
  • ByteBuffer compact()
    • 该方法的作用是将 position 与 limit之间的数据复制到buffer的开始位置,与limit 之间没有数据的话发,就不会进行复制。一个例子如下:
1
2
3
4
5
6
7
8
9
10
11
12
例如:ByteBuffer.allowcate(10); 
内容:[0 ,1 ,2 ,3 4, 5, 6, 7, 8, 9]
## compact前
[0 ,1 ,2 , 3, 4, 5, 6, 7, 8, 9]
pos=4
lim=10
cap=10
## compact后
[4, 5, 6, 7, 8, 9, 6, 7, 8, 9]
pos=6
lim=10
cap=10
  • ByteBuffer slice();
    • 创建一个分片缓冲区。分片缓冲区与主缓冲区共享数据。 分配的起始位置是主缓冲区的position位置,容量为limit-position。 分片缓冲区无法看到主缓冲区positoin之前的元素。

创建buffer的注意点

创建ByteBuffer可以使用allocate或者wrap,就像下面这样:

  • ByteBuffer allocate(int capacity)
  • ByteBuffer allocateDirect(int capacity)
  • ByteBuffer wrap(int capacity)
  • ByteBuffer wrap(byte[] array,int offset,int length)

需要注意的是:创建的缓冲区都是定长的,大小无法改变。若发现刚创建的缓冲区容量太小,只能重新创建一个合适的

关于ByteBuffer wrap(byte[] array,int offset,int length)这个方法,这里再强调一下,这样创建的ByteBuffer的capacity和array的大小是一样的,buffer的position是offset, buffer的limit是offset+length,position之前和limit之后的数据依然可以访问到。例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Demo {
public static void main(String args[]){
byte arr[]=new byte[100];
ByteBuffer buffer=ByteBuffer.wrap(arr,3,25);
System.out.println("Capacity is: "+buffer.capacity());
System.out.println("Position is: "+buffer.position());
System.out.println("limit is: "+buffer.limit());
}
}
//结果:
Capacity is: 100
Position is: 3
limit is: 28

allocate和wrap的区别

wrap只是简单地创建一个具有指向被包装数组的引用的缓冲区,该数组成为后援数组。对后援数组中的数据做的任何修改都将改变缓冲区中的数据,反之亦然。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String args[]){  
byte arr[]=new byte[100];
//将arr数组全部置为1
Arrays.fill(arr, (byte)1);
ByteBuffer buffer=ByteBuffer.wrap(arr,3,25);
//对后援数组中的数据做的任何修改都将改变缓冲区中的数据
arr[0]=(byte)2;
buffer.position(0);
System.out.println(buffer.get());
//在缓冲区上调用array()方法即可获得后援数组的引用。
System.out.println(Arrays.toString(buffer.array()));
}
//运行结果:
2
[2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ....]//总共100个元素

allocate则是创建了自己的后援数组,在缓冲区上调用array()方法也可获得后援数组的引用。通过调用arrayOffset()方法,可以获取缓冲区中第一个元素在后援数组的偏移量。但是使用wrap创建的ByteBuffer,调用arrayOffset永远是0。

1
2
3
4
5
6
7
8
9
10
11
12
13
 public static void main(String args[]){  
ByteBuffer buffer=ByteBuffer.allocate(100);
//对后援素组的修改也可以反映到buffer上
byte arr[]=buffer.array();
arr[1]=(byte)'a';
buffer.getInt();

System.out.println(Arrays.toString(buffer.array()));
System.out.println(buffer.arrayOffset());
}
//运行结果:
[0, 97, 0, 0, 0, 0, 0, 0,...]//总共100个元素
0

通过ByteBuffer allocateDirect(int capacity)创建的叫直接缓冲区,使用的堆外内存。可以通过isDirect()方法查看一个缓冲区是否是直接缓冲区。由于直接缓冲区是没有后援数组的,所以在其上面调用array()或arrayOffset()都会抛出UnsupportedOperationException异常。注意有些平台或JVM可能不支持这个创建直接缓冲区。

图解

put

写模式下,往buffer里写一个字节,并把postion移动一位。写模式下,一般limit与capacity相等。

bytebuffer-put

flip

写完数据,需要开始读的时候,将postion复位到0,并将limit设为当前postion。

bytebuffer-flip

get

从buffer里读一个字节,并把postion移动一位。上限是limit,即写入数据的最后位置。

bytebuffer-get

clear

将position置为0,并不清除buffer内容。

bytebuffer-clear

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class Demo {
private static final int SIZE = 1024;

public static void main(String[] args) throws Exception {
// 获取通道,该通道允许写操作
FileChannel fc = new FileOutputStream("data.txt").getChannel();
// 将字节数组包装到缓冲区中
fc.write(ByteBuffer.wrap("Some text".getBytes()));
// 关闭通道
fc.close();

// 随机读写文件流创建的管道
fc = new RandomAccessFile("data.txt", "rw").getChannel();
// fc.position()计算从文件的开始到当前位置之间的字节数
System.out.println("此通道的文件位置:" + fc.position());
// 设置此通道的文件位置,fc.size()此通道的文件的当前大小,该条语句执行后,通道位置处于文件的末尾
fc.position(fc.size());
// 在文件末尾写入字节
fc.write(ByteBuffer.wrap("Some more".getBytes()));
fc.close();

// 用通道读取文件
fc = new FileInputStream("data.txt").getChannel();
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
// 将文件内容读到指定的缓冲区中
fc.read(buffer);
//此行语句一定要有, 如果没有,就是从文件最后开始读取的,当然读出来的都是byte=0时候的字符。通过buffer.flip();这个语句,就能把buffer的当前位置更改为buffer缓冲区的第一个位置
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char)buffer.get());
}
fc.close();
}
}

确保数据落盘

在之前的文章《unix IO模型》我们曾经提到过,用户空间内核空间缓存IO等概念。关于这些概念,大家可以阅读这篇文章,在本篇文章中,我们就不在涉及这些概念了。

IO缓冲机制

大家需要有一个认知就是我们平时写的程序,在将数据到文件中时,其实数据不会立马写入磁盘中进行持久化存储的,而是会经过层层缓存,如下图所示:

I/O buffering

其中这每层缓存都有自己的刷新时机,每层缓存都刷新后才会写入磁盘进行持久化存储。这些缓存的存在目的本意都是为了加速读写操作,因为如果每次读写都对应真实磁盘操作,那么读写的效率会大大降低。但是同样带来的坏处是如果期间发生掉电或者别的故障,还未写入磁盘的数据就丢失了。对于数据安全敏感的应用,比如数据库,比如交易程序,这是无法忍受的。所以操作系统提供了保证文件落盘的机制。

在上面这图中说明了操作系统到磁盘的数据流,以及经过的缓冲区。首先数据会先存在于应用的内存空间,如果调用库函数写入,库函数可能还会把数据缓存在库函数所维护的缓冲区空间中,比如C标准库stdio提供的方法就会进行缓存,目的是为了减少系统调用的次数。这两个缓存都是在用户空间中的。库函数缓存flush时,会调用write系统调用将数据写入内核空间,内核同样维护了一个页缓存(page cache),操作系统会在合适的时间把脏页的数据写入磁盘。即使是写入磁盘了,磁盘也可能维护了一个缓存,在这个时候掉电依然会丢失数据的,只有写入了磁盘的持久存储物理介质上,数据才是真正的落盘了,是安全的。

比如在网络套接字上侦听连接并将从每个客户端接收的数据写入文件的应用程序。 在关闭连接之前,服务器确保将接收到的数据写入稳定存储器,并向客户端发送此类确认,请看下面的简化代码(代码中已经注释):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int sock_read(int sockfd, FILE *outfp, size_t nrbytes)
{
int ret;
size_t written = 0;
//example of an application buffer
char *buf = malloc(MY_BUF_SIZE);

if (!buf)
return -1;

//take care of reading the data from the socket
//and writing it to the file stream
while (written < nrbytes) {
ret = read(sockfd, buf, MY_BUF_SIZE);
if (ret =< 0) {
if (errno == EINTR)
continue;
return ret;
}
written += ret;
ret = fwrite((void *)buf, ret, 1, outfp);
if (ret != 1)
return ferror(outfp);
}

//flush file stream, the data to move into the "Kernel Buffers" layer
ret = fflush(outfp);
if (ret != 0)
return -1;

//makethe data is saved to the "Stable Storage" layer
ret = fsync(fileno(outfp));
if (ret < 0)
return -1;
return 0;
}

在上面的这幅图中,可以看到数据流向经过了用户控件缓冲区内核缓存区,下面我们说说这2个缓存区。

用户空间缓冲区

用户空间的缓存分为:

  • 应用程序本身维护的缓冲区
  • 库维护的缓冲区

应用本身维护的缓冲区需要开发者自己刷新,调用库函数写入到库函数的缓冲区中(这一步可能不存在)。如果应用程序不依赖任何库函数,而是直接使用系统调用,那么则是把数据写入系统的缓冲区去。

库函数一般都会维护缓冲区,目的是简化应用程序的编写,应用程序就不需要编写维护缓冲区的代码,库维护的缓冲区针对那些没有应用程序本身维护的缓存区的程序来说,在某些时候是会提升不少的性能的,因为缓冲区大大减少了系统调用的次数,而系统调用是非常耗时的,系统调用涉及到用户态到内核态的切换,这个切换需要很多的步骤与校验,较为耗时。

比如C标准库stdio就维护着一个缓冲区,对应这个缓冲区,C标准库提供了fflush方法强制把缓冲区数据写入操作系统。

在Java的OutputStream接口提供了一个flush方法,具体的作用要看实现类的具体实现。BufferedOutputStream#flush就会把自己维护的缓冲区数据写入下一层的OutputStream。比如是new BufferedOutputStream(new FileOutputStream("/"))这样的模式,则调用BufferedOutputStream#flush会将数据写入操作系统。

内核缓冲区

应用程序直接或者通过库函数间接的使用系统调用write将数据写入操作系统缓冲区.UNIX系统在内核中设有高速缓存或页面高速缓存。目的是为了减少磁盘读写次数。

用户写入系统的数据先写入系统缓冲区,系统缓冲区写满后,将其排入输出队列,然后得到队首时,才进行实际的IO操作。这种输出方式被称为延迟写

UNIX系统提供了三个系统调用来执行刷新内核缓冲区:sync,fsync,fdatasync

sync

1
2
3
// sync() causes all pending modifications to filesystem metadata and
//cached file data to be written to the underlying filesystems.
void sync(void)

sync函数只是将所有修改过的块缓冲区排入输出队列就返回,并不等待实际的写磁盘操作返回。 操作系统的update系统守护进程会周期地调用sync函数,来保证系统中的数据能定期落盘。

根据sync(2) - Linux manual page的描述,Linux对sync的实现与POSIX规范不太一样,POSIX规范中,sync可能在文件真正落盘前就返回,而Linux的实现则是文件真正落盘后才会返回。所以Linux中,sync与fsync的效果是一样的!但是1.3.20之前的Linux存在BUG,导致sync并不会在真正落盘后返回。

fsync

1
void fsync(int filedes)

fsync对指定的文件起作用,它传输内核缓冲区中这个文件的数据到存储设备中,并阻塞直到存储设备响应说数据已经保存好了。

fsync对文件数据与文件元数据都有效。文件的元数据可以理解为文件的属性数据,比如文件的更新时间,访问时间,长度等。

fdatasync

1
void fdatasync(int filedes)

fdatasyncfsync类似,两者的区别是,fdatasync不一定需要刷新文件的元数据部分到存储设备。

是否需要刷新文件的元数据,是要看元数据的变化部分是否对之后的读取有影响,比如文件元数据的访问时间st_atime和修改时间st_mtime变化了,fdatasync不会去刷新元数据数据到存储设备,因为即使这个数据丢失了不一致了,也不影响故障恢复后的文件读取。但是如果文件的长度st_size变化了,那么就需要刷新元数据数据到存储设备。

所以如果你每次都更新文件长度,那么调用fsyncfdatasync的效果是一样的。

但是如果更新能做到不修改文件长度,那么fdatasync能比fsync少了一次磁盘写入,这个是非常大的速度提升。

open中的O_SYNC和O_DSYNC

除了上面三个系统调用,open系统调用在打开文件时,可以设置和同步相关的标志位:O_SYNCO_DSYNC

  • 设置O_SYNC的效果相当于是每次write后自动调用fsync。
  • 设置O_DSYNC的效果相当于是每次write后自动调用fdatasync。

关于新建文件

在一个文件上调用fsync/fdatasync只能保证文件本身的数据落盘,但是对于文件系统来说,目录中也保存着文件信息,fsync/fdatasync的调用并不会保证这部分的数据落盘。如果此时发生掉电,这个文件就无法被找到了。所以对于新建文件来说,还需要在父目录上调用fsync。

关于覆盖现有文件

覆盖现有文件时,如果发生掉电,新的数据是不会写入成功,但是可能会污染现有的数据,导致现有数据丢失。所以最佳实践是新建一个临时文件,写入成功后,再原子性替换原有文件。具体步骤:

  • 新建一个临时文件
  • 向临时文件写入数据
  • 对临时文件调用fsync,保证数据落盘。期间发生掉电对现有文件无影响。
  • 重命名临时文件为目标文件名
  • 对父目录调用fsync

存储设备缓冲区

存储设备为了提高性能,也会加入缓存。高级的存储设备能提供非易失性的缓存,比如有掉电保护的缓存。但是无法对所有设备做出这种保证,所以如果数据只是写入了存储设备的缓存的话,遇到掉电等故障,依然会导致数据丢失。

对于保证数据能保存到存储设备的持久化存储介质上,而不管设备本身是否有易失性缓存,操作系统提供了write barriers这个机制。开启了write barriers的文件系统,能保证调用fsync/fdatasync数据持久化保存,无论是否发生了掉电等其他故障,但是会导致性能下降。

许多文件系统提供了配置write barriers的功能。比如ext3, ext4, xfsbtrfsmount参数-o barrier表示开启写屏障,调用fsync/fdatasync能保证刷新存储设备的缓存到持久化介质上。-o nobarrier则表示关闭写屏障,调用fsync/fdatasync无法保证数据落盘。

Linux默认开启write barriers,所以默认情况下,我们调用fsync/fdatasync,就可以认为是文件真正的可靠落盘了

对于这个层面的数据安全保证来说,应用程序是不需要去考虑的,因为如果这台机器的硬盘被挂载为没有开启写屏障,那么可以认为这个管理员知道这个风险,他选择了更高的性能,而不是更高的安全性。

Java世界中的对应API

针对确保数据落盘,掉电也不丢失数据的情况,JDK也封装了对应的功能,并且为我们做好了跨平台的保证。

JDK中有三种方式可以强制文件数据落盘:

  • 调用FileDescriptor#sync函数
  • 调用FileChannel#force函数
  • 使用RandomAccessFilerws或者rwd模式打开文件

FileDescriptor#sync

FileDescriptor类提供了sync方法,可以用于保证数据保存到持久化存储设备后返回。使用方法:

1
2
FileOutputStream outputStream = new FileOutputStream("/Users/mazhibin/b.txt");
outputStream.getFD().sync();

可以看一下JDK是如何实现FileDescriptor#sync的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public native void sync() throws SyncFailedException;

// jdk/src/solaris/native/java/io/FileDescriptor_md.c
JNIEXPORT void JNICALL
Java_java_io_FileDescriptor_sync(JNIEnv *env, jobject this) {
// 获取文件描述符
FD fd = THIS_FD(this);
// 调用IO_Sync来执行数据同步
if (IO_Sync(fd) == -1) {
JNU_ThrowByName(env, "java/io/SyncFailedException", "sync failed");
}
}
// IO_Sync在UNIX系统上的定义就是fsync:
// jdk/src/solaris/native/java/io/io_util_md.h
#define IO_Sync fsync

FileChannel#force

之前的文章提到了,操作系统提供了fsync/fdatasync两个用户同步数据到持久化设备的系统调用,后者尽可能的会不同步文件元数据,来减少一次磁盘IO,提高性能。但是Java IO的FileDescriptor#sync只是对fsync的封装,JDK中没有对于fdatasync的封装,这是一个特性缺失。

Java NIO对这一点也做了增强,FileChannel类的force方法,支持传入一个布尔参数metaData,表示是否需要确保文件元数据落盘,如果为true,则调用fsync。如果为false,则调用fdatasync。

使用例子如下:

1
2
3
4
5
6
7
FileOutputStream outputStream = new FileOutputStream("/Users/mazhibin/b.txt");

// 强制文件数据与元数据落盘
outputStream.getChannel().force(true);

// 强制文件数据落盘,不关心元数据是否落盘
outputStream.getChannel().force(false);

在jdk中的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class FileChannelImpl extends FileChannel {
private final FileDispatcher nd;
private final FileDescriptor fd;
private final NativeThreadSet threads = new NativeThreadSet(2);

public final boolean isOpen() {
return open;
}

private void ensureOpen() throws IOException {
if(!this.isOpen()) {
throw new ClosedChannelException();
}
}

// 布尔参数metaData用于指定是否需要文件元数据也确保落盘
public void force(boolean metaData) throws IOException {
// 确保文件是已经打开的
ensureOpen();
int rv = -1;
int ti = -1;
try {
begin();
ti = threads.add();

// 再次确保文件是已经打开的
if (!isOpen())
return;
do {
// 调用FileDispatcher#force
rv = nd.force(fd, metaData);
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
} finally {
threads.remove(ti);
end(rv > -1);
assert IOStatus.check(rv);
}
}
}

实现中有许多线程同步相关的代码,不属于我们要关注的部分,就不分析了。FileChannel#force调用FileDispatcher#forceFileDispatcher是NIO内部实现用的一个类,封装了一些文件操作方法,其中包含了刷新文件的方法:

1
2
3
4
abstract class FileDispatcher extends NativeDispatcher {
abstract int force(FileDescriptor fd, boolean metaData) throws IOException;
// ...
}

FileDispatcher#force的实现:

1
2
3
4
5
6
class FileDispatcherImpl extends FileDispatcher {
int force(FileDescriptor fd, boolean metaData) throws IOException {
return force0(fd, metaData);
}
static native int force0(FileDescriptor fd, boolean metaData) throws IOException;
// ...

FileDispatcher#force的本地方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this,
jobject fdo, jboolean md)
{
// 获取文件描述符
jint fd = fdval(env, fdo);
int result = 0;

if (md == JNI_FALSE) {
// 如果调用者认为不需要同步文件元数据,调用fdatasync
result = fdatasync(fd);
} else {
#ifdef _AIX
/* On AIX, calling fsync on a file descriptor that is opened only for
* reading results in an error ("EBADF: The FileDescriptor parameter is
* not a valid file descriptor open for writing.").
* However, at this point it is not possibly anymore to read the
* 'writable' attribute of the corresponding file channel so we have to
* use 'fcntl'.
*/
int getfl = fcntl(fd, F_GETFL);
if (getfl >= 0 && (getfl & O_ACCMODE) == O_RDONLY) {
return 0;
}
#endif
// 如果调用者认为需要同步文件元数据,调用fsync
result = fsync(fd);
}
return handle(env, result, "Force failed");
}

可以看出,其实FileChannel#force就是简单的通过metaData参数来区分调用fsync和fdatasync。

同时在zookeeper的org.apache.zookeeper.common.AtomicFileOutputStream类中我们可以看到下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public void close() throws IOException {
boolean triedToClose = false, success = false;
try {
flush();
((FileOutputStream) out).getChannel().force(true);

triedToClose = true;
super.close();
success = true;
} finally {
if (success) {
boolean renamed = tmpFile.renameTo(origFile);
if (!renamed) {
// On windows, renameTo does not replace.
if (!origFile.delete() || !tmpFile.renameTo(origFile)) {
throw new IOException(
"Could not rename temporary file " + tmpFile
+ " to " + origFile);
}
}
} else {
if (!triedToClose) {
// If we failed when flushing, try to close it to not leak
// an FD
IOUtils.closeStream(out);
}
// close wasn't successful, try to delete the tmp file
if (!tmpFile.delete()) {
LOG.warn("Unable to delete tmp file " + tmpFile);
}
}
}
}

RandomAccessFile结合rws/rwd模式

RandomAccessFile打开文件支持4中模式:

  • r 以只读方式打开。调用结果对象的任何 write 方法都将导致抛出 IOException。
  • rw打开以便读取和写入。如果该文件尚不存在,则尝试创建该文件。
  • rws 打开以便读取和写入,对于rws,还要求对文件的内容或元数据的每个更新都同步写入到底层存储设备。
  • rwd 打开以便读取和写入,对于rwd,还要求对文件内容的每个更新都同步写入到底层存储设备。

其中rws模式会在open文件时传入O_SYNC标志位。rwd模式会在open文件时传入O_DSYNC标志位。

RandomAccessFile源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 4个标志位,用于组合表示4种模式
private static final int O_RDONLY = 1;
private static final int O_RDWR = 2;
private static final int O_SYNC = 4;
private static final int O_DSYNC = 8;

public RandomAccessFile(File file, String mode)
throws FileNotFoundException
{
String name = (file != null ? file.getPath() : null);
int imode = -1;
// 只读模式
if (mode.equals("r"))
imode = O_RDONLY;
else if (mode.startsWith("rw")) {
// 读写模式
imode = O_RDWR;
rw = true;

// 读写模式下,可以结合O_SYNC和O_DSYNC标志
if (mode.length() > 2) {
if (mode.equals("rws"))
imode |= O_SYNC;
else if (mode.equals("rwd"))
imode |= O_DSYNC;
else
imode = -1;
}
}
if (imode < 0)
throw new IllegalArgumentException("Illegal mode \"" + mode
+ "\" must be one of "
+ "\"r\", \"rw\", \"rws\","
+ " or \"rwd\"");
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkRead(name);
if (rw) {
security.checkWrite(name);
}
}
if (name == null) {
throw new NullPointerException();
}
if (file.isInvalid()) {
throw new FileNotFoundException("Invalid file path");
}
// 新建文件描述符
fd = new FileDescriptor();
fd.attach(this);
path = name;
open(name, imode);
}

private void open(String name, int mode)
throws FileNotFoundException {
open0(name, mode);
}

private native void open0(String name, int mode)
throws FileNotFoundException;

其中open0的实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// jdk/src/share/native/java/io/RandomAccessFile.c
JNIEXPORT void JNICALL
Java_java_io_RandomAccessFile_open0(JNIEnv *env,
jobject this, jstring path, jint mode)
{
int flags = 0;
// JAVA中的标志位与操作系统标志位转换
if (mode & java_io_RandomAccessFile_O_RDONLY)
flags = O_RDONLY;
else if (mode & java_io_RandomAccessFile_O_RDWR) {
flags = O_RDWR | O_CREAT;
if (mode & java_io_RandomAccessFile_O_SYNC)
flags |= O_SYNC;
else if (mode & java_io_RandomAccessFile_O_DSYNC)
flags |= O_DSYNC;
}

// 调用fileOpen打开函数
fileOpen(env, this, path, raf_fd, flags);
}

fileOpen之后的流程与FileInputStream的一致。可以看出,相比于FileInputStream固定使用O_RDONLYFileOutputStream固定使用O_WRONLY | O_CREATRandomAccessFile提供了在Java中指定打开模式的能力。

但是同时我们需要清除,rwsrwd的效率比rw低非常非常多,因为每次读写都需要刷到磁盘才会返回,这两个中rwdrws效率高一些,因为rwd只刷新文件内容,rws刷新文件内容与元数据,文件的元数据就是文件更新时间等信息。

原子性的重命名文件

在java中的File类的renameTo方法,提供了重命名文件的功能。但是需要注意的是这个方法并不能保证原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Renames the file denoted by this abstract pathname.
*
* <p> Many aspects of the behavior of this method are inherently
* platform-dependent: The rename operation might not be able to move a
* file from one filesystem to another, it might not be atomic, and it
* might not succeed if a file with the destination abstract pathname
* already exists. The return value should always be checked to make sure
* that the rename operation was successful.
*
* <p> Note that the {@link java.nio.file.Files} class defines the {@link
* java.nio.file.Files#move move} method to move or rename a file in a
* platform independent manner.
*
* @param dest The new abstract pathname for the named file
*
* @return <code>true</code> if and only if the renaming succeeded;
* <code>false</code> otherwise
*
* @throws SecurityException
* If a security manager exists and its <code>{@link
* java.lang.SecurityManager#checkWrite(java.lang.String)}</code>
* method denies write access to either the old or new pathnames
*
* @throws NullPointerException
* If parameter <code>dest</code> is <code>null</code>
*/
public boolean renameTo(File dest) {

因此如果想原子性的重命名和移动文件,我们应该使用java.nio.file.Files类中的move方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
* Move or rename a file to a target file.
*
* <p> By default, this method attempts to move the file to the target
* file, failing if the target file exists except if the source and
* target are the {@link #isSameFile same} file, in which case this method
* has no effect. If the file is a symbolic link then the symbolic link
* itself, not the target of the link, is moved. This method may be
* invoked to move an empty directory. In some implementations a directory
* has entries for special files or links that are created when the
* directory is created. In such implementations a directory is considered
* empty when only the special entries exist. When invoked to move a
* directory that is not empty then the directory is moved if it does not
* require moving the entries in the directory. For example, renaming a
* directory on the same {@link FileStore} will usually not require moving
* the entries in the directory. When moving a directory requires that its
* entries be moved then this method fails (by throwing an {@code
* IOException}). To move a <i>file tree</i> may involve copying rather
* than moving directories and this can be done using the {@link
* #copy copy} method in conjunction with the {@link
* #walkFileTree Files.walkFileTree} utility method.
*
* <p> The {@code options} parameter may include any of the following:
*
* <table border=1 cellpadding=5 summary="">
* <tr> <th>Option</th> <th>Description</th> </tr>
* <tr>
* <td> {@link StandardCopyOption#REPLACE_EXISTING REPLACE_EXISTING} </td>
* <td> If the target file exists, then the target file is replaced if it
* is not a non-empty directory. If the target file exists and is a
* symbolic link, then the symbolic link itself, not the target of
* the link, is replaced. </td>
* </tr>
* <tr>
* <td> {@link StandardCopyOption#ATOMIC_MOVE ATOMIC_MOVE} </td>
* <td> The move is performed as an atomic file system operation and all
* other options are ignored. If the target file exists then it is
* implementation specific if the existing file is replaced or this method
* fails by throwing an {@link IOException}. If the move cannot be
* performed as an atomic file system operation then {@link
* AtomicMoveNotSupportedException} is thrown. This can arise, for
* example, when the target location is on a different {@code FileStore}
* and would require that the file be copied, or target location is
* associated with a different provider to this object. </td>
* </table>
*
* <p> An implementation of this interface may support additional
* implementation specific options.
*
* <p> Moving a file will copy the {@link
* BasicFileAttributes#lastModifiedTime last-modified-time} to the target
* file if supported by both source and target file stores. Copying of file
* timestamps may result in precision loss. An implementation may also
* attempt to copy other file attributes but is not required to fail if the
* file attributes cannot be copied. When the move is performed as
* a non-atomic operation, and an {@code IOException} is thrown, then the
* state of the files is not defined. The original file and the target file
* may both exist, the target file may be incomplete or some of its file
* attributes may not been copied from the original file.
*
* <p> <b>Usage Examples:</b>
* Suppose we want to rename a file to "newname", keeping the file in the
* same directory:
* <pre>
* Path source = ...
* Files.move(source, source.resolveSibling("newname"));
* </pre>
* Alternatively, suppose we want to move a file to new directory, keeping
* the same file name, and replacing any existing file of that name in the
* directory:
* <pre>
* Path source = ...
* Path newdir = ...
* Files.move(source, newdir.resolve(source.getFileName()), REPLACE_EXISTING);
* </pre>
*
* @param source
* the path to the file to move
* @param target
* the path to the target file (may be associated with a different
* provider to the source path)
* @param options
* options specifying how the move should be done
*
* @return the path to the target file
*
* @throws UnsupportedOperationException
* if the array contains a copy option that is not supported
* @throws FileAlreadyExistsException
* if the target file exists but cannot be replaced because the
* {@code REPLACE_EXISTING} option is not specified <i>(optional
* specific exception)</i>
* @throws DirectoryNotEmptyException
* the {@code REPLACE_EXISTING} option is specified but the file
* cannot be replaced because it is a non-empty directory
* <i>(optional specific exception)</i>
* @throws AtomicMoveNotSupportedException
* if the options array contains the {@code ATOMIC_MOVE} option but
* the file cannot be moved as an atomic file system operation.
* @throws IOException
* if an I/O error occurs
* @throws SecurityException
* In the case of the default provider, and a security manager is
* installed, the {@link SecurityManager#checkWrite(String) checkWrite}
* method is invoked to check write access to both the source and
* target file.
*/
public static Path move(Path source, Path target, CopyOption... options)
throws IOException

其中参数中的CopyOption可选性有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package java.nio.file;

/**
* Defines the standard copy options.
*
* @since 1.7
*/

public enum StandardCopyOption implements CopyOption {
/**
* Replace an existing file if it exists.
*/
REPLACE_EXISTING,
/**
* Copy attributes to the new file.
*/
COPY_ATTRIBUTES,
/**
* Move the file as an atomic file system operation.
*/
ATOMIC_MOVE;
}

我们看看kafka中怎么使用的,在kafka的org.apache.kafka.common.utils中有下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Attempts to move source to target atomically and falls back to a non-atomic move if it fails.
*
* @throws IOException if both atomic and non-atomic moves fail
*/
public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
try {
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException outer) {
try {
Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
log.debug("Non-atomic move of " + source + " to " + target + " succeeded after atomic move failed due to "
+ outer.getMessage());
} catch (IOException inner) {
inner.addSuppressed(outer);
throw inner;
}
}
}

参考资料

java中的zero copy

在web应用程序中,我们经常会在server和client之间传输数据。比如server发数据给client,server首先将数据从硬盘读出之后,然后原封不动的通过socket传输给client,大致原理如下:

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

下面的例子展示了传统的数据复制实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;

public class TraditionalClient {


public static void main(String[] args) {

int port = 2000;
String server = "localhost";
Socket socket = null;
String lineToBeSent;

DataOutputStream output = null;
FileInputStream inputStream = null;
int ERROR = 1;


// connect to server
try {
socket = new Socket(server, port);
System.out.println("Connected with server " +
socket.getInetAddress() +
":" + socket.getPort());
}
catch (UnknownHostException e) {
System.out.println(e);
System.exit(ERROR);
}
catch (IOException e) {
System.out.println(e);
System.exit(ERROR);
}

try {
String fname = "sendfile/NetworkInterfaces.c";
inputStream = new FileInputStream(fname);

output = new DataOutputStream(socket.getOutputStream());
long start = System.currentTimeMillis();
byte[] b = new byte[4096];
long read = 0, total = 0;
while ((read = inputStream.read(b)) >= 0) {
total = total + read;
output.write(b);
}
System.out.println("bytes send--" + total + " and totaltime--" + (System.currentTimeMillis() - start));
}
catch (IOException e) {
System.out.println(e);
}

try {
output.close();
socket.close();
inputStream.close();
}
catch (IOException e) {
System.out.println(e);
}
}
}

这种操作看起来可能不会怎么消耗CPU,但是实际上它是低效的。因为传统的 Linux 操作系统的标准 I/O 接口是基于数据拷贝操作的,即 I/O 操作会导致数据在操作系统内核地址空间的缓冲区和应用程序地址空间定义的缓冲区之间进行传输。如下图:

Traditional data copying approach

  • 数据首先被从磁盘读取到内核的read buffer
  • 然后在从内核的read buffer中复制到应用程序的buffer中
  • 然后在从应用程序的buffer中复制到内核的socket buffer
  • 最后在从内核的socket buffer中复制到网卡中

然后其中涉及了4次上下文切换:

Traditional context switches

分析上面的描述,我们可以看到kernel buffer其实在这个过程中充当了一个ahead cache。之所以引入这个kernel buffer其实是在很多的情况下是可以减少磁盘 I/O 的操作,进而提升效率的。

  • 比如对于读请求,如果我们所请求的数据的大小小于kernel buffer并且如果要读取的数据已经存放在操作系统的高速缓冲存储器中,那么就不需要再进行实际的物理磁盘 I/O 操作。直接从kernel buffer中读取就好了。
  • 对于写请求:利用这个ahead cache可以实现异步写操作

但是没有银弹,这样会带来一个问题就是,如果当我们请求的数据量的大小远大于kernel buffer的大小的话,这种情况下kernel buffer的存在反而会导致数据在kernel buffer用户缓冲区之间多次复制。

java中的zero copy

如果我们想在java中使用zero copy,我们一般会用java.nio.channels.FileChannel类中的transferTo()方法。下面是它的描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Transfers bytes from this channel's file to the given writable byte
* channel.
*
* <p> An attempt is made to read up to <tt>count</tt> bytes starting at
* the given <tt>position</tt> in this channel's file and write them to the
* target channel. An invocation of this method may or may not transfer
* all of the requested bytes; whether or not it does so depends upon the
* natures and states of the channels. Fewer than the requested number of
* bytes are transferred if this channel's file contains fewer than
* <tt>count</tt> bytes starting at the given <tt>position</tt>, or if the
* target channel is non-blocking and it has fewer than <tt>count</tt>
* bytes free in its output buffer.
*
* <p> This method does not modify this channel's position. If the given
* position is greater than the file's current size then no bytes are
* transferred. If the target channel has a position then bytes are
* written starting at that position and then the position is incremented
* by the number of bytes written.
*
* <p> This method is potentially much more efficient than a simple loop
* that reads from this channel and writes to the target channel. Many
* operating systems can transfer bytes directly from the filesystem cache
* to the target channel without actually copying them. </p>
*
* @param position
* The position within the file at which the transfer is to begin;
* must be non-negative
*
* @param count
* The maximum number of bytes to be transferred; must be
* non-negative
*
* @param target
* The target channel
*
* @return The number of bytes, possibly zero,
* that were actually transferred
*
* @throws IllegalArgumentException
* If the preconditions on the parameters do not hold
*
* @throws NonReadableChannelException
* If this channel was not opened for reading
*
* @throws NonWritableChannelException
* If the target channel was not opened for writing
*
* @throws ClosedChannelException
* If either this channel or the target channel is closed
*
* @throws AsynchronousCloseException
* If another thread closes either channel
* while the transfer is in progress
*
* @throws ClosedByInterruptException
* If another thread interrupts the current thread while the
* transfer is in progress, thereby closing both channels and
* setting the current thread's interrupt status
*
* @throws IOException
* If some other I/O error occurs
*/
public abstract long transferTo(long position, long count,
WritableByteChannel target)
throws IOException;

transferTo()方法把数据从file channel传输到指定的writable byte channel。它需要底层的操作系统支持zero copy。在UNIX和各种Linux中,会执行系统调用sendfile(),该命令把数据从一个文件描述符传输到另一个文件描述符(Linux中万物皆文件):

1
2
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

因此传统的方式中的

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

可以被transferTo()替代。下面的图展示了使用transferTo(), 也就是zero copy技术后的流程:

 Data copy with transferTo()

Context switching with transferTo()

  • transferTo()方法使文件内容被DMA引擎复制到读缓冲区中。 然后,内核将数据复制到与输出套接字关联的内核缓冲区中。
  • 第三个副本发生在DMA引擎将数据从内核套接字缓冲区传递到协议引擎时。

这是一个很明显的进步:我们把context switch的次数从4次减少到了2次,同时也把data copy的次数从4次降低到了3次(而且其中只有一次占用了CPU,另外两次由DMA完成)。但是,要做到zero copy,这还差得远。

如果网卡支持gather operation,我们可以通过kernel进一步减少数据的拷贝操作。在2.4及以上版本的linux内核中,开发者修改了socket buffer descriptor来适应这一需求。这个方法不仅减少了context switch,还消除了和CPU有关的数据拷贝。使用层面的使用方法没有变,但是内部原理却发生了变化:

Data copies when transferTo() and gather operations are used

  • transferTo()方法使文件内容被DMA引擎复制到内核缓冲区中。
  • 没有数据被复制到套接字缓冲区中。 相反,只有具有有关数据位置和长度信息的描述符才会附加到套接字缓冲区。 DMA引擎将数据直接从内核缓冲区传递到协议引擎,从而消除了剩余的最终CPU副本。

下面这个例子展示了如何使用transferTo()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class TransferToClient {

public static void main(String[] args) throws IOException{
TransferToClient sfc = new TransferToClient();
sfc.testSendfile();
}
public void testSendfile() throws IOException {
String host = "localhost";
int port = 9026;
SocketAddress sad = new InetSocketAddress(host, port);
SocketChannel sc = SocketChannel.open();
sc.connect(sad);
sc.configureBlocking(true);

String fname = "sendfile/NetworkInterfaces.c";
long fsize = 183678375L, sendzise = 4094;

// FileProposerExample.stuffFile(fname, fsize);
FileChannel fc = new FileInputStream(fname).getChannel();
long start = System.currentTimeMillis();
long nsent = 0, curnset = 0;
curnset = fc.transferTo(0, fsize, sc);
System.out.println("total bytes transferred--"+curnset+" and time taken in MS--"+(System.currentTimeMillis() - start));
//fc.close();
}
}

参考资料

Iterable和Iterator结合使用的一个小例子

这篇文章主要是记录一下使用Iterable和Iterator用作迭代处理的一个例子。基于这种模式可以很方便的实现
流式处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class Array<T> implements Iterable<T> {
T[] values; // this contains the actual elements of the array

// Constructor that takes a "raw" array and stores it
public Array(T[] values) {
this.values = values;
}

// This is a private class that implements iteration over the elements
// of the list. It is not accessed directly by the user, but is used in
// the iterator() method of the Array class. It implements the hasNext()
// and next() methods.
class ArrayIterator implements Iterator<T> {
int current = 0; // the current element we are looking at

// return whether or not there are more elements in the array that
// have not been iterated over.
public boolean hasNext() {
if (current < Array.this.values.length) {
return true;
} else {
return false;
}
}

// return the next element of the iteration and move the current
// index to the element after that.
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return values[current++];
}
}

// Return the value at a given index
public T get(int index) {
return values[index];
}

// Set the value at a given index
public void set(int index, T value) {
values[index] = value;
}

// Return the length of the array
public int length() {
return values.length;
}

// Return an iterator over the elements in the array. This is generally not
// called directly, but is called by Java when used in a "simple" for loop.
public Iterator<T> iterator() {
return new ArrayIterator();
}

// This is just a sample program that can be run to show how the Array
// class might be used.
public static void main(String[] args) {
// create an array of strings
String[] strings = new String[]{"Hello", "World"};

// create a new array to hold these strings
Array<String> array = new Array<String>(strings);

// get and print the first values (prints "Hello")
System.out.println(array.get(0));

// set the second value
array.set(1, "Javaland!");

// iterate over the array, printing "Hello\nJavaland!"
for (String s : array) {
System.out.println(s);
}
}
}

Guava CacheLoader中当load方法返回null

Guava LoadingCache在实际工作中用的还是比较频繁的。但是最近在review代码时,发现有些同学在使用CacheLoader时没有注意到
CacheLoader#load方法的注释:

1
2
3
4
5
6
7
8
9
10
11
/**
* Computes or retrieves the value corresponding to {@code key}.
*
* @param key the non-null key whose value should be loaded
* @return the value associated with {@code key}; <b>must not be null</b>
* @throws Exception if unable to load the result
* @throws InterruptedException if this method is interrupted. {@code InterruptedException} is
* treated like any other {@code Exception} in all respects except that, when it is caught,
* the thread's interrupt status is set
*/
public abstract V load(K key) throws Exception;

源码中明确指出了这个方法不能返回null。但是在review代码时发现很多同学没注意到到这个,而在部分情况下存在返回null的情况。
一般使用Optional封装一下就好了。

这篇文章主要说一下当load方法返回null时会出现什么异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.util.concurrent.ExecutionException;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;


public class Test {

public static void main(String[] args) {
LoadingCache cache = CacheBuilder.newBuilder().build(new CacheLoader<Object, Object>() {
@Override
public Object load(Object key) {
return null;
}
});

try {
cache.getUnchecked("asda");
}
catch (Exception e) {
System.out.println("本例子中这里会出现异常 这里会cache住抛出异常");
}

try {
cache.get("adsa");
}
catch (ExecutionException e) {
System.out.println("本例子中不会抛出这个异常");
}catch (Exception e) {
System.out.println("本例子中这里会出现异常 这里会cache住抛出异常");
}

System.out.println("fuck");

}
}

上面的代码分别使用了getUncheckedget方法来测试当load方法返回null的情况。

所以一般出现的问题是使用方可能仅仅cache了ExecutionException,这样会导致异常cache不住。这是一个问题,在某些
情况下会影响程序逻辑。需要注意一下。所以尽可能的使用Optional来封装结果

java日志不打印异常栈

问题描述

今天在排查一个问题的时候发现在日志输出中,只有异常的Message,并没有详细的异常堆栈。

问题解释

对于这个问题的官方解释为:

The compiler in the server VM now provides correct stack backtraces for all “cold” built-in exceptions. For performance purposes, when such an exception is thrown a few times, the method may be recompiled. After recompilation, the compiler may choose a faster tactic using preallocated exceptions that do not provide a stack trace. To disable completely the use of preallocated exceptions, use this new flag: -XX:-OmitStackTraceInFastThrow.

简单的描述就是:

它跟JDK5的一个新特性有关,对于一些频繁抛出的异常,JDK为了性能会做一个优化,即JIT重新编译后会抛出没有堆栈的异常,
而在使用-server模式时,该优化选项是开启的,因此在频繁抛出某个异常一段时间后,该优化开始起作用,即只抛出没有堆栈的异常信息

问题验证

比如下面的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class TestCompile {
private static final int count = 1000000;

public static void main(String[] args) throws Exception {
int index = count;
while (index-- > 0) {
try {
work();
}
catch (Exception e) {
e.printStackTrace();
}
}
}

private static void work() {
String value = null;
value.length();
}
}

编译后使用java -server -XX:-OmitStackTraceInFastThrow TestCompile运行,发现一直都是类似的stacktrace。

1
2
3
java.lang.NullPointerException
at TestCompile.work(TestCompile.java:25)
at TestCompile.main(TestCompile.java:17)

换成java -server -XX:+OmitStackTraceInFastThrow TestCompile运行一段时间后就会出现

1
2
3
4
java.lang.NullPointerException
java.lang.NullPointerException
java.lang.NullPointerException
java.lang.NullPointerException

这样的exception,说明stacktrace 该优化已经起作用。-XX:+OmitStackTraceInFastThrow选项在-server情况下默认开启。

如何解决

  • 方法1:查看很早之前的日志,那个时候jit的优化还没生效
  • 方法2:重启服务,在重启的以后的一段时间内jit的优化也暂时不会生效
  • 方法3:配置参数-XX:-OmitStackTraceInFastThrow

log4j-api-2.11.1.jar ClassFormatException

今天在部署系统的时候tomcat出现下面的异常信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
Mar 08, 2019 4:18:05 PM org.apache.catalina.startup.ContextConfig processAnnotationsJar
SEVERE: Unable to process Jar entry [META-INF/versions/9/module-info.class] from Jar [jar:file:/xxxxx/webapps/ROOT/WEB-INF/lib/log4j-api-2.11.1.jar!/] for annotations
org.apache.tomcat.util.bcel.classfile.ClassFormatException: Invalid byte tag in constant pool: 19
at org.apache.tomcat.util.bcel.classfile.Constant.readConstant(Constant.java:133)
at org.apache.tomcat.util.bcel.classfile.ConstantPool.<init>(ConstantPool.java:60)
at org.apache.tomcat.util.bcel.classfile.ClassParser.readConstantPool(ClassParser.java:209)
at org.apache.tomcat.util.bcel.classfile.ClassParser.parse(ClassParser.java:119)
at org.apache.catalina.startup.ContextConfig.processAnnotationsStream(ContextConfig.java:2134)
at org.apache.catalina.startup.ContextConfig.processAnnotationsJar(ContextConfig.java:2010)
at org.apache.catalina.startup.ContextConfig.processAnnotationsUrl(ContextConfig.java:1976)
at org.apache.catalina.startup.ContextConfig.processAnnotations(ContextConfig.java:1961)
at org.apache.catalina.startup.ContextConfig.webConfig(ContextConfig.java:1319)
at org.apache.catalina.startup.ContextConfig.configureStart(ContextConfig.java:878)
at org.apache.catalina.startup.ContextConfig.lifecycleEvent(ContextConfig.java:376)
at org.apache.catalina.util.LifecycleSupport.fireLifecycleEvent(LifecycleSupport.java:119)
at org.apache.catalina.util.LifecycleBase.fireLifecycleEvent(LifecycleBase.java:90)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5322)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:150)
at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:901)
at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:877)
at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:633)
at org.apache.catalina.startup.HostConfig.deployDirectory(HostConfig.java:1120)
at org.apache.catalina.startup.HostConfig$DeployDirectory.run(HostConfig.java:1678)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

出现这个异常的原因是因为我们的服务器使用的是java8,而工程代码中依赖的log4j-api-2.11.1.jar有部分java9版本的字节码文件。

log4j-api-2.11.1.jar工程结构图

然后排查了发现log4j-api-2.11.1.jar是通过elasticsearch6.5.1引入的。

java Unsafe介绍

Unsafe类对于并发编程来说是个很重要的类,J.U.C里的源码到处充斥着这个类的方法调用。

这个类的最大的特点在于,它提供了硬件级别的CAS原子操作。CAS可以说是实现了最轻量级的锁,当多个线程尝试使用CAS同时更新同一个变量时,只有其中的一个线程能成功地更新变量的值,而其他的线程将失败。然而,失败的线程并不会被挂起。

CAS操作包含了三个操作数: 需要读写的内存位置,进行比较的原值,拟写入的新值

Unsafe类中,实现CAS操作的方法是: compareAndSwapXXX

例如:

1
public native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);
  • obj是我们要操作的目标对象
  • offset表示了目标对象中,对应的属性的内存偏移量
  • expect是进行比较的原值
  • update是拟写入的新值。
  • 所以该方法实现了对目标对象obj中的某个成员变量(field)进行CAS操作的功能。

那么,要怎么获得目标field的内存偏移量offset呢? Unsafe类为我们提供了一个方法:

1
public native long objectFieldOffset(Field field);

该方法的参数是我们要进行CAS操作的field对象,要怎么获得这个field对象呢?最直接的办法就是通过反射了:

1
2
Class<?> k = FutureTask.class;
Field stateField = k.getDeclaredField("state");

这样一波下来,我们就能对FutureTask的state属性进行CAS操作了o( ̄▽ ̄)o

除了compareAndSwapObject,Unsafe类还提供了更为具体的对int和long类型的CAS操作:

1
2
public native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);
public native boolean compareAndSwapLong(Object obj, long offset, long expect, long update);

从方法签名可以看出,这里只是把目标field的类型限定成int和long类型,而不是通用的Object.

最后,FutureTask还用到了一个方法:

1
public native void putOrderedInt(Object obj, long offset, int value);

可以看出,该方法只有三个参数,所以它没有比较再交换的概念,某种程度上就是一个赋值操作,即设置obj对象中offset偏移地址对应的int类型的field的值为指定值。这其实是Unsafe的另一个方法putIntVolatile的有序或者有延迟的版本,并且不保证值的改变被其他线程立即看到,只有在field被volatile修饰并且期望被意外修改的时候使用才有用。

那么putIntVolatile方法的定义是什么呢?

1
public native void putIntVolatile(Object obj, long offset, int value);

该方法设置obj对象中offset偏移地址对应的整型field的值为指定值,支持volatile store语义。由此可以看出:

当操作的int类型field本身已经被volatile修饰时,putOrderedInt和putIntVolatile是等价的

FutureTask使用和源码解析

本篇文章说一下java.util.concurrent中的FutureTaskFutureTask是一个同步工具类,它实现了Future语义,表示了一种抽象的可生成结果的计算。在包括线程池在内的许多工具类中都会用到,弄懂它的实现将有利于我们更加深入地理解Java异步操作实现。

下面是FutureTask的类图:

FutureTask类图

在分析它的源码之前, 我们需要先了解一些预备知识。本篇我们先来看看FutureTask中所使用到的接口:RunnableCallableFutureRunnableFuture以及所使用到的工具类ExecutorsUnsafe

FutureTask所使用到的接口

Runnable

创建线程最重要的是传递一个run()方法, 这个run方法定义了这个线程要做什么事情, 它被抽象成了Runnable接口:

1
2
3
4
@FunctionalInterface
public interface Runnable {
public abstract void run();
}

从这里可以看出Runnable最大的问题有下面2个:

  • 没有返回值,我们不能从里面返回相关的处理结果
  • 不能抛出checked exception

而这2个问题,导致我们很多时候使用Runnable其实都会丧失很多的灵活性。而为了解决这两个问题,JDK提供了Callable

Callable

1
2
3
4
5
6
7
8
9
10
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

对于Runnable可以知道,Callable解决了Runnable的两个最大的问题。但是Callable自己带来了一个问题:就是如何获取返回值。

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
Callable<String> myCallable = () -> "This is the results.";
try {
String result = myCallable.call();
System.out.println("Callable 执行的结果是: " + result);
} catch (Exception e) {
System.out.println("There is a exception.");
}
}

这种方式获取的返回值是在当前线程中同步获取的,这种方法确实可以, 但是它存在几个问题:

  • call方法是在当前线程中直接调用的, 无法利用多线程。
  • call方法可能是一个特别耗时的操作, 这将导致程序停在myCallable.call()调用处, 无法继续运行, 直到call方法返回。
  • 如果call方法始终不返回, 我们没办法中断它的运行。

因此, 理想的操作应当是, 我们将call方法提交给另外一个线程执行, 并在合适的时候, 判断任务是否完成, 然后获取线程的执行结果或者撤销任务, 这种思路的实现就是Future接口:

Future

Future接口被设计用来代表一个异步操作的执行结果。你可以用它来获取一个操作的执行结果、取消一个操作、判断一个操作是否已经完成或者是否被取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
public interface Future<V> {

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns {@code true} if this task was cancelled before it completed
* normally.
*
* @return {@code true} if this task was cancelled before it completed
*/
boolean isCancelled();

/**
* Returns {@code true} if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* {@code true}.
*
* @return {@code true} if this task completed
*/
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Future接口一共定义了5个方法:

  • get()
    • 该方法用来获取执行结果, 如果任务还在执行中, 就阻塞等待;
  • get(long timeout, TimeUnit unit)
    • 该方法同get方法类似, 所不同的是, 它最多等待指定的时间, 如果指定时间内任务没有完成, 则会抛出TimeoutException异常;
  • cancel(boolean mayInterruptIfRunning)
    • 该方法用来尝试取消一个任务的执行, 它的返回值是boolean类型, 表示取消操作是否成功。以下三种情况之一的,cancel操作一定是失败的,返回false:
      • 任务已经执行完成了
      • 任务已经被取消过了
      • 任务因为某种原因不能被取消
    • 值得注意的是,cancel操作返回true并不代表任务真的就是被取消了,这取决于发动cancel状态时任务所处的状态
      • 如果发起cancel时任务还没有开始运行,则随后任务就不会被执行;
      • 如果发起cancel时任务已经在运行了,则这时就需要看mayInterruptIfRunning参数了:
        • 如果mayInterruptIfRunning为true, 则当前在执行的任务会被中断
        • 如果mayInterruptIfRunning为false, 则可以允许正在执行的任务继续运行,直到它执行完
  • isCancelled()
    • 该方法用于判断任务是否被取消了。如果一个任务在正常执行完成之前被cancel掉了, 则返回true
  • isDone()
    • 如果一个任务已经结束, 则返回true。注意, 这里的任务结束包含了以下三种情况:
      • 任务正常执行完毕
      • 任务抛出了异常
      • 任务已经被取消

RunnableFuture

RunnableFuture接口同时实现了Runnable接口和Future接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* A {@link Future} that is {@link Runnable}. Successful execution of
* the {@code run} method causes completion of the {@code Future}
* and allows access to its results.
* @see FutureTask
* @see Executor
* @since 1.6
* @author Doug Lea
* @param <V> The result type returned by this Future's {@code get} method
*/
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

FutureTask实现了该接口,也就是相当于它同时实现了Runnable接口和Future接口。

FutureTask所使用到的工具类

Executors

Executors 是一个用于创建线程池的工厂类,关于线程池的概念,我们以后再说。这个类同时也提供了一些有用的静态方法。

前面我们提到了Callable接口,它是JDK1.5才引入的,而Runnable接口在JDK1.0就有了,我们有时候需要将一个已经存在Runnable对象转换成Callable对象,Executors工具类为我们提供了这一实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns the given result. This
* can be useful when applying methods requiring a
* {@code Callable} to an otherwise resultless action.
* @param task the task to run
* @param result the result to return
* @param <T> the type of the result
* @return a callable object
* @throws NullPointerException if task null
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns {@code null}.
* @param task the task to run
* @return a callable object
* @throws NullPointerException if task null
*/
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}

/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}

可以明显看出来,这个方法采用了设计模式中的适配器模式,将一个Runnable类型对象适配成Callable类型。

因为Runnable接口没有返回值, 所以为了与Callable兼容, 我们额外传入了一个result参数, 使得返回的Callable对象的call方法直接执行Runnable的run方法, 然后返回传入的result参数。有的同学要说了, 你把result参数传进去, 又原封不动的返回出来, 有什么意义呀?
这样做确实没什么意义, result参数的存在只是为了将一个Runnable类型适配成Callable类型。

Unsafe

关于Java中的Unsafe,请看我的另外一篇文章:java Unsafe介绍

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskDemo {

private static final ExecutorService executorService = Executors.newSingleThreadExecutor();

public FutureTask<String> getFutureTask() throws ExecutionException, InterruptedException {
return new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("hello");
return "ok";
}
});
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTaskDemo futureTaskDemo = new FutureTaskDemo();

FutureTask<String> futureTask = futureTaskDemo.getFutureTask();
executorService.execute(futureTask);

Thread.sleep(100);
if (futureTask.isDone()) {
System.out.println(futureTask.get());
}
executorService.shutdown();
}
}

源码解析

Java并发工具类的三板斧: 状态,队列,CAS。 以这三个方面为切入点来看源码,有助于我们快速的看清FutureTask的概貌:

FutureTask状态

FutureTask的源代码中定义了如下的7种状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

为确保不同线程对state修改的可见性,所以statevolatile类型。

state属性是贯穿整个FutureTask的最核心的属性,该属性的值代表了任务在运行过程中的状态,随着任务的执行,状态将不断地进行转变,从上面的定义中可以看出,总共有7种状态:包括了1个初始态,2个中间态和4个终止态。虽说状态有这么多,但是状态的转换路径却只有四种:

FutureTask状态流转

  • 任务的初始状态都是NEW, 这一点是构造函数保证的。
  • 任务的终止状态有4种:

    • NORMAL:任务正常执行完毕
    • EXCEPTIONAL:任务执行过程中发生异常
    • CANCELLED:任务被取消
    • INTERRUPTED:任务被中断
  • 任务的中间状态有2种:

    • COMPLETING 正在设置任务结果
    • INTERRUPTING 正在中断运行任务的线程

值得一提的是,任务的中间状态是一个瞬态,它非常的短暂。而且任务的中间态并不代表任务正在执行,而是任务已经执行完了,正在设置最终的返回结果,所以可以这么说:

只要state不处于 NEW 状态,就说明任务已经执行完毕

注意,这里的执行完毕是指传入的Callable对象的call方法执行完毕,或者抛出了异常。所以这里的COMPLETING的名字显得有点迷惑性,它并不意味着任务正在执行中,而意味着call方法已经执行完毕,正在设置任务执行的结果。而将一个任务的状态设置成终止态只有三种方法:

  • set
  • setException
  • cancel

接着我们来看队列,在FutureTask中,队列的实现是一个单向链表,它表示所有等待任务执行完毕的线程的集合。我们知道,FutureTask实现了Future接口,可以获取“Task”的执行结果,那么如果获取结果时,任务还没有执行完毕怎么办呢?那么获取结果的线程就会在一个等待队列中挂起,直到任务执行完毕被唤醒。

在并发编程中使用队列通常是将当前线程包装成某种类型的数据结构扔到等待队列中,我们先来看看队列中的每一个节点是怎么个结构:

1
2
3
4
5
6
7
8
9
10
/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

它只包含了一个记录线程的thread属性和指向下一个节点的next属性。FutureTask中的这个单向链表是当做栈来使用的,确切来说是当做Treiber栈来使用的,不了解Treiber栈是个啥的可以简单的把它当做是一个线程安全的栈, 它使用CAS来完成入栈出栈操作。

为啥要使用一个线程安全的栈呢,因为同一时刻可能有多个线程都在获取任务的执行结果,如果任务还在执行过程中,则这些线程就要被包装成WaitNode扔到Treiber栈的栈顶,即完成入栈操作,这样就有可能出现多个线程同时入栈的情况,因此需要使用CAS操作保证入栈的线程安全,对于出栈的情况也是同理。

由于FutureTask中的队列本质上是一个Treiber栈,那么使用这个队列就只需要一个指向栈顶节点的指针就行了,在FutureTask中,就是waiters属性:

1
2
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

事实上,它就是整个单向链表的头节点。综上,FutureTask中所使用的队列的结构如下:

FutureTask中所使用的队列的结构

CAS操作大多数是用来改变状态的,在FutureTask中也不例外。我们一般在静态代码块中初始化需要CAS操作的属性的偏移量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}

从这个静态代码块中我们也可以看出,CAS操作主要针对3个属性,包括staterunnerwaiters,说明这3个属性基本是会被多个线程同时访问的。

  • state属性代表了任务的状态
  • waiters属性代表了指向栈顶节点的指针,这两个我们上面已经分析过了。
  • runner属性代表了执行FutureTask中的“Task”的线程。

为什么需要一个属性来记录执行任务的线程呢?这是为了中断或者取消任务做准备的,只有知道了执行任务的线程是谁,我们才能去中断它。

定义完属性的偏移量之后,接下来就是CAS操作本身了。在FutureTask,CAS操作最终调用的还是Unsafe类的compareAndSwapXXX方法。

FutureTask的核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

可以看出,FutureTask的核心属性只有5个:

  • state 属性代表了任务的状态
  • callable 属性代表了要执行的任务本身,即FutureTask中的“Task”部分。这里之所以用Callable而不用Runnable是因为FutureTask实现了Future接口,需要获取任务的执行结果。
  • outcome 属性代表了任务的执行结果或者抛出的异常,为Object类型,也就是说outcome可以是任意类型的对象,所以当我们将正常的执行结果返回给调用者时,需要进行强制类型转换, 返回由Callable定义的V类型
  • runner 属性代表了执行FutureTask中的“Task”的线程,这是为了中断或者取消任务做准备。
  • waiters 属性代表了指向栈顶节点的指针

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Callable}.
*
* @param callable the callable task
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

FutureTask共有2个构造函数,这2个构造函数一个是直接传入Callable对象, 一个是传入一个Runnable对象和一个指定的result, 然后通过Executors工具类将它适配成callable对象, 所以这两个构造函数的本质是一样的:

  • 用传入的参数初始化callable成员变量
  • 将FutureTask的状态设为NEW

FutureTask接口实现

FutureTask实现了RunnableFuture接口, 因此,它必须实现Runnable和Future接口的所有方法。

Runnable接口实现

要实现Runnable接口, 就得覆写run方法, 我们看看FutureTask的run方法干了点啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
  • 首先检查当前状态是不是New, 并且使用CAS操作将runner属性设置位当前线程,即记录执行任务的线程。可见runner属性是在运行时被初始化的。如果不满足这个条件,则直return。
  • 然后调用Callable对象的call方法来执行任务,如果任务执行成功,就使用set(result)设置结果,否则,用setException(ex)设置抛出的异常。
  • 最后在finally块中,我们将runner属性置为null,并且检查有没有遗漏的中断,如果发现s >= INTERRUPTING, 说明执行任务的线程有可能被中断了,因为s >= INTERRUPTING 只有两种可能,state状态为INTERRUPTINGINTERRUPTED

然后详细说一下其中的set(result)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

这个方法一开始通过CAS操作将state属性由原来的NEW状态修改为COMPLETING状态,我们在一开始介绍state状态的时候说过,COMPLETING是一个非常短暂的中间态,表示正在设置执行的结果。

状态设置成功后,我们就把任务执行结果赋值给outcome, 然后直接把state状态设置成NORMAL,注意,这里是直接设置,没有先比较再设置的操作,由于state属性被设置成volatile,这里putOrderedIntputIntVolatile是等价的,保证了state状态对其他线程的可见性。

在这之后,我们调用了 finishCompletion()来完成执行结果的设置。

接下来我们再来看看发生了异常的版本setException(ex)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Causes this future to report an {@link ExecutionException}
* with the given throwable as its cause, unless this future has
* already been set or has been cancelled.
*
* <p>This method is invoked internally by the {@link #run} method
* upon failure of the computation.
*
* @param t the cause of failure
*/
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

可见,除了将outcome属性赋值为异常对象,以及将state的终止状态修改为EXCEPTIONAL,其余都和set方法类似。在方法的最后,都调用了 finishCompletion()来完成执行结果的设置。那么我们就来看看 finishCompletion()干了点啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null; // to reduce footprint
}

这个方法事实上完成了一个“善后”工作。我们先来看看if条件语句中的CAS操作:

1
UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)

该方法是将waiters属性的值由原值设置为null, 我们知道,waiters属性指向了Treiber栈的栈顶节点,可以说是代表了整个Treiber栈,将该值设为null的目的就是清空整个栈。

然后又进行下一轮for循环,而下一轮for循环的判断条件又是waiters!=null ,由此我们知道,虽然最外层的for循环乍一看好像是什么遍历节点的操作,其实只是为了确保waiters属性被成功设置成null,本质上相当于一个自旋操作

waiters属性设置成null以后,接下了 for (;;)死循环才是真正的遍历节点,可以看出,循环内部就是一个普通的遍历链表的操作,我们前面讲属性的时候说过,Treiber栈里面存放的WaitNode代表了当前等待任务执行结束的线程,这个循环的作用也正是遍历链表中所有等待的线程,并唤醒他们。

Treiber栈中所有挂起的线程都唤醒后,下面就是执行done方法:

1
2
3
4
5
6
7
8
9
10
/**
* Protected method invoked when this task transitions to state
* {@code isDone} (whether normally or via cancellation). The
* default implementation does nothing. Subclasses may override
* this method to invoke completion callbacks or perform
* bookkeeping. Note that you can query status inside the
* implementation of this method to determine whether this task
* has been cancelled.
*/
protected void done() { }

这个方法是一个空方法,从注释上看,它是提供给子类覆写的,以实现一些任务执行结束前的额外操作。
done方法之后就是callable属性的清理了(callable = null)。

然后我们在详细说说finally代码块中的方法。

在finally块中,我们将runner属性置为null,并且检查有没有遗漏的中断,如果发现s >= INTERRUPTING, 说明执行任务的线程有可能被中断了,因为s >= INTERRUPTING 只有两种可能,state状态为INTERRUPTINGINTERRUPTED

有的同学可能就要问了,咱前面已经执行过的set方法或者setException方法不是已经将state状态设置成NORMAL或者EXCEPTIONAL了吗?怎么会出现INTERRUPTING或者INTERRUPTED状态呢?别忘了,咱们在多线程的环境中,在当前线程执行run方法的同时,有可能其他线程取消了任务的执行,此时其他线程就可能对state状态进行改写,这也就是我们在设置终止状态的时候用putOrderedInt方法,而没有用CAS操作的原因——我们无法确信在设置state前是处于COMPLETING中间态还是INTERRUPTING中间态。

接下来我们来看看handlePossibleCancellationInterrupt方法干了点啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Ensures that any interrupt from a possible cancel(true) is only
* delivered to a task while in run or runAndReset.
*/
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt

// assert state == INTERRUPTED;

// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}

可见该方法是一个自旋操作,如果当前的state状态是INTERRUPTING,我们在原地自旋,直到state状态转换成终止态。

至此,run方法的分析就真的结束了。我们来总结一下:

run方法重点做了以下几件事:

  • runner属性设置成当前正在执行run方法的线程
  • 调用callable成员变量的call方法来执行任务
  • 设置执行结果outcome, 如果执行成功, 则outcome保存的就是执行结果;如果执行过程中发生了异常, 则outcome中保存的就是异常,设置结果之前,先将state状态设为中间态COMPLETING
  • outcome的赋值完成后,设置state状态为终止态(NORMAL或者EXCEPTIONAL)
  • 唤醒Treiber栈中所有等待的线程
  • 善后清理(waiters, callable,runner设为null)
  • 检查是否有遗漏的中断,如果有,等待中断状态完成。

这里再插一句,我们前面说“state只要不是NEW状态,就说明任务已经执行完成了”就体现在这里,因为run方法中,我们是在c.call()执行完毕或者抛出了异常之后才开始设置中间态和终止态的。

Future接口实现

Future接口一共定义了5个方法,我们一个个来看:

cancel(boolean mayInterruptIfRunning)

既然上面在分析run方法的最后,我们提到了任务可能被别的线程取消,那我们看看怎么取消一个任务的执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

在前面我们已经介绍过Future#cancel方法了:

  • cancel(boolean mayInterruptIfRunning)
    • 该方法用来尝试取消一个任务的执行, 它的返回值是boolean类型, 表示取消操作是否成功。以下三种情况之一的,cancel操作一定是失败的,返回false:
      • 任务已经执行完成了
      • 任务已经被取消过了
      • 任务因为某种原因不能被取消
    • 值得注意的是,cancel操作返回true并不代表任务真的就是被取消了,这取决于发动cancel状态时任务所处的状态
      • 如果发起cancel时任务还没有开始运行,则随后任务就不会被执行;
      • 如果发起cancel时任务已经在运行了,则这时就需要看mayInterruptIfRunning参数了:
        • 如果mayInterruptIfRunning为true, 则当前在执行的任务会被中断
        • 如果mayInterruptIfRunning为false, 则可以允许正在执行的任务继续运行,直到它执行完

我们来看看FutureTask是怎么实现cancel方法的这几个规范的:

首先,对于「任务已经执行完成了或者任务已经被取消过了,则cancel操作一定是失败的(返回false)」这两条,是通过简单的判断state值是否为NEW实现的,因为我们前面说过了,只要state不为NEW,说明任务已经执行完毕了。从代码中可以看出,只要state不为NEW,则直接返回false。

如果state还是NEW状态,我们再往下看:

1
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)

这一段是根据mayInterruptIfRunning的值将state的状态由NEW设置成INTERRUPTING或者CANCELLED,当这一操作也成功之后,就可以执行后面的try语句了,但无论怎么,该方法最后都返回了true。

我们再接着看try块干了点啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
try {    // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}

我们知道,runner属性中存放的是当前正在执行任务的线程,因此,这个try块的目的就是中断当前正在执行任务的线程,最后将state的状态设为INTERRUPTED,当然,中断操作完成后,还需要通过finishCompletion()来唤醒所有在Treiber栈中等待的线程。我们现在总结一下,cancel方法实际上完成以下两种状态转换之一:

  • NEW -> CANCELLED (对应于mayInterruptIfRunning=false)
  • NEW -> INTERRUPTING -> INTERRUPTED (对应于mayInterruptIfRunning=true)

对于第一条路径,虽说cancel方法最终返回了true,但它只是简单的把state状态设为CANCELLED,并不会中断线程的执行。但是这样带来的后果是,任务即使执行完毕了,也无法设置任务的执行结果,因为前面分析run方法的时候我们知道,设置任务结果有一个中间态,而这个中间态的设置,是以当前state状态为NEW为前提的。

对于第二条路径,则会中断执行任务的线程,我们在倒回上面的run方法看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

虽然第二条路径中断了当前正在执行的线程,但是,响不响应这个中断是由执行任务的线程自己决定的,更具体的说,这取决于c.call()方法内部是否对中断进行了响应,是否将中断异常抛出。那call方法中是怎么处理中断的呢?从上面的代码中可以看出,catch语句处理了所有的Throwable的异常,这自然也包括了中断异常。

然而,值得一提的是,即使这里进入了catch (Throwable ex){}代码块,setException(ex)的操作一定是失败的,因为在我们取消任务执行的线程中,我们已经先把state状态设为INTERRUPTING了,而setException(ex)的操作要求设置前线程的状态为NEW所以这里响应cancel方法所造成的中断最大的意义不是为了对中断进行处理,而是简单的停止任务线程的执行,节省CPU资源。

那读者可能会问了,既然这个setException(ex)的操作一定是失败的,那放在这里有什么用呢?事实上,这个setException(ex)是用来处理任务自己在正常执行过程中产生的异常的,在我们没有主动去cancel任务时,任务的state状态在执行过程中就会始终是NEW,如果任务此时自己发生了异常,则这个异常就会被setException(ex)方法成功的记录到outcome中。

反正无论如何,run方法最终都会进入finally块,而这时候它会发现s >= INTERRUPTING,如果检测发现s = INTERRUPTING,说明cancel方法还没有执行到中断当前线程的地方,那就等待它将state状态设置成INTERRUPTED。到这里,对cancel方法的分析就和上面对run方法的分析对接上了。

cancel方法到这里就分析完了,如果你一条条的去对照Future接口对于cancel方法的规范,它每一条都是实现了的,而它实现的核心机理,就是对state的当前状态的判断和设置。由此可见,state属性是贯穿整个FutureTask的最核心的属性。

isCancelled()

说完了cancel,我们再来看看 isCancelled()方法,相较而言,它就简单多了:

1
2
3
public boolean isCancelled() {
return state >= CANCELLED;
}

那么state >= CANCELLED包含了那些状态呢,它包括了: CANCELLED INTERRUPTING INTERRUPTED

Future接口对于isCancelled()方法的规范:

该方法用于判断任务是否被取消了。如果一个任务在正常执行完成之前被Cancel掉了, 则返回true

再对比state的状态图:

state的状态图

可见选取这三个状态作为判断依据是很合理的, 因为只有调用了cancel方法,才会使state状态进入这三种状态。

isDone()

与 isCancelled方法类似,isDone方法也是简单地通过state状态来判断。

1
2
3
public boolean isDone() {
return state != NEW;
}

关于这一点,其实我们之前已经说过了,只要state状态不是NEW,则任务已经执行完毕了,因为state状态不存在类似“任务正在执行中”这种状态,即使是短暂的中间态,也是发生在任务已经执行完毕,正在设置任务结果的时候。

get()

最后我们来看看获取执行结果的get方法,先来看看无参的版本:

1
2
3
4
5
6
7
8
9
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

该方法其实很简单,当任务还没有执行完毕或者正在设置执行结果时,我们就使用awaitDone方法等待任务进入终止态,注意,awaitDone的返回值是任务的状态,而不是任务的结果。任务进入终止态之后,我们就根据任务的执行结果来返回计算结果或者抛出异常。

我们先来看看等待任务完成的awaitDone方法,该方法是获取任务结果最核心的方法,它完成了获取结果,挂起线程,响应中断等诸多操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

在具体分析它的源码之前,有一点我们先特别说明一下,FutureTask中会涉及到两类线程,一类是执行任务的线程,它只有一个,FutureTask的run方法就由该线程来执行;一类是获取任务执行结果的线程,它可以有多个,这些线程可以并发执行,每一个线程都是独立的,都可以调用get方法来获取任务的执行结果。如果任务还没有执行完,则这些线程就需要进入Treiber栈中挂起,直到任务执行结束,或者等待的线程自身被中断。

理清了这一点后,我们再来详细看看awaitDone方法。可以看出,该方法的大框架是一个自旋操作,我们一段一段来看:

1
2
3
4
5
6
7
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// ...
}

首先一开始,我们先检测当前线程是否被中断了,这是因为get方法是阻塞式的,如果等待的任务还没有执行完,则调用get方法的线程会被扔到Treiber栈中挂起等待,直到任务执行完毕。但是,如果任务迟迟没有执行完毕,则我们也有可能直接中断在Treiber栈中的线程,以停止等待。

当检测到线程被中断后,我们调用了removeWaiter:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Tries to unlink a timed-out or interrupted wait node to avoid
* accumulating garbage. Internal nodes are simply unspliced
* without CAS since it is harmless if they are traversed anyway
* by releasers. To avoid effects of unsplicing from already
* removed nodes, the list is retraversed in case of an apparent
* race. This is slow when there are a lot of nodes, but we don't
* expect lists to be long enough to outweigh higher-overhead
* schemes.
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

removeWaiter的作用是将参数中的node从等待队列(即Treiber栈)中移除。如果此时线程还没有进入Treiber栈,则 q=null,那么removeWaiter(q)啥也不干。在这之后,我们就直接抛出了InterruptedException异常。

接着看awaitDone中的for(;;)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
for (;;) {
/*if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}*/
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
  • 如果任务已经进入终止态(s > COMPLETING),我们就直接返回任务的状态;
  • 如果任务正在设置执行结果(s == COMPLETING),我们就让出当前线程的CPU资源继续等待
  • 否则,就说明任务还没有执行,或者任务正在执行过程中,那么这时,如果q现在还为null, 说明当前线程还没有进入等待队列,于是我们新建了一个WaitNode, WaitNode的构造函数我们之前已经看过了,就是生成了一个记录了当前线程的节点;
  • 如果q不为null,说明代表当前线程的WaitNode已经被创建出来了,则接下来如果queued=false,表示当前线程还没有入队,所以我们执行了:
1
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);

这行代码的作用是通过CAS操作将新建的q节点添加到waiters链表的头节点之前,其实就是Treiber栈的入栈操作,写的还是很简洁的,一行代码就搞定了,如果大家还是觉得晕乎,下面是它等价的伪代码:

1
2
3
4
5
q.next = waiters; //当前节点的next指向目前的栈顶元素
//如果栈顶节点在这个过程中没有变,即没有发生并发入栈的情况
if(waiters的值还是上面q.next所使用的waiters值){
waiters = q; //修改栈顶的指针,指向刚刚入栈的节点
}

这个CAS操作就是为了保证同一时刻如果有多个线程在同时入栈,则只有一个能够操作成功,也即Treiber栈的规范。

如果以上的条件都不满足,则再接下来因为现在是不带超时机制的gettimedfalse,则else if代码块跳过,然后来到最后一个else, 把当前线程挂起,此时线程就处于阻塞等待的状态。

至此,在任务没有执行完毕的情况下,获取任务执行结果的线程就会在Treiber栈中被LockSupport.park(this)挂起了。

那么这个挂起的线程什么时候会被唤醒呢?有两种情况:

  • 任务执行完毕了,在finishCompletion方法中会唤醒所有在Treiber栈中等待的线程
  • 等待的线程自身因为被中断等原因而被唤醒。

我们接下来就继续看看线程被唤醒后的情况,此时,线程将回到for(;;)循环的开头,继续下一轮循环,在复制一下上面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this); // 挂起的线程从这里被唤醒
}

首先自然还是检测中断,所不同的是,此时q已经不为null了,因此在有中断发生的情况下,在抛出中断之前,多了一步removeWaiter(q)操作,该操作是将当前线程从等待的Treiber栈中移除,相比入栈操作,这个出栈操作要复杂一点,这取决于节点是否位于栈顶。下面我们来仔细分析这个出栈操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Tries to unlink a timed-out or interrupted wait node to avoid
* accumulating garbage. Internal nodes are simply unspliced
* without CAS since it is harmless if they are traversed anyway
* by releasers. To avoid effects of unsplicing from already
* removed nodes, the list is retraversed in case of an apparent
* race. This is slow when there are a lot of nodes, but we don't
* expect lists to be long enough to outweigh higher-overhead
* schemes.
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

首先,我们把要出栈的WaitNodethread属性设置为null, 这相当于一个标记,是我们后面在waiters链表中定位该节点的依据。

  • 要移除的节点就在栈顶
    • 我们先来看看该节点就位于栈顶的情况,这说明在该节点入栈后,并没有别的线程再入栈了。由于一开始我们就将该节点的thread属性设为了null,因此,前面的q.thread != nullpred != null都不满足,我们直接进入到最后一个else if分支:
1
2
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
continue retry;

这一段是栈顶节点出栈的操作,和入栈类似,采用了CAS比较,将栈顶元素设置成原栈顶节点的下一个节点。

值得注意的是,当CAS操作不成功时,程序会回到retry处重来,但即使CAS操作成功了,程序依旧会遍历完整个链表,找寻node.thread == null 的节点,并将它们一并从链表中剔除。

  • 要移除的节点不在栈顶
    • 当要移除的节点不在栈顶时,我们会一直遍历整个链表,直到找到q.thread == null的节点,找到之后,我们将进入
1
2
3
4
5
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}

这是因为节点不在栈顶,则其必然是有前驱节点pred的,这时,我们只是简单的让前驱节点指向当前节点的下一个节点,从而将目标节点从链表中剔除。

注意,后面多加的那个if判断是很有必要的,因为removeWaiter方法并没有加锁,所以可能有多个线程在同时执行,WaitNode的两个成员变量thread和next都被设置成volatile,这保证了它们的可见性,如果我们在这时发现了pred.thread == null,那就意味着它已经被另一个线程标记了,将在另一个线程中被拿出waiters链表,而我们当前目标节点的原后继节点现在是接在这个pred节点上的,因此,如果pred已经被其他线程标记为要拿出去的节点,我们现在这个线程再继续往后遍历就没有什么意义了,所以这时就调到retry处,从头再遍历。

如果pred节点没有被其他线程标记,那我们就接着往下遍历,直到整个链表遍历完。

至此,将节点从waiters链表中移除的removeWaiter操作我们就分析完了,我们总结一下该方法:

在该方法中,会传入一个需要移除的节点,我们会将这个节点的thread属性设置成null,以标记该节点。然后无论如何,我们会遍历整个链表,清除那些被标记的节点(只是简单的将节点从链表中剔除)。如果要清除的节点就位于栈顶,则还需要注意重新设置waiters的值,指向新的栈顶节点。所以可以看出,虽说removeWaiter方法传入了需要剔除的节点,但是 事实上它可能剔除的不止是传入的节点,而是所有已经被标记了的节点,这样不仅清除操作容易了些(不需要专门去定位传入的node在哪里),而且提升了效率(可以同时清除所有已经被标记的节点)。

我们再回到awaitDone方法里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q); // 刚刚分析到这里了,我们接着往下看
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

如果线程不是因为中断被唤醒,则会继续往下执行,此时会再次获取当前的state状态。所不同的是,此时q已经不为null, queued已经为true了,所以已经不需要将当前节点再入waiters栈了。

至此我们知道,除非被中断,否则get方法会在原地自旋等待(用的是Thread.yield,对应于s == COMPLETING)或者直接挂起(对应任务还没有执行完的情况),直到任务执行完成。而我们前面分析run方法和cancel方法的时候知道,在run方法结束后,或者cancel方法取消完成后,都会调用finishCompletion()来唤醒挂起的线程,使它们得以进入下一轮循环,获取任务执行结果。

最后,等awaitDone函数返回后,get方法返回了report(s),以根据任务的状态,汇报执行结果:

1
2
3
4
5
6
7
8
9
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

可见,report方法非常简单,它根据当前state状态,返回正常执行的结果,或者抛出指定的异常。

至此,get方法就分析结束了。

值得注意的是,awaitDone方法和get方法都没有加锁,这在多个线程同时执行get方法的时候会不会产生线程安全问题呢?通过查看方法内部的参数我们知道,整个方法内部用的大多数是局部变量,因此不会产生线程安全问题,对于全局的共享变量waiters的修改时,也使用了CAS操作,保证了线程安全,而state变量本身是volatile的,保证了读取时的可见性,因此整个方法调用虽然没有加锁,它仍然是线程安全的。

get(long timeout, TimeUnit unit)

最后我们来看看带超时版本的get方法:

1
2
3
4
5
6
7
8
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

它和上面不带超时时间的get方法很类似,只是在awaitDone方法中多了超时检测:

1
2
3
4
5
6
7
8
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}

即,如果指定的超时时间到了,则直接返回,如果返回时,任务还没有进入终止状态,则直接抛出TimeoutException异常,否则就像get()方法一样,正常的返回执行结果。

总结

FutureTask实现了Runnable和Future接口,它表示了一个带有任务状态和任务结果的任务,它的各种操作都是围绕着任务的状态展开的。

值得注意的是,在所有的7个任务状态中,只要不是NEW状态,就表示任务已经执行完毕或者不再执行了,并没有表示“任务正在执行中”的状态。

除了代表了任务的Callable对象、代表任务执行结果的outcome属性,FutureTask还包含了一个代表所有等待任务结束的线程的Treiber栈,这一点其实和各种锁的等待队列特别像,即如果拿不到锁,则当前线程就会被扔进等待队列中;这里则是如果任务还没有执行结束,则所有等待任务执行完毕的线程就会被扔进Treiber栈中,直到任务执行完毕了,才会被唤醒。

FutureTask虽然为我们提供了获取任务执行结果的途径,遗憾的是,在获取任务结果时,如果任务还没有执行完成,则当前线程会自旋或者挂起等待,这和我们实现异步的初衷是相违背的,我们后面将继续介绍另一个同步工具类CompletableFuture, 它解决了这个问题。

参考资料

类加载器那些事儿(一)

在之前的文章《Java类的生命周期》我们谈了一下类的生命周期。
在这篇文章中,我们谈谈java的类加载器哪些事情。从下面的JVM架构图可以看到

JVM架构图

class Loader subSystem负责管理和维护java类的生命周期的前三个阶段:

  • 加载
  • 链接
  • 初始化

当我们编写一个java的源文件后,我们对这个xxx.java编译会得到xxx.class的字节码文件,因为jvm只能运行字节码文件。为了能够使用这个class字节码文件,我们就会用到java中的ClassLoader。 而我们这篇文章就来说说java类加载器的那些事情。

ClassLoader是什么

ClassLoader顾名思义就是用来加载Class的。它负责将Class的字节码形式转换成内存形式的Class对象。

类的加载方式比较灵活,我们最常用的加载方式有下面几种:

  • 一种是根据类的全路径名找到相应的class文件,然后从class文件中读取文件内容;
  • 另一种是从jar文件中读取
  • 从网络中获取,比如早期的Applet
  • 基于字节码生成技术生成的代理类

字节码的本质就是一个字节数组(byte[]),它有特定的复杂的内部格式。因为字节码文件有一定的格式,而且由ClassLoader进行加载,那么我们其实可以通过定制ClassLoader来实现字节码加密,原理很简单:

  • 加密:对java源代码进行编译得到字节码文件,然后使用某种算法对字节码文件进行加密
  • 解密:定制的ClassLoader会先使用加密算法对应的解密算法对加密的字节码文件进行解密,然后使用在正常加载jvm标准的字节码格式文件。

3个重要的ClassLoader

在上面的JVM架构图中,我们可以看到在类的加载阶段有3个重要的ClassLoader,下面分别介绍一下这3个比较重要的ClassLoader。

启动类加载器(BootstrapClassLoader)

这个类加载器负责加载JVM运行时核心类, 将<JAVA_HOME>\lib目录下的核心类库或-Xbootclasspath参数指定的路径下的jar包加载到虚拟机内存中,这个 ClassLoader比较特殊,它是由C/C++代码实现的,我们将它称之为「根加载器」。此类加载器并不继承于java.lang.ClassLoader,不能被java程序直接调用。

注意必由于虚拟机是按照文件名识别加载jar包的,如rt.jar,如果文件名不被虚拟机识别,即使把jar包丢到lib目录下也是没有作用的(出于安全考虑,Bootstrap启动类加载器只加载包名为java、javax、sun等开头的类)。

扩展类加载器(ExtensionClassLoader)

这个类加载器sun.misc.Launcher$ExtClassLoader由Java语言实现的,是Launcher的静态内部类, 它负责加载<JAVA_HOME>/lib/ext目录下或者由系统变量-Djava.ext.dir指定位路径中的类库,开发者可以直接使用使用这个类加载器。

常见的比如 swing 系列、内置的 js 引擎、xml 解析器等等都是由这个类加载器加载的, 这些库名通常以javax开头,它们的jar包位于<JAVA_HOME>\lib\ext目录下的类库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//ExtClassLoader类中获取路径的代码
private static File[] getExtDirs() {
//加载<JAVA_HOME>/lib/ext目录中的类库
String s = System.getProperty("java.ext.dirs");
File[] dirs;
if (s != null) {
StringTokenizer st =
new StringTokenizer(s, File.pathSeparator);
int count = st.countTokens();
dirs = new File[count];
for (int i = 0; i < count; i++) {
dirs[i] = new File(st.nextToken());
}
} else {
dirs = new File[0];
}
return dirs;
}

应用程序类加载器(AppClassLoader)

sun.misc.Launcher$AppClassLoader才是直接面向我们用户的加载器,它负责加载系统类路径java -classpath-Djava.class.path指定路径下的类库,也就是我们经常用到的classpath路径jar包和目录。我们自己编写的代码以及使用的第三方 jar 包通常都是由它来加载的。开发者可以直接使用系统类加载器, 这个类加载器是CLassLoader中的getSystemClassLoader()方法的返回值, 所以也称为系统类加载器.一般情况下这就是系统默认的类加载器. 当我们的 main 方法执行的时候,这第一个用户类的加载器就是AppClassLoader

那些位于网络上静态文件服务器提供的jar包和class文件,jdk 内置了一个URLClassLoader,用户只需要传递规范的网络路径给构造器,就可以使用 URLClassLoader 来加载远程类库了。URLClassLoader不但可以加载远程类库,还可以加载本地路径的类库,取决于构造器中不同的地址形式。

ExtensionClassLoaderAppClassLoader都是URLClassLoader的子类,它们都是从本地文件系统里加载类库。

ClassLoader之间的层级关系

1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class Class {

// Initialized in JVM not by private constructor
// This field is filtered from reflection access, i.e. getDeclaredField
// will throw NoSuchFieldException
private final ClassLoader classLoader;
}
public abstract class ClassLoader {
// The parent class loader for delegation
// Note: VM hardcoded the offset of this field, thus all new fields
// must be added *after* it.
private final ClassLoader parent;
}

我们翻看jdk的代码会发现:

  • ClassLoader是一个抽象类
  • 每一个ClassLoader都有一个父ClassLoader的引用
  • 每一个Class中都有一个标记自己是哪个ClassLoader加载的属性

我们编写下面的测试代码:

1
2
3
4
5
6
7
8
9
public class TestClassLoader {

public static void main(String[] args) {
ClassLoader loader = TestClassLoader.class.getClassLoader();
System.out.println(loader.toString());
System.out.println(loader.getParent().toString());
System.out.println(loader.getParent().getParent());
}
}

输出结果:

1
2
3
sun.misc.Launcher$AppClassLoader@500c05c2
sun.misc.Launcher$ExtClassLoader@454e2c9c
null

从日志输出我们可以看出,我们的TestClassLoader是由AppClassLoader加载的,AppClassLoader的父ClassLoader是ExtClassLoader,而ExtClassLoader
父ClassLoader是null,jvm约定当ClassLoader#getParent()返回时null的话,就默认使用启动类加载器作为父加载器.下面是ClassLoader.java中的关于getParent方法的描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* Returns the parent class loader for delegation. Some implementations may
* use <tt>null</tt> to represent the bootstrap class loader. This method
* will return <tt>null</tt> in such implementations if this class loader's
* parent is the bootstrap class loader.
*
* <p> If a security manager is present, and the invoker's class loader is
* not <tt>null</tt> and is not an ancestor of this class loader, then this
* method invokes the security manager's {@link
* SecurityManager#checkPermission(java.security.Permission)
* <tt>checkPermission</tt>} method with a {@link
* RuntimePermission#RuntimePermission(String)
* <tt>RuntimePermission("getClassLoader")</tt>} permission to verify
* access to the parent class loader is permitted. If not, a
* <tt>SecurityException</tt> will be thrown. </p>
*
* @return The parent <tt>ClassLoader</tt>
*
* @throws SecurityException
* If a security manager exists and its <tt>checkPermission</tt>
* method doesn't allow access to this class loader's parent class
* loader.
*
* @since 1.2
*/
@CallerSensitive
public final ClassLoader getParent() {
if (parent == null)
return null;
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Check access to the parent class loader
// If the caller's class loader is same as this class loader,
// permission check is performed.
checkClassLoaderPermission(parent, Reflection.getCallerClass());
}
return parent;
}

因此我们可以给出ClassLoader的继承关系图:

双亲委派模型

程序在运行过程中,遇到了一个未知的类,它会选择哪个 ClassLoader 来加载它呢?

虚拟机的策略是: 使用调用者Class对象的ClassLoader来加载当前未知的类。

何为调用者 Class 对象?就是在遇到这个未知的类时,虚拟机肯定正在运行一个方法调用(静态方法或者实例方法),这个方法挂在哪个类上面,那这个类就是调用者 Class对象。前面我们提到每个Class对象里面都有一个 classLoader 属性记录了当前的类是由谁来加载的。

但是在加载的过程中,并不是直接加载的,而是会有一个层级查找关系在,这也就是所谓的「双亲委派模型」。

我们可以看一下ClassLoader的源代码来确认这一点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* Loads the class with the specified <a href="#name">binary name</a>.
* This method searches for classes in the same manner as the {@link
* #loadClass(String, boolean)} method. It is invoked by the Java virtual
* machine to resolve class references. Invoking this method is equivalent
* to invoking {@link #loadClass(String, boolean) <tt>loadClass(name,
* false)</tt>}.
*
* @param name
* The <a href="#name">binary name</a> of the class
*
* @return The resulting <tt>Class</tt> object
*
* @throws ClassNotFoundException
* If the class was not found
*/
public Class<?> loadClass(String name) throws ClassNotFoundException {
return loadClass(name, false);
}

/**
* Loads the class with the specified <a href="#name">binary name</a>. The
* default implementation of this method searches for classes in the
* following order:
*
* <ol>
*
* <li><p> Invoke {@link #findLoadedClass(String)} to check if the class
* has already been loaded. </p></li>
*
* <li><p> Invoke the {@link #loadClass(String) <tt>loadClass</tt>} method
* on the parent class loader. If the parent is <tt>null</tt> the class
* loader built-in to the virtual machine is used, instead. </p></li>
*
* <li><p> Invoke the {@link #findClass(String)} method to find the
* class. </p></li>
*
* </ol>
*
* <p> If the class was found using the above steps, and the
* <tt>resolve</tt> flag is true, this method will then invoke the {@link
* #resolveClass(Class)} method on the resulting <tt>Class</tt> object.
*
* <p> Subclasses of <tt>ClassLoader</tt> are encouraged to override {@link
* #findClass(String)}, rather than this method. </p>
*
* <p> Unless overridden, this method synchronizes on the result of
* {@link #getClassLoadingLock <tt>getClassLoadingLock</tt>} method
* during the entire class loading process.
*
* @param name
* The <a href="#name">binary name</a> of the class
*
* @param resolve
* If <tt>true</tt> then resolve the class
*
* @return The resulting <tt>Class</tt> object
*
* @throws ClassNotFoundException
* If the class could not be found
*/
protected Class<?> loadClass(String name, boolean resolve)
throws ClassNotFoundException
{
synchronized (getClassLoadingLock(name)) {
// First, check if the class has already been loaded
Class<?> c = findLoadedClass(name);
if (c == null) {
long t0 = System.nanoTime();
try {
if (parent != null) {
c = parent.loadClass(name, false);
} else {
c = findBootstrapClassOrNull(name);
}
} catch (ClassNotFoundException e) {
// ClassNotFoundException thrown if class not found
// from the non-null parent class loader
}

if (c == null) {
// If still not found, then invoke findClass in order
// to find the class.
long t1 = System.nanoTime();
c = findClass(name);

// this is the defining class loader; record the stats
sun.misc.PerfCounter.getParentDelegationTime().addTime(t1 - t0);
sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1);
sun.misc.PerfCounter.getFindClasses().increment();
}
}
if (resolve) {
resolveClass(c);
}
return c;
}
}

从上面的代码我们就可以看到protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException函数实现了「双亲委派」。简单描述如下:

  • 检查一下指定名称的类是否已经加载过,如果加载过了,就不需要再加载,直接返回。
  • 如果此类没有加载过,那么,再判断一下是否有父加载器;如果有父加载器,则由父加载器加载(即调用parent.loadClass(name, false);).或者是调用bootstrap类加载器来加载。
  • 如果父加载器及bootstrap类加载器都没有找到指定的类,那么调用当前类加载器的findClass方法来完成类加载。

换句话说,如果自定义类加载器,就必须重写findClass方法!

「双亲委派模型」是一种组织类加载器之间关系的一种规范,他的工作原理是:
如果一个类加载器收到了类加载的请求,它不会自己去尝试加载这个类,而是把这个请求委派给父类加载器去完成,这样层层递进,最终所有的加载请求都被传到最顶层的启动类加载器中,只有当父类加载器无法完成这个加载请求(它的搜索范围内没有找到所需的类)时,才会交给子类加载器去尝试加载.

从上面的分析我们可以知道:一般情况下,我们编写的java代码所有延迟加载的类都会由初始调用main方法的这个ClassLoader全全负责,它就是AppClassLoader

为什么需要双亲委派模型

比如java.lang.Object,它存放在\jre\lib\rt.jar中,它是所有java类的父类,因此无论哪个类加载都要加载这个类,最终所有的加载请求都汇总到顶层的启动类加载器中,因此Object类会由启动类加载器来加载,所以加载的都是同一个类,如果不使用双亲委派模型,由各个类加载器自行去加载的话,系统中就会出现不止一个Object类,应用程序就会全乱了

因为在JVM中,判断一个对象是否是某个类型时,如果该对象的实际类型与待比较的类型的类加载器不同,那么会返回false。

举个简单例子:

ClassLoader1、ClassLoader2都加载java.lang.String类,对应Class1、Class2对象。那么Class1对象不属于ClassLoad2对象加载的java.lang.String类型。

这样的好处是: java类随着它的类加载器一起具备了带有优先级的层次关系。

双亲委派规则可能会变成三亲委派,四亲委派,取决于你使用的父加载器是谁,它会一直递归委派到根加载器。只是一般我们习惯称为「双亲委派」。

延迟加载

JVM具体什么加载类,需要按照jvm的实现来说的。不过我们平时用的Hotspot虚拟机,运行并不是一次性加载所需要的全部类的,它是按需加载,也就是延迟加载。程序在运行的过程中会逐渐遇到很多不认识的新类,这时候就会调用 ClassLoader 来加载这些类。加载完成后就会将 Class 对象存在 ClassLoader 里面,下次就不需要重新加载了。

ClassLoader的相关核心方法

loadClass()

loadClass()方法是加载目标类的入口,在这个方法内部实现了「双亲委派模型」。它首先会查找当前 ClassLoader以及它的双亲里面是否已经加载了目标类,如果没有找到就会让双亲尝试加载,如果双亲都加载不了,就会调用findClass() 让自定义加载器自己来加载目标类。ClassLoader 的findClass()方法是需要子类来覆盖的,不同的加载器将使用不同的逻辑来获取目标类的字节码。拿到这个字节码之后再调用defineClass()方法将字节码转换成Class对象。

下面这个图还是画的比较形象的:

ClassLoader.loadClass()这是一个实例方法,需要一个ClassLoader对象来调用该方法,该方法将Class文件加载到内存时,并不会执行类的初始化,直到这个类第一次使用时才进行初始化.该方法因为需要得到一个ClassLoader对象,所以可以根据需要指定使用哪个类加载器.

1
2
ClassLoader cl= …….;
cl.loadClass(“com.wang.HelloWorld”);

提到这个ClassLoader.loadClass()方法,一般就需要提一下Class类的forName方法。

Class.forname()

Class.forname():是一个静态方法, 根据传入的类的全限定名返回一个Class对象.该方法在将Class文件加载到内存的同时,会执行类的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Returns the {@code Class} object associated with the class or
* interface with the given string name. Invoking this method is
* equivalent to:
*
* <blockquote>
* {@code Class.forName(className, true, currentLoader)}
* </blockquote>
*
* where {@code currentLoader} denotes the defining class loader of
* the current class.
*
* <p> For example, the following code fragment returns the
* runtime {@code Class} descriptor for the class named
* {@code java.lang.Thread}:
*
* <blockquote>
* {@code Class t = Class.forName("java.lang.Thread")}
* </blockquote>
* <p>
* A call to {@code forName("X")} causes the class named
* {@code X} to be initialized.
*
* @param className the fully qualified name of the desired class.
* @return the {@code Class} object for the class with the
* specified name.
* @exception LinkageError if the linkage fails
* @exception ExceptionInInitializerError if the initialization provoked
* by this method fails
* @exception ClassNotFoundException if the class cannot be located
*/
@CallerSensitive
public static Class<?> forName(String className)
throws ClassNotFoundException {
Class<?> caller = Reflection.getCallerClass();
return forName0(className, true, ClassLoader.getClassLoader(caller), caller);
}

比如当我们在使用jdbc驱动时,经常会使用 Class.forName 方法来动态加载驱动类。

1
Class.forName("com.mysql.cj.jdbc.Driver");

其原理是 mysql 驱动的Driver类里有一个静态代码块,它会在 Driver 类被加载的时候执行。这个静态代码块会将 mysql 驱动实例注册到全局的 jdbc 驱动管理器里。

1
2
3
4
5
6
7
8
9
10
class Driver {
static {
try {
java.sql.DriverManager.registerDriver(new Driver());
} catch (SQLException E) {
throw new RuntimeException("Can't register driver!");
}
}
...
}

forName方法同样也是使用调用者Class对象的ClassLoader来加载目标类。不过 forName还提供了多参数版本,可以指定使用哪个ClassLoader来加载:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
/**
* Returns the {@code Class} object associated with the class or
* interface with the given string name, using the given class loader.
* Given the fully qualified name for a class or interface (in the same
* format returned by {@code getName}) this method attempts to
* locate, load, and link the class or interface. The specified class
* loader is used to load the class or interface. If the parameter
* {@code loader} is null, the class is loaded through the bootstrap
* class loader. The class is initialized only if the
* {@code initialize} parameter is {@code true} and if it has
* not been initialized earlier.
*
* <p> If {@code name} denotes a primitive type or void, an attempt
* will be made to locate a user-defined class in the unnamed package whose
* name is {@code name}. Therefore, this method cannot be used to
* obtain any of the {@code Class} objects representing primitive
* types or void.
*
* <p> If {@code name} denotes an array class, the component type of
* the array class is loaded but not initialized.
*
* <p> For example, in an instance method the expression:
*
* <blockquote>
* {@code Class.forName("Foo")}
* </blockquote>
*
* is equivalent to:
*
* <blockquote>
* {@code Class.forName("Foo", true, this.getClass().getClassLoader())}
* </blockquote>
*
* Note that this method throws errors related to loading, linking or
* initializing as specified in Sections 12.2, 12.3 and 12.4 of <em>The
* Java Language Specification</em>.
* Note that this method does not check whether the requested class
* is accessible to its caller.
*
* <p> If the {@code loader} is {@code null}, and a security
* manager is present, and the caller's class loader is not null, then this
* method calls the security manager's {@code checkPermission} method
* with a {@code RuntimePermission("getClassLoader")} permission to
* ensure it's ok to access the bootstrap class loader.
*
* @param name fully qualified name of the desired class
* @param initialize if {@code true} the class will be initialized.
* See Section 12.4 of <em>The Java Language Specification</em>.
* @param loader class loader from which the class must be loaded
* @return class object representing the desired class
*
* @exception LinkageError if the linkage fails
* @exception ExceptionInInitializerError if the initialization provoked
* by this method fails
* @exception ClassNotFoundException if the class cannot be located by
* the specified class loader
*
* @see java.lang.Class#forName(String)
* @see java.lang.ClassLoader
* @since 1.2
*/
@CallerSensitive
public static Class<?> forName(String name, boolean initialize,
ClassLoader loader)
throws ClassNotFoundException
{
Class<?> caller = null;
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Reflective call to get caller class is only needed if a security manager
// is present. Avoid the overhead of making this call otherwise.
caller = Reflection.getCallerClass();
if (sun.misc.VM.isSystemDomainLoader(loader)) {
ClassLoader ccl = ClassLoader.getClassLoader(caller);
if (!sun.misc.VM.isSystemDomainLoader(ccl)) {
sm.checkPermission(
SecurityConstants.GET_CLASSLOADER_PERMISSION);
}
}
}
return forName0(name, initialize, loader, caller);
}

通过这种形式的forName方法可以突破内置加载器的限制,通过使用自定类加载器允许我们自由加载其它任意来源的类库。根据ClassLoader的传递性,目标类库传递引用到的其它类库也将会使用自定义加载器加载。

Class.forNameClassLoader.loadClass都可以用来加载目标类,它们之间有一个小小的区别,那就是Class.forName()方法可以获取原生类型的Class,而ClassLoader.loadClass()则会报错:

1
2
3
4
5
6
7
8
9
10
Class<?> x = Class.forName("[I");
System.out.println(x);

x = ClassLoader.getSystemClassLoader().loadClass("[I");
System.out.println(x);

---------------------
class [I

Exception in thread "main" java.lang.ClassNotFoundException: [I

findClass()

在上面的「双亲委派模型」小节中,我们从ClassLoader类的源代码分析了,loadClass()方法在父加载器无法加载类的时候,就会调用我们自定义的类加载器中的findeClass()函数, 这样就可以保证自定义的类加载器也符合「双亲委派」。

如果想实现自定义的ClassLoader,那么必须实现findClass()方法,而ClassLoader中的默认实现为直接抛出ClassNotFoundException异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Finds the class with the specified <a href="#name">binary name</a>.
* This method should be overridden by class loader implementations that
* follow the delegation model for loading classes, and will be invoked by
* the {@link #loadClass <tt>loadClass</tt>} method after checking the
* parent class loader for the requested class. The default implementation
* throws a <tt>ClassNotFoundException</tt>.
*
* @param name
* The <a href="#name">binary name</a> of the class
*
* @return The resulting <tt>Class</tt> object
*
* @throws ClassNotFoundException
* If the class could not be found
*
* @since 1.2
*/
protected Class<?> findClass(String name) throws ClassNotFoundException {
throw new ClassNotFoundException(name);
}

defineClass(byte[] b, int off, int len)

defineClass()方法是用来将byte字节流解析成JVM能够识别的Class对象。在ClassLoader中已实现该方法逻辑,通过这个方法不仅能够通过class文件实例化class对象,也可以通过其他方式实例化class对象,如通过网络接收一个类的字节码,然后转换为byte字节流创建对应的Class对象,defineClass()方法通常与findClass()方法一起使用。

一般情况下,在自定义类加载器时,会直接覆盖ClassLoaderfindClass()方法并编写加载规则,取得要加载类的字节码后转换成流,然后调用defineClass()方法生成类的Class对象,简单例子如下:

1
2
3
4
5
6
7
8
9
10
protected Class<?> findClass(String name) throws ClassNotFoundException {
// 获取类的字节数组
byte[] classData = getClassData(name);
if (classData == null) {
throw new ClassNotFoundException();
} else {
//使用defineClass生成class对象
return defineClass(name, classData, 0, classData.length);
}
}

在下面的「自定义类加载器」小节中也会介绍这个方法的使用。

需要注意的是,如果直接调用defineClass()方法生成类的Class对象,这个类的Class对象并没有解析(也可以理解为链接阶段,毕竟解析是链接的最后一步),其解析操作需要等待初始化阶段进行。

关于java类的生命周期,如果不了解的话,建议看看之前的文章《Java类的生命周期》

resolveClass(Class≺?≻ c)

使用该方法可以使用类的Class对象创建完成也同时被解析

上述4个方法是ClassLoader类中的比较重要的方法,也是我们可能会经常用到的方法。

SercureClassLoader扩展了ClassLoader,新增了几个与使用相关的代码源(对代码源的位置及其证书的验证)和权限定义类验证(主要指对class源码的访问权限)的方法,一般我们不会直接跟这个类打交道,更多是与它的子类URLClassLoader有所关联.

前面说过,ClassLoader是一个抽象类,很多方法是空的没有实现,比如 findClass()、findResource()等。而URLClassLoader这个实现类为这些方法提供了具体的实现,并新增了URLClassPath类协助取得Class字节码流等功能,在编写自定义类加载器时,如果没有太过于复杂的需求,可以直接继承URLClassLoader类,这样就可以避免自己去编写findClass()方法及其获取字节码流的方式,使自定义类加载器编写更加简洁。

class文件的显示加载与隐式加载的概念

  • 显示加载 指的是在代码中通过调用ClassLoader加载class对象,如直接使用Class.forName(name)this.getClass().getClassLoader().loadClass()加载class对象。
  • 隐式加载则是不直接在代码中调用ClassLoader的方法加载class对象,而是通过虚拟机自动加载到内存中,如在加载某个类的class文件时,该类的class文件中引用了另外一个类的对象,此时额外引用的类将通过JVM自动加载到内存中。

自定义类加载器

下面写一个简单的自定义类加载的例子

首先我们编写一个简单的java类,这个类就是后面需要被我们的自定义类加载器加载的类:

1
2
3
4
5
6
7
8
package xyz.xkrivzooh;

public class HelloWorld {

public void sayHello() {
System.out.println("hello " + this.getClass().getClassLoader().toString());
}
}

我们使用javac编译后,将得到的HelloWorld.class文件。此处我们想一下,如果我们把这个字节码文件放置在zai当前的项目中的话,那么根据「双亲委派模型」可知这个字节码文件将会被sun.misc.Launcher$AppClassLoader类加载器加载,为了让我们自定义的类加载器加载,我们把HelloWorld.class文件放入到其他目录。

1
2
3
4
~ » tree xyz
xyz
└── xkrivzooh
└── HelloWorld.class

然后编写我们自定义的类加载器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package xyz.xkrivzooh;

import java.io.File;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Paths;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

public class CustomerClassLoader extends ClassLoader {

private final String classPath;

public CustomerClassLoader(String classPath) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(classPath));
this.classPath = classPath;
}

@Override
protected Class<?> findClass(String name) throws ClassNotFoundException {
Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
try {
String path = name.replaceAll("\\.", "/");
byte[] bytes = Files.readAllBytes(Paths.get(classPath + File.separator + path + ".class"));
return defineClass(name, bytes, 0, bytes.length);
}
catch (Exception e) {
throw new ClassNotFoundException(e.getMessage(), e);
}
}

public static void main(String[] args) throws Exception{
CustomerClassLoader customerClassLoader = new CustomerClassLoader("/Users/rollenholt");
Class<?> aClass = customerClassLoader.loadClass("xyz.xkrivzooh.HelloWorld");
Object instance = aClass.newInstance();
Method sayHello = aClass.getDeclaredMethod("sayHello", null);
sayHello.invoke(instance, null);
}
}

输出结果为

1
hello xyz.xkrivzooh.CustomerClassLoader@4d405ef7

从上的例子我们可以看出,我们自定义的类加载器运行是没问题的。

我们平时在自定义类加载器的时候需要注意的是不要轻易的去破坏双亲委派模型,也就是不要去覆盖loadClass方法,除非你明确知道你在做什么

因为这样就可以导致导致自定义加载器无法加载内置的核心类库。在使用自定义加载器时,要明确好它的父加载器是谁,将父加载器通过子类的构造器传入。如果父类加载器是 null,那就表示父加载器是「根加载器」BootstrapClassLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Creates a new class loader using the specified parent class loader for
* delegation.
*
* <p> If there is a security manager, its {@link
* SecurityManager#checkCreateClassLoader()
* <tt>checkCreateClassLoader</tt>} method is invoked. This may result in
* a security exception. </p>
*
* @param parent
* The parent class loader
*
* @throws SecurityException
* If a security manager exists and its
* <tt>checkCreateClassLoader</tt> method doesn't allow creation
* of a new class loader.
*
* @since 1.2
*/
protected ClassLoader(ClassLoader parent) {
this(checkCreateClassLoader(), parent);
}

钻石依赖

项目管理上有一个著名的概念叫着「钻石依赖」,是指软件依赖导致同一个软件包的两个版本需要共存而不能冲突。

maven是这样解决钻石依赖的: 它会从多个冲突的版本中选择一个来使用,如果不同的版本之间兼容性很糟糕,那么程序将无法正常编译运行。Maven 这种形式叫「扁平化」依赖管理。

使用ClassLoader可以解决钻石依赖问题。不同版本的软件包使用不同的 ClassLoader 来加载,位于不同ClassLoader中名称一样的类实际上是不同的类。

我们通过下面的代码来验证这个问题:

首先准备下面的环境:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
~/xyz/xkrivzooh » tree .
.
├── v1
│   ├── Test.class
│   └── Test.java
└── v2
├── Test.class
└── Test.java

2 directories, 4 files
------------------------------------------------------------
~/xyz/xkrivzooh » cat v1/Test.java

public class Test {
public void sayHello() {
System.out.println("v1");
}
}

------------------------------------------------------------
~/xyz/xkrivzooh » cat v2/Test.java

public class Test {
public void sayHello() {
System.out.println("v2");
}
}

------------------------------------------------------------

然后使用测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package xyz.xkrivzooh;

import java.net.URL;
import java.net.URLClassLoader;

public class Test {
public static void main(String[] args) throws Exception {

String dir1 = "file:///Users/rollenholt/xyz/xkrivzooh/v1/";
String dir2 = "file:///Users/rollenholt/xyz/xkrivzooh/v2/";
URLClassLoader classLoader1 = new URLClassLoader(new URL[] {new URL(dir1)});
URLClassLoader classLoader2 = new URLClassLoader(new URL[] {new URL(dir2)});

Class<?> aClass1 = classLoader1.loadClass("Test");
Object instance1 = aClass1.newInstance();
aClass1.getDeclaredMethod("sayHello", null).invoke(instance1, null);

Class<?> aClass2 = classLoader2.loadClass("Test");
Object instance2 = aClass2.newInstance();
aClass2.getDeclaredMethod("sayHello", null).invoke(instance2, null);

System.out.println(aClass1.equals(aClass2));
System.out.println(instance1.equals(instance2));


URLClassLoader classLoader3 = new URLClassLoader(new URL[] {new URL(dir1)});
Class<?> aClass3 = classLoader3.loadClass("Test");
Object instance3 = aClass3.newInstance();
aClass3.getDeclaredMethod("sayHello", null).invoke(instance3, null);

System.out.println(aClass3.equals(aClass1));
System.out.println(instance3.equals(instance1));
}
}

程序运行输出:

1
2
3
4
5
6
7
v1
v2
false
false
v1
false
false

我们还可以让两个不同版本的Test类实现同一个接口,这样可以避免使用反射的方式来调用Test类里面的方法。

1
2
3
Class<?> aClass = classLoader1.loadClass("Test");
SomeInterface inter1 = (SomeInterface)aClass.getConstructor().newInstance();
inter1.sayHello()

ClassLoader固然可以解决依赖冲突问题,不过它也限制了不同软件包的操作界面必须使用反射或接口的方式进行动态调用。Maven没有这种限制,它依赖于虚拟机的默认懒惰加载策略,运行过程中如果没有显示使用定制的ClassLoader,那么从头到尾都是在使用AppClassLoader,而不同版本的同名类必须使用不同的ClassLoader加载,所以Maven不能完美解决钻石依赖。

蚂蚁金服开源的sofa-ark其实就是采用ClassLoader的方式来做类隔离的。

Thread.contextClassLoader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Returns the context ClassLoader for this Thread. The context
* ClassLoader is provided by the creator of the thread for use
* by code running in this thread when loading classes and resources.
* If not {@linkplain #setContextClassLoader set}, the default is the
* ClassLoader context of the parent Thread. The context ClassLoader of the
* primordial thread is typically set to the class loader used to load the
* application.
*
* <p>If a security manager is present, and the invoker's class loader is not
* {@code null} and is not the same as or an ancestor of the context class
* loader, then this method invokes the security manager's {@link
* SecurityManager#checkPermission(java.security.Permission) checkPermission}
* method with a {@link RuntimePermission RuntimePermission}{@code
* ("getClassLoader")} permission to verify that retrieval of the context
* class loader is permitted.
*
* @return the context ClassLoader for this Thread, or {@code null}
* indicating the system class loader (or, failing that, the
* bootstrap class loader)
*
* @throws SecurityException
* if the current thread cannot get the context ClassLoader
*
* @since 1.2
*/
@CallerSensitive
public ClassLoader getContextClassLoader() {
if (contextClassLoader == null)
return null;
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
ClassLoader.checkClassLoaderPermission(contextClassLoader,
Reflection.getCallerClass());
}
return contextClassLoader;
}

Thread.contextClassLoader「线程上下文类加载器」,从方法的描述我们可以知道,线程的contextClassLoader是从父线程那里继承过来的,所谓父线程就是创建了当前线程的线程。程序启动时的main线程的contextClassLoader 就是AppClassLoader。这意味着如果没有人工去设置,那么所有的线程的contextClassLoader都是AppClassLoader。它可以做到跨线程共享类,只要它们共享同一个 contextClassLoader。父子线程之间会自动传contextClassLoader,所以共享起来将是自动化的。如果不同的线程使用不同的 contextClassLoader,那么不同的线程使用的类就可以隔离开来。

总结

但是如果仅仅把ClassLoader当成一个将字节码形式的class转为内存形式的Class对象的工具的话有点狭义:他不仅仅是一个转换工具,他也相当于一个类的容器,或者叫命名空间可以起到「类隔离」的作用。位于同一个ClassLoader 里面的类名是唯一的,不同的ClassLoader可以持有同名的类。

同时通过「双亲委派模型」,不同的ClassLoader之间相互合作,形成一个层级关系。parent具有更高的加载优先级。除此之外,parent还表达了一种共享关系,当多个子ClassLoader共享同一个parent时,那么这个parent里面包含的类可以认为是所有子ClassLoader共享的。这也是为什么BootstrapClassLoader被所有的类加载器视为最顶层的加载器,JVM核心类库自然应该被共享。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×