apache / incubator-wayang

Apache Wayang (incubating) is the first cross-platform data processing system.
https://wayang.incubator.apache.org/
Apache License 2.0

Add JdbcJoinOperator and PostgresJoinOperator #385

Closed: juripetersen closed this pull request 7 months ago

juripetersen commented 7 months ago

References #384

The proposed solution takes a somewhat "hacky" approach to providing a generic interface for JoinOperators. It follows earlier examples such as ProjectionDescriptor, which lets the caller supply a "hardcoded" sqlImplementation that specifies how the keys are retrieved.

As the provided test shows, each key descriptor of a JoinOperator then takes two string identifiers: the table name and the key column to join on:

        // Set up two small HSQLDB tables that share the join column "a".
        try (Connection jdbcConnection = hsqldbPlatform.createDatabaseDescriptor(configuration).createJdbcConnection()) {
            final Statement statement = jdbcConnection.createStatement();
            statement.execute("CREATE TABLE testA (a INT, b VARCHAR(6));");
            statement.execute("INSERT INTO testA VALUES (0, 'zero');");
            statement.execute("CREATE TABLE testB (a INT, b INT);");
            statement.execute("INSERT INTO testB VALUES (0, 100);");
        }

        // Each key descriptor pairs a Java key extractor (field 0 of the Record)
        // with a hardcoded SQL implementation naming the table and the key column.
        final ExecutionOperator joinOperator = new HsqldbJoinOperator<Integer>(
            new TransformationDescriptor<Record, Integer>(
                (record) -> (Integer) record.getField(0),
                Record.class,
                Integer.class
            ).withSqlImplementation("testA", "a"),
            new TransformationDescriptor<Record, Integer>(
                (record) -> (Integer) record.getField(0),
                Record.class,
                Integer.class
            ).withSqlImplementation("testB", "a")
        );
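
To illustrate why such a key descriptor keeps both a lambda and the SQL strings, here is a minimal, self-contained sketch. `SqlKeyDescriptor`, `extractKey` and `sqlColumn` are hypothetical names, not Wayang's API, and rows are modelled as `Object[]` instead of `Record`. A Java or Spark platform would evaluate the lambda, while a JDBC platform would only read the table and column names.

```java
import java.util.function.Function;

// Hypothetical stand-in for a key descriptor that carries both a Java key
// extractor and a (table, column) SQL implementation. Not Wayang's actual class.
final class SqlKeyDescriptor<T, K> {
    private final Function<T, K> javaImplementation; // used by Java/Spark-style executors
    private final String table;                      // used by JDBC-style executors
    private final String column;

    SqlKeyDescriptor(Function<T, K> javaImplementation, String table, String column) {
        this.javaImplementation = javaImplementation;
        this.table = table;
        this.column = column;
    }

    /** Evaluate the key in memory, as a non-SQL platform would. */
    K extractKey(T element) {
        return javaImplementation.apply(element);
    }

    /** Render the key as a qualified SQL column, as a JDBC platform would. */
    String sqlColumn() {
        return table + "." + column;
    }

    public static void main(String[] args) {
        // Rows are modelled as Object[] here instead of Wayang's Record.
        SqlKeyDescriptor<Object[], Integer> keyA =
                new SqlKeyDescriptor<>(row -> (Integer) row[0], "testA", "a");
        Object[] row = {0, "zero"};
        System.out.println(keyA.extractKey(row)); // 0
        System.out.println(keyA.sqlColumn());     // testA.a
    }
}
```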

Accordingly, the JoinMapping requires these values to be present in the pattern it matches:

    private SubplanPattern createSubplanPattern() {
        // Match any join of two Record data sets.
        OperatorPattern<JoinOperator<Record, Record, Object>> operatorPattern = new OperatorPattern<>(
                "join",
                new JoinOperator<Record, Record, Object>(
                        null,
                        null,
                        DataSetType.createDefault(Record.class),
                        DataSetType.createDefault(Record.class)
                ),
                false
        )
                // Both key descriptors must be TransformationDescriptors
                // (instanceof is also false for null) ...
                .withAdditionalTest(op -> op.getKeyDescriptor0() instanceof TransformationDescriptor)
                .withAdditionalTest(op -> op.getKeyDescriptor1() instanceof TransformationDescriptor)
                // ... and both must carry a hardcoded SQL implementation.
                .withAdditionalTest(op -> op.getKeyDescriptor0().getSqlImplementation() != null)
                .withAdditionalTest(op -> op.getKeyDescriptor1().getSqlImplementation() != null);
        return SubplanPattern.createSingleton(operatorPattern);
    }

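
Conceptually, the additional tests form a conjunction: the pattern only matches a join whose key descriptors pass every check. The following simplified sketch models that idea with hypothetical `Pattern` and `JoinOp` classes; it is not Wayang's `OperatorPattern`, which does considerably more than this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, heavily simplified model of a pattern with "additional tests".
final class PatternSketch {

    /** Minimal join-operator stand-in exposing two optional SQL implementations. */
    static final class JoinOp {
        final String sqlKey0;
        final String sqlKey1;

        JoinOp(String sqlKey0, String sqlKey1) {
            this.sqlKey0 = sqlKey0;
            this.sqlKey1 = sqlKey1;
        }
    }

    static final class Pattern<T> {
        private final List<Predicate<T>> tests = new ArrayList<>();

        /** Register one more condition; returns this for chaining. */
        Pattern<T> withAdditionalTest(Predicate<T> test) {
            tests.add(test);
            return this;
        }

        /** The operator matches only if every registered test passes. */
        boolean matches(T op) {
            return tests.stream().allMatch(t -> t.test(op));
        }
    }

    /** Only joins with SQL implementations on both sides qualify. */
    static Pattern<JoinOp> jdbcJoinPattern() {
        return new Pattern<JoinOp>()
                .withAdditionalTest(op -> op.sqlKey0 != null)
                .withAdditionalTest(op -> op.sqlKey1 != null);
    }

    public static void main(String[] args) {
        Pattern<JoinOp> pattern = jdbcJoinPattern();
        System.out.println(pattern.matches(new JoinOp("testA.a", "testB.a"))); // true
        System.out.println(pattern.matches(new JoinOp("testA.a", null)));      // false
    }
}
```

A join lacking a SQL implementation on either side is simply not matched, so it stays available for other platforms' mappings.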
kbeedkar commented 7 months ago

@juripetersen Thanks for the contributions. Could you please check the couple of comments I left?

juripetersen commented 7 months ago

@kbeedkar Sure, however I can't seem to find your comments yet.

juripetersen commented 7 months ago

I can see the problem of having a cyclic reference and will remove that.

zkaoudi commented 7 months ago

Discussing offline with @juripetersen, we saw that it may be possible to use a JoinDescriptor, similar to the ProjectionDescriptor used for projections: https://github.com/apache/incubator-wayang/blob/main/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/function/ProjectionDescriptor.java

@kbeedkar what do you think?

kbeedkar commented 7 months ago

That makes sense. However, I suggest checking if we can follow the same approach as for filters. https://github.com/apache/incubator-wayang/blob/5e6a07edaae3e7ebac9ac73814f95bafe741845d/wayang-platforms/wayang-postgres/src/main/java/org/apache/wayang/postgres/operators/PostgresFilterOperator.java#L30

i.e., the PostgresJoinOperator's constructor would take two arguments of type TransformationDescriptor<Record, Key>.

juripetersen commented 7 months ago

@kbeedkar the PostgresFilterOperator relies on the sqlImplementation: the executor simply copies this "hardcoded" string into the SQL query it builds. This can be seen in the JDBC FunctionCompiler:

import org.apache.wayang.core.function.PredicateDescriptor;

public class FunctionCompiler {

    /**
     * Compile a predicate to a SQL {@code WHERE} clause.
     *
     * @param descriptor describes the predicate
     * @return a compiled SQL {@code WHERE} clause
     */
    public String compile(PredicateDescriptor descriptor) {
        final String sqlImplementation = descriptor.getSqlImplementation();
        assert sqlImplementation != null;
        return sqlImplementation;
    }
}
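
Applied to joins, the same passthrough idea would just concatenate the hardcoded table and key names into a JOIN clause, without ever looking at the Java lambda. A hypothetical sketch (the class name, method name and the four string inputs are assumptions, not part of Wayang's FunctionCompiler):

```java
// Hypothetical extension of the passthrough idea to joins; not Wayang code.
final class JoinCompilerSketch {

    /**
     * Build "JOIN <rightTable> ON <leftTable>.<leftKey> = <rightTable>.<rightKey>"
     * from hardcoded names only. Unlike a Java `assert`, which runs only with -ea,
     * the explicit check below always rejects missing parts.
     */
    static String compileJoin(String leftTable, String leftKey,
                              String rightTable, String rightKey) {
        if (leftTable == null || leftKey == null || rightTable == null || rightKey == null) {
            throw new IllegalArgumentException("all SQL implementation parts must be set");
        }
        return "JOIN " + rightTable + " ON "
                + leftTable + "." + leftKey + " = " + rightTable + "." + rightKey;
    }

    public static void main(String[] args) {
        String clause = compileJoin("testA", "a", "testB", "a");
        // Prints: SELECT * FROM testA JOIN testB ON testA.a = testB.a
        System.out.println("SELECT * FROM testA " + clause);
    }
}
```

Because identifiers are concatenated verbatim, this only makes sense for trusted, hardcoded names; the sketch performs no quoting or escaping.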

This is also what inspired the solution currently proposed in this PR. @zkaoudi and I discussed an approach that avoids this and actually uses a generic interface for joins. However, this seems to be non-trivial and is likely to break backwards compatibility.

juripetersen commented 7 months ago

I can imagine using something like this when defining the Join:

        final ExecutionOperator joinOperator = new HsqldbJoinOperator<Integer>(
            new JoinDescriptor<Record, Record, Integer>(
                "tableName1",
                "tableName2",
                "keyName1",
                "keyName2",
                Record.class,
                Record.class,
                Integer.class
            )
        );

Here, JoinDescriptor would extend TransformationDescriptor, and a JdbcJoinOperator could only be derived from JoinOperators defined with a JoinDescriptor.

However, this would only work for JDBC-based joins, not for Java or Spark joins, because the Java key function (the FunctionDescriptor part) is omitted. So the solution currently proposed, which extends TransformationDescriptor, seems better than a JoinDescriptor.
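
The reason a SQL-only descriptor would not suffice is that non-SQL backends have to evaluate the key functions themselves. As an illustration, here is a minimal in-memory hash join driven purely by two key-extraction functions; it is a generic textbook sketch with hypothetical names, not Wayang's actual Java or Spark join operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory equi-join: without the key functions there would be
// nothing to evaluate, since table/column names mean nothing outside SQL.
final class HashJoinSketch {

    /** Joins two lists on equal keys; elements must be non-null (Map.entry rejects null). */
    static <L, R, K> List<Map.Entry<L, R>> hashJoin(
            List<L> left, Function<L, K> leftKey,
            List<R> right, Function<R, K> rightKey) {
        // Build phase: index the left input by its key.
        Map<K, List<L>> index = new HashMap<>();
        for (L l : left) {
            index.computeIfAbsent(leftKey.apply(l), k -> new ArrayList<>()).add(l);
        }
        // Probe phase: look up each right element's key in the index.
        List<Map.Entry<L, R>> result = new ArrayList<>();
        for (R r : right) {
            for (L l : index.getOrDefault(rightKey.apply(r), List.of())) {
                result.add(Map.entry(l, r));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same data as testA and testB in the test above, as Object[] rows.
        List<Object[]> testA = List.of(new Object[]{0, "zero"});
        List<Object[]> testB = List.of(new Object[]{0, 100});
        List<Map.Entry<Object[], Object[]>> joined =
                hashJoin(testA, row -> (Integer) row[0], testB, row -> (Integer) row[0]);
        System.out.println(joined.size()); // 1
    }
}
```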

zkaoudi commented 7 months ago

Yeah, we wouldn't want to break things for Spark and Java. So maybe we merge your PR with the sqlImplementation approach for now and open a separate issue covering all operators, since that needs a more general solution.

zkaoudi commented 7 months ago

@juripetersen can you please check? There are some compilation errors that make the build fail.

kbeedkar commented 7 months ago

Yes, extending the TransformationDescriptor sounds good.

juripetersen commented 7 months ago

@zkaoudi I will sort out the compilation errors now and push afterwards.

juripetersen commented 7 months ago

@zkaoudi @kbeedkar compilation works fine locally now.