不在要Netty的Pinple的线程中乱设置拒绝策略

之前给业务同学排查问题时发现我们的Trace服务的某个地方当Trace的量特别大时,一个线程池会对业务层抛出RejectedExecutionException
于是热心的我就顺手给这个地方加了一个RejectedExecutionHandler的实现,在这个里面加一个监控,然后就没了。这样是没问题的,但是
「手贱」的我看到本文件中另外一个地方的线程池也没有设置拒绝策略。他之前的代码如下:

1
2
3
4
String name = nettyClientConfig.getString(nameKey);
String workerExecutorName = StringUtils.hasText(name) ? String.format("%s.NettyClientCodecThread", name) : "NettyClientCodecThread";
ThreadFactory threadFactory = new DefaultThreadFactory(workerExecutorName, false);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(threadSize, threadFactory);

我修改之后为:

1
2
3
4
5
6
String name = nettyClientConfig.getString(nameKey);
String workerExecutorName = StringUtils.hasText(name) ? String.format("%s.NettyClientCodecThread", name) : "NettyClientCodecThread";
ThreadFactory threadFactory = new NamedThreadFactory(workerExecutorName, false);
int maxPendingTasks = nettyClientConfig.getInteger(maxPendingTasksKey, 100);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(threadSize, threadFactory, maxPendingTasks,
(task, executor) -> Metrics.counter("NettyClientCodecThread.rejected.counter").get().inc());

心想着老子又做了一回活雷锋。但是后来这个改动坑了一波,因为这个defaultEventExecutorGroup线程池是在Netty的pipeline中使用的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Bootstrap handler = this.bootstrap.group(this.eventLoopGroupSelector).channel(useEpoll() ? EpollSocketChannel.class : NioSocketChannel.class)//
.option(TCP_NODELAY, true)
.option(SO_KEEPALIVE, false)
.option(CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getInteger(connectTimeoutKey))
.option(SO_SNDBUF, nettyClientConfig.getInteger(socketSndBufSizeKey))
.option(SO_RCVBUF, nettyClientConfig.getInteger(socketRecBufSizeKey))
.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, writeBufferLowWaterMark)
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, writeBufferHighWaterMark)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//看这里
defaultEventExecutorGroup,
new NettyDecoder(nettyClientConfig),
prependerHandler,
encoderHandler,
new IdleStateHandler(0, 0, nettyClientConfig.getInteger(NettyClientConfig.channelMaxIdleTimeSecondsKey)),
connectManageHandler,
clientHandler);
}
});

然后这部分的代码因为是单独抽取出来的,所以Netty的两端都会使用这份代码,出现的现象就是,当传输的数据量太大的时候,如果触发拒绝策略,那么相当于这个TCP包就被丢弃了。
如果丢弃的这个包是一个完整的包(其实tcp是流协议,没有包的概念,此处指的包是一段完整的数据字节流)那么就不会出问题。

为啥改动之前没问题呢,因为默认的DefaultEventExecutorGroup的代码为:

1
2
3
4
5
6
7
8
9
10
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
*/
public DefaultEventExecutorGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SingleThreadEventExecutor.DEFAULT_MAX_PENDING_EXECUTOR_TASKS,
RejectedExecutionHandlers.reject());
}

也就是他的拒绝策略虽然也是RejectedExecutionHandlers.reject(),但是他的队列是无界的(Integer.MAX_VALUE,近乎无界),所以基本上在他OOM挂掉之前他一般不会触发这个rejeced。
在排查这个问题的时候,我压根就不会觉的这个地方有问题,知道后来实在想不出有啥其他原因了,抓包分析发现问题果然出现在这里。

从Netty的ResourceLeakDetector#Lavel的设计的一些感想

Netty中的ResourceLeakDetector#Level有4个级别:

  • DISABLED 这种模式下不进行泄露监控。
  • SIMPLE 这种模式下以1/128的概率抽取ByteBuf进行泄露监控。
  • ADVANCED 在SIMPLE的基础上,每一次对ByteBuf的调用都会尝试记录调用轨迹,消耗较大
  • PARANOID 在ADVANCED的基础上,对每一个ByteBuf都进行泄露监控,消耗最大。

一般而言,在项目的初期使用SIMPLE模式进行监控,如果没有问题一段时间后就可以关闭。否则升级到ADVANCED或者PARANOID模式尝试确认泄露位置。

结合自己做中间件开发的一些感触吧:

  • client端新增加的功能,最好都有一个对应的开关,便于出问题的时候及时调整,给自己留个后路
  • client的功能尽量支持动态升级和降级,非核心功能不要影响业务功能,分清楚主次。
  • client端的功能代码必要的时候一定需要辅有排查问题的辅助代码
  • 非核心功能,能异步就异步,尽可能快,异步处理的时候,尤其是异步回调的时候,一定要风清楚代码是在哪个线程池中执行的。

Lettuce一定要打开redis集群拓扑刷新功能

在使用Lettuce访问Redis的时候,一定要记得打开它的Redis 集群拓扑刷新功能,否则他压根就不存在高可用。因为他的集群拓扑刷新功能是默认没开启的。

上面的3个文章其实说的就是这个事情,在redis集群拓扑结构发生变化,比如Redis的master挂掉了后,lettuce的client端就会长时间不能恢复。因此可以通过下面的配置打开拓扑刷新功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
//默认超时时间, lettuce默认超时时间为60s太长了,此处默认设置为15s
private Long timeoutInMillis = Duration.ofSeconds(15).toMillis();

static ClusterClientOptions.Builder initDefaultClusterClientOptions(ClusterClientOptions.Builder builder) {
ClusterTopologyRefreshOptions defaultClusterTopologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
//开启集群拓扑结构周期性刷新,和默认参数保持一致
.enablePeriodicRefresh(60, TimeUnit.SECONDS)
//开启针对{@link RefreshTrigger}中所有类型的事件的触发器
.enableAllAdaptiveRefreshTriggers()
//和默认一样,30s超时,避免短时间大量出现刷新拓扑的事件
.adaptiveRefreshTriggersTimeout(30, TimeUnit.SECONDS)
//和默认一样重连5次先,然后在刷新集群拓扑
.refreshTriggersReconnectAttempts(5)
.build();

return builder
// 配置用于开启自适应刷新和定时刷新。如自适应刷新不开启,Redis集群变更时将会导致连接异常
.topologyRefreshOptions(defaultClusterTopologyRefreshOptions)
//默认就是重连的,显示定义一下
.autoReconnect(true)
//和默认一样最大重定向5次,避免极端情况无止境的重定向
.maxRedirects(5)
//Accept commands when auto-reconnect is enabled, reject commands when auto-reconnect is disabled.
.disconnectedBehavior(ClientOptions.DisconnectedBehavior.DEFAULT)
.socketOptions(SocketOptions.builder().keepAlive(true).tcpNoDelay(true).build())
//取消校验集群节点的成员关系
.validateClusterNodeMembership(false);
}

public static ClusterClientOptions.Builder getDefaultClusterClientOptionBuilder() {
return initDefaultClusterClientOptions(ClusterClientOptions.builder());
}

上面的配置其实就是修改默认的连接参数,打开集群拓扑刷新功能。其中有几个比较重要的地方,redis的默认超时时间是1分钟,其实这个时间太长了,很多的时候几秒钟就可以了,我这里是改为了15秒。
另外一个比较重要的参数就是validateClusterNodeMembership,这个大家一定要注意,默认这个属性是true的,也就是你的redis cluster集群增加一个redis节点,Lettuce默认是不信任这个节点的,
因此在内网的情况下,我们基本上都要关闭这个功能。也就是:validateClusterNodeMembership(false);

Netty如何检测ByteBuf没有release

Netty中的ByteBuf算是中间件开发中比较常用的API了,一般我们会使用PooledByteBuf来提升性能,但是这个玩意需要我们使用以后手动进行release,如果有时候忘记手动释放的话,会出现内存泄漏。
而且这种问题一般也没那么方便的排查。不过非常幸运的是Netty已经帮我们考虑到了这个问题,它提供了自己的检测工具:

  • ResourceLeakDetector
  • ResourceLeakTracker

基本思想

他的实现原理很巧妙,不过我们先不着急说Netty的实现,我们先想想如果我们自己来弄,我们一般会面临下面3个问题:

  • 被检测的对象创建的时候,我们就需要知道他创建了,然后做一些操作,比如该标记就标记,该计数就计数,
  • 对象「无用」的时候,我们也需要知道这个时刻。这里的「无用」一般我们选择对象被GC时
  • 我们还需要一种机制来判断对象在被GC之前有没有调用某个操作,比如release或者close操作。

下面以netty 4.0.46版本来说哈。

  • 第一个问题其实很好实现,在对象的构造函数中我们就可以做这些事情,因为对象的构造函数执行的时候,就是他被创建的时候
  • 第二个问题,Netty是利用了Java中的java.lang.ref.PhantomReference和引用队列这个东西。java.lang.ref.PhantomReference有叫虚引用也有叫做幽灵引用的,叫法无所谓,它和软引用(SoftReference)弱引用(WeakReference)不同,它并不影响对象的生命周期,如果一个对象与java.lang.ref.PhantomReference关联,则跟没有引用与之关联一样,在任何时候都可能被垃圾回收器回收。而且除过强引用之外,剩余的3种引用类型都有一个引用队列可以与之配合。当java清理调用不必要的引用后,会将这个引用本身(不是引用指向的值对象)添加到队列之中。比如你看PhantomReference的定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package java.lang.ref;


/**
* Phantom reference objects, which are enqueued after the collector
* determines that their referents may otherwise be reclaimed. Phantom
* references are most often used for scheduling pre-mortem cleanup actions in
* a more flexible way than is possible with the Java finalization mechanism.
*
* <p> If the garbage collector determines at a certain point in time that the
* referent of a phantom reference is <a
* href="package-summary.html#reachability">phantom reachable</a>, then at that
* time or at some later time it will enqueue the reference.
*
* <p> In order to ensure that a reclaimable object remains so, the referent of
* a phantom reference may not be retrieved: The <code>get</code> method of a
* phantom reference always returns <code>null</code>.
*
* <p> Unlike soft and weak references, phantom references are not
* automatically cleared by the garbage collector as they are enqueued. An
* object that is reachable via phantom references will remain so until all
* such references are cleared or themselves become unreachable.
*
* @author Mark Reinhold
* @since 1.2
*/

public class PhantomReference<T> extends Reference<T> {

/**
* Returns this reference object's referent. Because the referent of a
* phantom reference is always inaccessible, this method always returns
* <code>null</code>.
*
* @return <code>null</code>
*/
public T get() {
return null;
}

/**
* Creates a new phantom reference that refers to the given object and
* is registered with the given queue.
*
* <p> It is possible to create a phantom reference with a <tt>null</tt>
* queue, but such a reference is completely useless: Its <tt>get</tt>
* method will always return null and, since it does not have a queue, it
* will never be enqueued.
*
* @param referent the object the new phantom reference will refer to
* @param q the queue with which the reference is to be registered,
* or <tt>null</tt> if registration is not required
*/
public PhantomReference(T referent, ReferenceQueue<? super T> q) {
super(referent, q);
}

}

最下面的这个构造函数可以传递一个引用队列ReferenceQueue进去。

而借助这个PhantomReference和引用队列,我们其实可以知道对象啥时候「无用」了。因为我们只要在这个队列中的对象,其实都是被GC了的。

  • 上面的第三个问题其实比较好做,我们可以在对象内部维护状态之类的,就可以非常简单的解决这个问题。

源码相关说明

io.netty.util.HashedWheelTimer中其实就使用了ResourceLeakDetector。我们就以这个类简单的来说一下他的用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
public class HashedWheelTimer implements Timer {

private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
.newResourceLeakDetector(HashedWheelTimer.class, 1);
private final ResourceLeakTracker<HashedWheelTimer> leak;

/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
* @param leakDetection {@code true} if leak detection should be enabled always,
* if false it will only be enabled if the worker thread is not
* a daemon thread.
* @param maxPendingTimeouts The maximum number of pending timeouts after which call to
* {@code newTimeout} will result in
* {@link java.util.concurrent.RejectedExecutionException}
* being thrown. No maximum pending timeouts limit is assumed if
* this value is 0 or negative.
* @throws NullPointerException if either of {@code threadFactory} and {@code unit} is {@code null}
* @throws IllegalArgumentException if either of {@code tickDuration} and {@code ticksPerWheel} is &lt;= 0
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {

if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}

// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;

// Convert tickDuration to nanos.
this.tickDuration = unit.toNanos(tickDuration);

// Prevent overflow.
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
workerThread = threadFactory.newThread(worker);

//注意看这里,这里其实就是根据参数和workerThread来判断是否检测
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

this.maxPendingTimeouts = maxPendingTimeouts;

if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}



@Override
public Set<Timeout> stop() {
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}

if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
//关闭监听,触发内部操作
boolean closed = leak.close(this);
assert closed;
}
}

return Collections.emptySet();
}

try {
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}

if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
//关闭监听,触发内部操作
boolean closed = leak.close(this);
assert closed;
}
}
return worker.unprocessedTimeouts();
}


//其他代码...

}

上面的代码省略了无关的部分,从中我们可以看到如何使用ResourceLeakDetectorResourceLeakTracker

首先提一下ResourceLeakDetector#Level有4个级别:

  • DISABLED 这种模式下不进行泄露监控。
  • SIMPLE 这种模式下以1/128的概率抽取ByteBuf进行泄露监控。
  • ADVANCED 在SIMPLE的基础上,每一次对ByteBuf的调用都会尝试记录调用轨迹,消耗较大
  • PARANOID 在ADVANCED的基础上,对每一个ByteBuf都进行泄露监控,消耗最大。

一般而言,在项目的初期使用SIMPLE模式进行监控,如果没有问题一段时间后就可以关闭。否则升级到ADVANCED或者PARANOID模式尝试确认泄露位置。
这一点可以给大家平时开发设计开发提一个醒,就是最好每一个功能有开关,尽量支持动态升级和降级,同时要辅有排查问题的辅助代码,这种手段在设计中间件client的时候需要经常用到。

可以通过JVM参数-Dio.netty.leakDetection.level=PARANOID来设置级别。

不过的级别有不同的行为,这部分代码在ResourceLeakDetector#track0中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private DefaultResourceLeak track0(T obj) {
Level level = ResourceLeakDetector.level;
if (level == Level.DISABLED) {
return null;
}

if (level.ordinal() < Level.PARANOID.ordinal()) {
if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
reportLeak(level);
return new DefaultResourceLeak(obj);
} else {
return null;
}
} else {
reportLeak(level);
return new DefaultResourceLeak(obj);
}
}

这里其实就是完成我一开始说的第一步操作,我们从HashedWheelTimer的构造函数中可以看到,就是在构造函数里面调用的ResourceLeakDetector#track方法进而调用到trace0方法。这样就可以
知道对象创建的时机,然后在DefaultResourceLeak的实现中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//继承了PhantomReference
private final class DefaultResourceLeak extends PhantomReference<Object> implements ResourceLeakTracker<T>,
ResourceLeak {
//代码的创建位置
private final String creationRecord;
//最近几次访问的位置
private final Deque<String> lastRecords = new ArrayDeque<String>();
//被检测对象的hash,因为不能持有对象引用,否则不能gc
private final int trackedHash;

private int removedRecords;

DefaultResourceLeak(Object referent) {
super(referent, refQueue);

assert referent != null;

// Store the hash of the tracked object to later assert it in the close(...) method.
// It's important that we not store a reference to the referent as this would disallow it from
// be collected via the PhantomReference.
trackedHash = System.identityHashCode(referent);

Level level = getLevel();
if (level.ordinal() >= Level.ADVANCED.ordinal()) {
//newRecord其实就是获取对象创建的位置
creationRecord = newRecord(null, 3);
} else {
creationRecord = null;
}
allLeaks.put(this, LeakEntry.INSTANCE);
}

上面的newRecord其实就是获取对象创建的位置,一般我们动态的获取代码位置,都是通过StackTraceElement[] array = new Throwable().getStackTrace();然后来处理这个StackTraceElement数组。

Netty在实现这里的时候,使用了装饰器模式 ? 包装器模式(无所谓了),看这几个类就好了:

  • SimpleLeakAwareByteBuf
  • SimpleLeakAwareCompositeByteBuf
  • AdvancedLeakAwareByteBuf
  • AdvancedLeakAwareCompositeByteBuf

在文章最初说的第三个「我们还需要一种机制来判断对象在被GC之前有没有调用某个操作,比如release或者close操作。」,Netty这里是在ResourceLeakDetector中维护了一个
private final ConcurrentMap<DefaultResourceLeak, LeakEntry> allLeaks = PlatformDependent.newConcurrentHashMap(); 每次对象close或者release的时候,从这里移除就好了。
这样就不需要每个对象都有一个是否close或者是否release的状态位了。

有了上面的讲解,只要可以从引用队列拿出属性,然后看看这个allLeaks中有没有他的位置,那么就可以知道这个对象是否调用过某个操作,比如是否调用过release操作。

更详细的我懒得写了,大家自己看代码吧。也可以看看Netty如何监控内存泄露

借助arthas排查重复类的问题

现象描述

业务反馈他们的项目运行时出现Jackson中的com.fasterxml.jackson.databind.deser.SettableBeanProperty类的版本不对,和他们在pom中指定的版本不一致,这种问题一般都是因为项目的依赖(包括间接依赖)中,存在某些依赖有shade包,如果这些shade包打包的时候忘记修改package,那么就经常会出现这种问题。

解决思路

这种问题其实只要确定jvm加载的这个com.fasterxml.jackson.databind.deser.SettableBeanProperty到底来自哪个jar就可以帮助我们确定问题根源,而借助Arthas可以快速解决这个问题:

  • 使用Arthas连接具体环境的具体机器上的应用
  • 在console中输入如下的命令: sc -fd com.fasterxml.jackson.databind.deser.SettableBeanProperty
  • 查看console的输出,看其中的 code-source就可以指定这个类来自哪个jar了
1
2
3
4
5
6
## 安装arthas
curl -L https://alibaba.github.io/arthas/install.sh | sh
## $PID为自己项目运行的pid,注意修改, 此处使用tomcat用户是因为我们的程序是tomcat用户运行的
sudo -u tomcat -EH ./as.sh $PID
## arthas attach成功以后在console中输入
sc -fd com.fasterxml.jackson.databind.deser.SettableBeanProperty

下面贴一个sc命令的样例输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class-info        com.fasterxml.jackson.databind.deser.impl.SetterlessProperty
code-source /data/w/www/data-bbb-sea.aaa.com/webapps/ROOT/WEB-INF/lib/jackson-databind-2.10.3.jar
name com.fasterxml.jackson.databind.deser.impl.SetterlessProperty
isInterface false
isAnnotation false
isEnum false
isAnonymousClass false
isArray false
isLocalClass false
isMemberClass false
isPrimitive false
isSynthetic false
simple-name SetterlessProperty
modifier final,public
annotation
interfaces
super-class +-com.fasterxml.jackson.databind.deser.SettableBeanProperty
+-com.fasterxml.jackson.databind.introspect.ConcreteBeanPropertyBase
+-java.lang.Object
class-loader +-WebappClassLoader
context:
delegate: false
repositories:
/WEB-INF/classes/
----------> Parent Classloader:
org.apache.catalina.loader.StandardClassLoader@224edc67
+-org.apache.catalina.loader.StandardClassLoader@224edc67
+-sun.misc.Launcher$AppClassLoader@18b4aac2
+-sun.misc.Launcher$ExtClassLoader@5ccddd20
classLoaderHash 4c6a62ac
fields name serialVersionUID
type long
modifier final,private,static
value 1
name _annotated
type com.fasterxml.jackson.databind.introspect.AnnotatedMethod
modifier final,protected
name _getter
type java.lang.reflect.Method
modifier final,protected

Affect(row-cnt:11) cost in 183 ms.

Unexpected end of ZLIB input stream

前几天在项目开发是遇到了这个Unexpected end of ZLIB input stream异常。异常出现的位置:

1
2
3
4
5
Caused by: java.io.EOFException: Unexpected end of ZLIB input stream
at java.util.zip.InflaterInputStream.fill(InflaterInputStream.java:240)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:158)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:122)

之前一开始没太想清楚,以为是我写的GzipFilter出现了问题,后来吃了个午饭才恍然大悟,是client端的数据传输有点问题。简单抽象一下场景就是client通过http接口给server上报
一些数据,这些数据使用了gzip来进行压缩。问题出现在这个gzip压缩这快。我看来看看早期的有问题的代码:

1
2
3
4
5
6
7
8
9
10
11
12
private byte[] buildRequestBody(List<LoggerEntity> loggerEntities) {
try {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
gzipOutputStream.write(JSON.writeValueAsBytes(loggerEntities));
return byteArrayOutputStream.toByteArray();
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

先说一下上面的代码是有问题的,问题在于try-with-resource里面的try中的2行代码,因为很可能gzipOutputStream没写完然后就已经return了。因此此处有两种处理办法,

第一种就是在try里面对gzipOutputStream进行close:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private byte[] buildRequestBody(List<LoggerEntity> loggerEntities) {
try {

try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
gzipOutputStream.write(JSON.writeValueAsBytes(loggerEntities));
gzipOutputStream.finish();
gzipOutputStream.close();
return byteArrayOutputStream.toByteArray();
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

第二种就是将return语句拿到外层。

1
2
3
4
5
6
7
8
9
10
11
12
private byte[] buildRequestBody(List<LoggerEntity> loggerEntities) {
try {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) {
gzipOutputStream.write(JSON.writeValueAsBytes(loggerEntities));
}
return byteArrayOutputStream.toByteArray();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}

缩短class路径

如果有时候在打印一些class日志时,经常会遇到class full name太长的问题,这个时候可以借助logback中的ch.qos.logback.classic.pattern.TargetLengthBasedClassNameAbbreviator来缩短输出。 ​

TargetLengthBasedClassNameAbbreviator

AttachNotSupportedException和jstack失败的常见原因

最近在公司升级Bistoury Agent时发现,有不少应用出AttachNotSupportedException异常:

1
2
3
4
5
6
7
com.sun.tools.attach.AttachNotSupportedException: Unable to open socket file: target process not responding or HotSpot VM not loaded
at sun.tools.attach.LinuxVirtualMachine.<init>(LinuxVirtualMachine.java:106) ~[tools.jar:na]
at sun.tools.attach.LinuxAttachProvider.attachVirtualMachine(LinuxAttachProvider.java:78) ~[tools.jar:na]
at com.sun.tools.attach.VirtualMachine.attach(VirtualMachine.java:250) ~[tools.jar:na]
at qunar.tc.bistoury.commands.arthas.ArthasStarter.attachAgent(ArthasStarter.java:74) ~[bistoury-commands-1.4.22.jar:na]
at qunar.tc.bistoury.commands.arthas.ArthasStarter.start(ArthasStarter.java:57) ~[bistoury-commands-1.4.22.jar:na]
at qunar.tc.bistoury.commands.arthas.ArthasEntity.start(ArthasEntity.java:82) [bistoury-commands-1.4.22.jar:na]

但是这样应用的行为和监控指标都是特别正常的,此时如果给这些应用使用:sudo -u tomcat jstack [pid](备注我们的应用是tomcat用户运行的)的话,会发现jstack
使用出问题,一个例子为:

1
2
3
sudo -u tomcat /home/w/java/default/bin/jstack 691167
691167: Unable to open socket file: target process not responding or HotSpot VM not loaded
The -F option can be used when the target process is not responding

然后查看tomcat的catalina.out文件的话,会发现jstack的输出输出在这个文件中了。在经过一番google后发现是因为/tmp目录下面的.java_pid[pid]文件被删除了。
经过在我们公司服务器上实测,在删除/tmp/.java_pidxxxx文件以后,jstack此时就会出现上面的现象。然后agent也会attach失败。只能等应用重启暂时恢复。

接下来的问题就是为什么这个.java_pid文件会被删除,后来发现我们公司的centos7上面的/usr/lib/tmpfiles.d/tmp.conf中配置的会对/tmp目录下超过10天的文件进行删除。

现在我们已经让Ops同学统一调整这个删除逻辑了,针对.java_pid开头的文件在删除之前会检查一下是否存在这个pid进程。当对应的pid存在的时候就不进行删除,不存在在进行删除。

gc Roots对象有哪些

JVM的垃圾自动回收是我们经常说的一个话题,这里的垃圾的含义是:

内存中已经不再被使用到的对象就是垃圾

要进行垃圾回收,如何判断一个对象是否可以被回收?

一般有两种办法:

  • 引用计数法
    • 实现简单,但是没法解决对象之间的循环引用问题
  • 枚举根节点做可达性分析
    • 通过一系列名为“GC Roots”的对象作为起始点,从“GC Roots”对象开始向下搜索,如果一个对象到“GC Roots”没有任何引用链相连,说明此对象可以被回收

常见的常见的GC Root有如下:

  • 通过System Class Loader或者Boot Class Loader加载的class对象,通过自定义类加载器加载的class不一定是GC Root
  • 处于激活状态的线程
  • 栈中的对象
  • 本地方法栈中 JNI (Native方法)的对象
  • JNI中的全局对象
  • 正在被用于同步的各种锁对象
  • JVM自身持有的对象,比如系统类加载器等。

Synchronized的一些东西

synchronized是Java中解决并发问题的一种最常用的方法,从语法上讲synchronized总共有三种用法:

  • 修饰普通方法
  • 修饰静态方法
  • 修饰代码块

synchronized 原理

为了查看synchronized的原理,我们首先反编译一下下面的代码, 这是一个synchronized修饰代码块的demo

1
2
3
4
5
6
7
public class SynchronizedDemo {
public void method() {
synchronized (this) {
System.out.println("Method 1 start");
}
}
}

http://7niucdn.wenchao.ren/20190902124248.png

从上面截图可以看到,synchronized的实现依赖2个指令:

  • monitorenter
  • monitorexit

但是从上面的截图可以看到有一个monitorenter和2个monitorexit,这里之所以有2个monitorexit是因为synchronized的锁释放有2种情况:

  • 方法正常执行完毕synchronized的范围,也就是正常情况下的锁释放
  • synchronized圈起来的范围内的代码执行抛出异常,导致锁释放

monitorenter

关于这个指令,jvm中的描述为:

Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref, as follows:

• If the entry count of the monitor associated with objectref is zero, the thread enters the monitor and sets its entry count to one. The thread is then the owner of the monitor.
• If the thread already owns the monitor associated with objectref, it reenters the monitor, incrementing its entry count.
• If another thread already owns the monitor associated with objectref, the thread blocks until the monitor’s entry count is zero, then tries again to gain ownership.

翻译一下大概为:

每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:

  • 如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。
  • 如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.
  • 如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。

monitorexit

The thread that executes monitorexit must be the owner of the monitor associated with the instance referenced by objectref.
The thread decrements the entry count of the monitor associated with objectref. If as a result the value of the entry count is zero, the thread exits the monitor and is no longer its owner. Other threads that are blocking to enter the monitor are allowed to attempt to do so.

翻译一下为:

执行monitorexit的线程必须是objectref所对应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。

通过这两段描述,我们应该能很清楚的看出Synchronized的实现原理,Synchronized的语义底层是通过一个monitor的对象来完成,其实wait/notify等方法也依赖于monitor对象,这就是为什么只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因。

上面的demo是使用synchronized修饰代码块的demo,下面我们看一个使用synchronized修饰方法的demo:

1
2
3
4
5
public class SynchronizedMethod {
public synchronized void method() {
System.out.println("Hello World!");
}
}

我们继续对这个类进行反编译:

http://7niucdn.wenchao.ren/20190902125437.png

从反编译的结果来看,方法的同步并没有通过指令monitorentermonitorexit来完成(虽然理论上其实也可以通过这两条指令来实现),不过相对于普通方法,其常量池中多了ACC_SYNCHRONIZED标示符。JVM就是根据该标示符来实现方法的同步的:

  • 当方法调用时,调用指令将会检查方法的 ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先获取monitor,获取成功之后才能执行方法体,方法执行完后再释放monitor。
  • 在方法执行期间,其他任何线程都无法再获得同一个monitor对象。 其实本质上没有区别,只是方法的同步是一种隐式的方式来实现,无需通过字节码来完成。

观城模型

Synchronized 锁升级

这四种锁是指锁的状态,专门针对synchronized的。在介绍这四种锁状态之前还需要介绍一些额外的知识。

首先为什么Synchronized能实现线程同步?在回答这个问题之前我们需要了解两个重要的概念:Java对象头Monitor

Java对象头

synchronized是悲观锁,在操作同步资源之前需要给同步资源先加锁,这把锁就是存在Java对象头里的,而Java对象头又是什么呢?
我们以Hotspot虚拟机为例,Hotspot的对象头主要包括两部分数据:Mark Word(标记字段)、Klass Pointer(类型指针)。如下面的一个示例:

java object示意图

我们以64位虚拟机来说,object header的结构为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|------------------------------------------------------------------------------------------------------------|--------------------|
| Object Header (128 bits) | State |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| Mark Word (64 bits) | Klass Word (64 bits) | |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| unused:25 | identity_hashcode:31 | unused:1 | age:4 | biased_lock:1 | lock:2 | OOP to metadata object | Normal |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| thread:54 | epoch:2 | unused:1 | age:4 | biased_lock:1 | lock:2 | OOP to metadata object | Biased |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| ptr_to_lock_record:62 | lock:2 | OOP to metadata object | Lightweight Locked |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| ptr_to_heavyweight_monitor:62 | lock:2 | OOP to metadata object | Heavyweight Locked |
|------------------------------------------------------------------------------|-----------------------------|--------------------|
| | lock:2 | OOP to metadata object | Marked for GC |
|------------------------------------------------------------------------------|-----------------------------|--------------------|

Mark Word

Mark Word默认存储:

  • 对象的HashCode
  • 分代年龄
  • 锁标志位信息

这些信息都是与对象自身定义无关的数据,所以Mark Word被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据。它会根据对象的状态复用自己的存储空间,也就是说在运行期间Mark Word里存储的数据会随着锁标志位的变化而变化。

Klass Point

Klass Point:对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例。

Monitor

Monitor可以理解为一个同步工具或一种同步机制,通常被描述为一个对象。每一个Java对象就有一把看不见的锁,称为内部锁或者Monitor锁。
Monitor是线程私有的数据结构,每一个线程都有一个可用monitor record列表,同时还有一个全局的可用列表。每一个被锁住的对象都会和一个monitor关联,同时monitor中有一个Owner字段存放拥有该锁的线程的唯一标识,表示该锁被这个线程占用。

synchronized通过Monitor来实现线程同步,Monitor是依赖于底层的操作系统的Mutex Lock(互斥锁)来实现的线程同步。

synchronized最初实现同步的方式是阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长,这就是JDK 6之前synchronized效率低的原因。这种依赖于操作系统Mutex Lock所实现的锁我们称之为“重量级锁”,JDK 6中为了减少获得锁和释放锁带来的性能消耗,引入了“偏向锁”和“轻量级锁”。

所以目前 锁一共有4种状态,级别从低到高依次是:无锁、偏向锁、轻量级锁和重量级锁。锁状态只能升级不能降级

下面是出四种锁状态对应的的Mark Word内容,然后再分别讲解四种锁状态的思路以及特点:

上下两幅图对照着看,我们就能够清楚的知道在不同的锁状态下,mack word区域存储的内容的不同了。

无锁

无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。

无锁的特点就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。CAS原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。

偏向锁

偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。

在大多数情况下,锁总是由同一线程多次获得,不存在多线程竞争,所以出现了偏向锁。其目标就是在只有一个线程执行同步代码块时能够提高性能。,也就是说偏向锁一般是一个线程的事情

当一个线程访问同步代码块并获取锁时,会在Mark Word里存储锁偏向的线程ID。在线程进入和退出同步块时不再通过CAS操作来加锁和解锁,而是检测Mark Word里是否存储着指向当前线程的偏向锁。引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次CAS原子指令,而偏向锁只需要在置换ThreadID的时候依赖一次CAS原子指令即可。

偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。偏向锁的撤销,需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。

偏向锁在JDK 6及以后的JVM里是默认启用的。可以通过JVM参数关闭偏向锁:-XX:-UseBiasedLocking=false,关闭之后程序默认会进入轻量级锁状态。

轻量级锁

轻量级锁是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。因此一般情况下轻量级锁大多数是2个线程的事情。

在代码进入同步块的时候,如果同步对象锁状态为无锁状态(锁标志位为“01”状态,是否为偏向锁为“0”),虚拟机首先将在当前线程的栈帧中建立一个名为锁记录(Lock Record)的空间,用于存储锁对象目前的Mark Word的拷贝,然后拷贝对象头中的Mark Word复制到锁记录中。

拷贝成功后,虚拟机将使用CAS操作尝试将对象的Mark Word更新为指向Lock Record的指针,并将Lock Record里的owner指针指向对象的Mark Word。

  • 如果这个更新动作成功了,那么这个线程就拥有了该对象的锁,并且对象Mark Word的锁标志位设置为“00”,表示此对象处于轻量级锁定状态。
  • 如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的Mark Word是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。

若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。

重量级锁

升级为重量级锁时,锁标志的状态值变为“10”,此时Mark Word中存储的是指向重量级锁的指针,此时等待锁的线程都会进入阻塞状态。

整体的锁状态升级流程如下:

无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁

综上,偏向锁通过对比Mark Word解决加锁问题,避免执行CAS操作。而轻量级锁是通过用CAS操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。重量级锁是将除了拥有锁的线程以外的线程都阻塞。

参考资料

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×