https://spring.io/guides/gs/batch-processing/
Introduction:
A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.
Spring Batch provides reusable functions that are essential in processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. It also provides more advanced technical services and features that will enable extremely high-volume and high performance batch jobs through optimization and partitioning techniques. Simple as well as complex, high-volume batch jobs can leverage the framework in a highly scalable manner to process significant volumes of information.
Steps for using batch to load data from csv files, then convert data, finally wirte the processed data into database.
Define Job and Step 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Configuration @EnableBatchProcessing public class CsvFileImportJob { @Bean public Job initializeCsvFileImportJob ( JobBuilderFactory jobBuilderFactory, Step csvFileImportStep, CsvFileListener csvFileListener) { return jobBuilderFactory.get("initializeCsvFileImportJob" ) .preventRestart() .incrementer(new RunIdIncrementer()) .start(csvFileImportStep) .listener(csvFileListener) .build(); } @Bean public Step csvFileImportStep ( StepBuilderFactory stepBuilderFactory, CsvItemReader csvItemReader, CsvItemProcessor csvItemProcessor, CsvItemWriter csvItemWriter ) { return stepBuilderFactory.get("csvFileImportStep" ) .<Person, Person>chunk(1 ) .reader(csvItemReader) .processor(csvItemProcessor) .writer(csvItemWriter) .build(); } }
Define ItemReader 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Component @Slf4j @JobScope public class CsvItemReader extends MultiResourceItemReader <Person > { @Value("#{jobExecutionContext['fileNames']}") private List<String> fileNames; @PostConstruct public void initializeReader () { log.info("Reader... initialize" ); log.info("fileNames:{}" , fileNames); setResources(initResources()); setDelegate(initFlatFileItemReader()); } @SneakyThrows private Resource[] initResources() { return fileNames.stream().map(ClassPathResource::new ).toArray(Resource[]::new ); } private FlatFileItemReader<Person> initFlatFileItemReader () { return new FlatFileItemReaderBuilder<Person>() .name("Csv-File-Reader" ) .linesToSkip(1 ) .skippedLinesCallback(s -> { log.info("skip first line: {}" , s); }) .lineMapper(lineMapper()) .build(); } private LineMapper<Person> lineMapper () { DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer(); delimitedLineTokenizer.setDelimiter("," ); delimitedLineTokenizer.setNames(Arrays.stream(Person.class.getDeclaredFields()).map(Field::getName).toArray(String[]::new )); DefaultLineMapper<Person> defaultLineMapper = new DefaultLineMapper<>(); defaultLineMapper.setLineTokenizer(delimitedLineTokenizer); BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<>(); fieldSetMapper.setTargetType(Person.class); defaultLineMapper.setFieldSetMapper(fieldSetMapper); return defaultLineMapper; } }
Define ItemProcessor 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Component @Slf4j public class CsvItemProcessor implements ItemProcessor <Person , Person > { @Override public Person process (final Person person) { final String firstName = person.getFirstName().toUpperCase(); final String lastName = person.getLastName().toUpperCase(); final Person transformedPerson = new Person(firstName, lastName); log.info("Converting (" + person + ") into (" + transformedPerson + ")" ); return transformedPerson; } }
Define ItemWriter 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Component @RequiredArgsConstructor @Slf4j public class CsvItemWriter extends JdbcBatchItemWriter <Person > { private final DataSource dataSource; @PostConstruct public void initWriter () { log.info("Writer... initialize" ); this .setDataSource(dataSource); this .setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>()); this .setSql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)" ); } }
Define JobListener 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Component @Slf4j public class CsvFileListener implements JobExecutionListener { @Override public void beforeJob (JobExecution jobExecution) { ExecutionContext context = jobExecution.getExecutionContext(); context.put("fileNames" , new ArrayList<String>() {{ add("sample-data.csv" ); }}); log.info("before job ..." ); } @Override public void afterJob (JobExecution jobExecution) { log.info("after job ..." ); } }
Config JobLauncher & JobRepository 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Configuration @RequiredArgsConstructor public class BatchLauncherConfig { private final DataSource dataSource; private final PlatformTransactionManager transactionManager; @Bean public JobRepository customJobRepository () throws Exception { JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); factory.setDataSource(dataSource); factory.setTransactionManager(transactionManager); return factory.getObject(); } @Bean public JobLauncher customJobLauncher () throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(customJobRepository()); jobLauncher.afterPropertiesSet(); return jobLauncher; } }
Define Common Launcher for Controller use 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Component @RequiredArgsConstructor public class CommonLauncher implements ApplicationContextAware { private ApplicationContext applicationContext; private final JobLauncher customJobLauncher; @Override public void setApplicationContext (ApplicationContext applicationContext) throws BeansException { this .applicationContext = applicationContext; } @SneakyThrows public void run (String jobName) { Job job = (Job) applicationContext.getBean(jobName); JobParameters jobParameters = new JobParametersBuilder() .addString("traceId" , UUID.randomUUID().toString()) .toJobParameters(); customJobLauncher.run(job, jobParameters); } } @RestController @RequiredArgsConstructor public class JobController { private final CommonLauncher commonLauncher; @GetMapping("run") public void run () { commonLauncher.run("initializeCsvFileImportJob" ); } }
done.
Title: Spring-Batch
Author: mjd507
Date: 2022-03-27
Last Update: 2024-01-27
Blog Link: https://mjd507.github.io/2022/03/27/Spring-Batch/
Copyright Declaration: This station is mainly used to sort out incomprehensible knowledge. I have not fully mastered most of the content. Please refer carefully.