批处理的核心场景
Spring Batch能解决的批处理场景
Spring Batch为批处理提供了一个轻量化的解决方案,它根据批处理的需要迭代处理各种记录,提供事物功能。但是Spring Batch仅仅适用于"脱机"场景,在处理的过程中不能和外部进行任何交互,也不允许有任何输入。
Spring Batch的目标
Spring Batch结构

如上图,通常情况下一个独立的JVM程序就是仅仅用于处理批处理,而不要和其他功能重叠。 在最后一层基础设置(Infrastructure)部分主要分为3个部分。JobLauncher、Job以及Step。每一个Step又细分为ItemReader、ItemProcessor、ItemWirte。使用Spring Batch主要就是知道每一个基础设置负责的内容,然后在对应的设施中实现对应的业务。
Spring Batch 批处理原则与建议
当我们构建一个批处理的过程时,必须注意以下原则:
批处理的通用策略
和软件开发的设计模式一样,批处理也有各种各样的现成模式可供参考。当一个开发(设计)人员开始执行批处理任务时,应该将业务逻辑拆分为一下的步骤或者板块分批执行:
以上五个步骤是一个标准的数据批处理过程,Spring batch框架为业务实现提供了以上几个功能入口。
数据额外处理
某些情况需要实现对数据进行额外处理,在进入批处理之前通过其他方式将数据进行处理。主要内容有:
常规数据源
批处理的数据源通常包括:
典型的处理过程
在执行2,3点批处理时需要注意事物隔离等级。
Spring Batch批处理的核心概念
下图是批处理的核心流程图。

Spring Batch同样按照批处理的标准实现了各个层级的组件。并且在框架级别保证数据的完整性和事物性。
如图所示,在一个标准的批处理任务中涵盖的核心概念有JobLauncher、Job、Step,一个Job可以涵盖多个Step,一个Job对应一个启动的JobLauncher。一个Step中分为ItemReader、ItemProcessor、ItemWriter,根据字面意思它们分别对应数据提取、数据处理和数据写入。此外JobLauncher、Job、Step会产生元数据(Metadata),它们会被存储到JobRepository中。
Job
简单的说Job是封装一个批处理过程的实体,与其他的Spring项目类似,Job可以通过XML或Java类配置,称为“Job Configuration”。如下图Job是单个批处理的最顶层。

为了便于理解,可以简单的将Job理解为是每一步(Step)实例的容器。他结合了多个Step,为它们提供统一的服务同时也为Step提供个性化的服务,比如步骤重启。通常情况下Job的配置包含以下内容:
Spring Batch为Job接口提供了默认的实现——SimpleJob,其中实现了一些标准的批处理方法。下面的代码展示了如可注入一个Job。
@Bean
public Job footballJob() {
return this.jobBuilderFactory.get("footballJob") //get中命名了Job的名称
.start(playerLoad()) //playerLoad、gameLoad、playerSummarization都是Step
.next(gameLoad())
.next(playerSummarization())
.end()
.build();
}
JobInstance
JobInstance是指批处理作业运行的实例。例如一个批处理必须在每天执行一次,系统在2019年5月1日执行了一次我们称之为2019-05-01的实例,类似的还会有2019-05-02、2019-05-03实例。通常情况下,一个JobInstance对应一个JobParameters,对应多个JobExecution。(JobParameters、JobExecution见后文)。同一个JobInstance具有相同的上下文(ExecutionContext内容见后文)。
JobParameters
前面讨论了JobInstance与Job的区别,但是具体的区别内容都是通过JobParameters体现的。一个JobParameters对象中包含了一系列Job运行相关的参数,这些参数可以用于参考或者用于实际的业务使用。对应的关系如下图:

当我们执行2个不同的JobInstance时JobParameters中的属性都会有差异。可以简单的认为一个JobInstance的标识就是Job+JobParameters。
JobExecution
JobExecution可以理解为单次运行Job的容器。一次JobInstance执行的结果可能是成功、也可能是失败。但是对于Spring Batch框架而言,只有返回运行成功才会视为完成一次批处理。例如2019-05-01执行了一次JobInstance,但是执行的过程失败,因此第二次还会有一个“相同的”的JobInstance被执行。
Job用于定义批处理如何执行,JobInstance纯粹的就是一个处理对象,把所有的运行内容和信息组织在一起,主要是为了当面临问题时定义正确的重启参数。而JobExecution是运行时的“容器”,记录动态运行时的各种属性和上线文。他包括的信息有:

以上这些内容Spring Batch都会通过JobRepository进行持久化(这些信息官方文成称之为MetaData),因此在对应的数据源中可以看到下列信息:
BATCH_JOB_INSTANCE:

BATCH_JOB_EXECUTION_PARAMS:

BATCH_JOB_EXECUTION:

当某个Job批处理任务失败之后会在对应的数据库表中路对应的状态。假设1月1号执行的任务失败,技术团队花费了大量的时间解决这个问题到了第二天21才继续执行这个任务。
BATCH_JOB_INSTANCE:

BATCH_JOB_EXECUTION_PARAMS:

BATCH_JOB_EXECUTION:

从数据上看好似JobInstance是一个接一个顺序执行的,但是对于Spring Batch并没有进行任何控制。不同的JobInstance很有可能是同时在运行(相同的JobInstance同时运行会抛出JobExecutionAlreadyRunningException异常)。
Step
Step是批处理重复运行的最小单元,它按照顺序定义了一次执行的必要过程。因此每个Job可以视作由一个或多个多个Step组成。一个Step包含了所有所有进行批处理的必要信息,这些信息的内容是由开发人员决定的并没有统一的标准。一个Step可以很简单,也可以很复杂。他可以是复杂业务的组合,也有可能仅仅用于迁移数据。与JobExecution的概念类似,Step也有特定的StepExecution,关系结构如下:

StepExecution表示单次执行Step的容器,每次Step执行时都会有一个新的StepExecution被创建。与JobExecution不同的是,当某个Step执行失败后默认并不会重新执行。StepExecution包含以下属性:

ExecutionContext
前文已经多次提到ExecutionContext。可以简单的认为ExecutionContext提供了一个Key/Value机制,在StepExecution和JobExecution对象的任何位置都可以获取到ExecutionContext中的任何数据。最有价值的作用是记录数据的执行位置,以便发生重启时候从对应的位置继续执行:
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition())
比如在任务中有一个名为“loadData”的Step,他的作用是从文件中读取数据写入到数据库,当第一次执行失败后,数据库中有如下数据:
BATCH_JOB_INSTANCE:

BATCH_JOB_EXECUTION_PARAMS:

BATCH_JOB_EXECUTION:

BATCH_STEP_EXECUTION:

BATCH_STEP_EXECUTION_CONTEXT: |STEP_EXEC_ID|SHORT_CONTEXT| |---|---| |1|{piece.count=40321}|
在上面的例子中,Step运行30分钟处理了40321个“pieces”,我们姑且认为“pieces”表示行间的行数(实际就是每个Step完成循环处理的个数)。这个值会在每个commit之前被更新记录在ExecutionContext中(更新需要用到StepListener后文会详细说明)。当我们再次重启这个Job时并记录在BATCH_STEP_EXECUTION_CONTEXT中的数据会加载到ExecutionContext中,这样当我们继续执行批处理任务时可以从上一次中断的位置继续处理。例如下面的代码在ItemReader中检查上次执行的结果,并从中断的位置继续执行:
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
ExecutionContext是根据JobInstance进行管理的,因此只要是相同的实例都会具备相同的ExecutionContext(无论是否停止)。此外通过以下方法都可以获得一个ExecutionContext:
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
但是这2个ExecutionContext并不相同,前者是在一个Step中每次Commit数据之间共享,后者是在Step与Step之间共享。
JobRepository
JobRepository是所有前面介绍的对象实例的持久化机制。他为JobLauncher、Job、Step的实现提供了CRUD操作。当一个Job第一次被启动时,一个JobExecution会从数据源中获取到,同时在执行的过程中StepExecution、JobExecution的实现都会记录到数据源中。使用@EnableBatchProcessing注解后JobRepository会进行自动化配置。
JobLauncher
JobLauncher为Job的启动运行提供了一个边界的入口,在启动Job的同时还可以定制JobParameters:
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
竟然都看到最后了,给小编点个关注吧,小编还会持续更新的,只收藏不点关注的都是在耍流氓!
| 留言与评论(共有 0 条评论) |