ThreadLocal


1. 作用

ThreadLocal类主要解决的就是让每个线程绑定自己的值,每一个线程都有自己的专属本地变量,避免线程安全问题。

注意:阿里巴巴Java开发手册里规定:

【参考】ThreadLocal无法解决共享对象的更新问题,ThreadLocal对象建议使用static修饰。这个变量是针对一个线程内所有操作共享的,所以设置为静态变量,所有此类实例共享此静态变量 ,也就是说在类第一次被使用时装载,只分配一块存储空间,所有此类的对象(只要是这个线程内定义的)都可以操控这个变量。

2. 原理

Thread 类中有一个 threadLocals 和 一个 inheritableThreadLocals 变量,它们都是 ThreadLocalMap 类型的变量,我们可以把 ThreadLocalMap 理解为 ThreadLocal 类实现的定制化的 HashMap。默认情况下这两个变量都是 null,只有当前线程调用 ThreadLocal 类的 setget方法时才创建它们,实际上调用这两个方法的时候,我们调用的是 ThreadLocalMap类对应的 get()set()方法。

1
2
3
4
5
6
7
8
9
public class Thread implements Runnable {
//......
//与此线程有关的ThreadLocal值。由ThreadLocal类维护
ThreadLocal.ThreadLocalMap threadLocals = null;

//与此线程有关的InheritableThreadLocal值。由InheritableThreadLocal类维护
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
//......
}

ThreadLocal 可以理解为只是 ThreadLocalMap的封装,传递了变量值。每个 Thread中都具备一个 ThreadLocalMap,而 ThreadLocalMap可以存储以 ThreadLocal为 key ,Object 对象为 value 的键值对。

注意:ThreadLocalMap有点类似 HashMap的结构,只是 HashMap是由数组+链表实现的,而 ThreadLocalMap中并没有链表结构。

3. ThreadLocalMap

3.1 Hash 算法

既然是 Map结构,那么 ThreadLocalMap当然也要实现自己的 hash算法来解决散列表数组冲突问题。

1
int i = key.threadLocalHashCode & (len-1);

每当创建一个 ThreadLocal对象,这个 ThreadLocal.nextHashCode 这个值就会增长 0x61c88647

这个值很特殊,它是斐波那契数 也叫 黄金分割数hash增量为 这个数字,带来的好处就是 hash 分布非常均匀

3.2 Hash 冲突

整体是线性探测再散列的方法,即对应数组下标无数据则插入,有数据则向后查找,直至找到无数据或key为null的下标。

注:什么情况下桶才是可以使用的呢?

  • k = key 说明是替换操作,可以使用
  • 碰到一个过期的桶,执行替换逻辑,占用过期桶
  • 查找过程中,碰到桶中 Entry=null的情况,直接使用

3.3 Set 源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Set the value associated with key.
*
* @param key the thread local object
* @param value the value to be set
*/
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
// 通过key来计算在散列表中的对应位置
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();

// k = key,说明当前set操作是一个替换操作,做替换逻辑,直接返回
if (k == key) {
e.value = value;
return;
}

// key = null,说明当前桶位置的Entry是过期数据,需要清理数据
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}

// 在Entry为null的桶中创建一个新的Entry对象
tab[i] = new Entry(key, value);
int sz = ++size;

// 清理有限范围内的失效条目,如果找到了失效条目肯定会清理掉同时size--,这时一定不需要扩容。
// 如果没有清理任何失效条目,则size是可能达到阈值(数组长度的 2/3)的,达到阈值则扩容。
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

接着重点看下 replaceStaleEntry()方法,replaceStaleEntry()方法提供替换过期数据的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* 环形数组下标计算(下一个数组元素下标)
*/
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}

/**
* 环形数组下标计算(上一个数组元素下标)
*/
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}

/**
* Replace a stale entry encountered during a set operation
* with an entry for the specified key. The value passed in
* the value parameter is stored in the entry, whether or not
* an entry already exists for the specified key.
*
* As a side effect, this method expunges all stale entries in the
* "run" containing the stale entry. (A run is a sequence of entries
* between two null slots.)
*
* @param key the key
* @param value the value to be associated with key
* @param staleSlot index of the first stale entry encountered while
* searching for key.
*/
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;

// Back up to check for prior stale entry in current run.
// We clean out whole runs at a time to avoid continual
// incremental rehashing due to garbage collector freeing
// up refs in bunches (i.e., whenever the collector runs).
// slotToExpunge表示开始探测式清理过期数据的开始下标,默认从当前的staleSlot开始。
// 以当前的staleSlot开始,向前迭代查找,找到没有过期的数据,for循环一直碰到Entry为null才会结束。
// 如果向前找到了过期数据,更新探测清理过期数据的开始下标为 i,即slotToExpunge=i
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;

// Find either the key or trailing null slot of run, whichever
// occurs first
// 接着开始从staleSlot向后查找,也是碰到Entry为null的桶结束。
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();

// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
// 如果迭代过程中,碰到 k == key,这说明这里是替换逻辑,替换新数据并且交换当前staleSlot位置
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// Start expunge at preceding stale entry if it exists
// 如果slotToExpunge == staleSlot,这说明replaceStaleEntry()一开始向前查找过期数据时并未找到过期的Entry数据,
// 接着向后查找过程中也未发现过期数据,修改开始探测式清理过期数据的下标为当前循环的 index,即slotToExpunge = i
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 调用cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);进行过期数据清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}

// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
// 如果 k != key则会接着往下走,k == null说明当前遍历的Entry是一个过期数据,
// slotToExpunge == staleSlot说明,一开始的向前查找数据并未找到过期的Entry。
// 如果条件成立,则更新slotToExpunge 为当前位置,这个前提是前驱节点扫描时未发现过期数据。
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

// If key not found, put new entry in stale slot
// 往后迭代的过程中如果没有找到k == key的数据,且碰到Entry为null的数据,则结束当前的迭代操作。
// 此时说明这里是一个添加的逻辑,将新的数据添加到table[staleSlot] 对应的slot中。
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);

// If there are any other stale entries in run, expunge them
// 最后判断除了staleSlot以外,还发现了其他过期的slot数据,就要开启清理数据的逻辑
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

3.4 探测式清理

探测式清理,也就是 expungeStaleEntry方法,遍历散列数组,从开始位置向后探测清理过期数据,将过期数据的 Entry设置为 null,沿途中碰到未过期的数据则将此数据 rehash后重新在 table数组中定位,如果定位的位置已经有了数据,则会将未过期的数据放到最靠近此位置的 Entry=null的桶中,使 rehash后的 Entry数据距离正确的桶的位置更近一些,查询的时候效率才会更高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* Expunge a stale entry by rehashing any possibly colliding entries
* lying between staleSlot and the next null slot. This also expunges
* any other stale entries encountered before the trailing null. See
* Knuth, Section 6.4
*
* @param staleSlot index of slot known to have null key
* @return the index of the next null slot after staleSlot
* (all between staleSlot and this slot will have been checked
* for expunging).
*/
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;

// expunge entry at staleSlot
// 将tab[staleSlot]槽位的数据清空
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;

// Rehash until we encounter null
Entry e;
int i;
// 以staleSlot位置往后迭代遍历
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();

// k==null的过期数据,清空该槽位数据
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
// key没有过期,重新计算当前key的下标位置
int h = k.threadLocalHashCode & (len - 1);

// 不是当前槽位下标位置,那么说明产生了hash冲突,

if (h != i) {
tab[i] = null;

// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
// 此时以新计算出来正确的槽位位置往后迭代,找到最近一个可以存放entry的位置
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}

3.5 启发式清理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Heuristically scan some cells looking for stale entries.
* This is invoked when either a new element is added, or
* another stale one has been expunged. It performs a
* logarithmic number of scans, as a balance between no
* scanning (fast but retains garbage) and a number of scans
* proportional to number of elements, that would find all
* garbage but would cause some insertions to take O(n) time.
*
* @param i a position known NOT to hold a stale entry. The
* scan starts at the element after i.
*
* @param n scan control: {@code log2(n)} cells are scanned,
* unless a stale entry is found, in which case
* {@code log2(table.length)-1} additional cells are scanned.
* When called from insertions, this parameter is the number
* of elements, but when from replaceStaleEntry, it is the
* table length. (Note: all this could be changed to be either
* more or less aggressive by weighting n instead of just
* using straight log n. But this version is simple, fast, and
* seems to work well.)
*
* @return true if any stale entries have been removed.
*/
/**
* 主要作用:
* 从i位置开始搜索,如果在log2n个连续节点内发现了失效条目(key=null,value<>null)则进行清理失效条目,
* 并且重置n的值为len,如此循环,直到连续log2n个节点都是正常节点时才会跳出while。
* (和expungeStaleEntry(i)的区别就是它的清理范围更长,只要在log2n个连续节点有失效条目,
* 那么你的清理范围就可以一直延长,极端情况下可能会清理整个数组,也可能没有清理掉任何节点)
*/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}

3.6 扩容

ThreadLocalMap.set()方法的最后,如果执行完启发式清理工作后,未清理到任何数据,且当前散列数组中 Entry的数量已经达到了列表的扩容阈值 (len*2/3),就开始执行 rehash()逻辑。

在真正扩容之前,先尝试回收一次key为null的值,腾出一些空间。

如果回收之后的size大于等于threshold的3/4时,才需要真正的扩容。

也就是说添加数据后,新的size大于等于老size的1/2 (1/2 = 2/3 * 3/4)时,才需要扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* Re-pack and/or re-size the table. First scan the entire
* table removing stale entries. If this doesn't sufficiently
* shrink the size of the table, double the table size.
*/
private void rehash() {
expungeStaleEntries();

// Use lower threshold for doubling to avoid hysteresis
// 清理完成之后,table中可能有一些key为null的Entry数据被清理掉,
// 所以此时通过判断size >= threshold - threshold / 4 也就是size >= threshold * 3/4 来决定是否扩容。
if (size >= threshold - threshold / 4)
resize();
}

/**
* Double the capacity of the table.
*/
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;

for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}

setThreshold(newLen);
size = count;
table = newTab;
}
/**
* Expunge all stale entries in the table.
*/
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}

3.7 Get源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* Get the entry associated with key. This method
* itself handles only the fast path: a direct hit of existing
* key. It otherwise relays to getEntryAfterMiss. This is
* designed to maximize performance for direct hits, in part
* by making this method readily inlinable.
*
* @param key the thread local object
* @return the entry associated with key, or null if no such
*/
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
// 命中直接返回
if (e != null && e.get() == key)
return e;
else
//存在哈希冲突
return getEntryAfterMiss(key, i, e);
}

/**
* Version of getEntry method for use when key is not found in
* its direct hash slot.
*
* @param key the thread local object
* @param i the table index for key's hash code
* @param e the entry at table[i]
* @return the entry associated with key, or null if no such
*/
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entrye) {
Entry[] tab = table;
int len = tab.length;

while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
//命中
return e;
if (k == null)
// 清理过期 Entry
expungeStaleEntry(i);
else
// 线性探测:下一个元素
i = nextIndex(i, len);
e = tab[i];
}
return null;
}

4. InheritableThreadLocal

在父线程中调用 new Thread()方法来创建子线程时,Thread#init方法在 Thread的构造方法中被调用,在 init方法中拷贝父线程数据到子线程中。

1
2
3
4
5
6
7
8
9
10
11
12
13
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}

if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
this.stackSize = stackSize;
tid = nextThreadID();
}

InheritableThreadLocal仍然有缺陷,一般我们做异步化处理都是使用的线程池,而 InheritableThreadLocal是在 new Thread中的 init()方法给赋值的,而线程池是线程复用的逻辑,所以这里会存在问题。

当然,有问题出现就会有解决问题的方案,阿里巴巴开源了一个 TransmittableThreadLocal组件就可以解决这个问题。

5. 存在的问题

5.1 内存泄漏

ThreadLocalMap 中使用的 key 为 ThreadLocal 的弱引用,而 value 是强引用。所以,如果 ThreadLocal 没有被外部强引用的情况下,在垃圾回收的时候,key 会被清理掉,而 value 不会被清理掉。

这样一来,ThreadLocalMap 中就会出现 key 为 null 的 Entry。假如我们不做任何措施的话,value 永远无法被 GC 回收,这个时候就可能会产生内存泄露。ThreadLocalMap 实现中已经考虑了这种情况,在调用 set()get()remove() 方法的时候,会清理掉 key 为 null 的记录,但这种操作具有滞后性,且不是全数组扫描,可能会存在漏掉的情况。

根本解决内存泄漏的问题,就是在使用完 ThreadLocal方法后手动调用 remove()方法。

注: Java四种引用类型

  • 强引用 :我们常常 new 出来的对象就是强引用类型,只要强引用存在,垃圾回收器将永远不会回收被引用的对象,哪怕内存不足的时候
  • 软引用 :使用 SoftReference 修饰的对象被称为软引用,软引用指向的对象在内存要溢出的时候被回收
  • 弱引用 :使用 WeakReference 修饰的对象被称为弱引用,只要发生垃圾回收,若这个对象只被弱引用指向,那么就会被回收
  • 虚引用 :虚引用是最弱的引用,在 Java 中使用 PhantomReference 进行定义。虚引用中唯一的作用就是用队列接收对象即将死亡的通知

(1)如果key为强引用会有什么问题?

我们都知道ThreadLocal变量对ThreadLocal对象是有强引用存在的。

即使ThreadLocal变量生命周期完了,设置成null了,但由于key对ThreadLocal还是强引用。

此时,如果执行该代码的线程使用了线程池,一直长期存在,不会被销毁。

就会存在这样的强引用链:Thread变量 -> Thread对象 -> ThreadLocalMap -> Entry -> key -> ThreadLocal对象。

那么,ThreadLocal对象和ThreadLocalMap都将不会被GC回收,于是产生了内存泄露问题。

(2)Entry的value为什么不设计成弱引用?

Entry的value假如只是被Entry引用,有可能没被业务系统中的其他地方引用。如果将value改成了弱引用,被GC贸然回收了(数据突然没了),可能会导致业务系统出现异常。

参考文献

(1)ThreadLocal 详解:https://javaguide.cn/java/concurrent/threadlocal.html

(2)threadlocal value为什么不是弱引用?https://www.zhihu.com/question/399087116/answer/2679320042

作者

lei.ch1941

发布于

2023-11-02

更新于

2023-11-05

许可协议

评论