influxdata / influxdb-client-python

InfluxDB 2.0 python client
https://influxdb-client.readthedocs.io/en/stable/
MIT License

The error asyncio.exceptions.CancelledError occurred while parsing the CSV. #671

Open kivdev opened 2 months ago

kivdev commented 2 months ago

Specifications

Code sample to reproduce problem

import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # Stream of FluxRecords
        query_api = client.query_api()
        records = await query_api.query_stream(
            (
                'from(bucket:"{bucket}") |> range(start: {start}, stop: {stop})'
                ' |> filter(fn: (r) => r["_measurement"] == "{measurement}")'
                ' |> filter(fn: (r) => r.name == "{metric}")'
            ).format(bucket='metrics', start='2023-01-01T00:00:00Z', stop='2023-12-31T23:59:59Z',
                     metric='http_request_total_count', measurement='http')
        )
        async for record in records:
            print(record)


asyncio.run(main())

Expected behavior

Executes without errors.

Actual behavior

File "/app/my_app/service.py", line 51, in get_influxdb_data
    async for record in records:
  File "/usr/local/lib/python3.11/site-packages/influxdb_client/client/flux_csv_parser.py", line 141, in _parse_flux_response_async
    async for csv in self._reader:
  File "/usr/local/lib/python3.11/site-packages/aiocsv/readers.py", line 54, in __anext__
    return await self._parser.__anext__()
asyncio.exceptions.CancelledError

Additional info

The error occurs only when the result set is large (251,395 records with JSON content).

bednar commented 1 month ago

Hi @kivdev,

Thank you for reaching out with your issue. To better understand and address the problem you're experiencing, it would be incredibly helpful if you could share an example of how your data looks. An anonymized export from the InfluxDB UI would be ideal.

This information will allow us to accurately simulate your scenario and work towards a resolution.

Thanks a lot for your cooperation. Looking forward to your response.

Best Regards.

kivdev commented 1 month ago

Hi @bednar,

The data is stored as a string in the format [{"client": str, "requests": int, "date_report": str}, ...]. Unfortunately, I cannot provide a slice from the database.
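To make the shape concrete, here is a minimal sketch of writing one point like that; the measurement, tag, and field names are placeholders chosen to match the query above, not the real schema:

import json

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# Anonymized stand-in for the real payload: a JSON list stored as one string field.
payload = json.dumps([
    {"client": "client-1", "requests": 42, "date_report": "2023-01-01"},
    {"client": "client-2", "requests": 17, "date_report": "2023-01-01"},
])

point = (
    Point("http")                             # placeholder measurement
    .tag("name", "http_request_total_count")  # placeholder tag, matches the query above
    .field("value", payload)
)

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    client.write_api(write_options=SYNCHRONOUS).write(bucket="metrics", record=point)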

nastynaz commented 4 weeks ago

I'm also having this error; my query returns about 300k rows. Any updates? @bednar

The exact code I'm using is:

result = await influxdb_connector.query_data_frame(query, use_extension_dtypes=True)

The flux query is:

  bucket = from(bucket: "quotes")
    |> range(start: {start_rfc3339}, stop: {end_rfc3339})
    |> filter(fn: (r) => r["_measurement"] == "quote")
    |> filter(fn: (r) => r["coin"] == "{coin}")
    |> filter(fn: (r) => r["_field"] =~ /^(mid_price|bid_price|ask_price)$/ )

  bucket
    |> group(columns: ["exchange"])
    |> drop(columns: ["_start", "_stop", "_measurement", "fiat", "coin"])
    |> rename(columns: {{_time: "time"}})
    |> yield(name: "result")

The bottom of the exception output:

File ~/dev/brain/analysis/src/data_loader/influxdb_loader.py:16, in InfluxDBLoader.load_data(query)
     14 try:
     15   logger.debug(f"executing influxdb query {query}")
---> 16   result = await influxdb_connector.query_data_frame(query, use_extension_dtypes=True)
     17   return result
     18 except Exception as e:

File ~/dev/brain/analysis/.venv/lib/python3.11/site-packages/influxdb_client/client/query_api_async.py:161, in QueryApiAsync.query_data_frame(self, query, org, data_frame_index, params, use_extension_dtypes)
    157 _generator = await self.query_data_frame_stream(query, org=org, data_frame_index=data_frame_index,
    158                                                 params=params, use_extension_dtypes=use_extension_dtypes)
    160 dataframes = []
--> 161 async for dataframe in _generator:
    162     dataframes.append(dataframe)
    164 return self._to_data_frames(dataframes)

File ~/dev/brain/analysis/.venv/lib/python3.11/site-packages/influxdb_client/client/flux_csv_parser.py:141, in FluxCsvParser._parse_flux_response_async(self)
    138 metadata = _FluxCsvParserMetadata()
    140 try:
--> 141     async for csv in self._reader:
    142         for val in self._parse_flux_response_row(metadata, csv):
    143             yield val

File ~/dev/brain/analysis/.venv/lib/python3.11/site-packages/aiocsv/readers.py:54, in AsyncReader.__anext__(self)
     53 async def __anext__(self) -> List[str]:
---> 54     return await self._parser.__anext__()

CancelledError:

Edit: Breaking the query into smaller time ranges works (see the sketch below). The upper limit for a single fetch seems to be around 2.4M rows before it fails.
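A rough sketch of that chunking workaround, assuming a connector exposing the same query_data_frame API; the window size and the build_query helper are placeholders, not real code from this project:

from datetime import timedelta

import pandas as pd


async def query_in_chunks(influxdb_connector, build_query, start, stop,
                          window=timedelta(days=1)):
    """Run the query over successive time windows and concatenate the frames.

    build_query(start_rfc3339, stop_rfc3339) is a hypothetical helper that
    renders the Flux template above for one window.
    """
    frames = []
    cursor = start
    while cursor < stop:
        chunk_stop = min(cursor + window, stop)
        df = await influxdb_connector.query_data_frame(
            build_query(cursor.isoformat(), chunk_stop.isoformat()),
            use_extension_dtypes=True)
        frames.append(df)
        cursor = chunk_stop
    return pd.concat(frames, ignore_index=True)

The idea is simply to size each window so it stays well under the row count where the CancelledError starts appearing.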

karel-rehor commented 4 days ago

I've managed to reproduce this issue with the attached script: exploreIssue671.py.txt

$ python examples/exploreIssue671.py seed size 100000
$ python examples/exploreIssue671.py query

Will investigate further.
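For context, a rough sketch of what a seed-and-query reproduction might look like; this is not the attached exploreIssue671.py.txt, and the URL, token, bucket, and CLI shape are all assumptions:

import asyncio
import sys

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.write_api import SYNCHRONOUS

URL, TOKEN, ORG, BUCKET = "http://localhost:8086", "my-token", "my-org", "metrics"


def seed(size):
    # Write `size` points with distinct timestamps so the query returns `size` rows.
    points = [Point("http").field("value", float(i)).time(i, WritePrecision.S)
              for i in range(size)]
    with InfluxDBClient(url=URL, token=TOKEN, org=ORG) as client:
        client.write_api(write_options=SYNCHRONOUS).write(bucket=BUCKET, record=points)


async def query():
    # Stream everything back; with enough rows this is where CancelledError appears.
    async with InfluxDBClientAsync(url=URL, token=TOKEN, org=ORG) as client:
        records = await client.query_api().query_stream(
            'from(bucket:"{}") |> range(start: 0)'.format(BUCKET))
        count = 0
        async for _ in records:
            count += 1
        print(count, "records")


if __name__ == "__main__":
    if sys.argv[1] == "seed":     # invoked as: seed size 100000
        seed(int(sys.argv[3]))
    else:                         # invoked as: query
        asyncio.run(query())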