From 02898b00e4ebe88601cde4400ad4bc75b3a8321c Mon Sep 17 00:00:00 2001 From: qyx <565485304@qq.com> Date: Thu, 5 Sep 2024 18:04:33 +0800 Subject: [PATCH] =?UTF-8?q?feat(master):kafka=20=E6=80=BB=E7=BB=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit kafka 的相关代码 --- .../SpringCloud-Stream.md | 4 +- .../pom.xml | 99 +++++++++++++++++++ .../main/java/org/example/Application.java | 14 +++ .../example/controller/KafkaController.java | 41 ++++++++ .../java/org/example/kafka/KafkaConfig.java | 81 +++++++++++++++ .../java/org/example/kafka/KafkaConsumer.java | 56 +++++++++++ .../java/org/example/kafka/KafkaProducer.java | 77 +++++++++++++++ .../main/java/org/example/vo/QMessage.java | 27 +++++ .../src/main/resources/application-dev.yml | 49 +++++++++ .../src/main/resources/bootstrap.yml | 16 +++ .../main/resources/http/kafka-controller.http | 7 ++ .../resources/http/rocket-mq-controller.http | 15 +++ .../resources/http/transactional-lose.http | 7 ++ pom.xml | 1 + 14 files changed, 492 insertions(+), 2 deletions(-) rename dev-protocol-springcloud/{dev-protocol-springcloud-stream => dev-protocol-springcloud-message-study}/SpringCloud-Stream.md (99%) create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/pom.xml create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/Application.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/KafkaController.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConfig.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConsumer.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaProducer.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/vo/QMessage.java create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/application-dev.yml create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/bootstrap.yml create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/kafka-controller.http create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http create mode 100644 dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/transactional-lose.http diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-stream/SpringCloud-Stream.md b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/SpringCloud-Stream.md similarity index 99% rename from dev-protocol-springcloud/dev-protocol-springcloud-stream/SpringCloud-Stream.md rename to dev-protocol-springcloud/dev-protocol-springcloud-message-study/SpringCloud-Stream.md index fd4d062..f15be38 100644 --- a/dev-protocol-springcloud/dev-protocol-springcloud-stream/SpringCloud-Stream.md +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/SpringCloud-Stream.md @@ -3,10 +3,10 @@ ### SpringBoot 集成 Kafka 构建消息驱动微服务 - +- ### SpringBoot 集成 RocketMQ 构建消息驱动微服务 - +- ### SpringCloud Stream 消息驱动组件概览 diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/pom.xml b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/pom.xml new file mode 100644 index 0000000..6a62ee6 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/pom.xml @@ -0,0 +1,99 @@ + + + 4.0.0 + + org.example + dev-protocol + 1.0-SNAPSHOT + ../../pom.xml + + + dev-protocol-springcloud-message-study + 1.0-SNAPSHOT + jar + + dev-protocol-springcloud-message-study + MQ 学习实践 + + + 8 + 8 + UTF-8 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.projectlombok + lombok + true + + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-test + test + + + + org.springframework.cloud + spring-cloud-context + 2.2.6.RELEASE + + + + org.springframework.kafka + spring-kafka + 2.5.0.RELEASE + + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.1.0 + + + org.apache.commons + commons-lang3 + 3.11 + + + + mysql + mysql-connector-java + 8.0.19 + + + + org.springframework.boot + spring-boot-starter-data-jpa + + + + + ${artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + + \ No newline at end of file diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/Application.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/Application.java new file mode 100644 index 0000000..5d8dbdc --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/Application.java @@ -0,0 +1,14 @@ +package org.example; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + *

工程启动入口

+ * */ +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} \ No newline at end of file diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/KafkaController.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/KafkaController.java new file mode 100644 index 0000000..43daacc --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/controller/KafkaController.java @@ -0,0 +1,41 @@ +package org.example.controller; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.example.kafka.KafkaProducer; +import org.example.vo.QMessage; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + *

SpringBoot 集成 kafka 发送消息

+ * */ +@Slf4j +@RestController +@RequestMapping("/kafka") +public class KafkaController { + + private final ObjectMapper mapper; + private final KafkaProducer kafkaProducer; + + public KafkaController(ObjectMapper mapper, KafkaProducer kafkaProducer) { + this.mapper = mapper; + this.kafkaProducer = kafkaProducer; + } + + /** + *

发送 kafka 消息

+ * */ + @GetMapping("/send-message") + public void sendMessage(@RequestParam(required = false) String key, + @RequestParam String topic) throws Exception { + + QMessage message = new QMessage( + 1, + "q-Study-Message" + ); + kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic); + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConfig.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConfig.java new file mode 100644 index 0000000..2882d17 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConfig.java @@ -0,0 +1,81 @@ +package org.example.kafka; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.*; + +import java.util.HashMap; +import java.util.Map; + +/** + *

通过代码自定义 Kafka 配置

+ * + * 复杂不会怎么更改的配置信息最好以代码的方式进行配置 + * 一般一些常用的改动的配置, 直接放在配置文件中 + * */ +@Configuration +public class KafkaConfig { + + @Value("${spring.kafka.bootstrap-servers}") + private String bootstrapServers; + + /** + *

Kafka Producer 工厂类配置

+ * */ + @Bean + public ProducerFactory producerFactory() { + + Map configs = new HashMap<>(); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return new DefaultKafkaProducerFactory<>(configs); + } + + /** + *

Kafka Producer 客户端

+ * */ + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + /** + *

Kafka Consumer 工厂类配置

+ * */ + @Bean + public ConsumerFactory consumerFactory() { + + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + // 因为配置为拉取模式, 最多拉取 50条记录 + props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return new DefaultKafkaConsumerFactory<>(props); + } + + /** + *

Kafka Consumer 监听器工厂类配置

+ * */ + @Bean + public ConcurrentKafkaListenerContainerFactory + kafkaListenerContainerFactory() { + + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + // 并发数就是一个消费者实例起几个线程 + factory.setConcurrency(3); + factory.setConsumerFactory(consumerFactory()); + + return factory; + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConsumer.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConsumer.java new file mode 100644 index 0000000..1d911ae --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaConsumer.java @@ -0,0 +1,56 @@ +package org.example.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.example.vo.QMessage; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +import java.util.Optional; + +/** + *

Kafka 消费者

+ * */ +@Slf4j +@Component +public class KafkaConsumer { + + private final ObjectMapper mapper; + + public KafkaConsumer(ObjectMapper mapper) { + this.mapper = mapper; + } + + /** + *

监听 Kafka 消息并消费

+ * */ + @KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka") + public void listener01(ConsumerRecord record) throws Exception { + + String key = record.key(); + String value = record.value(); + + QMessage kafkaMessage = mapper.readValue(value, QMessage.class); + log.info("in listener01 consume kafka message: [{}], [{}]", + key, mapper.writeValueAsString(kafkaMessage)); + } + + /** + *

监听 Kafka 消息并消费

+ * 不知道发送的类是什么类型的时候发送的 + * */ + @KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka-1") + public void listener02(ConsumerRecord record) throws Exception { + + Optional _kafkaMessage = Optional.ofNullable(record.value()); + if (_kafkaMessage.isPresent()) { + Object message = _kafkaMessage.get(); + // 如果不能确定类型的时候, 下面的代码要进行包装 + QMessage kafkaMessage = mapper.readValue(message.toString(), + QMessage.class); + log.info("in listener02 consume kafka message: [{}]", + mapper.writeValueAsString(kafkaMessage)); + } + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaProducer.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaProducer.java new file mode 100644 index 0000000..b63987c --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/kafka/KafkaProducer.java @@ -0,0 +1,77 @@ +package org.example.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.stereotype.Component; +import org.springframework.util.concurrent.ListenableFuture; + +import java.util.concurrent.TimeUnit; + +/** + *

kafka 生产者

+ * */ +@Slf4j +@Component +public class KafkaProducer { + + private final KafkaTemplate kafkaTemplate; + + public KafkaProducer(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + /** + *

发送 kafka 消息

+ * */ + public void sendMessage(String key, String value, String topic) { + + if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) { + throw new IllegalArgumentException("value or topic is null or empty"); + } + + ListenableFuture> future = StringUtils.isBlank(key) ? + kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value); + + // 异步回调的方式获取通知 + future.addCallback( + success -> { + // 元数据信息不为空 + assert null != success && null != success.getRecordMetadata(); + // 发送到 kafka 的 topic + String _topic = success.getRecordMetadata().topic(); + // 消息发送到的分区 + int partition = success.getRecordMetadata().partition(); + // 消息在分区内的 offset + long offset = success.getRecordMetadata().offset(); + + log.info("send kafka message success: [{}], [{}], [{}]", + _topic, partition, offset); + }, failure -> { + log.error("send kafka message failure: [{}], [{}], [{}]", + key, value, topic); + } + ); + // future 支持多次获取消息, 不需要重新发送消息 + + // 同步等待的方式获取通知 + try { +// SendResult sendResult = future.get(); + SendResult sendResult = future.get(5, TimeUnit.SECONDS); + + // 发送到 kafka 的 topic + String _topic = sendResult.getRecordMetadata().topic(); + // 消息发送到的分区 + int partition = sendResult.getRecordMetadata().partition(); + // 消息在分区内的 offset + long offset = sendResult.getRecordMetadata().offset(); + + log.info("send kafka message success: [{}], [{}], [{}]", + _topic, partition, offset); + } catch (Exception ex) { + log.error("send kafka message failure: [{}], [{}], [{}]", + key, value, topic); + } + } +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/vo/QMessage.java b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/vo/QMessage.java new file mode 100644 index 0000000..7d4c20f --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/java/org/example/vo/QMessage.java @@ -0,0 +1,27 @@ +package org.example.vo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + *

通过 Kafka 传递的消息对象

+ * */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class QMessage { + + /** + * 唯一的标识一个消息对象 + */ + private Integer id; + + /** + * 项目名称 + */ + private String projectName; + + // todo 自己进行扩展 + +} diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/application-dev.yml b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/application-dev.yml new file mode 100644 index 0000000..9695606 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/application-dev.yml @@ -0,0 +1,49 @@ +server: + port: 8001 + servlet: + context-path: /dev-protocol-springcloud-message-study + +spring: + # SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers + kafka: + bootstrap-servers: 127.0.0.1:9092 + jpa: + show-sql: true + hibernate: + ddl-auto: none + properties: + hibernate.show_sql: true + hibernate.format_sql: true + open-in-view: false + datasource: + # 数据源 + url: jdbc:mysql://127.0.0.1:3306/dev_protocol_springcloud_project?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8 + username: root + password: root + type: com.zaxxer.hikari.HikariDataSource + driver-class-name: com.mysql.cj.jdbc.Driver + # 连接池 + hikari: + maximum-pool-size: 8 + minimum-idle: 4 + idle-timeout: 30000 + connection-timeout: 30000 + max-lifetime: 45000 + auto-commit: true + pool-name: devProtocolSpringcloudHikariCP +# consumer: + # 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成 +# group-id: imooc-study-ecommerce +# auto-offset-reset: latest +# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer +# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer +# producer: +# key-serializer: org.apache.kafka.common.serialization.StringSerializer +# value-serializer: org.apache.kafka.common.serialization.StringSerializer + +# RocketMQ 的配置, 这是最低配置 +rocketmq: + name-server: 127.0.0.1:9876 + producer: + # 发送同一类消息的设置为同一个 group, 保证唯一 + group: dev-protocol-springcloud-message-study diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/bootstrap.yml b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..2366e3d --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/bootstrap.yml @@ -0,0 +1,16 @@ +spring: + profiles: + # prod, dev + active: dev + application: + name: dev-protocol-springcloud-message-study + +# 暴露端点 +management: + endpoints: + web: + exposure: + include: '*' + endpoint: + health: + show-details: always diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/kafka-controller.http b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/kafka-controller.http new file mode 100644 index 0000000..6d183bd --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/kafka-controller.http @@ -0,0 +1,7 @@ +### kafka-send-message +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/kafka/send-message?key=qqq&topic=q-springboot +Content-Type: application/json + +### kafka-send-message - (测试kafka 支持无key消息, key 是用来分区的) +GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/kafka/send-message?topic=q-springboot +Content-Type: application/json diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http new file mode 100644 index 0000000..3fd0c0b --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/rocket-mq-controller.http @@ -0,0 +1,15 @@ +### message-with-value +GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-value +Content-Type: application/json + +### message-with-key +GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-key +Content-Type: application/json + +### message-with-tag +GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-tag +Content-Type: application/json + +### message-with-all +GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-all +Content-Type: application/json diff --git a/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/transactional-lose.http b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/transactional-lose.http new file mode 100644 index 0000000..de7d991 --- /dev/null +++ b/dev-protocol-springcloud/dev-protocol-springcloud-message-study/src/main/resources/http/transactional-lose.http @@ -0,0 +1,7 @@ +### wrong-rollback-for +GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-rollback-for +Content-Type: application/json + +### wrong-inner-call +GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-inner-call +Content-Type: application/json diff --git a/pom.xml b/pom.xml index d1d50db..fb471b6 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ dev-protocol-springcloud/dev-protocol-springcloud-project-goods-service dev-protocol-springcloud/dev-protocol-springcloud-hystrix dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard + dev-protocol-springcloud/dev-protocol-springcloud-message-study