stratosphere / stratosphere

Stratosphere is now Apache Flink.
https://github.com/apache/incubator-flink
Apache License 2.0

Provide a Python map / Python reducer in the Java API. #615

Open rmetzger opened 10 years ago

rmetzger commented 10 years ago

Current language binding code https://github.com/stratosphere/stratosphere/issues/528

StephanEwen commented 10 years ago

At some point we have to be careful not to offer too many methods on DataSet. Otherwise, people get lost.

How about we offer a specific Java mapper (as part of the Python project) that implements the bridge between Java and Python? We could then use it the following way:

DataSet<String> set = ...;

set.map(new PythonMapper("./path/to/the/python/script/")).writeTo(...);
zentol commented 10 years ago

I have started working on this.

zentol commented 10 years ago

...and ran into an issue.

This is the current state: there is one generic PythonMapper class:

public class PythonMapper<IN extends Tuple,OUT extends Tuple> extends MapFunction<IN, OUT> {
...
}

If you write a plan, you have to create a new subclass (which really contains nothing but a constructor):

public static class PythonMapperCustom extends PythonMapper<Tuple1<String>,Tuple1<String>> {
    public PythonMapperCustom(String scriptPath) {
        super(scriptPath);
    }
}

(Using PythonMapper directly results in type errors; this is what I tried first.)

Then you can (theoretically) do this:

data
   .map(new PythonMapperCustom(path))
   .print();

But now I get this error:

Exception in thread "main" java.lang.IllegalArgumentException: Generic function base class must be immediate super class.
    at eu.stratosphere.api.java.typeutils.TypeExtractor.getSuperParameterizedType(TypeExtractor.java:165)
    at eu.stratosphere.api.java.typeutils.TypeExtractor.getTemplateTypes(TypeExtractor.java:135)
    at eu.stratosphere.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:31)
    at eu.stratosphere.api.java.operators.MapOperator.<init>(MapOperator.java:34)
    at eu.stratosphere.api.java.DataSet.map(DataSet.java:75)
    at eu.stratosphere.languagebinding.example.NewClass.main(NewClass.java:44)

So I can't use the generic PythonMapper directly due to type issues, nor can I create a less generic subclass of it.

Any help would be appreciated.

twalthr commented 10 years ago

The current TypeExtractor implementation does not support such complex generic structures. I'm currently working on this.

Try my current working branch, https://github.com/twalthr/stratosphere/tree/TypeExtractorTests; it should fix this issue.
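The exception above says the extractor only accepts the generic base class as the *immediate* superclass, while PythonMapperCustom sits two levels below MapFunction. The following is a minimal sketch of resolving type arguments through intermediate classes with plain `java.lang.reflect`; it is not the code on the linked branch. MapFunction, PythonMapper and PythonMapperCustom are hypothetical stand-ins, and String replaces Tuple1<String> so the example is self-contained.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class HierarchyTypeResolution {

    // Hypothetical stand-ins for the Stratosphere classes discussed in this thread.
    public abstract static class MapFunction<IN, OUT> {
        public abstract OUT map(IN value) throws Exception;
    }

    public static class PythonMapper<IN, OUT> extends MapFunction<IN, OUT> {
        protected final String scriptPath;

        public PythonMapper(String scriptPath) {
            this.scriptPath = scriptPath;
        }

        @Override
        public OUT map(IN value) {
            throw new UnsupportedOperationException("Python bridge omitted in this sketch");
        }
    }

    // The concrete types only appear two levels below MapFunction.
    public static class PythonMapperCustom extends PythonMapper<String, String> {
        public PythonMapperCustom(String scriptPath) {
            super(scriptPath);
        }
    }

    /**
     * Walks from clazz up to base, binding each superclass's type variables to the
     * arguments supplied by its subclass, and returns the resolved type arguments
     * of base. Variables that are never bound come back as TypeVariables.
     */
    public static Type[] resolveBaseTypeArguments(Class<?> clazz, Class<?> base) {
        Map<TypeVariable<?>, Type> bindings = new HashMap<>();
        Class<?> current = clazz;
        while (current != base) {
            if (current == null) {
                throw new IllegalArgumentException(clazz + " does not extend " + base);
            }
            Type superType = current.getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) superType;
                Class<?> raw = (Class<?>) parameterized.getRawType();
                TypeVariable<?>[] params = raw.getTypeParameters();
                Type[] args = parameterized.getActualTypeArguments();
                for (int i = 0; i < params.length; i++) {
                    // If the argument is a variable already bound further down, substitute its binding.
                    Type bound = bindings.get(args[i]);
                    bindings.put(params[i], bound != null ? bound : args[i]);
                }
                current = raw;
            } else {
                // Non-generic superclass, or null once we pass Object.
                current = (Class<?>) superType;
            }
        }
        TypeVariable<?>[] baseParams = base.getTypeParameters();
        Type[] resolved = new Type[baseParams.length];
        for (int i = 0; i < baseParams.length; i++) {
            resolved[i] = bindings.getOrDefault(baseParams[i], baseParams[i]);
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Resolved through PythonMapper: prints [class java.lang.String, class java.lang.String]
        System.out.println(Arrays.toString(
                resolveBaseTypeArguments(PythonMapperCustom.class, MapFunction.class)));
        // The generic class used directly: prints [IN, OUT], nothing concrete to extract
        System.out.println(Arrays.toString(
                resolveBaseTypeArguments(PythonMapper.class, MapFunction.class)));
    }
}
```

The second call shows why using PythonMapper directly cannot work even with a smarter extractor: without a subclass that fixes the arguments, only the variables IN and OUT are recorded.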

zentol commented 10 years ago

@twalthr Thanks! It actually compiles now. But there were two issues with your branch that I had to fix: ExecutionEnvironment and AvroInputFormat call methods (getForClass() / getForObject()) that were formerly in TypeInformation and have now moved to TypeExtractor.

twalthr commented 10 years ago

OK, thanks for the info. I forgot to apply my refactoring to all classes ^^

fhueske commented 10 years ago

Just a side note: is it possible to make PythonMapper more generic, so that it works on all kinds of input types instead of only Tuples?

zentol commented 10 years ago

@fhueske Generally, yes. My plan was to make it work with tuples first, because after that I can encapsulate anything that is not a tuple in a Tuple1 and don't have to make substantial changes to any of the underlying classes.

fhueske commented 10 years ago

OK, sounds like a good plan! From my point of view, it would be important that the Tuple1 encapsulation happens under the hood so that the user is not aware of it, i.e., the user instantiates a PythonMapper (and writes the Python function) for the actual data to process and not for a Tuple1 holding the data.

zentol commented 10 years ago

@fhueske That's exactly how it works (as of five minutes ago), for both input and output.
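The encapsulation described here (wrap a non-tuple value in a Tuple1 on the way in, unwrap it on the way out) can be sketched as a small adapter. This is only an illustration of the idea, not zentol's code: Tuple1 below is a hypothetical minimal class rather than Stratosphere's, and `tupleBridge` stands in for the tuple-based Python bridge.

```java
import java.util.function.Function;

public class TupleWrappingSketch {

    // Hypothetical minimal one-field tuple; not Stratosphere's Tuple1.
    public static final class Tuple1<T> {
        public final T f0;

        public Tuple1(T f0) {
            this.f0 = f0;
        }
    }

    /**
     * Turns a function on Tuple1 records (all a tuple-only bridge understands)
     * into a function on plain values: wrap the input, apply, unwrap the output.
     */
    public static <IN, OUT> Function<IN, OUT> hideTupleWrapper(
            Function<Tuple1<IN>, Tuple1<OUT>> tupleFunction) {
        return value -> tupleFunction.apply(new Tuple1<>(value)).f0;
    }

    public static void main(String[] args) {
        // Stand-in for the tuple-based Python bridge: upper-cases field f0.
        Function<Tuple1<String>, Tuple1<String>> tupleBridge =
                t -> new Tuple1<>(t.f0.toUpperCase());

        // The user only ever sees String -> String.
        Function<String, String> userFacing = hideTupleWrapper(tupleBridge);
        System.out.println(userFacing.apply("edge")); // EDGE
    }
}
```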

fhueske commented 10 years ago

Great! :smile:

zentol commented 10 years ago

@twalthr I have a question for you. Currently I have to write this to create and apply a Python map function:

public static class EdgeByIdProjector extends PythonMap<Edge, Edge> {
    public EdgeByIdProjector(String scriptPath) {
        super(scriptPath);
    }
}

DataSet<Edge> edgesByDegree = edgesWithDegrees
                .map(new EdgeByIdProjector(scriptPath));

It would be neat if this were sufficient:

DataSet<Edge> edgesByDegree = edgesWithDegrees
                .map(new PythonMap<EdgeWithDegrees,Edge>(scriptPath));

But this throws:

eu.stratosphere.api.java.functions.InvalidTypesException: Type of TypeVariable 'OUT' in 'class eu.stratosphere.languagebinding.api.java.python.functions.PythonMap' could not be determined. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).
    at eu.stratosphere.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:267)
    at eu.stratosphere.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:106)
    at eu.stratosphere.api.java.typeutils.TypeExtractor.getGroupReduceReturnTypes(TypeExtractor.java:50)
    at eu.stratosphere.api.java.operators.ReduceGroupOperator.<init>(ReduceGroupOperator.java:46)
    at eu.stratosphere.api.java.DataSet.reduceGroup(DataSet.java:244)
    at eu.stratosphere.languagebinding.api.java.python.functions.PythonGroupReduceTest.testGroupReduce(PythonGroupReduceTest.java:33)

This would be another option:

DataSet<Edge> edgesByDegree = edgesWithDegrees
                .map(new PythonMap(scriptPath, Edge.class, Edge.class));

Do you have an idea how I can get either of the two latter options to work?

twalthr commented 10 years ago

This is a very common issue with the TypeExtractor, but we cannot prevent this exception. Due to type erasure, Java discards all generic parameters if you use:

new Function<ClassA,ClassB>();

The parameters are only retained if you extend from it, for example with an anonymous subclass:

new Function<ClassA,ClassB>() { };
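The difference between the two lines above can be observed directly with reflection. In this minimal sketch, Function, ClassA and ClassB are hypothetical stand-ins for the names used in the comment: the plain instance's runtime class is Function itself, whose superclass is Object, while the anonymous subclass records `Function<ClassA, ClassB>` as its generic superclass.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class ErasureDemo {

    // Hypothetical stand-ins for the names used in the comment above.
    public static class Function<IN, OUT> { }
    public static class ClassA { }
    public static class ClassB { }

    /** Returns the type arguments the object's class records for its direct superclass, or null. */
    public static Type[] recordedTypeArguments(Object function) {
        Type superType = function.getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            return ((ParameterizedType) superType).getActualTypeArguments();
        }
        return null;
    }

    public static void main(String[] args) {
        // Plain instantiation: the arguments exist only at compile time.
        Object plain = new Function<ClassA, ClassB>();
        System.out.println(recordedTypeArguments(plain) == null); // true

        // Anonymous subclass: the class file keeps Function<ClassA, ClassB> as its superclass.
        Object anonymous = new Function<ClassA, ClassB>() { };
        System.out.println(Arrays.toString(recordedTypeArguments(anonymous)));
    }
}
```

Applied to the thread, this suggests `.map(new PythonMap<EdgeWithDegrees, Edge>(scriptPath) { })` as a possible one-liner, assuming PythonMap is not final and the extractor accepts anonymous subclasses the same way it accepts named ones.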

If you used a class PythonIdentityMapper<IN> extends MapFunction<IN,IN>, the TypeExtractor would recognize that input and output share the same type variable (name) and would use the input type as the output type.
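A simplified sketch of that deduction rule: when the output type argument is the same type variable as the input, the already-known input type can be reused; when it is a different variable (as in `PythonMap<IN, OUT>`), there is nothing to deduce it from. This is an illustration under assumptions, not the TypeExtractor's code; the classes are hypothetical stand-ins, and the function class is assumed to extend MapFunction directly.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class SameVariableDeduction {

    // Hypothetical stand-ins for the classes discussed in this thread.
    public abstract static class MapFunction<IN, OUT> {
        public abstract OUT map(IN value);
    }

    // Input and output share the single type variable IN.
    public static class PythonIdentityMapper<IN> extends MapFunction<IN, IN> {
        @Override
        public IN map(IN value) {
            return value;
        }
    }

    // The output variable OUT does not occur in the input.
    public static class PythonMap<IN, OUT> extends MapFunction<IN, OUT> {
        @Override
        public OUT map(IN value) {
            throw new UnsupportedOperationException("Python bridge omitted in this sketch");
        }
    }

    /** Deduces the output type of a direct MapFunction subclass from a known input type. */
    public static Type deduceOutputType(Class<?> functionClass, Type inputType) {
        Type superType = functionClass.getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalArgumentException("no type arguments recorded for " + functionClass);
        }
        Type[] args = ((ParameterizedType) superType).getActualTypeArguments();
        Type in = args[0];
        Type out = args[1];
        if (!(out instanceof TypeVariable)) {
            return out; // concrete output type fixed by the subclass
        }
        if (out.equals(in)) {
            return inputType; // same variable as the input, so reuse the input type
        }
        throw new IllegalArgumentException(
                "output type variable " + out + " cannot be deduced from the input type");
    }

    public static void main(String[] args) {
        System.out.println(deduceOutputType(PythonIdentityMapper.class, String.class)); // class java.lang.String
        try {
            deduceOutputType(PythonMap.class, String.class);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```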

StephanEwen commented 10 years ago

We always need to determine the "result" type of a function. There is an interface called ResultTypeQueryable which you can implement. It would allow the API to simply ask the function what the result type is.

Currently the interface is only queried for input formats, but we can add this to all other functions as well.
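Combined with zentol's third option (`new PythonMap(scriptPath, Edge.class, Edge.class)`), the proposal could look roughly like the sketch below: the function stores the classes it was given and answers the result-type query itself, so no reflection over erased generics is needed. The ResultTypeQueryable here is a simplified, hypothetical stand-in that returns a Class; its method name and signature are assumptions, not the real interface. PythonMap, MapFunction, Edge and `resultTypeOf` are likewise hypothetical.

```java
public class ResultTypeQueryableSketch {

    /** Simplified stand-in for a result-type interface; returns a Class for brevity. */
    public interface ResultTypeQueryable<T> {
        Class<T> getProducedType();
    }

    public abstract static class MapFunction<IN, OUT> {
        public abstract OUT map(IN value) throws Exception;
    }

    /** Hypothetical PythonMap that is told its types explicitly. */
    public static class PythonMap<IN, OUT> extends MapFunction<IN, OUT>
            implements ResultTypeQueryable<OUT> {
        private final String scriptPath;
        private final Class<IN> inputType;
        private final Class<OUT> outputType;

        public PythonMap(String scriptPath, Class<IN> inputType, Class<OUT> outputType) {
            this.scriptPath = scriptPath;
            this.inputType = inputType;
            this.outputType = outputType;
        }

        @Override
        public Class<OUT> getProducedType() {
            return outputType;
        }

        @Override
        public OUT map(IN value) {
            throw new UnsupportedOperationException("Python bridge omitted in this sketch");
        }
    }

    /** How an operator could obtain the result type: ask the function if it can answer. */
    public static Class<?> resultTypeOf(MapFunction<?, ?> function) {
        if (function instanceof ResultTypeQueryable) {
            return ((ResultTypeQueryable<?>) function).getProducedType();
        }
        // A real API would fall back to reflection-based type extraction here.
        throw new IllegalStateException("reflection-based extraction not included in this sketch");
    }

    public static class Edge { }

    public static void main(String[] args) {
        MapFunction<Edge, Edge> mapper = new PythonMap<>("./script.py", Edge.class, Edge.class);
        System.out.println(resultTypeOf(mapper).getSimpleName()); // Edge
    }
}
```

The trade-off is that callers repeat the types as class literals, and a Class token cannot express parameterized types such as Tuple2<String, Integer>, which is one reason a richer type descriptor than Class would likely be needed in practice.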