influxdata / influxdb

Scalable datastore for metrics, events, and real-time analytics
https://influxdata.com
Apache License 2.0

Points in the same series being returned in separate tables (plus duplicate points with _all_ values matching) #24127

Open gazpachoking opened 1 year ago

gazpachoking commented 1 year ago

Steps to reproduce (more details in this forum topic):

  1. I'm using a rollup task to write data to a new bucket.
  2. I wrote another Flux query to put older data into the same format as this new bucket, and to add that historical data to my new bucket.
  3. Query the new bucket for a single series. The resulting data comes back in two tables, even though both tables have matching tag and field values for all the columns in the group key. Query:
    from(bucket: "power_circuit_lores")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "kw")
    |> filter(fn: (r) => r["_field"] == "pair_min")
    |> filter(fn: (r) => r["circuit_pair_id"] == "3303")
    |> yield(name: "mean")

    Result: (screenshot: the single series returned as two tables)

I think the data must somehow differ between the two series, even though it appears identical. During troubleshooting, I also ran into this issue:

  1. Query the data that normally produces two tables.
  2. Call `group` with the same group key columns that were already in use. This results in all of the data in one table.
  3. Call `sort` on `_time`, which results in my graphs looking like they are supposed to.
  4. Call `to` on this resulting data, in hopes that it overwrites the existing data in the bucket, making the query work properly from now on (steps 2-4 are sketched as a single pipeline below).
  5. Now I have 2 points in the same bucket with identical data, which are still returned in separate tables:
    from(bucket: "power_circuit_lores")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_field"] == "ckt_max")
    |> filter(fn: (r) => r["circuit_id"] == "3304")
    |> filter(fn: (r) => r["_time"] == 2023-03-01T22:00:00.000Z)
    |> yield(name: "mean")
    #group,false,false,true,true,false,false,true,true,true,true,true,true,true
    #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string,string,string,string
    #default,mean,,,,,,,,,,,,
    ,result,table,_start,_stop,_time,_value,_field,_measurement,account_id,circuit_id,circuit_pair_id,facility_id,panel_id
    ,,0,2023-02-27T15:01:59.527182778Z,2023-03-06T15:01:59.527182778Z,2023-03-01T22:00:00Z,1.248,ckt_max,kw,52,3304,3304,2,349
    ,,1,2023-02-27T15:01:59.527182778Z,2023-03-06T15:01:59.527182778Z,2023-03-01T22:00:00Z,1.248,ckt_max,kw,52,3304,3304,2,349

    This also seems like a bug, as the documentation says that points with the same timestamp and tag set should overwrite the old data instead of being stored as duplicates.
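For reference, a minimal sketch of steps 2-4 as a single pipeline (an assumption pieced together from the queries above; regrouping on everything except _time and _value reproduces the existing group key):

    from(bucket: "power_circuit_lores")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_field"] == "ckt_max")
    |> filter(fn: (r) => r["circuit_id"] == "3304")
    // Step 2: regroup on the columns already in the group key.
    |> group(mode: "except", columns: ["_time", "_value"])
    // Step 3: restore time order within the merged table.
    |> sort(columns: ["_time"])
    // Step 4: write back, hoping identical points overwrite the old ones.
    |> to(bucket: "power_circuit_lores")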

Expected behavior: Get a single table returned.

Actual behavior: Two tables are returned, with identical data in all the group key columns.

Environment info:

gazpachoking commented 1 year ago

To do any work with the data in these buckets I always have to run `group(mode: "except", columns: ["_time", "_value"])` or something similar; otherwise I get errors like:

runtime error @18:12-18:18: mean: aggregate found duplicate table with key: {_field=ckt_mean,_measurement=kw,_start=2023-02-05T19:05:06.774867596Z,_stop=2023-03-07T19:05:06.774867596Z,account_id=172,circuit_id=3756,circuit_pair_id=3756,facility_id=2,panel_id=384}
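For illustration, a sketch of how that workaround slots in ahead of an aggregate (the bucket and field are taken from the error message above; the rest of the pipeline is an assumption):

    from(bucket: "power_circuit_lores")
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_field"] == "ckt_mean")
    // Merge the duplicate tables that share a group key...
    |> group(mode: "except", columns: ["_time", "_value"])
    |> sort(columns: ["_time"])
    // ...so mean() no longer finds two tables with the same key.
    |> mean()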
Vollbrecht commented 1 year ago

I got the same issue. It started when upgrading from 2.6.0 to 2.6.1. I feed the DB from Telegraf, and it started doing this despite nothing else changing.

XuZhen86 commented 1 year ago

I ran into this issue while migrating 23,225,584 data points from one bucket to another. Looks like the tags have an effect(?!)

The source bucket was working as intended, as this query produces 1 row with the count and no errors:

from(bucket: "EnergyRecords2")
    |> range(start: 2023-02-01T00:00:00Z, stop: 2023-07-11T20:15:00-04:00)
    |> filter(fn: (r) => r["_measurement"] == "power")
    |> filter(fn: (r) => r["input_id"] == "l12")
    |> count()
    |> yield()

This produces 1 series in the bucket Dumpster1, which is desired:

from(bucket: "EnergyRecords2")
    |> range(start: 2023-02-01T00:00:00Z, stop: 2023-07-11T20:15:00-04:00)
    |> filter(fn: (r) => r["_measurement"] == "power")
    |> filter(fn: (r) => r["input_id"] == "l12")
    |> map(
        fn: (r) =>
            ({
                _measurement: "circuit",
                _field: "power",
                _value: r["_value"],
                _time: r["_time"],
                circuit: "12",  // Only 1 tag.
            }),
    )
    |> to(bucket: "Dumpster1", org: "Organization")

This produces 2 series:

from(bucket: "EnergyRecords2")
    |> range(start: 2023-02-01T00:00:00Z, stop: 2023-07-11T20:15:00-04:00)
    |> filter(fn: (r) => r["_measurement"] == "power")
    |> filter(fn: (r) => r["input_id"] == "l12")
    |> map(
        fn: (r) =>
            ({
                _measurement: "circuit",
                _field: "power",
                _value: r["_value"],
                _time: r["_time"],
                circuit: "12",
                circuit_amps_max: "15",  // A second tag.
            }),
    )
    |> to(bucket: "Dumpster1", org: "Organization")

This also produces 2 series:

from(bucket: "EnergyRecords2")
    |> range(start: 2023-02-01T00:00:00Z, stop: 2023-07-11T20:15:00-04:00)
    |> filter(fn: (r) => r["_measurement"] == "power")
    |> filter(fn: (r) => r["input_id"] == "l12")
    |> map(
        fn: (r) =>
            ({
                _measurement: "circuit",
                _field: "power",
                _value: r["_value"],
                _time: r["_time"],
                circuit: "12",
                device_id: "left",  // A second tag.
            }),
    )
    |> to(bucket: "Dumpster1", org: "Organization")

The Dumpster1 bucket was cleared with the `influx delete` command between runs. I also tried reducing the time range to 7 days, but the issue persisted.

Before that, I ran a giant query that migrated multiple series; it ran slowly but worked as intended. That query used map() with if statements, and it produced the correct result even with the 2-month time range.

In addition, the CPU and disk I/O patterns look different. The troubled queries appear to be doing read-process-write at the same time, whereas the trouble-free query used the CPU almost exclusively until the very end, where it did the disk write I/O.

from(bucket: "EnergyRecords2")
    |> range(start: 2023-02-01T00:00:00Z, stop: 2023-04-01T00:00:00Z)
    |> filter(fn: (r) => r["_measurement"] == "power")
    |> filter(
        fn: (r) =>
            r["input_id"] == "l1" or r["input_id"] == "l2" or r["input_id"] == "l3" or r["input_id"] == "l4"
                or
                r["input_id"] == "l5" or r["input_id"] == "l6" or r["input_id"] == "l7" or r["input_id"] == "l8"
                or
                r["input_id"] == "l9" or r["input_id"] == "l10" or r["input_id"] == "l11" or r["input_id"] == "l12"
                or
                r["input_id"] == "l13" or r["input_id"] == "l14" or r["input_id"] == "l15" or r["input_id"] == "l16",
    )
    |> map(
        fn: (r) =>
            if r["input_id"] == "l1" then
                {
                    _measurement: "circuit",
                    _field: "power",
                    _value: r["_value"],
                    _time: r["_time"],
                    circuit: "1",
                    circuit_amps_max: "30",
                    device_id: "left",
                    nick_name: "Dryer",
                    phase: "a",
                    phase_amps_max: "200",
                    unit: "mW",
                }
            else if r["input_id"] == "l2" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l3" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l4" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l5" then
                die(msg: "Unexpedted input_id l5")
            else if r["input_id"] == "l6" then
                die(msg: "Unexpedted input_id l6")
            else if r["input_id"] == "l7" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l8" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l9" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l10" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l11" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l12" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l13" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l14" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l15" then
                // Same as above but with different tag values.
            else if r["input_id"] == "l16" then
                die(msg: "Unexpedted input_id l16")
            else
                die(msg: "Unexpedted input_id"),
    )
    |> to(bucket: "Dumpster2", org: "Organization")

Some more observations: the troubled queries were using `vectorizedMap`, whereas the trouble-free query was using `map`.

brettbeeson commented 1 year ago

Hi!

I have a similar problem, which I described on the community site.

@gazpachoking, did you get any resolution on your duplicate-values issue?

My environment is:

    ubu18 5.19.0-46-generic #47~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Wed Jun 21 15:35:31 UTC 2 x86_64 x86_64 x86_64 GNU/Linux 
    Influx CLI dev (git: none) build_date: 2023-04-28T14:24:14Z
    InfluxDB v2.7.1 (git: 407fa622e9) build_date: 2023-04-28T13:24:27Z
PuzzleSoft commented 1 year ago

I think this might be related.

I'm doing a max() on an entire bucket and I get several duplicated tables in the result:

(screenshot: result listing with duplicated tables)

Tables 0/3 and 1/5 are duplicates, along with several others in the result.

When running another max() on that result, I get a "duplicate table" error.

The Flux is:

    from(bucket: "MK1runninghours")
    |> range(start: 0)
    |> filter(fn: (r) => r["tag"] == "H2")
    |> max()
    |> yield(name: "mean")

This bucket is filled by a daily task that goes three days back (to cover network outages) and thereby recalculates the previous values:

    H2 = basis
        |> map(
            fn: (r) =>
                ({
                    _measurement: r._measurement,
                    _time: r._time,
                    main: r.main,
                    process: r.process,
                    _value: r.H2,
                    _field: "val",
                    tag: "H2",
                }),
        )
        |> group(columns: ["_measurement", "main", "process", "tag", "_field"])
        |> aggregateWindow(every: 1h, fn: mean, createEmpty: true)
        |> drop(columns: ["_start", "_stop"])
        |> to(bucket: "MK1runninghours")

janhuddel commented 11 months ago

I am facing the same problem. I want to migrate my data to a new bucket, and I also have the problem that data points from the same series end up in different tables.

Does my migration project fail now, or are there any workarounds?

XuZhen86 commented 11 months ago

I added a "map, fn, if" statement and it appears to resolve the issue. Please see above for an example. IIRC the issue is resolved as long as there's an "if" statement, even if it's a dummy "if" that never branches. The query would run slower and uses more memory(?)

janhuddel commented 11 months ago

Thank you for the advice about the dummy IF statement. That seems to be a good workaround.

gazpachoking commented 11 months ago

I've just been explicitly calling group and sort for every query on my data. That fixes the issue. It's really annoying that it's required, but I don't think we'll be getting a real fix in InfluxDB 2.0.

brettbeeson commented 9 months ago

Hi!

I understand Flux is moving to maintenance mode, but I reckon this should be fixed, as it's a fundamental function of the engine. For example, calling last() should not return two values!

@Anaisdg Could you advocate for a fix?

An example with last(), plus the workaround (as mentioned above), which works in my case:

...
|> last()
// returns 2 values!

...
|> map(fn: (r) => ({r with _time: if exists r._time then r._time else r._time}))
|> last()
// returns 1 value
brettbeeson commented 7 months ago

Arrrggghhh... this caught me again! Maybe it's a sign I should give up on Influx and migrate to TimescaleDB?

aas-hydrologiq commented 1 month ago

I ran into this when introducing more tags into a bucket (going from 1 tag to 3 tags), so I presume it's the underlying data structure that causes this.

I found that the map and group/sort workarounds were massively decreasing the performance of the queries. An alternative workaround, which I've found to be more performant on my dataset, is to make use of pivot and unpivot; an example is below.

import "experimental"
import "influxdata/influxdb/schema"

data |> schema.fieldsAsCols() |> experimental.unpivot()
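Presumably this works because schema.fieldsAsCols() pivots each series so the fields become columns, merging the duplicated tables along the way, and experimental.unpivot() then restores the original _field/_value layout.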

I hope this will be fixed in Flux, but that seems unlikely. It would be good to understand the root cause and whether there is a better workaround.