Quellcode durchsuchen

写通道时增加通道连接活跃判断

tags/v1.0.0^2
林万龙 vor 3 Jahren
Ursprung
Commit
f3e0660585
3 geänderte Dateien mit 50 neuen und 23 gelöschten Zeilen
  1. +47
    -21
      src/main/java/com/telpo/dipperposition/handler/NettyServerHandler.java
  2. +2
    -1
      src/main/java/com/telpo/dipperposition/server/DipperPositionServer.java
  3. +1
    -1
      src/main/java/com/telpo/dipperposition/task/ScheduleService.java

+ 47
- 21
src/main/java/com/telpo/dipperposition/handler/NettyServerHandler.java Datei anzeigen

@@ -82,11 +82,13 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {


/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。
* 也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端断开链接,IP:{}", ctx.channel().localAddress().toString());
ctx.channel().close();
}

/**
@@ -121,8 +123,12 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
buf.writeBytes(returnBytes);
buf.writeBytes(getCheckSumBytes(channelAns));
buf.writeBytes(getReturnBytes());
ctx.write(buf);
ctx.flush();
// 确保通道处于活跃状态
// 不活跃状态继续写会产生CLOSE_WAIT现象
if(ctx.channel().isActive()) {
ctx.write(buf);
ctx.flush();
}
}
}
// 发送SDBP-AST-POS获取辅助位置信息
@@ -138,9 +144,12 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
buf.writeBytes(returnBytes);
buf.writeBytes(getCheckSumBytes(channelAns));
buf.writeBytes(getReturnBytes());

ctx.write(buf);
ctx.flush();
// 确保通道处于活跃状态
// 不活跃状态继续写会产生CLOSE_WAIT现象
if(ctx.channel().isActive()) {
ctx.write(buf);
ctx.flush();
}
}
}

@@ -154,8 +163,12 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
buf = bba.buffer(returnBytes.length);
//Unpooled.buffer(channelAns.getBytes().length);
buf.writeBytes(returnBytes);
ctx.write(buf);
ctx.flush();
// 确保通道处于活跃状态
// 不活跃状态继续写会产生CLOSE_WAIT现象
if(ctx.channel().isActive()) {
ctx.write(buf);
ctx.flush();
}
//}
}

@@ -171,8 +184,13 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
buf.writeBytes(returnBytes);
buf.writeBytes(getCheckSumBytes(channelAns));
buf.writeBytes(getReturnBytes());
ctx.write(buf);
ctx.flush();

// 确保通道处于活跃状态
// 不活跃状态继续写会产生CLOSE_WAIT现象
if(ctx.channel().isActive()) {
ctx.write(buf);
ctx.flush();
}
}
channelAns = nettyServerHandler.dipperAstPosAsyncTaskService.pushAstPos(ipAddress);

@@ -182,22 +200,30 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
buf.writeBytes(returnBytes);
buf.writeBytes(getCheckSumBytes(channelAns));
buf.writeBytes(getReturnBytes());
ctx.write(buf);
ctx.flush();
// 确保通道处于活跃状态
// 不活跃状态继续写会产生CLOSE_WAIT现象
if(ctx.channel().isActive()) {
ctx.write(buf);
}
}

byte[] returnBytes = nettyServerHandler.dipperDataAsyncTaskService.getAstEPH();
//if (channelAns != null) {
log.debug("PEH Buffer Length is:" + returnBytes.length);
// 使用池化的堆内存,以减少内存碎片
//ByteBuf channelPehAnsBuf = Unpooled.buffer(channelAns.getBytes().length);
//ByteBuf channelPehAnsBuf = bba.buffer(channelAns.getBytes().length);
ByteBuf channelPehAnsBuf = bba.buffer(returnBytes.length);
//channelPehAnsBuf.writeBytes(channelAns.getBytes(CharsetUtil.UTF_8));
channelPehAnsBuf.writeBytes(returnBytes);
// compositeByteBuf.addComponent(channelPehAnsBuf);
// log.info("CompositeByteBuf Length is:" + compositeByteBuf.capacity());
log.debug("PEH Buffer Length is:" + returnBytes.length);
// 使用池化的堆内存,以减少内存碎片
//ByteBuf channelPehAnsBuf = Unpooled.buffer(channelAns.getBytes().length);
//ByteBuf channelPehAnsBuf = bba.buffer(channelAns.getBytes().length);
ByteBuf channelPehAnsBuf = bba.buffer(returnBytes.length);
//channelPehAnsBuf.writeBytes(channelAns.getBytes(CharsetUtil.UTF_8));
channelPehAnsBuf.writeBytes(returnBytes);
// compositeByteBuf.addComponent(channelPehAnsBuf);
// log.info("CompositeByteBuf Length is:" + compositeByteBuf.capacity());
// 确保通道处于活跃状态
// 不活跃状态继续写会产生CLOSE_WAIT现象
if(ctx.channel().isActive()) {
ctx.write(channelPehAnsBuf);
ctx.flush();
}
//}

// 写给下一个Handler,最后一个Handler将内容移出pipeline


+ 2
- 1
src/main/java/com/telpo/dipperposition/server/DipperPositionServer.java Datei anzeigen

@@ -52,7 +52,8 @@ public class DipperPositionServer {
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
// 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
.childOption(ChannelOption.SO_KEEPALIVE, true);
.childOption(ChannelOption.SO_LINGER, 1000);
//.childOption(ChannelOption.SO_KEEPALIVE, true);


//绑定端口,开始接收进来的连接


+ 1
- 1
src/main/java/com/telpo/dipperposition/task/ScheduleService.java Datei anzeigen

@@ -32,7 +32,7 @@ public class ScheduleService {
// 获取推送失败的记录
try {
// 如果失败,则可以等待10秒再获取1次。
int tryTimes = 2;
int tryTimes = 59;
dipperDataAsyncTaskService.pullAstEPH(tryTimes);
} catch (InterruptedException e) {
log.error("获取星历数据重试睡眠发生异常:", e);


Laden…
Abbrechen
Speichern