# 项目阅读器和项目编写器
# 条目阅读器和条目编写器
XMLJavaBoth
所有批处理都可以用最简单的形式描述为读取大量数据,执行某种类型的计算或转换,并将结果写出来。 Spring Batch 提供了三个关键接口来帮助执行大容量读写:ItemReader
、ItemProcessor
和ItemWriter
。
# ItemReader
虽然是一个简单的概念,但ItemReader
是从许多不同类型的输入提供数据的手段。最常见的例子包括:
平面文件:平面文件项读取器从平面文件中读取数据行,该文件通常用文件中固定位置定义的数据字段或用某些特殊字符(例如逗号)分隔的数据字段来描述记录。
XML:XML
ItemReaders
独立于用于解析、映射和验证对象的技术来处理 XML。输入数据允许根据 XSD 模式验证 XML 文件。数据库:访问数据库资源以返回结果集,这些结果集可以映射到对象以进行处理。默认的 SQL
ItemReader
实现调用RowMapper
以返回对象,如果需要重新启动,则跟踪当前行,存储基本统计信息,并提供一些事务增强,稍后将对此进行说明。
还有更多的可能性,但我们将重点放在本章的基本可能性上。在Appendix A中可以找到所有可用ItemReader
实现的完整列表。
ItemReader
是用于通用输入操作的基本接口,如以下接口定义所示:
public interface ItemReader<T> {
T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
read
方法定义了ItemReader
中最基本的契约。调用它将返回一个项,如果没有更多项,则返回null
。项目可以表示文件中的行、数据库中的行或 XML 文件中的元素。通常预期这些被映射到一个可用的域对象(例如Trade
,Foo
,或其他),但是在契约中没有这样做的要求。
预计ItemReader
接口的实现方式仅是前向的。但是,如果底层资源是事务性的(例如 JMS 队列),那么在回滚场景中,调用read
可能会在随后的调用中返回相同的逻辑项。还值得注意的是,缺少由ItemReader
处理的项并不会导致抛出异常。例如,配置了返回 0 结果的查询的数据库ItemReader
在read
的第一次调用时返回null
。
# ItemWriter
ItemWriter
在功能上类似于ItemReader
,但具有反向操作。资源仍然需要定位、打开和关闭,但它们的不同之处在于ItemWriter
写出,而不是读入。在数据库或队列的情况下,这些操作可以是插入、更新或发送。输出的序列化的格式是特定于每个批处理作业的。
与ItemReader
一样,ItemWriter
是一个相当通用的接口,如下面的接口定义所示:
public interface ItemWriter<T> {
void write(List<? extends T> items) throws Exception;
}
与read
上的ItemReader
一样,write
提供了ItemWriter
的基本契约。它尝试写出传入的项目列表,只要它是打开的。由于通常期望将项目“批处理”到一个块中,然后输出,因此接口接受一个项目列表,而不是一个项目本身。在写出列表之后,可以在从写方法返回之前执行任何必要的刷新。例如,如果对 Hibernate DAO 进行写操作,则可以对每个项进行多个 write 调用。然后,写入器可以在返回之前调用 Hibernate 会话上的flush
。
# ItemStream
ItemReaders
和ItemWriters
都很好地服务于它们各自的目的,但是它们之间有一个共同的关注点,那就是需要另一个接口。通常,作为批处理作业范围的一部分,读取器和编写器需要被打开、关闭,并且需要一种机制来保持状态。ItemStream
接口实现了这一目的,如下例所示:
public interface ItemStream {
void open(ExecutionContext executionContext) throws ItemStreamException;
void update(ExecutionContext executionContext) throws ItemStreamException;
void close() throws ItemStreamException;
}
在描述每个方法之前,我们应该提到ExecutionContext
。如果ItemReader
的客户端也实现ItemStream
,则在调用read
之前,应该调用open
,以便打开任何资源,例如文件或获得连接。类似的限制适用于实现ItemStream
的ItemWriter
。正如在第 2 章中提到的,如果在ExecutionContext
中找到了预期的数据,则可以使用它在其初始状态以外的位置启动ItemReader
或ItemWriter
。相反,调用close
是为了确保在打开期间分配的任何资源都被安全地释放。调用update
主要是为了确保当前持有的任何状态都被加载到所提供的ExecutionContext
中。在提交之前调用此方法,以确保在提交之前将当前状态持久化到数据库中。
在ItemStream
的客户端是Step
(来自 Spring 批处理核心)的特殊情况下,将为每个分步执行创建一个ExecutionContext
,以允许用户存储特定执行的状态,期望在再次启动相同的JobInstance
时返回。对于那些熟悉 Quartz 的人,其语义非常类似于 QuartzJobDataMap
。
# 委托模式并与步骤一起注册
请注意,CompositeItemWriter
是委托模式的一个示例,这在 Spring 批处理中很常见。委托本身可能实现回调接口,例如StepListener
。如果它们确实存在,并且如果它们是作为Job
中的Step
的一部分与 Spring 批处理核心一起使用的,那么几乎肯定需要用Step
手动注册它们。直接连接到Step
的读取器、编写器或处理器如果实现ItemStream
或StepListener
接口,就会自动注册。但是,由于委托不为Step
所知,因此需要将它们作为侦听器或流注入(或者在适当的情况下将两者都注入)。
下面的示例展示了如何将委托作为流注入到 XML 中:
XML 配置
<job id="ioSampleJob">
<step name="step1">
<tasklet>
<chunk reader="fooReader" processor="fooProcessor" writer="compositeItemWriter"
commit-interval="2">
<streams>
<stream ref="barWriter" />
</streams>
</chunk>
</tasklet>
</step>
</job>
<bean id="compositeItemWriter" class="...CustomCompositeItemWriter">
<property name="delegate" ref="barWriter" />
</bean>
<bean id="barWriter" class="...BarWriter" />
下面的示例展示了如何将委托作为流注入到 XML 中:
Java 配置
@Bean
public Job ioSampleJob() {
return this.jobBuilderFactory.get("ioSampleJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return this.stepBuilderFactory.get("step1")
.<String, String>chunk(2)
.reader(fooReader())
.processor(fooProcessor())
.writer(compositeItemWriter())
.stream(barWriter())
.build();
}
@Bean
public CustomCompositeItemWriter compositeItemWriter() {
CustomCompositeItemWriter writer = new CustomCompositeItemWriter();
writer.setDelegate(barWriter());
return writer;
}
@Bean
public BarWriter barWriter() {
return new BarWriter();
}
# 平面文件
交换大容量数据的最常见机制之一一直是平面文件。与 XML 不同的是,XML 有一个一致的标准来定义它是如何结构化的(XSD),任何读取平面文件的人都必须提前确切地了解文件是如何结构化的。一般来说,所有的平面文件都分为两种类型:定长和定长。分隔符文件是那些字段被分隔符(如逗号)分隔的文件。固定长度文件的字段是固定长度的。
# theFieldSet
在处理 Spring 批处理中的平面文件时,无论它是用于输入还是输出,最重要的类之一是FieldSet
。许多体系结构和库包含帮助你从文件中读取的抽象,但它们通常返回String
或String
对象的数组。这真的只会让你走到一半。FieldSet
是 Spring 批处理的抽象,用于从文件资源中绑定字段。它允许开发人员以与处理数据库输入大致相同的方式处理文件输入。aFieldSet
在概念上类似于 jdbcResultSet
。FieldSet
只需要一个参数:一个String
令牌数组。还可以选择地配置字段的名称,以便可以按照ResultSet
之后的模式通过索引或名称访问字段,如以下示例所示:
String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);
在FieldSet
接口上还有许多选项,例如Date
、long、BigDecimal
,等等。FieldSet
的最大优点是它提供了对平面文件输入的一致解析。在处理由格式异常引起的错误或进行简单的数据转换时,它可以是一致的,而不是以潜在的意外方式对每个批处理作业进行不同的解析。
# FlatFileItemReader
平面文件是最多包含二维(表格)数据的任何类型的文件。 Spring 批处理框架中的平面文件的读取是由一个名为FlatFileItemReader
的类提供的,该类为平面文件的读取和解析提供了基本功能。FlatFileItemReader
的两个最重要的必需依赖项是Resource
和LineMapper
。LineMapper
接口将在下一节中进行更多的探讨。资源属性表示 Spring 核心Resource
。说明如何创建这种类型的 bean 的文档可以在Spring Framework, Chapter 5. Resources (opens new window)中找到。因此,除了展示下面的简单示例之外,本指南不涉及创建Resource
对象的细节:
Resource resource = new FileSystemResource("resources/trades.csv");
在复杂的批处理环境中,目录结构通常由 Enterprise 应用程序集成基础设施管理,在该基础设施中,外部接口的下拉区被建立,用于将文件从 FTP 位置移动到批处理位置,反之亦然。文件移动实用程序超出了 Spring 批处理体系结构的范围,但是批处理作业流将文件移动实用程序作为步骤包含在作业流中并不少见。批处理架构只需要知道如何定位要处理的文件。 Spring 批处理开始从该起点将数据送入管道的过程。然而,Spring Integration (opens new window)提供了许多这类服务。
FlatFileItemReader
中的其他属性允许你进一步指定如何解释数据,如下表所示:
Property | Type | 说明 |
---|---|---|
comments | String[] | 指定表示注释行的行前缀。 |
encoding | String | 指定要使用的文本编码。默认值是Charset.defaultCharset() 。 |
lineMapper | LineMapper | 将表示项的String 转换为Object 。 |
linesToSkip | int | 文件顶部要忽略的行数。 |
recordSeparatorPolicy | RecordSeparatorPolicy | 用于确定行尾的位置 ,并执行类似于在引号字符串中的行尾上继续的操作。 |
resource | Resource | 可供阅读的资源。 |
skippedLinesCallback | LineCallbackHandler | 传递 中要跳过的文件行的原始行内容的接口。如果 linesToSkip 被设置为 2,那么这个接口被调用了两次。 |
strict | boolean | 在严格模式下,如果输入资源不存在 ,读取器将在 ExecutionContext 上抛出异常。否则,它会记录问题并继续处理。 |
# LineMapper
与RowMapper
一样,它接受一个低层次的构造,例如ResultSet
并返回一个Object
,平面文件处理需要相同的构造来将String
行转换为Object
,如以下接口定义所示:
public interface LineMapper<T> {
T mapLine(String line, int lineNumber) throws Exception;
}
基本的约定是,给定当前行和与其相关联的行号,映射器应该返回一个结果域对象。这类似于RowMapper
,因为每一行都与其行号关联,就像ResultSet
中的每一行都与其行号关联一样。这允许将行号绑定到结果域对象,以进行身份比较或进行更有信息量的日志记录。然而,与RowMapper
不同的是,LineMapper
给出的是一条未加工的线,正如上面讨论的那样,这条线只能让你达到一半。该行必须标记为FieldSet
,然后可以映射到对象,如本文档后面所述。
# LineTokenizer
将一行输入转换为FieldSet
的抽象是必要的,因为可能有许多格式的平面文件数据需要转换为FieldSet
。在 Spring 批处理中,这个接口是LineTokenizer
:
public interface LineTokenizer {
FieldSet tokenize(String line);
}
aLineTokenizer
的契约是这样的,给定一条输入线(理论上String
可以包含多条线),返回一个代表该线的FieldSet
。然后可以将这个FieldSet
传递给FieldSetMapper
。 Spring 批处理包含以下LineTokenizer
实现:
DelimitedLineTokenizer
:用于记录中的字段用分隔符分隔的文件。最常见的分隔符是逗号,但也经常使用管道或分号。FixedLengthTokenizer
:用于记录中的字段都是“固定宽度”的文件。必须为每个记录类型定义每个字段的宽度。PatternMatchingCompositeLineTokenizer
:通过检查模式,确定在特定行上应该使用记号符列表中的哪一个LineTokenizer
。
# FieldSetMapper
FieldSetMapper
接口定义了一个方法mapFieldSet
,它接受一个FieldSet
对象并将其内容映射到一个对象。该对象可以是自定义 DTO、域对象或数组,具体取决于作业的需要。FieldSetMapper
与LineTokenizer
结合使用,以将资源中的一行数据转换为所需类型的对象,如以下接口定义所示:
public interface FieldSetMapper<T> {
T mapFieldSet(FieldSet fieldSet) throws BindException;
}
使用的模式与JdbcTemplate
使用的RowMapper
相同。
# DefaultLineMapper
既然已经定义了在平面文件中读取的基本接口,那么显然需要三个基本步骤:
从文件中读出一行。
将
String
行传递到LineTokenizer#tokenize()
方法中,以检索FieldSet
。将从标记化返回的
FieldSet
传递到FieldSetMapper
,从ItemReader#read()
方法返回结果。
上面描述的两个接口代表两个独立的任务:将一行转换为FieldSet
,并将FieldSet
映射到域对象。由于LineTokenizer
的输入与LineMapper
(一行)的输入匹配,并且FieldSetMapper
的输出与LineMapper
的输出匹配,因此提供了一个同时使用LineTokenizer
和FieldSetMapper
的默认实现。下面的类定义中显示的DefaultLineMapper
表示大多数用户需要的行为:
public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {
private LineTokenizer tokenizer;
private FieldSetMapper<T> fieldSetMapper;
public T mapLine(String line, int lineNumber) throws Exception {
return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
}
public void setLineTokenizer(LineTokenizer tokenizer) {
this.tokenizer = tokenizer;
}
public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
this.fieldSetMapper = fieldSetMapper;
}
}
上述功能是在默认实现中提供的,而不是内置在阅读器本身中(就像框架的以前版本中所做的那样),以允许用户在控制解析过程中具有更大的灵活性,尤其是在需要访问原始行的情况下。
# 简单分隔的文件读取示例
下面的示例演示了如何在实际的域场景中读取平面文件。这个特定的批处理作业从以下文件中读取足球运动员:
ID,lastName,firstName,position,birthYear,debutYear
"AbduKa00,Abdul-Jabbar,Karim,rb,1974,1996",
"AbduRa00,Abdullah,Rabih,rb,1975,1999",
"AberWa00,Abercrombie,Walter,rb,1959,1982",
"AbraDa00,Abramowicz,Danny,wr,1945,1967",
"AdamBo00,Adams,Bob,te,1946,1969",
"AdamCh00,Adams,Charlie,wr,1979,2003"
此文件的内容映射到以下Player
域对象:
public class Player implements Serializable {
private String ID;
private String lastName;
private String firstName;
private String position;
private int birthYear;
private int debutYear;
public String toString() {
return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
",First Name=" + firstName + ",Position=" + position +
",Birth Year=" + birthYear + ",DebutYear=" +
debutYear;
}
// setters and getters...
}
要将FieldSet
映射到Player
对象中,需要定义一个返回播放机的FieldSetMapper
,如下例所示:
protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fieldSet) {
Player player = new Player();
player.setID(fieldSet.readString(0));
player.setLastName(fieldSet.readString(1));
player.setFirstName(fieldSet.readString(2));
player.setPosition(fieldSet.readString(3));
player.setBirthYear(fieldSet.readInt(4));
player.setDebutYear(fieldSet.readInt(5));
return player;
}
}
然后,可以通过正确地构造FlatFileItemReader
并调用read
来读取文件,如以下示例所示:
FlatFileItemReader<Player> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<>();
//DelimitedLineTokenizer defaults to comma as its delimiter
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();
对read
的每次调用都会从文件中的每一行返回一个新的Player
对象。当到达文件的末尾时,将返回null
。
# 按名称映射字段
还有一个额外的功能块是DelimitedLineTokenizer
和FixedLengthTokenizer
都允许的,它在功能上类似于 JDBCResultSet
。字段的名称可以被注入到这些LineTokenizer
实现中,以增加映射函数的可读性。首先,将平面文件中所有字段的列名注入到记号生成器中,如下例所示:
tokenizer.setNames(new String[] {"ID", "lastName", "firstName", "position", "birthYear", "debutYear"});
aFieldSetMapper
可以如下方式使用此信息:
public class PlayerMapper implements FieldSetMapper<Player> {
public Player mapFieldSet(FieldSet fs) {
if (fs == null) {
return null;
}
Player player = new Player();
player.setID(fs.readString("ID"));
player.setLastName(fs.readString("lastName"));
player.setFirstName(fs.readString("firstName"));
player.setPosition(fs.readString("position"));
player.setDebutYear(fs.readInt("debutYear"));
player.setBirthYear(fs.readInt("birthYear"));
return player;
}
}
# 向域对象自动设置字段集
对于许多人来说,必须为FieldSetMapper
编写特定的RowMapper
,就像为JdbcTemplate
编写特定的RowMapper
一样麻烦。 Spring 批处理通过提供FieldSetMapper
使这一点变得更容易,该批处理通过使用 JavaBean 规范将字段名称与对象上的 setter 匹配来自动映射字段。
再次使用 Football 示例,BeanWrapperFieldSetMapper
配置在 XML 中看起来像以下代码片段:
XML 配置
<bean id="fieldSetMapper"
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="player" />
</bean>
<bean id="player"
class="org.springframework.batch.sample.domain.Player"
scope="prototype" />
再次使用 Football 示例,BeanWrapperFieldSetMapper
配置在 Java 中看起来像以下代码片段:
Java 配置
@Bean
public FieldSetMapper fieldSetMapper() {
BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();
fieldSetMapper.setPrototypeBeanName("player");
return fieldSetMapper;
}
@Bean
@Scope("prototype")
public Player player() {
return new Player();
}
对于FieldSet
中的每个条目,映射器在Player
对象的新实例上查找相应的 setter(由于这个原因,需要原型作用域),就像 Spring 容器查找匹配属性名的 setter 一样。映射FieldSet
中的每个可用字段,并返回结果Player
对象,不需要任何代码。
# 固定长度文件格式
到目前为止,只对分隔的文件进行了详细的讨论。然而,它们只代表了文件阅读图片的一半。许多使用平面文件的组织使用固定长度格式。下面是固定长度文件的示例:
UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5
虽然这看起来像是一个很大的域,但它实际上代表了 4 个不同的域:
ISIN:所订购商品的唯一标识符-12 个字符长。
数量:订购的商品数量-3 个字符长。
价格:该商品的价格-5 个字符长.
顾客:订购该商品的顾客的 ID-9 个字符长。
在配置FixedLengthLineTokenizer
时,这些长度中的每一个都必须以范围的形式提供。
下面的示例展示了如何在 XML 中为FixedLengthLineTokenizer
定义范围:
XML 配置
<bean id="fixedLengthLineTokenizer"
class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
<property name="names" value="ISIN,Quantity,Price,Customer" />
<property name="columns" value="1-12, 13-15, 16-20, 21-29" />
</bean>
因为FixedLengthLineTokenizer
使用与前面讨论的相同的LineTokenizer
接口,所以它返回相同的FieldSet
,就像使用了分隔符一样。这允许在处理其输出时使用相同的方法,例如使用BeanWrapperFieldSetMapper
。
支持前面的范围语法需要在ApplicationContext 中配置专门的属性编辑器RangeArrayPropertyEditor 。然而,这 Bean 是在使用批处理名称空间的 ApplicationContext 中自动声明的。 |
---|
下面的示例展示了如何在 Java 中为FixedLengthLineTokenizer
定义范围:
Java 配置
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
tokenizer.setColumns(new Range(1, 12),
new Range(13, 15),
new Range(16, 20),
new Range(21, 29));
return tokenizer;
}
因为FixedLengthLineTokenizer
使用与上面讨论的相同的LineTokenizer
接口,所以它返回相同的FieldSet
,就像使用了分隔符一样。这使得在处理其输出时可以使用相同的方法,例如使用BeanWrapperFieldSetMapper
。
# 单个文件中的多个记录类型
到目前为止,所有的文件读取示例都为了简单起见做出了一个关键假设:文件中的所有记录都具有相同的格式。然而,情况可能并不总是如此。很常见的一种情况是,一个文件可能具有不同格式的记录,这些记录需要以不同的方式进行标记并映射到不同的对象。下面的文件摘录说明了这一点:
USER;Smith;Peter;;T;20014539;F
LINEA;1044391041ABC037.49G201XX1383.12H
LINEB;2134776319DEF422.99M005LI
在这个文件中,我们有三种类型的记录,“user”、“linea”和“lineb”。“user”行对应于User
对象。“linea”和“lineb”都对应于Line
对象,尽管“linea”比“lineb”有更多的信息。
ItemReader
单独读取每一行,但是我们必须指定不同的LineTokenizer
和FieldSetMapper
对象,以便ItemWriter
接收正确的项。PatternMatchingCompositeLineMapper
允许配置模式到LineTokenizers
的映射和模式到FieldSetMappers
的映射,从而简化了这一过程。
下面的示例展示了如何在 XML 中为FixedLengthLineTokenizer
定义范围:
XML 配置
<bean id="orderFileLineMapper"
class="org.spr...PatternMatchingCompositeLineMapper">
<property name="tokenizers">
<map>
<entry key="USER*" value-ref="userTokenizer" />
<entry key="LINEA*" value-ref="lineATokenizer" />
<entry key="LINEB*" value-ref="lineBTokenizer" />
</map>
</property>
<property name="fieldSetMappers">
<map>
<entry key="USER*" value-ref="userFieldSetMapper" />
<entry key="LINE*" value-ref="lineFieldSetMapper" />
</map>
</property>
</bean>
Java 配置
@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
PatternMatchingCompositeLineMapper lineMapper =
new PatternMatchingCompositeLineMapper();
Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
tokenizers.put("USER*", userTokenizer());
tokenizers.put("LINEA*", lineATokenizer());
tokenizers.put("LINEB*", lineBTokenizer());
lineMapper.setTokenizers(tokenizers);
Map<String, FieldSetMapper> mappers = new HashMap<>(2);
mappers.put("USER*", userFieldSetMapper());
mappers.put("LINE*", lineFieldSetMapper());
lineMapper.setFieldSetMappers(mappers);
return lineMapper;
}
在这个示例中,“linea”和“lineb”有单独的LineTokenizer
实例,但它们都使用相同的FieldSetMapper
。
PatternMatchingCompositeLineMapper
使用PatternMatcher#match
方法为每一行选择正确的委托。PatternMatcher
允许两个具有特殊含义的通配符:问号(“?”)恰好匹配一个字符,而星号(“*”)匹配零个或更多字符。请注意,在前面的配置中,所有模式都以星号结尾,使它们有效地成为行的前缀。无论配置中的顺序如何,PatternMatcher
始终匹配最特定的模式。因此,如果“line*”和“linea*”都被列为模式,那么“linea”将匹配模式“linea*”,而“lineb”将匹配模式“line*”。此外,单个星号(“*”)可以通过匹配任何其他模式不匹配的任何行来作为默认设置。
下面的示例展示了如何匹配 XML 中任何其他模式都不匹配的行:
XML 配置
<entry key="*" value-ref="defaultLineTokenizer" />
下面的示例展示了如何匹配 Java 中任何其他模式都不匹配的行:
Java 配置
...
tokenizers.put("*", defaultLineTokenizer());
...
还有一个PatternMatchingCompositeLineTokenizer
可以单独用于标记化。
平面文件中包含的记录跨越多行也是很常见的。要处理这种情况,需要一种更复杂的策略。在multiLineRecords
示例中可以找到这种常见模式的演示。
# 平面文件中的异常处理
在许多情况下,对一行进行标记化可能会导致抛出异常。许多平面文件是不完美的,包含格式不正确的记录。许多用户在记录问题、原始行号和行号时选择跳过这些错误行。这些日志稍后可以手动检查,也可以通过另一个批处理作业进行检查。出于这个原因, Spring Batch 为处理解析异常提供了一个异常层次结构:FlatFileParseException
和FlatFileFormatException
。当试图读取文件时遇到任何错误时,FlatFileParseException
将抛出FlatFileItemReader
。FlatFileFormatException
由LineTokenizer
接口的实现抛出,并指示在标记时遇到的更具体的错误。
# IncorrectTokenCountException
DelimitedLineTokenizer
和FixedLengthLineTokenizer
都可以指定可用于创建FieldSet
的列名。但是,如果列名的数量与对一行进行标记时发现的列数不匹配,则无法创建FieldSet
,并抛出一个IncorrectTokenCountException
,其中包含遇到的令牌数量和预期的数量,如以下示例所示:
tokenizer.setNames(new String[] {"A", "B", "C", "D"});
try {
tokenizer.tokenize("a,b,c");
}
catch (IncorrectTokenCountException e) {
assertEquals(4, e.getExpectedCount());
assertEquals(3, e.getActualCount());
}
因为标记器配置了 4 个列名,但在文件中只找到了 3 个令牌,所以抛出了一个IncorrectTokenCountException
。
# IncorrectLineLengthException
以固定长度格式格式化的文件在解析时有额外的要求,因为与分隔格式不同,每个列必须严格遵守其预定义的宽度。如果行的总长度不等于此列的最大值,则抛出一个异常,如以下示例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5),
new Range(6, 10),
new Range(11, 15) });
try {
tokenizer.tokenize("12345");
fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
assertEquals(15, ex.getExpectedLength());
assertEquals(5, ex.getActualLength());
}
上面的记号生成器的配置范围是:1-5、6-10 和 11-1 5.因此,这条线的总长度是 1 5.但是,在前面的示例中,传入了长度为 5 的行,从而引发了IncorrectLineLengthException
。在此抛出一个异常,而不是仅映射第一列,这样可以使行的处理更早失败,并且所包含的信息比在试图在FieldSetMapper
中读取第 2 列时失败时所包含的信息更多。然而,在某些情况下,直线的长度并不总是恒定的。因此,可以通过“严格”属性关闭对行长的验证,如下例所示:
tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));
前面的示例与前面的示例几乎相同,只是调用了tokenizer.setStrict(false)
。这个设置告诉标记器在标记行时不要强制行长。现在正确地创建并返回了FieldSet
。但是,对于其余的值,它只包含空标记。
# FlatFileItemWriter
写入平面文件也存在从文件读入时必须克服的问题。一个步骤必须能够以事务性的方式编写分隔格式或固定长度格式。
# LineAggregator
正如LineTokenizer
接口是获取一个项并将其转换为String
所必需的一样,文件写入必须有一种方法,可以将多个字段聚合到一个字符串中,以便将其写入文件。在 Spring 批处理中,这是LineAggregator
,如下面的接口定义所示:
public interface LineAggregator<T> {
public String aggregate(T item);
}
LineAggregator
是LineTokenizer
的逻辑对立面。LineTokenizer
接受一个String
并返回一个FieldSet
,而LineAggregator
接受一个item
并返回一个String
。
# PassThroughLineAggregator
LineAggregator
接口的最基本的实现是PassThroughLineAggregator
,它假定对象已经是一个字符串,或者它的字符串表示可以用于编写,如下面的代码所示:
public class PassThroughLineAggregator<T> implements LineAggregator<T> {
public String aggregate(T item) {
return item.toString();
}
}
如果需要直接控制创建字符串,那么前面的实现是有用的,但是FlatFileItemWriter
的优点,例如事务和重新启动支持,是必要的。
# 简化文件编写示例
既然LineAggregator
接口及其最基本的实现PassThroughLineAggregator
已经定义好了,那么编写的基本流程就可以解释了:
要写入的对象被传递给
LineAggregator
,以获得String
。返回的
String
被写入配置的文件。
下面摘自FlatFileItemWriter
的代码表达了这一点:
public void write(T item) throws Exception {
write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}
在 XML 中,配置的一个简单示例可能如下所示:
XML 配置
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" value="file:target/test-outputs/output.txt" />
<property name="lineAggregator">
<bean class="org.spr...PassThroughLineAggregator"/>
</property>
</bean>
在 Java 中,配置的一个简单示例可能如下所示:
Java 配置
@Bean
public FlatFileItemWriter itemWriter() {
return new FlatFileItemWriterBuilder<Foo>()
.name("itemWriter")
.resource(new FileSystemResource("target/test-outputs/output.txt"))
.lineAggregator(new PassThroughLineAggregator<>())
.build();
}
# FieldExtractor
前面的示例对于对文件的写入的最基本使用可能是有用的。然而,FlatFileItemWriter
的大多数用户都有一个需要写出的域对象,因此必须将其转换为一行。在文件阅读中,需要进行以下操作:
从文件中读出一行。
将该行传递到
LineTokenizer#tokenize()
方法中,以便检索FieldSet
。将从标记化返回的
FieldSet
传递到FieldSetMapper
,从ItemReader#read()
方法返回结果。
编写文件也有类似但相反的步骤:
把要写的东西交给作者。
将项目上的字段转换为数组。
将生成的数组聚合为一条线。
因为框架无法知道需要从对象中写出哪些字段,所以必须编写FieldExtractor
才能完成将项转换为数组的任务,如下面的接口定义所示:
public interface FieldExtractor<T> {
Object[] extract(T item);
}
FieldExtractor
接口的实现应该从提供的对象的字段创建一个数组,然后可以在元素之间使用分隔符写出该数组,或者作为固定宽度线的一部分。
# PassThroughFieldExtractor
在许多情况下,需要写出集合,例如一个数组,Collection
或FieldSet
。从这些集合类型中的一种“提取”一个数组是非常简单的。要做到这一点,将集合转换为一个数组。因此,在此场景中应该使用PassThroughFieldExtractor
。应该注意的是,如果传入的对象不是集合的类型,那么PassThroughFieldExtractor
将返回一个仅包含要提取的项的数组。
# BeanWrapperFieldExtractor
与文件读取部分中描述的BeanWrapperFieldSetMapper
一样,通常更好的方法是配置如何将域对象转换为对象数组,而不是自己编写转换。BeanWrapperFieldExtractor
提供了这种功能,如以下示例所示:
BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] { "first", "last", "born" });
String first = "Alan";
String last = "Turing";
int born = 1912;
Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);
assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);
这个提取器实现只有一个必需的属性:要映射的字段的名称。正如BeanWrapperFieldSetMapper
需要字段名称来将FieldSet
上的字段映射到所提供对象上的 setter 一样,BeanWrapperFieldExtractor
也需要名称来映射到 getter 以创建对象数组。值得注意的是,名称的顺序决定了数组中字段的顺序。
# 分隔的文件编写示例
最基本的平面文件格式是一种所有字段都用分隔符分隔的格式。这可以使用DelimitedLineAggregator
来完成。下面的示例写出了一个简单的域对象,该对象表示对客户帐户的信用:
public class CustomerCredit {
private int id;
private String name;
private BigDecimal credit;
//getters and setters removed for clarity
}
由于正在使用域对象,因此必须提供FieldExtractor
接口的实现以及要使用的分隔符。
下面的示例展示了如何在 XML 中使用带有分隔符的FieldExtractor
:
XML 配置
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...DelimitedLineAggregator">
<property name="delimiter" value=","/>
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit"/>
</bean>
</property>
</bean>
</property>
</bean>
下面的示例展示了如何在 Java 中使用带有分隔符的FieldExtractor
:
Java 配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
在前面的示例中,本章前面描述的BeanWrapperFieldExtractor
用于将CustomerCredit
中的名称和信用字段转换为一个对象数组,然后在每个字段之间使用逗号写出该对象数组。
也可以使用FlatFileItemWriterBuilder.DelimitedBuilder
自动创建BeanWrapperFieldExtractor
和DelimitedLineAggregator
,如以下示例所示:
Java 配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.delimited()
.delimiter("|")
.names(new String[] {"name", "credit"})
.build();
}
# 固定宽度文件编写示例
分隔符并不是唯一一种平面文件格式。许多人更喜欢为每个列使用一个设置的宽度来划分字段,这通常称为“固定宽度”。 Spring 批处理在用FormatterLineAggregator
写文件时支持这一点。
使用上述相同的CustomerCredit
域对象,可以在 XML 中进行如下配置:
XML 配置
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator">
<bean class="org.spr...FormatterLineAggregator">
<property name="fieldExtractor">
<bean class="org.spr...BeanWrapperFieldExtractor">
<property name="names" value="name,credit" />
</bean>
</property>
<property name="format" value="%-9s%-2.0f" />
</bean>
</property>
</bean>
使用上面描述的相同的CustomerCredit
域对象,可以在 Java 中进行如下配置:
Java 配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"name", "credit"});
fieldExtractor.afterPropertiesSet();
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
lineAggregator.setFieldExtractor(fieldExtractor);
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.lineAggregator(lineAggregator)
.build();
}
前面的大多数示例看起来应该很熟悉。但是,格式属性的值是新的。
下面的示例显示了 XML 中的格式属性:
<property name="format" value="%-9s%-2.0f" />
下面的示例显示了 Java 中的 format 属性:
...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...
底层实现是使用作为 Java5 的一部分添加的相同的Formatter
构建的。JavaFormatter
基于 C 编程语言的printf
功能。关于如何配置格式化程序的大多数详细信息可以在Formatter (opens new window)的 Javadoc 中找到。
也可以使用FlatFileItemWriterBuilder.FormattedBuilder
自动创建BeanWrapperFieldExtractor
和FormatterLineAggregator
,如以下示例所示:
Java 配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
return new FlatFileItemWriterBuilder<CustomerCredit>()
.name("customerCreditWriter")
.resource(outputResource)
.formatted()
.format("%-9s%-2.0f")
.names(new String[] {"name", "credit"})
.build();
}
# 处理文件创建
FlatFileItemReader
与文件资源的关系非常简单。当读取器被初始化时,它会打开该文件(如果它存在的话),如果它不存在,则会抛出一个异常。写文件并不是那么简单。乍一看,对于FlatFileItemWriter
似乎应该存在类似的直接契约:如果文件已经存在,则抛出一个异常,如果不存在,则创建它并开始写入。然而,重新启动Job
可能会导致问题。在正常的重启场景中,契约是相反的:如果文件存在,则从最后一个已知的良好位置开始向它写入,如果不存在,则抛出一个异常。但是,如果此作业的文件名总是相同,会发生什么情况?在这种情况下,如果文件存在,你可能想要删除它,除非是重新启动。由于这种可能性,FlatFileItemWriter
包含属性shouldDeleteIfExists
。将此属性设置为 true 将导致在打开 Writer 时删除同名的现有文件。
# XML 项读取器和编写器
Spring Batch 提供了用于读取 XML 记录并将它们映射到 Java 对象以及将 Java 对象写为 XML 记录的事务基础设施。
流 XML 上的约束 STAX API 用于 I/O,因为其他标准的 XML 解析 API 不符合批处理 的要求(DOM 一次将整个输入加载到内存中,SAX 通过允许用户仅提供回调来控制 解析过程)。 |
---|
我们需要考虑 XML 输入和输出如何在 Spring 批处理中工作。首先,有几个概念与文件读写不同,但在 Spring 批 XML 处理中很常见。使用 XML 处理,不是需要标记的记录行(FieldSet
实例),而是假设 XML 资源是与单个记录相对应的“片段”的集合,如下图所示:
图 1.XML 输入
在上面的场景中,“trade”标记被定义为“root 元素”。“<trade>”和“</trade>”之间的所有内容都被视为一个“片段”。 Spring 批处理使用对象/XML 映射(OXM)将片段绑定到对象。然而, Spring 批处理并不绑定到任何特定的 XML 绑定技术。典型的用途是委托给Spring OXM (opens new window),这为最流行的 OXM 技术提供了统一的抽象。对 Spring OXM 的依赖是可选的,如果需要,可以选择实现 Spring 批处理特定接口。与 OXM 支持的技术之间的关系如下图所示:
图 2.OXM 绑定
通过介绍 OXM 以及如何使用 XML 片段来表示记录,我们现在可以更仔细地研究阅读器和编写器。
# StaxEventItemReader
StaxEventItemReader
配置为处理来自 XML 输入流的记录提供了一个典型的设置。首先,考虑StaxEventItemReader
可以处理的以下一组 XML 记录:
<?xml version="1.0" encoding="UTF-8"?>
<records>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0001</isin>
<quantity>5</quantity>
<price>11.39</price>
<customer>Customer1</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0002</isin>
<quantity>2</quantity>
<price>72.99</price>
<customer>Customer2c</customer>
</trade>
<trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
<isin>XYZ0003</isin>
<quantity>9</quantity>
<price>99.99</price>
<customer>Customer3</customer>
</trade>
</records>
为了能够处理 XML 记录,需要具备以下条件:
根元素名称:构成要映射的对象的片段的根元素的名称。示例配置用“交易价值”演示了这一点。
资源:表示要读取的文件的 Spring 资源。
Unmarshaller
: Spring OXM 提供的一种解组功能,用于将 XML 片段映射到对象。
下面的示例展示了如何定义一个StaxEventItemReader
,它与一个名为trade
的根元素、一个资源data/iosample/input/input.xml
和一个在 XML 中名为tradeMarshaller
的解组器一起工作:
XML 配置
<bean id="itemReader" class="org.springframework.batch.item.xml.StaxEventItemReader">
<property name="fragmentRootElementName" value="trade" />
<property name="resource" value="org/springframework/batch/item/xml/domain/trades.xml" />
<property name="unmarshaller" ref="tradeMarshaller" />
</bean>
下面的示例展示了如何定义一个StaxEventItemReader
,它与一个名为trade
的根元素、一个资源data/iosample/input/input.xml
和一个在 Java 中名为tradeMarshaller
的解组器一起工作:
Java 配置
@Bean
public StaxEventItemReader itemReader() {
return new StaxEventItemReaderBuilder<Trade>()
.name("itemReader")
.resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
.addFragmentRootElements("trade")
.unmarshaller(tradeMarshaller())
.build();
}
请注意,在本例中,我们选择使用XStreamMarshaller
,它接受作为映射传入的别名,第一个键和值是片段的名称(即根元素)和要绑定的对象类型。然后,类似于FieldSet
,映射到对象类型中的字段的其他元素的名称在映射中被描述为键/值对。在配置文件中,我们可以使用 Spring 配置实用程序来描述所需的别名。
下面的示例展示了如何用 XML 描述别名:
XML 配置
<bean id="tradeMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="trade"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
下面的示例展示了如何在 Java 中描述别名:
Java 配置
@Bean
public XStreamMarshaller tradeMarshaller() {
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
XStreamMarshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
return marshaller;
}
在输入时,读取器读取 XML 资源,直到它识别出一个新的片段即将开始。默认情况下,读取器匹配元素名,以识别一个新片段即将开始。阅读器从片段中创建一个独立的 XML 文档,并将该文档传递给一个反序列化器(通常是围绕 Spring OXMUnmarshaller
的包装器),以将 XML 映射到一个 Java 对象。
总之,这个过程类似于下面的 Java 代码,它使用由 Spring 配置提供的注入:
StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());
boolean hasNext = true;
Trade trade = null;
while (hasNext) {
trade = xmlStaxEventItemReader.read();
if (trade == null) {
hasNext = false;
}
else {
System.out.println(trade);
}
}
# StaxEventItemWriter
输出与输入对称地工作。StaxEventItemWriter
需要一个Resource
、一个编组器和一个rootTagName
。将 Java 对象传递给编组器(通常是标准的 Spring OXM 编组器),该编组器通过使用自定义事件编写器将 OXM 工具为每个片段产生的StartDocument
和EndDocument
事件进行过滤,从而将其写到Resource
。
下面的 XML 示例使用MarshallingEventWriterSerializer
:
XML 配置
<bean id="itemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">
<property name="resource" ref="outputResource" />
<property name="marshaller" ref="tradeMarshaller" />
<property name="rootTagName" value="trade" />
<property name="overwriteOutput" value="true" />
</bean>
下面的 Java 示例使用MarshallingEventWriterSerializer
:
Java 配置
@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
return new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(tradeMarshaller())
.resource(outputResource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
}
前面的配置设置了三个必需的属性,并设置了可选的overwriteOutput=true
attrbute,这在本章前面提到过,用于指定现有文件是否可以重写。
下面的 XML 示例使用了与本章前面所示的阅读示例中使用的相同的编组器:
XML 配置
<bean id="customerCreditMarshaller"
class="org.springframework.oxm.xstream.XStreamMarshaller">
<property name="aliases">
<util:map id="aliases">
<entry key="customer"
value="org.springframework.batch.sample.domain.trade.Trade" />
<entry key="price" value="java.math.BigDecimal" />
<entry key="isin" value="java.lang.String" />
<entry key="customer" value="java.lang.String" />
<entry key="quantity" value="java.lang.Long" />
</util:map>
</property>
</bean>
下面的 Java 示例使用了与本章前面所示的阅读示例中使用的收集器相同的收集器:
Java 配置
@Bean
public XStreamMarshaller customerCreditMarshaller() {
XStreamMarshaller marshaller = new XStreamMarshaller();
Map<String, Class> aliases = new HashMap<>();
aliases.put("trade", Trade.class);
aliases.put("price", BigDecimal.class);
aliases.put("isin", String.class);
aliases.put("customer", String.class);
aliases.put("quantity", Long.class);
marshaller.setAliases(aliases);
return marshaller;
}
作为 Java 示例的总结,下面的代码演示了讨论的所有要点,并演示了所需属性的编程设置:
FileSystemResource resource = new FileSystemResource("data/outputFile.xml")
Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);
StaxEventItemWriter staxItemWriter =
new StaxEventItemWriterBuilder<Trade>()
.name("tradesWriter")
.marshaller(marshaller)
.resource(resource)
.rootTagName("trade")
.overwriteOutput(true)
.build();
staxItemWriter.afterPropertiesSet();
ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);
# JSON 条目阅读器和编写器
Spring Batch 以以下格式提供对读取和写入 JSON 资源的支持:
[
{
"isin": "123",
"quantity": 1,
"price": 1.2,
"customer": "foo"
},
{
"isin": "456",
"quantity": 2,
"price": 1.4,
"customer": "bar"
}
]
假定 JSON 资源是与单个项对应的 JSON 对象数组。 Spring 批处理不绑定到任何特定的 JSON 库。
# JsonItemReader
JsonItemReader
将 JSON 解析和绑定委托给org.springframework.batch.item.json.JsonObjectReader
接口的实现。该接口旨在通过使用流 API 以块形式读取 JSON 对象来实现。目前提供了两种实现方式:
Jackson (opens new window)通过
org.springframework.batch.item.json.JacksonJsonObjectReader
Gson (opens new window)通过
org.springframework.batch.item.json.GsonJsonObjectReader
要能够处理 JSON 记录,需要具备以下条件:
Resource
:表示要读取的 JSON 文件的 Spring 资源。JsonObjectReader
:用于解析并将 JSON 对象绑定到项的 JSON 对象阅读器
下面的示例展示了如何基于 Jackson 定义一个JsonItemReader
并与前面的 JSON 资源org/springframework/batch/item/json/trades.json
一起工作的JsonObjectReader
:
@Bean
public JsonItemReader<Trade> jsonItemReader() {
return new JsonItemReaderBuilder<Trade>()
.jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonItemReader")
.build();
}
# JsonFileItemWriter
JsonFileItemWriter
将项的编组委托给org.springframework.batch.item.json.JsonObjectMarshaller
接口。这个接口的契约是将一个对象带到一个 JSONString
。目前提供了两种实现方式:
Jackson (opens new window)通过
org.springframework.batch.item.json.JacksonJsonObjectMarshaller
Gson (opens new window)通过
org.springframework.batch.item.json.GsonJsonObjectMarshaller
为了能够编写 JSON 记录,需要具备以下条件:
Resource
:表示要写入的 JSON 文件的一个 SpringResource
JsonObjectMarshaller
:一个 JSON 对象编组器将 Marshall 对象转换为 JSON 格式
下面的示例展示了如何定义JsonFileItemWriter
:
@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
return new JsonFileItemWriterBuilder<Trade>()
.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
.resource(new ClassPathResource("trades.json"))
.name("tradeJsonFileItemWriter")
.build();
}
# 多文件输入
在一个Step
中处理多个文件是一个常见的要求。假设所有文件都具有相同的格式,MultiResourceItemReader
在 XML 和平面文件处理中都支持这种类型的输入。考虑目录中的以下文件:
file-1.txt file-2.txt ignored.txt
File-1.TXT 和 File-2.TXT 的格式相同,出于业务原因,应该一起处理。MultiResourceItemReader
可以通过使用通配符在两个文件中读取。
下面的示例展示了如何使用 XML 中的通配符读取文件:
XML 配置
<bean id="multiResourceReader" class="org.spr...MultiResourceItemReader">
<property name="resources" value="classpath:data/input/file-*.txt" />
<property name="delegate" ref="flatFileItemReader" />
</bean>
下面的示例展示了如何在 Java 中使用通配符读取文件:
Java 配置
@Bean
public MultiResourceItemReader multiResourceReader() {
return new MultiResourceItemReaderBuilder<Foo>()
.delegate(flatFileItemReader())
.resources(resources())
.build();
}
引用的委托是一个简单的FlatFileItemReader
。上面的配置读取两个文件的输入,处理回滚和重新启动场景。应该注意的是,与任何ItemReader
一样,在重新启动时添加额外的输入(在这种情况下是一个文件)可能会导致潜在的问题。建议批处理作业使用它们自己的独立目录,直到成功完成为止。
通过使用MultiResourceItemReader#setComparator(Comparator) 对输入资源进行排序,以确保在重新启动场景中的作业运行之间保留资源排序。 |
---|
# 数据库
像大多数 Enterprise 应用程序样式一样,数据库是批处理的中心存储机制。然而,由于系统必须使用的数据集的巨大规模,批处理与其他应用程序样式不同。如果 SQL 语句返回 100 万行,那么结果集可能会将所有返回的结果保存在内存中,直到所有行都被读取为止。 Spring Batch 为此问题提供了两种类型的解决方案:
[基于游标的
ItemReader
实现][分页
ItemReader
实现]
# 基于光标的ItemReader
实现
使用数据库游标通常是大多数批处理开发人员的默认方法,因为它是数据库解决关系数据“流”问题的方法。JavaResultSet
类本质上是一种用于操作游标的面向对象机制。aResultSet
维护当前数据行的游标。在ResultSet
上调用next
将光标移动到下一行。 Spring 基于批处理游标的ItemReader
实现在初始化时打开游标,并在每次调用read
时将游标向前移动一行,返回可用于处理的映射对象。然后调用close
方法,以确保释放所有资源。 Spring 核心JdbcTemplate
通过使用回调模式来完全映射ResultSet
中的所有行,并在将控制权返回给方法调用方之前关闭,从而绕过了这个问题。然而,在批处理中,这必须等到步骤完成。下图显示了基于游标的ItemReader
如何工作的通用关系图。请注意,虽然示例使用 SQL(因为 SQL 是广为人知的),但任何技术都可以实现基本方法。
图 3.游标示例
这个例子说明了基本模式。给定一个有三列的“foo”表:ID
、NAME
和BAR
,选择 ID 大于 1 但小于 7 的所有行。这将把游标的开头(第 1 行)放在 ID2 上。该行的结果应该是一个完全映射的Foo
对象。调用read()
再次将光标移动到下一行,即 ID 为 3 的Foo
。在每个read
之后写出这些读取的结果,从而允许对对象进行垃圾收集(假设没有实例变量维护对它们的引用)。
# JdbcCursorItemReader
JdbcCursorItemReader
是基于光标的技术的 JDBC 实现。它可以直接与ResultSet
一起工作,并且需要针对从DataSource
获得的连接运行 SQL 语句。下面的数据库模式用作示例:
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人更喜欢为每一行使用域对象,因此下面的示例使用RowMapper
接口的实现来映射CustomerCredit
对象:
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
因为JdbcCursorItemReader
与JdbcTemplate
共享关键接口,所以查看如何使用JdbcTemplate
在此数据中读取数据的示例非常有用,以便将其与ItemReader
进行对比。为了这个示例的目的,假设CUSTOMER
数据库中有 1,000 行。第一个示例使用JdbcTemplate
:
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
在运行前面的代码片段之后,customerCredits
列表包含 1,000 个CustomerCredit
对象。在查询方法中,从DataSource
获得连接,对其运行所提供的 SQL,并对mapRow
中的每一行调用ResultSet
方法。将其与JdbcCursorItemReader
的方法进行对比,如下例所示:
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
在运行前面的代码片段之后,计数器等于 1,00 0.如果上面的代码将返回的customerCredit
放入一个列表中,结果将与JdbcTemplate
示例完全相同。然而,ItemReader
的一大优势在于,它允许项目被“流化”。read
方法可以调用一次,该项可以由一个ItemWriter
写出,然后可以用read
获得下一个项。这使得项目的读写可以在“块”中完成,并定期提交,这是高性能批处理的本质。此外,很容易地将其配置为将Step
注入到 Spring 批中。
下面的示例展示了如何在 XML 中将ItemReader
插入到Step
中:
XML 配置
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
下面的示例展示了如何在 Java 中将ItemReader
注入Step
:
Java 配置
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
# 附加属性
因为在 Java 中有很多不同的打开光标的选项,所以JdbcCursorItemReader
上有很多可以设置的属性,如下表所示:
ignoreWarnings | 确定是否记录了 SQLwarns 或是否导致异常。 默认值是 true (这意味着记录了警告)。 |
---|---|
fetchSize | 当ResultSet 对象所使用的ResultSet 对象需要更多行时,向 JDBC 驱动程序提供有关应该从数据库中获取的行数的提示。默认情况下,不会给出任何提示。 |
maxRows | 设置底层ResultSet 在任何时候都可以的最大行数的限制。 |
queryTimeout | 将驱动程序等待Statement 对象的秒数设置为运行。如果超过限制,则抛出 DataAccessException 。(有关详细信息,请咨询你的驱动程序供应商文档)。 |
verifyCursorPosition | 因为由ItemReader 持有的相同ResultSet 被传递到RowMapper ,所以用户可以自己调用ResultSet.next() ,这可能会导致阅读器的内部计数出现问题。将该值设置为true 会导致在 RowMapper 调用后,如果光标位置与以前不同,将引发一个异常。 |
saveState | 指示是否应将读取器的状态保存在ExecutionContext 提供的ItemStream#update(ExecutionContext) 中。默认值为true 。 |
driverSupportsAbsolute | 指示 JDBC 驱动程序是否支持 设置 ResultSet 上的绝对行。对于支持ResultSet.absolute() 的 JDBC 驱动程序,建议将其设置为true ,因为这可能会提高性能,特别是在使用大数据集时发生步骤失败时。默认值为 false 。 |
setUseSharedExtendedConnection | 指示用于光标的连接 是否应由所有其他处理使用,从而共享相同的 事务。如果将其设置为 false ,然后用它自己的连接打开光标,并且不参与启动的任何事务对于步骤处理的其余部分, 如果将此标志设置为 true ,则必须将数据源包装在ExtendedConnectionDataSourceProxy 中,以防止连接被关闭,并在每次提交后释放。当你将此选项设置为 true 时,用于打开光标的语句将使用’只读’和’持有 _ 游标 _over_commit’选项创建。 这允许在事务启动时保持光标打开,并在 步骤处理中执行提交。要使用此功能,你需要一个支持此功能的数据库,以及一个支持 JDBC3.0 或更高版本的 JDBC 驱动程序。默认值为 false 。 |
# HibernateCursorItemReader
正如正常的 Spring 用户对是否使用 ORM 解决方案做出重要的决定,这会影响他们是否使用JdbcTemplate
或HibernateTemplate
, Spring 批处理用户具有相同的选项。HibernateCursorItemReader
是 Hibernate 游标技术的实现。 Hibernate 的批量使用一直颇具争议。这在很大程度上是因为 Hibernate 最初是为了支持在线应用程序样式而开发的。然而,这并不意味着它不能用于批处理。解决这个问题的最简单的方法是使用StatelessSession
,而不是使用标准会话。这删除了 Hibernate 使用的所有缓存和脏检查,这可能会在批处理场景中导致问题。有关无状态会话和正常 Hibernate 会话之间的差异的更多信息,请参阅你的特定 Hibernate 版本的文档。HibernateCursorItemReader
允许你声明一个 HQL 语句,并传入一个SessionFactory
,它将在每个调用中传回一个项,以与JdbcCursorItemReader
相同的基本方式进行读取。下面的示例配置使用了与 JDBC 阅读器相同的“客户信用”示例:
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
这个配置的ItemReader
以与JdbcCursorItemReader
所描述的完全相同的方式返回CustomerCredit
对象,假设 Hibernate 已经为Customer
表正确地创建了映射文件。“useStatelession”属性默认为 true,但在此添加此属性是为了提请注意打开或关闭它的能力。还值得注意的是,可以使用setFetchSize
属性设置底层游标的 fetch 大小。与JdbcCursorItemReader
一样,配置也很简单。
下面的示例展示了如何在 XML 中注入 Hibernate ItemReader
:
XML 配置
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
下面的示例展示了如何在 Java 中注入 Hibernate ItemReader
:
Java 配置
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
# StoredProcedureItemReader
有时需要使用存储过程来获取游标数据。StoredProcedureItemReader
的工作原理与JdbcCursorItemReader
类似,不同的是,它运行的是返回光标的存储过程,而不是运行查询来获取光标。存储过程可以以三种不同的方式返回光标:
作为返回的
ResultSet
(由 SQL Server、Sybase、DB2、Derby 和 MySQL 使用)。作为 ref-cursor 作为 out 参数返回(Oracle 和 PostgreSQL 使用)。
作为存储函数调用的返回值。
下面的 XML 示例配置使用了与前面的示例相同的“客户信用”示例:
XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
下面的 Java 示例配置使用了与前面的示例相同的“客户信用”示例:
Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
前面的示例依赖于存储过程来提供ResultSet
作为返回的结果(前面的选项 1)。
如果存储过程返回了ref-cursor
(选项 2),那么我们将需要提供输出参数的位置,即返回的ref-cursor
。
下面的示例展示了如何使用第一个参数作为 XML 中的 ref-cursor:
XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
下面的示例展示了如何使用第一个参数作为 Java 中的 ref-cursor:
Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
如果光标是从存储函数返回的(选项 3),则需要将属性“function”设置为true
。它的默认值为false
。
下面的示例在 XML 中向true
显示了属性:
XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.sample.domain.CustomerCreditRowMapper"/>
</property>
</bean>
下面的示例在 Java 中向true
显示了属性:
Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
在所有这些情况下,我们需要定义一个RowMapper
以及一个DataSource
和实际的过程名称。
如果存储过程或函数接受参数,则必须使用parameters
属性声明和设置参数。下面的示例为 Oracle 声明了三个参数。第一个参数是返回 ref-cursor 的out
参数,第二个和第三个参数是参数中的INTEGER
类型的值。
下面的示例展示了如何使用 XML 中的参数:
XML 配置
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
下面的示例展示了如何使用 Java 中的参数:
Java 配置
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
除了参数声明外,我们还需要指定一个PreparedStatementSetter
实现,该实现为调用设置参数值。这与上面的JdbcCursorItemReader
的工作原理相同。附加属性中列出的所有附加属性也适用于StoredProcedureItemReader
。
# 分页ItemReader
实现
使用数据库游标的一种替代方法是运行多个查询,其中每个查询获取部分结果。我们把这一部分称为一个页面。每个查询必须指定起始行号和我们希望在页面中返回的行数。
# JdbcPagingItemReader
分页ItemReader
的一个实现是JdbcPagingItemReader
。JdbcPagingItemReader
需要一个PagingQueryProvider
,负责提供用于检索构成页面的行的 SQL 查询。由于每个数据库都有自己的策略来提供分页支持,因此我们需要为每个受支持的数据库类型使用不同的PagingQueryProvider
。还有SqlPagingQueryProviderFactoryBean
自动检测正在使用的数据库,并确定适当的PagingQueryProvider
实现。这简化了配置,是推荐的最佳实践。
SqlPagingQueryProviderFactoryBean
要求你指定select
子句和from
子句。你还可以提供一个可选的where
子句。这些子句和所需的sortKey
用于构建 SQL 语句。
在sortKey 上有一个唯一的键约束是很重要的,以保证在两次执行之间不会丢失任何数据。 |
---|
打开读取器后,它会以与任何其他ItemReader
相同的基本方式,将每个调用返回一个项到read
。当需要额外的行时,分页会在幕后进行。
下面的 XML 示例配置使用了与前面显示的基于游标的ItemReaders
类似的“客户信用”示例:
XML 配置
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
下面的 Java 示例配置使用了与前面显示的基于游标的ItemReaders
类似的“客户信用”示例:
Java 配置
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
此配置的ItemReader
使用RowMapper
返回CustomerCredit
对象,该对象必须指定。“PageSize”属性确定每次运行查询时从数据库中读取的实体的数量。
“parametervalues”属性可用于为查询指定一个Map
参数值。如果在where
子句中使用命名参数,则每个条目的键应该与命名参数的名称匹配。如果使用传统的“?”占位符,那么每个条目的键应该是占位符的编号,从 1 开始。
# JpaPagingItemReader
分页ItemReader
的另一个实现是JpaPagingItemReader
。 JPA 不具有类似于 Hibernate 的概念,因此我们不得不使用由 JPA 规范提供的其他特征。由于 JPA 支持分页,所以当涉及到使用 JPA 进行批处理时,这是一个自然的选择。在读取每个页面之后,这些实体将被分离,持久性上下文将被清除,从而允许在页面被处理之后对这些实体进行垃圾收集。
JpaPagingItemReader
允许你声明一个 JPQL 语句,并传入一个EntityManagerFactory
。然后,它在每个调用中传回一个项,以与任何其他ItemReader
相同的基本方式进行读取。当需要额外的实体时,寻呼就会在幕后进行。
下面的 XML 示例配置使用了与前面显示的 JDBC 阅读器相同的“客户信用”示例:
XML 配置
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
下面的 Java 示例配置使用了与前面显示的 JDBC 阅读器相同的“客户信用”示例:
Java 配置
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
这个配置的ItemReader
以与上面描述的JdbcPagingItemReader
对象完全相同的方式返回CustomerCredit
对象,假设CustomerCredit
对象具有正确的 JPA 注释或 ORM 映射文件。“PageSize”属性确定每个查询执行从数据库中读取的实体的数量。
# 数据库项目编写器
虽然平面文件和 XML 文件都有一个特定的ItemWriter
实例,但在数据库世界中没有完全相同的实例。这是因为事务提供了所需的所有功能。ItemWriter
实现对于文件来说是必要的,因为它们必须像事务一样工作,跟踪写好的项目,并在适当的时候刷新或清除。数据库不需要此功能,因为写操作已经包含在事务中了。用户可以创建自己的 DAO 来实现ItemWriter
接口,或者使用自定义的ItemWriter
接口,这是为通用处理问题编写的。无论哪种方式,它们的工作都应该没有任何问题。需要注意的一点是批处理输出所提供的性能和错误处理能力。当使用 Hibernate 作为ItemWriter
时,这是最常见的,但是当使用 JDBC 批处理模式时,可能会有相同的问题。批处理数据库输出没有任何固有的缺陷,前提是我们要小心刷新,并且数据中没有错误。然而,书写时的任何错误都可能导致混淆,因为无法知道是哪个单独的项目导致了异常,或者即使是任何单独的项目是负责任的,如下图所示:
图 4.刷新错误
如果项目在写入之前被缓冲,则在提交之前刷新缓冲区之前不会抛出任何错误。例如,假设每个块写 20 个项,第 15 个项抛出一个DataIntegrityViolationException
。就Step
而言,所有 20 个项都已成功写入,因为只有在实际写入它们之前,才能知道发生了错误。一旦调用Session#flush()
,将清空缓冲区并命中异常。在这一点上,Step
是无能为力的。事务必须回滚。通常,此异常可能会导致跳过该项(取决于跳过/重试策略),然后不会再次写入该项。但是,在批处理场景中,无法知道是哪个项导致了问题。当故障发生时,整个缓冲区正在被写入。解决此问题的唯一方法是在每个项目之后进行刷新,如下图所示:
图 5.写错误
这是一个常见的用例,尤其是在使用 Hibernate 时,而ItemWriter
的实现的简单准则是在每次调用write()
时刷新。这样做允许可靠地跳过项, Spring 批处理在内部处理错误后对ItemWriter
的调用的粒度。
# 重用现有服务
批处理系统通常与其他应用程序样式结合使用。最常见的是在线系统,但它也可以通过移动每个应用程序样式使用的必要的大容量数据来支持集成,甚至支持厚客户机应用程序。由于这个原因,许多用户希望在其批处理作业中重用现有的 DAO 或其他服务是很常见的。 Spring 容器本身通过允许注入任何必要的类,使这一点变得相当容易。然而,可能存在现有服务需要充当ItemReader
或ItemWriter
的情况,要么是为了满足另一个 Spring 批处理类的依赖关系,要么是因为它确实是主要的ItemReader
的一个步骤。为每个需要包装的服务编写一个适配器类是相当琐碎的,但是由于这是一个常见的问题, Spring Batch 提供了实现:ItemReaderAdapter
和ItemWriterAdapter
。这两个类都通过调用委托模式来实现标准 Spring 方法,并且设置起来相当简单。
下面的 XML 示例使用ItemReaderAdapter
:
XML 配置
<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="generateFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
下面的 Java 示例使用ItemReaderAdapter
:
Java 配置
@Bean
public ItemReaderAdapter itemReader() {
ItemReaderAdapter reader = new ItemReaderAdapter();
reader.setTargetObject(fooService());
reader.setTargetMethod("generateFoo");
return reader;
}
@Bean
public FooService fooService() {
return new FooService();
}
需要注意的一点是,targetMethod
的契约必须与read
的契约相同:当耗尽时,它返回null
。否则,它返回一个Object
。根据ItemWriter
的实现,任何其他方法都会阻止框架知道处理应该何时结束,从而导致无限循环或错误失败。
下面的 XML 示例使用ItemWriterAdapter
:
XML 配置
<bean id="itemWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
<property name="targetObject" ref="fooService" />
<property name="targetMethod" value="processFoo" />
</bean>
<bean id="fooService" class="org.springframework.batch.item.sample.FooService" />
下面的 Java 示例使用ItemWriterAdapter
:
Java 配置
@Bean
public ItemWriterAdapter itemWriter() {
ItemWriterAdapter writer = new ItemWriterAdapter();
writer.setTargetObject(fooService());
writer.setTargetMethod("processFoo");
return writer;
}
@Bean
public FooService fooService() {
return new FooService();
}
# 防止状态持久性
默认情况下,所有ItemReader
和ItemWriter
实现在提交之前将其当前状态存储在ExecutionContext
中。然而,这可能并不总是理想的行为。例如,许多开发人员选择通过使用过程指示器使他们的数据库阅读器“可重新运行”。在输入数据中添加一个额外的列,以指示是否对其进行了处理。当读取(或写入)特定记录时,处理后的标志从false
翻转到true
。然后,SQL 语句可以在where
子句中包含一个额外的语句,例如where PROCESSED_IND = false
,从而确保在重新启动的情况下仅返回未处理的记录。在这种情况下,最好不要存储任何状态,例如当前行号,因为它在重新启动时是不相关的。由于这个原因,所有的读者和作者都包括“SaveState”财产。
Bean 下面的定义展示了如何防止 XML 中的状态持久性:
XML 配置
<bean id="playerSummarizationSource" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource" />
<property name="rowMapper">
<bean class="org.springframework.batch.sample.PlayerSummaryMapper" />
</property>
<property name="saveState" value="false" />
<property name="sql">
<value>
SELECT games.player_id, games.year_no, SUM(COMPLETES),
SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),
SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),
SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)
from games, players where players.player_id =
games.player_id group by games.player_id, games.year_no
</value>
</property>
</bean>
Bean 下面的定义展示了如何在 Java 中防止状态持久性:
Java 配置
@Bean
public JdbcCursorItemReader playerSummarizationSource(DataSource dataSource) {
return new JdbcCursorItemReaderBuilder<PlayerSummary>()
.dataSource(dataSource)
.rowMapper(new PlayerSummaryMapper())
.saveState(false)
.sql("SELECT games.player_id, games.year_no, SUM(COMPLETES),"
+ "SUM(ATTEMPTS), SUM(PASSING_YARDS), SUM(PASSING_TD),"
+ "SUM(INTERCEPTIONS), SUM(RUSHES), SUM(RUSH_YARDS),"
+ "SUM(RECEPTIONS), SUM(RECEPTIONS_YARDS), SUM(TOTAL_TD)"
+ "from games, players where players.player_id ="
+ "games.player_id group by games.player_id, games.year_no")
.build();
}
上面配置的ItemReader
不会在ExecutionContext
中为其参与的任何执行创建任何条目。
# 创建自定义项目阅读器和项目编写器
到目前为止,本章已经讨论了 Spring 批处理中的读和写的基本契约,以及这样做的一些常见实现。然而,这些都是相当通用的,并且有许多潜在的场景可能不会被开箱即用的实现所覆盖。本节通过使用一个简单的示例,展示了如何创建自定义ItemReader
和ItemWriter
实现,并正确地实现它们的契约。ItemReader
还实现了ItemStream
,以说明如何使读取器或写入器重新启动。
# 自定义ItemReader
示例
为了这个示例的目的,我们创建了一个简单的ItemReader
实现,该实现从提供的列表中读取数据。我们首先实现ItemReader
的最基本契约,即read
方法,如以下代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前面的类获取一个项目列表,并一次返回一个项目,将每个项目从列表中删除。当列表为空时,它返回null
,从而满足ItemReader
的最基本要求,如下面的测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
# 使ItemReader
可重启
最后的挑战是使ItemReader
重新启动。目前,如果处理被中断并重新开始,ItemReader
必须在开始时开始。这实际上在许多场景中都是有效的,但有时更可取的做法是,在批处理作业停止的地方重新启动它。关键的判别式通常是读者是有状态的还是无状态的。无状态的读者不需要担心重启性,但是有状态的读者必须尝试在重新启动时重建其最后已知的状态。出于这个原因,我们建议你在可能的情况下保持自定义阅读器的无状态,这样你就不必担心重启性了。
如果确实需要存储状态,那么应该使用ItemStream
接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
在每次调用ItemStream``update
方法时,ItemReader
的当前索引都存储在提供的ExecutionContext
中,其键为“current.index”。当调用ItemStream``open
方法时,将检查ExecutionContext
是否包含带有该键的条目。如果找到了键,则将当前索引移动到该位置。这是一个相当微不足道的例子,但它仍然符合一般合同:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
大多数ItemReaders
都有更复杂的重启逻辑。例如,JdbcCursorItemReader
将最后处理的行的行 ID 存储在游标中。
还值得注意的是,ExecutionContext
中使用的键不应该是微不足道的。这是因为相同的ExecutionContext
用于ItemStreams
中的所有Step
。在大多数情况下,只需在键前加上类名就足以保证唯一性。然而,在很少的情况下,在相同的步骤中使用两个相同类型的ItemStream
(如果需要输出两个文件,可能会发生这种情况),则需要一个更唯一的名称。由于这个原因,许多 Spring 批处理ItemReader
和ItemWriter
实现都有一个setName()
属性,该属性允许重写这个键名。
# 自定义ItemWriter
示例
实现自定义ItemWriter
在许多方面与上面的ItemReader
示例相似,但在足够多的方面有所不同,以保证它自己的示例。然而,添加可重启性本质上是相同的,因此在本例中不涉及它。与ItemReader
示例一样,使用List
是为了使示例尽可能简单:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(List<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
# 使ItemWriter
重新启动
要使ItemWriter
可重启,我们将遵循与ItemReader
相同的过程,添加并实现ItemStream
接口以同步执行上下文。在这个示例中,我们可能必须计算处理的项目的数量,并将其添加为页脚记录。如果需要这样做,我们可以在ItemWriter
中实现ItemStream
,这样,如果流被重新打开,计数器将从执行上下文中重新构造。
在许多实际的情况下,自定义ItemWriters
也会委托给另一个本身是可重启的编写器(例如,当写到文件时),或者它会写到事务资源,因此不需要重启,因为它是无状态的。当你有一个有状态的编写器时,你可能应该确保实现ItemStream
以及ItemWriter
。还请记住,Writer 的客户机需要知道ItemStream
,因此你可能需要在配置中将其注册为流。
# 项读取器和编写器实现
在本节中,我们将向你介绍在前几节中尚未讨论过的读者和作者。
# 装饰者
在某些情况下,用户需要将专门的行为附加到预先存在的ItemReader
。 Spring Batch 提供了一些开箱即用的装饰器,它们可以将额外的行为添加到你的ItemReader
和ItemWriter
实现中。
Spring 批处理包括以下装饰器:
[
SynchronizedItemStreamReader
][
SingleItemPeekableItemReader
][
SynchronizedItemStreamWriter
][
MultiResourceItemWriter
][
ClassifierCompositeItemWriter
][
ClassifierCompositeItemProcessor
]
# SynchronizedItemStreamReader
当使用不是线程安全的ItemReader
时, Spring Batch 提供SynchronizedItemStreamReader
decorator,该 decorator 可用于使ItemReader
线程安全。 Spring 批处理提供了一个SynchronizedItemStreamReaderBuilder
来构造SynchronizedItemStreamReader
的实例。
# SingleItemPeekableItemReader
Spring 批处理包括向ItemReader
添加 PEEK 方法的装饰器。这种 peek 方法允许用户提前查看一项。对 Peek 的重复调用返回相同的项,这是从read
方法返回的下一个项。 Spring 批处理提供了一个SingleItemPeekableItemReaderBuilder
来构造SingleItemPeekableItemReader
的实例。
SingleitemPeekableitemreader 的 Peek 方法不是线程安全的,因为它不可能 在多个线程中执行 Peek。窥视 的线程中只有一个会在下一次调用中获得要读取的项。 |
---|
# SynchronizedItemStreamWriter
当使用不是线程安全的ItemWriter
时, Spring Batch 提供SynchronizedItemStreamWriter
decorator,该 decorator 可用于使ItemWriter
线程安全。 Spring 批处理提供了一个SynchronizedItemStreamWriterBuilder
来构造SynchronizedItemStreamWriter
的实例。
# MultiResourceItemWriter
当当前资源中写入的项数超过itemCountLimitPerResource
时,MultiResourceItemWriter
包装一个ResourceAwareItemWriterItemStream
并创建一个新的输出资源。 Spring 批处理提供了一个MultiResourceItemWriterBuilder
来构造MultiResourceItemWriter
的实例。
# ClassifierCompositeItemWriter
ClassifierCompositeItemWriter
调用用于每个项的ItemWriter
实现的集合之一,该实现基于通过提供的Classifier
实现的路由器模式。如果所有委托都是线程安全的,则实现是线程安全的。 Spring 批处理提供了一个ClassifierCompositeItemWriterBuilder
来构造ClassifierCompositeItemWriter
的实例。
# ClassifierCompositeItemProcessor
ClassifierCompositeItemProcessor
是一个ItemProcessor
,它调用ItemProcessor
实现的集合之一,该实现基于通过所提供的Classifier
实现的路由器模式。 Spring 批处理提供了一个ClassifierCompositeItemProcessorBuilder
来构造ClassifierCompositeItemProcessor
的实例。
# 消息阅读器和消息编写器
Spring Batch 为常用的消息传递系统提供了以下读取器和编写器:
[
AmqpItemReader
][
AmqpItemWriter
][
JmsItemReader
][
JmsItemWriter
][
KafkaItemReader
][
KafkaItemWriter
]
# AmqpItemReader
AmqpItemReader
是一个ItemReader
,它使用AmqpTemplate
来接收或转换来自交换的消息。 Spring 批处理提供了一个AmqpItemReaderBuilder
来构造AmqpItemReader
的实例。
# AmqpItemWriter
AmqpItemWriter
是一个ItemWriter
,它使用AmqpTemplate
向 AMQP 交换发送消息。如果提供的AmqpTemplate
中未指定名称,则将消息发送到无名交换机。 Spring 批处理提供了AmqpItemWriterBuilder
来构造AmqpItemWriter
的实例。
# JmsItemReader
对于使用JmsTemplate
的 JMS,ItemReader
是ItemReader
。模板应该有一个默认的目标,它用于为read()
方法提供项。 Spring 批处理提供了一个JmsItemReaderBuilder
来构造JmsItemReader
的实例。
# JmsItemWriter
对于使用JmsTemplate
的 JMS,ItemWriter
是ItemWriter
。模板应该有一个默认的目的地,用于在write(List)
中发送项。 Spring 批处理提供了一个JmsItemWriterBuilder
来构造JmsItemWriter
的实例。
# KafkaItemReader
对于 Apache Kafka 主题,KafkaItemReader
是ItemReader
。可以将其配置为从同一主题的多个分区中读取消息。它在执行上下文中存储消息偏移量,以支持重新启动功能。 Spring 批处理提供了一个KafkaItemReaderBuilder
来构造KafkaItemReader
的实例。
# KafkaItemWriter
KafkaItemWriter
是用于 Apache Kafka 的ItemWriter
,它使用KafkaTemplate
将事件发送到默认主题。 Spring 批处理提供了一个KafkaItemWriterBuilder
来构造KafkaItemWriter
的实例。
# 数据库阅读器
Spring Batch 提供以下数据库阅读器:
[
Neo4jItemReader
](#NEO4jitemreader)[
MongoItemReader
][
HibernateCursorItemReader
][
HibernatePagingItemReader
][
RepositoryItemReader
]
# Neo4jItemReader
Neo4jItemReader
是一个ItemReader
,它使用分页技术从图数据库 NEO4j 中读取对象。 Spring 批处理提供了一个Neo4jItemReaderBuilder
来构造Neo4jItemReader
的实例。
# MongoItemReader
MongoItemReader
是一个ItemReader
,它使用分页技术从 MongoDB 读取文档。 Spring 批处理提供了一个MongoItemReaderBuilder
来构造MongoItemReader
的实例。
# HibernateCursorItemReader
HibernateCursorItemReader
是用于读取在 Hibernate 之上构建的数据库记录的ItemStreamReader
。它执行 HQL 查询,然后在初始化时,在调用read()
方法时对结果集进行迭代,依次返回与当前行对应的对象。 Spring 批处理提供了一个HibernateCursorItemReaderBuilder
来构造HibernateCursorItemReader
的实例。
# HibernatePagingItemReader
HibernatePagingItemReader
是一个ItemReader
,用于读取建立在 Hibernate 之上的数据库记录,并且一次只读取固定数量的项。 Spring 批处理提供了一个HibernatePagingItemReaderBuilder
来构造HibernatePagingItemReader
的实例。
# RepositoryItemReader
RepositoryItemReader
是通过使用PagingAndSortingRepository
读取记录的ItemReader
。 Spring 批处理提供了一个RepositoryItemReaderBuilder
来构造RepositoryItemReader
的实例。
# 数据库编写者
Spring Batch 提供以下数据库编写器:
[
Neo4jItemWriter
](#NEO4jitemwriter)[
MongoItemWriter
][
RepositoryItemWriter
][
HibernateItemWriter
][
JdbcBatchItemWriter
][
JpaItemWriter
][
GemfireItemWriter
]
# Neo4jItemWriter
Neo4jItemWriter
是一个ItemWriter
实现,它将写到 NEO4J 数据库。 Spring 批处理提供了一个Neo4jItemWriterBuilder
来构造Neo4jItemWriter
的实例。
# MongoItemWriter
MongoItemWriter
是一个ItemWriter
实现,它使用 Spring data 的MongoOperations
的实现将数据写到 MongoDB 存储。 Spring 批处理提供了一个MongoItemWriterBuilder
来构造MongoItemWriter
的实例。
# RepositoryItemWriter
RepositoryItemWriter
是来自 Spring 数据的ItemWriter
包装器。 Spring 批处理提供了一个RepositoryItemWriterBuilder
来构造RepositoryItemWriter
的实例。
# HibernateItemWriter
HibernateItemWriter
是一个ItemWriter
,它使用一个 Hibernate 会话来保存或更新不是当前 Hibernate 会话的一部分的实体。 Spring 批处理提供了一个HibernateItemWriterBuilder
来构造HibernateItemWriter
的实例。
# JdbcBatchItemWriter
JdbcBatchItemWriter
是一个ItemWriter
,它使用NamedParameterJdbcTemplate
中的批处理特性来为提供的所有项执行一批语句。 Spring 批处理提供了一个JdbcBatchItemWriterBuilder
来构造JdbcBatchItemWriter
的实例。
# JpaItemWriter
JpaItemWriter
是一个ItemWriter
,它使用 JPA EntityManagerFactory
来合并不属于持久性上下文的任何实体。 Spring 批处理提供了一个JpaItemWriterBuilder
来构造JpaItemWriter
的实例。
# GemfireItemWriter
GemfireItemWriter
是一个ItemWriter
,它使用一个GemfireTemplate
将项目存储在 Gemfire 中,作为键/值对。 Spring 批处理提供了一个GemfireItemWriterBuilder
来构造GemfireItemWriter
的实例。
# 专业阅读器
Spring Batch 提供以下专门的阅读器:
[
LdifReader
][
MappingLdifReader
][
AvroItemReader
]
# LdifReader
AvroItemWriter
读取来自Resource
的 LDIF(LDAP 数据交换格式)记录,对它们进行解析,并为执行的每个LdapAttribute
返回一个LdapAttribute
对象。 Spring 批处理提供了一个LdifReaderBuilder
来构造LdifReader
的实例。
# MappingLdifReader
MappingLdifReader
从Resource
读取 LDIF(LDAP 数据交换格式)记录,解析它们,然后将每个 LDIF 记录映射到 POJO(普通的旧 Java 对象)。每个读都返回一个 POJO。 Spring 批处理提供了一个MappingLdifReaderBuilder
来构造MappingLdifReader
的实例。
# AvroItemReader
AvroItemReader
从资源中读取序列化的 AVRO 数据。每个读取返回由 Java 类或 AVRO 模式指定的类型的实例。读取器可以被可选地配置为嵌入 AVRO 模式的输入或不嵌入该模式的输入。 Spring 批处理提供了一个AvroItemReaderBuilder
来构造AvroItemReader
的实例。
# 专业作家
Spring Batch 提供以下专业的写作人员:
[
SimpleMailMessageItemWriter
][
AvroItemWriter
]
# SimpleMailMessageItemWriter
SimpleMailMessageItemWriter
是可以发送邮件的ItemWriter
。它将消息的实际发送委托给MailSender
的实例。 Spring 批处理提供了一个SimpleMailMessageItemWriterBuilder
来构造SimpleMailMessageItemWriter
的实例。
# AvroItemWriter
AvroItemWrite
根据给定的类型或模式将 Java 对象序列化到一个 WriteableResource。编写器可以被可选地配置为在输出中嵌入或不嵌入 AVRO 模式。 Spring 批处理提供了一个AvroItemWriterBuilder
来构造AvroItemWriter
的实例。
# 专用处理器
Spring Batch 提供以下专门的处理器:
- [
ScriptItemProcessor
]
# ScriptItemProcessor
ScriptItemProcessor
是一个ItemProcessor
,它将当前项目传递给提供的脚本,并且该脚本的结果将由处理器返回。 Spring 批处理提供了一个ScriptItemProcessorBuilder
来构造ScriptItemProcessor
的实例。