Solo  当前访客:4 登录 注册

喧哗博客-http://blog.xuahua.com

繁华过后的沉寂--技术经验分享
浏览次数: 94,390    文章总数: 91    评论总数: 3
标签:

Netty Webscoket 实现聊天室 有更新!

一。服务器端

1,服务器应用重启,初始化,绑定端口

public interface Server {

    /**
     * 初始化服务器
     */
    void init();

    /**
     * 启动服务器
     */
    void start();

    /**
     * 关闭服务器
     */
    void shutdown();
}

 

 

@Component
public class ChatServer implements Server {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private DefaultEventLoopGroup defaultGroup;

// Boss线程:由这个线程池提供的线程是boss种类的,用于创建、连接、绑定socket, (有点像门卫)然后把这些socket传给worker线程池。
// 在服务器端每个监听的socket都有一个boss线程来处理。在客户端,只有一个boss线程来处理所有的socket。

 private NioEventLoopGroup bossGroup;

// Worker线程:Worker线程执行所有的异步I/O,即处理操作

 private NioEventLoopGroup workGroup;


 private ChannelFuture future;

 private ServerBootstrap bootstrap;

 
 private ScheduledExecutorService executorService;

@Value("${HOST}")
 private String host;

@Value("${PORT}")
 private int port;

@Autowired
private UserAuthHandler authHandler;

@Autowired
private MessageHandler messageHandler;

@Autowired
private ChannelManager manager;

@PostConstruct
 @Override
public void init() {
 logger.info("server init");
 int cpus = Runtime.getRuntime().availableProcessors();


defaultGroup = new DefaultEventLoopGroup(8, new ThreadFactory() {
 private AtomicInteger index = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
 return new Thread(r, "DEFAULTGROUP" + index.incrementAndGet());
}
 });

bossGroup = new NioEventLoopGroup(cpus, new ThreadFactory() {
 private AtomicInteger index = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
 return new Thread(r, "BOSSGROUP" + index.incrementAndGet());
}
 });

workGroup = new NioEventLoopGroup(cpus * 10, new ThreadFactory() {
 private AtomicInteger index = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
 return new Thread(r, "WORKGROUP" + index.incrementAndGet());
}
 });

bootstrap = new ServerBootstrap();

executorService = Executors.newScheduledThreadPool(2);

}

 @Override
public void start() {
 logger.info("server start");
bootstrap.group(bossGroup, workGroup)
 .channel(NioServerSocketChannel.class)
 //TCP测链路检测
.option(ChannelOption.SO_KEEPALIVE, true)
 //禁止使用Nagle算法
.option(ChannelOption.TCP_NODELAY, true)
 //初始化服务端可连接队列大小
.option(ChannelOption.SO_BACKLOG, 1024)
 .localAddress(new InetSocketAddress(port))
 .childHandler(new ChannelInitializer<SocketChannel>() {

 @Override
protected void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast(defaultGroup,
//请求解码器
new HttpServerCodec(),
//将多个消息转换成单一的消息对象
new HttpObjectAggregator(65536),
//支持异步发送大的码流
new ChunkedWriteHandler(),
//定时检测链路是否读空闲
new IdleStateHandler(60, 0, 0),
//认证Handler
authHandler,
//消息Handler
messageHandler

);
}
 });

 try {
 future = bootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) future.channel().localAddress();
logger.info("QuarkChat start success ,host is :" + addr.getHostName() + ",port is:" + addr.getPort());

/**
 * 定时扫描Channel 关闭失效的Channel
 */
executorService.scheduleAtFixedRate(new Runnable() {
 @Override
public void run() {
 logger.info("scheduleAtFixedRate to close channel");
manager.scanNotActiveChannel();
}
 //initialDelay:延迟180秒执行,period:任务执行的间隔周期
}, 180, 180, TimeUnit.SECONDS);

/**
 * 定时向客户端发送Ping进行心跳检测
 */
executorService.scheduleAtFixedRate(new Runnable() {
 @Override
public void run() {
 logger.info("scheduleAtFixedRate to ping");
manager.broadPing();
}
 }, 60, 60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
 logger.error("Quark Chat fail ", e);
Thread.currentThread().interrupt();
}
 }

 @PreDestroy
 @Override
public void shutdown() {
 if (defaultGroup != null) {
 defaultGroup.shutdownGracefully();
}
 if (executorService != null) {
 executorService.shutdown();
}
 bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}

}

 

 

 

二。消息体参数

public class ClientProtocol{

/**
自己定义文件体头,也可以不用
*/
private int MAGIC;

/**
消息体请求,响应编码
如 0x01 ping 消息请求标识
   0x02 pong 服务器响应ping请求
   0x03 auth 用户验证请求标识
   0x04 authresp 服务器响应用户验证响应
   0x05 message 用户发送消息请求标识
   0x06 messageresp 服务器响应用户发送消息响应
   0x07 reconn 用户重连请求标识
   0x08 reconnresp 服务器响应重连请求响应
   0x10 sysmessage 系统发送消息标识
   0x11 useroff 用户离线通知标识
   0x12 useroffresp 系统通知用户
 
*/
private byte type;

/**聊天室id*/
private int roomid ;
/***
发送的消息内容
*/
private msg;
/**目标对象**/
private int target;
}

 

三。消息处理

 

 

@ChannelHandler.Sharable
@Scope("prototype")
@Component
public class UserAuthHandler extends SimpleChannelInboundHandler {

    private static final Logger logger = LoggerFactory.getLogger(UserAuthHandler.class);

    private WebSocketServerHandshaker handshaker;

    @Value("${WEBSOCKET_URL}")
    private String WEBSOCKET_URL;

    @Autowired
    private ChannelManager manager;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //Http请求(第一次握手)
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        }
        //处理WebSocket请求
        else if (msg instanceof WebSocketFrame) {
            handleWebSocket(ctx, (WebSocketFrame) msg);
        }
    }

    /**
     * 内部链路检测
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        //当通道空闲时由IdleStateHandler触发的用户事件
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            // 判断Channel是否读空闲, 读空闲时移除Channel
            if (event.state().equals(IdleState.READER_IDLE)) {
                final String address = NettyUtil.parseChannelRemoteAddr(ctx.channel());
                logger.warn("Netty Server UserAuthHandler: IDLE exception :{}", address);

            }
        }
    }

    /**
     * HTTP握手反馈
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        //判断是否是WebSocket协议
        if (!request.decoderResult().isSuccess() || !"websocket".equals(request.headers().get("Upgrade"))) {
            logger.warn("protobuf don't support WebSocket");
            ctx.channel().close();
            return;
        }
        WebSocketServerHandshakerFactory handshakerFactory = new WebSocketServerHandshakerFactory(
                WEBSOCKET_URL, null, true);
        handshaker = handshakerFactory.newHandshaker(request);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            // 动态加入websocket的编解码处理
            handshaker.handshake(ctx.channel(), request);
            // 存储已经连接的Channel
            manager.addChannel(ctx.channel());
        }
    }

    /**
     * WebSocket反馈
     *
     * @param ctx
     * @param frame
     */
    private void handleWebSocket(ChannelHandlerContext ctx, WebSocketFrame frame) {
        //判断是否是关闭链路的命令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            manager.removeChannel(ctx.channel());
            logger.info("Have a WebSocket Channel Close");
            return;
        }

        //判断是否是Ping消息
        if (frame instanceof PingWebSocketFrame) {
            logger.info("ping message : ", frame.content().retain());
            ctx.writeAndFlush(new PingWebSocketFrame(frame.content().retain()));
            return;
        }

        //判断是否是Pong消息
        if (frame instanceof PongWebSocketFrame) {
            logger.info("pong message :", frame.content().retain());
            ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        }

        //仅支持文本消息
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException(frame.getClass().getName() + "frame type not supported!!!");
        }
        String message = ((TextWebSocketFrame) frame).text();
       ClientProtocol clientProto = JSON.parseObject(message, new TypeReference<ClientProtocol>() {
        });
     
        byte type = clientProto.getType();
        Channel channel = ctx.channel();
        int roomid = clientProto.getRoomid();
        if (clientProto.getMAGIC() != 
            ChatProtocol.getMAGIC()) {
            return;//过滤格式不正确的消息
        }
        switch (type) {
          
            default:
              return;
        }

        //MessageHandler处理
        ctx.fireChannelRead(frame.retain());
    }

 

 

公告

喧哗博客--繁华过后的沉寂--技术经验分享^-^
Copyright (c) 2009-2019, b3log.org & hacpai.com