零散笔记的流水账式总结,有错误之处请指正😂

部分图片资源来自网络,侵删

1. 内存模型

1.1 硬件内存模型

Modern hardware memory architecture.

CPU不能直接对内存进行操作,必须将内存中的值读到cache/寄存器中进行操作,然后写回内存

1.2 JVM内存模型

JVM模型和OS的内存模型一样,分栈和堆

1.2.1 线程栈

每个线程会分配一个线程栈,用于存储方法调用过程中的局部变量,主要用来存储

  • 原始类型变量
  • 堆对象引用

1.2.2 堆

Java对象的主要存储区域,new出来的对象都在堆区

2. volatile关键字

2.1 变量可见性

两个线程同时引用堆中的对象,想修改对象中某个字段的值,在通过jvm最终都会执行到硬件层面,cpu同样需要从内存把数据读到cache里,操作完成之后写回内存

i=0由于这个特性,在t1执行完i+=1操作后,cpu cache中的值并没有及时写回内存,导致t2在执行i+=1操作时,从内存读到的值依旧是0。和最终的预期并不一致

image-20180927152138166

给变量加上volatile关键字,就能确保对变量的更新操作完成之后,cpu cache中的值被及时写回内存。

volatile有个额外的保证,当t1写回一个volatile变量时,所有对t1可见的变量都会被写回内存

2.2 乱序执行

以下代码

int a = 0;
int b = 0;
a += 1;
b += 1;

最终执行效果可能是

int a = 0;
a += 1;
int b = 0;
b += 1;

编译器/cpu都可能帮你干这种事。编译器先略过,看看cpu。

现代cpu内置了若干条指令流水线,一条指令执行的时候会分成若干个阶段,某些阶段互不影响,可以并行执行,所以会被分配到这些流水线里。当遇到分支条件时,cpu可能会进行分支预测,即在流水线里同时执行两个分支条件,当命中其中一个分支时,立刻抛弃掉另一个分支的流水线指令,提高效率,反正空着也是空着。

从cpu角度,以上做法是为了减少执行这4条语句的总指令周期。当cpu发现,原语句顺序里,语句1/3和语句2/4的执行相互独立,互不影响,2和3的执行顺序并不会影响最终结果,cpu可能就会打乱指令顺序,按最优顺序执行。

在上面这段代码里,乱序是没什么毛病,毕竟结果一样,但是下面这一段可能就不一样了。

常见的double check singleton

public class A {
    private static A _instance;
    private A() {}
    public static A getInstance() {
        if (_instance == null) {			// 1⃣️
            synchronized (A.class) {
                if (_instance == null) {
                    _instance = new A();	// 2⃣️
                }
            }
        }
        return _instance;
    }
}

注意这里的2⃣️并不是原子操作,可以拆分成以下操作

  1. 申请内存
  2. 调用A的构造函数初始化_instance
  3. 将申请到的内存分配给_instance

cpu会认为2和3的执行顺序不影响最终产出结果,所以可能乱序执行,线程t1先执行了3,还没执行2的时候,线程t2进了1⃣️判断,发现有值,直接返回了_instance拿出去用,这个时候_instance其实还没初始化完成。

解决办法就是给_instance加volatile关键字,因为volatile关键字除了可见性保证之外还有另一个保证:写入volatile变量之前的操作”happens before”写入操作本身,即1/2这两条语句作为一个整体不能和3乱序,1/2两个之间是否乱序由乱序规则规定,通过生成汇编内存屏障指令实现

2.3 volatile不是什么

volatile不是原子操作,即volatile不提供CAS逻辑的原子性,要实现CAS的原子性必须用Unsafe类通过cpu指令实现

3. synchronized关键字(Monitor)

synchronized关键字实现了monitor语义,monitor是:

  • monitor内同一时间只允许一个线程访问(其余进入entry set)
  • 提供主动让出cpu时间片等待某个事件发生的方法(wait,进入wait set)
  • 提供通知其他线程某个事件发生的方法(notify),唤醒wait set中的一个或多个线程

jvm通过synchronized关键字原生提供moniterenter和monitorexit指令,进入/离开monitor。在synchronized块内部通过wait/notify/notifyAll实现monitor的后两条语义。

wait/notify/notifyAll必须在synchronized块内执行(即当前线程是monitor的拥有者),否则会抛java.lang.IllegalMonitorStateException异常

notifyAll唤醒的所有线程都必须等待monitor退出之后才能竞争monitor,按线程优先级、等待时间等条件计算哪个线程能获得monitor,得到monitor之后才能继续执行

4. Lock

Lock是Java标准库提供的比monitor更加灵活的同步机制,有相同也有不同。

  • lock()/moniterenter
  • unlock()/monitorexit
  • Condition.await()/Object.wait()
  • Condition.signal/Object.notify()

不同之处在于,一个Lock可以关联多个Condition对象

4.1 常见的Lock

4.1.1 ReentrantLock

可重入锁:已获得锁的线程可以再次获得锁而不阻塞,当锁拥有者数量为0时才释放锁

4.1.2 ReadWriteLock(interface)

读写锁:允许多个reader同时获得ReadLock,或者一个writer获得WriteLock。没有writer线程并且没有writer request时,可以允许成为reader。没有writer和reader时,允许成为writer。

这只是个interface,由ReentrantReadWriteLock实现

4.1.3 ReentrantReadWriteLock

可重入读写锁:已获得ReadLock的线程,在没有其他reader并且没有writer的情况下,允许获得writer锁。已获得WriteLock的线程,如果已经是reader或者没有其他writer request的情况下,允许获得reader锁

4.2 Lock实现

篇幅所限,详细内容要新开一篇

4.2.1 通用实现

AQS

整个lock框架底层由AbstractQueuedSynchronizer提供实现,AbstractQueuedSynchronizerUnsafe的CAS提供锁的实现,当线程没有抢到锁时,会进入基于链表的等待队列。

获得锁

public final void acquire(int arg) {
	if (!tryAcquire(arg) &&
			acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
		selfInterrupt();
}

tryAcquire为抽象方法,由各Lock子类提供实现,作用为尝试以独占模式获得锁

acquireQueued当尝试获取锁失败时,将当前线程包装后加入等待队列

释放锁

public final boolean release(int arg) {
	if (tryRelease(arg)) {
		Node h = head;
		if (h != null && h.waitStatus != 0)
			unparkSuccessor(h);
		return true;
	}
	return false;
}

tryRelease为抽象方法,由各Lock子类实现,作为尝试在独占模式下解锁的方法

然后从等待队列中拿出头部节点的线程,唤醒之

4.2.2 ReentrantLock实现

ReentrantLock依赖内部一个成员变量sync,继承于AQS。fair和non-fair模式的锁取决于用的是哪个sync子类。

类结构图如图所示

image-20180929154724854

NonfairSync

非公平锁实现,tryAcquire方法最终会调用到这个方法

final boolean nonfairTryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {	// 1⃣️
		if (compareAndSetState(0, acquires)) {	// 2⃣️
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	else if (current == getExclusiveOwnerThread()) { // 3⃣️
		int nextc = c + acquires;
		if (nextc < 0) // overflow
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}

意思就是,如果没人占用这个锁(1⃣️),那么尝试抢占(2⃣️),如果成功了,就成功了,失败了就返回到AQS的acquire方法进等待队列并挂起。如果已经被占用了,并且自己就是占用者(3⃣️),那么增加计数state

FairSync

公平锁实现

protected final boolean tryAcquire(int acquires) {
	final Thread current = Thread.currentThread();
	int c = getState();
	if (c == 0) {
		if (!hasQueuedPredecessors() &&		// 1⃣️
				compareAndSetState(0, acquires)) {
			setExclusiveOwnerThread(current);
			return true;
		}
	}
	else if (current == getExclusiveOwnerThread()) {
		int nextc = c + acquires;
		if (nextc < 0)
			throw new Error("Maximum lock count exceeded");
		setState(nextc);
		return true;
	}
	return false;
}

这比非公平锁只多了1⃣️这行,公平锁在尝试抢占之前,会看一眼等待队列里有没有线程已经在等待,如果有,乖乖返回AQS的acquire方法进等待队列。这样的好处是,不会导致等待队列里的线程饿死。坏处是,略微降低了吞吐量(TODO: 需要测试数据支撑)

4.2.3 ReentrantReadWriteLock实现

其中包含ReadLockWriteLock,这两个最终又依赖继承于AbstractQueuedSynchronizer的实现Sync

ReentrantLock一样,也有FairSync和NonfairSync。不同于ReentrantLock,ReentrantReadWriteLock分别利用了state字段的高低4位来表示读锁和写锁占用数量

WriteLock会调用

protected final boolean tryAcquire(int acquires) {
	Thread current = Thread.currentThread();
	int c = getState();
	int w = exclusiveCount(c);
	if (c != 0) {
		// (Note: if c != 0 and w == 0 then shared count != 0)
		if (w == 0 || current != getExclusiveOwnerThread())		// 1⃣️
			return false;
		if (w + exclusiveCount(acquires) > MAX_COUNT)
			throw new Error("Maximum lock count exceeded");
		// Reentrant acquire
		setState(c + acquires);
		return true;
	}
	if (writerShouldBlock() ||		// 2⃣️
			!compareAndSetState(c, c + acquires))	// 3⃣️
		return false;
	setExclusiveOwnerThread(current);	// 4⃣️
	return true;
}

代码1⃣️处分两种情况,都不能获得写锁

  • w是写锁占用数,state != 0 && w == 0 => 读锁占用数>0,不能获得写锁
  • w != 0 && current != getExclusiveOwnerThread(),有人占了写锁,但不是当前线程,不能获得写锁

当没人占用任何锁,并且writer不用阻塞的时候(2⃣️),尝试抢占(3⃣️),设置独占线程(4⃣️)。writerShouldBlock()是抽象方法由子类实现(是否公平锁)

ReadLock实现

// todo 篇幅较长,考虑独立

NonfairSync & FairSync

由于主要工作都在父类Sync里完成了,这两个子类仅实现reader或者writer是否需要主动阻塞的逻辑

NonfairSync

writer永远不需要主动阻塞。

reader会看等待队列里的第一个线程是否是写线程,如果是写线程,则主动阻塞。这里并不是让写线程有足够高的优先级优先执行,而是一定程度上减少写线程饿死的肯恩行。所以这个不公平锁对等待中的写线程还是有一点点公平的

FairSync

读写线程都会先看看等待队列里有没有线程在等待,有就去排队,主动阻塞

4.3 Condition实现

ConditionReentrantLock派生出来的,可以在获得锁之后在线程间同步的机制。跟ReentrantLock关联,必须在获得锁之后使用。

Conditionawaitsignal方法,作用和Object的wait/notify一样。内部实现上,关联了生成它的ReentrantLock对象,自身也有类似ReentrantLock等待队列的条件队列。当await的时候先把当前线程加到条件队列里,等待signal将线程转移到ReentrantLock的等待队列上,等待ReentrantLock调度

4.3.1 await

主逻辑如下

public final void await() throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	Node node = addConditionWaiter();		// 1⃣️
	int savedState = fullyRelease(node);	// 2⃣️
	int interruptMode = 0;
	while (!isOnSyncQueue(node)) {			// 3⃣️
		LockSupport.park(this);
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
			break;
	}
	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)	// 4⃣️
		interruptMode = REINTERRUPT;
	if (node.nextWaiter != null) // clean up if cancelled
		unlinkCancelledWaiters();
	if (interruptMode != 0)
		reportInterruptAfterWait(interruptMode);
}

几个关键逻辑

1⃣️将当前线程加入Condition的等待队列(链表结构)中

2⃣️释放当前线程持有的ReentrantLock,注意这里需要fullyRelease,因为锁可重入

3⃣️循环等待,直到当前线程被signal转移到ReentrantLock的等待队列里

4⃣️acquireQueued返回之后,当前线程就持有了ReentrantLock

4.3.2 signal

关键逻辑

private void doSignal(Node first) {
	do {
		if ( (firstWaiter = first.nextWaiter) == null)
			lastWaiter = null;
		first.nextWaiter = null;
	} while (!transferForSignal(first) &&
			(first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
		return false;

	Node p = enq(node);
	int ws = p.waitStatus;
	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
		LockSupport.unpark(node.thread);
	return true;
}

doSignal遍历Condition的等待队列,直到遇到一个能被成功转移到ReentrantLock等待队列的线程,完成signal操作

transferForSignal第一个CAS注释上说如果waitStatus不是CONDITION,那么表示node被cancel,这块不是很理解什么时候会被cancel,总之就是当前node无效,转移失败。当waitStatus成功变成0(等待队列上的node类型)之后,将node通过enq放到等待队列里,p是node的前序节点。

这里不是很理解最后这个if的逻辑,看起来是如果前序节点的waitStatus不是SIGNAL(表示当前节点没在SIGNAL状态),那么唤醒当前线程???

5. Lock Free

个人理解,lock的最大问题在以下几点

  • 竞争失败需要入等待队列,入队出队需要跟等待队列有交互
  • 线程挂起/切换代价大,引发context保存/恢复、cpu cache miss等问题

lock free相对于lock based算法会有以上优势,但不是银弹。lock free的主要算法是spin + cas

public class LockFree {
    private int i = 0;	// ①

    private static final Unsafe UNSAFE = Unsafe.getUnsafe();
    private static long iOffset = 0;	// ②
    static {
        try {
            iOffset = UNSAFE.objectFieldOffset(LockFree.class.getDeclaredField("i"));
        } catch (NoSuchFieldException e) {
            throw new Error(e);
        }
    }

    public void incr() {
        while (true) {
            int old = i;
            i++;
            if (UNSAFE.compareAndSwapInt(this, iOffset, old, i)) {		// ③
                break;
            }
        }
    }
}
  1. 数据字段

  2. unsafe得到的这个字段在内存中的偏移量,约等于C里的(&struct.field - &struct)

  3. CAS操作,在native方法中最终会调用到CPU的CAS指令,原子赋值

乐观锁实现,只有一个线程能看到old和i的值相等而改变值,其他线程返回while开头重新竞争。个人认为Lock Free会有以下问题

  • 在竞争非常激烈的场合,即长期有大量竞争,那么就会导致有很多线程在空耗CPU自旋
  • 只适合临界区非常轻量的场合,即临界区内执行时间很短
  • 非公平实现,可能会有线程饿死

6. 常用同步工具

concurrent包下的同步工具基本都是AbstractQueuedSynchronizer的直接使用(内部使用其子类),或者间接使用(用ReentrantLock)

CountDownLatch

设置等待数量,允许一个线程等在latch上,N个线程countDown(),在所有线程countDown()完之后,等待线程放行。典型场景就是多线程统计的时候,N个线程统计完了之后在等待线程上归并

实现上,内部继承AQS,以share mode使用,state归零之后可以获得锁

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

Semaphore

容量为N的资源,每acquire()一次-1,到0的时候阻塞,等前面持有资源的人release()

实现上,很直接的AQS应用,share mode,设置初始state值,大于0的时候允许获得锁,到0阻塞

也分fair和non-fair,区别和ReentrantLock一样,在于抢锁前是否会看一眼队列里有没有等待线程

CyclicBarrier

设置个初始量N,在这N个线程都await()之后所有线程才能继续执行,否则阻塞。线程中断可以打破这个barrier,所有await中的线程抛InterruptedException

实现上,用ReentrantLock做同步,内部用一个generation的概念区分每一次全员到达barrier之前的状态,每次到达barrier之后重置这个generation

7. 线程池

线程创建的时候有额外的开销,OS需要创建线程内核数据结构、分配栈内存(fork/COW)、管理调度队列,JVM需要创建Thread对象、分配栈空间、关联OS线程等操作,所以一般多线程程序不会频繁创建/结束线程,而是将线程池化,要用的时候向线程池提交任务,由线程池去找线程执行(如果找得到的话),结束后归还线程池等待新任务。

使用

Java线程池主要由ThreadPoolExecutor(及其子类)实现,通过void execute(Runnable command)(或Future<?> submit(Runnable task),如果需要后续拿返回结果)向线程池提交任务执行,submit最终也是调用execute。

完整版构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize,池内idle线程数量限制,一般不会被回收,除非设置了allowCoreThreadTimeOut
  • maximumPoolSize,总线程数量,线程数量在(corePoolSize, maximumPoolSize]区间时,idle线程超时会被回收
  • keepAliveTime,idle线程超时时间
  • unit,keepAliveTime单位
  • workQueue,等待队列
  • ThreadFactory,创建线程的工厂类
  • RejectedExecutionHandler,线程池拒绝任务后的回调

实现

实现上,内部由AtomicInteger ctl控制线程池内部状态,其中高3位存储执行状态(RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED),低3位存储工作线程数。

execute(Runnable command)实现如下

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {	// ①
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {	// ②
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))	// ③
        reject(command);
}

主要逻辑是

  1. 如果工作线程数比core数量少,直接通过addWorker增加工作线程,并执行当前这个任务
  2. 超过core数量之后,后续任务先塞等待队列
  3. 如果等待队列满了(2的offer失败),再新开线程执行

以上可以看出,如果等待队列是有容量的,很可能后来没机会进队列的任务比早先先进等待队列的先执行,这块可能某些业务场景会有问题

addWorker(Runnable firstTask, boolean core)主干逻辑如下(有删节,主要是维护计数相关的代码)

private boolean addWorker(Runnable firstTask, boolean core) {
    // 内部计数......
    
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);		// ①
        final Thread t = w.thread;
        if (t != null) {
            // worker集合操作......
            
            if (workerAdded) {
                t.start();			// ②
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable {
    final Thread thread;
    }
  1. addWorker主要new了一个Worker对象,看Worker对象的定义可以发现它实现了Runnable接口,并且内部维护了一个Thread,并且本身就是个AQS。就是说可以把他同时当一个线程和一个锁来用
  2. 初始化工作完成之后启动Worker内部维护的线程,最终会调用到java.util.concurrent.ThreadPoolExecutor#runWorkerrunWorker主要逻辑是死循环调用getTask从等待队列中取任务执行

Runnable getTask()逻辑

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        // ...

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;	// ①

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {	// ③
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();		// ②
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

注意编号顺序

  1. 在标记了allowCoreThreadTimeOut或者工作线程数大于core数量时需要设置超时
  2. 从队列里拿task,看1有没有设置超时来决定等待是否需要超时
  3. 超时并且队列里没task的时候,CAS成功之后返回null,告诉工作线程可以结束循环而退出,回收工作线程