跳至主要內容

Netty学习笔记二

xw · 大约 3 分钟 · Netty · Java · Netty


Echo案例代码

根据 Netty 官网的入门案例,编写一个简单的 echo 服务。直接上代码。

1、编写EchoServer

package echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * @Author: 向往
 * @Contact: 2457081614@qq.com
 * @Date: 2019/11/17 0017 23:54
 * @Version: 1.0
 * @Description:
 */
public class EchoServer {

    private Integer port;

    public EchoServer(Integer port) {
        this.port = port;
    }

    public EchoServer() {
    }

    public void run() throws InterruptedException, UnknownHostException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

      try
      {
          ServerBootstrap serverBootstrap = new ServerBootstrap();
          serverBootstrap.group(bossGroup,workerGroup)
                  //设置通道类型
                  .channel(NioServerSocketChannel.class)
                  //设置新建立的channel tcp参数
                  .option(ChannelOption.TCP_NODELAY,true)
                  //每个通道的数据处理
                  .childHandler(new ChannelInitializer<SocketChannel>() {
                      protected void initChannel(SocketChannel socketChannel) throws Exception {
                             socketChannel.pipeline().addLast(new EchoServerHandle());
                      }
                  });
          System.out.println("echo server 启动成功,ip地址"+InetAddress.getLocalHost().getHostAddress());
          //绑定端口
          ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();

          channelFuture.channel().closeFuture().sync();
      }
      finally {
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
      }

    }
    public static void main(String[] args) throws InterruptedException, UnknownHostException {
      new EchoServer(8080).run();
    }
}

2、编写对应的 handler 类

package echo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.nio.charset.Charset;

/**
 * Inbound handler for the echo server: prints every received buffer as UTF-8
 * text and writes the same buffer back to the peer.
 *
 * @Author: 向往
 * @Contact: 2457081614@qq.com
 * @Date: 2019/11/18 0018 0:03
 * @Version: 1.0
 */

public class EchoServerHandle extends ChannelInboundHandlerAdapter {

    /**
     * Logs the incoming bytes and echoes them back unchanged.
     *
     * @param ctx handler context used to write the reply
     * @param msg the received message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
        // The buffer is handed back to Netty here; per the Netty user guide it is
        // released once written out, so it is not released manually.
        ctx.writeAndFlush(msg);
    }

    /**
     * Prints the failure and closes the connection, matching the error handling
     * of the other handlers in these notes, instead of only forwarding the
     * exception further down the pipeline.
     *
     * @param ctx   handler context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  • EventLoop 好比一个线程:1 个 EventLoop 可以服务多个 Channel,而 1 个 Channel 只对应一个 EventLoop。可以创建多个 EventLoop 来优化资源利用,也就是 EventLoopGroup

  • EventLoopGroup 负责分配 EventLoop 到新创建的 Channel,里面包含多个EventLoop

测试

启动后使用 `telnet <ip地址> <端口>` 命令连接服务端进行测试:

$ telnet 192.168.0.196 8080
Trying 192.168.0.196...
Connected to 192.168.0.196.
Escape character is '^]'.
s
s
hello world
hello world

demo2-服务端推送服务

服务端主动推送时间给客户端,客户端打印输出,废话少说直接上代码。
客户端代码

package time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Client for the time server demo: connects to 127.0.0.1:8080, lets
 * {@link TimeClientHandler} process the incoming data, and exits once the
 * connection has been closed.
 */
public class TimeClient {

    /**
     * Connects to the time server and blocks until the channel is closed.
     *
     * @param args ignored
     * @throws Exception if connecting or waiting for the channel to close fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoops = new NioEventLoopGroup();

        try {
            // Client-side bootstrap: one event loop group, NIO socket channel,
            // SO_KEEPALIVE enabled, and a single handler in the pipeline.
            Bootstrap bootstrap = new Bootstrap()
                    .group(eventLoops)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeClientHandler());
                        }
                    });

            // Wait for the connect to complete, then for the channel to be closed.
            ChannelFuture connected = bootstrap.connect("127.0.0.1", 8080).sync();
            connected.channel().closeFuture().sync();

            System.out.println("客户端关闭");
        } finally {
            eventLoops.shutdownGracefully();
        }
    }
}

package time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

/**
 * Client-side handler for the time demo: decodes the 32-bit timestamp sent by
 * the server, prints it as a {@link Date}, and closes the connection.
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Reads one unsigned 32-bit value from the received buffer, converts it to
     * epoch milliseconds, prints the resulting date and closes the channel.
     * The buffer is always released, even if decoding fails.
     *
     * NOTE(review): this reads 4 bytes from whatever arrives in a single
     * channelRead call; it presumably relies on the whole value arriving in one
     * read. Confirm whether fragmented reads need handling.
     *
     * @param ctx handler context used to close the connection
     * @param msg the received message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf payload = (ByteBuf) msg;
        try {
            long rawSeconds = payload.readUnsignedInt();
            // 2208988800 is the same offset the server adds to its Unix-epoch seconds.
            long unixSeconds = rawSeconds - 2208988800L;
            long currentTimeMillis = unixSeconds * 1000L;

            Date serverTime = new Date(currentTimeMillis);
            System.out.println(serverTime);

            ctx.close();
        } finally {
            payload.release();
        }
    }

    /**
     * Prints the stack trace of the failure and closes the connection.
     *
     * @param ctx   handler context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

服务端代码

package time;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * @Author: 向往
 * @Contact: 2457081614@qq.com
 * @Date: 2019/11/17 0017 23:54
 * @Version: 1.0
 * @Description:
 */
public class TimeServer {

    private Integer port;

    public TimeServer(Integer port) {
        this.port = port;
    }

    public TimeServer() {
    }

    public void run() throws InterruptedException, UnknownHostException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

      try
      {
          ServerBootstrap serverBootstrap = new ServerBootstrap();
          serverBootstrap.group(bossGroup,workerGroup)
                  .channel(NioServerSocketChannel.class)
                  .childHandler(new ChannelInitializer<SocketChannel>() {
                      protected void initChannel(SocketChannel socketChannel) throws Exception {
                             socketChannel.pipeline().addLast(new TimeServerHandler());
                      }
                  });
          System.out.println("time server 启动成功,ip地址"+InetAddress.getLocalHost().getHostAddress());
          //绑定端口
          ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();

          channelFuture.channel().closeFuture().sync();
      }
      finally {
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
      }

    }
    public static void main(String[] args) throws InterruptedException, UnknownHostException {
      new TimeServer(8080).run();
    }
}

package time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


/**
 * Server-side handler for the time demo: as soon as a connection becomes
 * active it sends the current time as a 32-bit integer and then closes the
 * connection.
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Writes the current time to the newly active channel and closes the
     * channel once the write operation has completed.
     *
     * @param ctx handler context of the channel that just became active
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        // The message is a single 32-bit integer, so 4 bytes are allocated.
        final ByteBuf payload = ctx.alloc().buffer(4);
        long offsetSeconds = System.currentTimeMillis() / 1000L + 2208988800L;
        payload.writeInt((int) offsetSeconds);

        final ChannelFuture writeFuture = ctx.writeAndFlush(payload);

        // Close only from the completion callback, i.e. after the write operation is done.
        ChannelFutureListener closeWhenWritten = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert writeFuture == future;
                System.out.println("监听器监听到操作已经完成,关闭");
                ctx.close();
            }
        };
        writeFuture.addListener(closeWhenWritten);
    }

    /**
     * Prints the stack trace of the failure and closes the connection.
     *
     * @param ctx   handler context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

测试

先启动服务端,再启动客户端,输出相关信息。
image.png

注意

image.png