LockSupport

concurrent包是基于AQS (AbstractQueuedSynchronizer)框架的,AQS框架借助于两个类:

  • Unsafe(提供CAS操作)
  • LockSupport(提供park/unpark操作)

因此,LockSupport非常重要。
两个重点
(1)操作对象
归根结底,LockSupport.park()和LockSupport.unpark(Thread thread)调用的是Unsafe中的native代码:

//LockSupport中
public static void park() {
     UNSAFE.park(false, 0L);
}
//LockSupport中
public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

Unsafe类中的对应方法:

    //park
    public native void park(boolean isAbsolute, long time);
    
    //unpack
    public native void unpark(Object var1);

park函数是将当前调用Thread阻塞,而unpark函数则是将指定线程Thread唤醒。

与Object类的wait/notify机制相比,park/unpark有两个优点:
以thread为操作对象更符合阻塞线程的直观定义
操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性。

(2)关于“许可”

在上面的文字中,我使用了阻塞和唤醒,是为了和wait/notify做对比。

其实park/unpark的设计原理核心是“许可”:park是等待一个许可,unpark是为某线程提供一个许可。
如果某线程A调用park,那么除非另外一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。

有一点比较难理解的,是unpark操作可以再park操作之前。
也就是说,先提供许可。当某线程调用park时,已经有许可了,它就消费这个许可,然后可以继续运行。这其实是必须的。考虑最简单的生产者(Producer)消费者(Consumer)模型:Consumer需要消费一个资源,于是调用park操作等待;Producer则生产资源,然后调用unpark给予Consumer使用的许可。非常有可能的一种情况是,Producer先生产,这时候Consumer可能还没有构造好(比如线程还没启动,或者还没切换到该线程)。那么等Consumer准备好要消费时,显然这时候资源已经生产好了,可以直接用,那么park操作当然可以直接运行下去。如果没有这个语义,那将非常难以操作。

但是这个“许可”是不能叠加的,“许可”是一次性的。
比如线程B连续调用了三次unpark函数,当线程A调用park函数就使用掉这个“许可”,如果线程A再次调用park,则进入等待状态。
继续阅读“LockSupport”

Java ReadWriteLock读写锁的使用

本文将提供Java中的ReadWriteLock和ReentrantReadWriteLock的示例。JDK 1.5中已经引入了ReadWriteLock和ReentrantReadWriteLock。ReentrantReadWriteLock是ReadWriteLock接口的实现,而ReadWriteLock扩展了Lock接口。ReentrantReadWriteLock是具有可重入性的ReadWriteLock的实现。ReentrantReadWriteLock具有关联的读写锁,可以重新获取这些锁。在本文,我们将通过完整的示例讨论ReadWriteLock和ReentrantReadWriteLock。

Lock
JDK 1.5中引入了java.util.concurrent.locks.Lock接口。Lock可以代替使用同步方法,并将有助于更有效的锁定系统。Lock在多线程环境中用于共享资源。Lock作用的方式是,任何线程必须必须首先获得锁才能访问受锁保护的共享资源。一次只有一个线程可以获取锁,一旦其工作完成,它将为队列中其他线程解锁资源。ReadWriteLock是扩展的接口Lock。

ReadWriteLock
JDK 1.5中引入了java.util.concurrent.locks.ReadWriteLock接口。ReadWriteLock是用于读取和写入操作的一对锁。如果没有写锁定请求,则多个线程可以同时获取读锁定请求。如果线程获得了对资源的写锁,则任何线程都无法获得对该资源的其他读或写锁。ReadWriteLock在读操作比写操作更频繁的情况下,效率更高,因为可以由多个线程同时为共享资源获取读锁定。

ReentrantReadWriteLock
JDK 1.5中引入了java.util.concurrent.locks.ReentrantReadWriteLock类。ReentrantReadWriteLock是的实现ReadWriteLock。我们将讨论ReentrantReadWriteLock的一些主要属性。

Acquisition order [获取顺序]
ReentrantReadWriteLock可以使用公平和非公平模式。默认是不公平的。当以非公平初始化时,读锁和写锁的获取的顺序是不确定的。非公平锁主张竞争获取,可能会延缓一个或多个读或写线程,但是会比公平锁有更高的吞吐量。当以公平模式初始化时,线程将会以队列的顺序获取锁。当当前线程释放锁后,等待时间最长的写锁线程就会被分配写锁;或者有一组读线程组等待时间比写线程长,那么这组读线程组将会被分配读锁。

Reentrancy [重入]

什么是可重入锁,不可重入锁呢?”重入”字面意思已经很明显了,就是可以重新进入。可重入锁,就是说一个线程在获取某个锁后,还可以继续获取该锁,即允许一个线程多次获取同一个锁。比如synchronized内置锁就是可重入的,如果A类有2个synchornized方法method1和method2,那么method1调用method2是允许的。显然重入锁给编程带来了极大的方便。假如内置锁不是可重入的,那么导致的问题是:1个类的synchornized方法不能调用本类其他synchornized方法,也不能调用父类中的synchornized方法。与内置锁对应,JDK提供的显示锁ReentrantLock也是可以重入的。

Lock downgrading [锁降级]
ReentrantReadWriteLock可以从写锁降级为读锁。这意味着,如果线程获得了写锁,则可以将其锁从写锁降级为读锁。顺序为:首先获取写锁,执行写操作,然后获取读锁,然后解锁写锁,然后在读操作之后最终解锁读锁。从读锁升级到写锁是不行的。
继续阅读“Java ReadWriteLock读写锁的使用”

CyclicBarrier 使用不当导致死锁问题

先上代码:


import java.util.concurrent.*;

public class App {

	public static ExecutorService pool = Executors.newFixedThreadPool(5);
	public static ExecutorService pool2 = Executors.newCachedThreadPool();

	public static void main(String[] args) {
		// 使用 CyclicBarrier 出现死锁问题
		// new GenTaskUseBarrier(pool, 10).start();
		// 使用 CachedThreadPool 解决 CyclicBarrier 死锁问题
		// new GenTaskUseBarrier(pool2, 10).start();
		// 使用 CountDownLatch 解决 CyclicBarrier 死锁问题
		// new GenTaskUseCountDown(pool, 10).start();

		// 模拟5个并发共享一个线程池
		// new MockConcurrence(pool, 5).start();
	}
}

class MockConcurrence extends Thread {
	ExecutorService pool;
	int count;

	public MockConcurrence(ExecutorService pool, int count) {
		super();
		this.pool = pool;
		this.count = count;
	}

	@Override
	public void run() {
		for (int i = 0; i < count; i++) {
			new GenTaskUseBarrier(pool, 5, false).start();
		}
	}
}

class GenTaskUseCountDown extends Thread {

	ExecutorService pool;

	int taskSize;

	public GenTaskUseCountDown(ExecutorService pool, int taskSize) {
		this.pool = pool;
		this.taskSize = taskSize;
	}

	@Override
	public void run() {
		CountDownLatch latch = new CountDownLatch(taskSize);
		for (int i = 0; i < taskSize; i++) {
			pool.submit(new MyTask(latch));
		}
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		pool.shutdown();
		System.out.println("All task done!");
	}
}

class GenTaskUseBarrier extends Thread {

	ExecutorService pool;

	int taskSize;

	boolean autoClose = true;

	public GenTaskUseBarrier(ExecutorService pool, int taskSize) {
		this.pool = pool;
		this.taskSize = taskSize;
	}

	public GenTaskUseBarrier(ExecutorService pool, int taskSize, boolean autoClose) {
		this(pool, taskSize);
		this.autoClose = autoClose;
	}

	@Override
	public void run() {
		CyclicBarrier barrier = new CyclicBarrier(taskSize, new Runnable() {
			@Override
			public void run() {
				if (autoClose) {
					pool.shutdown();
				}
				System.out.println("All task done!");
			}
		});
		for (int i = 0; i < taskSize; i++)
			pool.submit(new MyTask(barrier));
	}
}

class MyTask extends Thread {

	CyclicBarrier barrier;

	CountDownLatch latch;

	public MyTask(CyclicBarrier barrier) {
		this.barrier = barrier;
	}

	public MyTask(CountDownLatch latch) {
		this.latch = latch;
	}

	@Override
	public void run() {
		try {
			System.out.println("线程" + Thread.currentThread().getName() + "正在执行同一个任务");
			// 以睡眠来模拟几个线程执行一个任务的时间
			Thread.sleep(1000);
			System.out.println("线程" + Thread.currentThread().getName() + "执行任务完成,等待其他线程执行完毕");
			// 用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;
			if (barrier != null) {
				barrier.await();
			}
			if (latch != null) {
				latch.countDown();
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
		System.out.println("所有线程写入完毕");

	}

}

输出结果:

线程pool-1-thread-3正在执行同一个任务
线程pool-1-thread-4正在执行同一个任务
线程pool-1-thread-2正在执行同一个任务
线程pool-1-thread-1正在执行同一个任务
线程pool-1-thread-5正在执行同一个任务
线程pool-1-thread-2执行任务完成,等待其他线程执行完毕
线程pool-1-thread-3执行任务完成,等待其他线程执行完毕
线程pool-1-thread-4执行任务完成,等待其他线程执行完毕
线程pool-1-thread-1执行任务完成,等待其他线程执行完毕
线程pool-1-thread-5执行任务完成,等待其他线程执行完毕

//卡死在这个地方

原因分析:
1.提交了10个任务,但是只有5个调度线程、每次只能执行一个解析任务;
2.前面5个任务执行后由于调用了 barrier.await 被阻塞了,需要等待其他两个任务都达到栅栏状态;
3.前面5个任务的线程被阻塞了,导致没有空闲的调度线程去执行另外两个任务;
4.前面5个任务等待其他两个任务的栅栏唤醒,而其他5个任务则等待第一个任务的线程资源,从而进入死锁状态。

解决方案 [修改Main方法注释 可以逐个测试]:
1、调整线程池调度线程个数,提交多少个任务开多少个资源,如果并发调用的时候共享同一个线程池调度非常容易出现问题,一定要小心
2、使用 CachedThreadPool,或者自定义线程池
3、更换协作工具类为 CountDownLatch,将主线程阻塞直到所有的解析任务都被执行完成。

StampedLock 学习

StampedLock是并发包里面jdk8版本新增的一个锁,
该锁提供了三种模式的读写控制,三种模式分别如下:

      写锁writeLock,是个排它锁或者叫独占锁,同时只有一个线程可以获取该锁,当一个线程获取该锁后,其它请求的线程必须等待,当目前没有线程持有读锁或者写锁的时候才可以获取到该锁,请求该锁成功后会返回一个stamp票据变量用来表示该锁的版本,当释放该锁时候需要unlockWrite并传递参数stamp。
      悲观读锁readLock,是个共享锁,在没有线程获取独占写锁的情况下,同时多个线程可以获取该锁,如果已经有线程持有写锁,其他线程请求获取该读锁会被阻塞。这里讲的悲观其实是参考数据库中的乐观悲观锁的,这里说的悲观是说在具体操作数据前悲观的认为其他线程可能要对自己操作的数据进行修改,所以需要先对数据加锁,这是在读少写多的情况下的一种考虑,请求该锁成功后会返回一个stamp票据变量用来表示该锁的版本,当释放该锁时候需要unlockRead并传递参数stamp。
      乐观读锁tryOptimisticRead,是相对于悲观锁来说的,在操作数据前并没有通过CAS设置锁的状态,如果当前没有线程持有写锁,则简单的返回一个非0的stamp版本信息,获取该stamp后在具体操作数据前还需要调用validate验证下该stamp是否已经不可用,也就是看当调用tryOptimisticRead返回stamp后到到当前时间间是否有其他线程持有了写锁,如果是那么validate会返回0,否者就可以使用该stamp版本的锁对数据进行操作。由于tryOptimisticRead并没有使用CAS设置锁状态所以不需要显示的释放该锁。该锁的一个特点是适用于读多写少的场景,因为获取读锁只是使用与或操作进行检验,不涉及CAS操作,所以效率会高很多,但是同时由于没有使用真正的锁,在保证数据一致性上需要拷贝一份要操作的变量到方法栈,并且在操作数据时候可能其他写线程已经修改了数据,而我们操作的是方法栈里面的数据,也就是一个快照,所以最多返回的不是最新的数据,但是一致性还是得到保障的。

一个小的案例:
继续阅读“StampedLock 学习”

Disruptor with multiple consumers so that each event is only consumed once?

There are 2 approaches, the first is to use the WorkerPool. The second it to use the ‘striped’ EventHandler approach described below.

If we have 4 handlers and assign each an ordinal (0 through 3), then the consumer need only do a modulo operation using the sequence number and the number of consumers and compare it to its ordinal value.

public final class MyHandler implements EventHandler<ValueEvent>
{
    private final long ordinal;
    private final long numberOfConsumers;

    public MyHandler(final long ordinal, final long numberOfConsumers)
    {
        this.ordinal = ordinal;
        this.numberOfConsumers = numberOfConsumers;
    }

    public void onEvent(final ValueEvent entry, final long sequence, final boolean onEndOfBatch)
    {
        if ((sequence % numberOfConsumers) == ordinal)
        {
            // Process the event
        }
    }
}

Some would ask if one consumer takes too long on a transaction it will block all queued on that ordinal. Technically this is possible but one must consider that the batching effect then kicks in thus saving cost for those behind. With this approach the concurrency costs are so low you may find even small stalls are less costly than a queue based alternative.

DisruptorFrequently-Asked-Questions
并发框架Disruptor译文