# RSocket 支持
# RSocket 支持
RSocket Spring 集成模块(spring-integration-rsocket
)允许执行RSocket 应用程序协议 (opens new window)。
你需要在项目中包含此依赖项:
Maven
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>5.5.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-rsocket:5.5.9"
该模块从版本 5.2 开始可用,并且基于 Spring 消息传递基础及其 RSocket 组件实现,例如RSocketRequester
,RSocketMessageHandler
和RSocketStrategies
。有关 RSocket 协议、术语和组件的更多信息,请参见Spring Framework RSocket Support (opens new window)。
在通过通道适配器启动集成流处理之前,我们需要在服务器和客户机之间建立一个 RSocket 连接。为此, Spring 集成 RSocket 支持提供了ServerRSocketConnector
和ClientRSocketConnector
的AbstractRSocketConnector
实现。
ServerRSocketConnector
根据提供的io.rsocket.transport.ServerTransport
在主机和端口上公开一个侦听器,用于接受来自客户端的连接。可以使用setServerConfigurer()
自定义内部RSocketServer
实例,以及可以配置的其他选项,例如。RSocketStrategies
和MimeType
用于负载数据和标头元数据。当一个setupRoute
是从客户端请求者提供的(参见下面的ClientRSocketConnector
)时,一个连接的客户端被存储为RSocketRequester
在由clientRSocketKeyStrategy``BiFunction<Map<String, Object>, DataBuffer, Object>
确定的键下。默认情况下,连接数据被用来作为转换为具有 UTF-8 字符集的字符串的值。这样的RSocketRequester
注册中心可以在应用程序逻辑中用于确定用于与其进行交互的特定客户端连接,或者用于向所有连接的客户端发布相同的消息。当从客户端建立连接时,RSocketConnectedEvent
将从ServerRSocketConnector
发出一个RSocketConnectedEvent
。这类似于 Spring 消息传递模块中的@ConnectMapping
注释所提供的内容。映射模式*
表示接受所有的客户端路由。RSocketConnectedEvent
可用于通过DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
报头来区分不同的路线。
典型的服务器配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项,包括RSocketStrategies
Bean 和@EventListener
对于RSocketConnectedEvent
,都是可选的。有关更多信息,请参见ServerRSocketConnector
Javadocs。
从版本 5.2.1 开始,将ServerRSocketMessageHandler
提取到一个公共的顶级类,以便与现有的 RSocket 服务器进行可能的连接。当ServerRSocketConnector
与ServerRSocketMessageHandler
的外部实例一起提供时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给所提供的实例。此外,ServerRSocketMessageHandler
还可以配置一个messageMappingCompatible
标志来处理用于 RSocket 控制器的@MessageMapping
,从而完全取代了标准RSocketMessageHandler
所提供的功能。这在混合配置中是有用的,当经典的@MessageMapping
方法与 RSocket 通道适配器一起存在于同一个应用程序中,并且在应用程序中存在外部配置的 RSocket 服务器时。
基于通过所提供的ClientTransport
连接的RSocket
,ClientRSocketConnector
充当RSocketRequester
的持有者。可以使用提供的RSocketConnectorConfigurer
来定制RSocketConnector
。还可以在此组件上配置带有元数据的setupRoute
(带有可选模板变量)和setupData
。
典型的客户端配置可能如下所示:
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
这些选项中的大多数(包括RSocketStrategies
Bean)都是可选的。请注意我们是如何连接到任意端口上本地启动的 RSocket 服务器的。关于setupData
用例,请参见ServerRSocketConnector.clientRSocketKeyStrategy
。另请参见ClientRSocketConnector
及其AbstractRSocketConnector
超类 Javadocs 以获取更多信息。
ClientRSocketConnector
和ServerRSocketConnector
都负责将入站通道适配器映射到它们的path
配置,用于路由传入的 RSocket 请求。有关更多信息,请参见下一节。
# RSocket 入站网关
RSocketInboundGateway
负责接收 RSocket 请求并产生响应(如果有的话)。它需要一个path
映射的数组,它可以作为类似于 MVC 请求映射或@MessageMapping
语义的模式。此外(自 5.2.2 版本以来),可以在RSocketInboundGateway
上配置一组交互模型(参见RSocketInteractionModel
),以根据特定的帧类型将 RSocket 请求限制到该端点。默认情况下,所有的交互模型都是支持的。这样的 Bean,根据其IntegrationRSocketEndpoint
实现(扩展 aReactiveMessageHandler
),可以通过ServerRSocketConnector
或ClientRSocketConnector
对于内部路由逻辑中的IntegrationRSocketMessageHandler
对传入请求进行自动检测。可以将AbstractRSocketConnector
提供给RSocketInboundGateway
以用于显式端点注册。通过这种方式,在AbstractRSocketConnector
上禁用自动检测选项。也可以将RSocketStrategies
注入到RSocketInboundGateway
中,或者它们是从提供的覆盖任何显式注入的AbstractRSocketConnector
中获得的。解码器是使用来自那些RSocketStrategies
来根据所提供的requestElementType
来解码请求有效负载的。如果在传入的RSocketPayloadReturnValueHandler.RESPONSE_HEADER
中没有提供RSocketPayloadReturnValueHandler.RESPONSE_HEADER
头,则RSocketInboundGateway
将请求视为fireAndForget
rsocket 交互模型。在这种情况下,RSocketInboundGateway
在outputChannel
中执行普通的send
操作。否则,将使用来自RSocketPayloadReturnValueHandler.RESPONSE_HEADER
头的MonoProcessor
值向 RSocket 发送回复。为此,RSocketInboundGateway
在outputChannel
上执行sendAndReceiveMessageReactive
操作。根据MessagingRSocket
逻辑,向下游发送的消息的payload
始终是Flux
。当在fireAndForget
RSocket 交互模型中时,消息有一个简单的转换payload
。回复payload
可以是一个普通对象,也可以是Publisher
-RSocketInboundGateway
根据RSocketStrategies
中提供的编码器,将它们正确地转换为 RSocket 响应。
从版本 5.3 开始,向RSocketInboundGateway
添加一个decodeFluxAsUnit
选项(默认false
)。默认情况下,传入Flux
的转换方式是将其每个事件分别解码。这是当前使用@MessageMapping
语义存在的精确行为。要根据应用程序的要求恢复以前的行为或将整个Flux
解码为单个单元,必须将decodeFluxAsUnit
设置为true
。然而,目标解码逻辑依赖于所选择的Decoder
,例如,aStringDecoder
需要一个新的行分隔符(默认情况下)来表示流中的字节缓冲区结束。
有关如何配置RSocketInboundGateway
端点和处理下游有效载荷的示例,请参见用 Java 配置 RSocket 端点。
# RSocket 出站网关
RSocketOutboundGateway
是一个AbstractReplyProducingMessageHandler
,用于向 RSocket 执行请求并根据 RSocket 答复(如果有的话)生成答复。从服务器端的请求消息中提供的ClientRSocketConnector
或RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
头,将低级别的 RSocket 协议交互委托给一个RSocketRequester
解析。服务器端的目标RSocketRequester
可以通过RSocketConnectedEvent
或使用ServerRSocketConnector.getClientRSocketRequester()
API 根据通过ServerRSocketConnector.setClientRSocketKeyStrategy()
为连接请求映射选择的一些业务密钥进行解析。有关更多信息,请参见ServerRSocketConnector
Javadocs。
发送请求的route
必须显式地配置(连同路径变量)或通过 SPEL 表达式配置,SPEL 表达式根据请求消息进行求值。
可以通过RSocketInteractionModel
选项或相应的表达式设置提供 RSocket 交互模型。默认情况下,requestResponse
用于通用网关用例。
当请求消息有效负载是Publisher
时,可以提供一个publisherElementType
选项来根据目标RSocketStrategies
中提供的RSocketStrategies
对其元素进行编码。这个选项的表达式可以计算为ParameterizedTypeReference
。有关数据及其类型的更多信息,请参见RSocketRequester.RequestSpec.data()
Javadocs。
还可以使用metadata
增强 RSocket 请求。为此,可以在RSocketOutboundGateway
上配置针对请求的metadataExpression
消息。这样的表达式必须求值为Map<Object, MimeType>
。
当interactionModel
不是fireAndForget
时,必须提供expectedResponseType
。默认情况下是String.class
。这个选项的表达式可以计算为ParameterizedTypeReference
。有关回复数据及其类型的更多信息,请参见RSocketRequester.RetrieveSpec.retrieveMono()
和RSocketRequester.RetrieveSpec.retrieveFlux()
Javadocs。
来自RSocketOutboundGateway
的回复payload
是Mono
(即使对于fireAndForget
交互模型,它也是Mono<Void>
),始终将此组件设置为async
。这样的Mono
是在生产前订阅的outputChannel
为常规通道或按需处理的FluxMessageChannel
。对于requestStream
或requestChannel
交互模型的Flux
响应也被包装到一个回复Mono
中。它可以通过FluxMessageChannel
的传递服务激活器在下游进行平坦化:
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或在目标应用程序逻辑中显式订阅。
还可以将期望的响应类型配置(或通过表达式进行评估)为void
将此网关视为出站通道适配器。然而,outputChannel
仍然必须被配置(即使它只是一个NullChannel
)以启动对返回的Mono
的订阅。
有关示例,请参见用 Java 配置 RSocket 端点如何配置RSocketOutboundGateway
端点与下游有效载荷的交易。
# RSocket 名称空间支持
Spring 集成提供了rsocket
名称空间和相应的模式定义。要将其包含在配置中,请在应用程序上下文配置文件中添加以下名称空间声明:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
# 入站
要用 XML 配置 Spring 集成 RSocket 入站通道适配器,需要使用来自int-rsocket
名称空间的适当的inbound-gateway
组件。下面的示例展示了如何配置它:
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
aClientRSocketConnector
和ServerRSocketConnector
应配置为通用的<bean>
定义。
# 出站
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
有关所有这些 XML 属性的描述,请参见spring-integration-rsocket.xsd
。
# 使用 Java 配置 RSocket 端点
下面的示例展示了如何使用 Java 配置 RSocket 入站端点:
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
在此配置中,假设ClientRSocketConnector
或ServerRSocketConnector
具有自动检测“echo”路径上的此类端点的意义。请注意@Transformer
签名及其对 RSocket 请求的完全反应性处理和产生反应性答复。
下面的示例展示了如何使用 Java DSL 配置 RSocket 入站网关:
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
在此配置中假定ClientRSocketConnector
或ServerRSocketConnector
,其含义是自动检测“/大写”路径上的此类端点,并将期望的交互模型称为“请求通道”。
下面的示例展示了如何使用 Java 配置 RSocket 出站网关:
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
setClientRSocketConnector()
仅对客户端是必需的。在服务器端,必须在请求消息中提供带有RSocketRequester
值的RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
头。
下面的示例展示了如何使用 Java DSL 配置 RSocket 出站网关:
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
参见[IntegrationFlow
作为网关](./dsl.html#integration-flow-as-gateway)以获取更多信息,如何在上面的流的开头使用提到的Function
接口。