前言

多线程原理一个高频面试题,需要了解核心参数,以及参数之间的关系

一、ThreadPoolExecutor

1、核心参数
  • corePoolSize

    • 核心线程数,相当于最小线程数
  • maximumPoolSize

    • 最大线程数,线程池最大线程数
  • keepAliveTime

    • 线程存活时间
  • BlockingQueue

    • 任务队列,当前活跃线程数超过核心线程数的时候,会将提交任务放到队列中等待线程执行
  • RejectedExecutionHandler

    • 当无可用线程,且队列容量不足,会选择的放弃策略,默认 AbortPolicy,提供了四种策略

    • AbortPolicy

      抛出 RejectedExecutionException 异常

    • DiscardPolicy

      不做任何处理,直接放弃

    • DiscardOldestPolicy

      丢弃blockqueue 最尾部的那个task,来执行当前的task

    • CallerRunsPolicy

      调用主线程执行,不用线程池中的线程

  • ThreadFactory

    • 线程工厂,用户创建线程,默认 DefaultThreadFactory
2、worker

基于AQS

private final HashSet<Worker> workers = new HashSet<Worker>();

// 创建 移除 都需要 加锁
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();
// 根据当前runnable 对象,构造成worker
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 当前线程
final Thread thread;
// 当前提交的 runnable
Runnable firstTask;

// 完成的 任务数
volatile long completedTasks;

Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

}
3、execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

// 当前线程数 + 线程池状态
int c = ctl.get();
// 判断当前线程数 是否 小于 核心线程数
if (workerCountOf(c) < corePoolSize) {
// 创建worker 添加到 HashSet<Worker> workers = new HashSet<Worker>(); 维持线程池
if (addWorker(command, true))
return;
c = ctl.get();
}
// 线程池状态是RUNNING,就从放到queue中,入队
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 入队伍失败,就会继续创建非核心worker
else if (!addWorker(command, false))
// 失败就执行废弃策略
reject(command);
}
// work的run 是调用 runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 先执行 firstTask,执行完成,就去 getTask,线程启动起来就会一直去获取任务,直到没有任务,就会执行 finally 中的代码块,就行回收线程;但是默认还是会维持核心线程数的工作线程
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 没有任务 就会回收 work 统计完成任务数
processWorkerExit(w, completedAbruptly);
}
}
private Runnable getTask() {
boolean timedOut = false;

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// 这里很关键,当目前工作线程回收到了核心线程数的情况下,由于 allowCoreThreadTimeOut 默认是 false,所以导致 timed 是false
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 上面timed是false,就会用take,这种没有获取到任务,就会阻塞,所以有时候,在main方法中,用线程池执行任务,执行完成会发现程序不会退出,就是在这里,如果 allowCoreThreadTimeOut 设置为true的话,就会退出
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}