jdk 1.4之前所有的socket通信都采用同步阻塞模式(bio),这种一请求一应答的通信模型简化了上层应用的开发,但是在可靠性和性能方面存在巨大的弊端,下图是bio通信模型图
通常由一个独立的Acceptor线程负责监听客户端的连接,接收到客户端连接之后为客户端连接创建一个新的线程处理请求消息,处理完成之后,返回应答消息给客户端,线程销毁。该架构最大的问题是不具备弹性伸缩能力,当并发访问量增加后,服务端的线程个数和并发访问数成线性正比,当线程数剧增后,会引起一系列连锁反应,直至系统崩溃。
//同步阻塞式,单线程处理
public class IOServerSingleThread {
private static final Logger LOGGER = LoggerFactory.getLogger(IOServer.class);
public static void main(String[] args) {
LOGGER.info("nihao");
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(1234));
serverSocket.setSoTimeout(0);
} catch (IOException ex) {
LOGGER.error("Listen failed", ex);
return;
}
InputStream inputstream = null;
try{
while(true) {
Socket socket = serverSocket.accept();
inputstream = socket.getInputStream();
LOGGER.info("Received message {}", IOUtils.toString(inputstream));
IOUtils.closeQuietly(inputstream);
}
} catch(IOException ex) {
IOUtils.closeQuietly(inputstream);
LOGGER.error("Read message failed", ex);
}
}
}
//同步阻塞多线程处理
public class IOServerMultiThread {
private static final Logger LOGGER = LoggerFactory.getLogger(IOServerMultiThread.class);
public static void main(String[] args) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(2345));
serverSocket.setSoTimeout(0);
} catch (IOException ex) {
LOGGER.error("Listen failed", ex);
return;
}
try{
while(true) {
Socket socket = serverSocket.accept();
new Thread( () -> {
try{
InputStream inputstream = socket.getInputStream();
LOGGER.info("Received message {}", IOUtils.toString(inputstream));
IOUtils.closeQuietly(inputstream);
} catch (IOException ex) {
LOGGER.error("Read message failed", ex);
}
}).start();
}
} catch(IOException ex) {
LOGGER.error("Accept connection failed", ex);
}
}
}
由于线程数会随着客户端请求的增加而增加,由此我们进行分析优化,递进出如下的模型:伪异步模型
服务端接收到客户端连接之后,不创建独立的线程,而是将socket连接封装成Task,将Task放入线程池的任务队列中执行,这样可以有效控制线程的规模,防止线程膨胀导致系统的崩溃,还能有效重复利用线程
//伪异步模型
public class IOServerThreadPool {
private static final Logger LOGGER = LoggerFactory.getLogger(IOServerThreadPool.class);
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(2345));
} catch (IOException ex) {
LOGGER.error("Listen failed", ex);
return;
}
try{
while(true) {
Socket socket = serverSocket.accept();
executorService.submit(() -> {
try{
InputStream inputstream = socket.getInputStream();
LOGGER.info("Received message {}", IOUtils.toString(new InputStreamReader(inputstream)));
} catch (IOException ex) {
LOGGER.error("Read message failed", ex);
}
});
}
} catch(IOException ex) {
try {
serverSocket.close();
} catch (IOException e) {
}
LOGGER.error("Accept connection failed", ex);
}
}
}
下面我们来说说actor模型,reactor模型,proactor模型
actor模型是建立在用户态的一种并发模型,实体间的调用,通过消息进行通讯,消息的发送和接收是完全异步的,消息的发送方会将消息发送到一个FIFO队列,消息的接收方会从这个FIFO队列中取出消息,因为是FIFO队列,消息必然有先来后到,先到的消息会被先处理,后到的消息阻塞,每一个任务由以下三个属性:tag:用以区别于系统中的其它任务
target:通信到达的地址
communication:包含在target上的Actor处理任务时可获取的信息
actor的具体模型会在后面的一遍中详细讲解
reactor模型
reactor,即反应堆。reactor的一般工作工作过程是首先在Reactor中注册事件,并在注册时指定某个已定义的回调函数,当客户端发送请求时,在reactor中会触发刚才注册的事件,并调用对应的处理函数。
这是一个reactor模型的简略图,比较通俗化的图形如下
reactor模型中包含三种角色,分别是reactor,acceptor,handler
reactor: 负责派发IO事件给对应的角色处理,为了监听IO事件,select必须实现在reactor中
acceptor: 负责接受client的连线,然后给client绑定一个handler并注册IO事件到reactor上监听