๋ฐฐ์น ์์ ์ ๋๋์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ค๋ ํน์ง์ ๊ฐ์ง๋ค. Spring Batch์์๋ ์ด๋ฅผ "์ฒญํฌ"๋ผ๋ ๋จ์๋ก ํธ๋์ญ์ ๊ฒฝ๊ณ๋ฅผ ๋๋์ด ์ฒ๋ฆฌํ ์ ์๋ค. ๊ฐ์ฅ ์ผ๋ฐ์ ์ผ๋ก ์ฒญํฌ ๊ธฐ๋ฐ ์ฒ๋ฆฌ๋ฅผ ๊ตฌํํ๋ค.
ํ ๋ฒ์ ์ ์ฒด ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ๋๊ฒ ์๋, ์ง์ ๋ ์ฒญํฌ ํฌ๊ธฐ๋งํผ์ ๋ฐ์ดํฐ๋ฅผ ์ปค๋ฐ/๋กค๋ฐฑํ ์ ์๋ค. ๋ํ, ๋ฐ์ดํฐ๋ฅผ ๋๋์ด ์ฒ๋ฆฌํ๋ฏ๋ก ๋ณ๋ ฌ ์ฒ๋ฆฌํ๋๋ก ํ์ฌ ์ฑ๋ฅ์ ๊ทน๋ํ ์ํฌ ์๋ ์๋ค.
Chunk๋ ํ ํธ๋์ญ์ ์์ ์ฒ๋ฆฌ๋ ๋ฐ์ดํฐ ๋ฉ์ด๋ฆฌ๋ฅผ ์๋ฏธํ๋ค. ๊ตฌ์ฒด์ ์ผ๋ก ์์๋ณด๊ธฐ ์ํด, ์๋ ์ฒญํฌ ์งํฅ ์ฒ๋ฆฌ ๋ค์ด์ด๊ทธ๋จ์ ์ดํด๋ณด์.
์ ์ํ ์์
์ด ์คํ๋๋ฉด ๋ฐ์ดํฐ๋ฅผ Chunk ์๋งํผ ์ฝ์ด์จ๋ค. ์ด๋, ํ๋์ฉ ์ฝ์ด์ค๋ ๊ฒ์ ์ ์ ์๋ค. ์ฆ, ๋ฐ์ดํฐ๋ฅผ Chunk ์๋งํผ ๋ฐ๋ณตํด์ ํ๋์ฉ ์ฝ์ด์ค๋ ๊ฒ์ด๋ค.
๊ทธ๋ฆฌ๊ณ , ํ๋์ Chunk(์ฝ์ด๋ค์ธ ๋ฐ์ดํฐ ๋ฉ์ด๋ฆฌ)๋ฅผ ํ๋ฒ์ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ค.
์ด๋ฅผ ์๋์ฝ๋๋ก ๋ํ๋ด๋ฉด ์๋์ ๊ฐ๋ค.
List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
Object item = itemReader.read();
if (item != null) {
items.add(item);
}
}
itemWriter.write(items);
๋จ์ํ Tasklet::execute()๋ฅผ ๊ตฌํํ ๋ฐฉ์๊ณผ๋ ๋ค๋ฅด๊ฒ Chunk๋ผ๋ ๊ฐ๋
์ด ์ฌ์ฉ๋ ๊ฒ์ด๋ค.
Tasklet::execute()๋ฅผ ํตํด ๋จ์ผ ํ์คํฌ๋ฅผ ์ ์ํ๋ ๊ฒ์ RepeatStatus์ ์ฌ์ฉํด์ Tasklet์ ๋ฐ๋ณต์ ์ผ๋ก ์ํํด์๋ค.
๋ฐ๋ฉด์, ์ฒญํฌ ๊ธฐ๋ฐ ์์
์ Tasklet ์ธํฐํ์ด์ค๋ฅผ ๋จ์ํ ๊ตฌํํ๋ ๊ฒ์ด ์๋๋ผ ChunkOrientedTasklet๋ผ๋ ๊ฐ์ฒด๋ก ์ฒญํฌ ๊ธฐ๋ฐ ์์
์ ์ํํ๋ค.
ChunkOrientedTasklet๋ ์ด๋ป๊ฒ ๋ค๋ฅธ์ง ์๋ ๊ทธ๋ฆผ์ผ๋ก ์ดํด๋ณด์.
์ด์จ๋ ๋ ๋ค Tasklet::execute()๋ฅผ ๊ตฌํํ ๊ฒ์ด๋ฉฐ, ์ฐจ์ด์ ์ ๊ตฌํ๋ฐฉ์์ ์ฐจ์ด๋ค.
Tasklet::execute()๋ฅผ ์ง์ ์ปค์คํ
ํ๊ฒ ๊ตฌํํ๋ค๋ฉด ๋จ์ผ ํ์คํฌ๋ก ๊ตฌํ๋๋ค.
๋ฐ๋ฉด์, Chunk ์งํฅ ์ฒ๋ฆฌ๋ฅผ ์ํ ChunkOrientedTasklet์ Reader & Processor & Writer ๋ฌถ์์ผ๋ก ํ๋์ Tasklet์ ์ ์ํ๋ค.
์ปค์คํ
ํ Tasklet ๊ตฌํ์ Chunk ๋จ์์ ํธ๋์ญ์
๊ด๋ฆฌ๊ฐ ํ์ ์๊ฑฐ๋ ํ๋ ๊ฒฝ์ฐ์ ๋ณดํต ์ฌ์ฉ๋๋ค.
์ด์ ๋ถํฐ ChunkOrientedTasklet๊ฐ ์ด๋ป๊ฒ ํ์คํฌ ์ํ์ ํ๋์ง ์์๋ณด์.
Chunk ์งํฅ ์ฒ๋ฆฌ๋ฅผ ์ํด, Job์ ์ด๋ป๊ฒ ๊ตฌ์ฑํ๋์ง ์์๋ฅผ ํตํด ์์๋ณด์.
public class TransferNewUserJobConfiguration {
// 1. Job ์ ์
@Bean
public Job transferNewUserJob() {
return new JobBuilder("TRANSFER_NEW_USER_JOB", jobRepository)
.start(transferNewUserStep(null))
.build();
}
// 2. Step ์ ์ (Chunk ์งํฅ Step ๊ตฌ์ฑ: Reader/Processor/Writer)
@Bean
@JobScope
public Step transferNewUserStep(
@Value("#{jobParameters['targetDate']}") LocalDate targetDate
) {
return new StepBuilder("TRANSFER_NEW_USER_STEP", jobRepository)
.<User, User>chunk(CHUNK_SIZE, platformTransactionManager)
.reader(reader())
.processor(processor(null))
.writer(writer())
.build();
}
// 3. Reader ์ ์
@Bean
@StepScope
public JpaPagingItemReader<User> reader() {
return new JpaPagingItemReaderBuilder<User>()
.name("TRANSFER_NEW_USER_STEP_READER")
.entityManagerFactory(entityManagerFactory)
.queryString("""
SELECT u
FROM User u
""")
.pageSize(CHUNK_SIZE)
.build();
}
// 4. Processor ์ ์
@Bean
@StepScope
public ItemProcessor<User, User> processor(
@Value("#{jobParameters['targetDate']}") final LocalDate targetDate
) {
return new FunctionItemProcessor<>(user -> {
if (user.getRegisteredAt().toLocalDate().isEqual(targetDate)) {
return user;
}
return null;
});
}
// 5. Writer ์ ์
@Bean
@StepScope
public ItemWriter<User> writer() {
return chunk -> chunk.getItems().forEach(user ->
log.info("DB์ ์ ์ ์ ๋ณด ์ ์ฅ => id: {}, name: {}", user.getId(), user.getName())
);
}
}- Job์ ์ ์ํ๋ค. Job์ Step์ผ๋ก ๊ตฌ์ฑ๋๋ค๋ ๊ฒ์ ๋ณํจ์๋ค.
- Step์ ์ ์ํ๋ค. Step์ ์ ์ํ ๋, StepBuilder์ Chunk ์งํฅ ์ฒ๋ฆฌ๋ฅผ ์ํด
chunk()๋ฉ์๋๋ฅผ ํธ์ถํ๋ค. ๊ทธ๋ฆฌ๊ณ , Chunk ์งํฅ ์ฒ๋ฆฌ๋ต๊ฒ Reader, Processor, Writer๋ก Step์ ๋จ๊ณ๋ฅผ ๊ตฌ๋ถ์ง์ด ์ ์ํ๋ค.
StepBuilder::chunk() ๋ฉ์๋๋ ๋ค์ ์ ์์ ์ดํด๋ณด์. Reader๋ ๋ฐ์ดํฐ๋ฅผ ์ฝ๋ ์ญํ ์ ํ๊ณ , ์ฌ๊ธฐ์๋ ItemReader์ ๊ตฌํ์ฒด์ธJpaPagingItemReader๋ฅผ ์ฌ์ฉํ๊ณ ์๋ค. ์ ๋ ฅ ํ์ ์User๋ก ์ ์ํ๋ค. ์ด ์ ๋ ฅ ํ์ ์ Processor๋ก ๋์ด๊ฐ๊ฒ ๋๋ค.Processor๋ ์ ๋ ฅ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์์, ์ถ๋ ฅ ๋ฐ์ดํฐ๋ก ๋ณํํ๋ค. ์ฌ๊ธฐ์๋ User ํ์ ์ ๋ฐ์, User๋ฅผ ๊ทธ๋๋ก ๋ฐํํ๋ ์๋ฏธ๋กItemProcessor<User, User>๋ผ๊ณ ๋ฐํํ์ ์ ๋ช ์ํ๋ค.Writer๋ Processor๋ก๋ถํฐ ๊ฐ๊ณต๋ Userํ์ ์ ๋ฐ์ดํฐ๋ค์ chunk ๋จ์๋ก ๋ฐ์์, ํ๋ฒ์ DB์ ์ฐ๊ธฐ ์์ ์ ์ํํ๋ค.
ItemReader, ItemProcessor, ItemWriter์ ๋์ ๋ฐฉ์์ ๋ํด์๋ ๋ณ๋๋ก ๊ธ๋ก ์์๋ณด๊ณ ,
์ฌ๊ธฐ์๋ StepBuilder์ chunk()๋ฉ์๋์ ๋์ ๋ฐฉ์์ ๋ํด์๋ง ์์๋ณด์.
8.2์์ ๋ณด์๋ฏ์ด ์ฒญํฌ ์งํฅ ๋ฐฉ์์ Step ์์ฑ์ ์ํด์ StepBuilder์ chunk() ๋ฉ์๋๋ฅผ ํธ์ถํ๋ค.
์ด ๋ฉ์๋ ํธ์ถ์ ํตํด ChunkOrientedTasklet์ ์์ฑํ ์ ์๋๋ฐ, ํ๋ฒ ๋ฉ์๋๋ฅผ ์ดํด๋ณด์.
public class StepBuilder extends StepBuilderHelper<StepBuilder> {
// SimpleStepBuilder๋ฅผ ๋ฐํ
public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize, PlatformTransactionManager transactionManager) {
return ((SimpleStepBuilder)(new SimpleStepBuilder(this)).transactionManager(transactionManager)).chunk(chunkSize);
}
}chunk() ๋ฉ์๋๋ SimpleStepBuilder๋ฅผ ๋ฐํํ๋ค. ์ด๋ TaskletStep์ ์์ฑํด์ฃผ๋ bulid()๋ฉ์๋๋ฅผ ๊ฐ์ง๊ณ ์๋ค.
์์, Tasklet::execute()๋ฅผ ๊ตฌํํ๋ ๋จ์ผ ํ์คํฌ ๋ฐฉ์์์๋ Tasklet์ ์์ฑํ๊ธด ํ๋ค.
ํ์ง๋ง ์ด๋๋ StepBuilder::tasklet() ํธ์ถ์ ํตํด TaskletStepBuilder๋ฅผ ๋ฐํ๋ฐ์ ๋ฐ๋ฉด, StepBuilder::chunk()์ ํตํด SimpleStepBuilder์ ๋ฐํ๋ฐ๋๋ค.
SimpleStepBuilder์ ๋ด๋ถ ๊ตฌ์กฐ๋ฅผ ๋ณด๋ฉด์ ์ด๋ป๊ฒ ChunkOrientedTasklet์ด ์์ฑ๋๋์ง ํ์ธํด๋ณด์.
public class SimpleStepBuilder<I, O> extends AbstractTaskletStepBuilder<SimpleStepBuilder<I, O>> {
private ItemReader<? extends I> reader;
private ItemWriter<? super O> writer;
private ItemProcessor<? super I, ? extends O> processor;
public TaskletStep build() {
this.registerStepListenerAsItemListener();
this.registerAsStreamsAndListeners(this.reader, this.processor, this.writer);
return super.build();
}
protected Tasklet createTasklet() {
Assert.state(this.reader != null, "ItemReader must be provided");
Assert.state(this.writer != null, "ItemWriter must be provided");
RepeatOperations repeatOperations = this.createChunkOperations();
SimpleChunkProvider<I> chunkProvider = new SimpleChunkProvider(this.getReader(), repeatOperations);
SimpleChunkProcessor<I, O> chunkProcessor = new SimpleChunkProcessor(this.getProcessor(), this.getWriter());
chunkProvider.setListeners(new ArrayList(this.itemListeners));
chunkProvider.setMeterRegistry(this.meterRegistry);
chunkProcessor.setListeners(new ArrayList(this.itemListeners));
chunkProcessor.setMeterRegistry(this.meterRegistry);
ChunkOrientedTasklet<I> tasklet = new ChunkOrientedTasklet(chunkProvider, chunkProcessor);
tasklet.setBuffering(!this.readerTransactionalQueue);
return tasklet;
}
public SimpleStepBuilder<I, O> reader(ItemReader<? extends I> reader) {
this.reader = reader;
return this;
}
public SimpleStepBuilder<I, O> writer(ItemWriter<? super O> writer) {
this.writer = writer;
return this;
}
public SimpleStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> processor) {
this.processor = processor;
return this;
}
}ํ์ํ ๋ฉ์๋๋ค๋ง ๊ฐ์ ธ์๋ค.
8.2์ ์์ Step์ ๊ตฌ์ฑํ ๋, reader(), processor(), writer() ์์๋ก ๊ตฌ์ฑํ์๋ค. ๊ทธ๋ ํธ์ถ๋๋ ๋ฉ์๋๊ฐ ์ ๋ฉ์๋๋ค์ด๋ค.
๊ทธ๋ฆฌ๊ณ build()๋ฉ์๋๋ฅผ ์ดํด๋ณด๋ฉด, ๋ถ๋ชจ์ build()๋ฅผ ํธ์ถํ๋๋ฐ, ์ฝ๋๋ก ํ๋ฒ ์ดํด๋ณด์.
public abstract class AbstractTaskletStepBuilder<B extends AbstractTaskletStepBuilder<B>> extends StepBuilderHelper<B> {
// ...
public TaskletStep build() {
TaskletStep step = new TaskletStep(this.getName());
// ...
step.setTasklet(this.createTasklet()); // TaskletStep์ด ์คํํ Tasklet ๋ฑ๋ก
return step;
}
}SimpleStepBuilder์ ๋ถ๋ชจ์ธ AbstractTaskletStepBuilder์ build()๋ฉ์๋๋ค.
๋ด๋ถ์ ์ผ๋ก TaskletStep์ ์์ฑํ๊ณ Tasklet์ ์ ์ํ ๋, createTasklet() ๋ฉ์๋๋ฅผ ํธ์ถํ๋ค.
์ด createTasklet() ๋ฉ์๋๋ SimpleStepBuilder์ ๊ตฌํ๋์ด ์๋ ๋ฉ์๋๋ก, ์ ์ฝ๋๋ฅผ ์ดํด๋ณด๋ฉด ChunkOrientedTasklet๋ฅผ ์์ฑํ๋ ๊ฒ์ ์ ์ ์๋ค.
๋ด๋ถ์ ์ผ๋ก ์ด๋ป๊ฒ ChunkOrientedTasklet์ ์์ฑํ๋์ง ์์๋ดค์ง๋ง,
๊ฒฐ๊ตญ์๋ SimpleStepBuilder์ด TaskletStep์ ์์ฑํ๊ณ , ๋ด๋ถ Tasklet์ ์ฒญํฌ ์งํฅ ์ฒ๋ฆฌ ๋ฐฉ์์ธ ChunkOrientedTasklet์ผ๋ก ์์ฑํ๋ ๊ฒ์ด๋ค.
ChunkOrientedTasklet๋ ๊ฒฐ๊ตญ ์ฒญํฌ ์งํฅ ์ฒ๋ฆฌ์ ๋ํ ๋ก์ง์ด ์ ์๋์ด ์๋ Tasklet์ธ ์
์ด๋ค.
๋๋ฌธ์, Tasklet์ ๊ตฌํํ๊ณ ์๊ณ , ๊ฒฐ๊ตญ์๋ Tasklet::execute()๋ฉ์๋๋ฅผ ๊ตฌํํ๊ณ ์์ ๊ฒ์ด๋ค.(Tasklet ๊ธ ์ฐธ๊ณ )
๊ตฌํ ์ฝ๋๋ฅผ ์ง์ ๋ณด๋ฉฐ ์์๋ณด์.
public class ChunkOrientedTasklet<I> implements Tasklet {
private static final String INPUTS_KEY = "INPUTS";
private final ChunkProcessor<I> chunkProcessor;
private final ChunkProvider<I> chunkProvider;
private boolean buffering = true;
public ChunkOrientedTasklet(ChunkProvider<I> chunkProvider, ChunkProcessor<I> chunkProcessor) {
this.chunkProvider = chunkProvider;
this.chunkProcessor = chunkProcessor;
}
public void setBuffering(boolean buffering) {
this.buffering = buffering;
}
@Nullable
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
Chunk<I> inputs = (Chunk)chunkContext.getAttribute("INPUTS");
if (inputs == null) {
inputs = this.chunkProvider.provide(contribution); // 1. Reader๋ฅผ ํตํด ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด๋๋ฆฐ๋ค.
if (this.buffering) {
chunkContext.setAttribute("INPUTS", inputs);
}
}
this.chunkProcessor.process(contribution, inputs); // 2. Processor์ Writer ๋ก์ง์ ์ฒ๋ฆฌํ๋ค.
this.chunkProvider.postProcess(contribution, inputs);
if (inputs.isBusy()) {
return RepeatStatus.CONTINUABLE;
} else {
chunkContext.removeAttribute("INPUTS");
chunkContext.setComplete();
return RepeatStatus.continueIf(!inputs.isEnd());
}
}
}์ ํด๋์ค๋ ChunkOrientedTasklet ํด๋์ค(Spring Batch 5.1 ๊ธฐ์ค)์์ ๋ก๊น
์ ์ ์ธํ ์ ์ฒด ๋ก์ง์ ์ ๋ถ ๊ฐ์ ธ์จ ๊ฒ์ด๋ค.
๊น๋ํ๊ฒ execute() ๋ฉ์๋๋ง ์ ์๋์ด ์๋ค.
์ฃผ์์ด ๋ฌ๋ฆฐ chunkProvider.provide()์ chunkProcessor.process()๋ง ์ธ๋ถ์ ์ผ๋ก ์ดํดํ๋ฉด ์ถฉ๋ถํ ๊ฒ ๊ฐ๋ค. ๊ฐ๊ฐ Reader, Processor&Writer ๋ก์ง์ ๋ด๋นํ๋ค.
๊ทธ๋ฆฌ๊ณ ๊ฐ๊ฐ SimpleChunkProvider ํด๋์ค์ SimpleChunkProviderํด๋์ค๋ง ๊ตฌํํ๊ณ ์๋ค.
public class SimpleChunkProvider<I> implements ChunkProvider<I> {
protected final ItemReader<? extends I> itemReader;
private final RepeatOperations repeatOperations;
public Chunk<I> provide(final StepContribution contribution) throws Exception {
Chunk<I> inputs = new Chunk(new Object[0]);
this.repeatOperations.iterate((context) -> { // ๋ฐ๋ณตํด์ ์์ดํ
์ ์ฝ์
Object item;
try {
item = this.read(contribution, inputs); // Reader::read()๋ฅผ ํตํด ๋ฐ์ดํฐ ์ฝ์ด๋ค์
} catch (SkipOverflowException var12) {
status = "FAILURE";
var8 = RepeatStatus.FINISHED;
} finally {
this.stopTimer(sample, contribution.getStepExecution(), status);
}
if (item == null) {
inputs.setEnd();
return RepeatStatus.FINISHED;
}
inputs.add(item); // ๋ฆฌ์คํธ์ ๋ฐ์ดํฐ ์ถ๊ฐ
contribution.incrementReadCount();
return RepeatStatus.CONTINUABLE;
});
return inputs;
}
}
protected I read(StepContribution contribution, Chunk<I> chunk) throws SkipOverflowException, Exception {
return this.doRead();
}
protected final I doRead() throws Exception {
// ...
this.listener.beforeRead();
I item = this.itemReader.read();
if (item != null) {
this.listener.afterRead(item);
}
return item;
}provide() ๋ฉ์๋๋ repeatOperations๋ฅผ ํตํด ๋ฐ๋ณตํด์ ์์ดํ
์ ์ฝ์ด๋ค์ธ๋ค. RepeatOperations์ ๊ตฌํ์ฒด๋ RepeatTemplate์ผ๋ก ์ด๊ธฐํ ๋์ด ์๋ค.
Tasklet์ ์ค๋ช
ํ๋ ๊ธ์์ RepeatTemplate์ completionPolicyํ๋์ ๋ํด ์ค๋ช
ํ ์ ์ด ์๋ค.
SimpleChunkProvider์์ ์ด๊ธฐํ ๋์ด ์๋ RepeatTemplate์ CompletionPolicy๋ SimpleCompletionPolicy ํด๋์ค์ ๊ฐ์ฒด๊ฐ ์ด๊ธฐํ ๋์ด ์๋ค.
๊ทธ๋ฆฌ๊ณ , SimpleCompletionPolicy๊ฐ ๊ตฌํํ๊ณ ์๋ isComplete() ๋ฉ์๋๋ ์๋์ ๊ฐ๋ค.
read() ๋ฉ์๋์ ๋ํด์๋ ItemReader ์ฑํฐ์์ ์ค๋ช
ํ๊ณ ์์ผ๋ ์ฐธ๊ณ ํ์.
public class SimpleCompletionPolicy extends DefaultResultCompletionPolicy {
public boolean isComplete() {
return this.getStartedCount() >= SimpleCompletionPolicy.this.chunkSize;
}
}์ฆ, SimpleChunkProvider ๋ด๋ถ์ ์ด๊ธฐํ ๋์ด ์๋ RepeatTemplate์ CompletePolicy๋ SimpleCompletionPolicy์ด๋ฉฐ,
ํด๋น ํด๋์ค๋ Step ์์ฑ ์ ์ค์ ํ chunk size์ ๊ฐ๊ณผ ๊ฐ๊ฑฐ๋ ํฌ๋ค๋ฉด ์๋ฃ ์ฒ๋ฆฌ๋ฅผ ํ๋ค. ์ดํ์๋ ๋ ์ด์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด๋ค์ด์ง ์๋๋ค.
(RepeatTemplate์ด๋ CompletionPolicy ๋ฑ๊ณผ ๊ด๋ จ๋ ๊ฐ๋
์ batch05์์ ์ ๋ฆฌํ๊ณ ์๋ค.)
์ด์จ๋ , ์ด๋ ๊ฒ ์ฝ์ด๋ค์ธ ๋ฐ์ดํฐ๋ฅผ chunkProcessor.process()๋ก ๋๊ฒจ ๋๋จธ์ง ๋ก์ง์ ์ํํ๋ค.
ChunkOrientedTasklet::execute() ๋ด๋ถ์์ chunkProvider.provide(contribution)์ ํตํด inputs ํ๋์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด๋ค์๋ค.
๊ทธ๋ฆฌ๊ณ , ์ฝ์ด๋๋ฆฐ ๋ฐ์ดํฐ๋ฅผ SimpleChunkProcessor ๊ตฌํ์ฒด์ process() ๋ฉ์๋๋ฅผ ํตํด ๋๊ธด๋ค.
public class SimpleChunkProcessor<I, O> implements ChunkProcessor<I>, InitializingBean {
private ItemProcessor<? super I, ? extends O> itemProcessor;
private ItemWriter<? super O> itemWriter;
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {
this.initializeUserData(inputs);
if (!this.isComplete(inputs)) {
Chunk<O> outputs = this.transform(contribution, inputs); // 1. Processor๋ฅผ ํตํด ๋ฐ์ดํฐ ๊ฐ๊ณต
contribution.incrementFilterCount((long)this.getFilterCount(inputs, outputs));
this.write(contribution, inputs, this.getAdjustedOutputs(inputs, outputs)); // 2. Writer๋ฅผ ํตํด ๋ฐ์ดํฐ ์ฐ๊ธฐ ์์
์ํ
}
}
protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
Chunk<O> outputs = new Chunk<>();
for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
final I item = iterator.next();
O output;
Timer.Sample sample = BatchMetrics.createTimerSample(this.meterRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
try {
output = doProcess(item); // ํ๋์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ๊ณตํ๊ธฐ ์ํ ํธ์ถ
}
catch (Exception e) {
inputs.clear();
status = BatchMetrics.STATUS_FAILURE;
throw e;
}
finally {
stopTimer(sample, contribution.getStepExecution(), "item.process", status, "Item processing");
}
if (output != null) {
outputs.add(output);
}
else {
iterator.remove();
}
}
if (inputs.isEnd()) {
outputs.setEnd();
}
return outputs;
}
protected final O doProcess(I item) throws Exception {
Object result;
if (this.itemProcessor == null) {
result = item;
return result;
} else {
try {
this.listener.beforeProcess(item);
result = this.itemProcessor.process(item); // 1. Step ์ ์ํ ๋, ์์ฑํ process() ๋ก์ง ํธ์ถ
this.listener.afterProcess(item, result);
return result;
} catch (Exception var3) {
Exception e = var3;
this.listener.onProcessError(item, e);
throw e;
}
}
}
protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
Timer.Sample sample = BatchMetrics.createTimerSample(this.meterRegistry);
String status = BatchMetrics.STATUS_SUCCESS;
try {
doWrite(outputs);
}
catch (Exception e) {
inputs.clear();
status = BatchMetrics.STATUS_FAILURE;
throw e;
}
finally {
stopTimer(sample, contribution.getStepExecution(), "chunk.write", status, "Chunk writing");
}
contribution.incrementWriteCount(outputs.size());
}
protected final void doWrite(Chunk<O> items) throws Exception {
// ...
try {
listener.beforeWrite(items);
writeItems(items);
doAfterWrite(items);
}
catch (Exception e) {
doOnWriteError(e, items);
throw e;
}
}
protected void writeItems(Chunk<O> items) throws Exception {
if (itemWriter != null) {
itemWriter.write(items); // 2. Step ์ ์ํ ๋, ์์ฑํ write() ๋ก์ง ํธ์ถ
}
}
}process()๋ฉ์๋ ํธ์ถ์ ํตํด inputs ํ๋ผ๋ฏธํฐ๋ฅผ ๋ฐ์, ๋ด๋ถ์ ์ผ๋ก Processor ๋ก์ง๊ณผ Writer ๋ก์ง์ ์ํํ๋ค. ๊ฐ๊ฐ transform(), write() ๋ฉ์๋๊ฐ ์ด์ ํด๋นํ๋ค.
-
transform()chunk size ๋งํผ ํน์ ํ์ ์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ๊ณตํด์ผ ํ๋ค.doProcess()์ ๋จ์ผ ๋ฐ์ดํฐ๋ฅผ ๋๊ฒจ, Step์ ์ ์ํ ๋ ์์ฑํ๋ ItemProcessor ๋ก์ง์ ์ํํ๋ค. -
write()transform()์ ํตํด ๊ฐ๊ณต๋ ๋ฐ์ดํฐ๋ฅผ DB์ ์ฐ๊ธฐ ์ํด, ํธ์ถํ๋ค. ์ด๋ ๋ฐ๋ณต๋ฌธ์ ํตํด ์ํ๋๋๊ฒ ์๋ ๊ฐ๊ณต๋ ๋ฐ์ดํฐ ๋ฆฌ์คํธ(Chunk<O> items)๋ฅผ ํ๋ฒ์ ์ด๋ค.
์ด๋ฒ ๊ธ์์๋ ChunkOrientedTasklet์ ์์ฑ๊ณผ ์คํ ๋ฐฉ์์ ๋ํด ์์๋ณด์๋ค.
์์๋ณด๊ธฐ์ ์์ Tasklet::execute()๋ฅผ ์ปค์คํ
ํ๊ฒ ์ ์ํ ๋ฐฉ์๊ณผ์ ์ฐจ์ด๋ ์์๋ณด์๋ค.


