《SpringBatch再入门-整理个人使用方法》

因为很久没有接触SpringBatch,所以我想整理一下自己对它的使用方法,所以写了一篇文章。

※下面的话仅代表我的个人观点,并且我觉得有朝一日我会写一个实践篇。

前提 tí)

    • SpringBatchを使おうとして、挫折しそうor挫折済みの人向け

 

    • この後出てくるSpringBatchの用語が分からない人は、まず本家のイントロダクションと用語を説明したページを読んでからの方が良い

 

    上記を雑に読んだあと、実際に使ってみて感じた結果をまとめているので、本家で説明していることと違うことがあるかもしれない。

为什么要使用SpringBatch?

因为想要进行异步处理。

比如说,通过网络接收请求并在后台执行必要的逻辑,有时候即使是突发奇想也会想去做。(感到后悔)

如果使用Spring Batch,可以定义这个Bean,

@Bean
public JobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        // 非同期処理を定義、並列数は3
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
    taskExecutor.setConcurrencyLimit(3);
    jobLauncher.setTaskExecutor(taskExecutor);
        return jobLauncher;
}

执行。

/** 非同期処理するようにしたJobLuncher */
@Autowired
JobLauncher jobLauncher;

/** 処理を定義したジョブ */
@Autowired
Job job;

public void execute(Long orderId) throws Exception{
    // 引数を定義する
    JobParameters jobParameters = new JobParametersBuilder().addLong(
         "order.id", orderId, true).toJobParameters();
    // 非同期処理 開始!
    jobLauncher.run(job, new JobParameters());
}

在使用SpringBatch时,我认为不会有让人困扰的地方。
我认为困难在于创建Job的方式。

因为不想自己创建异步处理机制。

如果只需要执行异步处理,可以实现Runnable接口或者Callable接口,并将其传递给Executor类们即可。

然而,如果多人一起开始创建相同非同步处理,会发生什么呢?
资源将会枯竭,GC频繁触发可能导致变慢,我们可能会面临OutOfMemoryError的困扰。
为了防止这种情况,就需要建立一个机制,但我不想这样做。

如果使用Spring Batch的话,
就不需要自己创建这样的机制,只需要共享上述的使用方式就可以了。
我认为可以向Google老师询问是一个非常大的优势。

而且,与自己编写相比,SpringBatch更加功能强大。
例如,通过使用JobParameter等信息,它可以进行多重执行抑制等操作。
在上述示例中,通过检查order.id的值,它将防止重新执行已完成的Job。
由于还使用了事务等,因此无需考虑自己处理这些问题。
(虽然这些行为可能会让人对SpringBatch产生厌恶的感觉)

为了避免夸张, 我不想这么做。

如果需要创建异步处理,我认为可以考虑使用RabbitMQ或Kafka等消息队列来实现。

然而,若需要进行这些设置,运维工作将变得非常繁重。如果队列进程死掉了呢?如果网络死掉了呢?监控呢?

我认为,非同步处理会带来很多好处,超越了考虑这些问题所需要的努力,但这种情况并不常见,只在处理事务数量非常庞大的情况下才会出现。

如果是使用Spring Batch的话,
可以使用所谓的Java(Spring)进程+关系数据库(RDB)的常规配置来创建。
如果不使用RDB,而且只有一个进程通过系统运行,也可以使用类似H2DB的嵌入式数据库。

Chunk?Tasklet?用哪个比较好?

首先,只需要创建一个Tasklet就可以完成Job。

从感觉来说,我只是假设会有偶尔会动的异步处理,且它是随机临时发生的。关于Chunk、Step或Flow的分支、重试/跳过等一切都可以忽略不计。

在每一条DB记录上执行一个作业的概念,简单地创建一个作业。

调用无参数的函数

只需执行xxxService#execute()的操作。非常简单。


 @Configuration
 @EnableBatchProcessing
 public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private OrderService orderService;

    @Bean
    public JobLauncher jobLauncher(JobRepository jobRepository) {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return jobLauncher;
    }

    @Bean
    public Job job(Step step1) throws Exception {
        return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).build();
    }

    @Bean
    public Step step1(Tasklet tasklet1) {
        return stepBuilderFactory.get("step1").tasklet(tasklet1).build();
    }

    @Bean
    public Tasklet tasklet1() {
        // xxxService#execute()を実行する
        MethodInvokingTaskletAdapter tasklet = new MethodInvokingTaskletAdapter();
        tasklet.setTargetObject(orderService);
        tasklet.setTargetMethod("execute");
        return tasklet;
    }
 }

调用带有参数的函数。

如果有参数,就会变成这样。


    @StepScope
    @Bean
    public Tasklet tasklet1(@Value("#{jobParameters['order.id']}") Long orderId) {
        // xxxService#execute()を実行する
        MethodInvokingTaskletAdapter tasklet = new MethodInvokingTaskletAdapter();
        tasklet.setTargetObject(orderService);
        tasklet.setTargetMethod("execute");
        tasklet.setArguments(new Object[] { orderId });
        return tasklet;
    }
 }

如果有异常情况发生,则会异常终止,但作业具备再次执行的可能性。

比如,如果遇到了业务上不允许出现的数值,或者发生了数据库访问错误等等,如果需要将其视为失败,并在之后进行重新操作的情况,总之可以直接引发异常。

如果这样做,SpringBatch会记录并提供关于作业执行失败的信息,并允许重新执行。同时也会发生回滚,不会对数据库进行写入。

如果调用的函数没有引发异常,将正常结束。

异常终止了,但要确保不回滚。

工作虽然失败但也可以不回退。
在业务处理上发生失败时,可以用来更新数据库到失败的状态。

    @Bean
    public Step step1(Tasklet tasklet1) {
        return stepBuilderFactory.get("step1").tasklet(tasklet1)
                .exceptionHandler(exceptionHandler()).build();
    }

    private ExceptionHandler exceptionHandler() {
        return new ExceptionHandler() {

            @Override
            public void handleException(RepeatContext context, Throwable throwable) throws Throwable {
                // 例外を投げず、終了する
                context.setTerminateOnly();
            }
        };
    }

只需要一种选择:按照以下方式,我们只需要实现ExceptionHandler。同时,在这个方法中,我们可以使用RepeatContext的setTerminateOnly()方法来标记为失败。

意识到事务边界,将Tasklet(步骤)分成多个部分。

在SpringBatch中,无需特别设置,即可在Tasklet(Step)的开始和结束之间自动启用事务。
因此,非常幸运的是,即使在Tasklet执行过程中发生异常并导致异常终止,更新的记录也会被回滚。

然而相反地,一个Tasklet中进行的数据库更新在该Tasklet结束之前不会被提交。

因此,如果你想要在一个Tasklet中正确提交进行的数据库更新,你可以通过将Tasklet分割为想要提交的单元来实现。


    @Bean
    public Job job(Step beforeStep1, Step inProgressStep2, Step afterStep3) throws Exception {
        return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer())
            .start(beforeStep1)
            .next(inProgressStep2)
            .next(afterStep3)
            .build();
    }

如果工作等待执行的情况很多的话,或许可以考虑使用Chunk。

如果工作次数很多,我打算将它们一次性处理完。

例如,在使用Tasklet的情况下,我们以每个DB记录对应一个Job的方式进行构建。
而在使用Chunk的情况下,我们以每个DB记录对应一个Chunk的方式进行构建,
并且对于每个将多个DB记录合并成一个请求的情况,我们会运行一个Job。

对于我的情况来说,因为我没有使用它,所以只是一种猜测而已。
如果使用它会怎样,我想另外总结一下。

广告
将在 10 秒后关闭
bannerAds