# 消息传递端点

# 消息传递端点

# 消息端点

本章的第一部分介绍了一些背景理论,并揭示了驱动 Spring 集成的各种消息传递组件的底层 API。如果你想真正了解幕后的情况,这些信息可能会很有帮助。但是,如果你想要启动并运行各种元素的基于名称空间的简化配置,现在可以跳过端点命名空间支持

正如在概述中提到的,消息端点负责将各种消息传递组件连接到通道。在接下来的几章中,我们将介绍使用消息的许多不同组件。其中一些还能够发送回复消息。发送消息非常简单。如前面消息通道中所示,可以将消息发送到消息通道。然而,接收要复杂一些。主要原因是有两类消费者:民意测验消费者 (opens new window)事件驱动的消费者 (opens new window)

在这两种情况中,事件驱动的消费者要简单得多。不需要管理和调度单独的 Poller 线程,它们本质上是具有回调方法的侦听器。当连接到 Spring 集成的可订阅消息通道之一时,这个简单的选项非常有效。然而,当连接到一个缓冲的、可匹配的消息通道时,一些组件必须调度和管理轮询线程。 Spring 集成提供了两种不同的端点实现,以适应这两种类型的消费者。因此,消费者本身只需要实现回调接口。当需要轮询时,端点充当使用者实例的容器。其好处类似于使用容器来托管消息驱动的 bean,但是,由于这些使用者是在ApplicationContext中运行的 Spring 管理对象,因此它更类似于 Spring 自己的MessageListener容器。

# 消息处理程序

Spring 集成的MessageHandler接口由框架内的许多组件实现。换句话说,这不是公共 API 的一部分,并且你通常不会直接实现MessageHandler。然而,消息使用者使用它来实际处理所使用的消息,因此了解此策略接口确实有助于理解使用者的整体角色。接口定义如下:

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

尽管它很简单,但是这个接口为下面几章中涉及的大多数组件(路由器、转换器、分发器、聚合器、服务激活器和其他组件)提供了基础。这些组件各自对它们处理的消息执行非常不同的功能,但是实际接收消息的要求是相同的,并且轮询和事件驱动行为之间的选择也是相同的。 Spring 集成提供了两个端点实现,它们承载这些基于回调的处理程序,并让它们连接到消息通道。

# 事件驱动的消费者

因为它是两个中比较简单的,所以我们首先讨论事件驱动的消费者端点。你可能还记得SubscribableChannel接口提供了一个subscribe()方法,并且该方法接受一个MessageHandler参数(如[SubscribableChannel](./channel.html#channel-interfaces-subscribablechannel)所示)。下面的清单显示了subscribe方法的定义:

subscribableChannel.subscribe(messageHandler);

由于订阅了一个通道的处理程序不必主动轮询该通道,因此这是一个事件驱动的消费者, Spring 集成提供的实现接受一个SubscribableChannel和一个MessageHandler,如下例所示:

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

# 轮询消费者

Spring 集成还提供了一个PollingConsumer,并且可以以相同的方式进行实例化,只是通道必须实现PollableChannel,如下例所示:

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
有关轮询消费者的更多信息,请参见Poller通道适配器

轮询使用者还有许多其他配置选项。例如,触发器是一个必需的属性。下面的示例展示了如何设置触发器:

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(30, TimeUnit.SECONDS));

PeriodicTrigger通常定义为一个简单的间隔(以毫秒为单位),但也支持initialDelay属性和布尔fixedRate属性(默认为false——即没有固定的延迟)。以下示例设置了这两个属性:

PeriodicTrigger trigger = new PeriodicTrigger(1000);
trigger.setInitialDelay(5000);
trigger.setFixedRate(true);

在前面的示例中,三个设置的结果是一个触发器,该触发器等待五秒钟,然后每秒触发一次。

CronTrigger需要一个有效的 CRON 表达式。有关详细信息,请参见爪哇doc (opens new window)。下面的示例设置了一个新的CronTrigger:

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

在前面的示例中定义的触发器的结果是,从星期一到星期五,每十秒钟触发一次触发器。

除了触发器,你还可以指定另外两个与轮询相关的配置属性:maxMessagesPerPollreceiveTimeout。下面的示例展示了如何设置这两个属性:

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

maxMessagesPerPoll属性指定在给定的轮询操作中要接收的消息的最大数量。这意味着,Poller 将继续调用receive()而不等待,直到返回null或达到最大值为止。例如,如果一个 Poller 有一个 10 秒的间隔触发器,并且maxMessagesPerPoll设置为25,并且它正在轮询一个在其队列中有 100 条消息的通道,那么所有 100 条消息都可以在 40 秒内检索到。它抓取 25 个,等待 10 秒,抓取下一个 25,以此类推。如果maxMessagesPerPoll被配置为负值,则在一个轮询周期内调用MessageSource.receive(),直到返回null。从版本 5.5 开始,0值具有特殊的含义-完全跳过MessageSource.receive()调用,这可能被视为暂停此轮询端点,直到maxMessagesPerPoll在以后的时间更改为 n 个非零值,例如通过控制总线。

receiveTimeout属性指定当调用器调用 receive 操作时,如果没有可用消息,那么调用器应该等待的时间。例如,考虑两个表面上看起来相似但实际上完全不同的选项:第一个选项的间隔触发时间为 5 秒,接收超时时间为 50 毫秒,而第二个选项的间隔触发时间为 50 毫秒,接收超时时间为 5 秒。第一个接收消息的时间可能比它到达频道的时间晚 4950 毫秒(如果该消息是在其一个轮询调用返回后立即到达的)。另一方面,第二种配置对消息的漏失永远不会超过 50 毫秒。不同之处在于,第二个选项需要等待线程。然而,结果是,它可以更快地响应到达的消息。这种技术被称为“长轮询”,可以用来模拟被轮询数据源上的事件驱动行为。

轮询消费者也可以委托给 Spring TaskExecutor,如下例所示:

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

此外,PollingConsumer具有一个名为adviceChain的属性。此属性允许你指定一个List的 AOP 建议,用于处理包括事务在内的其他交叉问题。这些建议是围绕doPoll()方法应用的。有关更多的深入信息,请参见端点命名空间支持下的 AOP 建议链和事务支持部分。

前面的示例显示了依赖项查找。然而,请记住,这些消费者通常被配置为 Spring Bean 定义。实际上, Spring 集成还提供了一个名为FactoryBeanConsumerEndpointFactoryBeanConsumerEndpointFactoryBean,它基于通道的类型创建适当的消费者类型。此外, Spring 集成具有完整的 XML 命名空间支持,以进一步隐藏这些细节。基于名称空间的配置在本指南中介绍了每个组件类型。

许多MessageHandler实现都可以生成回复消息。
如前所述,与接收消息相比,发送消息是微不足道的。
然而,何时以及发送多少回复消息取决于处理程序类型。例如,
,聚合器等待大量消息到达,并且通常被配置为拆分器的下游使用者,拆分器可以为其处理的每条消息生成多个答复。
当使用名称空间配置时,你不需要严格地了解所有细节。,但是,
,仍然值得知道的是,这些组件中的几个共享一个公共基类,即AbstractReplyProducingMessageHandler,并且它提供了一个setOutputChannel(..)方法。

# 端点命名空间支持

在本参考手册中,你可以找到端点元素的特定配置示例,例如路由器、转换器、服务激活器等等。其中大多数支持input-channel属性,许多支持output-channel属性。在被解析之后,这些端点元素产生PollingConsumerEventDrivenConsumer的实例,这取决于所引用的input-channel的类型:PollableChannelSubscribableChannel,分别。当通道是可匹配的时,轮询行为基于端点元素的poller子元素及其属性。

下面的清单列出了poller的所有可用配置选项:

<int:poller cron=""                                  (1)
            default="false"                          (2)
            error-channel=""                         (3)
            fixed-delay=""                           (4)
            fixed-rate=""                            (5)
            id=""                                    (6)
            max-messages-per-poll=""                 (7)
            receive-timeout=""                       (8)
            ref=""                                   (9)
            task-executor=""                         (10)
            time-unit="MILLISECONDS"                 (11)
            trigger="">                              (12)
            <int:advice-chain />                     (13)
            <int:transactional />                    (14)
</int:poller>
1 提供通过使用 CRON 表达式配置 Pollers 的能力。
底层实现使用org.springframework.scheduling.support.CronTrigger
如果设置了此属性,则必须指定以下所有属性:fixed-delaytriggerfixed-rateref
2 通过将此属性设置为true,你可以精确地定义一个全局默认 poller。
引发了异常如果在应用程序上下文中定义了一个以上的默认 Poller。
连接到PollableChannelPollingConsumer)或任何没有显式配置 Poller 的SourcePollingChannelAdapter的端点,则使用全局默认 Poller。
它默认为false
可选的。
3 标识如果此调用器的调用发生故障时发送错误消息的通道。
要完全抑制异常,可以提供对nullChannel的引用。
可选的。
4 固定延迟触发器在覆盖项下使用PeriodicTrigger
如果不使用time-unit属性,则指定的值以毫秒为单位表示。
如果设置了此属性,则必须指定以下所有属性:fixed-ratetriggercronref
5 固定费率触发器在覆盖项下使用PeriodicTrigger
如果不使用time-unit属性,则指定的值以毫秒为单位表示。
如果设置了该属性,则不需要指定以下属性:fixed-delaytriggercron,以及ref
6 引用 poller 的底层 Bean-定义的 ID 类型为org.springframework.integration.scheduling.PollerMetadata
对于顶级 poller 元素,id属性是必需的,除非它是默认的 poller(default="true")。
7 有关更多信息,请参见配置入站通道适配器
如果未指定,默认值取决于上下文。
如果使用PollingConsumer,则此属性默认为-1
但是,如果使用max-messages-per-poll,则max-messages-per-poll属性默认为1。<gtr=" 可选。
8 值是在基础类PollerMetadata上设置的。
如果未指定,它的默认值为 1000(毫秒)。
可选。
9 Bean 引用另一个顶级 poller。
ref属性不能出现在顶级poller元素上。
但是,如果设置了此属性,则不能指定以下属性:fixed-ratecron,以及fixed-delay
10 提供引用自定义任务执行器的能力。
有关更多信息,请参见TaskExecutor 支持
可选。
11 此属性指定底层java.util.concurrent.TimeUnit上的java.util.concurrent.TimeUnitenum 值。
因此,此属性只能与fixed-delayfixed-rate属性结合使用。
如果与crontrigger引用属性结合使用,它会导致失败。
PeriodicTrigger所支持的最小粒度是毫秒。
因此,唯一可用的选项是毫秒和秒。
如果不提供该值,则任何fixed-delayfixed-rate值都被解释为毫秒。
基本上,这个枚举为基于秒的间隔触发值提供了便利。
对于每小时、每天和每月的设置,我们建议使用cron触发器。
12 引用实现org.springframework.scheduling.Trigger接口的任何 Spring-配置的 Bean。
但是,如果设置了此属性,则必须指定以下任何属性:fixed-delayfixed-ratecronref可选的。
13 允许指定额外的 AOP 建议来处理额外的横切问题。
有关更多信息,请参见事务支持
可选。
14 可以使 poller 成为事务性的。
有关更多信息,请参见AOP Advice chains
可选。
# 示例

具有 1 秒间隔的简单的基于间隔的 Poller 可以配置如下:

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller fixed-rate="1000"/>
</int:transformer>

作为使用fixed-rate属性的替代方法,你还可以使用fixed-delay属性。

对于基于 CRON 表达式的 poller,请使用cron属性,如下例所示:

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

如果输入通道是PollableChannel,则需要进行 poller 配置。具体地说,正如前面提到的,triggerPollingConsumer类的一个必需属性。因此,如果省略用于轮询消费者端点配置的poller子元素,可能会引发异常。如果你试图在连接到不可搜索通道的元素上配置一个 Poller,也可能引发异常。

也可以创建顶级 Pollers,在这种情况下,只需要一个ref属性,如下例所示:

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller ref="weekdayPoller"/>
</int:transformer>
ref属性仅在内部 poller 定义上允许。
在顶级 poller 上定义此属性会导致在初始化应用程序上下文期间引发配置异常。
# 全局默认 Poller

为了进一步简化配置,你可以定义一个全局默认 Poller。XML DSL 中的单个顶级 Poller 组件可能将default属性设置为true。对于 爪哇 配置,在这种情况下必须声明带有PollerMetadata.DEFAULT_POLLER名称的PollerMetadata Bean。在这种情况下,任何具有PollableChannel作为其输入通道的端点,都是在相同的ApplicationContext中定义的,并且没有显式配置poller的端点使用该默认值。下面的示例展示了这样的 Poller 和使用它的 Transformer:

Java DSL

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
    return IntegrationFlows.from(MessageChannels.queue("pollable"))
                           .transform(transformer) // No 'poller' attribute because there is a default global poller
                           .channel("output")
                           .get();
}

Java

@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

@Bean
public QueueChannel pollable() {
   return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
    ...
}

Kotlin DSL

@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
    PollerMetadata()
        .also {
            it.maxMessagesPerPoll = 5
            it.trigger = PeriodicTrigger(3000)
        }

@Bean
fun convertFlow() =
    integrationFlow(MessageChannels.queue("pollable")) {
    	transform(transformer) // No 'poller' attribute because there is a default global poller
    	channel("output")
    }

XML

<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>

<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
                 ref="transformer"
                 output-channel="output"/>
# 事务支持

Spring 集成还为 Pollers 提供事务支持,以便每个接收和转发操作可以作为工作的原子单元来执行。要为 Poller 配置事务,请添加<transactional/>子元素。下面的示例显示了可用的属性:

<int:poller fixed-delay="1000">
    <int:transactional transaction-manager="txManager"
                       propagation="REQUIRED"
                       isolation="REPEATABLE_READ"
                       timeout="10000"
                       read-only="false"/>
</int:poller>

有关更多信息,请参见Poller 事务支持

# AOP 建议链

由于 Spring 事务支持依赖于用TransactionInterceptor( AOP 通知)处理由 Poller 发起的消息流的事务行为的代理机制,因此有时必须提供额外的建议来处理与 Poller 相关的其他交叉行为。为此,poller定义了一个advice-chain元素,该元素允许你在实现MethodInterceptor接口的类中添加更多的建议。下面的示例展示了如何为poller定义advice-chain:

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
		method="good" output-channel="output">
	<int:poller max-messages-per-poll="1" fixed-rate="10000">
		 <int:advice-chain>
			<ref bean="adviceA" />
			<beans:bean class="org.something.SampleAdvice" />
			<ref bean="txAdvice" />
		</int:advice-chain>
	</int:poller>
</int:service-activator>

有关如何实现MethodInterceptor接口的更多信息,请参见AOP sections of the Spring Framework Reference Guide (opens new window)。也可以在没有任何事务配置的 Poller 上应用一个建议链,从而增强由 Poller 发起的消息流的行为。

当使用建议链时,不能指定<transactional/>子元素。
相反,声明一个<tx:advice/> Bean 并将其添加到<advice-chain/>中。
有关完整的配置详细信息,请参见Poller 事务支持
# TaskExecutor 支持

轮询线程可以由 Spring 的TaskExecutor抽象的任何实例执行。这使端点或一组端点能够并发。截至 Spring 3.0,核心 Spring 框架有一个task名称空间,其<executor/>元素支持创建一个简单的线程池执行器。该元素接受用于公共并发设置的属性,例如池大小和队列容量。配置线程池执行器可以使端点在负载下的执行方式发生很大的变化。这些设置可用于每个端点,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期卷)。要为配置了 XML 名称空间支持的轮询端点启用并发,请在其<poller/>元素上提供task-executor引用,然后提供一个或多个属性,如以下示例所示:

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

如果你不提供任务执行器,则会在调用者的线程中调用使用者的处理程序。请注意,调用者通常是默认的TaskScheduler(参见配置任务调度程序)。还应该记住,通过指定 Bean 名称,task-executor属性可以提供对 Spring 的TaskExecutor接口的任何实现的引用。前面显示的executor元素是为了方便而提供的。

正如前面在轮询消费者的后台部分中提到的,你还可以以这样一种方式配置轮询使用者,以便模拟事件驱动的行为。通过长时间的接收超时和较短的触发间隔,你可以确保对到达的消息做出非常及时的反应,即使是在经过调查的消息源上。请注意,这仅适用于具有超时阻塞等待调用的源。例如,文件 poller 不会阻塞。每个receive()调用都会立即返回,并且要么包含新文件,要么不包含新文件。因此,即使 poller 包含一个长receive-timeout,在这种情况下也不会使用该值。另一方面,当使用 Spring 集成自己的基于队列的通道时,超时值确实有机会参与。下面的示例展示了轮询消费者如何几乎在瞬间接收消息:

<int:service-activator input-channel="someQueueChannel"
    output-channel="output">
    <int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

使用这种方法不会带来太多的开销,因为在内部,它只不过是一个定时等待线程,它所需的 CPU 资源使用量几乎不像(例如)颠簸的无限 while 循环那样多。

# 在运行时更改轮询速率

当配置带有fixed-delayfixed-rate属性的 Poller 时,默认实现使用PeriodicTrigger实例。PeriodicTrigger是核心 Spring 框架的一部分。它只接受作为构造函数参数的时间间隔。因此,它不能在运行时进行更改。

但是,你可以定义自己的org.springframework.scheduling.Trigger接口的实现。你甚至可以使用PeriodicTrigger作为起点。然后,你可以为间隔(周期)添加一个 setter,或者你甚至可以在触发器本身中嵌入你自己的节流逻辑。对nextExecutionTime的每次调用都使用period属性来安排下一次投票。要在 Pollers 中使用此自定义触发器,请在应用程序上下文中声明 Bean 自定义触发器的定义,并使用trigger属性将依赖项注入到 Poller 配置中,该属性引用自定义触发器 Bean 实例。现在可以获得对触发器的引用 Bean 并更改轮询之间的轮询间隔。

有关示例,请参见Spring Integration Samples (opens new window)项目。它包含一个名为dynamic-poller的示例,该示例使用自定义触发器,并演示了在运行时更改轮询间隔的能力。

该示例提供了一个自定义触发器,该触发器实现了[org.springframework.scheduling.Trigger](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/schooling/trigger.html)接口。该示例的触发器基于 Spring 的[PeriodicTrigger](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/schooling/support/perioditragger.html)实现。但是,自定义触发器的字段不是最终的,属性具有显式的 getter 和 setter,允许你在运行时动态地更改轮询周期。

不过,需要注意的是,由于触发器方法是nextExecutionTime(),因此基于现有配置,对动态触发器的任何更改直到下一次轮询才生效。
在当前配置的下一次执行时间之前,不可能强制触发。

# 有效载荷类型转换

在整个参考手册中,你还可以看到各种端点的特定配置和实现示例,这些端点接受消息或任何任意的Object作为输入参数。在Object的情况下,这样的参数被映射到消息有效负载或有效负载或报头的一部分(当使用 Spring 表达式语言时)。然而,端点方法的输入参数的类型有时与有效负载或其部分的类型不匹配。在这个场景中,我们需要执行类型转换。 Spring 集成提供了一种方便的方式来注册类型转换器(通过使用 Spring )在其自身实例内的名为的转换服务 Bean。 Bean 是在通过使用 Spring 集成基础设施定义第一转换器时自动创建的。要注册转换器,可以实现org.springframework.core.convert.converter.Converterorg.springframework.core.convert.converter.GenericConverterorg.springframework.core.convert.converter.ConverterFactory

Converter实现是最简单的,可以从一种类型转换为另一种类型。对于更复杂的操作,例如转换为类层次结构,你可以实现GenericConverter,也可以实现ConditionalConverter。这使你能够完全访问fromto类型描述符,从而实现复杂的转换。例如,如果你有一个名为Something的抽象类,它是转换的目标(参数类型、通道数据类型等等),那么你有两个具体的实现,分别称为Thing1Thing,并且你希望根据输入类型转换为一个或另一个,GenericConverter将是一个很好的匹配。有关更多信息,请参见这些接口的 Javadoc:

当你实现了转换器之后,你可以使用方便的名称空间支持对其进行注册,如下例所示:

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

或者,你可以使用内部 Bean,如下例所示:

<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

从 Spring Integration4.0 开始,你可以使用注释来创建前面的配置,如下例所示:

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}

或者,你可以使用@Configuration注释,如下例所示:

@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}
在配置应用程序上下文时, Spring 框架允许你添加conversionService Bean(参见配置转换服务 (opens new window)章)。
当需要时,该服务用于在 Bean 创建和配置期间执行适当的转换,相比之下,

integrationConversionService用于运行时转换。
这些用途是完全不同的。
转换器是用于连接 Bean 构造函数参数和属性时使用的转换器,如果在运行时用于 Spring 针对数据类型通道、有效负载类型转换器中的消息的集成表达式计算,则可能会产生意想不到的结果,以此类推。

但是,如果你确实希望使用 Spring conversionService作为 Spring 集成integrationConversionService,则可以在应用程序上下文中配置一个别名,如以下示例所示:


在这种情况下,
conversionService提供的转换器可用于 Spring 集成运行时转换。

# 内容类型转换

从版本 5.0 开始,默认情况下,方法调用机制基于org.springframework.messaging.handler.invocation.InvocableHandlerMethod基础架构。它的HandlerMethodArgumentResolver实现(例如PayloadArgumentResolverMessageMethodArgumentResolver)可以使用MessageConverter抽象来将传入的payload转换为目标方法参数类型。该转换可以基于contentType消息头。为此, Spring Integration 提供了ConfigurableCompositeMessageConverter,它将委托给要调用的已注册转换器的列表,直到其中一个转换器返回非空结果。默认情况下,此转换器提供(以严格的顺序):

  1. [MappingJackson2MessageConverter](https://DOCS. Spring.io/ Spring-framework/DOCS/current/javadoc-api/org/springframework/jms/support/converter/mappingJackson2messageconverter.html)如果 Jackson 处理器存在于 Classpath 上

  2. [ByteArrayMessageConverter](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/SpringFramework/Messaging/Converter/BytearrayMessageConverter.html)

  3. [ObjectStringMessageConverter](https://DOCS. Spring.io/ Spring-integration/DOCS/current/api//org/springframework/integration/support/converter/objectstringmessageConverter.html)

  4. [GenericMessageConverter](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/SpringFramework/Messaging/Converter/GenericMessageConverter.html)

有关其目的和用于转换的适当的contentType值的更多信息,请参见 Javadoc(在前面的列表中链接)。使用ConfigurableCompositeMessageConverter是因为它可以与任何其他MessageConverter实现一起提供,包括或排除前面提到的默认转换器。 Bean 还可以在应用程序上下文中将其注册为适当的,覆盖默认的转换器,如以下示例所示:

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
    List<MessageConverter> converters =
        Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
                 new JavaSerializationMessageConverter());
    return new ConfigurableCompositeMessageConverter(converters);
}

这两个新的转换器是注册在复合之前的默认值。你也可以不使用ConfigurableCompositeMessageConverter,而是通过使用名称注册 Bean integrationArgumentResolverMessageConverter(通过设置IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME属性)来提供你自己的MessageConverter

当使用 SPEL 方法调用时,基于MessageConverter(包括contentTypeheader)的转换是不可用的。
在这种情况下,只有在有效载荷类型转换中提到的常规类到类的转换是可用的。

# 异步轮询

如果希望轮询是异步的,则 Poller 可以选择指定一个task-executor属性,该属性指向任何TaskExecutor Bean 的现有实例( Spring 3.0 通过task命名空间提供了一个方便的命名空间配置)。但是,在使用TaskExecutor配置 Poller 时,你必须了解某些事情。

问题在于存在两种配置,poller 和TaskExecutor。他们必须互相配合。否则,你可能最终会创建一个人工内存泄漏。

考虑以下配置:

<int:channel id="publishChannel">
    <int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
	<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

前面的配置演示了一个过时的配置。

默认情况下,任务执行器有一个无界的任务队列。即使所有线程都被阻塞,Poller 仍会继续调度新任务,等待新消息到达或超时过期。考虑到有 20 个线程以 5 秒的超时时间执行任务,它们的执行速度为每秒 4 个线程。然而,新任务的调度速度为每秒 20 次,因此任务执行器中的内部队列以每秒 16 次的速度增长(而进程是空闲的),因此我们存在内存泄漏。

处理此问题的方法之一是设置任务执行器的queue-capacity属性。即使是 0 也是一个合理的值。你还可以通过设置任务执行器的rejection-policy属性(例如,设置为DISCARD)来指定如何处理不能排队的消息来管理它。换句话说,在配置TaskExecutor时,你必须了解某些细节。有关此主题的更多详细信息,请参见 Spring 参考手册中的“任务执行和调度” (opens new window)

# 端点内部 bean

许多端点是复合 bean。这包括所有的消费者和所有的入站通道适配器。消费者(民意测验或事件驱动)委托给MessageHandler。经过轮询的适配器通过委托给MessageSource来获取消息。 Bean 通常情况下,获得对委托的引用是有用的,可能用于在运行时更改配置或进行测试。这些 bean 可以从具有众所周知的名称的ApplicationContext中获得。MessageHandler实例使用与someConsumer.handler相似的 Bean ID 在应用程序上下文中注册,(其中’Consumer’是端点的id属性的值),MessageSource实例使用与somePolledAdapter.source相似的 Bean ID 注册,其中“somepolledapter”是适配器的 ID。

前面的只适用于框架组件本身。你可以使用内部 Bean 定义,如下例所示:

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="foo">
    <beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

Bean 被视为与任何内部 Bean 声明的一样,并且不与应用上下文注册。如果你希望以某种其他方式访问此 Bean,请在顶层用id声明它,并使用ref属性。有关更多信息,请参见Spring Documentation (opens new window)

# 端点角色

从版本 4.2 开始,可以将端点分配给角色。角色让端点作为一个组开始和停止。这在使用领导选举时特别有用,在这种情况下,可以分别在授予或撤销领导时启动或停止一组端点。为此,框架在应用程序上下文中以IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER的名称注册了SmartLifecycleRoleController Bean。每当需要控制生命周期时,这 Bean 可以被注入或@Autowired:

<bean class="com.some.project.SomeLifecycleControl">
    <property name="roleController" ref="integrationLifecycleRoleController"/>
</bean>

你可以使用 XML、Java 配置或编程方式将端点分配给角色。下面的示例展示了如何使用 XML 配置端点角色:

<int:inbound-channel-adapter id="ica" channel="someChannel" expression="'foo'" role="cluster"
        auto-startup="false">
    <int:poller fixed-rate="60000" />
</int:inbound-channel-adapter>

下面的示例展示了如何为用 Java 创建的 Bean 配置端点角色:

@Bean
@ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false")
@Role("cluster")
public MessageHandler sendAsyncHandler() {
    return // some MessageHandler
}

下面的示例展示了如何在 Java 中的方法上配置端点角色:

@Payload("#args[0].toLowerCase()")
@Role("cluster")
public String handle(String payload) {
    return payload.toUpperCase();
}

下面的示例展示了如何使用 Java 中的SmartLifecycleRoleController配置端点角色:

@Autowired
private SmartLifecycleRoleController roleController;
...
    this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint);
...

下面的示例展示了如何在 Java 中使用IntegrationFlow配置端点角色:

IntegrationFlow flow -> flow
        .handle(..., e -> e.role("cluster"));

每一个都将端点添加到cluster角色。

调用roleController.startLifecyclesInRole("cluster")和相应的stop…​方法来启动和停止端点。

任何实现SmartLifecycle的对象都可以通过编程方式添加——而不仅仅是端点。

SmartLifecycleRoleController实现了ApplicationListener<AbstractLeaderEvent>,并且在授予或撤销领导权限时(当某些 Bean 发布OnGrantedEventOnRevokedEvent时),它会自动启动和停止其配置的SmartLifecycle对象。

当使用 leadership election 来启动和停止组件时,将auto-startupXML 属性(autoStartup Bean 属性)设置为false非常重要,这样应用程序上下文在上下文初始化期间就不会启动组件。

从版本 4.3.8 开始,SmartLifecycleRoleController提供了几种状态方法:

public Collection<String> getRoles() (1)

public boolean allEndpointsRunning(String role) (2)

public boolean noEndpointsRunning(String role) (3)

public Map<String, Boolean> getEndpointsRunningStatus(String role) (4)
1 返回被管理角色的列表。
2 如果角色中的所有端点都在运行,则返回true
3 如果角色中的端点都没有运行,则返回true
4 返回component name : running status的映射。
组件名称通常是 Bean 名称。

# 领导力事件处理

端点组可以根据分别授予或撤销的领导才能来启动和停止。这在群集场景中很有用,在群集场景中,共享资源必须仅由单个实例使用。这方面的一个例子是一个文件入站通道适配器,它正在轮询一个共享目录。(见读取文件)。

为了参与领导者选举并在当选领导者、领导者被撤销或未能获得成为领导者的资源时获得通知,应用程序在应用程序上下文中创建了一个称为“领导者发起者”的组件。通常,leader 发起者是SmartLifecycle,因此它在上下文启动时启动(可选地),然后在 leadership 更改时发布通知。你还可以通过将publishFailedEvents设置为true(从版本 5.0 开始)来接收失败通知,以便在发生失败时采取特定操作。按照惯例,你应该提供一个Candidate来接收回调。你还可以通过框架提供的Context对象撤销领导地位。你的代码还可以侦听o.s.i.leader.event.AbstractLeaderEvent实例(OnGrantedEventOnRevokedEvent的超类)并做出相应的响应(例如,通过使用SmartLifecycleRoleController)。这些事件包含对Context对象的引用。下面的清单显示了Context接口的定义:

public interface Context {

	boolean isLeader();

	void yield();

	String getRole();

}

从版本 5.0.6 开始,上下文提供了对候选人角色的引用。

Spring 集成提供了一种基于LockRegistry抽象的 leader 启动器的基本实现。要使用它,你需要创建一个实例作为 Bean,如下例所示:

@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
    return new LockRegistryLeaderInitiator(locks);
}

如果正确地实现了锁注册表,则最多只有一个领导者。如果 Lock Registry 还提供了锁,当异常过期或中断时抛出异常(理想情况下,InterruptedException),则无引导期的持续时间可以短于锁实现中固有的延迟所允许的时间。默认情况下,busyWaitMillis属性会增加一些额外的延迟,以防止在(更常见的)锁不完善且只有在再次尝试获得锁时才知道锁已过期的情况下出现 CPU 短缺。

有关领导选举和使用 ZooKeeper 的活动的更多信息,请参见动物园管理员领导事件处理

# 消息传递网关

网关隐藏了 Spring 集成提供的消息传递 API。它让应用程序的业务逻辑不了解 Spring Integration API。通过使用通用网关,你的代码只与一个简单的接口交互。

# 输入GatewayProxyFactoryBean

如前所述,不依赖 Spring 集成 API(包括 Gateway 类)将是很好的。出于这个原因, Spring 集成提供了GatewayProxyFactoryBean,它为任何接口生成代理,并在内部调用下面所示的网关方法。通过使用依赖注入,你可以将接口公开给你的业务方法。

下面的示例显示了可用于与 Spring 集成交互的接口:

package org.cafeteria;

public interface Cafe {

    void placeOrder(Order order);

}

# 网关 XML 命名空间支持

还提供了名称空间支持。它允许你将接口配置为服务,如下例所示:

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

在定义了这种配置之后,cafeService现在可以被注入到其他 bean 中,并且调用Cafe接口的代理实例上的方法的代码不知道 Spring 集成 API。一般的方法类似于 Spring 远程处理(RMI、Httpinvoker 等)。参见“Samples”附录中使用gateway元素的示例(在 Cafe 演示中)。

前面配置中的默认值应用于网关接口上的所有方法。如果未指定应答超时,则调用线程将无限期地等待应答。见未收到响应时的网关行为

对于单个方法,可以重写默认值。见带有注释和 XML 的网关配置

# 设置默认回复通道

通常,你不需要指定default-reply-channel,因为网关会自动创建一个临时的匿名回复通道,在该通道中监听回复。然而,有些情况下可能会提示你定义default-reply-channel(或者reply-channel具有适配器网关,例如 HTTP、JMS 和其他)。

对于一些背景,我们简要地讨论了网关的一些内部工作。网关创建一个临时的点对点应答通道。它是匿名的,并以replyChannel的名称添加到消息头中。当提供显式default-reply-channel(带有远程适配器网关的reply-channel)时,你可以指向一个发布-订阅通道,之所以这样命名,是因为可以向它添加多个订阅服务器。 Spring 在内部,集成在临时replyChannel和显式定义的default-reply-channel之间创建了一个桥。

假设你希望你的回复不仅发送到网关,还发送到其他一些消费者。在这种情况下,你想要两件事:

  • 你可以订阅的指定频道

  • 该频道为发布-订阅-频道

网关使用的默认策略不能满足这些需求,因为添加到报头的应答通道是匿名的和点对点的。这意味着没有其他订阅者可以获得它的句柄,并且,即使可以,该通道也具有点对点行为,使得只有一个订阅者将获得该消息。通过定义default-reply-channel,你可以指向你选择的通道。在这种情况下,这是一个publish-subscribe-channel。网关创建了一个桥,从它到临时的匿名回复通道,该通道存储在头文件中。

你可能还希望显式地提供一个响应通道,用于通过拦截器进行监视或审核(例如,wiretap)。要配置通道拦截器,你需要一个命名通道。

从版本 5.4 开始,当网关方法返回类型是void时,如果没有明确提供这样的头,则框架将replyChannel头填充为nullChannel Bean 引用。
这允许丢弃来自下游流的任何可能的答复,从而满足单向网关契约。

# 带注释和 XML 的网关配置

考虑下面的示例,该示例通过添加@Gateway注释,扩展了前面的Cafe接口示例:

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

@Header注释允许你添加被解释为消息头的值,如下例所示:

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

如果你更喜欢使用 XML 方法来配置网关方法,那么可以在网关配置中添加method元素,如下例所示:

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

你还可以使用 XML 为每个方法调用提供单独的头。如果你想要设置的标头本质上是静态的,并且你不想通过使用@Header注释将它们嵌入到网关的方法签名中,那么这可能是有用的。例如,在 Loan Broker 示例中,我们希望根据发起的请求类型(单个报价或所有报价)来影响如何进行贷款报价的聚合。通过评估调用了哪个网关方法来确定请求的类型,尽管这是可能的,但这将违反 Concerns 分离范式(该方法是一个 Java 工件)。然而,在消息传递体系结构中,在消息头中表达你的意图(元信息)是很自然的。下面的示例展示了如何为两个方法中的每一个添加不同的消息头:

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

在前面的示例中,根据网关的方法,为“response_type”头设置了不同的值。

例如,如果在<int:method/>中指定requestChannel以及在@Gateway注释中指定requestChannel,则注释值获胜。
如果在 XML 中指定了无参数网关,并且接口方法同时具有@Payload@Gateway注释(在payloadExpression元素中带有payload-expressionpayload-expression元素),则@Payload值将被忽略。
# 表达式和“全局”头

<header/>元素支持expression作为value的替代项。计算 SPEL 表达式以确定标头的值。从版本 5.2 开始,计算上下文的#root对象是带有getMethod()getArgs()访问器的MethodArgsHolder对象。

从版本 5.2 开始,这两个表达式求值上下文变量已被弃用:

  • #args:包含方法参数的Object[]

  • #GatewayMethod:表示调用的service-interface中的方法的对象(派生自java.reflect.Method)。包含此变量的头可以在流的后面使用(例如,用于路由)。例如,如果你希望对简单的方法名进行路由,则可以添加一个带有以下表达式的头:#gatewayMethod.name

java.reflect.Method不可序列化。如果你以后序列化消息,则带有method表达式的头将丢失。
因此,在这些情况下,你可能希望使用method.namemethod.toString()service-interface方法提供了该方法的String表示,包括参数和返回类型。

从版本 3.0 开始,<default-header/>元素可以被定义为向网关产生的所有消息添加头,而不管调用的方法是什么。为方法定义的特定标头优先于缺省标头。这里为方法定义的特定标头覆盖了服务接口中的任何@Header注释。但是,缺省标头不会覆盖服务接口中的任何@Header注释。

网关现在还支持default-payload-expression,它适用于所有方法(除非重写)。

# 将方法参数映射到消息

使用上一节中的配置技术,可以控制如何将方法参数映射到消息元素(有效负载和标题)。当不使用显式配置时,将使用某些约定来执行映射。在某些情况下,这些约定不能确定哪个参数是有效负载,哪个参数应该映射到头。考虑以下示例:

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

在第一种情况下,惯例是将第一个参数映射到有效负载(只要它不是Map),并且第二个参数的内容成为标题。

在第二种情况下(或者当参数thing1的参数是Map时的第一种情况),框架无法确定哪个参数应该是有效负载。因此,映射失败。这通常可以使用payload-expression@Payload注释或@Headers注释来解决。

或者(每当约定失效时),你可以承担将方法调用映射到消息的全部责任。为此,实现MethodArgsMessageMapper,并通过使用mapper属性将其提供给<gateway/>。映射器映射了MethodArgsHolder,这是一个简单的类,它包装java.reflect.Method实例和包含参数的Object[]实例。当提供自定义映射器时,在网关上不允许default-payload-expression属性和<default-header/>元素。类似地,payload-expression属性和<header/>元素在任何<method/>元素上都是不允许的。

# 映射方法参数

以下示例展示了如何将方法参数映射到消息,并展示了无效配置的一些示例:

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("#args[0] + #args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(#args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  (1)

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 请注意,在本例中,spel 变量#this引用了参数——在本例中,是s的值。

等效的 XML 看起来有些不同,因为方法参数没有#this上下文。但是,表达式可以通过使用#args变量来引用方法参数,如下例所示:

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="#args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(#args[0])"/>
  <int:method name="send3" payload-expression="#method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="#args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

# @MessagingGateway注解

从版本 4.0 开始,网关服务接口可以用@MessagingGateway注释来标记,而不需要定义用于配置的<gateway />XML 元素。下面的两个示例比较了配置相同网关的两种方法:

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}
与 XML 版本类似, Spring 集成在组件扫描期间发现这些注释时,将使用其消息传递基础设施创建proxy实现,
以执行此扫描并在应用程序上下文中注册BeanDefinition,将@IntegrationComponentScan注释添加到@Configuration类中。
标准@ComponentScan基础设施不处理接口。
因此,我们引入了自定义@IntegrationComponentScan逻辑,以细化接口上的@MessagingGateway注释,并为它们注册GatewayProxyFactoryBean实例。
另请参见注释支持

@MessagingGateway注释一起,你可以使用@Profile注释标记服务接口,以避免创建 Bean,如果这样的配置文件不是活动的。

如果没有 XML 配置,则在至少一个@Configuration类上需要@EnableIntegration注释。
参见[configuration and@EnableIntegration](./overview.html#configuration-enable-integration)以获取更多信息。

# 调用无参数方法

在没有任何参数的网关接口上调用方法时,默认的行为是从PollableChannel接收Message

然而,有时你可能希望触发无参数方法,以便你可以与不需要用户提供的参数的其他下游组件进行交互,例如触发无参数 SQL 调用或存储过程。

要实现发送和接收语义,你必须提供一个有效负载。要生成有效负载,不需要接口上的方法参数。你可以在method元素上使用@Payload注释或在 XML 中使用payload-expression属性。下面的列表包括几个有效载荷可能是什么的例子:

  • 字面意义的字符串

  • #gatewaymethod.name

  • 新建 java.util.date()

  • @somebean.somemethod()的返回值

下面的示例展示了如何使用@Payload注释:

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

你也可以使用@Gateway注释。

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}
如果两个注释都存在(并且提供了payloadExpression),则@Gateway获胜。

另见带有注释和 XML 的网关配置

如果一个方法没有参数,也没有返回值,但确实包含有效负载表达式,那么它将被视为只发送操作。

# 调用default方法

网关代理的接口也可以有default方法,并且从版本 5.3 开始,该框架将DefaultMethodInvokingMethodInterceptor注入到代理中,以便使用default方法而不是代理方法来调用java.lang.invoke.MethodHandle方法。来自 JDK 的接口,例如java.util.function.Function,仍然可以用于网关代理,但是它们的default方法不能被调用,因为内部的 Java 安全原因导致针对 JDK 类的MethodHandles.Lookup实例化。这些方法还可以使用显式的@Gateway方法上的注释,或proxyDefaultMethods上的注释或proxyDefaultMethods上的注释或@MessagingGateway上的注释或<gateway>XML 组件来代理(失去它们的实现逻辑,同时,恢复以前的网关代理行为)。

# 错误处理

网关调用可能会导致错误。默认情况下,下游发生的任何错误都会在网关的方法调用中“按原样”重新抛出。例如,考虑以下简单的流程:

gateway -> service-activator

如果服务激活器调用的服务抛出一个MyException(例如),框架将其包装为MessagingException,并在failedMessage属性中将传递的消息附加到服务激活器。因此,框架执行的任何日志记录都具有故障的全部上下文。默认情况下,当异常被网关捕获时,MyException将被打开并抛给调用方。可以在网关方法声明中配置throws子句,以匹配原因链中的特定异常类型。例如,如果你希望捕获整个MessagingException带有所有消息传递信息的下行错误的原因,则应该具有类似于以下的网关方法:

public interface MyGateway {

    void performProcess() throws MessagingException;

}

由于我们鼓励 POJO 编程,因此你可能不希望将调用方暴露于消息传递基础结构中。

如果你的网关方法没有throws子句,则网关遍历原因树,查找不是RuntimeExceptionRuntimeException。如果没有找到,则框架抛出MessagingException。如果前面的讨论中的MyException的原因是SomeOtherException,并且你的方法throws SomeOtherException,那么网关将进一步展开该原因,并将其抛给调用者。

当网关声明为 noservice-interface时,将使用内部框架接口RequestReplyExchanger

考虑以下示例:

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

在版本 5.0 之前,这个exchange方法没有throws子句,因此,异常被打开。如果你使用此接口并希望恢复以前的打开行为,请使用自定义的service-interface,或者自己访问cause中的MessagingException

但是,你可能希望记录错误,而不是传播错误,或者你可能希望将异常视为有效的答复(通过将其映射到符合调用者理解的某些“错误消息”契约的消息)。为了实现这一点,网关通过包括对error-channel属性的支持,提供了对专门用于错误的消息通道的支持。在下面的示例中,“Transformer”从Exception创建一个答复Message:

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer可以是一个简单的 POJO,它知道如何创建预期的错误响应对象。这将成为发送回调用者的有效负载。如果有必要,你可以在这样的“错误流程”中做更多复杂的事情。它可能涉及路由器(包括 Spring Integration 的ErrorMessageExceptionTypeRouter)、过滤器等等。然而,在大多数情况下,一个简单的“变压器”就足够了。

或者,你可能希望只记录异常(或将异常异步发送到某个地方)。如果你提供了单向流,那么将不会向呼叫者发送任何内容。如果希望完全抑制异常,可以提供对全局nullChannel方法的引用(本质上是/dev/null方法)。最后,正如上面提到的,如果没有error-channel被定义,那么异常将照常传播。

当你使用@MessagingGateway注释(参见[@messaginggatewayAnnotation](#messaging-gateway-annotation))时,你可以使用errorChannel属性。

从版本 5.0 开始,当你使用带有void返回类型(单向流)的网关方法时,error-channel引用(如果提供)将在每个发送消息的标准errorChannel头中填充。该特性允许基于标准ExecutorChannel配置(或QueueChannel)的下游异步流覆盖默认的全局errorChannel异常发送行为。以前,你必须手动指定带有@GatewayHeader注释或<header>元素的errorChannel头。对于具有异步流的void方法,忽略了error-channel属性。相反,错误消息被发送到默认的errorChannel

通过简单的 POJI 网关公开消息传递系统会带来好处,但“隐藏”底层消息传递系统的现实确实是要付出代价的,因此,你应该考虑某些事情。
我们希望我们的 Java 方法能够尽快返回,而不是在调用者等待它返回时无限期地挂起(是否无效,返回值,或抛出异常)。
当常规方法被用作消息传递系统前面的代理时,我们必须考虑到底层消息传递的潜在异步性质。
这意味着可能存在这样一种可能性,即网关发起的消息可能会被过滤器丢弃,而永远不会到达组件。这是产生回复的原因。
某些服务激活器方法可能会导致异常,从而不提供回复(因为我们不生成空消息)。
换句话说,多个场景可能会导致回复消息永远不会出现。
这在消息传递系统中是非常自然的。
但是,考虑一下网关方法的含义。网关的方法输入参数被合并到一条消息中,并向下游发送。
回复消息将被转换为网关方法的返回值,
因此,你可能需要确保,对于每个网关调用,总是有一条回复消息。,否则,你的网关方法可能永远不会返回并无限期地挂起。
处理这种情况的一种方法是使用异步网关(在本节稍后进行说明)。
另一种处理方法是显式地设置reply-timeout属性。
,网关的挂起时间不会超过reply-timeout指定的时间,如果超时过了,则返回’null’。最后,你可能需要考虑在服务激活器上设置下游标志,例如’requires-reply’,或者在过滤器上设置’throw-exceptions-on-reference’。本章的最后一节将对这些选项进行更详细的讨论。
如果下游流返回一个ErrorMessage,则其payload(aThrowable)被视为一个常规的下游错误。
如果配置了一个error-channel,则将其发送到错误流。
否则,有效负载将被抛给网关的调用者。,如果error-channel上的错误流返回ErrorMessage,它的有效负载被抛给调用者。
这同样适用于任何具有Throwable有效负载的消息。
当你需要将Exception直接传播给调用者时,这在异步情况下很有用。,
这样做,你可以返回一个Exception(作为来自某个服务的reply),也可以抛出它,
通常情况下,即使使用异步流,框架负责将下游流引发的异常传播回网关。
TCP 客户机-服务器多路复用 (opens new window)示例演示了这两种技术都可以将异常返回给调用者。
它通过使用aggregatorgroup-timeout(参见聚合器和组超时)以及在 discard 流上的MessagingTimeoutException答复来模拟套接字 IO 错误到等待线程。

# 网关超时

网关有两个超时属性:requestTimeoutreplyTimeout。请求超时仅在通道可以阻塞(例如,有界QueueChannel已满)的情况下才适用。replyTimeout值是网关等待回复或返回null的时间。它默认值为无穷大。

对于网关(defaultRequestTimeoutdefaultReplyTimeout)或MessagingGateway接口注释上的所有方法,超时都可以设置为默认值。单个方法可以覆盖这些默认值(在<method/>子元素中)或在@Gateway注释上。

从版本 5.0 开始,超时可以定义为表达式,如下例所示:

@Gateway(payloadExpression = "#args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "#args[1]", replyTimeoutExpression = "#args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

评估上下文有一个BeanResolver(使用@someBean引用其他 bean),并且#args数组变量是可用的。

在使用 XML 进行配置时,超时属性可以是长值或 SPEL 表达式,如下例所示:

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="#args[0]"
                      request-timeout="1000"
                      reply-timeout="#args[1]">
</method>

# 异步网关

作为一种模式,消息传递网关提供了一种很好的方式来隐藏特定于消息传递的代码,同时仍然公开消息传递系统的全部功能。由于前面描述的GatewayProxyFactoryBean提供了一种方便的方式,可以通过服务接口公开代理,从而使你能够基于 POJO 访问消息传递系统(基于你自己的域中的对象、原语/字符串或其他对象)。但是,当网关通过返回值的简单 POJO 方法公开时,这意味着对于每个请求消息(在调用方法时生成),必须有一个答复消息(在方法返回时生成)。由于消息传递系统是自然异步的,因此你可能不能总是保证“对于每个请求,总是会有一个答复”的契约。 Spring Integration2.0 引入了对异步网关的支持,它提供了一种方便的方式来初始化流,当你可能不知道是否需要回复或回复需要多长时间才能到达时。

为了处理这些类型的场景, Spring 集成使用java.util.concurrent.Future实例来支持异步网关。

从 XML 配置来看,没有任何变化,你仍然以与定义常规网关相同的方式定义异步网关,如下例所示:

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

然而,网关接口(一种服务接口)有一点不同,如下所示:

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

如前面的示例所示,网关方法的返回类型是Future。当GatewayProxyFactoryBean看到网关方法的返回类型是Future时,它通过使用AsyncTaskExecutor立即切换到异步模式。这就是差异的程度。对这种方法的调用总是以Future实例立即返回。然后,你可以按照自己的速度与Future进行交互,以获得结果、取消等等。同样,与使用Future实例的任何其他情况一样,调用get()可能会显示超时、执行异常等。下面的示例展示了如何使用从异步网关返回的Future:

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

有关更详细的示例,请参见 Spring 集成示例中的异步网关 (opens new window)示例。

# ListenableFuture

从版本 4.1 开始,异步网关方法还可以返回ListenableFuture(在 Spring Framework4.0 中引入)。这些返回类型允许你提供回调,在结果可用(或发生异常)时调用该回调。当网关检测到此返回类型并且任务执行者AsyncListenableTaskExecutor时,将调用执行器的submitListenable()方法。下面的示例展示了如何使用ListenableFuture:

ListenableFuture<String> result = this.asyncGateway.async("something");
result.addCallback(new ListenableFutureCallback<String>() {

    @Override
    public void onSuccess(String result) {
        ...
    }

    @Override
    public void onFailure(Throwable t) {
        ...
    }
});
# AsyncTaskExecutor

默认情况下,GatewayProxyFactoryBean在为返回类型为Future的任何网关方法提交内部AsyncInvocationTask实例时使用org.springframework.core.task.SimpleAsyncTaskExecutor。但是,<gateway/>元素的配置中的async-executor属性允许你提供对 Spring 应用程序上下文中可用的java.util.concurrent.Executor的任何实现的引用。

(默认)SimpleAsyncTaskExecutor同时支持FutureListenableFuture返回类型,分别返回FutureTaskListenableFutureTask。参见[CompletableFuture]。尽管有一个默认的执行器,但提供一个外部的执行器通常是有用的,这样你就可以在日志中标识它的线程(当使用 XML 时,线程名称是基于执行器的 Bean 名称),如下例所示:

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

如果你希望返回一个不同的Future实现,那么你可以提供一个自定义执行器,或者完全禁用该执行器,并从下游流返回回复消息有效负载中的Future。要禁用执行器,请在GatewayProxyFactoryBean中将其设置为null(通过使用setAsyncTaskExecutor(null))。当使用 XML 配置网关时,使用async-executor=""。当使用@MessagingGateway注释进行配置时,请使用类似于以下代码的代码:

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}
如果返回类型是特定的具体Future实现或配置的执行器不支持的其他子接口,则流在调用方的线程上运行,流必须在应答消息有效负载中返回所需的类型。
# CompletableFuture

从版本 4.2 开始,网关方法现在可以返回CompletableFuture<?>。返回此类型时有两种操作模式:

  • 当提供了异步执行器并且返回类型正好是CompletableFuture(不是子类)时,框架在执行器上运行任务,并立即将CompletableFuture返回给调用者。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)用于创建 future。

  • 当异步执行器显式地设置为null并且返回类型是CompletableFuture或者返回类型是CompletableFuture的子类时,将在调用者的线程上调用该流。在这种情况下,预计下游流将返回适当类型的CompletableFuture

# 使用场景

在下面的场景中,调用者线程立即返回一个CompletableFuture<Invoice>,当下游流回应到网关(使用Invoice对象)时,这个过程就完成了。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

在下面的场景中,当下游流将调用者线程作为对网关的响应的有效负载提供时,调用者线程返回CompletableFuture<Invoice>。当发票准备好时,其他一些过程必须完成。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

在下面的场景中,当下游流将调用者线程作为对网关的响应的有效负载提供时,调用者线程返回CompletableFuture<Invoice>。当发票准备好时,其他一些过程必须完成。如果启用了DEBUG日志记录,则会发出一个日志条目,表示异步执行器不能用于此场景。

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

CompletableFuture实例可用于对回复执行额外的操作,如下例所示:

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);
# 反应堆Mono

从版本 5.0 开始,GatewayProxyFactoryBean允许使用带有网关接口方法的项目反应堆 (opens new window),使用[Mono<T>](https://github.com/reactor/reactor-core)返回类型。内部AsyncInvocationTask包装在Mono.fromCallable()中。

可以使用Mono稍后检索结果(类似于Future<?>),或者你可以在结果返回到网关时通过调用你的Consumer与 Dispatcher 一起使用它。

Mono不会被框架立即刷新。
因此,在网关方法返回之前,底层消息流不会启动(就像Future<?>``Executor任务一样)。
当订阅Mono时,该流开始。
或者,当subscribe()与整个Flux相关时,Mono(作为一个“可组合”)可能是反应器流的一部分。
下面的示例展示了如何使用 Project Reactor 创建网关:
@MessagingGateway
public static interface TestGateway {

	@Gateway(requestChannel = "promiseChannel")
	Mono<Integer> multiply(Integer value);

	}

	    ...

	@ServiceActivator(inputChannel = "promiseChannel")
	public Integer multiply(Integer value) {
			return value * 2;
	}

		...

    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(integers -> ...);

另一个使用 Project Reactor 的示例是一个简单的回调场景,如下例所示:

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

调用线程继续,当流完成时调用handleInvoice()

# 下游流返回异步类型

正如上面的ListenableFuture小节中提到的,如果你希望某些下游组件返回带有异步负载的消息(FutureMono,以及其他),则必须显式地将异步执行器设置为null(或者在使用 XML 配置时"")。然后在调用者线程上调用该流,然后可以检索结果。

# void返回类型

与前面提到的返回类型不同,当方法返回类型是void时,框架不能隐式地确定你希望下游流异步运行,而调用者线程立即返回。在这种情况下,你必须用@Async注释接口方法,如下例所示:

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

Future<?>返回类型不同,如果流引发了某些异常,则无法通知调用方,除非某些自定义TaskExecutor(例如ErrorHandlingTaskExecutor)与@Async注释相关联。

# 未收到响应时的网关行为

由于早些时候解释过,网关提供了一种通过 POJO 方法调用与消息传递系统进行交互的便捷方式。但是,通常期望总是返回(即使有异常)的典型方法调用可能并不总是将一对一映射到消息交换(例如,回复消息可能不会到达——相当于方法不返回)。

本节的其余部分涵盖了各种场景,以及如何使网关的行为更具可预测性。可以对某些属性进行配置,以使同步网关行为更可预测,但其中一些属性可能并不总是如你所期望的那样工作。其中一个是reply-timeout(在方法级别或default-reply-timeout在网关级别)。我们检查reply-timeout属性,以查看它如何能够和不能在各种场景中影响同步网关的行为。我们研究了单线程场景(下游的所有组件都通过直接通道连接)和多线程场景(例如,在下游的某个地方,你可能有一个打破单线程边界的 pollable 或 executor 通道)。

# 长期运行的流程下游

单线程同步网关

如果下游组件仍在运行(可能是由于无限循环或服务速度较慢),则设置reply-timeout没有任何作用,并且网关方法调用直到下游服务退出(通过返回或抛出异常)才返回。

同步网关,多线程

如果多线程消息流中的下游组件仍在运行(可能是由于无限循环或服务速度较慢),那么设置reply-timeout会产生一种效果,即允许网关方法调用在超时后返回,因为GatewayProxyFactoryBean在回复通道上轮询,等待消息,直到超时结束。但是,如果在产生实际的回复之前已经达到超时,则可能导致网关方法返回“null”。你应该理解,在网关方法调用返回之后,回复消息(如果产生)将被发送到一个回复通道,因此你必须意识到这一点,并在设计流程时将其考虑在内。

# 下游组件返回’null’

同步网关——单线程

如果组件下游返回’null’,并且没有配置reply-timeout,则网关方法调用将无限期地挂起,除非已经配置了reply-timeout或者在可能返回’null’的下游组件(例如,服务激活器)上设置了requires-reply属性。在这种情况下,将抛出一个异常并将其传播到网关。

同步网关——多线程

行为与前一例相同。

# 下游组件返回签名为“void”,而网关方法签名为非 void

同步网关——单线程

如果组件下游返回“void”,并且没有配置reply-timeout,则网关方法调用将无限期地挂起,除非配置了reply-timeout

同步网关——多线程

行为与前一例相同。

# 下游组件导致运行时异常

同步网关——单线程

如果组件下游抛出一个运行时异常,则该异常将通过一条错误消息传播回网关并重新抛出。

同步网关——多线程

行为与前一例相同。

你应该理解,默认情况下,reply-timeout是无界的。,因此,如果不显式地设置reply-timeout,则网关方法调用可能会无限期地挂起,
,为了确保你分析了你的流,并且如果存在发生这些场景中的一种远程可能性,你应该将reply-timeout属性设置为“’safe’”值,
甚至更好,你可以将下游组件的requires-reply属性设置为“true”,以确保及时响应,当下游组件内部返回 null 时,抛出异常就会产生异常。
但是你也应该意识到在某些情况下(参见第一个),reply-timeout并没有帮助,
这意味着分析消息流并决定何时使用同步网关而不是异步网关也很重要,
as前面描述的,后一种情况是定义返回Future实例的网关方法,
然后保证接收到该返回值,并且对调用结果有更细粒度的控制。
同样,在处理路由器时,你应该记住,将resolution-required属性设置为“true”会导致路由器在无法解析特定通道时引发异常,
同样,在处理过滤器时,你可以设置throw-exception-on-rejection属性,在这两种情况下,
都是这样,生成的流的行为就像它包含一个具有“requires-reply”属性的服务激活器。
换句话说,它有助于确保网关方法调用的及时响应。
reply-timeout对于<gateway/>元素是无界的(由GatewayProxyFactoryBean创建)。
用于外部集成的入站网关(WS、HTTP 等)与这些网关共享许多特性和属性。
但是,对于那些入站网关,默认的reply-timeout是 1000 毫秒(一秒)。
如果对另一个线程进行了下游异步切换,则可能需要增加此属性,以便在网关超时之前为流留出足够的时间来完成。
你应该理解,定时器是在线程返回网关时启动的——也就是说,当流完成或将消息传递给另一个线程时,
此时,调用线程开始等待回复。,
如果流是完全同步的,这个回复是立即可用的。
对于异步流,线程等待的时间最多到这个时候。

有关通过IntegrationFlows定义网关的选项,请参见 Java DSL 章节中的[IntegrationFlowas gateway](./dsl.html#integration-flow-as-gateway)。

# 服务激活器

服务激活器是用于将任何 Spring 管理的对象连接到输入通道的端点类型,以便它可以扮演服务的角色。如果服务产生输出,它也可以连接到一个输出通道。或者,输出产生服务可以位于处理管道或消息流的末端,在这种情况下,可以使用入站消息的replyChannel头。如果没有定义输出通道,这是默认的行为。与这里描述的大多数配置选项一样,相同的行为实际上也适用于大多数其他组件。

# 配置服务激活器

要创建服务激活器,请使用带有“input-channel”和“ref”属性的“service-activator”元素,如下例所示:

<int:service-activator input-channel="exampleChannel" ref="exampleHandler"/>

前面的配置从exampleHandler中选择满足消息传递要求之一的所有方法,如下所示:

  • @ServiceActivator注释

  • ispublic

  • 如果requiresReply == true不返回void

在运行时调用的目标方法是通过它们的payload类型为每个请求消息选择的,或者作为对Message<?>类型的回退,如果这样的方法存在于目标类中的话。

从版本 5.0 开始,一个服务方法可以用@org.springframework.integration.annotation.Default标记,作为所有不匹配情况的后备。当使用内容类型转换并在转换后调用目标方法时,这可能是有用的。

要委托给任何对象的显式定义的方法,你可以添加method属性,如下例所示:

<int:service-activator input-channel="exampleChannel" ref="somePojo" method="someMethod"/>

在这两种情况下,当服务方法返回一个非空值时,端点尝试将应答消息发送到适当的应答通道。要确定应答通道,首先要检查端点配置中是否提供了output-channel,如下例所示:

<int:service-activator input-channel="exampleChannel" output-channel="replyChannel"
                       ref="somePojo" method="someMethod"/>

如果方法返回一个结果,并且没有output-channel被定义,那么框架将检查请求消息的replyChannel标头值。如果该值是可用的,那么它将检查其类型。如果是MessageChannel,则将回复消息发送到该通道。如果是String,则端点尝试将通道名称解析为通道实例。如果通道不能解析,则抛出一个DestinationResolutionException。如果它能被解决,消息就会被发送到那里。如果请求消息没有replyChannel头,并且reply对象是Message,则其replyChannel头将用于查询目标目的地。这是 Spring 集成中用于请求-回复消息传递的技术,也是返回地址模式的示例。

如果你的方法返回一个结果,并且希望丢弃它并结束流,那么你应该配置output-channel以发送到NullChannel。为了方便起见,该框架注册了一个名为nullChannel的框架。有关更多信息,请参见特殊频道

服务激活器是不需要生成回复消息的组件之一。如果你的方法返回null或具有void返回类型,则服务激活器在方法调用后退出,不带任何信号。这种行为可以通过AbstractReplyProducingMessageHandler.requiresReply选项进行控制,当使用 XML 名称空间进行配置时,该选项也会以requires-reply的形式公开。如果将标记设置为true,并且方法返回 null,则抛出一个ReplyRequiredException

服务方法中的参数可以是消息,也可以是任意类型。如果是后者,则假定它是一个消息负载,该负载从消息中提取并注入到服务方法中。我们通常推荐这种方法,因为在使用 Spring 集成时,它遵循并促进了 POJO 模型。参数也可以有@Header@Headers注释,如注释支持中所述。

服务方法不需要有任何参数,这意味着你可以实现事件风格的服务激活器(在这里你所关心的只是对服务方法的调用)不要担心消息的内容。
将其视为空的 JMS 消息。
这种实现的一个示例用例是一个简单的计数器或消息监视器,它存储在输入通道上。

从版本 4.1 开始,该框架将消息属性(payloadheaders)正确地转换为 Java8OptionalPOJO 方法参数,如下例所示:

public class MyBean {
    public String computeValue(Optional<String> payload,
               @Header(value="foo", required=false) String foo1,
               @Header(value="foo") Optional<String> foo2) {
        if (payload.isPresent()) {
            String value = payload.get();
            ...
        }
        else {
           ...
       }
    }

}

如果自定义服务激活器处理程序实现可以在其他<service-activator>定义中重用,我们通常建议使用ref属性。然而,如果自定义服务激活器处理程序实现仅在<service-activator>的单个定义内使用,则可以提供内部 Bean 定义,如以下示例所示:

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="someMethod">
    <beans:bean class="org.something.ExampleServiceActivator"/>
</int:service-activator>
不允许在同一个<service-activator>配置中同时使用ref属性和内部处理程序定义,因为它会创建一个模棱两可的条件并导致抛出异常。
如果ref属性引用扩展AbstractMessageProducingHandler的 Bean(例如框架本身提供的处理程序),则通过将输出通道直接注入处理程序来优化配置。,在这种情况下,
,每个ref必须是单独的 Bean 实例(或prototype-作用域 Bean)或使用内部的<bean/>配置类型。
如果无意中从多个 bean 引用了相同的消息处理程序,则会出现配置异常。
# 服务激活器和 Spring 表达式语言

Spring 集成 2.0 以来,服务激活器还可以受益于SpEL (opens new window)

例如,可以调用任何 Bean 方法,而不指向ref属性中的 Bean,或者将其作为内部 Bean 定义包括在内,如下所示:

<int:service-activator input-channel="in" output-channel="out"
	expression="@accountService.processAccount(payload, headers.accountId)"/>

	<bean id="accountService" class="thing1.thing2.Account"/>

在前面的配置中,我们使用 spel 的@beanId表示法,并调用一个与消息有效负载兼容的类型的方法,而不是通过使用ref或作为内部 Bean 注入“AccountService”。我们还传递一个标头值。可以针对消息中的任何内容计算任何有效的 SPEL 表达式。对于简单的场景,如果所有逻辑都可以封装在这样的表达式中,那么你的服务激活器就不需要引用 A Bean,如下例所示:

<int:service-activator input-channel="in" output-channel="out" expression="payload * 2"/>

在前面的配置中,我们的服务逻辑是将有效载荷值乘以 2。SPEL 让我们相对容易地处理它。

有关配置服务激活器的更多信息,请参见 Java DSL 章节中的[服务激活器和.handle()方法](./dsl.html#java-dsl-handle)。

# 异步服务激活器

服务激活器由调用线程调用。如果输入通道是SubscribableChannelPollableChannel的 poller 线程,则这是一个上游线程。如果服务返回一个ListenableFuture<?>,那么默认的操作是将其作为消息的有效负载发送到输出(或回复)通道。从版本 4.3 开始,你现在可以将async属性设置为true(在使用 Java 配置时使用Object)。如果当async属性被设置为true时,服务返回一个ListenableFuture<?>,则调用线程将立即被释放,并在完成将来的线程(从你的服务中)上发送答复消息。这对于使用PollableChannel的长时间运行的服务特别有利,因为 Poller 线程被释放以在框架内执行其他服务。

如果服务使用Exception完成 future,则发生正常的错误处理。如果存在ErrorMessage消息头,则将其发送到errorChannel消息头。否则,将把ErrorMessage发送到默认的errorChannel(如果可用)。

# 服务激活器和方法返回类型

服务方法可以返回成为回复消息有效负载的任何类型。在这种情况下,将创建一个新的Message<?>对象,并复制来自请求消息的所有标题。当交互是基于 POJO 方法调用时,对于大多数 Spring 集成MessageHandler实现,这以相同的方式工作。

也可以从该方法返回一个完整的Message<?>对象。但是请记住,与变形金刚不同的是,对于服务激活器,如果返回的消息中不存在标题,则将通过从请求消息中复制标题来修改此消息。因此,如果你的方法参数是Message<?>,并且你在服务方法中复制了一些(但不是全部)现有的头,那么它们将在回复消息中重新出现。从回复消息中删除头不是服务激活器的责任,并且,遵循松耦合原则,最好在集成流中添加HeaderFilter。或者,可以使用 Transformer 代替服务激活器,但是,在这种情况下,当返回完整的Message<?>时,该方法完全负责消息,包括复制请求消息头(如果需要)。你必须确保重要的框架标题(例如replyChannelerrorChannel)如果存在,就必须保留。

# 延迟器

延迟器是一个简单的端点,它允许消息流延迟一定的时间间隔。当消息延迟时,原始发件人不会阻塞。相反,延迟消息被调度为一个org.springframework.scheduling.TaskScheduler的实例,以便在延迟通过后将其发送到输出通道。这种方法即使对于相当长的延迟也是可伸缩的,因为它不会导致大量的发件人线程被阻塞。相反,在典型的情况下,线程池用于实际执行消息的释放。本节包含几个配置延迟器的示例。

# 配置延迟器

<delayer>元素用于延迟两个消息通道之间的消息流。与其他端点一样,你可以提供“输入通道”和“输出通道”属性,但延迟器也具有“默认延迟”和“表达式”属性(以及“表达式”元素),它们决定每条消息应该延迟的毫秒数。以下示例将所有消息延迟 3 秒钟:

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

如果需要确定每条消息的延迟,还可以使用“expression”属性提供 SPEL 表达式,如下所示:

Java DSL

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from("input")
            .delay("delayer.messageGroupId", d -> d
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}

Kotlin DSL

@Bean
fun flow() =
    integrationFlow("input") {
        delay("delayer.messageGroupId") {
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }

Java

@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}

XML

<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

在前面的示例中,只有当表达式对给定的入站消息计算为 null 时,才会出现三秒的延迟。如果只想对表达式求值结果有效的消息应用延迟,可以使用0(默认值)的“默认延迟”。对于延迟0(或更短)的任何消息,该消息将立即在调用线程上发送。

XML 解析器使用消息组 ID<beanName>.messageGroupId
延迟处理程序支持表示以毫秒为单位的间隔的表达式求值结果(其Object方法产生的值可以解析为Long)以及表示绝对时间的java.util.Date实例。,在第一种情况下,
,毫秒是从当前时间开始计算的(例如,
的值5000将使消息从延迟者接收到的时间起至少延迟 5 秒)。
使用Date实例,直到Date对象表示的时间,消息才会被释放。,
等于非正延迟或过去日期的值不会导致延迟。相反,
,它被直接发送到原始发送者线程上的输出通道。
如果表达式求值结果不是Date且不能解析为Long,则应用默认延迟(如果有的话—默认为0)。
表达式求值可能由于各种原因抛出求值异常,包括无效的表达式或其他条件。
默认情况下,这样的异常将被忽略(尽管在调试级别上进行了记录),并且延迟器将回落到默认的延迟(如果有)。
你可以通过设置ignore-expression-failures属性来修改此行为。
默认情况下,此属性被设置为true,延迟器行为与前面描述的一样。
但是,如果你不希望忽略表达式求值异常并将它们抛给延迟者的调用者,请将ignore-expression-failures属性设置为false
在前面的例子中,将延迟表达式指定为headers['delay']
这是 SPELIndexer语法,用于访问Map元素(MessageHeaders实现
)。
它调用:headers.get("delay")。对于不包含’的简单映射元素名称(
),也可以使用 SPEL“dot accessor”语法,如果前面显示的头表达式可以指定为headers.delay
但是,如果缺少头表达式,则会获得不同的结果。
在第一种情况下,表达式的计算结果为null
第二种结果类似于以下结果:
<br/> org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):<br/> Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'<br/>
因此,
如果存在省略标题的可能性,并且你希望返回到默认的延迟,那么使用 Indexer 语法而不是 Dot Property Accessor 语法通常会更有效(建议使用),因为检测空比捕获异常更快。

延迟器将委托给 Spring 的TaskScheduler抽象的实例。延迟器使用的默认调度程序是由 Spring Integration 在启动时提供的ThreadPoolTaskScheduler实例。见配置任务调度程序。如果你想委托给一个不同的计划程序,你可以通过 delayer 元素的’scheduler’属性提供一个引用,如下例所示:

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
如果你配置了一个外部ThreadPoolTaskScheduler,则可以在此属性上设置waitForTasksToCompleteOnShutdown = true。,
它允许在应用程序关机时成功完成已经处于执行状态的“延迟”任务(释放消息),
在 Spring 集成 2.2 之前,此属性在<delayer>元素上可用,因为ThreadPoolTaskScheduler可以在后台创建自己的调度程序。
自 2.2 以来,延迟器需要一个外部调度程序实例,并且waitForTasksToCompleteOnShutdown已被删除。
你应该使用调度程序自己的配置。
ThreadPoolTaskScheduler具有一个属性Map,该属性可以与org.springframework.util.ErrorHandler的一些实现一起注入。
该处理程序允许从发送延迟消息的调度任务的线程处理Exception,默认情况下,它使用org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler,你可以在日志中看到堆栈跟踪,
你可能需要考虑使用org.springframework.integration.channel.MessagePublishingErrorHandler,它将ErrorMessage发送到error-channel,要么从失败消息的消息头,要么进入默认的error-channel
此错误处理是在事务回滚(如果存在)后执行的。
参见发布失败

# 延迟器和消息存储

DelayHandler将延迟消息保存到提供的MessageStore中的消息组中。(“groupid”基于<delayer>元素所需的“id”属性。)在DelayHandler将消息发送到output-channel之前,调度任务将从MessageStore中删除一条延迟消息。如果提供的MessageStore是持久性的(例如JdbcMessageStore),则提供了在应用程序关闭时不丢失消息的能力。在应用程序启动之后,ErrorMessage从其消息组中的表达式评估建议中读取消息,并根据消息的原始到达时间(如果延迟是数值的话)重新安排它们的延迟。对于延迟报头为Date的消息,在重新调度时使用Date。如果延迟消息在TaskScheduler中的停留时间超过其“延迟”,则在启动后立即发送该消息。

<delayer>可以用两个相互排斥的元素中的任意一个来丰富:<transactional><advice-chain>。这些 AOP 通知中的List应用于代理的内部DelayHandler.ReleaseMessageHandler,该代理负责在延迟之后在调度任务的Thread上释放消息。例如,当下游消息流抛出一个异常并且回滚ReleaseMessageHandler的事务时,可以使用它。在这种情况下,延迟消息保持在持久的MessageStore中。你可以在<advice-chain>中使用任何自定义的org.aopalliance.aop.Advice实现。<transactional>元素定义了一个简单的建议链,该建议链仅包含事务性建议。下面的示例显示了advice-chain中的<delayer>:

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

DelayHandler可以导出为带有托管操作(MBeanreschedulePersistedMessages)的 JMXMBean,这允许在运行时重新安排延迟的持久消息——例如,如果TaskScheduler先前已被停止。可以通过Control Bus命令调用这些操作,如下例所示:

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("@'delayer.handler'.reschedulePersistedMessages()").build();
controlBusChannel.send(delayerReschedulingMessage);
有关消息存储、JMX 和控制总线的更多信息,请参见MessageStore

从版本 5.3.7 开始,如果将消息存储到MessageStore时事务处于活动状态,则在TransactionSynchronization.afterCommit()回调中调度发布任务。这对于防止竞争情况是必要的,在这种情况下,调度的发布可以在事务提交之前运行,并且找不到消息。在这种情况下,消息将在延迟之后发布,或者在事务提交之后发布,以较晚者为准。

# 发行失败

从版本 5.0.8 开始,DeLayer 上有两个新属性:

  • maxAttempts(默认 5)

  • (默认 1 秒)

当消息被释放时,如果下游流失败,将在retryDelay之后尝试释放。如果达到maxAttempts,则该消息将被丢弃(除非该发布是事务性的,在这种情况下,该消息将保留在存储区中,但将不再计划发布,直到重新启动应用程序,或者调用reschedulePersistedMessages()方法,如上所述)。

此外,你还可以配置delayedMessageErrorChannel;当发布失败时,一个ErrorMessage被发送到该通道,异常作为有效负载,并具有originalMessage属性。ErrorMessage包含一个包含当前计数的头IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT

如果错误流使用错误消息并正常退出,则不会采取进一步的操作;如果发布是事务性的,则事务将提交,消息将从存储中删除。如果错误流抛出一个异常,则释放将被重试到maxAttempts,正如上面讨论的那样。

# 脚本支持

Spring Integration2.1 增加了对面向 Java 规范的 JSR223 脚本 (opens new window)的支持,在 Java 版本 6 中引入。它允许你使用以任何受支持的语言(包括 Ruby、JRuby、Groovy 和 Kotlin)编写的脚本来为各种集成组件提供逻辑,类似于 Spring 表达式语言在 Spring 集成中的使用方式。有关 JSR223 的更多信息,请参见文件 (opens new window)

从 Java11 开始,Nashorn JavaScript 引擎已被弃用,可能会在 Java15 中删除。
建议从现在开始重新考虑使用其他脚本语言。

你需要在项目中包含此依赖项:

Maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-scripting</artifactId>
    <version>5.5.9</version>
</dependency>

Gradle

compile "org.springframework.integration:spring-integration-scripting:5.5.9"

此外,还需要添加一个脚本引擎实现,例如 JRuby、Jython。

从版本 5.2 开始, Spring 集成提供了 Kotlin JSR223 支持。你需要将这些依赖项添加到项目中以使其正常工作:

Maven

<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-script-util</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-compiler-embeddable</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.jetbrains.kotlin</groupId>
    <artifactId>kotlin-scripting-compiler-embeddable</artifactId>
    <scope>runtime</scope>
</dependency>

Gradle

runtime 'org.jetbrains.kotlin:kotlin-script-util'
runtime 'org.jetbrains.kotlin:kotlin-compiler-embeddable'
runtime 'org.jetbrains.kotlin:kotlin-scripting-compiler-embeddable'

所提供的kotlin所选择的kotlin语言指示器或脚本文件带有.kts扩展名。

为了使用 JVM 脚本语言,必须在类路径中包含该语言的 JSR223 实现。Groovy (opens new window)JRuby (opens new window)项目在其标准发行版中提供了 JSR233 支持。

第三方开发了各种 JSR223 语言实现。
特定实现与 Spring 集成的兼容性取决于它与规范的一致性以及实现方对规范的解释。
如果你计划使用 Groovy 作为脚本语言,我们建议你使用Spring-Integration’s Groovy Support,因为它提供了 Groovy 特有的其他功能。
但是,这一部分也是相关的。

# 脚本配置

根据集成需求的复杂性,脚本可以作为 XML 配置中的 CDATA 内联提供,也可以作为对包含该脚本的 Spring 资源的引用提供。 Spring 为了启用脚本支持,集成定义了一个headers,它将消息有效负载绑定到一个名为payload的变量,并将消息头绑定到一个headers变量,这两个变量都可以在脚本执行上下文中访问。你所需要做的就是编写一个使用这些变量的脚本。以下两个示例展示了创建筛选器的示例配置:

Java DSL

@Bean
public IntegrationFlow scriptFilter() {
    return f -> f.filter(Scripts.processor("some/path/to/ruby/script/RubyFilterTests.rb"));
}
...
@Bean
public Resource scriptResource() {
	return new ByteArrayResource("headers.type == 'good'".getBytes());
}

@Bean
public IntegrationFlow scriptFilter() {
	return f -> f.filter(Scripts.processor(scriptResource()).lang("groovy"));
}

XML

<int:filter input-channel="referencedScriptInput">
   <int-script:script location="some/path/to/ruby/script/RubyFilterTests.rb"/>
</int:filter>

<int:filter input-channel="inlineScriptInput">
     <int-script:script lang="groovy">
     <![CDATA[
     return payload == 'good'
   ]]>
  </int-script:script>
</int:filter>

如前面的示例所示,该脚本可以内联地包含,也可以通过引用资源位置(通过使用location属性)来包含。此外,lang属性对应于语言名称(或其 JSR223 别名)。

Spring 其他支持脚本的集成端点元素包括routerservice-activatorLifecyclesplitter。在每种情况下,脚本配置都将与上述相同(除了 Endpoint 元素)。

脚本支持的另一个有用的特性是无需重新启动应用程序上下文就可以更新(重新加载)脚本。要这样做,请在script元素上指定refresh-check-delay属性,如下例所示:

Java DSL

Scripts.processor(...).refreshCheckDelay(5000)
}

XML

<int-script:script location="..." refresh-check-delay="5000"/>

在前面的示例中,每 5 秒检查一次脚本位置的更新。如果脚本已更新,则自更新后 5 秒内发生的任何调用都会导致新脚本的运行。

考虑以下示例:

Java DSL

Scripts.processor(...).refreshCheckDelay(0)
}

XML

<int-script:script location="..." refresh-check-delay="0"/>

在前面的示例中,一旦发生这样的修改,就会用任何脚本修改来更新上下文,从而为“实时”配置提供了一种简单的机制。任何负值都意味着在初始化应用程序上下文后不会重新加载脚本。这是默认的行为。下面的示例显示了一个永远不会更新的脚本:

Java DSL

Scripts.processor(...).refreshCheckDelay(-1)
}

XML

<int-script:script location="..." refresh-check-delay="-1"/>
无法重新加载内联脚本。
# 脚本变量绑定

需要变量绑定来使脚本能够引用外部提供给脚本执行上下文的变量。默认情况下,headersheaders用作绑定变量。可以使用<variable>元素(或ScriptSpec.variables()选项)将其他变量绑定到脚本,如下例所示:

Java DSL

Scripts.processor("foo/bar/MyScript.py")
    .variables(Map.of("var1", "thing1", "var2", "thing2", "date", date))
}

XML

<script:script lang="py" location="foo/bar/MyScript.py">
    <script:variable name="var1" value="thing1"/>
    <script:variable name="var2" value="thing2"/>
    <script:variable name="date" ref="date"/>
</script:script>

如前面的示例所示,可以将脚本变量绑定到标量值或 Spring Bean 引用。注意,payloadheaders仍然作为绑定变量包括在内。

Spring Integration3.0 中,除了variable元素外,还引入了variables属性。此属性和variable元素并不互斥,你可以将它们合并到一个script组件中。然而,变量必须是唯一的,无论它们在哪里定义。此外,自 Spring Integration3.0 以来,内联脚本也允许变量绑定,如下例所示:

<service-activator input-channel="input">
    <script:script lang="ruby" variables="thing1=THING1, date-ref=dateBean">
        <script:variable name="thing2" ref="thing2Bean"/>
        <script:variable name="thing3" value="thing2"/>
        <![CDATA[
            payload.foo = thing1
            payload.date = date
            payload.bar = thing2
            payload.baz = thing3
            payload
        ]]>
    </script:script>
</service-activator>

前面的示例显示了一个内联脚本、一个variable元素和一个variables属性的组合。variables属性包含一个逗号分隔的值,其中每个段包含变量及其值的一个’=’分隔对。变量名可以后缀为-ref,就像前面示例中的date-ref变量一样。这意味着绑定变量的名称为date,但该值是对应用程序上下文中dateBean Bean 的引用。这在使用属性占位符配置或命令行参数时可能很有用。

如果你需要更多地控制变量的生成方式,那么你可以实现自己的 Java 类,它使用ScriptVariableGenerator策略,该策略由以下接口定义:

public interface ScriptVariableGenerator {

    Map<String, Object> generateScriptVariables(Message<?> message);

}

这个接口要求你实现generateScriptVariables(Message)方法。消息参数允许你访问消息有效负载和消息头中的任何可用数据,返回值是绑定变量的Map。每当执行消息的脚本时,都会调用此方法。下面的示例展示了如何提供ScriptVariableGenerator的实现,并使用script-variable-generator属性对其进行引用:

Java DSL

Scripts.processor("foo/bar/MyScript.groovy")
    .variableGenerator(new foo.bar.MyScriptVariableGenerator())
}

XML

<int-script:script location="foo/bar/MyScript.groovy"
        script-variable-generator="variableGenerator"/>

<bean id="variableGenerator" class="foo.bar.MyScriptVariableGenerator"/>

如果不提供script-variable-generator,则脚本组件使用DefaultScriptVariableGenerator,它将所提供的任何<variable>元素与payloadheaders中的Message变量合并到其generateScriptVariables(Message)方法中。

不能同时提供script-variable-generator属性和<variable>元素。
它们是互斥的。

# Groovy 支持

在 Spring Integration2.0 中,我们添加了 Groovy 支持,允许你使用 Groovy 脚本语言为各种集成组件提供逻辑——类似于 Spring Expression 语言在路由、转换和其他集成问题中所支持的方式。有关 Groovy 的更多信息,请参见 Groovy 文档,你可以在项目网站 (opens new window)上找到该文档。

你需要在项目中包含此依赖项:

Maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-groovy</artifactId>
    <version>5.5.9</version>
</dependency>

Gradle

compile "org.springframework.integration:spring-integration-groovy:5.5.9"

# Groovy 配置

在 Spring Integration2.1 中,Groovy 支持的配置名称空间是 Spring Integration 的脚本支持的扩展,并共享脚本支持部分中详细描述的核心配置和行为。尽管 Groovy 脚本很好地得到了通用脚本支持,但 Groovy 支持提供了Groovy配置名称空间,它由 Spring 框架的org.springframework.scripting.groovy.GroovyScriptFactory和相关组件支持,为使用 Groovy 提供了扩展功能。下面的清单显示了两个示例配置:

例1.过滤器

<int:filter input-channel="referencedScriptInput">
   <int-groovy:script location="some/path/to/groovy/file/GroovyFilterTests.groovy"/>
</int:filter>

<int:filter input-channel="inlineScriptInput">
     <int-groovy:script><![CDATA[
     return payload == 'good'
   ]]></int-groovy:script>
</int:filter>

正如前面的示例所示,该配置看起来与常规脚本支持配置相同。唯一的区别是使用 Groovy 命名空间,如int-groovy命名空间前缀所示。还要注意,<script>标记上的lang属性在此命名空间中无效。

# Groovy 对象定制

如果需要自定义 Groovy 对象本身(除了设置变量),你可以引用一个 Bean,该 Bean 通过使用customizer属性来实现GroovyObjectCustomizer。例如,如果你希望通过修改MetaClass并在脚本中注册可用的函数来实现特定于域的语言,这可能会很有用。下面的示例展示了如何做到这一点:

<int:service-activator input-channel="groovyChannel">
    <int-groovy:script location="somewhere/SomeScript.groovy" customizer="groovyCustomizer"/>
</int:service-activator>

<beans:bean id="groovyCustomizer" class="org.something.MyGroovyObjectCustomizer"/>

设置自定义GroovyObjectCustomizer并不与<variable>元素或script-variable-generator属性相互排斥。它也可以在定义内联脚本时提供。

Spring Integration3.0 引入了variables属性,该属性与variable元素一起工作。此外,如果没有提供绑定变量的名称,那么 Groovy 脚本也可以在BeanFactory中将变量解析为 Bean。下面的示例展示了如何使用变量(entityManager):

<int-groovy:script>
    <![CDATA[
        entityManager.persist(payload)
        payload
    ]]>
</int-groovy:script>

entityManager在应用程序上下文中必须是适当的 Bean。

有关<variable>元素、variables属性和script-variable-generator属性的更多信息,请参见脚本变量绑定

# Groovy 脚本编译器定制

@CompileStatic提示是最流行的 Groovy 编译器定制选项。它可以在类或方法级别上使用。有关更多信息,请参见 groovy参考手册 (opens new window),特别是@compilestatic (opens new window)。为了将此特性用于短脚本(在集成场景中),我们被迫将简单的脚本更改为更类似于 Java 的代码。考虑以下<filter>脚本:

headers.type == 'good'

在 Spring 集成中,前面的脚本变成了下面的方法:

@groovy.transform.CompileStatic
String filter(Map headers) {
	headers.type == 'good'
}

filter(headers)

这样,filter()方法就被转换并编译为静态 Java 代码,绕过了 Groovy 动态调用阶段,例如getProperty()工厂和filter()代理。

从版本 4.3 开始,你可以使用compile-static``boolean选项配置 Spring Integration Groovy 组件,指定headersfor@CompileStatic应该添加到内部CompilerConfiguration。有了这一点,你就可以在我们的脚本代码中省略带有filter()的方法声明,并且仍然可以获得已编译的普通 Java 代码。在这种情况下,前面的脚本可以很短,但仍然需要比解释的脚本更详细一些,如下例所示:

binding.variables.headers.type == 'good'

你必须通过groovy.lang.Script``binding属性访问headerspayload(或任何其他)变量,因为对于@CompileStatic,我们不具有动态GroovyObject.getProperty()功能。

此外,我们引入了compiler-configuration Bean 引用。使用此属性,你可以提供任何其他所需的 Groovy 编译器自定义,例如ImportCustomizer。有关此功能的更多信息,请参见高级编译器配置 (opens new window)的 Groovy 文档。

使用ASTTransformationCustomizer不会自动为@CompileStatic注释添加ASTTransformationCustomizer,并且会覆盖compileStatic选项。
如果仍然需要CompileStatic,则应该手动将new ASTTransformationCustomizer(CompileStatic.class)添加到该自定义CompilationCustomizers中。
Groovy 编译器定制对refresh-check-delay选项没有任何影响,并且可重新加载的脚本也可以静态编译。

# 控制总线

如(Enterprise 整合模式 (opens new window))中所述,控制总线背后的思想是,你可以使用相同的消息传递系统来监视和管理框架内的组件,就像用于“应用程序级”消息传递一样。在 Spring 集成中,我们构建了前面描述的适配器,以便你可以发送消息作为调用公开操作的一种方式。这些操作的一种选择是 Groovy 脚本。以下示例为控制总线配置了一个 Groovy 脚本:

<int-groovy:control-bus input-channel="operationChannel"/>

控制总线有一个输入通道,可以访问该通道来调用应用程序上下文中的 bean 上的操作。

Groovy 控制总线以 Groovy 脚本的形式在输入通道上运行消息。它接收一条消息,将主体编译为脚本,使用GroovyObjectCustomizer对其进行自定义,并运行它。控制总线’@ManagedResource公开了应用程序上下文中用@ManagedResource注释的所有 bean,并实现了 Spring 的Lifecycle接口或扩展了 Spring 的CustomizableThreadCreator基类(例如,几个TaskExecutorTaskScheduler实现)。

在控制总线命令脚本中使用带有自定义作用域(例如“请求”)的托管 bean 时要小心,尤其是在异步消息流中,
如果控制总线的MessageProcessor不能从应用程序上下文中公开 Bean,在命令脚本运行期间,你可能会遇到一些BeansException
例如,如果未建立自定义作用域的上下文,则在该作用域内获取 Bean 的尝试会触发BeanCreationException

如果需要进一步定制 Groovy 对象,还可以提供对 Bean 的引用,该 Bean 通过customizer属性实现Lifecycle,如下例所示:

<int-groovy:control-bus input-channel="input"
        output-channel="output"
        customizer="groovyCustomizer"/>

<beans:bean id="groovyCustomizer" class="org.foo.MyGroovyObjectCustomizer"/>

# 向端点添加行为

在 Spring Integration2.2 之前,可以通过向 Poller 的<advice-chain/>元素添加 AOP 建议,将行为添加到整个集成流中。然而,假设你想要重试,比如说,只尝试一个 REST Web 服务调用,而不是任何下游端点。

例如,考虑以下流程:

inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter

如果你将一些重试逻辑配置到 Poller 上的一个建议链中,并且由于网络故障,对http-gateway2的调用失败,则重试将导致http-gateway1http-gateway2都被第二次调用。类似地,在 JDBC-出站-适配器中发生瞬时故障后,在再次调用jdbc-outbound-adapter之前,将再次调用这两个 HTTP 网关。

Spring 集成 2.2 增加了向单个端点添加行为的能力。这是通过向许多端点添加<request-handler-advice-chain/>元素来实现的。下面的示例展示了如何在outbound-gateway中处理<request-handler-advice-chain/>元素:

<int-http:outbound-gateway id="withAdvice"
    url-expression="'http://localhost/test1'"
    request-channel="requests"
    reply-channel="nextChannel">
    <int-http:request-handler-advice-chain>
        <ref bean="myRetryAdvice" />
    </int-http:request-handler-advice-chain>
</int-http:outbound-gateway>

在这种情况下,myRetryAdvice仅在本地应用于此网关,并且不应用于在向nextChannel发送回复后在下游采取的进一步操作。建议的范围仅限于端点本身。

此时,你不能通知整个<chain/>的端点。
模式不允许<request-handler-advice-chain>作为链本身的子元素。

但是,可以将<request-handler-advice-chain>添加到<chain>元素中的各个产生回复的端点。
是一个例外,在不产生回复的链中,因为链中的最后一个元素是script-variable-generator,所以不能通知最后一个元素。
如果需要通知这样的元素,它必须被移动到链外(链的output-channel是适配器的input-channel)。
然后适配器可以像往常一样被通知。
对于产生应答的链,可以通知每个子元素。

# 提供的咨询课程

除了提供应用 AOP 建议类的通用机制外, Spring 集成还提供了这些开箱即用的建议实现方式:

# 重试建议

重试建议(o.s.i.handler.advice.RequestHandlerRetryAdvice)利用了Spring Retry (opens new window)项目提供的丰富的重试机制。spring-retry的核心组件是RetryTemplate,它允许配置复杂的重试场景,包括RetryPolicyBackoffPolicy策略(具有许多实现)以及RecoveryCallback策略,以确定重试用完时要采取的操作。

无状态重试

无状态重试是指重试活动完全在建议中处理的情况。线程暂停(如果配置为这样做的话)并重试操作。

有状态重试

有状态重试是在通知中管理重试状态,但引发异常并且调用者重新提交请求的情况。有状态重试的一个例子是,当我们希望消息发起者(例如,JMS)负责重新提交时,而不是在当前线程上执行它。有状态重试需要某种机制来检测重试提交。

有关spring-retry的更多信息,请参见该项目的 Javadoc (opens new window)Spring Batch (opens new window)的参考文档,其中spring-retry起源于此。

默认的 Back Off 行为是不后退。
会立即尝试重新尝试。
使用导致线程在尝试之间暂停的 Back Off 策略可能会导致性能问题,包括过多的内存使用和线程饥饿。
在大容量环境中,应谨慎使用 Back Off 策略。
# 配置重试建议

本节中的示例使用以下<service-activator>,它总是抛出一个异常:

public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}

简单无状态重试

默认的缓存建议有一个SimpleRetryPolicy,它尝试了三次。没有BackOffPolicy,所以这三次尝试是背对背对背进行的,两次尝试之间没有延迟。没有RecoveryCallback,所以结果是在最后一次失败的重试发生后向调用方抛出异常。在 Spring 集成环境中,可以通过在入站端点上使用error-channel来处理最后的异常。下面的示例使用RetryTemplate并显示其DEBUG输出:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3

带恢复的简单无状态重试

下面的示例将RecoveryCallback添加到前面的示例中,并使用ErrorMessageSendingRecoverer向通道发送ErrorMessage:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]

使用定制策略的无状态重试和恢复

为了更复杂,我们可以使用定制的RetryTemplate提供建议。这个示例继续使用SimpleRetryPolicy,但将尝试次数增加到四次。它还添加了RetryStateGenerator,其中第一次重试等待一秒,第二次等待五秒,第三次等待 25 次(总共尝试了四次)。下面的清单显示了示例及其DEBUG输出:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>

27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]

命名空间对无状态重试的支持

从版本 4.0 开始,前面的配置可以大大简化,这要归功于对 Retry 建议的命名空间支持,如下例所示:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

在前面的示例中,该建议被定义为顶级 Bean,以便它可以在多个request-handler-advice-chain实例中使用。你还可以在链中直接定义建议,如下例所示:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>

一个<handler-retry-advice>可以有一个<fixed-back-off><exponential-back-off>子元素,或者没有子元素。不带子元素的<handler-retry-advice>不使用 back off。如果没有recovery-channel,则当重试用完时将抛出异常。名称空间只能与无状态重试一起使用。

对于更复杂的环境(自定义策略等),使用普通的<bean>定义。

带恢复的简单状态重试

为了使重试有状态,我们需要提供一个RetryStateGenerator实现的建议。该类用于将消息标识为重新提交,以便RetryTemplate可以确定此消息的当前重试状态。该框架提供了SpelExpressionRetryStateGenerator,它通过使用 SPEL 表达式来确定消息标识符。本例再次使用默认策略(三次尝试,没有后退)。与无状态重试一样,这些策略可以自定义。下面的清单显示了示例及其DEBUG输出:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]

如果你将前面的示例与无状态的示例进行比较,你可以看到,通过有状态重试,在每次失败时都会向调用方抛出异常。

重试的异常分类

Spring 重试在确定哪些异常可以调用重试方面具有很大的灵活性。默认配置会对所有异常进行重试,异常分类器会查看顶层异常。如果你将它配置为只在MyException上重试,并且你的应用程序抛出一个SomeOtherException,其中原因是MyException,则不会发生重试。

由于 Spring 重试 1.0.3,BinaryExceptionClassifier具有一个名为traverseCauses的属性(默认为false)。当true时,它将遍历异常原因,直到找到匹配的原因或要遍历的原因用完为止。

要使用此分类器进行重试,请使用一个SimpleRetryPolicy,该构造函数创建了最大尝试次数,Map对象的Map,以及traverseCauses布尔。然后可以将此策略注入RetryTemplate

在这种情况下,traverseCauses是必需的,因为用户异常可能被包装在MessagingException中。
# http-gateway2断路器建议

断路器模式的总体思想是,如果服务当前不可用,请不要浪费时间(和资源)来尝试使用它。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice实现了这个模式。当断路器处于关闭状态时,端点尝试调用服务。如果一定数量的连续尝试失败,则断路器将进入打开状态。当它处于打开状态时,新的请求会“快速失败”,并且在一段时间过期之前不会尝试调用该服务。

当该时间已过时,断路器被设置为半开状态。当处于这种状态时,即使是一次尝试失败,断路器也会立即进入打开状态。如果尝试成功,断路器将进入关闭状态,在这种情况下,直到再次发生配置的连续失败次数,断路器才会再次进入打开状态。任何成功的尝试都会将状态重置为零失败,目的是确定断路器何时可能再次进入打开状态。

通常,此建议可用于外部服务,在这种情况下,可能需要一些时间才能失败(例如试图建立网络连接的超时)。

RequestHandlerCircuitBreakerAdvice有两个性质:thresholdhalfOpenAfterthreshold属性表示断路器打开之前需要发生的连续故障的数量。它默认为5halfOpenAfter属性表示在尝试另一个请求之前,断路器在上次失败之后等待的时间。默认值为 1000 毫秒。

下面的示例配置一个断路器,并显示其DEBUGERROR输出:

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

在前面的示例中,阈值设置为2,而halfOpenAfter设置为12秒。每隔 5 秒就会收到一个新的请求。前两次尝试调用该服务。第三个和第四个都失败了,但有一个例外,表明断路器是打开的。尝试了第五个请求,因为该请求是在上次失败后 15 秒发出的。第六次尝试立即失败,因为断路器立即打开。

# 表达式评估建议

最后提供的建议类是o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice。这个建议比另外两个建议更笼统。它提供了一种机制来计算发送到端点的原始入站消息上的表达式。在成功或失败之后,可以对单独的表达式进行求值。可选地,包含计算结果的消息,连同输入消息,可以被发送到消息通道。

此建议的一个典型用例可能是使用<ftp:outbound-channel-adapter/>,如果传输成功,可以将文件移动到一个目录,如果传输失败,可以将文件移动到另一个目录:

该建议具有在成功时设置表达式、失败表达式以及相应通道的属性。对于成功的情况,发送到successChannel的消息是AdviceMessage,有效负载是表达式求值的结果。另一个名为inputMessage的属性包含发送给处理程序的原始消息。发送到failureChannel(当处理程序抛出异常时)的消息是ErrorMessage,其有效负载为MessageHandlingExpressionEvaluatingAdviceException。与所有MessagingException实例一样,此有效负载具有failedMessagecause属性,以及一个名为evaluationResult的附加属性,该属性包含表达式求值的结果。

从版本 5.1.3 开始,如果配置了通道,但没有提供表达式,则使用默认表达式对消息的payload进行求值。

当在通知的作用域中抛出异常时,默认情况下,在计算任何failureExpression后,将该异常抛给调用方。如果希望禁止抛出异常,请将trapException属性设置为true。下面的建议展示了如何使用 Java DSL 配置建议:

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.handle((GenericHandler<String>) (payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}
# 速率限制器建议

速率限制器建议(RateLimiterRequestHandlerAdvice)允许确保端点不会被请求过载。当速率限制被违反时,请求将处于阻塞状态。

此建议的一个典型用例可能是外部服务提供者,它不允许每分钟的请求数超过n

RateLimiterRequestHandlerAdvice实现完全基于弹性 4J (opens new window)项目,并且需要RateLimiterRateLimiterConfig注入。也可以使用默认值和/或自定义名称进行配置。

下面的示例以每 1 秒一个请求来配置速率限制通知:

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}
# 缓存建议

从版本 5.2 开始,引入了CacheRequestHandlerAdvice。它基于Spring Framework (opens new window)中的缓存抽象,并与@Caching注释族提供的概念和功能保持一致。内部逻辑基于CacheAspectSupport扩展,其中缓存操作的代理是围绕AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage方法进行的,并以请求Message<?>作为参数。可以将此建议配置为 SPEL 表达式或Function,以计算缓存键。请求Message<?>作为 SPEL 求值上下文的根对象可用,或者作为Function输入参数可用。默认情况下,请求消息的payload用于缓存键。当默认缓存操作是CacheableOperation时,或使用任意CacheOperations 的集合时,CacheRequestHandlerAdvice必须配置cacheNames,每个CacheOperation都可以单独配置或具有共享选项,例如CacheManagerCacheResolverCacheErrorHandler,可以从CacheRequestHandlerAdvice配置中重用。这种配置功能类似于 Spring Framework 的@CacheConfig@Caching注释组合。如果不提供CacheManager,则默认情况下从BeanFactory中的CacheAspectSupport解析单个 Bean。

下面的示例使用不同的缓存操作集配置两个建议:

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}

# 反应建议

从版本 5.3 开始,可以使用ReactiveRequestHandlerAdvice来生成Mono回复的请求消息处理程序。对于此通知必须提供一个BiFunction<Message<?>, Mono<?>, Publisher<?>>,并且在由截获的handleRequestMessage()方法实现产生的答复上从Mono.transform()操作符调用它。通常这样的Mono定制是必要的,当我们希望通过timeout()retry()和类似的支持操作来控制网络波动时。例如,当我们可以通过 WebFlux 客户端发送 HTTP 请求时,我们可以使用下面的配置来等待响应的时间不超过 5 秒:

.handle(WebFlux.outboundGateway("https://somehost/"),
                       e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));

message参数是消息处理程序的请求消息,可用于确定请求范围属性。mono参数是此消息处理程序的handleRequestMessage()方法实现的结果。一个嵌套的Mono.transform()也可以从这个函数调用来应用,例如,一个无功断路器 (opens new window)

# 自定义建议类

除了提供的建议类前面描述的,你还可以实现自己的建议类。虽然你可以提供org.aopalliance.aop.Advice(通常org.aopalliance.intercept.MethodInterceptor)的任何实现,但我们通常建议你子类o.s.i.handler.advice.AbstractRequestHandlerAdvice。这样做的好处是避免了编写低级别的面向方面的编程代码,并提供了一个专门为在此环境中使用而定制的起点。

子类需要实现doInvoke()方法,其定义如下:

/**
 * Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
 * invokes the handler method and returns its result, or null).
 * @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
 * @param target The target handler.
 * @param message The message that will be sent to the handler.
 * @return the result after invoking the {@link MessageHandler}.
 * @throws Exception
 */
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;

回调参数是避免直接处理 AOP 的子类的一种方便。调用callback.execute()方法将调用消息处理程序。

为那些需要为特定处理程序维护状态的子类提供target参数,可能是通过在由目标控制的Map中维护该状态。此功能允许将相同的建议应用于多个处理程序。RequestHandlerCircuitBreakerAdvice使用此建议来为每个处理程序保持断路器状态。

message参数是发送给处理程序的消息。虽然通知在调用处理程序之前不能修改消息,但它可以修改有效负载(如果它具有可变属性)。通常,建议将使用该消息进行日志记录,或者在调用处理程序之前或之后将消息的副本发送到某个地方。

返回值通常是callback.execute()返回的值。但是,该建议确实具有修改返回值的功能。注意,只有AbstractReplyProducingMessageHandler实例返回值。下面的示例展示了一个扩展AbstractRequestHandlerAdvice的自定义建议类:

public class MyAdvice extends AbstractRequestHandlerAdvice {

    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
        // add code before the invocation
        Object result = callback.execute();
        // add code after the invocation
        return result;
    }
}
除了execute()方法之外,ExecutionCallback还提供了一个额外的方法:cloneAndExecute()
在执行doInvoke()时,必须在调用可能被多次调用的情况下使用该方法,例如在RequestHandlerRetryAdvice.
中,这是必需的,因为 Spring AOP org.springframework.aop.framework.ReflectiveMethodInvocation对象通过跟踪链中上次调用的建议来保持状态。
每次调用都必须重置此状态。

有关更多信息,请参见反思方法 (opens new window)javadoc。

# 其他建议链元素

虽然上面提到的抽象类很方便,但你可以向链中添加任何Advice,包括事务建议。

# 处理消息通知

正如本节导言中所讨论的,请求处理程序建议链中的通知对象仅应用于当前端点,而不是下游流(如果有的话)。对于产生应答的MessageHandler对象(例如那些扩展AbstractReplyProducingMessageHandler的对象),将通知应用于一个内部方法:handleRequestMessage()(从MessageHandler.handleMessage()调用)。对于其他消息处理程序,该建议应用于MessageHandler.handleMessage()

在某些情况下,即使消息处理程序是AbstractReplyProducingMessageHandler,也必须将通知应用于handleMessage方法。例如,幂等接收机可能返回null,如果处理程序的replyRequired属性设置为true,则会导致异常。另一个例子是BoundRabbitChannelAdvice—参见严格的消息排序

从版本 4.3.1 开始,引入了一个新的HandleMessageAdvice接口及其基本实现(AbstractHandleMessageAdvice)。不论处理程序类型如何,实现Advice的对象总是应用于handleMessage()方法。

重要的是要理解,HandleMessageAdvice实现(例如幂等接收机)在应用到返回响应的处理程序时,与adviceChain分离,并正确地应用于MessageHandler.handleMessage()方法。

由于这种不关联,建议链订单不被遵守。

考虑以下配置:

<some-reply-producing-endpoint ... >
    <int:request-handler-advice-chain>
        <tx:advice ... />
        <ref bean="myHandleMessageAdvice" />
    </int:request-handler-advice-chain>
</some-reply-producing-endpoint>

在前面的示例中,<tx:advice>应用于AbstractReplyProducingMessageHandler.handleRequestMessage()。但是,将myHandleMessageAdvice应用于MessageHandler.handleMessage()。因此,它被调用在此之前<tx:advice>。要保留顺序,你应该遵循标准的Spring AOP (opens new window)配置方法,并使用端点id以及.handler后缀来获得目标MessageHandler Bean。注意,在这种情况下,整个下游流都在事务范围内。

MessageHandler不返回响应的情况下,将保留通知链顺序。

从版本 5.3 开始,存在HandleMessageAdviceAdapter,以使MethodInterceptor适用于MessageHandler.handleMessage()的任何现有MethodInterceptor,因此也适用于整个子流。例如,可以对从某个端点开始的整个子流应用RetryOperationsInterceptor,这在默认情况下是不可能的,因为消费者端点仅对AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()应用建议。从版本 5.3 开始,提供了HandleMessageAdviceAdapter来应用MethodInterceptor方法的任何MethodInterceptor,因此,整个子流都是如此。例如,可以将RetryOperationsInterceptor应用于从某个端点开始的整个子流;默认情况下,这是不可能的,因为消费者端点仅将建议应用于AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()

# 事务支持

从版本 5.0 开始,引入了一个新的TransactionHandleMessageAdvice,以使整个下游流具有事务性,这要归功于HandleMessageAdvice实现。当在<request-handler-advice-chain>元素中使用常规的TransactionInterceptor时(例如,通过配置<tx:advice>),启动的事务仅应用于内部AbstractReplyProducingMessageHandler.handleRequestMessage(),并且不传播到下游流。

为了简化 XML 配置,除了<request-handler-advice-chain>之外,还向所有<outbound-gateway><service-activator>及相关组件添加了一个<transactional>元素。下面的示例显示了<transactional>正在使用中:

<int-rmi:outbound-gateway remote-channel="foo" host="localhost"
    request-channel="good" reply-channel="reply" port="#{@port}">
        <int-rmi:transactional/>
</int-rmi:outbound-gateway>

<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
    <constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>

如果你熟悉JPA integration components,那么这样的配置并不是新的,但是现在我们可以从我们流程中的任何一点开始事务——不仅可以从<poller>或者消息驱动的通道适配器(例如JMS)。

可以通过使用TransactionInterceptorBuilder简化 Java 配置,并且可以在消息传递注释adviceChain属性中使用结果 Bean 名称,如下例所示:

@Bean
public ConcurrentMetadataStore store() {
    return new SimpleMetadataStore(hazelcastInstance()
                       .getMap("idempotentReceiverMetadataStore"));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(
                    message -> message.getPayload().toString(),
                    message -> message.getPayload().toString().toUpperCase(), store()));
}

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(this.transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRES_NEW)
                .build();
}

@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
         outputChannel = "output",
         adviceChain = { "idempotentReceiverInterceptor",
                 "transactionInterceptor" })
public Transformer transformer() {
    return message -> message;
}

请注意true构造函数上的TransactionInterceptorBuilder参数。它导致创建一个TransactionHandleMessageAdvice,而不是一个常规的TransactionInterceptor

Java DSL 通过端点配置上的.transactional()选项支持Advice,如下例所示:

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
        .handle(Jpa.updatingGateway(this.entityManagerFactory),
                e -> e.transactional(true))
        .channel(c -> c.queue("persistResults"));
}

# 通知过滤器

当建议Filter建议时,还有一个额外的考虑因素。默认情况下,任何丢弃操作(当筛选器返回false时)都在建议链的作用域内执行。这可能包括丢弃通道下游的所有流量。因此,例如,如果丢弃通道下游的一个元素抛出了一个异常,并且有一个重试建议,那么进程将被重试。同样,如果throwExceptionOnRejection被设置为true(在通知的范围内抛出异常)。

discard-within-advice设置为false将修改此行为,并且在调用建议链之后发生丢弃(或异常)。

# 使用注释通知端点

当通过使用注释(@Filter@ServiceActivator@Splitter@Transformer)配置某些端点时,你可以在adviceChain属性中为建议链提供一个 Bean 名称。此外,@Filter注释还具有discardWithinAdvice属性,该属性可用于配置丢弃行为,如建议过滤器中所讨论的那样。下面的示例将导致在通知之后执行丢弃:

@MessageEndpoint
public class MyAdvisedFilter {

    @Filter(inputChannel="input", outputChannel="output",
            adviceChain="adviceChain", discardWithinAdvice="false")
    public boolean filter(String s) {
        return s.contains("good");
    }
}

# 在建议链中订购建议

建议类是“周围”的建议,并以嵌套的方式应用。第一个建议是最外层的,而最后一个建议是最内层的(即最接近被建议的处理程序)。重要的是将建议类按正确的顺序排列,以实现所需的功能。

例如,假设你想添加一个重试建议和一个事务建议。你可能希望将“重试建议”放在第一位,然后是“事务建议”。因此,每次重试都在一个新事务中执行。另一方面,如果你希望所有尝试和任何恢复操作(在 retryRecoveryCallback中)都在事务范围内,那么你可以将事务建议放在第一位。

# 建议的处理程序属性

有时,从建议中访问处理程序属性是有用的。例如,大多数处理程序实现NamedComponent以允许你访问组件名称。

可以通过target参数(当子类化AbstractRequestHandlerAdvice时)或invocation.getThis()(当实现org.aopalliance.intercept.MethodInterceptor时)访问目标对象。

当对整个处理程序进行建议时(例如,当处理程序不产生答复或该建议实现HandleMessageAdvice时),可以将目标对象强制转换为接口,例如NamedComponent,如以下示例所示:

String componentName = ((NamedComponent) target).getComponentName();

当直接实现MethodInterceptor时,可以按以下方式强制转换目标对象:

String componentName = ((NamedComponent) invocation.getThis()).getComponentName();

当只建议使用handleRequestMessage()方法(在产生答复的处理程序中)时,你需要访问完整的处理程序,即AbstractReplyProducingMessageHandler。下面的示例展示了如何做到这一点:

AbstractReplyProducingMessageHandler handler =
    ((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();

String componentName = handler.getComponentName();

# 幂等接收机 Enterprise 积分模式

从版本 4.1 开始, Spring 集成提供了幂等接收机 (opens new window)Enterprise 集成模式的实现。它是一种功能模式,整个幂等逻辑应该在应用程序中实现。但是,为了简化决策,提供了IdempotentReceiverInterceptor组件。这是一个 AOP Advice,它应用于MessageHandler.handleMessage()方法,并且可以根据其配置filter请求消息或将其标记为duplicate

在此之前,你可以通过在<filter/>(参见Filter)中使用自定义的MessageSelector来实现此模式。然而,由于该模式真正定义了端点的行为,而不是端点本身,所以幂等接收器实现不提供端点组件。相反,它应用于应用程序中声明的端点。

IdempotentReceiverInterceptor的逻辑是基于所提供的MessageSelector,并且,如果该选择符不接受消息,则将duplicateMessage头设置为true。目标MessageHandler(或下游流)可以参考这个头来实现正确的等效性逻辑。如果IdempotentReceiverInterceptor配置了discardChannelthrowExceptionOnRejection = true,则重复消息不会发送到目标MessageHandler.handleMessage()。相反,它被抛弃了。如果你想丢弃(不对重复消息做任何操作),那么discardChannel应该配置为NullChannel,例如默认的nullChannel Bean。

为了保持消息之间的状态并提供比较消息的等效性的能力,我们提供了MetadataStoreSelector。它接受MessageProcessor实现(它基于Message创建一个查找键)和一个可选的ConcurrentMetadataStore元数据存储)。有关更多信息,请参见[MetadataStoreSelectorJavadoc](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/selector/metadatastoreselector.html)。你还可以使用额外的MessageProcessor来定制valueConcurrentMetadataStore。默认情况下,MetadataStoreSelector使用timestamp消息头。

通常情况下,如果键不存在值,选择器会选择要接受的消息。在某些情况下,比较一个键的当前值和新值是有用的,以确定是否应该接受该消息。从版本 5.3 开始,提供了compareValues属性,该属性引用了BiPredicate<String, String>;第一个参数是旧值;返回true,以接受消息并在MetadataStore中用新值替换旧值。这对于减少键的数量很有用;例如,当处理文件中的行时,可以将文件名存储在键中,并将当前行号存储在值中。然后,在重新启动之后,你可以跳过已经处理过的行。有关示例,请参见幂等下游处理拆分文件

为了方便起见,MetadataStoreSelector选项可直接在<idempotent-receiver>组件上配置。下面的清单显示了所有可能的属性:

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1 IdempotentReceiverInterceptor Bean 的 ID。
可选的。
2 将此拦截器应用到的使用者端点名称或模式。
使用逗号(,)的单独名称(模式),例如endpoint="aaa, bbb*, **ccc, *ddd**, eee*fff"。然后使用与这些模式匹配的名称来检索目标端点的MessageHandler Bean(使用其.handler后缀),并将IdempotentReceiverInterceptor应用到这些 bean。
需要。
3 aMessageSelector Bean 引用。
metadata-storekey-strategy (key-expression)互斥。
未提供selector时,需要key-strategykey-strategy-expression中的一个。
4 标识当IdempotentReceiverInterceptor不接受消息时要向其发送消息的通道。
省略时,重复的消息将以duplicateMessage标头转发到处理程序。
可选。
5 aConcurrentMetadataStore引用。
由底层MetadataStoreSelector使用。
selector互斥。
可选。
默认MetadataStoreSelector使用内部SimpleMetadataStore,该状态不在应用程序执行中保持。
6 aMessageProcessor引用.
由底层MetadataStoreSelector使用.
从请求消息中计算出一个idempotentKey
key-expression互斥。
如果不提供selector,则需要key-strategy中的一个。
7 用于填充ExpressionEvaluatingMessageProcessor的 spel 表达式。
由底层MetadataStoreSelector使用。
通过使用请求消息作为计算上下文的根对象来计算idempotentKey
key-strategy互斥。
如果不提供selector,需要key-strategykey-strategy-expression中的一个。
8 aMessageProcessor引用.
由底层MetadataStoreSelector使用.
对来自请求消息的idempotentKey求值value
value-expression互斥。默认情况下,’metataStoreStamp’message 消息使用’元数据标头作为元数据的值。
9 用于填充ExpressionEvaluatingMessageProcessor的 spel 表达式。
由底层MetadataStoreSelector使用。
通过使用请求消息作为计算上下文的根对象,为value计算
selector互斥。value-strategy默认情况下,,“元数据存储选择器”使用“时间戳”消息头作为元数据“值”。
10 BiPredicate<String, String> Bean 的引用,它允许你通过比较键的新旧值来选择消息;默认情况下,null
11 如果IdempotentReceiverInterceptor拒绝消息,是否抛出异常。
默认为false
无论是否提供discard-channel,都将应用它。

Spring 对于 Java 配置,集成提供了方法级的@IdempotentReceiver注释。它用于标记具有消息注释(method@ServiceActivator@Router, and others) to specify whichidempotentReceiverInterceptorobjects are applied to this endpoint. The following example shows how to use the)的method@idempotentReceiver` 注释:

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

使用 Java DSL 时,可以将拦截器添加到端点的建议链中,如下例所示:

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
IdempotentReceiverInterceptor仅为MessageHandler.handleMessage(Message<?>)方法设计。
从版本 4.3.1 开始,它实现了HandleMessageAdvice,并将AbstractHandleMessageAdvice作为基类,以更好地解离。
参见处理消息建议获取更多信息。

# 日志通道适配器

<logging-channel-adapter>通常与丝锥结合使用,如Wire Tap中所讨论的。然而,它也可以作为任何流的最终消费者。例如,考虑一个以<service-activator>结尾的流,它返回一个结果,但你希望放弃该结果。要做到这一点,你可以将结果发送到NullChannel。或者,你可以将它路由到INFO级别<logging-channel-adapter>。这样,当在INFO级别进行日志记录时,你可以看到丢弃的消息,但在(例如)WARN级别进行日志记录时,则看不到它。使用NullChannel,在DEBUG级别进行日志记录时,你只会看到丢弃的消息。下面的清单显示了logging-channel-adapter元素的所有可能属性:

<int:logging-channel-adapter
    channel="" (1)
    level="INFO" (2)
    expression="" (3)
    log-full-message="false" (4)
    logger-name="" /> (5)
1 将日志适配器连接到上游组件的通道。
2 将日志记录发送到此适配器的消息的日志级别。
默认值:INFO
3 一个 SPEL 表达式,确切地表示记录了消息的哪些部分。
默认:payload—只记录了有效负载。
如果指定了log-full-message,则无法指定此属性。
4 true时,将记录整个消息(包括消息头)。
默认:false—只记录有效负载。
如果指定expression,则无法指定此属性。
5 指定记录器的name(在log4j中称为category)。
用于标识由此适配器创建的日志消息。
这允许为各个适配器设置(在记录子系统中)日志名称。
默认情况下,所有适配器都将日志记录在以下名称下:org.springframework.integration.handler.LoggingHandler

# 使用 Java 配置

Spring 下面的引导应用程序显示了通过使用 Java 配置来配置LoggingHandler的示例:

@SpringBootApplication
public class LoggingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
             new SpringApplicationBuilder(LoggingJavaApplication.class)
                    .web(false)
                    .run(args);
         MyGateway gateway = context.getBean(MyGateway.class);
         gateway.sendToLogger("foo");
    }

    @Bean
    @ServiceActivator(inputChannel = "logChannel")
    public LoggingHandler logging() {
        LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.DEBUG);
        adapter.setLoggerName("TEST_LOGGER");
        adapter.setLogExpressionString("headers.id + ': ' + payload");
        return adapter;
    }

    @MessagingGateway(defaultRequestChannel = "logChannel")
    public interface MyGateway {

        void sendToLogger(String data);

    }

}

# 使用 Java DSL 进行配置

Spring 下面的引导应用程序展示了使用 Java DSL 配置日志通道适配器的示例:

@SpringBootApplication
public class LoggingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
             new SpringApplicationBuilder(LoggingJavaApplication.class)
                    .web(false)
                    .run(args);
         MyGateway gateway = context.getBean(MyGateway.class);
         gateway.sendToLogger("foo");
    }

    @Bean
    public IntegrationFlow loggingFlow() {
        return IntegrationFlows.from(MyGateway.class)
                     .log(LoggingHandler.Level.DEBUG, "TEST_LOGGER",
                           m -> m.getHeaders().getId() + ": " + m.getPayload());
    }

    @MessagingGateway
    public interface MyGateway {

        void sendToLogger(String data);

    }

}

# java.util.function接口支持

从版本 5.1 开始, Spring 集成为java.util.function包中的接口提供了直接支持。所有消息传递端点(服务激活器、转换器、过滤器等)现在都可以引用Function(或Consumer)bean。消息传递注释可以直接应用于这些 bean,类似于常规的MessageHandler定义。例如,如果你有这个Function Bean 定义:

@Configuration
public class FunctionConfiguration {

    @Bean
    public Function<String, String> functionAsService() {
        return String::toUpperCase;
    }

}

你可以将它用作 XML 配置文件中的一个简单引用:

<service-activator input-channel="processorViaFunctionChannel" ref="functionAsService"/>

当我们用消息批注配置流时,代码很简单:

@Bean
@Transformer(inputChannel = "functionServiceChannel")
public Function<String, String> functionAsService() {
    return String::toUpperCase;
}

当函数返回一个数组时,Collection(本质上是任意Iterable),Stream或 reactorFlux@Splitter可以在这样的 Bean 上用于对结果内容执行迭代。

java.util.function.Consumer接口可用于<int:outbound-channel-adapter>,或者与@ServiceActivator注释一起用于执行流的最后一步:

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
public Consumer<Message<?>> messageConsumerAsService() {
    // Has to be an anonymous class for proper type inference
    return new Consumer<Message<?>>() {

        @Override
        public void accept(Message<?> e) {
            collector().add(e);
        }

    };
}

另外,请注意上面代码片段中的注释:如果你想处理Function/Consumer中的整个消息,则不能使用 lambda 定义。由于 Java 类型擦除,我们无法确定apply()/accept()方法调用的目标类型。

java.util.function.Supplier接口可以简单地与@InboundChannelAdapter注释一起使用,或者在<int:inbound-channel-adapter>中作为ref接口使用:

@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<String> pojoSupplier() {
    return () -> "foo";
}

对于 Java DSL,我们只需要在端点定义中使用对函数 Bean 的引用。同时实现Supplier的接口可以作为MessageSource的常规定义:

@Bean
public Function<String, String> toUpperCaseFunction() {
    return String::toUpperCase;
}

@Bean
public Supplier<String> stringSupplier() {
    return () -> "foo";
}

@Bean
public IntegrationFlow supplierFlow() {
    return IntegrationFlows.from(stringSupplier())
                .transform(toUpperCaseFunction())
                .channel("suppliedChannel")
                .get();
}

当与Spring Cloud Function (opens new window)框架一起使用时,此函数支持非常有用,在该框架中,我们有一个函数目录,并且可以从集成流定义中引用其成员函数。

# Kotlin lambdas

该框架还进行了改进,以支持用于函数的 Kotlin lambdas,因此现在可以使用 Kotlin 语言和 Spring 集成流定义的组合:

@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
    return { it.toUpperCase() }
}

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
    return { print(it) }
}

@Bean
@InboundChannelAdapter(value = "counterChannel",
        poller = [Poller(fixedRate = "10", maxMessagesPerPoll = "1")])
fun kotlinSupplier(): () -> String {
    return { "baz" }
}