什么?对你没有听错,也没有看错 ..多线程并发执行任务,取结果归集~~ 不再忧愁….
引言
先来看一些APP的获取数据,诸如此类,一个页面获取N多个,多达10个左右的一个用户行为数据,比如:点赞数,发布文章数,点赞数,消息数,关注数,收藏数,粉丝数,卡券数,红包数……….. 真的是多~ 我们看些图:
平时要10+接口的去获取数据(因为当你10+个查询写一起,那估计到半分钟才能响应了),一个页面上N多接口,真是累死前端的宝宝了,前端开启多线程也累啊,我们做后端的要体量一下前端的宝宝们,毕竟有句话叫"程序员何苦为难程序员~"
今天我们也可以一个接口将这些数据返回~ 还贼TM快,解决串行编程,阻塞编程带来的苦恼~
多线程并发执行任务,取结果归集
今天猪脚就是:Future、FutureTask、ExecutorService…
用上FutureTask任务获取结果老少皆宜,就是CPU有消耗。FutureTask也可以做闭锁(实现了Future的语义,表示一种抽象的可计算的结果)。通过把Callable(相当于一个可生成结果的Runnable)作为一个属性,进而把它自己作为一个执行器去继承Runnable,FutureTask 实际上就是一个支持取消行为的异步任务执行器。
Callable就是一个回调接口,可以泛型声明返回类型,而Runnable是线程去执行的方法.这个很简单~大家想深入了解就进去看源码好了~ 因为真的很简单~
FutureTask实现了Future,提供了start, cancel, query等功能,并且实现了Runnable接口,可以提交给线程执行。
Java并发工具类的三板斧 状态,队列,CAS
状态
/***Therunstateofthistask,initiallyNEW.Therunstate*transitionstoaterminalstateonlyinmethodsset,*setException,andcancel.Duringcompletion,statemaytakeon*transientvaluesofCOMPLETING(whileoutcomeisbeingset)or*INTERRUPTING(onlywhileinterruptingtherunnertosatisfya*cancel(true)).Transitionsfromtheseintermediatetofinal*statesusecheaperordered/lazywritesbecausevaluesareunique*andcannotbefurthermodified.**Possiblestatetransitions://可能发生的状态过度过程*NEW->COMPLETING->NORMAL//创建-->完成-->正常*NEW->COMPLETING->EXCEPTIONAL//创建-->完成-->异常*NEW->CANCELLED//创建-->取消*NEW->INTERRUPTING->INTERRUPTED//创建-->中断中-->中断结束*/privatevolatileintstate;//执行器状态privatestaticfinalintNEW=0;//初始值由构造函数保证privatestaticfinalintCOMPLETING=1;//完成进行时正在设置任务结果privatestaticfinalintNORMAL=2;//正常结束任务正常执行完毕privatestaticfinalintEXCEPTIONAL=3;//发生异常任务执行过程中发生异常privatestaticfinalintCANCELLED=4;//已经取消任务已经取消privatestaticfinalintINTERRUPTING=5;//中断进行时正在中断运行任务的线程privatestaticfinalintINTERRUPTED=6;//中断结束任务被中断/**Theunderlyingcallable;nulledoutafterrunning*/privateCallable<V>callable;/**Theresulttoreturnorexceptiontothrowfromget()*/privateObjectoutcome;//non-volatile,protectedbystatereads/writes/**Thethreadrunningthecallable;CASedduringrun()*/privatevolatileThreadrunner;/**Treiberstackofwaitingthreads*/privatevolatileWaitNodewaiters;
还不明白就看图:
publicinterfaceFuture<T>{/***取消任务*@parammayInterruptIfRunning*是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务*如果任务正在执行,则返回true*如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,返回true*如果任务已经完成,则无论mayInterruptIfRunning为true还是false,返回false*/booleancancel(booleanmayInterruptIfRunning);/***任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true*/booleanisCancelled();/***任务是否完成*/booleanisDone();/***通过阻塞获取执行结果*/Tget()throwsInterruptedException,ExecutionException;/***通过阻塞获取执行结果。如果在指定的时间内没有返回,则返回null*/Tget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}
Future
cancle 可以停止任务的执行 但不一定成功 看返回值true or false
get 阻塞获取callable的任务结果,即get阻塞住调用线程,直至计算完成返回结果
isCancelled 是否取消成功
isDone 是否完成
重点说明:
Furture.get()获取执行结果的值,取决于执行的状态,如果任务完成,会立即返回结果,否则一直阻塞直到任务进入完成状态,然后返回结果或者抛出异常。
“运行完成”表示计算的所有可能结束的状态,包含正常结束,由于取消而结束和由于异常而结束。当进入完成状态,他会停止在这个状态上,只要state不处于 NEW 状态,就说明任务已经执行完毕。
FutureTask负责将计算结果从执行任务的线程传递到调用这个线程的线程,而且确保了,传递过程中结果的安全发布
UNSAFE 无锁编程技术,确保了线程的安全性~ 为了保持无锁编程CPU的消耗,所以用状态标记,减少空转的时候CPU的压力
任务本尊:callable
任务的执行者:runner
任务的结果:outcome
获取任务的结果:state + outcome + waiters
中断或者取消任务:state + runner + waiters
run方法
1、检查state,非NEW,说明已经启动,直接返回;否则,设置runner为当前线程,成功则继续,否则,返回。
2、调用Callable.call()方法执行任务,成功则调用set(result)方法,失败则调用setException(ex)方法,最终都会设置state,并调用finishCompletion()方法,唤醒阻塞在get()方法上的线程们。
3、如注释所示,如果省略ran变量,并把"set(result);" 语句移动到try代码块"ran = true;" 语句处,会怎样呢?首先,从代码逻辑上看,是没有问题的,但是,考虑到"set(result);"方法万一抛出异常甚至是错误了呢?set()方法最终会调用到用户自定义的done()方法,所以,不可省略。
4、如果state为INTERRUPTING, 则主动让出CPU,自旋等待别的线程执行完中断流程。见handlePossibleCancellationInterrupt(int s) 方法。
publicvoidrun(){//UNSAFE.compareAndSwapObject,CAS保证Callable任务只被执行一次无锁编程if(state!=NEW||!UNSAFE.compareAndSwapObject(this,runnerOffset,null,Thread.currentThread()))return;try{Callable<V>c=callable;//拿到执行任务if(c!=null&&state==NEW){//任务不为空,并且执行器状态是初始值,才会执行;如果取消就不执行了Vresult;booleanran;//记录是否执行成功try{result=c.call();//执行任务ran=true;//成功}catch(Throwableex){result=null;//异常,清空结果ran=false;//失败setException(ex);//记录异常}if(ran)//问题:ran变量可以省略吗,把set(result);移到try块里面?set(result);//设置结果}}finally{runner=null;//直到set状态前,runner一直都是非空的,为了防止并发调用run()方法。ints=state;if(s>=INTERRUPTING)//有别的线程要中断当前线程,把CPU让出去,自旋等一下handlePossibleCancellationInterrupt(s);}}
privatevoidhandlePossibleCancellationInterrupt(ints){if(s==INTERRUPTING)//当state为INTERRUPTING时while(state==INTERRUPTING)//表示有线程正在中断当前线程Thread.yield();//让出CPU,自旋等待中断}
再啰嗦下: run方法重点做了以下几件事:
将runner属性设置成当前正在执行run方法的线程
调用callable成员变量的call方法来执行任务
设置执行结果outcome, 如果执行成功, 则outcome保存的就是执行结果;如果执行过程中发生了异常, 则outcome中保存的就是异常,设置结果之前,先将state状态设为中间态
对outcome的赋值完成后,设置state状态为终止态(NORMAL或者EXCEPTIONAL)
唤醒Treiber栈中所有等待的线程
善后清理(waiters, callable,runner设为null)
检查是否有遗漏的中断,如果有,等待中断状态完成。
怎么能少了get方法呢,一直阻塞获取参见:awaitDone
publicVget()throwsInterruptedException,ExecutionException{ints=state;//执行器状态if(s<=COMPLETING)//如果状态小于等于COMPLETING,说明任务正在执行,需要等待s=awaitDone(false,0L);//等待returnreport(s);//报告结果}
顺便偷偷看下get(long, TimeUnit),就是get的方法扩展,增加了超时时间,超时后我还没拿到就生气抛异常….
publicVget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException{if(unit==null)//参数校验thrownewNullPointerException();ints=state;//执行器状态if(s<=COMPLETING&&(s=awaitDone(true,unit.toNanos(timeout)))<=COMPLETING)//如果状态小于等于COMPLETING,说明任务正在执行,需要等待;等待指定时间,state依然小于等于COMPLETINGthrownewTimeoutException();//抛出超时异常returnreport(s);//报告结果}
那么再看awaitDone,要知道会写死循环while(true)|for (;;)的都是高手~
privateintawaitDone(booleantimed,longnanos)throwsInterruptedException{finallongdeadline=timed?System.nanoTime()+nanos:0L;//计算deadlineWaitNodeq=null;//等待结点booleanqueued=false;//是否已经入队for(;;){if(Thread.interrupted()){//如果当前线程已经标记中断,则直接移除此结点,并抛出中断异常removeWaiter(q);thrownewInterruptedException();}ints=state;//执行器状态if(s>COMPLETING){//如果状态大于COMPLETING,说明任务已经完成,或者已经取消,直接返回if(q!=null)q.thread=null;//复位线程属性returns;//返回}elseif(s==COMPLETING)//如果状态等于COMPLETING,说明正在整理结果,自旋等待一会儿Thread.yield();elseif(q==null)//初始,构建结点q=newWaitNode();elseif(!queued)//还没入队,则CAS入队queued=UNSAFE.compareAndSwapObject(this,waitersOffset,q.next=waiters,q);elseif(timed){//是否允许超时nanos=deadline-System.nanoTime();//计算等待时间if(nanos<=0L){//超时removeWaiter(q);//移除结点returnstate;//返回结果}LockSupport.parkNanos(this,nanos);//线程阻塞指定时间}elseLockSupport.park(this);//阻塞线程}}
至此,线程安排任务和获取我就不啰嗦了~~~~还要很多探索的,毕竟带薪聊天比较紧张,我就不多赘述了~
队列
接着我们来看队列,在FutureTask中,队列的实现是一个单向链表,它表示所有等待任务执行完毕的线程的集合。我们知道,FutureTask实现了Future接口,可以获取“Task”的执行结果,那么如果获取结果时,任务还没有执行完毕怎么办呢?那么获取结果的线程就会在一个等待队列中挂起,直到任务执行完毕被唤醒。这一点有点类似于AQS中的sync queue,在下文的分析中,大家可以自己对照它们的异同点。
我们前面说过,在并发编程中使用队列通常是将当前线程包装成某种类型的数据结构扔到等待队列中,我们先来看看队列中的每一个节点是怎么个结构:
staticfinalclassWaitNode{volatileThreadthread;volatileWaitNodenext;WaitNode(){thread=Thread.currentThread();}}
可见,相比于AQS的sync queue所使用的双向链表中的Node,这个WaitNode要简单多了,它只包含了一个记录线程的thread属性和指向下一个节点的next属性。
值得一提的是,FutureTask中的这个单向链表是当做栈来使用的,确切来说是当做Treiber栈来使用的,不了解Treiber栈是个啥的可以简单的把它当做是一个线程安全的栈,它使用CAS来完成入栈出栈操作(想进一步了解的话可以看这篇文章)。
为啥要使用一个线程安全的栈呢,因为同一时刻可能有多个线程都在获取任务的执行结果,如果任务还在执行过程中,则这些线程就要被包装成WaitNode扔到Treiber栈的栈顶,即完成入栈操作,这样就有可能出现多个线程同时入栈的情况,因此需要使用CAS操作保证入栈的线程安全,对于出栈的情况也是同理。
由于FutureTask中的队列本质上是一个Treiber(驱动)栈,那么使用这个队列就只需要一个指向栈顶节点的指针就行了,在FutureTask中,就是waiters属性:
/**Treiberstackofwaitingthreads*/privatevolatileWaitNodewaiters;
事实上,它就是整个单向链表的头节点。
综上,FutureTask中所使用的队列的结构如下:
CAS操作
CAS操作大多数是用来改变状态的,在FutureTask中也不例外。我们一般在静态代码块中初始化需要CAS操作的属性的偏移量:
//Unsafemechanicsprivatestaticfinalsun.misc.UnsafeUNSAFE;privatestaticfinallongstateOffset;privatestaticfinallongrunnerOffset;privatestaticfinallongwaitersOffset;static{try{UNSAFE=sun.misc.Unsafe.getUnsafe();Class<?>k=FutureTask.class;stateOffset=UNSAFE.objectFieldOffset(k.getDeclaredField("state"));runnerOffset=UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));waitersOffset=UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));}catch(Exceptione){thrownewError(e);}}
从这个静态代码块中我们也可以看出,CAS操作主要针对3个属性,包括state、runner和waiters,说明这3个属性基本是会被多个线程同时访问的。其中state属性代表了任务的状态,waiters属性代表了指向栈顶节点的指针,这两个我们上面已经分析过了。
runner属性代表了执行FutureTask中的“Task”的线程。为什么需要一个属性来记录执行任务的线程呢?这是为了中断或者取消任务做准备的,只有知道了执行任务的线程是谁,我们才能去中断它。
定义完属性的偏移量之后,接下来就是CAS操作本身了。在FutureTask,CAS操作最终调用的还是Unsafe类的compareAndSwapXXX方法,关于Unsafe,由于带薪码文这里不再赘述。
实战演练
一切没有例子的讲解都是耍流氓 >>> 葱姜切沫~~加入生命的源泉….
实战项目以springboot为项目脚手架,github地址:
https://github.com/javastacks/spring-boot-best-practice
1.MyFutureTask实现类
内部定义一个线程池进行任务的调度和线程的管理以及线程的复用,大家可以根据自己的实际项目情况进行配置
其中线程调度示例:核心线程 8 最大线程 20 保活时间30s 存储队列 10 有守护线程 拒绝策略:将超负荷任务回退到调用者
说明 :
默认使用核心线程(8)数执行任务,任务数量超过核心线程数就丢到队列,队列(10)满了就再开启新的线程,新的线程数最大为20,当任务执行完,新开启的线程将存活30s,若没有任务就消亡,线程池回到核心线程数量.
publicinterfaceFuture<T>{/***取消任务*@parammayInterruptIfRunning*是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务*如果任务正在执行,则返回true*如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,返回true*如果任务已经完成,则无论mayInterruptIfRunning为true还是false,返回false*/booleancancel(booleanmayInterruptIfRunning);/***任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true*/booleanisCancelled();/***任务是否完成*/booleanisDone();/***通过阻塞获取执行结果*/Tget()throwsInterruptedException,ExecutionException;/***通过阻塞获取执行结果。如果在指定的时间内没有返回,则返回null*/Tget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}0
2.service业务方法
常规业务查询方法,为了特效,以及看出实际的效果,我们每个方法做了延时
publicinterfaceFuture<T>{/***取消任务*@parammayInterruptIfRunning*是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务*如果任务正在执行,则返回true*如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,返回true*如果任务已经完成,则无论mayInterruptIfRunning为true还是false,返回false*/booleancancel(booleanmayInterruptIfRunning);/***任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true*/booleanisCancelled();/***任务是否完成*/booleanisDone();/***通过阻塞获取执行结果*/Tget()throwsInterruptedException,ExecutionException;/***通过阻塞获取执行结果。如果在指定的时间内没有返回,则返回null*/Tget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}1
3.controller调用
publicinterfaceFuture<T>{/***取消任务*@parammayInterruptIfRunning*是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务*如果任务正在执行,则返回true*如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,返回true*如果任务已经完成,则无论mayInterruptIfRunning为true还是false,返回false*/booleancancel(booleanmayInterruptIfRunning);/***任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true*/booleanisCancelled();/***任务是否完成*/booleanisDone();/***通过阻塞获取执行结果*/Tget()throwsInterruptedException,ExecutionException;/***通过阻塞获取执行结果。如果在指定的时间内没有返回,则返回null*/Tget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}2
我们启动项目:开启调用 http://localhost:8080/api/user/get/data?userId=4
当我们线程池配置为:核心线程 8 最大线程 20 保活时间30s 存储队列 10 的时候,我们测试的结果如下:
结果:我们看到每个server method的执行线程都是从线程池中发起的线程名:User_Async_FutureTask-%d, 总耗时从累计的52秒缩短到10秒,即取决于最耗时的方法查询时间.
那我们再将注释代码放开,进行串行查询进行测试:
结果:我们使用串行的方式进行查询,结果汇总将达到52秒,那太可怕了~~
总结
使用FutureTask的时候,就是将任务runner以caller的方式进行回调,阻塞获取,最后我们将结果汇总,即完成了开启多线程异步调用我们的业务方法.
publicinterfaceFuture<T>{/***取消任务*@parammayInterruptIfRunning*是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务*如果任务正在执行,则返回true*如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,返回true*如果任务已经完成,则无论mayInterruptIfRunning为true还是false,返回false*/booleancancel(booleanmayInterruptIfRunning);/***任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true*/booleanisCancelled();/***任务是否完成*/booleanisDone();/***通过阻塞获取执行结果*/Tget()throwsInterruptedException,ExecutionException;/***通过阻塞获取执行结果。如果在指定的时间内没有返回,则返回null*/Tget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}3
这里使用的只是一个简单的例子,具体项目可以定义具体的业务方法进行归并处理,其实在JDK1.8以后,又有了ExecutorCompletionService,ForkJoinTask,CompletableFuture这些都可以实现上述的方法,我们后续会做一些这些方法使用的案例,期望大家的关注,文章中有不足之处,欢迎指正~
小甜点
所以:我们要用到亲爱的Spring的异步编程,异步编程有很多种方式:比如常见的Future的sync,CompletableFuture.supplyAsync,@Async,哈哈 其实都离不开Thread.start()…,等等我说个笑话:
老爸有俩孩子:小红和小明。老爸想喝酒了,他让小红去买酒,小红出去了。然后老爸突然想吸烟了,于是老爸让小明去买烟。在面对对象的思想中,一般会把买东西,然后买回来这件事作为一个方法,如果按照顺序结构或者使用多线程同步的话,小明想去买烟就必须等小红这个买东西的操作进行完。这样无疑增加了时间的开销(万一老爸尿憋呢?)。异步就是为了解决这样的问题。你可以分别给小红小明下达指令,让他们去买东西,然后你就可以自己做自己的事,等他们买回来的时候接收结果就可以了。
publicinterfaceFuture<T>{/***取消任务*@parammayInterruptIfRunning*是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务*如果任务正在执行,则返回true*如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,返回true*如果任务已经完成,则无论mayInterruptIfRunning为true还是false,返回false*/booleancancel(booleanmayInterruptIfRunning);/***任务是否被取消成功,如果在任务正常完成前被取消成功,则返回true*/booleanisCancelled();/***任务是否完成*/booleanisDone();/***通过阻塞获取执行结果*/Tget()throwsInterruptedException,ExecutionException;/***通过阻塞获取执行结果。如果在指定的时间内没有返回,则返回null*/Tget(longtimeout,TimeUnitunit)throwsInterruptedException,ExecutionException,TimeoutException;}4
运行结果:
链接:juejin.im/post/5d3c46d2f265da1b9163dbce