# 缩放和并行处理
# 缩放和并行处理
XMLJavaBoth
许多批处理问题可以通过单线程、单流程作业来解决,因此在考虑更复杂的实现之前,正确地检查它是否满足你的需求始终是一个好主意。衡量一项实际工作的性能,看看最简单的实现是否首先满足你的需求。即使使用标准的硬件,你也可以在一分钟内读写几百兆的文件。
Spring 当你准备好开始用一些并行处理来实现一个作业时, Spring Batch 提供了一系列选项,这些选项在本章中进行了描述,尽管其他地方也介绍了一些特性。在高层次上,有两种并行处理模式:
单过程、多线程
多进程
这些指标也可分为以下几类:
多线程步骤(单进程)
并行步骤(单一过程)
步骤的远程分块(多进程)
划分一个步骤(单个或多个进程)
首先,我们回顾一下单流程选项。然后,我们回顾了多进程的选择。
# 多线程步骤
启动并行处理的最简单方法是在步骤配置中添加TaskExecutor
。
例如,你可以添加tasklet
的一个属性,如下所示:
<step id="loading">
<tasklet task-executor="taskExecutor">...</tasklet>
</step>
当使用 Java 配置时,可以将TaskExecutor
添加到该步骤中,如以下示例所示:
Java 配置
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
在此示例中,taskExecutor
是对另一个 Bean 定义的引用,该定义实现了TaskExecutor
接口。[TaskExecutor
](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/core/core/task/taskexecutor.html)是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以获得可用实现的详细信息。最简单的多线程TaskExecutor
是SimpleAsyncTaskExecutor
。
上述配置的结果是,Step
通过在单独的执行线程中读取、处理和写入每个项块(每个提交间隔)来执行。请注意,这意味着要处理的项没有固定的顺序,并且块可能包含与单线程情况相比非连续的项。除了任务执行器设置的任何限制(例如它是否由线程池支持)之外,Tasklet 配置中还有一个油门限制,默认为 4。你可能需要增加这一点,以确保线程池得到充分利用。
例如,你可能会增加油门限制,如以下示例所示:
<step id="loading"> <tasklet
task-executor="taskExecutor"
throttle-limit="20">...</tasklet>
</step>
在使用 Java 配置时,构建器提供对油门限制的访问,如以下示例所示:
Java 配置
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.throttleLimit(20)
.build();
}
还请注意,在你的步骤中使用的任何池资源都可能对并发性施加限制,例如DataSource
。确保这些资源中的池至少与步骤中所需的并发线程数量一样大。
对于一些常见的批处理用例,使用多线程Step
实现有一些实际的限制。Step
中的许多参与者(例如读者和作者)是有状态的。如果状态不是由线程隔离的,那么这些组件在多线程Step
中是不可用的。特别是, Spring 批中的大多数现成的读取器和编写器都不是为多线程使用而设计的。然而,可以使用无状态的或线程安全的读取器和编写器,并且在Spring Batch Samples (opens new window)中有一个示例(称为parallelJob
),该示例显示了使用过程指示器(参见防止状态持久性)来跟踪在数据库输入表中已处理的项。
Spring 批处理提供了ItemWriter
和ItemReader
的一些实现方式。通常,他们会在 Javadoc 中说明它们是否是线程安全的,或者你必须做什么来避免在并发环境中出现问题。如果 Javadoc 中没有信息,则可以检查实现,以查看是否存在任何状态。如果阅读器不是线程安全的,那么你可以使用提供的SynchronizedItemStreamReader
来装饰它,或者在你自己的同步委托程序中使用它。你可以将调用同步到read()
,并且只要处理和写入是块中最昂贵的部分,你的步骤仍然可以比在单线程配置中快得多地完成。
# 平行步骤
只要需要并行化的应用程序逻辑可以划分为不同的职责,并分配给各个步骤,那么就可以在单个流程中进行并行化。并行步骤执行很容易配置和使用。
例如,与step3
并行执行(step1,step2)
的步骤是直接的,如以下示例所示:
<job id="job1">
<split id="split1" task-executor="taskExecutor" next="step4">
<flow>
<step id="step1" parent="s1" next="step2"/>
<step id="step2" parent="s2"/>
</flow>
<flow>
<step id="step3" parent="s3"/>
</flow>
</split>
<step id="step4" parent="s4"/>
</job>
<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>
当使用 Java 配置时,与(step1,step2)
并行执行步骤step3
是很简单的,如以下示例所示:
Java 配置
@Bean
public Job job() {
return jobBuilderFactory.get("job")
.start(splitFlow())
.next(step4())
.build() //builds FlowJobBuilder instance
.build(); //builds Job instance
}
@Bean
public Flow splitFlow() {
return new FlowBuilder<SimpleFlow>("splitFlow")
.split(taskExecutor())
.add(flow1(), flow2())
.build();
}
@Bean
public Flow flow1() {
return new FlowBuilder<SimpleFlow>("flow1")
.start(step1())
.next(step2())
.build();
}
@Bean
public Flow flow2() {
return new FlowBuilder<SimpleFlow>("flow2")
.start(step3())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
return new SimpleAsyncTaskExecutor("spring_batch");
}
可配置任务执行器用于指定应该使用哪个TaskExecutor
实现来执行各个流。默认值是SyncTaskExecutor
,但是需要一个异步TaskExecutor
来并行运行这些步骤。请注意,该作业确保在聚合退出状态和转换之前,拆分中的每个流都已完成。
有关更多详细信息,请参见拆分流一节。
# 远程分块
在远程分块中,Step
处理被分割到多个进程中,通过一些中间件相互通信。下图显示了该模式:
图 1。远程分块
Manager 组件是一个单独的进程,工作人员是多个远程进程。如果 Manager 不是瓶颈,那么这种模式最有效,因此处理必须比读取项目更昂贵(在实践中通常是这种情况)。
Manager 是 Spring 批处理Step
的实现,其中ItemWriter
被一个通用版本代替,该版本知道如何将项目块作为消息发送到中间件。工人是正在使用的任何中间件的标准侦听器(例如,对于 JMS,他们将是MessageListener
实现),他们的角色是通过ItemWriter
或ItemProcessor
加上ItemWriter
接口使用标准的项块。使用这种模式的优点之一是读写器、处理器和写写器组件是现成的(与用于步骤的本地执行的组件相同)。这些项是动态划分的,工作是通过中间件共享的,因此,如果侦听器都是热心的消费者,那么负载平衡就是自动的。
中间件必须是持久的,保证交付,并且每条消息只有一个使用者。JMS 是显而易见的候选者,但在网格计算和共享内存产品空间中存在其他选项(例如 JavaSpace)。
有关更多详细信息,请参见Spring Batch Integration - Remote Chunking一节。
# 分区
Spring 批处理还提供了用于分区Step
执行并远程执行它的 SPI。在这种情况下,远程参与者是Step
实例,这些实例可以很容易地被配置并用于本地处理。下图显示了该模式:
图 2。划分
Job
作为Step
实例的序列在左侧运行,其中一个Step
实例被标记为管理器。这张图中的工人都是Step
的相同实例,它实际上可以代替经理,从而导致Job
的结果相同。工作人员通常是远程服务,但也可能是执行的本地线程。在此模式中,经理发送给工作人员的消息不需要是持久的,也不需要有保证的交付。 Spring JobRepository
中的批处理元数据确保每个工作者执行一次,并且对于每个Job
执行只执行一次。
Spring 批处理中的 SPI 由Step
(称为PartitionStep
)的特殊实现和需要为特定环境实现的两个策略接口组成。策略接口是PartitionHandler
和StepExecutionSplitter
,它们的作用在下面的序列图中显示:
图 3。分区 SPI
在这种情况下,右边的Step
是“远程”工作者,因此,潜在地,有许多对象和或进程在扮演这个角色,并且PartitionStep
被显示为驱动执行。
下面的示例显示了使用 XML 配置时的PartitionStep
配置:
<step id="step1.manager">
<partition step="step1" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</step>
下面的示例显示了使用 Java 配置时的PartitionStep
配置:
Java 配置
@Bean
public Step step1Manager() {
return stepBuilderFactory.get("step1.manager")
.<String, String>partitioner("step1", partitioner())
.step(step1())
.gridSize(10)
.taskExecutor(taskExecutor())
.build();
}
与多线程步骤的throttle-limit
属性类似,grid-size
属性防止任务执行器被来自单个步骤的请求饱和。
有一个简单的示例,可以在Spring Batch Samples (opens new window)的单元测试套件中进行复制和扩展(参见partition*Job.xml
配置)。
Spring 批处理为被称为“Step1:Partition0”的分区创建步骤执行,以此类推。为了保持一致性,许多人更喜欢将 Manager 步骤称为“Step1:Manager”。你可以为步骤使用别名(通过指定name
属性而不是id
属性)。
# 分区处理程序
PartitionHandler
是了解远程或网格环境结构的组件。它能够将StepExecution
请求发送到远程Step
实例,并以某种特定于织物的格式包装,例如 DTO。它不需要知道如何分割输入数据或如何聚合多个Step
执行的结果。一般来说,它可能也不需要了解弹性或故障转移,因为在许多情况下,这些都是织物的功能。在任何情况下, Spring 批处理总是提供独立于织物的重启性。失败的Job
总是可以重新启动,并且只重新执行失败的Steps
。
PartitionHandler
接口可以为各种结构类型提供专门的实现,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java 空间、共享内存网格(如 Terracotta 或 Coherence)和网格执行结构(如 GridGain)。 Spring 批处理不包含用于任何专有网格或远程织物的实现方式。
Spring 然而,批处理确实提供了PartitionHandler
的一种有用的实现,该实现使用 Spring 中的TaskExecutor
策略,在单独的执行线程中本地执行Step
实例。该实现被称为TaskExecutorPartitionHandler
。
TaskExecutorPartitionHandler
是使用前面显示的 XML 名称空间进行配置的步骤的默认值。也可以显式地对其进行配置,如以下示例所示:
<step id="step1.manager">
<partition step="step1" handler="handler"/>
</step>
<bean class="org.spr...TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="step1" />
<property name="gridSize" value="10" />
</bean>
TaskExecutorPartitionHandler
可以在 Java 配置中显式地进行配置,如以下示例所示:
Java 配置
@Bean
public Step step1Manager() {
return stepBuilderFactory.get("step1.manager")
.partitioner("step1", partitioner())
.partitionHandler(partitionHandler())
.build();
}
@Bean
public PartitionHandler partitionHandler() {
TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
retVal.setTaskExecutor(taskExecutor());
retVal.setStep(step1());
retVal.setGridSize(10);
return retVal;
}
gridSize
属性决定要创建的独立步骤执行的数量,因此它可以与TaskExecutor
中线程池的大小匹配。或者,可以将其设置为比可用的线程数量更大,这使得工作块更小。
TaskExecutorPartitionHandler
对于 IO 密集型Step
实例很有用,例如复制大量文件或将文件系统复制到内容管理系统中。它还可以通过提供Step
实现来用于远程执行,该实现是远程调用的代理(例如使用 Spring remoting)。
# 分割者
Partitioner
有一个更简单的职责:仅为新的步骤执行生成执行上下文作为输入参数(无需担心重新启动)。它只有一个方法,如下面的接口定义所示:
public interface Partitioner {
Map<String, ExecutionContext> partition(int gridSize);
}
这个方法的返回值将每个步骤执行的唯一名称(String
)与输入参数(ExecutionContext
)以ExecutionContext
的形式关联起来。这些名称稍后会在批处理元数据中显示为分区StepExecutions
中的步骤名称。ExecutionContext
只是一组名称-值对,因此它可能包含一系列主键、行号或输入文件的位置。然后,远程Step
通常使用#{…}
占位符(在步骤作用域中的后期绑定)绑定到上下文输入,如下一节所示。
步骤执行的名称(由Partitioner
返回的Map
中的键)需要在Job
的步骤执行中是唯一的,但没有任何其他特定的要求。要做到这一点(并使名称对用户有意义),最简单的方法是使用前缀 + 后缀命名约定,其中前缀是正在执行的步骤的名称(它本身在Job
中是唯一的),后缀只是一个计数器。在使用该约定的框架中有一个SimplePartitioner
。
可以使用一个名为PartitionNameProvider
的可选接口来提供与分区本身分开的分区名称。如果Partitioner
实现了这个接口,那么在重新启动时,只会查询名称。如果分区是昂贵的,这可以是一个有用的优化。由PartitionNameProvider
提供的名称必须与Partitioner
提供的名称匹配。
# 将输入数据绑定到步骤
由PartitionHandler
执行的步骤具有相同的配置,并且它们的输入参数在运行时从ExecutionContext
绑定,这是非常有效的。 Spring 批处理的 StepScope 特性很容易做到这一点(在后期绑定一节中更详细地介绍)。例如,如果Partitioner
使用一个名为fileName
的属性键创建ExecutionContext
实例,并针对每个步骤调用指向不同的文件(或目录),则Partitioner
输出可能类似于下表的内容:
步骤执行名称(键) | ExecutionContext (value) |
---|---|
filecopy:分区 0 | fileName=/home/data/one |
filecopy:partition1 | fileName=/home/data/two |
filecopy:partition2 | fileName=/home/data/three |
然后,可以使用与执行上下文的后期绑定将文件名绑定到一个步骤。
下面的示例展示了如何在 XML 中定义后期绑定:
XML 配置
<bean id="itemReader" scope="step"
class="org.spr...MultiResourceItemReader">
<property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>
下面的示例展示了如何在 Java 中定义后期绑定:
Java 配置
@Bean
public MultiResourceItemReader itemReader(
@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
return new MultiResourceItemReaderBuilder<String>()
.delegate(fileReader())
.name("itemReader")
.resources(resources)
.build();
}