package com.telpo.beidouast.handler; import com.telpo.beidouast.enums.DipperReturnValue; import com.telpo.beidouast.service.IDipperAstPosAsyncTaskService; import com.telpo.beidouast.service.IDipperAstTimeAsyncTaskService; import com.telpo.beidouast.service.IDipperDataAsyncTaskService; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.socket.SocketChannel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import java.io.InputStream; import java.time.LocalDateTime; /** * @program: dipperposition * @description: Netty服务器处理句柄 * @author: linwl * @create: 2021-01-13 13:56 **/ @Slf4j public class NettyServerHandler extends ChannelInboundHandlerAdapter { private static String AST_TIME_CMD = "54494d45"; private static String AST_POS_CMD = "504f53"; private static String AST_EPH_CMD = "455048"; @Autowired private IDipperAstTimeAsyncTaskService dipperTimeAsyncTaskService; @Autowired private IDipperAstPosAsyncTaskService dipperAstPosAsyncTaskService; @Autowired private IDipperDataAsyncTaskService dipperDataAsyncTaskService; @Value(value = "${position-server.timeAsycPort}") private String timeAsycServerPort; @Value(value = "${position-server.posAsycPort}") private String posAsycServerPort; @Value(value = "${position-server.starsAsycPort}") private String starsAsycServerPort; @Value("${pos.centerProvinceFilePath}") String centerProvinceFilePath; @Value("${pos.ipPositionRequestPath}") String ipPositionRequestPath; @Value("${pos.ipPositionRequestKey}") String ipPositionRequestKey; @Value("${pos.centerProvince}") String centerProvince; @Value("${pos.ast.server}") String astServer; @Value("${pos.ast.posAstPort}") int posAstPort; @Value("${pos.ast.timeout}") int astTimeout; /** * 客户端连接会触发 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("Channel active......"); SocketChannel channel = (SocketChannel) ctx.channel(); log.info("链接报告开始"); log.info("链接报告信息:有一客户端链接到本服务端"); log.info("链接报告IP:" + channel.localAddress().getHostString()); log.info("链接报告Port:" + channel.localAddress().getPort()); log.info("链接报告完毕"); //通知客户端链接建立成功 // 默认返回取得时间成功 String ackAckCheckRef = "233E0101020004020A1D"; if (Integer.parseInt(posAsycServerPort) == channel.localAddress().getPort()) { ackAckCheckRef = "233E010102000401091C"; } if (Integer.parseInt(starsAsycServerPort) == channel.localAddress().getPort()) { ackAckCheckRef = "233E010102000421293C"; } //String str = "通知客户端链接建立成功" + " " + LocalDateTime.now() + " " + channel.localAddress().getHostString() + // "\r\n"; ByteBuf buf = Unpooled.buffer(ackAckCheckRef.getBytes().length); buf.writeBytes(ackAckCheckRef.getBytes("GBK")); ctx.writeAndFlush(buf); } /** * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("客户端断开链接,IP:{}", ctx.channel().localAddress().toString()); } /** * 客户端发消息会触发 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //接收msg消息{与上一章节相比,此处已经不需要自己进行解码} SocketChannel channel = (SocketChannel) ctx.channel(); String ipAddress = channel.remoteAddress().toString(); String message = " 接收到消息:{0}, 客户端IP:{1}"; log.info(message ,msg, ipAddress); String channelAns = ""; // 返回时间指令 //if (Integer.parseInt(timeAsycServerPort) == channel.localAddress().getPort()) { ByteBuf recvmg = (ByteBuf) msg; ByteBuf buf = null; String recvmgStr = recvmg.toString(); if (AST_TIME_CMD.equals(recvmg)) { // 初始时间辅助输入; channelAns = dipperTimeAsyncTaskService.pushAstTime(); buf = Unpooled.buffer(channelAns.getBytes().length); } // 发送SDBP-AST-POS获取辅助位置信息 // if (Integer.parseInt(posAsycServerPort) == channel.localAddress().getPort()) { if (AST_POS_CMD.equals(recvmg)) { channelAns = dipperAstPosAsyncTaskService.pushAstPos(ipAddress, centerProvinceFilePath, centerProvince, ipPositionRequestPath, ipPositionRequestKey); buf = Unpooled.buffer(channelAns.getBytes().length); } // 从缓存获取SDBP-AST-EPH星历数 //if (Integer.parseInt(starsAsycServerPort) == channel.localAddress().getPort()) { if (AST_POS_CMD.equals(recvmg)) { String astEPHBytes = dipperDataAsyncTaskService.getAstEPH(); buf = Unpooled.buffer(astEPHBytes.getBytes().length); } // 最后把SDBP-AST-TIME、SDBP-AST-POS、SDBP-AST-EPH并包一起发给设备。 // 设备采用16进制获取数据,则代理服务器也是采用16进制返回数据。 // 通知客户端链消息发送成功 // String str = "服务端收到:" + LocalDateTime.now() + " " + msg + "\r\n"; buf.writeBytes(channelAns.getBytes("GBK")); ctx.writeAndFlush(buf); //ctx.write("你也好哦"); //ctx.flush(); } // @Override // public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) // throws Exception { // if (msg instanceof HttpRequest) { // HttpRequest mReq = (HttpRequest) msg; // String clientIP = mReq.headers().get("X-Forwarded-For"); // if (clientIP == null) { // InetSocketAddress insocket = (InetSocketAddress) ctx.channel() // .remoteAddress(); // clientIP = insocket.getAddress().getHostAddress(); // } // } // } /** * 发生异常触发 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }