上篇文章我们讲解了 Netty 的 Future,本篇文章我就就来分析一下可写的 Future,也就是 promise,Netty 中的 Promise 扩展自 Netty 的 Future。
Promise 接口
在Netty 中,Promise 接口是一种特殊的可写的 Future。 Promise 的核心源码如下:
public interface Promise<V> extends Future<V> {
Promise<V> setSuccess(V var1);
boolean trySuccess(V var1);
Promise<V> setFailure(Throwable var1);
boolean tryFailure(Throwable var1);
boolean setUncancellable();
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);
Promise<V> await() throws InterruptedException;
Promise<V> awaitUninterruptibly();
Promise<V> sync() throws InterruptedException;
Promise<V> syncUninterruptibly();
}
从上面可以看出,Promise 就是一个可写的 Future。在 Future 机制中,业务逻辑所在任务执行的状态(成功或失败)是在 Future 中实现的;而在 Promise 中,可以在业务逻辑中控制任务的执行结果,相比 Future 更加灵活。
以下是一个 Promise 的示例(伪代码)。
//异步的耗时任务接收一个 Promise
public Promise asynchronousFunction() {
Promise promise = new PromiseImpl();
Object result = null;
return =search() //业务逻辑
if (sucess) {
promise.setSuccess(result); //通知 promise 当前异步任务成功了,并传入结果
} else if (failed) {
promise.setFailure(reason);//通知 promise 当前异步任务失败了
} else if (error) {
promise.setFailure(error);//通知 promise 当前异步任务发生了异常
}
}
//调用异步的耗时操作
Promise promise = asynchronousFunction(promise);//会立即返回 promise
//添加成功处理 / 失败处理 / 异步处理等事件
promise.addListener();//例如:可以添加成功后的执行事件
//继续做其他事件,不需要理会 asynchronousFunction 何时结束
doOtherThings();
在Netty 中,Promise 继承了 Future,因此也具备了 Future 的所有功能。在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败。
Netty 的常用 Promise 类有 DefaultPromise 类,这是 Promise 实现的基础,DefaultChannelPromise 是 DefaultPromise 的子类,加入了channel属性。
Netty 的 DefaultPromise
Netty 中涉及异步操作的地方都使用了 Promise 。例如,下面是服务器/客户端启动时的注册任务,最终会调用 Unsafe 的 register,调用过程中会传入一个Promise 。Unsafe 进行事件的注册时调用 Promise 可以设置成功或者失败。
//SingleThreadEventLoop.java
public ChannelFuture register(ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
//AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (AbstractChannel.this.isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
} else if (!AbstractChannel.this.isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
} else {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
this.register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
public void run() {
AbstractUnsafe.this.register0(promise);
}
});
} catch (Throwable var4) {
AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4);
this.closeForcibly();
AbstractChannel.this.closeFuture.setClosed();
this.safeSetFailure(promise, var4);
}
}
}
}
DefaultPromise 提供的功能可以分为两个部分;一个是为调用者提供 get()和addListen()用于获取 Future 任务执行结果和添加监听事件;另一部分是为业务处理任务提供setSucess()等方法设置任务的成功或失败。
1、 设置任务的成功或失败;
DefaultPromise 核心源码如下:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
public Promise<V> setSuccess(V result) {
if (this.setSuccess0(result)) {
return this;
} else {
throw new IllegalStateException("complete already: " + this);
}
}
public boolean trySuccess(V result) {
return this.setSuccess0(result);
}
public Promise<V> setFailure(Throwable cause) {
if (this.setFailure0(cause)) {
return this;
} else {
throw new IllegalStateException("complete already: " + this, cause);
}
}
public boolean tryFailure(Throwable cause) {
return this.setFailure0(cause);
}
public boolean setUncancellable() {
if (RESULT_UPDATER.compareAndSet(this, (Object)null, UNCANCELLABLE)) {
return true;
} else {
Object result = this.result;
return !isDone0(result) || !isCancelled0(result);
}
}
public boolean isSuccess() {
Object result = this.result;
return result != null && result != UNCANCELLABLE && !(result instanceof DefaultPromise.CauseHolder);
}
public boolean isCancellable() {
return this.result == null;
}
//...
}
2、 获取Future任务执行结果和添加监听事件;
DefaultPromise 的get方法有 3 个。
- 无参数的get会阻塞等待;
- 有参数的get会等待指定事件,若未结束就抛出超时异常,这两个get是在其父类 AbstractFuture中实现的。
- getNow()方法则会立马返回结果。
源码如下:
public V getNow() {
Object result = this.result;
return !(result instanceof DefaultPromise.CauseHolder) && result != SUCCESS && result != UNCANCELLABLE ? result : null;
}
public V get() throws InterruptedException, ExecutionException {
Object result = this.result;
if (!isDone0(result)) {
this.await();
result = this.result;
}
if (result != SUCCESS && result != UNCANCELLABLE) {
Throwable cause = this.cause0(result);
if (cause == null) {
return result;
} else if (cause instanceof CancellationException) {
throw (CancellationException)cause;
} else {
throw new ExecutionException(cause);
}
} else {
return null;
}
}
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
Object result = this.result;
if (!isDone0(result)) {
if (!this.await(timeout, unit)) {
throw new TimeoutException();
}
result = this.result;
}
if (result != SUCCESS && result != UNCANCELLABLE) {
Throwable cause = this.cause0(result);
if (cause == null) {
return result;
} else if (cause instanceof CancellationException) {
throw (CancellationException)cause;
} else {
throw new ExecutionException(cause);
}
} else {
return null;
}
}
await() 方法判断 Future 任务是否结束,之后获取 this 锁,如果任务未完成则调用 Object 的 wait()等待。源码如下:
public Promise<V> await() throws InterruptedException {
if (this.isDone()) {
return this;
} else if (Thread.interrupted()) {
throw new InterruptedException(this.toString());
} else {
this.checkDeadLock();
synchronized(this) {
while(!this.isDone()) {
this.incWaiters();
try {
this.wait();
} finally {
this.decWaiters();
}
}
return this;
}
}
//...
}
addListener 方法被调用时,将传入的回调传入listeners对象中。如果监听多于 1 个,会创建DeflaultFutureListeners对象将回调方法保存在一个数组中。
removeListener会将listeners设置为null(只有一个时)或从数组中移除(多个回调时)。源码如下。
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
ObjectUtil.checkNotNull(listener, "listener");
synchronized(this) {
this.addListener0(listener);
}
if (this.isDone()) {
this.notifyListeners();
}
return this;
}
public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
ObjectUtil.checkNotNull(listeners, "listeners");
synchronized(this) {
GenericFutureListener[] var3 = listeners;
int var4 = listeners.length;
int var5 = 0;
while(var5 < var4) {
GenericFutureListener<? extends Future<? super V>> listener = var3[var5];
if (listener != null) {
this.addListener0(listener);
++var5;
continue;
}
}
}
if (this.isDone()) {
this.notifyListeners();
}
return this;
}
public Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener) {
ObjectUtil.checkNotNull(listener, "listener");
synchronized(this) {
this.removeListener0(listener);
return this;
}
}
public Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
ObjectUtil.checkNotNull(listeners, "listeners");
synchronized(this) {
GenericFutureListener[] var3 = listeners;
int var4 = listeners.length;
for(int var5 = 0; var5 < var4; ++var5) {
GenericFutureListener<? extends Future<? super V>> listener = var3[var5];
if (listener == null) {
break;
}
this.removeListener0(listener);
}
return this;
}
}
在添加监听器的过程中,如果任务刚好执行完毕 done(),则立即触发监听事件。触发监听通过notifyListeners()实现。主要逻辑如下:
如果当前addListener的线程(准确来说应该是调用了notifyListeners的线程,因为addListener和setSuccess都会调用notifyListeners和 Promise 内的线程池)与当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池中执行;
而如果是执行 Future 任务的线程池中的setSuccess时,调用notifyListeners(),会放在当前线程中执行。内部维护了notifyListeners用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次遍历并调用operationComplete。
Netty 的 DefaultChannelPromise
DefaultChannelPromise 是 DefaultPromise 的子类,内部维护了一个通道变量 channel。
Promise 机制相关的方法都是调用父类方法。
除此之外,DefaultChannelPromise 还实现了FlushCheckpoint接口,供ChannelFlushPromiseNotifier使用,可以将ChannelFuture注册到ChannelFlushPromiseNotifier类,当有数据写入或到达checkpoint时使用。
核心源码如下:
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel;
private long checkpoint;
//...
public Channel channel() {
return this.channel;
}
public ChannelPromise setSuccess() {
return this.setSuccess((Void)null);
}
public ChannelPromise setSuccess(Void result) {
super.setSuccess(result);
return this;
}
public boolean trySuccess() {
return this.trySuccess((Object)null);
}
public ChannelPromise setFailure(Throwable cause) {
super.setFailure(cause);
return this;
}
//...
public ChannelPromise promise() {
return this;
}
protected void checkDeadLock() {
if (this.channel().isRegistered()) {
super.checkDeadLock();
}
}
public ChannelPromise unvoid() {
return this;
}
public boolean isVoid() {
return false;
}
}
总结
以上我们分析了 Netty 中的 Promise,知道了它是扩展自 Netty 的 Future,是一个可写的 Future。