利用Socket,ArrayBlockingQueue,IO流模拟消息队列的消息生产和消费
需要一个消息处理中心,存储消息
生产消息客户端 —> 消息处理中心 —> 消费消息客户端
模拟最简单的一种模式
生产者往消息中心推消息
消费者从消息中心拉消息
使用Socket通信
消息处理中心就可以理解成Kafka的一个Broker,也可以考虑消息持久化
这里我们用ArrayBlockingQueue存储消息
开搞—-
首先需要一个消息处理中心Broker,存储消息
/**
* 消息Broker
*
* @author wmx
* @date 2019-09-20
*/
public class Broker {
/**
* 存储消息最大数量
*/
private final static int MAX_SIZE = 3;
/**
* 存储消息容器
*/
private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);
/**
* 生产消息
*
* @param msg
*/
public static void produce(String msg){
if(messageQueue.offer(msg)){
System.out.println("成功生产消息:"+msg+" 当前消息数量:"+messageQueue.size());
}else {
System.out.println("消息已满,等待消费...");
}
}
/**
* 消费消息
*/
public static String consume(){
String msg = messageQueue.poll();
if(msg == null){
System.out.println("消息为空,等待生产...");
}else {
System.out.println("成功消费消息:"+msg+" 当前消息数量:"+messageQueue.size());
}
return msg;
}
}
然后需要把Broker服务暴露出去,并启动服务,和客户端交互
/**
* 对外提供Broker服务
*
* @author wmx
* @date 2019-09-20
*/
public class BrokerServer implements Runnable{
/**
* 服务暴露端口号
*/
public static int SERVICE_PORT = 9999;
/**
* 客户端消费标示
*/
public static String CONSUME_FLAG = "CONSUME";
private final Socket socket;
public BrokerServer(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
){
while(true){
String msg = in.readLine();
if(msg == null){
continue;
}
System.out.println("接收信号:"+msg);
if(CONSUME_FLAG.equals(msg)){
//消费消息
String message = Broker.consume();
out.println(message);
out.flush();
}else{
//生产消息
Broker.produce(msg);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(SERVICE_PORT);
while (true){
BrokerServer brokerServer = new BrokerServer(server.accept());
new Thread(brokerServer).start();
}
}
}
把客户端使用方法封装起来
/**
* 客户端
*
* @author wmx
* @date 2019-09-20
*/
public class MqClient {
public void produce(String message) throws Exception {
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
try(PrintWriter out = new PrintWriter(socket.getOutputStream())){
out.println(message);
out.flush();
}
}
public String consume() throws Exception {
Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);
try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream())
){
out.println(BrokerServer.CONSUME_FLAG);
out.flush();
String message = in.readLine();
return message;
}
}
}
生产者客户端:
/**
* 生产消息客户端
*
* @author wmx
* @date 2019-09-20
*/
public class ProduceClient {
public static void main(String[] args) throws Exception {
MqClient client = new MqClient();
client.produce("云顶之奕");
}
}
消费者客户端:
/**
* 消费消息客户端
*
* @author wmx
* @date 2019-09-20
*/
public class ConsumeClient {
public static void main(String[] args) throws Exception {
MqClient client = new MqClient();
String message = client.consume();
System.out.println("获取消息:"+message);
//处理业务逻辑
}
}
完成,测试一波
首先启动BrokerServer
再运行4次ProduceClient
再运行4次ConsumeClient
参考:《分布式消息中间件实践》
在这里只是个简单的模拟,建议手动敲下,就当复习下Socket,ArrayBlockingQueue,IO流
实际上的消息队列消息的生产和消费,要复杂的多的多的多
比如要考虑消息可靠性,有序性,消息丢了怎么办,Broker宕机了怎么办,如何实现消息的复制,消息队列集群,性能,消息积压等等
可以说消息队列的水深的压批
会涉及的知识:网络通信、并发编程、序列化和反序列化、幂等性、传输协议、批处理、数据压缩、缓存等等
说点什么
您将是第一位评论人!