《Java开发手册》中强调,线程资源必须通过线程池提供,而创建线程池必须使用ThreadPoolExecutor。手册主要强调利用线程池避免两个问题,一是线程过渡切换,二是避免请求过多时造成OOM。但是如果参数配置错误,还是会引发上面的两个问题。所以本节我们主要是讨论ThreadPoolExecutor的一些技术细节,并且给出几个常用的最佳实践建议。
网站建设哪家好,找创新互联!专注于网页设计、网站建设、微信开发、微信小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了马山免费建站欢迎大家使用!
我在查找资料的过程中,发现有些问题存在争议。后面发现,一部分原因是因为不同JDK版本的现实是有差异的。因此,下面的分析是基于当下最常用的版本JDK1.8,并且对于存在争议的问题,我们分析源码,源码才是最准确的。
这是一个争议点。我发现大部分博文,不论是国内的还是国外的,都是这样回答这个问题的:
按照上面的描述,如果corePoolSize=0,则会判断等待队列的容量,如果还有容量,则排队,并且不会创建新的线程。
—— 但其实,这是老版本的实现方式,从1.6之后,实现方式就变了。我们直接看execute的源码(submit也依赖它),我备注出了关键一行:
- int c = ctl.get();
- if (workerCountOf(c) < corePoolSize) {
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- reject(command);
- // 注意这一行代码,添加到等待队列成功后,判断当前池内线程数是否为0,如果是则创建一个firstTask为null的worker,这个worker会从等待队列中获取任务并执行。
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- else if (!addWorker(command, false))
- reject(command);
答
上述问题需区分JDK版本。在1.6版本之后,如果corePoolSize=0,提交任务时如果线程池为空,则会立即创建一个线程来执行任务(先排队再获取);如果提交任务的时候,线程池不为空,则先在等待队列中排队,只有队列满了才会创建新线程。
所以,优化在于,在队列没有满的这段时间内,会有一个线程在消费提交的任务;1.6之前的实现是,必须等队列满了之后,才开始消费。
之前有人问过我这个问题,因为他发现应用中有些Bean创建了线程池,但是这个Bean一般情况下用不到,所以咨询我是否需要把这个线程池注释掉,以减少应用运行时的线程数(该应用运行时线程过多。)
答
不会。从上面的源码可以看出,在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。
这个问题有点tricky。首先我们要明确一下概念,虽然在JavaDoc中也使用了“core/non-core threads”这样的描述,但其实这是一个动态的概念,JDK并没有给一部分线程打上“core”的标记,做什么特殊化的处理。这个问题我认为想要探讨的是闲置线程终结策略的问题。
在JDK1.6之前,线程池会尽量保持corePoolSize个核心线程,即使这些线程闲置了很长时间。这一点曾被开发者诟病,所以从JDK1.6开始,提供了方法allowsCoreThreadTimeOut,如果传参为true,则允许闲置的核心线程被终止。
请注意这种策略和corePoolSize=0的区别。我总结的区别是:
所以corePoolSize=0的效果,基本等同于allowsCoreThreadTimeOut=true && corePoolSize=1,但实现细节其实不同。
答
在JDK1.6之后,如果allowsCoreThreadTimeOut=true,核心线程也可以被终止。
首先我们要明确一下线程池模型。线程池有个内部类Worker,它实现了Runnable接口,首先,它自己要run起来。然后它会在合适的时候获取我们提交的Runnable任务,然后调用任务的run()接口。一个Worker不终止的话可以不断执行任务。
我们前面说的“线程池中的线程”,其实就是Worker;等待队列中的元素,是我们提交的Runnable任务。
每一个Worker在创建出来的时候,会调用它本身的run()方法,实现是runWorker(this),这个实现的核心是一个while循环,这个循环不结束,Worker线程就不会终止,就是这个基本逻辑。
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- w.unlock(); // allow interrupts
- boolean completedAbruptly = true;
- try {
- // 看这里,核心逻辑在这里
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- wt.interrupt();
- try {
- beforeExecute(wt, task);
- Throwable thrown = null;
- try {
- task.run();
- } catch (RuntimeException x) {
- thrown = x; throw x;
- } catch (Error x) {
- thrown = x; throw x;
- } catch (Throwable x) {
- thrown = x; throw new Error(x);
- } finally {
- afterExecute(task, thrown);
- }
- } finally {
- task = null;
- w.completedTasks++;
- w.unlock();
- }
- }
- completedAbruptly = false;
- } finally {
- processWorkerExit(w, completedAbruptly);
- }
- }
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- int rs = runStateOf(c);
- // Check if queue empty only if necessary.
- if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- // 注意,核心中的核心在这里
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
答
实现方式非常巧妙,核心线程(Worker)即使一直空闲也不终止,是通过workQueue.take()实现的,它会一直阻塞到从等待队列中取到新的任务。非核心线程空闲指定时间后终止是通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)实现的,一个空闲的Worker只等待keepAliveTime,如果还没有取到任务则循环终止,线程也就运行结束了。
引申思考
Worker本身就是个线程,它再调用我们传入的Runnable.run(),会启动一个子线程么?如果你还没有答案,再回想一下Runnable和Thread的关系。
笼统地回答是会占用内存,我们分析一下占用了哪些内存。首先,比较普通的一部分,一个线程的内存模型:
我想额外强调是下面这几个内存占用,需要小心:
答
线程池保持空闲的核心线程是它的默认配置,一般来讲是没有问题的,因为它占用的内存一般不大。怕的就是业务代码中使用ThreadLocal缓存的数据过大又不清理。
如果你的应用线程数处于高位,那么需要观察一下YoungGC的情况,估算一下Eden大小是否足够。如果不够的话,可能要谨慎地创建新线程,并且让空闲的线程终止;必要的时候,可能需要对JVM进行调参。
这也是个争议点。有的博文说等于0表示空闲线程永远不会终止,有的说表示执行完立刻终止。还有的说等于-1表示空闲线程永远不会终止。其实稍微看一下源码知道了,这里我直接抛出答案。
答
在JDK1.8中,keepAliveTime=0表示非核心线程执行完立刻终止。
默认情况下,keepAliveTime小于0,初始化的时候才会报错;但如果allowsCoreThreadTimeOut,keepAliveTime必须大于0,不然初始化报错。
很多代码的写法,我们都习惯按照常见范式去编写,而没有去思考为什么。比如:
—— 但是在上面,我提到过,submit()底层实现依赖execute(),两者应该统一呀,为什么有差异呢?下面再扒一扒submit()的源码,它的实现蛮有意思。
首先,ThreadPoolExecutor中没有submit的代码,而是在它的父类AbstractExecutorService中,有三个submit的重载方法,代码非常简单,关键代码就两行:
- public Future> submit(Runnable task) {
- if (task == null) throw new NullPointerException();
- RunnableFuture
ftask = newTaskFor(task, null); - execute(ftask);
- return ftask;
- }
- public
Future submit(Runnable task, T result) { - if (task == null) throw new NullPointerException();
- RunnableFuture
ftask = newTaskFor(task, result); - execute(ftask);
- return ftask;
- }
- public
Future submit(Callable task) { - if (task == null) throw new NullPointerException();
- RunnableFuture
ftask = newTaskFor(task); - execute(ftask);
- return ftask;
- }
正是因为这三个重载方法,都调用了execute,所以我才说submit底层依赖execute。通过查看这里execute的实现,我们不难发现,它就是ThreadPoolExecutor中的实现,所以,造成submit和execute的差异化的代码,不在这。那么造成差异的一定在newTaskFor方法中。这个方法也就new了一个FutureTask而已,FutureTask实现RunnableFuture接口,RunnableFuture接口继承Runnable接口和Future接口。而Callable只是FutureTask的一个成员变量。
所以讲到这里,就有另一个Java基础知识点:Callable和Future的关系。我们一般用Callable编写任务代码,Future是异步返回对象,通过它的get方法,阻塞式地获取结果。FutureTask的核心代码就是实现了Future接口,也就是get方法的实现:
- public V get() throws InterruptedException, ExecutionException {
- int s = state;
- if (s <= COMPLETING)
- // 核心代码
- s = awaitDone(false, 0L);
- return report(s);
- }
- private int awaitDone(boolean timed, long nanos)
- throws InterruptedException {
- final long deadline = timed ? System.nanoTime() + nanos : 0L;
- WaitNode q = null;
- boolean queued = false;
- // 死循环
- for (;;) {
- if (Thread.interrupted()) {
- removeWaiter(q);
- throw new InterruptedException();
- }
- int s = state;
- // 只有任务的状态是’已完成‘,才会跳出死循环
- if (s > COMPLETING) {
- if (q != null)
- q.thread = null;
- return s;
- }
- else if (s == COMPLETING) // cannot time out yet
- Thread.yield();
- else if (q == null)
- q = new WaitNode();
- else if (!queued)
- queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
- q.next = waiters, q);
- else if (timed) {
- nanos = deadline - System.nanoTime();
- if (nanos <= 0L) {
- removeWaiter(q);
- return state;
- }
- LockSupport.parkNanos(this, nanos);
- }
- else
- LockSupport.park(this);
- }
- }
get的核心实现是有个awaitDone方法,这是一个死循环,只有任务的状态是“已完成”,才会跳出死循环;否则会依赖UNSAFE包下的LockSupport.park原语进行阻塞,等待LockSupport.unpark信号量。而这个信号量只有当运行结束获得结果、或者出现异常的情况下,才会发出来。分别对应方法set和setException。这就是异步执行、阻塞获取的原理,扯得有点远了。
回到最初我们的疑问,为什么submit之后,通过get方法可以获取到异常?原因是FutureTask有一个Object类型的outcome成员变量,用来记录执行结果。这个结果可以是传入的泛型,也可以是Throwable异常:
- public void run() {
- if (state != NEW ||
- !UNSAFE.compareAndSwapObject(this, runnerOffset,
- null, Thread.currentThread()))
- return;
- try {
- Callable
c = callable; - if (c != null && state == NEW) {
- V result;
- boolean ran;
- try {
- result = c.call();
- ran = true;
- } catch (Throwable ex) {
- result = null;
- ran = false;
- setException(ex);
- }
- if (ran)
- set(result);
- }
- } finally {
- // runner must be non-null until state is settled to
- // prevent concurrent calls to run()
- runner = null;
- // state must be re-read after nulling runner to prevent
- // leaked interrupts
- int s = state;
- if (s >= INTERRUPTING)
- handlePossibleCancellationInterrupt(s);
- }
- }
- // get方法中依赖的,报告执行结果
- private V report(int s) throws ExecutionException {
- Object x = outcome;
- if (s == NORMAL)
- return (V)x;
- if (s >= CANCELLED)
- throw new CancellationException();
- throw new ExecutionException((Throwable)x);
- }
FutureTask的另一个巧妙的地方就是借用RunnableAdapter内部类,将submit的Runnable封装成Callable。所以就算你submit的是Runnable,一样可以用get获取到异常。
答
答
一般来讲,线程池的生命周期跟随服务的生命周期。如果一个服务(Service)停止服务了,那么需要调用shutdown方法进行关闭。所以ExecutorService.shutdown在Java以及一些中间件的源码中,是封装在Service的shutdown方法内的。
如果是Server端不重启就不停止提供服务,我认为是不需要特殊处理的。
答
本来想分析一下两者的源码的,但是发现本文的篇幅已经过长了,源码也贴了不少。感兴趣的朋友自己看一下即可。
答
SimpleAsyncTaskExecutor | 每次请求新开线程,没有最大线程数设置.不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。 |
SyncTaskExecutor | 不是异步的线程。同步可以用SyncTaskExecutor,但这个可以说不算一个线程池,因为还在原线程执行。这个类没有实现异步调用,只是一个同步操作。 |
ConcurrentTaskExecutor | Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类。 |
SimpleThreadPoolTaskExecutor | 监听Spring’s lifecycle callbacks,并且可以和Quartz的Component兼容.是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类。 |
这里我想着重强调的就是SimpleAsyncTaskExecutor,Spring中使用的@Async注解,底层就是基于SimpleAsyncTaskExecutor去执行任务,只不过它不是线程池,而是每次都新开一个线程。
另外想要强调的是Executor接口。Java初学者容易想当然的以为Executor结尾的类就是一个线程池,而上面的都是反例。我们可以在JDK的execute方法上看到这个注释:
- /**
- * Executes the given command at some time in the future. The command
- * may execute in a new thread, in a pooled thread, or in the calling
- * thread, at the discretion of the {@code Executor} implementation.
- */
所以,它的职责并不是提供一个线程池的接口,而是提供一个“将来执行命令”的接口。真正能代表线程池意义的,是ThreadPoolExecutor类,而不是Executor接口。
线程池初始化示例:
- private static final ThreadPoolExecutor pool;
- static {
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("po-detail-pool-%d").build();
- pool = new ThreadPoolExecutor(4, 8, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(512),
- threadFactory, new ThreadPoolExecutor.AbortPolicy());
- pool.allowCoreThreadTimeOut(true);
- }
网页题目:10问10答:你真的了解线程池吗?
网页链接:http://www.csdahua.cn/qtweb/news41/309241.html
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网