influxdata / influxdb-gds-connector

Google Data Studio Connector for InfluxDB.
MIT License

InfluxDB Connector Issue #8

Closed · gefaila closed this issue 3 years ago

gefaila commented 4 years ago

Very excited to try this connector. I have a bucket with a lot of data in it, which seemed to make the connector 'fall over':

Community Connector Error There was an error caused by the community connector. Please report the issue to the provider of this community connector if this issue persists.

Connector details "GetFields from: https://eu-central-1-1.aws.cloud2.influxdata.com" returned an error:Exception: Request failed for https://eu-central-1-1.aws.cloud2.influxdata.com returned code 400. Truncated server response: {"code":"invalid","message":"runtime error @1:110-1:143: drop: schema collision detected: column \"_value\" is both of type int and float"} (use muteHttpExceptions option to examine full response)

Error ID: fb5b13d2

I'm quite experienced with InfluxDB, and there is nothing fundamentally wrong with _value being an int for some _field values and a float for others. In fact it's fairly fundamental. The connector must be assuming something about InfluxDB 2.0 data that is (in general) not always true.
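
For example (hypothetical measurement and field names), both of these line protocol writes can live in the same measurement:

machine,host=R1901 temperature=21.5 1604916000000000000
machine,host=R1901 error_count=3i 1604916000000000000

Here _value is a float for temperature and an integer for error_count, and InfluxDB is perfectly happy with that.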

Any ideas how I'd move forward to use this excellent tool?

bednar commented 4 years ago

Hi @gefaila,

I'm quite experienced with InfluxDB, and there is nothing fundamentally wrong with _value being an int for some _field values and a float for others.

Yes, but Google Data Studio requires a static schema. The connector internally uses the pivot() function to determine the schema for GDS.

Schema query:

from(bucket: "my-bucket") 
    |> range(start: time(v: 1)) // time(v: 1) is 1 ns after the Unix epoch, i.e. effectively all data
    |> filter(fn: (r) => r["_measurement"] == "circleci") 
    |> drop(columns: ["tag1", "tag2", "tag3"]) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_time", "_measurement"]) 
    |> limit(n:1)

You could prepare a Task that normalizes your data into a new Measurement. See https://github.com/influxdata/influxdb-gds-connector/tree/master/examples#performance
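
As a minimal sketch (the bucket names, measurement, and schedule below are placeholders, not taken from the linked example), such a task could cast every field to one type and write the result into a new measurement:

option task = {name: "normalize-for-gds", every: 1h}

from(bucket: "my-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "circleci")
    |> toFloat() // cast all values to float so each pivoted column has a single type
    |> set(key: "_measurement", value: "circleci_normalized")
    |> to(bucket: "my-bucket-normalized", org: "my-org")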

Regards

gefaila commented 4 years ago

Hi @bednar, Thanks for engaging here! πŸ‘

The schema query runs fine on the data in my bucket. It produces a table for each of the hosts I have in the database.

2020-11-09-10-50_chronograf_data.xlsx

Is there anything wrong with the data it's returning to GDS?

If not, are there any documented guidelines for what this data should conform to?

I checked out the link you sent above

You could prepare Task that normalize your data into new Measurement. See - https://github.com/influxdata/influxdb-gds-connector/tree/master/examples#performance

However, there is no actual definition of acceptable and unacceptable data, just a single example of a very long Flux query that apparently returns something that is OK for GDS.

Maybe some clarification is needed for the general case? Thanks

bednar commented 4 years ago

Hi @gefaila,

The schema query should produce only one table, because GDS needs a consistent schema.

Based on your export 2020-11-09-10-50_chronograf_data.xlsx, you missed dropping the tags. Try this:

bucket = "my-bucket"
measurement = "my-measurement"
tags = ["host", "method"]

from(bucket: bucket) 
    |> range(start: time(v: 1)) 
    |> filter(fn: (r) => r["_measurement"] == measurement) 
    |> drop(columns: tags) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_time", "_measurement"]) 
    |> limit(n:1)

You can find all your tags with:

import "influxdata/influxdb/v1"

bucket = "my-bucket"
measurement = "gds"

v1.tagKeys(
  bucket: bucket,
  predicate: (r) => r._measurement == measurement,
  start: duration(v: uint(v: 1970-01-01) - uint(v: now()))
)
|> filter(fn: (r) => r._value != "_start" and r._value != "_stop" and r._value != "_measurement" and r._value != "_field")

However, there is no actual definition of acceptable and unacceptable data, just a single example of a very long Flux query that apparently returns something that is OK for GDS.

Maybe some clarification is needed for the general case?

Yeah, you are right. We need to clarify what the data should look like to be usable in GDS.

Could you export the result of the schema query to CSV?

Regards.

gefaila commented 4 years ago

In that case I'd like to create a filter in the GDS connection page that allows me to select the tag filters that I'm interested in. That way I can drop the tags. Otherwise it's unpredictable which 'host' I'd end up with and that's not useful.

Furthermore, the querying of InfluxDB data with from(bucket: "my-bucket") |> range(start: time(v: 1)) is not great. Our database is large and there's no need for this. It would be way more useful to additionally select a time range that I'm interested in (e.g. last month, year, etc.). But that query looks over all schemas and data since 1970! Not very sane default behaviour. But it's a great tool and I'm looking forward to being able to create dashboards from my data. πŸ‘

gefaila commented 4 years ago

The schema query should produce only one table, because GDS needs a consistent schema.

A good way of giving the user the ability to make sure the data returned is suitable for GDS would be to allow the user to write a Flux query that returns data in the format GDS needs, e.g. one table with an agreed structure.

Presumably, users of the tool are able to do this, otherwise they wouldn't be connecting to InfluxDB? :-)

bednar commented 4 years ago

Furthermore, the querying of InfluxDB data with from(bucket: "my-bucket") |> range(start: time(v: 1)) is not great.

The query with range(start: time(v: 1)) is only used to determine the schema.

For getting data we use the range specified in the report - https://support.google.com/datastudio/answer/9272806?hl=en. By default, the date range provided will be the last 28 days excluding today. If a user applies a date range filter to a report, then the date range provided will reflect the user's selection.

A good way of giving the user the ability to make sure the data returned is suitable for GDS would be to allow the user to write a Flux query that returns data in the format GDS needs, e.g. one table with an agreed structure.

We use two types of queries:

GetSchema

from(bucket:"my-bucket") 
    |> range(start: time(v: 1)) 
    |> filter(fn: (r) => r["_measurement"] == "my-measurement") 
    |> drop(columns: tags) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_time", "_measurement"]) 
    |> limit(n:1)

GetData

from(bucket: "my-bucket") 
   |> range(start: 2020-04-20T00:00:00Z, stop: 2020-05-20T23:59:59Z)
   |> filter(fn: (r) => r["_measurement"] == "my-measurement") 
   |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

What do you think about the ability to specify a Flux filter?

You will be able to specify something like r["host"] == "R1901_1742", and this Flux filter will be appended after |> filter(fn: (r) => r["_measurement"] == "my-measurement") in the GetSchema and GetData queries.
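
For illustration, the GetData query with the user-supplied filter appended would look like:

from(bucket: "my-bucket") 
   |> range(start: 2020-04-20T00:00:00Z, stop: 2020-05-20T23:59:59Z)
   |> filter(fn: (r) => r["_measurement"] == "my-measurement") 
   |> filter(fn: (r) => r["host"] == "R1901_1742") // user-supplied Flux filter
   |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")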

Regards

gefaila commented 3 years ago

What do you think about the ability to specify a Flux filter?

You will be able to specify something like r["host"] == "R1901_1742", and this Flux filter will be appended after |> filter(fn: (r) => r["_measurement"] == "my-measurement") in the GetSchema and GetData queries.

I quite like that.
The 'Data Explorer' feature already does what I think GDS needs to do, i.e. allow the user to select filters for tags that deliver a data set in the form that GDS needs.

[screenshot of the Data Explorer]

gefaila commented 3 years ago

However, stepping back a bit, I see no reason why I would not want to build a dashboard that allows the filtering to happen 'afterwards'. Again, this is parallel to Chronograf dashboards with a variable.

On one of my dashboards I can select the host that I'm interested in viewing. [screenshot of a Chronograf dashboard with a host variable]

To me that seems much more sensible and usable functionality to aim for in GDS.

What do you think? Possible? πŸ‘

bednar commented 3 years ago

What do you think? Possible?

Yeah, of course. We use this type of filter here: https://datastudio.google.com/s/p19vh-b82Sw - "Country Filter".

Do you think that updating our docs to clarify the required schema for GDS would be enough? Something like:

Required schema

Google Data Studio requires a known schema for your data. Each column requires a data type that is consistent across the whole table (= measurement in InfluxDB). For that reason the InfluxDB Connector needs to determine the schema from your InfluxDB with this Flux query:

import "influxdata/influxdb/v1"

bucket = "my-bucket"
measurement = "my-measurement"

tags = v1.tagKeys(
  bucket: bucket,
  predicate: (r) => r._measurement == measurement,
  start: duration(v: uint(v: 1970-01-01) - uint(v: now()))
) |> filter(fn: (r) => r._value != "_start" and r._value != "_stop" and r._value != "_measurement" and r._value != "_field")
  |> findColumn(fn: (key) => true, column: "_value")

from(bucket: bucket) 
  |> range(start: time(v: 1)) 
  |> filter(fn: (r) => r["_measurement"] == measurement) 
  |> drop(fn: (column) => contains(value: column, set: tags))
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
  |> drop(columns: ["_start", "_stop", "_time", "_measurement"]) 
  |> limit(n:1)

Please ensure that the query above can be used successfully in your InfluxDB.

Links

Regards

gefaila commented 3 years ago

What do you think about the ability to specify a Flux filter?

You will be able to specify something like r["host"] == "R1901_1742", and this Flux filter will be appended after |> filter(fn: (r) => r["_measurement"] == "my-measurement") in the GetSchema and GetData queries.

Well, I don't see why it's necessary for the user to specify Flux filters.

You've already confirmed that filtering is possible after building the dashboard:

Yeah, of course. We use this type of filter here: https://datastudio.google.com/s/p19vh-b82Sw - "Country Filter".

So why compel the user to filter so that you can even make a connection!

The fact is that a normal bucket will contain data with multiple tags, so your schema query will not normally work. What the user would obviously like is to be able to build a GDS dashboard from the data they have. They don't want to have to create copies of the data in other buckets simply because GDS can't cope with multiple tags.

gefaila commented 3 years ago

Do you think that updating our docs to clarify the required schema for GDS would be enough? Something like:

Required schema: Google Data Studio requires a known schema for your data. Each column requires a data type that is consistent across the whole table (= measurement in InfluxDB). For that reason the InfluxDB Connector needs to determine the schema from your InfluxDB with this Flux query:

No, not really. I'd just like GDS to work on my InfluxDB tables as-is. The functionality you are implementing is awesome, but its application is going to be limited because it forces users to create copies of their data in other buckets that conform to the GDS requirements, simply because GDS can't cope with _field values being of different types for different tag sets.

I think the real problem is this:

Google Data Studio requires a known schema for your data. Each column requires a data type that is consistent across the whole table (= measurement in InfluxDB)

Actually, InfluxDB only requires that data types are consistent within tables as defined by a set of tags. So it's acceptable to InfluxDB that Valve_Open is a boolean when _measurement="data",host="1643" but an integer when _measurement="data",host="1235".
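
In line protocol (timestamps are placeholders), both of these writes are accepted by InfluxDB:

data,host=1643 Valve_Open=true 1605189244000000000
data,host=1235 Valve_Open=1i 1605189244000000000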

The root assumption that the data type should be consistent across all tables returned where _measurement = "data" is not what InfluxDB assumes at all.

gefaila commented 3 years ago

What I don't quite understand is which actual tags GDS doesn't like in my data tables. And I can't run the Flux query you specify, for the reasons I give above: you are forcing a search through all data from 1970 to the current day. Apart from the fact that this takes longer than the timeout, the query will in most cases run out of memory. It's impossible in the general case, where people are using InfluxDB for what it's designed for: gigabytes of data every day.

gefaila commented 3 years ago

I can only get the Flux query to run by querying a shorter time range:

import "influxdata/influxdb/v1"

bucket = "PLC_Router_Data" measurement = "data"

tags = v1.tagKeys( bucket: bucket, predicate: (r) => r._measurement == measurement, start: -3d ) |> filter(fn: (r) => r._value != "_start" and r._value != "_stop" and r._value != "_measurement" and r._value != "_field") |> findColumn(fn: (key) => true, column: "_value")

from(bucket: bucket) |> range(start: -3d) |> filter(fn: (r) => r["_measurement"] == measurement) |> drop(fn: (column) => contains(value: column, set: tags)) |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") |> drop(columns: ["_start", "_stop", "_time", "_measurement"]) |> limit(n:1)

This then delivers a single table.

Even querying 30d causes InfluxDB Cloud to run out of memory. This will be the case for most customers who have significant data in their tables.

gefaila commented 3 years ago

Querying 30 days runs out of memory. Imagine querying 300 days. You can forget 3,000 days. But your Flux query goes through 18,578 days! You've got to be kidding.

bednar commented 3 years ago

What I don't quite understand is which actual tags GDS doesn't like in my data tables.

The schema collision detected: column "_value" is both of type int and float is InfluxDB's response to the schema query.

And I can't run the Flux query you specify, for the reasons I give above: you are forcing a search through all data from 1970 to the current day. Apart from the fact that this takes longer than the timeout, the query will in most cases run out of memory. It's impossible in the general case, where people are using InfluxDB for what it's designed for: gigabytes of data every day.

The key problem here is that Flux doesn't have SHOW FIELD KEYS like InfluxQL. Our schema query is just a workaround for this.

Querying 30 days runs out of memory. Imagine querying 300 days. You can forget 3,000 days. But your Flux query goes through 18,578 days! You've got to be kidding.

Yes, it scans your whole measurement :(

Even querying 30d causes InfluxDB Cloud to run out of memory.

Try inserting these line protocol records into the database:

my-measurement,tag1=a field1=10 1
my-measurement,tag2=a field2=10 1605189244

If we set the range to -30d, we lose field1.

This will be the case for most customers who have significant data in their tables.

So we have to add an advanced option to the Connector configuration that will limit the range used in the schema query.

bednar commented 3 years ago

New option in the Configuration screen:

Schema Range: -30d
The range that is used to determine your InfluxDB data schema. - https://todo_link_to_doc
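
As a sketch (bucket, measurement, and tags are placeholders), the option would simply replace the fixed start of the schema query:

schema_range = -30d // value from the new "Schema Range" option
tags = ["host"]     // tags dropped before the pivot, as in GetSchema above

from(bucket: "my-bucket")
    |> range(start: schema_range)
    |> filter(fn: (r) => r["_measurement"] == "my-measurement")
    |> drop(columns: tags)
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> drop(columns: ["_start", "_stop", "_time", "_measurement"])
    |> limit(n:1)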

What do you think?

gefaila commented 3 years ago

It's the pivot() that's taking the time and killing the query's execution time and memory.

But the good news is you don't need it. The following Flux query returns the necessary data, and it completes in only a few seconds while scanning all the data.

import "influxdata/influxdb/v1" bucket = "my_bucket" measurement = "my_measurement" filter_tags = ["host","_field","_value"] // in addition to _field and _value allow the user to keep some tags to filter by later from(bucket: bucket) |> range(start: -100000d) |> filter(fn: (r) => r["_measurement"] == measurement) |> keep(columns: filter_tags) |> unique(column: "_field")

It returns tables that show the data types for everything in the database: _field, _value, #datatype, tags.

Can you work with that to build the schema you need?

bednar commented 3 years ago

@gefaila nice catch πŸ‘ Thanks!

I think the following query could be a solution:

import "influxdata/influxdb/v1"

bucket = "my-bucket"
measurement = "my-measurement"
start_range = duration(v: uint(v: 1970-01-01) - uint(v: now()))

v1.tagKeys(
  bucket: bucket,
  predicate: (r) => r._measurement == measurement,
  start: start_range
) |> filter(fn: (r) => r._value != "_start" and r._value != "_stop" and r._value != "_measurement" and r._value != "_field")
  |> yield(name: "tags")

from(bucket: bucket)
  |> range(start: start_range)
  |> filter(fn: (r) => r["_measurement"] == measurement)
  |> keep(fn: (column) => column == "_field" or column == "_value")
  |> unique(column: "_field")
  |> yield(name: "fields")

Could you try it with your data? If you could share the result of the query, that would be awesome.

gefaila commented 3 years ago

It works! πŸ‘ But there is no need for the

start_range = duration(v: uint(v: 1970-01-01) - uint(v: now()))

you only need

start_range = -1000y

For me this returns the following:

2020-11-13-10-12_chronograf_data.zip

gefaila commented 3 years ago

Hi @bednar, how are things progressing? Were you able to use the data returned by that query to construct a usable schema?

bednar commented 3 years ago

Hi @gefaila,

I want to start work on this as soon as possible ... probably on Thursday or Friday.

Thanks a lot for your help, the query is fine and we will use it πŸ‘

Regards

bednar commented 3 years ago

Hi @gefaila,

you can track progress at #9

Regards

gefaila commented 3 years ago

Really really cool! Thanks for delivering this. I think lots of people will love it!

I enjoyed oiling the creative cogs!!

Cheers Andrew


bednar commented 3 years ago

Hi @gefaila,

there is a test version of the Connector ready:

https://datastudio.google.com/u/0/datasources/create?connectorId=AKfycbySDF4eD7wmA_awZ6aoCwENuXs1Opw_T0DIJ8F-MVI

Could you test it with your schema?

gefaila commented 3 years ago

Hi Jakub, I'm sure it's a small thing... but it doesn't quite work yet.

I get this in return:

Community Connector Error

There was an error caused by the community connector. Please report the issue to the provider of this community connector if this issue persists.

Connector details

"GetFields from: https://eu-central-1-1.aws.cloud2.influxdata.com" returned an error:Exception: Argument too large: value

Error ID: 943f700f

bednar commented 3 years ago

It is caused by the schema being too large. We have to change how we cache the produced schema.

gefaila commented 3 years ago

It is caused by the schema being too large. We have to change how we cache the produced schema. ...

I see. I think my schema size is probably average for someone who is using InfluxDB in a real-world context.

bednar commented 3 years ago

Hi @gefaila,

the commit https://github.com/influxdata/influxdb-gds-connector/pull/9/commits/118edfef9197edb111b5d5307108a891c9f7938d fixes: Argument too large: value

Could you try the fixed version?

https://datastudio.google.com/u/0/datasources/create?connectorId=AKfycbySDF4eD7wmA_awZ6aoCwENuXs1Opw_T0DIJ8F-MVI

I think my schema size is probably average for someone who is using InfluxDB in a real-world context.

That's true, but Google Data Studio is like other visualisation and analytics tools: they love well-structured data.

Here is a nice article from Tableau - https://www.tableau.com/learn/get-started/data-structure

gefaila commented 3 years ago

I think InfluxDB tables fulfill all of that.

Can you explain what GDS needs the InfluxDB data to conform to?

Do you have an example influx bucket that conforms to this?

Can you give us a list that outlines in plain language the restrictions on InfluxDB buckets? E.g. 1) all records must have ..... 2) ...

gefaila commented 3 years ago

I'm failing to see how our industrial IIoT data doesn't conform to this. Is there a specific requirement that our data doesn't meet that you can point to and clarify?

bednar commented 3 years ago

From my experience with GDS, there are these constraints:

1. Avoid unnecessary data

2. Avoid null values

3. Avoid data blending

4. Data range

start = -28d
stop = now()

from(bucket: bucket)
  |> range(start: start, stop: stop)
  |> filter(fn: (r) => r["_measurement"] == measurement)
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")



Thank you @gefaila for helping us to improve our connector. Thanks again πŸ‘

gefaila commented 3 years ago

Hi @bednar! We meet again. You are very active on Influx! So I've come back to try the InfluxDB connector. I was wondering if you resolved the issue here. It seems it's not resolved. [screenshot of the connector error]

As before, we have IoT data, and of course some of it is boolean, some float, and some integer. Our use is normal for InfluxDB and our bucket is normal for InfluxDB. So we can say our database is "normal".

But the connector doesn't like this "normal" Bucket.

But I'd really like to use your connector.

Can we fix it?

gefaila commented 3 years ago

I'm sure you'll ask what data is yielded by the query above...

bucket = "my-bucket"
measurement = "my-repository"

start = -28d
stop = now()

from(bucket: bucket) 
 |> range(start: start, stop: stop) 
 |> filter(fn: (r) => r["_measurement"] == measurement) 
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

It gives the following file: 2021-10-12_09_59_influxdb_data.zip

It would be better if you could let the customer write the Flux so that the data comes in the way your connector needs it. Is that possible?
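
For instance, a user-written query like this (hypothetical; toFloat() unifies the mixed field types) would hand the connector consistently typed columns:

from(bucket: "my-bucket")
 |> range(start: -28d)
 |> filter(fn: (r) => r["_measurement"] == "my-repository")
 |> toFloat() // cast int/bool/float fields to one type before the pivot
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")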

bednar commented 3 years ago

Hi @bednar! We meet again. You are very active on Influx! So I've come back to try the InfluxDB connector. I was wondering if you resolved the issue here. It seems it's not resolved.

The PR https://github.com/influxdata/influxdb-gds-connector/pull/9 improves the schema query and is currently awaiting approval. You can try this version via the following link: https://datastudio.google.com/u/0/datasources/create?connectorId=AKfycbySDF4eD7wmA_awZ6aoCwENuXs1Opw_T0DIJ8F-MVI

As before, we have IoT data, and of course some of it is boolean, some float, and some integer. Our use is normal for InfluxDB and our bucket is normal for InfluxDB. So we can say our database is "normal".

But the connector doesn't like this "normal" Bucket.

But I'd really like to use your connector.

Can we fix it?

There is a problem with the requirements from GDS. GDS expects a static tabular schema, so we are not able to support a schema where a field has different types.

I'm sure you'll ask what data is yielded by the query above...

bucket = "my-bucket"
measurement = "my-repository"

start = -28d
stop = now()

from(bucket: bucket) 
 |> range(start: start, stop: stop) 
 |> filter(fn: (r) => r["_measurement"] == measurement) 
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

It gives the following file: 2021-10-12_09_59_influxdb_data.zip

It would be better if you could let the customer write the Flux so that the data comes in the way your connector needs it. Is that possible?

Currently we don't support this type of configuration. How would your query look for the data you provided - 2021-10-12_09_59_influxdb_data.zip?