环球ug客户端:线程池续:你必须要知道的线程池submit()实现原理之FutureTask!

admin/2020-06-02/ 分类:科技/阅读:

前言

上一篇内容写了Java中线程池的实现原理及源码剖析,说好的是实着实在的大知足,想通过一篇文章让人人对线程池有个透彻的领会,然则文章写完总觉得还瑕玷什么?

上篇文章只提到线程提交的execute()方式,并没有解说线程提交的submit()方式,submit()有一个返回值,可以获取线程执行的效果Future<T>,这一讲就那深入学习下submit()FutureTask实现原理。

使用场景&示例

使用场景

我能想到的使用场景就是在并行盘算的时刻,例如一个方式中挪用methodA()、methodB(),我们可以通过线程池异步去提交方式A、B,然后在主线程中获取组装方式A、B盘算后的效果,能够大大提升方式的吞吐量。

使用示例

/** * @author wangmeng * @date 2020/5/28 15:30 */ public class FutureTaskTest { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService threadPool = Executors.newCachedThreadPool(); System.out.println("====执行FutureTask线程义务===="); Future<String> futureTask = threadPool.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("FutureTask执行营业逻辑"); Thread.sleep(2000); System.out.println("FutureTask营业逻辑执行完毕!"); return "迎接关注: 一枝花算不算浪漫!"; } }); System.out.println("====执行主线程义务===="); Thread.sleep(1000); boolean flag = true; while(flag){ if(futureTask.isDone() && !futureTask.isCancelled()){ System.out.println("FutureTask异步义务执行效果:" futureTask.get()); flag = false; } } threadPool.shutdown(); } } 

上面的使用很简朴,submit()内部通报的实际上是个Callable接口,我们自己实现其中的call()方式,我们通过futureTask既可以获取到详细的返回值。

submit()实现原理

submit() 是也是提交义务到线程池,只是它可以获取义务返回效果,返回效果是通过FutureTask来实现的,先看下ThreadPoolExecutor中代码实现:

public class ThreadPoolExecutor extends AbstractExecutorService { public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } } public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } } 

提交义务照样执行execute()方式,只是task被包装成了FutureTask ,也就是在excute()中启动线程后会执行FutureTask.run()方式。

再来详细看下它执行的完整链路图:

上图可以看到,执行义务并返回执行效果的焦点逻辑着实FutureTask中,我们以FutureTask.run/get 两个方式为突破口,一点点剖析FutureTask的实现原理。

FutureTask源码初探

先看下FutureTask中部分属性:

public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; private Callable<V> callable; private Object outcome; private volatile Thread runner; private volatile WaitNode waiters; } 
  1. state

当前task状态,共有7中类型。
NEW: 当前义务尚未执行
COMPLETING: 当前义务正在竣事,尚未完全竣事,一种临界状态
NORMAL:当前义务正常竣事
EXCEPTIONAL: 当前义务执行历程中发生了异常。
CANCELLED: 当前义务被作废
INTERRUPTING: 当前义务中止中..
INTERRUPTED: 当前义务已中止

  1. callble

用户提交义务通报的Callable,自定义call方式,实现营业逻辑

  1. outcome

义务竣事时,outcome保留执行效果或者异常信息。

  1. runner

当前义务被线程执行时代,保留当前义务的线程工具引用

  1. waiters

由于会有许多线程去get当前义务的效果,以是这里使用了一种stack数据结构来保留

FutureTask.run()实现原理

我们已经知道在线程池runWorker()中最终会挪用到FutureTask.run()方式中,我们就来看下它的执行原理吧:

详细代码如下:

public class FutureTask<V> implements RunnableFuture<V> { public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } } 

首先是判断FutureTaskstate状态,必须是NEW才可以继续执行。

然后通过CAS修改runner引用为当前线程。

接着执行用户自定义的call()方式,将返回效果设置到result中,result可能为正常返回也可能为异常信息。这里主要是挪用set()/setException()

FutureTask.set()实现原理

set()方式的实现很简朴,直接看下代码:

public class FutureTask<V> implements RunnableFuture<V> { protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); } } } 

call()返回的数据赋值给全局变量outcome上,然后修改state状态为NORMAL,最后挪用finishCompletion()来做挂起线程的叫醒操作,这个方式等到get()后面再来解说。

FutureTask.get()实现原理

接着看下代码:

public class FutureTask<V> implements RunnableFuture<V> { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } } 

若是FutureTaskstateNORMAL或者COMPLETING,说明当前义务并没有执行完成,挪用get()方式会被壅闭,详细的壅闭逻辑在awaitDone()方式:

private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } 

这个方式可以说是FutureTask中最焦点的方式了,一步步来剖析:

若是timed不为空,这说明指定nanos时间还未返回效果,线程就会退出。

q是一个WaitNode工具,是将当前引用线程封装在一个stack数据结构中,WaitNode工具属性如下:

 static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } 

接着判断当前线程是否中止,若是中止则抛出中止异常。

下面就进入一轮轮的if... else if...判断逻辑,我们照样接纳分支的方式去剖析。

分支一:if (s > COMPLETING) {

此时get()方式已经有效果了,无论是正常返回的效果,照样异常、中止、作废等,此时直接返回state状态,然后执行report()方式。

分支二:else if (s == COMPLETING)

条件建立,说明当前义务靠近完成状态,这里让当前线程再释放cpu,举行下一轮抢占cpu

分支三:else if (q == null)

第一次自旋执行,WaitNode还没有初始化,初始化q=new WaitNode();

分支四:else if (!queued){

queued代表当前线程是否入栈,若是没有入栈则举行入栈操作,顺便将全局变量waiters指向栈顶元素。

分支五/六:LockSupport.park

若是设置了超时时间,则使用parkNanos来挂起当前线程,否则使用park()

经由这么一轮自旋循环后,若是执行call()还没有返回效果,那么挪用get()方式的线程都市被挂起。

被挂起的线程会守候run()返回效果后依次叫醒,详细的执行逻辑在finishCompletion()中。

最终stack结构中数据如下:

FutureTask.finishCompletion()实现原理

详细实现代码如下:

private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; q = next; } break; } } done(); callable = null; } 

代码实现很简朴,看过get()方式后,我们知道所有挪用get()方式的线程,在run()还没有返回效果前,都市保留到一个有WaitNode组成的statck数据结构中,而且每个线程都市被挂起。

这里是遍历waiters栈顶元素,然后依次查询起next节点举行叫醒,叫醒后的节点接着会往后挪用report()方式。

FutureTask.report()实现原理

详细代码如下:

private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } 

这个方式很简朴,由于执行到了这里,说明当前state状态一定大于COMPLETING,判断若是是正常返回,那么返回outcome数据。

若是state是作废状态,抛出CancellationException异常。

若是状态都不知足,则说明执行中泛起了差错,直接抛出ExecutionException

FutureTask.cancel()实现原理

public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } 

cancel()方式的逻辑很简朴,就是修改state状态为CANCELLED,然后挪用finishCompletion()来叫醒守候的线程。

这里若是mayInterruptIfRunning,就会先中止当前线程,然后再去叫醒守候的线程。

总结

FutureTask的实现原理实在很简朴,每个方式基本上都画了一个简朴的流程图来利便立刻。

后面还计划分享一个BlockingQueue相关的源码解读,这样线程池也可以算是完结了。

在这之前可能会先分享一个SpringCloud常见设置代码剖析、最佳实践等手册,利便工作中使用,也是对之前看过的源码一种总结。敬请期待!
迎接关注:

,

Allbet

www.520super.com欢迎进入欧搏平台(Allbet Game),欧搏平台开放欧搏(Allbet)开户、欧搏(Allbet)代理开户、欧搏(Allbet)电脑客户端、欧搏(Allbet)APP下载等业务。

TAG:
阅读:
广告 330*360
广告 330*360

热门文章

HOT NEWS
  • 周榜
  • 月榜
Sunbet_进入申博sunbet官网
微信二维码扫一扫
关注微信公众号
新闻自媒体 Copyright © 2002-2019 Sunbet 版权所有
二维码
意见反馈 二维码