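Preface
This article analyzes how an asynchronous task is handled when it times out or throws an exception.
I. Example
The timeout is set to 2s, but the task needs 10s to run.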
@ApiOperation(value = "test", notes = "test")
@GetMapping(value = "/test", produces = {
MediaType.APPLICATION_JSON_VALUE})
public DeferredResult test() throws ValidDataException {
Test test = new Test();
test.setId(1);
test.setName("test1");
//Build the DeferredResult with a timeout of 2s
DeferredResult<Object> result = new DeferredResult<>(2000L);
Callable call = () -> {
//Sleep for 10 seconds
TimeUnit.SECONDS.sleep(10);
return test;
};
//Use call as the result of the DeferredResult
result.setResult(call);
//Timeout callback
result.onTimeout(() -> {
result.setResult("Request timed out");
});
return result;
}
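The same "short timeout, slow task" situation can be reproduced without Spring. The following is only a plain-JDK analogy, not what DeferredResult does internally: it assumes Java 9+ for CompletableFuture.completeOnTimeout, and the class and method names (TimeoutFallbackSketch, call) are made up for illustration. Durations are scaled down to milliseconds so the demo runs quickly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-JDK analogy of "task slower than its timeout".
class TimeoutFallbackSketch {

    // Starts a task that needs taskMillis to finish, but falls back to a
    // fixed message if no result is available within timeoutMillis.
    static String call(long taskMillis, long timeoutMillis) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(taskMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "test1";
        });
        // Roughly comparable to registering onTimeout(...) and setting a fallback result.
        return future
                .completeOnTimeout("Request timed out", timeoutMillis, TimeUnit.MILLISECONDS)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(call(500, 50));  // task is slower than the timeout
        System.out.println(call(10, 500));  // task finishes in time
    }
}
```

As in the controller above, the slow task is not interrupted when the timeout fires; only the value seen by the caller changes.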
II. Source Code Analysis
While AsyncContextImpl executes an async task, it calls updateTimeout() to handle the timeout.
1. processAsyncTask()
private synchronized void processAsyncTask() {
if (!initialRequestDone) {
return;
}
//Handle the timeout
updateTimeout();
final Runnable task = asyncTaskQueue.poll();
if (task != null) {
processingAsyncTask = true;
asyncExecutor().execute(new TaskDispatchRunnable(task));
} else {
processingAsyncTask = false;
}
}
2. updateTimeout()
public void updateTimeout() {
XnioExecutor.Key key = this.timeoutKey;
if (key != null) {
if (!key.remove()) {
return;
} else {
this.timeoutKey = null;
}
}
//A timeout has been set and the async task has not completed yet
if (timeout > 0 && !complete) {
this.timeoutKey = WorkerUtils.executeAfter(exchange.getIoThread(), timeoutTask, timeout, TimeUnit.MILLISECONDS);
}
}
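The "cancel the old timer, then schedule a new one" pattern in updateTimeout() can be sketched with a ScheduledExecutorService. This is an illustration under assumptions, not Undertow code: a ScheduledFuture stands in for XnioExecutor.Key, cancel(false) stands in for key.remove(), and the class name ReschedulingTimeoutSketch and the helper firedCount are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a reschedulable timeout.
class ReschedulingTimeoutSketch {
    private final ScheduledExecutorService scheduler;
    private final Runnable timeoutTask;
    private final long timeoutMillis;
    private ScheduledFuture<?> timeoutKey; // plays the role of XnioExecutor.Key
    private boolean complete;

    ReschedulingTimeoutSketch(ScheduledExecutorService scheduler, Runnable timeoutTask, long timeoutMillis) {
        this.scheduler = scheduler;
        this.timeoutTask = timeoutTask;
        this.timeoutMillis = timeoutMillis;
    }

    synchronized void updateTimeout() {
        ScheduledFuture<?> key = timeoutKey;
        if (key != null) {
            // cancel(false) returns false if the task already completed or was cancelled
            if (!key.cancel(false)) {
                return;
            }
            timeoutKey = null;
        }
        // Only schedule when a timeout is configured and the work is not finished
        if (timeoutMillis > 0 && !complete) {
            timeoutKey = scheduler.schedule(timeoutTask, timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void complete() {
        complete = true;
        if (timeoutKey != null) {
            timeoutKey.cancel(false);
            timeoutKey = null;
        }
    }

    // Demo helper: reschedule twice, optionally complete early, count timeout runs.
    static int firedCount(boolean completeEarly) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();
        ReschedulingTimeoutSketch ctx =
                new ReschedulingTimeoutSketch(scheduler, fired::incrementAndGet, 100);
        ctx.updateTimeout();
        ctx.updateTimeout(); // cancels the first timer and starts a fresh one
        if (completeEarly) {
            ctx.complete();
        }
        try {
            Thread.sleep(400);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return fired.get();
    }
}
```

Calling updateTimeout() repeatedly leaves at most one pending timer, and complete() removes it, so the timeout task runs once or not at all.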
3. executeAfter()
public static XnioExecutor.Key executeAfter(XnioIoThread thread, Runnable task, long timeout, TimeUnit timeUnit) {
try {
return thread.executeAfter(task, timeout, timeUnit);
} catch (RejectedExecutionException e) {
if(thread.getWorker().isShutdown()) {
UndertowLogger.ROOT_LOGGER.debugf(e, "Failed to schedule task %s as worker is shutting down", task);
//we just return a bogus key in this case
return new XnioExecutor.Key() {
@Override
public boolean remove() {
return false;
}
};
} else {
throw e;
}
}
}
public Key executeAfter(final Runnable command, final long time, final TimeUnit unit) {
final long millis = unit.toMillis(time);
if ((state & SHUTDOWN) != 0) {
throw log.threadExiting();
}
if (millis <= 0) {
execute(command);
return Key.IMMEDIATE;
}
final long deadline = (nanoTime() - START_TIME) + Math.min(millis, LONGEST_DELAY) * 1000000L;
//Create a TimeKey from the deadline and the timeout task (TimeoutTask)
final TimeKey key = new TimeKey(deadline, command);
synchronized (workLock) {
final TreeSet<TimeKey> queue = delayWorkQueue;
//Add it to the queue
queue.add(key);
if (queue.iterator().next() == key) {
// we're the next one up; poke the selector to update its delay time
if (polling) {
// flag is always false if we're the same thread
selector.wakeup();
}
}
return key;
}
}
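The timeout task has now been added to delayWorkQueue.
4. WorkerThread
In WorkerThread's run method, tasks are taken from delayWorkQueue and checked against their deadline; once a deadline has passed, the TimeKey's command (here the TimeoutTask) is run.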
public void run() {
final Selector selector = this.selector;
try {
log.tracef("Starting worker thread %s", this);
final Object lock = workLock;
//Work task queue
final Queue<Runnable> workQueue = selectorWorkQueue;
//Delayed task queue
final TreeSet<TimeKey> delayQueue = delayWorkQueue;
log.debugf("Started channel thread '%s', selector %s", currentThread().getName(), selector);
Runnable task;
Iterator<TimeKey> iterator;
long delayTime = Long.MAX_VALUE;
Set<SelectionKey> selectedKeys;
SelectionKey[] keys = new SelectionKey[16];
int oldState;
int keyCount;
for (;;) {
// Run all tasks
do {
synchronized (lock) {
task = workQueue.poll();
//task is null on the first pass
if (task == null) {
iterator = delayQueue.iterator();
delayTime = Long.MAX_VALUE;
if (iterator.hasNext()) {
final long now = nanoTime();
do {
//Take the next delayed task
final TimeKey key = iterator.next();
if (key.deadline <= (now - START_TIME)) {
//The deadline has passed
//Move the timeout task to the work queue
workQueue.add(key.command);
//Remove the delayed task
iterator.remove();
} else {
//Not expired yet, stop the loop
delayTime = key.deadline - (now - START_TIME);
// the rest are in the future
break;
}
} while (iterator.hasNext());
}
//Take a task; after a timeout this is the TimeoutTask
task = workQueue.poll();
}
}
// clear interrupt status
Thread.interrupted();
//Run the task
safeRun(task);
} while (task != null);
......
......
}
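The interplay between executeAfter() and the worker loop boils down to a deadline-ordered set that is drained into a work queue. The sketch below shows only that idea, with an explicit "now" value so it is deterministic. It is not XNIO code: the class name DelayQueueSketch, the seq tie-breaker and the method names promoteExpired and runAll are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Queue;
import java.util.TreeSet;

// Hypothetical sketch of a deadline-ordered delay queue feeding a work queue.
class DelayQueueSketch {
    static final class TimeKey {
        final long deadline;
        final long seq; // tie-breaker so equal deadlines are both kept in the TreeSet
        final Runnable command;

        TimeKey(long deadline, long seq, Runnable command) {
            this.deadline = deadline;
            this.seq = seq;
            this.command = command;
        }
    }

    private final TreeSet<TimeKey> delayQueue = new TreeSet<>(
            Comparator.comparingLong((TimeKey k) -> k.deadline).thenComparingLong(k -> k.seq));
    private final Queue<Runnable> workQueue = new ArrayDeque<>();
    private long nextSeq;

    void executeAfter(Runnable command, long deadline) {
        delayQueue.add(new TimeKey(deadline, nextSeq++, command));
    }

    // Moves every expired command to the work queue and returns the delay until
    // the next pending deadline, or Long.MAX_VALUE if nothing is pending.
    long promoteExpired(long now) {
        Iterator<TimeKey> iterator = delayQueue.iterator();
        while (iterator.hasNext()) {
            TimeKey key = iterator.next();
            if (key.deadline <= now) {
                workQueue.add(key.command);
                iterator.remove();
            } else {
                return key.deadline - now; // the rest are in the future
            }
        }
        return Long.MAX_VALUE;
    }

    // Runs everything currently in the work queue and returns how many tasks ran.
    int runAll() {
        int count = 0;
        Runnable task;
        while ((task = workQueue.poll()) != null) {
            task.run();
            count++;
        }
        return count;
    }

    // Demo helper: three commands, two of which are expired at time 25.
    static String demo() {
        StringBuilder order = new StringBuilder();
        DelayQueueSketch queue = new DelayQueueSketch();
        queue.executeAfter(() -> order.append("b"), 20);
        queue.executeAfter(() -> order.append("a"), 10);
        queue.executeAfter(() -> order.append("c"), 30);
        long nextDelay = queue.promoteExpired(25);
        queue.runAll();
        return order + ":" + nextDelay;
    }
}
```

Because the set is sorted by deadline, the loop can stop at the first key that is still in the future, which is what the break in the worker loop relies on.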
4. TimeoutTask
private final class TimeoutTask implements Runnable {
@Override
public void run() {
synchronized (AsyncContextImpl.this) {
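//The task has not been dispatched (initial state, or reset after an error) and has not completed
if (!dispatched && !complete) {
//Create a new async task that handles the timeout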
addAsyncTask(new Runnable() {
@Override
public void run() {
final boolean setupRequired = SecurityActions.currentServletRequestContext() == null;
UndertowServletLogger.REQUEST_LOGGER.debug("Async request timed out");
servletRequestContext.getCurrentServletContext().invokeRunnable(servletRequestContext.getExchange(), new Runnable() {
@Override
public void run() {
//now run request listeners
setupRequestContext(setupRequired);
try {
//Call onTimeout on the listeners
onAsyncTimeout();
if (!dispatched) {
if (!getResponse().isCommitted()) {
//close the connection on timeout
exchange.setPersistent(false);
exchange.getResponseHeaders().put(Headers.CONNECTION, Headers.CLOSE.toString());
Connectors.executeRootHandler(new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
//servlet
try {
if (servletResponse instanceof HttpServletResponse) {
((HttpServletResponse) servletResponse).sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
} else {
servletRequestContext.getOriginalResponse().sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
}
} catch (IOException e) {
UndertowLogger.REQUEST_IO_LOGGER.ioException(e);
} catch (Throwable t) {
UndertowLogger.REQUEST_IO_LOGGER.handleUnexpectedFailure(t);
}
}
}, exchange);
} else {
//not much we can do, just break the connection
IoUtils.safeClose(exchange.getConnection());
}
if (!dispatched) {
//Mark the request as complete
complete();
}
}
} finally {
tearDownRequestContext(setupRequired);
}
}
});
}
});
}
}
}
}
dispatched: indicates whether the task has already been dispatched or completed. It is assigned in the following places.
First:
private synchronized void doDispatch(final Runnable runnable) {
if (dispatched) {
throw UndertowServletMessages.MESSAGES.asyncRequestAlreadyDispatched();
}
//Set to true before the async task is created
dispatched = true;
final HttpServletRequestImpl request = servletRequestContext.getOriginalRequest();
//Create the async task
addAsyncTask(new Runnable() {
@Override
public void run() {
request.asyncRequestDispatched();
runnable.run();
}
});
if (timeoutKey != null) {
timeoutKey.remove();
}
}
Second:
//The async task has finished; handle the result
public void handleCompletedBeforeInitialRequestDone() {
assert completedBeforeInitialRequestDone;
completeInternal(true);
//Set to true
dispatched = true;
}
Third:
//Callback invoked when an error occurs
public void handleError(final Throwable error) {
//Reset to false
dispatched = false; //we reset the dispatched state
onAsyncError(error);
...
}
complete: indicates whether the task has completed. It is set in the complete() method.
@Override
public synchronized void complete() {
if (complete) {
UndertowLogger.REQUEST_LOGGER.trace("Ignoring call to AsyncContext.complete() as it has already been called");
return;
}
//Set to true
complete = true;
if (timeoutKey != null) {
timeoutKey.remove();
timeoutKey = null;
}
if (!dispatched) {
completeInternal(false);
} else {
onAsyncComplete();
}
if (previousAsyncContext != null) {
previousAsyncContext.complete();
}
}
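The states here (dispatched, complete, initialRequestDone, processingAsyncTask and so on) are all separate boolean flags, which makes the logic hard to follow. Why not model them on FutureTask's single state field?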
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
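To make the question concrete, here is one way a single state field could replace the dispatched and complete flags. This is purely a hypothetical sketch and not a proposal that matches Undertow's actual behavior: the State values and the class name AsyncStateSketch are invented, and cases such as handleError() resetting dispatched are deliberately left out.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-field state machine, loosely inspired by FutureTask's state.
class AsyncStateSketch {
    enum State { STARTED, DISPATCHED, TIMED_OUT, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.STARTED);

    // Dispatch is only legal from the initial state.
    boolean dispatch() {
        return state.compareAndSet(State.STARTED, State.DISPATCHED);
    }

    // The timeout only wins if nothing was dispatched or completed before it,
    // which mirrors the "!dispatched && !complete" check in TimeoutTask.
    boolean timeout() {
        return state.compareAndSet(State.STARTED, State.TIMED_OUT);
    }

    // Completion is idempotent: only the first call reports true.
    boolean complete() {
        return state.getAndSet(State.COMPLETED) != State.COMPLETED;
    }

    State current() {
        return state.get();
    }

    public static void main(String[] args) {
        AsyncStateSketch ctx = new AsyncStateSketch();
        System.out.println(ctx.dispatch()); // true
        System.out.println(ctx.timeout());  // false, already dispatched
        System.out.println(ctx.complete()); // true
        System.out.println(ctx.complete()); // false, already completed
    }
}
```

With one field, every legal transition is a single compareAndSet, so invalid combinations of flags cannot be represented at all.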
5. onAsyncTimeout()
private void onAsyncTimeout() {
for (final BoundAsyncListener listener : asyncListeners) {
//Create the AsyncEvent
AsyncEvent event = new AsyncEvent(this, listener.servletRequest, listener.servletResponse);
try {
//Call the listener's onTimeout()
listener.asyncListener.onTimeout(event);
} catch (IOException e) {
UndertowServletLogger.REQUEST_LOGGER.ioExceptionDispatchingAsyncEvent(e);
} catch (Throwable t) {
UndertowServletLogger.REQUEST_LOGGER.failureDispatchingAsyncEvent(t);
}
}
}
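The loop above catches exceptions per listener, so one failing listener does not prevent the others from being notified. A minimal sketch of that isolation pattern follows; the interface TimeoutListener and the class ListenerDispatchSketch are hypothetical stand-ins, not Servlet API types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of notifying listeners with per-listener error isolation.
class ListenerDispatchSketch {
    interface TimeoutListener {
        void onTimeout(String event) throws Exception;
    }

    // Notifies every listener and returns the messages of the failures it swallowed.
    static List<String> fireTimeout(List<TimeoutListener> listeners, String event) {
        List<String> failures = new ArrayList<>();
        for (TimeoutListener listener : listeners) {
            try {
                listener.onTimeout(event);
            } catch (Throwable t) {
                // Record the failure and keep going with the next listener
                failures.add(String.valueOf(t.getMessage()));
            }
        }
        return failures;
    }

    // Demo helper: the first listener throws, the second must still be called.
    static int demo() {
        AtomicInteger notified = new AtomicInteger();
        List<TimeoutListener> listeners = new ArrayList<>();
        listeners.add(event -> { throw new IllegalStateException("listener failed"); });
        listeners.add(event -> notified.incrementAndGet());
        List<String> failures = fireTimeout(listeners, "timeout");
        return notified.get() * 10 + failures.size();
    }
}
```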
The asyncListener here is registered when startAsync() is called on StandardServletAsyncWebRequest, which adds itself as the listener:
@Override
public void startAsync() {
Assert.state(getRequest().isAsyncSupported(),
"Async support must be enabled on a servlet and for all filters involved " +
"in async request processing. This is done in Java code using the Servlet API " +
"or by adding \"<async-supported>true</async-supported>\" to servlet and " +
"filter declarations in web.xml.");
Assert.state(!isAsyncComplete(), "Async processing has already completed");
if (isAsyncStarted()) {
return;
}
this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
//Register this request as the listener
this.asyncContext.addListener(this);
if (this.timeout != null) {
this.asyncContext.setTimeout(this.timeout);
}
}
6. onTimeout()
StandardServletAsyncWebRequest.java
public void onTimeout(AsyncEvent event) throws IOException {
//Invoke the timeoutHandlers
this.timeoutHandlers.forEach(Runnable::run);
}
The timeoutHandler is initialized in WebAsyncManager's startCallableProcessing method; a custom timeoutHandler can also be passed in.
this.asyncWebRequest.addTimeoutHandler(() -> {
if (logger.isDebugEnabled()) {
logger.debug("Async request timeout for " + formatRequestUri());
}
//Invoke the interceptor chain
Object result = interceptorChain.triggerAfterTimeout(this.asyncWebRequest, callable);
if (result != CallableProcessingInterceptor.RESULT_NONE) {
//Handle the result
setConcurrentResultAndDispatch(result);
}
});
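One of the interceptors creates the exception: result is assigned an AsyncRequestTimeoutException, and the result handling that follows then deals with the timeout exception.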
public class TimeoutCallableProcessingInterceptor implements CallableProcessingInterceptor {
@Override
public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception {
return new AsyncRequestTimeoutException();
}
}
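The way a timeout result emerges from the interceptor chain can be sketched as "ask each interceptor in turn and take the first real answer". The code below is a simplified illustration, not Spring's implementation: the types Interceptor and TimeoutChainSketch are hypothetical, a plain TimeoutException stands in for AsyncRequestTimeoutException, and anything the real chain does beyond what is shown in this article is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a timeout interceptor chain.
class TimeoutChainSketch {
    // Sentinel meaning "this interceptor has no result to contribute"
    static final Object RESULT_NONE = new Object();

    interface Interceptor {
        default Object handleTimeout(Callable<?> task) throws Exception {
            return RESULT_NONE;
        }
    }

    // Returns the first result that is not RESULT_NONE; a thrown error becomes the result.
    static Object triggerAfterTimeout(List<Interceptor> chain, Callable<?> task) {
        for (Interceptor interceptor : chain) {
            try {
                Object result = interceptor.handleTimeout(task);
                if (result != RESULT_NONE) {
                    return result;
                }
            } catch (Throwable t) {
                return t;
            }
        }
        return RESULT_NONE;
    }

    // Demo helper: a pass-through interceptor followed by one that reports the timeout.
    static Object demo() {
        Interceptor passThrough = new Interceptor() { };
        Interceptor timeoutInterceptor = new Interceptor() {
            @Override
            public Object handleTimeout(Callable<?> task) {
                return new TimeoutException("async request timed out");
            }
        };
        List<Interceptor> chain = new ArrayList<>();
        chain.add(passThrough);
        chain.add(timeoutInterceptor);
        Callable<String> task = () -> "never used";
        return triggerAfterTimeout(chain, task);
    }
}
```

The exception is returned as a value rather than thrown, which is why the caller can pass it on as the concurrent result and let the normal exception handling produce the response.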
Summary
This article briefly analyzed how an asynchronous task is processed when it times out or fails with an exception.