Welcome everyone

Kafka Java API

消息队列 汪明鑫 706浏览 0评论

Kafka部署

参考之前的文章,搭建单机伪分布式ZK集群和Kafka集群

 

 

 

 

 

搞事情的过程中遇到过2问题,有点恶心

Java客户端连不上Broker

发现是kafka broker启动配置有问题

 

 

需要配置下 advertised.listeners

是向ZK注册时暴露的ip:port

如果只配置了 listeners 服务器本地host,远程是连不上的。。。

 

这个问题解决后,又出了个问题,一个实例起了3 Kafka, 资本家听了都流泪,机器压榨的太厉害了,何况我这个实例就2G内存

就导致了,3个broker, 总有一个会被挤掉。。。麻了,内存利用率打到了80%~90%,后面看看在哪再买个便宜点的云服务器,好几把贵

 

我们用kafka提供的脚本创建topic写些demo

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --partitions 2 --replication-factor 2 --topic test-1

 

查看topic详情

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-1

 

查看消费者组消费进度

bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092,localhost:9093 --group order-create 

 

原生Kafka Java Client

maven依赖

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

 

package pers.wmx.springbootfreemarkerdemo.kafka;


import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import lombok.extern.slf4j.Slf4j;

/**
 * @author wangmingxin03
 * Created on 2021-12-16
 */
@Slf4j
public class SimpleKafkaProducer {
    private static final String SERVER_LIST = "localhost:9092,localhost:9093";

    private static final String TOPIC_NAME = "xinye";

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER_LIST);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "888");
        Future<RecordMetadata> result = producer.send(record);

        RecordMetadata recordMetadata = result.get();
        int partition = recordMetadata.partition();
        long offset = recordMetadata.offset();
        log.info("produce msg | partition:{}, offset:{}", partition, offset);
    }
}

 

消费者-自动提交

package pers.wmx.springbootfreemarkerdemo.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import lombok.extern.slf4j.Slf4j;

/**
 * @author wangmingxin03
 * Created on 2021-12-16
 */
@Slf4j
public class SimpleKafkaConsumer {
    private static final String SERVER_LIST = "localhost:9092,localhost:9093";

    private static final String TOPIC_NAME = "xinye";

    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER_LIST);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 设置消费组
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "WMX");

        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // 设置offset自动提交
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 消费指定topic
        consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // TODO
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // TODO
            }
        });

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(0));

            if (!records.isEmpty()) {
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
                while(iterator.hasNext()) {
                    ConsumerRecord<String, String> record = iterator.next();
                    log.info("consume msg | key:{}, value:{}, partition:{}, offset:{}",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        }

    }
}

 

消费者-手动提交

package pers.wmx.springbootfreemarkerdemo.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import lombok.extern.slf4j.Slf4j;

/**
 * @author wangmingxin03
 * Created on 2021-12-16
 */
@Slf4j
public class SimpleKafkaConsumer1 {
    private static final String SERVER_LIST = "localhost:9092,localhost:9093";

    private static final String TOPIC_NAME = "xinye";

    // 再写一个手动提交的Consumer
    // 自动提交有丢失数据和重复消费的风险
    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER_LIST);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 设置消费组
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "WMX");

        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // 设置offset自动提交
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 消费指定topic
        consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // TODO
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // TODO
            }
        });

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(0));

            if (!records.isEmpty()) {
                Iterator<ConsumerRecord<String, String>> iterator = records.iterator();

                while(iterator.hasNext()) {
                    ConsumerRecord<String, String> record = iterator.next();

                    // handle msg ...

                    log.info("consume msg | key:{}, value:{}, partition:{}, offset:{}",
                            record.key(), record.value(), record.partition(), record.offset());
                }

                // 批次处理完手动提交一波
                // 这里的手动提交策略,还可以处理一条提交一条更安全
                // 批次处理的话,如果中间挂了,下次又重新拉取会重复消费
                consumer.commitSync();
            }
        }

    }
}

 

再写个Consumer支持分区粒度的手动提交消费进度

package pers.wmx.springbootfreemarkerdemo.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import lombok.extern.slf4j.Slf4j;

/**
 * @author wangmingxin03
 * Created on 2021-12-16
 */
@Slf4j
public class SimpleKafkaConsumer2 {
    private static final String SERVER_LIST = "39.97.47.254:9092,39.97.47.254:9093";

    private static final String TOPIC_NAME = "xinye";

    public static void main(String[] args) {
        Properties properties = new Properties();

        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER_LIST);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 设置消费组
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "WMX");

        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // 设置offset自动提交
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // 消费指定topic
        consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // TODO
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // TODO
            }
        });

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(0));

            if (!records.isEmpty()) {

                // 按分区粒度去处理和提交
                Set<TopicPartition> partitions = records.partitions();
                for (TopicPartition partition : partitions) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    Iterator<ConsumerRecord<String, String>> pIterator = partitionRecords.iterator();

                    while (pIterator.hasNext()) {
                        ConsumerRecord<String, String> next = pIterator.next();

                        // handle msg
                        log.info("consume msg | key:{}, value:{}, partition:{}, offset:{}",
                                next.key(), next.value(), next.partition(), next.offset());
                    }

                    // 提交分区最后一条数据
                    ConsumerRecord<String, String> lastRecord = partitionRecords.get(partitionRecords.size() - 1);
                    OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastRecord.offset());

                    HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                    map.put(partition, offsetAndMetadata);
                    consumer.commitSync(map);
                }

            }
        }

    }
}

 

可以写写demo自己跑跑

 

Spring  Boot整合 Kafka

maven依赖

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

 

引入了spring-kafka 就不需要引入 kafka-clients 了

 

创建一个配置文件 application.yaml

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093
    producer:
      acks: 1
      retries: 3
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: true
      properties:
        spring:
          json:
            trusted:
              packages: pers.wmx.springbootfreemarkerdemo.kafka

 

定义一个消息

package pers.wmx.springbootfreemarkerdemo.kafka;

/**
 * @author wangmingxin03
 * Created on 2021-12-15
 */
public class OrderCreateMsg {
    int orderId;

    long userId;

    public OrderCreateMsg() {
    }

    public OrderCreateMsg(int orderId, long userId) {
        this.orderId = orderId;
        this.userId = userId;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public long getUserId() {
        return userId;
    }

    public void setUserId(long userId) {
        this.userId = userId;
    }
}

 

生产者

package pers.wmx.springbootfreemarkerdemo.kafka;

import java.util.concurrent.ExecutionException;

import javax.annotation.Resource;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * @author wangmingxin03
 * Created on 2021-12-15
 */
@Service
public class KafkaProducerHelper {
    private static final String TOPIC_NAME = "test-1";

    @Resource
    private KafkaTemplate<Object, Object> kafkaTemplate;

    public SendResult send(OrderCreateMsg orderCreateMsg) throws Exception {
        return kafkaTemplate.send(TOPIC_NAME, orderCreateMsg).get();
    }

    public ListenableFuture<SendResult<Object, Object>> asyncSend(OrderCreateMsg orderCreateMsg) {
        return kafkaTemplate.send(TOPIC_NAME, orderCreateMsg);
    }

}
package pers.wmx.springbootfreemarkerdemo.kafka;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.junit4.SpringRunner;

import lombok.extern.slf4j.Slf4j;

/**
 * @author wangmingxin03
 * Created on 2021-12-15
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class KafkaTest {
    @Autowired
    private KafkaProducerHelper kafkaProducerHelper;

    @Test
    public void testSend() throws Exception {
        OrderCreateMsg msg = new OrderCreateMsg(2, 234);
        SendResult sendResult = kafkaProducerHelper.send(msg);
        log.info("sendResult:{}", sendResult);
    }

}

 

再起2消费者

package pers.wmx.springbootfreemarkerdemo.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;

/**
 * @author wangmingxin03
 * Created on 2021-12-15
 */
@Slf4j
@Service
public class KafkaConsumerHelper {
    private static final String TOPIC_NAME = "test-1";

    // 直接把Spring Boot应用起起来等待消费就行啦
    @KafkaListener(topics = TOPIC_NAME, groupId = "order-create")
    public void onMessage(OrderCreateMsg orderCreateMsg) {
        log.info("consume msg:{}", JSON.toJSONString(orderCreateMsg));
    }
}

 

package pers.wmx.springbootfreemarkerdemo.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import com.alibaba.fastjson.JSON;

import lombok.extern.slf4j.Slf4j;

/**
 * @author wangmingxin03
 * Created on 2021-12-16
 */
@Service
@Slf4j
public class KafkaAConsumerHelper {
    private static final String TOPIC_NAME = "test-1";

    // 直接把Spring Boot应用起起来等待消费就行啦
    // 再起一个Consumer,2个Consumer会分配
    @KafkaListener(topics = TOPIC_NAME, groupId = "order-create")
    public void onMessage(OrderCreateMsg orderCreateMsg) {
        log.info("KafkaAConsumerHelper consume msg:{}", JSON.toJSONString(orderCreateMsg));
    }
}

 

 

参考文档

https://cloud.tencent.com/developer/article/1632139

https://blog.csdn.net/maoyuanming0806/article/details/80555979

https://juejin.cn/post/6982702523853307935

https://blog.csdn.net/qq_39331255/article/details/109595370

 

转载请注明:汪明鑫的个人博客 » Kafka Java API

喜欢 (3)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz