# JMS 支持

# JMS 支持

Spring 集成提供了用于接收和发送 JMS 消息的通道适配器。

你需要在项目中包含此依赖项:

Maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jms</artifactId>
    <version>5.5.9</version>
</dependency>

Gradle

compile "org.springframework.integration:spring-integration-jms:5.5.9"

javax.jms:javax.jms-api必须通过一些特定于供应商的 JMS 实现显式地添加,例如 Apache ActiveMQ。

实际上有两个基于 JMS 的入站通道适配器。第一种使用 Spring 的JmsTemplate基于轮询周期来接收。第二种是“消息驱动”,依赖于 Spring MessageListener容器。出站通道适配器使用JmsTemplate按需转换和发送 JMS 消息。

通过使用JmsTemplateMessageListener容器, Spring 集成依赖于 Spring 的 JMS 支持。理解这一点很重要,因为这些适配器上公开的大多数属性都配置了底层的JmsTemplateMessageListener容器。有关JmsTemplateMessageListener容器的更多详细信息,请参见Spring JMS documentation (opens new window)

Spring 尽管 JMS 通道适配器旨在用于单向消息传递(仅发送或仅接收), Spring 集成还提供了用于请求和应答操作的入站和出站 JMS 网关。入站网关依赖于 Spring 的MessageListener容器实现中的一个来实现消息驱动的接收。它还能够向reply-to目的地发送返回值,如接收到的消息所提供的那样。出站网关向request-destination(或request-destination-namerequest-destination-expression)发送一条 JMS 消息,然后接收一条回复消息。你可以显式地配置reply-destination引用(或reply-destination-namereply-destination-expression)。否则,出站网关使用 JMS临时队列 (opens new window)

在 Spring Integration2.2 之前,如果有必要,将为每个请求或回复创建(并删除)一个TemporaryQueue。从 Spring Integration2.2 开始,你可以将出站网关配置为使用MessageListener容器来接收回复,而不是直接使用新的(或缓存的)Consumer来接收每个请求的回复。在这样配置且未提供明确的应答目的地时,每个网关使用一个TemporaryQueue,而不是每个请求使用一个。

# 入站通道适配器

入站通道适配器需要对单个JmsTemplate实例的引用,或者对ConnectionFactoryDestination的引用(你可以提供一个“destinationName”来代替“destination”引用)。下面的示例定义了一个具有Destination引用的入站通道适配器:

爪哇 DSL

@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(
                    Jms.inboundAdapter(connectionFactory)
                       .destination("inQueue"),
                    e -> e.poller(poller -> poller.fixedRate(30000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}

Kotlin DSL

@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
    integrationFlow(
            Jms.inboundAdapter(connectionFactory).destination("inQueue"),
            { poller { Pollers.fixedRate(30000) } })
       {
            handle { m -> println(m.payload) }
       }

爪哇

@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
    JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
    source.setDestinationName("inQueue");
    return source;
}

XML

<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
    <int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
从前面的配置中注意到inbound-channel-adapter是一个轮询消费者。
这意味着它在触发时调用receive()
你应该仅在轮询相对不频繁且及时性不重要的情况下使用它。
对于所有其他情况(绝大多数基于 JMS 的用例),message-driven-channel-adapter稍后描述)是更好的选项。
默认情况下,所有需要引用ConnectionFactory的 JMS 适配器都会自动查找名为jmsConnectionFactory的 Bean。
,这就是为什么在许多示例中看不到connection-factory属性的原因。,但是,
,如果你的 JMSConnectionFactory具有不同的 Bean 名称,则需要提供该属性。

如果extract-payload设置为true(默认值),则接收到的 JMS 消息将通过MessageConverter传递。当依赖默认的SimpleMessageConverter时,这意味着生成的 Spring 集成消息将 JMS 消息的主体作为其有效负载。JMSTextMessage产生基于字符串的有效负载,JMSBytesMessage产生字节数组有效负载,JMSObjectMessage的可序列化实例成为 Spring 集成消息的有效负载。如果希望将原始 JMS 消息作为 Spring 集成消息的有效负载,请将extractPayload选项设置为false

从版本 5.0.8 开始,receive-timeout的默认值是-1(不等待)org.springframework.jms.connection.CachingConnectionFactorycacheConsumers,否则为 1 秒。JMS 入站通道适配器基于所提供的ConnectionFactory和选项来创建DynamicJmsTemplate板条箱。如果需要一个外部JmsTemplate(例如在 Spring 引导环境中),或者ConnectionFactory不是缓存,或者不需要cacheConsumers,则建议设置jmsTemplate.receiveTimeout(-1),如果需要非阻塞消耗:

Jms.inboundAdapter(connectionFactory)
        .destination(queueName)
        .configureJmsTemplate(template -> template.receiveTimeout(-1))

# 事务

从版本 4.0 开始,入站通道适配器支持session-transacted属性。在较早的版本中,你必须插入一个JmsTemplate,并将sessionTransacted设置为true。(适配器确实允许你将acknowledge属性设置为transacted,但这是不正确的,并且不工作)。

但是,请注意,将session-transacted设置为true几乎没有什么价值,因为事务是在receive()操作之后和消息发送到channel之前立即提交的。

如果你希望整个流是事务性的(例如,如果有一个下游出站通道适配器),则必须使用transactionalpoller 和JmsTransactionManager。或者,考虑使用jms-message-driven-channel-adapter并将acknowledge设置为transacted(默认值)。

# 消息驱动通道适配器

message-driven-channel-adapter要求引用 Spring MessageListener容器的实例(AbstractMessageListenerContainer的任意子类),或者同时引用ConnectionFactoryDestination(可以提供一个“destinationName”来代替“destination”引用)。下面的示例用Destination引用定义了消息驱动的通道适配器:

爪哇 DSL

@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                     .destination("inQueue"))
            .channel("exampleChannel")
            .get();
}

Kotlin DSL

@Bean
fun jmsMessageDrivenFlowWithContainer() =
        integrationFlow(
                Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
                             .destination("inQueue")) {
            channel("exampleChannel")
        }

Java

@Bean
public JmsMessageDrivenEndpoint jmsIn() {
    JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
    return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(cf());
    container.setDestinationName("inQueue");
    return container;
}

@Bean
public ChannelPublishingJmsMessageListener listener() {
    ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
    listener.setRequestChannelName("exampleChannel");
    return listener;
}

XML

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>
消息驱动适配器还接受与MessageListener容器有关的几个属性。
只有在不提供container引用的情况下,才会考虑这些值。,
在这种情况下,将根据这些属性创建和配置
的实例,例如,你可以指定transaction-manager引用、concurrent-consumers值以及其他几个属性引用和值。
有关更多详细信息,请参见Javadoc (opens new window)和 Spring Integration 的 JMS 模式(spring-integration-jms.xsd)。,

如果你有一个自定义的侦听器容器实现(通常是DefaultMessageListenerContainer的子类),你可以通过使用container属性提供对它的实例的引用,或者通过使用container-class属性提供其完全限定的类名。
在这种情况下,适配器上的属性将被传输到自定义容器的实例。
你无法使用 Spring JMS 名称空间元素<jms:listener-container/><int-jms:message-driven-channel-adapter>配置容器引用,因为这个元素实际上并不引用容器。
每个<jms:listener/>子元素获得自己的DefaultMessageListenerContainer(在父元素<jms:listener-container/>元素上定义了共享属性)。
你可以给每个侦听子元素一个id,然后使用它将其注入通道适配器,<jms:/>命名空间需要一个真正的侦听器。

建议为<bean>配置一个常规的<bean>,并将其用作通道适配器中的引用。
从版本 4.2 开始,默认的acknowledge模式是transacted,除非你提供一个外部容器。
在这种情况下,你应该根据需要配置容器。
我们建议使用transactedDefaultMessageListenerContainer,以避免消息丢失。

“extract-payload”属性具有相同的效果,其默认值为“true”。poller元素不适用于消息驱动的通道适配器,因为它是被主动调用的。对于大多数场景,消息驱动的方法更好,因为一旦从底层 JMS 使用者接收到消息,就会将消息传递给MessageChannel

最后,<message-driven-channel-adapter>元素还接受“error-channel”属性。这提供了相同的基本功能,如[Enter theGatewayProxyFactoryBean](./gateway.html#gateway-proxy)中所述。下面的示例展示了如何在消息驱动的通道适配器上设置错误通道:

<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
    channel="exampleChannel"
    error-channel="exampleErrorChannel"/>

当将前面的示例与我们稍后讨论的通用网关配置或 JMS“入站网关”进行比较时,关键的区别在于我们处于单向流中,因为这是一个“通道适配器”,而不是网关。因此,来自“错误通道”的下游流也应该是单向的。例如,它可以发送到日志处理程序,也可以连接到不同的 JMS<outbound-channel-adapter>元素。

在使用 from topics 时,将pub-sub-domain属性设置为 true。对于持久订阅,将subscription-durable设置为true;对于共享订阅,将subscription-shared设置为subscription-shared(这需要一个 JMS2.0 代理,并且自版本 4.2 以来一直可用)。使用subscription-name来命名订阅。

从版本 5.1 开始,当端点在应用程序保持运行时停止时,底层侦听器容器将被关闭,从而关闭其共享连接和消费者。以前,连接和消费者保持开放。要恢复到以前的行为,请将JmsMessageDrivenEndpoint上的shutdownContainerOnStop设置为false

# 入站转换错误

从版本 4.2 开始,“错误通道”也用于转换错误。在此之前,如果 JMS<message-driven-channel-adapter/><inbound-gateway/>由于转换错误而无法发送消息,则会将异常抛回容器。如果容器被配置为使用事务,则消息将被回滚并反复重新交付。转换过程发生在消息构造之前和期间,以便不会将此类错误发送到“错误通道”。现在,这样的转换异常会导致ErrorMessage被发送到“错误通道”,但payload除外。如果你希望事务回滚并定义了“错误通道”,那么“错误通道”上的集成流必须重新抛出异常(或其他异常)。如果错误流没有抛出异常,则提交事务并删除消息。如果没有定义“错误通道”,则异常将像以前一样被抛回容器。

# 出站通道适配器

JmsSendingMessageHandler实现了MessageHandler接口,并且能够将 Spring 集成Messages转换为 JMS 消息,然后发送到 JMS 目的地。它需要一个jmsTemplate引用,或者同时需要jmsConnectionFactorydestination引用(destinationName可以用来代替destination)。与入站通道适配器一样,配置此适配器的最简单方法是使用名称空间支持。以下配置生成一个适配器,该适配器接收来自exampleChannel的 Spring 集成消息,将这些消息转换为 JMS 消息,并将它们发送到 Bean 名称为outQueue的 JMS 目标引用:

Java DSL

@Bean
public IntegrationFlow jmsOutboundFlow() {
    return f -> f
            .handle(Jms.outboundAdapter(cachingConnectionFactory())
                    .destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                    .configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}

Kotlin DSL

@Bean
fun jmsOutboundFlow() =
        integrationFlow {
            handle(Jms.outboundAdapter(jmsConnectionFactory())
                    .apply {
                        destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
                        deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
                        timeToLiveExpression("10000")
                        configureJmsTemplate { it.explicitQosEnabled(true) }
                    }
            )
        }

Java

@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
    JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
    handler.setDestinationName("outQueue");
    return handler;
}

XML

<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>

与入站通道适配器一样,有一个“提取有效负载”属性。然而,对于出站适配器来说,其含义是相反的。Boolean 属性不是应用于 JMS 消息,而是应用于 Spring 集成消息有效负载。换句话说,决定是将 Spring 集成消息本身作为 JMS 消息体传递,还是将 Spring 集成消息有效负载作为 JMS 消息体传递。默认值是“true”。因此,如果你传递的 Spring 集成消息的有效负载是String,那么将创建一个 JMSTextMessage。另一方面,如果你希望通过 JMS 向另一个系统发送实际的 Spring 集成消息,则将其设置为“false”。

不管有效负载提取的布尔值是什么, Spring 集成MessageHeaders映射到 JMS 属性,只要你依赖默认转换器或提供对MessageConverter的另一个实例的引用,
(对于“入站”适配器也是如此,但在这些情况下,JMS 属性映射到 Spring IntegrationMessageHeaders)。

从版本 5.1 开始,<int-jms:outbound-channel-adapter>JmsSendingMessageHandler)可以配置deliveryModeExpressiontimeToLiveExpression属性,以评估针对请求 Spring Message在运行时发送的 JMS 消息的适当 QoS 值。新的setMapInboundDeliveryMode(true)setMapInboundExpiration(true)``DefaultJmsHeaderMapper选项可以作为消息头中动态deliveryModetimeToLive的信息源:

<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
                        time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>

# 事务

从版本 4.0 开始,出站通道适配器支持session-transacted属性。在较早的版本中,你必须插入一个JmsTemplate,并将sessionTransacted设置为true。现在,该属性在内置的默认值JmsTemplate上设置属性。如果存在事务(可能来自上游message-driven-channel-adapter),则在同一事务中执行发送操作。否则,将启动一个新的事务。

# 入站网关

Spring Integration 的消息驱动 JMS Inbound-Gateway 委托给MessageListener容器,支持动态调整并发消费者,还可以处理回复。入站网关需要引用ConnectionFactory和请求Destination(或“requestDestationName”)。下面的示例定义了一个 JMS,该 JMS 从 Bean ID 引用的 JMS 队列接收,并发送到名为的 Spring 集成通道:

<int-jms:inbound-gateway id="jmsInGateway"
    request-destination="inQueue"
    request-channel="exampleChannel"/>

由于网关提供了请求-应答行为,而不是单向的发送或接收行为,因此对于“有效负载提取”,它们也有两个不同的属性(对于通道适配器的“提取-有效负载”设置,如前面讨论过)。对于入站网关,“extract-request-payload”属性确定是否提取了接收到的 JMS 消息体。如果“false”,则 JMS 消息本身就成为 Spring 集成消息的有效负载。默认值是“true”。

类似地,对于入站网关,“extract-reply-payload”属性应用于将被转换为应答 JMS 消息的 Spring 集成消息。如果你希望传递整个 Spring 集成消息(作为 JMS ObjectMessage 的主体),请将该值设置为“false”。默认情况下,将 Spring 集成消息有效负载转换为 JMS 消息也是“真的”(例如,String有效负载变为 JMS 文本消息)。

与其他任何事情一样,网关调用可能会导致错误。默认情况下,生产者不会被通知可能在消费者侧发生的错误,并且会等待回复。但是,有时你可能想要将错误条件传递回使用者(换句话说,你可能希望通过将异常映射到消息来将其视为有效的回复)。为了实现这一点,JMS 入站网关提供了对消息通道的支持,可以将错误发送到该通道进行处理,这可能会导致响应消息负载,该负载符合某些契约,该契约定义了调用者可能期望的“错误”响应。你可以使用错误通道属性来配置这样的通道,如下例所示:

<int-jms:inbound-gateway request-destination="requestQueue"
          request-channel="jmsInputChannel"
          error-channel="errorTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

你可能会注意到,这个示例看起来与[Enter theGatewayProxyFactoryBean](./gateway.html#gateway-proxy)中包含的示例非常相似。同样的想法也适用于这里:exceptionTransformer可能是创建错误响应对象的 POJO,你可以引用nullChannel来抑制错误,或者你可以将“错误通道”留在外面,让异常传播。

入站转换错误

在使用 from topics 时,将pub-sub-domain属性设置为 true。对于持久订阅,将subscription-durable设置为true;对于共享订阅,将subscription-shared设置为subscription-shared(需要一个 JMS2.0 代理,并且自版本 4.2 以来一直可用)。使用subscription-name来命名订阅。

从版本 4.2 开始,默认的acknowledge模式是transacted,除非提供了外部容器。
在这种情况下,你应该根据需要配置容器。
我们建议你使用transactedDefaultMessageListenerContainer,以避免消息丢失。

从版本 5.1 开始,当端点在应用程序保持运行时停止时,底层侦听器容器将被关闭,从而关闭其共享连接和消费者。以前,连接和消费者保持开放。要恢复到以前的行为,请将JmsInboundGateway上的shutdownContainerOnStop设置为false

# 出站网关

出站网关从 Spring 集成消息创建 JMS 消息,并将它们发送到“请求-目的地”。然后,它通过使用选择器从你配置的“reply-destination”接收 JMS 回复消息,或者,如果没有提供“reply-destination”,则通过创建 JMSTemporaryQueue实例来处理 JMS 回复消息。

reply-destination(或reply-destination-name)与具有将 cacheconsumers 设置为trueCachingConnectionFactory一起使用,可能会导致内存不足的情况。
这是因为每个请求都会获得一个带有新选择器的新消费者(在correlation-key值上进行选择,或者,当没有correlation-key时,
鉴于这些选择器是唯一的,它们在当前请求完成后仍保留在缓存中(未使用)。

如果指定了回复目的地,建议不要使用缓存的消费者。
或者,考虑使用<reply-listener/>作为如下所述

下面的示例展示了如何配置出站网关:

<int-jms:outbound-gateway id="jmsOutGateway"
    request-destination="outQueue"
    request-channel="outboundJmsRequests"
    reply-channel="jmsReplies"/>

“出站网关”有效负载提取属性与“入站网关”的有效负载提取属性是反相关的(参见先前的讨论)。这意味着“extract-request-payload”属性值应用于正在转换为 JMS 消息的 Spring 集成消息,该消息将作为请求发送。“extract-reply-payload”属性值应用于作为答复接收的 JMS 消息,然后将其转换为 Spring 集成消息,随后将其发送到“replacy-channel”,如前面的配置示例所示。

# 使用<reply-listener/>

Spring Integration2.2 引入了一种用于处理答复的替代技术。如果你在网关中添加一个<reply-listener/>子元素,而不是为每个回复创建一个使用者,那么将使用一个MessageListener容器来接收回复并将其交给请求线程。这提供了许多性能优势,并缓解了早些时候的警告中描述的缓存消费者内存利用问题。

当使用带有不具有reply-destination的出站网关的<reply-listener/>时,不是为每个请求创建TemporaryQueue,而是使用单个TemporaryQueue。(如果与代理的连接丢失并恢复,则网关会根据需要创建额外的TemporaryQueue)。

当使用correlation-key时,多个网关可以共享相同的回复目的地,因为侦听器容器使用每个网关唯一的选择器。

如果你指定了一个应答侦听器,并指定了一个应答目的地(或应答目的地名),但没有提供相关键,那么网关将记录一个警告并退回到版本 2.2 之前的行为。
这是因为在这种情况下无法配置选择器。
因此,没有办法避免回复到不同的网关,该网关可能配置了相同的回复目的地,

注意,在这种情况下,每个请求都会使用一个新的使用者,并且消费者可以在上面的警告中描述的那样在内存中建立;因此在这种情况下不应该使用缓存的消费者。

下面的示例显示了一个具有默认属性的应答侦听器:

<int-jms:outbound-gateway id="jmsOutGateway"
        request-destination="outQueue"
        request-channel="outboundJmsRequests"
        reply-channel="jmsReplies">
    <int-jms:reply-listener />
</int-jms-outbound-gateway>

侦听器非常轻量级,我们预计,在大多数情况下,你只需要一个消费者。但是,你可以添加诸如concurrent-consumersmax-concurrent-consumers等属性。有关受支持的属性的完整列表,请参见模式,以及Spring JMS documentation (opens new window)的含义。

# 空闲回复侦听器

从版本 4.2 开始,你可以根据需要启动应答侦听器(并在空闲时间之后停止它),而不是在网关的生命周期中运行。如果你在应用程序上下文中有许多网关,而这些网关大部分都是空闲的,那么这可能是有用的。其中一种情况是上下文中使用 Spring 集成和 JMS 进行分区分发的许多(非活动的)分区Spring Batch (opens new window)作业。如果所有的应答侦听器都是活动的,那么 JMS 代理为每个网关都有一个活动的消费者。通过启用空闲超时,每个使用者仅在相应的批处理作业运行时存在(并在其完成后的很短时间内存在)。

idle-reply-listener-timeoutin属性引用

# 网关回复相关性

本节描述了用于响应相关性的机制(确保发起网关仅接收对其请求的响应),具体取决于网关的配置方式。有关此处讨论的属性的完整描述,请参见属性引用

下面的列表描述了各种情况(数字是用来标识的——顺序无关紧要):

  1. noreply-destination*属性和 no<reply-listener>属性

    为每个请求创建一个TemporaryQueue,并在请求完成(成功或以其他方式完成)时删除。correlation-key是不相关的。

  2. 提供了一个reply-destination*属性,并且既不提供<reply-listener/>也不提供correlation-key

    与传出消息 is 相等的JMSCorrelationID用作消费者的消息选择器:

    messageSelector = "JMSCorrelationID = '" + messageId + "'"

    预计响应系统将在答复JMSCorrelationID中返回入站JMSMessageID。这是一种常见的模式,由 Spring 集成入站网关以及 Spring 的MessageListenerAdapter用于消息驱动的 POJO 来实现。

    使用此配置时,不应使用主题进行回复。
    该回复可能会丢失。
  3. 提供了一个reply-destination*属性,没有提供<reply-listener/>属性,并且correlation-key="JMSCorrelationID"

    网关生成唯一的相关 IS,并将其插入JMSCorrelationID头文件中。消息选择器为:

    messageSelector = "JMSCorrelationID = '" + uniqueId + "'"

    预计响应系统将在答复JMSCorrelationID中返回入站JMSCorrelationID。这是一种常见的模式,由 Spring 集成入站网关以及 Spring 的MessageListenerAdapter用于消息驱动的 POJO 来实现。

  4. 提供了一个reply-destination*属性,没有提供<reply-listener/>属性,并且correlation-key="myCorrelationHeader"

    网关生成唯一的相关 ID,并将其插入myCorrelationHeader消息属性中。correlation-key可以是任何用户定义的值。消息选择器为:

    messageSelector = "myCorrelationHeader = '" + uniqueId + "'"

    预计响应系统将在答复myCorrelationHeader中返回入站myCorrelationHeader

  5. 提供了一个reply-destination*属性,没有提供<reply-listener/>,并且correlation-key="JMSCorrelationID*"(请注意相关键中的*)。

    网关使用来自请求消息的jms_correlationId头中的值(如果存在),并将其插入JMSCorrelationID头中。消息选择器为:

    messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"

    用户必须确保这个值是唯一的。

    如果头不存在,则网关的行为如3中所示。

    预计响应系统将在答复JMSCorrelationID中返回入站JMSCorrelationID。这是一种常见的模式,由 Spring 集成入站网关以及 Spring 的MessageListenerAdapter用于消息驱动的 POJO 来实现。

  6. 不提供reply-destination*属性,而提供<reply-listener>属性

    将创建一个临时队列,并将其用于该网关实例的所有回复。消息中不需要相关数据,但是在网关中内部使用传出JMSMessageID来将响应引导到正确的请求线程。

  7. 提供了一个reply-destination*属性,提供了一个<reply-listener>属性,没有提供correlation-key属性

    不允许。

    忽略<reply-listener/>配置,网关的行为与2中的一样。将写入一条警告日志消息来指示此情况。

  8. 提供了一个reply-destination*属性,提供了一个<reply-listener>属性,以及correlation-key="JMSCorrelationID"

    网关具有唯一的相关 ID 并将其插入,同时在JMSCorrelationID报头(gatewayId + "_" + ++seq)中插入一个递增的值。消息选择器为:

    messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"

    预计响应系统将在答复JMSCorrelationID中返回入站JMSCorrelationID。这是一种常见的模式,由 Spring 集成入站网关以及 Spring 的MessageListenerAdapter用于消息驱动的 POJO 来实现。由于每个网关都有一个惟一的 ID,因此每个实例只获得自己的回复。完整的相关数据用于将答复路由到正确的请求线程。

  9. 提供了一个reply-destination*属性,提供了一个<reply-listener/>属性,并且correlation-key="myCorrelationHeader"

    网关具有唯一的相关 ID 并将其插入,同时在myCorrelationHeader属性(gatewayId + "_" + ++seq)中插入一个递增的值。correlation-key可以是任何用户定义的值。消息选择器为:

    messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"

    预计响应系统将在答复myCorrelationHeader中返回入站myCorrelationHeader。由于每个网关都有一个惟一的 ID,因此每个实例只获得自己的回复。完整的相关数据用于将答复路由到正确的请求线程。

  10. 提供了一个reply-destination*属性,提供了一个<reply-listener/>属性,以及correlation-key="JMSCorrelationID*"

(注意相关键中的*

不允许。

回复侦听器不允许使用用户提供的相关 ID。网关不使用此配置初始化。

# 异步网关

从版本 4.3 开始,在配置出站网关时,你现在可以指定async="true"(或者在 Java 中是setAsync(true))。

默认情况下,当请求被发送到网关时,请求线程将被挂起,直到收到答复。然后在该线程上继续流。如果asynctrue,则在发送完成后立即释放请求线程,并在侦听器容器线程上返回答复(并且流继续)。当在 Poller 线程上调用网关时,这可能是有用的。该线程已被释放,并可用于框架内的其他任务。

async需要<reply-listener/>(或者在使用 Java 配置时setUseReplyContainer(true))。它还需要指定correlationKey(通常JMSCorrelationID)。如果不满足上述任一条件,async将被忽略。

# 属性引用

下面的清单显示了outbound-gateway的所有可用属性:

<int-jms:outbound-gateway
    connection-factory="connectionFactory" (1)
    correlation-key="" (2)
    delivery-persistent="" (3)
    destination-resolver="" (4)
    explicit-qos-enabled="" (5)
    extract-reply-payload="true" (6)
    extract-request-payload="true" (7)
    header-mapper="" (8)
    message-converter="" (9)
    priority="" (10)
    receive-timeout="" (11)
    reply-channel="" (12)
    reply-destination="" (13)
    reply-destination-expression="" (14)
    reply-destination-name="" (15)
    reply-pub-sub-domain="" (16)
    reply-timeout="" (17)
    request-channel="" (18)
    request-destination="" (19)
    request-destination-expression="" (20)
    request-destination-name="" (21)
    request-pub-sub-domain="" (22)
    time-to-live="" (23)
    requires-reply="" (24)
    idle-reply-listener-timeout="" (25)
    async=""> (26)
  <int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 引用javax.jms.ConnectionFactory
默认的jmsConnectionFactory
2 一个属性的名称,该属性包含相关数据以将响应与响应关联起来。
如果省略,则网关期望响应系统在JMSCorrelationID报头中返回出站JMSMessageID报头的值。
如果指定,网关生成一个相关 ID 并用它填充指定的属性,
响应系统必须在相同的属性中回显这个值,
它可以被设置为JMSCorrelationID,在这种情况下,使用标准标头而不是String属性来保存相关数据。
当你使用<reply-container/>时,如果你提供一个显式的correlation-key,则必须指定correlation-key
从版本 4.0.1 开始,此属性还支持值JMSCorrelationID*,这意味着如果出站消息已经有JMSCorrelationID(从jms_correlationId映射)头,则使用它而不是生成新的。,
注意,当你使用JMSCorrelationID*时,不允许使用JMSCorrelationID*键,因为容器需要在初始化过程中设置消息选择器。

你应该理解,网关无法确保唯一性,如果提供的相关 ID 不是唯一的,则可能会出现意外的副作用。
3 一个布尔值,指示交付模式应该是DeliveryMode.PERSISTENTtrue)还是DeliveryMode.NON_PERSISTENTfalse)。
只有当explicit-qos-enabledtrue时,该设置才生效。
4 aDestinationResolver.
默认值是DynamicDestinationResolver,它将目标名称映射到该名称的队列或主题。
5 当设置为true时,它允许使用服务质量属性:prioritydelivery-modetime-to-live
6 当设置为true(默认)时, Spring 集成回复消息的有效负载是从 JMS 回复消息的主体(通过使用MessageConverter)创建的。
当设置为false时,整个 JMS 消息就成为 Spring 集成消息的有效负载。
7 当设置为true(默认)时, Spring 集成消息的有效负载被转换为JMSMessage(通过使用MessageConverter)。
当设置为false时,整个 Spring 集成消息被转换为JMSMessage。,在这两种情况下,
, Spring 通过使用HeaderMapper将集成消息头映射到 JMS 头和属性。
8 aHeaderMapper用于映射 Spring 集成消息头与 JMS 消息头和属性之间的关系。
9 MessageConverter的引用,用于在 JMS 消息和 Spring 集成消息有效负载(或消息,如果extract-request-payloadfalse)之间进行转换。
默认为SimpleMessageConverter
10 请求消息的默认优先级。
如果存在,则被消息优先级标头重写。
其范围为09
只有当explicit-qos-enabledtrue时,此设置才生效。
11 等待回复的时间(以毫秒为单位)。
默认值为5000(5 秒)。
12 将回复消息发送到的通道。
13 Destination的引用,它被设置为JMSReplyTo标头。
最多只允许reply-destinationreply-destination-expressionreply-destination-name中的一个。
如果不提供,则使用TemporaryQueue对此网关的答复。
14 计算Destination的 spel 表达式,该表达式将被设置为JMSReplyTo标头。
该表达式可以生成Destination对象或String对象。
它被DestinationResolver用于解析实际的Destination
,只有reply-destinationreply-destination-expression,或reply-destination-name中的一个是允许的。
如果不提供,则使用TemporaryQueue对此网关进行回复。
15 将目标的名称设置为 jmsreplyto 头。
它被DestinationResolver用于解析实际的Destination
最多只允许reply-destinationreply-destination-expressionreply-destination-name中的一个,如果没有提供,对此网关的回复使用TemporaryQueue
16 当设置为true时,它表示由DestinationResolver解析的任何答复Destination应该是Topic,而不是Queue
17 网关在将回复消息发送到reply-channel时等待的时间。
只有当reply-channel可以阻塞时,才会产生效果——例如,容量限制为当前已满的QueueChannel
默认值为无穷大。
18 该网关接收请求消息的通道。
19 对发送请求消息的Destination的引用。
需要reply-destinationreply-destination-expressionreply-destination-name中的一个。
只能使用这三个属性中的一个。
20 计算向其发送请求消息的Destination的 spel 表达式。
该表达式可以生成Destination对象或String
它被DestinationResolver用于解析实际的Destination
中的一个reply-destination,<471"/>,或者reply-destination-name是必需的。
你只能使用这三个属性中的一个。
21 请求消息被发送到的目的地的名称。
它被DestinationResolver用于解析实际的Destination
需要reply-destinationreply-destination-expressionreply-destination-name中的一个。
只能使用这三个属性中的一个。
22 当设置为true时,它表示由DestinationResolver解析的任何请求Destination都应该是Topic,而不是Queue
23 指定消息持续时间。
此设置仅在explicit-qos-enabledtrue时生效。
24 指定此出站网关是否必须返回一个非空值。
默认情况下,该值为true,并且当底层服务在receive-timeout之后没有返回一个值时,将抛出一个MessageTimeoutException
注意,如果该服务永远不会返回一个答复,使用<int-jms:outbound-channel-adapter/>而不是使用<int-jms:outbound-gateway/>requires-reply="false"
会更好。使用后者时,发送线程被阻塞,等待receive-timeout期间的答复。
25 当你使用<reply-listener />时,其生命周期(开始和停止)默认情况下与网关的生命周期相匹配。
当该值大于0时,容器是按需启动的(当发送请求时)。
容器继续运行,直到至少这一次没有接收到请求(并且直到没有未得到答复)。
容器在下一个请求时再次启动。
停止时间是最短的实际上可能是这个值的 1.5 倍。
26 异步网关
27 当包含此元素时,将通过异步MessageListenerContainer接收答复,而不是为每个答复创建一个消费者。
在许多情况下,这可以更有效。

# 映射消息头到和来自 JMS 消息

JMS 消息可以包含元信息,如 JMS API 头和简单属性。你可以使用JmsHeaderMapper将这些消息映射到 Spring 集成消息头。JMS API 头被传递给适当的 setter 方法(例如setJMSReplyTo),而其他头则被复制到 JMS 消息的一般属性中。JMS 出站网关使用JmsHeaderMapper的默认实现来引导,它将映射标准的 JMS API 头以及原语或String消息头。还可以通过使用入站和出站网关的header-mapper属性提供自定义的头映射器。

许多 JMS 特定于供应商的客户端不允许设置deliveryModeprioritytimeToLive属性直接在已创建的 JMS 消息上。
它们被认为是 QoS 属性因此必须传播到目标MessageProducer.send(message, deliveryMode, priority, timeToLive)API。,因此
DefaultJmsHeaderMapper不会将适当的 Spring 集成头(或表达式结果)映射到所提到的 JMS 消息属性中,相反,
DynamicJmsTemplateJmsSendingMessageHandler使用,将请求消息中的头值传播到MessageProducer.send()API 中,
启用此功能,你必须将出站端点配置为DynamicJmsTemplate,并将其explicitQosEnabled属性设置为 true。
Spring Integration Java DSL 默认情况下配置DynamicJmsTemplate,但你仍然必须设置explicitQosEnabled属性。
自版本 4.0 以来,JMSPriority报头被映射到用于入站消息的标准priority报头,
以前,priority报头仅用于出站消息,
返回到以前的行为(即不映射入站优先级),将DefaultJmsHeaderMappermapInboundPriority属性设置为false
从版本 4.3 开始,DefaultJmsHeaderMapper通过调用其toString()方法将标准correlationId头映射为消息属性(correlationId通常是UUID,这是 JMS 不支持的),
在入站侧,它被映射为String
这与jms_correlationId报头无关,后者与JMSCorrelationID报头相互映射,
JMSCorrelationID通常用于关联请求和回复,而correlationId通常用于将相关消息合并到一个组中(例如使用聚合器或重排序程序)。

从版本 5.1 开始,可以将DefaultJmsHeaderMapper配置为映射入站JMSDeliveryModeJMSExpiration属性:

@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
    DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
    mapper.setMapInboundDeliveryMode(true)
    mapper.setMapInboundExpiration(true)
    return mapper;
}

这些 JMS 属性分别映射到JmsHeaders.DELIVERY_MODEJmsHeaders.EXPIRATION Spring 消息头。

# 消息转换、编组和解组

如果需要转换消息,所有 JMS 适配器和网关都允许你通过设置message-converter属性来提供MessageConverter。要做到这一点,请提供在相同的应用程序上下文中可用的MessageConverter实例的 Bean 名称。另外,为了提供与 Marshaller 和 Unmarshaller 接口的某种一致性, Spring 提供了MarshallingMessageConverter,你可以使用自己的自定义 Marshaller 和 Unmarshaller 对其进行配置。下面的示例展示了如何做到这一点。

<int-jms:inbound-gateway request-destination="requestQueue"
    request-channel="inbound-gateway-channel"
    message-converter="marshallingMessageConverter"/>

<bean id="marshallingMessageConverter"
    class="org.springframework.jms.support.converter.MarshallingMessageConverter">
    <constructor-arg>
        <bean class="org.bar.SampleMarshaller"/>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.bar.SampleUnmarshaller"/>
    </constructor-arg>
</bean>
当你提供自己的MessageConverter实例时,它仍然被包装在HeaderMappingMessageConverter中。
这意味着“extract-request-payload”和“extract-reply-payload”属性可能会影响传递给转换器的实际对象。
HeaderMappingMessageConverter本身委托给目标MessageConverter,同时还将 Spring 集成MessageHeaders映射到 JMS 消息属性并再次返回。

# JMS 支持的消息通道

前面介绍的通道适配器和网关都是为与其他外部系统集成的应用程序设计的。入站选项假设其他系统正在向 JMS 目的地发送 JMS 消息,出站选项假设其他系统正在从该目的地接收消息。 Spring 集成应用程序可以是也可以不是另一种系统。当然,当发送 Spring 集成消息实例作为 JMS 消息本身的主体时(将’extract-payload’值设置为false),假定另一个系统是基于 Spring 集成的。然而,这绝不是一项要求。这种灵活性是使用基于消息的集成选项并抽象“通道”(在 JMS 的情况下是目标)的好处之一。

有时,给定 JMS 目标的生产者和消费者都是同一个应用程序的一部分,在同一个进程中运行。你可以通过使用一对入站和出站通道适配器来实现这一点。这种方法的问题在于,你需要两个适配器,尽管从概念上讲,目标是拥有一个消息通道。 Spring 集成版本 2.0 支持更好的选项。现在可以在使用 JMS 名称空间时定义单个“通道”,如下例所示:

<int-jms:channel id="jmsChannel" queue="exampleQueue"/>

前面示例中的通道的行为很像来自主 Spring 集成名称空间的正常<channel/>元素。任何端点的input-channeloutput-channel属性都可以引用它。不同之处在于,该通道由一个名为exampleQueue的 JMS 队列实例支持。这意味着在产生端点和使用端点之间可以进行异步消息传递。然而,与通过在非 JMS<channel/>元素中添加<queue/>元素创建的更简单的异步消息通道不同,消息不存储在内存队列中。相反,这些消息将在 JMS 消息体中传递,然后底层 JMS 提供程序的全部功能可用于该通道。使用此替代方案的最常见的理由可能是利用 JMS 消息传递的存储和转发方法所提供的持久性。

如果配置正确,JMS 支持的消息通道也支持事务。换句话说,如果生产者的发送操作是回滚事务的一部分,那么生产者实际上不会写到事务 JMS 支持的通道。同样,如果接收 JMS 消息是回滚事务的一部分,则使用者不会从通道中实际删除该消息。请注意,在这样的场景中,生产者事务和消费者事务是分开的。这与跨简单的、同步的<channel/>元素传播事务上下文有很大的不同,该元素没有<queue/>子元素。

由于上面的示例引用了一个 JMS 队列实例,因此它充当了一个点对点通道。另一方面,如果需要发布-订阅行为,则可以使用一个单独的元素并引用一个 JMS 主题。下面的示例展示了如何做到这一点:

<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>

对于两种类型的 JMS 支持的通道,都可以提供目标的名称,而不是引用,如下例所示:

<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>

<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>

在前面的示例中,目标名称是通过 Spring 的默认DynamicDestinationResolver实现解析的,但是你可以提供DestinationResolver接口的任何实现。另外,JMSConnectionFactory是通道的一个必需属性,但是,在默认情况下,预期的 Bean 名称将是jmsConnectionFactory。下面的示例提供了一个用于解析 JMS 目标名称的自定义实例,并为ConnectionFactory提供了一个不同的名称:

<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
    destination-resolver="customDestinationResolver"
    connection-factory="customConnectionFactory"/>

对于<publish-subscribe-channel />,对于持久订阅,将durable属性设置为true,对于共享订阅,将subscription-shared属性设置为subscription-shared(需要一个 JMS2.0 代理,并且自版本 4.2 以来一直可用)。使用subscription来命名订阅。

# 使用 JMS 消息选择器

使用 JMS 消息选择器,你可以基于 JMS 头和 JMS 属性过滤JMS 消息 (opens new window)。例如,如果希望侦听其自定义 JMS 头属性myHeaderProperty等于something的消息,则可以指定以下表达式:

myHeaderProperty = 'something'

消息选择器表达式是SQL-92 (opens new window)条件表达式语法的一个子集,并被定义为Java 消息服务 (opens new window)规范的一部分。你可以为以下 Spring 集成 JMS 组件使用 XML 名称空间配置来指定 JMS 消息selector属性:

  • JMS 频道

  • JMS 发布订阅频道

  • JMS 入站通道适配器

  • JMS 入站网关

  • JMS 消息驱动通道适配器

不能使用 JMS 消息选择器引用消息主体的值。

# JMS 样本

要对这些 JMS 适配器进行实验,请查看 Spring Integration Samples Git 存储库中的 JMS 示例,网址为https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms (opens new window)

那个仓库里有两个样本。一个提供入站和出站通道适配器,另一个提供入站和出站网关。它们被配置为与嵌入式ActiveMQ (opens new window)进程一起运行,但是你可以修改每个示例的common.xml (opens new window) Spring 应用程序上下文文件,以支持不同的 JMS 提供者或独立的 ActiveMQ 进程。

换句话说,你可以分割配置,以便在单独的 JVM 中运行入站和出站适配器。如果安装了 ActiveMQ,请修改common.xml文件中的brokerURL属性,以使用tcp://localhost:61616(而不是vm://localhost)。这两个样本都接受来自 STDIN 的输入,并回传给 STDOUT。查看配置以了解这些消息是如何通过 JMS 路由的。