jonppe / influx_to_victoriametrics

Python InfluxDB to VictoriaMetrics exporter script

Error while migrating: #1

Open JasperE84 opened 1 year ago

JasperE84 commented 1 year ago

Hi there, thanks for sharing this project.

While trying to export my influxdb_v2 data I get this error:

Traceback (most recent call last):
  File "/home/myname/work/influx_to_victoriametrics/influx_export.py", line 168, in <module>
    main(vars(parser.parse_args()))
  File "/home/myname/work/influx_to_victoriametrics/influx_export.py", line 91, in main
    measurements_and_fields = [
  File "/home/myname/work/influx_to_victoriametrics/influx_export.py", line 92, in <listcomp>
    gr[0] for df in timeseries for gr in df.groupby(["_measurement", "_field"])
AttributeError: 'str' object has no attribute 'groupby'

If I enter this bit of code

    print(vars(timeseries))
    print(timeseries)
    for df in timeseries:
        print(df)

before

 measurements_and_fields = [
        gr[0] for df in timeseries for gr in df.groupby(["_measurement", "_field"])
    ]

Output is:

{'_is_copy': None, '_mgr': BlockManager
Items: Index(['result', 'table', '_start', '_stop', '_time', '_value', '_field',
       '_measurement'],
      dtype='object')
Axis 1: Int64Index([0, 1, 2, 3, 4], dtype='int64')
ObjectBlock: slice(0, 1, 1), 1 x 5, dtype: object
NumericBlock: slice(1, 2, 1), 1 x 5, dtype: int64
DatetimeTZBlock: slice(2, 3, 1), 1 x 5, dtype: datetime64[ns, tzutc()]
DatetimeTZBlock: slice(3, 4, 1), 1 x 5, dtype: datetime64[ns, tzutc()]
DatetimeTZBlock: slice(4, 5, 1), 1 x 5, dtype: datetime64[ns, tzutc()]
NumericBlock: slice(5, 6, 1), 1 x 5, dtype: float64
ObjectBlock: slice(6, 7, 1), 1 x 5, dtype: object
ObjectBlock: slice(7, 8, 1), 1 x 5, dtype: object, '_item_cache': {}, '_attrs': {}, '_flags': <Flags(allows_duplicate_labels=True)>}
    result  table                    _start                            _stop                     _time     _value              _field   _measurement
0  _result      0 1970-01-01 00:00:00+00:00 2023-03-22 20:10:33.090618+00:00 2022-06-08 14:12:35+00:00  9175680.0    cumulativeEnergy     inverter01
1  _result      1 1970-01-01 00:00:00+00:00 2023-03-22 20:10:33.090618+00:00 2022-06-12 17:44:21+00:00    37250.0        currentPower     inverter01
2  _result      2 1970-01-01 00:00:00+00:00 2023-03-22 20:10:33.090618+00:00 2022-06-08 14:12:35+00:00    38790.0       realTimePower     inverter01
3  _result      3 1970-01-01 00:00:00+00:00 2023-03-22 20:10:33.090618+00:00 2022-06-13 22:15:00+00:00    24336.0     interval_energy  transformer01
4  _result      4 1970-01-01 00:00:00+00:00 2023-03-22 20:10:33.090618+00:00 2022-06-13 22:15:00+00:00    97344.0  interval_power_avg  transformer01
result
table
_start
_stop
_time
_value
_field
_measurement

Not a Python expert here, but it looks like df is a string that can't be grouped, rather than a DataFrame with _field and _measurement columns. Any idea how to fix this?
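(Editor's note, a minimal sketch of what the printed output shows: iterating over a single pandas DataFrame yields its column names as plain strings, so the comprehension ends up calling `.groupby()` on a string.)

```python
import pandas as pd

# query_data_frame() returned a single DataFrame here, not a list of them.
# Iterating over a DataFrame yields its column names as plain strings,
# which is why the loop above printed "result", "table", "_start", ...
df = pd.DataFrame({"_measurement": ["inverter01"], "_field": ["currentPower"]})

print(list(df))              # ['_measurement', '_field'] -- strings, not frames
print(isinstance(next(iter(df)), str))  # True
```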

pki791 commented 6 months ago

I've got the same problem, did you manage to solve it?

JasperE84 commented 6 months ago

I did actually. Here's the diff from what's left of the solution in my work folder ;)

I think the key change was the brackets [ ] around the timeseries object name. The rest of the changes apply some filtering for what I wanted to export.

     measurements_and_fields = [
-        gr[0] for df in timeseries for gr in df.groupby(["_measurement", "_field"])
+        gr[0] for df in [timeseries] for gr in df.where(df["_measurement"] == "transformer01").groupby(["_measurement", "_field"])
     ]
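(Editor's note: the bracket fix works because `query_data_frame()` returned a single DataFrame here rather than a list of them. A sketch that tolerates both shapes; the helper name is illustrative, not from the patch:)

```python
import pandas as pd

def unique_series(timeseries):
    """Collect unique (_measurement, _field) pairs whether the query API
    returned one DataFrame or a list of DataFrames."""
    dfs = timeseries if isinstance(timeseries, list) else [timeseries]
    # Iterating a groupby yields (key, sub-frame) tuples; keep only the keys.
    return [key for df in dfs for key, _ in df.groupby(["_measurement", "_field"])]
```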

I made some further mods to do some filtering and transformation on the export before import. Here's the full diff of my work folder, maybe it's helpful to you:

:~/work/influx_to_victoriametrics$ git diff -r HEAD
diff --git a/influx_export.py b/influx_export.py
index 663bb29..8ec41af 100755
--- a/influx_export.py
+++ b/influx_export.py
@@ -44,6 +44,7 @@ def get_influxdb_lines(df: pd.DataFrame) -> str:
     """
     line = df["_measurement"]

+
     for col_name in get_tag_cols(df):
         line += ("," + col_name + "=") + df[col_name].astype(str)

@@ -55,7 +56,7 @@ def get_influxdb_lines(df: pd.DataFrame) -> str:
         + " "
         + df["_time"].astype(int).astype(str)
     )
-    return "\n".join(line)
+    return "\n".join(line).replace("interval_energy","intervalEnergy").replace("interval_power_avg","intervalPowerAvg")

 def main(args: Dict[str, str]):
@@ -80,26 +81,37 @@ def main(args: Dict[str, str]):
     |> first()"""
     timeseries: List[pd.DataFrame] = query_api.query_data_frame(first_in_series)

+    # print(vars(timeseries))
+    # print(timeseries)
+    for dfa in [timeseries]:
+        print(f"dfa: {dfa}")
+
+
+
     # get all unique measurement-field pairs and then fetch and export them one-by-one.
     # With really large databases the results should be possibly split further
     # Something like query_data_frame_stream() might be then useful.
     measurements_and_fields = [
-        gr[0] for df in timeseries for gr in df.groupby(["_measurement", "_field"])
+        gr[0] for df in [timeseries] for gr in df.where(df["_measurement"] == "transformer01").groupby(["_measurement", "_field"])
     ]
     print(f"Found {len(measurements_and_fields)} unique time series")
     for meas, field in measurements_and_fields:
-        print(f"Exporting {meas}_{field}")
-        whole_series = f"""
-        from(bucket: "{bucket}")
-        |> range(start: 0, stop: now())
-        |> filter(fn: (r) => r["_measurement"] == "{meas}")
-        |> filter(fn: (r) => r["_field"] == "{field}")
-        """
-        df = query_api.query_data_frame(whole_series)
-
-        line = get_influxdb_lines(df)
-        # "db" is added as an extra tag for the value.
-        requests.post(f"{url}/write?db={bucket}", data=line)
+        if meas == "transformer01":
+            print(f"Exporting {meas}_{field}")
+            whole_series = f"""
+                from(bucket: "{bucket}")
+                |> range(start: 2022-06-15T00:00:00.000000000Z, stop: now())
+                |> filter(fn: (r) => r["_measurement"] == "{meas}")
+                |> filter(fn: (r) => r["_field"] == "{field}")
+            """
+            df = query_api.query_data_frame(whole_series)
+
+
+            line = get_influxdb_lines(df)
+            # "db" is added as an extra tag for the value.
+            print(line)
+            resp = requests.post(f"{url}/write?db={bucket}&nocache=1", data=line)
+            #sys.exit()

 if __name__ == "__main__":
pki791 commented 6 months ago

Thank you, that helped me a lot. I forked the repo and added some more changes which helped me to migrate.

Updated

My fork is here https://github.com/pki791/influx_to_victoriametrics

frli4797 commented 3 months ago

Did some additional work on the above, also supporting chunks, dry runs and a few other things. Still a work in progress, but it seems to manage some oddly formed data as well, since the API query calls sometimes return a list and sometimes a DataFrame.

Didn't do it as a fork, as the changes to the structure became too many and too big.

https://github.com/frli4797/influxv2tovm
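(Editor's note: the list-or-DataFrame quirk mentioned above can be smoothed over with a small generator. A sketch with illustrative names, assuming pandas:)

```python
from typing import Iterable, List, Union
import pandas as pd

def iter_frames(result: Union[pd.DataFrame, List[pd.DataFrame]]) -> Iterable[pd.DataFrame]:
    # The query API sometimes returns a single DataFrame and sometimes a
    # list of them; yield frames one by one either way.
    if isinstance(result, pd.DataFrame):
        yield result
    else:
        yield from result
```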

Schtallone commented 3 weeks ago

> Did some additional work on the above, also supporting chunks, dry runs and a few other things. Still a work in progress, but it seems to manage some oddly formed data as well, since the API query calls sometimes return a list and sometimes a DataFrame.
>
> Didn't do it as a fork, as the changes to the structure became too many and too big.
>
> https://github.com/frli4797/influxv2tovm

Hello,

while trying to migrate InfluxDB 2.x data to VM, I got the following error messages:

Dry run True
Pivot False
Finding unique time series.
Traceback (most recent call last):
  File "/homeassistant/pyscript/influxv2tovm.py", line 343, in <module>
    main(vars(parser.parse_args()))
  File "/homeassistant/pyscript/influxv2tovm.py", line 268, in main
    migrator.migrate()
  File "/homeassistant/pyscript/influxv2tovm.py", line 95, in migrate
    measurements_and_fields = self.__find_all_measurements()
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/homeassistant/pyscript/influxv2tovm.py", line 188, in __find_all_measurements
    measurements_and_fields.update(df[self.__measurement_key].unique())
                                   ~~^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: string indices must be integers, not 'str'
Exception ignored in: <function InfluxMigrator.__del__ at 0x7f38143d1580>
Traceback (most recent call last):
  File "/homeassistant/pyscript/influxv2tovm.py", line 78, in __del__
    self.__progress_file.close()
    ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'InfluxMigrator' object has no attribute '_InfluxMigrator__progress_file'

I tried to migrate a special bucket with filtered Data (time frame) and dropped tag values.

When I try to migrate an actual, actively filled bucket without any filtering or shaping, the dry run works and I get the following end notice:

Exception ignored in: <function InfluxMigrator.__del__ at 0x7f9c8b5f9580>
ading. Total: 524.3 kB (1/1)
Traceback (most recent call last):
  File "/homeassistant/pyscript/influxv2tovm.py", line 78, in __del__
    self.__progress_file.close()
    ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'InfluxMigrator' object has no attribute '_InfluxMigrator__progress_file'
All done

Thanks for any help on this

Greets