feat(master):kafka 总结

kafka 的相关代码
master
土豆兄弟 2 weeks ago
parent d87b716ace
commit 02898b00e4

@ -3,10 +3,10 @@
### SpringBoot 集成 Kafka 构建消息驱动微服务 ### SpringBoot 集成 Kafka 构建消息驱动微服务
-
### SpringBoot 集成 RocketMQ 构建消息驱动微服务 ### SpringBoot 集成 RocketMQ 构建消息驱动微服务
-
### SpringCloud Stream 消息驱动组件概览 ### SpringCloud Stream 消息驱动组件概览

@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.example</groupId>
<artifactId>dev-protocol</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>dev-protocol-springcloud-message-study</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>dev-protocol-springcloud-message-study</name>
<description>MQ 学习实践</description>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- SpringBoot 监控端点 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 让 SpringBoot 能够识别 bootstrap.yml -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.0.RELEASE</version>
</dependency>
<!-- RocketMQ 这个版本必须和你的 RocketMQ 版本匹配 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version> <!-- 匹配 4.8.0-->
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<!-- MySQL 驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.19</version>
</dependency>
<!-- Spring Data Jpa -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,14 @@
package org.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* <h1></h1>
* */
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}

@ -0,0 +1,41 @@
package org.example.controller;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.example.kafka.KafkaProducer;
import org.example.vo.QMessage;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* <h1>SpringBoot kafka </h1>
* */
@Slf4j
@RestController
@RequestMapping("/kafka")
public class KafkaController {
private final ObjectMapper mapper;
private final KafkaProducer kafkaProducer;
public KafkaController(ObjectMapper mapper, KafkaProducer kafkaProducer) {
this.mapper = mapper;
this.kafkaProducer = kafkaProducer;
}
/**
* <h2> kafka </h2>
* */
@GetMapping("/send-message")
public void sendMessage(@RequestParam(required = false) String key,
@RequestParam String topic) throws Exception {
QMessage message = new QMessage(
1,
"q-Study-Message"
);
kafkaProducer.sendMessage(key, mapper.writeValueAsString(message), topic);
}
}

@ -0,0 +1,81 @@
package org.example.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
/**
* <h1> Kafka </h1>
*
*
* ,
* */
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
/**
* <h2>Kafka Producer </h2>
* */
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
/**
* <h2>Kafka Producer </h2>
* */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* <h2>Kafka Consumer </h2>
* */
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 因为配置为拉取模式, 最多拉取 50条记录
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* <h2>Kafka Consumer </h2>
* */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
// 并发数就是一个消费者实例起几个线程
factory.setConcurrency(3);
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

@ -0,0 +1,56 @@
package org.example.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.example.vo.QMessage;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* <h1>Kafka </h1>
* */
@Slf4j
@Component
public class KafkaConsumer {
private final ObjectMapper mapper;
public KafkaConsumer(ObjectMapper mapper) {
this.mapper = mapper;
}
/**
* <h2> Kafka </h2>
* */
@KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka")
public void listener01(ConsumerRecord<String, String> record) throws Exception {
String key = record.key();
String value = record.value();
QMessage kafkaMessage = mapper.readValue(value, QMessage.class);
log.info("in listener01 consume kafka message: [{}], [{}]",
key, mapper.writeValueAsString(kafkaMessage));
}
/**
* <h2> Kafka </h2>
*
* */
@KafkaListener(topics = {"qinyi-springboot"}, groupId = "qinyi-springboot-kafka-1")
public void listener02(ConsumerRecord<?, ?> record) throws Exception {
Optional<?> _kafkaMessage = Optional.ofNullable(record.value());
if (_kafkaMessage.isPresent()) {
Object message = _kafkaMessage.get();
// 如果不能确定类型的时候, 下面的代码要进行包装
QMessage kafkaMessage = mapper.readValue(message.toString(),
QMessage.class);
log.info("in listener02 consume kafka message: [{}]",
mapper.writeValueAsString(kafkaMessage));
}
}
}

@ -0,0 +1,77 @@
package org.example.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.TimeUnit;
/**
* <h1>kafka </h1>
* */
@Slf4j
@Component
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* <h2> kafka </h2>
* */
public void sendMessage(String key, String value, String topic) {
if (StringUtils.isBlank(value) || StringUtils.isBlank(topic)) {
throw new IllegalArgumentException("value or topic is null or empty");
}
ListenableFuture<SendResult<String, String>> future = StringUtils.isBlank(key) ?
kafkaTemplate.send(topic, value) : kafkaTemplate.send(topic, key, value);
// 异步回调的方式获取通知
future.addCallback(
success -> {
// 元数据信息不为空
assert null != success && null != success.getRecordMetadata();
// 发送到 kafka 的 topic
String _topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的 offset
long offset = success.getRecordMetadata().offset();
log.info("send kafka message success: [{}], [{}], [{}]",
_topic, partition, offset);
}, failure -> {
log.error("send kafka message failure: [{}], [{}], [{}]",
key, value, topic);
}
);
// future 支持多次获取消息, 不需要重新发送消息
// 同步等待的方式获取通知
try {
// SendResult<String, String> sendResult = future.get();
SendResult<String, String> sendResult = future.get(5, TimeUnit.SECONDS);
// 发送到 kafka 的 topic
String _topic = sendResult.getRecordMetadata().topic();
// 消息发送到的分区
int partition = sendResult.getRecordMetadata().partition();
// 消息在分区内的 offset
long offset = sendResult.getRecordMetadata().offset();
log.info("send kafka message success: [{}], [{}], [{}]",
_topic, partition, offset);
} catch (Exception ex) {
log.error("send kafka message failure: [{}], [{}], [{}]",
key, value, topic);
}
}
}

@ -0,0 +1,27 @@
package org.example.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* <h1> Kafka </h1>
* */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class QMessage {
/**
*
*/
private Integer id;
/**
*
*/
private String projectName;
// todo 自己进行扩展
}

@ -0,0 +1,49 @@
server:
port: 8001
servlet:
context-path: /dev-protocol-springcloud-message-study
spring:
# SpringBoot 集成 Kafka 的配置, 最低配置只需要配置 spring.kafka.bootstrap-servers
kafka:
bootstrap-servers: 127.0.0.1:9092
jpa:
show-sql: true
hibernate:
ddl-auto: none
properties:
hibernate.show_sql: true
hibernate.format_sql: true
open-in-view: false
datasource:
# 数据源
url: jdbc:mysql://127.0.0.1:3306/dev_protocol_springcloud_project?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8
username: root
password: root
type: com.zaxxer.hikari.HikariDataSource
driver-class-name: com.mysql.cj.jdbc.Driver
# 连接池
hikari:
maximum-pool-size: 8
minimum-idle: 4
idle-timeout: 30000
connection-timeout: 30000
max-lifetime: 45000
auto-commit: true
pool-name: devProtocolSpringcloudHikariCP
# consumer:
# 如果 Consumer 没有指定 group-id, 则使用配置文件中配置的; 如果配置文件中也没有定义, 则由框架随机生成
# group-id: imooc-study-ecommerce
# auto-offset-reset: latest
# key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# producer:
# key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
# RocketMQ 的配置, 这是最低配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
# 发送同一类消息的设置为同一个 group, 保证唯一
group: dev-protocol-springcloud-message-study

@ -0,0 +1,16 @@
spring:
profiles:
# prod, dev
active: dev
application:
name: dev-protocol-springcloud-message-study
# 暴露端点
management:
endpoints:
web:
exposure:
include: '*'
endpoint:
health:
show-details: always

@ -0,0 +1,7 @@
### kafka-send-message
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/kafka/send-message?key=qqq&topic=q-springboot
Content-Type: application/json
### kafka-send-message - (测试kafka 支持无key消息, key 是用来分区的)
GET http://127.0.0.1:8001/dev-protocol-springcloud-message-study/kafka/send-message?topic=q-springboot
Content-Type: application/json

@ -0,0 +1,15 @@
### message-with-value
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-value
Content-Type: application/json
### message-with-key
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-key
Content-Type: application/json
### message-with-tag
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-tag
Content-Type: application/json
### message-with-all
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/rocket-mq/message-with-all
Content-Type: application/json

@ -0,0 +1,7 @@
### wrong-rollback-for
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-rollback-for
Content-Type: application/json
### wrong-inner-call
GET http://127.0.0.1:8001/imooc-study-ecommerce-dev/transactional-lose/wrong-inner-call
Content-Type: application/json

@ -63,6 +63,7 @@
<module>dev-protocol-springcloud/dev-protocol-springcloud-project-goods-service</module> <module>dev-protocol-springcloud/dev-protocol-springcloud-project-goods-service</module>
<module>dev-protocol-springcloud/dev-protocol-springcloud-hystrix</module> <module>dev-protocol-springcloud/dev-protocol-springcloud-hystrix</module>
<module>dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard</module> <module>dev-protocol-springcloud/dev-protocol-springcloud-hystrix-dashboard</module>
<module>dev-protocol-springcloud/dev-protocol-springcloud-message-study</module>
</modules> </modules>
<properties> <properties>

Loading…
Cancel
Save