Spring Batch 实现批处理任务
引言
Spring Batch 是一个轻量级的开源框架,它提供了一种简单的方式来处理大量的数据。它基于 Spring 框架,提供了一套批处理框架,可以处理各种类型的批处理任务,如 ETL、数据导入 / 导出、报表生成等。Spring Batch 提供了一些重要的概念,如 Job、Step、ItemReader、ItemProcessor、ItemWriter 等,这些概念可以帮助我们构建可重用的批处理应用程序。通过 Spring Batch,我们可以轻松地实现批处理的并发、容错、重试等功能,同时也可以方便地与其他 Spring 组件集成,如 Spring Boot、Spring Data 等。总之,Spring Batch 是一个非常强大、灵活、易于使用的批处理框架,可以帮助我们快速构建高效、可靠的批处理应用程序
本文目的主要是教大家如何快速地使用 Spring Boot 集成 Spring Batch 实现一个定时的批处理作业 Demo,所以不会对 Spring Batch 理论部分进行过多的介绍。
创建数据库
本文以操作数据库的批处理作业为示例。当批处理作业需要操作数据库时,Spring Batch 要求先在数据库中创建好用于存储批处理作业元数据的表。如下所示,其中以 batch 开头的表,是 Spring Batch 用来存储每次执行作业所产生的元数据的。
-- Spring Batch meta-data schema for MySQL (InnoDB).
-- Do not edit this file: the BATCH_* tables are owned and populated by the framework.

-- One row per JobInstance.
-- JOB_INSTANCE_ID values are allocated from BATCH_JOB_SEQ.
-- JOB_NAME matches the job name in the Spring configuration.
-- JOB_KEY is an MD5 hash of the identifying job parameters; because
-- (JOB_NAME, JOB_KEY) is unique, re-running an already-completed job with
-- identical parameters raises JobInstanceAlreadyCompleteException.
CREATE TABLE BATCH_JOB_INSTANCE (
    JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY,
    VERSION         BIGINT,
    JOB_NAME        VARCHAR(100) NOT NULL,
    JOB_KEY         VARCHAR(32) NOT NULL,
    CONSTRAINT JOB_INST_UN UNIQUE (JOB_NAME, JOB_KEY)
) ENGINE = InnoDB;

-- One row per JobExecution (a single run attempt of a JobInstance).
CREATE TABLE BATCH_JOB_EXECUTION (
    JOB_EXECUTION_ID           BIGINT NOT NULL PRIMARY KEY,
    VERSION                    BIGINT,
    JOB_INSTANCE_ID            BIGINT NOT NULL,
    CREATE_TIME                DATETIME NOT NULL,
    START_TIME                 DATETIME DEFAULT NULL,
    END_TIME                   DATETIME DEFAULT NULL,
    STATUS                     VARCHAR(10),
    EXIT_CODE                  VARCHAR(2500),
    EXIT_MESSAGE               VARCHAR(2500),
    LAST_UPDATED               DATETIME,
    JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
    CONSTRAINT JOB_INST_EXEC_FK FOREIGN KEY (JOB_INSTANCE_ID)
        REFERENCES BATCH_JOB_INSTANCE (JOB_INSTANCE_ID)
) ENGINE = InnoDB;

-- JobParameters recorded for each JobExecution.
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
    JOB_EXECUTION_ID BIGINT NOT NULL,
    TYPE_CD          VARCHAR(6) NOT NULL,
    KEY_NAME         VARCHAR(100) NOT NULL,
    STRING_VAL       VARCHAR(250),
    DATE_VAL         DATETIME DEFAULT NULL,
    LONG_VAL         BIGINT,
    DOUBLE_VAL       DOUBLE PRECISION,
    IDENTIFYING      CHAR(1) NOT NULL,
    CONSTRAINT JOB_EXEC_PARAMS_FK FOREIGN KEY (JOB_EXECUTION_ID)
        REFERENCES BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE = InnoDB;

-- One row per StepExecution, including read/write/skip/rollback counters.
CREATE TABLE BATCH_STEP_EXECUTION (
    STEP_EXECUTION_ID  BIGINT NOT NULL PRIMARY KEY,
    VERSION            BIGINT NOT NULL,
    STEP_NAME          VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID   BIGINT NOT NULL,
    START_TIME         DATETIME NOT NULL,
    END_TIME           DATETIME DEFAULT NULL,
    STATUS             VARCHAR(10),
    COMMIT_COUNT       BIGINT,
    READ_COUNT         BIGINT,
    FILTER_COUNT       BIGINT,
    WRITE_COUNT        BIGINT,
    READ_SKIP_COUNT    BIGINT,
    WRITE_SKIP_COUNT   BIGINT,
    PROCESS_SKIP_COUNT BIGINT,
    ROLLBACK_COUNT     BIGINT,
    EXIT_CODE          VARCHAR(2500),
    EXIT_MESSAGE       VARCHAR(2500),
    LAST_UPDATED       DATETIME,
    CONSTRAINT JOB_EXEC_STEP_FK FOREIGN KEY (JOB_EXECUTION_ID)
        REFERENCES BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE = InnoDB;

-- Serialized ExecutionContext attached to each StepExecution.
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
    STEP_EXECUTION_ID  BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT      VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    CONSTRAINT STEP_EXEC_CTX_FK FOREIGN KEY (STEP_EXECUTION_ID)
        REFERENCES BATCH_STEP_EXECUTION (STEP_EXECUTION_ID)
) ENGINE = InnoDB;

-- Serialized ExecutionContext attached to each JobExecution.
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
    JOB_EXECUTION_ID   BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT      VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    CONSTRAINT JOB_EXEC_CTX_FK FOREIGN KEY (JOB_EXECUTION_ID)
        REFERENCES BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE = InnoDB;

-- Sequence-emulation tables (MySQL has no native sequences).
-- Each seed INSERT only fires when its table is empty, so the inserts can be
-- re-run safely; the derived-table wrapper works around MySQL's restriction
-- on selecting from the table being inserted into.
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
    ID         BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    CONSTRAINT UNIQUE_KEY_UN UNIQUE (UNIQUE_KEY)
) ENGINE = InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY)
SELECT * FROM (SELECT 0 AS ID, '0' AS UNIQUE_KEY) AS tmp
WHERE NOT EXISTS (SELECT * FROM BATCH_STEP_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
    ID         BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    CONSTRAINT UNIQUE_KEY_UN UNIQUE (UNIQUE_KEY)
) ENGINE = InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY)
SELECT * FROM (SELECT 0 AS ID, '0' AS UNIQUE_KEY) AS tmp
WHERE NOT EXISTS (SELECT * FROM BATCH_JOB_EXECUTION_SEQ);

CREATE TABLE BATCH_JOB_SEQ (
    ID         BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    CONSTRAINT UNIQUE_KEY_UN UNIQUE (UNIQUE_KEY)
) ENGINE = InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY)
SELECT * FROM (SELECT 0 AS ID, '0' AS UNIQUE_KEY) AS tmp
WHERE NOT EXISTS (SELECT * FROM BATCH_JOB_SEQ);
示例中使用的 tt_dot_flow 表
-- Business table used by the demo: one row per origin/destination city pair.
-- Fixed: dropped AUTO_INCREMENT=8951, a mysqldump artifact that pinned the
-- starting id in a fresh database for no reason.
CREATE TABLE `tt_dot_flow` (
    `id`             BIGINT NOT NULL AUTO_INCREMENT,
    `src_city_code`  VARCHAR(10) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '始发城市代码',
    `dest_city_code` VARCHAR(10) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '目的城市代码',
    `src_city_name`  VARCHAR(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '始发城市',
    `dest_city_name` VARCHAR(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '目的城市',
    PRIMARY KEY (`id`),
    -- The batch writer's saveOrUpdateBatch relies on this unique key to
    -- turn duplicate (src, dest) pairs into updates instead of inserts.
    UNIQUE KEY `tt_dot_flow_src_city_code_IDX` (`src_city_code`, `dest_city_code`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_general_ci;
项目配置
核心依赖
<!-- Spring Batch starter: brings in spring-batch-core (Job/Step/Item* APIs)
     and the auto-configuration that creates JobLauncher, JobRepository, etc. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
完整依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Batch demo (Java 8, Spring Boot 2.7.1 parent). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.1</version>
</parent>
<groupId>cn.phixlin</groupId>
<artifactId>spring-batch-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Spring Batch job/step infrastructure -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- Embedded web server; the demo triggers the job via a REST endpoint -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok: generates getters/setters/loggers (@Data, @Slf4j, @RequiredArgsConstructor) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- MyBatis-Plus ORM: provides DotFlowService.saveOrUpdateBatch used by the ItemWriter -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.6</version>
</dependency>
<!-- MySQL JDBC driver (Connector/J) -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.3.0</version>
</dependency>
<!-- MapStruct: compile-time DTO -> entity mapping (DotFlowMapstruct) -->
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct</artifactId>
<version>1.5.5.Final</version>
</dependency>
<!-- NOTE(review): when Lombok and MapStruct are combined, the
     lombok-mapstruct-binding annotation processor is usually required so
     MapStruct sees Lombok-generated getters — confirm the build compiles. -->
<dependency>
<groupId>org.mapstruct</groupId>
<artifactId>mapstruct-processor</artifactId>
<version>1.5.5.Final</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
配置文件
# Spring Boot application configuration for the batch demo.
# Fixed: "ture" -> true, and the JDBC URL carried two conflicting
# serverTimezone parameters (UTC and GMT+8) — only one is kept.
spring:
  datasource:
    # NOTE(review): "p6spy" is not a standard spring.datasource property —
    # confirm the p6spy integration in this project actually reads this key.
    p6spy: true
    # useCursorFetch + defaultFetchSize enable streaming reads for large result sets.
    url: jdbc:mysql://ip:3306/databases?allowMultiQueries=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&autoReconnect=true&useSSL=false&useCursorFetch=true&defaultFetchSize=100
    username: username
    password: password
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      # Maximum number of connections in the pool
      maxPoolSize: 20
      # Minimum number of idle connections
      minIdle: 0
      # Timeout (ms) when waiting for a connection from the pool
      connectionTimeout: 30000
      # Timeout (ms) for connection validation
      validationTimeout: 5000
      # Maximum idle time (ms) before a connection is retired; default 10 min
      idleTimeout: 600000
      # Maximum lifetime (ms) of a pooled connection; 0 = unlimited, default 30 min
      maxLifetime: 1800000
      # Validation query used to test connections
      connectionTestQuery: SELECT 1
      # Interval (ms) between keepalive probes on idle connections
      keepaliveTime: 30000
mybatis-plus:
  mapper-locations:
    - classpath:mappers/*.xml
代码示例
dotflow 表的实体类
package cn.phixlin.domain.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

// Fixed: java.io.Serializable was referenced but never imported (compile error).
import java.io.Serializable;

/**
 * MyBatis-Plus entity mapped to the {@code tt_dot_flow} table:
 * one row per origin/destination city pair.
 */
@Data
@TableName("tt_dot_flow")
public class DotFlowEntity implements Serializable {

    // Explicit version id so serialized form stays stable across refactors.
    private static final long serialVersionUID = 1L;

    // Auto-increment primary key, assigned by the database.
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    // Origin city code (CSV column src_city_code)
    private String srcCityCode;

    // Destination city code (CSV column dest_city_code)
    private String destCityCode;

    // Origin city display name
    private String srcCityName;

    // Destination city display name
    private String destCityName;
}
创建 ItemReader,示例为读取 csv 文件
// CSV column names in file order; also used as FieldSet lookup keys below.
private final static String[] names = new String[]{"src_city_code", "dest_city_code", "src_city_name", "dest_city_name"};

/**
 * ItemReader that reads dot-flow rows from a comma-delimited CSV file.
 * Fixed: the path is now absolute ("/app/deploy/data/..."), matching the
 * complete listing further down; the previous relative path resolved
 * against the JVM working directory and could silently point elsewhere.
 */
@Bean("readerForCsv")
public ItemReader<DotFlowDto> readerForCsv() {
    return new FlatFileItemReaderBuilder<DotFlowDto>()
            .name("demoReader")
            .resource(new FileSystemResource("/app/deploy/data/dot_flow.csv"))
            .delimited()
            .delimiter(",")
            .names(names)
            // Map each tokenized line into a DTO by column name.
            .fieldSetMapper(f -> new DotFlowDto()
                    .setSrcCityCode(f.readString("src_city_code"))
                    .setSrcCityName(f.readString("src_city_name"))
                    .setDestCityCode(f.readString("dest_city_code"))
                    .setDestCityName(f.readString("dest_city_name")))
            .build();
}
创建 ItemProcessor,示例为将数据转换为实体类
@Bean("processData")
public ItemProcessor<DotFlowDto, DotFlowEntity> processData(DotFlowMapstruct dotFlowMapstruct) {
return f -> {
log.info("process data src_city_name = {}", f.getSrcCityName());
return dotFlowMapstruct.toEntity(f);
};
}
创建 ItemWriter,示例为写入到数据库
@Bean("writeData")
public ItemWriter<DotFlowEntity> writeData(DotFlowService dotFlowService) {
return list -> dotFlowService.saveOrUpdateBatch(list.stream().map(f -> (DotFlowEntity) f).collect(Collectors.toList()));
}
创建 Step
@Bean(value = "demoStep")
public Step demoStep(StepBuilderFactory stepBuilderFactory,
@Qualifier("readerForCsv") ItemReader<DotFlowDto> readerForCsv,
@Qualifier("processData") ItemProcessor<DotFlowDto, DotFlowEntity> processData,
@Qualifier("writeData")ItemWriter<DotFlowEntity> writeData) {
return stepBuilderFactory.get("demoStep")
.<DotFlowDto, DotFlowEntity>chunk(1000).faultTolerant().retryLimit(3).retry(Exception.class).skipLimit(100).skip(Exception.class)
.reader(readerForCsv)
.listener(new ItemReadListener<DotFlowDto>() {
@Override
public void beforeRead(){}
@Override
public void afterRead(DotFlowDto item) {
if (item.getSrcCityCode().equals("021")) log.info("item: {}", item);}
@Override
public void onReadError(Exception ex) {
log.error("read err {}", ex.getMessage(), ex);
}
}).processor(processData)
.writer(writeData)
.listener(new ItemWriteListener<DotFlowEntity>() {
@Override
public void beforeWrite(List<? extends DotFlowEntity> items){}
@Override
public void afterWrite(List<? extends DotFlowEntity> items) {
log.info("write finish size: {}", items.size());}
@Override
public void onWriteError(Exception exception, List<? extends DotFlowEntity> items){}
}).build();
}
创建 Job
@Bean("demoJob")
public Job demoJob(JobBuilderFactory jobBuilderFactory, @Qualifier("demoStep")Step demoStep) {
return jobBuilderFactory.get("demoJob").flow(demoStep)
.end()
.listener(new JobExecutionListener() {
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("domeJob 开始, id: {}", jobExecution.getJobId());}
@Override
public void afterJob(JobExecution jobExecution) {
log.info("domoJob 结束: {}", jobExecution);}
}).build();
}
SpringBatchConfig.java 完整代码
package cn.phixlin.config;

import cn.phixlin.domain.entity.DotFlowEntity;
import cn.phixlin.domain.dto.DotFlowDto;
import cn.phixlin.mapstruct.DotFlowMapstruct;
import cn.phixlin.service.DotFlowService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Spring Batch wiring for the demo: one job ("demoJob") containing one
 * chunk-oriented step that reads dot-flow rows from a CSV file, converts
 * them to entities and upserts them into tt_dot_flow in chunks of 1000.
 */
@Slf4j
@Configuration
@EnableBatchProcessing
public class SpringBatchConfig {

    // CSV column names in file order; also used as FieldSet lookup keys.
    private final static String[] names = new String[]{"src_city_code", "dest_city_code", "src_city_name", "dest_city_name"};

    /**
     * The batch job: a single-step flow with start/end logging.
     * Fixed: log messages previously said "domeJob"/"domoJob" — typos for "demoJob".
     */
    @Bean("demoJob")
    public Job demoJob(JobBuilderFactory jobBuilderFactory, @Qualifier("demoStep") Step demoStep) {
        return jobBuilderFactory.get("demoJob")
                .flow(demoStep)
                .end()
                .listener(new JobExecutionListener() {
                    @Override
                    public void beforeJob(JobExecution jobExecution) {
                        log.info("demoJob 开始, id: {}", jobExecution.getJobId());
                    }

                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        log.info("demoJob 结束: {}", jobExecution);
                    }
                })
                .build();
    }

    /**
     * Chunk-oriented step: read -> process -> write in chunks of 1000 items,
     * with up to 3 retries and up to 100 skips.
     * NOTE(review): retrying/skipping on the broad Exception.class can mask
     * real failures — consider narrowing to genuinely transient exceptions.
     */
    @Bean(value = "demoStep")
    public Step demoStep(StepBuilderFactory stepBuilderFactory,
                         @Qualifier("readerForCsv") ItemReader<DotFlowDto> readerForCsv,
                         @Qualifier("processData") ItemProcessor<DotFlowDto, DotFlowEntity> processData,
                         @Qualifier("writeData") ItemWriter<DotFlowEntity> writeData) {
        return stepBuilderFactory.get("demoStep")
                .<DotFlowDto, DotFlowEntity>chunk(1000)
                .faultTolerant()
                .retryLimit(3).retry(Exception.class)
                .skipLimit(100).skip(Exception.class)
                .reader(readerForCsv)
                // Read listener: logs items for city code "021" and any read errors.
                .listener(new ItemReadListener<DotFlowDto>() {
                    @Override
                    public void beforeRead() {
                    }

                    @Override
                    public void afterRead(DotFlowDto item) {
                        // Fixed: constant-first equals avoids an NPE when a CSV
                        // row has no src_city_code.
                        if ("021".equals(item.getSrcCityCode())) {
                            log.info("item: {}", item);
                        }
                    }

                    @Override
                    public void onReadError(Exception ex) {
                        log.error("read err {}", ex.getMessage(), ex);
                    }
                })
                .processor(processData)
                .writer(writeData)
                // Write listener: logs the size of each successfully written chunk.
                .listener(new ItemWriteListener<DotFlowEntity>() {
                    @Override
                    public void beforeWrite(List<? extends DotFlowEntity> items) {
                    }

                    @Override
                    public void afterWrite(List<? extends DotFlowEntity> items) {
                        log.info("write finish size: {}", items.size());
                    }

                    @Override
                    public void onWriteError(Exception exception, List<? extends DotFlowEntity> items) {
                    }
                })
                .build();
    }

    /**
     * ItemReader that reads dot-flow rows from a comma-delimited CSV file
     * at a fixed absolute path.
     */
    @Bean("readerForCsv")
    public ItemReader<DotFlowDto> readerForCsv() {
        return new FlatFileItemReaderBuilder<DotFlowDto>()
                .name("demoReader")
                .resource(new FileSystemResource("/app/deploy/data/dot_flow.csv"))
                .delimited()
                .delimiter(",")
                .names(names)
                // Map each tokenized line into a DTO by column name.
                .fieldSetMapper(f -> new DotFlowDto()
                        .setSrcCityCode(f.readString("src_city_code"))
                        .setSrcCityName(f.readString("src_city_name"))
                        .setDestCityCode(f.readString("dest_city_code"))
                        .setDestCityName(f.readString("dest_city_name")))
                .build();
    }

    /**
     * ItemProcessor converting each CSV DTO to the entity via MapStruct.
     * NOTE(review): this logs every item at INFO — consider DEBUG for large files.
     */
    @Bean("processData")
    public ItemProcessor<DotFlowDto, DotFlowEntity> processData(DotFlowMapstruct dotFlowMapstruct) {
        return f -> {
            log.info("process data src_city_name = {}", f.getSrcCityName());
            return dotFlowMapstruct.toEntity(f);
        };
    }

    /**
     * ItemWriter that upserts each chunk; the cast narrows
     * List&lt;? extends DotFlowEntity&gt; to the List&lt;DotFlowEntity&gt;
     * expected by saveOrUpdateBatch.
     */
    @Bean("writeData")
    public ItemWriter<DotFlowEntity> writeData(DotFlowService dotFlowService) {
        return list -> dotFlowService.saveOrUpdateBatch(
                list.stream().map(f -> (DotFlowEntity) f).collect(Collectors.toList()));
    }
}
启动 Job,进行测试
package cn.phixlin.controller;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * Triggers the demo batch job manually over HTTP (GET /testJob).
 * A fresh timestamp job parameter is added on every call, so each request
 * launches a new JobInstance instead of failing with
 * JobInstanceAlreadyCompleteException on repeat runs.
 */
@Slf4j
@RestController
@RequiredArgsConstructor
public class TestController {

    private final JobLauncher jobLauncher;
    private final Job demoJob;

    @RequestMapping(value = "testJob", method = RequestMethod.GET)
    public void testJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        log.info("batch job start .....");

        // Unique identifying parameter -> unique JobInstance per request.
        JobParameters params = new JobParametersBuilder()
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();

        // Launch synchronously and report the final batch status.
        JobExecution execution = jobLauncher.run(demoJob, params);
        log.info("batch job end, Status: {}", execution.getStatus());
    }
}
后记
在基本的批处理任务基础上,可以进一步扩展功能,使其更加完善和实用。例如:
- 多步骤批处理:一个 Job 可以包含多个 Step,每个 Step 可以有不同的 ItemReader、ItemProcessor 和 ItemWriter。
- 并行处理:通过配置多个线程或分布式处理,提升批处理任务的性能。
- 错误处理和重试:配置错误处理和重试机制,提高批处理任务的可靠性。
- 数据验证:在处理数据前进行数据验证,确保数据的正确性。