常见的并发工具类

ConcurrentSkipListMap

基本概念

ConturrentSkipListMap是在jdk1.6中新增的,为了对高并发场景下有序的map提供更好的支持

  • 高并发场景
  • key是有序的
  • 添加,删除,查找操作都是基于跳表结构(skip List)实现的
  • key和value都不能为null

跳表

跳表SkipList是一种类似于链表的结构,其查询,插入,删除的时间复杂度都是O(logn)

跳表结合了树和链表的特性

  1. 跳表由很多层组成;
  2. 每一层都是一个有序的链表;
  3. 最底层的链表包含所有元素;
  4. 对于每一层的任意一个节点,不仅有指向其下一个节点的指针,也有指向其下一层的指针;
  5. 如果一个元素出现在Level n层的链表中,则它在Level n层以下的链表也都会出现。

跳表实际上是一种空间换时间的数据结构

ConcurrentSkipListMap结构

ConcurrentSkipListMap用到了俩种结构的节点

Node节点代表了真正存储数据的节点,包含了key,value,指向下一个节点的指针next

static final class Node<K,V> {
    final K key;     // 键
    V val;           // 值
    Node<K,V> next;  // 指向下一个节点的指针
    Node(K key, V value, Node<K,V> next) {
        this.key = key;
        this.val = value;
        this.next = next;
    }
}

Index节点代表了跳表的层级,包含了当前Node,下一层down,当前层的下一个节点

static final class Index<K,V> {
    final Node<K,V> node;   // 当前节点
    final Index<K,V> down;  // 下一层
    Index<K,V> right;       // 当前层的下一个节点
    Index(Node<K,V> node, Index<K,V> down, Index<K,V> right) {
        this.node = node;
        this.down = down;
        this.right = right;
    }
}

如图所示,Node节点将真实的数据按顺序链接起来,Index节点组成了跳表中多层次的索引结构。

例子

public class ConcurrentSkipListMapTest {

    public static void main(String[] args) {
        ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
        map.put(4, "4");
        map.put(5, "5");
        map.put(1, "1");
        map.put(6, "6");
        map.put(2, "2");
        
        System.out.println(map.keySet());
        System.out.println(map);
        System.out.println(map.descendingKeySet());
        System.out.println(map.descendingMap());
    }
}

输出内容

[1, 2, 4, 5, 6]
{1=1, 2=2, 4=4, 5=5, 6=6}
[6, 5, 4, 2, 1]
{6=6, 5=5, 4=4, 2=2, 1=1}

ConcurrentSkipListSet

ConcurrentSkipListSet是线程安全的有序的集合,适用于高并发的场景。

ConcurrentSkipListSet与ConcurrentSkipListMap的关系,与HashSET与HashMap的关系类似,就是采用“组合”的方式: ConcurrentSkipListSet组合了一个ConcurrentSkipListMap,将元素作为 ConcurrentSkipListMap的key存放。

CountDownLatch

CountDownLatch是一个同步工具类,它通过一个计数器来实现的,初始值的线程就响应的-1。当计数器达到0时,表示所有线程都已执行完毕,然后在等待的线程就可以恢复执行的任务

CountDownLatch(int count):count为计数器的初始值(一般需要多少个线程执行,count就设为几)。
countDown(): 每调用一次计数器值-1,直到count被减为0,代表所有线程全部执行完毕。
getCount():获取当前计数器的值。
await(): 等待计数器变为0,即等待所有异步线程执行完毕。
boolean await(long timeout, TimeUnit unit): 此方法与await()区别:
①此方法至多会等待指定的时间,超时后会自动唤醒,若 timeout 小于等于零,则不会等待
②boolean 类型返回值:若计数器变为零了,则返回 true;若指定的等待时间过去了,则返回 false

应用场景

某个线程需要在其他n个线程执行完毕后再向下执行

多个线程并行执行同一个任务,提高响应速度

/**
 * @author liuhuifang
 * @date 2022/5/26 11:31
 */
public class CountDownLaunchTest {

    public static void main(String[] args) throws InterruptedException {
        List list = new ArrayList();
        for(int i = 1; i<=100; i++){
            list.add(i);
        }
        Long start = System.currentTimeMillis();
        for(int i = 0; i<list.size(); i++){
            Thread.sleep(100);
        }
        System.out.println("=====同步执行:耗时"+(System.currentTimeMillis()-start)+"毫秒======");
        Long start1 = System.currentTimeMillis();
        CountDownLatch latch = new CountDownLatch(10);
        for(int i = 0; i<latch.getCount(); i++){
            new Thread(new Test(latch, i, list)).start();
        }
        latch.await();
        System.out.println("=====异步执行:耗时"+(System.currentTimeMillis()-start1)+"毫秒======");
    }

    static class Test implements Runnable{
        private CountDownLatch latch;
        private int i;
        private List list;
        Test(CountDownLatch latch, int i, List list){
            this.latch = latch;
            this.i = i;
            this.list = list;
        }

        @SneakyThrows
        @Override
        public void run() {
            for(int a = i*10; a<(i+1)*10; a++){
                // 执行任务逻辑
                Thread.sleep(100);
            }
            latch.countDown();
        }
    }
}

源码

CountDownLatch的底层是由AQS支持,而AQS的数据结构的核心就是俩个队列,一个是同步队列(sync queue),一个是条件队列(condition queue)

Sync内部类

countDownLatch内部是一个Sync,集成了AQS抽象类

private static final class Sync extends AbstractQueuedSynchronizer {}

countdownLatch内部其实只有一个sync属性,并且是final的

private final Sync sync;

CountDownLatch 只有一个带参数的构造方法

public CountDownLatch(int count) {
  if (count < 0) throw new IllegalArgumentException("count < 0");
  this.sync = new Sync(count);
}

也就是说,初始化的时候必须指定计数器的数量,如果数量为负会直接抛出异常。

然后把 count 初始化为 Sync 内部的 count,也就是

注意这里有一个 setState(count),这是什么意思呢?见闻知意这只是一个设置状态的操作,但是实际上不单单是,还有一层意思是 state 的值代表着待达到条件的线程数。这个我们在聊 countDown 方法的时候再讨论。

getCount()方法返回值是getState()方法,它是AQS中的方法,这个方法会返回当前线程计数,具有volatile读取的内存语义

int getCount() {
  return getState();
}
// ---- AbstractQueuedSynchronizer ----
protected final int getState() {
  return state;
}

tryAcquireShared() 方法用于获取·共享状态下对象的状态,判断对象是否为 0 ,如果为 0 返回 1 ,表示能够尝试获取,如果不为 0,那么返回 -1,表示无法获取。

tryAcquireShared() 方法用于获取·共享状态下对象的状态,判断对象是否为 0 ,如果为 0 返回 1 ,表示能够尝试获取,如果不为 0,那么返回 -1,表示无法获取。

protected int tryAcquireShared(int acquires) {
  return (getState() == 0) ? 1 : -1;
}

这个共享状态属于AQS中的概念,在AQS中分为俩种模式,一种是独占模式,一种是共享模式

tryAcquire独占模式,尝试获取资源,成功返回true,失败返回false

tryAcquireShared共享锁的方式,尝试获取资源.负数表示失败,0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源.

tryReleaseShared() 方法用于共享模式下的释放

protected boolean tryReleaseShared(int releases) {
  // 减小数量,变为 0 的时候进行通知。
  for (;;) {
    int c = getState();
    if (c == 0)
      return false;
    int nextc = c-1;
    if (compareAndSetState(c, nextc))
      return nextc == 0;
  }
}

这个方法是一个无限循环,获取线程状态,如果线程状态是 0 则表示没有被线程占有,没有占有的话那么直接返回 false ,表示已经释放;然后下一个状态进行 - 1 ,使用 compareAndSetState CAS 方法进行和内存值的比较,如果内存值也是 1 的话,就会更新内存值为 0 ,判断 nextc 是否为 0 ,如果 CAS 比较不成功的话,会再次进行循环判断。

await方法

await方法是CountDownLatch一个非常重要的方法,这个方法使当前线程在CountDownLatch计数减至0之前,一直等待,除非线程被中断

CountDownLatch 中的 await 方法有两种,一种是不带任何参数的 await(),一种是可以等待一段时间的await(long timeout, TimeUnit unit)。下面我们先来看一下 await() 方法。

public void await() throws InterruptedException {
  sync.acquireSharedInterruptibly(1);
}

await 方法内部会调用 acquireSharedInterruptibly 方法,这个 acquireSharedInterruptibly 是 AQS 中的方法,以共享模式进行中断。

public final void acquireSharedInterruptibly(int arg)
  throws InterruptedException {
  if (Thread.interrupted())
    throw new InterruptedException();
  if (tryAcquireShared(arg) < 0)
    doAcquireSharedInterruptibly(arg);
}

可以看到,acquireSharedInterruptibly 方法的内部会首先判断线程是否中断,如果线程中断,则直接抛出线程中断异常。如果没有中断,那么会以共享的方式获取。如果能够在共享的方式下不能获取锁,那么就会以共享的方式断开链接。

private void doAcquireSharedInterruptibly(int arg)
  throws InterruptedException {
  final Node node = addWaiter(Node.SHARED);
  boolean failed = true;
  try {
    for (;;) {
      final Node p = node.predecessor();
      if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
          setHeadAndPropagate(node, r);
          p.next = null; // help GC
          failed = false;
          return;
        }
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        throw new InterruptedException();
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }

这个方法有些长,我们分开来看

  • 首先,会先构造一个共享模式的 Node 入队
  • 然后使用无限循环判断新构造 node 的前驱节点,如果 node 节点的前驱节点是头节点,那么就会判断线程的状态,这里调用了一个 setHeadAndPropagate ,其源码如下
private void setHeadAndPropagate(Node node, int propagate) {
  Node h = head; 
  setHead(node);
  if (propagate > 0 || h == null || h.waitStatus < 0 ||
      (h = head) == null || h.waitStatus < 0) {
    Node s = node.next;
    if (s == null || s.isShared())
      doReleaseShared();
  }
}

首先会设置头节点,然后进行一系列的判断,获取节点的获取节点的后继,以共享模式进行释放,就会调用 doReleaseShared 方法,我们再来看一下 doReleaseShared 方法

private void doReleaseShared() {
 
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;            // loop to recheck cases
        unparkSuccessor(h);
      }
      else if (ws == 0 &&
               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;                // loop on failed CAS
    }
    if (h == head)                   // loop if head changed
      break;
  }
}

这个方法会以无限循环的方式首先判断头节点是否等于尾节点,如果头节点等于尾节点的话,就会直接退出。如果头节点不等于尾节点,会判断状态是否为 SIGNAL,不是的话就继续循环 compareAndSetWaitStatus,然后断开后继节点。如果状态不是 SIGNAL,也会调用 compareAndSetWaitStatus 设置状态为 PROPAGATE,状态为 0 并且不成功,就会继续循环。

也就是说 setHeadAndPropagate 就是设置头节点并且释放后继节点的一系列过程。

我们来看下面的 if 判断,也就是 shouldParkAfterFailedAcquire(p, node) 这里

if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
  throw new InterruptedException();

如果上面 Node p = node.predecessor() 获取前驱节点不是头节点,就会进行 park 断开操作,判断此时是否能够断开,判断的标准如下

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;
  if (ws == Node.SIGNAL)
    return true;
  if (ws > 0) {
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}

这个方法会判断 Node p 的前驱节点的结点状态(waitStatus),节点状态一共有五种,分别是

CANCELLED(1):表示当前结点已取消调度。当超时或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。

SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为 SIGNAL。

CONDITION(-2):表示结点等待在 Condition 上,当其他线程调用了 Condition 的 signal() 方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。

0:新结点入队时的默认状态。

一个和 await 重载的方法是 await(long timeout, TimeUnit unit),这个方法和 await 最主要的区别就是这个方法能够可以等待计数器一段时间再执行后续操作。

countDown 方法

countDown 是和 await 同等重要的方法,countDown 用于减少计数器的数量,如果计数减为 0 的话,就会释放所有的线程。

public void countDown() {
  sync.releaseShared(1);
}

这个方法会调用 releaseShared 方法,此方法用于共享模式下的释放操作,首先会判断是否能够进行释放,判断的方法就是 CountDownLatch 内部类 Sync 的 tryReleaseShared 方法

public final boolean releaseShared(int arg) {
  if (tryReleaseShared(arg)) {
    doReleaseShared();
    return true;
  }
  return false;
}
 
// ---- CountDownLatch ----
 
protected boolean tryReleaseShared(int releases) {
  for (;;) {
    int c = getState();
    if (c == 0)
      return false;
    int nextc = c-1;
    if (compareAndSetState(c, nextc))
      return nextc == 0;
  }
}

tryReleaseShared 会进行 for 循环判断线程状态值,使用 CAS 不断尝试进行替换。

如果能够释放,就会调用 doReleaseShared 方法

private void doReleaseShared() {
  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;            // loop to recheck cases
        unparkSuccessor(h);
      }
      else if (ws == 0 &&
               !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;                // loop on failed CAS
    }
    if (h == head)                   // loop if head changed
      break;
  }
}

可以看到,doReleaseShared 其实也是一个无限循环不断使用 CAS 尝试替换的操作。

总结

本文是 CountDownLatch 的基本使用和源码分析,CountDownLatch 就是一个基于 AQS 的计数器,它内部的方法都是围绕 AQS 框架来谈的,除此之外还有其他比如 ReentrantLock、Semaphore 等都是 AQS 的实现,所以要研究并发的话,离不开对 AQS 的探讨。CountDownLatch 的源码看起来很少,比较简单,但是其内部比如 await 方法的调用链路却很长,也值得花费时间深入研究。

CyclicBarrier

有若干个线程,比如说有五个线程,需要它们都到达了某一个点之后才能开始一起执行,也就是说假如其中只有四个线程到达了这个点,还差一个线程没到达,此时这四个线程都会进入等待状态,直到第五个线程也到达了这个点之后,这五个线程才开始一起进行执行状态,是不是这个场景的描述跟CountDownLatch很类似的,下面用一个简单的示例图来感受一下它们两者的区别:

CountDownLatch

CyclicBarrier

所有子线程都已经到达屏障之后,此时屏障就会消失,所有子线程继续执行,若存子线程尚未到达屏障,其他到达了屏障的线程都会进行等待

package com.concurrency2;

import java.util.Random;
import java.util.concurrent.CyclicBarrier;

public class MyTest1 {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
        for(int i = 0;i < 3;i ++) {
            new Thread(() -> {
                try {
                    Thread.sleep((long)(Math.random() * 2000));

                    int randomInt = new Random().nextInt(500);
                    System.out.println("hello " + randomInt);

                    cyclicBarrier.await();

                    System.out.println("world " + randomInt);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

前面讲过CountDownLatch是基于AQS实现的;而CyclicBarrier是基于ReentrantLock重入锁实现的,当然ReentrantLock也是基于AQS实现的,非要说CyclicBarrier也是基于AQS实现的也不为过。

成员变量

/ /可以理解为初始化时 需要阻塞的任务个数
    private final int parties;
/ /剩余需要等待的任务个数,初始值为parties,直到为0时依次唤醒所有被阻塞的任务线程。
    private int count;

/ /每次对“栅栏”的主要成员变量进行变更操作,都应该加锁
    private final ReentrantLock lock = new ReentrantLock();
/ /用于阻塞和唤醒任务线程
    private final Condition trip = lock.newCondition();

/ /在所有线程被唤醒前,需要执行的一个Runable对应的run方法
    private final Runnable barrierCommand;
/ /用于表示“栅栏”当前的状态
    private Generation generation = new Generation();

CyclicBarrier有两个重载的构造方法,一个是不带Runnable参数,另一个带有Runnable参数。本质上都会调用带Runnable参数的构造方法进行实例化,这里只贴出带Runnable参数的构造方法实现:

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;    / /为了实现复用,进行备份
        this.count = parties;   / /初始化,待阻塞的任务总数
            this.barrierCommand = barrierAction;   / /初始化
}

await()方法有两层含义:

1、先检查前面是否已经有count个线程了,如果没有线程则会进入等待状态
2、当检测到屏障已经有count个线程了,则所有线程会冲出屏障继续执行(如果有Runnable参数的构造方法先执行汇总方法)

int index = --count;操作很明显不是原子性的,如果在多线程中不加lock肯定会出问题

private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
 
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock(); //加锁
        try {
            final Generation g = generation;
 
            if (g.broken)
                throw new BrokenBarrierException();
            //有一个线程线程被中断,整个CyclicBarrier将不可用
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
 
            int index = --count; //待等待的任务数减1
            if (index == 0) {  // 如果待等待的任务数减至0,依次唤醒所有线程
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();//唤醒前先执行Runnable对象的run方法
                    ranAction = true;
                    nextGeneration();//重置整个CyclicBarrier,方便下次重用
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
 
            //如果待等待的任务数大于0,进行线程阻塞,直到count为0时被唤醒
            for (;;) {
                try {
                    if (!timed)
                        trip.await();//阻塞当前线程
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);//延时阻塞当前线程
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
 
                if (g.broken)//异常唤醒
                    throw new BrokenBarrierException();
 
                if (g != generation)//正常被唤醒,generation会被新建
                    return index;
 
                if (timed && nanos <= 0L) {//延迟阻塞时间到后唤醒
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
}

Exchanger

Exchanger用于两个线程间的通信,无论哪个线程先调用都会等到另外一个线程调用时进行数据交换

public class TestExchanger {

    private static Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args) {
        new Thread(()->{
            try {
                String aa = exchanger.exchange("V1");
                System.out.println(Thread.currentThread().getName() + aa);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "T1").start();

        new Thread(()->{
            try {
                String bb = exchanger.exchange("V2");
                System.out.println(Thread.currentThread().getName() + bb);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "T2").start();
    }
}

总结

允许在并发任务之间交换数据。具体来说,Exchanger类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二个线程的数据结构进入到第一个线程中

Phaser

phaser是jdk新增的一个同步辅助类,它可以实现CyclicBarrier

Phaser可以认为是CyclicBarrier与CountDownLatch的整合,用考试做类比,一个班级考试必须等所有同学做完题,才能开始下一科考试(做题可以看做不同的线程在执行任务)

对比

我们可以回忆一下CountDownLatch讲的是先指定N个线程干完活,在这N个线程干完活之前,其他线程先等着

CyclicBarrier讲的是先指定需要N个线程,等N个线程到齐了大家同时干活(并发执行)

而Phaser正是结合了这二者的特点,可以理解为先指定N个线程等N个线程到齐了开始第一阶段的活,等第一阶段干完活了,接着这N个线程继续开始干下一阶段的活,以此类推,直至干完了所有的活,(当然每个阶段干完进入下一阶段也可以踢掉一些不需要的线程,可以理解为造完房子,在后期收尾时不需要那么多人了,可以辞掉一些,当然也可以全部留下来不剔除)

package test2;
import java.util.concurrent.Phaser;
public class MyPhaser {
    private static ExamPhaser examPhaser = new ExamPhaser();
    public static void main(String[] args) {
        examPhaser.bulkRegister(5);
        new Thread(new Student(), "学生A").start();
        new Thread(new Student(), "学生B").start();
        new Thread(new Student(), "学生C").start();
        new Thread(new Student(), "学生D").start();
        new Thread(new Student(), "学生E").start();
    }
    static class Student implements Runnable {
        private static void chinese() {
            System.out.println(Thread.currentThread().getName() + "语文考试完毕");
            //  类似await()方法,记录到达线程数,阻塞等待其他线程到达同步点后再继续执行。
            examPhaser.arriveAndAwaitAdvance();
        }
        private static void math() {
            System.out.println(Thread.currentThread().getName() + "数学考试完毕");
            examPhaser.arriveAndAwaitAdvance();
        }
        private static void english() {
            System.out.println(Thread.currentThread().getName() + "英语考试完毕");
            examPhaser.arriveAndAwaitAdvance();
        }
        private static void history() {
            System.out.println(Thread.currentThread().getName() + "历史考试完毕");
            examPhaser.arriveAndAwaitAdvance();
        }
        @Override
        public void run() {
            chinese();
            math();
            english();
            history();
        }
    }
    static class ExamPhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            switch (phase) {
                case 0:
                    System.out.println("语文试结束!");
                    return false;
                case 1:
                    System.out.println("数学考试结束!");
                    return false;
                case 2:
                    System.out.println("英语考试结束!");
                    return false;
                case 3:
                    System.out.println("历史考试结束!");
                    return true;
                default:
                    return true;
            }
        }
    }
}
Last modification:December 15, 2022
如果觉得我的文章对你有用,请随意赞赏