仿照 Netty 写一个低配版本:用 NIO 多线程处理客户端请求
package pers.wmx.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Event loop that owns a single {@link Selector}.
 *
 * <p>Other threads hand channels over by adding them to {@link #queue} and then calling
 * {@code selector.wakeup()}; the loop registers those channels with its selector and dispatches
 * ready keys to the accept and read handlers. Data read from a client is written straight back
 * to the same client.
 *
 * <p>The class extends {@link ThreadLocal} only so that the queue can be obtained through
 * {@link #get()} when the instance is constructed. The queue is then shared with producer
 * threads through the {@link #queue} field, so it is not used as per-thread state.
 *
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class SelectorThread
        extends ThreadLocal<LinkedBlockingQueue<Channel>>
        implements Runnable {

    /** Selector polled by {@link #run()}. */
    Selector selector = null;

    /** Channels waiting to be registered with {@link #selector}. */
    LinkedBlockingQueue<Channel> queue = get();

    /** Group that decides which selector thread receives a newly accepted client. */
    SelectorThreadGroup selectorThreadGroup = null;

    /**
     * Creates the event loop and opens its selector.
     *
     * @param selectorThreadGroup group used to hand off accepted client channels
     * @throws IllegalStateException if the selector cannot be opened; failing here avoids a
     *         later {@code NullPointerException} on the first use of the selector
     */
    SelectorThread(SelectorThreadGroup selectorThreadGroup) {
        this.selectorThreadGroup = selectorThreadGroup;
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new IllegalStateException("unable to open selector", e);
        }
    }

    /** Supplies the queue returned by the {@link #get()} call made during construction. */
    @Override
    protected LinkedBlockingQueue<Channel> initialValue() {
        return new LinkedBlockingQueue<>();
    }

    /**
     * Runs the loop forever: wait for ready keys, dispatch them, then register every channel
     * that was queued in the meantime.
     *
     * @throws RuntimeException wrapping the {@link IOException} if {@code select()} fails
     */
    @Override
    public void run() {
        while (true) {
            try {
                // Blocks until a channel is ready or another thread calls selector.wakeup().
                int nums = selector.select();
                if (nums > 0) {
                    dispatchSelectedKeys();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            registerPendingChannels();
        }
    }

    /** Hands every selected key to the matching handler and clears the selected-key set. */
    private void dispatchSelectedKeys() {
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            if (!key.isValid()) {
                // The key was cancelled or its channel closed; querying readiness would throw.
                continue;
            }
            if (key.isAcceptable()) {
                acceptHandler(key);
            } else if (key.isReadable()) {
                readHandler(key);
            }
            // OP_WRITE is never registered by this class, so there is no write branch.
        }
    }

    /**
     * Registers all queued channels. The whole queue is drained because several producers may
     * have queued channels before this loop woke up once; taking only one per pass would leave
     * the rest unregistered until some unrelated event woke the selector again.
     */
    private void registerPendingChannels() {
        Channel channel;
        while ((channel = queue.poll()) != null) {
            try {
                registerChannel(channel);
            } catch (IOException e) {
                // A channel that cannot be registered must not take the whole loop down.
                e.printStackTrace();
                closeQuietly(channel);
            }
        }
    }

    /**
     * Registers one channel: server channels for accept events, client channels for read events
     * with a fresh 4096-byte direct buffer attached.
     *
     * @param channel the channel taken from the queue
     * @throws IOException if registration fails, e.g. because the channel is already closed
     */
    private void registerChannel(Channel channel) throws IOException {
        if (channel instanceof ServerSocketChannel) {
            ServerSocketChannel server = (ServerSocketChannel) channel;
            // Register interest in incoming connections.
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println(Thread.currentThread().getName() + " register accept");
        } else if (channel instanceof SocketChannel) {
            SocketChannel client = (SocketChannel) channel;
            ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
            // Register interest in readable data; the buffer travels with the key.
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println(Thread.currentThread().getName() + " register read : "
                    + client.getRemoteAddress());
        }
    }

    /**
     * Echoes everything currently readable on the key's channel back to the client.
     * The connection is closed when the peer reaches end-of-stream or an I/O error occurs.
     *
     * @param key a readable key whose attachment is the {@link ByteBuffer} set at registration
     */
    private void readHandler(SelectionKey key) {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel client = (SocketChannel) key.channel();
        buffer.clear();
        try {
            while (true) {
                int num = client.read(buffer);
                if (num > 0) {
                    buffer.flip(); // switch to draining mode and write the bytes straight back
                    System.out.println(
                            Thread.currentThread().getName() + " read buffer");
                    // NOTE(review): on a non-blocking channel this spins while the peer's
                    // receive window is full; left as in the original echo design.
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (num == 0) {
                    return;
                } else {
                    // End-of-stream: the client disconnected.
                    System.out.println("client: " + client.getRemoteAddress() + "closed......");
                    closeClient(key, client);
                    return;
                }
            }
        } catch (IOException e) {
            // Retrying on a broken connection would loop forever, so drop the connection.
            e.printStackTrace();
            closeClient(key, client);
        }
    }

    /**
     * Accepts one pending connection and passes it to the group for assignment.
     *
     * @param key an acceptable key whose channel is a {@link ServerSocketChannel}
     */
    private void acceptHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName() + " acceptHandler......");
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = null;
        try {
            client = server.accept();
            if (client == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            client.configureBlocking(false);
            selectorThreadGroup.nextSelector(client);
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(client);
        }
    }

    /** Cancels the key and closes the client socket so the descriptor is released. */
    private void closeClient(SelectionKey key, SocketChannel client) {
        key.cancel();
        closeQuietly(client);
    }

    /** Closes the channel if it is non-null, reporting but not propagating a close failure. */
    private void closeQuietly(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package pers.wmx.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Fixed-size group of {@link SelectorThread} event loops. Each loop is started on its own
 * thread, and channels are distributed over the loops in round-robin order.
 *
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class SelectorThreadGroup {

    /** Event loops owned by this group. */
    SelectorThread[] selectorThreads;

    /** Server channel opened by the most recent {@link #bind(int)} call. */
    ServerSocketChannel server = null;

    /** Monotonic counter driving the round-robin selection. */
    AtomicInteger counter = new AtomicInteger(0);

    /**
     * Creates {@code num} event loops and starts one thread per loop.
     *
     * @param num number of selector threads; must be positive
     * @throws IllegalArgumentException if {@code num} is not positive
     */
    SelectorThreadGroup(int num) {
        if (num <= 0) {
            throw new IllegalArgumentException("num must be positive: " + num);
        }
        selectorThreads = new SelectorThread[num];
        for (int i = 0; i < num; i++) {
            selectorThreads[i] = new SelectorThread(this);
        }
        // Start the threads only after the array is fully populated.
        for (int i = 0; i < num; i++) {
            new Thread(selectorThreads[i]).start();
        }
    }

    /**
     * Opens a non-blocking server channel on {@code port} and hands it to one of the event
     * loops. An I/O failure is reported on stderr and the half-initialised channel is closed.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port) {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            nextSelector(server);
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(server);
        }
    }

    /**
     * Queues {@code channel} on the next event loop and wakes that loop's selector so the
     * channel gets registered.
     *
     * @param channel server or client channel to assign
     */
    public void nextSelector(Channel channel) {
        SelectorThread selectorThread = next();
        selectorThread.queue.add(channel);
        // Interrupt the blocking select() so the loop notices the queued channel.
        selectorThread.selector.wakeup();
    }

    /**
     * Picks the next event loop in round-robin order.
     * Original author's note (translated): plain round-robin is awkward here and can skew load.
     */
    private SelectorThread next() {
        // Mask off the sign bit: after the counter overflows, a plain % would yield a
        // negative index.
        int index = (counter.getAndIncrement() & Integer.MAX_VALUE) % selectorThreads.length;
        return selectorThreads[index];
    }

    /** Closes the channel if it is non-null, reporting but not propagating a close failure. */
    private static void closeQuietly(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package pers.wmx.nio;
/**
* @author wangmingxin03
* Created on 2023-07-26
*/
public class MainThread {
public static void main(String[] args) {
System.out.println(Thread.currentThread().getName() + " enter ...");
SelectorThreadGroup selectorThreadGroup = new SelectorThreadGroup(5);
selectorThreadGroup.bind(8886);
}
}
一个线程处理连接请求
每个客户端连接再以轮询方式分配给其中一个 selector 线程处理读写请求(多个客户端可能共用同一个线程)
再仿照netty的主从模式,整一个简易版本的
package pers.wmx.nio1;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Event loop that owns a single {@link Selector}.
 *
 * <p>Other threads hand channels over by adding them to {@link #queue} and then calling
 * {@code selector.wakeup()}; the loop registers those channels with its selector and dispatches
 * ready keys to the accept and read handlers. Data read from a client is written straight back
 * to the same client.
 *
 * <p>The class extends {@link ThreadLocal} only so that the queue can be obtained through
 * {@link #get()} when the instance is constructed. The queue is then shared with producer
 * threads through the {@link #queue} field, so it is not used as per-thread state.
 *
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class SelectorThread
        extends ThreadLocal<LinkedBlockingQueue<Channel>>
        implements Runnable {

    /** Selector polled by {@link #run()}. */
    Selector selector = null;

    /** Channels waiting to be registered with {@link #selector}. */
    LinkedBlockingQueue<Channel> queue = get();

    /** Group that decides which selector thread receives a newly accepted client. */
    SelectorThreadGroup selectorThreadGroup = null;

    /**
     * Creates the event loop and opens its selector.
     *
     * @param selectorThreadGroup group used to hand off accepted client channels
     * @throws IllegalStateException if the selector cannot be opened; failing here avoids a
     *         later {@code NullPointerException} on the first use of the selector
     */
    SelectorThread(SelectorThreadGroup selectorThreadGroup) {
        this.selectorThreadGroup = selectorThreadGroup;
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new IllegalStateException("unable to open selector", e);
        }
    }

    /** Supplies the queue returned by the {@link #get()} call made during construction. */
    @Override
    protected LinkedBlockingQueue<Channel> initialValue() {
        return new LinkedBlockingQueue<>();
    }

    /**
     * Runs the loop forever: wait for ready keys, dispatch them, then register every channel
     * that was queued in the meantime.
     *
     * @throws RuntimeException wrapping the {@link IOException} if {@code select()} fails
     */
    @Override
    public void run() {
        while (true) {
            try {
                // Blocks until a channel is ready or another thread calls selector.wakeup().
                int nums = selector.select();
                if (nums > 0) {
                    dispatchSelectedKeys();
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            registerPendingChannels();
        }
    }

    /** Hands every selected key to the matching handler and clears the selected-key set. */
    private void dispatchSelectedKeys() {
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove();
            if (!key.isValid()) {
                // The key was cancelled or its channel closed; querying readiness would throw.
                continue;
            }
            if (key.isAcceptable()) {
                acceptHandler(key);
            } else if (key.isReadable()) {
                readHandler(key);
            }
            // OP_WRITE is never registered by this class, so there is no write branch.
        }
    }

    /**
     * Registers all queued channels. The whole queue is drained because several producers may
     * have queued channels before this loop woke up once; taking only one per pass would leave
     * the rest unregistered until some unrelated event woke the selector again.
     */
    private void registerPendingChannels() {
        Channel channel;
        while ((channel = queue.poll()) != null) {
            try {
                registerChannel(channel);
            } catch (IOException e) {
                // A channel that cannot be registered must not take the whole loop down.
                e.printStackTrace();
                closeQuietly(channel);
            }
        }
    }

    /**
     * Registers one channel: server channels for accept events, client channels for read events
     * with a fresh 4096-byte direct buffer attached.
     *
     * @param channel the channel taken from the queue
     * @throws IOException if registration fails, e.g. because the channel is already closed
     */
    private void registerChannel(Channel channel) throws IOException {
        if (channel instanceof ServerSocketChannel) {
            ServerSocketChannel server = (ServerSocketChannel) channel;
            // Register interest in incoming connections.
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println(Thread.currentThread().getName() + " register accept");
        } else if (channel instanceof SocketChannel) {
            SocketChannel client = (SocketChannel) channel;
            ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
            // Register interest in readable data; the buffer travels with the key.
            client.register(selector, SelectionKey.OP_READ, buffer);
            System.out.println(Thread.currentThread().getName() + " register read : "
                    + client.getRemoteAddress());
        }
    }

    /**
     * Echoes everything currently readable on the key's channel back to the client.
     * The connection is closed when the peer reaches end-of-stream or an I/O error occurs.
     *
     * @param key a readable key whose attachment is the {@link ByteBuffer} set at registration
     */
    private void readHandler(SelectionKey key) {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        SocketChannel client = (SocketChannel) key.channel();
        buffer.clear();
        try {
            while (true) {
                int num = client.read(buffer);
                if (num > 0) {
                    buffer.flip(); // switch to draining mode and write the bytes straight back
                    System.out.println(
                            Thread.currentThread().getName() + " read buffer");
                    // NOTE(review): on a non-blocking channel this spins while the peer's
                    // receive window is full; left as in the original echo design.
                    while (buffer.hasRemaining()) {
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (num == 0) {
                    return;
                } else {
                    // End-of-stream: the client disconnected.
                    System.out.println("client: " + client.getRemoteAddress() + "closed......");
                    closeClient(key, client);
                    return;
                }
            }
        } catch (IOException e) {
            // Retrying on a broken connection would loop forever, so drop the connection.
            e.printStackTrace();
            closeClient(key, client);
        }
    }

    /**
     * Accepts one pending connection and passes it to the group for assignment.
     *
     * @param key an acceptable key whose channel is a {@link ServerSocketChannel}
     */
    private void acceptHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName() + " acceptHandler......");
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = null;
        try {
            client = server.accept();
            if (client == null) {
                // A non-blocking accept returns null when no connection is pending.
                return;
            }
            client.configureBlocking(false);
            selectorThreadGroup.nextSelector(client);
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(client);
        }
    }

    /** Cancels the key and closes the client socket so the descriptor is released. */
    private void closeClient(SelectionKey key, SocketChannel client) {
        key.cancel();
        closeQuietly(client);
    }

    /** Closes the channel if it is non-null, reporting but not propagating a close failure. */
    private void closeQuietly(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package pers.wmx.nio1;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Fixed-size group of {@link SelectorThread} event loops with an optional separate worker
 * group. Server channels are queued on this group's own loops; every other channel is queued
 * on the loops of {@link #worker}, which is this group itself until {@link #setWorker} is
 * called.
 *
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class SelectorThreadGroup {

    /** Event loops owned by this group. */
    SelectorThread[] selectorThreads;

    /** Server channel opened by the most recent {@link #bind(int)} call. */
    ServerSocketChannel server = null;

    /** Monotonic counter shared by both round-robin selections. */
    AtomicInteger counter = new AtomicInteger(0);

    /** Group whose loops receive client channels; defaults to this group. */
    SelectorThreadGroup worker = this;

    /**
     * Creates {@code num} event loops and starts one thread per loop.
     *
     * @param num number of selector threads; must be positive
     * @throws IllegalArgumentException if {@code num} is not positive
     */
    SelectorThreadGroup(int num) {
        if (num <= 0) {
            throw new IllegalArgumentException("num must be positive: " + num);
        }
        selectorThreads = new SelectorThread[num];
        for (int i = 0; i < num; i++) {
            selectorThreads[i] = new SelectorThread(this);
        }
        // Start the threads only after the array is fully populated.
        for (int i = 0; i < num; i++) {
            new Thread(selectorThreads[i]).start();
        }
    }

    /**
     * Sets the group whose event loops will receive client channels.
     *
     * @param worker the worker group; must not be null
     * @throws NullPointerException if {@code worker} is null
     */
    public void setWorker(SelectorThreadGroup worker) {
        if (worker == null) {
            throw new NullPointerException("worker");
        }
        this.worker = worker;
    }

    /**
     * Opens a non-blocking server channel on {@code port} and hands it to one of this group's
     * event loops. An I/O failure is reported on stderr and the half-initialised channel is
     * closed.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port) {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            nextSelector(server);
        } catch (IOException e) {
            e.printStackTrace();
            closeQuietly(server);
        }
    }

    /**
     * Queues {@code channel} on an event loop and wakes that loop's selector. Server channels
     * go to this group's loops; all other channels go to the worker group's loops for
     * read/write handling.
     *
     * @param channel server or client channel to assign
     */
    public void nextSelector(Channel channel) {
        SelectorThread selectorThread =
                (channel instanceof ServerSocketChannel) ? next() : nextWorker();
        // add() is used instead of put(): put() acquires its lock interruptibly, so a pending
        // interrupt on the calling thread would make it throw and drop the channel.
        selectorThread.queue.add(channel);
        // Interrupt the blocking select() so the loop notices the queued channel.
        selectorThread.selector.wakeup();
    }

    /**
     * Picks the next event loop of this group in round-robin order.
     * Original author's note (translated): plain round-robin is awkward here and can skew load.
     */
    private SelectorThread next() {
        return pick(selectorThreads);
    }

    /** Picks the next event loop of the worker group in round-robin order. */
    private SelectorThread nextWorker() {
        return pick(worker.selectorThreads);
    }

    /** Advances the shared counter and maps it onto {@code threads}. */
    private SelectorThread pick(SelectorThread[] threads) {
        // Mask off the sign bit: after the counter overflows, a plain % would yield a
        // negative index.
        int index = (counter.getAndIncrement() & Integer.MAX_VALUE) % threads.length;
        return threads[index];
    }

    /** Closes the channel if it is non-null, reporting but not propagating a close failure. */
    private static void closeQuietly(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package pers.wmx.nio1;
/**
 * Entry point of the boss/worker server: a boss group takes the listening sockets and a worker
 * group is set as the target for client channels.
 *
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class MainThread {

    /** Number of selector threads in the boss group. */
    private static final int BOSS_THREADS = 2;

    /** Number of selector threads in the worker group. */
    private static final int WORKER_THREADS = 4;

    /** Ports to listen on, bound in this order. */
    private static final int[] PORTS = {8886, 8887, 8888};

    /**
     * Prints the name of the launching thread, wires the worker group into the boss group,
     * and binds every configured port on the boss group.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        String launcher = Thread.currentThread().getName();
        System.out.println(launcher + " enter ...");

        SelectorThreadGroup boss = new SelectorThreadGroup(BOSS_THREADS);
        SelectorThreadGroup worker = new SelectorThreadGroup(WORKER_THREADS);
        boss.setWorker(worker);

        for (int port : PORTS) {
            boss.bind(port);
        }
    }
}
boss起2个线程
worker起4个线程
boss处理客户端连接请求
worker处理客户端读写请求
服务跑起来,模拟客户端连接
转载请注明:汪明鑫的个人博客 » NIO 多线程处理客户端请求
说点什么
您将是第一位评论人!