目录
前言
随着逐步学习发现IO技术非常重要,整理此文并及时更新,由于水平有限,相关技能掌握不到位,如有错误敬请指正!
虽然普通日常开发中基本上很难遇到,或者说做IO、长连接等相关项目有一定门槛,但这一点都不能阻碍IO技术的重要性。
本文主要会整理一些自己学过和接触的IO相关技术和知识,写一些简单的demo,再尝试挖掘一些底层和原理性的东西。
用户空间和内核空间
现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。
操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。
为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。
针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,
而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。
文件描述符fd
文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数。
实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。
当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。
在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。
但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
Socket
Socket 又称套接字
Socket是在应用层和传输层之间的一个抽象层,把操作系统底层的网络通信抽象出来
一切皆文件,Socket在linux中也是一个 ip + port 组成的文件
我们对Socket的操作转换为对文件描述符的操作 [分层和抽象的思想]
数据流转
网络数据 —到达—> 网卡 —读取—> 内核缓冲区 —拷贝—> 用户空间
其中存在的IO包括 :内存IO,磁盘IO,网络IO
怎么理解同步、异步和阻塞、非阻塞
同步/异步
主要看请求方对消息结果的获取是主动发起的还是被动通知的
阻塞/非阻塞
通常指IO操作
在等待函数结果返回之前,当前线程是挂起还是运行状态
IO技术的演进
从BIO 到 NIO
BIO面向流,NIO面向缓冲区(面向块)
为了解决传统IO带来的技术上的瓶颈,随之演进而来的的五种IO模型
分别是阻塞IO模型、非阻塞IO模型、多路复用IO模型、信号驱动IO模型、异步IO模型
BIO
BIO是java1.4之前唯一的IO逻辑,在客户端通过socket向服务端传输数据,服务端监听端口。
由于传统IO读数据的时候如果数据没有传达,IO会一直等待输入传入,所以当有请求过来的时候,
新起一条线程对数据进行等待、处理,导致每一个链接都对应着服务器的一个线程。
server端:
package pers.wmx.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* BIO SERVER
*
* @author: wangmingxin03
* @date: 2020-09-01
*/
public class BioServer {
private static final int PORT = 8888;
public static void main(String[] args) {
ServerSocket serverSocket = null;
Socket client = null;
InputStream inputStream = null;
OutputStream outputStream = null;
try {
serverSocket = new ServerSocket(PORT);
client = serverSocket.accept(); //阻塞点1
inputStream = client.getInputStream();
byte[] buffer = new byte[1024];
int length = 0;
while ((length = inputStream.read(buffer)) != 0) { //阻塞点2
System.out.println(new String(buffer, 0, length));
outputStream = client.getOutputStream();
outputStream.write("success".getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (serverSocket != null) {
try {
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
client端:
package pers.wmx.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Scanner;
/**
* BIO CLIENT
*
* @author: wangmingxin03
* @date: 2020-09-01
*/
public class BioClient {
private static final String ADDRESS = "localhost";
private static final int PORT = 8888;
public static void main(String[] args) {
Socket client = null;
InputStream inputStream = null;
OutputStream outputStream = null;
byte[] buffer = new byte[1024];
Scanner scanner = new Scanner(System.in);
try {
client = new Socket(ADDRESS, PORT);
inputStream = client.getInputStream();
outputStream = client.getOutputStream();
while (true) {
String string = scanner.nextLine();
if("end".equals(string)){
System.out.println("Client End");
break;
}
outputStream.write(string.getBytes());
int length = 0;
if ((length = inputStream.read(buffer)) != 0) {
System.out.println(new String(buffer, 0, length));
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (client != null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
我们来看一BIO的阻塞点
1,accept()
2, read(buffer)
accept()
方法上的注释
* Listens for a connection to be made to this socket and accepts
* it. The method blocks until a connection is made.
这个方法会阻塞直到有新的客户端连接
再看下read(buffer)
方法
* Reads some number of bytes from the input stream and stores them into
* the buffer array <code>b</code>. The number of bytes actually read is
* returned as an integer. This method blocks until input data is
* available, end of file is detected, or an exception is thrown.
也是会阻塞直到输入数据到达
我们来简单调试一波
accept()
底部调的是一个native方法
java.net.PlainSocketImpl#socketAccept
会阻塞,我们在终端用 nc 调一下
会创建一个新的客户端连接,解阻塞
得到一个fd 客户端Socket连接
read方法阻塞
客户端输入 hehe
解阻塞,读到数据
等待客户端连接会阻塞、等待数据到达会阻塞
BIO是一个同步阻塞的IO模型
再写一个多线程处理版本
package pers.wmx.io;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author: wangmingxin03
* @date: 2020-09-01
*/
public class BioMultiThreadServer {
private static final int PORT = 8888;
public static void main(String[] args) {
ServerSocket serverSocket = null;
try{
serverSocket = new ServerSocket(PORT);
Socket socket = null;
while (true){
socket = serverSocket.accept();
new Thread(new SocketHandler(socket)).start();
}
}catch (Exception e){
e.printStackTrace();
}finally {
if (serverSocket != null){
try {
serverSocket.close();
serverSocket = null;
}catch (IOException e){
e.printStackTrace();
}
}
}
}
}
class SocketHandler implements Runnable {
private Socket socket;
public SocketHandler(Socket socket) {
this.socket=socket;
}
public void run() {
System.out.println(Thread.currentThread().getName() + " handel ...");
InputStream in = null;
OutputStream out = null;
try {
in = socket.getInputStream();
byte[] buffer = new byte[1024];
int length = 0;
while ((length = in.read(buffer)) != 0) {
System.out.println(new String(buffer, 0, length));
out = socket.getOutputStream();
out.write("success".getBytes());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (out != null) {
try {
out.close();
}catch (Exception e){
e.printStackTrace();
}
}
if (socket != null){
try {
socket.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
}
}
对IO API不熟的朋友建议把本文的demo都写一下
这里除了用telnet,也可以用nc模拟客户端调用
我们通过多线程达到伪异步,通过多线程处理Socekt的读写
这里也可以用线程池处理
但是这样也还是会存在一些问题:
1,虽然在服务器端,请求的处理交给了一个独立线程进行,但是操作系统通知accept()的方式还是单个的。也就是,实际上是服务器接收到数据报文后的“业务处理过程”可以多线程,但是数据报文的接受还是需要一个一个的来(下文的示例代码和debug过程我们可以明确看到这一点)
2,在linux系统中,可以创建的线程是有限的。我们可以通过cat /proc/sys/kernel/threads-max 命令查看可以创建的最大线程数。当然这个值是可以更改的,但是线程越多,CPU切换所需的时间也就越长,用来处理真正业务的需求也就越少。
3,创建一个线程是有较大的资源消耗的。JVM创建一个线程的时候,即使这个线程不做任何的工作,JVM都会分配一个堆栈空间。这个空间的大小默认为128K,您可以通过-Xss参数进行调整。当然您还可以使用ThreadPoolExecutor线程池来缓解线程的创建问题,但是又会造成BlockingQueue积压任务的持续增加,同样消耗了大量资源。
4,另外,如果您的应用程序大量使用长连接的话,线程是不会关闭的。这样系统资源的消耗更容易失控。
那么,如果你真想单纯使用线程解决阻塞的问题,那么您自己都可以算出来您一个服务器节点可以一次接受多大的并发了。看来,单纯使用线程解决这个问题不是最好的办法。
NIO
NIO : New I/O or Non-block I/O
NIO核心组件介绍
【通道】
通道 Channel 是对原 I/O 包中的流的模拟,可以通过它读取和写入数据。
通道与流的不同之处在于,流只能在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而通道是双向的,可以用于读、写或者同时用于读写。
通道包括以下类型:
- FileChannel: 从文件中读写数据;
- DatagramChannel: 通过 UDP 读写网络中数据;
- SocketChannel: 通过 TCP 读写网络中数据;
- ServerSocketChannel: 可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。
【缓冲区】
发送给一个通道的所有数据都必须首先放到缓冲区中,同样地,从通道中读取的任何数据都要先读到缓冲区中。
不会直接对通道进行读写数据,而是要先经过缓冲区。
缓冲区实质上是一个数组,但它不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。
缓冲区包括以下类型:
- ByteBuffer
- CharBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
对ByteBuffer的读写操作
【选择器】
Selector允许单线程处理多个 Channel。
使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。
然后我们一起看下NIO模型的特点
Java NIO底层实现是根据不同平台来选择的
windows是select模型
linux 是 epoll模型
说了半天B话,相信大家已经饥渴难耐了
是时候整波代码见证一下NIO的强大…
package pers.wmx.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @author: wangmingxin03
* @date: 2020-09-07
*/
public class NioServer {
public static void main(String[] args) throws IOException {
//选择器
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//channel 设置为非阻塞 最终调的是一个native方法
serverSocketChannel.configureBlocking(false);
//注册连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//基于serverSocketChannel创建一个服务端Socket
ServerSocket socket = serverSocketChannel.socket();
InetSocketAddress address = new InetSocketAddress("localhost", 8888);
socket.bind(address);
while(true) {
//系统调用 阻塞
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = keys.iterator();
//把所有注册的key遍历一遍
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
//有新连接
if (key.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
//为客户端创建一个新的channel
SocketChannel clientChannel = serverChannel.accept();
System.out.println("新连接...");
clientChannel.configureBlocking(false);
clientChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
System.out.println("数据到达...");
SocketChannel clientChannel = (SocketChannel)key.channel();
readDataFromChannel(clientChannel);
}
keyIterator.remove();
}
}
}
private static void readDataFromChannel(SocketChannel clientChannel) throws IOException {
ByteBuffer readbuffer = ByteBuffer.allocate(1024);
// 读取请求码流,返回读取到的字节数
int readBytes = clientChannel.read(readbuffer);
// 读取到字节,对字节进行编解码
if (readBytes > 0) {
// 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
readbuffer.flip();//读写模式反转
byte[] bytes = new byte[readbuffer.remaining()];
readbuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println(body);
output(clientChannel, body);
}
}
private static void output(SocketChannel channel, String response) throws IOException {
if (response != null && response.length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
NIO的API还是比较复杂的,同样建议没写过的手撸一遍
可以简单看下 Selector.open()
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
SelectionKey
四种事件的表示
/**
* Operation-set bit for read operations.
*/
public static final int OP_READ = 1 << 0;
/**
* Operation-set bit for write operations.
*/
public static final int OP_WRITE = 1 << 2;
/**
* Operation-set bit for socket-connect operations.
*/
public static final int OP_CONNECT = 1 << 3;
/**
* Operation-set bit for socket-accept operations.
*/
public static final int OP_ACCEPT = 1 << 4;
我们常用的是 OP_ACCEPT
和 OP_READ
//channel 设置为非阻塞 最终调的是一个native方法
serverSocketChannel.configureBlocking(false);
//注册连接事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
这两行代码是NIO的关键
selector.select();
是系统调用,产生阻塞,监听事件是否到达
我们知道有事件到达,但不知道是什么事件
因此还需要遍历SelectionKey,确定哪些key产生了什么事件再做对应的处理
IO多路复用器
IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO。
select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。
它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。
如上图:
当进程调用select,进程就会被阻塞。
此时内核会监视所有select负责的的socket,当socket的数据准备好后,就立即返回。
进程再调用recvfrom操作,数据就会从内核拷贝到进程。
学习这一块主要就是学习select、poll、epoll了
epoll做了两方面的优化
1,会在内核创建一片空间,存放需要监听的fd
2,基于事件产生中断
【select/poll】
老李去火车站买票,委托黄牛,然后每隔6小时电话黄牛询问,黄牛三天内买到票,然后老李去火车站交钱领票。
耗费:打电话
【epoll】
老李去火车站买票,委托黄牛,黄牛买到后即通知老李去领,然后老李去火车站交钱领票。
耗费:无需打电话
我们来看下linux环境java nio的底层实现
把上面我们写的NioServer代码上传至服务器
javac -d . NioServer.java
java pers/wmx/io/NioServer
我们可以用 strace 监控系统调用
strace -ff -o log java pers/wmx/io/NioServer
再开一个终端 nc localhost 8888
可以处理多个连接
可以看到在当前目录生成了这些log
打开log
我们看到socket 套接字和其文件描述符 9
并设置成 nonblock
再往下翻
看到了epoll_create
即多路复用器epoll在内核开辟一片空间,存放需要监听的文件描述符
进一步证明了linux 下nio的底层实现是epoll
epoll_ctl 把9 存在内核空间
epoll_wait 等待IO事件
accept(9, {sa_family=AF_INET, sin_port=htons(51944), sin_addr=inet_addr("127.0.0.1")}, [16]) = 11
accept
得到客户端连接,文件描述符 11
把新的连接11也存放在内核空间
我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的socket外,还会再建立一个list链表,用于存储准备就绪的事件。当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。
通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已。
当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了。
信号驱动IO
异步IO模型
当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者的输入输出操作。
异步IO模型比较牛逼,但是实现难度大,而且需要操作系统底层支持
信号驱动IO和异步IO模型这两部分没怎么研究过先简单带过
Netty
先简单学习下,后面学习后再专门整理netty的博客
Netty 是基于NIO封装的一套高性能IO框架
NioEventLoop 可以理解成处理连接和IO的线程组
Channel 是对Socekt的封装
Pipeline 表示逻辑链
ChannelHandler 表示逻辑处理
Demo:
package pers.wmx.io.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author: wangmingxin03
* @date: 2020-09-08
*/
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//boss线程组 处理客户端连接
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//worker线程组 处理客户端连接
EventLoopGroup workGroup = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
//在逻辑链pipeline添加自定义handler
channel.pipeline().addLast(new ServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(8888).sync();
System.out.println("Server Starting ...");
//关闭通道、线程组
channelFuture.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
package pers.wmx.io.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 业务处理
*
* @author: wangmingxin03
* @date: 2020-09-08
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
//读取数据事件
public void channelRead(ChannelHandlerContext ctx, Object msg){
System.out.println("Server:"+ctx);
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发来的消息:" + buf.toString(CharsetUtil.UTF_8));
}
//数据读取完毕事件
public void channelReadComplete(ChannelHandlerContext ctx){
ctx.writeAndFlush(Unpooled.copiedBuffer("滚犊子", CharsetUtil.UTF_8));
}
}
package pers.wmx.io.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author: wangmingxin03
* @date: 2020-09-09
*/
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();
channelFuture.channel().closeFuture().sync();
}
}
package pers.wmx.io.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* @author: wangmingxin03
* @date: 2020-09-09
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
//通道就绪事件
public void channelActive(ChannelHandlerContext ctx){
System.out.println("Client:" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("傻吊", CharsetUtil.UTF_8));
}
//读取数据事件
public void channelRead(ChannelHandlerContext ctx,Object msg){
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器端发来的消息:" + buf.toString(CharsetUtil.UTF_8));
}
}
先这样,后面再深入研究下IO多路复用和学习netty组件和源码
说点什么
您将是第一位评论人!