aliyun / alibabacloud-odps-maxframe-client

Apache License 2.0
4 stars 2 forks source link

MaxFrame GroupBy with as_index=False causes the loss of the index column #6

Closed snowljs closed 4 months ago

snowljs commented 4 months ago

When using groupby with parameter as_index=False, the following error is encountered:


ODPSError Traceback (most recent call last)

~/maxframe/core/maxframe/session.py in _execute(session, wait, show_progress, progress_update_interval, cancelled, *tileables, **kwargs) 1033 # set cancelled to avoid wait task leak 1034 cancelled.set() -> 1035 await execution_info 1036 else: 1037 return execution_info

~/maxframe/core/maxframe/session.py in wait() 73 74 async def wait(): ---> 75 return await self._aio_task 76 77 self._future_local.future = fut = asyncio.run_coroutine_threadsafe(

~/maxframe/core/maxframe_client/session/odps.py in _run_in_background(self, dag_info, tileables, progress) 287 progress.value = 1.0 288 elif dag_info.status == DagStatus.FAILED: --> 289 dag_info.error_info.reraise() 290 291 if dag_info.status in (DagStatus.RUNNING, DagStatus.CANCELLED):

~/maxframe/core/maxframe/protocol.py in reraise(self) 206 def reraise(self): 207 if self.raw_error_source == ErrorSource.PYTHON: --> 208 raise self.raw_error_data 209 raise RemoteException(self.error_messages, self.error_tracebacks, []) 210

/apsara/odps/KubeTempRoot/dailyrunnew/executor-dailyrunnew-5/MAXFRAME_TASK_default_2024-04-02-21-07-16-0543_1_108161_1/python/framedriver/maxframe_framedriver/services/dag/dagrunner.py in _run()

/apsara/odps/KubeTempRoot/dailyrunnew/executor-dailyrunnew-5/MAXFRAME_TASK_default_2024-04-02-21-07-16-0543_1_108161_1/python/framedriver/maxframe/protocol.py in reraise()

/apsara/odps/KubeTempRoot/dailyrunnew/executor-dailyrunnew-5/MAXFRAME_TASK_default_2024-04-02-21-07-16-0543_1_108161_1/python/framedriver/maxframe_framedriver/runners/mcsql.py in _run()

/apsara/odps/KubeTempRoot/dailyrunnew/executor-dailyrunnew-5/MAXFRAME_TASK_default_2024-04-02-21-07-16-0543_1_108161_1/python/framedriver/maxframe_framedriver/runners/mcsql.py in _execute_sql_interactive()

/apsara/odps/KubeTempRoot/dailyrunnew/executor-dailyrunnew-5/MAXFRAME_TASK_default_2024-04-02-21-07-16-0543_1_108161_1/python/framedriver/maxframe_framedriver/runners/mcsql.py in _execute_sql_offline()

/apsara/odps/KubeTempRoot/dailyrunnew/executor-dailyrunnew-5/MAXFRAME_TASK_default_2024-04-02-21-07-16-0543_1_108161_1/python/framedriver/maxframe_framedriver/runners/mcsql.py in _wait_for_instance_success()

/apsara/odps/KubeTempRoot/dailyrunnew/executor-dailyrunnew-5/MAXFRAME_TASK_default_2024-04-02-21-07-16-0543_1_108161_1/python/framedriver/maxframe_framedriver/runners/mcsql.py in _check_instance_success()

ODPSError: ODPS-0130071: InstanceId: 20240403024124100gy1nar8r8un5 ODPS-0130071:[12,24] Semantic analysis exception - wrong columns count 2 in data source, requires 3 columns (includes dynamic partitions if any)

snowljs commented 4 months ago

Constructing input data

from odps import ODPS
from odps.df import DataFrame as ODPSDataFrame

data_sets = [{
    "table_name": "product",
    "table_schema" : "index bigint, product_id bigint, product_name string, current_price bigint",
    "source_type": "records",
    "records" : [
        [1, 100, 'Nokia', 1000],
        [2, 200, 'Apple', 5000],
        [3, 300, 'Samsung', 9000]
    ],
},
{
    "table_name" : "sales",
    "table_schema" : "index bigint, sale_id bigint, product_id bigint, user_id bigint, year bigint, quantity bigint, price bigint",
    "source_type": "records",
    "records" : [
        [1, 1, 100, 101, 2008, 10, 5000],
        [2, 2, 300, 101, 2009, 7, 4000],
        [3, 4, 100, 102, 2011, 9, 4000],
        [4, 5, 200, 102, 2013, 6, 6000],
        [5, 8, 300, 102, 2015, 10, 9000],
        [6, 9, 100, 102, 2015, 6, 2000]
    ],
    "lifecycle": 5
}]

import pandas as pd
def prepare_data(o: ODPS, data_sets, suffix="", drop_if_exists=False):
    for index, data in enumerate(data_sets):
        table_name = data.get("table_name")
        table_schema = data.get("table_schema")
        source_type = data.get("source_type")

        if not table_name or not table_schema or not source_type:
            raise ValueError(f"Dataset at index {index} is missing one or more required keys: 'table_name', 'table_schema', or 'source_type'.")

        lifecycle = data.get("lifecycle", 5)
        table_name += suffix

        print(f"Processing {table_name}...")
        if drop_if_exists:
            print(f"Deleting {table_name}...")
            o.delete_table(table_name, if_exists=True)

        o.create_table(name=table_name, table_schema=table_schema, lifecycle=lifecycle, if_not_exists=True)

        if source_type == "local_file":
            file_path = data.get("file")
            if not file_path:
                raise ValueError(f"Dataset at index {index} with source_type 'local_file' is missing the 'file' key.")
            sep = data.get("sep", ",")
            pd_df = pd.read_csv(file_path, sep=sep)
            ODPSDataFrame(pd_df).persist(table_name, drop_table=True)
        elif source_type == 'records':
            records = data.get("records")
            if not records:
                raise ValueError(f"Dataset at index {index} with source_type 'records' is missing the 'records' key.")
            with o.get_table(table_name).open_writer() as writer:
                writer.write(records)
        else:
            raise ValueError(f"Unknown data set source_type: {source_type}")

        print(f"Processed {table_name} Done")

prepare_data(o, data_sets, "_maxframe_debug", True)

Maxframe code

from maxframe.session import new_session
import maxframe.dataframe as md

session = new_session(o)
print(session.session_id)

sales = md.read_odps_table("sales_maxframe_debug", index_col="index")

df = sales.groupby(['product_id'], as_index=False) \
          .agg(total_quantity=('quantity', 'sum'))
df.execute().fetch()
devinJiu commented 4 months ago

The issue is caused by the column being incorrectly overwritten during the groupby process. This has been fixed in version v0.1.0b4; you can switch to version v0.1.0b4 for use.