gc Roots对象有哪些

JVM的垃圾自动回收是我们经常说的一个话题,这里的垃圾的含义是:

内存中已经不再被使用到的对象就是垃圾

要进行垃圾回收,如何判断一个对象是否可以被回收?

一般有两种办法:

  • 引用计数法
    • 实现简单,但是没法解决对象之间的循环引用问题
  • 枚举根节点做可达性分析
    • 通过一系列名为“GC Roots”的对象作为起始点,从“GC Roots”对象开始向下搜索,如果一个对象到“GC Roots”没有任何引用链相连,说明此对象可以被回收

常见的常见的GC Root有如下:

  • 通过System Class Loader或者Boot Class Loader加载的class对象,通过自定义类加载器加载的class不一定是GC Root
  • 处于激活状态的线程
  • 栈中的对象
  • 本地方法栈中 JNI (Native方法)的对象
  • JNI中的全局对象
  • 正在被用于同步的各种锁对象
  • JVM自身持有的对象,比如系统类加载器等。

Synchronized的一些东西

synchronized是Java中解决并发问题的一种最常用的方法,从语法上讲synchronized总共有三种用法:

  • 修饰普通方法
  • 修饰静态方法
  • 修饰代码块

synchronized 原理

为了查看synchronized的原理,我们首先反编译一下下面的代码, 这是一个synchronized修饰代码块的demo

1
2
3
4
5
6
7
public class SynchronizedDemo {
public void method() {
synchronized (this) {
System.out.println("Method 1 start");
}
}
}

http://7niucdn.wenchao.ren/20190902124248.png

从上面截图可以看到,synchronized的实现依赖2个指令:

  • monitorenter
  • monitorexit

但是从上面的截图可以看到有一个monitorenter和2个monitorexit,这里之所以有2个monitorexit是因为synchronized的锁释放有2种情况:

  • 方法正常执行完毕synchronized的范围,也就是正常情况下的锁释放
  • synchronized圈起来的范围内的代码执行抛出异常,导致锁释放

monitorenter

关于这个指令,jvm中的描述为:

Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:

• If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
• If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
• If another thread already owns the monitor associated with objectref, the thread blocks until the monitor’s entry count is zero, then tries again to gain ownership.

翻译一下大概为:

每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:

  • 如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。
  • 如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.
  • 如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。

monitorexit

The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref.
The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.

翻译一下为:

执行monitorexit的线程必须是objectref所对应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。

通过这两段描述,我们应该能很清楚的看出Synchronized的实现原理,Synchronized的语义底层是通过一个monitor的对象来完成,其实wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因。

上面的demo是使用synchronized修饰代码块的demo,下面我们看一个使用synchronized修饰方法的demo:

1
2
3
4
5
public class SynchronizedMethod {
public synchronized void method() {
System.out.println("Hello World!");
}
}

我们继续对这个类进行反编译:

http://7niucdn.wenchao.ren/20190902125437.png

从反编译的结果来看,方法的同步并没有通过指令monitorentermonitorexit来完成(虽然理论上其实也可以通过这两条指令来实现),不过相对于普通方法,其常量池中多了ACC_SYNCHRONIZED标示符。JVM就是根据该标示符来实现方法的同步的:

  • 当方法调用时,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor。
  • 在方法执行期间,其他任何线程都无法再获得同一个monitor对象。 其实本质上没有区别,只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。

观城模型

Synchronized 锁升级

这四种锁是指锁的状态,专门针对synchronized的。在介绍这四种锁状态之前还需要介绍一些额外的知识。

首先为什么Synchronized能实现线程同步?在回答这个问题之前我们需要了解两个重要的概念:Java对象头Monitor

Java对象头

synchronized是悲观锁,在操作同步资源之前需要给同步资源先加锁,这把锁就是存在Java对象头里的,而Java对象头又是什么呢?
我们以Hotspot虚拟机为例,Hotspot的对象头主要包括两部分数据:Mark Word(标记字段)、Klass Pointer(类型指针)。如下面的一个示例:

java object示意图

我们以64位虚拟机来说,object header的结构为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|------------------------------------------------------------------------------------------------------------|--------------------|
| Object Header (128 bits) | State |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| Mark Word (64 bits) | Klass Word (64 bits) | |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| unused:25 | identity_hashcode:31 | unused:1 | age:4 | biased_lock:1 | lock:2 | OOP to metadata object | Normal |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | lock:2 | OOP to metadata object | Biased |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| ptr_to_lock_record:62 | lock:2 | OOP to metadata object | Lightweight Locked |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| ptr_to_heavyweight_monitor:62 | lock:2 | OOP to metadata object | Heavyweight Locked |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| | lock:2 | OOP to metadata object | Marked for GC |
|------------------------------------------------------------------------------|-----------------------------|--------------------|

Mark Word

Mark Word默认存储:

  • 对象的HashCode
  • 分代年龄
  • 锁标志位信息

这些信息都是与对象自身定义无关的数据,所以Mark Word被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据。它会根据对象的状态复用自己的存储空间,也就是说在运行期间Mark Word里存储的数据会随着锁标志位的变化而变化。

Klass Point

Klass Point:对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。

Monitor

Monitor可以理解为一个同步工具或一种同步机制,通常被描述为一个对象。每一个Java对象就有一把看不见的锁,称为内部锁或者Monitor锁。
Monitor是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个monitor关联,同时monitor中有一个Owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。

synchronized通过Monitor来实现线程同步,Monitor是依赖于底层的操作系统的Mutex Lock(互斥锁)来实现的线程同步。

synchronized最初实现同步的方式是阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长,这就是JDK 6之前synchronized效率低的原因。这种依赖于操作系统Mutex Lock所实现的锁我们称之为“重量级锁”,JDK 6中为了减少获得锁和释放锁带来的性能消耗,引入了“偏向锁”和“轻量级锁”。

所以目前 锁一共有4种状态,级别从低到高依次是:无锁、偏向锁、轻量级锁和重量级锁。锁状态只能升级不能降级

下面是出四种锁状态对应的的Mark Word内容,然后再分别讲解四种锁状态的思路以及特点:

上下两幅图对照着看,我们就能够清楚的知道在不同的锁状态下,mack word区域存储的内容的不同了。

无锁

无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。

无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。CAS原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。

偏向锁

偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。

在大多数情况下,锁总是由同一线程多次获得,不存在多线程竞争,所以出现了偏向锁。其目标就是在只有一个线程执行同步代码块时能够提高性能。,也就是说偏向锁一般是一个线程的事情

当一个线程访问同步代码块并获取锁时,会在Mark Word里存储锁偏向的线程ID。在线程进入和退出同步块时不再通过CAS操作来加锁和解锁,而是检测Mark Word里是否存储着指向当前线程的偏向锁。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次CAS原子指令即可。

偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。

偏向锁在JDK 6及以后的JVM里是默认启用的。可以通过JVM参数关闭偏向锁:-XX:-UseBiasedLocking=false,关闭之后程序默认会进入轻量级锁状态。

轻量级锁

轻量级锁是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。因此一般情况下轻量级锁大多数是2个线程的事情。

在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,然后拷贝对象头中的Mark Word复制到锁记录中。

拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock Record里的owner指针指向对象的Mark Word。

  • 如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。
  • 如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。

若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。

重量级锁

升级为重量级锁时,锁标志的状态值变为“10”,此时Mark Word中存储的是指向重量级锁的指针,此时等待锁的线程都会进入阻塞状态。

整体的锁状态升级流程如下:

无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁

综上,偏向锁通过对比Mark Word解决加锁问题,避免执行CAS操作。而轻量级锁是通过用CAS操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。重量级锁是将除了拥有锁的线程以外的线程都阻塞。

参考资料

ThreadPoolExecutor相关

java中的线程池相关的东西抛不开ThreadPoolExecutor,本文就简单的说说这个ThreadPoolExecutor

先看一个ThreadPoolExecutor的demo,然后我们说说它的相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {
private static ThreadPoolExecutor threadPoolExecutor;

public static void main(String[] args) {
threadPoolExecutor = new ThreadPoolExecutor(
4, 8, 0, TimeUnit.MICROSECONDS,
new LinkedBlockingQueue<>(100), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
});
System.out.println(threadPoolExecutor.getCorePoolSize()); //4
System.out.println(threadPoolExecutor.getMaximumPoolSize()); //8
System.out.println(threadPoolExecutor.getPoolSize());//0
boolean b = threadPoolExecutor.prestartCoreThread();
System.out.println(threadPoolExecutor.getCorePoolSize());//4
System.out.println(threadPoolExecutor.getMaximumPoolSize());//8
System.out.println(threadPoolExecutor.getPoolSize());//1
int i = threadPoolExecutor.prestartAllCoreThreads();
System.out.println(threadPoolExecutor.getCorePoolSize());//4
System.out.println(threadPoolExecutor.getMaximumPoolSize());//8
System.out.println(threadPoolExecutor.getPoolSize());//4
}
}

参数介绍

ThreadPoolExecutor的几个参数是必须要清楚的:

  • corePoolSize
    • 线程池中的核心线程数
  • maximumPoolSize
    • 线程池最大线程数,它表示在线程池中最多能创建多少个线程
  • keepAliveTime
    • 线程池中非核心线程闲置超时时长(准确来说应该是没有任务执行时的回收时间)
    • 一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉
    • 如果设置allowCoreThreadTimeOut(boolean value),则会作用于核心线程, 也就是说当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将回收
  • TimeUnit
    • 时间单位。可选的单位有分钟(MINUTES),秒(SECONDS),毫秒(MILLISECONDS) 等
  • BlockingQueue
    • 任务的阻塞队列,缓存将要执行的Runnable任务,由各线程轮询该任务队列获取任务执行。可以选择以下几个阻塞队列
      • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
      • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
      • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
      • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • ThreadFactory
    • 线程创建的工厂。可以进行一些属性设置,比如线程名,优先级等等,有默认实现。
  • RejectedExecutionHandler
    • 任务拒绝策略,当运行线程数已达到maximumPoolSize,并且队列也已经装满时会调用该参数拒绝任务,默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
      • AbortPolicy:直接抛出异常, 这个是默认的拒绝策略。
      • CallerRunsPolicy:只用调用者所在线程来运行任务。
      • DiscardOldestPolicy:丢弃队列里最早的一个任务,并执行当前任务。
      • DiscardPolicy:不处理,丢弃掉。

运行原理

  • 初始时,线程池中的线程数为0,这一点从上面的demo输出可以看到
  • 当线程池中线程数小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
  • 当线程池中线程数达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 。
  • workQueue已满,且maximumPoolSize > corePoolSize时,新提交任务会创建新线程执行任务。注意,新手容易犯的一个错是使用的是无界的workQueue,导致workQueue一直满不了,进而无法继续创建线程
  • workQueue已满,且提交任务数超过maximumPoolSize,任务由RejectedExecutionHandler处理。
  • 当线程池中线程数超过corePoolSize,且超过这部分的空闲时间达到keepAliveTime时,回收这些线程。
  • 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将回收。

一般流程图

线程池的一般流程图

newFixedThreadPool 流程图

1
2
3
4
5
6
7
8
9
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(
nThreads, // corePoolSize
nThreads, // maximumPoolSize == corePoolSize
0L, // 空闲时间限制是 0
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>() // 无界阻塞队列
);
}

newFixedThreadPool 流程图

newCacheThreadPool 流程图

1
2
3
4
5
6
7
8
9
10
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(
0, // corePoolSoze == 0
Integer.MAX_VALUE, // maximumPoolSize 非常大
60L, // 空闲判定是60 秒
TimeUnit.SECONDS,
// 神奇的无存储空间阻塞队列,每个 put 必须要等待一个 take
new SynchronousQueue<Runnable>()
);
}

newCacheThreadPool 流程图

newSingleThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static ExecutorService newSingleThreadExecutor() {
return
new FinalizableDelegatedExecutorService
(
new ThreadPoolExecutor
(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory
)
);
}

可以看到除了多了个FinalizableDelegatedExecutorService 代理,其初始化和newFiexdThreadPoolnThreads = 1的时候是一样的。
区别就在于:

  • newSingleThreadExecutor返回的ExcutorService在析构函数finalize()处会调用shutdown()
  • 如果我们没有对它调用shutdown(),那么可以确保它在被回收时调用shutdown()来终止线程。
    流程图略,请参考 newFiexdThreadPool,这里不再累赘。

java 常见的OOM case

OOMjava.lang.OutOfMemoryError异常的简称,在日常工作中oom还算是比较常见的一种问题吧。出现OOM意味着jvm已经无法满足新对象对内存的申请了,本文整理了一下oom的常见case和一般情况下的解决方法。

处理OOM问题,绝大多数情况下jmapMAT工具可以解决99%的问题。

Java heap space

表现现象为:

1
java.lang.OutOfMemoryError: Java heap space

可能的原因

  • 内存泄漏
  • 堆大小设置不合理
  • JVM处理引用不及时,导致内存无法释放
  • 代码中可能存在大对象分配

解决办法

  • 一般情况下,都是先通过jmap命令,把堆内存dump下来,使用mat工具分析一下,检查是否因为代码问题,存在内存泄露
  • 也可能是下游服务出问题,导致内存中的数据不能很快的处理掉,进而引起oom
  • 调整-Xmx参数,加大堆内存
  • 还有一点容易被忽略,检查是否有大量的自定义的 Finalizable 对象,也有可能是框架内部提供的,考虑其存在的必要性

PermGen space

永久代是HotSot虚拟机对方法区的具体实现,存放了被虚拟机加载的类信息、常量、静态变量、JIT编译后的代码等。

一般情况下的异常表现为:

1
java.lang. OutOfMemoryError : PermGen space

可能的原因

  • 在Java7之前,频繁的错误使用String.intern()方法
  • 运行期间生成了大量的代理类,导致方法区被撑爆,无法卸载

解决办法

  • 检查是否永久代空间是否设置的过小
  • 检查代码中是否存错误的创建过多的代理类

Metaspace

JDK8后,元空间替换了永久带,元空间使用的是本地内存,还有其它细节变化:

  • 字符串常量由永久代转移到堆中
  • 和永久代相关的JVM参数已移除

一般情况下的异常表现为:

1
java.lang.OutOfMemoryError: Metaspace

可能的原因

类似PermGen space

解决办法

  • 通过命令行设置 -XX: MaxMetaSpaceSize 增加 metaspace 大小,或者取消-XX: maxmetsspacedize
  • 其他类似PermGen space

unable to create new native Thread

这种情况的一般表现为:

1
java.lang.OutOfMemoryError: unable to create new native Thread

可能的原因

出现这种异常,基本上都是创建的了大量的线程导致的

解决办法

程序运行期间,间隔多次打印jstack,然后查看线程数的变化情况,找出增长快速的线程。

GC overhead limit exceeded

这种情况其实一般情况下遇到的不是太多,他的一般表现为:

1
java.lang.OutOfMemoryError:GC overhead limit exceeded

可能的原因

这个是JDK6新加的错误类型,一般都是堆太小导致的。Sun 官方对此的定义:超过98%的时间用来做GC并且回收了不到2%的堆内存时会抛出此异常。

解决办法

  • 检查项目中是否有大量的死循环或有使用大内存的代码,优化代码。
  • 添加参数-XX:-UseGCOverheadLimit禁用这个检查,其实这个参数解决不了内存问题,只是把错误的信息延后,最终出现 java.lang.OutOfMemoryError: Java heap space
  • dump内存,检查是否存在内存泄露,如果没有,加大内存。

java.lang.OutOfMemoryError: Out of swap space

1
java.lang.OutOfMemoryError: request size bytes for reason. Out of swap space

Oracle的官方解释是,本地内存(native heap)不够用导致的。
错误日志的具体路径可以通过JVM启动参数配置。如:-XX:ErrorFile=/var/log/java/java_error.log

解决方法

  • 其它服务进程可以选择性的拆分出去
  • 加大swap分区大小,或者加大机器内存大小

stack_trace_with_native_method

1
java.lang.OutOfMemoryError: reason stack_trace_with_native_method

一般出现该错误时,线程正在执行一个本地方法。也就是说执行本地方法时内存不足。一般需要结合其他系统级工具进行排查

Compressed class space

1
java.lang.OutOfMemoryError: Compressed class space

可通过JVM启动参数配置增大相应的内存区域。如:-XX:CompressedClassSpaceSize=2g

Requested array size exceeds VM limit

1
java.lang.OutOfMemoryError: Requested array size exceeds VM limit

程序试图创建一个数组时,该数组的大小超过了剩余可用(连续空间)的堆大小。
出现这种情况可能是堆设置得太小了。也有可能是程序在选择数组容量大小的逻辑有问题.

构造函数中使用Spring @Value注解

如果想在构造函数中使用的@value注解的话,demo如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// File: sample/Message.groovy
package sample

import org.springframework.beans.factory.annotation.*
import org.springframework.stereotype.*

@Component
class Message {

final String text

// Use @Autowired to get @Value to work.
@Autowired
Message(
// Refer to configuration property
// app.message.text to set value for
// constructor argument message.
@Value('${app.message.text}') final String text) {
this.text = text
}

}

bloom filter

当系统设计中出现多级缓存结构时,为了防止大量不存在的key值击穿高速缓存(比如主存),去直接访问低速缓存(如本地磁盘),我们一般需要将这部分key值,直接拦截在高速缓存阶段。这里,当然可以使用普通的hash table,也可以使用bitmap,但是这两种方式都比较耗费内存,当面对海量key值时,问题会变得更加严重。这时,就该介绍我们的主角bloom filter出场了。

一般的,bloom filter用于判断一个key值是否在一个set中,拥有比hash table/bitmap更好的空间经济性。如果bloom filter指示一个key值“不在”一个set中,那么这个判断是100%准确的。这样的特性,非常适合于上述的缓存场景。

bloom filter原理

  • 首先估计要判断的set中的元素个数N,然后选定k个独立的哈希函数。根据N和k,选定一个长度为M的bit array。

  • 遍历set中的N个元素

    • 对每个元素,使用k个哈希函数,得到k个哈希值(一般为一个大整数)
    • 将上述bit array中,k个哈希值所对应的bit置1
  • 对于需要判断的key值
    • 使用k个哈希函数,得到k个哈希值
    • 如果k个哈希值所对应的bit array中的值均为1,则判断此值在set中“可能”存在;否则,判定“一定”不存在

根据上面的原理我们其实可以看到,bloom filter有以下特点:

  • 比较节省空间
  • bloom的识别准确率和数据大小,k个哈希函数有关
  • 如果bloom filter判断key不存在,那么就一定不存在,100%不存在。
  • 如果bloom filter判断key存在,那么可能存在,也可能不存在

bloom filter优缺点

优点:

  • 插入、查找都是常数时间

  • 多个hash函数之间互相独立,可以并行计算

  • 不需要存储元素本身,从而带来空间效率优势,以及一些保密上的优势

  • bloom filter的bitmap可以进行交、并、差运算

缺点:

  • 判断元素是否在集合中的结果其实是不准确的
  • bloom filter中的元素是不能删除的

bloom filter的实际使用

guava bloom filter

guava中提供了bloom filter的一种实现:com.google.common.hash.BloomFilter,可以方便我们在单机情况下使用bloom filter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
/**
* A Bloom filter for instances of {@code T}. A Bloom filter offers an approximate containment test
* with one-sided error: if it claims that an element is contained in it, this might be in error,
* but if it claims that an element is <i>not</i> contained in it, then this is definitely true.
*
* <p>If you are unfamiliar with Bloom filters, this nice
* <a href="http://llimllib.github.com/bloomfilter-tutorial/">tutorial</a> may help you understand
* how they work.
*
* <p>The false positive probability ({@code FPP}) of a bloom filter is defined as the probability
* that {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that
* has not actually been put in the {@code BloomFilter}.
*
* <p>Bloom filters are serializable. They also support a more compact serial representation via the
* {@link #writeTo} and {@link #readFrom} methods. Both serialized forms will continue to be
* supported by future versions of this library. However, serial forms generated by newer versions
* of the code may not be readable by older versions of the code (e.g., a serialized bloom filter
* generated today may <i>not</i> be readable by a binary that was compiled 6 months ago).
*
* @param <T> the type of instances that the {@code BloomFilter} accepts
* @author Dimitris Andreou
* @author Kevin Bourrillion
* @since 11.0
*/
@Beta
public final class BloomFilter<T> implements Predicate<T>, Serializable {

/** The bit set of the BloomFilter (not necessarily power of 2!) */
private final BitArray bits;

/** Number of hashes per element */
private final int numHashFunctions;

/** The funnel to translate Ts to bytes */
private final Funnel<? super T> funnel;

/**
* The strategy we employ to map an element T to {@code numHashFunctions} bit indexes.
*/
private final Strategy strategy;

/**
* Creates a BloomFilter.
*/
private BloomFilter(
BitArray bits, int numHashFunctions, Funnel<? super T> funnel, Strategy strategy) {
checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
checkArgument(
numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
this.bits = checkNotNull(bits);
this.numHashFunctions = numHashFunctions;
this.funnel = checkNotNull(funnel);
this.strategy = checkNotNull(strategy);
}


/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and
* expected false positive probability.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @param fpp the desired false positive probability (must be positive and less than 1.0)
* @return a {@code BloomFilter}
*/
public static <T> BloomFilter<T> create(
Funnel<? super T> funnel, int expectedInsertions, double fpp) {
return create(funnel, (long) expectedInsertions, fpp);
}

/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and
* expected false positive probability.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @param fpp the desired false positive probability (must be positive and less than 1.0)
* @return a {@code BloomFilter}
* @since 19.0
*/
public static <T> BloomFilter<T> create(
Funnel<? super T> funnel, long expectedInsertions, double fpp) {
return create(funnel, expectedInsertions, fpp, BloomFilterStrategies.MURMUR128_MITZ_64);
}

@VisibleForTesting
static <T> BloomFilter<T> create(
Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
checkNotNull(funnel);
checkArgument(
expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
checkNotNull(strategy);

if (expectedInsertions == 0) {
expectedInsertions = 1;
}
/*
* TODO(user): Put a warning in the javadoc about tiny fpp values, since the resulting size
* is proportional to -log(p), but there is not much of a point after all, e.g.
* optimalM(1000, 0.0000000000000001) = 76680 which is less than 10kb. Who cares!
*/
long numBits = optimalNumOfBits(expectedInsertions, fpp);
int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
try {
return new BloomFilter<T>(new BitArray(numBits), numHashFunctions, funnel, strategy);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
}
}

/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and a
* default expected false positive probability of 3%.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @return a {@code BloomFilter}
*/
public static <T> BloomFilter<T> create(Funnel<? super T> funnel, int expectedInsertions) {
return create(funnel, (long) expectedInsertions);
}

/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and a
* default expected false positive probability of 3%.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @return a {@code BloomFilter}
* @since 19.0
*/
public static <T> BloomFilter<T> create(Funnel<? super T> funnel, long expectedInsertions) {
return create(funnel, expectedInsertions, 0.03); // FYI, for 3%, we always get 5 hash functions
}

// other codes ...
}

基本使用:

1
2
3
4
5
6
7
8
9
10
BloomFilter<Integer> filter = BloomFilter.create(Funnels.integerFunnel(),500,0.01);

filter.put(1);
filter.put(2);
filter.put(3);

assertThat(filter.mightContain(1)).isTrue();
assertThat(filter.mightContain(2)).isTrue();
assertThat(filter.mightContain(3)).isTrue();
assertThat(filter.mightContain(100)).isFalse();

正确估计预期插入数量是很关键的一个参数。当插入的数量接近或高于预期值的时候,布隆过滤器将会填满,这样的话,它会产生很多无用的误报点。

不过也有文章指出,guava的bloom filter在数据量变大以后,准确性大大降低,这个虽然本身就是bloom filter的特性,但是在这篇文章中也给出了一些参考值,大家可以看看: google guava bloom filter包的坑 :

在0.0001的错误率下,插入量不到1.5亿的时候,numBits已经到达了BitArray的最大容量了,这时如果再增加插入量,哈希函数个数就开始退化。到5亿的时候,哈希函数个数退化到了只有3个,也就是说,对每一个key,只有3位来标识,这时准确率就会大大下降。
第一种当然就是减少预期插入量,1亿以内,还是可以保证理论上的准确率的。
第二种,如果你的系统很大,就是会有上亿的key,这时可以考虑拆分,将一个大的bloom filter拆分成几十个小的(比如32或64个),每个最多可以容纳1亿,这时整体就能容纳32或64亿的key了。查询的时候,先对key计算一次哈希,然后取模,查找特定的bloom filter即可。

基于redis的bloom filter

guava中的bloom filter是单机的,如果想使用分布式的的话,可以考虑基于redis 的bloom filter。

主要参考这个文章:ReBloom – Bloom Filter Datatype for Redis

redis 在 4.0 的版本中加入了 module 功能,布隆过滤器可以通过module的形式添加到 redis 中,所以使用 redis 4.0 以上的版本可以通过加载 module 来使用 redis 中的布隆过滤器。但是这不是最简单的方式,使用 docker 可以直接在 redis 中体验布隆过滤器。

1
2
> docker run -d -p 6379:6379 --name bloomfilter redislabs/rebloom
> docker exec -it bloomfilter redis-cli

redis 布隆过滤器主要就两个命令:

  • bf.add 添加元素到布隆过滤器中:bf.add urls https://jaychen.cc
  • bf.exists 判断某个元素是否在过滤器中:bf.exists urls https://jaychen.cc

上面说过布隆过滤器存在误判的情况,在 redis 中有两个值决定布隆过滤器的准确率:

  • error_rate:允许布隆过滤器的错误率,这个值越低过滤器的位数组的大小越大,占用空间也就越大。
  • initial_size:布隆过滤器可以储存的元素个数,当实际存储的元素个数超过这个值之后,过滤器的准确率会下降。

redis 中有一个命令可以来设置这两个值:

1
bf.reserve urls 0.01 100

上面三个参数的含义为:

  • 第一个值是过滤器的名字。
  • 第二个值为 error_rate 的值。
  • 第三个值为 initial_size 的值。

使用这个命令要注意一点:执行这个命令之前过滤器的名字应该不存在,如果执行之前就存在会报错:(error) ERR item exists

参考文章:

java.nio.ByteBuffer

Java NIO Buffers用于和NIO Channel交互。 我们从Channel中读取数据到buffers里,从Buffer把数据写入到Channels。Buffer本质上就是一块内存区,可以用来写入数据,并在稍后读取出来。这块内存被NIO Buffer包裹起来,对外提供一系列的读写方便开发的接口。java中java.nio.Buffer的常见实现类如下,不过我们这里只说一下ByteBuffer这个实现。

java中`java.nio.Buffer`的常见实现类

Buffer的重要属性

Buffer缓冲区实质上就是一块内存,用于写入数据,也供后续再次读取数据,为了便于理解,你可以把它理解为一个字节数组。它有有四个重要属性:

1
2
3
4
5
6
7
public abstract class Buffer {
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
}
  • capacity
    • 这个属性表示这个Buffer最多能放多少数据,在创建buffer的时候指定。int类型。
  • position
    下一个要读写的元素位置(从0开始),当使用buffer的相对位置进行读/写操作时,读/写会从这个下标进行,并在操作完成后,buffer会更新下标的值。
    • 写模式:当写入数据到Buffer的时候需要从一个确定的位置开始,默认初始化时这个位置position为0,一旦写入了数据比如一个字节,整形数据,那么position的值就会指向数据之后的一个单元,position最大可以到capacity-1.
    • 读模式:当从Buffer读取数据时,也需要从一个确定的位置开始。buffer从写入模式变为读取模式时,position会归0,每次读取后,position向后移动。
  • limit
    在Buffer上进行的读写操作都不能越过这个limit。
    • 写模式:limit的含义是我们所能写入的最大数据量,它等同于buffer的容量capacity
    • 读模式:limit则代表我们所能读取的最大数据量,他的值等同于写模式下position的位置。换句话说,您可以读取与写入数量相同的字节数。
  • mark
    • 一个临时存放的位置下标,用户选定的position的前一个位置或-1。
      • 调用mark()会将mark设为当前的position的值,以后调用reset()会将position属性设
        置为mark的值。mark的值总是小于等于position的值,如果将position的值设的比mark小,当前的mark值会被抛弃掉。

注:

  • position和limit之间的距离指示了可读/存的字节数。
  • boolean hasRemaining():当缓冲区至少还有一个元素时,返回true。
  • int remaining():position和limit之间字节个数。

这些属性总是满足以下条件:

0 <= mark <= position <= limit <= capacity

通过上面的描述可以看出,其中position和limit的具体含义取决于当前buffer的模式(读模式还是写模式)。capacity在两种模式下都表示容量。

Buffer的常见API

  • ByteBuffer allocate(int capacity)
    • 从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,实现类是HeapByteBuffer
  • ByteBuffer allocateDirect(int capacity)
    • 类似于allocate方法,不过使用的是对外内存,实现类是DirectByteBuffer
  • ByteBuffer wrap(byte[] array)
    • 把byte数组包装为ByteBuffer,bytes数组或buff缓冲区任何一方中数据的改动都会影响另一方。其实ByteBuffer底层本来就有一个bytes数组负责来保存buffer缓冲区中的数据,通过allocate方法系统会帮你构造一个byte数组,实现类是HeapByteBuffer
  • ByteBuffer wrap(byte[] array, int offset, int length)
    • 在上一个方法的基础上可以指定偏移量和长度,buffer的capacity就是array.length,这个offset也就是包装后byteBuffer的position,limit=length+position(offset),mark=-1实现类是HeapByteBuffer
  • abstract Object array()
    • 返回支持此缓冲区的数组 (可选操作)
  • abstract int arrayOffset()
    • 返回此缓冲区中的第一个元素在缓冲区的底层实现数组中的偏移量(可选操作)。调用此方法之前要调用hasarray方法,以确保此缓冲区具有可访问的底层实现数组。
  • abstract boolean hasArray()
    • 告诉这个缓冲区是否由可访问的数组支持
  • int capacity()
    • 返回此缓冲区的容量
  • Buffer clear()
    • 清除此缓存区。将position = 0;limit = capacity;mark = -1;把position设为0,一般在把数据写入Buffer前调用。
  • Buffer flip()
    • flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置。一般在从Buffer读出数据前调用。
  • abstract boolean isDirect()
    • 判断个缓冲区是否为direct, 也就是是否是对外内存
  • abstract boolean isReadOnly()
    • 判断告知这个缓冲区是否是只读的
  • int limit()
    • 返回此缓冲区的limit的属性值
  • Buffer position(int newPosition)
    • 设置这个缓冲区的位置
  • boolean hasRemaining()
    • return position < limit,返回是否还有未读内容
  • int remaining()
    • return limit - position; 返回limit和position之间相对位置差
  • Buffer rewind()
    • 把position设为0,mark设为-1,不改变limit的值,一般在把数据重写入Buffer前调用
  • Buffer mark()
    • 设置mark的值,mark=position,做个标记
  • Buffer reset()
    • 还原标记,position=mark。
  • ByteBuffer compact()
    • 该方法的作用是将 position 与 limit之间的数据复制到buffer的开始位置,与limit 之间没有数据的话发,就不会进行复制。一个例子如下:
1
2
3
4
5
6
7
8
9
10
11
12
例如:ByteBuffer.allowcate(10); 
内容:[0 ,1 ,2 ,3 4, 5, 6, 7, 8, 9]
## compact前
[0 ,1 ,2 , 3, 4, 5, 6, 7, 8, 9]
pos=4
lim=10
cap=10
## compact后
[4, 5, 6, 7, 8, 9, 6, 7, 8, 9]
pos=6
lim=10
cap=10
  • ByteBuffer slice();
    • 创建一个分片缓冲区。分片缓冲区与主缓冲区共享数据。 分配的起始位置是主缓冲区的position位置,容量为limit-position。 分片缓冲区无法看到主缓冲区positoin之前的元素。

创建buffer的注意点

创建ByteBuffer可以使用allocate或者wrap,就像下面这样:

  • ByteBuffer allocate(int capacity)
  • ByteBuffer allocateDirect(int capacity)
  • ByteBuffer wrap(int capacity)
  • ByteBuffer wrap(byte[] array,int offset,int length)

需要注意的是:创建的缓冲区都是定长的,大小无法改变。若发现刚创建的缓冲区容量太小,只能重新创建一个合适的

关于ByteBuffer wrap(byte[] array,int offset,int length)这个方法,这里再强调一下,这样创建的ByteBuffer的capacity和array的大小是一样的,buffer的position是offset, buffer的limit是offset+length,position之前和limit之后的数据依然可以访问到。例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Demo {
public static void main(String args[]){
byte arr[]=new byte[100];
ByteBuffer buffer=ByteBuffer.wrap(arr,3,25);
System.out.println("Capacity is: "+buffer.capacity());
System.out.println("Position is: "+buffer.position());
System.out.println("limit is: "+buffer.limit());
}
}
//结果:
Capacity is: 100
Position is: 3
limit is: 28

allocate和wrap的区别

wrap只是简单地创建一个具有指向被包装数组的引用的缓冲区,该数组成为后援数组。对后援数组中的数据做的任何修改都将改变缓冲区中的数据,反之亦然。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String args[]){  
byte arr[]=new byte[100];
//将arr数组全部置为1
Arrays.fill(arr, (byte)1);
ByteBuffer buffer=ByteBuffer.wrap(arr,3,25);
//对后援数组中的数据做的任何修改都将改变缓冲区中的数据
arr[0]=(byte)2;
buffer.position(0);
System.out.println(buffer.get());
//在缓冲区上调用array()方法即可获得后援数组的引用。
System.out.println(Arrays.toString(buffer.array()));
}
//运行结果:
2
[2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ....]//总共100个元素

allocate则是创建了自己的后援数组,在缓冲区上调用array()方法也可获得后援数组的引用。通过调用arrayOffset()方法,可以获取缓冲区中第一个元素在后援数组的偏移量。但是使用wrap创建的ByteBuffer,调用arrayOffset永远是0。

1
2
3
4
5
6
7
8
9
10
11
12
13
 public static void main(String args[]){  
ByteBuffer buffer=ByteBuffer.allocate(100);
//对后援素组的修改也可以反映到buffer上
byte arr[]=buffer.array();
arr[1]=(byte)'a';
buffer.getInt();

System.out.println(Arrays.toString(buffer.array()));
System.out.println(buffer.arrayOffset());
}
//运行结果:
[0, 97, 0, 0, 0, 0, 0, 0,...]//总共100个元素
0

通过ByteBuffer allocateDirect(int capacity)创建的叫直接缓冲区,使用的堆外内存。可以通过isDirect()方法查看一个缓冲区是否是直接缓冲区。由于直接缓冲区是没有后援数组的,所以在其上面调用array()或arrayOffset()都会抛出UnsupportedOperationException异常。注意有些平台或JVM可能不支持这个创建直接缓冲区。

图解

put

写模式下,往buffer里写一个字节,并把postion移动一位。写模式下,一般limit与capacity相等。

bytebuffer-put

flip

写完数据,需要开始读的时候,将postion复位到0,并将limit设为当前postion。

bytebuffer-flip

get

从buffer里读一个字节,并把postion移动一位。上限是limit,即写入数据的最后位置。

bytebuffer-get

clear

将position置为0,并不清除buffer内容。

bytebuffer-clear

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class Demo {
private static final int SIZE = 1024;

public static void main(String[] args) throws Exception {
// 获取通道,该通道允许写操作
FileChannel fc = new FileOutputStream("data.txt").getChannel();
// 将字节数组包装到缓冲区中
fc.write(ByteBuffer.wrap("Some text".getBytes()));
// 关闭通道
fc.close();

// 随机读写文件流创建的管道
fc = new RandomAccessFile("data.txt", "rw").getChannel();
// fc.position()计算从文件的开始到当前位置之间的字节数
System.out.println("此通道的文件位置:" + fc.position());
// 设置此通道的文件位置,fc.size()此通道的文件的当前大小,该条语句执行后,通道位置处于文件的末尾
fc.position(fc.size());
// 在文件末尾写入字节
fc.write(ByteBuffer.wrap("Some more".getBytes()));
fc.close();

// 用通道读取文件
fc = new FileInputStream("data.txt").getChannel();
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
// 将文件内容读到指定的缓冲区中
fc.read(buffer);
//此行语句一定要有, 如果没有,就是从文件最后开始读取的,当然读出来的都是byte=0时候的字符。通过buffer.flip();这个语句,就能把buffer的当前位置更改为buffer缓冲区的第一个位置
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char)buffer.get());
}
fc.close();
}
}

确保数据落盘

在之前的文章《unix IO模型》我们曾经提到过,用户空间内核空间缓存IO等概念。关于这些概念,大家可以阅读这篇文章,在本篇文章中,我们就不在涉及这些概念了。

IO缓冲机制

大家需要有一个认知就是我们平时写的程序,在将数据到文件中时,其实数据不会立马写入磁盘中进行持久化存储的,而是会经过层层缓存,如下图所示:

I/O buffering

其中这每层缓存都有自己的刷新时机,每层缓存都刷新后才会写入磁盘进行持久化存储。这些缓存的存在目的本意都是为了加速读写操作,因为如果每次读写都对应真实磁盘操作,那么读写的效率会大大降低。但是同样带来的坏处是如果期间发生掉电或者别的故障,还未写入磁盘的数据就丢失了。对于数据安全敏感的应用,比如数据库,比如交易程序,这是无法忍受的。所以操作系统提供了保证文件落盘的机制。

在上面这图中说明了操作系统到磁盘的数据流,以及经过的缓冲区。首先数据会先存在于应用的内存空间,如果调用库函数写入,库函数可能还会把数据缓存在库函数所维护的缓冲区空间中,比如C标准库stdio提供的方法就会进行缓存,目的是为了减少系统调用的次数。这两个缓存都是在用户空间中的。库函数缓存flush时,会调用write系统调用将数据写入内核空间,内核同样维护了一个页缓存(page cache),操作系统会在合适的时间把脏页的数据写入磁盘。即使是写入磁盘了,磁盘也可能维护了一个缓存,在这个时候掉电依然会丢失数据的,只有写入了磁盘的持久存储物理介质上,数据才是真正的落盘了,是安全的。

比如在网络套接字上侦听连接并将从每个客户端接收的数据写入文件的应用程序。 在关闭连接之前,服务器确保将接收到的数据写入稳定存储器,并向客户端发送此类确认,请看下面的简化代码(代码中已经注释):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int sock_read(int sockfd, FILE *outfp, size_t nrbytes)
{
int ret;
size_t written = 0;
//example of an application buffer
char *buf = malloc(MY_BUF_SIZE);

if (!buf)
return -1;

//take care of reading the data from the socket
//and writing it to the file stream
while (written < nrbytes) {
ret = read(sockfd, buf, MY_BUF_SIZE);
if (ret =< 0) {
if (errno == EINTR)
continue;
return ret;
}
written += ret;
ret = fwrite((void *)buf, ret, 1, outfp);
if (ret != 1)
return ferror(outfp);
}

//flush file stream, the data to move into the "Kernel Buffers" layer
ret = fflush(outfp);
if (ret != 0)
return -1;

//makethe data is saved to the "Stable Storage" layer
ret = fsync(fileno(outfp));
if (ret < 0)
return -1;
return 0;
}

在上面的这幅图中,可以看到数据流向经过了用户控件缓冲区内核缓存区,下面我们说说这2个缓存区。

用户空间缓冲区

用户空间的缓存分为:

  • 应用程序本身维护的缓冲区
  • 库维护的缓冲区

应用本身维护的缓冲区需要开发者自己刷新,调用库函数写入到库函数的缓冲区中(这一步可能不存在)。如果应用程序不依赖任何库函数,而是直接使用系统调用,那么则是把数据写入系统的缓冲区去。

库函数一般都会维护缓冲区,目的是简化应用程序的编写,应用程序就不需要编写维护缓冲区的代码,库维护的缓冲区针对那些没有应用程序本身维护的缓存区的程序来说,在某些时候是会提升不少的性能的,因为缓冲区大大减少了系统调用的次数,而系统调用是非常耗时的,系统调用涉及到用户态到内核态的切换,这个切换需要很多的步骤与校验,较为耗时。

比如C标准库stdio就维护着一个缓冲区,对应这个缓冲区,C标准库提供了fflush方法强制把缓冲区数据写入操作系统。

在Java的OutputStream接口提供了一个flush方法,具体的作用要看实现类的具体实现。BufferedOutputStream#flush就会把自己维护的缓冲区数据写入下一层的OutputStream。比如是new BufferedOutputStream(new FileOutputStream("/"))这样的模式,则调用BufferedOutputStream#flush会将数据写入操作系统。

内核缓冲区

应用程序直接或者通过库函数间接的使用系统调用write将数据写入操作系统缓冲区.UNIX系统在内核中设有高速缓存或页面高速缓存。目的是为了减少磁盘读写次数。

用户写入系统的数据先写入系统缓冲区,系统缓冲区写满后,将其排入输出队列,然后得到队首时,才进行实际的IO操作。这种输出方式被称为延迟写

UNIX系统提供了三个系统调用来执行刷新内核缓冲区:sync,fsync,fdatasync

sync

1
2
3
// sync() causes all pending modifications to filesystem metadata and
//cached file data to be written to the underlying filesystems.
void sync(void)

sync函数只是将所有修改过的块缓冲区排入输出队列就返回,并不等待实际的写磁盘操作返回。 操作系统的update系统守护进程会周期地调用sync函数,来保证系统中的数据能定期落盘。

根据sync(2) - Linux manual page的描述,Linux对sync的实现与POSIX规范不太一样,POSIX规范中,sync可能在文件真正落盘前就返回,而Linux的实现则是文件真正落盘后才会返回。所以Linux中,sync与fsync的效果是一样的!但是1.3.20之前的Linux存在BUG,导致sync并不会在真正落盘后返回。

fsync

1
void fsync(int filedes)

fsync对指定的文件起作用,它传输内核缓冲区中这个文件的数据到存储设备中,并阻塞直到存储设备响应说数据已经保存好了。

fsync对文件数据与文件元数据都有效。文件的元数据可以理解为文件的属性数据,比如文件的更新时间,访问时间,长度等。

fdatasync

1
void fdatasync(int filedes)

fdatasyncfsync类似,两者的区别是,fdatasync不一定需要刷新文件的元数据部分到存储设备。

是否需要刷新文件的元数据,是要看元数据的变化部分是否对之后的读取有影响,比如文件元数据的访问时间st_atime和修改时间st_mtime变化了,fdatasync不会去刷新元数据数据到存储设备,因为即使这个数据丢失了不一致了,也不影响故障恢复后的文件读取。但是如果文件的长度st_size变化了,那么就需要刷新元数据数据到存储设备。

所以如果你每次都更新文件长度,那么调用fsyncfdatasync的效果是一样的。

但是如果更新能做到不修改文件长度,那么fdatasync能比fsync少了一次磁盘写入,这个是非常大的速度提升。

open中的O_SYNC和O_DSYNC

除了上面三个系统调用,open系统调用在打开文件时,可以设置和同步相关的标志位:O_SYNCO_DSYNC

  • 设置O_SYNC的效果相当于是每次write后自动调用fsync。
  • 设置O_DSYNC的效果相当于是每次write后自动调用fdatasync。

关于新建文件

在一个文件上调用fsync/fdatasync只能保证文件本身的数据落盘,但是对于文件系统来说,目录中也保存着文件信息,fsync/fdatasync的调用并不会保证这部分的数据落盘。如果此时发生掉电,这个文件就无法被找到了。所以对于新建文件来说,还需要在父目录上调用fsync。

关于覆盖现有文件

覆盖现有文件时,如果发生掉电,新的数据是不会写入成功,但是可能会污染现有的数据,导致现有数据丢失。所以最佳实践是新建一个临时文件,写入成功后,再原子性替换原有文件。具体步骤:

  • 新建一个临时文件
  • 向临时文件写入数据
  • 对临时文件调用fsync,保证数据落盘。期间发生掉电对现有文件无影响。
  • 重命名临时文件为目标文件名
  • 对父目录调用fsync

存储设备缓冲区

存储设备为了提高性能,也会加入缓存。高级的存储设备能提供非易失性的缓存,比如有掉电保护的缓存。但是无法对所有设备做出这种保证,所以如果数据只是写入了存储设备的缓存的话,遇到掉电等故障,依然会导致数据丢失。

对于保证数据能保存到存储设备的持久化存储介质上,而不管设备本身是否有易失性缓存,操作系统提供了write barriers这个机制。开启了write barriers的文件系统,能保证调用fsync/fdatasync数据持久化保存,无论是否发生了掉电等其他故障,但是会导致性能下降。

许多文件系统提供了配置write barriers的功能。比如ext3, ext4, xfsbtrfsmount参数-o barrier表示开启写屏障,调用fsync/fdatasync能保证刷新存储设备的缓存到持久化介质上。-o nobarrier则表示关闭写屏障,调用fsync/fdatasync无法保证数据落盘。

Linux默认开启write barriers,所以默认情况下,我们调用fsync/fdatasync,就可以认为是文件真正的可靠落盘了

对于这个层面的数据安全保证来说,应用程序是不需要去考虑的,因为如果这台机器的硬盘被挂载为没有开启写屏障,那么可以认为这个管理员知道这个风险,他选择了更高的性能,而不是更高的安全性。

Java世界中的对应API

针对确保数据落盘,掉电也不丢失数据的情况,JDK也封装了对应的功能,并且为我们做好了跨平台的保证。

JDK中有三种方式可以强制文件数据落盘:

  • 调用FileDescriptor#sync函数
  • 调用FileChannel#force函数
  • 使用RandomAccessFilerws或者rwd模式打开文件

FileDescriptor#sync

FileDescriptor类提供了sync方法,可以用于保证数据保存到持久化存储设备后返回。使用方法:

1
2
FileOutputStream outputStream = new FileOutputStream("/Users/mazhibin/b.txt");
outputStream.getFD().sync();

可以看一下JDK是如何实现FileDescriptor#sync的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public native void sync() throws SyncFailedException;

// jdk/src/solaris/native/java/io/FileDescriptor_md.c
JNIEXPORT void JNICALL
Java_java_io_FileDescriptor_sync(JNIEnv *env, jobject this) {
// 获取文件描述符
FD fd = THIS_FD(this);
// 调用IO_Sync来执行数据同步
if (IO_Sync(fd) == -1) {
JNU_ThrowByName(env, "java/io/SyncFailedException", "sync failed");
}
}
// IO_Sync在UNIX系统上的定义就是fsync:
// jdk/src/solaris/native/java/io/io_util_md.h
#define IO_Sync fsync

FileChannel#force

之前的文章提到了,操作系统提供了fsync/fdatasync两个用户同步数据到持久化设备的系统调用,后者尽可能的会不同步文件元数据,来减少一次磁盘IO,提高性能。但是Java IO的FileDescriptor#sync只是对fsync的封装,JDK中没有对于fdatasync的封装,这是一个特性缺失。

Java NIO对这一点也做了增强,FileChannel类的force方法,支持传入一个布尔参数metaData,表示是否需要确保文件元数据落盘,如果为true,则调用fsync。如果为false,则调用fdatasync。

使用例子如下:

1
2
3
4
5
6
7
FileOutputStream outputStream = new FileOutputStream("/Users/mazhibin/b.txt");

// 强制文件数据与元数据落盘
outputStream.getChannel().force(true);

// 强制文件数据落盘,不关心元数据是否落盘
outputStream.getChannel().force(false);

在jdk中的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class FileChannelImpl extends FileChannel {
private final FileDispatcher nd;
private final FileDescriptor fd;
private final NativeThreadSet threads = new NativeThreadSet(2);

public final boolean isOpen() {
return open;
}

private void ensureOpen() throws IOException {
if(!this.isOpen()) {
throw new ClosedChannelException();
}
}

// 布尔参数metaData用于指定是否需要文件元数据也确保落盘
public void force(boolean metaData) throws IOException {
// 确保文件是已经打开的
ensureOpen();
int rv = -1;
int ti = -1;
try {
begin();
ti = threads.add();

// 再次确保文件是已经打开的
if (!isOpen())
return;
do {
// 调用FileDispatcher#force
rv = nd.force(fd, metaData);
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
} finally {
threads.remove(ti);
end(rv > -1);
assert IOStatus.check(rv);
}
}
}

实现中有许多线程同步相关的代码,不属于我们要关注的部分,就不分析了。FileChannel#force调用FileDispatcher#forceFileDispatcher是NIO内部实现用的一个类,封装了一些文件操作方法,其中包含了刷新文件的方法:

1
2
3
4
abstract class FileDispatcher extends NativeDispatcher {
abstract int force(FileDescriptor fd, boolean metaData) throws IOException;
// ...
}

FileDispatcher#force的实现:

1
2
3
4
5
6
class FileDispatcherImpl extends FileDispatcher {
int force(FileDescriptor fd, boolean metaData) throws IOException {
return force0(fd, metaData);
}
static native int force0(FileDescriptor fd, boolean metaData) throws IOException;
// ...

FileDispatcher#force的本地方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this,
jobject fdo, jboolean md)
{
// 获取文件描述符
jint fd = fdval(env, fdo);
int result = 0;

if (md == JNI_FALSE) {
// 如果调用者认为不需要同步文件元数据,调用fdatasync
result = fdatasync(fd);
} else {
#ifdef _AIX
/* On AIX, calling fsync on a file descriptor that is opened only for
* reading results in an error ("EBADF: The FileDescriptor parameter is
* not a valid file descriptor open for writing.").
* However, at this point it is not possibly anymore to read the
* 'writable' attribute of the corresponding file channel so we have to
* use 'fcntl'.
*/
int getfl = fcntl(fd, F_GETFL);
if (getfl >= 0 && (getfl & O_ACCMODE) == O_RDONLY) {
return 0;
}
#endif
// 如果调用者认为需要同步文件元数据,调用fsync
result = fsync(fd);
}
return handle(env, result, "Force failed");
}

可以看出,其实FileChannel#force就是简单的通过metaData参数来区分调用fsync和fdatasync。

同时在zookeeper的org.apache.zookeeper.common.AtomicFileOutputStream类中我们可以看到下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public void close() throws IOException {
boolean triedToClose = false, success = false;
try {
flush();
((FileOutputStream) out).getChannel().force(true);

triedToClose = true;
super.close();
success = true;
} finally {
if (success) {
boolean renamed = tmpFile.renameTo(origFile);
if (!renamed) {
// On windows, renameTo does not replace.
if (!origFile.delete() || !tmpFile.renameTo(origFile)) {
throw new IOException(
"Could not rename temporary file " + tmpFile
+ " to " + origFile);
}
}
} else {
if (!triedToClose) {
// If we failed when flushing, try to close it to not leak
// an FD
IOUtils.closeStream(out);
}
// close wasn't successful, try to delete the tmp file
if (!tmpFile.delete()) {
LOG.warn("Unable to delete tmp file " + tmpFile);
}
}
}
}

RandomAccessFile结合rws/rwd模式

RandomAccessFile打开文件支持4中模式:

  • r 以只读方式打开。调用结果对象的任何 write 方法都将导致抛出 IOException。
  • rw打开以便读取和写入。如果该文件尚不存在,则尝试创建该文件。
  • rws 打开以便读取和写入,对于rws,还要求对文件的内容或元数据的每个更新都同步写入到底层存储设备。
  • rwd 打开以便读取和写入,对于rwd,还要求对文件内容的每个更新都同步写入到底层存储设备。

其中rws模式会在open文件时传入O_SYNC标志位。rwd模式会在open文件时传入O_DSYNC标志位。

RandomAccessFile源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 4个标志位,用于组合表示4种模式
private static final int O_RDONLY = 1;
private static final int O_RDWR = 2;
private static final int O_SYNC = 4;
private static final int O_DSYNC = 8;

public RandomAccessFile(File file, String mode)
throws FileNotFoundException
{
String name = (file != null ? file.getPath() : null);
int imode = -1;
// 只读模式
if (mode.equals("r"))
imode = O_RDONLY;
else if (mode.startsWith("rw")) {
// 读写模式
imode = O_RDWR;
rw = true;

// 读写模式下,可以结合O_SYNC和O_DSYNC标志
if (mode.length() > 2) {
if (mode.equals("rws"))
imode |= O_SYNC;
else if (mode.equals("rwd"))
imode |= O_DSYNC;
else
imode = -1;
}
}
if (imode < 0)
throw new IllegalArgumentException("Illegal mode \"" + mode
+ "\" must be one of "
+ "\"r\", \"rw\", \"rws\","
+ " or \"rwd\"");
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkRead(name);
if (rw) {
security.checkWrite(name);
}
}
if (name == null) {
throw new NullPointerException();
}
if (file.isInvalid()) {
throw new FileNotFoundException("Invalid file path");
}
// 新建文件描述符
fd = new FileDescriptor();
fd.attach(this);
path = name;
open(name, imode);
}

private void open(String name, int mode)
throws FileNotFoundException {
open0(name, mode);
}

private native void open0(String name, int mode)
throws FileNotFoundException;

其中open0的实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// jdk/src/share/native/java/io/RandomAccessFile.c
JNIEXPORT void JNICALL
Java_java_io_RandomAccessFile_open0(JNIEnv *env,
jobject this, jstring path, jint mode)
{
int flags = 0;
// JAVA中的标志位与操作系统标志位转换
if (mode & java_io_RandomAccessFile_O_RDONLY)
flags = O_RDONLY;
else if (mode & java_io_RandomAccessFile_O_RDWR) {
flags = O_RDWR | O_CREAT;
if (mode & java_io_RandomAccessFile_O_SYNC)
flags |= O_SYNC;
else if (mode & java_io_RandomAccessFile_O_DSYNC)
flags |= O_DSYNC;
}

// 调用fileOpen打开函数
fileOpen(env, this, path, raf_fd, flags);
}

fileOpen之后的流程与FileInputStream的一致。可以看出,相比于FileInputStream固定使用O_RDONLYFileOutputStream固定使用O_WRONLY | O_CREATRandomAccessFile提供了在Java中指定打开模式的能力。

但是同时我们需要清除,rwsrwd的效率比rw低非常非常多,因为每次读写都需要刷到磁盘才会返回,这两个中rwdrws效率高一些,因为rwd只刷新文件内容,rws刷新文件内容与元数据,文件的元数据就是文件更新时间等信息。

原子性的重命名文件

在java中的File类的renameTo方法,提供了重命名文件的功能。但是需要注意的是这个方法并不能保证原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Renames the file denoted by this abstract pathname.
*
* <p> Many aspects of the behavior of this method are inherently
* platform-dependent: The rename operation might not be able to move a
* file from one filesystem to another, it might not be atomic, and it
* might not succeed if a file with the destination abstract pathname
* already exists. The return value should always be checked to make sure
* that the rename operation was successful.
*
* <p> Note that the {@link java.nio.file.Files} class defines the {@link
* java.nio.file.Files#move move} method to move or rename a file in a
* platform independent manner.
*
* @param dest The new abstract pathname for the named file
*
* @return <code>true</code> if and only if the renaming succeeded;
* <code>false</code> otherwise
*
* @throws SecurityException
* If a security manager exists and its <code>{@link
* java.lang.SecurityManager#checkWrite(java.lang.String)}</code>
* method denies write access to either the old or new pathnames
*
* @throws NullPointerException
* If parameter <code>dest</code> is <code>null</code>
*/
public boolean renameTo(File dest) {

因此如果想原子性的重命名和移动文件,我们应该使用java.nio.file.Files类中的move方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
* Move or rename a file to a target file.
*
* <p> By default, this method attempts to move the file to the target
* file, failing if the target file exists except if the source and
* target are the {@link #isSameFile same} file, in which case this method
* has no effect. If the file is a symbolic link then the symbolic link
* itself, not the target of the link, is moved. This method may be
* invoked to move an empty directory. In some implementations a directory
* has entries for special files or links that are created when the
* directory is created. In such implementations a directory is considered
* empty when only the special entries exist. When invoked to move a
* directory that is not empty then the directory is moved if it does not
* require moving the entries in the directory. For example, renaming a
* directory on the same {@link FileStore} will usually not require moving
* the entries in the directory. When moving a directory requires that its
* entries be moved then this method fails (by throwing an {@code
* IOException}). To move a <i>file tree</i> may involve copying rather
* than moving directories and this can be done using the {@link
* #copy copy} method in conjunction with the {@link
* #walkFileTree Files.walkFileTree} utility method.
*
* <p> The {@code options} parameter may include any of the following:
*
* <table border=1 cellpadding=5 summary="">
* <tr> <th>Option</th> <th>Description</th> </tr>
* <tr>
* <td> {@link StandardCopyOption#REPLACE_EXISTING REPLACE_EXISTING} </td>
* <td> If the target file exists, then the target file is replaced if it
* is not a non-empty directory. If the target file exists and is a
* symbolic link, then the symbolic link itself, not the target of
* the link, is replaced. </td>
* </tr>
* <tr>
* <td> {@link StandardCopyOption#ATOMIC_MOVE ATOMIC_MOVE} </td>
* <td> The move is performed as an atomic file system operation and all
* other options are ignored. If the target file exists then it is
* implementation specific if the existing file is replaced or this method
* fails by throwing an {@link IOException}. If the move cannot be
* performed as an atomic file system operation then {@link
* AtomicMoveNotSupportedException} is thrown. This can arise, for
* example, when the target location is on a different {@code FileStore}
* and would require that the file be copied, or target location is
* associated with a different provider to this object. </td>
* </table>
*
* <p> An implementation of this interface may support additional
* implementation specific options.
*
* <p> Moving a file will copy the {@link
* BasicFileAttributes#lastModifiedTime last-modified-time} to the target
* file if supported by both source and target file stores. Copying of file
* timestamps may result in precision loss. An implementation may also
* attempt to copy other file attributes but is not required to fail if the
* file attributes cannot be copied. When the move is performed as
* a non-atomic operation, and an {@code IOException} is thrown, then the
* state of the files is not defined. The original file and the target file
* may both exist, the target file may be incomplete or some of its file
* attributes may not been copied from the original file.
*
* <p> <b>Usage Examples:</b>
* Suppose we want to rename a file to "newname", keeping the file in the
* same directory:
* <pre>
* Path source = ...
* Files.move(source, source.resolveSibling("newname"));
* </pre>
* Alternatively, suppose we want to move a file to new directory, keeping
* the same file name, and replacing any existing file of that name in the
* directory:
* <pre>
* Path source = ...
* Path newdir = ...
* Files.move(source, newdir.resolve(source.getFileName()), REPLACE_EXISTING);
* </pre>
*
* @param source
* the path to the file to move
* @param target
* the path to the target file (may be associated with a different
* provider to the source path)
* @param options
* options specifying how the move should be done
*
* @return the path to the target file
*
* @throws UnsupportedOperationException
* if the array contains a copy option that is not supported
* @throws FileAlreadyExistsException
* if the target file exists but cannot be replaced because the
* {@code REPLACE_EXISTING} option is not specified <i>(optional
* specific exception)</i>
* @throws DirectoryNotEmptyException
* the {@code REPLACE_EXISTING} option is specified but the file
* cannot be replaced because it is a non-empty directory
* <i>(optional specific exception)</i>
* @throws AtomicMoveNotSupportedException
* if the options array contains the {@code ATOMIC_MOVE} option but
* the file cannot be moved as an atomic file system operation.
* @throws IOException
* if an I/O error occurs
* @throws SecurityException
* In the case of the default provider, and a security manager is
* installed, the {@link SecurityManager#checkWrite(String) checkWrite}
* method is invoked to check write access to both the source and
* target file.
*/
public static Path move(Path source, Path target, CopyOption... options)
throws IOException

其中参数中的CopyOption可选性有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package java.nio.file;

/**
* Defines the standard copy options.
*
* @since 1.7
*/

public enum StandardCopyOption implements CopyOption {
/**
* Replace an existing file if it exists.
*/
REPLACE_EXISTING,
/**
* Copy attributes to the new file.
*/
COPY_ATTRIBUTES,
/**
* Move the file as an atomic file system operation.
*/
ATOMIC_MOVE;
}

我们看看kafka中怎么使用的,在kafka的org.apache.kafka.common.utils中有下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Attempts to move source to target atomically and falls back to a non-atomic move if it fails.
*
* @throws IOException if both atomic and non-atomic moves fail
*/
public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
try {
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException outer) {
try {
Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
log.debug("Non-atomic move of " + source + " to " + target + " succeeded after atomic move failed due to "
+ outer.getMessage());
} catch (IOException inner) {
inner.addSuppressed(outer);
throw inner;
}
}
}

参考资料

java中的zero copy

在web应用程序中,我们经常会在server和client之间传输数据。比如server发数据给client,server首先将数据从硬盘读出之后,然后原封不动的通过socket传输给client,大致原理如下:

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

下面的例子展示了传统的数据复制实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;

public class TraditionalClient {


public static void main(String[] args) {

int port = 2000;
String server = "localhost";
Socket socket = null;
String lineToBeSent;

DataOutputStream output = null;
FileInputStream inputStream = null;
int ERROR = 1;


// connect to server
try {
socket = new Socket(server, port);
System.out.println("Connected with server " +
socket.getInetAddress() +
":" + socket.getPort());
}
catch (UnknownHostException e) {
System.out.println(e);
System.exit(ERROR);
}
catch (IOException e) {
System.out.println(e);
System.exit(ERROR);
}

try {
String fname = "sendfile/NetworkInterfaces.c";
inputStream = new FileInputStream(fname);

output = new DataOutputStream(socket.getOutputStream());
long start = System.currentTimeMillis();
byte[] b = new byte[4096];
long read = 0, total = 0;
while ((read = inputStream.read(b)) >= 0) {
total = total + read;
output.write(b);
}
System.out.println("bytes send--" + total + " and totaltime--" + (System.currentTimeMillis() - start));
}
catch (IOException e) {
System.out.println(e);
}

try {
output.close();
socket.close();
inputStream.close();
}
catch (IOException e) {
System.out.println(e);
}
}
}

这种操作看起来可能不会怎么消耗CPU,但是实际上它是低效的。因为传统的 Linux 操作系统的标准 I/O 接口是基于数据拷贝操作的,即 I/O 操作会导致数据在操作系统内核地址空间的缓冲区和应用程序地址空间定义的缓冲区之间进行传输。如下图:

Traditional data copying approach

  • 数据首先被从磁盘读取到内核的read buffer
  • 然后在从内核的read buffer中复制到应用程序的buffer中
  • 然后在从应用程序的buffer中复制到内核的socket buffer
  • 最后在从内核的socket buffer中复制到网卡中

然后其中涉及了4次上下文切换:

Traditional context switches

分析上面的描述,我们可以看到kernel buffer其实在这个过程中充当了一个ahead cache。之所以引入这个kernel buffer其实是在很多的情况下是可以减少磁盘 I/O 的操作,进而提升效率的。

  • 比如对于读请求,如果我们所请求的数据的大小小于kernel buffer并且如果要读取的数据已经存放在操作系统的高速缓冲存储器中,那么就不需要再进行实际的物理磁盘 I/O 操作。直接从kernel buffer中读取就好了。
  • 对于写请求:利用这个ahead cache可以实现异步写操作

但是没有银弹,这样会带来一个问题就是,如果当我们请求的数据量的大小远大于kernel buffer的大小的话,这种情况下kernel buffer的存在反而会导致数据在kernel buffer用户缓冲区之间多次复制。

java中的zero copy

如果我们想在java中使用zero copy,我们一般会用java.nio.channels.FileChannel类中的transferTo()方法。下面是它的描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Transfers bytes from this channel's file to the given writable byte
* channel.
*
* <p> An attempt is made to read up to <tt>count</tt> bytes starting at
* the given <tt>position</tt> in this channel's file and write them to the
* target channel. An invocation of this method may or may not transfer
* all of the requested bytes; whether or not it does so depends upon the
* natures and states of the channels. Fewer than the requested number of
* bytes are transferred if this channel's file contains fewer than
* <tt>count</tt> bytes starting at the given <tt>position</tt>, or if the
* target channel is non-blocking and it has fewer than <tt>count</tt>
* bytes free in its output buffer.
*
* <p> This method does not modify this channel's position. If the given
* position is greater than the file's current size then no bytes are
* transferred. If the target channel has a position then bytes are
* written starting at that position and then the position is incremented
* by the number of bytes written.
*
* <p> This method is potentially much more efficient than a simple loop
* that reads from this channel and writes to the target channel. Many
* operating systems can transfer bytes directly from the filesystem cache
* to the target channel without actually copying them. </p>
*
* @param position
* The position within the file at which the transfer is to begin;
* must be non-negative
*
* @param count
* The maximum number of bytes to be transferred; must be
* non-negative
*
* @param target
* The target channel
*
* @return The number of bytes, possibly zero,
* that were actually transferred
*
* @throws IllegalArgumentException
* If the preconditions on the parameters do not hold
*
* @throws NonReadableChannelException
* If this channel was not opened for reading
*
* @throws NonWritableChannelException
* If the target channel was not opened for writing
*
* @throws ClosedChannelException
* If either this channel or the target channel is closed
*
* @throws AsynchronousCloseException
* If another thread closes either channel
* while the transfer is in progress
*
* @throws ClosedByInterruptException
* If another thread interrupts the current thread while the
* transfer is in progress, thereby closing both channels and
* setting the current thread's interrupt status
*
* @throws IOException
* If some other I/O error occurs
*/
public abstract long transferTo(long position, long count,
WritableByteChannel target)
throws IOException;

transferTo()方法把数据从file channel传输到指定的writable byte channel。它需要底层的操作系统支持zero copy。在UNIX和各种Linux中,会执行系统调用sendfile(),该命令把数据从一个文件描述符传输到另一个文件描述符(Linux中万物皆文件):

1
2
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

因此传统的方式中的

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

可以被transferTo()替代。下面的图展示了使用transferTo(), 也就是zero copy技术后的流程:

 Data copy with transferTo()

Context switching with transferTo()

  • transferTo()方法使文件内容被DMA引擎复制到读缓冲区中。 然后,内核将数据复制到与输出套接字关联的内核缓冲区中。
  • 第三个副本发生在DMA引擎将数据从内核套接字缓冲区传递到协议引擎时。

这是一个很明显的进步:我们把context switch的次数从4次减少到了2次,同时也把data copy的次数从4次降低到了3次(而且其中只有一次占用了CPU,另外两次由DMA完成)。但是,要做到zero copy,这还差得远。

如果网卡支持gather operation,我们可以通过kernel进一步减少数据的拷贝操作。在2.4及以上版本的linux内核中,开发者修改了socket buffer descriptor来适应这一需求。这个方法不仅减少了context switch,还消除了和CPU有关的数据拷贝。使用层面的使用方法没有变,但是内部原理却发生了变化:

Data copies when transferTo() and gather operations are used

  • transferTo()方法使文件内容被DMA引擎复制到内核缓冲区中。
  • 没有数据被复制到套接字缓冲区中。 相反,只有具有有关数据位置和长度信息的描述符才会附加到套接字缓冲区。 DMA引擎将数据直接从内核缓冲区传递到协议引擎,从而消除了剩余的最终CPU副本。

下面这个例子展示了如何使用transferTo()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class TransferToClient {

public static void main(String[] args) throws IOException{
TransferToClient sfc = new TransferToClient();
sfc.testSendfile();
}
public void testSendfile() throws IOException {
String host = "localhost";
int port = 9026;
SocketAddress sad = new InetSocketAddress(host, port);
SocketChannel sc = SocketChannel.open();
sc.connect(sad);
sc.configureBlocking(true);

String fname = "sendfile/NetworkInterfaces.c";
long fsize = 183678375L, sendzise = 4094;

// FileProposerExample.stuffFile(fname, fsize);
FileChannel fc = new FileInputStream(fname).getChannel();
long start = System.currentTimeMillis();
long nsent = 0, curnset = 0;
curnset = fc.transferTo(0, fsize, sc);
System.out.println("total bytes transferred--"+curnset+" and time taken in MS--"+(System.currentTimeMillis() - start));
//fc.close();
}
}

参考资料

Iterable和Iterator结合使用的一个小例子

这篇文章主要是记录一下使用Iterable和Iterator用作迭代处理的一个例子。基于这种模式可以很方便的实现
流式处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class Array<T> implements Iterable<T> {
T[] values; // this contains the actual elements of the array

// Constructor that takes a "raw" array and stores it
public Array(T[] values) {
this.values = values;
}

// This is a private class that implements iteration over the elements
// of the list. It is not accessed directly by the user, but is used in
// the iterator() method of the Array class. It implements the hasNext()
// and next() methods.
class ArrayIterator implements Iterator<T> {
int current = 0; // the current element we are looking at

// return whether or not there are more elements in the array that
// have not been iterated over.
public boolean hasNext() {
if (current < Array.this.values.length) {
return true;
} else {
return false;
}
}

// return the next element of the iteration and move the current
// index to the element after that.
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return values[current++];
}
}

// Return the value at a given index
public T get(int index) {
return values[index];
}

// Set the value at a given index
public void set(int index, T value) {
values[index] = value;
}

// Return the length of the array
public int length() {
return values.length;
}

// Return an iterator over the elements in the array. This is generally not
// called directly, but is called by Java when used in a "simple" for loop.
public Iterator<T> iterator() {
return new ArrayIterator();
}

// This is just a sample program that can be run to show how the Array
// class might be used.
public static void main(String[] args) {
// create an array of strings
String[] strings = new String[]{"Hello", "World"};

// create a new array to hold these strings
Array<String> array = new Array<String>(strings);

// get and print the first values (prints "Hello")
System.out.println(array.get(0));

// set the second value
array.set(1, "Javaland!");

// iterate over the array, printing "Hello\nJavaland!"
for (String s : array) {
System.out.println(s);
}
}
}
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×