Welcome everyone

简单模拟消息队列

消息队列 汪明鑫 575浏览 0评论

利用Socket,ArrayBlockingQueue,IO流模拟消息队列的消息生产和消费

需要一个消息处理中心,存储消息

 

生产消息客户端  —>  消息处理中心  —>  消费消息客户端

模拟最简单的一种模式

生产者往消息中心推消息

消费者从消息中心拉消息

使用Socket通信

 

消息处理中心就可以理解成Kafka的一个Broker,也可以考虑消息持久化

这里我们用ArrayBlockingQueue存储消息

 

开搞—-

首先需要一个消息处理中心Broker,存储消息

/**
 * 消息Broker
 *
 * @author wmx
 * @date 2019-09-20
 */
public class Broker {

    /**
     * 存储消息最大数量
     */
    private final static int MAX_SIZE = 3;

    /**
     * 存储消息容器
     */
    private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);
    
    /**
     * 生产消息
     * 
     * @param msg
     */
    public static void produce(String msg){
        if(messageQueue.offer(msg)){
            System.out.println("成功生产消息:"+msg+" 当前消息数量:"+messageQueue.size());
        }else {
            System.out.println("消息已满,等待消费...");
        }
    }

    /**
     * 消费消息
     */
    public static String consume(){
        String msg = messageQueue.poll();
        if(msg == null){
            System.out.println("消息为空,等待生产...");
        }else {
            System.out.println("成功消费消息:"+msg+" 当前消息数量:"+messageQueue.size());
        }

        return msg;
    }

}

 

然后需要把Broker服务暴露出去,并启动服务,和客户端交互

/**
 * 对外提供Broker服务
 *
 * @author wmx
 * @date 2019-09-20
 */
public class BrokerServer implements Runnable{

    /**
     * 服务暴露端口号
     */
    public static int SERVICE_PORT = 9999;

    /**
     * 客户端消费标示
     */
    public static String CONSUME_FLAG = "CONSUME";

    private final Socket socket;


    public BrokerServer(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream())
        ){
          while(true){
              String msg = in.readLine();
              if(msg == null){
                  continue;
              }
              
              System.out.println("接收信号:"+msg);

              if(CONSUME_FLAG.equals(msg)){
                  //消费消息
                  String message = Broker.consume();
                  out.println(message);
                  out.flush();

              }else{
                  //生产消息
                  Broker.produce(msg);
              }

          }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {
        ServerSocket server = new ServerSocket(SERVICE_PORT);
        while (true){
            BrokerServer brokerServer = new BrokerServer(server.accept());
            new Thread(brokerServer).start();
        }

    }
}

 

把客户端使用方法封装起来

/**
 * 客户端
 *
 * @author wmx
 * @date 2019-09-20
 */
public class MqClient {
    public void produce(String message) throws Exception {
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
        try(PrintWriter out = new PrintWriter(socket.getOutputStream())){
            out.println(message);
            out.flush();
        }
    }

    public String consume() throws Exception {
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
        try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter out = new PrintWriter(socket.getOutputStream())
        ){

            out.println(BrokerServer.CONSUME_FLAG);
            out.flush();
            String message = in.readLine();

            return message;
        }
    }
}

 

生产者客户端:

/**
 * 生产消息客户端
 *
 * @author wmx
 * @date 2019-09-20
 */
public class ProduceClient {
    public static void main(String[] args) throws Exception {
        MqClient client = new MqClient();
        client.produce("云顶之奕");
    }
}

 

消费者客户端:

/**
 * 消费消息客户端
 *
 * @author wmx
 * @date 2019-09-20
 */
public class ConsumeClient {
    public static void main(String[] args) throws Exception {
        MqClient client = new MqClient();

        String message = client.consume();
        System.out.println("获取消息:"+message);

        //处理业务逻辑
        
    }
}

 

完成,测试一波

首先启动BrokerServer

 

再运行4次ProduceClient

再运行4次ConsumeClient

 

 

参考:《分布式消息中间件实践》

 

在这里只是个简单的模拟,建议手动敲下,就当复习下Socket,ArrayBlockingQueue,IO流

实际上的消息队列消息的生产和消费,要复杂的多的多的多

比如要考虑消息可靠性,有序性,消息丢了怎么办,Broker宕机了怎么办,如何实现消息的复制,消息队列集群,性能,消息积压等等

可以说消息队列的水深的压批

会涉及的知识:网络通信、并发编程、序列化和反序列化、幂等性、传输协议、批处理、数据压缩、缓存等等

 

转载请注明:汪明鑫的个人博客 » 简单模拟消息队列

喜欢 (0)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz