跳至主要內容

AQS底层原理

xw大约 7 分钟JAVAJAVA

概述

AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包下面。它是一个Java提高的底层同步工具类,比如CountDownLatch、ReentrantLock,Semaphore,ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS实现。AQS是一个抽象类,提供了并发的基础执行框架,过程如下图所示:

  1. 获取state同步状态,
  2. 成功加锁执行业务逻辑后释放锁,然后唤醒等待队列的节点。失败构建Node和封装线程,并将Node信息队列尾部,阻塞当前线程。

AQS有三种同步方式,独占式(ReentrantLock)、共享式(Semaphore)、组合(ReentrantReadWriteLock),AQS提供了底层支持,支持自由组装实现。

AQS核心组成

AQS出于“分离变与不变”的原则,基于模板模式实现。AQS为锁获取、锁释放的排队和出 队过程提供了一系列的模板方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取 为钩子方法,供各种锁的子类(或者其内部类)去实现。

1. 转态标志位

AQS中维持了一个单一的volatile修饰的状态信息state,AQS使用int类型的state标示锁的状态, 可以理解为锁的同步状态。由于setState()无法保证原子性,因此AQS给我们提供了compareAndSetState()方法利用底层 UnSafe的CAS机制 来实现原子性 。

 //同步状态,使用volatile保证线程可见 
 private volatile int state;

2. 队列节点类

AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类 Node定义。

3. FIFO双向同步队列

AQS的内部队列是CLH队列的变种,每当线程通过AQS获取锁失败时,线程将被封装成一个Node 节点,通过CAS原子操作插入队列尾部。当有线程释放锁时,AQS会尝试让队首的后驱节点占用锁。 AQS是一个通过内置的FIFO双向队列来完成线程的排队工作,内部通过节点head和tail记录队 首和队尾元素,元素的节点类型为Node类型。

JUC 显式锁与 AQS 的关系

AQS是java.util.concurrent包的一个同步器,它实现了锁的基本抽象功能,支持独占锁与共享锁 两种方式。该类使用模板模式来实现的,成为构建锁和同步器的框架,使用该类可以简单且高效地 构造出应用广泛的同步器(或者等待队列)。 java.util.concurrent.locks包中的显式锁如ReentrantLock、ReentrantReadWriteLock,线程同步工具 如Semaphore,异步回调工具如FutureTask等,内部都使用了AQS作为等待队列。通过开发工具进行 AQS的子类导航会发现大量的AQS子类以内部类的形式使用

ReentrantLock实现

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package java.util.concurrent.locks;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject;
import jdk.internal.vm.annotation.ReservedStackAccess;

public class ReentrantLock implements Lock, Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    private final ReentrantLock.Sync sync;

    // 默认非公平锁
    public ReentrantLock() {
        this.sync = new ReentrantLock.NonfairSync();
    }
    // true公平锁 false非公平锁
    public ReentrantLock(boolean fair) {
        this.sync = (ReentrantLock.Sync)(fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync());
    }

    public void lock() {
        this.sync.acquire(1);
    }

    public void lockInterruptibly() throws InterruptedException {
        this.sync.acquireInterruptibly(1);
    }

    public boolean tryLock() {
        return this.sync.nonfairTryAcquire(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return this.sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        this.sync.release(1);
    }

    public Condition newCondition() {
        return this.sync.newCondition();
    }

    public int getHoldCount() {
        return this.sync.getHoldCount();
    }

    public boolean isHeldByCurrentThread() {
        return this.sync.isHeldExclusively();
    }

    public boolean isLocked() {
        return this.sync.isLocked();
    }

    public final boolean isFair() {
        return this.sync instanceof ReentrantLock.FairSync;
    }

    protected Thread getOwner() {
        return this.sync.getOwner();
    }

    public final boolean hasQueuedThreads() {
        return this.sync.hasQueuedThreads();
    }

    public final boolean hasQueuedThread(Thread thread) {
        return this.sync.isQueued(thread);
    }

    public final int getQueueLength() {
        return this.sync.getQueueLength();
    }

    protected Collection<Thread> getQueuedThreads() {
        return this.sync.getQueuedThreads();
    }

    public boolean hasWaiters(Condition condition) {
        if (condition == null) {
            throw new NullPointerException();
        } else if (!(condition instanceof ConditionObject)) {
            throw new IllegalArgumentException("not owner");
        } else {
            return this.sync.hasWaiters((ConditionObject)condition);
        }
    }

    public int getWaitQueueLength(Condition condition) {
        if (condition == null) {
            throw new NullPointerException();
        } else if (!(condition instanceof ConditionObject)) {
            throw new IllegalArgumentException("not owner");
        } else {
            return this.sync.getWaitQueueLength((ConditionObject)condition);
        }
    }

    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null) {
            throw new NullPointerException();
        } else if (!(condition instanceof ConditionObject)) {
            throw new IllegalArgumentException("not owner");
        } else {
            return this.sync.getWaitingThreads((ConditionObject)condition);
        }
    }

    public String toString() {
        Thread o = this.sync.getOwner();
        return super.toString() + (o == null ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]");
    }

    static final class FairSync extends ReentrantLock.Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        FairSync() {
        }

        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = this.getState();
            // 拿到锁
            if (c == 0) {
                //this.hasQueuedPredecessors() 判断其他线程等待超时时间大于此线程
                if (!this.hasQueuedPredecessors() && this.compareAndSetState(0, acquires)) {
                    //将当前线程设置为独占模式
                    this.setExclusiveOwnerThread(current);
                    return true;
                }
            //如果当前线程已经拿到锁,可重入锁    
            } else if (current == this.getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) {
                    throw new Error("Maximum lock count exceeded");
                }

                this.setState(nextc);
                return true;
            }

            return false;
        }
    }

    //非公平锁具体实现
    static final class NonfairSync extends ReentrantLock.Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        NonfairSync() {
        }

        protected final boolean tryAcquire(int acquires) {
            return this.nonfairTryAcquire(acquires);
        }
    }

   
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        Sync() {
        }

        @ReservedStackAccess
        final boolean nonfairTryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = this.getState();
            if (c == 0) {
                // 直接拿
                if (this.compareAndSetState(0, acquires)) {
                    this.setExclusiveOwnerThread(current);
                    return true;
                }
            //可重入锁    
            } else if (current == this.getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) {
                    throw new Error("Maximum lock count exceeded");
                }

                this.setState(nextc);
                return true;
            }

            return false;
        }

        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = this.getState() - releases;
            // 当前线程 没拿到锁报错
            if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            } else {
                boolean free = false;
                if (c == 0) {
                    free = true;
                    this.setExclusiveOwnerThread((Thread)null);
                }
                
                this.setState(c);
                return free;
            }
        }

        protected final boolean isHeldExclusively() {
            return this.getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject(this);
        }

        final Thread getOwner() {
            return this.getState() == 0 ? null : this.getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return this.isHeldExclusively() ? this.getState() : 0;
        }

        final boolean isLocked() {
            return this.getState() != 0;
        }

        private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            this.setState(0);
        }
    }
}

公平锁和非公平锁的核心区别:

  • 非公平锁不会管等待队列是否有其他线程,直接使用cas更新state值,更新成功则判断加锁成功。

ReentrantLock和synchronized区别:

synchronized:
1、是悲观锁会引起其他线程阻塞,java内置关键字,
2、无法判断是否获取锁的状态,锁可重入、不可中断、只能是非公平
3、加锁解锁的过程是隐式的,用户不用手动操作,优点是操作简单但显得不够灵活
4、一般并发场景使用足够、可以放在被递归执行的方法上,且不用担心线程最后能否正确释放锁
5、synchronized操作的应该是对象头中mark word

ReentrantLock:
1、是个Lock接口的实现类,是悲观锁,
2、可以判断是否获取到锁,可重入、可判断、可公平可不公平
3、需要手动加锁和解锁,且 解锁的操作尽量要放在finally代码块中,保证线程正确释放锁
4、在复杂的并发场景中使用在重入时要却确保重复获取锁的次数必须和重复释放锁的次数一样,否则可能导致 其他线程无法获得该锁。
5、创建的时候通过传进参数true创建公平锁,如果传入的是false或没传参数则创建的是非公平锁
6、底层不同是AQS的state和FIFO队列来控制加锁

ReentrantReadWriteLock实现

ReentrantReadWriteLock实现了读写锁的分离,支持公平和非公平,底层也是基于AQS实现,允许从写锁降级为读锁。

ReentrantReadWriteLock分为读锁和写锁,内部实现的逻辑是将state值高16位表示读锁,低16位表示写锁,锁最大重入次数均为2的16次方减去1。类结构如下:

读锁实现:

//获取锁 
final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                int c = getState();
                //判断当前线程是否已经拿到写锁,不是直接返回
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                // 拿到最大重入次数
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                //获取到锁逻辑
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null ||
                            rh.tid != LockSupport.getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

写锁:

final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                // 当前线程没有拿到写锁
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }