axw opened this issue 4 years ago
https://github.com/elastic/apm-server/compare/master...axw:processor-concurrent-read
In this branch I have modified `processor/stream` to:

- decode events into `map[string]interface{}`s concurrently with validation and translation into model types (this would partially address https://github.com/elastic/apm-server/issues/1285, but see below)

On master I get ~19MB/s with the `heavy.ndjson` benchmark; with this branch I get ~27MB/s. Once https://github.com/elastic/apm-server/issues/3551 is done, as mentioned in https://github.com/elastic/apm-server/issues/1285#issuecomment-607569788, it will no longer be possible to parallelise decoding and validation, but I expect validation to be fast enough that it won't matter.
We can come back to this once #3551 is done.
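A minimal Go sketch of the idea, assuming ND-JSON input with one event per line: a background goroutine decodes each line into a `map[string]interface{}` while the caller validates and translates the previously decoded object, so the two stages overlap. `Event`, `decodeLines`, `validateAndTranslate` and the validation rule are hypothetical stand-ins; this is not the code in the linked branch.

```go
package main

import (
	"bufio"
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// Event is a hypothetical model type standing in for apm-server's model types.
type Event struct {
	Type string
	Name string
}

// decoded carries either a decoded object or the error from decoding it.
type decoded struct {
	raw map[string]interface{}
	err error
}

// decodeLines decodes each line into a map in its own goroutine, so decoding
// of line N+1 can proceed while the consumer validates line N.
func decodeLines(sc *bufio.Scanner, buffer int) <-chan decoded {
	out := make(chan decoded, buffer)
	go func() {
		defer close(out)
		for sc.Scan() {
			var m map[string]interface{}
			err := json.Unmarshal(sc.Bytes(), &m) // copies data out of the scanner buffer
			out <- decoded{raw: m, err: err}
		}
		if err := sc.Err(); err != nil {
			out <- decoded{err: err}
		}
	}()
	return out
}

// validateAndTranslate applies a placeholder rule (exactly one top-level key
// whose value is an object) and converts the object into an Event.
func validateAndTranslate(m map[string]interface{}) (Event, error) {
	if len(m) != 1 {
		return Event{}, errors.New("expected exactly one top-level key")
	}
	var ev Event
	for typ, v := range m {
		obj, ok := v.(map[string]interface{})
		if !ok {
			return Event{}, fmt.Errorf("%s: expected an object", typ)
		}
		ev.Type = typ
		ev.Name, _ = obj["name"].(string)
	}
	return ev, nil
}

// process runs the decode stage concurrently with validation/translation.
func process(input string) ([]Event, []error) {
	sc := bufio.NewScanner(strings.NewReader(input))
	var events []Event
	var errs []error
	for d := range decodeLines(sc, 10) {
		if d.err != nil {
			errs = append(errs, d.err)
			continue
		}
		ev, err := validateAndTranslate(d.raw)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		events = append(events, ev)
	}
	return events, errs
}

func main() {
	input := "{\"metadata\":{\"service\":{\"name\":\"svc\"}}}\n" +
		"{\"transaction\":{\"name\":\"GET /\"}}\n" +
		"{\"span\":{\"name\":\"SELECT\"}}\n" +
		"not json"
	events, errs := process(input)
	fmt.Println(len(events), len(errs)) // 3 1
}
```

With a single decoding goroutine and a single consumer, event order is preserved; merging decoding and validation into one pass (as #3551 implies) removes this overlap.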
The `processor/stream` code reads events in batches of 10 at a time: https://github.com/elastic/apm-server/blob/17e0f7a9f1a3aa1453e5df8b7d4cf746e9d7179d/processor/stream/processor.go#L279
Once a batch has been read, its events are dispatched to the publisher, which transforms them and sends them through the libbeat pipeline to be indexed in Elasticsearch.
By default, agents close the stream after 10 seconds, or once it reaches a certain size (~750KB). So if an agent sends fewer than 10 events, the `processor/stream` code will generally block waiting for the stream to end before it dispatches them to the publisher.

We should consider adding a timeout (or a context with a timeout) to the `StreamReader.Read` method to avoid this.