scylladb / scylla-cdc-java

Apache License 2.0

StackOverflowError when retrying reading from the middle of window with large amount of data #36

Open avelanarius opened 3 years ago

avelanarius commented 3 years ago

When an exception occurs while reading a window, we restart the read with TaskState pointing to the last correctly read change. After restarting, we read the entire window from the beginning, but ignore the rows that were already processed (up to the last correctly read change).

The problem with the current implementation is that it does this skipping recursively (see the last findNext call at line 148): https://github.com/scylladb/scylla-cdc-java/blob/a2c3c1823d1f5c210bd210e2d048de0f02ceeb93/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3WorkerCQL.java#L104-L151

I have observed one instance of this causing a StackOverflowError. What's worse, the read cannot succeed after a restart: it will attempt the same skipping again and fail in the same fashion.

The long-term fix is to implement smarter resuming of reads: do not skip rows client-side, but craft CQL queries that read only the correct portion of the window.

/cc @haaawk Would you approve of a quick fix for this problem (wrapping findNext in a while loop so the skipping is done iteratively, not recursively)? Doing "smarter resuming" will require more time, and there are other priorities right now. Of course, I will do "smarter resuming" later.
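
To illustrate the proposed quick fix, here is a minimal sketch of skipping in a loop instead of recursing. It is not the actual Driver3Reader code: the class IterativeSkipSketch, the method findNextIterative, and the use of plain integer ids held in memory are all assumptions made for illustration. It also assumes that advancing the iterator never blocks.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.stream.IntStream;

// Hypothetical stand-in for the reader; names and types are illustrative only.
final class IterativeSkipSketch {

    // Returns the first id greater than lastReadId, skipping earlier rows in a loop.
    // Stack depth stays constant no matter how many rows are skipped.
    static Optional<Integer> findNextIterative(Iterator<Integer> rows, int lastReadId) {
        while (rows.hasNext()) {
            int id = rows.next();
            if (id > lastReadId) {
                return Optional.of(id);
            }
            // Row was already processed before the restart: keep skipping.
        }
        return Optional.empty(); // window exhausted
    }

    public static void main(String[] args) {
        int rowCount = 5_000_000;
        // Pretend the failure happened just before the last row of a large window.
        Iterator<Integer> rows = IntStream.range(0, rowCount).iterator();
        System.out.println(findNextIterative(rows, rowCount - 2));
    }
}
```

A recursive version of the same skip would need one stack frame per skipped row, which is what overflows on large windows.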

haaawk commented 3 years ago

@avelanarius Let's do the while loop for now, but let's not close this issue, and do smart resuming whenever time allows.

methodmissing commented 3 years ago

Can confirm this behaviour, and also that it fails in the same way on restart.

. Will retry after backoff (0 ms).   [com.scylladb.cdc.model.worker.TaskAction]
java.lang.StackOverflowError
    at java.base/java.util.stream.Sink$ChainedReference.cancellationRequested(Sink.java:263)
    at java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:127)
    at java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:502)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:488)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:543)
    at com.scylladb.cdc.model.worker.ChangeSchema.getColumnDefinition(ChangeSchema.java:375)
    at com.scylladb.cdc.model.worker.RawChange.getCell(RawChange.java:78)
    at com.scylladb.cdc.model.worker.RawChange.getId(RawChange.java:44)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:148)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)
    at com.scylladb.cdc.cql.driver3.Driver3WorkerCQL$Driver3Reader.findNext(Driver3WorkerCQL.java:151)

haaawk commented 3 years ago

On second thought, I don't think a simple while loop will work. We can loop over the getAvailableWithoutFetching elements, but no further. Fetching more is blocking, and blocking here would cause the program to fail.
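
A rough sketch of that idea, under stated assumptions: the PagedRows interface below is a hypothetical stand-in loosely modeled on the driver's paging methods (it is not the driver's ResultSet API), InMemoryRows is a fake for demonstration, and CompletableFuture is used in place of whatever future type the real code uses. The loop only consumes rows that are already buffered; when the buffer runs out, the next page is requested asynchronously and the skipping continues in the callback.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class NonBlockingSkipSketch {

    // Hypothetical paged result set; method names are assumptions for this sketch.
    interface PagedRows {
        int availableWithoutFetching();      // rows buffered locally
        boolean isFullyFetched();            // true when no more pages exist
        int one();                           // next buffered row id
        CompletableFuture<Void> fetchMore(); // asynchronously buffer the next page
    }

    static CompletableFuture<Optional<Integer>> findNext(PagedRows rows, int lastReadId) {
        // Skip iteratively, but only over rows that need no fetching (never blocks).
        while (rows.availableWithoutFetching() > 0) {
            int id = rows.one();
            if (id > lastReadId) {
                return CompletableFuture.completedFuture(Optional.of(id));
            }
        }
        if (rows.isFullyFetched()) {
            return CompletableFuture.completedFuture(Optional.<Integer>empty());
        }
        // Buffer exhausted: continue after the next page arrives. The async variant
        // runs the continuation on another thread, so the stack does not grow per page.
        return rows.fetchMore().thenComposeAsync(ignored -> findNext(rows, lastReadId));
    }

    // In-memory fake used only to exercise the sketch.
    static final class InMemoryRows implements PagedRows {
        private final Queue<List<Integer>> unfetchedPages;
        private final Queue<Integer> buffer = new ArrayDeque<>();

        InMemoryRows(List<List<Integer>> pages) {
            this.unfetchedPages = new ArrayDeque<>(pages);
            fetchMore(); // the first page is available immediately
        }

        public int availableWithoutFetching() { return buffer.size(); }
        public boolean isFullyFetched() { return unfetchedPages.isEmpty(); }
        public int one() { return buffer.remove(); }

        public CompletableFuture<Void> fetchMore() {
            List<Integer> page = unfetchedPages.poll();
            if (page != null) {
                buffer.addAll(page);
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    public static void main(String[] args) {
        PagedRows rows = new InMemoryRows(Arrays.asList(
                Arrays.asList(1, 2, 3), Arrays.asList(4, 5), Arrays.asList(6, 7)));
        System.out.println(findNext(rows, 5).join()); // prints Optional[6]
    }
}
```

With this shape, any remaining recursion happens once per fetched page rather than once per skipped row.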

methodmissing commented 3 years ago

I'll take a deeper look today. We can reproduce it reliably with a load test in our staging environment. It appears to have auto-recovered, but I'm digging further into the logs for correlation.
