kafka消息堆积能力比较强,可以堆积上亿的消息,特别适合日志处理这种实时性要求不太高的场景springboot日志配置及输出,同时支持集群部署springboot日志配置及输出,相比redis堆积能力和可靠性更高

可以通过下面的步骤快速上手这个kafka

获取一个可用的kafka实例

可以使用docker一键启动一个kafka集群

git clone https://github.com/simplesteph/kafka-stack-docker-compose.git
cd kafka-stack-docker-compose
docker-compose -f full-stack.yml up -d

操作效果如下

springboot多模块配置_springboot配置tomcat_springboot日志配置及输出

使用命令docker-compose -f full-stack.yml ps获取可以kafka监听的端口

springboot日志配置及输出_springboot多模块配置_springboot配置tomcat

记下kafka监听的地址9092,这个后面会用到

8000端口是这个kafka的topic的ui界面,这个界面可以查看当前的topic列表,效果如下

springboot多模块配置_springboot配置tomcat_springboot日志配置及输出

这里也看到topic里保存的数据

准备案例项目

可以在创建测试项目

需要加上下面这三个包

spring-boot-starter-webspring-kafkalombok

在appliation.properties中配置kafka的地址和使用的group-id,这个group-id名称可以自行定义,比如:myconsumergroup

spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=myconsumergroup

用kafka客户端发送消息

使用一个spring boot的service封装kafka发送消息的代码,核心代码如下

package mykafka.service;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class Producer {
 private final KafkaTemplate kafkaTemplate;
 private String topic = "自行定义的topic";
 Producer(KafkaTemplate kafkaTemplate) {
 this.kafkaTemplate = kafkaTemplate;
 }
 public void send(String message) {
 this.kafkaTemplate.send(topic, message);
 System.out.println("Sent sample message [" + message + "] to " + topic);
 }
}

然后编写一个接口调用这个发送kafka消息的service,核心代码如下:

@RestController
@RequestMapping("/")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class MyController {
 private final Producer producer;
 @RequestMapping("/test1")
 public String test1() {
 producer.send(String.format("my message currentTimeMillis: %d", System.currentTimeMillis()));
 return "test1";
 }
}

注意:上面代码里使用的kafka的topic可以自行定义,比如mytopic

然后在浏览器中访问这个接口 ip:8080/test1

springboot多模块配置_springboot日志配置及输出_springboot配置tomcat

可以在这个kafka的topic的ui看到发送到kafka的消息

可以看到这个消息已经发送到kafka了

消费消息

消费消息只需要在方法上加上KafkaListener,并指定topic和groupId即可

核心代码如下

@KafkaListener(topics = "mytopic", groupId = "myconsumergroup")
public void processMessage(String message,
 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List partitions,
 @Header(KafkaHeaders.RECEIVED_TOPIC) List topics,
 @Header(KafkaHeaders.OFFSET) List offsets) {
 log.info(
 "received message, topic: {}, partition: {}, offset: {}, message: {}",
 topics.get(0),
 partitions.get(0),
 offsets.get(0),
 message
 );
}

操作效果如下:

可以看到已经成功收到了kafka里的消息其它客户端

一些注意的点发送消息和消费消息需要确保topic一致日志可以先发送到kafka做缓冲,然后通过kafka的客户端把消息取出来放到elk等日志存储系统中分析和可视化

因为kafka客户端发送消息和服务端把消息保存到磁盘都是异步操作,所以存在服务器宕机后消息可能丢失,如果可靠性要求更高,可以使用改进版的kafka:rocketmq

限 时 特 惠: 本站每日持续更新海量各大内部创业教程,一年会员只需98元,全站资源免费下载 点击查看详情
站 长 微 信: muyang-0410