>>分享Java编程技术,对《Java面向对象编程》等书籍提供技术支持 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 21297 个阅读者 刷新本主题
 * 贴子主题:  java NIO示例以及流程详解 回复文章 点赞(0)  收藏  
作者:flybird    发表时间:2020-02-06 16:36:31     消息  查看  搜索  好友  邮件  复制  引用

  
                                                                                                                                                                                                                                     服务端:        
  1.       package nio;
  2.       import org.springframework.util.StringUtils;
  3.       import java.io.IOException;
  4.       import java.net.InetSocketAddress;
  5.       import java.nio.ByteBuffer;
  6.       import java.nio.channels.SelectionKey;
  7.       import java.nio.channels.Selector;
  8.       import java.nio.channels.ServerSocketChannel;
  9.       import java.nio.channels.SocketChannel;
  10.       import java.util.Date;
  11.       import java.util.Iterator;
  12.       import java.util.Set;
  13.       public   class  ServerSocketChannels  implements  Runnable {
  14.           private  ServerSocketChannel serverSocketChannel;
  15.           private  Selector selector;
  16.           private  volatile  boolean stop;
  17.            public  ServerSocketChannels ( int port){
  18.               try {
  19.                   //创建多路复用器selector,工厂方法
  20.                  selector = Selector.open();
  21.                   //创建ServerSocketChannel,工厂方法
  22.                  serverSocketChannel = ServerSocketChannel.open();
  23.                   //绑定ip和端口号,默认的IP=127.0.0.1,对连接的请求最大队列长度设置为backlog=1024,如果队列满时收到连接请求,则拒绝连接
  24.                  serverSocketChannel.socket().bind( new InetSocketAddress(port),  1024);
  25.                   //设置非阻塞方式
  26.                  serverSocketChannel.configureBlocking( false);
  27.                   //注册serverSocketChannel到selector多路服用器上面,监听accrpt请求
  28.                  serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  29.                  System.out.println( "the time is start port = " + port);
  30.              }  catch (IOException e) {
  31.                  e.printStackTrace();
  32.                  System.exit( 1);
  33.              }
  34.          }
  35.            public  void  stop (){
  36.               this.stop =  true;
  37.          }
  38.           @Override
  39.            public  void  run () {
  40.               //如果server没有停止
  41.               while(!stop){
  42.                   try {
  43.                       //selector.select()会一直阻塞到有一个通道在你注册的事件上就绪了
  44.                       //selector.select(1000)会阻塞到1s后然后接着执行,相当于1s轮询检查
  45.                      selector.select( 1000);
  46.                       //找到所有准备接续的key
  47.                      Set<SelectionKey> selectionKeys = selector.selectedKeys();
  48.                      Iterator<SelectionKey> it = selectionKeys.iterator();
  49.                      SelectionKey key =  null;
  50.                      while(it.hasNext()){
  51.                         key = it.next();
  52.                         it.remove();
  53.                          try {
  54.                              //处理准备就绪的key
  55.                             handle(key);
  56.                         } catch (Exception e){
  57.                              if(key !=  null){
  58.                                  //请求取消此键的通道到其选择器的注册
  59.                                 key.cancel();
  60.                                  //关闭这个通道
  61.                                  if(key.channel() !=  null){
  62.                                     key.channel().close();
  63.                                 }
  64.                             }
  65.                         }
  66.                     }
  67.                  }  catch (Throwable e) {
  68.                      e.printStackTrace();
  69.                  }
  70.              }
  71.               if(selector !=  null){
  72.                   try {
  73.                      selector.close();
  74.                  }  catch (IOException e) {
  75.                      e.printStackTrace();
  76.                  }
  77.              }
  78.          }
  79.            public  void  handle (SelectionKey key)  throws IOException {
  80.                //如果key是有效的
  81.                 if(key.isValid()){
  82.                     //监听到有新客户端的接入请求
  83.                     //完成TCP的三次握手,建立物理链路层
  84.                     if(key.isAcceptable()){
  85.                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  86.                        SocketChannel sc = (SocketChannel) ssc.accept();
  87.                         //设置客户端链路为非阻塞模式
  88.                        sc.configureBlocking( false);
  89.                         //将新接入的客户端注册到多路复用器Selector上
  90.                        sc.register(selector, SelectionKey.OP_READ);
  91.                    }
  92.                     //监听到客户端的读请求
  93.                     if(key.isReadable()){
  94.                         //获得通道对象
  95.                        SocketChannel sc = (SocketChannel) key.channel();
  96.                        ByteBuffer readBuffer = ByteBuffer.allocate( 1024);
  97.                         //从channel读数据到缓冲区
  98.                        int readBytes = sc.read(readBuffer);
  99.                        if (readBytes >  0){
  100.                           //Flips this buffer.  The limit is set to the current position and then
  101.                            // the position is set to zero,就是表示要从起始位置开始读取数据
  102.                           readBuffer.flip();
  103.                            //eturns the number of elements between the current position and the  limit.
  104.                            // 要读取的字节长度
  105.                            byte[] bytes =  new  byte[readBuffer.remaining()];
  106.                            //将缓冲区的数据读到bytes数组
  107.                           readBuffer.get(bytes);
  108.                           String body =  new String(bytes,  "UTF-8");
  109.                           System.out.println( "the time server receive order: " + body);
  110.                           String currenttime =  "query time order".equals(body) ?  new Date(System.currentTimeMillis()).toString():  "bad order";
  111.                           doWrite(sc, currenttime);
  112.                       } else  if(readBytes <  0){
  113.                          key.channel();
  114.                          sc.close();
  115.                        }
  116.                    }
  117.                }
  118.          }
  119.            public  static  void  doWrite (SocketChannel channel, String response)  throws IOException {
  120.               if(!StringUtils.isEmpty(response)){
  121.                   byte []  bytes = response.getBytes();
  122.                   //分配一个bytes的length长度的ByteBuffer
  123.                  ByteBuffer  write = ByteBuffer.allocate(bytes.length);
  124.                   //将返回数据写入缓冲区
  125.                  write.put(bytes);
  126.                  write.flip();
  127.                   //将缓冲数据写入渠道,返回给客户端
  128.                  channel.write(write);
  129.              }
  130.          }
  131.      }

       服务端启动程序:        

  1.       package nio;
  2.        /**
  3.        * 服务端启动程序
  4.        */
  5.       public   class  ServerMain {
  6.            public  static  void  main (String[] args) {
  7.               int port =  8010;
  8.              ServerSocketChannels server =  new ServerSocketChannels(port);
  9.               new Thread(server, "timeserver-001").start();
  10.          }
  11.      }

       客户端程序:        

  1.       package nio;
  2.       import java.io.IOException;
  3.       import java.net.InetSocketAddress;
  4.       import java.nio.ByteBuffer;
  5.       import java.nio.channels.SelectionKey;
  6.       import java.nio.channels.Selector;
  7.       import java.nio.channels.SocketChannel;
  8.       import java.util.Iterator;
  9.       import java.util.Set;
  10.       public   class  TimeClientHandler  implements  Runnable {
  11.           //服务器端的ip
  12.           private String host;
  13.          //服务器端的端口号
  14.           private  int port;
  15.          //多路服用选择器
  16.           private Selector selector;
  17.           private SocketChannel socketChannel;
  18.           private  volatile  boolean stop;
  19.            public  TimeClientHandler (String host,  int port){
  20.               this.host = host ==  null ?  "127.0.0.1": host;
  21.               this.port = port;
  22.               try {
  23.                   //初始化一个Selector,工厂方法
  24.                  selector = Selector.open();
  25.                   //初始化一个SocketChannel,工厂方法
  26.                  socketChannel = SocketChannel.open();
  27.                   //设置非阻塞模式
  28.                  socketChannel.configureBlocking( false);
  29.              }  catch (IOException e) {
  30.                  e.printStackTrace();
  31.                  System.exit( 1);
  32.              }
  33.          }
  34.            /**
  35.            * 首先尝试连接服务端
  36.            *  @throws IOException
  37.            */
  38.            public  void  doConnect ()  throws IOException {
  39.               //如果连接成功,像多路复用器selector监听读请求
  40.               if(socketChannel.connect( new InetSocketAddress( this.host,  this.port))){
  41.               socketChannel.register(selector, SelectionKey.OP_READ);
  42.                //执行写操作,像服务器端发送数据
  43.               doWrite(socketChannel);
  44.              } else {
  45.                   //监听连接请求
  46.                  socketChannel.register(selector, SelectionKey.OP_CONNECT);
  47.              }
  48.          }
  49.            public  static  void  doWrite (SocketChannel sc)  throws IOException {
  50.             //构造请求消息体
  51.              byte [] bytes =  "query time order".getBytes();
  52.             //构造ByteBuffer
  53.             ByteBuffer write = ByteBuffer.allocate(bytes.length);
  54.             //将消息体写入发送缓冲区
  55.              write.put(bytes);
  56.              write.flip();
  57.              //调用channel的发送方法异步发送
  58.              sc.write(write);
  59.              //通过hasRemaining方法对发送结果进行判断,如果消息全部发送成功,则返回true
  60.               if(!write.hasRemaining()){
  61.                  System.out.println( "send order 2 server successd");
  62.              }
  63.          }
  64.           @Override
  65.            public  void  run () {
  66.               try {
  67.                  doConnect();
  68.              }  catch (IOException e) {
  69.                  e.printStackTrace();
  70.                  System.exit( 1);
  71.              }
  72.               while (!stop){
  73.                   try {
  74.                      selector.select( 1000);
  75.                      Set<SelectionKey> keys =  selector.selectedKeys();
  76.                      Iterator<SelectionKey> its =keys.iterator();
  77.                      SelectionKey key =  null;
  78.                       while (its.hasNext()){
  79.                          key = its.next();
  80.                          its.remove();
  81.                           try {
  82.                              handle(key);
  83.                          } catch (Exception e){
  84.                               if(key !=  null){
  85.                                  key.cancel();
  86.                                   if(key.channel() !=  null){
  87.                                      key.channel().close();
  88.                                  }
  89.                              }
  90.                          }
  91.                      }
  92.                  }  catch (Exception e) {
  93.                      e.printStackTrace();
  94.                      System.exit( 1);
  95.                  }
  96.              }
  97.          }
  98.            public   void  handle (SelectionKey key)  throws IOException {
  99.               if(key.isValid()){
  100.                  SocketChannel sc = (SocketChannel) key.channel();
  101.                   if(key.isConnectable()){
  102.                       //如果连接成功,监听读请求
  103.                      if(sc.finishConnect()){
  104.                        sc.register( this.selector, SelectionKey.OP_READ);
  105.                         //像服务端发送数据
  106.                         doWrite(sc);
  107.                     } else{
  108.                         System.exit( 1);
  109.                     }
  110.                  }
  111.                   //监听到读请求,从服务器端接受数据
  112.                   if(key.isReadable()){
  113.                      ByteBuffer byteBuffer = ByteBuffer.allocate( 1024);
  114.                       int readBytes = sc.read(byteBuffer);
  115.                       if(readBytes >  0){
  116.                          byteBuffer.flip();
  117.                           byte []  bytes =  new  byte;
  118.                          byteBuffer.get(bytes);
  119.                          String body =  new String(bytes, "UTF-8");
  120.                          System.out.println( "now body is "+ body);
  121.                          stop =  true;
  122.                      } else  if(readBytes <  0){
  123.                          key.cancel();
  124.                          sc.close();
  125.                      }
  126.                  }
  127.              }
  128.          }
  129.         //释放所有与该多路复用器selector关联的资源
  130.            if(selector !=  null){
  131.               try {
  132.                  selector.close();
  133.              }  catch (IOException e) {
  134.                  e.printStackTrace();
  135.              }
  136.          }
  137.      }

       客户端启动程序:        

  1.       package nio;
  2.        /**
  3.        * 客户端启动程序
  4.        */
  5.       public   class  ClientMain {
  6.            public  static  void  main (String[] args) {
  7.             int port =  8010;
  8.            TimeClientHandler client =  new TimeClientHandler( "",port);
  9.             new Thread(client, "client-001").start();
  10.          }
  11.      }

           现在说一下nio的执行过程:

         第一步:启动server服务器,初始化多路复用器selector、ServerSocketChannel通道、设置通道的模式为非阻塞、注册channel到selector上,并监听accept请求;

         第二步:启动server服务器,循环selectionKeys,当有channel准备好时就处理,否则一直循环;

         第三步:启动client端,初始化多路复用器selector、SocketChannel通道,设置通道的模式为非阻塞;

         第四步:client首先尝试连接server,此时socketChannel.connect(new InetSocketAddress(this.host, this.port)返回false,表示server还没有返回信息,server收到连接请求后,监听到client的接入请求,会初始化一个新的client、并将新接入的client注册到多路复用器Selector上,并应答client;再回到client端,由于client没有及时收到server端的应答,所以client胡监听一个connect请求,socketChannel.register(selector, SelectionKey.OP_CONNECT),当server返回应答信息时,client会收到一个connect请求,key.isConnectable(),如果此时sc.finishConnect()连接完成,client会监听一个read请求,并像server发送数据doWrite(sc),然后server会收到一个read请求,key.isReadable()处理完后返回给client,client也会收到一个读请求,收到server的返回数据,此时,整个交互过程结束;

                           截取一下书上的步骤:来自netty权威指南:

         点击在新窗口中浏览原图
CTRL+鼠标滚轮放大或缩小

                                             nio的优点:

         1、客户端发起的连接操作connect是异步的,可以通过在多路复用器selector上监听connect事件等待后续结果,不需要像之前的客户端那样被同步阻塞;

         2、SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样IO线程就可以处理其它的链路,不需要同步等待这个链路可用;

         3、线程模型的优化,由于jdk的selector在linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只受限与操作系统的最大句柄数或者单个进程的句柄限制),这意味这一个selector可以连接成千上万个客户端连接,而性能不会随着客户端连接数的增长呈线性下降,因此,它适合做高性能、高负载的网络服务器;

                                                                                                                        
----------------------------
原文链接:https://blog.csdn.net/chengkui1990/article/details/81558522

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-02-07 10:54:24 重新编辑]
  Java面向对象编程-->Java语言中的修饰符
  JavaWeb开发-->JSP中使用JavaBean(Ⅰ)
  JSP与Hibernate开发-->Java应用分层架构及软件模型
  Java网络编程-->ServerSocket用法详解
  精通Spring-->计算属性和数据监听
  Vue3开发-->Vue简介
  NIO底层原理
  Java方法的嵌套与递归调用
  深入分析synchronized实现原理
  Java多线程volatile详解
  Eclipse使用指南:创建Java项目的步骤
  Java设计模式:享元模式
  Java设计模式:装饰器模式
  Java Scoket之java.io.EOFException解决方案
  Java入门实用代码:查看线程是否存活
  Java入门实用代码:获取远程文件大小
  Java入门实用代码:使用 Enumeration 遍历 HashTable
  Java入门实用代码:获取链表(LinkedList)的第一个和最后一...
  Java 入门实用代码:取最大和最小值
  java实现PPT转化为PDF
  Java中的main()方法详解
  更多...
 IPIP: 已设置保密
楼主      
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。