Springcloud alibaba 2023.0.1.2 集成 cp-kafka 7.6.0 示例
- vicentz
- 0
- Posted on
1. 项目依赖
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
2. 配置 Kafka 连接信息(application.yml)
# Spring
spring:
kafka:
# Kafka broker 地址,根据你的 cp-kafka 集群配置填写
bootstrap-servers: 172.18.214.116:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-kafka-group
auto-offset-reset: earliest # 从最早的消息开始消费,可选 [earliest, latest, none]
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3. Kafka 消息生产者(Producer)
创建一个 Kafka 消息生产者服务,用于发送消息到 Kafka Topic。
package com.example.demo.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducerService {
private static final String TOPIC = "demo-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
System.out.println("Producing message: " + message);
kafkaTemplate.send(TOPIC, message);
}
}
4. Kafka 消息消费者(Consumer)
创建一个 Kafka 消息消费者,监听指定 topic 并消费消息。
package com.example.demo.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);
@KafkaListener(topics = "demo-topic", groupId = "my-kafka-group")
public void listen(String message) {
logger.info("Received Message: {}", message);
}
}
📌 说明:
@KafkaListener(topics = "demo-topic")表示监听名为 demo-topic的主题。
groupId必须与 application.yml中配置的 spring.kafka.consumer.group-id一致,或者直接在这里覆盖。
5. 启动类 & 测试 Controller(可选)
5.1. 主启动类
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}
}
5.2. 测试 Controller(通过 HTTP 触发发消息,可选)
如果你想通过 HTTP 接口发送一条消息到 Kafka,可以加一个简单的 REST Controller:
package com.example.demo.controller;
import com.example.demo.producer.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
@Autowired
private KafkaProducerService kafkaProducerService;
@GetMapping("/send")
public String sendMessage(@RequestParam String message) {
kafkaProducerService.sendMessage(message);
return "Message sent: " + message;
}
}
然后访问(前提是项目端口为8080):
👉 http://localhost:8080/send?message=HelloKafka760

即可发送一条消息到 Kafka,消费者会监听并打印出来。


6. 常见问题
配置完启动提示:
2025-08-21T16:31:39.950+08:00 WARN 48296 — [ruoyi-test] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-kafka-group-1, groupId=my-kafka-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.
由于我是用docker compose方式安装的kafka, 所以需要修改配置文件:
修改docker-compose.yml文件,将 KAFKA_ADVERTISED_LISTENERS改为:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://172.18.214.116:9092

重新启动 Kafka 容器(重新构建并启动 compose):
docker-compose down
docker-compose up -d
后即可解决