Spring-Cloud-Stream
Spring Cloud Stream 笔记
spring-messaging
Spring Messaging 是 Spring Framework 中的一个模块,其作用就是统一消息的编程模型。
Message 消息体
Spring Messaging 中规定了 Message 的模型分为 payload 和 headers
package org.springframework.messaging;
public interface Message<T> {
/**
* Return the message payload.
*/
T getPayload();
/**
* Return message headers for the message (never {@code null} but may be empty).
*/
MessageHeaders getHeaders();
}
MessageChannel 发送消息
消息通道 MessageChannel 用于接收消息,调用 send 方法可以将消息发送至该消息通道中 :
package org.springframework.messaging;
@FunctionalInterface
public interface MessageChannel {
long INDEFINITE_TIMEOUT = -1;
default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
// 发送消息直到消息被接收或到达超时时间
boolean send(Message<?> message, long timeout);
}
SubscribableChannel 接收消息
通过消息通道的子接口可订阅的消息通道 SubscribableChannel 来进行消息消费,向其中注册消息处理器 MessageHandler
package org.springframework.messaging;
public interface SubscribableChannel extends MessageChannel {
// 注册消息处理器
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
MessageHandler 消息处理器
MessageHandler 消息处理器用于处理消息
package org.springframework.messaging;
@FunctionalInterface
public interface MessageHandler {
void handleMessage(Message<?> message) throws MessagingException;
}
Spring Cloud Stream 基本概念
Main Concepts
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_main_concepts
干货|Spring Cloud Stream 体系及原理介绍
https://fangjian0423.github.io/2019/04/03/spring-cloud-stream-intro/
Binder 绑定器
The Binder Abstraction
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-binder-abstraction
Spring Cloud Stream 屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。
Binder 是提供与外部消息中间件集成的组件,会构造 Binding, 提供了 2 个方法分别是 bindConsumer 和 bindProducer 分别用于构造生产者和消费者。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部实现了 RocketMQ Binder。
消费组
Consumer Groups
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#consumer-groups
如果在同一个主题上的应用需要启动多个实例的时候,我们可以通过 spring.cloud.stream.bindings.input.group 属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候, 只会有一个成员真正收到消息并进行处理。
默认情况下, 当没有为应用指定消费组的时候, Spring Cloud Stream 会为其分配一个独立的匿名消费组。 所以, 如果同一主题下的所有应用都没有被指定消费组的时候, 当有消息发布之后, 所有的应用都会对其进行消费, 因为它们各自都属于一个独立的组。 大部分情况下, 我们在创建 Spring Cloud Stream 应用的时候, 建议最好为其指定一个消费组,以防止对消息的重复处理, 除非该行为需要这样做(比如刷新所有实例的配置等)。
分区支持
Partitioning Support
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#partitioning
Demystifying Spring Cloud Stream producers with Apache Kafka partitions
https://spring.io/blog/2021/02/03/demystifying-spring-cloud-stream-producers-with-apache-kafka-partitions
实例
Stream Processing with Spring Cloud Stream and Apache Kafka Streams. Part 1 - Programming Model
https://spring.io/blog/2019/12/02/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-1-programming-model
StreamBridge动态选择destination(目的地本身还是静态配置)
Dynamic destinations with mutiple binders: #913
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/913
Spring Cloud Stream - route to multiple dynamic destinations at runtime
https://stackoverflow.com/questions/62247159/spring-cloud-stream-route-to-multiple-dynamic-destinations-at-runtime
利用 StreamBridge 可以实现动态发送到不同的目的地 binding, 但这些 binding 也必须提前在配置文件中定义好。
@Bean
public Consumer<String> process() {
return c -> {
if (c.equals("first")) {
System.out.println("Sending to first output");
streamBridge.send("first-out-0", c);
}
else {
System.out.println("Sending to second output");
streamBridge.send("second-out-0", c);
}
};
}
上面代码中的 first-out-0
和 second-out-0
是提前定义在配置文件中的 output binding
spring-cloud-stream-samples/multi-binder-samples/multi-binder-dynamic-destinations/
https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples/multi-binder-dynamic-destinations
Binder 绑定器
Classpath 中有多个绑定器时
当 classpath 中有多个绑定器(binder)时,配置时必须指明每个绑定(binding)的绑定器(binder)。
每个绑定器实现都包含一个 META-INF/spring.binders
属性文件,例如:
RabbitMQ 绑定器的是:
rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration
Kafka 绑定器的是
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
如果想自定义绑定器的话,也需要提供此文件。
此文件的 key 是绑定器的名字,value 是逗号分割的配置类列表,每个配置类里有且只有一个 org.springframework.cloud.stream.binder.Binder
接口的 Bean 定义,比如 KafkaBinderConfiguration
中是 KafkaMessageChannelBinder
绑定器选择
1、可以通过 spring.cloud.stream.defaultBinder
配置进行全局范围的绑定器选择,例如 spring.cloud.stream.defaultBinder=rabbit
2、或者可以在每个绑定(binding)配置中单独指定绑定器,例如包含一个 读绑定 input 和 写绑定 output 的应用配置,从 Kafka 读消息写入 RabbitMQ, 可通过下面的配置实现:
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit
配置选项
Configuration Options
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_configuration_options
绑定服务属性
绑定服务选项在类 org.springframework.cloud.stream.config.BindingServiceProperties
中定义
defaultBinder 默认绑定器
spring.cloud.stream.defaultBinder
默认值:空
某个绑定(binding) 没有配置单独的绑定器时,会使用默认绑定器。
通用绑定属性
Binding Properties
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binding-properties
通过 spring.cloud.stream.bindings.<bindingName>.<property>=<value>
来配置绑定属性。bindingName 是要配置的绑定名。
为了避免重复,Spring Cloud Stream 支持给所有绑定设置默认属性,通过 spring.cloud.stream.default.<property>=<value>
或 spring.cloud.stream.default.<producer|consumer>.<property>=<value>
来配置所有绑定的默认属性。例如 spring.cloud.stream.default.contentType=application/json
通用绑定属性在类 org.springframework.cloud.stream.config.BindingProperties
中定义。
destination 绑定的目的地(topic)
绑定的目的地,在不同的消息中间件中叫法各不同,比如 RabbitMQ 中是 exchange, Kafka 中是 topic.
如果绑定是一个 input 输入绑定,可以绑定到逗号分割的多个目的地。
如果绑定是一个 output 输出绑定,只能绑定到一个目的地。
group 消费组
绑定的消费组,只适用于 input 输入绑定。
默认值:null
, 表示匿名消费组。
contentType 内容类型
绑定的内容类型
默认值:application/json
binder 绑定器
此绑定(binding)使用的绑定器。
默认值:null
, 配置为 null 时会使用默认值绑定器(如果存在的话)
Consumer 属性
Consumer Properties
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_consumer_properties
消费者属性在类 org.springframework.cloud.stream.binder.ConsumerProperties
中定义。
消费者属性的前缀必须是 spring.cloud.stream.bindings.<bindingName>.consumer.
, 例如 spring.cloud.stream.bindings.input.consumer.concurrency=3
可通过 spring.cloud.stream.default.consumer
设置默认消费者属性,例如 spring.cloud.stream.default.consumer.headerMode=none
concurrency 并发数
消费者并发数
默认值:1
默认并发数 spring.cloud.stream.default.consumer.concurrency=3
指定 input 绑定的并发数 spring.cloud.stream.bindings.input.consumer.concurrency=3
headerMode
none
禁用消息 payload 中的 headerheaders
使用中间件的原生 header 机制。embeddedHeaders
在消息 payload 中嵌入 header
默认值:依赖于绑定的具体实现。headerMode
在消费非 Spring Cloud Stream 应用发出的消息时且不支持原生 header 时有用。
Unrecognized token ‘ÿ’: was expecting
错误:
Spring cloud stream kafka 消费者消息反序列化报错 Unrecognized token ‘ÿ’: was expecting
2021-05-27 17:13:03.543 [KafkaConsumerDestination{consumerDestinationName='masikkk-test-topic', partitions=1, dlqName='null'}.container-0-C-1] ERROR o.s.integration.handler.LoggingHandler.handleMessageInternal:187 - org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"�contentType "application/json"{"msgCode":"msgcode-2","msgType":"ALERT","name":"name-2"}"; line: 1, column: 4]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"�contentType "application/json"{"msgCode":"msgcode-2","msgType":"ALERT","name":"name-2"}"; line: 1, column: 4], failedMessage=GenericMessage [payload=byte[156], headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@536fd40b, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=masikkk-test-topic, kafka_receivedTimestamp=1622106780457, contentType=application/json, kafka_groupId=group-local-mc-1}]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235)
发现消息开头有个特殊字符。直接通过 kafka 命令行工具 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic masikkk-test-topic --from-beginning
接收的消息也是有这个特殊字符
�
contentType"application/json"{"msgCode":"msgcode-2","msgType":"ALERT","taskId":"taskid-2","appCode":"appcode-2","deviceId":"deviceid-2","payload":{}}
原因:
发送方 Spring stream 的 headerMode 是默认值 embeddedHeaders, 所以消息中会有一个 header 但接收方没有解析这个 header
解决:
headerMode 设置为 none (3.0之前是 raw ) 时会禁用 output 中的 header 迁入,将默认值 embeddedHeaders 改为 none
高级 Consumer 配置
Producer 属性
Producer Properties
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties
Producer 属性通过 org.springframework.cloud.stream.binder.ProducerProperties
类暴露。
下面的 Producer 绑定属性必须通过 spring.cloud.stream.bindings.<bindingName>.producer.
前缀定义,例如 spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=payload.id
可通过 spring.cloud.stream.default.producer
前缀设置所有 binding 的默认属性,例如 spring.cloud.stream.default.producer.partitionKeyExpression=payload.id
partitionKeyExpression
生产者分区的 SpEL 表达式。设置后,此绑定上的出站数据会被分区。
必须将 partitionCount
设置为大于 1 分区才会生效。
默认值:null
partitionCount
partition 个数。当启用 partitioning 时,此值必须设置为大于 1
在 Kafka 上,此值和目标 topic 分区数的较大者被使用。
默认值:1
配置所有 topic 的 partition 个数默认值 spring.cloud.stream.default.producer.partitionCount=3
配置某个 tpikc 的 partition 个数 spring.cloud.stream.bindings.<bindingName>.producer.partitionCount=3
如果 topic 已经参加,之后配置这个值会报错,说无法修改 partition 个数,但可配置 spring.cloud.stream.kafka.binder.autoAddPartitions
启动 partition 个数修改
org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 3, but 1 has been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:394)
Kafka Binder 配置项
startOffset
新 group 从哪里开始消费,可配置为 earliest
或 latest
默认值 null 等价于 earliest
autoAddPartitions
spring.cloud.stream.kafka.binder.autoAddPartitions
默认值 false
问题
MessageDispatchingException: Dispatcher has no subscribers
org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=9349ef3f-bf14-47ed-ba62-5a07d9f9462f, headers={kafka_offset=125, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1fccd0d6, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=9349ef3f-bf14-47ed-ba62-5a07d9f9462f, kafka_receivedTopic=topic2, kafka_receivedTimestamp=1602175742577, contentType=application/json, kafka_groupId=topologyR}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
原因:
spring-cloud-stream 的 bug
解决:
升级 spring-cloud-stream 到 3.0.9 及以上即可。
Dispatcher has no subscribers when value serializer is set for reactive producer in Spring boot 2.3.3 #973
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/973
无kafka服务无法启动
使用 spring-cloud-stream 连接 kafka 默认如果 kafka 连不上则 spring boot 服务也起不来,在连接 kafka 是可选的服务中很难受。
Spring-Cloud-Stream 连接 Kafka 实例
添加依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
定义输入输出channel
public interface AlertSource {
String ALERT_INPUT = "alertInput";
String ALERT_OUTPUT = "alertOutput";
@Input(ALERT_INPUT)
MessageChannel input();
@Output(ALERT_OUTPUT)
MessageChannel output();
}
主类或配置类增加 binding
@EnableBinding({AlertSource.class})
生产者
@Component
@RequiredArgsConstructor
public class AlertProducer {
private final AlertSource alertSource;
void publish(MessageVO message) {
log.debug("publish message: {}", JsonMappers.Normal.toJson(message));
alertSource.output().send(MessageBuilder.withPayload(message).build());
}
}
消费者
@Component
public class AlertConsumer {
@Override
@StreamListener(AlertSource.ALERT_INPUT)
public void handle(@Payload MessageVO messageVO) {
log.info("接收到消息: {}", JsonMappers.Normal.toJson(messageVO));
}
}
配置项
1、配置项里将 channel 和 topic 绑定。
2、可自动创建 topic
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
autoAddPartitions: true
default:
headerMode: none
producer:
partitionCount: 12
partitionKeyExpression: payload.code
consumer:
concurrency: 12
defaultBinder: kafka
bindings:
# 通道名
alertOutput:
# 消息发往的目的地,对应topic
destination: alert-topic
alertInput:
destination: alert-topic
group: group-local
页面信息
location:
protocol
: host
: hostname
: origin
: pathname
: href
: document:
referrer
: navigator:
platform
: userAgent
: