influxdata / influxdb-client-python

InfluxDB 2.0 python client
https://influxdb-client.readthedocs.io/en/stable/
MIT License
706 stars 185 forks source link

Pivotted query result with long and float type columns causes ValueError in `_to_value()` #621

Closed zenzxjul closed 7 months ago

zenzxjul commented 8 months ago

Specifications

Code sample to reproduce problem

import os
from influxdb_client import InfluxDBClient

with InfluxDBClient(token=os.environ["INFLUXDB_TOKEN"],
                        url=os.environ["INFLUXDB_HOST"],
                        org=os.environ["INFLUXDB_ORG"]) as client:

    query_api = client.query_api()
    bucket = os.environ["INFLUXDB_BUCKET"]
    org = os.environ["INFLUXDB_ORG"]

    flux_query = f'''
        from(bucket: "{bucket}")
            |> range(start: 2023-12-15T13:19:54Z, stop: 2023-12-15T13:19:57Z)
            |> filter(fn: (r) => r["_measurement"] == "test")
            |> filter(fn: (r) => r["_field"] == "test_double" or r["_field"] == "test_long")
            |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")  
        '''

    df = query_api.query_data_frame(org=org, query=flux_query)

    print(df.head)

Expected behavior

Print the head of the table like:

PS C:\Users\zenzxjul> python value_error.py
<bound method NDFrame.head of     result  table                            _time                    _start                     _stop _measurement  test_double  test_long
0  _result      0 2023-12-15 13:19:55.372000+00:00 2023-12-15 13:19:54+00:00 2023-12-15 13:19:57+00:00         test          4.0        NaN
1  _result      0        2023-12-15 13:19:56+00:00 2023-12-15 13:19:54+00:00 2023-12-15 13:19:57+00:00         test          NaN        1.0>

Actual behavior

PS C:\Users\zenzxjul> python value_error.py
Traceback (most recent call last):
  File "C:\Users\zenzxjul\value_error.py", line 20, in <module>
    df = query_api.query_data_frame(org=org, query=flux_query)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\zenzxjul\AppData\Local\Programs\Python\Python311\Lib\site-packages\influxdb_client\client\query_api.py", line 254, in query_data_frame
    return self._to_data_frames(_generator)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\zenzxjul\AppData\Local\Programs\Python\Python311\Lib\site-packages\influxdb_client\client\_base.py", line 318, in _to_data_frames
    _dataFrames = list(_generator)
                  ^^^^^^^^^^^^^^^^
  File "C:\Users\zenzxjul\AppData\Local\Programs\Python\Python311\Lib\site-packages\influxdb_client\client\flux_csv_parser.py", line 115, in generator
    for val in parser._parse_flux_response():
  File "C:\Users\zenzxjul\AppData\Local\Programs\Python\Python311\Lib\site-packages\influxdb_client\client\flux_csv_parser.py", line 126, in _parse_flux_response
    for val in self._parse_flux_response_row(metadata, csv):
  File "C:\Users\zenzxjul\AppData\Local\Programs\Python\Python311\Lib\site-packages\influxdb_client\client\flux_csv_parser.py", line 228, in _parse_flux_response_row
    flux_record = self.parse_record(metadata.table_index - 1, metadata.table, csv)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\zenzxjul\AppData\Local\Programs\Python\Python311\Lib\site-packages\influxdb_client\client\flux_csv_parser.py", line 265, in parse_record
    record.values[column_name] = self._to_value(str_val, fluxColumn)
                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\zenzxjul\AppData\Local\Programs\Python\Python311\Lib\site-packages\influxdb_client\client\flux_csv_parser.py", line 277, in _to_value
    return self._to_value(np.nan, column)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\zenzxjul\AppData\Local\Programs\Python\Python311\Lib\site-packages\influxdb_client\client\flux_csv_parser.py", line 289, in _to_value
    return int(str_val)
           ^^^^^^^^^^^^
ValueError: cannot convert float NaN to integer

Additional info

Minimal csv data to reproduce:

#group,false,false,true,true,false,false,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string
#default,_result,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement
,,0,2023-12-15T13:19:54Z,2023-12-15T13:19:57Z,2023-12-15T13:19:55.372Z,4,test_double,test

#group,false,false,true,true,false,false,true,true
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string
#default,_result,,,,,,,
,result,table,_start,_stop,_time,_value,_field,_measurement
,,1,2023-12-15T13:19:54Z,2023-12-15T13:19:57Z,2023-12-15T13:19:56Z,1,test_long,test
zenzxjul commented 8 months ago

The issue arose from inpecting Quix's new InfluxDB 2.0 connector that uses query_data_frame().

mkmark commented 8 months ago

int(np.nan) causes the error. For temporary fix, replace the function.

import numpy as np

import influxdb_client.client.flux_csv_parser
from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.client.flux_csv_parser import FluxSerializationMode

class FluxCsvParser(influxdb_client.client.flux_csv_parser.FluxCsvParser):
  def _to_value(self, str_val, column):
    if str_val == '' or str_val is None:
        default_value = column.default_value
        if default_value == '' or default_value is None:
            if self._serialization_mode is FluxSerializationMode.dataFrame:
                return self._to_value(np.nan, column)
            return None
        return self._to_value(default_value, column)

    if "string" == column.data_type:
        return str_val

    if "boolean" == column.data_type:
        return "true" == str_val

    if "unsignedLong" == column.data_type or "long" == column.data_type:
        if str_val is np.nan:
            return int(0)
        return int(str_val)

    if "double" == column.data_type:
        if str_val is np.nan:
            return float(0)
        return float(str_val)

    if "base64Binary" == column.data_type:
        return base64.b64decode(str_val)

    if "dateTime:RFC3339" == column.data_type or "dateTime:RFC3339Nano" == column.data_type:
        return get_date_helper().parse_date(str_val)

    if "duration" == column.data_type:
        # todo better type ?
        return int(str_val)

influxdb_client.client.flux_csv_parser.FluxCsvParser._to_value = FluxCsvParser._to_value
bednar commented 8 months ago

@mkmark thanks for your temp fix, we will add this to our backlog or

@mkmark, @zenzxjul is this something you would be willing to help with? All PR is welcome and we will be happy to review your submission.