finally { // Finish here only if not held by this thread.
// HOLD is false for this thread: complete the job now, passing the result,
// the captured exception (if any) and the send-result flag to finishJob.
if (!HOLD.get())
finishJob(res, ex, sndRes);
else
// Make sure flag is not set for current thread.
// This may happen in case of nested internal task call with continuation.
HOLD.set(false);
// Executed on both paths: clear the current task session on ctx.job() by passing null.
ctx.job().currentTaskSession(null);
// Reset the request affinity topology version, but only when reqTopVer is non-null.
// NOTE(review): presumably the version was set earlier in this method under the
// same condition - confirm against the code preceding this clause.
if (reqTopVer != null)
GridQueryProcessor.setRequestAffinityTopologyVersion(null);
}
改造后:
finally { if(res != null && CompletionStage.class.isAssignableFrom(res.getClass())) {
final boolean sendResult = sndRes;
final IgniteException igException = ex;
@SuppressWarnings("unchecked")
CompletionStage cs = (CompletionStage)res;
cs.exceptionally(t->{
return new IgniteException(t);
}).thenAccept(r->{
if (!HOLD.get()) {
IgniteException e = igException;
finishJob(r, e, sendResult);
} else
// Make sure flag is not set for current thread.
// This may happen in case of nested internal task call with continuation.
HOLD.set(false);
ctx.job().currentTaskSession(null);
if (reqTopVer != null)
GridQueryProcessor.setRequestAffinityTopologyVersion(null);
});
} else {
// Finish here only if not held by this thread.
if (!HOLD.get())
finishJob(res, ex, sndRes);
else
// Make sure flag is not set for current thread.
// This may happen in case of nested internal task call with continuation.
HOLD.set(false);
ctx.job().currentTaskSession(null);
if (reqTopVer != null)
GridQueryProcessor.setRequestAffinityTopologyVersion(null);
}