# 缩放和并行处理

# 缩放和并行处理

XMLJavaBoth

许多批处理问题可以通过单线程、单流程作业来解决,因此在考虑更复杂的实现之前,正确地检查它是否满足你的需求始终是一个好主意。衡量一项实际工作的性能,看看最简单的实现是否首先满足你的需求。即使使用标准的硬件,你也可以在一分钟内读写几百兆的文件。

Spring 当你准备好开始用一些并行处理来实现一个作业时, Spring Batch 提供了一系列选项,这些选项在本章中进行了描述,尽管其他地方也介绍了一些特性。在高层次上,有两种并行处理模式:

  • 单过程、多线程

  • 多进程

这些指标也可分为以下几类:

  • 多线程步骤(单进程)

  • 并行步骤(单一过程)

  • 步骤的远程分块(多进程)

  • 划分一个步骤(单个或多个进程)

首先,我们回顾一下单流程选项。然后,我们回顾了多进程的选择。

# 多线程步骤

启动并行处理的最简单方法是在步骤配置中添加TaskExecutor

例如,你可以添加tasklet的一个属性,如下所示:

<step id="loading">
    <tasklet task-executor="taskExecutor">...</tasklet>
</step>

当使用 Java 配置时,可以将TaskExecutor添加到该步骤中,如以下示例所示:

Java 配置

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
	return this.stepBuilderFactory.get("sampleStep")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.build();
}

在此示例中,taskExecutor是对另一个 Bean 定义的引用,该定义实现了TaskExecutor接口。[TaskExecutor](https://DOCS. Spring.io/ Spring/DOCS/current/javadoc-api/org/springframework/core/core/task/taskexecutor.html)是一个标准的 Spring 接口,因此请参阅 Spring 用户指南以获得可用实现的详细信息。最简单的多线程TaskExecutorSimpleAsyncTaskExecutor

上述配置的结果是,Step通过在单独的执行线程中读取、处理和写入每个项块(每个提交间隔)来执行。请注意,这意味着要处理的项没有固定的顺序,并且块可能包含与单线程情况相比非连续的项。除了任务执行器设置的任何限制(例如它是否由线程池支持)之外,Tasklet 配置中还有一个油门限制,默认为 4。你可能需要增加这一点,以确保线程池得到充分利用。

例如,你可能会增加油门限制,如以下示例所示:

<step id="loading"> <tasklet
    task-executor="taskExecutor"
    throttle-limit="20">...</tasklet>
</step>

在使用 Java 配置时,构建器提供对油门限制的访问,如以下示例所示:

Java 配置

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
	return this.stepBuilderFactory.get("sampleStep")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.taskExecutor(taskExecutor)
				.throttleLimit(20)
				.build();
}

还请注意,在你的步骤中使用的任何池资源都可能对并发性施加限制,例如DataSource。确保这些资源中的池至少与步骤中所需的并发线程数量一样大。

对于一些常见的批处理用例,使用多线程Step实现有一些实际的限制。Step中的许多参与者(例如读者和作者)是有状态的。如果状态不是由线程隔离的,那么这些组件在多线程Step中是不可用的。特别是, Spring 批中的大多数现成的读取器和编写器都不是为多线程使用而设计的。然而,可以使用无状态的或线程安全的读取器和编写器,并且在Spring Batch Samples (opens new window)中有一个示例(称为parallelJob),该示例显示了使用过程指示器(参见防止状态持久性)来跟踪在数据库输入表中已处理的项。

Spring 批处理提供了ItemWriterItemReader的一些实现方式。通常,他们会在 Javadoc 中说明它们是否是线程安全的,或者你必须做什么来避免在并发环境中出现问题。如果 Javadoc 中没有信息,则可以检查实现,以查看是否存在任何状态。如果阅读器不是线程安全的,那么你可以使用提供的SynchronizedItemStreamReader来装饰它,或者在你自己的同步委托程序中使用它。你可以将调用同步到read(),并且只要处理和写入是块中最昂贵的部分,你的步骤仍然可以比在单线程配置中快得多地完成。

# 平行步骤

只要需要并行化的应用程序逻辑可以划分为不同的职责,并分配给各个步骤,那么就可以在单个流程中进行并行化。并行步骤执行很容易配置和使用。

例如,与step3并行执行(step1,step2)的步骤是直接的,如以下示例所示:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

当使用 Java 配置时,与(step1,step2)并行执行步骤step3是很简单的,如以下示例所示:

Java 配置

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

可配置任务执行器用于指定应该使用哪个TaskExecutor实现来执行各个流。默认值是SyncTaskExecutor,但是需要一个异步TaskExecutor来并行运行这些步骤。请注意,该作业确保在聚合退出状态和转换之前,拆分中的每个流都已完成。

有关更多详细信息,请参见拆分流一节。

# 远程分块

在远程分块中,Step处理被分割到多个进程中,通过一些中间件相互通信。下图显示了该模式:

远程分块

图 1。远程分块

Manager 组件是一个单独的进程,工作人员是多个远程进程。如果 Manager 不是瓶颈,那么这种模式最有效,因此处理必须比读取项目更昂贵(在实践中通常是这种情况)。

Manager 是 Spring 批处理Step的实现,其中ItemWriter被一个通用版本代替,该版本知道如何将项目块作为消息发送到中间件。工人是正在使用的任何中间件的标准侦听器(例如,对于 JMS,他们将是MessageListener实现),他们的角色是通过ItemWriterItemProcessor加上ItemWriter接口使用标准的项块。使用这种模式的优点之一是读写器、处理器和写写器组件是现成的(与用于步骤的本地执行的组件相同)。这些项是动态划分的,工作是通过中间件共享的,因此,如果侦听器都是热心的消费者,那么负载平衡就是自动的。

中间件必须是持久的,保证交付,并且每条消息只有一个使用者。JMS 是显而易见的候选者,但在网格计算和共享内存产品空间中存在其他选项(例如 JavaSpace)。

有关更多详细信息,请参见Spring Batch Integration - Remote Chunking一节。

# 分区

Spring 批处理还提供了用于分区Step执行并远程执行它的 SPI。在这种情况下,远程参与者是Step实例,这些实例可以很容易地被配置并用于本地处理。下图显示了该模式:

分区概述

图 2。划分

Job作为Step实例的序列在左侧运行,其中一个Step实例被标记为管理器。这张图中的工人都是Step的相同实例,它实际上可以代替经理,从而导致Job的结果相同。工作人员通常是远程服务,但也可能是执行的本地线程。在此模式中,经理发送给工作人员的消息不需要是持久的,也不需要有保证的交付。 Spring JobRepository中的批处理元数据确保每个工作者执行一次,并且对于每个Job执行只执行一次。

Spring 批处理中的 SPI 由Step(称为PartitionStep)的特殊实现和需要为特定环境实现的两个策略接口组成。策略接口是PartitionHandlerStepExecutionSplitter,它们的作用在下面的序列图中显示:

分区 SPI

图 3。分区 SPI

在这种情况下,右边的Step是“远程”工作者,因此,潜在地,有许多对象和或进程在扮演这个角色,并且PartitionStep被显示为驱动执行。

下面的示例显示了使用 XML 配置时的PartitionStep配置:

<step id="step1.manager">
    <partition step="step1" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</step>

下面的示例显示了使用 Java 配置时的PartitionStep配置:

Java 配置

@Bean
public Step step1Manager() {
    return stepBuilderFactory.get("step1.manager")
        .<String, String>partitioner("step1", partitioner())
        .step(step1())
        .gridSize(10)
        .taskExecutor(taskExecutor())
        .build();
}

与多线程步骤的throttle-limit属性类似,grid-size属性防止任务执行器被来自单个步骤的请求饱和。

有一个简单的示例,可以在Spring Batch Samples (opens new window)的单元测试套件中进行复制和扩展(参见partition*Job.xml配置)。

Spring 批处理为被称为“Step1:Partition0”的分区创建步骤执行,以此类推。为了保持一致性,许多人更喜欢将 Manager 步骤称为“Step1:Manager”。你可以为步骤使用别名(通过指定name属性而不是id属性)。

# 分区处理程序

PartitionHandler是了解远程或网格环境结构的组件。它能够将StepExecution请求发送到远程Step实例,并以某种特定于织物的格式包装,例如 DTO。它不需要知道如何分割输入数据或如何聚合多个Step执行的结果。一般来说,它可能也不需要了解弹性或故障转移,因为在许多情况下,这些都是织物的功能。在任何情况下, Spring 批处理总是提供独立于织物的重启性。失败的Job总是可以重新启动,并且只重新执行失败的Steps

PartitionHandler接口可以为各种结构类型提供专门的实现,包括简单的 RMI 远程处理、EJB 远程处理、自定义 Web 服务、JMS、Java 空间、共享内存网格(如 Terracotta 或 Coherence)和网格执行结构(如 GridGain)。 Spring 批处理不包含用于任何专有网格或远程织物的实现方式。

Spring 然而,批处理确实提供了PartitionHandler的一种有用的实现,该实现使用 Spring 中的TaskExecutor策略,在单独的执行线程中本地执行Step实例。该实现被称为TaskExecutorPartitionHandler

TaskExecutorPartitionHandler是使用前面显示的 XML 名称空间进行配置的步骤的默认值。也可以显式地对其进行配置,如以下示例所示:

<step id="step1.manager">
    <partition step="step1" handler="handler"/>
</step>

<bean class="org.spr...TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

TaskExecutorPartitionHandler可以在 Java 配置中显式地进行配置,如以下示例所示:

Java 配置

@Bean
public Step step1Manager() {
    return stepBuilderFactory.get("step1.manager")
        .partitioner("step1", partitioner())
        .partitionHandler(partitionHandler())
        .build();
}

@Bean
public PartitionHandler partitionHandler() {
    TaskExecutorPartitionHandler retVal = new TaskExecutorPartitionHandler();
    retVal.setTaskExecutor(taskExecutor());
    retVal.setStep(step1());
    retVal.setGridSize(10);
    return retVal;
}

gridSize属性决定要创建的独立步骤执行的数量,因此它可以与TaskExecutor中线程池的大小匹配。或者,可以将其设置为比可用的线程数量更大,这使得工作块更小。

TaskExecutorPartitionHandler对于 IO 密集型Step实例很有用,例如复制大量文件或将文件系统复制到内容管理系统中。它还可以通过提供Step实现来用于远程执行,该实现是远程调用的代理(例如使用 Spring remoting)。

# 分割者

Partitioner有一个更简单的职责:仅为新的步骤执行生成执行上下文作为输入参数(无需担心重新启动)。它只有一个方法,如下面的接口定义所示:

public interface Partitioner {
    Map<String, ExecutionContext> partition(int gridSize);
}

这个方法的返回值将每个步骤执行的唯一名称(String)与输入参数(ExecutionContext)以ExecutionContext的形式关联起来。这些名称稍后会在批处理元数据中显示为分区StepExecutions中的步骤名称。ExecutionContext只是一组名称-值对,因此它可能包含一系列主键、行号或输入文件的位置。然后,远程Step通常使用#{…​}占位符(在步骤作用域中的后期绑定)绑定到上下文输入,如下一节所示。

步骤执行的名称(由Partitioner返回的Map中的键)需要在Job的步骤执行中是唯一的,但没有任何其他特定的要求。要做到这一点(并使名称对用户有意义),最简单的方法是使用前缀 + 后缀命名约定,其中前缀是正在执行的步骤的名称(它本身在Job中是唯一的),后缀只是一个计数器。在使用该约定的框架中有一个SimplePartitioner

可以使用一个名为PartitionNameProvider的可选接口来提供与分区本身分开的分区名称。如果Partitioner实现了这个接口,那么在重新启动时,只会查询名称。如果分区是昂贵的,这可以是一个有用的优化。由PartitionNameProvider提供的名称必须与Partitioner提供的名称匹配。

# 将输入数据绑定到步骤

PartitionHandler执行的步骤具有相同的配置,并且它们的输入参数在运行时从ExecutionContext绑定,这是非常有效的。 Spring 批处理的 StepScope 特性很容易做到这一点(在后期绑定一节中更详细地介绍)。例如,如果Partitioner使用一个名为fileName的属性键创建ExecutionContext实例,并针对每个步骤调用指向不同的文件(或目录),则Partitioner输出可能类似于下表的内容:

步骤执行名称(键) ExecutionContext (value)
filecopy:分区 0 fileName=/home/data/one
filecopy:partition1 fileName=/home/data/two
filecopy:partition2 fileName=/home/data/three

然后,可以使用与执行上下文的后期绑定将文件名绑定到一个步骤。

下面的示例展示了如何在 XML 中定义后期绑定:

XML 配置

<bean id="itemReader" scope="step"
      class="org.spr...MultiResourceItemReader">
    <property name="resources" value="#{stepExecutionContext[fileName]}/*"/>
</bean>

下面的示例展示了如何在 Java 中定义后期绑定:

Java 配置

@Bean
public MultiResourceItemReader itemReader(
	@Value("#{stepExecutionContext['fileName']}/*") Resource [] resources) {
	return new MultiResourceItemReaderBuilder<String>()
			.delegate(fileReader())
			.name("itemReader")
			.resources(resources)
			.build();
}