# 常见的批处理模式

# 常见的批处理模式

XMLJavaBoth

一些批处理作业可以完全由 Spring 批处理中现成的组件组装而成。例如,ItemReaderItemWriter实现可以被配置为覆盖广泛的场景。然而,在大多数情况下,必须编写自定义代码。应用程序开发人员的主要 API 入口点是TaskletItemReaderItemWriter和各种侦听器接口。大多数简单的批处理作业可以使用 Spring 批处理中的现成输入ItemReader,但是在处理和编写过程中通常存在需要开发人员实现ItemWriterItemProcessor的定制问题。

在这一章中,我们提供了几个自定义业务逻辑中常见模式的示例。这些示例主要以侦听器接口为特征。应该注意的是,如果合适的话,ItemReaderItemWriter也可以实现侦听器接口。

# 记录项目处理和失败

一个常见的用例是需要在一个步骤中对错误进行特殊处理,逐项处理,可能是登录到一个特殊的通道,或者将一条记录插入到数据库中。面向块的Step(从 Step Factory Bean 创建)允许用户实现这个用例,它使用一个简单的ItemReadListener表示read上的错误,使用一个ItemWriteListener表示write上的错误。以下代码片段演示了记录读写失败的侦听器:

public class ItemFailureLoggerListener extends ItemListenerSupport {

    private static Log logger = LogFactory.getLog("item.error");

    public void onReadError(Exception ex) {
        logger.error("Encountered error on read", e);
    }

    public void onWriteError(Exception ex, List<? extends Object> items) {
        logger.error("Encountered error on write", ex);
    }
}

在实现了这个侦听器之后,必须用一个步骤对其进行注册。

下面的示例展示了如何用 XML 中的一个步骤注册侦听器:

XML 配置

<step id="simpleStep">
...
<listeners>
    <listener>
        <bean class="org.example...ItemFailureLoggerListener"/>
    </listener>
</listeners>
</step>

下面的示例展示了如何使用 STEP Java 注册侦听器:

Java 配置

@Bean
public Step simpleStep() {
	return this.stepBuilderFactory.get("simpleStep")
				...
				.listener(new ItemFailureLoggerListener())
				.build();
}
如果你的侦听器在onError()方法中执行任何操作,则它必须位于
将被回滚的事务中。如果需要在onError()方法中使用事务性
资源,例如数据库,请考虑向该方法添加声明性
事务(有关详细信息,请参见 Spring Core Reference Guide),并给其
传播属性一个值REQUIRES_NEW

# 由于业务原因手动停止作业

Spring Batch 通过JobOperator接口提供了stop()方法,但这实际上是供操作员而不是应用程序程序员使用的。有时,从业务逻辑中停止作业执行更方便或更有意义。

最简单的方法是抛出RuntimeException(这种方法既不会无限期地重试,也不会被跳过)。例如,可以使用自定义异常类型,如下例所示:

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

    @Override
    public T process(T item) throws Exception {
        if (isPoisonPill(item)) {
            throw new PoisonPillException("Poison pill detected: " + item);
        }
        return item;
    }
}

另一种停止执行步骤的简单方法是从ItemReader返回null,如以下示例所示:

public class EarlyCompletionItemReader implements ItemReader<T> {

    private ItemReader<T> delegate;

    public void setDelegate(ItemReader<T> delegate) { ... }

    public T read() throws Exception {
        T item = delegate.read();
        if (isEndItem(item)) {
            return null; // end the step here
        }
        return item;
    }

}

前面的示例实际上依赖于这样一个事实,即存在CompletionPolicy策略的默认实现,当要处理的项是null时,该策略发出一个完整批处理的信号。可以实现一个更复杂的完成策略,并通过SimpleStepFactoryBean注入Step

下面的示例展示了如何在 XML 中的一个步骤中注入一个完成策略:

XML 配置

<step id="simpleStep">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"
               chunk-completion-policy="completionPolicy"/>
    </tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

下面的示例展示了如何在 Java 中的一个步骤中注入一个完成策略:

Java 配置

@Bean
public Step simpleStep() {
	return this.stepBuilderFactory.get("simpleStep")
				.<String, String>chunk(new SpecialCompletionPolicy())
				.reader(reader())
				.writer(writer())
				.build();
}

一种替代方法是在StepExecution中设置一个标志,这是由Step实现在框架中检查项之间的处理。要实现此替代方案,我们需要访问当前的StepExecution,这可以通过实现StepListener并将其注册到Step来实现。下面的示例展示了一个设置标志的侦听器:

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

    private StepExecution stepExecution;

    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public void afterRead(Object item) {
        if (isPoisonPill(item)) {
            stepExecution.setTerminateOnly();
       }
    }

}

设置标志时,默认的行为是抛出JobInterruptedException。这种行为可以通过StepInterruptionPolicy来控制。然而,唯一的选择是抛出或不抛出异常,因此这始终是工作的异常结束。

# 添加页脚记录

通常,当写入平面文件时,在所有处理完成后,必须在文件的末尾附加一个“页脚”记录。这可以使用由 Spring 批提供的FlatFileFooterCallback接口来实现。FlatFileFooterCallback(及其对应的FlatFileHeaderCallback)是FlatFileItemWriter的可选属性,可以添加到项编写器中。

下面的示例展示了如何在 XML 中使用FlatFileHeaderCallbackFlatFileFooterCallback:

XML 配置

<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator" ref="lineAggregator"/>
    <property name="headerCallback" ref="headerCallback" />
    <property name="footerCallback" ref="footerCallback" />
</bean>

下面的示例展示了如何在 Java 中使用FlatFileHeaderCallbackFlatFileFooterCallback:

Java 配置

@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.headerCallback(headerCallback())
			.footerCallback(footerCallback())
			.build();
}

页脚回调接口只有一个方法,在必须写入页脚时调用该方法,如以下接口定义所示:

public interface FlatFileFooterCallback {

    void writeFooter(Writer writer) throws IOException;

}

# 编写摘要页脚

涉及页脚记录的一个常见要求是在输出过程中聚合信息,并将这些信息附加到文件的末尾。这个页脚通常用作文件的摘要或提供校验和。

例如,如果一个批处理作业正在将Trade记录写入一个平面文件,并且要求将所有Trades的总量放入一个页脚中,那么可以使用以下ItemWriter实现:

public class TradeItemWriter implements ItemWriter<Trade>,
                                        FlatFileFooterCallback {

    private ItemWriter<Trade> delegate;

    private BigDecimal totalAmount = BigDecimal.ZERO;

    public void write(List<? extends Trade> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        for (Trade trade : items) {
            chunkTotal = chunkTotal.add(trade.getAmount());
        }

        delegate.write(items);

        // After successfully writing all items
        totalAmount = totalAmount.add(chunkTotal);
    }

    public void writeFooter(Writer writer) throws IOException {
        writer.write("Total Amount Processed: " + totalAmount);
    }

    public void setDelegate(ItemWriter delegate) {...}
}

这个TradeItemWriter存储了一个totalAmount值,该值随着从每个Trade条目中写入的amount而增加。在处理最后一个Trade之后,框架调用writeFooter,这将totalAmount放入文件。请注意,write方法使用了一个临时变量chunkTotal,该变量存储了块中Trade数量的总和。这样做是为了确保,如果在write方法中发生跳过,totalAmount保持不变。只有在write方法结束时,在保证不抛出异常之后,我们才更新totalAmount

为了调用writeFooter方法,TradeItemWriter(它实现FlatFileFooterCallback)必须连接到FlatFileItemWriter中,作为footerCallback

下面的示例展示了如何在 XML 中连接TradeItemWriter:

XML 配置

<bean id="tradeItemWriter" class="..TradeItemWriter">
    <property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
   <property name="resource" ref="outputResource" />
   <property name="lineAggregator" ref="lineAggregator"/>
   <property name="footerCallback" ref="tradeItemWriter" />
</bean>

下面的示例展示了如何在 Java 中连接TradeItemWriter:

Java 配置

@Bean
public TradeItemWriter tradeItemWriter() {
	TradeItemWriter itemWriter = new TradeItemWriter();

	itemWriter.setDelegate(flatFileItemWriter(null));

	return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.footerCallback(tradeItemWriter())
			.build();
}

到目前为止,只有当Step不可重启时,TradeItemWriter的写入方式才能正确地执行。这是因为类是有状态的(因为它存储totalAmount),但是totalAmount不会持久化到数据库中。因此,在重新启动的情况下无法检索到它。为了使这个类重新启动,ItemStream接口应该与openupdate方法一起实现,如下面的示例所示:

public void open(ExecutionContext executionContext) {
    if (executionContext.containsKey("total.amount") {
        totalAmount = (BigDecimal) executionContext.get("total.amount");
    }
}

public void update(ExecutionContext executionContext) {
    executionContext.put("total.amount", totalAmount);
}

更新方法将最新版本的totalAmount存储到ExecutionContext,就在该对象持久化到数据库之前。open 方法从ExecutionContext中检索任何已存在的totalAmount,并将其用作处理的起点,从而允许TradeItemWriter在重新启动时在上次运行Step时未启动的地方进行拾取。

# 基于项目阅读器的驾驶查询

关于读者和作家的章节中,讨论了利用分页进行数据库输入的问题。许多数据库供应商(例如 DB2)都有非常悲观的锁定策略,如果正在读取的表也需要由在线应用程序的其他部分使用,这些策略可能会导致问题。此外,在非常大的数据集上打开游标可能会导致某些供应商的数据库出现问题。因此,许多项目更喜欢使用“驱动查询”方法来读取数据。这种方法的工作原理是对键进行迭代,而不是对需要返回的整个对象进行迭代,如下图所示:

驾驶查询工作

图 1。驾驶查询工作

正如你所看到的,前面图片中显示的示例使用了与基于游标的示例中使用的相同的“foo”表。但是,在 SQL 语句中只选择了 ID,而不是选择整行。因此,不是从read返回FOO对象,而是返回Integer对象。然后可以使用这个数字来查询“details”,这是一个完整的Foo对象,如下图所示:

驱动查询示例

图 2。驱动查询示例

应该使用ItemProcessor将从驱动查询中获得的键转换为完整的Foo对象。现有的 DAO 可以用于基于该键查询完整的对象。

# 多行记录

虽然平面文件的情况通常是,每个记录都被限制在单行中,但一个文件的记录可能跨越多行,并具有多种格式,这是很常见的。下面摘自一个文件,展示了这种安排的一个例子:

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

以“hea”开头的行和以“fot”开头的行之间的所有内容都被视为一条记录。为了正确处理这种情况,必须考虑以下几点:

  • 而不是一次读取一条记录,ItemReader必须将多行记录的每一行作为一个组来读取,以便它可以完整地传递给ItemWriter

  • 每一种行类型可能需要以不同的方式进行标记。

由于单个记录跨越多行,并且我们可能不知道有多少行,因此ItemReader必须小心,以始终读取整个记录。为了做到这一点,应该将自定义ItemReader实现为FlatFileItemReader的包装器。

下面的示例展示了如何在 XML 中实现自定义ItemReader:

XML 配置

<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property name="resource" value="data/iosample/input/multiLine.txt" />
            <property name="lineMapper">
                <bean class="org.spr...DefaultLineMapper">
                    <property name="lineTokenizer" ref="orderFileTokenizer"/>
                    <property name="fieldSetMapper" ref="orderFieldSetMapper"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

下面的示例展示了如何在 Java 中实现自定义ItemReader:

Java 配置

@Bean
public MultiLineTradeItemReader itemReader() {
	MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

	itemReader.setDelegate(flatFileItemReader());

	return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
			.name("flatFileItemReader")
			.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
			.lineTokenizer(orderFileTokenizer())
			.fieldSetMapper(orderFieldSetMapper())
			.build();
	return reader;
}

为了确保每一行都被正确地标记,这对于固定长度的输入尤其重要,PatternMatchingCompositeLineTokenizer可以在委托FlatFileItemReader上使用。有关更多详细信息,请参见[FlatFileItemReader中的 Readers and Writers 章节]。然后,委托读取器使用PassThroughFieldSetMapper将每一行的FieldSet传递到包装ItemReader

下面的示例展示了如何确保每一行都正确地在 XML 中进行了标记:

XML 内容

<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
    <property name="tokenizers">
        <map>
            <entry key="HEA*" value-ref="headerRecordTokenizer" />
            <entry key="FOT*" value-ref="footerRecordTokenizer" />
            <entry key="NCU*" value-ref="customerLineTokenizer" />
            <entry key="BAD*" value-ref="billingAddressLineTokenizer" />
        </map>
    </property>
</bean>

下面的示例展示了如何确保每一行都在 Java 中被正确地标记:

Java 内容

@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
	PatternMatchingCompositeLineTokenizer tokenizer =
			new PatternMatchingCompositeLineTokenizer();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

	tokenizers.put("HEA*", headerRecordTokenizer());
	tokenizers.put("FOT*", footerRecordTokenizer());
	tokenizers.put("NCU*", customerLineTokenizer());
	tokenizers.put("BAD*", billingAddressLineTokenizer());

	tokenizer.setTokenizers(tokenizers);

	return tokenizer;
}

这个包装器必须能够识别记录的结尾,以便它可以在其委托上连续调用read(),直到达到结尾。对于读取的每一行,包装器应该构建要返回的项。一旦到达页脚,就可以将项目返回以交付给ItemProcessorItemWriter,如以下示例所示:

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
    Trade t = null;

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String prefix = line.readString(0);
        if (prefix.equals("HEA")) {
            t = new Trade(); // Record must start with header
        }
        else if (prefix.equals("NCU")) {
            Assert.notNull(t, "No header was found.");
            t.setLast(line.readString(1));
            t.setFirst(line.readString(2));
            ...
        }
        else if (prefix.equals("BAD")) {
            Assert.notNull(t, "No header was found.");
            t.setCity(line.readString(4));
            t.setState(line.readString(6));
          ...
        }
        else if (prefix.equals("FOT")) {
            return t; // Record must end with footer
        }
    }
    Assert.isNull(t, "No 'END' was found.");
    return null;
}

# 执行系统命令

许多批处理作业要求从批处理作业中调用外部命令。这样的进程可以由调度器单独启动,但是有关运行的公共元数据的优势将会丧失。此外,一个多步骤的工作也需要被分解成多个工作。

因为这种需求是如此普遍, Spring Batch 提供了用于调用系统命令的Tasklet实现。

下面的示例展示了如何调用 XML 中的外部命令:

XML 配置

<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
    <property name="command" value="echo hello" />
    <!-- 5 second timeout for the command to complete -->
    <property name="timeout" value="5000" />
</bean>

下面的示例展示了如何在 Java 中调用外部命令:

Java 配置

@Bean
public SystemCommandTasklet tasklet() {
	SystemCommandTasklet tasklet = new SystemCommandTasklet();

	tasklet.setCommand("echo hello");
	tasklet.setTimeout(5000);

	return tasklet;
}

# 未找到输入时的处理步骤完成

在许多批处理场景中,在数据库或文件中找不到要处理的行并不是例外情况。将Step简单地视为未找到工作,并在读取 0 项的情况下完成。所有的ItemReader实现都是在 Spring 批处理中提供的,默认为这种方法。如果即使存在输入,也没有写出任何内容,这可能会导致一些混乱(如果文件被错误命名或出现类似问题,通常会发生这种情况)。因此,应该检查元数据本身,以确定框架需要处理多少工作。然而,如果发现没有输入被认为是例外情况怎么办?在这种情况下,最好的解决方案是通过编程方式检查元数据,以确保未处理任何项目并导致失败。因为这是一个常见的用例, Spring Batch 提供了一个具有这种功能的侦听器,如NoWorkFoundStepExecutionListener的类定义所示:

public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getReadCount() == 0) {
            return ExitStatus.FAILED;
        }
        return null;
    }

}

前面的StepExecutionListener在“afterstep”阶段检查StepExecutionreadCount属性,以确定是否没有读取任何项。如果是这种情况,将返回一个退出代码FAILED,表示Step应该失败。否则,将返回null,这不会影响Step的状态。

# 将数据传递给未来的步骤

将信息从一个步骤传递到另一个步骤通常是有用的。这可以通过ExecutionContext来完成。问题是有两个ExecutionContexts:一个在Step水平,一个在Job水平。Step``ExecutionContext只保留到步骤的长度,而Job``ExecutionContext则保留到整个Job。另一方面,Step``ExecutionContext每次Step提交一个块时都会更新Job``ExecutionContext,而Step只在每个Step的末尾更新。

这种分离的结果是,当Step执行时,所有数据都必须放在Step``ExecutionContext中。这样做可以确保在Step运行时正确地存储数据。如果数据被存储到Job``ExecutionContext,那么在Step执行期间它不会被持久化。如果Step失败,则该数据丢失。

public class SavingItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(List<? extends Object> items) throws Exception {
        // ...

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

要使将来Steps可以使用该数据,必须在步骤完成后将其“提升”到Job``ExecutionContext。 Spring Batch 为此提供了ExecutionContextPromotionListener。侦听器必须配置与必须提升的ExecutionContext中的数据相关的键。它还可以配置一个退出代码模式列表(COMPLETED是默认的)。与所有侦听器一样,它必须在Step上注册。

下面的示例展示了如何在 XML 中将一个步骤提升到Job``ExecutionContext:

XML 配置

<job id="job1">
    <step id="step1">
        <tasklet>
            <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
        </tasklet>
        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
    </step>

    <step id="step2">
       ...
    </step>
</job>

<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
    <beans:property name="keys">
        <list>
            <value>someKey</value>
        </list>
    </beans:property>
</beans:bean>

下面的示例展示了如何在 Java 中将一个步骤提升到Job``ExecutionContext:

Java 配置

@Bean
public Job job1() {
	return this.jobBuilderFactory.get("job1")
				.start(step1())
				.next(step1())
				.build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(reader())
				.writer(savingWriter())
				.listener(promotionListener())
				.build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
	ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

	listener.setKeys(new String[] {"someKey"});

	return listener;
}

最后,必须从Job``ExecutionContext中检索保存的值,如下例所示:

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;

    public void write(List<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}