[Closed] Opened by javaHelper, closed 4 years ago
Assume a CompositeItemWriter has four writers configured, which write data into four different tables; it delegates to the writers sequentially within the same transaction scope. If the 1st writer fails to write its data, it throws an exception and the 2nd, 3rd and 4th writers are never called, due to the atomic chunk processing Spring Batch provides by default.
Approach – We need to create a separate transaction scope for each writer configured within the CompositeItemWriter, so that even if the 1st writer fails, the calls still go to the 2nd, 3rd and 4th writers and the other writers can complete their work. If we can manage the propagation levels separately (create a new transaction scope) for each writer by overriding the default behavior of CompositeItemWriter, that may help solve this issue.
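For illustration, a rough sketch (untested) of one way to give each delegate its own transaction scope using TransactionTemplate with REQUIRES_NEW; the class name and constructor wiring here are placeholders, not from the original code:

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;

public class NewTransactionPerDelegateWriter<T> implements ItemWriter<T> {

    private final List<ItemWriter<? super T>> delegates;
    private final TransactionTemplate txTemplate;

    public NewTransactionPerDelegateWriter(List<ItemWriter<? super T>> delegates,
            PlatformTransactionManager transactionManager) {
        this.delegates = delegates;
        this.txTemplate = new TransactionTemplate(transactionManager);
        // each delegate runs in a fresh transaction, independent of the chunk transaction
        this.txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    }

    @Override
    public void write(List<? extends T> items) throws Exception {
        for (ItemWriter<? super T> delegate : delegates) {
            txTemplate.execute(status -> {
                try {
                    delegate.write(items);
                } catch (Exception e) {
                    // roll back only this delegate's transaction and keep going
                    status.setRollbackOnly();
                }
                return null;
            });
        }
    }
}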
If we do the above, say the 1st writer succeeds and the 2nd writer fails: because of the transaction rollback, Spring Batch reduces the commit interval to 1 in order to identify the actual junk record, resets the delegate chain, and starts again from the beginning, calling all the delegates. Since the 1st writer already succeeded, it should not be called again; the call should start from the 2nd delegate (here the delegate index together with commit-interval = 1 lets us distinguish a fresh chunk from a failed one), and then the 3rd and 4th writers are called. This way we can save the good data and, with the help of SkipListeners, capture the garbage data.
import java.util.List;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterChunk;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;

@Data
@EqualsAndHashCode(callSuper = false)
@Slf4j
public class MyCompositeItemWriter<T> extends CompositeItemWriter<T> {

    private List<ItemWriter<? super T>> delegates;
    private boolean ignoreItemStream = false;
    private StepExecution stepExecution;

    @BeforeStep
    public void getInterstepData(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    @AfterChunk
    public void afterChunk(ChunkContext chunkContext) {
        // Reset the delegate index for the next fresh chunk; commit-interval == 1
        // marks a re-driven (failed) chunk, in which case the index is kept.
        Integer commitInterval = (Integer) stepExecution.getExecutionContext().get("commit-interval");
        if (commitInterval == null || commitInterval.intValue() != 1) {
            stepExecution.getExecutionContext().put("delegateIndx", "0");
        }
    }

    @Override
    public void setIgnoreItemStream(boolean ignoreItemStream) {
        this.ignoreItemStream = ignoreItemStream;
    }

    @Override
    public void write(List<? extends T> items) throws Exception {
        // Check the delegate index to identify which delegate was broken, so
        // writers that already succeeded are not called again for a re-driven chunk
        String delegateIndx = (String) stepExecution.getExecutionContext().get("delegateIndx");
        int currDelegateIndx = delegateIndx == null ? 0 : Integer.parseInt(delegateIndx);
        for (int i = currDelegateIndx; i < delegates.size(); i++) {
            delegates.get(i).write(items);
        }
    }

    @Override
    public void setDelegates(List<ItemWriter<? super T>> delegates) {
        super.setDelegates(delegates);
        this.delegates = delegates;
    }
}
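A hypothetical wiring sketch for this custom composite (bean and writer names are placeholders, not from the original post):

import java.util.Arrays;

import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WriterConfig {

    @Bean
    public MyCompositeItemWriter<Employee> compositeItemWriter(
            ItemWriter<Employee> writer1, ItemWriter<Employee> writer2,
            ItemWriter<Employee> writer3, ItemWriter<Employee> writer4) {
        MyCompositeItemWriter<Employee> writer = new MyCompositeItemWriter<>();
        // delegates are invoked in this order
        writer.setDelegates(Arrays.asList(writer1, writer2, writer3, writer4));
        return writer;
    }
}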
And the sample writer:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class EmployeeItemWriter implements ItemWriter<Employee> {

    private static final String SQL = "..."; // insert statement elided in the original post

    // "INDEX" stands in for the undeclared variable "index" in the original post:
    // this writer's position in the delegate chain ("0" for the first writer);
    // each delegate records its own position.
    private static final String INDEX = "0";

    @Autowired
    private NamedParameterJdbcTemplate namedJdbcTemplate;

    @Value("#{stepExecution.jobExecution.jobId}")
    private Long jobId;

    @Value("#{stepExecution}")
    private StepExecution stepExecution;

    @Override
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void write(List<? extends Employee> items) throws Exception {
        List<Map<String, Object>> batchValues = new ArrayList<>(items.size());
        for (Employee emp : items) {
            batchValues.add(new MapSqlParameterSource()
                    .addValue("", emp.get....)
                    .addValue("", emp.get....)
                    // ... remaining column mappings elided in the original post ...
                    .getValues());
        }
        try {
            // In normal mode, record this writer's index; in re-drive mode
            // (commit-interval == 1) keep the recorded failure index intact.
            Integer commitInterval = (Integer) stepExecution.getExecutionContext().get("commit-interval");
            if (commitInterval == null || commitInterval.intValue() != 1) {
                stepExecution.getExecutionContext().put("delegateIndx", INDEX);
            }
            int[] updateCounts = namedJdbcTemplate.batchUpdate(SQL, batchValues.toArray(new Map[items.size()]));
        } catch (Exception e) {
            log.error("## EmployeeWriter | Error occurred in BatchUpdate for JobId ={} ##", jobId);
            // Switch the chunk into re-drive mode so the junk record can be isolated
            stepExecution.getExecutionContext().put("commit-interval", Integer.valueOf(1));
            // application-specific exception carrying the jobId and writer name
            throw new CustomException(e.getMessage(), jobId, this.getClass().getSimpleName());
        }
    }
}
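One wiring detail worth noting: the #{stepExecution} late-binding expressions above only resolve if the writer bean is step-scoped, e.g.:

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.context.annotation.Bean;

// inside a @Configuration class
@Bean
@StepScope
public EmployeeItemWriter employeeItemWriter() {
    return new EmployeeItemWriter();
}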
This solution seems to work except for one condition: if a record breaks at the 1st writer, that record is not propagated to the 2nd, 3rd and 4th writers; likewise, if a record breaks at the 2nd writer, it is not propagated to the 3rd and 4th writers, and so on.
Kindly advise whether this looks good. Many thanks in advance.
@mminella - Even if I don't use @Transactional(propagation = Propagation.REQUIRES_NEW), I still get the same result with the above code. Could you please advise whether the above solution looks good?
The custom composite writer should be simple. Something along the lines of (this code is not tested):
import java.util.List;

import org.springframework.batch.item.ItemWriter;

public class MyCompositeItemWriter<T> implements ItemWriter<T> {

    private List<ItemWriter<? super T>> delegates;

    @Override
    public void write(List<? extends T> items) throws Exception {
        for (ItemWriter<? super T> delegate : delegates) {
            try {
                delegate.write(items);
            }
            catch (Exception e) {
                // Do logging/error handling here
            }
        }
    }

    public void setDelegates(List<ItemWriter<? super T>> delegates) {
        this.delegates = delegates;
    }
}
The above will guarantee that all writers are called regardless of the exception thrown in one of them. The transaction will commit normally.
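Plugging this into a step could look something like the following (untested; the step name, chunk size, and reader are placeholders):

import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;

// inside a @Configuration class
public Step employeeStep(StepBuilderFactory steps,
        ItemReader<Employee> reader,
        ItemWriter<Employee> compositeWriter) {
    return steps.get("employeeStep")
            .<Employee, Employee>chunk(100)
            .reader(reader)
            // exceptions are swallowed inside the composite, so the chunk commits normally
            .writer(compositeWriter)
            .build();
}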
@mminella - Thanks Michael, I will try the solution you suggested. One thing I've noticed: say I have items with STOCK_ID = 1, 10, 194 and 1392.
Using @Transactional(propagation = Propagation.REQUIRES_NEW)
Say the STOCK_ID = 1 item fails to write in the 1st writer; I handle the exception and re-throw it, so the CompositeItemWriter delegates are retried and the commit interval becomes 1. But this time the 1st writer gets the STOCK_ID = 10 item; assume it is saved, the same item is passed to the 2nd writer, and assume it fails there; I catch the exception and re-throw it. Then the 2nd writer gets STOCK_ID = 194, and assume it is saved successfully by the 3rd and 4th writers as well. After the chunk, the skip listener is called, giving me the 2 items that failed in the 1st and 2nd writers, and from the SkipListener I call the delegates again,
passing STOCK_ID = 1 to the 2nd, 3rd and 4th writers, and likewise STOCK_ID = 10 to the 3rd and 4th writers. If any exception arises then, we save that item into a separate error list and persist it to an error table. Here we need to identify whether we're reprocessing delegates from the SkipListener: STOCK_ID = 1 may fail in the 2nd, 3rd and 4th writers too, and if it does, the error list tells us that STOCK_ID = 1 failed in all writers and which tables it was supposed to go into.
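A hedged sketch of the SkipListener-based reprocessing described above (untested; how the failing writer's index is recovered from the exception is an assumption — CustomException would need to carry it):

import java.util.Collections;
import java.util.List;

import org.springframework.batch.core.SkipListener;
import org.springframework.batch.item.ItemWriter;

public class ReprocessingSkipListener implements SkipListener<Employee, Employee> {

    private final List<ItemWriter<? super Employee>> delegates;

    public ReprocessingSkipListener(List<ItemWriter<? super Employee>> delegates) {
        this.delegates = delegates;
    }

    @Override
    public void onSkipInWrite(Employee item, Throwable t) {
        int failedIndex = 0; // assumption: recovered from the exception, e.g. a field on CustomException
        // re-drive the skipped item through the writers after the one that failed
        for (int i = failedIndex + 1; i < delegates.size(); i++) {
            try {
                delegates.get(i).write(Collections.singletonList(item));
            } catch (Exception e) {
                // collect into the error list and persist to the error table as described
            }
        }
    }

    @Override
    public void onSkipInRead(Throwable t) {
        // no-op
    }

    @Override
    public void onSkipInProcess(Employee item, Throwable t) {
        // no-op
    }
}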
Without @Transactional(propagation = Propagation.REQUIRES_NEW)
Assume the 1st and 2nd writers save their data successfully but the 3rd writer fails: the 1st and 2nd tables' data will not be committed and I lose everything, since the transaction is only committed at the end of the chunk. Hence I must use @Transactional(propagation = Propagation.REQUIRES_NEW) to achieve the scenario explained above.
Could you please guide me further? Many thanks in advance.
@mminella - In our case we had two different writers inserting data into the same table, since the data we were deriving was different. We have now combined these two writers, and the transaction handling works smoothly out of the box with Spring Batch.
If STOCK_ID = 1 fails for one writer, the item now fails for the other as well; since we're using a SkipListener to skip such records, we get consistent behavior. We don't need a custom CompositeItemWriter here; it works well with the built-in API.
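For illustration, a minimal sketch of such a combined writer (untested; the class name, SQL constants and parameter mapping are placeholders):

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

public class CombinedEmployeeWriter implements ItemWriter<Employee> {

    // placeholder SQL; both statements target the same table with differently derived data
    private static final String INSERT_VARIANT_A = "INSERT INTO ...";
    private static final String INSERT_VARIANT_B = "INSERT INTO ...";

    private final NamedParameterJdbcTemplate jdbcTemplate;

    public CombinedEmployeeWriter(NamedParameterJdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void write(List<? extends Employee> items) throws Exception {
        for (Employee emp : items) {
            // both rows are written inside the one chunk transaction, so a failure
            // skips the item consistently for both derivations
            jdbcTemplate.update(INSERT_VARIANT_A, toParams(emp));
            jdbcTemplate.update(INSERT_VARIANT_B, toParams(emp));
        }
    }

    private MapSqlParameterSource toParams(Employee emp) {
        return new MapSqlParameterSource(); // column mappings elided
    }
}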
Thanks for your great help!
Dear Michael,
Could you please guide us here: https://stackoverflow.com/questions/63296065/how-to-override-spring-batch-compositeitemwriter-manage-transaction-for-delegate ?
We're looking to achieve this scenario somehow, as it's a business need.