apache / superset

Apache Superset is a Data Visualization and Data Exploration Platform
https://superset.apache.org/
Apache License 2.0

fix(csv_export): use custom CSV_EXPORT parameters in pd.read_csv for pivot table #30961

Open frlm opened 4 days ago

frlm commented 4 days ago

Title: fix(csv_export): use custom CSV_EXPORT parameters in pd.read_csv

Bug description

Function: apply_post_process

The issue is that pd.read_csv uses pandas' default parameters instead of those defined in CSV_EXPORT in superset_config. The problem is barely noticeable with the default separator "," and decimal ".", but with the configuration CSV_EXPORT = {"encoding": "utf-8", "sep": ";", "decimal": ","} it becomes evident. This change ensures that pd.read_csv uses the parameters defined in CSV_EXPORT.
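
For context, this is roughly how the non-default export settings are declared in superset_config.py (values taken from the configuration above):

CSV_EXPORT = {
    "encoding": "utf-8",
    "sep": ";",
    "decimal": ",",
}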

Steps to reproduce the error:

[screenshot: error raised when exporting a pivot table chart to CSV with the custom CSV_EXPORT settings]

Cause: the error stems from an anomaly in the input DataFrame df, which ends up with a single column whose values contain all the fields joined by the semicolon separator:

,state;name;sum__num
0,other;Michael;1047996
1,other;Christopher;803607
2,other;James;749686
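
A minimal sketch of the parsing mismatch (the payload below is an assumed reconstruction of what the chart data endpoint returns when CSV_EXPORT uses a semicolon):

from io import StringIO
import pandas as pd

# assumed sample of the raw CSV payload produced with sep=";"
data = "state;name;sum__num\nother;Michael;1047996\nother;Christopher;803607\nother;James;749686"

# default pandas parsing: everything collapses into a single column
broken = pd.read_csv(StringIO(data))
print(broken.columns.tolist())  # ['state;name;sum__num']

# parsing with the CSV_EXPORT separator yields the expected columns
fixed = pd.read_csv(StringIO(data), sep=";", decimal=",")
print(fixed.columns.tolist())   # ['state', 'name', 'sum__num']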

Fix: read the CSV data with the settings defined in CSV_EXPORT so the DataFrame is parsed correctly.

Code Changes:

        elif query["result_format"] == ChartDataResultFormat.CSV:
            df = pd.read_csv(StringIO(data), 
                             delimiter=superset_config.CSV_EXPORT.get('sep'),
                             encoding=superset_config.CSV_EXPORT.get('encoding'),
                             decimal=superset_config.CSV_EXPORT.get('decimal'))

Complete Code


def apply_post_process(
    result: dict[Any, Any],
    form_data: Optional[dict[str, Any]] = None,
    datasource: Optional[Union["BaseDatasource", "Query"]] = None,
) -> dict[Any, Any]:
    form_data = form_data or {}

    viz_type = form_data.get("viz_type")
    if viz_type not in post_processors:
        return result

    post_processor = post_processors[viz_type]

    for query in result["queries"]:
        if query["result_format"] not in (rf.value for rf in ChartDataResultFormat):
            raise Exception(  # pylint: disable=broad-exception-raised
                f"Result format {query['result_format']} not supported"
            )

        data = query["data"]

        if isinstance(data, str):
            data = data.strip()

        if not data:
            # do not try to process empty data
            continue

        if query["result_format"] == ChartDataResultFormat.JSON:
            df = pd.DataFrame.from_dict(data)
        elif query["result_format"] == ChartDataResultFormat.CSV:
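            # read the raw CSV payload with the separator/encoding/decimal defined in CSV_EXPORT (superset_config import not shown in this snippet)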
            df = pd.read_csv(StringIO(data), 
                             delimiter=superset_config.CSV_EXPORT.get('sep'),
                             encoding=superset_config.CSV_EXPORT.get('encoding'),
                             decimal=superset_config.CSV_EXPORT.get('decimal'))

        # convert all columns to verbose (label) name
        if datasource:
            df.rename(columns=datasource.data["verbose_map"], inplace=True)

        processed_df = post_processor(df, form_data, datasource)

        query["colnames"] = list(processed_df.columns)
        query["indexnames"] = list(processed_df.index)
        query["coltypes"] = extract_dataframe_dtypes(processed_df, datasource)
        query["rowcount"] = len(processed_df.index)

        # Flatten hierarchical columns/index since they are represented as
        # `Tuple[str]`. Otherwise encoding to JSON later will fail because
        # maps cannot have tuples as their keys in JSON.
        processed_df.columns = [
            " ".join(str(name) for name in column).strip()
            if isinstance(column, tuple)
            else column
            for column in processed_df.columns
        ]
        processed_df.index = [
            " ".join(str(name) for name in index).strip()
            if isinstance(index, tuple)
            else index
            for index in processed_df.index
        ]

        if query["result_format"] == ChartDataResultFormat.JSON:
            query["data"] = processed_df.to_dict()
        elif query["result_format"] == ChartDataResultFormat.CSV:
            buf = StringIO()
            processed_df.to_csv(buf)
            buf.seek(0)
            query["data"] = buf.getvalue()

    return result
frlm commented 3 days ago

UPDATE: Export Pivot Tables into CSV Format

In the last commit, I made a small change to export pivot tables without flattening multi-index rows/columns. The flattening, originally added by the community for JSON export, is particularly unhelpful for pivot tables exported to CSV with a large number of multi-index columns.

To illustrate the issue, I have attached an example. If I create a pivot table with many columns associated with multi-index rows, the flattening step merges them into a single column whose value is obtained by concatenating all fields separated by a space. That is not very practical if we want to use these exports in Excel or other tools.
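
A rough sketch of the difference, using pandas directly (the tiny DataFrame here is illustrative only, not the actual chart payload):

import pandas as pd

# small pivot with MultiIndex rows and columns
df = pd.DataFrame(
    {"num": [1, 2, 3, 4]},
    index=pd.MultiIndex.from_product(
        [["CA", "NY"], ["girl", "boy"]], names=["state", "gender"]
    ),
)
pivot = df.unstack("gender")

# AS-IS: flatten column tuples into space-joined strings (what the JSON path needs)
flat = pivot.copy()
flat.columns = [" ".join(str(n) for n in c).strip() for c in flat.columns]
print(flat.to_csv())   # one header row: state,num girl,num boy

# TO-BE: let pandas keep the MultiIndex, writing one header row per column level
print(pivot.to_csv())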

Example:

I created this pivot table using your example dataset:

[screenshot: pivot table chart built from the example dataset]

The AS-IS behavior generates a file like this, where the MultiIndex columns are collapsed into a single header row and the MultiIndex rows are merged into a single column by concatenating all fields separated by a space. This export is not easy to work with in Excel for further processing.

[screenshot: AS-IS CSV export, with MultiIndex headers flattened into a single row/column]

export_as_is_20241119_101212.csv

The TO-BE behavior generates a file like this, where the MultiIndex rows/columns are preserved:

[screenshot: TO-BE CSV export, with MultiIndex rows/columns preserved]

export_to_be_20241119_100853.csv

Complete Code

def apply_post_process(
    result: dict[Any, Any],
    form_data: Optional[dict[str, Any]] = None,
    datasource: Optional[Union["BaseDatasource", "Query"]] = None,
) -> dict[Any, Any]:
    form_data = form_data or {}

    viz_type = form_data.get("viz_type")
    if viz_type not in post_processors:
        return result

    post_processor = post_processors[viz_type]

    for query in result["queries"]:
        if query["result_format"] not in (rf.value for rf in ChartDataResultFormat):
            raise Exception(  # pylint: disable=broad-exception-raised
                f"Result format {query['result_format']} not supported"
            )

        data = query["data"]

        if isinstance(data, str):
            data = data.strip()

        if not data:
            # do not try to process empty data
            continue

        if query["result_format"] == ChartDataResultFormat.JSON:
            df = pd.DataFrame.from_dict(data)
        elif query["result_format"] == ChartDataResultFormat.CSV:
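            # csv_export_settings is assumed to be the CSV_EXPORT dict from superset_config; its definition is not shown in this snippet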
            df = pd.read_csv(StringIO(data), 
                             sep=csv_export_settings.get('sep', ','), 
                             encoding=csv_export_settings.get('encoding', 'utf-8'), 
                             decimal=csv_export_settings.get('decimal', '.'))

        # convert all columns to verbose (label) name
        if datasource:
            df.rename(columns=datasource.data["verbose_map"], inplace=True)

        processed_df = post_processor(df, form_data, datasource)

        query["colnames"] = list(processed_df.columns)
        query["indexnames"] = list(processed_df.index)
        query["coltypes"] = extract_dataframe_dtypes(processed_df, datasource)
        query["rowcount"] = len(processed_df.index)

        if query["result_format"] == ChartDataResultFormat.JSON:
            # Flatten hierarchical columns/index since they are represented as
            # `Tuple[str]`. Otherwise encoding to JSON later will fail because
            # maps cannot have tuples as their keys in JSON.
            processed_df.columns = [
                " ".join(str(name) for name in column).strip()
                if isinstance(column, tuple)
                else column
                for column in processed_df.columns
            ]
            processed_df.index = [
                " ".join(str(name) for name in index).strip()
                if isinstance(index, tuple)
                else index
                for index in processed_df.index
            ]
            query["data"] = processed_df.to_dict()

        elif query["result_format"] == ChartDataResultFormat.CSV:
            buf = StringIO()
            processed_df.to_csv(buf, 
                                sep=csv_export_settings.get('sep', ','), 
                                encoding=csv_export_settings.get('encoding', 'utf-8'), 
                                decimal=csv_export_settings.get('decimal', '.'))
            buf.seek(0)
            query["data"] = buf.getvalue()

    return result
frlm commented 1 day ago

I don't understand what I need to do next. Can you check whether there are any issues?