一、基本说明
1、在 Netty 中做耗时的,不可预料的操作,比如数据库、网络请求,会严重影响 Netty 对 Socket 的处理速度。
2、解决方案就是将耗时任务添加到异步线程池中。但是就添加线程池这步操作来讲,可以有2种方式:
- 处理耗时业务的第一种方式----handler中加入线程池
- 处理耗时业务的第二种方式----Context中添加线程池
二、handler中加入线程池
2.1 业务代码
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
// group 充当业务线程池,可以将任务提交到该线程池中
// 这里创建了 16 个线程
static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("EchoServerHandler 的线程是=" + Thread.currentThread().getName());
// 将任务提交到 group 线程池
group.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
// 接收客户端消息
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String body = new String(bytes, "UTF-8");
// 休眠 10 秒
Thread.sleep(10 * 1000);
System.out.println("group.submit 的 call 的线程是=" + Thread.currentThread().getName());
// 回复消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端222", CharsetUtil.UTF_8));
return null;
}
});
System.out.println("go on");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
在channelRead 方法中,模拟了一个耗时 10秒的操作,这里我们将这个任务提交到了一个自定义的业务线程池(group)中,这样,就不会阻塞 Netty 的 IO 线程。
2.2 整个程序处理过程
1、当 IO 线程轮询到一个 socket 事件后,交给 IO 线程开始处理,当走到耗时 handler 的时候,将耗时任务交给业务线程池。
2、当耗时任务执行完毕再执行 pipeline write 方法的时候(代码中使用的是ctx.writeAndFlush),会将这个任务交给 IO 线程
2.3 write方法源码(AbstractChannelHandlerContext)
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
1、当判断下个 outbound 的 executor 线程不是当前线程的时候,会将当前的工作封装成 task,然后放入队列中,等待 IO 任务执行完毕后执行队列中的任务。
2、当我们使用了group.submit(new Callable() {},在 handler 中加入线程池,就会进入到 safeExecute(executor, task, promise, m); 如果去掉这段代码,而使用普通方式来执行耗时的业务,那么就不会进入到 safeExecute(executor, task, promise, m);
三、Context中加入线程池
3.1 业务代码
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
// 创建业务线程池
// 这里创建了有两个线程的线程池
static final EventExecutorGroup group = new DefaultEventExecutorGroup(2);
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
//p.addLast(new EchoServerHandler());
// 说明:如果在 addLast 添加 handler 时,前面有指定
// EventExecutorGroup ,那么该 handler 会优先加入到该线程池中
p.addLast(group,new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1、handler 中的代码就使用普通的方式来处理耗时业务。
2、当我们在调用 addLast 方法添加线程池后,handler 将优先使用这个线程池,如果不添加,将使用 IO 线程。
3、当走到 AbstractChannelHandlerContext 的 invokeChannelRead 方法的时候,executor.inEventLoop() 是不会通过的,因为当前线程是 IO 线程,Context(也就是Handler)的 executor 是业务线程,所以会异步执行。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
4、验证时,如果去掉 p.addLast(group,new EchoServerHandler()); 改为 p.addLast(new EchoServerHandler()); 就不会进行异步执行
四、两种方式的比较
1、第一种方式在 handler 中添加异步,可能更加的自由,比如如果需要访问数据库,那就异步,如果不需要,那就不异步。异步会拖长接口响应时间,因为需要将任务放进 mpscTask 中。如果 IO 时间很短,task 很多,可能一个循环下来,都没时间执行整个 task,导致响应时间达不到指标
2、第二种方式是 Netty 标准方式(即加入到队列),但是,这么做会将整个 handler 都交给业务线程池。不论耗时不耗时,都加入到队列,不够灵活