SpringBatch中ListItemReader问题分析

SpringBatch中ListItemReader问题分析与总结

项目中使用了SpringBatch作为批处理框架。在使用的过程中曾经遇到一系列问题,本文章主要针对ListItemReader的问题进行记录、分析、总结、分享,期望能够抛砖引玉以及提升独立思考与解决问题能力。

此处先贴一下ListItemReader的源码:

/**
 * An {@link ItemReader} that pulls data from a list. Useful for testing.
 * 
 * @author Dave Syer
 * @author jojoldu
 *
 */
public class ListItemReader<T> implements ItemReader<T> {

	private List<T> list;

	public ListItemReader(List<T> list) {
		// If it is a proxy we assume it knows how to deal with its own state.
		// (It's probably transaction aware.)
		if (AopUtils.isAopProxy(list)) {
			this.list = list;
		}
		else {
			this.list = new LinkedList<>(list);
		}
	}

    @Nullable
	@Override
	public T read() {
		if (!list.isEmpty()) {
			return list.remove(0);
		}
		return null;
	}

}

1.问题现象

  1. 系统运行过程中频繁出现批处理超时预警:即batch_step_execution表中start_time、end_time相减,得到的处理时间超出预期,且与业务逻辑处理的实际执行时间相差较大。

  2. 系统导入/导出大数据量文件时出现慢、内存占用较高的现象。

2.问题分析

2.1 并发问题

2.1.1 问题原因

严格来说这并不算ListltemReader本身的问题,而是 Spring 在创建一个被@StepScope修饰的Bean的时候的处理机制的问题。

在创建Bean时,Spring 使用了synchronized来保证线程安全性,这意味着此时在整个应用中创建@StepScope的Bean是同步执行的。如果一旦创建的过程比较长,那么后续的创建Bean都将阻塞。

将这个情况放到项目中:如果有多个这样的Bean同时执行,则意味着其他的使用了@StepScope的job将无法启动(需等待同步代码块执行完后释放锁),这对系统来说是个致命问题。

以下是StepScope关键源码:

    @Override
    public Object get(String name, ObjectFactory<?> objectFactory) {
        StepContext context = getContext();
        Object scopedObject = context.getAttribute(name);

        if (scopedObject == null) {

            synchronized (mutex) {
                scopedObject = context.getAttribute(name);
                if (scopedObject == null) {

                    if (logger.isDebugEnabled()) {
                        logger.debug(String.format("Creating object in scope=%s, name=%s", this.getName(), name));
                    }


                    scopedObject = objectFactory.getObject();
                    context.setAttribute(name, scopedObject);

                }

            }

        }
        return scopedObject;
    }

以下是一个典型的问题业务代码示例:

    @StepScope
    public ListItemReader<TestDO> reader() {
        //典型的问题代码:执行数据库查询
        List<TestDO> queryFromDatabaseList = this.dao.selectList();
        ListItemReader<TestDO> reader = new ListItemReader<TestDO>(queryFromDatabaseList);
        return reader;
    }

2.1.2 解决方式

明白了问题的原因,那么解决办法也很简单:

2.1.2.1 使用其他Tasklet

上文提及的ItemReader是属于ChunkOrientedTasklet依赖的组件。 ChunkOrientedTasklet是SpringBatch中Tasklet的一个实现,是一个基于Chunk的批处理方式。与传统的ItemStream相比,它提供了更高效的处理方式, 可以更快、更高效地处理大量教据。 它的实现原理是将数据成多chunk,每个chunk由ItemReader 读取一组数据 ,然后交ItemProcessor进行处理,并最终给ItemWriter负责写入逻辑。每个chunk大小可以配置。

再回到我们的问题,我们的问题是使用了@StepScope的ItemReader在创建的过程中会阻塞其他job的执行。这是一个基于ChunkOrientedTasklet来实现的job。 那么解决方式之一,则是我们不使用ChunkOrientedTasklet,而使用其他的Tasklet来实现,例如TaskletStep。 但与此同时,程序也将失去了ChunkOrientedTasklet带来的优点,最典型的影响是

    1.无法通过查询 batch_step_execution相关表确认数据处理状态了

    2.无法利用ChunkOrientedTasklet的分批次处理大量数据,需要自行处理其中的分批次处理逻辑了

以下是一个使用Tasklet的示例:

	@Bean
	public Job job1() {
		return jobs.get("job1")
				.start(step1())
				.next(step2())
				.build();
	}

	@Bean
	public Step step1() {
		return steps.get("step1")
				.tasklet((contribution, chunkContext) -> {
					System.out.println("hello");
					// simulate processing time
					Thread.sleep(random.nextInt(3000));
					return RepeatStatus.FINISHED;
				})
				.build();
	}
2.1.2.1 使用其他ItemReader

使用TaskletStep的方式会丧失处理分批次大数据量数据的优点,如果希望继续使用ChunkOrientedTasklet,则可以考虑使用以下ItemReader来替代ListItemReader。 例如:数据库查询类:JdbcCurosorItemReader/文件处理类:FlatFileItemReader等。 更多的请查看官网[docs.spring.io/spring-batc…]

2.2.线程安全问题

这并不是ListItemReader的独有问题,而是使用了SpringBatch的多线程处理机制时将会遇到的问题。而ListItemReader的该问题较为突出。

以下是SpringBatch中使用多线程处理错误的示例代码。

@StepScope
public ListItemReader<TestDO> reader() {
	List<TestDO> list = new ArrayList();
	for(int i=0;i<10000;i++){
		list.add(new TestDO());
	}
	ListItemReader<TestDO> reader = new ListItemReader<TestDO>(list);
	return reader;
}

@Bean
public TaskExecutor taskExecutor(){
  return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
  return this.stepBuilderFactory.get("sampleStep")
    .<String, String>chunk(10)
    .reader(reader())
    .writer(writer())
    .taskExecutor(taskExecutor)
    .build();
}

2.2.1问题原因

ListItemReader本身是线程安全的,并且它是无状态的,但是当调用read方法时,由于其底层是ArrayList存储数据,那么如果结合SpringBatch 的多线程处理机制,将会导致线程安全性问题。

2.2.2 解决方式

2.2.2.1 不使用多线程进行处理

谨慎评估当前业务场景是否需要多线程进行处理,以下是可以参考的一些评估依据 1.要处理的数据量是否较大?当前业务场景是否确实需要? 2.系统资源(CPU、内存等)是否足够? 3.开发人员对多线程编程的掌握程度如何,是否能够hold住可能出现的问题?

2.2.2.2 使用同步关键字进行处理

以下是一个线程安全的ItemReader示例:

public class ThreadSafeIteratorItemReader<T> implements ItemReader<T> {
    private final Iterator<T> iterator;

    public ThreadSafeIteratorItemReader(Iterable<T> iterable) {
        Assert.notNull(iterable, "Iterable argument cannot be null!");
        this.iterator = iterable.iterator();
    }

    public ThreadSafeIteratorItemReader(Iterator<T> iterator) {
        Assert.notNull(iterator, "Iterator argument cannot be null!");
        this.iterator = iterator;
    }

    public T read() throws Exception {
        T t;
        synchronized (this) {
            t = this.iterator.hasNext() ? this.iterator.next() : null;
        }
        return t;
    }
}

2.2.2.3 使用线程安全的容器

如CopyOnWriteArrayList作为ListItemReader的底层实现。

2.3 性能问题

2.3.1 问题原因

  1. ListItemReader在read时使用的是remove(0)来实现的.我们知道ArrayList擅长随机访问元素,不擅长插入/删除元素,以及LinkedList擅长插入/删除元素,不擅长随机访问无素,所以在大部分情况下,read 方法调用时性能是较差的。(MyBatis的selectList查询结果默认使用ArrayList)

  2. ArrayList的remove(0)方法的工现是基于System.arraycopy来实现的,那么意味着这个过程需要大量的内存,一旦数据量较大,则容易出现大量gc甚至OOM的情况出现,从 而导致性能大幅度下降或者崩溃的情况。

  3. ListItemReader的构造器中使用了isAopProxy来判定List 的实现类型,这基本上决定了没有场景可以使用到性能略好的LinkedList.

3. 总结与思考

  1. 综合以上情况可知,ListItemReader的使用场景非常局限,必须是小量数据且处理迅速且处理速度能够稳定的场景。考虑到主要大部分业务开发类型的job还是数据查询为主,数据量的评估对开发人员总体要求较高,一般情况不使用ListItemReader。

  2. 架构师/技术负责人在引入一项“新技术”到项目中时,不仅需要从宏观层面做好调研,在项目开发时、线上运行时亦需要关注实际使用情况,警惕盲区。

  3. 开发人员在根据项目中demo进行使用时,亦需要多看源码进行反思、总结,保持对技术的好奇心与保持独立思考的能力。

全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务