异步回调模式。
JOIN异步阻塞 操作原理:阻塞当前的线程,直到准备合并的目标线程的执行完成;即线程A调用了线程B的join方法,合并线程B,线程A则进入阻塞状态,直到线程B执行完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package com.leezy.future;public class JoinDemo { public static final int SLEEP_GAP = 500 ; public static String getCurThreadName () { return Thread.currentThread().getName(); } static class HotWaterThread extends Thread { public HotWaterThread () { super ("烧水线程" ); } public void run () { try { Thread.sleep(SLEEP_GAP); } catch (InterruptedException e) { e.printStackTrace(); } } } static class WashThread extends Thread { public WashThread () { super ("清洗线程" ); } public void run () { try { Thread.sleep(SLEEP_GAP); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main (String[] args) { Thread hotWaterThread = new HotWaterThread(); Thread washThread = new WashThread(); hotWaterThread.start(); washThread.start(); try { hotWaterThread.join(); washThread.join(); Thread.currentThread().setName("主线程" ); } catch (InterruptedException e) { e.printStackTrace(); } } }
join方法是有三个重载版本:
void join():A线程等待B线程执行结束后,A线程重新恢复执行。
void join(long millis):A线程等待B线程执行一段时间,最长等待时间为millis毫秒。超过millis毫秒后,不论B线程是否结束,A线程重新恢复执行。
void join(long millis, int nanos):等待B线程执行一段时间,最长等待时间为millis毫秒,加nanos纳秒。超过时间后,不论B线程是否结束,A线程重新恢复执行。
JOIN被合并的线程没有返回值,如果需要异步线程的执行结果,就需要用到Java的FutureTask系列类。
FutureTask异步回调
Callable接口:Callable接口是个泛型接口,与Runnable接口类似,唯一的区别是,其抽象方法call有返回值,返回值的类型为泛型形参的实际类型。但是Callable接口的实例不能作为Thread线程实例的target来使用,而Runnable接口实例可以作为Thread线程实例的target构造参数,开启一个Thread线程。其内部进行的是异步执行的逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 package java.util.concurrent;@FunctionalInterface public interface Callable <V > { V call () throws Exception ; }
FutureTask类:就像一座搭在Callable实例与Thread线程实例之间的桥。FutureTask类的内部封装一个Callable实例,然后自身间接继承了Runnable接口可以作为Thread线程的target。
1 2 3 4 5 6 7 8 public class FutureTask <V > implements RunnableFuture <V > { public FutureTask (Callable<V> callable) { if (callable == null ) throw new NullPointerException(); this .callable = callable; this .state = NEW; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 package com.leezy.future;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;public class JavaFutureDemo { public static final int SLEEP_GAP = 500 ; public static String getCurThreadName () { return Thread.currentThread().getName(); } static class HotWaterJob implements Callable <Boolean > { @Override public Boolean call () throws Exception { try { Thread.sleep(SLEEP_GAP); } catch (InterruptedException e) { return false ; } return true ; } } static class WashJob implements Callable <Boolean > { @Override public Boolean call () throws Exception { try { Thread.sleep(SLEEP_GAP); } catch (InterruptedException e) { return false ; } return true ; } } public static void drinkTea (boolean waterOK, boolean teacupOK) { if (waterOK && teacupOK) { System.out.println("喝茶" ); } else if (!waterOK) { System.out.println("烧水失败" ); } else { System.out.println("洗杯子失败" ); } } public static void main (String[] args) { Callable<Boolean> hotWaterJob = new HotWaterJob(); FutureTask<Boolean> hotWaterTask = new FutureTask<>(hotWaterJob); Thread hotWaterThread = new Thread(hotWaterTask, "烧水线程" ); Callable<Boolean> washJob = new WashJob(); FutureTask<Boolean> washTask = new FutureTask<>(washJob); Thread washThread = new Thread(washTask, "清洁线程" ); hotWaterThread.start(); washThread.start(); Thread.currentThread().setName("主线程" ); try { Boolean waterOK = hotWaterTask.get(); Boolean teacupOK = washTask.get(); drinkTea(waterOK, teacupOK); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
P.S.FutureTask和Callable都是泛型类,泛型参数表示返回结果的类型。所以,在使用的时候,它们两个实例的泛型参数一定需要保持一致
Future接口 Java将FutureTask类的一系列操作,抽象出来作为一个重要的接口,Future接口。主要提供了三个功能
判断并发任务是否执行完成
获取并发的任务完成后的结果
取消并发执行中的任务
1 2 3 4 5 6 7 8 9 10 11 12 public interface Future <V > { boolean cancel (boolean mayInterruptIfRunning) ; boolean isCancelled () ; boolean isDone () ; V get () throws InterruptedException, ExecutionException ; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException ; }
以上2种办法,通过FutureTask类和Join方法都是异步阻塞模式,效率都是比较低的。
Guava的异步调用 Guava增强了java.util.concurrent包,为了实现非阻塞获取异步线程的结果,Guava对Java的异步回调机制做了2个方面的增强。
ListenableFuture,继承了Java的Future接口,使Java的Future异步任务在Guava中能被监控和获取非阻塞异步执行的结果。
FutureCallback,新接口,该接口的目的是在异步任务执行完成后,根据异步结果,完成不同的回调处理,可以处理异步结果。
onSuccess(): 在异步任务执行成功后被回调;调用时,异步任务的执行结果,作为onSuccess方法的参数被传入。
onFailure():在异步任务执行过程中,抛出异常时被回调;调用时,异步任务所抛出的异常,作为onFailure方法的参数被传入。
1 2 3 4 5 public interface FutureCallback <V > { void onSuccess (@Nullable V var1) ; void onFailure (Throwable var1) ; }
继承自Java的Future接口,增加了一个addListener方法,作用是将FutureCallback的回调封装成一个内部的Runnable异步回调任务,在Callable异步任务完成后,回调FutureCallback进行处理。
在实际编程中,将FutureCallback回调逻辑绑定到ListenableFuture的异步任务,可以通过Guava的Futures工具类的addCallback静态方法。
获取Guava的ListenableFuture异步任务实例,主要通过线程池ThreadPool提交Callable任务的方式来获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public static void nativeFuture () throws Exception { ExecutorService nativeExecutor = Executors.newSingleThreadExecutor(); Future<String> nativeFuture = nativeExecutor.submit(new Callable<String>() { @Override public String call () throws Exception { TimeUnit.SECONDS.sleep(1 ); return "[" + Thread.currentThread().getName() + "]: 并发包Future返回结果" ; } }); System.out.println("[" + Thread.currentThread().getName() + "] ==>" + nativeFuture.get()); } public static void guavaFuture () { ExecutorService executorService = Executors.newSingleThreadExecutor(); ListeningExecutorService guavaExecutor = MoreExecutors.listeningDecorator(executorService); final ListenableFuture<String> listenableFuture = guavaExecutor.submit(new Callable<String>() { @Override public String call () throws Exception { TimeUnit.SECONDS.sleep(1 ); return "[" + Thread.currentThread().getName() + "]: guava的Future返回结果" ; } }); listenableFuture.addListener(new Runnable() { @Override public void run () { try { String str = "[" + Thread.currentThread().getName() + "]: guava对返回结果进行异步CallBack(Runnable):" + listenableFuture.get(); System.out.println(str); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } }, Executors.newSingleThreadExecutor()); System.out.println("[" + Thread.currentThread().getName() + "]: guavaFuture执行结束" ); } public static void guavaFuture2 () { ExecutorService executorService2 = Executors.newSingleThreadExecutor(); ListeningExecutorService guavaExecutor2 = MoreExecutors.listeningDecorator(executorService2); final ListenableFuture<String> listenableFuture2 = guavaExecutor2.submit(new Callable<String>() { @Override public String call () throws Exception { TimeUnit.SECONDS.sleep(1 ); return "[" + Thread.currentThread().getName() + "]: guava的Future返回结果" ; } }); Futures.addCallback(listenableFuture2, new FutureCallback<String>() { @Override public void onSuccess (@Nullable String result) { String str = "[" + Thread.currentThread().getName() + "]=======>对回调结果【" + result + "】进行FutureCallback" ; System.out.println(str); } @Override public void onFailure (Throwable throwable) { } }, Executors.newSingleThreadExecutor()); System.out.println( "[" + Thread.currentThread().getName() +"]: guavaFuture2执行结束" ); }
执行结果:
script 1 2 3 4 5 [main] ==>[pool-1-thread-1]: 并发包Future返回结果 [main]: guavaFuture执行结束 [pool-3-thread-1]: guava对返回结果进行异步CallBack(Runnable):[pool-2-thread-1]: guava的Future返回结果 [main]: guavaFuture2执行结束 [pool-5-thread-1]=======>对回调结果【[pool-4-thread-1]: guava的Future返回结果】进行FutureCallback
Netty的异步回调模式 Netty对JavaFuture异步任务拓展如下:
继承Java的Future接口;
定义GenericFutureListener接口,异步执行结果监听器。