Example: integrating Spring Cloud Alibaba 2023.0.1.2 with cp-kafka 7.6.0

1. Project dependencies

        <!-- Spring Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

2. Configure the Kafka connection (application.yml)

# Spring
spring:
  kafka:
    # Kafka broker address; set this to match your cp-kafka cluster
    bootstrap-servers: 172.18.214.116:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-kafka-group
      auto-offset-reset: earliest # start from the earliest message when the group has no committed offset; options: earliest, latest, none
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
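
For orientation, the settings above correspond to plain Kafka client property keys. The sketch below only builds those key/value pairs with the JDK; the class and method names are made up for illustration, and it does not show how Spring Boot performs the binding internally.

```java
import java.util.Properties;

// Illustrative only: KafkaClientProps is a hypothetical class, not part of Spring or Kafka.
final class KafkaClientProps {

    // Kafka client keys matching spring.kafka.bootstrap-servers and spring.kafka.consumer.*
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "172.18.214.116:9092");
        props.setProperty("group.id", "my-kafka-group");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Kafka client keys matching spring.kafka.bootstrap-servers and spring.kafka.producer.*
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "172.18.214.116:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the consumer settings as key=value lines.
        consumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Note that the producer has no group.id: consumer groups only apply to the consuming side.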

3. Kafka message producer (Producer)

Create a Kafka producer service that sends messages to a Kafka topic.

package com.example.demo.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {

    private static final String TOPIC = "demo-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        System.out.println("Producing message: " + message);
        kafkaTemplate.send(TOPIC, message);
    }
}

4. Kafka message consumer (Consumer)

Create a Kafka consumer that listens on the given topic and consumes its messages.

package com.example.demo.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);

    @KafkaListener(topics = "demo-topic", groupId = "my-kafka-group")
    public void listen(String message) {
        logger.info("Received Message: {}", message);
    }
}


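📌 Notes:

@KafkaListener(topics = "demo-topic") listens on the topic named demo-topic.

The groupId attribute on the annotation overrides spring.kafka.consumer.group-id from application.yml for this listener only; if it is omitted, the configured value is used. In this example both are set to the same value, my-kafka-group.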
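
The precedence between the two group settings can be pictured with a small sketch. It assumes the annotation attribute wins whenever it is set, which is how Spring Kafka documents groupId; the helper below is hypothetical, is not a Spring API, and ignores other cases such as a listener id being used as the group.

```java
// Illustrative only: GroupIdResolution is a hypothetical helper, not part of Spring Kafka.
final class GroupIdResolution {

    // Returns the annotation value when present, otherwise the value from application.yml.
    static String effectiveGroupId(String annotationGroupId, String configuredGroupId) {
        if (annotationGroupId != null && !annotationGroupId.isBlank()) {
            return annotationGroupId;
        }
        return configuredGroupId;
    }

    public static void main(String[] args) {
        // Annotation sets a group: it takes precedence over the configured one.
        System.out.println(effectiveGroupId("audit-group", "my-kafka-group"));
        // Annotation leaves groupId empty: fall back to the configured one.
        System.out.println(effectiveGroupId("", "my-kafka-group"));
    }
}
```

Here audit-group is a made-up name used only to show the override case.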

5. Application class & test Controller (optional)

5.1. Main application class

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}

5.2. Test Controller (trigger a send over HTTP, optional)

If you want to send a message to Kafka through an HTTP endpoint, add a simple REST Controller:

package com.example.demo.controller;

import com.example.demo.producer.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @GetMapping("/send")
    public String sendMessage(@RequestParam String message) {
        kafkaProducerService.sendMessage(message);
        return "Message sent: " + message;
    }
}

Then open the following URL (assuming the application runs on port 8080):

👉 http://localhost:8080/send?message=HelloKafka760

This sends one message to Kafka; the consumer picks it up and logs it.
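
If you would rather call the endpoint from code than from a browser, something like the following should work with the JDK's built-in HTTP client (Java 11+). It is a sketch: the class name is hypothetical, and it assumes the application is reachable at http://localhost:8080 as above. The message is URL-encoded so that spaces and non-ASCII text survive the query string.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Illustrative only: SendEndpointClient is a hypothetical helper for calling /send.
final class SendEndpointClient {

    // Builds the /send URL with the message encoded as a query parameter value.
    static URI sendUri(String baseUrl, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/send?message=" + encoded);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest
                .newBuilder(sendUri("http://localhost:8080", "Hello Kafka 760"))
                .GET()
                .build();
        // Requires the demo application to be running; prints the status code and response body.
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```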

6. Common problems

After configuring everything, the application logs this warning on startup:

2025-08-21T16:31:39.950+08:00 WARN 48296 --- [ruoyi-test] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-kafka-group-1, groupId=my-kafka-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Node may not be available.

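I installed Kafka with docker compose. The client first contacts the broker at bootstrap-servers, but then connects to whatever address the broker advertises in its metadata. As the log shows, that address was localhost:9092, which does not reach the broker from the application's host, so the broker configuration has to change:

Edit docker-compose.yml and change KAFKA_ADVERTISED_LISTENERS to: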

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://172.18.214.116:9092

Restart the Kafka container (tear down and bring the compose stack back up):

docker-compose down
docker-compose up -d

After that, the problem is resolved.
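
To double-check a KAFKA_ADVERTISED_LISTENERS value before restarting, a small parser can flag entries that point at the loopback interface. This is a rough sketch using only the JDK; the class and method names are hypothetical, and the loopback test is a simple string check rather than real DNS resolution.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: AdvertisedListeners is a hypothetical helper, not a Kafka API.
final class AdvertisedListeners {

    // Parses "NAME://host:port,NAME2://host2:port2" into listener name -> "host:port".
    static Map<String, String> parse(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException("Malformed listener: " + entry);
            }
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    // True if the host part is localhost or a 127.x.x.x address,
    // which a client on another machine cannot use to reach the broker.
    static boolean isLoopback(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        String host = idx >= 0 ? hostPort.substring(0, idx) : hostPort;
        return host.equals("localhost") || host.startsWith("127.");
    }

    public static void main(String[] args) {
        String value = "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://172.18.214.116:9092";
        parse(value).forEach((name, addr) ->
                System.out.println(name + " -> " + addr
                        + (isLoopback(addr) ? " (loopback)" : "")));
    }
}
```

With the value from this post, neither listener is a loopback address: kafka:29092 serves clients inside the compose network, and 172.18.214.116:9092 serves the Spring application.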
