# RSocket 支持

# RSocket 支持

RSocket Spring 集成模块(spring-integration-rsocket)允许执行RSocket 应用程序协议 (opens new window)

你需要在项目中包含此依赖项:

Maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>5.5.9</version>
</dependency>

Gradle

compile "org.springframework.integration:spring-integration-rsocket:5.5.9"

该模块从版本 5.2 开始可用,并且基于 Spring 消息传递基础及其 RSocket 组件实现,例如RSocketRequesterRSocketMessageHandlerRSocketStrategies。有关 RSocket 协议、术语和组件的更多信息,请参见Spring Framework RSocket Support (opens new window)

在通过通道适配器启动集成流处理之前,我们需要在服务器和客户机之间建立一个 RSocket 连接。为此, Spring 集成 RSocket 支持提供了ServerRSocketConnectorClientRSocketConnectorAbstractRSocketConnector实现。

ServerRSocketConnector根据提供的io.rsocket.transport.ServerTransport在主机和端口上公开一个侦听器,用于接受来自客户端的连接。可以使用setServerConfigurer()自定义内部RSocketServer实例,以及可以配置的其他选项,例如。RSocketStrategiesMimeType用于负载数据和标头元数据。当一个setupRoute是从客户端请求者提供的(参见下面的ClientRSocketConnector)时,一个连接的客户端被存储为RSocketRequester在由clientRSocketKeyStrategy``BiFunction<Map<String, Object>, DataBuffer, Object>确定的键下。默认情况下,连接数据被用来作为转换为具有 UTF-8 字符集的字符串的值。这样的RSocketRequester注册中心可以在应用程序逻辑中用于确定用于与其进行交互的特定客户端连接,或者用于向所有连接的客户端发布相同的消息。当从客户端建立连接时,RSocketConnectedEvent将从ServerRSocketConnector发出一个RSocketConnectedEvent。这类似于 Spring 消息传递模块中的@ConnectMapping注释所提供的内容。映射模式*表示接受所有的客户端路由。RSocketConnectedEvent可用于通过DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER报头来区分不同的路线。

典型的服务器配置可能如下所示:

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

所有选项,包括RSocketStrategies Bean 和@EventListener对于RSocketConnectedEvent,都是可选的。有关更多信息,请参见ServerRSocketConnectorJavadocs。

从版本 5.2.1 开始,将ServerRSocketMessageHandler提取到一个公共的顶级类,以便与现有的 RSocket 服务器进行可能的连接。当ServerRSocketConnectorServerRSocketMessageHandler的外部实例一起提供时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给所提供的实例。此外,ServerRSocketMessageHandler还可以配置一个messageMappingCompatible标志来处理用于 RSocket 控制器的@MessageMapping,从而完全取代了标准RSocketMessageHandler所提供的功能。这在混合配置中是有用的,当经典的@MessageMapping方法与 RSocket 通道适配器一起存在于同一个应用程序中,并且在应用程序中存在外部配置的 RSocket 服务器时。

基于通过所提供的ClientTransport连接的RSocketClientRSocketConnector充当RSocketRequester的持有者。可以使用提供的RSocketConnectorConfigurer来定制RSocketConnector。还可以在此组件上配置带有元数据的setupRoute(带有可选模板变量)和setupData

典型的客户端配置可能如下所示:

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

这些选项中的大多数(包括RSocketStrategies Bean)都是可选的。请注意我们是如何连接到任意端口上本地启动的 RSocket 服务器的。关于setupData用例,请参见ServerRSocketConnector.clientRSocketKeyStrategy。另请参见ClientRSocketConnector及其AbstractRSocketConnector超类 Javadocs 以获取更多信息。

ClientRSocketConnectorServerRSocketConnector都负责将入站通道适配器映射到它们的path配置,用于路由传入的 RSocket 请求。有关更多信息,请参见下一节。

# RSocket 入站网关

RSocketInboundGateway负责接收 RSocket 请求并产生响应(如果有的话)。它需要一个path映射的数组,它可以作为类似于 MVC 请求映射或@MessageMapping语义的模式。此外(自 5.2.2 版本以来),可以在RSocketInboundGateway上配置一组交互模型(参见RSocketInteractionModel),以根据特定的帧类型将 RSocket 请求限制到该端点。默认情况下,所有的交互模型都是支持的。这样的 Bean,根据其IntegrationRSocketEndpoint实现(扩展 aReactiveMessageHandler),可以通过ServerRSocketConnectorClientRSocketConnector对于内部路由逻辑中的IntegrationRSocketMessageHandler对传入请求进行自动检测。可以将AbstractRSocketConnector提供给RSocketInboundGateway以用于显式端点注册。通过这种方式,在AbstractRSocketConnector上禁用自动检测选项。也可以将RSocketStrategies注入到RSocketInboundGateway中,或者它们是从提供的覆盖任何显式注入的AbstractRSocketConnector中获得的。解码器是使用来自那些RSocketStrategies来根据所提供的requestElementType来解码请求有效负载的。如果在传入的RSocketPayloadReturnValueHandler.RESPONSE_HEADER中没有提供RSocketPayloadReturnValueHandler.RESPONSE_HEADER头,则RSocketInboundGateway将请求视为fireAndForgetrsocket 交互模型。在这种情况下,RSocketInboundGatewayoutputChannel中执行普通的send操作。否则,将使用来自RSocketPayloadReturnValueHandler.RESPONSE_HEADER头的MonoProcessor值向 RSocket 发送回复。为此,RSocketInboundGatewayoutputChannel上执行sendAndReceiveMessageReactive操作。根据MessagingRSocket逻辑,向下游发送的消息的payload始终是Flux。当在fireAndForgetRSocket 交互模型中时,消息有一个简单的转换payload。回复payload可以是一个普通对象,也可以是Publisher-RSocketInboundGateway根据RSocketStrategies中提供的编码器,将它们正确地转换为 RSocket 响应。

从版本 5.3 开始,向RSocketInboundGateway添加一个decodeFluxAsUnit选项(默认false)。默认情况下,传入Flux的转换方式是将其每个事件分别解码。这是当前使用@MessageMapping语义存在的精确行为。要根据应用程序的要求恢复以前的行为或将整个Flux解码为单个单元,必须将decodeFluxAsUnit设置为true。然而,目标解码逻辑依赖于所选择的Decoder,例如,aStringDecoder需要一个新的行分隔符(默认情况下)来表示流中的字节缓冲区结束。

有关如何配置RSocketInboundGateway端点和处理下游有效载荷的示例,请参见用 Java 配置 RSocket 端点

# RSocket 出站网关

RSocketOutboundGateway是一个AbstractReplyProducingMessageHandler,用于向 RSocket 执行请求并根据 RSocket 答复(如果有的话)生成答复。从服务器端的请求消息中提供的ClientRSocketConnectorRSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER头,将低级别的 RSocket 协议交互委托给一个RSocketRequester解析。服务器端的目标RSocketRequester可以通过RSocketConnectedEvent或使用ServerRSocketConnector.getClientRSocketRequester()API 根据通过ServerRSocketConnector.setClientRSocketKeyStrategy()为连接请求映射选择的一些业务密钥进行解析。有关更多信息,请参见ServerRSocketConnectorJavadocs。

发送请求的route必须显式地配置(连同路径变量)或通过 SPEL 表达式配置,SPEL 表达式根据请求消息进行求值。

可以通过RSocketInteractionModel选项或相应的表达式设置提供 RSocket 交互模型。默认情况下,requestResponse用于通用网关用例。

当请求消息有效负载是Publisher时,可以提供一个publisherElementType选项来根据目标RSocketStrategies中提供的RSocketStrategies对其元素进行编码。这个选项的表达式可以计算为ParameterizedTypeReference。有关数据及其类型的更多信息,请参见RSocketRequester.RequestSpec.data()Javadocs。

还可以使用metadata增强 RSocket 请求。为此,可以在RSocketOutboundGateway上配置针对请求的metadataExpression消息。这样的表达式必须求值为Map<Object, MimeType>

interactionModel不是fireAndForget时,必须提供expectedResponseType。默认情况下是String.class。这个选项的表达式可以计算为ParameterizedTypeReference。有关回复数据及其类型的更多信息,请参见RSocketRequester.RetrieveSpec.retrieveMono()RSocketRequester.RetrieveSpec.retrieveFlux()Javadocs。

来自RSocketOutboundGateway的回复payloadMono(即使对于fireAndForget交互模型,它也是Mono<Void>),始终将此组件设置为async。这样的Mono是在生产前订阅的outputChannel为常规通道或按需处理的FluxMessageChannel。对于requestStreamrequestChannel交互模型的Flux响应也被包装到一个回复Mono中。它可以通过FluxMessageChannel的传递服务激活器在下游进行平坦化:

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

或在目标应用程序逻辑中显式订阅。

还可以将期望的响应类型配置(或通过表达式进行评估)为void将此网关视为出站通道适配器。然而,outputChannel仍然必须被配置(即使它只是一个NullChannel)以启动对返回的Mono的订阅。

有关示例,请参见用 Java 配置 RSocket 端点如何配置RSocketOutboundGateway端点与下游有效载荷的交易。

# RSocket 名称空间支持

Spring 集成提供了rsocket名称空间和相应的模式定义。要将其包含在配置中,请在应用程序上下文配置文件中添加以下名称空间声明:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

# 入站

要用 XML 配置 Spring 集成 RSocket 入站通道适配器,需要使用来自int-rsocket名称空间的适当的inbound-gateway组件。下面的示例展示了如何配置它:

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

aClientRSocketConnectorServerRSocketConnector应配置为通用的<bean>定义。

# 出站

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

有关所有这些 XML 属性的描述,请参见spring-integration-rsocket.xsd

# 使用 Java 配置 RSocket 端点

下面的示例展示了如何使用 Java 配置 RSocket 入站端点:

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

在此配置中,假设ClientRSocketConnectorServerRSocketConnector具有自动检测“echo”路径上的此类端点的意义。请注意@Transformer签名及其对 RSocket 请求的完全反应性处理和产生反应性答复。

下面的示例展示了如何使用 Java DSL 配置 RSocket 入站网关:

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlows
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

在此配置中假定ClientRSocketConnectorServerRSocketConnector,其含义是自动检测“/大写”路径上的此类端点,并将期望的交互模型称为“请求通道”。

下面的示例展示了如何使用 Java 配置 RSocket 出站网关:

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

setClientRSocketConnector()仅对客户端是必需的。在服务器端,必须在请求消息中提供带有RSocketRequester值的RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER头。

下面的示例展示了如何使用 Java DSL 配置 RSocket 出站网关:

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlows
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

参见[IntegrationFlow作为网关](./dsl.html#integration-flow-as-gateway)以获取更多信息,如何在上面的流的开头使用提到的Function接口。