influxdata / influxdb-client-python

InfluxDB 2.0 python client
https://influxdb-client.readthedocs.io/en/stable/
MIT License

"query_data_frame()" triggers "pandas.errors.InvalidIndexError" when the response data table has a column named "result" #649

Closed maparham closed 4 months ago

maparham commented 5 months ago

Specifications

Code sample to reproduce problem

from datetime import datetime
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(
    url="http://localhost:8086",  # the client URL needs a scheme
    token="...",
    org="...",
)

write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
delete_api = client.delete_api()

# for a fresh start
delete_api.delete(
    start="2009-01-02T23:00:00Z",
    stop=datetime.now(),
    bucket="delete_me",
    org="...",
    predicate='_measurement="my_measurement"',  # delete predicates quote string values
)

# rename "result" and "table" to anything else and it works!
_point1 = Point("my_measurement").tag("location", "Prague").field("result", 1111)
_point2 = Point("my_measurement").tag("location", "New York").field("table", 2222)

write_api.write(bucket="delete_me", record=[_point1, _point2])

"""
Query: using Pandas DataFrame
"""
data_frames = query_api.query_data_frame(
    'from(bucket:"delete_me") '
    "|> range(start: -100m) "
    '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
)
print(data_frames[0].to_string())

client.close()

Expected behavior

Actual behavior

 File "/home/sss/.local/lib/python3.10/site-packages/influxdb_client/client/flux_csv_parser.py", line 259, in _prepare_data_frame
    df = pd.concat([self._data_frame.astype(_temp_df.dtypes), _temp_df])
  File "/home/sss/.local/lib/python3.10/site-packages/pandas/util/_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)
  File "/home/sss/.local/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 307, in concat
    return op.get_result()
  File "/home/sss/.local/lib/python3.10/site-packages/pandas/core/reshape/concat.py", line 528, in get_result
    indexers[ax] = obj_labels.get_indexer(new_labels)
  File "/home/sss/.local/lib/python3.10/site-packages/pandas/core/indexes/base.py", line 3442, in get_indexer
    raise InvalidIndexError(self._requires_unique_msg)
pandas.errors.InvalidIndexError: Reindexing only valid with uniquely valued Index objects
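
For context, the InvalidIndexError in the traceback is pandas refusing to reindex a non-unique column index: once the pivoted data contributes its own "result" (or "table") column alongside the one the Flux CSV parser adds, the combined column labels are duplicated. A minimal sketch reproducing the same pandas error without InfluxDB:

```python
import pandas as pd

# Duplicate label, as produced when the data itself has a "result" column
idx = pd.Index(["result", "table", "result"])

try:
    # Internally, pd.concat calls get_indexer on the column index;
    # it raises because the index is not uniquely valued.
    idx.get_indexer(pd.Index(["result", "table"]))
except pd.errors.InvalidIndexError as exc:
    print(exc)  # Reindexing only valid with uniquely valued Index objects
```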

Additional info

No response

bednar commented 5 months ago

Hi @maparham,

Thank you for utilizing our client for your InfluxDB operations.

I noticed you're looking to ingest data that includes columns named result and table. These are reserved words within our system, used specifically for parsing data into Pandas DataFrames, so you'll need to adjust your Flux query to accommodate this.

You can work around this by renaming those columns within the query itself, ensuring that the reserved column names are never used directly. Here's an example of how you could modify your query to achieve this:

data_frames = query_api.query_data_frame(
    'from(bucket:"my-bucket") '
    "|> range(start: -100m) "
    '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
    '|> map(fn: (r) => ({r with table_value: r.table, result_value: r.result})) '
    '|> drop(columns: ["table", "result"]) '
)

In this modified query, we use the map function to create new columns (table_value and result_value) that contain the data from the table and result columns, respectively. Afterward, we remove the original table and result columns with the drop function to avoid conflicts.

This approach allows you to retain all necessary data without conflicting with the reserved column names used during the DataFrame parsing process.
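
As a possible alternative, Flux's built-in rename() function can perform the renaming in a single step instead of map() plus drop(). A sketch (the "my-bucket" name is a placeholder; the resulting string is passed to query_api.query_data_frame() as above):

```python
# Same query as above, but using Flux's rename() to move the data out of
# the reserved "result"/"table" column names in one step.
query = (
    'from(bucket:"my-bucket") '
    "|> range(start: -100m) "
    '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
    '|> rename(columns: {result: "result_value", table: "table_value"}) '
)
# data_frames = query_api.query_data_frame(query)
print(query)
```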

Should you have any further questions or need additional guidance, please don't hesitate to reach out.

Best Regards