netty-handler

anjingsi 8月前 ⋅ 647 阅读

netty-handler

ChannelHandler

所有handler的父类,有三个子类的实现ChannelHandlerAdapter、ChannelOutboundHandler、ChannelInboundHandler

handlerAdded

handler被添加到channel的时候执行,这个动作就是由pipeline的添加handler方法完成的.对于服务端,在客户端连接进来的时候,就通过ServerBootstrapAcceptor的read方法,为每一个channel添加了handler。该方法对于handler而言是第一个触发的方法

void handlerAdded(ChannelHandlerContext ctx) throws Exception

handlerRemoved

对应handlerAdded,将handler从该channel的pipeline移除后的回调方法

void handlerRemoved(ChannelHandlerContext ctx) throws Exception

handlerRemoved

捕获channel生命周期中异常处理 void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception

1、ChannelHandlerAdapter

2、ChannelOutboundHandler

bind

绑定端口的操作 void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception

连接远程地址

registered完成之后且channel处理active状态,首次注册状态。主要见AbstractChannel的register0(promise)方法

/**
 * Called once a connect operation is made.
 *
 * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
 * @param remoteAddress     the {@link SocketAddress} to which it should connect
 * @param localAddress      the {@link SocketAddress} which is used as source on connect
 * @param promise           the {@link ChannelPromise} to notify once the operation completes
 * @throws Exception        thrown if an error occurs
 */
void connect(
        ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise) throws Exception

断开连接

unregistered之前执行,主要见AbstractChannel的deregister方法 void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception

close

关闭操作 void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception

deregister

取消注册到线程池中 void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception

read

设置监听读取事件 void read(ChannelHandlerContext ctx) throws Exception

write

写入数据的操作 void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception

flush

刷新缓冲区,立刻发送 void flush(ChannelHandlerContext ctx) throws Exception 上述接口都是和连接相关的处理,而且这些都是通过pipeline的相关方法触发的,最终调用的是tail对象。实际上这个handler比较鸡肋,因为大部分实现类都做不了什么, 最终都交给headContext对象调用unsafe相关方法由其处理了。大部分handler只对write方法进行了处理,其他都直接交给其他的handler处理了

3、ChannelInboundHandler

channelRegistered

在channel注册到线程池的时候会被触发 void channelRegistered(ChannelHandlerContext ctx) throws Exception;

channelUnregistered

在channel关闭的时候触发 void channelUnregistered(ChannelHandlerContext ctx) throws Exception

channelActive

registered完成之后且channel处理active状态,首次注册状态。主要见AbstractChannel的register0(promise)方法 void channelActive(ChannelHandlerContext ctx) throws Exception

channelInactive

unregistered之前执行,主要见AbstractChannel的deregister方法 void channelInactive(ChannelHandlerContext ctx) throws Exception

channelRead

这个主要见pipeline的fireChannelRead方法,其被channel在获取到数据的阶段进行调用,进而触发到handler的channelRead方法 void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception

channelReadComplete

这个和上面read方法一样,fireChannelReadComplete方法,被channel运行过程中read过程完成会进行调用 void channelReadComplete(ChannelHandlerContext ctx) throws Exception

userEventTriggered

这个也是由pipeline提供的方法作为入口fireUserEventTriggered,这个就是触发一个事件了,以IdleStateHandler为例,其一般作为心跳检测事件,放入线程池执行,判断空闲就会触发该方法,传导到各个handler。 void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception

channelWritabilityChanged

这个入口也在pipeline,但是好像没有怎么用到,channel并没有调用这个方法,一般也没怎么用该方法 void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception

ChannelHandlerContext

handlerContext和pipeline联合使用,管理handler链。handler本身没有持有顺序关系,都是通过handlerContext完成的。handlerContext自身会通过配合handler造成顺着构成的链式顺序调用下去

handlerContext和pipeline一样实现类很少,基本使用的就是DefaultChannelHandlerContext,其继承自AbstractChannelHandlerContext,大部分方法也都是在抽象父类中。

pipeline的构造handler链的过程:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

pipeline初始化的时候构建了tail和head两个handler,这两个是特殊的,位置不能替换的。

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

先判断这个handler有没有被其他pipeline加载过,是否是sharable类型,进行相关设置。通过handler创建context。然后插入到tail前面,后面就是添加handler之后触发的方法,触发handlerAdd方法。通过addLast0方法,可以看见context中的参数prev和next被初始化了。也就是说通过当前的context能够找到前一个和后一个context了。

@Override
public ChannelHandlerContext fireChannelActive() {
    invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
    return this;
}

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelActive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelActive();
            }
        });
    }
}

private void invokeChannelActive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelActive(this);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelActive();
    }
}

handler的任何一个方法,在context中都是由三个这样的方法来构成链式执行的。static方法都是给pipeline调用的,其入参就是head,handler链的头结点。调用了invokeChannelActive()方法,这里就是我们的handler方法相关内容被触发了。乍一看好像执行完了就没了啊,这个地方要接下去就需要handler配合了。随便找一个handler,比如ChannelInboundHandlerAdapter,其相关方法都调用了ctx.fireChannelActive(); 这里就接上了,ctx的fireChannelActive不就是回到了invokeChannelActive()方法了。其入参就不再是当前的context,而是通过findContextInbound来找到下一个in类型的handler,这也说明这个接口是针对in类型的接口。所以一个基本的循环就是ctx.invokeChannelActive(ctx) ->ctx.invokeChannelActive() -> hander.channelActive()->ctx.fireChannelActive(ctx)->findContextInbound()->ctx.invokeChannelActive(ctx)。这么个循环链,起点就是pipeline的invokeChannelActive(head),终点就是handler.channelActive()没有调用ctx.channel的时候,最后的tailContext就是没有执行任何操作,所以执行到这链路就断了。

handler生命周期与回调接口调用顺序

handlerAdded -> channelRegistered -> channelActive -> channelRead -> channelReadComplete channelInactive -> channelUnRegistered -> handlerRemoved

handler方法中常用方法与含义:

方法名含义
handlerAdded新建立的连接会按照初始化策略,把handler添加到该channel的pipeline里面,也就是channel.pipeline.addLast(new LifeCycleInBoundHandler)执行完成后的回调
channelRegistered当该连接分配到具体的worker线程后,该回调会被调用
channelActivechannel的准备工作已经完成,所有的pipeline添加完成,并分配到具体的线上上,说明该channel准备就绪,可以使用了
channelRead客户端向服务端发来数据,每次都会回调此方法,表示有数据可读
channelReadComplete服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕
channelInactive当连接断开时,该回调会被调用,说明这时候底层的TCP连接已经被断开了
channelUnRegistered对应channelRegistered,当连接关闭后,释放绑定的worker线程
handlerRemoved对应handlerAdded,将handler从该channel的pipeline移除后的回调方法
exceptionCaughthanler处理发生异常时调用
userEventTriggered事件检测

内置handler

handler含义参数说明
HttpRequestDecoderHttp请求解码器int maxInitialLineLength(最大长度), int maxHeaderSize(头长度), int maxChunkSize(每包大小)
HttpResponseEncoderHttp响应编码器
StringDecoderString类型解码器
StringEncoderString类型编码器
DelimiterBasedFrameDecoder以指定分隔符解码int maxFrameLength(单条消息的最大长度到达该长度没有找到分隔符抛出异常), ByteBuf delimiter(分隔符)
WebSocketServerProtocolHandlerWebSocket连接,需要添加该处理器websocketPath(连接路径)
HttpObjectAggregator请求头请求体合成一个FullHttpRequestmaxContentLength(请求体长度)

IdleStateHandler

使用示例

pipeline.addLast(new IdleStateHandler(40,0,0, TimeUnit.SECONDS));

源码
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit);
  • readerIdleTime:读超时,即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件
  • writerIdleTime:写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件
  • allIdleTime:读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件
  • unit:时间参数的格式

示例子中设置的是 new IdleStateHandler(40,0,0,TimeUnit.SECONDS),意思就是客户端 40 秒内没有发生读事件,超时事件就会被触发,超时事件就会被触发,具体操作定义在自定义的处理类的userEventTriggered方法中检测

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // 心跳检测
    if (evt instanceof IdleStateEvent) {
        IdleState state = ((IdleStateEvent) evt).state();
        if (state == IdleState.READER_IDLE) {
            log.warn("client:{} reader timeout", ctx.channel().remoteAddress());
        } else if (state == IdleState.WRITER_IDLE) {
            log.warn("client:{} writer timeout", ctx.channel().remoteAddress());
        } else if (state == IdleState.ALL_IDLE) {
            log.warn("client:{} writer and reader   timeout", ctx.channel().remoteAddress());
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

全部评论: 0

    我有话说: