确保数据落盘

在之前的文章《unix IO模型》我们曾经提到过,用户空间内核空间缓存IO等概念。关于这些概念,大家可以阅读这篇文章,在本篇文章中,我们就不在涉及这些概念了。

IO缓冲机制

大家需要有一个认知就是我们平时写的程序,在将数据到文件中时,其实数据不会立马写入磁盘中进行持久化存储的,而是会经过层层缓存,如下图所示:

I/O buffering

其中这每层缓存都有自己的刷新时机,每层缓存都刷新后才会写入磁盘进行持久化存储。这些缓存的存在目的本意都是为了加速读写操作,因为如果每次读写都对应真实磁盘操作,那么读写的效率会大大降低。但是同样带来的坏处是如果期间发生掉电或者别的故障,还未写入磁盘的数据就丢失了。对于数据安全敏感的应用,比如数据库,比如交易程序,这是无法忍受的。所以操作系统提供了保证文件落盘的机制。

在上面这图中说明了操作系统到磁盘的数据流,以及经过的缓冲区。首先数据会先存在于应用的内存空间,如果调用库函数写入,库函数可能还会把数据缓存在库函数所维护的缓冲区空间中,比如C标准库stdio提供的方法就会进行缓存,目的是为了减少系统调用的次数。这两个缓存都是在用户空间中的。库函数缓存flush时,会调用write系统调用将数据写入内核空间,内核同样维护了一个页缓存(page cache),操作系统会在合适的时间把脏页的数据写入磁盘。即使是写入磁盘了,磁盘也可能维护了一个缓存,在这个时候掉电依然会丢失数据的,只有写入了磁盘的持久存储物理介质上,数据才是真正的落盘了,是安全的。

比如在网络套接字上侦听连接并将从每个客户端接收的数据写入文件的应用程序。 在关闭连接之前,服务器确保将接收到的数据写入稳定存储器,并向客户端发送此类确认,请看下面的简化代码(代码中已经注释):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int sock_read(int sockfd, FILE *outfp, size_t nrbytes)
{
int ret;
size_t written = 0;
//example of an application buffer
char *buf = malloc(MY_BUF_SIZE);

if (!buf)
return -1;

//take care of reading the data from the socket
//and writing it to the file stream
while (written < nrbytes) {
ret = read(sockfd, buf, MY_BUF_SIZE);
if (ret =< 0) {
if (errno == EINTR)
continue;
return ret;
}
written += ret;
ret = fwrite((void *)buf, ret, 1, outfp);
if (ret != 1)
return ferror(outfp);
}

//flush file stream, the data to move into the "Kernel Buffers" layer
ret = fflush(outfp);
if (ret != 0)
return -1;

//makethe data is saved to the "Stable Storage" layer
ret = fsync(fileno(outfp));
if (ret < 0)
return -1;
return 0;
}

在上面的这幅图中,可以看到数据流向经过了用户控件缓冲区内核缓存区,下面我们说说这2个缓存区。

用户空间缓冲区

用户空间的缓存分为:

  • 应用程序本身维护的缓冲区
  • 库维护的缓冲区

应用本身维护的缓冲区需要开发者自己刷新,调用库函数写入到库函数的缓冲区中(这一步可能不存在)。如果应用程序不依赖任何库函数,而是直接使用系统调用,那么则是把数据写入系统的缓冲区去。

库函数一般都会维护缓冲区,目的是简化应用程序的编写,应用程序就不需要编写维护缓冲区的代码,库维护的缓冲区针对那些没有应用程序本身维护的缓存区的程序来说,在某些时候是会提升不少的性能的,因为缓冲区大大减少了系统调用的次数,而系统调用是非常耗时的,系统调用涉及到用户态到内核态的切换,这个切换需要很多的步骤与校验,较为耗时。

比如C标准库stdio就维护着一个缓冲区,对应这个缓冲区,C标准库提供了fflush方法强制把缓冲区数据写入操作系统。

在Java的OutputStream接口提供了一个flush方法,具体的作用要看实现类的具体实现。BufferedOutputStream#flush就会把自己维护的缓冲区数据写入下一层的OutputStream。比如是new BufferedOutputStream(new FileOutputStream("/"))这样的模式,则调用BufferedOutputStream#flush会将数据写入操作系统。

内核缓冲区

应用程序直接或者通过库函数间接的使用系统调用write将数据写入操作系统缓冲区.UNIX系统在内核中设有高速缓存或页面高速缓存。目的是为了减少磁盘读写次数。

用户写入系统的数据先写入系统缓冲区,系统缓冲区写满后,将其排入输出队列,然后得到队首时,才进行实际的IO操作。这种输出方式被称为延迟写

UNIX系统提供了三个系统调用来执行刷新内核缓冲区:sync,fsync,fdatasync

sync

1
2
3
// sync() causes all pending modifications to filesystem metadata and
//cached file data to be written to the underlying filesystems.
void sync(void)

sync函数只是将所有修改过的块缓冲区排入输出队列就返回,并不等待实际的写磁盘操作返回。 操作系统的update系统守护进程会周期地调用sync函数,来保证系统中的数据能定期落盘。

根据sync(2) - Linux manual page的描述,Linux对sync的实现与POSIX规范不太一样,POSIX规范中,sync可能在文件真正落盘前就返回,而Linux的实现则是文件真正落盘后才会返回。所以Linux中,sync与fsync的效果是一样的!但是1.3.20之前的Linux存在BUG,导致sync并不会在真正落盘后返回。

fsync

1
void fsync(int filedes)

fsync对指定的文件起作用,它传输内核缓冲区中这个文件的数据到存储设备中,并阻塞直到存储设备响应说数据已经保存好了。

fsync对文件数据与文件元数据都有效。文件的元数据可以理解为文件的属性数据,比如文件的更新时间,访问时间,长度等。

fdatasync

1
void fdatasync(int filedes)

fdatasyncfsync类似,两者的区别是,fdatasync不一定需要刷新文件的元数据部分到存储设备。

是否需要刷新文件的元数据,是要看元数据的变化部分是否对之后的读取有影响,比如文件元数据的访问时间st_atime和修改时间st_mtime变化了,fdatasync不会去刷新元数据数据到存储设备,因为即使这个数据丢失了不一致了,也不影响故障恢复后的文件读取。但是如果文件的长度st_size变化了,那么就需要刷新元数据数据到存储设备。

所以如果你每次都更新文件长度,那么调用fsyncfdatasync的效果是一样的。

但是如果更新能做到不修改文件长度,那么fdatasync能比fsync少了一次磁盘写入,这个是非常大的速度提升。

open中的O_SYNC和O_DSYNC

除了上面三个系统调用,open系统调用在打开文件时,可以设置和同步相关的标志位:O_SYNCO_DSYNC

  • 设置O_SYNC的效果相当于是每次write后自动调用fsync。
  • 设置O_DSYNC的效果相当于是每次write后自动调用fdatasync。

关于新建文件

在一个文件上调用fsync/fdatasync只能保证文件本身的数据落盘,但是对于文件系统来说,目录中也保存着文件信息,fsync/fdatasync的调用并不会保证这部分的数据落盘。如果此时发生掉电,这个文件就无法被找到了。所以对于新建文件来说,还需要在父目录上调用fsync。

关于覆盖现有文件

覆盖现有文件时,如果发生掉电,新的数据是不会写入成功,但是可能会污染现有的数据,导致现有数据丢失。所以最佳实践是新建一个临时文件,写入成功后,再原子性替换原有文件。具体步骤:

  • 新建一个临时文件
  • 向临时文件写入数据
  • 对临时文件调用fsync,保证数据落盘。期间发生掉电对现有文件无影响。
  • 重命名临时文件为目标文件名
  • 对父目录调用fsync

存储设备缓冲区

存储设备为了提高性能,也会加入缓存。高级的存储设备能提供非易失性的缓存,比如有掉电保护的缓存。但是无法对所有设备做出这种保证,所以如果数据只是写入了存储设备的缓存的话,遇到掉电等故障,依然会导致数据丢失。

对于保证数据能保存到存储设备的持久化存储介质上,而不管设备本身是否有易失性缓存,操作系统提供了write barriers这个机制。开启了write barriers的文件系统,能保证调用fsync/fdatasync数据持久化保存,无论是否发生了掉电等其他故障,但是会导致性能下降。

许多文件系统提供了配置write barriers的功能。比如ext3, ext4, xfsbtrfsmount参数-o barrier表示开启写屏障,调用fsync/fdatasync能保证刷新存储设备的缓存到持久化介质上。-o nobarrier则表示关闭写屏障,调用fsync/fdatasync无法保证数据落盘。

Linux默认开启write barriers,所以默认情况下,我们调用fsync/fdatasync,就可以认为是文件真正的可靠落盘了

对于这个层面的数据安全保证来说,应用程序是不需要去考虑的,因为如果这台机器的硬盘被挂载为没有开启写屏障,那么可以认为这个管理员知道这个风险,他选择了更高的性能,而不是更高的安全性。

Java世界中的对应API

针对确保数据落盘,掉电也不丢失数据的情况,JDK也封装了对应的功能,并且为我们做好了跨平台的保证。

JDK中有三种方式可以强制文件数据落盘:

  • 调用FileDescriptor#sync函数
  • 调用FileChannel#force函数
  • 使用RandomAccessFilerws或者rwd模式打开文件

FileDescriptor#sync

FileDescriptor类提供了sync方法,可以用于保证数据保存到持久化存储设备后返回。使用方法:

1
2
FileOutputStream outputStream = new FileOutputStream("/Users/mazhibin/b.txt");
outputStream.getFD().sync();

可以看一下JDK是如何实现FileDescriptor#sync的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public native void sync() throws SyncFailedException;

// jdk/src/solaris/native/java/io/FileDescriptor_md.c
JNIEXPORT void JNICALL
Java_java_io_FileDescriptor_sync(JNIEnv *env, jobject this) {
// 获取文件描述符
FD fd = THIS_FD(this);
// 调用IO_Sync来执行数据同步
if (IO_Sync(fd) == -1) {
JNU_ThrowByName(env, "java/io/SyncFailedException", "sync failed");
}
}
// IO_Sync在UNIX系统上的定义就是fsync:
// jdk/src/solaris/native/java/io/io_util_md.h
#define IO_Sync fsync

FileChannel#force

之前的文章提到了,操作系统提供了fsync/fdatasync两个用户同步数据到持久化设备的系统调用,后者尽可能的会不同步文件元数据,来减少一次磁盘IO,提高性能。但是Java IO的FileDescriptor#sync只是对fsync的封装,JDK中没有对于fdatasync的封装,这是一个特性缺失。

Java NIO对这一点也做了增强,FileChannel类的force方法,支持传入一个布尔参数metaData,表示是否需要确保文件元数据落盘,如果为true,则调用fsync。如果为false,则调用fdatasync。

使用例子如下:

1
2
3
4
5
6
7
FileOutputStream outputStream = new FileOutputStream("/Users/mazhibin/b.txt");

// 强制文件数据与元数据落盘
outputStream.getChannel().force(true);

// 强制文件数据落盘,不关心元数据是否落盘
outputStream.getChannel().force(false);

在jdk中的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class FileChannelImpl extends FileChannel {
private final FileDispatcher nd;
private final FileDescriptor fd;
private final NativeThreadSet threads = new NativeThreadSet(2);

public final boolean isOpen() {
return open;
}

private void ensureOpen() throws IOException {
if(!this.isOpen()) {
throw new ClosedChannelException();
}
}

// 布尔参数metaData用于指定是否需要文件元数据也确保落盘
public void force(boolean metaData) throws IOException {
// 确保文件是已经打开的
ensureOpen();
int rv = -1;
int ti = -1;
try {
begin();
ti = threads.add();

// 再次确保文件是已经打开的
if (!isOpen())
return;
do {
// 调用FileDispatcher#force
rv = nd.force(fd, metaData);
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
} finally {
threads.remove(ti);
end(rv > -1);
assert IOStatus.check(rv);
}
}
}

实现中有许多线程同步相关的代码,不属于我们要关注的部分,就不分析了。FileChannel#force调用FileDispatcher#forceFileDispatcher是NIO内部实现用的一个类,封装了一些文件操作方法,其中包含了刷新文件的方法:

1
2
3
4
abstract class FileDispatcher extends NativeDispatcher {
abstract int force(FileDescriptor fd, boolean metaData) throws IOException;
// ...
}

FileDispatcher#force的实现:

1
2
3
4
5
6
class FileDispatcherImpl extends FileDispatcher {
int force(FileDescriptor fd, boolean metaData) throws IOException {
return force0(fd, metaData);
}
static native int force0(FileDescriptor fd, boolean metaData) throws IOException;
// ...

FileDispatcher#force的本地方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_force0(JNIEnv *env, jobject this,
jobject fdo, jboolean md)
{
// 获取文件描述符
jint fd = fdval(env, fdo);
int result = 0;

if (md == JNI_FALSE) {
// 如果调用者认为不需要同步文件元数据,调用fdatasync
result = fdatasync(fd);
} else {
#ifdef _AIX
/* On AIX, calling fsync on a file descriptor that is opened only for
* reading results in an error ("EBADF: The FileDescriptor parameter is
* not a valid file descriptor open for writing.").
* However, at this point it is not possibly anymore to read the
* 'writable' attribute of the corresponding file channel so we have to
* use 'fcntl'.
*/
int getfl = fcntl(fd, F_GETFL);
if (getfl >= 0 && (getfl & O_ACCMODE) == O_RDONLY) {
return 0;
}
#endif
// 如果调用者认为需要同步文件元数据,调用fsync
result = fsync(fd);
}
return handle(env, result, "Force failed");
}

可以看出,其实FileChannel#force就是简单的通过metaData参数来区分调用fsync和fdatasync。

同时在zookeeper的org.apache.zookeeper.common.AtomicFileOutputStream类中我们可以看到下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public void close() throws IOException {
boolean triedToClose = false, success = false;
try {
flush();
((FileOutputStream) out).getChannel().force(true);

triedToClose = true;
super.close();
success = true;
} finally {
if (success) {
boolean renamed = tmpFile.renameTo(origFile);
if (!renamed) {
// On windows, renameTo does not replace.
if (!origFile.delete() || !tmpFile.renameTo(origFile)) {
throw new IOException(
"Could not rename temporary file " + tmpFile
+ " to " + origFile);
}
}
} else {
if (!triedToClose) {
// If we failed when flushing, try to close it to not leak
// an FD
IOUtils.closeStream(out);
}
// close wasn't successful, try to delete the tmp file
if (!tmpFile.delete()) {
LOG.warn("Unable to delete tmp file " + tmpFile);
}
}
}
}

RandomAccessFile结合rws/rwd模式

RandomAccessFile打开文件支持4中模式:

  • r 以只读方式打开。调用结果对象的任何 write 方法都将导致抛出 IOException。
  • rw打开以便读取和写入。如果该文件尚不存在,则尝试创建该文件。
  • rws 打开以便读取和写入,对于rws,还要求对文件的内容或元数据的每个更新都同步写入到底层存储设备。
  • rwd 打开以便读取和写入,对于rwd,还要求对文件内容的每个更新都同步写入到底层存储设备。

其中rws模式会在open文件时传入O_SYNC标志位。rwd模式会在open文件时传入O_DSYNC标志位。

RandomAccessFile源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
// 4个标志位,用于组合表示4种模式
private static final int O_RDONLY = 1;
private static final int O_RDWR = 2;
private static final int O_SYNC = 4;
private static final int O_DSYNC = 8;

public RandomAccessFile(File file, String mode)
throws FileNotFoundException
{
String name = (file != null ? file.getPath() : null);
int imode = -1;
// 只读模式
if (mode.equals("r"))
imode = O_RDONLY;
else if (mode.startsWith("rw")) {
// 读写模式
imode = O_RDWR;
rw = true;

// 读写模式下,可以结合O_SYNC和O_DSYNC标志
if (mode.length() > 2) {
if (mode.equals("rws"))
imode |= O_SYNC;
else if (mode.equals("rwd"))
imode |= O_DSYNC;
else
imode = -1;
}
}
if (imode < 0)
throw new IllegalArgumentException("Illegal mode \"" + mode
+ "\" must be one of "
+ "\"r\", \"rw\", \"rws\","
+ " or \"rwd\"");
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkRead(name);
if (rw) {
security.checkWrite(name);
}
}
if (name == null) {
throw new NullPointerException();
}
if (file.isInvalid()) {
throw new FileNotFoundException("Invalid file path");
}
// 新建文件描述符
fd = new FileDescriptor();
fd.attach(this);
path = name;
open(name, imode);
}

private void open(String name, int mode)
throws FileNotFoundException {
open0(name, mode);
}

private native void open0(String name, int mode)
throws FileNotFoundException;

其中open0的实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// jdk/src/share/native/java/io/RandomAccessFile.c
JNIEXPORT void JNICALL
Java_java_io_RandomAccessFile_open0(JNIEnv *env,
jobject this, jstring path, jint mode)
{
int flags = 0;
// JAVA中的标志位与操作系统标志位转换
if (mode & java_io_RandomAccessFile_O_RDONLY)
flags = O_RDONLY;
else if (mode & java_io_RandomAccessFile_O_RDWR) {
flags = O_RDWR | O_CREAT;
if (mode & java_io_RandomAccessFile_O_SYNC)
flags |= O_SYNC;
else if (mode & java_io_RandomAccessFile_O_DSYNC)
flags |= O_DSYNC;
}

// 调用fileOpen打开函数
fileOpen(env, this, path, raf_fd, flags);
}

fileOpen之后的流程与FileInputStream的一致。可以看出,相比于FileInputStream固定使用O_RDONLYFileOutputStream固定使用O_WRONLY | O_CREATRandomAccessFile提供了在Java中指定打开模式的能力。

但是同时我们需要清除,rwsrwd的效率比rw低非常非常多,因为每次读写都需要刷到磁盘才会返回,这两个中rwdrws效率高一些,因为rwd只刷新文件内容,rws刷新文件内容与元数据,文件的元数据就是文件更新时间等信息。

原子性的重命名文件

在java中的File类的renameTo方法,提供了重命名文件的功能。但是需要注意的是这个方法并不能保证原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Renames the file denoted by this abstract pathname.
*
* <p> Many aspects of the behavior of this method are inherently
* platform-dependent: The rename operation might not be able to move a
* file from one filesystem to another, it might not be atomic, and it
* might not succeed if a file with the destination abstract pathname
* already exists. The return value should always be checked to make sure
* that the rename operation was successful.
*
* <p> Note that the {@link java.nio.file.Files} class defines the {@link
* java.nio.file.Files#move move} method to move or rename a file in a
* platform independent manner.
*
* @param dest The new abstract pathname for the named file
*
* @return <code>true</code> if and only if the renaming succeeded;
* <code>false</code> otherwise
*
* @throws SecurityException
* If a security manager exists and its <code>{@link
* java.lang.SecurityManager#checkWrite(java.lang.String)}</code>
* method denies write access to either the old or new pathnames
*
* @throws NullPointerException
* If parameter <code>dest</code> is <code>null</code>
*/
public boolean renameTo(File dest) {

因此如果想原子性的重命名和移动文件,我们应该使用java.nio.file.Files类中的move方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/**
* Move or rename a file to a target file.
*
* <p> By default, this method attempts to move the file to the target
* file, failing if the target file exists except if the source and
* target are the {@link #isSameFile same} file, in which case this method
* has no effect. If the file is a symbolic link then the symbolic link
* itself, not the target of the link, is moved. This method may be
* invoked to move an empty directory. In some implementations a directory
* has entries for special files or links that are created when the
* directory is created. In such implementations a directory is considered
* empty when only the special entries exist. When invoked to move a
* directory that is not empty then the directory is moved if it does not
* require moving the entries in the directory. For example, renaming a
* directory on the same {@link FileStore} will usually not require moving
* the entries in the directory. When moving a directory requires that its
* entries be moved then this method fails (by throwing an {@code
* IOException}). To move a <i>file tree</i> may involve copying rather
* than moving directories and this can be done using the {@link
* #copy copy} method in conjunction with the {@link
* #walkFileTree Files.walkFileTree} utility method.
*
* <p> The {@code options} parameter may include any of the following:
*
* <table border=1 cellpadding=5 summary="">
* <tr> <th>Option</th> <th>Description</th> </tr>
* <tr>
* <td> {@link StandardCopyOption#REPLACE_EXISTING REPLACE_EXISTING} </td>
* <td> If the target file exists, then the target file is replaced if it
* is not a non-empty directory. If the target file exists and is a
* symbolic link, then the symbolic link itself, not the target of
* the link, is replaced. </td>
* </tr>
* <tr>
* <td> {@link StandardCopyOption#ATOMIC_MOVE ATOMIC_MOVE} </td>
* <td> The move is performed as an atomic file system operation and all
* other options are ignored. If the target file exists then it is
* implementation specific if the existing file is replaced or this method
* fails by throwing an {@link IOException}. If the move cannot be
* performed as an atomic file system operation then {@link
* AtomicMoveNotSupportedException} is thrown. This can arise, for
* example, when the target location is on a different {@code FileStore}
* and would require that the file be copied, or target location is
* associated with a different provider to this object. </td>
* </table>
*
* <p> An implementation of this interface may support additional
* implementation specific options.
*
* <p> Moving a file will copy the {@link
* BasicFileAttributes#lastModifiedTime last-modified-time} to the target
* file if supported by both source and target file stores. Copying of file
* timestamps may result in precision loss. An implementation may also
* attempt to copy other file attributes but is not required to fail if the
* file attributes cannot be copied. When the move is performed as
* a non-atomic operation, and an {@code IOException} is thrown, then the
* state of the files is not defined. The original file and the target file
* may both exist, the target file may be incomplete or some of its file
* attributes may not been copied from the original file.
*
* <p> <b>Usage Examples:</b>
* Suppose we want to rename a file to "newname", keeping the file in the
* same directory:
* <pre>
* Path source = ...
* Files.move(source, source.resolveSibling("newname"));
* </pre>
* Alternatively, suppose we want to move a file to new directory, keeping
* the same file name, and replacing any existing file of that name in the
* directory:
* <pre>
* Path source = ...
* Path newdir = ...
* Files.move(source, newdir.resolve(source.getFileName()), REPLACE_EXISTING);
* </pre>
*
* @param source
* the path to the file to move
* @param target
* the path to the target file (may be associated with a different
* provider to the source path)
* @param options
* options specifying how the move should be done
*
* @return the path to the target file
*
* @throws UnsupportedOperationException
* if the array contains a copy option that is not supported
* @throws FileAlreadyExistsException
* if the target file exists but cannot be replaced because the
* {@code REPLACE_EXISTING} option is not specified <i>(optional
* specific exception)</i>
* @throws DirectoryNotEmptyException
* the {@code REPLACE_EXISTING} option is specified but the file
* cannot be replaced because it is a non-empty directory
* <i>(optional specific exception)</i>
* @throws AtomicMoveNotSupportedException
* if the options array contains the {@code ATOMIC_MOVE} option but
* the file cannot be moved as an atomic file system operation.
* @throws IOException
* if an I/O error occurs
* @throws SecurityException
* In the case of the default provider, and a security manager is
* installed, the {@link SecurityManager#checkWrite(String) checkWrite}
* method is invoked to check write access to both the source and
* target file.
*/
public static Path move(Path source, Path target, CopyOption... options)
throws IOException

其中参数中的CopyOption可选性有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package java.nio.file;

/**
* Defines the standard copy options.
*
* @since 1.7
*/

public enum StandardCopyOption implements CopyOption {
/**
* Replace an existing file if it exists.
*/
REPLACE_EXISTING,
/**
* Copy attributes to the new file.
*/
COPY_ATTRIBUTES,
/**
* Move the file as an atomic file system operation.
*/
ATOMIC_MOVE;
}

我们看看kafka中怎么使用的,在kafka的org.apache.kafka.common.utils中有下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Attempts to move source to target atomically and falls back to a non-atomic move if it fails.
*
* @throws IOException if both atomic and non-atomic moves fail
*/
public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
try {
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException outer) {
try {
Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
log.debug("Non-atomic move of " + source + " to " + target + " succeeded after atomic move failed due to "
+ outer.getMessage());
} catch (IOException inner) {
inner.addSuppressed(outer);
throw inner;
}
}
}

参考资料

集群调用容错的套路

在日常的工作和系统设计中,我们经常会使用RPC调用,而我们所部署的服务一般也都是集群模式。我们知道在分布式系统架构中,因为有很多的可能性,比如服务发布重启,网络抖动等问题,都可能会导致RPC调用失败,一般情况下我们的集群调用设计都需要有一定的容错策略。本篇文章就总结一下常见的集群调用容错套路:

  • Failover Cluster
  • Failfast Cluster
  • Failsafe Cluster
  • Failback Cluster
  • Forking Cluster
  • Broadcast Cluster

Failover Cluster

Failover Cluster模式就是 失败自动切换,当出现失败,重试其它服务器,这种一般通常用于幂等操作,比如读操作,但重试会带来更长延迟。一般实现这种模式的时候,需要注意的是重试的时候优先剔除刚刚出问题的节点,优先选择其余节点。

Failfast Cluster

Failfast Cluster是快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

Failsafe Cluster

Failfast Cluster是失败安全,出现异常时,直接忽略,就是fire and forget。比如一些场景下写入审计日志等操作,失败了也就失败了,可以忍受。

Failback Cluster

Failback Cluster是失败自动恢复,异步记录失败请求,定时重发。通常用于消息通知操作。

Forking Cluster

Forking Cluster 并行调用多个服务器,只要其中一个成功即返回。这种通常用于实时性要求较高的读操作,但需要浪费更多服务资源。

Broadcast Cluster

Broadcast Cluster是广播调用。就是广播请求到所有提供者,逐个调用,任意一台报错则报错,通常用于通知所有提供者更新缓存或日志等本地资源信息。

一致性哈希

本文谈谈一致性哈希,一致性哈希作为「负载均衡」中比较常见的一种实现,经常会有意无意的被大家使用到。我希望通过这篇文章可以使得你完全明白:

  • 一致性哈希要解决的问题
  • 一致性哈希的原理
  • 一致性哈希的优点
  • 一致性哈希的不适用场景
  • 如何手动实现一致性哈希
  • 常见开源代码中的一致性哈希实现

一致性哈希要优化的问题

一致性哈希要解决的问题,或者说目标,其实用一句话概括就是:在hash value区间有限并且可能会发生变化的情况下,相同的hash key尽可能得到同一个hash value

上面短短的一句话,我们可以得到一些重要信息:

一致性哈希的原理

上面短短的

一致性哈希负载均衡需要保证的是“相同的请求尽可能落到同一个服务器上”,注意这短短的一句描述,却包含了相当大的信息量。“相同的请求” — 什么是相同的请求?一般在使用一致性哈希负载均衡时,需要指定一个 key 用于 hash 计算

一致性哈希的优点

相比于传统的「取模」哈希,一致性哈希减少了因为服务节点变更导致的key的映射关系失效的数量

一致性哈希的不适用场景

trace 日志收集的一些考虑

这篇文章简单描述一下我这一两天关于trace日志收集的一些思考

trace日志收集的特点

  • trace日志分散 trace相关的日志收集和平时的日志收集还是有一些区别的,主要区别在于trace的日志是分散在各个系统的各个机器上,而不是集中在某个或者多个文件中的。这就要求trace的日志收集需要对所有的机器都开启,这样收集到的日志才全。
  • trace日志的顺序问题 因为trace的日志分散的缘故,以及日志agent在不同机器上的收集速度问题,可能导致在trace链路中靠后的日志反而先收集上来,而我们展示的时候,一般都是需要按照trace链路的时间顺序来展示的。
  • trace日志的查询特点 一般trace的日志,我们查询的时候都会基于TraceId来查询,这也就导致trace相关的日志存储的时候,和kafka的文件存储还是有一些区别的。
  • 相比写请求来说读请求极少

上面4个点,是我觉的trace日志收集的一些比较特殊的情况。

日志收集疑难点:

  • 日志一般存在轮转的情况,比如每小时轮转
  • 一般情况下,公司的服务器上都会配置定时压缩脚本,可能存在日志压缩的问题

这2个问题是我接下来准备研究的,所以这篇文章先不说这个。

初步简要设计想法

核心流程

  • 各个机器上的原始日志通过日志收集agent收集上来以后写入kafka中,这些原始数据中需要携带文件路径,文件名,机器host,收集的时间戳等信息
    • 可以基于这个原始的topic做日志的静态存储,写入hdfs中
  • 基于的kafka的原始数据,消费这些kafka数据,然后写入kafka的另外一个topic中,做日志分类。
    • 此处需要自定义kafka的分区分配策略,策略核心内容是基于日志中的traceid做分配。将相同的traceId的日志分配给同样的consumer节点,这样当使用traceId做日志查询的时候,就可以使用相同的策略去目标机器上找日志。
    • 备注:如果这一步要做数据高可用,那么可以给这个kafka topic增加consumer group来弄,也可以只有一个consumer group,但是写的时候写多个nfs。
  • 然后在消费第二步分类以后的kafka数据,这样按照下面说的存储方式存储

机器上的trace日志组织:

  • 按天 20190125
  • 按小时 20
  • 按照分钟级别划分 没10分钟/前后30分钟,最多1分钟一个目录
  • 按照appName分 比如
  • 如果这样单个目录下的文件数还是太多的话,在按照host来划分
  • 按照traceId分 每个traceId一个文件,文本文件,顺序写入,但是注意,这样写入的日志文件直接展示的时候顺序是有问题的,可能存在乱序的问题。

先按照时间是便于定期按照时间删除文件。之所以分这么多目录,主要是考虑到linux文件系统的限制。不要超过ulimit -n的限制。

单个trace日志文化的日志乱序问题

在上一个小节说的日志存储在单个trace日志文件中是可能乱序的,但是存储的时候没有去排序,只要是考虑到trace系统的特点,写非常多,读相比写来说非常少。而且大多数
时候单个trace链路并不会太长,所以可以考虑查询整个trace日志的时候在排序。

当然也不排序某些循环trace链路的问题,比如A发延迟消息给B,B收到以后在发延迟消息给A,这种就恶心,不过解决99.9%的问题就很不错了已经。

定时任务进行过期和压缩

  • 进行文件压缩,我打算选gzip,当然也可以选其他压缩算法,主要是平衡cpu和空间,定时任务压缩前1小时的文件夹下面的所有的trace日志文件
  • 文件过期删除

先写这么多

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×