常用的并发工具类

###ThreadLocal
理解:借助Thread类的threadLocals属性对应的ThreadLocalMap,将数据缓存在该map中;(换句话说,ThreadLocal将数据缓存在
Thread的ThreadLocalMap中,键为TreadLocal对象,值为要缓存的数据,也就是说ThreadLocal将数据缓存在了当前调用的
线程的Map结构中,而每个线程都有自己的ThreadLocalMap,所以ThreadLocalMap可以发挥的作用就是对于同一个变量,可以为
每个线程维护一份自己的副本,副本数据不受其他线程影响)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// TreadLocal.java - set()
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
// TreadLocal.java - getMap()
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

// Thread类中属性
ThreadLocal.ThreadLocalMap threadLocals = null;

###ThreadLocal可能导致内存泄露的原因
理解:通过下面的代码片段的可以看出ThreadLocalMap是一个Entry节点的数组,而ThreadLocalMap中的Entry节点继承
自WeakReference弱引用,且Entry的key是一个弱引用,value任然是一个强引用;
使用弱引用带来的好处和问题:
当当前ThreadLocal没有强引用指向的时候,Entry节点的key的这个弱引用会被自动回收,此时虽然key被回收了,但是此时value
依然存在,仍然占用着内存不会被回收而导致了内存泄露,因为ThreadLocalMap是Thread的属性,它会随着当前线程的存在而存在,
防止ThreadLocal内存泄露的方法,就是在不需要使用的时候调用remove方法;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// TreadLocal.java中ThreadLocalMap内部类的部分片段
static class ThreadLocalMap {

/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

private Entry[] table;
}

###强引用
理解:我们自定义的class类都是强引用

###软引用
理解:当JVM内存空间不足的时候,会回收软引用指向对象的内存空间

###弱引用
理解:当没有强引用指向弱引用指向的对象时,在下次GC的时候就会被回收

###虚引用
理解:虚引用主要用来跟踪对象何时被垃圾回收器回收的;声明虚引用的时候是要传入一个queue,当你的虚引用所引用的对象被回收时,
会被加入到这个queue中,可以通过判断queue里面是不是有对象来判断对象是不是被回收;
应用场景假设:如果我们在使用堆外内存的时候,对象使用完需要释放内存时,无法通过JVM来回收内存,这个时候我们就可以使用虚引用
来确定回收堆外内存的时机,然后调用Unsafe类的freeMemory(long var1)方法来回收内存;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class MyPhantomReference {
private static final List<Object> LIST = new LinkedList<>();
private static final ReferenceQueue<M> QUEUE = new ReferenceQueue<>();

public static void main(String[] args) {
PhantomReference<M> phantomReference = new PhantomReference<>(new M(), QUEUE);

new Thread(() -> {
while (true) {
LIST.add(new byte[1024 * 1024]);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
System.out.println(phantomReference.get());
}
}).start();

new Thread(() -> {
while (true) {
Reference<? extends M> poll = QUEUE.poll();
if (poll != null) {
System.out.println("--- 虚引用对象被jvm回收了 ---- " + poll);
}
}
}).start();

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}

###原子类AtomicXXX

###LongAdder

###ReentrantLock
理解:与synchronized的不同
private Lock lock = new ReentrantLock();
// 相当于创建了producer等待队列,调用producer.await()则进入producer等待队列等待,producer.signalAll()则是唤醒
// producer等待队列的所有线程
private Condition producer = lock.newCondition();
// 相当于创建了consumer等待队列,调用consumer.await()则进入consumer等待队列等待,consumer.signalAll()则是唤醒
// consumer等待队列的所有线程
private Condition consumer = lock.newCondition();

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class MyContainer<T> {
final private LinkedList<T> lists = new LinkedList<>();
final private int MAX = 10; //最多10个元素
private int count = 0;

private Lock lock = new ReentrantLock();
private Condition producer = lock.newCondition();
private Condition consumer = lock.newCondition();

public void put(T t) {
try {
lock.lock();
while(lists.size() == MAX) { //想想为什么用while而不是用if?
producer.await();
}

lists.add(t);
++count;
consumer.signalAll(); //通知消费者线程进行消费
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public T get() {
T t = null;
try {
lock.lock();
while(lists.size() == 0) {
consumer.await();
}
t = lists.removeFirst();
count --;
producer.signalAll(); //通知生产者进行生产
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return t;
}

public static void main(String[] args) {
MyContainer2<String> c = new MyContainer2<>();
//启动消费者线程
for(int i=0; i<10; i++) {
new Thread(()->{
for(int j=0; j<5; j++) System.out.println(c.get());
}, "c" + i).start();
}

try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}

//启动生产者线程
for(int i=0; i<2; i++) {
new Thread(()->{
for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
}, "p" + i).start();
}
}
}

###CountDownLatch

###CyclicBarrier

###Phaser

###ReadWriteLock

###Semaphore
理解:限制最多有多个线程同时在运行;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class TestSemaphore {
public static void main(String[] args) {
// 创建拥有2个许可的Semaphore对象
// Semaphore s = new Semaphore(2);
// 公平和非公平
Semaphore s = new Semaphore(2, true);
//允许一个线程同时执行
//Semaphore s = new Semaphore(1);

new Thread(()->{
try {
// 从Semaphore对象获取许可,只有获取了许可的线程才能够继续执行
s.acquire();
System.out.println("T1 running...");
Thread.sleep(200);
System.out.println("T1 running...");

} catch (InterruptedException e) {
e.printStackTrace();
} finally {
s.release();
}
}).start();

new Thread(()->{
try {
s.acquire();

System.out.println("T2 running...");
Thread.sleep(200);
System.out.println("T2 running...");

s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

###Exchanger
理解:Exchanger.exchange()方法是阻塞的,交换两个线程的数据;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class TestExchanger {

static Exchanger<String> exchanger = new Exchanger<>();

public static void main(String[] args) {
new Thread(()->{
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);

}, "t1").start();

new Thread(()->{
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);

}, "t2").start();
}
}

###LockSupport
理解:相比较与wait()和notify()更加灵活,LockSupport.park()阻塞线程,LockSupport.unpark(t)唤醒指定线程,notifyAll()不能只唤醒指定线程,unpark()方法可以先park()方法执行,可以理解为先发一张通行证,但是这个通行证最多为1,也就是调用
多次unpark()方法也只有一张通行证能够用于解除线程阻塞;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TestLockSupport {
public static void main(String[] args) {
Thread t = new Thread(()->{
for (int i = 0; i < 10; i++) {
System.out.println(i);
if(i == 5) {
LockSupport.park();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
LockSupport.unpark(t);
}
}

###wait和notify
理解:notify()可以唤醒线程但是不会释放锁,wait()会释放锁的;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class NotifyHoldingLock { //wait notify
volatile List lists = new ArrayList();

public void add(Object o) {
lists.add(o);
}

public int size() {
return lists.size();
}

public static void main(String[] args) {
T03_NotifyHoldingLock c = new T03_NotifyHoldingLock();

final Object lock = new Object();

new Thread(() -> {
synchronized(lock) {
System.out.println("t2开始执行");
if(c.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("t2 执行结束");
// 唤醒t1线程
lock.notify();
}
}, "t2").start();

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}

new Thread(() -> {
System.out.println("t1开始执行");
synchronized(lock) {
for(int i=0; i<10; i++) {
c.add(new Object());
System.out.println("add " + i);

if(c.size() == 5) {
// 唤醒t2线程
lock.notify();
// t1线程释放锁
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "t1").start();
}
}

###LockSupport

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class T07_LockSupport_WithoutSleep {

volatile List lists = new ArrayList();

public void add(Object o) {
lists.add(o);
}

public int size() {
return lists.size();
}

static Thread t1 = null, t2 = null;

public static void main(String[] args) {
T07_LockSupport_WithoutSleep c = new T07_LockSupport_WithoutSleep();

t1 = new Thread(() -> {
System.out.println("t1线程开始");
for (int i = 0; i < 10; i++) {
c.add(new Object());
System.out.println("add " + i);
if (c.size() == 5) {
LockSupport.unpark(t2);
LockSupport.park();
}
}
}, "t1");

t2 = new Thread(() -> {
LockSupport.park();
System.out.println("t2线程结束");
LockSupport.unpark(t1);
}, "t2");

t2.start();
t1.start();
}
}