open-telemetry / opentelemetry-collector

OpenTelemetry Collector
https://opentelemetry.io
Apache License 2.0

Persistent Queue issues/questions #5894

Open ZachTB123 opened 2 years ago

ZachTB123 commented 2 years ago

I'm in the process of evaluating the persistent queueing functionality, using the v0.57.2 release. I am unable to get persistent queueing to work when running multiple replicas of the collector. I'm currently testing this in two different environments and cannot get either to work. Both environments are expected to handle a large volume of telemetry, so we have autoscaling set up.

Our first environment uses Kubernetes. We are using the OpenTelemetry Helm chart with autoscaling enabled. I created a PVC with the ReadWriteMany access mode and mounted it via extraVolumeMounts. persistent_storage_enabled is set to true, and the directory used by the file_storage extension matches the mounted volume path (a rough sketch of the relevant configuration follows the log snippet below). When the number of replicas is increased, the newly spun-up replicas fail to start with a timeout, which I assume is because they cannot acquire a lock on the storage file. Here is a snippet from the logs:

2022-08-10T18:54:47.655Z    info    zapgrpc/zapgrpc.go:174  [core] [Channel #1] Channel switches to new LB policy "pick_first"  {"grpc_log": true}
2022-08-10T18:54:47.655Z    info    zapgrpc/zapgrpc.go:174  [core] [Channel #1 SubChannel #2] Subchannel created    {"grpc_log": true}
2022-08-10T18:54:47.655Z    info    zapgrpc/zapgrpc.go:174  [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to CONNECTING  {"grpc_log": true}
2022-08-10T18:54:47.655Z    info    zapgrpc/zapgrpc.go:174  [core] [Channel #1 SubChannel #2] Subchannel picks a new address "redacted:4317" to connect {"grpc_log": true}
2022-08-10T18:54:47.655Z    info    zapgrpc/zapgrpc.go:174  [core] pickfirstBalancer: UpdateSubConnState: 0xc0008c4770, {CONNECTING <nil>}  {"grpc_log": true}
2022-08-10T18:54:47.655Z    info    zapgrpc/zapgrpc.go:174  [core] [Channel #1] Channel Connectivity change to CONNECTING   {"grpc_log": true}
2022-08-10T18:54:47.836Z    info    zapgrpc/zapgrpc.go:174  [core] [Channel #1 SubChannel #2] Subchannel Connectivity change to READY   {"grpc_log": true}
2022-08-10T18:54:47.837Z    info    zapgrpc/zapgrpc.go:174  [core] pickfirstBalancer: UpdateSubConnState: 0xc0008c4770, {READY <nil>}   {"grpc_log": true}
2022-08-10T18:54:47.837Z    info    zapgrpc/zapgrpc.go:174  [core] [Channel #1] Channel Connectivity change to READY    {"grpc_log": true}
Error: cannot start pipelines: timeout
2022/08/10 18:54:48 collector server run finished with error: cannot start pipelines: timeout
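
For reference, the relevant part of the collector configuration looks roughly like this (the directory, endpoint, and pipeline below are placeholders rather than our exact values):

extensions:
  file_storage:
    directory: /var/lib/otelcol/file_storage    # path of the mounted shared volume

receivers:
  otlp:
    protocols:
      grpc:

exporters:
  otlp:
    endpoint: backend.example.com:4317          # placeholder backend
    sending_queue:
      enabled: true
      persistent_storage_enabled: true          # v0.57.x field that enables the persistent queue

service:
  extensions: [file_storage]
  pipelines:
    logs:
      receivers: [otlp]
      exporters: [otlp]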

Our other environment uses Azure Container Apps. I mounted a file share from Azure Files using this approach. As soon as I scale up to multiple replicas, all replicas stop working and constantly restart. I see messages like the following:

"2022-08-10T18:49:44.433Z   info    service/telemetry.go:103    Setting up own telemetry..."
"created by go.etcd.io/bbolt.(*DB).freepages"
"2022-08-10T18:49:44.475Z   info    extensions/extensions.go:49 Extension started.  {""kind"": ""extension"", ""name"": ""file_storage""}"
"go.etcd.io/bbolt.(*DB).freepages.func2()"
"2022-08-10T18:49:44.475Z   info    extensions/extensions.go:45 Extension is starting...    {""kind"": ""extension"", ""name"": ""health_check""}"
"panic: freepages: failed to get all reachable pages (page 5: multiple references)"
"   /go/pkg/mod/go.etcd.io/bbolt@v1.3.6/db.go:1054 +0x1f6"
"2022-08-10T18:49:44.475Z   info    healthcheckextension@v0.57.2/healthcheckextension.go:44 Starting health_check extension {""kind"": ""extension"", ""name"": ""health_check"", ""config"": {""Endpoint"":""0.0.0.0:13133"",""TLSSetting"":null,""CORS"":null,""Auth"":null,""MaxRequestBodySize"":0,""IncludeMetadata"":false,""Path"":""/"",""CheckCollectorPipeline"":{""Enabled"":false,""Interval"":""5m"",""ExporterFailureThreshold"":5}}}"
"2022-08-10T18:49:44.433Z   info    service/telemetry.go:138    Serving Prometheus metrics  {""address"": ""0.0.0.0:8888"", ""level"": ""detailed""}"
"2022-08-10T18:49:44.475Z   info    extensions/extensions.go:45 Extension is starting...    {""kind"": ""extension"", ""name"": ""oauth2client""}"
"goroutine 96 [running]:"
"2022-08-10T18:49:44.475Z   info    extensions/extensions.go:49 Extension started.  {""kind"": ""extension"", ""name"": ""oauth2client""}"
"2022-08-10T18:44:32.663Z   info    extensions/extensions.go:45 Extension is starting...    {""kind"": ""extension"", ""name"": ""file_storage""}"
"2022-08-10T18:44:32.663Z   info    healthcheckextension@v0.57.2/healthcheckextension.go:44 Starting health_check extension {""kind"": ""extension"", ""name"": ""health_check"", ""config"": {""Endpoint"":""0.0.0.0:13133"",""TLSSetting"":null,""CORS"":null,""Auth"":null,""MaxRequestBodySize"":0,""IncludeMetadata"":false,""Path"":""/"",""CheckCollectorPipeline"":{""Enabled"":false,""Interval"":""5m"",""ExporterFailureThreshold"":5}}}"
"panic: freepages: failed to get all reachable pages (page 5: multiple references)"
"   /go/pkg/mod/go.etcd.io/bbolt@v1.3.6/db.go:1056 +0x99"
"2022-08-10T18:44:32.663Z   info    extensions/extensions.go:49 Extension started.  {""kind"": ""extension"", ""name"": ""health_check""}"
"2022-08-10T18:44:32.663Z   info    extensions/extensions.go:45 Extension is starting...    {""kind"": ""extension"", ""name"": ""oauth2client""}"
"2022-08-10T18:44:32.629Z   info    service/telemetry.go:138    Serving Prometheus metrics  {""address"": ""0.0.0.0:8888"", ""level"": ""detailed""}"
"   /go/pkg/mod/go.etcd.io/bbolt@v1.3.6/db.go:1054 +0x1f6"

"2022-08-10T18:44:32.663Z   info    pipelines/pipelines.go:78   Exporter is starting... {""kind"": ""exporter"", ""data_type"": ""logs"", ""name"": ""otlp/hcp""}"
"2022-08-10T18:44:32.663Z   info    extensions/extensions.go:49 Extension started.  {""kind"": ""extension"", ""name"": ""oauth2client""}"
"goroutine 135 [running]:"
"2022-08-10T18:44:32.629Z   info    service/telemetry.go:103    Setting up own telemetry..."
"created by go.etcd.io/bbolt.(*DB).freepages"
"2022-08-10T18:44:32.663Z   info    extensions/extensions.go:45 Extension is starting...    {""kind"": ""extension"", ""name"": ""health_check""}"
"2022-08-10T18:44:32.663Z   info    extensions/extensions.go:49 Extension started.  {""kind"": ""extension"", ""name"": ""file_storage""}"
"2022-08-10T18:44:32.662Z   info    extensions/extensions.go:42 Starting extensions..."
"2022-08-10T18:44:32.663Z   info    pipelines/pipelines.go:74   Starting exporters..."
"go.etcd.io/bbolt.(*DB).freepages.func2()"
"goroutine 131 [running]:"
"created by go.etcd.io/bbolt.(*DB).freepages"
"go.etcd.io/bbolt.(*DB).freepages.func2()"

or

"2022-08-09T17:30:24.887Z   info    pipelines/pipelines.go:78   Exporter is starting... {""kind"": ""exporter"", ""data_type"": ""logs"", ""name"": ""otlp/hcp""}"
"main.runInteractive({{0xc000358ba0, 0xc000358d50, 0xc000358c30, 0xc000358b10}, {{0x31365ba, 0x1d}, {0x3165761, 0x2b}, {0x30fa8ac, 0x5}}, ...})"
go.etcd.io/bbolt.(*Tx).Commit(0xc00016ce00)
github.com/spf13/cobra.(*Command).ExecuteC(0xc0003acc80)
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/exporter/exporterhelper/internal/persistent_storage.go:247 +0x394"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*persistentContiguousStorage).enqueueNotDispatchedReqs(0xc000b641e0, {0xc0003b4300, 0x30, 0xc000f0f248})"
"   /go/pkg/mod/go.etcd.io/bbolt@v1.3.6/bucket.go:537 +0xea"
"   /go/pkg/mod/go.etcd.io/bbolt@v1.3.6/db.go:748 +0xe5"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage.(*fileStorageClient).Batch(0xc000f0efa8, {0xc00071ab30, 0x40aa25}, {0xc000907100, 0xc0007d9b60, 0x0})"
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/service/collector.go:200 +0x1e9"
panic: page 32 already freed
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/exporter/exporterhelper/internal/persistent_storage.go:133 +0x288"
"go.opentelemetry.io/collector/component.StartFunc.Start(0xc000f0f470, {0x36ecec8, 0xc000128000}, {0x36ee198, 0xc0009182e8})"
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/service/service.go:115 +0xa6"
"   /go/pkg/mod/go.etcd.io/bbolt@v1.3.6/bucket.go:570 +0x32f"
"go.opentelemetry.io/collector/service.NewCommand.func1(0xc0003acc80, {0x30ff580, 0x2, 0x2})"
"github.com/spf13/cobra.(*Command).execute(0xc0003acc80, {0xc000122160, 0x2, 0x2})"
"   /go/pkg/mod/github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage@v0.57.2/filestorage/client.go:143 +0xc5"
"go.etcd.io/bbolt.(*freelist).free(0xc00067c300, 0x21e5ec9, 0x7ff69aaca000)"
"go.opentelemetry.io/collector/service.(*Collector).Run(0xc000f0fb00, {0x36ecec8, 0xc000128000})"
"   /tmp/build/main.go:32 +0x5d"
"2022-08-09T17:30:24.913Z   info    internal/persistent_storage.go:315  Fetching items left for dispatch by consumers   {""kind"": ""exporter"", ""data_type"": ""logs"", ""name"": ""otlp/hcp"", ""queueName"": ""otlp/hcp-logs"", ""numberOfItems"": 48}"
goroutine 1 [running]:
github.com/spf13/cobra.(*Command).Execute(...)

main.run(...)
go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*batchStruct).execute(...)
"2022-08-09T17:30:24.886Z   info    extensions/extensions.go:42 Starting extensions..."
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/exporter/exporterhelper/internal/persistent_storage_batch.go:53"
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/exporter/exporterhelper/queued_retry_experimental.go:162 +0xcf"
main.main()
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/service/internal/pipelines/pipelines.go:79 +0x752"
"   /go/pkg/mod/github.com/spf13/cobra@v1.5.0/command.go:918"
"   /go/pkg/mod/github.com/spf13/cobra@v1.5.0/command.go:990 +0x3bc"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal.NewPersistentQueue({0x36ecec8, 0xc000128000}, {0xc00071b2f8, 0x8}, {0x30fa2fa, 0x4}, 0xc000763e4d, 0xc000369800, {0x370c618, 0xc00090f770}, ...)"
go.etcd.io/bbolt.(*node).spill(0xc0007364d0)
"go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*persistentContiguousStorage).put(0xc000b641e0, {0x7ff69af542d0, 0xc0007d9b60})"
"2022-08-09T17:30:24.887Z   info    extensions/extensions.go:49 Extension started.  {""kind"": ""extension"", ""name"": ""health_check""}"
"2022-08-09T17:30:24.886Z   info    extensions/extensions.go:49 Extension started.  {""kind"": ""extension"", ""name"": ""oauth2client""}"
"   /go/pkg/mod/go.etcd.io/bbolt@v1.3.6/node.go:346 +0xaa"
"go.opentelemetry.io/collector/service/internal/pipelines.(*Pipelines).StartAll(0xc00090f450, {0x36ecec8, 0xc000128000}, {0x36ee160, 0xc000736620})"
"go.opentelemetry.io/collector/exporter/exporterhelper.(*queuedRetrySender).start(0xc000838900, {0x36ecec8, 0xc000128000}, {0x36ee198, 0xc0009182e8})"
"2022-08-09T17:30:24.913Z   warn    internal/persistent_storage.go:350  Failed unmarshalling item   {""kind"": ""exporter"", ""data_type"": ""logs"", ""name"": ""otlp/hcp"", ""queueName"": ""otlp/hcp-logs"", ""key"": ""7108903"", ""error"": ""value not set""}"
"2022-08-09T17:30:24.856Z   info    service/telemetry.go:103    Setting up own telemetry..."
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/exporter/exporterhelper/internal/persistent_queue.go:52 +0x13a"
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/exporter/exporterhelper/common.go:193 +0xae"
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/exporter/exporterhelper/queued_retry_experimental.go:198 +0x45"
"   /tmp/build/main.go:25 +0x1d8"
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/component/component.go:82 +0x43"
go.etcd.io/bbolt.(*node).spill(0xc0007363f0)
"   /go/pkg/mod/go.etcd.io/bbolt@v1.3.6/node.go:359 +0x216"
go.etcd.io/bbolt.(*Bucket).spill(0xc00016ce18)
"   /go/pkg/mod/go.etcd.io/bbolt@v1.3.6/tx.go:160 +0xe7"
go.etcd.io/bbolt.(*Bucket).spill(0xc0004a6cc0)
"   /go/pkg/mod/go.opentelemetry.io/collector@v0.57.2/service/command.go:50 +0x499"
"go.etcd.io/bbolt.(*DB).Update(0xc000076400, 0xc000f0eee0)"
"go.opentelemetry.io/collector/exporter/exporterhelper.(*queuedRetrySender).initializePersistentQueue(0xc000838900, {0x36ecec8, 0xc000128000}, {0x36ee198, 0xc0009182e8})"
"go.opentelemetry.io/collector/exporter/exporterhelper.newBaseExporter.func1({0x36ecec8, 0xc000128000}, {0x36ee198, 0xc0009182e8})"

My question is: is persistent queueing supposed to work when the collector runs multiple replicas?

bogdandrutu commented 2 years ago

/cc @swiatekm-sumo @pmm-sumo ?

pmm-sumo commented 2 years ago

The file storage client uses bbolt underneath. One of bbolt's limitations is that the same database file cannot be opened by multiple processes at the same time. (I believe it's worth adding this as a note to the README.)

The solution would be to configure the replicas so that each one uses a separate path, or to use another type of storage extension, which should be possible after the recent updates made by @swiatekm-sumo. I haven't tested it with dbstorage, but it could work (though the performance impact might be prohibitive). @swiatekm-sumo what do you think?

Another solution might be building in a mechanism that avoids naming conflicts between the storage files of different replicas.
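
The separate-path approach could look roughly like this (an untested sketch; POD_NAME would have to be injected into each container, e.g. via the Kubernetes Downward API, and the per-replica directory has to exist before the collector starts):

extensions:
  file_storage:
    # Each replica gets its own directory, so no two collector processes
    # ever open the same bbolt file.
    directory: /var/lib/otelcol/file_storage/${POD_NAME}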

swiatekm commented 2 years ago

I don't think this is really bbolt's fault - we get a nice error because it doesn't allow concurrent access, but if it did, the collectors would clobber each other's data. Extensions give components storage clients based on those components' identity, which is what allows the data to persist across restarts, but the identity of the collector itself doesn't really enter into it, and if it did, we'd need to be very careful about determining it.

I think it's reasonable to expect that multiple storage extensions (be it from the same collector instance or from different ones) should have separate storage directories. We should document this better and try to have better error handling for it, though.
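
For example, two named file_storage instances within the same collector would each get their own directory (paths here are purely illustrative):

extensions:
  file_storage/exporter_queue:
    directory: /var/lib/otelcol/storage/exporter_queue
  file_storage/receiver_state:
    directory: /var/lib/otelcol/storage/receiver_state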

This makes @ZachTB123's use case awkward, and would require parametrising the storage locations with something like the Pod name, injected via the Downward API. To be completely honest, doing it this way in Kubernetes feels like an antipattern - if you want persistent storage for a Deployment, you should just use a StatefulSet and have a separate Volume for each Pod.
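
For illustration, a rough sketch of what that could look like (the names, image tag, and volume size are placeholders, not a tested manifest):

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: otel-collector
spec:
  serviceName: otel-collector
  replicas: 3
  selector:
    matchLabels:
      app: otel-collector
  template:
    metadata:
      labels:
        app: otel-collector
    spec:
      containers:
        - name: otel-collector
          image: otel/opentelemetry-collector-contrib:0.57.2
          env:
            - name: POD_NAME                 # Downward API: expose the Pod name to the collector config
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          volumeMounts:
            - name: queue
              mountPath: /var/lib/otelcol/file_storage
  volumeClaimTemplates:                      # one PVC per Pod instead of a single shared RWX volume
    - metadata:
        name: queue
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 10Gi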

ZachTB123 commented 2 years ago

My concern with tying replicas to specific volumes (whether through paths or using StatefulSet) is making sure there is a way to ensure the persistent queue is completely drained before scaling down. For example, if there was an extended outage on the destination we are trying to export to, the replica's volume will start to fill up. Once the export destination is available again, that data in the persistent queue should start to drain. If the number of replicas were to decrease during this time (through auto scaling for example), I would like there to be some guarantee that the data in the volume used by that replica be completely emptied before being shut down. I don't want to encounter a situation where the data would still be in that volume and we would have to wait until the next time that replica comes up to drain the queue. In my situation I need to guarantee timely delivery of the data and I can make no guarantees on the number of replicas for our collector (besides having one replica minimum). This was where I was hoping we could just have one volume shared by all replicas to ensure the queue is drained no matter which replicas come and go from autoscaling.

swiatekm commented 2 years ago

> My concern with tying replicas to specific volumes (whether through paths or using StatefulSet) is making sure there is a way to ensure the persistent queue is completely drained before scaling down. For example, if there was an extended outage on the destination we are trying to export to, the replica's volume will start to fill up. Once the export destination is available again, that data in the persistent queue should start to drain. If the number of replicas were to decrease during this time (through auto scaling for example), I would like there to be some guarantee that the data in the volume used by that replica be completely emptied before being shut down. I don't want to encounter a situation where the data would still be in that volume and we would have to wait until the next time that replica comes up to drain the queue. In my situation I need to guarantee timely delivery of the data and I can make no guarantees on the number of replicas for our collector (besides having one replica minimum). This was where I was hoping we could just have one volume shared by all replicas to ensure the queue is drained no matter which replicas come and go from autoscaling.

Currently, the queue assumes it's the sole consumer of the storage it's using. Even if we were to implement a storage extension allowing concurrent access from different processes (for example a Redis client), I don't think the current storage API is expressive enough to implement a queue like that. In principle, I think it'd be possible with a completely separate queue implementation, without using the storage at all.

I'm aware of the scaling problem, but it hasn't been an issue for us in nearly half a year of using OT to monitor our prod infrastructure. In practice, if you're burning down the queue, your instances should have high resource consumption (as long as you're not I/O bound on the exporter) and an autoscaler shouldn't scale them down.

If you actually want to pursue the idea of a queue sharable by multiple collector processes, we should open a new issue for it, I think.

ZachTB123 commented 2 years ago

I created #5902 for my previously mentioned concerns around draining the persistent queue.

dosmanak commented 6 months ago

Hello. I noticed that the persistent sending queue has been in the alpha state for 2 years already. Is there a reason not to move it to beta?