目录
Kafka部署
参考之前的文章,搭建单机伪分布式ZK集群和Kafka集群
搞事情的过程中遇到过2问题,有点恶心
Java客户端连不上Broker
发现是kafka broker启动配置有问题
需要配置下 advertised.listeners
是向ZK注册时暴露的ip:port
如果只配置了 listeners 服务器本地host,远程是连不上的。。。
这个问题解决后,又出了个问题,一个实例起了3 Kafka, 资本家听了都流泪,机器压榨的太厉害了,何况我这个实例就2G内存
就导致了,3个broker, 总有一个会被挤掉。。。麻了,内存利用率打到了80%~90%,后面看看在哪再买个便宜点的云服务器,好几把贵
我们用kafka提供的脚本创建topic写些demo
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --partitions 2 --replication-factor 2 --topic test-1
查看topic详情
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-1
查看消费者组消费进度
bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092,localhost:9093 --group order-create
原生Kafka Java Client
maven依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
package pers.wmx.springbootfreemarkerdemo.kafka;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import lombok.extern.slf4j.Slf4j;
/**
* @author wangmingxin03
* Created on 2021-12-16
*/
@Slf4j
public class SimpleKafkaProducer {
private static final String SERVER_LIST = "localhost:9092,localhost:9093";
private static final String TOPIC_NAME = "xinye";
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER_LIST);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "888");
Future<RecordMetadata> result = producer.send(record);
RecordMetadata recordMetadata = result.get();
int partition = recordMetadata.partition();
long offset = recordMetadata.offset();
log.info("produce msg | partition:{}, offset:{}", partition, offset);
}
}
消费者-自动提交
package pers.wmx.springbootfreemarkerdemo.kafka;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import lombok.extern.slf4j.Slf4j;
/**
* @author wangmingxin03
* Created on 2021-12-16
*/
@Slf4j
public class SimpleKafkaConsumer {
private static final String SERVER_LIST = "localhost:9092,localhost:9093";
private static final String TOPIC_NAME = "xinye";
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER_LIST);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "WMX");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// 设置offset自动提交
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 消费指定topic
consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// TODO
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// TODO
}
});
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(0));
if (!records.isEmpty()) {
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while(iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
log.info("consume msg | key:{}, value:{}, partition:{}, offset:{}",
record.key(), record.value(), record.partition(), record.offset());
}
}
}
}
}
消费者-手动提交
package pers.wmx.springbootfreemarkerdemo.kafka;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import lombok.extern.slf4j.Slf4j;
/**
* @author wangmingxin03
* Created on 2021-12-16
*/
@Slf4j
public class SimpleKafkaConsumer1 {
private static final String SERVER_LIST = "localhost:9092,localhost:9093";
private static final String TOPIC_NAME = "xinye";
// 再写一个手动提交的Consumer
// 自动提交有丢失数据和重复消费的风险
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER_LIST);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "WMX");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// 设置offset自动提交
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 消费指定topic
consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// TODO
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// TODO
}
});
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(0));
if (!records.isEmpty()) {
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
while(iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
// handle msg ...
log.info("consume msg | key:{}, value:{}, partition:{}, offset:{}",
record.key(), record.value(), record.partition(), record.offset());
}
// 批次处理完手动提交一波
// 这里的手动提交策略,还可以处理一条提交一条更安全
// 批次处理的话,如果中间挂了,下次又重新拉取会重复消费
consumer.commitSync();
}
}
}
}
再写个Consumer支持分区粒度的手动提交消费进度
package pers.wmx.springbootfreemarkerdemo.kafka;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import lombok.extern.slf4j.Slf4j;
/**
* @author wangmingxin03
* Created on 2021-12-16
*/
@Slf4j
public class SimpleKafkaConsumer2 {
private static final String SERVER_LIST = "39.97.47.254:9092,39.97.47.254:9093";
private static final String TOPIC_NAME = "xinye";
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER_LIST);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 设置消费组
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "WMX");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
// 设置offset自动提交
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 消费指定topic
consumer.subscribe(Collections.singletonList(TOPIC_NAME), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// TODO
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// TODO
}
});
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(0));
if (!records.isEmpty()) {
// 按分区粒度去处理和提交
Set<TopicPartition> partitions = records.partitions();
for (TopicPartition partition : partitions) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
Iterator<ConsumerRecord<String, String>> pIterator = partitionRecords.iterator();
while (pIterator.hasNext()) {
ConsumerRecord<String, String> next = pIterator.next();
// handle msg
log.info("consume msg | key:{}, value:{}, partition:{}, offset:{}",
next.key(), next.value(), next.partition(), next.offset());
}
// 提交分区最后一条数据
ConsumerRecord<String, String> lastRecord = partitionRecords.get(partitionRecords.size() - 1);
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastRecord.offset());
HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
map.put(partition, offsetAndMetadata);
consumer.commitSync(map);
}
}
}
}
}
可以写写demo自己跑跑
Spring Boot整合 Kafka
maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
引入了spring-kafka
就不需要引入 kafka-clients
了
创建一个配置文件 application.yaml
spring:
kafka:
bootstrap-servers: localhost:9092,localhost:9093
producer:
acks: 1
retries: 3
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
enable-auto-commit: true
properties:
spring:
json:
trusted:
packages: pers.wmx.springbootfreemarkerdemo.kafka
定义一个消息
package pers.wmx.springbootfreemarkerdemo.kafka;
/**
* @author wangmingxin03
* Created on 2021-12-15
*/
public class OrderCreateMsg {
int orderId;
long userId;
public OrderCreateMsg() {
}
public OrderCreateMsg(int orderId, long userId) {
this.orderId = orderId;
this.userId = userId;
}
public int getOrderId() {
return orderId;
}
public void setOrderId(int orderId) {
this.orderId = orderId;
}
public long getUserId() {
return userId;
}
public void setUserId(long userId) {
this.userId = userId;
}
}
生产者
package pers.wmx.springbootfreemarkerdemo.kafka;
import java.util.concurrent.ExecutionException;
import javax.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
/**
* @author wangmingxin03
* Created on 2021-12-15
*/
@Service
public class KafkaProducerHelper {
private static final String TOPIC_NAME = "test-1";
@Resource
private KafkaTemplate<Object, Object> kafkaTemplate;
public SendResult send(OrderCreateMsg orderCreateMsg) throws Exception {
return kafkaTemplate.send(TOPIC_NAME, orderCreateMsg).get();
}
public ListenableFuture<SendResult<Object, Object>> asyncSend(OrderCreateMsg orderCreateMsg) {
return kafkaTemplate.send(TOPIC_NAME, orderCreateMsg);
}
}
package pers.wmx.springbootfreemarkerdemo.kafka;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.support.SendResult;
import org.springframework.test.context.junit4.SpringRunner;
import lombok.extern.slf4j.Slf4j;
/**
* @author wangmingxin03
* Created on 2021-12-15
*/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class KafkaTest {
@Autowired
private KafkaProducerHelper kafkaProducerHelper;
@Test
public void testSend() throws Exception {
OrderCreateMsg msg = new OrderCreateMsg(2, 234);
SendResult sendResult = kafkaProducerHelper.send(msg);
log.info("sendResult:{}", sendResult);
}
}
再起2消费者
package pers.wmx.springbootfreemarkerdemo.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
/**
* @author wangmingxin03
* Created on 2021-12-15
*/
@Slf4j
@Service
public class KafkaConsumerHelper {
private static final String TOPIC_NAME = "test-1";
// 直接把Spring Boot应用起起来等待消费就行啦
@KafkaListener(topics = TOPIC_NAME, groupId = "order-create")
public void onMessage(OrderCreateMsg orderCreateMsg) {
log.info("consume msg:{}", JSON.toJSONString(orderCreateMsg));
}
}
package pers.wmx.springbootfreemarkerdemo.kafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
/**
* @author wangmingxin03
* Created on 2021-12-16
*/
@Service
@Slf4j
public class KafkaAConsumerHelper {
private static final String TOPIC_NAME = "test-1";
// 直接把Spring Boot应用起起来等待消费就行啦
// 再起一个Consumer,2个Consumer会分配
@KafkaListener(topics = TOPIC_NAME, groupId = "order-create")
public void onMessage(OrderCreateMsg orderCreateMsg) {
log.info("KafkaAConsumerHelper consume msg:{}", JSON.toJSONString(orderCreateMsg));
}
}
参考文档
https://cloud.tencent.com/developer/article/1632139
https://blog.csdn.net/maoyuanming0806/article/details/80555979
https://juejin.cn/post/6982702523853307935
https://blog.csdn.net/qq_39331255/article/details/109595370
转载请注明:汪明鑫的个人博客 » Kafka Java API
说点什么
您将是第一位评论人!