Volue-Public / energy-mesh-python

A Python API able to communicate with Volue Energy's Mesh server.

Document and maybe increase gRPC message size limits #421

Closed erny-powel closed 2 months ago

erny-powel commented 6 months ago

We currently use the default gRPC message size limit of 4 MB. This should be documented and maybe increased, although there are downsides to that.

See https://github.com/Volue/energy-mesh/issues/5029. See https://github.com/Volue/energy-ProjectMgmt-AneoExternal/issues/84#issue-2143990847.
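
For reference, the limits themselves are standard gRPC channel options. Below is a minimal, illustrative sketch of raising them on a plain gRPC Python channel; whether and how volue.mesh.Connection exposes these options is an assumption here, and the Mesh server enforces its own limit independently of the client.

import grpc

# 16 MB instead of the 4 MB default; illustrative value only.
MAX_MESSAGE_SIZE = 16 * 1024 * 1024

channel = grpc.insecure_channel(
    "localhost:50051",
    options=[
        ("grpc.max_send_message_length", MAX_MESSAGE_SIZE),
        ("grpc.max_receive_message_length", MAX_MESSAGE_SIZE),
    ],
)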

gunnara commented 6 months ago

@erny-powel

Code for the generator function:

from typing import List, Union

import pandas as pd
import pyarrow as pa
from pandas.api.types import is_datetime64_any_dtype
from pytz import timezone
from volue.mesh import Timeseries
from itertools import batched  # requires Python 3.12+

def generate_pa_table_from_df(
    df: pd.DataFrame,
    user_flags: Union[List[pa.Array], None],
    time_info: str = "UTC",
) -> pa.Table:
    """
    Generator function that converts a DataFrame to a PyArrow Table with the
    specified schema for each column in the dataframe. The function therefore
    returns a generator object, which yields a pa.Table for each column in the
    dataframe.

    The schema used matches how the write function in the MESH-API expects the
    data to be. Note that it is not within the scope of this function to check
    whether the data is valid for the MESH-model.

    Conversion to a pyarrow table is done by creating a list of pa.Array from the
    given DataFrame. The list of pa.Array is then used to create a pyarrow table
    using the function pa.Table.from_arrays().

    The dataframe supplied must have timezone-naive date-time values as its index,
    and the columns are assumed to hold the values to be written to the MESH-model.
    If the dataframe has column names that are int and not equal to the default
    column names, they are reset to the default sequence. This is because dataframes
    returned from the TSS-client have ts-keys as column names and those are used by
    the MESH-API.

    Since the dataframe does not contain flag values, these must either be supplied
    by the user or set internally within this function. NB! If no flags are supplied,
    the function will by default set all flags to Timeseries.PointFlags.OK.value,
    which is equivalent to 0. Flags are set for each column in the dataset and, if
    supplied, must be pa.array(..., type=pa.uint32()). The validity of the flags is
    not checked in this function, only the dimensions of the list of flag arrays.

    Time information is set to UTC by default, but can be set to "NORWEGIAN_NORMAL".
    The time index in the dataframe is then converted to the timezone the user has
    specified. If no timezone is given, pyarrow defaults to UTC.

    Parameters:
    - df (pd.DataFrame): Source DataFrame to convert.
    - user_flags (List[pa.Array] or None): List of flag arrays, one per column, to use
      for the conversion.
    - time_info (str, optional): Timezone the date values originate from; if not given,
      UTC is assumed.

    Yields:
    - pa.Table: PyArrow Table (or a chunk of it) with the specified schema.
    """

    # Remove duplicate rows in the dataframe
    df = df.loc[~df.index.duplicated(keep="first")]

    # Batch variables holding the batch size for the generator, which is used to
    # split the data into smaller chunks.
    batch_size = 10000
    delta_batch = 600
    batch_list = range(0, df.shape[0])
    if batch_size <= df.shape[0] < batch_size + delta_batch:
        # Integer division keeps batch_size an int, as required by itertools.batched.
        batch_size = (batch_size + delta_batch) // 2

    # Check if all column names are integers
    if all(isinstance(col, int) for col in df.columns):
        # Generate a sequence of integers from 0 to the max column number
        expected_columns = list(range(len(df.columns)))
        # Check if the actual column names match the expected sequence
        if not list(df.columns) == expected_columns:
            df.columns = range(df.shape[1])

    # Verify that the index is not a multi-index and that it is of dtype `datetime64`.
    if df.index.nlevels > 1:
        raise ValueError("DataFrame supplied can't have multi-index.")
    elif not is_datetime64_any_dtype(df.index):
        raise ValueError("Index of given DataFrame must be of dtype `datetime64`.")

    # Convert the index to the timezone specified by the user, also taking daylight
    # saving time into account. If no timezone is given, pyarrow defaults to UTC.
    time_index = list(df.index.to_pydatetime())
    if time_info == "NORWEGIAN_NORMAL":
        pa_tz = timezone("Europe/Oslo")
    elif time_info == "UTC":
        pa_tz = timezone("UTC")
    else:
        # Raise value error if time_info is not valid
        raise ValueError("time_info must be 'NORWEGIAN_NORMAL' or 'UTC'.")

    # Get dimension of dataframe
    n_rows = df.shape[0]
    n_cols = df.shape[1]

    # Check if user has supplied flags.
    if user_flags is None:
        # Create OK-flag if none
        flags = [pa.array([Timeseries.PointFlags.OK.value] * n_rows)] * n_cols

    else:
        # Check the dimensions of the user-supplied flags: the number of elements in
        # the list must equal the number of columns in the dataframe, and the length
        # of each array must equal the number of rows in the dataframe.
        if len(user_flags) != n_cols:
            raise ValueError(
                "Number of elements in list of user_flags must be equal to number of "
                "columns of in dataframe."
            )

        if not all(len(arr) == n_rows for arr in user_flags):
            raise ValueError(
                "Length of each array in list of user_flags must be equal to number of "
                "rows in the dataframe."
            )
        flags = user_flags

    # Create the array for time-stamps, which is the index of the dataframe and equal
    # for all columns.
    ts_array = pa.array(time_index, type=pa.timestamp("ms", tz=pa_tz))

    # Loop over the columns in the dataframe and create a pa.Table for each column.
    counter = 0
    while counter < n_cols:
        flag_array = flags[counter].cast(pa.uint32())
        value_array = pa.array(df.iloc[:, counter].values, type=pa.float64())
        pa_table = pa.Table.from_arrays(
            [ts_array, flag_array, value_array],
            schema=Timeseries.schema,
        )
        for idx in batched(batch_list, batch_size):
            # pa.Table.slice(offset, length): start at the first index of the batch
            # and take as many rows as the batch contains.
            yield pa_table.slice(idx[0], len(idx))
        counter += 1
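
As a side note on the chunking mechanics the generator relies on, here is a minimal, self-contained sketch (not part of the code above): itertools.batched yields tuples of row indices, and pa.Table.slice takes an offset and a length, so each chunk corresponds to slice(first_index, number_of_indices).

from itertools import batched  # Python 3.12+

import pyarrow as pa

table = pa.table({"value": list(range(25))})
batch_size = 10

for idx in batched(range(table.num_rows), batch_size):
    chunk = table.slice(idx[0], len(idx))
    # chunk.nbytes gives a rough size to compare against the 4 MB default limit.
    print(chunk.num_rows, chunk.nbytes)
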
gunnara commented 6 months ago

Code to digest the data, written to take a dataframe and send it to the generator function, which returns a pyarrow table for each vector, or a chunk of that vector.

# Imports
import pickle
from pathlib import Path
from volue.mesh import Connection, Timeseries
from src.common.mesh_functions import generate_pa_table_from_df
import time
from datetime import timedelta

# Set up connection info for MESH-API
PORT = 50051
HOST = "10.20.7.7"
""
KEYS = ["Inflow"]  # ,"InflowScenario"]

# Define the path to the pickle-file with the ts-data
base_path = Path(__file__).resolve().parents[4] / "data" / "ts_data"
pickle_file_name = "inflow_ts_data.pkl"
full_path = base_path / pickle_file_name
with open(full_path, "rb") as f:
    inflow_ts_data = pickle.load(f)

for key_i in KEYS:
    # Convert the index to datetime and set the time-zone name.
    inflow_pd = inflow_ts_data[key_i]
    index = inflow_pd.index.to_pydatetime()

    # There is an error in the data from the TSS-client, where a row is occurring twice.
    # This row must be removed.
    inflow_pd = inflow_pd.drop_duplicates()

    # Drop all rows with NaN-values, given that all columns are nan.
    inflow_pd = inflow_pd.dropna(how="all")

    # Extract the keys in the order returned from the TSS-client.
    ts_key_tss = inflow_pd.columns
    teller = 0
    # Write to test-database
    connection = Connection(host=HOST, port=PORT)
    for col_i, _ in enumerate(ts_key_tss):
        # Due to the chunking of the data, the call to the generator is placed in a
        # separate loop.
        for table_i in generate_pa_table_from_df(
            inflow_pd, user_flags=None, time_info="UTC"
        ):
            ...

            ts_key = ts_key_tss[col_i]
            try:
                with connection.create_session() as session:
                    timeseries = Timeseries(
                        table=table_i,
                        timskey=ts_key,
                    )
                    session.write_timeseries_points(timeseries=timeseries)
                    session.commit()

                teller += 1
                print("Counter is:", teller)

            except Exception as e:
                print(e)
                exit()
gunnara commented 6 months ago

@erny-powel

In my latest test, I did actually establish a connection each time it writes to the DB, using a with statement, which should close the connection on each loop iteration. I still got the same error. The code added here is not production code, but to use it, create a pd.DataFrame with datetime as index and ts-keys as headers, and store it as a pickle file.

tnoczyns-volue commented 2 months ago

> In my latest test, I did actually establish a connection each time it writes to the DB, using a with statement, which should close the connection on each loop iteration. I still got the same error. The code added here is not production code, but to use it, create a pd.DataFrame with datetime as index and ts-keys as headers, and store it as a pickle file.

Hi @gunnara, are you getting the RESOURCE_EXHAUSTED error here? I understand the PyArrow table generator you provided in one of the comments above does the slicing into chunks so that the output table size stays below the maximum message size limit. Can you confirm that the PyArrow table[^1] size is less than 4 MB, e.g. by printing table_i.nbytes before calling write_timeseries_points?
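For example, such a check could look like this (reusing generate_pa_table_from_df and inflow_pd from the snippet above; the serialized gRPC message will not be exactly this size, but it gives a good indication):

for table_i in generate_pa_table_from_df(inflow_pd, user_flags=None, time_info="UTC"):
    print(f"chunk size: {table_i.nbytes / (1024 * 1024):.2f} MB")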

Additionally:

[^1]: Provided as input to `write_timeseries_points`.

[^2]: Mesh logs every gRPC request to this file by default, unless explicitly configured not to. Please check the mesh.json configuration option "RequestLogging" under "Log".

tnoczyns-volue commented 2 months ago