复习了一下 JCIP 回顾一下同步工具类的使用
CountDownLatch
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); System.out.println("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); System.out.println("{" + threadNum + "}"); Thread.sleep(100); } }
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class CountDownLatchExample2 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }); } //超时之后不阻塞,直接继续运行 countDownLatch.await(10, TimeUnit.MILLISECONDS); System.out.println("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); System.out.println("{" + threadNum + "}"); } }
import java.util.concurrent.CountDownLatch; public class TestCountDownLatch { public long timeTasks(int nThreads, final Runnable task) throws InterruptedException { final CountDownLatch startGate = new CountDownLatch(1); final CountDownLatch endGate = new CountDownLatch(nThreads); for (int i = 0; i < nThreads; i++) { Thread thread = new Thread() { @Override public void run() { try { startGate.await(); try { task.run(); } finally { endGate.countDown(); } } catch (InterruptedException e) { } } }; thread.start(); } long start = System.nanoTime(); startGate.countDown(); endGate.await(); long end = System.nanoTime(); return end - start; } public static void main(String[] args) { int nTask = 10; TestCountDownLatch testCountDownLatch = new TestCountDownLatch(); try { long timeUse = testCountDownLatch.timeTasks(nTask, new Task()); System.out.println(nTask + " use time " + timeUse); } catch (InterruptedException e) { e.printStackTrace(); } } } class Task implements Runnable { @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
CyclicBarrier
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierExample1 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { e.printStackTrace(); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); System.out.println("{" + threadNum + "} is ready "); barrier.await(); System.out.println("{" + threadNum + "} continue "); } }
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class CyclicBarrierExample2 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { e.printStackTrace(); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); System.out.println("{" + threadNum + "} is ready"); try { //等待超时之后直接放开栅栏,让线程执行,不需要等到规定线程数都就位 barrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } System.out.println("{" + threadNum + "} continue"); } }
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierExample3 { //开始之前执行回调函数 private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { System.out.println("callback is running"); }); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { e.printStackTrace(); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); System.out.println("{" + threadNum + "} is ready"); barrier.await(); System.out.println("{" + threadNum + "} continue"); } }