vibhatha / pyiceberg_substrait

Apache License 2.0

SELECT statements with a partial set of columns yield invalid results #24

Open vibhatha opened 1 year ago

vibhatha commented 1 year ago

The handling of the projection and output_names has faulty logic.

When computing output_names, ReadRel.projection, and ProjectRel.expressions, the logic does not extract the fields accurately.

The projected_schema appears to be wrong in these examples and must be fixed.
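A minimal sketch of the field-extraction step, assuming the file schema is available as an ordered list of column names and that the output should follow the order of the SELECT list. In Substrait, `ReadRel.projection` selects struct fields by 0-based index, so the core task is mapping each selected name to its position in the file schema. The helper `projection_indices` is hypothetical and not part of pyiceberg_substrait.

```python
from typing import List, Sequence


def projection_indices(file_fields: Sequence[str], selected: Sequence[str]) -> List[int]:
    """Return the 0-based positions of the `selected` columns within `file_fields`.

    Hypothetical helper: assumes column names are unique in the file schema.
    """
    # Map each file column name to its position in the file schema
    position = {name: i for i, name in enumerate(file_fields)}

    # Fail loudly instead of silently dropping a column the query asked for
    missing = [name for name in selected if name not in position]
    if missing:
        raise KeyError(f"columns not in file schema: {missing}")

    # Keep the order of the SELECT list, not the order of the file schema
    return [position[name] for name in selected]
```

For a file schema `["a", "b", "c", "d"]` and a query selecting `c, a`, this yields `[2, 0]`, and the matching output names are `["c", "a"]`. Raising on a missing column is a deliberate choice: silently skipping it would reproduce the kind of invalid result described in this issue.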

vibhatha commented 1 year ago

It would be better to look into the IcebergFileDownloader.download function, where projected_schema and file_schema are used to extract accurate information.

The vital part is to examine the projection updates and the expression updates.

vibhatha commented 1 year ago

We can gather important information from the existing plan. It contains the required output_names, which accurately represent the output columns of the data.

The names here and the names in the file are different.
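One possible way to reconcile the two sets of names, sketched under a loud assumption: that the plan names and the file names differ only in letter case (for example, `A` in the SQL versus `a` in the data file). If they differ in some other way, this approach does not apply. `match_plan_names_to_file` is a hypothetical name.

```python
from typing import Dict, List


def match_plan_names_to_file(plan_names: List[str], file_names: List[str]) -> Dict[str, str]:
    """Map each plan output name to the file column it refers to.

    ASSUMPTION: the two sets of names differ only in letter case.
    """
    # Index file columns by lower-cased name, refusing ambiguous matches
    by_lower: Dict[str, str] = {}
    for name in file_names:
        key = name.lower()
        if key in by_lower:
            raise ValueError(f"ambiguous file columns: {by_lower[key]!r} and {name!r}")
        by_lower[key] = name

    # Resolve every plan name, failing if any has no counterpart in the file
    mapping: Dict[str, str] = {}
    for plan_name in plan_names:
        key = plan_name.lower()
        if key not in by_lower:
            raise KeyError(f"plan column {plan_name!r} not found in file schema")
        mapping[plan_name] = by_lower[key]
    return mapping
```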

  1. First, compare the original plan's root_rel.names against projected_schema and file_schema.

    From the original SQL statement, we should be able to extract the required columns:

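    import sqlparse
    from sqlparse.sql import Identifier, IdentifierList
    from sqlparse.tokens import Wildcard

    sql_statement = "SELECT A, B, C FROM TABLE;"
    parsed = sqlparse.parse(sql_statement)

    # Assuming the first statement is the one we want
    statement = parsed[0]

    # Extract column names from the SELECT list only; stop at the first match
    # so that a later list (e.g. GROUP BY a, b) does not overwrite it
    col_names = []
    for token in statement.tokens:
        if isinstance(token, IdentifierList):
            # SELECT A, B, C -> a single IdentifierList
            col_names = str(token).split(",")
            break
        if isinstance(token, Identifier) or token.ttype is Wildcard:
            # SELECT A (single column) or SELECT * -> no IdentifierList
            col_names = [str(token)]
            break

    # Strip the whitespace around each column name
    trimmed_cols = [col_name.strip() for col_name in col_names]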

Pass trimmed_cols to `IcebergFileDownloader.download(selected_fields=trimmed_cols)`, which forwards them to `sc = self.table.scan(selected_fields=selected_fields)` inside the function.

This way, only the required columns are loaded.
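PyIceberg's `Table.scan` takes `selected_fields` as a tuple of column names and defaults to `("*",)`, meaning all columns. A small sketch of converting `trimmed_cols` before the call, assuming an empty extraction result should fall back to reading every column; `to_selected_fields` is a hypothetical helper.

```python
from typing import Sequence, Tuple


def to_selected_fields(cols: Sequence[str]) -> Tuple[str, ...]:
    """Turn extracted column names into a `selected_fields` tuple.

    Hypothetical helper: falls back to ("*",) when no columns were extracted.
    """
    # Drop surrounding whitespace and any empty entries left by splitting
    cleaned = tuple(c.strip() for c in cols if c.strip())
    # ("*",) is PyIceberg's own default, i.e. read every column
    return cleaned if cleaned else ("*",)
```

The caller would then pass `selected_fields=to_selected_fields(trimmed_cols)` to `IcebergFileDownloader.download`.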

If there are filters, we should be able to apply the same logic and push them down as Substrait filter expressions. This needs further thought.

vibhatha commented 1 year ago

TODO: if selected_fields = ["*"], we need to make sure the names are updated accordingly.
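A sketch of the wildcard case, assuming the schema's column names are available in order: when `selected_fields` is `["*"]`, the root names must list every column; otherwise they are the selected columns themselves. `expand_selected_fields` is hypothetical, and rejecting `*` mixed with explicit columns is an assumed policy, not something the issue specifies.

```python
from typing import List, Sequence


def expand_selected_fields(selected: Sequence[str], schema_names: Sequence[str]) -> List[str]:
    """Return the column names the root relation should expose.

    Hypothetical helper: a lone "*" expands to every schema column.
    """
    if "*" in selected:
        # Assumed policy: only a bare "*" is supported
        if len(selected) != 1:
            raise ValueError("'*' cannot be combined with explicit columns")
        return list(schema_names)
    # Explicit columns: the names are the selection itself
    return list(selected)
```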

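# The new names must line up one-to-one with the existing root names
assert len(root_rel_names) == len(existing_root_rel_names)

# Update the output names on the plan's root relation
relations = editor.plan.relations
if relations and relations[0].HasField("root"):
    rel_root = relations[0].root
    # Slice assignment replaces the contents of the repeated `names` field
    rel_root.names[:] = output_names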

If the lengths differ, we need to work out how to update the names; ideally, we should use the correct field names.
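One way to express that rule, assuming the projected schema's field names are available: keep the plan's existing names when the counts match (they accurately describe the output, as noted above), and fall back to the field names when they do not. `resolve_root_names` is hypothetical.

```python
from typing import List, Sequence


def resolve_root_names(existing_names: Sequence[str], projected_field_names: Sequence[str]) -> List[str]:
    """Choose the names to write into the root relation.

    Hypothetical helper: prefers the plan's names when positions line up.
    """
    # Same count: positions line up, so the plan's own names can be kept
    if len(existing_names) == len(projected_field_names):
        return list(existing_names)
    # Different count: positions no longer line up, so use the field names
    return list(projected_field_names)
```

The result would then be assigned with `rel_root.names[:] = resolve_root_names(existing_root_rel_names, projected_field_names)` instead of relying on the length assertion.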