elastic / beats

:tropical_fish: Beats - Lightweight shippers for Elasticsearch & Logstash
https://www.elastic.co/products/beats

GCP PubSub Input Performance #35029

Closed: kcreddy closed this issue 1 day ago

kcreddy commented 1 year ago

GCP Pubsub input has certain bottlenecks that need to be addressed:

elasticmachine commented 1 year ago

Pinging @elastic/security-external-integrations (Team:Security-External Integrations)

elasticmachine commented 5 months ago

Pinging @elastic/security-service-integrations (Team:Security-Service Integrations)

kcreddy commented 2 months ago

Closed https://github.com/elastic/beats/pull/37657, which was aimed at creating multiple pubsub clients rather than multiple beat pipeline clients. Having multiple beat pipeline clients helps reduce lock contention, but as the mutex profiles attached to the PR show, multiple pubsub clients don't really reduce it.

The solution we need is similar to AWS S3's SQS event processor: https://github.com/elastic/beats/blob/5b24b7d28afb35583233b3849128f8b2f1d18abc/x-pack/filebeat/input/awss3/sqs_s3_event.go#L310-L311

Here, the S3 input creates 1 pipeline client for each SQS message to process all S3 events within that SQS message. In our case, the GCP Pubsub input would instead maintain a sync.Pool of pipeline clients, with each pubsub message processed by a client fetched from the pool. Although fetching and releasing clients adds some CPU cost, it might help reduce lock contention; the benefits and drawbacks should be tested.
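
A rough sketch of the idea, using a hypothetical `outleter` interface as a stand-in for the Beats pipeline client (the real interface publishes `beat.Event` values); this is illustrative only, not the actual input code:

```go
package main

import (
	"context"
	"sync"

	"cloud.google.com/go/pubsub"
)

// outleter is a placeholder for the Beats pipeline client
// (beat.Client / channel.Outleter in the real code).
type outleter interface {
	OnEvent(msg *pubsub.Message) bool
}

// receiveWithPool handles each pubsub message with a pipeline client
// taken from a sync.Pool, so concurrent Receive callbacks are not all
// serialized on a single client's publish lock.
func receiveWithPool(ctx context.Context, sub *pubsub.Subscription, newClient func() outleter) error {
	pool := sync.Pool{New: func() any { return newClient() }}

	return sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		out := pool.Get().(outleter)
		defer pool.Put(out) // return the client for reuse

		if out.OnEvent(msg) {
			msg.Ack()
		} else {
			msg.Nack()
		}
	})
}
```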

kcreddy commented 1 week ago

Hey @andrewkroh,

I tried the 2 variations we discussed:

  1. adding pipeline clients inside an array
  2. using sync.Pool.

As per the results below, there is no performance improvement observed in either variation.

Both variations are based on this commit, which is close to the v8.14.0 tag: https://github.com/elastic/beats/commit/4c4b2f899587cdb251b2c1d947395eb7664842bd. The base filebeat version I used for comparison is the v8.14.0 tag (which, compared to commit 4c4b2f899587cdb251b2c1d947395eb7664842bd, has only minor changes related to docs and CI).

Variation 1: adding pipeline clients inside an array

https://github.com/elastic/beats/compare/v8.14.0...kcreddy:beats:variation1-array: 2 files changed: x-pack/filebeat/input/gcppubsub/config.go and x-pack/filebeat/input/gcppubsub/input.go

  1. Create a configurable number of Outleters (pipeline clients) and store them in an array.
  2. Choose a pipeline client randomly from the array during sub.Receive and use it to publish messages (see the sketch after this list). PR: https://github.com/elastic/beats/pull/39999
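
A minimal sketch of this variation, again with a hypothetical `outleter` placeholder for the Beats pipeline client (the real change is in the compare link above):

```go
package main

import (
	"context"
	"math/rand"
	"time"

	"cloud.google.com/go/pubsub"
)

// outleter is a placeholder for the Beats pipeline client.
type outleter interface {
	OnEvent(msg *pubsub.Message) bool
}

// receiveWithClientArray creates a fixed set of pipeline clients up front
// and picks one at random per message, spreading publishes across clients
// instead of contending on a single publish lock.
func receiveWithClientArray(ctx context.Context, sub *pubsub.Subscription, newClient func() outleter, numPipelines int) error {
	clients := make([]outleter, numPipelines) // outlet.num_pipelines
	for i := range clients {
		clients[i] = newClient()
	}

	// The "seed" runs below correspond to seeding the RNG like this
	// (rand.Seed is deprecated in recent Go; shown to mirror the test).
	rand.Seed(time.Now().UnixNano())

	return sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		out := clients[rand.Intn(len(clients))]
		if out.OnEvent(msg) {
			msg.Ack()
		} else {
			msg.Nack()
		}
	})
}
```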

Results legend:
default -> subscription.num_goroutines: 1, subscription.max_outstanding_messages: 1600
ocl -> outlet.num_pipelines (pipeline clients) config setting
seed -> seeding the RNG with time.Now().UnixNano() (inside the PR)
g2 -> subscription.num_goroutines: 2
m2000 -> subscription.max_outstanding_messages: 2000
r30s -> Test ran for 30 seconds

  1. Events per 30 seconds, measured from the Non-zero metrics log:

| Run | 8.14.0 | 8.14.1 (2 ocl) | 8.14.1 (5 ocl) | 8.14.1 (5 ocl+seed) |
| --- | --- | --- | --- | --- |
| default-r30s | 86k | 90k | 88k | 89k |
| g2m2000r30s | 186k | 197k | 170k | 177k |

  2. Contentions, from running the pprof command `go tool pprof -seconds 30 http://localhost:5066/debug/pprof/mutex`. Attached profiles:
     - pprof.filebeat.contentions.delay.001.8.14.0-default-r30s.pb.gz
     - pprof.filebeat.contentions.delay.001-8.14.0-g2m2000-r30s.pb.gz
     - pprof.filebeat.contentions.delay.001-8.14.1-default-ocl5-seed-r30s.pb.gz
     - pprof.filebeat.contentions.delay.001-8.14.1-g2m2000-ocl5-seed-r30s.pb.gz

Variation 2: using sync.Pool.

https://github.com/elastic/beats/compare/v8.14.0...kcreddy:beats:variation2-syncpool: 1 file changed: x-pack/filebeat/input/gcppubsub/input.go

  1. Create a sync.Pool that returns an Outleter (pipeline client).
  2. In sub.Receive, fetch a client with pool.Get(), use it to publish messages, and return it with pool.Put(); this is essentially the pooling pattern sketched in the earlier comment. PR: https://github.com/elastic/beats/pull/39998

Results legend:
default -> subscription.num_goroutines: 1, subscription.max_outstanding_messages: 1600
g2 -> subscription.num_goroutines: 2
m2000 -> subscription.max_outstanding_messages: 2000
r30s -> Test ran for 30 seconds

  1. Events per 30 seconds, measured from the Non-zero metrics log:

| Run | 8.14.0 | 8.14.1 |
| --- | --- | --- |
| default-r30s | 86k | 86k |
| g2m2000r30s | 186k | 165k |

  2. Contentions, from running the pprof command `go tool pprof -seconds 30 http://localhost:5066/debug/pprof/mutex`. Attached profiles:
     - pprof.filebeat.contentions.delay.001.8.14.0-default-r30s.pb.gz
     - pprof.filebeat.contentions.delay.001-8.14.0-g2m2000-r30s.pb.gz
     - pprof.filebeat.contentions.delay.001.8.14.1-default-syncpool-r30s.pb.gz
     - pprof.filebeat.contentions.delay.001.8.14.1-g2m2000-syncpool-r30s.pb.gz

andrewkroh commented 3 days ago

The contention for the lock within the beat.Client Publish() will be most noticeable when some edge processing is being used (like dissecting a message in the Beat). For our integrations we have shifted most processing out of this area, where the lock is held, and into Elasticsearch ingest node. The exceptions are the default processors that are always present in Agent.

If there is little edge processing, then optimizing to avoid contention on the publish lock probably isn't worthwhile; the sync.Pool experiment is showing that. And if we did pursue this, we would need something other than sync.Pool to provide the pool of clients, because we would want an upper bound on the number of clients and we need to be able to Close() them.
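
For illustration, a bounded and closable pool could be built on a buffered channel; this sketch is hypothetical, not code from any of the PRs above:

```go
package main

import "io"

// clientPool is a fixed-size pool: the channel's capacity is the upper
// bound on the number of clients, and Close() can release them all,
// neither of which sync.Pool provides.
type clientPool struct {
	clients chan io.Closer
}

func newClientPool(n int, newClient func() io.Closer) *clientPool {
	p := &clientPool{clients: make(chan io.Closer, n)}
	for i := 0; i < n; i++ {
		p.clients <- newClient()
	}
	return p
}

// Get blocks until a client is free, capping concurrent publishers at n.
func (p *clientPool) Get() io.Closer { return <-p.clients }

// Put returns a client to the pool.
func (p *clientPool) Put(c io.Closer) { p.clients <- c }

// Close closes every client. It assumes all clients have been returned
// with Put before Close is called.
func (p *clientPool) Close() error {
	var err error
	for i := 0; i < cap(p.clients); i++ {
		if e := (<-p.clients).Close(); e != nil {
			err = e
		}
	}
	return err
}
```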

Let's verify that we can achieve 20k EPS with no code changes, and record the settings we used. We can refer to those in future tuning work.

kcreddy commented 1 day ago

I have run a few more tests on the existing filebeat (no code changes) to check whether we can reach much higher throughput just by tuning the existing settings. The following tests were run with the output.file configuration and no changes to the default internal queue settings.

default -> subscription.num_goroutines: 1, subscription.max_outstanding_messages: 1600
g -> subscription.num_goroutines. Example: g2 => subscription.num_goroutines: 2
m -> subscription.max_outstanding_messages. Example: m2000 => subscription.max_outstanding_messages: 2000
r30s -> Test ran for 30 seconds
| Run | 8.14.0 (current GA) events/30s | EPS |
| --- | --- | --- |
| default-r30s | 86k | 3k |
| g2m2000r30s | 186k | 6k |
| g5m2000r30s | 408k | 14k |
| g5m6000r30s | 580k | 19k |
| g10m2000r30s | 496k | 17k |
| g10m6000r30s | 630k | 21k |

So, just by tuning subscription.num_goroutines and subscription.max_outstanding_messages, we can reach more than 20k events per second.

Given the lack of throughput improvement, optimising for lock contention alone isn't worth it, so this investigation into adding multiple outlet (pipeline) clients won't be pursued further.