# JDBC 支持

# JDBC 支持

Spring 集成提供了用于通过使用数据库查询来接收和发送消息的通道适配器。通过这些适配器, Spring 集成不仅支持普通的 JDBC SQL 查询,还支持存储过程和存储函数调用。

你需要在项目中包含此依赖项:

Maven

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-jdbc</artifactId>
    <version>5.5.9</version>
</dependency>

Gradle

compile "org.springframework.integration:spring-integration-jdbc:5.5.9"

默认情况下,以下 JDBC 组件可用:

Spring 集成 JDBC 模块还提供了JDBC 消息存储

# 入站通道适配器

入站通道适配器的主要功能是执行 SQLSELECT查询,并将结果集转换为消息。消息负载是整个结果集(表示为List),列表中项的类型取决于行映射策略。默认策略是泛型映射器,它为查询结果中的每一行返回Map。你可以通过添加对RowMapper实例的引用来改变这一点(有关行映射的更多详细信息,请参见Spring JDBC (opens new window)文档)。

如果希望将SELECT查询结果中的行转换为单个消息,则可以使用下游拆分器。

入站适配器还需要对JdbcTemplate实例或DataSource实例的引用。

除了生成消息的SELECT语句外,适配器还具有一个UPDATE语句,该语句将记录标记为已处理,以便它们不会在下一次投票中显示。更新可以通过原始 SELECT 中的 ID 列表进行参数化。默认情况下,这是通过命名约定完成的(输入结果集中称为id的列被转换为参数映射中用于更新的列表id)。下面的示例定义了一个带更新查询和DataSource引用的入站通道适配器。

<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
    channel="target" data-source="dataSource"
    update="update item set status=10 where id in (:id)" />
更新查询中的参数是用冒号(:)作为参数名称的前缀来指定的(在前面的示例中,是要应用到轮询结果集中的每一行的表达式)。
这是 Spring JDBC 中命名参数 JDBC 支持的一个标准特性,结合 Spring 积分中采用的约定(投影到已调查的结果列表上),
JDBC 的底层特性限制了可用的表达式(例如,除句号外的大多数特殊字符都不允许),但是,由于目标通常是可由 Bean 路径寻址的对象的列表(可能是对象的列表),因此不会有过多的限制。

要更改参数生成策略,可以在适配器中注入SqlParameterSourceFactory以覆盖默认行为(适配器具有sql-parameter-source-factory属性)。 Spring 集成提供了ExpressionEvaluatingSqlParameterSourceFactory,它创建了一个基于 SPEL 的参数源,查询的结果作为#root对象。(如果update-per-row为真,则根对象是行)。如果相同的参数名在更新查询中出现多次,则只对其进行一次求值,并缓存其结果。

你还可以为 SELECT 查询使用参数源。在这种情况下,由于没有针对“结果”对象进行评估,因此每次都使用一个参数源(而不是使用参数源工厂)。从版本 4.0 开始,你可以使用 Spring 来创建基于 SPEL 的参数源,如下例所示:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
	channel="target" data-source="dataSource"
	select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
			factory-method="createParameterSourceNoCache">
	<constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
		class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
	<property name="parameterExpressions">
		<map>
			<entry key="status" value="@statusBean.which()" />
		</map>
	</property>
</bean>

<bean id="statusBean" class="foo.StatusDetermination" />

每个参数表达式中的value可以是任何有效的 SPEL 表达式。表达式求值的#root对象是在parameterSource Bean 上定义的构造函数参数。对于所有的求值,它都是静态的(在前面的示例中,一个空的String)。

从版本 5.0 开始,你可以使用ExpressionEvaluatingSqlParameterSourceFactorysqlParameterTypes为特定参数指定目标 SQL 类型。

下面的示例为查询中使用的参数提供了 SQL 类型:

<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
    channel="target" data-source="dataSource"
    select-sql-parameter-source="parameterSource" />

<bean id="parameterSource" factory-bean="parameterSourceFactory"
            factory-method="createParameterSourceNoCache">
    <constructor-arg value="" />
</bean>

<bean id="parameterSourceFactory"
        class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="sqlParameterTypes">
        <map>
            <entry key="status" value="#{ T(java.sql.Types).BINARY}" />
        </map>
    </property>
</bean>
使用createParameterSourceNoCache工厂方法。
否则,参数源将缓存求值的结果。
还请注意,由于缓存是禁用的,如果相同的参数名多次出现在 SELECT 查询中,则会对每次出现的结果进行重新求值。

# 轮询和事务

入站适配器接受一个常规的 Spring 集成 Poller 作为一个子元素。因此,可以控制轮询的频率(除其他用途外)。用于 JDBC 使用的 Poller 的一个重要特性是在事务中包装 poll 操作的选项,如下例所示:

<int-jdbc:inbound-channel-adapter query="..."
        channel="target" data-source="dataSource" update="...">
    <int:poller fixed-rate="1000">
        <int:transactional/>
    </int:poller>
</int-jdbc:inbound-channel-adapter>
如果没有显式地指定一个 Poller,则使用一个默认值。
作为 Spring 集成的正常情况,可以将其定义为顶级 Bean)。

在前面的示例中,数据库每 1000 毫秒(或每秒一次)进行一次轮询,并且 Update 和 SELECT 查询都在同一个事务中执行。未显示事务管理器配置。然而,只要它知道数据源,轮询就是事务性的。一个常见的用例是,下游通道是直接通道(默认),这样,在相同的线程中调用端点,从而调用相同的事务。这样,如果它们中的任何一个失败了,事务就会回滚,输入数据就会恢复到原来的状态。

# max-rowsvsmax-messages-per-poll

JDBC 入站通道适配器定义了一个名为max-rows的属性。在指定适配器的 Poller 时,还可以定义一个名为max-messages-per-poll的属性。虽然这两个属性看起来很相似,但它们的含义却大不相同。

max-messages-per-poll指定每个轮询间隔执行查询的次数,而max-rows指定每个执行返回的行数。

在正常情况下,在使用 JDBC 入站通道适配器时,你可能不希望设置 poller 的max-messages-per-poll属性。它的默认值是1,这意味着 JDBC 入站通道适配器的[receive()](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/jdbc/jdbcollingchanneladapter.html#receive())方法在每个轮询间隔中精确执行一次。

max-messages-per-poll属性设置为一个更大的值,这意味着查询将被执行多次。有关max-messages-per-poll属性的更多信息,请参见配置入站通道适配器

相反,如果max-rows属性大于0,则指定由receive()方法创建的查询结果集中要使用的最大行数。如果属性设置为0,则所有行都包含在结果消息中。属性默认为0

建议通过特定于供应商的查询选项使用结果集限制,例如 MySQL或 SQL Server或 甲骨文 的。有关更多信息,请参见特定供应商的文档。

# 出站通道适配器

出站通道适配器是入站的反面:它的作用是处理消息并使用它执行 SQL 查询。默认情况下,消息有效负载和消息头可作为查询的输入参数,如下例所示:

<int-jdbc:outbound-channel-adapter
    query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])"
    data-source="dataSource"
    channel="input"/>

在前面的示例中,到达标记为input的通道的消息具有一个映射的有效负载,其键为something,因此[]操作符从映射中解除该值。标题也可以作为地图访问。

前面的查询中的参数是传入消息上的 Bean 属性表达式(不是 SPEL 表达式)。
此行为是SqlParameterSource的一部分,这是出站适配器创建的默认源。
你可以注入不同的SqlParameterSourceFactory以获得不同的行为。

出站适配器需要引用DataSourceJdbcTemplate。还可以插入SqlParameterSourceFactory来控制每个传入消息与查询的绑定。

如果输入通道是直接通道,则出站适配器将在相同的线程中运行其查询,因此,与消息的发送者运行相同的事务(如果有的话)。

# 使用 spel 表达式传递参数

大多数 JDBC 通道适配器的一个常见需求是将参数作为 SQL 查询或存储过程或函数的一部分进行传递。如前所述,这些参数在默认情况下是 Bean 属性表达式,而不是 SPEL 表达式。但是,如果需要将 SPEL 表达式作为参数传递,则必须显式地注入SqlParameterSourceFactory

下面的示例使用ExpressionEvaluatingSqlParameterSourceFactory来实现该要求:

<jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
    query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE)     \
    values (:id, :payload, :createdDate)"
    sql-parameter-source-factory="spelSource"/>

<bean id="spelSource"
      class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
    <property name="parameterExpressions">
        <map>
            <entry key="id"          value="headers['id'].toString()"/>
            <entry key="createdDate" value="new java.util.Date()"/>
            <entry key="payload"     value="payload"/>
        </map>
    </property>
</bean>

有关更多信息,请参见定义参数源

# 使用PreparedStatement回调

有时,SqlParameterSourceFactory的灵活性和松耦合并不能满足我们对目标PreparedStatement的需求,或者我们需要做一些低级别的 JDBC 工作。 Spring JDBC 模块提供 API 来配置执行环境(例如ConnectionCallbackPreparedStatementCreator)和操作参数值(例如SqlParameterSource)。它甚至可以访问用于低级操作的 API,例如StatementCallback

从 Spring Integration4.2 开始,MessagePreparedStatementSetter允许在PreparedStatement上下文中手动指定requestMessage上的参数。这个类在标准 Spring JDBC API 中扮演与PreparedStatementSetter完全相同的角色。实际上,当JdbcMessageHandlerJdbcTemplate上调用execute时,它是从内联PreparedStatementSetter实现直接调用的。

此功能接口选项与sqlParameterSourceFactory互斥,并且可以用作从requestMessage填充PreparedStatement参数的更强大的替代方案。例如,当我们需要以流的方式将File数据存储到数据库BLOB列时是有用的。下面的示例展示了如何做到这一点:

@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
    JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
            "INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
    jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
        ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME));
        try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) {
            ps.setBlob(2, inputStream);
        }
        catch (Exception e) {
            throw new MessageHandlingException(m, e);
        }
        ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
    });
    return jdbcMessageHandler;
}

从 XML 配置的角度来看,prepared-statement-setter属性在<int-jdbc:outbound-channel-adapter>组件上可用。它允许你指定MessagePreparedStatementSetter Bean 引用。

# 批量更新

从版本 5.1 开始,如果请求消息的有效负载是Iterable实例,则JdbcOperations.batchUpdate()执行JdbcOperations.batchUpdate()Iterable的每个元素都被包装到Message,并带有来自请求消息的标题。在基于常规SqlParameterSourceFactory的配置的情况下,这些消息用于为上述JdbcOperations.batchUpdate()函数中使用的参数构建SqlParameterSource[]。当应用MessagePreparedStatementSetter配置时,将使用BatchPreparedStatementSetter变量对每个项的这些消息进行迭代,并针对它们调用所提供的MessagePreparedStatementSetter。选择keysGenerated模式时,不支持批处理更新。

# 出站网关

出站网关类似于出站适配器和入站适配器的组合:它的作用是处理消息,并使用它执行 SQL 查询,然后通过将结果发送到应答通道来进行响应。默认情况下,消息有效负载和消息头可作为查询的输入参数,如下例所示:

<int-jdbc:outbound-gateway
    update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])"
    request-channel="input" reply-channel="output" data-source="dataSource" />

上述示例的结果是将一条记录插入mythings表,并返回一条消息,该消息指示到输出通道的受影响行数(有效负载是映射:{UPDATED=1})。

如果更新查询是带有自动生成键的 INSERT,则可以通过在前面的示例中添加keys-generated="true"来用生成的键填充答复消息(这不是默认的,因为某些数据库平台不支持它)。下面的示例显示了更改后的配置:

<int-jdbc:outbound-gateway
    update="insert into mythings (status, name) values (0, :payload[thing])"
    request-channel="input" reply-channel="output" data-source="dataSource"
    keys-generated="true"/>

你还可以提供一个 SELECT 查询来执行并从结果(例如入站适配器)生成一个回复消息,而不是更新计数或生成的键,如下例所示:

<int-jdbc:outbound-gateway
    update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
    query="select * from foos where id=:headers[$id]"
    request-channel="input" reply-channel="output" data-source="dataSource"/>

自 Spring Integration2.2 以来,更新 SQL 查询不再是强制性的。现在,你可以使用query属性或query元素,只提供一个 SELECT 查询。如果你需要通过例如使用通用网关或有效负载 Enricher 来主动检索数据,这将非常有用。然后从结果生成应答消息(类似于入站适配器的工作方式)并将其传递给应答通道。下面的示例展示了如何使用query属性:

<int-jdbc:outbound-gateway
    query="select * from foos where id=:headers[id]"
    request-channel="input"
    reply-channel="output"
    data-source="dataSource"/>
默认情况下,SELECT查询的组件仅从游标返回一个(第一个)行。
你可以使用max-rows选项调整此行为。
如果你需要从 SELECT 返回所有行,请考虑指定max-rows="0"

与通道适配器一样,你也可以为请求和回复提供SqlParameterSourceFactory实例。缺省值与出站适配器相同,因此请求消息可以作为表达式的根消息使用。如果keys-generated="true",表达式的根是生成的键(如果只有一个映射,则是一个映射;如果是多值的,则是一个映射列表)。

出站网关需要引用DataSourceJdbcTemplate。它还可以有一个SqlParameterSourceFactory注入,以控制传入消息与查询的绑定。

从版本 4.2 开始,request-prepared-statement-setter属性在<int-jdbc:outbound-gateway>上可用,作为request-sql-parameter-source-factory的替代项。它允许你指定MessagePreparedStatementSetter Bean 引用,它在执行之前实现更复杂的PreparedStatement准备。

有关MessagePreparedStatementSetter的更多信息,请参见出站通道适配器

# JDBC 消息存储

Spring 集成提供了两个 JDBC 特定的消息存储实现。JdbcMessageStore适合用于聚合器和索赔检查模式。JdbcChannelMessageStore实现为消息通道提供了更有针对性和可伸缩的实现。

注意,可以使用JdbcMessageStore来备份消息通道,JdbcChannelMessageStore为此进行了优化。

从版本 5.0.11、5.1.2 开始,JdbcChannelMessageStore的索引已经优化,
如果你在这样的存储中有大的消息组,你可能希望更改索引。,此外,注释了PriorityChannel的索引,因为除非你使用的是由 JDBC 支持的这样的通道,否则不需要它。
当使用甲骨文ChannelMessageStoreQueryProvider时,将添加优先通道索引必须,因为它包含在查询中的一个提示中。

# 初始化数据库

在开始使用 JDBC 消息存储组件之前,你应该为目标数据库提供适当的对象。

Spring 集成附带一些示例脚本,这些脚本可用于初始化数据库。在spring-integration-jdbcJAR 文件中,你可以在org.springframework.integration.jdbc包中找到脚本。它为一系列公共数据库平台提供了一个示例 CREATE 和一个示例 DROP 脚本。使用这些脚本的一种常见方法是在Spring JDBC data source initializer (opens new window)中引用它们。请注意,这些脚本是作为示例以及所需的表和列名的规范提供的。你可能会发现需要为生产使用而增强它们(例如,通过添加索引声明)。

# 通用 JDBC 消息存储

JDBC 模块提供了由数据库支持的 Spring 集成MessageStore(在 ClaimCheck 模式中很重要)和MessageGroupStore(在有状态模式中很重要,例如聚合器)的实现。这两个接口都是由JdbcMessageStore实现的,并且支持在 XML 中配置存储实例,如下例所示:

<int-jdbc:message-store id="messageStore" data-source="dataSource"/>

你可以指定JdbcTemplate,而不是DataSource

下面的示例展示了其他一些可选属性:

<int-jdbc:message-store id="messageStore" data-source="dataSource"
    lob-handler="lobHandler" table-prefix="MY_INT_"/>

在前面的示例中,我们指定了一个LobHandler,用于将消息作为大型对象处理(对于 甲骨文 来说,这通常是必需的),并为存储生成的查询中的表名指定了一个前缀。表名前缀默认为INT_

# 后台消息通道

如果你打算用 JDBC 支持消息通道,我们建议使用JdbcChannelMessageStore实现。它只与消息通道一起工作。

# 支持的数据库

JdbcChannelMessageStore使用特定于数据库的 SQL 查询来从数据库检索消息。因此,你必须在JdbcChannelMessageStore上设置ChannelMessageStoreQueryProvider属性。这个channelMessageStoreQueryProvider为你指定的特定数据库提供 SQL 查询。 Spring 集成为以下关系数据库提供了支持:

  • PostgreSQL

  • HSQLDB

  • MySQL

  • Oracle

  • 德比

  • H2

  • SQLServer

  • Sybase

  • DB2

如果你的数据库未列出,则可以扩展AbstractChannelMessageStoreQueryProvider类并提供自己的自定义查询。

4.0 版本在表中添加了MESSAGE_SEQUENCE列,以确保即使消息存储在相同的毫秒内也能进行先进先出排队。

# 自定义消息插入

从版本 5.0 开始,通过重载ChannelMessageStorePreparedStatementSetter类,可以为JdbcChannelMessageStore中的消息插入提供自定义实现。你可以使用它来设置不同的列,或者改变表结构或序列化策略。例如,你可以将它的结构存储为 JSON 字符串,而不是默认的序列化byte[]

下面的示例使用setValues的默认实现来存储公共列,并重写将消息有效负载存储为varchar的行为:

public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {

    @Override
    public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
        Object groupId, String region, 	boolean priorityEnabled) throws SQLException {
        // Populate common columns
        super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
        // Store message payload as varchar
        preparedStatement.setString(6, requestMessage.getPayload().toString());
    }
}
通常,我们不建议使用关系数据库进行排队。
相反,如果可能的话,可以考虑使用 JMS-或 AMQP 支持的通道代替。
有关更多参考,请参见以下参考资料:



*作为队列反模式的数据库 (opens new window)
# 并发轮询

在轮询消息通道时,你可以选择将关联的Poller配置为TaskExecutor引用。

不过,请记住,如果你使用 JDBC 支持的消息通道,并且你计划轮询该通道,并因此轮询多线程的消息存储,那么你应该确保使用支持多版本并发控制 (opens new window)的关系数据库。,否则,
,锁定可能是一个问题,并且在使用多个线程时,性能可能不会像预期的那样实现。
例如,阿帕奇德比 在这方面是有问题的。

为了实现更好的 JDBC 队列吞吐量,并避免在不同线程可能从队列轮询相同的Message时出现问题,使用不支持 MVCC 的数据库时,将重要的usingIdCache属性设置为true
下面的示例演示了如何这样做:

<br/><bean id="queryProvider"<br/> class="o.s.i.jdbc.store.channel.PostgresChannelMessageStoreQueryProvider"/><br/><br/><int:transaction-synchronization-factory id="syncFactory"><br/> <int:after-commit expression="@store.removeFromIdCache(headers.id.toString())" /><br/> <int:after-rollback expression="@store.removeFromIdCache(headers.id.toString())"/><br/></int:transaction-synchronization-factory><br/><br/><task:executor id="pool" pool-size="10"<br/> queue-capacity="10" rejection-policy="CALLER_RUNS" /><br/><br/><bean id="store" class="o.s.i.jdbc.store.JdbcChannelMessageStore"><br/> <property name="dataSource" ref="dataSource"/><br/> <property name="channelMessageStoreQueryProvider" ref="queryProvider"/><br/> <property name="region" value="TX_TIMEOUT"/><br/> <property name="usingIdCache" value="true"/><br/></bean><br/><br/><int:channel id="inputChannel"><br/> <int:queue message-store="store"/><br/></int:channel><br/><br/><int:bridge input-channel="inputChannel" output-channel="outputChannel"><br/> <int:poller fixed-delay="500" receive-timeout="500"<br/> max-messages-per-poll="1" task-executor="pool"><br/> <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory"<br/> isolation="READ_COMMITTED" transaction-manager="transactionManager" /><br/> </int:poller><br/></int:bridge><br/><br/><int:channel id="outputChannel" /><br/>
# 优先通道

从版本 4.0 开始,JdbcChannelMessageStore实现了PriorityCapableChannelMessageStore,并提供了priorityEnabled选项,让它用作message-store实例的引用。为此,INT_CHANNEL_MESSAGE表有一个MESSAGE_PRIORITY列来存储PRIORITY消息头的值。此外,一个新的MESSAGE_SEQUENCE列使我们能够实现一种健壮的先进先出轮询机制,即使多个消息以相同的优先级存储在相同的毫秒中。用order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE对数据库中的消息进行轮询(选中)。

我们不建议对优先级和非优先级队列通道使用相同的JdbcChannelMessageStore Bean,因为priorityEnabled选项适用于整个存储,并且对于队列通道不保留适当的 FIFO 队列语义。,但是,
,相同的INT_CHANNEL_MESSAGE表(甚至region)可以用于两个JdbcChannelMessageStore类型。
要配置该场景,可以从另一个消息存储 Bean 扩展一个消息存储,如下例所示:
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

# 对消息存储进行分区

通常使用JdbcMessageStore作为同一应用程序中的一组应用程序或节点的全局存储。为了提供一些防止名称冲突的保护,并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。一种方法是使用单独的表名,通过更改前缀(as前面描述的)。另一种方法是为单个表中的数据分区指定region名称。第二种方法的一个重要用例是当MessageStore管理支持 Spring 集成消息通道的持久性队列时。持久通道的消息数据在通道名称的存储中进行键控。因此,如果通道名称不是全局唯一的,则通道可以接收不适合它们的数据。为了避免这种危险,你可以使用消息存储region将具有相同逻辑名称的不同物理通道的数据分开。

# 存储过程

在某些情况下,仅仅支持 JDBC 是不够的。也许你需要处理遗留的关系数据库模式,或者你有复杂的数据处理需求,但是,最终,你必须使用存储过程 (opens new window)或存储函数。 Spring 自集成 2.1 以来,我们提供了三个组件来执行存储过程或存储函数:

  • 存储过程入站通道适配器

  • 存储过程出站通道适配器

  • 存储过程出站网关

# 支持的数据库

为了启用对存储过程和存储函数的调用,存储过程组件使用[org.springframework.jdbc.core.simple.SimpleJdbcCall](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/jdbc/core/simplejdbccall.html)类。因此,完全支持以下数据库来执行存储过程:

  • Apache Derby

  • DB2

  • MySQL

  • Microsoft SQL Server

  • Oracle

  • PostgreSQL

  • Sybase

如果你想执行存储函数,则完全支持以下数据库:

  • MySQL

  • Microsoft SQL Server

  • Oracle

  • PostgreSQL

即使你的特定数据库可能没有得到完全支持,但只要你的 RDBMS 支持存储过程或存储函数,你仍然可以非常成功地使用存储过程 Spring 集成组件,事实上,

,提供的一些集成测试使用H2 数据库 (opens new window)
尽管如此,彻底测试这些使用场景是非常重要的。

# 配置

存储过程组件提供了完整的 XML 命名空间支持,并且配置这些组件与前面讨论的通用 JDBC 组件类似。

# 公共配置属性

所有存储过程组件共享某些配置参数:

  • auto-startup:生命周期属性,用于在应用程序上下文启动期间发出是否应启动此组件的信号。它的默认值为true。可选的。

  • data-source:引用javax.sql.DataSource,用于访问数据库。必须的。

  • id:标识底层 Spring Bean 定义,这是EventDrivenConsumerPollingConsumer的实例,这取决于出站通道适配器的channel属性是否引用了SubscribableChannelPollableChannel。可选的。

  • ignore-column-meta-data:对于完全支持的数据库,底层[SimpleJdbcCall](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/jdbc/core/core/simplejdbccall.html)类可以自动从 JDBC 元数据中检索存储过程或存储函数的参数信息。

    但是,如果数据库不支持元数据查找,或者如果需要提供定制的参数定义,则可以将此标志设置为true。它的默认值为false。可选的。

  • is-function:如果true,调用一个 SQL 函数。在这种情况下,stored-procedure-namestored-procedure-name-expression属性定义了被调用函数的名称。它默认为false。可选的。

  • stored-procedure-name:此属性指定存储过程的名称。如果is-function属性设置为true,则此属性指定函数名。必须指定此属性或stored-procedure-name-expression

  • stored-procedure-name-expression:此属性通过使用 SPEL 表达式指定存储过程的名称。通过使用 SPEL,你可以访问完整的消息(如果可用的话),包括其标题和有效负载。你可以使用此属性在运行时调用不同的存储过程。例如,你可以提供希望作为消息头执行的存储过程名。表达式必须解析为String

    如果is-function属性设置为true,则此属性指定一个存储函数。必须指定此属性或stored-procedure-name

  • jdbc-call-operations-cache-size:定义缓存SimpleJdbcCallOperations实例的最大数量。基本上,对于每个存储过程名称,都会创建一个新的[SimpleJdbcCallOperations](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/jdbc/core/simplejdbcalloperations.html)实例,作为回报,该实例被缓存。

    Spring Integration2.2 添加了stored-procedure-name-expression属性和jdbc-call-operations-cache-size属性。

    默认的缓存大小是10。值0将禁用缓存。负值是不允许的。

    如果启用 JMX,则有关jdbc-call-operations-cache的统计信息将作为 MBean 公开。有关更多信息,请参见MBean 出口商

  • sql-parameter-source-factory:(存储过程入站通道适配器不可用。)引用SqlParameterSourceFactory。 Bean 默认情况下,通过使用BeanPropertySqlParameterSourceFactory,在Message中传递的有效负载的属性被用作存储过程输入参数的源。

    对于基本的用例来说,这可能就足够了。对于更复杂的选项,可以考虑传入一个或多个ProcedureParameter值。见定义参数源。可选的。

  • use-payload-as-parameter-source:(存储过程入站通道适配器不可用。)如果设置为true,则Message的有效负载被用作提供参数的源。但是,如果设置为false,则整个Message可以作为参数的源。

    如果没有传递过程参数,则此属性默认为true。这意味着,通过使用默认的BeanPropertySqlParameterSourceFactory,有效负载的 Bean 属性被用作存储过程或存储函数的参数值的源。

    但是,如果传递了过程参数,则此属性(默认情况下)计算为falseProcedureParameter将提供 SPEL 表达式。因此,具有对整个Message的访问是非常有益的。该属性设置在底层StoredProcExecutor上。可选的。

# 公共配置子元素

存储过程组件共享一组公共的子元素,你可以使用这些子元素定义参数并将参数传递给存储过程或存储函数。以下要素是可用的:

  • parameter

  • returning-resultset

  • sql-parameter-definition

  • poller

  • parameter:提供一种机制来提供存储过程参数。参数可以是静态的,也可以通过使用 SPEL 表达式提供。

    <int-jdbc:parameter name=""         (1)
                        type=""         (2)
                        value=""/>      (3)
    
    <int-jdbc:parameter name=""
                        expression=""/> (4)
    

    +<1>要传递到存储过程或存储函数中的参数的名称。必须的。<2>此属性指定该值的类型。如果不提供任何内容,则此属性默认为java.lang.String。此属性仅在使用value属性时使用。可选的。<3>参数的值。你必须提供此属性或expression属性。可选的。<4>而不是value属性,你可以指定一个用于传递参数值的 SPEL 表达式。如果指定expression,则不允许使用value属性。可选的。

    可选的。

  • returning-resultset:存储过程可能返回多个结果集。通过设置一个或多个returning-resultset元素,你可以指定RowMappers来将每个返回的ResultSet转换为有意义的对象。可选的。

    <int-jdbc:returning-resultset name="" row-mapper="" />
    
  • sql-parameter-definition:如果使用完全支持的数据库,则通常不必指定存储过程参数定义。相反,可以从 JDBC 元数据自动派生这些参数。但是,如果你使用的是不完全支持的数据库,则必须使用sql-parameter-definition元素显式地设置这些参数。

    你还可以选择使用ignore-column-meta-data属性来关闭通过 JDBC 获得的参数元数据信息的任何处理。

    <int-jdbc:sql-parameter-definition
                                       name=""                           (1)
                                       direction="IN"                    (2)
                                       type="STRING"                     (3)
                                       scale="5"                         (4)
                                       type-name="FOO_STRUCT"            (5)
                                       return-type="fooSqlReturnType"/>  (6)
    
    1 指定 SQL 参数的名称。
    required。
    2 指定 SQL 参数定义的方向。
    默认值为IN
    有效值为:INOUT,和INOUT
    如果你的过程返回结果集,请使用returning-resultset元素。
    可选的。
    3 用于此 SQL 参数定义的 SQL 类型。
    转换为一个整数值,由java.sql.Types定义。
    或者,你也可以提供整数值。
    如果未显式设置此属性,它默认为“varchar”。
    可选。
    4 SQL 参数的比例。
    仅用于数字和十进制参数。
    可选。
    5 typeName用于用户命名的类型,例如:STRUCTDISTINCTJAVA_OBJECT,以及命名的数组类型。
    该属性与scale属性是互斥的。
    可选的。
    6 对复杂类型的自定义值处理程序的引用。
    [SqlReturnType](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/jdbc/core/sqlreturntype.html)的实现。
    该属性与scale属性相互排斥,仅适用于 out 和 inout 参数。
    可选参数。
  • poller:如果这个端点是PollingConsumer,则允许你配置消息 poller。可选的。

# 定义参数源

参数源控制检索 Spring 集成消息属性并将其映射到相关的存储过程输入参数的技术。

存储过程组件遵循某些规则。默认情况下,Message有效负载的 Bean 属性被用作存储过程输入参数的源。在这种情况下,使用BeanPropertySqlParameterSourceFactory。对于基本的用例来说,这可能就足够了。下一个示例演示了这种默认行为。

为了通过使用BeanPropertySqlParameterSourceFactory对 Bean 属性进行“自动”查找,你的 Bean 属性必须用小写字母定义。
这是由于在org.springframework.jdbc.core.metadata.CallMetaDataContext(Java 方法是matchInParameterValuesWithCallParameters())中,检索到的存储过程参数声明被转换为小写。
结果是,如果具有 camel-case Bean 属性(例如lastName),则查找失败。
在这种情况下,提供显式的ProcedureParameter

假设我们有一个有效负载,它由一个简单的 Bean 组成,具有以下三个属性:idnamedescription。此外,我们有一个名为INSERT_COFFEE的简单存储过程,它接受三个输入参数:idnamedescription。我们还使用了一个完全支持的数据库。在这种情况下,存储过程出站适配器的以下配置就足够了:

<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
    channel="insertCoffeeProcedureRequestChannel"
    stored-procedure-name="INSERT_COFFEE"/>

对于更复杂的选项,可以考虑传入一个或多个ProcedureParameter值。

如果确实显式地提供ProcedureParameter值,则默认情况下,参数处理使用ExpressionEvaluatingSqlParameterSourceFactory,以启用 SPEL 表达式的全部功能。

如果你需要更多地控制参数的检索方式,可以考虑使用sql-parameter-source-factory属性来传入SqlParameterSourceFactory的自定义实现。

# 存储过程入站通道适配器

下面的清单调用了对存储过程入站通道适配器很重要的属性:

<int-jdbc:stored-proc-inbound-channel-adapter
                                   channel=""                                    (1)
                                   stored-procedure-name=""
                                   data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   skip-undeclared-results=""                    (2)
                                   return-value-required="false"                 (3)
    <int:poller/>
    <int-jdbc:sql-parameter-definition name="" direction="IN"
                                               type="STRING"
                                               scale=""/>
    <int-jdbc:parameter name="" type="" value=""/>
    <int-jdbc:parameter name="" expression=""/>
    <int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>
1 查询消息发送到的通道。
如果存储过程或函数不返回任何数据,Message的有效负载为空。
需要。
2 如果将此属性设置为true,那么存储过程调用的所有结果,如果没有相应的SqlOutParameter声明,都将被忽略,
例如,存储过程可以返回更新计数值,即使你的存储过程只声明了一个结果参数。
确切的行为取决于数据库实现。
该值设置在底层JdbcTemplate上。
值默认为true
可选的。
3 指示是否应包括此过程的返回值。
自 Spring 积分 3.0 起。
可选。

# 存储过程出站通道适配器

下面的清单调用了对存储过程出站通道适配器很重要的属性:

<int-jdbc:stored-proc-outbound-channel-adapter channel=""                        (1)
                                               stored-procedure-name=""
                                               data-source=""
                                               auto-startup="true"
                                               id=""
                                               ignore-column-meta-data="false"
                                               order=""                          (2)
                                               sql-parameter-source-factory=""
                                               use-payload-as-parameter-source="">
    <int:poller fixed-rate=""/>
    <int-jdbc:sql-parameter-definition name=""/>
    <int-jdbc:parameter name=""/>

</int-jdbc:stored-proc-outbound-channel-adapter>
1 这个端点的接收消息通道。
需要。
2 指定当此端点作为订阅服务器连接到某个通道时的调用顺序。
当该通道使用failover调度策略时,这一点尤其相关。
当该端点本身是具有队列的通道的轮询消费者时,它没有任何作用。
可选的。

# 存储过程出站网关

下面的清单调用了对存储过程出站通道适配器很重要的属性:

<int-jdbc:stored-proc-outbound-gateway request-channel=""                        (1)
                                       stored-procedure-name=""
                                       data-source=""
                                   auto-startup="true"
                                   id=""
                                   ignore-column-meta-data="false"
                                   is-function="false"
                                   order=""
                                   reply-channel=""                              (2)
                                   reply-timeout=""                              (3)
                                   return-value-required="false"                 (4)
                                   skip-undeclared-results=""                    (5)
                                   sql-parameter-source-factory=""
                                   use-payload-as-parameter-source="">
<int-jdbc:sql-parameter-definition name="" direction="IN"
                                   type=""
                                   scale="10"/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
1 这个端点的接收消息通道。
需要。
2 接收到数据库响应后应向其发送回复的消息通道。
可选的。
3 让你指定这个网关在成功发送回复消息之前要等待多长时间才能抛出异常。
请记住,当发送到DirectChannel时,调用发生在发送方的线程中。
因此,发送操作的失败可能是由下游的其他组件引起的。
默认情况下,网关无限期地等待。
该值以毫秒为单位指定。
可选。
4 指示是否应包含此过程的返回值。
可选的。
5 如果skip-undeclared-results属性被设置为true,那么来自存储过程调用的所有没有相应的SqlOutParameter声明的结果都将被忽略,
例如,存储过程可能返回一个更新计数值,即使你的存储过程只声明了一个结果参数。
确切的行为取决于数据库。
该值设置在底层JdbcTemplate上。
值默认为true
可选的。

# 示例

本节包含两个调用阿帕奇德比 (opens new window)存储过程的示例。第一个过程调用返回ResultSet的存储过程。通过使用RowMapper,数据被转换成域对象,然后该域对象成为 Spring 集成消息有效负载。

在第二个示例中,我们调用了一个存储过程,该过程使用输出参数来返回数据。

看看Spring Integration Samples project (opens new window)

该项目包含这里引用的 Apache Derby 示例,以及如何运行它的说明。
Spring Integration Samples 项目还提供了使用 Oracle 存储过程的example (opens new window)

在第一个示例中,我们调用一个名为FIND_ALL_COFFEE_BEVERAGES的存储过程,该过程不定义任何输入参数,但返回一个ResultSet

在 Apache Derby 中,存储过程是用 Java 实现的。下面的清单显示了方法签名:

public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
            throws SQLException {
    ...
}

下面的清单显示了相应的 SQL:

CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';

在 Spring 集成中,现在可以通过例如使用stored-proc-outbound-gateway来调用这个存储过程,如下例所示:

<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
                                       data-source="dataSource"
                                       request-channel="findAllProcedureRequestChannel"
                                       expect-single-result="true"
                                       stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
<int-jdbc:returning-resultset name="coffeeBeverages"
    row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>

在第二个示例中,我们调用一个名为FIND_COFFEE的存储过程,该过程有一个输入参数。它使用一个输出参数,而不是返回ResultSet。下面的示例展示了方法签名:

public static void findCoffee(int coffeeId, String[] coffeeDescription)
            throws SQLException {
    ...
}

下面的清单显示了相应的 SQL:

CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';

在 Spring 集成中,你现在可以通过例如使用stored-proc-outbound-gateway来调用这个存储过程,如下例所示:

<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
                                       data-source="dataSource"
                                       request-channel="findCoffeeProcedureRequestChannel"
                                       skip-undeclared-results="true"
                                       stored-procedure-name="FIND_COFFEE"
                                       expect-single-result="true">
    <int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>

# JDBC 锁定注册表

4.3 版引入了JdbcLockRegistry。某些组件(例如,聚合器和重排序程序)使用从LockRegistry实例获得的锁来确保一次只有一个线程操作组。DefaultLockRegistry在单个组件中执行此功能。现在,你可以在这些组件上配置一个外部锁注册中心。当与共享的MessageGroupStore一起使用时,你可以使用JdbcLockRegistry来跨多个应用程序实例提供此功能,使得一次只有一个实例可以操作组。

当一个本地线程释放一个锁时,另一个本地线程通常可以立即获得该锁。如果一个使用不同注册中心实例的线程释放了一个锁,那么获取该锁可能需要多达 100ms 的时间。

JdbcLockRegistry是基于LockRepository抽象的,它有一个DefaultLockRepository实现。数据库模式脚本位于org.springframework.integration.jdbc包中,该包针对特定的 RDBMS 供应商进行了划分。例如,下面的清单显示了锁表的 H2DDL:

CREATE TABLE INT_LOCK  (
    LOCK_KEY CHAR(36),
    REGION VARCHAR(100),
    CLIENT_ID CHAR(36),
    CREATED_DATE TIMESTAMP NOT NULL,
    constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);

INT_可以根据目标数据库的设计要求进行更改。因此,必须在DefaultLockRepository Bean 定义上使用prefix属性。

有时,一个应用程序已经移动到这样一种状态,它无法释放分布式锁并删除数据库中的特定记录。为此,其他应用程序可以在下一个锁定调用时过期这些死锁。为此目的,在DefaultLockRepository上提供了timeToLive选项。你可能还希望为给定的DefaultLockRepository实例存储的锁指定CLIENT_ID。如果是这样,你可以指定与DefaultLockRepository关联的id作为构造函数参数。

从版本 5.1.8 开始,JdbcLockRegistry可以配置idleBetweenTries-aDuration来在锁记录插入/更新执行之间休眠。默认情况下,它是100毫秒,并且在某些环境中,非领导者经常污染与数据源的连接。

从版本 5.4 开始,RenewableLockRegistry接口被引入并添加到JdbcLockRegistry。在锁定过程中必须调用renewLock()方法,以防止锁定过程的持续时间超过锁定时间。因此,生存时间可以大大减少,部署可以迅速夺回丢失的锁。

只有当锁被当前线程持有时,才能进行锁更新。

使用 5.5.6 版本的字符串,JdbcLockRegistry支持通过JdbcLockRegistry.setCacheCapacity()自动清理JdbcLockRegistry.locks中的 JDBClock 的缓存。有关更多信息,请参见其 Javadocs。

# JDBC 元数据存储

5.0 版本引入了 JDBCMetadataStore(参见元数据存储)实现。你可以使用JdbcMetadataStore在应用程序重启期间维护元数据状态。此MetadataStore实现可与以下适配器一起使用:

要将这些适配器配置为使用JdbcMetadataStore,请使用 Bean 名metadataStore声明 Spring Bean。提要入站通道适配器和提要入站通道适配器都会自动拾取并使用声明的JdbcMetadataStore,如下例所示:

@Bean
public MetadataStore metadataStore(DataSource dataSource) {
    return new JdbcMetadataStore(dataSource);
}

org.springframework.integration.jdbc包为几个 RDMBS 供应商提供了数据库模式脚本。例如,下面的清单显示了元数据表的 H2DDL:

CREATE TABLE INT_METADATA_STORE  (
	METADATA_KEY VARCHAR(255) NOT NULL,
	METADATA_VALUE VARCHAR(4000),
	REGION VARCHAR(100) NOT NULL,
	constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);

你可以更改INT_前缀以匹配目标数据库的设计要求。你还可以配置JdbcMetadataStore以使用自定义前缀。

JdbcMetadataStore实现了ConcurrentMetadataStore,使其在多个应用程序实例之间可靠地共享,在多个应用程序实例中,只有一个实例可以存储或修改键值。由于事务担保,所有这些操作都是原子操作。

事务管理必须使用JdbcMetadataStore。入站通道适配器可以在 Poller 配置中提供对TransactionManager的引用。与非事务性的MetadataStore实现不同,对于JdbcMetadataStore,只有在事务提交之后,条目才会出现在目标表中。当发生回滚时,不会将任何条目添加到INT_METADATA_STORE表中。

从版本 5.0.7 开始,你可以使用特定于 RDBMS 供应商的lockHint选项配置JdbcMetadataStore选项,用于对元数据存储条目进行基于锁的查询。默认情况下,它是FOR UPDATE,如果目标数据库不支持行锁定功能,则可以使用空字符串进行配置。在更新前锁定行的SELECT表达式中,请咨询你的供应商,以获得特定的和可能的提示。