twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

The problem with the combination of TupleEntrySchemeIterator and a "bad" record reader #1887

Closed · dieu closed this 5 years ago

dieu commented 5 years ago

Hello,

We encountered a problem with the combination of TupleEntrySchemeIterator (from cascading) and a "bad" record reader like "ElephantBird".

With the new scalding version, we got a nice optimization for toIterableExecution, where we don't need to dump data to HDFS if it can be read directly. So we began to read some data directly from the submitter node via HadoopTupleEntrySchemeIterator, which extends TupleEntrySchemeIterator.
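For context, here is a minimal sketch of the kind of call that takes this path; the source, path, and tuple types are illustrative, not from this issue:

```scala
import com.twitter.scalding._

// Illustrative only: pull a small dataset back to the submitter node.
// With the optimization described above, scalding can read a directly
// readable source on the submitter node (via HadoopTupleEntrySchemeIterator)
// instead of first dumping it to a temporary HDFS location.
val lookup: Execution[Iterable[(String, Long)]] =
  TypedPipe.from(TypedTsv[(String, Long)]("/data/lookup"))
    .toIterableExecution
```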

TupleEntrySchemeIterator uses an inputIterator (which returns a recordReader for a particular split, https://github.com/Cascading/cascading/blob/2.6.1/cascading-hadoop/src/main/shared/cascading/tap/hadoop/io/HadoopTupleEntrySchemeIterator.java#L52) and iterates through the recordReaders to get data from each split. The problem is that TupleEntrySchemeIterator doesn't invoke createKey and createValue on the next recordReader; it does so only once, in the constructor.

That works fine in most cases, but not with complex recordReaders which read data in createKey and createValue.
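To make the failure mode concrete, here is a hedged sketch (not Cascading's actual code) of the problematic pattern, written against the plain org.apache.hadoop.mapred.RecordReader API:

```scala
import org.apache.hadoop.mapred.RecordReader

// Sketch of the pattern described above: the key/value holders are created
// once from the FIRST reader and then reused for every later split's reader.
def readAllSplits[K, V](readers: Iterator[RecordReader[K, V]]): Iterator[(K, V)] = {
  val first = readers.next()
  val key = first.createKey()     // created once...
  val value = first.createValue() // ...from the first reader only
  (Iterator(first) ++ readers).flatMap { rr =>
    // Readers whose createKey/createValue do real work (allocate a container
    // tied to this reader instance, or read ahead) misbehave here, because
    // rr.next is fed holders that belong to a different reader.
    Iterator.continually(rr).takeWhile(_.next(key, value)).map(_ => (key, value))
  }
}
```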

One possible solution is to copy HadoopTupleEntrySchemeIterator, TupleEntrySchemeIterator, and Hfs into the scalding source, fix TupleEntrySchemeIterator (a one-line change), and use those, since releasing a new version of cascading would be hell. @johnynek what do you think?
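The shape of the fix, as a hedged sketch in the same style as above (the real change lives inside the copied TupleEntrySchemeIterator, not in a standalone helper like this):

```scala
import org.apache.hadoop.mapred.RecordReader

// Same loop, but createKey/createValue are refreshed on every reader as we
// switch splits, so readers that do real work in those methods still behave.
def readAllSplitsFixed[K, V](readers: Iterator[RecordReader[K, V]]): Iterator[(K, V)] =
  readers.flatMap { rr =>
    val key = rr.createKey()     // per split / per reader
    val value = rr.createValue()
    Iterator.continually(rr).takeWhile(_.next(key, value)).map(_ => (key, value))
  }
```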

johnynek commented 5 years ago

oh man...

Good catch.

I wonder what @cwensel thinks. I think we should send a PR with the fix. Maybe he would publish a version of cascading 2.x with the fix? If he does not have time, the code copy makes sense if we can avoid copying a ton of code.

I don't think he is happy that we are stuck on an old version, but at the same time, I sunk a lot of time getting tests to pass on cascading 3 and just ran out of time getting graphs that wouldn't throw exceptions.

dieu commented 5 years ago

> oh man...
>
> Good catch.
>
> I wonder what @cwensel thinks. I think we should send a PR with the fix. Maybe he would publish a version of cascading 2.x with the fix? If he does not have time, the code copy makes sense if we can avoid copying a ton of code.

Yep, I will definitely do a PR for cascading as well.

> I don't think he is happy that we are stuck on an old version, but at the same time, I sunk a lot of time getting tests to pass on cascading 3 and just ran out of time getting graphs that wouldn't throw exceptions.

I think we can come back to the upgrade in the next release; it should be much simpler with the changes from 0.18.x, and at least it will be (mostly) isolated in backend code.

I will publish the solution for now, to show how much we need to copy.

johnynek commented 5 years ago

@dieu can you easily run your integration tests at twitter with this? changing Hfs does make me a bit nervous...

dieu commented 5 years ago

> @dieu can you easily run your integration tests at twitter with this? changing Hfs does make me a bit nervous...

Yes, we will. I'm also not a fan of it, but if I remember Java and inheritance correctly, it should be safe.

dieu commented 5 years ago

> @dieu can you easily run your integration tests at twitter with this? changing Hfs does make me a bit nervous...

Our internal test suite passes.

dieu commented 5 years ago

@johnynek I added an assert in openForRead to check that we are using our ScaldingHfs instead of cascading's Hfs.
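For illustration, a hedged sketch of the kind of guard being described; the helper and the assumed package path for ScaldingHfs are hypothetical, not the actual scalding change:

```scala
import cascading.tap.Tap
import com.twitter.scalding.tap.ScaldingHfs // assumed location of the patched tap

// Hypothetical guard: before reading, verify the tap is the patched
// ScaldingHfs rather than cascading's stock Hfs, since only the patched
// classes carry the per-recordReader createKey/createValue fix.
def requireScaldingHfs(tap: Tap[_, _, _]): Unit =
  assert(
    tap.isInstanceOf[ScaldingHfs],
    s"expected ScaldingHfs but got ${tap.getClass}")
```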

cwensel commented 5 years ago

I commented on the PR.

But in short, Cascading 3.x has a sourceRePrepare for just this case I believe.

I urge you to upgrade, as re-invoking sourcePrepare will likely have many side-effects.

And yes, the new planner is a pain, but it is much faster and is user serviceable. It was only ever intended to plan jobs that it would encounter in the wild, not jobs produced by a random generator (if I remember your complaints correctly, grin).

johnynek commented 5 years ago

Chris, I think you vastly underestimate the variety of jobs found in the wild. The generator only generates validly typed scalding jobs. There should be none of those that don’t run.

The fact that you can write a well typed job that throws at runtime is a problem and a regression.

cwensel commented 5 years ago

Then I misunderstood the nature of the Scalding tests.

We only have 700 MR-related tests; what I’m hearing is that there clearly needs to be investment in more.

johnynek commented 5 years ago

If there were a clear list of what isn’t allowed now it might be okay, since our planner is very powerful now and can plan around it, but last time I didn’t see that list. Things like hashJoins after merges, or hashJoins after cogroups, or whatever.

Anyway. We sunk a ton of time into this over the past several years. It was easier to build a spark backend without using cascading than get this working. From my perspective we would be better off improving the spark backend going forward.

johnynek commented 5 years ago

@dieu did you do any performance comparison on this?

I'm still nervous about this. We haven't seen this bug for some reason, and I guess it is because we don't really use elephantbird.... which makes me think the best fix is elephantbird. I'm nervous about everyone paying a performance hit even though it won't hit every source.

dieu commented 5 years ago

> @dieu did you do any performance comparison on this?

Unfortunately, we don't have performance tests for this case.

> I'm still nervous about this. We haven't seen this bug for some reason, and I guess it is because we don't really use elephantbird.... which makes me think the best fix is elephantbird.

Unfortunately, it's not only in elephantbird but also in parquet, at least: https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java#L83 and https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java#L153

That makes me believe it's a typical pattern, since the Hadoop API doesn't have any contract about what you can do in createValue/createKey, and I think it is necessary to call those methods on an instance of the recordReader at least once.

> I'm nervous about everyone paying a performance hit even though it won't hit every source.

Could you elaborate on what performance penalty we get here, please?

let's say we have two possible situations:

In both cases, we do it only when we switch between splits, so the number of these calls is only the number of splits, which compared to reading the real data is nothing to me.

johnynek commented 5 years ago

@dieu okay, I overlooked that the number of calls is O(number of splits). That's not a concern.

If this is green on your internal tests, I'm in favor of merging.

Thanks for working through this in detail.