Java CountDownLatch 和 CyclicBarrier 示例

复习了一下 JCIP 回顾一下同步工具类的使用

CountDownLatch


import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchExample1 {

	private final static int threadCount = 200;

	public static void main(String[] args) throws Exception {

		ExecutorService exec = Executors.newCachedThreadPool();

		final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

		for (int i = 0; i < threadCount; i++) {
			final int threadNum = i;
			exec.execute(() -> {
				try {
					test(threadNum);
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					countDownLatch.countDown();
				}
			});
		}
		countDownLatch.await();
		System.out.println("finish");
		exec.shutdown();
	}

	private static void test(int threadNum) throws Exception {
		Thread.sleep(100);
		System.out.println("{" + threadNum + "}");
		Thread.sleep(100);
	}
}

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CountDownLatchExample2 {
	private final static int threadCount = 200;

	public static void main(String[] args) throws Exception {

		ExecutorService exec = Executors.newCachedThreadPool();

		final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

		for (int i = 0; i < threadCount; i++) {
			final int threadNum = i;
			exec.execute(() -> {
				try {
					test(threadNum);
				} catch (Exception e) {
					e.printStackTrace();
				} finally {
					countDownLatch.countDown();
				}
			});
		}
                //超时之后不阻塞,直接继续运行
		countDownLatch.await(10, TimeUnit.MILLISECONDS);
		System.out.println("finish");
		exec.shutdown();
	}

	private static void test(int threadNum) throws Exception {
		Thread.sleep(100);
		System.out.println("{" + threadNum + "}");
	}
}
import java.util.concurrent.CountDownLatch;

public class TestCountDownLatch {

	public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {

		final CountDownLatch startGate = new CountDownLatch(1);
		final CountDownLatch endGate = new CountDownLatch(nThreads);

		for (int i = 0; i < nThreads; i++) {
			Thread thread = new Thread() {
				@Override
				public void run() {
					try {
						startGate.await();
						try {
							task.run();
						} finally {
							endGate.countDown();
						}
					} catch (InterruptedException e) {

					}

				}
			};
			thread.start();
		}

		long start = System.nanoTime();
		startGate.countDown();
		endGate.await();
		long end = System.nanoTime();
		return end - start;
	}

	public static void main(String[] args) {
		int nTask = 10;
		TestCountDownLatch testCountDownLatch = new TestCountDownLatch();
		try {
			long timeUse = testCountDownLatch.timeTasks(nTask, new Task());
			System.out.println(nTask + " use time " + timeUse);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

	}

}

class Task implements Runnable {

	@Override
	public void run() {
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

CyclicBarrier

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierExample1 {

	private static CyclicBarrier barrier = new CyclicBarrier(5);

	public static void main(String[] args) throws Exception {

		ExecutorService executor = Executors.newCachedThreadPool();

		for (int i = 0; i < 10; i++) {
			final int threadNum = i;
			Thread.sleep(1000);
			executor.execute(() -> {
				try {
					race(threadNum);
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
		executor.shutdown();
	}

	private static void race(int threadNum) throws Exception {
		Thread.sleep(1000);
		System.out.println("{" + threadNum + "} is ready ");
		barrier.await();
		System.out.println("{" + threadNum + "} continue ");
	}
}
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierExample2 {
	private static CyclicBarrier barrier = new CyclicBarrier(5);

	public static void main(String[] args) throws Exception {

		ExecutorService executor = Executors.newCachedThreadPool();

		for (int i = 0; i < 10; i++) {
			final int threadNum = i;
			Thread.sleep(1000);
			executor.execute(() -> {
				try {
					race(threadNum);
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
		executor.shutdown();
	}

	private static void race(int threadNum) throws Exception {
		Thread.sleep(1000);
		System.out.println("{" + threadNum + "} is ready");
		try {
                        //等待超时之后直接放开栅栏,让线程执行,不需要等到规定线程数都就位
			barrier.await(2000, TimeUnit.MILLISECONDS);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.out.println("{" + threadNum + "} continue");
	}
}
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierExample3 {
        //开始之前执行回调函数
	private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
		System.out.println("callback is running");
	});

	public static void main(String[] args) throws Exception {

		ExecutorService executor = Executors.newCachedThreadPool();

		for (int i = 0; i < 10; i++) {
			final int threadNum = i;
			Thread.sleep(1000);
			executor.execute(() -> {
				try {
					race(threadNum);
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
		executor.shutdown();
	}

	private static void race(int threadNum) throws Exception {
		Thread.sleep(1000);
		System.out.println("{" + threadNum + "} is ready");
		barrier.await();
		System.out.println("{" + threadNum + "} continue");
	}
}

LockSupport

concurrent包是基于AQS (AbstractQueuedSynchronizer)框架的,AQS框架借助于两个类:

  • Unsafe(提供CAS操作)
  • LockSupport(提供park/unpark操作)

因此,LockSupport非常重要。
两个重点
(1)操作对象
归根结底,LockSupport.park()和LockSupport.unpark(Thread thread)调用的是Unsafe中的native代码:

//LockSupport中
public static void park() {
     UNSAFE.park(false, 0L);
}
//LockSupport中
public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

Unsafe类中的对应方法:

    //park
    public native void park(boolean isAbsolute, long time);
    
    //unpack
    public native void unpark(Object var1);

park函数是将当前调用Thread阻塞,而unpark函数则是将指定线程Thread唤醒。

与Object类的wait/notify机制相比,park/unpark有两个优点:
以thread为操作对象更符合阻塞线程的直观定义
操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性。

(2)关于“许可”

在上面的文字中,我使用了阻塞和唤醒,是为了和wait/notify做对比。

其实park/unpark的设计原理核心是“许可”:park是等待一个许可,unpark是为某线程提供一个许可。
如果某线程A调用park,那么除非另外一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。

有一点比较难理解的,是unpark操作可以再park操作之前。
也就是说,先提供许可。当某线程调用park时,已经有许可了,它就消费这个许可,然后可以继续运行。这其实是必须的。考虑最简单的生产者(Producer)消费者(Consumer)模型:Consumer需要消费一个资源,于是调用park操作等待;Producer则生产资源,然后调用unpark给予Consumer使用的许可。非常有可能的一种情况是,Producer先生产,这时候Consumer可能还没有构造好(比如线程还没启动,或者还没切换到该线程)。那么等Consumer准备好要消费时,显然这时候资源已经生产好了,可以直接用,那么park操作当然可以直接运行下去。如果没有这个语义,那将非常难以操作。

但是这个“许可”是不能叠加的,“许可”是一次性的。
比如线程B连续调用了三次unpark函数,当线程A调用park函数就使用掉这个“许可”,如果线程A再次调用park,则进入等待状态。
继续阅读“LockSupport”

Java ReadWriteLock读写锁的使用

本文将提供Java中的ReadWriteLock和ReentrantReadWriteLock的示例。JDK 1.5中已经引入了ReadWriteLock和ReentrantReadWriteLock。ReentrantReadWriteLock是ReadWriteLock接口的实现,而ReadWriteLock扩展了Lock接口。ReentrantReadWriteLock是具有可重入性的ReadWriteLock的实现。ReentrantReadWriteLock具有关联的读写锁,可以重新获取这些锁。在本文,我们将通过完整的示例讨论ReadWriteLock和ReentrantReadWriteLock。

Lock
JDK 1.5中引入了java.util.concurrent.locks.Lock接口。Lock可以代替使用同步方法,并将有助于更有效的锁定系统。Lock在多线程环境中用于共享资源。Lock作用的方式是,任何线程必须必须首先获得锁才能访问受锁保护的共享资源。一次只有一个线程可以获取锁,一旦其工作完成,它将为队列中其他线程解锁资源。ReadWriteLock是扩展的接口Lock。

ReadWriteLock
JDK 1.5中引入了java.util.concurrent.locks.ReadWriteLock接口。ReadWriteLock是用于读取和写入操作的一对锁。如果没有写锁定请求,则多个线程可以同时获取读锁定请求。如果线程获得了对资源的写锁,则任何线程都无法获得对该资源的其他读或写锁。ReadWriteLock在读操作比写操作更频繁的情况下,效率更高,因为可以由多个线程同时为共享资源获取读锁定。

ReentrantReadWriteLock
JDK 1.5中引入了java.util.concurrent.locks.ReentrantReadWriteLock类。ReentrantReadWriteLock是的实现ReadWriteLock。我们将讨论ReentrantReadWriteLock的一些主要属性。

Acquisition order [获取顺序]
ReentrantReadWriteLock可以使用公平和非公平模式。默认是不公平的。当以非公平初始化时,读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,可能会延缓一个或多个读或写线程,但是会比公平锁有更高的吞吐量。当以公平模式初始化时,线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写锁线程就会被分配写锁;或者有一组读线程组等待时间比写线程长,那么这组读线程组将会被分配读锁。

Reentrancy [重入]

什么是可重入锁,不可重入锁呢?”重入”字面意思已经很明显了,就是可以重新进入。可重入锁,就是说一个线程在获取某个锁后,还可以继续获取该锁,即允许一个线程多次获取同一个锁。比如synchronized内置锁就是可重入的,如果A类有2个synchornized方法method1和method2,那么method1调用method2是允许的。显然重入锁给编程带来了极大的方便。假如内置锁不是可重入的,那么导致的问题是:1个类的synchornized方法不能调用本类其他synchornized方法,也不能调用父类中的synchornized方法。与内置锁对应,JDK提供的显示锁ReentrantLock也是可以重入的。

Lock downgrading [锁降级]
ReentrantReadWriteLock可以从写锁降级为读锁。这意味着,如果线程获得了写锁,则可以将其锁从写锁降级为读锁。顺序为:首先获取写锁,执行写操作,然后获取读锁,然后解锁写锁,然后在读操作之后最终解锁读锁。从读锁升级到写锁是不行的。
继续阅读“Java ReadWriteLock读写锁的使用”

CyclicBarrier 使用不当导致死锁问题

先上代码:


import java.util.concurrent.*;

public class App {

	public static ExecutorService pool = Executors.newFixedThreadPool(5);
	public static ExecutorService pool2 = Executors.newCachedThreadPool();

	public static void main(String[] args) {
		// 使用 CyclicBarrier 出现死锁问题
		// new GenTaskUseBarrier(pool, 10).start();
		// 使用 CachedThreadPool 解决 CyclicBarrier 死锁问题
		// new GenTaskUseBarrier(pool2, 10).start();
		// 使用 CountDownLatch 解决 CyclicBarrier 死锁问题
		// new GenTaskUseCountDown(pool, 10).start();

		// 模拟5个并发共享一个线程池
		// new MockConcurrence(pool, 5).start();
	}
}

class MockConcurrence extends Thread {
	ExecutorService pool;
	int count;

	public MockConcurrence(ExecutorService pool, int count) {
		super();
		this.pool = pool;
		this.count = count;
	}

	@Override
	public void run() {
		for (int i = 0; i < count; i++) {
			new GenTaskUseBarrier(pool, 5, false).start();
		}
	}
}

class GenTaskUseCountDown extends Thread {

	ExecutorService pool;

	int taskSize;

	public GenTaskUseCountDown(ExecutorService pool, int taskSize) {
		this.pool = pool;
		this.taskSize = taskSize;
	}

	@Override
	public void run() {
		CountDownLatch latch = new CountDownLatch(taskSize);
		for (int i = 0; i < taskSize; i++) {
			pool.submit(new MyTask(latch));
		}
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		pool.shutdown();
		System.out.println("All task done!");
	}
}

class GenTaskUseBarrier extends Thread {

	ExecutorService pool;

	int taskSize;

	boolean autoClose = true;

	public GenTaskUseBarrier(ExecutorService pool, int taskSize) {
		this.pool = pool;
		this.taskSize = taskSize;
	}

	public GenTaskUseBarrier(ExecutorService pool, int taskSize, boolean autoClose) {
		this(pool, taskSize);
		this.autoClose = autoClose;
	}

	@Override
	public void run() {
		CyclicBarrier barrier = new CyclicBarrier(taskSize, new Runnable() {
			@Override
			public void run() {
				if (autoClose) {
					pool.shutdown();
				}
				System.out.println("All task done!");
			}
		});
		for (int i = 0; i < taskSize; i++)
			pool.submit(new MyTask(barrier));
	}
}

class MyTask extends Thread {

	CyclicBarrier barrier;

	CountDownLatch latch;

	public MyTask(CyclicBarrier barrier) {
		this.barrier = barrier;
	}

	public MyTask(CountDownLatch latch) {
		this.latch = latch;
	}

	@Override
	public void run() {
		try {
			System.out.println("线程" + Thread.currentThread().getName() + "正在执行同一个任务");
			// 以睡眠来模拟几个线程执行一个任务的时间
			Thread.sleep(1000);
			System.out.println("线程" + Thread.currentThread().getName() + "执行任务完成,等待其他线程执行完毕");
			// 用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
			if (barrier != null) {
				barrier.await();
			}
			if (latch != null) {
				latch.countDown();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
		System.out.println("所有线程写入完毕");

	}

}

输出结果:

线程pool-1-thread-3正在执行同一个任务
线程pool-1-thread-4正在执行同一个任务
线程pool-1-thread-2正在执行同一个任务
线程pool-1-thread-1正在执行同一个任务
线程pool-1-thread-5正在执行同一个任务
线程pool-1-thread-2执行任务完成,等待其他线程执行完毕
线程pool-1-thread-3执行任务完成,等待其他线程执行完毕
线程pool-1-thread-4执行任务完成,等待其他线程执行完毕
线程pool-1-thread-1执行任务完成,等待其他线程执行完毕
线程pool-1-thread-5执行任务完成,等待其他线程执行完毕

//卡死在这个地方

原因分析:
1.提交了10个任务,但是只有5个调度线程、每次只能执行一个解析任务;
2.前面5个任务执行后由于调用了 barrier.await 被阻塞了,需要等待其他两个任务都达到栅栏状态;
3.前面5个任务的线程被阻塞了,导致没有空闲的调度线程去执行另外两个任务;
4.前面5个任务等待其他两个任务的栅栏唤醒,而其他5个任务则等待第一个任务的线程资源,从而进入死锁状态。

解决方案 [修改Main方法注释 可以逐个测试]:
1、调整线程池调度线程个数,提交多少个任务开多少个资源,如果并发调用的时候共享同一个线程池调度非常容易出现问题,一定要小心
2、使用 CachedThreadPool,或者自定义线程池
3、更换协作工具类为 CountDownLatch,将主线程阻塞直到所有的解析任务都被执行完成。

JAVA 引用学习

JDK1.2 之前,Java 中引用的定义很传统:如果 reference 类型的数据存储的数值代表的是另一块内存的起始地址,就称这块内存代表一个引用。

JDK1.2 以后,Java 对引用的概念进行了扩充,将引用分为强引用、软引用、弱引用、虚引用四种(引用强度逐渐减弱)

1.强引用

以前我们使用的大部分引用实际上都是强引用,这是使用最普遍的引用。如果一个对象具有强引用,那就类似于必不可少的生活用品,垃圾回收器绝不会回收它。当内存空间不足,Java 虚拟机宁愿抛出 OutOfMemoryError 错误,使程序异常终止,也不会靠随意回收具有强引用的对象来解决内存不足问题。

2.软引用(SoftReference)

如果一个对象只具有软引用,那就类似于可有可无的生活用品。如果内存空间足够,垃圾回收器就不会回收它,如果内存空间不足了,就会回收这些对象的内存。只要垃圾回收器没有回收它,该对象就可以被程序使用。软引用可用来实现内存敏感的高速缓存。

软引用可以和一个引用队列(ReferenceQueue)联合使用,如果软引用所引用的对象被垃圾回收,JAVA 虚拟机就会把这个软引用加入到与之关联的引用队列中。

3.弱引用(WeakReference)

如果一个对象只具有弱引用,那就类似于可有可无的生活用品。弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程, 因此不一定会很快发现那些只具有弱引用的对象。

弱引用可以和一个引用队列(ReferenceQueue)联合使用,如果弱引用所引用的对象被垃圾回收,Java 虚拟机就会把这个弱引用加入到与之关联的引用队列中。

4.虚引用(PhantomReference)

“虚引用”顾名思义,就是形同虚设,与其他几种引用都不同,虚引用并不会决定对象的生命周期。如果一个对象仅持有虚引用,那么它就和没有任何引用一样,在任何时候都可能被垃圾回收。

虚引用主要用来跟踪对象被垃圾回收的活动。

虚引用与软引用和弱引用的一个区别在于: 虚引用必须和引用队列(ReferenceQueue)联合使用。当垃圾回收器准备回收一个对象时,如果发现它还有虚引用,就会在回收对象的内存之前,把这个虚引用加入到与之关联的引用队列中。程序可以通过判断引用队列中是否已经加入了虚引用,来了解被引用的对象是否将要被垃圾回收。程序如果发现某个虚引用已经被加入到引用队列,那么就可以在所引用的对象的内存被回收之前采取必要的行动。

特别注意,在程序设计中一般很少使用弱引用与虚引用,使用软引用的情况较多,这是因为软引用可以加速 JVM 对垃圾内存的回收速度,可以维护系统的运行安全,防止内存溢出(OutOfMemory)等问题的产生。

继续阅读“JAVA 引用学习”

ThreadLocal可能引起的内存泄露

threadlocal里面使用了一个存在弱引用的map,当释放掉threadlocal的强引用以后,map里面的value却没有被回收.而这块value永远不会被访问到了. 所以存在着内存泄露.
** 最好的做法是将调用threadlocal的remove方法.**: 把当前ThreadLocal从当前线程的ThreadLocalMap中移除。(包括key,value)

/**
 * Remove the entry for key.
 */
private void remove(ThreadLocal<?> key) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        if (e.get() == key) {
            e.clear();// 将entry的对threadlocal的引用赋值为null
            expungeStaleEntry(i);//将 entry的value赋值为null
            return;
        }
    }
}

在threadlocal的生命周期中,都存在这些引用. 看下图: 实线代表强引用,虚线代表弱引用

继续阅读“ThreadLocal可能引起的内存泄露”

StampedLock 学习

StampedLock是并发包里面jdk8版本新增的一个锁,
该锁提供了三种模式的读写控制,三种模式分别如下:

      写锁writeLock,是个排它锁或者叫独占锁,同时只有一个线程可以获取该锁,当一个线程获取该锁后,其它请求的线程必须等待,当目前没有线程持有读锁或者写锁的时候才可以获取到该锁,请求该锁成功后会返回一个stamp票据变量用来表示该锁的版本,当释放该锁时候需要unlockWrite并传递参数stamp。
      悲观读锁readLock,是个共享锁,在没有线程获取独占写锁的情况下,同时多个线程可以获取该锁,如果已经有线程持有写锁,其他线程请求获取该读锁会被阻塞。这里讲的悲观其实是参考数据库中的乐观悲观锁的,这里说的悲观是说在具体操作数据前悲观的认为其他线程可能要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑,请求该锁成功后会返回一个stamp票据变量用来表示该锁的版本,当释放该锁时候需要unlockRead并传递参数stamp。
      乐观读锁tryOptimisticRead,是相对于悲观锁来说的,在操作数据前并没有通过CAS设置锁的状态,如果当前没有线程持有写锁,则简单的返回一个非0的stamp版本信息,获取该stamp后在具体操作数据前还需要调用validate验证下该stamp是否已经不可用,也就是看当调用tryOptimisticRead返回stamp后到到当前时间间是否有其他线程持有了写锁,如果是那么validate会返回0,否者就可以使用该stamp版本的锁对数据进行操作。由于tryOptimisticRead并没有使用CAS设置锁状态所以不需要显示的释放该锁。该锁的一个特点是适用于读多写少的场景,因为获取读锁只是使用与或操作进行检验,不涉及CAS操作,所以效率会高很多,但是同时由于没有使用真正的锁,在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其他写线程已经修改了数据,而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。

一个小的案例:
继续阅读“StampedLock 学习”

关于ThreadLocal的一些反思

ThreadLocal 是一个用于提供线程局部变量的工具类,它主要用于将私有线程和该线程存放的副本对象做一个映射,各个线程之间的变量互不干扰,在高并发场景下,可以实现无状态的调用,特别适用于各个线程依赖不通的变量值完成操作的场景。例如Spring 中的单例Bean在多线程调用的时候,并行计算的时候各个线程缓存自己的中间计算结果等场景。

ThreadLocal 是解决线程安全问题一个很好的思路,它通过为每一个线程提供一个独立的变量副本,从而隔离了多个线程对数据的访问冲突。在很多情况下,ThreadLocal 比直接使用 synchronized 同步机制解决线程安全问题更简单,更方便,且结果程序拥有更高的并发性。

class PrintRunnable extends Runnable {
  val number = new ThreadLocal[Double]

  override def run(): Unit = {
    number.set(Math.random())
    println(number.get())
  }
}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val printRunnable = new PrintRunnable

    val thread1 = new Thread(printRunnable)
    val thread2 = new Thread(printRunnable)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()
  }
}
0.5157676349493098
0.37557496403907353

但是在父子线程要传递一些变量的时候会发现数据不能共享,需要 InheritableThreadLocal 来实现父子线程变量的传递。

class PrintRunnable extends Runnable {
  val number = new ThreadLocal[Double]

  override def run(): Unit = {
    number.set(Math.random())
    println(number.get())
    
    val childThread = new Thread(new Runnable {
      override def run(): Unit = {
        println(number.get())
      }
    })
    childThread.start()
    childThread.join()
  }
}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val printRunnable = new PrintRunnable

    val thread1 = new Thread(printRunnable)
    val thread2 = new Thread(printRunnable)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()
  }
}
0.5475226099407153
0.8376546404552231
null
null

正确的做法:

class PrintRunnable extends Runnable {
  val number = new InheritableThreadLocal[Double]

  override def run(): Unit = {
    number.set(Math.random())
    println(number.get())

    val childThread = new Thread(new Runnable {
      override def run(): Unit = {
        println(number.get())
      }
    })
    childThread.start()
    childThread.join()
  }
}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val printRunnable = new PrintRunnable

    val thread1 = new Thread(printRunnable)
    val thread2 = new Thread(printRunnable)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()
  }
}
0.006425375134899158
0.021932306310074368
0.006425375134899158
0.021932306310074368

在子线程中不能修改父线程的变量:

class PrintRunnable extends Runnable {
  val number = new InheritableThreadLocal[Double]

  override def run(): Unit = {
    number.set(Math.random())
    println(number.get())

    val childThread = new Thread(new Runnable {
      override def run(): Unit = {
        println(number.get())
        number.set(0.1)
      }
    })
    childThread.start()
    childThread.join()
    println(number.get())
  }
}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val printRunnable = new PrintRunnable
    val thread1 = new Thread(printRunnable)
    thread1.start()
    thread1.join()
  }
}
0.7413853012849937
0.7413853012849937
0.7413853012849937

但是现实开发中我们更多的是使用线程池管理线程,线程复用怎么解决ThreadLocal变量的传递或者跨线程池传递。如下面场景:分布式跟踪系统,日志收集记录系统上下文,Session级Cache,应用容器或上层框架跨应用代码给下层SDK传递信息。
可以看一下这个开源项目transmittable-thread-local

关于缓存的反思

目前缓存的解决方案一般有两种:

内存缓存(如 Ehcache) —— 速度快,进程内可用

集中式缓存(如 Redis)—— 可同时为多节点提供服务

使用内存缓存时,一旦应用重启后,由于缓存数据丢失,缓存雪崩,给数据库造成巨大压力,导致应用堵塞

使用内存缓存时,多个应用节点无法共享缓存数据

使用集中式缓存,由于大量的数据通过缓存获取,导致缓存服务的数据吞吐量太大,带宽跑满。现象就是 Redis 服务负载不高,但是由于机器网卡带宽跑满,导致数据读取非常慢

在遭遇问题1、2 时,很多人自然而然会想到使用 Redis 来缓存数据,因此就难以避免的导致了问题3的发生。

当发生问题 3 时,又有很多人想到 Redis 的集群,通过集群来降低缓存服务的压力,特别是带宽压力。

但其实,这个时候的 Redis 上的数据量并不一定大,仅仅是数据的吞吐量大而已。

个人觉得: 可以使用2级缓存,优先使用本地,再使用redis或者memcache的缓存,本地防止OOM可以做一些淘汰机制,刷新到redis或者memcahe中,当然这种情况就需要一些精巧的机制保证2级缓存的一致性。

节点间数据同步的方案 —— Redis Pub/Sub 和 JGroups 。当某个节点的缓存数据需要更新时,会通过 Redis 的消息订阅机制或者是 JGroups 的组播来通知集群内其他节点。当其他节点收到缓存数据更新的通知时,它会清掉自己内存里的数据,然后重新从 Redis 中读取最新数据。

为什么不用 Ehcache 的集群方案?

对 Ehcache 比较熟悉的人还会问的就是这个问题,Ehcache 本身是提供集群模式的,可以在多个节点同步缓存数据。但是 Ehcache 的做法是将整个缓存数据在节点间进行传输。如咱们前面的说的,一个页面需要读取 50K 的缓存数据,当这 50K 的缓存数据有更新时,那么需要在几个节点间传递整个 50K 的数据。这也会造成应用节点间大量的数据传输,这个情况完全不可控。

补充:当然这个单个数据传输量本身没有差别,但是 ehcache 利用 jgroups 来同步数据的做法,在实际测试过程中发现可靠性还是略低,而且 jgroups 的同步数据在云主机上无法使用。

订阅和通知模式传输的仅仅是缓存的 key 而已,因此相比 Ehcache 的集群模式,使用通知机制要传输的数据极其小,对节点间的数据通信完全不会产生大的影响。