CompletableFuture 源码分析

JDK源码学习
上篇分析了Future的实现类FutureTask, 这篇分析Future接口的另一个实现类 CompletableFuture

类继承关系见此篇 FutureTask 源码分析 UML图

CompletableFuture 是一个异步编程类,jdk1.8新增,在Future之上扩展了异步回调,组合处理的能力

使用场景:多任务执行时依赖某个任务执行的结果; 某个任务执行完后的通知;多个任务执行后的组合等等都可使用CompletableFuture来完成。

CompletableFuture类:
举例:比如某个任务需获取上一个任务的结果后才执行,下面通过这个例子来分析整个执行流程

public void test6() throws Exception {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
return “1”;
}).thenApply(s -> {
System.out.println(Thread.currentThread().getName());
return s + “2”;
});
System.out.println(cf.get());
}

返回值:
ForkJoinPool.commonPool-worker-1
main
12
public void test7() throws Exception {
CompletableFuture cf = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(20);
} catch (InterruptedException e) {}
System.out.println(Thread.currentThread().getName());
return “1”;
}).thenApply(s -> {
System.out.println(Thread.currentThread().getName());
return s + “2”;
});
System.out.println(cf.get());
}

返回值:
ForkJoinPool.commonPool-worker-1
ForkJoinPool.commonPool-worker-1
12

上面 test6(), test7() 返回值不一样:

仔细看是test7 多了一段线程睡眠的代码,这也代表真实业务场景中执行任务需要的时长,

不一样的原因是test6() main函数执行thenApply方法时,上个任务supplyAsync方法的结果已经写入内存了,所以能获取到结果往下执行。

test7 方法分析
CompletableFuture类:

//异步线程池
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

//Supplier 函数式接口,返回一个值
//返回异步完成的新的CompletableFuture
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

static CompletableFuture asyncSupplyStage(Executor e, Supplier f) {
if (f == null) throw new NullPointerException();
CompletableFuture d = new CompletableFuture();
//提交一个异步任务
e.execute(new AsyncSupply(d, f));
return d;
}

supplyAsync方法传入一个函数类,asyncSupplyStage方法封装了异步任务,由线程池执行;

AsyncSupply类:
static final class AsyncSupply extends ForkJoinTask
implements Runnable, AsynchronousCompletionTask {
CompletableFuture dep; Supplier fn;
//传入依赖对象
AsyncSupply(CompletableFuture dep, Supplier fn) {
this.dep = dep; this.fn = fn;
}

    public final Void getRawResult() { return null; }
    public final void setRawResult(Void v) {}
    public final boolean exec() { run(); return true; }

    public void run() {
        CompletableFuture<T> d; Supplier<T> f;
        //传入的CompletableFuture对象赋值
        if ((d = dep) != null && (f = fn) != null) {
            dep = null; fn = null;
            if (d.result == null) {
                try {
                    //f.get()获取函数执行结果
                    //d.completeValue() 通过cas操作更新结果
                    d.completeValue(f.get());
                } catch (Throwable ex) {
                    d.completeThrowable(ex);
                }
            }
            //执行所有依赖此任务的其它任务,处理后续回调
            d.postComplete();
        }
    }
}

AsyncSupply.run() 方法逻辑就是获取用户自定义函数的执行结果result, cas写入内存,后面的任务会用到此结果;

postComplete() 方法是处理后续任务的回调,此方法等会分析,先分析下test7() thenApply()方法,thenApply主要是把任务加入依赖的CompletableFuture 对象

thenApply()
CompletableFuture类:
public CompletableFuture thenApply(
Function fn) {
return uniApplyStage(null, fn);
}

private CompletableFuture uniApplyStage(
Executor e, Function f) {
if (f == null) throw new NullPointerException();
//创建新对象
CompletableFuture d = new CompletableFuture();
//d.uniApply方法尝试执行任务,返回false则任务未执行
if (e != null || !d.uniApply(this, f, null)) {
UniApply c = new UniApply(e, d, this, f);
//将c入栈
push(c);
//尝试执行任务
c.tryFire(SYNC);
}
return d;
}

final <S> boolean uniApply(CompletableFuture<S> a,
                           Function<? super S,? extends T> f,
                           UniApply<S,T> c) {
    Object r; Throwable x;
    //结果未执行完
    if (a == null || (r = a.result) == null || f == null)
        return false;
    tryComplete: if (result == null) {
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                completeThrowable(x, r);
                break tryComplete;
            }
            r = null;
        }
        try {
            //异步执行
            if (c != null && !c.claim())
                return false;
            @SuppressWarnings("unchecked") S s = (S) r;
            //回调后cas操作写入结果
            completeValue(f.apply(s));
        } catch (Throwable ex) {
            completeThrowable(ex);
        }
    }
    return true;
}

uniApplyStage方法会创建新对象CompletableFuture d, 进入if 分支条件 uniApply方法会判断依赖的CompletableFuture,也就是当前对象是否有返回值,

如果result接口有值,则completeValue() 方法cas写入结果,返回true

否认往下执行,创建UniApply类

CompletableFuture类:
static final class UniApply extends UniCompletion {
Function fn;
//dep 依赖的对象,src 源对象
UniApply(Executor executor, CompletableFuture dep,
CompletableFuture src,
Function fn) {
super(executor, dep, src); this.fn = fn;
}
final CompletableFuture tryFire(int mode) {
CompletableFuture d; CompletableFuture a;
if ((d = dep) == null ||
//执行任务
!d.uniApply(a = src, fn, mode > 0 ? null : this))
return null;
dep = null; src = null; fn = null;
return d.postFire(a, mode);
}
}

final void push(UniCompletion<?,?> c) {
    if (c != null) {
        while (result == null && !tryPushStack(c))
            lazySetNext(c, null); // clear on failure
    }
}

final boolean tryPushStack(Completion c) {
    Completion h = stack;
    //将当前对象stack设置为c的next
    lazySetNext(c, h);
    //cas操作把当前栈值h设置为c
    return UNSAFE.compareAndSwapObject(this, STACK, h, c);
}

static void lazySetNext(Completion c, Completion next) {
    UNSAFE.putOrderedObject(c, NEXT, next);
}

分析d.postFire(a, mode), 回调方法执行完成后触发下一个回调

CompletableFuture:
//尝试清楚a对象的栈值,运行postComplete 或 将this 返回给调用者
final CompletableFuture postFire(CompletableFuture a, int mode) {
if (a != null && a.stack != null) {
if (mode < 0 || a.result == null)
a.cleanStack();
else
//触发下一个a实例的下一个回调
a.postComplete();
}
if (result != null && stack != null) {
//当前CompletableFuture已执行
if (mode < 0)
return this;
else
//触发当前实例this的回调
postComplete();
}
return null;
}

接上面AsyncSupply任务中postComplete()方法:

CompletableFuture:

//最开始的异步任务执行完后调用
final void postComplete() {
CompletableFuture f = this; Completion h;
//当f的stack为空时,则f指向当前对象CompletableFuture的stack
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture d; Completion t;
//cas把h中的next属性指向对象f的stack属性
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
//将h加入当前实例this的栈顶
pushStack(h);
continue;
}
h.next = null; // detach
}
//从头节点开始向下执行任务,执行完指向下一个回调的CompletableFuture
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}

上面分析了一波可能不太直观。所以写了个例子,看的直观些。左图为对象相关信息,右图是例子,可以看到先执行的Completion对象被后面的对象放入栈中,设置为当前对象的next属性。

执行顺序:f1 -> f2 -> f3 -> f4 -> f5 -> f6

总结
上面的例子只分析了CompletableFuture 的一部分属性和方法,可以看到其实现思路是通过链表来完成任务的回调,依赖处理。每个回调方法对应一个CompletableFuture, 如果上一个

CompletableFuture未完成,则将当前CompletableFuture添加到上一个对象的栈属性中,任务执行完毕后,回调栈中的Completion方法。

声明:来自阿飞技术,仅代表创作者观点。链接:https://eyangzhen.com/2742.html

阿飞技术的头像阿飞技术

相关推荐

关注我们
关注我们
购买服务
购买服务
返回顶部