influxdata / influxdb-client-python

InfluxDB 2.0 python client
https://influxdb-client.readthedocs.io/en/stable/
MIT License
721 stars 187 forks source link

Memory leak in query_data_frame #548

Closed karponi closed 5 months ago

karponi commented 1 year ago

Specifications

Code sample to reproduce problem


def query_influx(
    query, input_data, config, client
) -> pd.DataFrame:
    """Run a parameterised Flux query and hand back the resulting data frame.

    Args:
        query: Flux query text, forwarded unchanged.
        input_data: request object supplying ``machine``, ``dateFrom`` and
            ``dateTo``.
        config: application configuration; its ``influx`` attribute is
            passed to ``InfluxHelper``.
        client: object whose ``query_api.query_data_frame`` executes the
            query.

    Returns:
        The value returned by ``client.query_api.query_data_frame``.
    """
    with InfluxHelper(influx_config=config.influx) as helper:
        logger.info(
            "%s: fetching values from influx from %s to %s",
            input_data.machine,
            input_data.dateFrom,
            input_data.dateTo,
        )

        # The helper only supplies the bucket/org settings; the request
        # itself is issued through the separately supplied ``client``.
        run_query = client.query_api.query_data_frame
        flux_params = {
            "_bucket": helper.influx_config.bucket,
            "machine": input_data.machine,
            "start_date": input_data.dateFrom,
            "stop_date": input_data.dateTo,
        }
        return run_query(
            query=query,
            params=flux_params,
            org=helper.influx_config.org,
        )

def _retrieve_influx_data(input_data) -> pd.DataFrame:
    """Build the Flux query, run it through ``query_influx`` and post-process.

    ``config`` is not a parameter; it is read from the enclosing scope.

    Args:
        input_data: request object; forwarded to ``query_influx`` and used
            for its ``machine`` attribute when logging.

    Returns:
        The value returned by ``process_influx_results``.
    """
    # Bucket and time range are Flux parameters bound by ``query_influx``.
    range_clause = """
                   from(bucket: _bucket)
                   |> range(start: start_date, stop: stop_date)
                   """

    # Keep only the configured feature fields for the requested machine.
    field_alternatives = "|".join(config.features)
    filter_clause = (
        "|> filter(fn: (r) => r._measurement == machine and r._field =~/^("
        + field_alternatives
        + ")$/)"
    )
    influx_get_query = range_clause + filter_clause

    # NOTE(review): ``query_influx`` as declared in this snippet also takes a
    # ``client`` parameter that is not supplied here -- confirm against the
    # real code.
    raw_frame = query_influx(
        query=influx_get_query, input_data=input_data, config=config
    )

    logger.info("%s: processing influx results", input_data.machine)
    return process_influx_results(raw_frame)

Expected behavior

The memory should not increase at every call of query_influx

Actual behavior

Memory usage increases with every call, eventually leading to crashes.

Additional info

Here is the output of memory_profiler:

Line     Mem usage    Increment  Occurrences   Line Contents
=============================================================
    24    185.8 MiB    185.8 MiB           1   @profile
    25                                         def query_influx(
    26                                             query, input_data, config, client
    27                                         ) -> pd.DataFrame:
    28                                             """
    29                                             Queries influx "common" bucket for data.
    30                                             Args:
    31                                                 input_data: the configuration sent by the request
    32                                                 config: the configuration file of the app
    33                                         
    34                                             Returns: the result of the query as a dataframe and the number of rows of the result
    35                                         
    36                                             """
    37                                         
    38    291.5 MiB      0.0 MiB           2       with InfluxHelper(influx_config=config.influx) as influx:
    39    185.8 MiB      0.0 MiB           2           logger.info(
    40    185.8 MiB      0.0 MiB           1               "%s: fetching values from influx from %s to %s",
    41    185.8 MiB      0.0 MiB           1               input_data.machine,
    42    185.8 MiB      0.0 MiB           1               input_data.dateFrom,
    43    185.8 MiB      0.0 MiB           1               input_data.dateTo,
    44                                                 )
    45                                         
    46    291.5 MiB    105.7 MiB           2           return client.query_api.query_data_frame(
    47    185.8 MiB      0.0 MiB           1               query=query,
    48    185.8 MiB      0.0 MiB           2               params=dict(
    49    185.8 MiB      0.0 MiB           1                   _bucket=influx.influx_config.bucket,
    50    185.8 MiB      0.0 MiB           1                   machine=input_data.machine,
    51    185.8 MiB      0.0 MiB           1                   start_date=input_data.dateFrom,
    52    185.8 MiB      0.0 MiB           1                   stop_date=input_data.dateTo,
    53                                                     ),
    54    185.8 MiB      0.0 MiB           1               org=influx.influx_config.org,
    55                                                 )

Line     Mem usage    Increment  Occurrences   Line Contents
=============================================================
    93    184.2 MiB    184.2 MiB           1   @profile
    94                                         def _retrieve_influx_data(input_data) -> pd.DataFrame:
    95                                         
    96    184.2 MiB      0.0 MiB           2       influx_get_query = """
    97                                                            from(bucket: _bucket)
    98                                                            |> range(start: start_date, stop: stop_date)
   100    184.2 MiB      0.0 MiB          3         """ + ( "|> filter(fn: (r) => r._measurement == machine and r._field =~/^("                     
   101    184.2 MiB      0.0 MiB           1           + "|".join(config.features)
   102    184.2 MiB      0.0 MiB           1           + ")$/)"
   103                                             )
   104                                         
   **105    291.5 MiB    107.3 MiB           2       data_frame = query_influx(
   106    184.2 MiB      0.0 MiB           1           query=influx_get_query, input_data=input_data, config=config
   107                                             )
   108                                         
   109    291.5 MiB      0.0 MiB           1       logger.info("%s: processing influx results", input_data.machine)
   110**                                                                             
   111    324.9 MiB     33.4 MiB           1       return process_influx_results(data_frame)
alespour commented 5 months ago

I cannot reproduce the memory leak unless I intentionally create one by holding a reference to the returned data frame. The code I used is attached.

without leak:

Filename: issue_548.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    43     74.5 MiB     74.5 MiB           1   @profile
    44                                         def run(api) -> int:
    45     74.5 MiB      0.0 MiB           1       n = 0
    46     77.3 MiB      2.8 MiB           1       n += _retrieve_influx_data(query_api)
    47     77.4 MiB      0.1 MiB           1       n += _retrieve_influx_data(query_api)
    48     77.4 MiB      0.0 MiB           1       n += _retrieve_influx_data(query_api)
    49     77.4 MiB      0.0 MiB           1       n += _retrieve_influx_data(query_api)
    50     77.4 MiB      0.0 MiB           1       n += _retrieve_influx_data(query_api)
    51     77.4 MiB      0.0 MiB           1       n += _retrieve_influx_data(query_api)
    52     77.4 MiB      0.0 MiB           1       n += _retrieve_influx_data(query_api)
    53     77.4 MiB      0.0 MiB           1       n += _retrieve_influx_data(query_api)
    54     77.4 MiB      0.0 MiB           1       n += _retrieve_influx_data(query_api)
    55     77.4 MiB      0.0 MiB           1       n += _retrieve_influx_data(query_api)
    56     77.4 MiB      0.0 MiB           1       return n

with intentional leak (--leak):

Filename: issue_548.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
    43     74.5 MiB     74.5 MiB           1   @profile
    44                                         def run(api) -> int:
    45     74.5 MiB      0.0 MiB           1       n = 0
    46     77.8 MiB      3.3 MiB           1       n += _retrieve_influx_data(query_api)
    47     77.8 MiB      0.0 MiB           1       n += _retrieve_influx_data(query_api)
    48     78.1 MiB      0.3 MiB           1       n += _retrieve_influx_data(query_api)
    49     78.3 MiB      0.3 MiB           1       n += _retrieve_influx_data(query_api)
    50     78.4 MiB      0.1 MiB           1       n += _retrieve_influx_data(query_api)
    51     78.7 MiB      0.3 MiB           1       n += _retrieve_influx_data(query_api)
    52     78.7 MiB      0.1 MiB           1       n += _retrieve_influx_data(query_api)
    53     79.0 MiB      0.2 MiB           1       n += _retrieve_influx_data(query_api)
    54     79.1 MiB      0.1 MiB           1       n += _retrieve_influx_data(query_api)
    55     79.1 MiB      0.0 MiB           1       n += _retrieve_influx_data(query_api)
    56     79.1 MiB      0.0 MiB           1       return n
alespour commented 5 months ago

Could not reproduce.