influxdata / influxdb-client-python

InfluxDB 2.0 python client
https://influxdb-client.readthedocs.io/en/stable/
MIT License
706 stars 185 forks source link

AttributeError: 'WriteApi' object has no attribute '_subject' #626

Closed jalees closed 8 months ago

jalees commented 8 months ago

Specifications

Code sample to reproduce problem


from influxdb_client import InfluxDBClient, Point, WritePrecision
import pandas as pd
from datetime import datetime
import threading

url = "http://localhost:8086"
token = "**********************"
org = "grafana_on_mac"
bucket = "grafana_test"

client = InfluxDBClient(url=url, org=org,token=token)

csv_file = "covid_19_india.csv"

def convert_to_utc(date_str, time_str):
    datetime_str = f"{date_str} {time_str}"
    dt_obj = datetime.strptime(datetime_str, "%Y-%m-%d %I:%M %p")
    return dt_obj.isoformat()

def insert_data_to_influxdb(row):
    try:
        point = Point("covid_stats") \
            .tag("State", row['State/UnionTerritory']) \
            .time(convert_to_utc(row['Date'], row['Time']), WritePrecision.NS) \
            .field("Cured", int(row['Cured'])) \
            .field("Deaths", int(row['Deaths'])) \
            .field("Confirmed", int(row['Confirmed']))

        client.write_api().write(bucket=bucket, record=point)
    except Exception as e:
        print(f"Error occurred during insertion: {e}")

def parallel_insertion(data):
    try:
        with Pool() as pool:
            pool.map(insert_data_to_influxdb, data.to_dict('records'))

        print("Data insertion complete.")
    except Exception as e:
        print(f"Error occurred during parallel insertion: {e}")

try:
    data = pd.read_csv(csv_file)
    parallel_insertion(data)
finally:
    client.close()

Expected behavior

The values from the csv file should be added to the influx DB

Actual behavior

Started seeing below error and finally nothing was written to the DB:

AttributeError: 'WriteApi' object has no attribute '_subject'
Error occurred during insertion: Invalid value for `api_client`, must be defined.
Exception ignored in: <function WriteApi.__del__ at 0x10638e980>
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/influxdb_client/client/write_api.py", line 411, in __del__
    if self._subject:
       ^^^^^^^^^^^^^
AttributeError: 'WriteApi' object has no attribute '_subject'
Error occurred during insertion: Invalid value for `api_client`, must be defined.
Exception ignored in: <function WriteApi.__del__ at 0x10638e980>
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.11/site-packages/influxdb_client/client/write_api.py", line 411, in __del__
    if self._subject:
       ^^^^^^^^^^^^^

Additional info

minimal csv info:

image
bednar commented 8 months ago

The issue you're experiencing seems to be due to incorrect usage of the multiprocessing features. Multiprocessing in Python can be tricky, especially when dealing with connections and shared resources. To guide you towards a proper implementation, I recommend examining the example provided by the InfluxDB client for Python, which demonstrates a correct and efficient way to use multiprocessing for data import tasks. You can find the example here: Import Data Set Multiprocessing Example.

This example will show you how to structure your multiprocessing setup properly, especially in the context of handling InfluxDB connections and writing data.

If after consulting the example you still face issues or have any questions, feel free to ask for further assistance.