Integrating the Kafka Message Middleware with Spring Boot

Table of Contents
  1. Preface
  2. Integration Notes
  3. Integration Steps

Preface

There are many message middlewares available today, such as RabbitMQ, RocketMQ, ActiveMQ, and Kafka, and their integration and usage are broadly similar. Message middleware mainly solves three kinds of problems: peak shaving, asynchronous processing, and decoupling. I have previously used RabbitMQ to upload robot status information, and RocketMQ to implement order cancellation on timeout.

Integration Notes

Kafka does not natively support sending arbitrary objects; out of the box it only transports the basic types covered by the built-in serializers in the org.apache.kafka.common.serialization package (String, Integer, Long, Double, byte[], ByteBuffer, and so on).

Classes whose names end in Serializer are serializers, and classes whose names end in Deserializer are deserializers. For Kafka to carry custom objects, two things are required: the entity class must implement the Serializable interface, and you must provide your own Serializer and Deserializer implementations.
To make the integration steps easier to follow, the producer and consumer in this article are integrated in separate projects.
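
Any object sent this way has to be serializable by standard Java serialization. A minimal sketch of such a message entity follows; the class name and fields here are hypothetical, not taken from the original project:

import java.io.Serializable;

/**
 * Hypothetical message entity: any object that goes through the custom
 * serializer defined below must implement Serializable, and any value
 * stored in the data field must itself be Serializable.
 */
public class ScreenMessage implements Serializable {

    private static final long serialVersionUID = 1L;

    private String code;
    private Object data;

    public ScreenMessage() {
    }

    public ScreenMessage(String code, Object data) {
        this.code = code;
        this.data = data;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}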

Integration Steps

1. Define the entity conversion utility class

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @ClassName BeanConvertUtil
 * @Description Entity conversion utility class
 * @Author yuxk
 * @Date 2020/11/11 10:47
 * @Version 3.0
 **/
public class BeanConvertUtil {

    private static final Logger logger = LoggerFactory.getLogger(BeanConvertUtil.class);

    private BeanConvertUtil() {
    }

    /**
     * Object to byte array; the object must implement java.io.Serializable.
     *
     * @param obj the object to serialize
     * @return the serialized bytes, or null on failure
     */
    public static byte[] objectToBytes(Object obj) {
        // try-with-resources closes the streams in the correct order automatically
        try (ByteArrayOutputStream bo = new ByteArrayOutputStream();
             ObjectOutputStream oo = new ObjectOutputStream(bo)) {
            oo.writeObject(obj);
            oo.flush();
            return bo.toByteArray();
        } catch (IOException e) {
            logger.warn(e.getMessage(), e);
            return null;
        }
    }

    /**
     * Byte array to object.
     *
     * @param bytes the serialized bytes
     * @return the deserialized object, or null on failure
     */
    public static Object bytesToObject(byte[] bytes) {
        try (ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
             ObjectInputStream oi = new ObjectInputStream(bi)) {
            return oi.readObject();
        } catch (IOException | ClassNotFoundException e) {
            logger.warn(e.getMessage(), e);
            return null;
        }
    }

}
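
A quick round-trip check of the utility, using the hypothetical ScreenMessage entity sketched earlier:

// serialize to bytes and back; both directions succeed as long as the
// class (and everything it references) implements Serializable
ScreenMessage original = new ScreenMessage("kbs_api_data", "hello");
byte[] bytes = BeanConvertUtil.objectToBytes(original);
ScreenMessage restored = (ScreenMessage) BeanConvertUtil.bytesToObject(bytes);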

2. Define the serializer class

import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

/**
 * @ClassName EncodeObjectSerializer
 * @Description Object serializer for Kafka message values
 * @Author yuxk
 * @Date 2020/11/11 10:54
 * @Version 3.0
 **/
public class EncodeObjectSerializer implements Serializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return BeanConvertUtil.objectToBytes(data);
    }

    @Override
    public void close() {
        // nothing to release
    }
}

3. Define the deserializer class

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

/**
 * @ClassName DecodeObjectDeserializer
 * @Description Object deserializer for Kafka message values
 * @Author yuxk
 * @Date 2020/11/11 10:42
 * @Version 3.0
 **/
public class DecodeObjectDeserializer implements Deserializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public Object deserialize(String topic, byte[] bytes) {
        return BeanConvertUtil.bytesToObject(bytes);
    }

    @Override
    public void close() {
        // nothing to release
    }
}

4. Integrate Kafka on the producer side
(1) Add the dependency to pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>

(2) Add the configuration to application.yml

spring:
  # Kafka producer configuration
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092 # Kafka server address; for a cluster, list multiple addresses separated by commas
      # key serializer is Kafka's built-in StringSerializer; the value serializer is the custom one defined above
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.powersi.kbs.utils.EncodeObjectSerializer
      batch-size: 4096 # batch size in bytes
      buffer-memory: 40960 # producer-side buffer size
      retries: 1 # number of retries
      # acks: 1 # acknowledgment level: how many partition replicas must confirm the write before the producer gets an ack (0, 1, all/-1)
      properties:
        linger:
          ms: 0 # the producer sends a batch once it accumulates batch-size bytes or linger.ms elapses;
                # with linger.ms set to 0, every message is sent as soon as it arrives, so batch-size effectively has no effect

(3) Configure the Kafka topics

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    // Each NewTopic bean creates the topic (if absent) with 3 partitions
    // and a replication factor of 1.

    @Bean
    public NewTopic kbsDeptSusTopic() {
        return new NewTopic(TopicConstant.KBS_DEPT_SUS, 3, (short) 1);
    }

    @Bean
    public NewTopic kbsApiDataTopic() {
        return new NewTopic(TopicConstant.KBS_API_DATA, 3, (short) 1);
    }

    @Bean
    public NewTopic kbsApiCurveTopic() {
        return new NewTopic(TopicConstant.KBS_API_CURVE, 3, (short) 1);
    }

    @Bean
    public NewTopic kbsActualDataTopic() {
        return new NewTopic(TopicConstant.KBS_ACTUAL_DATA, 3, (short) 1);
    }

    @Bean
    public NewTopic kbsRuleChartTopic() {
        return new NewTopic(TopicConstant.KBS_RULE_CHART, 3, (short) 1);
    }

    @Bean
    public NewTopic kbsSceneData() {
        return new NewTopic(TopicConstant.KBS_SCENE_DATA, 3, (short) 1);
    }

}
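
The TopicConstant class referenced above is not shown in the article; a minimal sketch could look like this (the constant names come from the code above, but the string values are assumptions):

/**
 * Hypothetical topic-name constants; the actual string values in the
 * original project may differ.
 */
public final class TopicConstant {

    public static final String KBS_DEPT_SUS = "kbs_dept_sus";
    public static final String KBS_API_DATA = "kbs_api_data";
    public static final String KBS_API_CURVE = "kbs_api_curve";
    public static final String KBS_ACTUAL_DATA = "kbs_actual_data";
    public static final String KBS_RULE_CHART = "kbs_rule_chart";
    public static final String KBS_SCENE_DATA = "kbs_scene_data";

    private TopicConstant() {
    }
}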

(4) Define a method for sending messages

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.alibaba.fastjson.JSONObject;

/**
 * @ClassName KbsScreenServiceImpl
 * @Description Service layer for sending dashboard data
 * @Author yuxk
 * @Date 2020/11/10 9:33
 * @Version 3.0
 **/
@Service
public class KbsScreenServiceImpl {

    private static final Logger logger = LoggerFactory.getLogger(KbsScreenServiceImpl.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void sendMessage(String topic, Object obj) {
        logger.info("about to send topic: {}, message: {}", topic, JSONObject.toJSONString(obj));
        // send the message asynchronously
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // failure handling
                logger.error(topic + " - producer failed to send message: " + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> sendResult) {
                // success handling
                logger.info(topic + " - producer sent message successfully: " + sendResult.toString());
            }
        });
    }

}
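
Any bean can then inject this service and publish to a topic. For example (the calling method and payload here are illustrative, reusing the hypothetical ScreenMessage entity from earlier):

@Autowired
private KbsScreenServiceImpl kbsScreenService;

public void publishApiData() {
    // send a hypothetical payload to the KBS_API_DATA topic; the custom
    // value serializer turns it into bytes on the way out
    kbsScreenService.sendMessage(TopicConstant.KBS_API_DATA,
            new ScreenMessage("kbs_api_data", "demo payload"));
}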

5. Integrate Kafka on the consumer side
(1) Add the dependency to pom.xml

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>

(2) Add the configuration to application.yml

spring:
  kafka:
    consumer:
      bootstrap-servers: 127.0.0.1:9092 # Kafka server address; for a cluster, list multiple addresses separated by commas
      group-id: default_consumer_group # consumer group ID
      enable-auto-commit: true # whether to auto-commit offsets
      auto-commit-interval: 1000 # offset commit delay (how long after a message is received before its offset is committed)
      # key deserializer is Kafka's built-in StringDeserializer; the value deserializer is the custom one defined above
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.powersi.kbs.utils.DecodeObjectDeserializer
      # what to do when Kafka has no initial offset, or the current offset is out of range:
      # earliest: reset to the smallest offset in the partition
      # latest: reset to the newest offset (only consume data produced after the reset)
      # none: throw an exception if any partition has no committed offset
      auto-offset-reset: latest
      properties:
        session:
          timeout: 120000 # session timeout: if the consumer sends no heartbeat within this window, a rebalance is triggered
        request:
          timeout: 180000 # consumer request timeout
      max-poll-records: 50 # maximum number of records returned per batch poll
    listener:
      concurrency: 10 # number of concurrent listener threads
      type: batch # enable batch consumption

(3) Define the topic listener method

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;

/**
 * @ClassName ConsumerListener
 * @Description Consumer listener
 * @Author yuxk
 * @Date 2020/11/10 9:13
 * @Version 3.0
 **/
@Component
public class ConsumerListener {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerListener.class);

    // Batch listener: with listener.type set to batch, each invocation
    // receives up to max-poll-records records from the subscribed topics.
    @KafkaListener(topics = {TopicConstant.KBS_DEPT_SUS, TopicConstant.KBS_API_DATA, TopicConstant.KBS_API_CURVE,
            TopicConstant.KBS_ACTUAL_DATA, TopicConstant.KBS_RULE_CHART, TopicConstant.KBS_SCENE_DATA})
    public void listen(List<ConsumerRecord<?, ?>> consumerRecords) {
        for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
            logger.info("received message: " + JSON.toJSONString(consumerRecord.value()));
            // forward each message to all connected WebSocket clients
            WebSocketServer.sendMessageToAll(WebSocketResult.success(consumerRecord.topic(), consumerRecord.value()));
        }
    }

}
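
WebSocketServer and WebSocketResult are project-specific classes that the article does not show; minimal stand-ins that only log the forwarded message (purely illustrative, just enough for the listener to compile) might look like:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical stand-in: the real WebSocketServer presumably broadcasts
// to connected WebSocket sessions instead of just logging.
public class WebSocketServer {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketServer.class);

    public static void sendMessageToAll(Object message) {
        logger.info("broadcast to all WebSocket clients: {}", message);
    }
}

// Hypothetical stand-in for the result wrapper used in the listener.
class WebSocketResult {

    private final String topic;
    private final Object data;

    private WebSocketResult(String topic, Object data) {
        this.topic = topic;
        this.data = data;
    }

    public static WebSocketResult success(String topic, Object data) {
        return new WebSocketResult(topic, data);
    }

    @Override
    public String toString() {
        return "WebSocketResult{topic='" + topic + "', data=" + data + "}";
    }
}
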
Author: 微光
Article link: http://www.guduke.cn/2020/11/29/kafka/
Copyright: Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit 微光 when reposting.