Java中的异步执行Future小结

最近碰到的异步处理的操作比较多,异步就是不等想要的结果返回执行接下来的其他操作,等异步结果返回后直接调用已经注册好的处理方法完成后续操作。异步的思想是非常棒的,相比轮询的方式而言,异步的实现方式无疑是高效并且优雅的。本文介绍了包括Future,AIO和有点类似于单机版的Map-Reduce的fork/join框架。

Guava ListenableFuture

使用JDK提供的线程池ExcuteService的execute(Runable runable)方法来执行不需要返回结果的线程任务,而使用submit(Callable callable)方法需要线程任务返回T类型的执行结果,方法返回Future对象,使用Future的get方法可以获取执行结果,而在执行get方法在线程返回结果之前是阻塞的,jdk这对于想要异步的处理结果没有提供相应的接口,guava的ListenableFuture接口就是为实现异步的获取Future中的结果而出现的。
顾名思义,ListenableFuture是可监听的Future,可以在结果返回的时候以方法回调的方式实现异步的后续操作。那么如何获取ListenableFuture呢,方法有两种:

  1. 将jdk提供的Futhure转换成ListenableFuture
  2. 将ExcutorService线程池转换成ListeningExcutorService,继而获取ListenableFuture

第一种方式我们使用如下适配器获得Future对应的ListenableFuture

1
ListenableFuture<String> listenableFuture=JdkFutureAdapters.listenInPoolThread(future);

第二种方式我们使用一个线程池的修饰类获得

1
2
ListeningExecutorService listeningThreadPool=MoreExecutors.listeningDecorator(threadPool);
ListenableFuture<String> listenableFuture=listeningThreadPool.submit(new Callable<String>() {....}

在获取了ListenableFuture之后,我们同样有两种方式异步获取线程执行结果

  1. 添加FutureCallback执行回调方法
  2. 为ListenableFuture添加监听线程

第一种方法使用Futures的addCallback方法实现

1
2
3
4
5
6
7
8
9
10
11
12
Futures.addCallback(listenableFuture,new FutureCallback<String>(){
@Override
public void onSuccess(String result) {
System.out.println(result);
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
});

第二种方法不太推荐,使用listenableFuture.addListener的方法实现。

源码解读

我们看到guava的实现方式非常优雅,那么它是怎么实现这种异步回调的呢,以JdkFutureAdapters.listenInPoolThread为例,其实他是返回了一个ListenableFutureAdapter的内部类,它实现了ListenableFuture并且继承了ForwardingFuture类,然后调用callback的时候会新建一个Runable线程任务,其主要逻辑是使用Future的get方法阻塞获取执行结果,在结果完成的时候回到callback类的onSuccess方法,如果出现异常则调用onFailure方法。
整个方法的设计使用了适配器模式,ListenableFuture是最终用户需要的接口,ListenableFutureAdapter是适配器,ForwordingFuture实现了Future接口,是被适配者。

关于FutureTask

FutureTask是Future的一个实现类,它同时实现了Runable接口,直接使用线程池运行FutureTask,并且获取阻塞获取结果,其在异步方面并没有做出改变。

本节代码

所有测试代码获取可以点击这里
参考文献:guava并发深入学习 FutureTask

Java AIO中的Future和CompleteHandler

Socket通信是Java网络通信的一种方式,在基础的阻塞BIO后出现了NIO,NIO采用了多路复用思想,使得一个线程可以监听多个socket文件描述符,使得在在一次轮训的过程中可以查看多个阻塞IO的状态,相比BIO每次只能监听一个IO状态,如果这个IO长期处于阻塞状态,那么其他IO操作如果准备好也无法执行。
BIO的另一个问题是在于对于每个准备好的IO操作必须分配一个线程进行实际的阻塞IO操作,这使得系统的线程数和已准备好的IO操作成线性关系。NIO的优势在于它可以对多个已准备好的IO阻塞操作做打包操作,做线程数的缩减。
NIO对数据的操作是面向缓冲区的,而BIO是面向数据流的,NIO中面向缓冲区的IO操作是讲所有的数据一次读入到缓冲区内,进而做操作,这相比BIO的面向数据流的IO每次只能读入一个或多个字节的方式,这种方式更加快捷。
NIO本质其实还是采用轮询的方式去获取已准备好的IO操作,实际的读写IO操作仍然是阻塞的,Java AIO是对NIO的又一次改进,其真正实现了异步的IO操作,包括获取准备好的操作,读写操作都是异步的,其执行都会直接返回一个Future对象,我们可以使用其get方法阻塞接下来的执行。
Java AIO主要用到的类有:

  1. AsynchronousSocketChannel
  2. AsynchronousServerSocketChannel

AsynchronousSocketChannel的connet方法为其准备io操作,write和read方法为其实际的IO操作,同样的AsynchronousServerSocketChannel的accept方法为其准备io操作,读写操作是和AsynchronousSocketChannel一样的。
以上方法都是异步的,也就是说方法会立即返回,获取异步执行结果的方法有两种:

  1. 获取Future结果,使用Future的get方法获取执行结果
  2. 使用CompletionHandler的回调方法在结果返回的时候获取结果

以AsynchronousServerSocketChannel为例,其connet方法有两个重载方式:

  1. accept()无参方式返回一个Future,可以使用get方法获取连接的client。
  2. accept(A attach,CompletionHandler),无返回值,在接收到client连接的时候执行CompletionHandler中的completed回调方法。

具体的,我们使用一个Echo的server/client的程序来测试Java AIO,其Server端实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class AioEchoServer {
private int PORT = 9955;
private String IP = "127.0.0.1";
private AsynchronousServerSocketChannel server;
public AioEchoServer() {
try {
server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(IP, PORT));
} catch (IOException e) {
e.printStackTrace();
}
}
public void startServer() {
//attachment参数可以被CompletionHandler接收
server.accept("attachment", new CompletionHandler<AsynchronousSocketChannel, String>() {
@Override
public void completed(AsynchronousSocketChannel client, String attachment) {
//获取了对应的客户端socket
try {
System.out.println("attachment: " + attachment);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//这里是非阻塞读取,需要使用get等待客户端返回结果
int readResult = client.read(byteBuffer).get();
System.out.println("读入数据量:" + readResult);
byteBuffer.flip();
System.out.println("Get from client:" + (new String(byteBuffer.array())));
int writeResult = client.write(byteBuffer).get();
if (writeResult > 0) {
System.out.println(client.getRemoteAddress() + ": " + "response success! write length: " + writeResult);
} else {
System.out.println("write length <0");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
server.accept(null,this);
}
}
@Override
public void failed(Throwable exc, String attachment) {
exc.printStackTrace();
}
});
}
public static void main(String[] args) {
AioEchoServer aioEchoServer = new AioEchoServer();
aioEchoServer.startServer();
Scanner scanner = new Scanner(System.in);
scanner.nextLine();
}
}

客户端的实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class AioClient {
public static void main(String[] args) {
String IP = "127.0.0.1";
int PORT = 9955;
final AsynchronousSocketChannel client;
try {
client = AsynchronousSocketChannel.open();
SocketAddress serverSocketAddress = new InetSocketAddress(IP, PORT);
client.connect(serverSocketAddress, "clientAttachment", new CompletionHandler<Void, String>() {
@Override
public void completed(Void result, String attachment) {
System.out.println(attachment);
try {
ByteBuffer byteBuffer = ByteBuffer.wrap("hello".getBytes());
//wrap后不用byteBuffer.flip(),此时position=0,limit是之前position的位置
//read后的postion是写入的界限,limit=capbility,flip做的事情:limit=poition,position=0.
//须阻塞写入
int writeResult = client.write(byteBuffer).get();
System.out.println("写入Byte数:" + writeResult);
//须阻塞读取
byteBuffer.clear();
int readResult = client.read(byteBuffer).get();
byteBuffer.flip();
System.out.println("server response: " + (new String(byteBuffer.array())));
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, String attachment) {
exc.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
Scanner scanner=new Scanner(System.in);
scanner.nextLine();
}
}

需要注意的有两点,不论是client还是server,其write和read都是非阻塞的,模拟的情况下我们需要阻塞等待其结果返回;二是ByteBuffer.wrap方法之后不用flip操作,其返回的结果已经是flip过的,作者在这里踩了坑。

单机版Map-Reduce fork/join框架

fork/join框架类似于单机版的Map-Reduce,如果一个计算任务数据量比较大,就将任务分解交给不同的线程去处理,然后最后汇总结果,这种计算模型对于多核处理器来说具有极大的优势。使用fork/join框架的基本过程如下。

  1. 实现一个类继承自RecursiveTask(或者无返回值的RecursiveAction)

    1. 实现compute方法,将任务拆分成多个自实现的RecursiveTask
    2. invokeAll(所有自实现的RecursiveTask);
    3. 使用自实现的RecursiveTask的join方法获取执行结果
    4. 汇总返回结果并返回
  2. 使用ForkJoinPool的invoke方法调用上一步自定义的计算任务

下例实现了一个计算数组中所有数和的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class CountTask extends RecursiveTask<Integer>{
private static int THRESHOLD=3;
private int[] integers;
private int start;
private int end;
public CountTask(int[] integers,int start,int end){
this.integers=integers;
this.start=start;
this.end=end;
}
@Override
protected Integer compute() {
int len=end-start+1;
boolean isOverThreshold=len>THRESHOLD;
int sum=0;
if(!isOverThreshold){
for(int i=start;i<=end;i++){
sum+=integers[i];
}
}else{
int mid=(start+end)/2;
CountTask leftTask=new CountTask(this.integers,start,mid);
CountTask rightTask=new CountTask(this.integers,mid+1,end);
invokeAll(leftTask,rightTask);
//fork的作用是将当前任务放到workerThread里面去做
//invokeAll是将其中一个放在本线程做,其他的调用fork
int leftResult=leftTask.join();
int rightResult=rightTask.join();
sum=leftResult+rightResult;
}
return sum;
}
}

1
2
3
4
5
6
7
8
9
public class ForkjoinTest {
public static void main(String[] args){
int[] integers={1,2,3,4,5,6,7,8,9,10};
ForkJoinPool forkJoinPool=new ForkJoinPool(4);
ForkJoinTask<Integer> task=new CountTask(integers,0,9);
int sum=forkJoinPool.invoke(task);
System.out.println(sum);
}
}

值得一提的是fork/join框架是jdk8的stream实现的计算模型,如果想要深入了解stream的实现原理,可以参考这篇文章Java8 Stream原理深度解析

关于源码

本文所有源码可以从这里获得,本文首发表于我的博客,欢迎关注!转载须注明文章出处,作者保留文章所有权。

坚持原创技术分享,您的支持将鼓励我继续创作!