CompletableFuture
CompletableFuture常用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//thenApply
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn) {
return uniApplyStage(null, fn);
}
//thenApplyAsync
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn) {
return uniApplyStage(asyncPool, fn);
}
//thenCompose
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(null, fn);
}
//thenComposeAsync
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn) {
return uniComposeStage(asyncPool, fn);
}
//supplyAsync
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
//runAsync
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
以CompletableFuture.thenApplyAsync为例
大致流程
1
get相关方法
1
2
3
4
5
public T get() throws InterruptedException, ExecutionException {
Object r;
////result为null,继续等待结果
return reportGet((r = result) == null ? waitingGet(true) : r);
}
waitingGet()方法
1,首先进行自旋
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private Object waitingGet(boolean interruptible) {
Signaller q = null;
boolean queued = false;
int spins = -1;
Object r;
while ((r = result) == null) {
//spins < 0开始进入自旋
if (spins < 0)
spins = (Runtime.getRuntime().availableProcessors() > 1) ?
1 << 8 : 0; // Use brief spin-wait on multiprocessors
else if (spins > 0) {、
//随机数,并不是每次都一样
if (ThreadLocalRandom.nextSecondarySeed() >= 0)
--spins;
}
//到这里说明自旋次数spins==0
else if (q == null)//还没有生成Signaller,
q = new Signaller(interruptible, 0L, 0L);
else if (!queued)//将Signaller入栈
queued = tryPushStack(q);
else if (interruptible && q.interruptControl < 0) {//interruptible是否允许中断,且已经中断,直接返回null
q.thread = null;
cleanStack();//清理stack,依赖的stage
return null;
}
else if (q.thread != null && result == null) {
try {
ForkJoinPool.managedBlock(q);//进行阻塞,通过LockSupport.park阻塞
} catch (InterruptedException ie) {
q.interruptControl = -1;
}
}
}
//阻塞已经唤醒,有可能是中断,需要处理下中断
if (q != null) {
q.thread = null;
if (q.interruptControl < 0) {//interruptControl < 0表示已经中断了
if (interruptible)
r = null; // report interruption
else
Thread.currentThread().interrupt();
}
}
//非中断,正常唤醒
postComplete();
return r;
}
ForkJoinPool.managedBlock阻塞
内部调用Signaller的block进行阻塞,看起来主要是调用LockSupport进行阻塞
1
2
3
4
5
6
7
8
9
10
11
public boolean block() {
if (isReleasable())
return true;
else if (deadline == 0L)
LockSupport.park(this);
else if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
final boolean isLive() { return thread != null; }
}
Signaller唤醒
既然阻塞是通过LockSupport.park,那么唤醒理应是LockSupport.unpark,这个在Signaller的tryFire内部
1
2
3
4
5
6
7
8
final CompletableFuture<?> tryFire(int ignore) {
Thread w; // no need to atomically claim
if ((w = thread) != null) {
thread = null;
LockSupport.unpark(w);
}
return null;
}
再看下的定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
static final class Signaller extends Completion
implements ForkJoinPool.ManagedBlocker {
long nanos; // wait time if timed
final long deadline; // non-zero if timed
volatile int interruptControl; // > 0: interruptible, < 0: interrupted
volatile Thread thread;
Signaller(boolean interruptible, long nanos, long deadline) {
this.thread = Thread.currentThread();//当前线程
this.interruptControl = interruptible ? 1 : 0;//支持中断默认值为1
this.nanos = nanos;
this.deadline = deadline;
}
final CompletableFuture<?> tryFire(int ignore) {
Thread w; // no need to atomically claim
if ((w = thread) != null) {
thread = null;
LockSupport.unpark(w);//唤醒当前线程
}
return null;
}
public boolean isReleasable() {
if (thread == null)
return true;
if (Thread.interrupted()) {
int i = interruptControl;
interruptControl = -1;
if (i > 0)
return true;
}
if (deadline != 0L &&
(nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
thread = null;
return true;
}
return false;
}
public boolean block() {
if (isReleasable())
return true;
else if (deadline == 0L)
LockSupport.park(this);
else if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
final boolean isLive() { return thread != null; }
}
Signaller继承了Completion Completion继承了ForkJoinTask,实现Runnable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
abstract static class Completion extends ForkJoinTask<Void>
implements Runnable, AsynchronousCompletionTask {
volatile Completion next; // Treiber stack link
/**
* Performs completion action if triggered, returning a
* dependent that may need propagation, if one exists.
*
* @param mode SYNC, ASYNC, or NESTED
*/
abstract CompletableFuture<?> tryFire(int mode);
/** Returns true if possibly still triggerable. Used by cleanStack. */
abstract boolean isLive();
public final void run() { tryFire(ASYNC); }//Runnable触发tryFire
public final boolean exec() { tryFire(ASYNC); return true; }//由ForkJoinTask调用
public final Void getRawResult() { return null; }
public final void setRawResult(Void v) {}
}