Learn how to use Spring Batch to read records from a CSV file and use StaxEventItemWriter to write the processed records out as XML.
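JobLauncher: as the name suggests, this domain object launches a Job. It binds a set of JobParameters to a Job and then runs that Job.
Job: the domain object that defines and configures a batch job. First, it is a container for Steps: it configures the Steps the job needs and the logical relationships between them. Second, it configures the characteristics of the job, such as its name, whether it can be restarted, and whether its JobParameters are validated and by which rules.
Step: an independent logical unit of processing within a batch job; most business logic lives in Steps. A Step can be implemented in two ways. The Tasklet style is very flexible: the developer only implements the Tasklet interface and the logic is entirely up to them. The chunk-oriented style fixes the flow of a Step as "read, process (optional), write"; Spring Batch provides an interface for each stage (ItemReader, ItemProcessor and ItemWriter) along with many ready-made implementations for common cases such as reading and writing files or databases. Each Step consists of either a single Tasklet or a single Chunk.
JobRepository: this domain object provides a persistence mechanism for Spring Batch's operational metadata. It exposes CRUD operations for all of that metadata and provides transactional support for every operation.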
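How JobLauncher and JobRepository cooperate can be pictured with a small in-memory model. This is a hypothetical, JDK-only sketch, not Spring Batch code: a job instance is identified by the job name plus its JobParameters, and an instance that has already completed is refused when it is launched again (Spring Batch signals this with JobInstanceAlreadyCompleteException; the sketch simply throws IllegalStateException). All class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of how a JobRepository tracks job instances. */
public class JobRepositorySketch {

    // instance key -> status of its last execution
    private final Map<String, String> lastStatus = new HashMap<>();

    /** An instance is identified by job name plus parameters (sorted, so order is irrelevant). */
    static String instanceKey(String jobName, Map<String, String> parameters) {
        return jobName + new TreeMap<>(parameters);
    }

    /** Models a launch: refuses a completed instance, otherwise "runs" it and records COMPLETED. */
    public String launch(String jobName, Map<String, String> parameters) {
        String key = instanceKey(jobName, parameters);
        if ("COMPLETED".equals(lastStatus.get(key))) {
            throw new IllegalStateException("Job instance already complete: " + key);
        }
        lastStatus.put(key, "COMPLETED");
        return "COMPLETED";
    }

    public static void main(String[] args) {
        JobRepositorySketch repository = new JobRepositorySketch();
        Map<String, String> noParameters = new HashMap<>();
        System.out.println(repository.launch("firstBatchJob", noParameters)); // COMPLETED
        try {
            repository.launch("firstBatchJob", noParameters);                  // same instance again
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        Map<String, String> withRunDate = new HashMap<>();
        withRunDate.put("run.date", "2015-12-03");                              // hypothetical parameter
        System.out.println(repository.launch("firstBatchJob", withRunDate));   // new instance: COMPLETED
    }
}
```

In this project the launcher always passes an empty `new JobParameters()`, so a second run against the same metadata would hit the refusal case; the `jdbc:initialize-database` scripts in applicationContext.xml drop and recreate the Spring Batch tables on every start, which wipes that history.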
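Project overview
In this application we perform the following tasks:
- Read transaction records from a CSV file with FlatFileItemReader.
- Apply business processing to the items with CustomItemProcessor. The ItemReader reads one item at a time and the ItemWriter writes them; in between, the ItemProcessor provides a hook for transforming each item or applying other business logic.
- Take the results of CustomItemProcessor and convert them to XML as the final output with StaxEventItemWriter.
- Take the results of CustomItemProcessor and write them to the MySQL database as the final output with MyBatisBatchItemWriter.
- Check the results in MySQL.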
Project structure
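Maven
The sqlite-jdbc and mysql-connector-java dependencies are alternatives; choose one of them (the pom below uses mysql-connector-java). Whichever one you choose, make the corresponding change in applicationContext.xml as well.
Changes:
- Dependency versions are managed centrally by platform-bom.
- Added mybatis and mybatis-spring.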
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.littlefxc.example</groupId>
    <artifactId>Spring-CSV-to-DB</artifactId>
    <version>1.0-snapshot</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>5.0.9.RELEASE</spring.version>
        <spring.batch.version>4.0.1.RELEASE</spring.batch.version>
        <sqlite.version>3.8.11.2</sqlite.version>
        <mysql.version>5.1.47</mysql.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.spring.platform</groupId>
                <artifactId>platform-bom</artifactId>
                <version>Cairo-RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>
```
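applicationContext.xml
We read the CSV file with FlatFileItemReader in its standard configuration: a DefaultLineMapper with a DelimitedLineTokenizer, plus a FieldSetMapper (here the custom RecordFieldSetMapper described below) that turns each tokenized line into an object. To write the records out as XML, we use StaxEventItemWriter as the standard writer.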
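Conceptually, for every line after the first `linesToSkip` lines, DelimitedLineTokenizer splits the line on commas and pairs each token with the configured `names`, and the FieldSet's read methods then trim the values. The JDK-only sketch below models only that happy path (no quoted fields, no custom delimiters); it trims during tokenizing for simplicity, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of FlatFileItemReader + DelimitedLineTokenizer for simple CSV lines. */
public class LineTokenizingSketch {

    /** Skips linesToSkip lines, splits the rest on commas and maps trimmed tokens to names. */
    public static List<Map<String, String>> tokenize(List<String> lines, int linesToSkip, String[] names) {
        List<Map<String, String>> fieldSets = new ArrayList<>();
        for (int i = linesToSkip; i < lines.size(); i++) {
            // limit -1 keeps trailing empty tokens, so the token count reflects the whole line
            String[] tokens = lines.get(i).split(",", -1);
            if (tokens.length != names.length) {
                throw new IllegalArgumentException("Line " + (i + 1) + ": expected "
                        + names.length + " tokens but found " + tokens.length);
            }
            Map<String, String> fieldSet = new LinkedHashMap<>();
            for (int t = 0; t < names.length; t++) {
                // Simplification: the real FieldSet keeps raw values and trims them in readString etc.
                fieldSet.put(names[t], tokens[t].trim());
            }
            fieldSets.add(fieldSet);
        }
        return fieldSets;
    }

    public static void main(String[] args) {
        String[] names = {"username", "user_id", "transaction_date", "transaction_amount"};
        List<String> lines = Arrays.asList(
                "username,user_id,transaction_date,transaction_amount", // hypothetical header line
                "john , 2134, 3/12/2015 , 12321");
        System.out.println(tokenize(lines, 1, names));
        // [{username=john, user_id=2134, transaction_date=3/12/2015, transaction_amount=12321}]
    }
}
```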
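Changes:
- The output goes to MySQL instead of XML.
- The item writer's persistence layer changes from spring-jdbc to mybatis and mybatis-spring (the JobRepository still works directly on the JDBC DataSource).
- The original XML itemWriter still works once it is uncommented.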
```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">

    <!-- Resolve ${...} placeholders from application.properties -->
    <context:property-placeholder location="classpath:application.properties"/>
    <!-- Register annotated beans; the recordFieldSetMapper referenced below must be registered this way -->
    <context:component-scan base-package="com.littlefxc.examples.batch"/>

    <!-- Drop and recreate the Spring Batch metadata tables and the project table on every start -->
    <jdbc:initialize-database>
        <jdbc:script location="${batch.schema-drop}"/>
        <jdbc:script location="${batch.schema-create}"/>
        <jdbc:script location="${project.schema-drop}"/>
        <jdbc:script location="${project.schema-create}"/>
    </jdbc:initialize-database>

    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource"
          p:driverClassName="${jdbc.driver-class-name}"
          p:url="${jdbc.url}"
          p:username="${jdbc.username}"
          p:password="${jdbc.password}"/>

    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"
          p:dataSource-ref="dataSource"/>

    <tx:annotation-driven/>

    <!-- MyBatis: session factory plus mapper interfaces from the dao package -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"
          p:dataSource-ref="dataSource"
          p:typeAliasesPackage="${mybatis.type-aliases-package}"
          p:configLocation="${mybatis.configuration}"/>

    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer"
          p:basePackage="com.littlefxc.examples.batch.dao"
          p:sqlSessionFactoryBeanName="sqlSessionFactory"/>

    <!-- Spring Batch infrastructure -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
          p:dataSource-ref="dataSource"
          p:transactionManager-ref="transactionManager"
          p:databaseType="mysql"/>

    <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"
          p:jobRepository-ref="jobRepository"/>

    <!-- Reader: input/record.csv to Transaction objects -->
    <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource" value="input/record.csv"/>
        <!-- Skip the first line of the file (a header) -->
        <property name="linesToSkip" value="1"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <property name="names" value="username,user_id,transaction_date,transaction_amount"/>
                    </bean>
                </property>
                <property name="fieldSetMapper" ref="recordFieldSetMapper"/>
            </bean>
        </property>
    </bean>

    <bean id="itemProcessor" class="com.littlefxc.examples.batch.service.CustomItemProcessor"/>

    <!-- Writer: runs the MyBatis statement insertTransactionRecord for every item, batched per chunk -->
    <bean id="itemWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter">
        <property name="sqlSessionFactory" ref="sqlSessionFactory"/>
        <property name="statementId" value="insertTransactionRecord"/>
    </bean>

    <batch:job id="firstBatchJob">
        <batch:step id="step1">
            <batch:tasklet>
                <!-- Commit after every 2 items -->
                <batch:chunk reader="itemReader" processor="itemProcessor" writer="itemWriter"
                             commit-interval="2"/>
            </batch:tasklet>
        </batch:step>
    </batch:job>
</beans>
```
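With `commit-interval="2"`, the step reads and processes items one at a time and hands them to the writer two at a time, committing once per chunk; for three items that means a chunk of two followed by a chunk of one. The JDK-only sketch below is a simplified model of that loop (no transactions, retries or skips): the reader is an Iterator, the processor a Function and the writer a Consumer of one chunk. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical, simplified model of a chunk-oriented step: read, process, write per chunk. */
public class ChunkStepSketch {

    /**
     * Reads up to commitInterval items, processes each one, then passes the whole chunk
     * to the writer (the point where Spring Batch would commit). Returns the number of chunks.
     */
    public static <I, O> int runStep(Iterator<I> reader, Function<I, O> processor,
                                     Consumer<List<O>> writer, int commitInterval) {
        if (commitInterval < 1) {
            throw new IllegalArgumentException("commitInterval must be at least 1");
        }
        int chunks = 0;
        while (reader.hasNext()) {
            List<O> chunk = new ArrayList<>();
            // Read and process until the chunk is full or the input is exhausted
            for (int read = 0; read < commitInterval && reader.hasNext(); read++) {
                chunk.add(processor.apply(reader.next()));
            }
            writer.accept(chunk);
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("devendra", "john", "robin");
        List<List<String>> written = new ArrayList<>();
        int chunks = runStep(items.iterator(), String::toUpperCase, written::add, 2);
        System.out.println(chunks + " chunks: " + written); // 2 chunks: [[DEVENDRA, JOHN], [ROBIN]]
    }
}
```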
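RecordFieldSetMapper
Used by the ItemReader (it is the fieldSetMapper property of the DefaultLineMapper) to convert each FieldSet into a Transaction object.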
```java
package com.littlefxc.examples.batch.service;

import com.littlefxc.examples.batch.model.Transaction;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindException;

/**
 * Converts one tokenized CSV line (a FieldSet) into a Transaction.
 * @Component registers it under the bean name "recordFieldSetMapper",
 * which is the name the DefaultLineMapper in applicationContext.xml refers to.
 */
@Component
public class RecordFieldSetMapper implements FieldSetMapper<Transaction> {

    @Override
    public Transaction mapFieldSet(FieldSet fieldSet) throws BindException {
        Transaction transactionRecord = new Transaction();
        // The read* methods trim the raw values, so "john " becomes "john"
        transactionRecord.setUsername(fieldSet.readString("username"));
        transactionRecord.setUserId(fieldSet.readInt("user_id"));
        transactionRecord.setAmount(fieldSet.readDouble("transaction_amount"));
        // Parse dd/MM/yyyy; an unparseable date raises an exception instead of
        // silently leaving transactionDate null (the column is NOT NULL)
        transactionRecord.setTransactionDate(fieldSet.readDate("transaction_date", "dd/MM/yyyy"));
        return transactionRecord;
    }
}
```
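The transaction_date column is parsed with the dd/MM/yyyy pattern. The JDK-only sketch below shows why the leniency setting of SimpleDateFormat matters: by default it is lenient and silently rolls an impossible date such as 31/02/2015 over into March, whereas `setLenient(false)` rejects it with a ParseException. A single-digit day such as 3/12/2015 is accepted either way. The class and method names are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/** Hypothetical helper contrasting lenient and strict dd/MM/yyyy parsing. */
public class DateParsingSketch {

    /** Parses a dd/MM/yyyy value; with lenient=false impossible dates are rejected. */
    public static Date parse(String text, boolean lenient) throws ParseException {
        SimpleDateFormat format = new SimpleDateFormat("dd/MM/yyyy");
        format.setLenient(lenient);
        return format.parse(text.trim());
    }

    /** Formats a date as yyyy-MM-dd (default time zone) for easy comparison. */
    public static String iso(Date date) {
        return new SimpleDateFormat("yyyy-MM-dd").format(date);
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(iso(parse("3/12/2015 ", false))); // 2015-12-03
        System.out.println(iso(parse("31/02/2015", true)));  // 2015-03-03 (rolled over)
        try {
            parse("31/02/2015", false);
            System.out.println("accepted");
        } catch (ParseException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```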
CustomItemProcessor
A custom implementation of the ItemProcessor interface that acts as the transformation point between the ItemReader and the ItemWriter.
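The article does not include the actual source of CustomItemProcessor. What follows is a minimal sketch under stated assumptions: the processor receives and returns a Transaction, and the rule that drops records with a non-positive amount is purely hypothetical. To compile with just the JDK, it declares local stand-ins for Spring Batch's ItemProcessor interface (same `O process(I item) throws Exception` shape) and for the Transaction model; in the project you would import `org.springframework.batch.item.ItemProcessor` and `com.littlefxc.examples.batch.model.Transaction` instead and drop the stand-ins.

```java
/** Local stand-in with the same shape as org.springframework.batch.item.ItemProcessor. */
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

/** Minimal stand-in for the Transaction model (only the fields this sketch uses). */
class Transaction {
    final String username;
    final double amount;

    Transaction(String username, double amount) {
        this.username = username;
        this.amount = amount;
    }
}

/**
 * Hypothetical business rule: drop records whose amount is not positive.
 * Returning null tells Spring Batch to filter the item, so it never reaches the ItemWriter.
 */
public class CustomItemProcessor implements ItemProcessor<Transaction, Transaction> {

    @Override
    public Transaction process(Transaction item) {
        if (item.amount <= 0) {
            return null;   // filtered out
        }
        return item;       // passed on unchanged
    }

    public static void main(String[] args) {
        CustomItemProcessor processor = new CustomItemProcessor();
        System.out.println(processor.process(new Transaction("devendra", 10000)) != null); // true
        System.out.println(processor.process(new Transaction("refund", -50)) != null);    // false
    }
}
```

A pure pass-through processor that just returns `item` is equally valid; returning null is specifically how an ItemProcessor filters an item.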
Model
```java
package com.littlefxc.examples.batch.model;

import lombok.Data;

import javax.xml.bind.annotation.XmlRootElement;
import java.util.Date;

/**
 * One transaction record from the CSV file.
 * Lombok's @Data generates getters, setters, equals/hashCode and toString;
 * @XmlRootElement makes the class marshallable by JAXB for the XML output.
 */
@Data
@XmlRootElement(name = "transactionRecord")
public class Transaction {
    private String username;
    private int userId;
    private Date transactionDate;
    private double amount;
}
```
record.csv
```
devendra, 1234, 31/10/2015, 10000
john , 2134, 3/12/2015 , 12321
robin , 2134, 2/02/2015 , 23411
```
Running the job
```java
package com.littlefxc.examples.batch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App {

    public static void main(String[] args) {
        // Load applicationContext.xml; try-with-resources closes the context afterwards
        try (ClassPathXmlApplicationContext context =
                     new ClassPathXmlApplicationContext("classpath:applicationContext.xml")) {
            JobLauncher jobLauncher = context.getBean("jobLauncher", JobLauncher.class);
            Job job = context.getBean("firstBatchJob", Job.class);
            System.out.println("Starting the batch job");
            try {
                // Bind an (empty) set of JobParameters to the job and run it
                JobExecution execution = jobLauncher.run(job, new JobParameters());
                System.out.println("Job Status : " + execution.getStatus());
                System.out.println("Job completed");
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("Job failed");
            }
        }
    }
}
```
Verification
The XML file written by StaxEventItemWriter:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<transactionRecord>
    <transactionRecord>
        <amount>10000.0</amount>
        <transactionDate>2015-10-31T00:00:00+08:00</transactionDate>
        <userId>1234</userId>
        <username>devendra</username>
    </transactionRecord>
    <transactionRecord>
        <amount>12321.0</amount>
        <transactionDate>2015-12-03T00:00:00+08:00</transactionDate>
        <userId>2134</userId>
        <username>john</username>
    </transactionRecord>
    <transactionRecord>
        <amount>23411.0</amount>
        <transactionDate>2015-02-02T00:00:00+08:00</transactionDate>
        <userId>2134</userId>
        <username>robin</username>
    </transactionRecord>
</transactionRecord>
```
Appendix: application.properties
```properties
batch.schema-drop=org/springframework/batch/core/schema-drop-mysql.sql
batch.schema-create=org/springframework/batch/core/schema-mysql.sql
batch.sql=INSERT INTO transaction_record (user_id, username, transaction_date, amount) VALUES (:userId, :username, :transactionDate, :amount)

jdbc.url=jdbc:mysql://192.168.120.63:3306/batch?useSSL=false
jdbc.username=root
jdbc.password=123456
jdbc.driver-class-name=com.mysql.jdbc.Driver

project.schema-drop=classpath:schema-drop.sql
project.schema-create=classpath:schema.sql

mybatis.configuration=classpath:mybatis-config.xml
mybatis.type-aliases-package=com.littlefxc.examples.batch.model
mybatis.mapper.base-package=com.littlefxc.examples.batch.dao
```
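The `<context:property-placeholder>` element in applicationContext.xml replaces every `${...}` placeholder, such as `${jdbc.url}` or `${batch.schema-drop}`, with the matching value from this file. The JDK-only sketch below models only that basic substitution (Spring's resolver additionally supports default values, nested placeholders and other property sources); the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical, simplified ${key} resolver against a Properties object. */
public class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${key} with its property value; fails on unknown keys. */
    public static String resolve(String text, Properties props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String key = m.group(1);
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("Could not resolve placeholder '" + key + "'");
            }
            // quoteReplacement keeps '$' or '\' inside values from being interpreted
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader("jdbc.url=jdbc:mysql://192.168.120.63:3306/batch?useSSL=false\n"));
        System.out.println(resolve("${jdbc.url}", props)); // jdbc:mysql://192.168.120.63:3306/batch?useSSL=false
    }
}
```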
mybatis-config.xml
```xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE configuration
        PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-config.dtd">
<configuration>
    <settings>
        <setting name="cacheEnabled" value="true"/>
        <setting name="lazyLoadingEnabled" value="true"/>
        <setting name="multipleResultSetsEnabled" value="true"/>
        <setting name="useColumnLabel" value="true"/>
        <setting name="useGeneratedKeys" value="false"/>
        <setting name="autoMappingBehavior" value="PARTIAL"/>
        <setting name="autoMappingUnknownColumnBehavior" value="WARNING"/>
        <setting name="defaultExecutorType" value="SIMPLE"/>
        <setting name="defaultStatementTimeout" value="25"/>
        <setting name="defaultFetchSize" value="100"/>
        <setting name="safeRowBoundsEnabled" value="false"/>
        <setting name="mapUnderscoreToCamelCase" value="false"/>
        <setting name="localCacheScope" value="SESSION"/>
        <setting name="jdbcTypeForNull" value="OTHER"/>
        <setting name="lazyLoadTriggerMethods" value="equals,clone,hashCode,toString"/>
    </settings>
</configuration>
```
schema.sql
```sql
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- DROP TABLE IF EXISTS `transaction_record`;
CREATE TABLE `transaction_record` (
  `username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
  `user_id` bigint(20) NOT NULL,
  `transaction_date` datetime(6) NOT NULL,
  `amount` double(11, 0) NOT NULL,
  PRIMARY KEY (`username`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;
```
schema-drop.sql
```sql
DROP TABLE IF EXISTS `transaction_record`;
```