gc Roots对象有哪些

JVM的垃圾自动回收是我们经常说的一个话题,这里的垃圾的含义是:

内存中已经不再被使用到的对象就是垃圾

要进行垃圾回收,如何判断一个对象是否可以被回收?

一般有两种办法:

  • 引用计数法
    • 实现简单,但是没法解决对象之间的循环引用问题
  • 枚举根节点做可达性分析
    • 通过一系列名为“GC Roots”的对象作为起始点,从“GC Roots”对象开始向下搜索,如果一个对象到“GC Roots”没有任何引用链相连,说明此对象可以被回收

常见的常见的GC Root有如下:

  • 通过System Class Loader或者Boot Class Loader加载的class对象,通过自定义类加载器加载的class不一定是GC Root
  • 处于激活状态的线程
  • 栈中的对象
  • 本地方法栈中 JNI (Native方法)的对象
  • JNI中的全局对象
  • 正在被用于同步的各种锁对象
  • JVM自身持有的对象,比如系统类加载器等。

Synchronized的一些东西

synchronized是Java中解决并发问题的一种最常用的方法,从语法上讲synchronized总共有三种用法:

  • 修饰普通方法
  • 修饰静态方法
  • 修饰代码块

synchronized 原理

为了查看synchronized的原理,我们首先反编译一下下面的代码, 这是一个synchronized修饰代码块的demo

1
2
3
4
5
6
7
public class SynchronizedDemo {
public void method() {
synchronized (this) {
System.out.println("Method 1 start");
}
}
}

http://7niucdn.wenchao.ren/20190902124248.png

从上面截图可以看到,synchronized的实现依赖2个指令:

  • monitorenter
  • monitorexit

但是从上面的截图可以看到有一个monitorenter和2个monitorexit,这里之所以有2个monitorexit是因为synchronized的锁释放有2种情况:

  • 方法正常执行完毕synchronized的范围,也就是正常情况下的锁释放
  • synchronized圈起来的范围内的代码执行抛出异常,导致锁释放

monitorenter

关于这个指令,jvm中的描述为:

Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:

• If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
• If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
• If another thread already owns the monitor associated with objectref, the thread blocks until the monitor’s entry count is zero, then tries again to gain ownership.

翻译一下大概为:

每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:

  • 如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。
  • 如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.
  • 如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。

monitorexit

The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref.
The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.

翻译一下为:

执行monitorexit的线程必须是objectref所对应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。

通过这两段描述,我们应该能很清楚的看出Synchronized的实现原理,Synchronized的语义底层是通过一个monitor的对象来完成,其实wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因。

上面的demo是使用synchronized修饰代码块的demo,下面我们看一个使用synchronized修饰方法的demo:

1
2
3
4
5
public class SynchronizedMethod {
public synchronized void method() {
System.out.println("Hello World!");
}
}

我们继续对这个类进行反编译:

http://7niucdn.wenchao.ren/20190902125437.png

从反编译的结果来看,方法的同步并没有通过指令monitorentermonitorexit来完成(虽然理论上其实也可以通过这两条指令来实现),不过相对于普通方法,其常量池中多了ACC_SYNCHRONIZED标示符。JVM就是根据该标示符来实现方法的同步的:

  • 当方法调用时,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor。
  • 在方法执行期间,其他任何线程都无法再获得同一个monitor对象。 其实本质上没有区别,只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。

观城模型

Synchronized 锁升级

这四种锁是指锁的状态,专门针对synchronized的。在介绍这四种锁状态之前还需要介绍一些额外的知识。

首先为什么Synchronized能实现线程同步?在回答这个问题之前我们需要了解两个重要的概念:Java对象头Monitor

Java对象头

synchronized是悲观锁,在操作同步资源之前需要给同步资源先加锁,这把锁就是存在Java对象头里的,而Java对象头又是什么呢?
我们以Hotspot虚拟机为例,Hotspot的对象头主要包括两部分数据:Mark Word(标记字段)、Klass Pointer(类型指针)。如下面的一个示例:

java object示意图

我们以64位虚拟机来说,object header的结构为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|------------------------------------------------------------------------------------------------------------|--------------------|
| Object Header (128 bits) | State |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| Mark Word (64 bits) | Klass Word (64 bits) | |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| unused:25 | identity_hashcode:31 | unused:1 | age:4 | biased_lock:1 | lock:2 | OOP to metadata object | Normal |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | lock:2 | OOP to metadata object | Biased |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| ptr_to_lock_record:62 | lock:2 | OOP to metadata object | Lightweight Locked |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| ptr_to_heavyweight_monitor:62 | lock:2 | OOP to metadata object | Heavyweight Locked |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| | lock:2 | OOP to metadata object | Marked for GC |
|------------------------------------------------------------------------------|-----------------------------|--------------------|

Mark Word

Mark Word默认存储:

  • 对象的HashCode
  • 分代年龄
  • 锁标志位信息

这些信息都是与对象自身定义无关的数据,所以Mark Word被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据。它会根据对象的状态复用自己的存储空间,也就是说在运行期间Mark Word里存储的数据会随着锁标志位的变化而变化。

Klass Point

Klass Point:对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。

Monitor

Monitor可以理解为一个同步工具或一种同步机制,通常被描述为一个对象。每一个Java对象就有一把看不见的锁,称为内部锁或者Monitor锁。
Monitor是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个monitor关联,同时monitor中有一个Owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。

synchronized通过Monitor来实现线程同步,Monitor是依赖于底层的操作系统的Mutex Lock(互斥锁)来实现的线程同步。

synchronized最初实现同步的方式是阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长,这就是JDK 6之前synchronized效率低的原因。这种依赖于操作系统Mutex Lock所实现的锁我们称之为“重量级锁”,JDK 6中为了减少获得锁和释放锁带来的性能消耗,引入了“偏向锁”和“轻量级锁”。

所以目前 锁一共有4种状态,级别从低到高依次是:无锁、偏向锁、轻量级锁和重量级锁。锁状态只能升级不能降级

下面是出四种锁状态对应的的Mark Word内容,然后再分别讲解四种锁状态的思路以及特点:

上下两幅图对照着看,我们就能够清楚的知道在不同的锁状态下,mack word区域存储的内容的不同了。

无锁

无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。

无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。CAS原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。

偏向锁

偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。

在大多数情况下,锁总是由同一线程多次获得,不存在多线程竞争,所以出现了偏向锁。其目标就是在只有一个线程执行同步代码块时能够提高性能。,也就是说偏向锁一般是一个线程的事情

当一个线程访问同步代码块并获取锁时,会在Mark Word里存储锁偏向的线程ID。在线程进入和退出同步块时不再通过CAS操作来加锁和解锁,而是检测Mark Word里是否存储着指向当前线程的偏向锁。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次CAS原子指令即可。

偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。

偏向锁在JDK 6及以后的JVM里是默认启用的。可以通过JVM参数关闭偏向锁:-XX:-UseBiasedLocking=false,关闭之后程序默认会进入轻量级锁状态。

轻量级锁

轻量级锁是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。因此一般情况下轻量级锁大多数是2个线程的事情。

在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,然后拷贝对象头中的Mark Word复制到锁记录中。

拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock Record里的owner指针指向对象的Mark Word。

  • 如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。
  • 如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。

若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。

重量级锁

升级为重量级锁时,锁标志的状态值变为“10”,此时Mark Word中存储的是指向重量级锁的指针,此时等待锁的线程都会进入阻塞状态。

整体的锁状态升级流程如下:

无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁

综上,偏向锁通过对比Mark Word解决加锁问题,避免执行CAS操作。而轻量级锁是通过用CAS操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。重量级锁是将除了拥有锁的线程以外的线程都阻塞。

参考资料

ThreadPoolExecutor相关

java中的线程池相关的东西抛不开ThreadPoolExecutor,本文就简单的说说这个ThreadPoolExecutor

先看一个ThreadPoolExecutor的demo,然后我们说说它的相关参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test {
private static ThreadPoolExecutor threadPoolExecutor;

public static void main(String[] args) {
threadPoolExecutor = new ThreadPoolExecutor(
4, 8, 0, TimeUnit.MICROSECONDS,
new LinkedBlockingQueue<>(100), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
});
System.out.println(threadPoolExecutor.getCorePoolSize()); //4
System.out.println(threadPoolExecutor.getMaximumPoolSize()); //8
System.out.println(threadPoolExecutor.getPoolSize());//0
boolean b = threadPoolExecutor.prestartCoreThread();
System.out.println(threadPoolExecutor.getCorePoolSize());//4
System.out.println(threadPoolExecutor.getMaximumPoolSize());//8
System.out.println(threadPoolExecutor.getPoolSize());//1
int i = threadPoolExecutor.prestartAllCoreThreads();
System.out.println(threadPoolExecutor.getCorePoolSize());//4
System.out.println(threadPoolExecutor.getMaximumPoolSize());//8
System.out.println(threadPoolExecutor.getPoolSize());//4
}
}

参数介绍

ThreadPoolExecutor的几个参数是必须要清楚的:

  • corePoolSize
    • 线程池中的核心线程数
  • maximumPoolSize
    • 线程池最大线程数,它表示在线程池中最多能创建多少个线程
  • keepAliveTime
    • 线程池中非核心线程闲置超时时长(准确来说应该是没有任务执行时的回收时间)
    • 一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉
    • 如果设置allowCoreThreadTimeOut(boolean value),则会作用于核心线程, 也就是说当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将回收
  • TimeUnit
    • 时间单位。可选的单位有分钟(MINUTES),秒(SECONDS),毫秒(MILLISECONDS) 等
  • BlockingQueue
    • 任务的阻塞队列,缓存将要执行的Runnable任务,由各线程轮询该任务队列获取任务执行。可以选择以下几个阻塞队列
      • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
      • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
      • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
      • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • ThreadFactory
    • 线程创建的工厂。可以进行一些属性设置,比如线程名,优先级等等,有默认实现。
  • RejectedExecutionHandler
    • 任务拒绝策略,当运行线程数已达到maximumPoolSize,并且队列也已经装满时会调用该参数拒绝任务,默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
      • AbortPolicy:直接抛出异常, 这个是默认的拒绝策略。
      • CallerRunsPolicy:只用调用者所在线程来运行任务。
      • DiscardOldestPolicy:丢弃队列里最早的一个任务,并执行当前任务。
      • DiscardPolicy:不处理,丢弃掉。

运行原理

  • 初始时,线程池中的线程数为0,这一点从上面的demo输出可以看到
  • 当线程池中线程数小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中存在空闲线程。
  • 当线程池中线程数达到corePoolSize时,新提交任务将被放入workQueue中,等待线程池中任务调度执行 。
  • workQueue已满,且maximumPoolSize > corePoolSize时,新提交任务会创建新线程执行任务。注意,新手容易犯的一个错是使用的是无界的workQueue,导致workQueue一直满不了,进而无法继续创建线程
  • workQueue已满,且提交任务数超过maximumPoolSize,任务由RejectedExecutionHandler处理。
  • 当线程池中线程数超过corePoolSize,且超过这部分的空闲时间达到keepAliveTime时,回收这些线程。
  • 当设置allowCoreThreadTimeOut(true)时,线程池中corePoolSize范围内的线程空闲时间达到keepAliveTime也将回收。

一般流程图

线程池的一般流程图

newFixedThreadPool 流程图

1
2
3
4
5
6
7
8
9
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(
nThreads, // corePoolSize
nThreads, // maximumPoolSize == corePoolSize
0L, // 空闲时间限制是 0
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>() // 无界阻塞队列
);
}

newFixedThreadPool 流程图

newCacheThreadPool 流程图

1
2
3
4
5
6
7
8
9
10
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(
0, // corePoolSoze == 0
Integer.MAX_VALUE, // maximumPoolSize 非常大
60L, // 空闲判定是60 秒
TimeUnit.SECONDS,
// 神奇的无存储空间阻塞队列,每个 put 必须要等待一个 take
new SynchronousQueue<Runnable>()
);
}

newCacheThreadPool 流程图

newSingleThreadPool

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static ExecutorService newSingleThreadExecutor() {
return
new FinalizableDelegatedExecutorService
(
new ThreadPoolExecutor
(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory
)
);
}

可以看到除了多了个FinalizableDelegatedExecutorService 代理,其初始化和newFiexdThreadPoolnThreads = 1的时候是一样的。
区别就在于:

  • newSingleThreadExecutor返回的ExcutorService在析构函数finalize()处会调用shutdown()
  • 如果我们没有对它调用shutdown(),那么可以确保它在被回收时调用shutdown()来终止线程。
    流程图略,请参考 newFiexdThreadPool,这里不再累赘。

java 常见的OOM case

OOMjava.lang.OutOfMemoryError异常的简称,在日常工作中oom还算是比较常见的一种问题吧。出现OOM意味着jvm已经无法满足新对象对内存的申请了,本文整理了一下oom的常见case和一般情况下的解决方法。

处理OOM问题,绝大多数情况下jmapMAT工具可以解决99%的问题。

Java heap space

表现现象为:

1
java.lang.OutOfMemoryError: Java heap space

可能的原因

  • 内存泄漏
  • 堆大小设置不合理
  • JVM处理引用不及时,导致内存无法释放
  • 代码中可能存在大对象分配

解决办法

  • 一般情况下,都是先通过jmap命令,把堆内存dump下来,使用mat工具分析一下,检查是否因为代码问题,存在内存泄露
  • 也可能是下游服务出问题,导致内存中的数据不能很快的处理掉,进而引起oom
  • 调整-Xmx参数,加大堆内存
  • 还有一点容易被忽略,检查是否有大量的自定义的 Finalizable 对象,也有可能是框架内部提供的,考虑其存在的必要性

PermGen space

永久代是HotSot虚拟机对方法区的具体实现,存放了被虚拟机加载的类信息、常量、静态变量、JIT编译后的代码等。

一般情况下的异常表现为:

1
java.lang. OutOfMemoryError : PermGen space

可能的原因

  • 在Java7之前,频繁的错误使用String.intern()方法
  • 运行期间生成了大量的代理类,导致方法区被撑爆,无法卸载

解决办法

  • 检查是否永久代空间是否设置的过小
  • 检查代码中是否存错误的创建过多的代理类

Metaspace

JDK8后,元空间替换了永久带,元空间使用的是本地内存,还有其它细节变化:

  • 字符串常量由永久代转移到堆中
  • 和永久代相关的JVM参数已移除

一般情况下的异常表现为:

1
java.lang.OutOfMemoryError: Metaspace

可能的原因

类似PermGen space

解决办法

  • 通过命令行设置 -XX: MaxMetaSpaceSize 增加 metaspace 大小,或者取消-XX: maxmetsspacedize
  • 其他类似PermGen space

unable to create new native Thread

这种情况的一般表现为:

1
java.lang.OutOfMemoryError: unable to create new native Thread

可能的原因

出现这种异常,基本上都是创建的了大量的线程导致的

解决办法

程序运行期间,间隔多次打印jstack,然后查看线程数的变化情况,找出增长快速的线程。

GC overhead limit exceeded

这种情况其实一般情况下遇到的不是太多,他的一般表现为:

1
java.lang.OutOfMemoryError:GC overhead limit exceeded

可能的原因

这个是JDK6新加的错误类型,一般都是堆太小导致的。Sun 官方对此的定义:超过98%的时间用来做GC并且回收了不到2%的堆内存时会抛出此异常。

解决办法

  • 检查项目中是否有大量的死循环或有使用大内存的代码,优化代码。
  • 添加参数-XX:-UseGCOverheadLimit禁用这个检查,其实这个参数解决不了内存问题,只是把错误的信息延后,最终出现 java.lang.OutOfMemoryError: Java heap space
  • dump内存,检查是否存在内存泄露,如果没有,加大内存。

java.lang.OutOfMemoryError: Out of swap space

1
java.lang.OutOfMemoryError: request size bytes for reason. Out of swap space

Oracle的官方解释是,本地内存(native heap)不够用导致的。
错误日志的具体路径可以通过JVM启动参数配置。如:-XX:ErrorFile=/var/log/java/java_error.log

解决方法

  • 其它服务进程可以选择性的拆分出去
  • 加大swap分区大小,或者加大机器内存大小

stack_trace_with_native_method

1
java.lang.OutOfMemoryError: reason stack_trace_with_native_method

一般出现该错误时,线程正在执行一个本地方法。也就是说执行本地方法时内存不足。一般需要结合其他系统级工具进行排查

Compressed class space

1
java.lang.OutOfMemoryError: Compressed class space

可通过JVM启动参数配置增大相应的内存区域。如:-XX:CompressedClassSpaceSize=2g

Requested array size exceeds VM limit

1
java.lang.OutOfMemoryError: Requested array size exceeds VM limit

程序试图创建一个数组时,该数组的大小超过了剩余可用(连续空间)的堆大小。
出现这种情况可能是堆设置得太小了。也有可能是程序在选择数组容量大小的逻辑有问题.

构造函数中使用Spring @Value注解

如果想在构造函数中使用的@value注解的话,demo如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// File: sample/Message.groovy
package sample

import org.springframework.beans.factory.annotation.*
import org.springframework.stereotype.*

@Component
class Message {

final String text

// Use @Autowired to get @Value to work.
@Autowired
Message(
// Refer to configuration property
// app.message.text to set value for
// constructor argument message.
@Value('${app.message.text}') final String text) {
this.text = text
}

}

bloom filter

当系统设计中出现多级缓存结构时,为了防止大量不存在的key值击穿高速缓存(比如主存),去直接访问低速缓存(如本地磁盘),我们一般需要将这部分key值,直接拦截在高速缓存阶段。这里,当然可以使用普通的hash table,也可以使用bitmap,但是这两种方式都比较耗费内存,当面对海量key值时,问题会变得更加严重。这时,就该介绍我们的主角bloom filter出场了。

一般的,bloom filter用于判断一个key值是否在一个set中,拥有比hash table/bitmap更好的空间经济性。如果bloom filter指示一个key值“不在”一个set中,那么这个判断是100%准确的。这样的特性,非常适合于上述的缓存场景。

bloom filter原理

  • 首先估计要判断的set中的元素个数N,然后选定k个独立的哈希函数。根据N和k,选定一个长度为M的bit array。

  • 遍历set中的N个元素

    • 对每个元素,使用k个哈希函数,得到k个哈希值(一般为一个大整数)
    • 将上述bit array中,k个哈希值所对应的bit置1
  • 对于需要判断的key值
    • 使用k个哈希函数,得到k个哈希值
    • 如果k个哈希值所对应的bit array中的值均为1,则判断此值在set中“可能”存在;否则,判定“一定”不存在

根据上面的原理我们其实可以看到,bloom filter有以下特点:

  • 比较节省空间
  • bloom的识别准确率和数据大小,k个哈希函数有关
  • 如果bloom filter判断key不存在,那么就一定不存在,100%不存在。
  • 如果bloom filter判断key存在,那么可能存在,也可能不存在

bloom filter优缺点

优点:

  • 插入、查找都是常数时间

  • 多个hash函数之间互相独立,可以并行计算

  • 不需要存储元素本身,从而带来空间效率优势,以及一些保密上的优势

  • bloom filter的bitmap可以进行交、并、差运算

缺点:

  • 判断元素是否在集合中的结果其实是不准确的
  • bloom filter中的元素是不能删除的

bloom filter的实际使用

guava bloom filter

guava中提供了bloom filter的一种实现:com.google.common.hash.BloomFilter,可以方便我们在单机情况下使用bloom filter。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
/**
* A Bloom filter for instances of {@code T}. A Bloom filter offers an approximate containment test
* with one-sided error: if it claims that an element is contained in it, this might be in error,
* but if it claims that an element is <i>not</i> contained in it, then this is definitely true.
*
* <p>If you are unfamiliar with Bloom filters, this nice
* <a href="http://llimllib.github.com/bloomfilter-tutorial/">tutorial</a> may help you understand
* how they work.
*
* <p>The false positive probability ({@code FPP}) of a bloom filter is defined as the probability
* that {@linkplain #mightContain(Object)} will erroneously return {@code true} for an object that
* has not actually been put in the {@code BloomFilter}.
*
* <p>Bloom filters are serializable. They also support a more compact serial representation via the
* {@link #writeTo} and {@link #readFrom} methods. Both serialized forms will continue to be
* supported by future versions of this library. However, serial forms generated by newer versions
* of the code may not be readable by older versions of the code (e.g., a serialized bloom filter
* generated today may <i>not</i> be readable by a binary that was compiled 6 months ago).
*
* @param <T> the type of instances that the {@code BloomFilter} accepts
* @author Dimitris Andreou
* @author Kevin Bourrillion
* @since 11.0
*/
@Beta
public final class BloomFilter<T> implements Predicate<T>, Serializable {

/** The bit set of the BloomFilter (not necessarily power of 2!) */
private final BitArray bits;

/** Number of hashes per element */
private final int numHashFunctions;

/** The funnel to translate Ts to bytes */
private final Funnel<? super T> funnel;

/**
* The strategy we employ to map an element T to {@code numHashFunctions} bit indexes.
*/
private final Strategy strategy;

/**
* Creates a BloomFilter.
*/
private BloomFilter(
BitArray bits, int numHashFunctions, Funnel<? super T> funnel, Strategy strategy) {
checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
checkArgument(
numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
this.bits = checkNotNull(bits);
this.numHashFunctions = numHashFunctions;
this.funnel = checkNotNull(funnel);
this.strategy = checkNotNull(strategy);
}


/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and
* expected false positive probability.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @param fpp the desired false positive probability (must be positive and less than 1.0)
* @return a {@code BloomFilter}
*/
public static <T> BloomFilter<T> create(
Funnel<? super T> funnel, int expectedInsertions, double fpp) {
return create(funnel, (long) expectedInsertions, fpp);
}

/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and
* expected false positive probability.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @param fpp the desired false positive probability (must be positive and less than 1.0)
* @return a {@code BloomFilter}
* @since 19.0
*/
public static <T> BloomFilter<T> create(
Funnel<? super T> funnel, long expectedInsertions, double fpp) {
return create(funnel, expectedInsertions, fpp, BloomFilterStrategies.MURMUR128_MITZ_64);
}

@VisibleForTesting
static <T> BloomFilter<T> create(
Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
checkNotNull(funnel);
checkArgument(
expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
checkNotNull(strategy);

if (expectedInsertions == 0) {
expectedInsertions = 1;
}
/*
* TODO(user): Put a warning in the javadoc about tiny fpp values, since the resulting size
* is proportional to -log(p), but there is not much of a point after all, e.g.
* optimalM(1000, 0.0000000000000001) = 76680 which is less than 10kb. Who cares!
*/
long numBits = optimalNumOfBits(expectedInsertions, fpp);
int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
try {
return new BloomFilter<T>(new BitArray(numBits), numHashFunctions, funnel, strategy);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
}
}

/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and a
* default expected false positive probability of 3%.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @return a {@code BloomFilter}
*/
public static <T> BloomFilter<T> create(Funnel<? super T> funnel, int expectedInsertions) {
return create(funnel, (long) expectedInsertions);
}

/**
* Creates a {@link BloomFilter BloomFilter<T>} with the expected number of insertions and a
* default expected false positive probability of 3%.
*
* <p>Note that overflowing a {@code BloomFilter} with significantly more elements than specified,
* will result in its saturation, and a sharp deterioration of its false positive probability.
*
* <p>The constructed {@code BloomFilter<T>} will be serializable if the provided
* {@code Funnel<T>} is.
*
* <p>It is recommended that the funnel be implemented as a Java enum. This has the benefit of
* ensuring proper serialization and deserialization, which is important since {@link #equals}
* also relies on object identity of funnels.
*
* @param funnel the funnel of T's that the constructed {@code BloomFilter<T>} will use
* @param expectedInsertions the number of expected insertions to the constructed
* {@code BloomFilter<T>}; must be positive
* @return a {@code BloomFilter}
* @since 19.0
*/
public static <T> BloomFilter<T> create(Funnel<? super T> funnel, long expectedInsertions) {
return create(funnel, expectedInsertions, 0.03); // FYI, for 3%, we always get 5 hash functions
}

// other codes ...
}

基本使用:

1
2
3
4
5
6
7
8
9
10
BloomFilter<Integer> filter = BloomFilter.create(Funnels.integerFunnel(),500,0.01);

filter.put(1);
filter.put(2);
filter.put(3);

assertThat(filter.mightContain(1)).isTrue();
assertThat(filter.mightContain(2)).isTrue();
assertThat(filter.mightContain(3)).isTrue();
assertThat(filter.mightContain(100)).isFalse();

正确估计预期插入数量是很关键的一个参数。当插入的数量接近或高于预期值的时候,布隆过滤器将会填满,这样的话,它会产生很多无用的误报点。

不过也有文章指出,guava的bloom filter在数据量变大以后,准确性大大降低,这个虽然本身就是bloom filter的特性,但是在这篇文章中也给出了一些参考值,大家可以看看: google guava bloom filter包的坑 :

在0.0001的错误率下,插入量不到1.5亿的时候,numBits已经到达了BitArray的最大容量了,这时如果再增加插入量,哈希函数个数就开始退化。到5亿的时候,哈希函数个数退化到了只有3个,也就是说,对每一个key,只有3位来标识,这时准确率就会大大下降。
第一种当然就是减少预期插入量,1亿以内,还是可以保证理论上的准确率的。
第二种,如果你的系统很大,就是会有上亿的key,这时可以考虑拆分,将一个大的bloom filter拆分成几十个小的(比如32或64个),每个最多可以容纳1亿,这时整体就能容纳32或64亿的key了。查询的时候,先对key计算一次哈希,然后取模,查找特定的bloom filter即可。

基于redis的bloom filter

guava中的bloom filter是单机的,如果想使用分布式的的话,可以考虑基于redis 的bloom filter。

主要参考这个文章:ReBloom – Bloom Filter Datatype for Redis

redis 在 4.0 的版本中加入了 module 功能,布隆过滤器可以通过module的形式添加到 redis 中,所以使用 redis 4.0 以上的版本可以通过加载 module 来使用 redis 中的布隆过滤器。但是这不是最简单的方式,使用 docker 可以直接在 redis 中体验布隆过滤器。

1
2
> docker run -d -p 6379:6379 --name bloomfilter redislabs/rebloom
> docker exec -it bloomfilter redis-cli

redis 布隆过滤器主要就两个命令:

  • bf.add 添加元素到布隆过滤器中:bf.add urls https://jaychen.cc
  • bf.exists 判断某个元素是否在过滤器中:bf.exists urls https://jaychen.cc

上面说过布隆过滤器存在误判的情况,在 redis 中有两个值决定布隆过滤器的准确率:

  • error_rate:允许布隆过滤器的错误率,这个值越低过滤器的位数组的大小越大,占用空间也就越大。
  • initial_size:布隆过滤器可以储存的元素个数,当实际存储的元素个数超过这个值之后,过滤器的准确率会下降。

redis 中有一个命令可以来设置这两个值:

1
bf.reserve urls 0.01 100

上面三个参数的含义为:

  • 第一个值是过滤器的名字。
  • 第二个值为 error_rate 的值。
  • 第三个值为 initial_size 的值。

使用这个命令要注意一点:执行这个命令之前过滤器的名字应该不存在,如果执行之前就存在会报错:(error) ERR item exists

参考文章:

使用阿里云maven镜像加速

maven是一个好东西,但是默认情况下,maven使用的是中央仓央是:http://repo1.maven.org/maven2http://uk.maven.org/maven2。这两个镜像在国内
访问其实是比较慢的,因此我们需要尽可能使用国内同步好的镜像。

我在国内选择的是阿里云的镜像:公共代理库

maven的配置为:打开maven的配置文件(windows机器一般在maven安装目录的conf/settings.xml),在<mirrors></mirrors>标签中添加mirror子节点:

1
2
3
4
5
6
<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>

其他的如gradle的配置指南请参见公共代理库中描述的那样操作就好了。

但是一般情况下在公司开发的时候,公司也会有自己的maven镜像仓库,这个时候搞多个mirror就好了。

参考资料:

java.nio.ByteBuffer

Java NIO Buffers用于和NIO Channel交互。 我们从Channel中读取数据到buffers里,从Buffer把数据写入到Channels。Buffer本质上就是一块内存区,可以用来写入数据,并在稍后读取出来。这块内存被NIO Buffer包裹起来,对外提供一系列的读写方便开发的接口。java中java.nio.Buffer的常见实现类如下,不过我们这里只说一下ByteBuffer这个实现。

java中`java.nio.Buffer`的常见实现类

Buffer的重要属性

Buffer缓冲区实质上就是一块内存,用于写入数据,也供后续再次读取数据,为了便于理解,你可以把它理解为一个字节数组。它有有四个重要属性:

1
2
3
4
5
6
7
public abstract class Buffer {
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
}
  • capacity
    • 这个属性表示这个Buffer最多能放多少数据,在创建buffer的时候指定。int类型。
  • position
    下一个要读写的元素位置(从0开始),当使用buffer的相对位置进行读/写操作时,读/写会从这个下标进行,并在操作完成后,buffer会更新下标的值。
    • 写模式:当写入数据到Buffer的时候需要从一个确定的位置开始,默认初始化时这个位置position为0,一旦写入了数据比如一个字节,整形数据,那么position的值就会指向数据之后的一个单元,position最大可以到capacity-1.
    • 读模式:当从Buffer读取数据时,也需要从一个确定的位置开始。buffer从写入模式变为读取模式时,position会归0,每次读取后,position向后移动。
  • limit
    在Buffer上进行的读写操作都不能越过这个limit。
    • 写模式:limit的含义是我们所能写入的最大数据量,它等同于buffer的容量capacity
    • 读模式:limit则代表我们所能读取的最大数据量,他的值等同于写模式下position的位置。换句话说,您可以读取与写入数量相同的字节数。
  • mark
    • 一个临时存放的位置下标,用户选定的position的前一个位置或-1。
      • 调用mark()会将mark设为当前的position的值,以后调用reset()会将position属性设
        置为mark的值。mark的值总是小于等于position的值,如果将position的值设的比mark小,当前的mark值会被抛弃掉。

注:

  • position和limit之间的距离指示了可读/存的字节数。
  • boolean hasRemaining():当缓冲区至少还有一个元素时,返回true。
  • int remaining():position和limit之间字节个数。

这些属性总是满足以下条件:

0 <= mark <= position <= limit <= capacity

通过上面的描述可以看出,其中position和limit的具体含义取决于当前buffer的模式(读模式还是写模式)。capacity在两种模式下都表示容量。

Buffer的常见API

  • ByteBuffer allocate(int capacity)
    • 从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,实现类是HeapByteBuffer
  • ByteBuffer allocateDirect(int capacity)
    • 类似于allocate方法,不过使用的是对外内存,实现类是DirectByteBuffer
  • ByteBuffer wrap(byte[] array)
    • 把byte数组包装为ByteBuffer,bytes数组或buff缓冲区任何一方中数据的改动都会影响另一方。其实ByteBuffer底层本来就有一个bytes数组负责来保存buffer缓冲区中的数据,通过allocate方法系统会帮你构造一个byte数组,实现类是HeapByteBuffer
  • ByteBuffer wrap(byte[] array, int offset, int length)
    • 在上一个方法的基础上可以指定偏移量和长度,buffer的capacity就是array.length,这个offset也就是包装后byteBuffer的position,limit=length+position(offset),mark=-1实现类是HeapByteBuffer
  • abstract Object array()
    • 返回支持此缓冲区的数组 (可选操作)
  • abstract int arrayOffset()
    • 返回此缓冲区中的第一个元素在缓冲区的底层实现数组中的偏移量(可选操作)。调用此方法之前要调用hasarray方法,以确保此缓冲区具有可访问的底层实现数组。
  • abstract boolean hasArray()
    • 告诉这个缓冲区是否由可访问的数组支持
  • int capacity()
    • 返回此缓冲区的容量
  • Buffer clear()
    • 清除此缓存区。将position = 0;limit = capacity;mark = -1;把position设为0,一般在把数据写入Buffer前调用。
  • Buffer flip()
    • flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置。一般在从Buffer读出数据前调用。
  • abstract boolean isDirect()
    • 判断个缓冲区是否为direct, 也就是是否是对外内存
  • abstract boolean isReadOnly()
    • 判断告知这个缓冲区是否是只读的
  • int limit()
    • 返回此缓冲区的limit的属性值
  • Buffer position(int newPosition)
    • 设置这个缓冲区的位置
  • boolean hasRemaining()
    • return position < limit,返回是否还有未读内容
  • int remaining()
    • return limit - position; 返回limit和position之间相对位置差
  • Buffer rewind()
    • 把position设为0,mark设为-1,不改变limit的值,一般在把数据重写入Buffer前调用
  • Buffer mark()
    • 设置mark的值,mark=position,做个标记
  • Buffer reset()
    • 还原标记,position=mark。
  • ByteBuffer compact()
    • 该方法的作用是将 position 与 limit之间的数据复制到buffer的开始位置,与limit 之间没有数据的话发,就不会进行复制。一个例子如下:
1
2
3
4
5
6
7
8
9
10
11
12
例如:ByteBuffer.allowcate(10); 
内容:[0 ,1 ,2 ,3 4, 5, 6, 7, 8, 9]
## compact前
[0 ,1 ,2 , 3, 4, 5, 6, 7, 8, 9]
pos=4
lim=10
cap=10
## compact后
[4, 5, 6, 7, 8, 9, 6, 7, 8, 9]
pos=6
lim=10
cap=10
  • ByteBuffer slice();
    • 创建一个分片缓冲区。分片缓冲区与主缓冲区共享数据。 分配的起始位置是主缓冲区的position位置,容量为limit-position。 分片缓冲区无法看到主缓冲区positoin之前的元素。

创建buffer的注意点

创建ByteBuffer可以使用allocate或者wrap,就像下面这样:

  • ByteBuffer allocate(int capacity)
  • ByteBuffer allocateDirect(int capacity)
  • ByteBuffer wrap(int capacity)
  • ByteBuffer wrap(byte[] array,int offset,int length)

需要注意的是:创建的缓冲区都是定长的,大小无法改变。若发现刚创建的缓冲区容量太小,只能重新创建一个合适的

关于ByteBuffer wrap(byte[] array,int offset,int length)这个方法,这里再强调一下,这样创建的ByteBuffer的capacity和array的大小是一样的,buffer的position是offset, buffer的limit是offset+length,position之前和limit之后的数据依然可以访问到。例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Demo {
public static void main(String args[]){
byte arr[]=new byte[100];
ByteBuffer buffer=ByteBuffer.wrap(arr,3,25);
System.out.println("Capacity is: "+buffer.capacity());
System.out.println("Position is: "+buffer.position());
System.out.println("limit is: "+buffer.limit());
}
}
//结果:
Capacity is: 100
Position is: 3
limit is: 28

allocate和wrap的区别

wrap只是简单地创建一个具有指向被包装数组的引用的缓冲区,该数组成为后援数组。对后援数组中的数据做的任何修改都将改变缓冲区中的数据,反之亦然。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String args[]){  
byte arr[]=new byte[100];
//将arr数组全部置为1
Arrays.fill(arr, (byte)1);
ByteBuffer buffer=ByteBuffer.wrap(arr,3,25);
//对后援数组中的数据做的任何修改都将改变缓冲区中的数据
arr[0]=(byte)2;
buffer.position(0);
System.out.println(buffer.get());
//在缓冲区上调用array()方法即可获得后援数组的引用。
System.out.println(Arrays.toString(buffer.array()));
}
//运行结果:
2
[2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ....]//总共100个元素

allocate则是创建了自己的后援数组,在缓冲区上调用array()方法也可获得后援数组的引用。通过调用arrayOffset()方法,可以获取缓冲区中第一个元素在后援数组的偏移量。但是使用wrap创建的ByteBuffer,调用arrayOffset永远是0。

1
2
3
4
5
6
7
8
9
10
11
12
13
 public static void main(String args[]){  
ByteBuffer buffer=ByteBuffer.allocate(100);
//对后援素组的修改也可以反映到buffer上
byte arr[]=buffer.array();
arr[1]=(byte)'a';
buffer.getInt();

System.out.println(Arrays.toString(buffer.array()));
System.out.println(buffer.arrayOffset());
}
//运行结果:
[0, 97, 0, 0, 0, 0, 0, 0,...]//总共100个元素
0

通过ByteBuffer allocateDirect(int capacity)创建的叫直接缓冲区,使用的堆外内存。可以通过isDirect()方法查看一个缓冲区是否是直接缓冲区。由于直接缓冲区是没有后援数组的,所以在其上面调用array()或arrayOffset()都会抛出UnsupportedOperationException异常。注意有些平台或JVM可能不支持这个创建直接缓冲区。

图解

put

写模式下,往buffer里写一个字节,并把postion移动一位。写模式下,一般limit与capacity相等。

bytebuffer-put

flip

写完数据,需要开始读的时候,将postion复位到0,并将limit设为当前postion。

bytebuffer-flip

get

从buffer里读一个字节,并把postion移动一位。上限是limit,即写入数据的最后位置。

bytebuffer-get

clear

将position置为0,并不清除buffer内容。

bytebuffer-clear

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class Demo {
private static final int SIZE = 1024;

public static void main(String[] args) throws Exception {
// 获取通道,该通道允许写操作
FileChannel fc = new FileOutputStream("data.txt").getChannel();
// 将字节数组包装到缓冲区中
fc.write(ByteBuffer.wrap("Some text".getBytes()));
// 关闭通道
fc.close();

// 随机读写文件流创建的管道
fc = new RandomAccessFile("data.txt", "rw").getChannel();
// fc.position()计算从文件的开始到当前位置之间的字节数
System.out.println("此通道的文件位置:" + fc.position());
// 设置此通道的文件位置,fc.size()此通道的文件的当前大小,该条语句执行后,通道位置处于文件的末尾
fc.position(fc.size());
// 在文件末尾写入字节
fc.write(ByteBuffer.wrap("Some more".getBytes()));
fc.close();

// 用通道读取文件
fc = new FileInputStream("data.txt").getChannel();
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
// 将文件内容读到指定的缓冲区中
fc.read(buffer);
//此行语句一定要有, 如果没有,就是从文件最后开始读取的,当然读出来的都是byte=0时候的字符。通过buffer.flip();这个语句,就能把buffer的当前位置更改为buffer缓冲区的第一个位置
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char)buffer.get());
}
fc.close();
}
}

java中的zero copy

在web应用程序中,我们经常会在server和client之间传输数据。比如server发数据给client,server首先将数据从硬盘读出之后,然后原封不动的通过socket传输给client,大致原理如下:

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

下面的例子展示了传统的数据复制实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;

public class TraditionalClient {


public static void main(String[] args) {

int port = 2000;
String server = "localhost";
Socket socket = null;
String lineToBeSent;

DataOutputStream output = null;
FileInputStream inputStream = null;
int ERROR = 1;


// connect to server
try {
socket = new Socket(server, port);
System.out.println("Connected with server " +
socket.getInetAddress() +
":" + socket.getPort());
}
catch (UnknownHostException e) {
System.out.println(e);
System.exit(ERROR);
}
catch (IOException e) {
System.out.println(e);
System.exit(ERROR);
}

try {
String fname = "sendfile/NetworkInterfaces.c";
inputStream = new FileInputStream(fname);

output = new DataOutputStream(socket.getOutputStream());
long start = System.currentTimeMillis();
byte[] b = new byte[4096];
long read = 0, total = 0;
while ((read = inputStream.read(b)) >= 0) {
total = total + read;
output.write(b);
}
System.out.println("bytes send--" + total + " and totaltime--" + (System.currentTimeMillis() - start));
}
catch (IOException e) {
System.out.println(e);
}

try {
output.close();
socket.close();
inputStream.close();
}
catch (IOException e) {
System.out.println(e);
}
}
}

这种操作看起来可能不会怎么消耗CPU,但是实际上它是低效的。因为传统的 Linux 操作系统的标准 I/O 接口是基于数据拷贝操作的,即 I/O 操作会导致数据在操作系统内核地址空间的缓冲区和应用程序地址空间定义的缓冲区之间进行传输。如下图:

Traditional data copying approach

  • 数据首先被从磁盘读取到内核的read buffer
  • 然后在从内核的read buffer中复制到应用程序的buffer中
  • 然后在从应用程序的buffer中复制到内核的socket buffer
  • 最后在从内核的socket buffer中复制到网卡中

然后其中涉及了4次上下文切换:

Traditional context switches

分析上面的描述,我们可以看到kernel buffer其实在这个过程中充当了一个ahead cache。之所以引入这个kernel buffer其实是在很多的情况下是可以减少磁盘 I/O 的操作,进而提升效率的。

  • 比如对于读请求,如果我们所请求的数据的大小小于kernel buffer并且如果要读取的数据已经存放在操作系统的高速缓冲存储器中,那么就不需要再进行实际的物理磁盘 I/O 操作。直接从kernel buffer中读取就好了。
  • 对于写请求:利用这个ahead cache可以实现异步写操作

但是没有银弹,这样会带来一个问题就是,如果当我们请求的数据量的大小远大于kernel buffer的大小的话,这种情况下kernel buffer的存在反而会导致数据在kernel buffer用户缓冲区之间多次复制。

java中的zero copy

如果我们想在java中使用zero copy,我们一般会用java.nio.channels.FileChannel类中的transferTo()方法。下面是它的描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Transfers bytes from this channel's file to the given writable byte
* channel.
*
* <p> An attempt is made to read up to <tt>count</tt> bytes starting at
* the given <tt>position</tt> in this channel's file and write them to the
* target channel. An invocation of this method may or may not transfer
* all of the requested bytes; whether or not it does so depends upon the
* natures and states of the channels. Fewer than the requested number of
* bytes are transferred if this channel's file contains fewer than
* <tt>count</tt> bytes starting at the given <tt>position</tt>, or if the
* target channel is non-blocking and it has fewer than <tt>count</tt>
* bytes free in its output buffer.
*
* <p> This method does not modify this channel's position. If the given
* position is greater than the file's current size then no bytes are
* transferred. If the target channel has a position then bytes are
* written starting at that position and then the position is incremented
* by the number of bytes written.
*
* <p> This method is potentially much more efficient than a simple loop
* that reads from this channel and writes to the target channel. Many
* operating systems can transfer bytes directly from the filesystem cache
* to the target channel without actually copying them. </p>
*
* @param position
* The position within the file at which the transfer is to begin;
* must be non-negative
*
* @param count
* The maximum number of bytes to be transferred; must be
* non-negative
*
* @param target
* The target channel
*
* @return The number of bytes, possibly zero,
* that were actually transferred
*
* @throws IllegalArgumentException
* If the preconditions on the parameters do not hold
*
* @throws NonReadableChannelException
* If this channel was not opened for reading
*
* @throws NonWritableChannelException
* If the target channel was not opened for writing
*
* @throws ClosedChannelException
* If either this channel or the target channel is closed
*
* @throws AsynchronousCloseException
* If another thread closes either channel
* while the transfer is in progress
*
* @throws ClosedByInterruptException
* If another thread interrupts the current thread while the
* transfer is in progress, thereby closing both channels and
* setting the current thread's interrupt status
*
* @throws IOException
* If some other I/O error occurs
*/
public abstract long transferTo(long position, long count,
WritableByteChannel target)
throws IOException;

transferTo()方法把数据从file channel传输到指定的writable byte channel。它需要底层的操作系统支持zero copy。在UNIX和各种Linux中,会执行系统调用sendfile(),该命令把数据从一个文件描述符传输到另一个文件描述符(Linux中万物皆文件):

1
2
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

因此传统的方式中的

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

可以被transferTo()替代。下面的图展示了使用transferTo(), 也就是zero copy技术后的流程:

 Data copy with transferTo()

Context switching with transferTo()

  • transferTo()方法使文件内容被DMA引擎复制到读缓冲区中。 然后,内核将数据复制到与输出套接字关联的内核缓冲区中。
  • 第三个副本发生在DMA引擎将数据从内核套接字缓冲区传递到协议引擎时。

这是一个很明显的进步:我们把context switch的次数从4次减少到了2次,同时也把data copy的次数从4次降低到了3次(而且其中只有一次占用了CPU,另外两次由DMA完成)。但是,要做到zero copy,这还差得远。

如果网卡支持gather operation,我们可以通过kernel进一步减少数据的拷贝操作。在2.4及以上版本的linux内核中,开发者修改了socket buffer descriptor来适应这一需求。这个方法不仅减少了context switch,还消除了和CPU有关的数据拷贝。使用层面的使用方法没有变,但是内部原理却发生了变化:

Data copies when transferTo() and gather operations are used

  • transferTo()方法使文件内容被DMA引擎复制到内核缓冲区中。
  • 没有数据被复制到套接字缓冲区中。 相反,只有具有有关数据位置和长度信息的描述符才会附加到套接字缓冲区。 DMA引擎将数据直接从内核缓冲区传递到协议引擎,从而消除了剩余的最终CPU副本。

下面这个例子展示了如何使用transferTo()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class TransferToClient {

public static void main(String[] args) throws IOException{
TransferToClient sfc = new TransferToClient();
sfc.testSendfile();
}
public void testSendfile() throws IOException {
String host = "localhost";
int port = 9026;
SocketAddress sad = new InetSocketAddress(host, port);
SocketChannel sc = SocketChannel.open();
sc.connect(sad);
sc.configureBlocking(true);

String fname = "sendfile/NetworkInterfaces.c";
long fsize = 183678375L, sendzise = 4094;

// FileProposerExample.stuffFile(fname, fsize);
FileChannel fc = new FileInputStream(fname).getChannel();
long start = System.currentTimeMillis();
long nsent = 0, curnset = 0;
curnset = fc.transferTo(0, fsize, sc);
System.out.println("total bytes transferred--"+curnset+" and time taken in MS--"+(System.currentTimeMillis() - start));
//fc.close();
}
}

参考资料

Iterable和Iterator结合使用的一个小例子

这篇文章主要是记录一下使用Iterable和Iterator用作迭代处理的一个例子。基于这种模式可以很方便的实现
流式处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public class Array<T> implements Iterable<T> {
T[] values; // this contains the actual elements of the array

// Constructor that takes a "raw" array and stores it
public Array(T[] values) {
this.values = values;
}

// This is a private class that implements iteration over the elements
// of the list. It is not accessed directly by the user, but is used in
// the iterator() method of the Array class. It implements the hasNext()
// and next() methods.
class ArrayIterator implements Iterator<T> {
int current = 0; // the current element we are looking at

// return whether or not there are more elements in the array that
// have not been iterated over.
public boolean hasNext() {
if (current < Array.this.values.length) {
return true;
} else {
return false;
}
}

// return the next element of the iteration and move the current
// index to the element after that.
public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return values[current++];
}
}

// Return the value at a given index
public T get(int index) {
return values[index];
}

// Set the value at a given index
public void set(int index, T value) {
values[index] = value;
}

// Return the length of the array
public int length() {
return values.length;
}

// Return an iterator over the elements in the array. This is generally not
// called directly, but is called by Java when used in a "simple" for loop.
public Iterator<T> iterator() {
return new ArrayIterator();
}

// This is just a sample program that can be run to show how the Array
// class might be used.
public static void main(String[] args) {
// create an array of strings
String[] strings = new String[]{"Hello", "World"};

// create a new array to hold these strings
Array<String> array = new Array<String>(strings);

// get and print the first values (prints "Hello")
System.out.println(array.get(0));

// set the second value
array.set(1, "Javaland!");

// iterate over the array, printing "Hello\nJavaland!"
for (String s : array) {
System.out.println(s);
}
}
}
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×