unlockha / beanio

Automatically exported from code.google.com/p/beanio
Apache License 2.0
1 stars 0 forks source link

Create a thread-safe wrapper for org.beanio.spring.BeanIOFlatFileItemReader #98

Open GoogleCodeExporter opened 8 years ago

GoogleCodeExporter commented 8 years ago
Problem can be easily seen with parallel tasklet in spring-batch job, e.g.
 <tasklet task-executor="taskExecutor" throttle-limit="5">
                <chunk reader="beanIOItemReader" writer="MyBatisItemWriter" commit-interval="100"/>
 </tasklet>

What is the expected output? What do you see instead?
I see an exception:
SEVERE: Encountered an error executing the step
org.beanio.UnidentifiedRecordException: Unidentified record at line 2126
    at org.beanio.internal.parser.UnmarshallingContext.recordUnidentifiedException(UnmarshallingContext.java:382)
    at org.beanio.internal.parser.BeanReaderImpl.nextRecord(BeanReaderImpl.java:186)
    at org.beanio.internal.parser.BeanReaderImpl.internalRead(BeanReaderImpl.java:96)
    at org.beanio.internal.parser.BeanReaderImpl.read(BeanReaderImpl.java:67)
    at org.beanio.spring.BeanIOFlatFileItemReader.doRead(BeanIOFlatFileItemReader.java:82)
    at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:83)
    at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:131)
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
    at com.sun.proxy.$Proxy0.read(Unknown Source)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:155)
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:114)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:368)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:108)
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:395)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:267)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:77)
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

What version of BeanIO are you using? What JDK version?

BeanIO 2.1.0.M2, jdk1.7.0_45, Windows 7 x64

Please provide any additional information below.

As a workaround I use

public class SynchronizedBeanIOFlatFileItemReader<T> extends 
BeanIOFlatFileItemReader<T> {
    @Override
    protected synchronized T doRead() throws Exception {
        return super.doRead();
    }

    @Override
    protected synchronized void doOpen() throws Exception {
        super.doOpen();
    }

    @Override
    protected synchronized void doClose() throws Exception {
        super.doClose();
    }

    @Override
    protected synchronized void jumpToItem(int itemIndex) throws Exception {
        super.jumpToItem(itemIndex);
    }
}

It helps.

Original issue reported on code.google.com by tiv...@gmail.com on 9 Jan 2014 at 3:05

GoogleCodeExporter commented 8 years ago
Readers and writers created by BeanIO are not thread safe, and 
BeanIOFlatFileItemReader will not be changed because synchronization would add 
extra overhead most clients don't want/need.  However, I will look into adding 
your workaround wrapper class into the BeanIO source code for clients that do 
need that behavior.

Thanks,
Kevin

Original comment by kevin.s...@gmail.com on 11 Jan 2014 at 8:18

GoogleCodeExporter commented 8 years ago
Well, I am not talking about core readers and writers, but only about spring 
batch integration classes. Parallel processing is a standard feature and I 
think it should be supported.

Original comment by tiv...@gmail.com on 12 Jan 2014 at 11:39