diff --git a/src/main/java/com/telpo/dipperposition/handler/NettyServerHandler.java b/src/main/java/com/telpo/dipperposition/handler/NettyServerHandler.java index c4f8578..b275124 100644 --- a/src/main/java/com/telpo/dipperposition/handler/NettyServerHandler.java +++ b/src/main/java/com/telpo/dipperposition/handler/NettyServerHandler.java @@ -82,11 +82,13 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** - * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据 + * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。 + * 也就是说客户端与服务端的关闭了通信通道并且不可以传输数据 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("客户端断开链接,IP:{}", ctx.channel().localAddress().toString()); + ctx.channel().close(); } /** @@ -121,8 +123,12 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { buf.writeBytes(returnBytes); buf.writeBytes(getCheckSumBytes(channelAns)); buf.writeBytes(getReturnBytes()); - ctx.write(buf); - ctx.flush(); + // 确保通道处于活跃状态 + // 不活跃状态继续写会产生CLOSE_WAIT现象 + if(ctx.channel().isActive()) { + ctx.write(buf); + ctx.flush(); + } } } // 发送SDBP-AST-POS获取辅助位置信息 @@ -138,9 +144,12 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { buf.writeBytes(returnBytes); buf.writeBytes(getCheckSumBytes(channelAns)); buf.writeBytes(getReturnBytes()); - - ctx.write(buf); - ctx.flush(); + // 确保通道处于活跃状态 + // 不活跃状态继续写会产生CLOSE_WAIT现象 + if(ctx.channel().isActive()) { + ctx.write(buf); + ctx.flush(); + } } } @@ -154,8 +163,12 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { buf = bba.buffer(returnBytes.length); //Unpooled.buffer(channelAns.getBytes().length); buf.writeBytes(returnBytes); - ctx.write(buf); - ctx.flush(); + // 确保通道处于活跃状态 + // 不活跃状态继续写会产生CLOSE_WAIT现象 + if(ctx.channel().isActive()) { + ctx.write(buf); + ctx.flush(); + } //} } @@ -171,8 +184,13 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { buf.writeBytes(returnBytes); buf.writeBytes(getCheckSumBytes(channelAns)); buf.writeBytes(getReturnBytes()); - ctx.write(buf); - ctx.flush(); + + // 确保通道处于活跃状态 + // 不活跃状态继续写会产生CLOSE_WAIT现象 + if(ctx.channel().isActive()) { + ctx.write(buf); + ctx.flush(); + } } channelAns = nettyServerHandler.dipperAstPosAsyncTaskService.pushAstPos(ipAddress); @@ -182,22 +200,30 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { buf.writeBytes(returnBytes); buf.writeBytes(getCheckSumBytes(channelAns)); buf.writeBytes(getReturnBytes()); - ctx.write(buf); - ctx.flush(); + // 确保通道处于活跃状态 + // 不活跃状态继续写会产生CLOSE_WAIT现象 + if(ctx.channel().isActive()) { + ctx.write(buf); + } } + byte[] returnBytes = nettyServerHandler.dipperDataAsyncTaskService.getAstEPH(); //if (channelAns != null) { - log.debug("PEH Buffer Length is:" + returnBytes.length); - // 使用池化的堆内存,以减少内存碎片 - //ByteBuf channelPehAnsBuf = Unpooled.buffer(channelAns.getBytes().length); - //ByteBuf channelPehAnsBuf = bba.buffer(channelAns.getBytes().length); - ByteBuf channelPehAnsBuf = bba.buffer(returnBytes.length); - //channelPehAnsBuf.writeBytes(channelAns.getBytes(CharsetUtil.UTF_8)); - channelPehAnsBuf.writeBytes(returnBytes); - // compositeByteBuf.addComponent(channelPehAnsBuf); - // log.info("CompositeByteBuf Length is:" + compositeByteBuf.capacity()); + log.debug("PEH Buffer Length is:" + returnBytes.length); + // 使用池化的堆内存,以减少内存碎片 + //ByteBuf channelPehAnsBuf = Unpooled.buffer(channelAns.getBytes().length); + //ByteBuf channelPehAnsBuf = bba.buffer(channelAns.getBytes().length); + ByteBuf channelPehAnsBuf = bba.buffer(returnBytes.length); + //channelPehAnsBuf.writeBytes(channelAns.getBytes(CharsetUtil.UTF_8)); + channelPehAnsBuf.writeBytes(returnBytes); + // compositeByteBuf.addComponent(channelPehAnsBuf); + // log.info("CompositeByteBuf Length is:" + compositeByteBuf.capacity()); + // 确保通道处于活跃状态 + // 不活跃状态继续写会产生CLOSE_WAIT现象 + if(ctx.channel().isActive()) { ctx.write(channelPehAnsBuf); ctx.flush(); + } //} // 写给下一个Handler,最后一个Handler将内容移出pipeline diff --git a/src/main/java/com/telpo/dipperposition/server/DipperPositionServer.java b/src/main/java/com/telpo/dipperposition/server/DipperPositionServer.java index 6d513bb..c8e0f44 100644 --- a/src/main/java/com/telpo/dipperposition/server/DipperPositionServer.java +++ b/src/main/java/com/telpo/dipperposition/server/DipperPositionServer.java @@ -52,7 +52,8 @@ public class DipperPositionServer { .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文 - .childOption(ChannelOption.SO_KEEPALIVE, true); + .childOption(ChannelOption.SO_LINGER, 1000); + //.childOption(ChannelOption.SO_KEEPALIVE, true); //绑定端口,开始接收进来的连接 diff --git a/src/main/java/com/telpo/dipperposition/task/ScheduleService.java b/src/main/java/com/telpo/dipperposition/task/ScheduleService.java index 4a9f49a..34eaddc 100644 --- a/src/main/java/com/telpo/dipperposition/task/ScheduleService.java +++ b/src/main/java/com/telpo/dipperposition/task/ScheduleService.java @@ -32,7 +32,7 @@ public class ScheduleService { // 获取推送失败的记录 try { // 如果失败,则可以等待10秒再获取1次。 - int tryTimes = 2; + int tryTimes = 59; dipperDataAsyncTaskService.pullAstEPH(tryTimes); } catch (InterruptedException e) { log.error("获取星历数据重试睡眠发生异常:", e);