最近碰到的异步处理的操作比较多,异步就是不等想要的结果返回执行接下来的其他操作,等异步结果返回后直接调用已经注册好的处理方法完成后续操作。异步的思想是非常棒的,相比轮询的方式而言,异步的实现方式无疑是高效并且优雅的。本文介绍了包括Future,AIO和有点类似于单机版的Map-Reduce的fork/join框架。
Guava ListenableFuture
使用JDK提供的线程池ExcuteService的execute(Runable runable)方法来执行不需要返回结果的线程任务,而使用submit(Callable
顾名思义,ListenableFuture是可监听的Future,可以在结果返回的时候以方法回调的方式实现异步的后续操作。那么如何获取ListenableFuture呢,方法有两种:
- 将jdk提供的Futhure转换成ListenableFuture
- 将ExcutorService线程池转换成ListeningExcutorService,继而获取ListenableFuture
第一种方式我们使用如下适配器获得Future对应的ListenableFuture
第二种方式我们使用一个线程池的修饰类获得
在获取了ListenableFuture之后,我们同样有两种方式异步获取线程执行结果
- 添加FutureCallback执行回调方法
- 为ListenableFuture添加监听线程
第一种方法使用Futures的addCallback方法实现
第二种方法不太推荐,使用listenableFuture.addListener的方法实现。
源码解读
我们看到guava的实现方式非常优雅,那么它是怎么实现这种异步回调的呢,以JdkFutureAdapters.listenInPoolThread为例,其实他是返回了一个ListenableFutureAdapter的内部类,它实现了ListenableFuture并且继承了ForwardingFuture类,然后调用callback的时候会新建一个Runable线程任务,其主要逻辑是使用Future的get方法阻塞获取执行结果,在结果完成的时候回到callback类的onSuccess方法,如果出现异常则调用onFailure方法。
整个方法的设计使用了适配器模式,ListenableFuture是最终用户需要的接口,ListenableFutureAdapter是适配器,ForwordingFuture实现了Future接口,是被适配者。
关于FutureTask
FutureTask是Future的一个实现类,它同时实现了Runable接口,直接使用线程池运行FutureTask,并且获取阻塞获取结果,其在异步方面并没有做出改变。
本节代码
所有测试代码获取可以点击这里
参考文献:guava并发,深入学习 FutureTask
Java AIO中的Future和CompleteHandler
Socket通信是Java网络通信的一种方式,在基础的阻塞BIO后出现了NIO,NIO采用了多路复用思想,使得一个线程可以监听多个socket文件描述符,使得在在一次轮训的过程中可以查看多个阻塞IO的状态,相比BIO每次只能监听一个IO状态,如果这个IO长期处于阻塞状态,那么其他IO操作如果准备好也无法执行。
BIO的另一个问题是在于对于每个准备好的IO操作必须分配一个线程进行实际的阻塞IO操作,这使得系统的线程数和已准备好的IO操作成线性关系。NIO的优势在于它可以对多个已准备好的IO阻塞操作做打包操作,做线程数的缩减。
NIO对数据的操作是面向缓冲区的,而BIO是面向数据流的,NIO中面向缓冲区的IO操作是讲所有的数据一次读入到缓冲区内,进而做操作,这相比BIO的面向数据流的IO每次只能读入一个或多个字节的方式,这种方式更加快捷。
NIO本质其实还是采用轮询的方式去获取已准备好的IO操作,实际的读写IO操作仍然是阻塞的,Java AIO是对NIO的又一次改进,其真正实现了异步的IO操作,包括获取准备好的操作,读写操作都是异步的,其执行都会直接返回一个Future对象,我们可以使用其get方法阻塞接下来的执行。
Java AIO主要用到的类有:
- AsynchronousSocketChannel
- AsynchronousServerSocketChannel
AsynchronousSocketChannel的connet方法为其准备io操作,write和read方法为其实际的IO操作,同样的AsynchronousServerSocketChannel的accept方法为其准备io操作,读写操作是和AsynchronousSocketChannel一样的。
以上方法都是异步的,也就是说方法会立即返回,获取异步执行结果的方法有两种:
- 获取Future结果,使用Future的get方法获取执行结果
- 使用CompletionHandler的回调方法在结果返回的时候获取结果
以AsynchronousServerSocketChannel为例,其connet方法有两个重载方式:
- accept()无参方式返回一个Future
,可以使用get方法获取连接的client。 - accept(A attach,CompletionHandler
),无返回值,在接收到client连接的时候执行CompletionHandler中的completed回调方法。
具体的,我们使用一个Echo的server/client的程序来测试Java AIO,其Server端实现如下
客户端的实现方式
需要注意的有两点,不论是client还是server,其write和read都是非阻塞的,模拟的情况下我们需要阻塞等待其结果返回;二是ByteBuffer.wrap方法之后不用flip操作,其返回的结果已经是flip过的,作者在这里踩了坑。
单机版Map-Reduce fork/join框架
fork/join框架类似于单机版的Map-Reduce,如果一个计算任务数据量比较大,就将任务分解交给不同的线程去处理,然后最后汇总结果,这种计算模型对于多核处理器来说具有极大的优势。使用fork/join框架的基本过程如下。
实现一个类继承自RecursiveTask(或者无返回值的RecursiveAction)
- 实现compute方法,将任务拆分成多个自实现的RecursiveTask
- invokeAll(所有自实现的RecursiveTask);
- 使用自实现的RecursiveTask的join方法获取执行结果
- 汇总返回结果并返回
使用ForkJoinPool的invoke方法调用上一步自定义的计算任务
下例实现了一个计算数组中所有数和的任务
|
|
值得一提的是fork/join框架是jdk8的stream实现的计算模型,如果想要深入了解stream的实现原理,可以参考这篇文章Java8 Stream原理深度解析。