Welcome everyone

NIO 多线程处理客户端请求

java 汪明鑫 540浏览 0评论

仿照 netty 写个低配版本 nio多线程处理客户端请求

package pers.wmx.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class SelectorThread
        extends ThreadLocal<LinkedBlockingQueue<Channel>>
        implements Runnable {

    Selector selector = null;
    LinkedBlockingQueue<Channel> queue = get();

    SelectorThreadGroup selectorThreadGroup = null;

    SelectorThread(SelectorThreadGroup selectorThreadGroup){
        try {
            this.selectorThreadGroup = selectorThreadGroup;
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected LinkedBlockingQueue<Channel> initialValue() {
        return new LinkedBlockingQueue<>();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 阻塞读
                int nums = selector.select();

                if (nums > 0) {
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();

                        if(key.isAcceptable()){
                            acceptHandler(key);
                        }else if(key.isReadable()){
                            readHandler(key);
                        }else if(key.isWritable()){

                        }
                    }
                }

                if (!queue.isEmpty()) {
                    Channel channel = queue.take();

                    if(channel instanceof ServerSocketChannel){
                        ServerSocketChannel server = (ServerSocketChannel) channel;

                        // 注册接受连接事件
                        server.register(selector,SelectionKey.OP_ACCEPT);
                        System.out.println(Thread.currentThread().getName() + " register accept");
                    } else if(channel instanceof SocketChannel){
                        SocketChannel client = (SocketChannel) channel;
                        ByteBuffer buffer = ByteBuffer.allocateDirect(4096);

                        // 注册可读事件
                        client.register(selector, SelectionKey.OP_READ, buffer);
                        System.out.println(Thread.currentThread().getName()+" register read : "
                                + client.getRemoteAddress());

                    }
                }

            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void readHandler(SelectionKey key) {
        ByteBuffer buffer = (ByteBuffer)key.attachment();
        SocketChannel client = (SocketChannel)key.channel();
        buffer.clear();
        while(true) {
            try {
                int num = client.read(buffer);
                if (num > 0){
                    buffer.flip();  //将读到的内容翻转,然后直接写出
                    System.out.println(
                            Thread.currentThread().getName() + " read buffer");
                    while(buffer.hasRemaining()){
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (num == 0){
                    break;
                } else {
                    //客户端断开了
                    System.out.println("client: " + client.getRemoteAddress() + "closed......");
                    key.cancel();
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void acceptHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName()+"   acceptHandler......");

        ServerSocketChannel server = (ServerSocketChannel)key.channel();
        try {
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            selectorThreadGroup.nextSelector(client);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}
package pers.wmx.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class SelectorThreadGroup {
    SelectorThread[] selectorThreads;
    ServerSocketChannel server=null;
    AtomicInteger counter = new AtomicInteger(0);

    SelectorThreadGroup(int num){
        //num  线程数
        selectorThreads = new SelectorThread[num];
        for (int i = 0; i < num; i++) {
            selectorThreads[i] = new SelectorThread(this);

            new Thread(selectorThreads[i]).start();
        }
    }

    public void bind(int port) {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            nextSelector(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void nextSelector(Channel channel) {
        SelectorThread selectorThread = next();
        selectorThread.queue.add(channel);
        // 打断阻塞
        selectorThread.selector.wakeup();
    }

    private SelectorThread next() {
        int index = counter.getAndIncrement() % selectorThreads.length;  //轮询就会很尴尬,倾斜
        return selectorThreads[index];
    }
}
package pers.wmx.nio;

/**
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class MainThread {

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + " enter ...");
        SelectorThreadGroup selectorThreadGroup = new SelectorThreadGroup(5);
        selectorThreadGroup.bind(8886);
    }

}


程序跑起来,开4个客户端连接

一个线程处理连接请求

为每个客户端再分配一个线程处理读写请求

再仿照netty的主从模式,整一个简易版本的

package pers.wmx.nio1;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class SelectorThread
        extends ThreadLocal<LinkedBlockingQueue<Channel>>
        implements Runnable {

    Selector selector = null;
    LinkedBlockingQueue<Channel> queue = get();

    SelectorThreadGroup selectorThreadGroup = null;

    SelectorThread(SelectorThreadGroup selectorThreadGroup){
        try {
            this.selectorThreadGroup = selectorThreadGroup;
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    protected LinkedBlockingQueue<Channel> initialValue() {
        return new LinkedBlockingQueue<>();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 阻塞读
                int nums = selector.select();

                if (nums > 0) {
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();

                        if(key.isAcceptable()){
                            acceptHandler(key);
                        }else if(key.isReadable()){
                            readHandler(key);
                        }else if(key.isWritable()){

                        }
                    }
                }

                if (!queue.isEmpty()) {
                    Channel channel = queue.take();

                    if(channel instanceof ServerSocketChannel){
                        ServerSocketChannel server = (ServerSocketChannel) channel;

                        // 注册接受连接事件
                        server.register(selector,SelectionKey.OP_ACCEPT);
                        System.out.println(Thread.currentThread().getName() + " register accept");
                    } else if(channel instanceof SocketChannel){
                        SocketChannel client = (SocketChannel) channel;
                        ByteBuffer buffer = ByteBuffer.allocateDirect(4096);

                        // 注册可读事件
                        client.register(selector, SelectionKey.OP_READ, buffer);
                        System.out.println(Thread.currentThread().getName()+" register read : "
                                + client.getRemoteAddress());

                    }
                }

            } catch (IOException | InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void readHandler(SelectionKey key) {
        ByteBuffer buffer = (ByteBuffer)key.attachment();
        SocketChannel client = (SocketChannel)key.channel();
        buffer.clear();
        while(true) {
            try {
                int num = client.read(buffer);
                if (num > 0){
                    buffer.flip();  //将读到的内容翻转,然后直接写出
                    System.out.println(
                            Thread.currentThread().getName() + " read buffer");
                    while(buffer.hasRemaining()){
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (num == 0){
                    break;
                } else {
                    //客户端断开了
                    System.out.println("client: " + client.getRemoteAddress() + "closed......");
                    key.cancel();
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void acceptHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName()+"   acceptHandler......");

        ServerSocketChannel server = (ServerSocketChannel)key.channel();
        try {
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            selectorThreadGroup.nextSelector(client);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}
package pers.wmx.nio1;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class SelectorThreadGroup {
    SelectorThread[] selectorThreads;
    ServerSocketChannel server = null;
    AtomicInteger counter = new AtomicInteger(0);

    SelectorThreadGroup worker = this;

    SelectorThreadGroup(int num){
        //num  线程数
        selectorThreads = new SelectorThread[num];
        for (int i = 0; i < num; i++) {
            selectorThreads[i] = new SelectorThread(this);

            new Thread(selectorThreads[i]).start();
        }
    }

    public void setWorker(SelectorThreadGroup worker) {
        this.worker = worker;
    }

    public void bind(int port) {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(port));
            nextSelector(server);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void nextSelector(Channel channel) {
        try {
            if (channel instanceof ServerSocketChannel) {
                SelectorThread selectorThread = next();
                selectorThread.queue.put(channel);
                // 打断阻塞
                selectorThread.selector.wakeup();
            } else {
                // 交给woker线程处理读写
                SelectorThread selectorThread = nextWorker();
                selectorThread.queue.put(channel);
                // 打断阻塞
                selectorThread.selector.wakeup();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private SelectorThread next() {
        int index = counter.getAndIncrement() % selectorThreads.length;  //轮询就会很尴尬,倾斜
        return selectorThreads[index];
    }

    // 从worker取
    private SelectorThread nextWorker() {
        int index = counter.getAndIncrement() % worker.selectorThreads.length;
        return worker.selectorThreads[index];
    }
}
package pers.wmx.nio1;

/**
 * @author wangmingxin03
 * Created on 2023-07-26
 */
public class MainThread {

    public static void main(String[] args) {
        System.out.println(Thread.currentThread().getName() + " enter ...");
        SelectorThreadGroup boss = new SelectorThreadGroup(2);
        SelectorThreadGroup worker = new SelectorThreadGroup(4);
        boss.setWorker(worker);

        boss.bind(8886);
        boss.bind(8887);
        boss.bind(8888);
    }

}

boss起2个线程

worker起4个线程

boss处理客户端连接请求

worker处理客户端读写请求

服务跑起来,模拟客户端连接

转载请注明:汪明鑫的个人博客 » NIO 多线程处理客户端请求

喜欢 (0)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz