java.nio.ByteBuffer

Java NIO Buffers用于和NIO Channel交互。 我们从Channel中读取数据到buffers里,从Buffer把数据写入到Channels。Buffer本质上就是一块内存区,可以用来写入数据,并在稍后读取出来。这块内存被NIO Buffer包裹起来,对外提供一系列的读写方便开发的接口。java中java.nio.Buffer的常见实现类如下,不过我们这里只说一下ByteBuffer这个实现。

java中`java.nio.Buffer`的常见实现类

Buffer的重要属性

Buffer缓冲区实质上就是一块内存,用于写入数据,也供后续再次读取数据,为了便于理解,你可以把它理解为一个字节数组。它有有四个重要属性:

1
2
3
4
5
6
7
public abstract class Buffer {
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
}
  • capacity
    • 这个属性表示这个Buffer最多能放多少数据,在创建buffer的时候指定。int类型。
  • position
    下一个要读写的元素位置(从0开始),当使用buffer的相对位置进行读/写操作时,读/写会从这个下标进行,并在操作完成后,buffer会更新下标的值。
    • 写模式:当写入数据到Buffer的时候需要从一个确定的位置开始,默认初始化时这个位置position为0,一旦写入了数据比如一个字节,整形数据,那么position的值就会指向数据之后的一个单元,position最大可以到capacity-1.
    • 读模式:当从Buffer读取数据时,也需要从一个确定的位置开始。buffer从写入模式变为读取模式时,position会归0,每次读取后,position向后移动。
  • limit
    在Buffer上进行的读写操作都不能越过这个limit。
    • 写模式:limit的含义是我们所能写入的最大数据量,它等同于buffer的容量capacity
    • 读模式:limit则代表我们所能读取的最大数据量,他的值等同于写模式下position的位置。换句话说,您可以读取与写入数量相同的字节数。
  • mark
    • 一个临时存放的位置下标,用户选定的position的前一个位置或-1。
      • 调用mark()会将mark设为当前的position的值,以后调用reset()会将position属性设
        置为mark的值。mark的值总是小于等于position的值,如果将position的值设的比mark小,当前的mark值会被抛弃掉。

注:

  • position和limit之间的距离指示了可读/存的字节数。
  • boolean hasRemaining():当缓冲区至少还有一个元素时,返回true。
  • int remaining():position和limit之间字节个数。

这些属性总是满足以下条件:

0 <= mark <= position <= limit <= capacity

通过上面的描述可以看出,其中position和limit的具体含义取决于当前buffer的模式(读模式还是写模式)。capacity在两种模式下都表示容量。

Buffer的常见API

  • ByteBuffer allocate(int capacity)
    • 从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器,实现类是HeapByteBuffer
  • ByteBuffer allocateDirect(int capacity)
    • 类似于allocate方法,不过使用的是对外内存,实现类是DirectByteBuffer
  • ByteBuffer wrap(byte[] array)
    • 把byte数组包装为ByteBuffer,bytes数组或buff缓冲区任何一方中数据的改动都会影响另一方。其实ByteBuffer底层本来就有一个bytes数组负责来保存buffer缓冲区中的数据,通过allocate方法系统会帮你构造一个byte数组,实现类是HeapByteBuffer
  • ByteBuffer wrap(byte[] array, int offset, int length)
    • 在上一个方法的基础上可以指定偏移量和长度,buffer的capacity就是array.length,这个offset也就是包装后byteBuffer的position,limit=length+position(offset),mark=-1实现类是HeapByteBuffer
  • abstract Object array()
    • 返回支持此缓冲区的数组 (可选操作)
  • abstract int arrayOffset()
    • 返回此缓冲区中的第一个元素在缓冲区的底层实现数组中的偏移量(可选操作)。调用此方法之前要调用hasarray方法,以确保此缓冲区具有可访问的底层实现数组。
  • abstract boolean hasArray()
    • 告诉这个缓冲区是否由可访问的数组支持
  • int capacity()
    • 返回此缓冲区的容量
  • Buffer clear()
    • 清除此缓存区。将position = 0;limit = capacity;mark = -1;把position设为0,一般在把数据写入Buffer前调用。
  • Buffer flip()
    • flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置。一般在从Buffer读出数据前调用。
  • abstract boolean isDirect()
    • 判断个缓冲区是否为direct, 也就是是否是对外内存
  • abstract boolean isReadOnly()
    • 判断告知这个缓冲区是否是只读的
  • int limit()
    • 返回此缓冲区的limit的属性值
  • Buffer position(int newPosition)
    • 设置这个缓冲区的位置
  • boolean hasRemaining()
    • return position < limit,返回是否还有未读内容
  • int remaining()
    • return limit - position; 返回limit和position之间相对位置差
  • Buffer rewind()
    • 把position设为0,mark设为-1,不改变limit的值,一般在把数据重写入Buffer前调用
  • Buffer mark()
    • 设置mark的值,mark=position,做个标记
  • Buffer reset()
    • 还原标记,position=mark。
  • ByteBuffer compact()
    • 该方法的作用是将 position 与 limit之间的数据复制到buffer的开始位置,与limit 之间没有数据的话发,就不会进行复制。一个例子如下:
1
2
3
4
5
6
7
8
9
10
11
12
例如:ByteBuffer.allowcate(10); 
内容:[0 ,1 ,2 ,3 4, 5, 6, 7, 8, 9]
## compact前
[0 ,1 ,2 , 3, 4, 5, 6, 7, 8, 9]
pos=4
lim=10
cap=10
## compact后
[4, 5, 6, 7, 8, 9, 6, 7, 8, 9]
pos=6
lim=10
cap=10
  • ByteBuffer slice();
    • 创建一个分片缓冲区。分片缓冲区与主缓冲区共享数据。 分配的起始位置是主缓冲区的position位置,容量为limit-position。 分片缓冲区无法看到主缓冲区positoin之前的元素。

创建buffer的注意点

创建ByteBuffer可以使用allocate或者wrap,就像下面这样:

  • ByteBuffer allocate(int capacity)
  • ByteBuffer allocateDirect(int capacity)
  • ByteBuffer wrap(int capacity)
  • ByteBuffer wrap(byte[] array,int offset,int length)

需要注意的是:创建的缓冲区都是定长的,大小无法改变。若发现刚创建的缓冲区容量太小,只能重新创建一个合适的

关于ByteBuffer wrap(byte[] array,int offset,int length)这个方法,这里再强调一下,这样创建的ByteBuffer的capacity和array的大小是一样的,buffer的position是offset, buffer的limit是offset+length,position之前和limit之后的数据依然可以访问到。例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Demo {
public static void main(String args[]){
byte arr[]=new byte[100];
ByteBuffer buffer=ByteBuffer.wrap(arr,3,25);
System.out.println("Capacity is: "+buffer.capacity());
System.out.println("Position is: "+buffer.position());
System.out.println("limit is: "+buffer.limit());
}
}
//结果:
Capacity is: 100
Position is: 3
limit is: 28

allocate和wrap的区别

wrap只是简单地创建一个具有指向被包装数组的引用的缓冲区,该数组成为后援数组。对后援数组中的数据做的任何修改都将改变缓冲区中的数据,反之亦然。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String args[]){  
byte arr[]=new byte[100];
//将arr数组全部置为1
Arrays.fill(arr, (byte)1);
ByteBuffer buffer=ByteBuffer.wrap(arr,3,25);
//对后援数组中的数据做的任何修改都将改变缓冲区中的数据
arr[0]=(byte)2;
buffer.position(0);
System.out.println(buffer.get());
//在缓冲区上调用array()方法即可获得后援数组的引用。
System.out.println(Arrays.toString(buffer.array()));
}
//运行结果:
2
[2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ....]//总共100个元素

allocate则是创建了自己的后援数组,在缓冲区上调用array()方法也可获得后援数组的引用。通过调用arrayOffset()方法,可以获取缓冲区中第一个元素在后援数组的偏移量。但是使用wrap创建的ByteBuffer,调用arrayOffset永远是0。

1
2
3
4
5
6
7
8
9
10
11
12
13
 public static void main(String args[]){  
ByteBuffer buffer=ByteBuffer.allocate(100);
//对后援素组的修改也可以反映到buffer上
byte arr[]=buffer.array();
arr[1]=(byte)'a';
buffer.getInt();

System.out.println(Arrays.toString(buffer.array()));
System.out.println(buffer.arrayOffset());
}
//运行结果:
[0, 97, 0, 0, 0, 0, 0, 0,...]//总共100个元素
0

通过ByteBuffer allocateDirect(int capacity)创建的叫直接缓冲区,使用的堆外内存。可以通过isDirect()方法查看一个缓冲区是否是直接缓冲区。由于直接缓冲区是没有后援数组的,所以在其上面调用array()或arrayOffset()都会抛出UnsupportedOperationException异常。注意有些平台或JVM可能不支持这个创建直接缓冲区。

图解

put

写模式下,往buffer里写一个字节,并把postion移动一位。写模式下,一般limit与capacity相等。

bytebuffer-put

flip

写完数据,需要开始读的时候,将postion复位到0,并将limit设为当前postion。

bytebuffer-flip

get

从buffer里读一个字节,并把postion移动一位。上限是limit,即写入数据的最后位置。

bytebuffer-get

clear

将position置为0,并不清除buffer内容。

bytebuffer-clear

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class Demo {
private static final int SIZE = 1024;

public static void main(String[] args) throws Exception {
// 获取通道,该通道允许写操作
FileChannel fc = new FileOutputStream("data.txt").getChannel();
// 将字节数组包装到缓冲区中
fc.write(ByteBuffer.wrap("Some text".getBytes()));
// 关闭通道
fc.close();

// 随机读写文件流创建的管道
fc = new RandomAccessFile("data.txt", "rw").getChannel();
// fc.position()计算从文件的开始到当前位置之间的字节数
System.out.println("此通道的文件位置:" + fc.position());
// 设置此通道的文件位置,fc.size()此通道的文件的当前大小,该条语句执行后,通道位置处于文件的末尾
fc.position(fc.size());
// 在文件末尾写入字节
fc.write(ByteBuffer.wrap("Some more".getBytes()));
fc.close();

// 用通道读取文件
fc = new FileInputStream("data.txt").getChannel();
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
// 将文件内容读到指定的缓冲区中
fc.read(buffer);
//此行语句一定要有, 如果没有,就是从文件最后开始读取的,当然读出来的都是byte=0时候的字符。通过buffer.flip();这个语句,就能把buffer的当前位置更改为buffer缓冲区的第一个位置
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char)buffer.get());
}
fc.close();
}
}

确保数据落盘

在之前的文章《unix IO模型》我们曾经提到过,用户空间内核空间缓存IO等概念。关于这些概念,大家可以阅读这篇文章,在本篇文章中,我们就不在涉及这些概念了。

IO缓冲机制

大家需要有一个认知就是我们平时写的程序,在将数据到文件中时,其实数据不会立马写入磁盘中进行持久化存储的,而是会经过层层缓存,如下图所示:

I/O buffering

其中这每层缓存都有自己的刷新时机,每层缓存都刷新后才会写入磁盘进行持久化存储。这些缓存的存在目的本意都是为了加速读写操作,因为如果每次读写都对应真实磁盘操作,那么读写的效率会大大降低。但是同样带来的坏处是如果期间发生掉电或者别的故障,还未写入磁盘的数据就丢失了。对于数据安全敏感的应用,比如数据库,比如交易程序,这是无法忍受的。所以操作系统提供了保证文件落盘的机制。

在上面这图中说明了操作系统到磁盘的数据流,以及经过的缓冲区。首先数据会先存在于应用的内存空间,如果调用库函数写入,库函数可能还会把数据缓存在库函数所维护的缓冲区空间中,比如C标准库stdio提供的方法就会进行缓存,目的是为了减少系统调用的次数。这两个缓存都是在用户空间中的。库函数缓存flush时,会调用write系统调用将数据写入内核空间,内核同样维护了一个页缓存(page cache),操作系统会在合适的时间把脏页的数据写入磁盘。即使是写入磁盘了,磁盘也可能维护了一个缓存,在这个时候掉电依然会丢失数据的,只有写入了磁盘的持久存储物理介质上,数据才是真正的落盘了,是安全的。

比如在网络套接字上侦听连接并将从每个客户端接收的数据写入文件的应用程序。 在关闭连接之前,服务器确保将接收到的数据写入稳定存储器,并向客户端发送此类确认,请看下面的简化代码(代码中已经注释):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int sock_read(int sockfd, FILE *outfp, size_t nrbytes)
{
int ret;
size_t written = 0;
//example of an application buffer
char *buf = malloc(MY_BUF_SIZE);

if (!buf)
return -1;

//take care of reading the data from the socket
//and writing it to the file stream
while (written < nrbytes) {
ret = read(sockfd, buf, MY_BUF_SIZE);
if (ret =< 0) {
if (errno == EINTR)
continue;
return ret;
}
written += ret;
ret = fwrite((void *)buf, ret, 1, outfp);
if (ret != 1)
return ferror(outfp);
}

//flush file stream, the data to move into the "Kernel Buffers" layer
ret = fflush(outfp);
if (ret != 0)
return -1;

//makethe data is saved to the "Stable Storage" layer
ret = fsync(fileno(outfp));
if (ret < 0)
return -1;
return 0;
}

在上面的这幅图中,可以看到数据流向经过了用户控件缓冲区内核缓存区,下面我们说说这2个缓存区。

用户空间缓冲区

用户空间的缓存分为:

  • 应用程序本身维护的缓冲区
  • 库维护的缓冲区

应用本身维护的缓冲区需要开发者自己刷新,调用库函数写入到库函数的缓冲区中(这一步可能不存在)。如果应用程序不依赖任何库函数,而是直接使用系统调用,那么则是把数据写入系统的缓冲区去。

库函数一般都会维护缓冲区,目的是简化应用程序的编写,应用程序就不需要编写维护缓冲区的代码,库维护的缓冲区针对那些没有应用程序本身维护的缓存区的程序来说,在某些时候是会提升不少的性能的,因为缓冲区大大减少了系统调用的次数,而系统调用是非常耗时的,系统调用涉及到用户态到内核态的切换,这个切换需要很多的步骤与校验,较为耗时。

比如C标准库stdio就维护着一个缓冲区,对应这个缓冲区,C标准库提供了fflush方法强制把缓冲区数据写入操作系统。

在Java的OutputStream接口提供了一个flush方法,具体的作用要看实现类的具体实现。BufferedOutputStream#flush就会把自己维护的缓冲区数据写入下一层的OutputStream。比如是new BufferedOutputStream(new FileOutputStream("/"))这样的模式,则调用BufferedOutputStream#flush会将数据写入操作系统。

内核缓冲区

应用程序直接或者通过库函数间接的使用系统调用write将数据写入操作系统缓冲区.UNIX系统在内核中设有高速缓存或页面高速缓存。目的是为了减少磁盘读写次数。

用户写入系统的数据先写入系统缓冲区,系统缓冲区写满后,将其排入输出队列,然后得到队首时,才进行实际的IO操作。这种输出方式被称为延迟写

UNIX系统提供了三个系统调用来执行刷新内核缓冲区:sync,fsync,fdatasync

sync

1
2
3
// sync() causes all pending modifications to filesystem metadata and
//cached file data to be written to the underlying filesystems.
void sync(void)

sync函数只是将所有修改过的块缓冲区排入输出队列就返回,并不等待实际的写磁盘操作返回。 操作系统的update系统守护进程会周期地调用sync函数,来保证系统中的数据能定期落盘。

根据sync(2) - Linux manual page的描述,Linux对sync的实现与POSIX规范不太一样,POSIX规范中,sync可能在文件真正落盘前就返回,而Linux的实现则是文件真正落盘后才会返回。所以Linux中,sync与fsync的效果是一样的!但是1.3.20之前的Linux存在BUG,导致sync并不会在真正落盘后返回。

fsync

1
void fsync(int filedes)

fsync对指定的文件起作用,它传输内核缓冲区中这个文件的数据到存储设备中,并阻塞直到存储设备响应说数据已经保存好了。

fsync对文件数据与文件元数据都有效。文件的元数据可以理解为文件的属性数据,比如文件的更新时间,访问时间,长度等。

fdatasync

1
void fdatasync(int filedes)

fdatasyncfsync类似,两者的区别是,fdatasync不一定需要刷新文件的元数据部分到存储设备。

是否需要刷新文件的元数据,是要看元数据的变化部分是否对之后的读取有影响,比如文件元数据的访问时间st_atime和修改时间st_mtime变化了,fdatasync不会去刷新元数据数据到存储设备,因为即使这个数据丢失了不一致了,也不影响故障恢复后的文件读取。但是如果文件的长度st_size变化了,那么就需要刷新元数据数据到存储设备。

所以如果你每次都更新文件长度,那么调用fsyncfdatasync的效果是一样的。

但是如果更新能做到不修改文件长度,那么fdatasync能比fsync少了一次磁盘写入,这个是非常大的速度提升。

open中的O_SYNC和O_DSYNC

除了上面三个系统调用,open系统调用在打开文件时,可以设置和同步相关的标志位:O_SYNCO_DSYNC

  • 设置O_SYNC的效果相当于是每次write后自动调用fsync。
  • 设置O_DSYNC的效果相当于是每次write后自动调用fdatasync。

关于新建文件

在一个文件上调用fsync/fdatasync只能保证文件本身的数据落盘,但是对于文件系统来说,目录中也保存着文件信息,fsync/fdatasync的调用并不会保证这部分的数据落盘。如果此时发生掉电,这个文件就无法被找到了。所以对于新建文件来说,还需要在父目录上调用fsync。

关于覆盖现有文件

覆盖现有文件时,如果发生掉电,新的数据是不会写入成功,但是可能会污染现有的数据,导致现有数据丢失。所以最佳实践是新建一个临时文件,写入成功后,再原子性替换原有文件。具体步骤:

  • 新建一个临时文件
  • 向临时文件写入数据
  • 对临时文件调用fsync,保证数据落盘。期间发生掉电对现有文件无影响。
  • 重命名临时文件为目标文件名
  • 对父目录调用fsync

存储设备缓冲区

存储设备为了提高性能,也会加入缓存。高级的存储设备能提供非易失性的缓存,比如有掉电保护的缓存。但是无法对所有设备做出这种保证,所以如果数据只是写入了存储设备的缓存的话,遇到掉电等故障,依然会导致数据丢失。

对于保证数据能保存到存储设备的持久化存储介质上,而不管设备本身是否有易失性缓存,操作系统提供了write barriers这个机制。开启了write barriers的文件系统,能保证调用fsync/fdatasync数据持久化保存,无论是否发生了掉电等其他故障,但是会导致性能下降。

许多文件系统提供了配置write barriers的功能。比如ext3, ext4, xfsbtrfsmount参数-o barrier表示开启写屏障,调用fsync/fdatasync能保证刷新存储设备的缓存到持久化介质上。-o nobarrier则表示关闭写屏障,调用fsync/fdatasync无法保证数据落盘。

Linux默认开启write barriers,所以默认情况下,我们调用fsync/fdatasync,就可以认为是文件真正的可靠落盘了

对于这个层面的数据安全保证来说,应用程序是不需要去考虑的,因为如果这台机器的硬盘被挂载为没有开启写屏障,那么可以认为这个管理员知道这个风险,他选择了更高的性能,而不是更高的安全性。

Java世界中的对应API

针对确保数据落盘,掉电也不丢失数据的情况,JDK也封装了对应的功能,并且为我们做好了跨平台的保证。

JDK中有三种方式可以强制文件数据落盘:

  • 调用FileDescriptor#sync函数
  • 调用FileChannel#force函数
  • 使用RandomAccessFilerws或者rwd模式打开文件

FileDescriptor#sync

FileDescriptor类提供了sync方法,可以用于保证数据保存到持久化存储设备后返回。使用方法:

1
2
FileOutputStream outputStream = new FileOutputStream("/Users/mazhibin/b.txt");
outputStream.getFD().sync();

可以看一下JDK是如何实现FileDescriptor#sync的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public native void sync() throws SyncFailedException;

// jdk/src/solaris/native/java/io/FileDescriptor_md.c
JNIEXPORT void JNICALL
Java_java_io_FileDescriptor_sync(JNIEnv *env, jobject this) {
// 获取文件描述符
FD fd = THIS_FD(this);
// 调用IO_Sync来执行数据同步
if (IO_Sync(fd) == -1) {
JNU_ThrowByName(env, "java/io/SyncFailedException", "sync failed");
}
}
// IO_Sync在UNIX系统上的定义就是fsync:
// jdk/src/solaris/native/java/io/io_util_md.h
#define IO_Sync fsync

FileChannel#force

之前的文章提到了,操作系统提供了fsync/fdatasync两个用户同步数据到持久化设备的系统调用,后者尽可能的会不同步文件元数据,来减少一次磁盘IO,提高性能。但是Java IO的FileDescriptor#sync只是对fsync的封装,JDK中没有对于fdatasync的封装,这是一个特性缺失。

Java NIO对这一点也做了增强,FileChannel类的force方法,支持传入一个布尔参数metaData,表示是否需要确保文件元数据落盘,如果为true,则调用fsync。如果为false,则调用fdatasync。

使用例子如下:

1
2
3
4
5
6
7
FileOutputStream outputStream = new FileOutputStream("/Users/mazhibin/b.txt");

// 强制文件数据与元数据落盘
outputStream.getChannel().force(true);

// 强制文件数据落盘,不关心元数据是否落盘
outputStream.getChannel().force(false);

在jdk中的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class FileChannelImpl extends FileChannel {
private final FileDispatcher nd;
private final FileDescriptor fd;
private final NativeThreadSet threads = new NativeThreadSet(2);

public final boolean isOpen() {
return open;
}

private void ensureOpen() throws IOException {
if(!this.isOpen()) {
throw new ClosedChannelException();
}
}

// 布尔参数metaData用于指定是否需要文件元数据也确保落盘
public void force(boolean metaData) throws IOException {
// 确保文件是已经打开的
ensureOpen();
int rv = -1;
int ti = -1;
try {
begin();
ti = threads.add();

// 再次确保文件是已经打开的
if (!isOpen())
return;
do {
// 调用FileDispatcher#force
rv = nd.force(fd, metaData);
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
} finally {
threads.remove(ti);
end(rv > -1);
assert IOStatus.check(rv);
}
}
}

实现中有许多线程同步相关的代码,不属于我们要关注的部分,就不分析了。FileChannel#force调用FileDispatcher#forceFileDispatcher是NIO内部实现用的一个类,封装了一些文件操作方法,其中包含了刷新文件的方法:

1
2
3
4
abstract class FileDispatcher extends NativeDispatcher {
abstract int force(FileDescriptor fd, boolean metaData) throws IOException;
// ...
}

FileDispatcher#force的实现:

1
2
3
4
5
6
class FileDispatcherImpl extends FileDispatcher {
int force(FileDescriptor fd, boolean metaData) throws IOException {
return force0(fd, metaData);
}
static native int force0(FileDescriptor fd, boolean metaData) throws IOException;
// ...

FileDispatcher#force的本地方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this,
jobject fdo, jboolean md)
{
// 获取文件描述符
jint fd = fdval(env, fdo);
int result = 0;

if (md == JNI_FALSE) {
// 如果调用者认为不需要同步文件元数据,调用fdatasync
result = fdatasync(fd);
} else {
#ifdef _AIX
/* On AIX, calling fsync on a file descriptor that is opened only for
* reading results in an error ("EBADF: The FileDescriptor parameter is
* not a valid file descriptor open for writing.").
* However, at this point it is not possibly anymore to read the
* 'writable' attribute of the corresponding file channel so we have to
* use 'fcntl'.
*/
int getfl = fcntl(fd, F_GETFL);
if (getfl >= 0 && (getfl & O_ACCMODE) == O_RDONLY) {
return 0;
}
#endif
// 如果调用者认为需要同步文件元数据,调用fsync
result = fsync(fd);
}
return handle(env, result, "Force failed");
}

可以看出,其实FileChannel#force就是简单的通过metaData参数来区分调用fsync和fdatasync。

同时在zookeeper的org.apache.zookeeper.common.AtomicFileOutputStream类中我们可以看到下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public void close() throws IOException {
boolean triedToClose = false, success = false;
try {
flush();
((FileOutputStream) out).getChannel().force(true);

triedToClose = true;
super.close();
success = true;
} finally {
if (success) {
boolean renamed = tmpFile.renameTo(origFile);
if (!renamed) {
// On windows, renameTo does not replace.
if (!origFile.delete() || !tmpFile.renameTo(origFile)) {
throw new IOException(
"Could not rename temporary file " + tmpFile
+ " to " + origFile);
}
}
} else {
if (!triedToClose) {
// If we failed when flushing, try to close it to not leak
// an FD
IOUtils.closeStream(out);
}
// close wasn't successful, try to delete the tmp file
if (!tmpFile.delete()) {
LOG.warn("Unable to delete tmp file " + tmpFile);
}
}
}
}

RandomAccessFile结合rws/rwd模式

RandomAccessFile打开文件支持4中模式:

  • r 以只读方式打开。调用结果对象的任何 write 方法都将导致抛出 IOException。
  • rw打开以便读取和写入。如果该文件尚不存在,则尝试创建该文件。
  • rws 打开以便读取和写入,对于rws,还要求对文件的内容或元数据的每个更新都同步写入到底层存储设备。
  • rwd 打开以便读取和写入,对于rwd,还要求对文件内容的每个更新都同步写入到底层存储设备。

其中rws模式会在open文件时传入O_SYNC标志位。rwd模式会在open文件时传入O_DSYNC标志位。

RandomAccessFile源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 4个标志位,用于组合表示4种模式
private static final int O_RDONLY = 1;
private static final int O_RDWR = 2;
private static final int O_SYNC = 4;
private static final int O_DSYNC = 8;

public RandomAccessFile(File file, String mode)
throws FileNotFoundException
{
String name = (file != null ? file.getPath() : null);
int imode = -1;
// 只读模式
if (mode.equals("r"))
imode = O_RDONLY;
else if (mode.startsWith("rw")) {
// 读写模式
imode = O_RDWR;
rw = true;

// 读写模式下,可以结合O_SYNC和O_DSYNC标志
if (mode.length() > 2) {
if (mode.equals("rws"))
imode |= O_SYNC;
else if (mode.equals("rwd"))
imode |= O_DSYNC;
else
imode = -1;
}
}
if (imode < 0)
throw new IllegalArgumentException("Illegal mode \"" + mode
+ "\" must be one of "
+ "\"r\", \"rw\", \"rws\","
+ " or \"rwd\"");
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkRead(name);
if (rw) {
security.checkWrite(name);
}
}
if (name == null) {
throw new NullPointerException();
}
if (file.isInvalid()) {
throw new FileNotFoundException("Invalid file path");
}
// 新建文件描述符
fd = new FileDescriptor();
fd.attach(this);
path = name;
open(name, imode);
}

private void open(String name, int mode)
throws FileNotFoundException {
open0(name, mode);
}

private native void open0(String name, int mode)
throws FileNotFoundException;

其中open0的实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// jdk/src/share/native/java/io/RandomAccessFile.c
JNIEXPORT void JNICALL
Java_java_io_RandomAccessFile_open0(JNIEnv *env,
jobject this, jstring path, jint mode)
{
int flags = 0;
// JAVA中的标志位与操作系统标志位转换
if (mode & java_io_RandomAccessFile_O_RDONLY)
flags = O_RDONLY;
else if (mode & java_io_RandomAccessFile_O_RDWR) {
flags = O_RDWR | O_CREAT;
if (mode & java_io_RandomAccessFile_O_SYNC)
flags |= O_SYNC;
else if (mode & java_io_RandomAccessFile_O_DSYNC)
flags |= O_DSYNC;
}

// 调用fileOpen打开函数
fileOpen(env, this, path, raf_fd, flags);
}

fileOpen之后的流程与FileInputStream的一致。可以看出,相比于FileInputStream固定使用O_RDONLYFileOutputStream固定使用O_WRONLY | O_CREATRandomAccessFile提供了在Java中指定打开模式的能力。

但是同时我们需要清除,rwsrwd的效率比rw低非常非常多,因为每次读写都需要刷到磁盘才会返回,这两个中rwdrws效率高一些,因为rwd只刷新文件内容,rws刷新文件内容与元数据,文件的元数据就是文件更新时间等信息。

原子性的重命名文件

在java中的File类的renameTo方法,提供了重命名文件的功能。但是需要注意的是这个方法并不能保证原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Renames the file denoted by this abstract pathname.
*
* <p> Many aspects of the behavior of this method are inherently
* platform-dependent: The rename operation might not be able to move a
* file from one filesystem to another, it might not be atomic, and it
* might not succeed if a file with the destination abstract pathname
* already exists. The return value should always be checked to make sure
* that the rename operation was successful.
*
* <p> Note that the {@link java.nio.file.Files} class defines the {@link
* java.nio.file.Files#move move} method to move or rename a file in a
* platform independent manner.
*
* @param dest The new abstract pathname for the named file
*
* @return <code>true</code> if and only if the renaming succeeded;
* <code>false</code> otherwise
*
* @throws SecurityException
* If a security manager exists and its <code>{@link
* java.lang.SecurityManager#checkWrite(java.lang.String)}</code>
* method denies write access to either the old or new pathnames
*
* @throws NullPointerException
* If parameter <code>dest</code> is <code>null</code>
*/
public boolean renameTo(File dest) {

因此如果想原子性的重命名和移动文件,我们应该使用java.nio.file.Files类中的move方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
* Move or rename a file to a target file.
*
* <p> By default, this method attempts to move the file to the target
* file, failing if the target file exists except if the source and
* target are the {@link #isSameFile same} file, in which case this method
* has no effect. If the file is a symbolic link then the symbolic link
* itself, not the target of the link, is moved. This method may be
* invoked to move an empty directory. In some implementations a directory
* has entries for special files or links that are created when the
* directory is created. In such implementations a directory is considered
* empty when only the special entries exist. When invoked to move a
* directory that is not empty then the directory is moved if it does not
* require moving the entries in the directory. For example, renaming a
* directory on the same {@link FileStore} will usually not require moving
* the entries in the directory. When moving a directory requires that its
* entries be moved then this method fails (by throwing an {@code
* IOException}). To move a <i>file tree</i> may involve copying rather
* than moving directories and this can be done using the {@link
* #copy copy} method in conjunction with the {@link
* #walkFileTree Files.walkFileTree} utility method.
*
* <p> The {@code options} parameter may include any of the following:
*
* <table border=1 cellpadding=5 summary="">
* <tr> <th>Option</th> <th>Description</th> </tr>
* <tr>
* <td> {@link StandardCopyOption#REPLACE_EXISTING REPLACE_EXISTING} </td>
* <td> If the target file exists, then the target file is replaced if it
* is not a non-empty directory. If the target file exists and is a
* symbolic link, then the symbolic link itself, not the target of
* the link, is replaced. </td>
* </tr>
* <tr>
* <td> {@link StandardCopyOption#ATOMIC_MOVE ATOMIC_MOVE} </td>
* <td> The move is performed as an atomic file system operation and all
* other options are ignored. If the target file exists then it is
* implementation specific if the existing file is replaced or this method
* fails by throwing an {@link IOException}. If the move cannot be
* performed as an atomic file system operation then {@link
* AtomicMoveNotSupportedException} is thrown. This can arise, for
* example, when the target location is on a different {@code FileStore}
* and would require that the file be copied, or target location is
* associated with a different provider to this object. </td>
* </table>
*
* <p> An implementation of this interface may support additional
* implementation specific options.
*
* <p> Moving a file will copy the {@link
* BasicFileAttributes#lastModifiedTime last-modified-time} to the target
* file if supported by both source and target file stores. Copying of file
* timestamps may result in precision loss. An implementation may also
* attempt to copy other file attributes but is not required to fail if the
* file attributes cannot be copied. When the move is performed as
* a non-atomic operation, and an {@code IOException} is thrown, then the
* state of the files is not defined. The original file and the target file
* may both exist, the target file may be incomplete or some of its file
* attributes may not been copied from the original file.
*
* <p> <b>Usage Examples:</b>
* Suppose we want to rename a file to "newname", keeping the file in the
* same directory:
* <pre>
* Path source = ...
* Files.move(source, source.resolveSibling("newname"));
* </pre>
* Alternatively, suppose we want to move a file to new directory, keeping
* the same file name, and replacing any existing file of that name in the
* directory:
* <pre>
* Path source = ...
* Path newdir = ...
* Files.move(source, newdir.resolve(source.getFileName()), REPLACE_EXISTING);
* </pre>
*
* @param source
* the path to the file to move
* @param target
* the path to the target file (may be associated with a different
* provider to the source path)
* @param options
* options specifying how the move should be done
*
* @return the path to the target file
*
* @throws UnsupportedOperationException
* if the array contains a copy option that is not supported
* @throws FileAlreadyExistsException
* if the target file exists but cannot be replaced because the
* {@code REPLACE_EXISTING} option is not specified <i>(optional
* specific exception)</i>
* @throws DirectoryNotEmptyException
* the {@code REPLACE_EXISTING} option is specified but the file
* cannot be replaced because it is a non-empty directory
* <i>(optional specific exception)</i>
* @throws AtomicMoveNotSupportedException
* if the options array contains the {@code ATOMIC_MOVE} option but
* the file cannot be moved as an atomic file system operation.
* @throws IOException
* if an I/O error occurs
* @throws SecurityException
* In the case of the default provider, and a security manager is
* installed, the {@link SecurityManager#checkWrite(String) checkWrite}
* method is invoked to check write access to both the source and
* target file.
*/
public static Path move(Path source, Path target, CopyOption... options)
throws IOException

其中参数中的CopyOption可选性有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package java.nio.file;

/**
* Defines the standard copy options.
*
* @since 1.7
*/

public enum StandardCopyOption implements CopyOption {
/**
* Replace an existing file if it exists.
*/
REPLACE_EXISTING,
/**
* Copy attributes to the new file.
*/
COPY_ATTRIBUTES,
/**
* Move the file as an atomic file system operation.
*/
ATOMIC_MOVE;
}

我们看看kafka中怎么使用的,在kafka的org.apache.kafka.common.utils中有下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Attempts to move source to target atomically and falls back to a non-atomic move if it fails.
*
* @throws IOException if both atomic and non-atomic moves fail
*/
public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
try {
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException outer) {
try {
Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
log.debug("Non-atomic move of " + source + " to " + target + " succeeded after atomic move failed due to "
+ outer.getMessage());
} catch (IOException inner) {
inner.addSuppressed(outer);
throw inner;
}
}
}

参考资料

unix IO模型

在之前的文章《理解同步、异步、阻塞和非阻塞》我们谈了一下关于同步、异步、阻塞和非阻塞的理解。这篇文章,我打算来谈谈unix的io模型,其中会涉及到下面的内容:

  • 阻塞 I/O(blocking IO)
  • 非阻塞 I/O(nonblocking IO)
  • I/O 多路复用( IO multiplexing)
  • 异步 I/O(asynchronous IO)
  • 信号驱动式IO模型(signal-driven IO model)

背景知识

在开始正式的介绍unix的io模型之前,我们需要科普一些背景知识,便于大家正确的理解unix io模型。

同步、异步、阻塞和非阻塞

这些概念请查看我之前的文章《理解同步、异步、阻塞和非阻塞》

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

用户空间(user space)与内核空间(kernel space)

学习 Linux 时,经常可以看到两个词:User space(用户空间)和 Kernel space(内核空间)。

用户空间(user space)与内核空间(kernel space)

简单说,Kernel space 是 Linux 内核的运行空间,User space 是用户程序的运行空间。为了安全,它们是隔离的,即使用户的程序崩溃了,内核也不受影响。

Kernel space 可以执行任意命令,调用系统的一切资源;User space 只能执行简单的运算,不能直接调用系统资源,必须通过系统接口(又称 system call),才能向内核发出指令。

1
2
3
4
str = "my string" // 用户空间
x = x + 2 // 用户空间
file.write(str) // 切换到内核空间
y = x + 4 // 切换回用户空间

上面代码中,第一行和第二行都是简单的赋值运算,在 User space 执行。第三行需要写入文件,就要切换到 Kernel space,因为用户不能直接写文件,必须通过内核安排。第四行又是赋值运算,就切换回 User space。

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。进程之间的切换其实是需要耗费cpu时间的。

缓存 I/O

缓存I/O又被称作标准I/O,大多数文件系统的默认I/O操作都是缓存I/O。在Linux的缓存I/O机制中,数据先从磁盘复制到内核空间的缓冲区,然后从内核空间缓冲区复制到应用程序的地址空间。

  • 读操作:操作系统检查内核的缓冲区有没有需要的数据,如果已经缓存了,那么就直接从缓存中返回;否则从磁盘中读取,然后缓存在操作系统的缓存中。
  • 写操作:将数据从用户空间复制到内核空间的缓存中。这时对用户程序来说写操作就已经完成,至于什么时候再写到磁盘中由操作系统决定,除非显示地调用了sync同步命令

缓存I/O的优点:

  • 在一定程度上分离了内核空间和用户空间,保护系统本身的运行安全;
  • 可以减少物理读盘的次数,从而提高性能。

缓存I/O的缺点:

  • 在缓存I/O机制中,DMA方式可以将数据直接从磁盘读到页缓存中,或者将数据从页缓存直接写回到磁盘上,而不能直接在应用程序地址空间和磁盘之间进行数据传输,这样,数据在传输过程中需要在应用程序地址空间(用户空间)和缓存(内核空间)之间进行多次数据拷贝操作,这些数据拷贝操作所带来的CPU以及内存开销是非常大的。

因为这个原因的存在,所以又设计到zero copy技术。关于zero copy这个内容,可以参考我之前写的文章《java中的zero copy》

unix IO模型

在linux中,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:

  • 等待数据准备就绪 (Waiting for the data to be ready)
  • 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正式因为这两个阶段,linux系统产生了下面五种网络模式的方案:

  • 阻塞式IO模型(blocking IO model)
  • 非阻塞式IO模型(noblocking IO model)
  • IO复用式IO模型(IO multiplexing model)
  • 信号驱动式IO模型(signal-driven IO model)
  • 异步IO式IO模型(asynchronous IO model)

下面我们来分别谈一下这些IO模型

阻塞式IO模型(blocking IO model)

在linux中,默认情况下所有的IO操作都是blocking,一个典型的读操作流程大概是这样:

blocking Io model

当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据(对于网络IO来说,很多时候数据在一开始还没有到达。比如,还没有收到一个完整的UDP包。这个时候kernel就要等待足够的数据到来),而数据被拷贝到操作系统内核的缓冲区中是需要一个过程的,这个过程需要等待。而在用户进程这边,整个进程会被阻塞(当然,是进程自己选择的阻塞)。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户空间的缓冲区以后,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。

所以:blocking IO的特点就是在IO执行的下两个阶段的时候都被block了

  • 等待数据准备就绪 (Waiting for the data to be ready) 「阻塞」
  • 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process) 「阻塞」

非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。通过java可以这么操作:

1
2
3
4
5
6
InetAddress host = InetAddress.getByName("localhost");
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(hos1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

比较完善的例子请看《What Is Non-blocking Socket Programming in Java?》

socket设置为 NONBLOCK(非阻塞)就是告诉内核,当所请求的I/O操作无法完成时,不要将进程睡眠,而是返回一个错误码(EWOULDBLOCK) ,这样请求就不会阻塞。

当对一个non-blocking socket执行读操作时,流程是这个样子:

nonblocking io model

当用户进程调用了recvfrom这个系统调用,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个EWOULDBLOCK error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个EWOULDBLOCK error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户空间缓冲区,然后返回。

可以看到,I/O 操作函数将不断的测试数据是否已经准备好,如果没有准备好,继续轮询,直到数据准备好为止。整个 I/O 请求的过程中,虽然用户线程每次发起 I/O 请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的 CPU 的资源。

所以,non blocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有

  • 等待数据准备就绪 (Waiting for the data to be ready) 「非阻塞」
  • 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process) 「阻塞」

一般很少直接使用这种模型,而是在其他 I/O 模型中使用非阻塞 I/O 这一特性。这种方式对单个 I/O 请求意义不大,但给 I/O 多路复用铺平了道路.

I/O 多路复用( IO multiplexing)

IO multiplexing就是我们常说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这些个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

IO multiplexing model

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回

这个图和blocking IO的图其实并没有太大的不同,事实上因为IO多路复用多了添加监视 socket,以及调用 select 函数的额外操作,效率更差。还更差一些。因为这里需要使用两个system call (selectrecvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,但是,使用 select 以后最大的优势是用户可以在一个线程内同时处理多个 socket 的 I/O 请求。用户可以注册多个 socket,然后不断地调用 select 读取被激活的 socket,即可达到在同一个线程内同时处理多个 I/O 请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

因此对于IO多路复用模型来说:

  • 等待数据准备就绪 (Waiting for the data to be ready) 「阻塞」
  • 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process) 「阻塞」

异步 I/O(asynchronous IO)

接下来我们看看linux下的asynchronous IO的流程:

asynchronous io model

用户进程发起aio_read调用之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它发现一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

异步 I/O 模型使用了 Proactor 设计模式实现了这一机制。

因此对异步IO模型来说:

  • 等待数据准备就绪 (Waiting for the data to be ready) 「非阻塞」
  • 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process) 「非阻塞」

信号驱动式IO模型(signal-driven IO model)

首先我们允许 socket 进行信号驱动 I/O,并安装一个信号处理函数,进程继续运行并不阻塞。当数据准备好时,进程会收到一个SIGIO信号,可以在信号处理函数中调用 I/O 操作函数处理数据。

signal-driven IO

但是这种IO模确用的不多,所以我这里也就不详细提它了。

总结

blocking和non-blocking的区别

调用blocking IO会一直block住对应的进程直到操作完成,会block2个阶段,而non-blocking IO在kernel还准备数据的情况下会立刻返回,只会block第二个阶段

synchronous IO和asynchronous IO的区别

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。POSIX的定义是这样子的:

  • A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
  • An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO

有人会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。

而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

因此我们会得出下面的分类:

  • 同步IO (synchronous IO)
    • blocking IO model
    • non-blocking IO model
    • IO multiplexing model
  • 异步IO (asynchronous IO)
    • asynchronous IO model

各个IO Model的比较

各个IO Model的比较

前四种模型的区别是阶段1不相同,阶段2基本相同,都是将数据从内核拷贝到调用者的缓冲区。而异步 I/O 的两个阶段都不同于前四个模型。同步 I/O 操作引起请求进程阻塞,直到 I/O 操作完成。异步 I/O 操作不引起请求进程阻塞。

同时通过上面的图片,可以发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

参考资料

理解同步、异步、阻塞和非阻塞

关于同步、异步、阻塞和非阻塞这个概念性问题,这可能是非常容易混淆的概念之一,特别是那些刚开始解除网络编程的人来说。本篇文章争取来说清楚这个问题,如果有错误之处,恳请批评指正。

写在前面

首先大家心中需要有以下的清晰认知:

  • 阻塞操作不等于同步(blocking operation does NOT equal to synchronous)
  • 非阻塞操作不等于异步(non-blocking operation does NOT equal to asynchronous)

事实上,同步异步于阻塞和非阻塞没有什么直接的关联关系。

同步和异步

同步和异步关注的是 通信机制 (communication mechanism)

  • 同步是指在发出一个function调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到调用结果了。这个结果可能是一个正确的期望结果,也可能是因为异常原因(比如超时)导致的失败结果。换句话说,就是由调用者主动等待这个调用的结果。

Synchronous is, when we started a function call, the call will not return anything until it gets the result, the function needs to finish everything before it can give anything to us.

  • 异步调用在发出之后,本次调用过程就直接返回了,并没有同时没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态变化、事件通知等机制来通知调用者,或通过回调函数处理这个调用。

Asynchronous does not need to wait for the function completes its operation, once we call it, it returns immediately without any result, the function uses callback function (or other notification method) to “notify” us to get the value after it completes execution.

阻塞和非阻塞

阻塞和非阻塞关注的是 程序在等待调用结果(消息、返回值)时的状态.

Unlike synchronous/asynchronous, blocking/non-blocking focuses on the status of the program while waiting for the result from the function call.

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回

A blocking operation hangs up current thread before it gets the result, in other words, a blocking operation will let the current thread wait for the result returns, even if the target function will use a callback function to notify client side to fetch the result, the thread on the client side will still be blocked until it gets the returned result.

  • 非阻塞是指在不能立刻得到结果之前,该调用不会阻塞当前线程。

the non-blocking operation will not hang up the current thread if no result returned immediately.

对unix操作系统来讲:

  • 阻塞式I/O(默认),非阻塞式I/O(nonblock),I/O复用(select/poll/epoll)都属于同步I/O,因为它们在操作系统将数据由内核缓冲区复制到用户空间缓冲区时都是阻塞的(不能干别的事)。
  • 只有异步I/O模型(AIO)是符合异步I/O操作的含义的,即在数据准备完成以后,由内核空间拷贝回用户缓冲区后再通知通知用户进程,而用户进程在等待通知的这段时间里可以干别的事。

前几天看到了下面的代码片段,我觉的写的挺好的,复制一下:

http://7niucdn.wenchao.ren/20190329000237.png

接下来我会写一篇介绍unix IO模型的文章,在这篇文章中,我会具体谈谈不同的IO模型。

参考资料

集群调用容错的套路

在日常的工作和系统设计中,我们经常会使用RPC调用,而我们所部署的服务一般也都是集群模式。我们知道在分布式系统架构中,因为有很多的可能性,比如服务发布重启,网络抖动等问题,都可能会导致RPC调用失败,一般情况下我们的集群调用设计都需要有一定的容错策略。本篇文章就总结一下常见的集群调用容错套路:

  • Failover Cluster
  • Failfast Cluster
  • Failsafe Cluster
  • Failback Cluster
  • Forking Cluster
  • Broadcast Cluster

Failover Cluster

Failover Cluster模式就是 失败自动切换,当出现失败,重试其它服务器,这种一般通常用于幂等操作,比如读操作,但重试会带来更长延迟。一般实现这种模式的时候,需要注意的是重试的时候优先剔除刚刚出问题的节点,优先选择其余节点。

Failfast Cluster

Failfast Cluster是快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

Failsafe Cluster

Failfast Cluster是失败安全,出现异常时,直接忽略,就是fire and forget。比如一些场景下写入审计日志等操作,失败了也就失败了,可以忍受。

Failback Cluster

Failback Cluster是失败自动恢复,异步记录失败请求,定时重发。通常用于消息通知操作。

Forking Cluster

Forking Cluster 并行调用多个服务器,只要其中一个成功即返回。这种通常用于实时性要求较高的读操作,但需要浪费更多服务资源。

Broadcast Cluster

Broadcast Cluster是广播调用。就是广播请求到所有提供者,逐个调用,任意一台报错则报错,通常用于通知所有提供者更新缓存或日志等本地资源信息。

java中的zero copy

在web应用程序中,我们经常会在server和client之间传输数据。比如server发数据给client,server首先将数据从硬盘读出之后,然后原封不动的通过socket传输给client,大致原理如下:

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

下面的例子展示了传统的数据复制实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;

public class TraditionalClient {


public static void main(String[] args) {

int port = 2000;
String server = "localhost";
Socket socket = null;
String lineToBeSent;

DataOutputStream output = null;
FileInputStream inputStream = null;
int ERROR = 1;


// connect to server
try {
socket = new Socket(server, port);
System.out.println("Connected with server " +
socket.getInetAddress() +
":" + socket.getPort());
}
catch (UnknownHostException e) {
System.out.println(e);
System.exit(ERROR);
}
catch (IOException e) {
System.out.println(e);
System.exit(ERROR);
}

try {
String fname = "sendfile/NetworkInterfaces.c";
inputStream = new FileInputStream(fname);

output = new DataOutputStream(socket.getOutputStream());
long start = System.currentTimeMillis();
byte[] b = new byte[4096];
long read = 0, total = 0;
while ((read = inputStream.read(b)) >= 0) {
total = total + read;
output.write(b);
}
System.out.println("bytes send--" + total + " and totaltime--" + (System.currentTimeMillis() - start));
}
catch (IOException e) {
System.out.println(e);
}

try {
output.close();
socket.close();
inputStream.close();
}
catch (IOException e) {
System.out.println(e);
}
}
}

这种操作看起来可能不会怎么消耗CPU,但是实际上它是低效的。因为传统的 Linux 操作系统的标准 I/O 接口是基于数据拷贝操作的,即 I/O 操作会导致数据在操作系统内核地址空间的缓冲区和应用程序地址空间定义的缓冲区之间进行传输。如下图:

Traditional data copying approach

  • 数据首先被从磁盘读取到内核的read buffer
  • 然后在从内核的read buffer中复制到应用程序的buffer中
  • 然后在从应用程序的buffer中复制到内核的socket buffer
  • 最后在从内核的socket buffer中复制到网卡中

然后其中涉及了4次上下文切换:

Traditional context switches

分析上面的描述,我们可以看到kernel buffer其实在这个过程中充当了一个ahead cache。之所以引入这个kernel buffer其实是在很多的情况下是可以减少磁盘 I/O 的操作,进而提升效率的。

  • 比如对于读请求,如果我们所请求的数据的大小小于kernel buffer并且如果要读取的数据已经存放在操作系统的高速缓冲存储器中,那么就不需要再进行实际的物理磁盘 I/O 操作。直接从kernel buffer中读取就好了。
  • 对于写请求:利用这个ahead cache可以实现异步写操作

但是没有银弹,这样会带来一个问题就是,如果当我们请求的数据量的大小远大于kernel buffer的大小的话,这种情况下kernel buffer的存在反而会导致数据在kernel buffer用户缓冲区之间多次复制。

java中的zero copy

如果我们想在java中使用zero copy,我们一般会用java.nio.channels.FileChannel类中的transferTo()方法。下面是它的描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* Transfers bytes from this channel's file to the given writable byte
* channel.
*
* <p> An attempt is made to read up to <tt>count</tt> bytes starting at
* the given <tt>position</tt> in this channel's file and write them to the
* target channel. An invocation of this method may or may not transfer
* all of the requested bytes; whether or not it does so depends upon the
* natures and states of the channels. Fewer than the requested number of
* bytes are transferred if this channel's file contains fewer than
* <tt>count</tt> bytes starting at the given <tt>position</tt>, or if the
* target channel is non-blocking and it has fewer than <tt>count</tt>
* bytes free in its output buffer.
*
* <p> This method does not modify this channel's position. If the given
* position is greater than the file's current size then no bytes are
* transferred. If the target channel has a position then bytes are
* written starting at that position and then the position is incremented
* by the number of bytes written.
*
* <p> This method is potentially much more efficient than a simple loop
* that reads from this channel and writes to the target channel. Many
* operating systems can transfer bytes directly from the filesystem cache
* to the target channel without actually copying them. </p>
*
* @param position
* The position within the file at which the transfer is to begin;
* must be non-negative
*
* @param count
* The maximum number of bytes to be transferred; must be
* non-negative
*
* @param target
* The target channel
*
* @return The number of bytes, possibly zero,
* that were actually transferred
*
* @throws IllegalArgumentException
* If the preconditions on the parameters do not hold
*
* @throws NonReadableChannelException
* If this channel was not opened for reading
*
* @throws NonWritableChannelException
* If the target channel was not opened for writing
*
* @throws ClosedChannelException
* If either this channel or the target channel is closed
*
* @throws AsynchronousCloseException
* If another thread closes either channel
* while the transfer is in progress
*
* @throws ClosedByInterruptException
* If another thread interrupts the current thread while the
* transfer is in progress, thereby closing both channels and
* setting the current thread's interrupt status
*
* @throws IOException
* If some other I/O error occurs
*/
public abstract long transferTo(long position, long count,
WritableByteChannel target)
throws IOException;

transferTo()方法把数据从file channel传输到指定的writable byte channel。它需要底层的操作系统支持zero copy。在UNIX和各种Linux中,会执行系统调用sendfile(),该命令把数据从一个文件描述符传输到另一个文件描述符(Linux中万物皆文件):

1
2
#include <sys/socket.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

因此传统的方式中的

1
2
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

可以被transferTo()替代。下面的图展示了使用transferTo(), 也就是zero copy技术后的流程:

 Data copy with transferTo()

Context switching with transferTo()

  • transferTo()方法使文件内容被DMA引擎复制到读缓冲区中。 然后,内核将数据复制到与输出套接字关联的内核缓冲区中。
  • 第三个副本发生在DMA引擎将数据从内核套接字缓冲区传递到协议引擎时。

这是一个很明显的进步:我们把context switch的次数从4次减少到了2次,同时也把data copy的次数从4次降低到了3次(而且其中只有一次占用了CPU,另外两次由DMA完成)。但是,要做到zero copy,这还差得远。

如果网卡支持gather operation,我们可以通过kernel进一步减少数据的拷贝操作。在2.4及以上版本的linux内核中,开发者修改了socket buffer descriptor来适应这一需求。这个方法不仅减少了context switch,还消除了和CPU有关的数据拷贝。使用层面的使用方法没有变,但是内部原理却发生了变化:

Data copies when transferTo() and gather operations are used

  • transferTo()方法使文件内容被DMA引擎复制到内核缓冲区中。
  • 没有数据被复制到套接字缓冲区中。 相反,只有具有有关数据位置和长度信息的描述符才会附加到套接字缓冲区。 DMA引擎将数据直接从内核缓冲区传递到协议引擎,从而消除了剩余的最终CPU副本。

下面这个例子展示了如何使用transferTo()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class TransferToClient {

public static void main(String[] args) throws IOException{
TransferToClient sfc = new TransferToClient();
sfc.testSendfile();
}
public void testSendfile() throws IOException {
String host = "localhost";
int port = 9026;
SocketAddress sad = new InetSocketAddress(host, port);
SocketChannel sc = SocketChannel.open();
sc.connect(sad);
sc.configureBlocking(true);

String fname = "sendfile/NetworkInterfaces.c";
long fsize = 183678375L, sendzise = 4094;

// FileProposerExample.stuffFile(fname, fsize);
FileChannel fc = new FileInputStream(fname).getChannel();
long start = System.currentTimeMillis();
long nsent = 0, curnset = 0;
curnset = fc.transferTo(0, fsize, sc);
System.out.println("total bytes transferred--"+curnset+" and time taken in MS--"+(System.currentTimeMillis() - start));
//fc.close();
}
}

参考资料

linux命令-grep

常见参数说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
grep [OPTIONS] PATTERN [FILE...]
grep [OPTIONS] [-e PATTERN]... [-f FILE]... [FILE...]

OPTIONS:
-e: 使用正则搜索
-i: 不区分大小写
-v: 查找不包含指定内容的行
-w: 按单词搜索
-c: 统计匹配到的次数
-n: 显示行号
-r: 逐层遍历目录查找
-A: 显示匹配行及前面多少行, 如: -A3, 则表示显示匹配行及前3行
-B: 显示匹配行及后面多少行, 如: -B3, 则表示显示匹配行及后3行
-C: 显示匹配行前后多少行, 如: -C3, 则表示显示批量行前后3行
--color: 匹配到的内容高亮显示
--include: 指定匹配的文件类型
--exclude: 过滤不需要匹配的文件类型

常见用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#多文件查询
grep leo logs.log logs_back.log

#查找即包含leo又包含li的行
grep leo logs.log | grep li

#查找匹配leo或者匹配li的行
grep leo | li logs.log

#显示匹配行前2行
grep leo logs.log -A2

#显示匹配行后2行
grep leo logs.log -B2

#显示匹配行前后2行
grep leo logs.log -C2

#不区分大小写
grep -i leo logs.log

#使用正则表达式
grep -e '[a-z]\{5\}' logs.log

#查找不包含leo的行
grep -v leo logs.log

#统计包含leo的行数
grep -c leo logs.log

#遍历当前目录及所有子目录查找匹配leo的行
grep -r leo .

#在当前目录及所有子目录查找所有java文件中查找leo
grep -r leo . --include "*.java"

#在搜索结果中排除所有README文件
grep "main()" . -r --exclude "README"

#查找并输出到指定文件
grep leo logs.log > result.log

#查找以leo开头的行
grep ^leo logs.log

#查找以leo结尾的行
grep leo$ logs.log

#查找空行
grep ^$ logs.log

mysql binlog初步介绍

binlog 即二进制日志,它记录了数据库上的所有改变,并以二进制的形式保存在磁盘中;
它可以用来查看数据库的变更历史、数据库增量备份和恢复、Mysql的复制(主从数据库的复制)。

mysql binlog解析

binlog有三种格式:

  • Statement 基于SQL语句的复制(statement-based replication,SBR),
  • Row 基于行的复制(row-based replication,RBR),
  • Mixed 混合模式复制(mixed-based replication,MBR)。

在我这边mysql 5.7.20版本中默认是使用Row的, 而且默认情况下没有开启binlog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
mysql> select version();
+-----------+
| version() |
+-----------+
| 5.7.20 |
+-----------+
1 row in set (0.00 sec)

mysql>
mysql>
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
1 row in set (0.00 sec)

mac环境下开启mysql的binlog

编辑my.cnf文件,对于我来说就是/usr/local/etc/my.cnf文件,在其中增加下面的内容:

1
2
3
log-bin = /Users/rollenholt/Downloads/mysql/binlog
binlog-format = ROW
server_id = 1

然后brew services restart mysql重启mysql,接下来我们就可以验证mysql的binlog已经开启了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
mysql> show variables like 'log_bin'
-> ;
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.00 sec)

mysql>
mysql> show binary logs;
+---------------+-----------+
| Log_name | File_size |
+---------------+-----------+
| binlog.000001 | 177 |
| binlog.000002 | 177 |
| binlog.000003 | 177 |
| binlog.000004 | 154 |
+---------------+-----------+
4 rows in set (0.00 sec)

mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000004 | 154 | | | |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

mysql> show master logs;
+---------------+-----------+
| Log_name | File_size |
+---------------+-----------+
| binlog.000001 | 177 |
| binlog.000002 | 177 |
| binlog.000003 | 177 |
| binlog.000004 | 154 |
+---------------+-----------+
4 rows in set (0.00 sec)

同时查看我们配置的log-bin属性对于的目录下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
~/Downloads/mysql » ls
binlog.000001 binlog.000002 binlog.000003 binlog.000004 binlog.index
------------------------------------------------------------
~/Downloads/mysql » cat binlog.index
/Users/rollenholt/Downloads/mysql/binlog.000001
/Users/rollenholt/Downloads/mysql/binlog.000002
/Users/rollenholt/Downloads/mysql/binlog.000003
/Users/rollenholt/Downloads/mysql/binlog.000004
------------------------------------------------------------
~/Downloads/mysql » cat binlog.000001
_binR �[w{5.7.20-logR �[8


**4SVѶR �[#��_]�hu �[����% ------------------------------------------------------------
~/Downloads/mysql »

binlog相关的基本命令:

  • 查看是否开启binlog show variables like 'log_bin'
  • 获取binlog文件列表 show binary logs
  • 查看master上的binlog show master logs
  • 只查看第一个binlog文件的内容

    1
    2
    3
    4
    5
    6
    7
    8
    9
    mysql> show binlog events;
    +---------------+-----+----------------+-----------+-------------+---------------------------------------+
    | Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
    +---------------+-----+----------------+-----------+-------------+---------------------------------------+
    | binlog.000001 | 4 | Format_desc | 1 | 123 | Server ver: 5.7.20-log, Binlog ver: 4 |
    | binlog.000001 | 123 | Previous_gtids | 1 | 154 | |
    | binlog.000001 | 154 | Stop | 1 | 177 | |
    +---------------+-----+----------------+-----------+-------------+---------------------------------------+
    3 rows in set (0.01 sec)
  • 查看指定binlog文件的内容 show binlog events in 'binlog.000001'

  • 删除binlog
    • 使用linux命令删除binlog文件
    • 设置binlog的过期时间 使用variable expire_logs_days
    • 手动删除binlog
      • reset master;//删除master的binlog
      • reset slave; //删除slave的中继日志
      • purge master logs before ‘2012-03-30 17:20:00’; //删除指定日期以前的日志索引中binlog日志文件
      • purge master logs to ‘mysql-bin.000002’; //删除指定日志文件的日志索引中binlog日志文件
1
2
3
4
5
6
7
mysql> show variables like '%expire_logs_days%';
+------------------+-------+
| Variable_name | Value |
+------------------+-------+
| expire_logs_days | 0 |
+------------------+-------+
1 row in set (0.00 sec)
  • flush logs 刷新日志
    • 当停止或重启服务器时,服务器会把日志文件记入下一个日志文件,Mysql会在重启时生成一个新的日志文件,文件序号递增;此外,如果日志文件超过max_binlog_size(默认值1G)系统变量配置的上限时,也会生成新的日志文件(在这里需要注意的是,如果你正使用大的事务,二进制日志还会超过max_binlog_size,不会生成新的日志文件,事务全写入一个二进制日志中,这种情况主要是为了保证事务的完整性);日志被刷新时,新生成一个日志文件。

ERROR 1728 (HY000): Cannot load from mysql.procs_priv. The table is probably corrupted

今天在搞mysql binlog收集时,需要创建一个mysql用户,结果出现了:

ERROR 1728 (HY000): Cannot load from mysql.procs_priv. The table is probably corrupted异常

解决办法:

sudo mysql_upgrade -u root -p

注意后面的用户名和密码自己修改为自己的哈。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
~ » sudo mysql_upgrade -u root -p
Password:
Enter password:
Checking if update is needed.
Checking server version.
Running queries to upgrade MySQL server.
Checking system database.
mysql.columns_priv OK
mysql.db OK
mysql.engine_cost OK
mysql.event OK
mysql.func OK
mysql.general_log OK
mysql.gtid_executed OK
mysql.help_category OK
mysql.help_keyword OK
mysql.help_relation OK
mysql.help_topic OK
mysql.innodb_index_stats OK
mysql.innodb_table_stats OK
mysql.ndb_binlog_index OK
mysql.plugin OK
mysql.proc OK
mysql.procs_priv OK
mysql.proxies_priv OK
mysql.server_cost OK
mysql.servers OK
mysql.slave_master_info OK
mysql.slave_relay_log_info OK
mysql.slave_worker_info OK
mysql.slow_log OK
mysql.tables_priv OK
mysql.time_zone OK
mysql.time_zone_leap_second OK
mysql.time_zone_name OK
mysql.time_zone_transition OK
mysql.time_zone_transition_type OK
mysql.user OK
The sys schema is already up to date (version 1.5.1).
Checking databases.
sys.sys_config OK
temp.hsc_biz_system_customer OK
temp.hsc_settlement_info OK
temp.hsc_settlement_invoice OK
temp.hsc_settlement_object OK
temp.invoice_config OK
test.worker_node OK
test.a OK
test.application OK
test.application_acl OK
test.b OK
test.big_table OK
test.certificate_info_tab OK
test.company_tab OK
test.connection_config OK
test.leader_election OK
test.test OK
test.test1 OK
test.test_emoij OK
test.user OK
Upgrade process completed successfully.
Checking if update is needed.
------------------------------------------------------------

然后重启mysql以后,在尝试创建用户,发现完美解决。

监控系统的模板功能

这篇文章主要总结整理一下最近工作中两套监控系统合并过程中使用模板功能来抽象【针对应用相关的监控】
和【针对机器相关的监控】的差异。

背景

公司主要的监控系统主要是针对应用级别的,所有的指标都是属于App级别的,也就是想看某条监控指标数据,首先需要选定具体的APP。
然后公司也有一套针对机器相关的监控系统,因为机器相关的监控很多时候是以机器为维度的,也就是想查询某个机器的指标数据时
需要先指定具体的机器。同时由于公司业务的特殊性,也需要增加对多种终端类型(比如门店、货柜)的监控相关的功能。同时考虑到开发
、维护等成本等等的诸多因素,现在需要将两套监控系统合并。因此需要一套通用的模型来抽象这种问题。

其中这套模式本身并不复杂,没啥好说的。

模板功能模型

下面这个图是我当时设计的模板功能

简单描述一下这个图:

  • Endpoint可以属于多个EndpointGroup
  • EndpointGroup可以绑定多个Template, 一个Template也可以绑定到多个EndpointGroup上面
  • Template会包含多个Metric数据,同时也支持继承关系,也就是子模板会继承父模板的指标数据,如果父子模板中都含有同样的
    Metric的话,那么子模板的会覆盖父模板的指标数据。
  • Endpoint也可以直接和Template来进行绑定,而不需要通过EndpointGroup
  • Endpoint可以直接添加Metric数据,这个主要是因为目前针对App相关的监控都是属于这种情况的。

Endpoint

因为监控指标数据可能属于一个APP,也可能属于一个机器、门店、货柜、咖啡机等。所以使用了抽象的Endpoint来描述指标的归属。

EndpointGroup

含义很简单,之所以弄这个主要是为了做批处理,这样就不需要针对每一个Endpoint来进行操作了,一次可以操作一批。

Template

Template其实就是一些Metric的集合,支持继承功能,继承功能其实并不是必须的,可以通过组合来实现,最后我们选择了继承
的方式,主要是考虑到针对下面的场景,继承这种方式描述起来更好一些:

所有的机器都有一些基础监控,比如针对cpu、内存的监控,我们将这些基础的指标数据使用template1来进行打包,然后如果想对所有的
dev环境的机器增加一些监控指标,这些监控指标我们使用Template2来打包

如果使用组合的方式,需要对Endpoint或者EndpointGroup绑定Template1和Template2,而如何使用继承的话,只需要绑定Template2
就好。

Metric

这个比较简单,所有监控系统的基本语义。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×