influxdata / influxdb-client-go

InfluxDB 2 Go Client
MIT License

WriteAPI writes to the database at a very slow rate #333

Closed fthdrmzzz closed 2 years ago

fthdrmzzz commented 2 years ago

Steps to reproduce:

  1. Run InfluxDB in a Docker container.
  2. Using the blocking WriteAPI, send 250 points.
  3. Using the QueryAPI, execute a Flux query to get the number of rows.

Expected behavior: The query returns 250 as the number of rows.

Actual behavior: A much smaller number is returned. When I query InfluxDB by hand in the UI, I can see the points being saved to the database one by one at a very slow rate.

Specifications:

The way I write 250 points:

writeAPI := repo.client.WriteAPIBlocking(repo.cfg.Org, repo.cfg.Bucket)
err = writeAPI.WritePoint(context.Background(), pts...)
return err

The way I query the DB:

rowCount    = fmt.Sprintf(`from(bucket: "%s") 
|> range(start: -1h) 
|> filter(fn: (r) => r["_measurement"] == "messages")
|> filter(fn: (r) => r["_field"] == "dataValue" or r["_field"] == "stringValue" or r["_field"] == "value" or r["_field"] == "boolValue" or r["_field"] == "sum" )
|> group(columns: ["_measurement"])
|> count()
|> yield(name: "count")`, repoCfg.Bucket)

queryAPI := client.QueryAPI(repoCfg.Org)

// get QueryTableResult
result, err := queryAPI.Query(context.Background(), rowCount)
vlastahajek commented 2 years ago

All points are sent in a single call. It takes 28ms to write 1441 simple points (two tags, two fields) to InfluxDB 2.20 in Docker Desktop running on a laptop with Windows 10.

Did you set a timestamp for each point? If not, the server assigns one, several points can end up with the same timestamp, and only the last one wins.

fthdrmzzz commented 2 years ago

I set a timestamp for each point in the code.

Here is how I prepare the points:


    var pts []*write.Point
    for _, msg := range msgs {
        tgs, flds := senmlTags(msg), senmlFields(msg)

        sec, dec := math.Modf(msg.Time)
        t := time.Unix(int64(sec), int64(dec*(1e9)))

        pt := influxdb2.NewPoint("messages", tgs, flds, t)
        pts = append(pts, pt)
    }

    return pts, nil
vlastahajek commented 2 years ago

And the timestamps of the points are within the last hour, as set in the query?

fthdrmzzz commented 2 years ago

Yes, and I also tried different variations. I tried querying for the last 6 hours and got the same result.

I also tried writing the points with the non-blocking API and got the same results.

vlastahajek commented 2 years ago

OK. Is WritePrecision left set to nanoseconds? Can you enable debug logging and check the data that is sent?

client := influxdb2.NewClientWithOptions("http://localhost:8086", "my-token", influxdb2.DefaultOptions().SetLogLevel(log.DebugLevel))
fthdrmzzz commented 2 years ago

I did not configure that by hand, so if it is not the default, it is not set. I am checking the sent data as you suggested.

fthdrmzzz commented 2 years ago

Here you can find the debug output:

2022/06/07 16:38:58 influxdb2client I! HTTP POST req to http://localhost:55001/api/v2/buckets

2022/06/07 16:38:58 influxdb2client D! Writing batch: messages,channel=45,name=test\ name,publisher=2580,subtopic=topic protocol="http",unit="km",updateTime=5456565466,value=5 1654609138716399192
messages,channel=45,name=test\ name,publisher=2580 boolValue=true,protocol="http",unit="km",updateTime=5456565466 1654609139716399192
messages,channel=45,name=test\ name,publisher=2580 protocol="http",stringValue="value",unit="km",updateTime=5456565466 1654609140716399192
...
>> total 250 message lines
...
messages,channel=45,name=test\ name,publisher=2580 dataValue="base64",protocol="http",unit="km",updateTime=5456565466 1654609141716399192
messages,channel=45,name=test\ name,publisher=2580 protocol="http",sum=42,unit="km",updateTime=5456565466 1654609142716399192

2022/06/07 16:38:58 influxdb2client I! HTTP POST req to http://localhost:55001/api/v2/write?bucket=test-bucket&org=test&precision=ns
2022/06/07 16:39:08 influxdb2client D! Query: {"dialect":{"annotations":["datatype","group","default"],"delimiter":",","header":true},"query":"from(bucket: \"test-bucket\") \n\t|\u003e range(start: -1h) \n\t|\u003e filter(fn: (r) =\u003e r[\"_measurement\"] == \"messages\")\n\t|\u003e filter(fn: (r) =\u003e r[\"_field\"] == \"dataValue\" or r[\"_field\"] == \"stringValue\" or r[\"_field\"] == \"value\" or r[\"_field\"] == \"boolValue\" or r[\"_field\"] == \"sum\" )\n\t|\u003e group(columns: [\"_measurement\"])\n\t|\u003e count()\n\t|\u003e yield(name: \"count\")","type":"flux"}
2022/06/07 16:39:08 influxdb2client I! HTTP POST req to http://localhost:55001/api/v2/query?org=test

If I'm looking at the correct request, the precision is set to nanoseconds here:

2022/06/07 16:38:58 influxdb2client I! HTTP POST req to http://localhost:55001/api/v2/write?bucket=test-bucket&org=test&precision=ns
vlastahajek commented 2 years ago

Does your data excerpt show the first 3 and the last 2 rows?

If you see 250 data rows and no two lines share the same timestamp, there must be 250 rows in the DB, for sure. What row count does the query return?

fthdrmzzz commented 2 years ago

It shows 250 rows, and I checked that the timestamps of the rows are unique.

I want to point this out: the DB does end up with all 250 rows eventually, but at such a slow rate that my test fails. You can see the query in the debug log.

The query log comes after the 250 lines of the message log, yet the query returns an unexpected number (11 for this run). If I wait and query again, I can see the number of messages increasing; it reaches 250 eventually.

vlastahajek commented 2 years ago

As you are using the blocking write client, by the time the write call completes and the application continues with the query, the server has received all the data. Can you check the InfluxDB server logs for any useful information?

Where are the Docker host and the client? Are they on the same machine?

fthdrmzzz commented 2 years ago

InfluxDB runs in a container on my machine.

The client runs directly on my machine.

I'm not sure if this is the correct way to share, but here are the InfluxDB logs. If this is wrong, please tell me so I can edit.

ts=2022-06-08T08:53:40.733824Z lvl=info msg="Welcome to InfluxDB" log_id=0axd9W~G000 version=v2.2.0 commit=a2f8538837 build_date=2022-04-06T17:36:40Z

ts=2022-06-08T08:53:40.733864Z lvl=warn msg="nats-port argument is deprecated and unused" log_id=0axd9W~G000

ts=2022-06-08T08:53:40.741677Z lvl=info msg="Resources opened" log_id=0axd9W~G000 service=bolt path=/var/lib/influxdb2/influxd.bolt

ts=2022-06-08T08:53:40.741852Z lvl=info msg="Resources opened" log_id=0axd9W~G000 service=sqlite path=/var/lib/influxdb2/influxd.sqlite

ts=2022-06-08T08:53:40.743708Z lvl=info msg="Bringing up metadata migrations" log_id=0axd9W~G000 service="KV migrations" migration_count=19

ts=2022-06-08T08:53:40.920191Z lvl=info msg="Bringing up metadata migrations" log_id=0axd9W~G000 service="SQL migrations" migration_count=5

ts=2022-06-08T08:53:40.953094Z lvl=info msg="Using data dir" log_id=0axd9W~G000 service=storage-engine service=store path=/var/lib/influxdb2/engine/data

ts=2022-06-08T08:53:40.953225Z lvl=info msg="Compaction settings" log_id=0axd9W~G000 service=storage-engine service=store max_concurrent_compactions=1 throughput_bytes_per_second=50331648 throughput_bytes_per_second_burst=50331648

ts=2022-06-08T08:53:40.953267Z lvl=info msg="Open store (start)" log_id=0axd9W~G000 service=storage-engine service=store op_name=tsdb_open op_event=start

ts=2022-06-08T08:53:40.953350Z lvl=info msg="Open store (end)" log_id=0axd9W~G000 service=storage-engine service=store op_name=tsdb_open op_event=end op_elapsed=0.084ms

ts=2022-06-08T08:53:40.953413Z lvl=info msg="Starting retention policy enforcement service" log_id=0axd9W~G000 service=retention check_interval=30m

ts=2022-06-08T08:53:40.953502Z lvl=info msg="Starting precreation service" log_id=0axd9W~G000 service=shard-precreation check_interval=10m advance_period=30m

ts=2022-06-08T08:53:40.956564Z lvl=info msg="Starting query controller" log_id=0axd9W~G000 service=storage-reads concurrency_quota=1024 initial_memory_bytes_quota_per_query=9223372036854775807 memory_bytes_quota_per_query=9223372036854775807 max_memory_bytes=0 queue_size=1024

ts=2022-06-08T08:53:40.961421Z lvl=info msg="Configuring InfluxQL statement executor (zeros indicate unlimited)." log_id=0axd9W~G000 max_select_point=0 max_select_series=0 max_select_buckets=0

ts=2022-06-08T08:53:40.980470Z lvl=info msg=Listening log_id=0axd9W~G000 service=tcp-listener transport=http addr=:9999 port=9999

ts=2022-06-08T08:53:40.980862Z lvl=info msg=Starting log_id=0axd9W~G000 service=telemetry interval=8h

User  Organization  Bucket
test  test          test-bucket

ts=2022-06-08T08:53:42.646520Z lvl=info msg="Welcome to InfluxDB" log_id=0axd9dTW000 version=v2.2.0 commit=a2f8538837 build_date=2022-04-06T17:36:40Z

ts=2022-06-08T08:53:42.646766Z lvl=warn msg="nats-port argument is deprecated and unused" log_id=0axd9dTW000

ts=2022-06-08T08:53:42.655675Z lvl=info msg="Resources opened" log_id=0axd9dTW000 service=bolt path=/var/lib/influxdb2/influxd.bolt

ts=2022-06-08T08:53:42.656286Z lvl=info msg="Resources opened" log_id=0axd9dTW000 service=sqlite path=/var/lib/influxdb2/influxd.sqlite

ts=2022-06-08T08:53:42.661727Z lvl=info msg="Checking InfluxDB metadata for prior version." log_id=0axd9dTW000 bolt_path=/var/lib/influxdb2/influxd.bolt

ts=2022-06-08T08:53:42.662162Z lvl=info msg="Using data dir" log_id=0axd9dTW000 service=storage-engine service=store path=/var/lib/influxdb2/engine/data

ts=2022-06-08T08:53:42.662208Z lvl=info msg="Compaction settings" log_id=0axd9dTW000 service=storage-engine service=store max_concurrent_compactions=1 throughput_bytes_per_second=50331648 throughput_bytes_per_second_burst=50331648

ts=2022-06-08T08:53:42.662423Z lvl=info msg="Open store (start)" log_id=0axd9dTW000 service=storage-engine service=store op_name=tsdb_open op_event=start

ts=2022-06-08T08:53:42.663429Z lvl=info msg="Open store (end)" log_id=0axd9dTW000 service=storage-engine service=store op_name=tsdb_open op_event=end op_elapsed=1.006ms

ts=2022-06-08T08:53:42.663514Z lvl=info msg="Starting retention policy enforcement service" log_id=0axd9dTW000 service=retention check_interval=30m

ts=2022-06-08T08:53:42.663546Z lvl=info msg="Starting precreation service" log_id=0axd9dTW000 service=shard-precreation check_interval=10m advance_period=30m

ts=2022-06-08T08:53:42.664782Z lvl=info msg="Starting query controller" log_id=0axd9dTW000 service=storage-reads concurrency_quota=1024 initial_memory_bytes_quota_per_query=9223372036854775807 memory_bytes_quota_per_query=9223372036854775807 max_memory_bytes=0 queue_size=1024

ts=2022-06-08T08:53:42.667831Z lvl=info msg="Configuring InfluxQL statement executor (zeros indicate unlimited)." log_id=0axd9dTW000 max_select_point=0 max_select_series=0 max_select_buckets=0

ts=2022-06-08T08:53:42.676690Z lvl=info msg=Listening log_id=0axd9dTW000 service=tcp-listener transport=http addr=:8086 port=8086

ts=2022-06-08T08:53:42.678409Z lvl=info msg=Starting log_id=0axd9dTW000 service=telemetry interval=8h

2022-06-08T08:53:41. info pinging influxd... {"system": "docker", "ping_attempt": "0"}

2022-06-08T08:53:41. info got response from influxd, proceeding {"system": "docker", "total_pings": "1"}

2022-06-08T08:53:41. info Executing user-provided scripts {"system": "docker", "script_dir": "/docker-entrypoint-initdb.d"}

2022-06-08T08:53:41. info initialization complete, shutting down background influxd {"system": "docker"}

Command "print-config" is deprecated, use the influx-cli command server-config to display the configuration values from the running server


2022-06-08T08:53:42. info found existing boltdb file, skipping setup wrapper {"system": "docker", "bolt_path": "/var/lib/influxdb2/influxd.bolt"}
vlastahajek commented 2 years ago

Thanks. You may be writing some points with timestamps in the server's future. You can check this by adding a stop with a future time to the query's time range, e.g. stop: 1h.

Can you verify that the origin of the data and the server have their clocks synced?
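For reference, the only change needed in the Flux query from the issue is the stop bound on range; everything else stays the same. A trimmed sketch of the adjusted query:

```flux
from(bucket: "test-bucket")
  |> range(start: -1h, stop: 1h)  // a future stop also catches points written ahead of server time
  |> filter(fn: (r) => r["_measurement"] == "messages")
  |> group(columns: ["_measurement"])
  |> count()
  |> yield(name: "count")
```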

vlastahajek commented 2 years ago

@fthdrmzzz, anything new?

fthdrmzzz commented 2 years ago

Sir, due to my availability I will update you and continue on the issue on Friday. Please do not close this issue.

fthdrmzzz commented 2 years ago

You may write some point in the server future time.

It turns out that this was indeed the problem. I can get all 250 messages when I add stop: 1h. Thank you for your time.