/** * Set the value associated with key. * * @param key the thread local object * @param value the value to be set */ privatevoidset(ThreadLocal<?> key, Object value) { // We don't use a fast path as with get() because it is at // least as common to use set() to create new entries as // it is to replace existing ones, in which case, a fast // path would fail more often than not. // 通过key来计算在散列表中的对应位置 Entry[] tab = table; intlen= tab.length; inti= key.threadLocalHashCode & (len-1);
for (Entrye= tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { ThreadLocal<?> k = e.get();
// k = key,说明当前set操作是一个替换操作,做替换逻辑,直接返回 if (k == key) { e.value = value; return; }
/** * 环形数组下标计算(下一个数组元素下标) */ privatestaticintnextIndex(int i, int len) { return ((i + 1 < len) ? i + 1 : 0); }
/** * 环形数组下标计算(上一个数组元素下标) */ privatestaticintprevIndex(int i, int len) { return ((i - 1 >= 0) ? i - 1 : len - 1); }
/** * Replace a stale entry encountered during a set operation * with an entry for the specified key. The value passed in * the value parameter is stored in the entry, whether or not * an entry already exists for the specified key. * * As a side effect, this method expunges all stale entries in the * "run" containing the stale entry. (A run is a sequence of entries * between two null slots.) * * @param key the key * @param value the value to be associated with key * @param staleSlot index of the first stale entry encountered while * searching for key. */ privatevoidreplaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) { Entry[] tab = table; intlen= tab.length; Entry e;
// Back up to check for prior stale entry in current run. // We clean out whole runs at a time to avoid continual // incremental rehashing due to garbage collector freeing // up refs in bunches (i.e., whenever the collector runs). // slotToExpunge表示开始探测式清理过期数据的开始下标,默认从当前的staleSlot开始。 // 以当前的staleSlot开始,向前迭代查找,找到没有过期的数据,for循环一直碰到Entry为null才会结束。 // 如果向前找到了过期数据,更新探测清理过期数据的开始下标为 i,即slotToExpunge=i intslotToExpunge= staleSlot; for (inti= prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len)) if (e.get() == null) slotToExpunge = i;
// Find either the key or trailing null slot of run, whichever // occurs first // 接着开始从staleSlot向后查找,也是碰到Entry为null的桶结束。 for (inti= nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get();
// If we find key, then we need to swap it // with the stale entry to maintain hash table order. // The newly stale slot, or any other stale slot // encountered above it, can then be sent to expungeStaleEntry // to remove or rehash all of the other entries in run. // 如果迭代过程中,碰到 k == key,这说明这里是替换逻辑,替换新数据并且交换当前staleSlot位置 if (k == key) { e.value = value; tab[i] = tab[staleSlot]; tab[staleSlot] = e; // Start expunge at preceding stale entry if it exists // 如果slotToExpunge == staleSlot,这说明replaceStaleEntry()一开始向前查找过期数据时并未找到过期的Entry数据, // 接着向后查找过程中也未发现过期数据,修改开始探测式清理过期数据的下标为当前循环的 index,即slotToExpunge = i if (slotToExpunge == staleSlot) slotToExpunge = i; // 调用cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);进行过期数据清理 cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); return; }
// If we didn't find stale entry on backward scan, the // first stale entry seen while scanning for key is the // first still present in the run. // 如果 k != key则会接着往下走,k == null说明当前遍历的Entry是一个过期数据, // slotToExpunge == staleSlot说明,一开始的向前查找数据并未找到过期的Entry。 // 如果条件成立,则更新slotToExpunge 为当前位置,这个前提是前驱节点扫描时未发现过期数据。 if (k == null && slotToExpunge == staleSlot) slotToExpunge = i; }
// If key not found, put new entry in stale slot // 往后迭代的过程中如果没有找到k == key的数据,且碰到Entry为null的数据,则结束当前的迭代操作。 // 此时说明这里是一个添加的逻辑,将新的数据添加到table[staleSlot] 对应的slot中。 tab[staleSlot].value = null; tab[staleSlot] = newEntry(key, value); // If there are any other stale entries in run, expunge them // 最后判断除了staleSlot以外,还发现了其他过期的slot数据,就要开启清理数据的逻辑 if (slotToExpunge != staleSlot) cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); }
/** * Expunge a stale entry by rehashing any possibly colliding entries * lying between staleSlot and the next null slot. This also expunges * any other stale entries encountered before the trailing null. See * Knuth, Section 6.4 * * @param staleSlot index of slot known to have null key * @return the index of the next null slot after staleSlot * (all between staleSlot and this slot will have been checked * for expunging). */ privateintexpungeStaleEntry(int staleSlot) { Entry[] tab = table; intlen= tab.length;
// Rehash until we encounter null Entry e; int i; // 以staleSlot位置往后迭代遍历 for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { ThreadLocal<?> k = e.get();
// 不是当前槽位下标位置,那么说明产生了hash冲突, if (h != i) { tab[i] = null; // Unlike Knuth 6.4 Algorithm R, we must scan until // null because multiple entries could have been stale. // 此时以新计算出来正确的槽位位置往后迭代,找到最近一个可以存放entry的位置 while (tab[h] != null) h = nextIndex(h, len); tab[h] = e; } } } return i; }
/** * Heuristically scan some cells looking for stale entries. * This is invoked when either a new element is added, or * another stale one has been expunged. It performs a * logarithmic number of scans, as a balance between no * scanning (fast but retains garbage) and a number of scans * proportional to number of elements, that would find all * garbage but would cause some insertions to take O(n) time. * * @param i a position known NOT to hold a stale entry. The * scan starts at the element after i. * * @param n scan control: {@code log2(n)} cells are scanned, * unless a stale entry is found, in which case * {@code log2(table.length)-1} additional cells are scanned. * When called from insertions, this parameter is the number * of elements, but when from replaceStaleEntry, it is the * table length. (Note: all this could be changed to be either * more or less aggressive by weighting n instead of just * using straight log n. But this version is simple, fast, and * seems to work well.) * * @return true if any stale entries have been removed. */ /** * 主要作用: * 从i位置开始搜索,如果在log2n个连续节点内发现了失效条目(key=null,value<>null)则进行清理失效条目, * 并且重置n的值为len,如此循环,直到连续log2n个节点都是正常节点时才会跳出while。 * (和expungeStaleEntry(i)的区别就是它的清理范围更长,只要在log2n个连续节点有失效条目, * 那么你的清理范围就可以一直延长,极端情况下可能会清理整个数组,也可能没有清理掉任何节点) */ privatebooleancleanSomeSlots(int i, int n) { booleanremoved=false; Entry[] tab = table; intlen= tab.length; do { i = nextIndex(i, len); Entrye= tab[i]; if (e != null && e.get() == null) { n = len; removed = true; i = expungeStaleEntry(i); } } while ( (n >>>= 1) != 0); return removed; }
/** * Re-pack and/or re-size the table. First scan the entire * table removing stale entries. If this doesn't sufficiently * shrink the size of the table, double the table size. */ privatevoidrehash() { expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis // 清理完成之后,table中可能有一些key为null的Entry数据被清理掉, // 所以此时通过判断size >= threshold - threshold / 4 也就是size >= threshold * 3/4 来决定是否扩容。 if (size >= threshold - threshold / 4) resize(); }
/** * Double the capacity of the table. */ privatevoidresize() { Entry[] oldTab = table; intoldLen= oldTab.length; intnewLen= oldLen * 2; Entry[] newTab = newEntry[newLen]; intcount=0;
for (intj=0; j < oldLen; ++j) { Entrye= oldTab[j]; if (e != null) { ThreadLocal<?> k = e.get(); if (k == null) { e.value = null; // Help the GC } else { inth= k.threadLocalHashCode & (newLen - 1); while (newTab[h] != null) h = nextIndex(h, newLen); newTab[h] = e; count++; } } }
setThreshold(newLen); size = count; table = newTab; } /** * Expunge all stale entries in the table. */ privatevoidexpungeStaleEntries() { Entry[] tab = table; intlen= tab.length; for (intj=0; j < len; j++) { Entrye= tab[j]; if (e != null && e.get() == null) expungeStaleEntry(j); } }
/** * Get the entry associated with key. This method * itself handles only the fast path: a direct hit of existing * key. It otherwise relays to getEntryAfterMiss. This is * designed to maximize performance for direct hits, in part * by making this method readily inlinable. * * @param key the thread local object * @return the entry associated with key, or null if no such */ private Entry getEntry(ThreadLocal<?> key) { inti= key.threadLocalHashCode & (table.length - 1); Entrye= table[i]; // 命中直接返回 if (e != null && e.get() == key) return e; else //存在哈希冲突 return getEntryAfterMiss(key, i, e); }
/** * Version of getEntry method for use when key is not found in * its direct hash slot. * * @param key the thread local object * @param i the table index for key's hash code * @param e the entry at table[i] * @return the entry associated with key, or null if no such */ private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entrye) { Entry[] tab = table; intlen= tab.length;
while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) //命中 return e; if (k == null) // 清理过期 Entry expungeStaleEntry(i); else // 线性探测:下一个元素 i = nextIndex(i, len); e = tab[i]; } returnnull; }
4. InheritableThreadLocal
在父线程中调用 new Thread()方法来创建子线程时,Thread#init方法在 Thread的构造方法中被调用,在 init方法中拷贝父线程数据到子线程中。
1 2 3 4 5 6 7 8 9 10 11 12 13
privatevoidinit(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { if (name == null) { thrownewNullPointerException("name cannot be null"); }
if (inheritThreadLocals && parent.inheritableThreadLocals != null) this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); this.stackSize = stackSize; tid = nextThreadID(); }
但 InheritableThreadLocal仍然有缺陷,一般我们做异步化处理都是使用的线程池,而 InheritableThreadLocal是在 new Thread中的 init()方法给赋值的,而线程池是线程复用的逻辑,所以这里会存在问题。