I'm not opposed to changing the internal approach to the way the SDK does exports. Would you have the time to create a PR for this change, @asafm ?
This streaming visitor proposal deviates from the spec, and it's hard to predict how future spec changes would interact with it. I'm worried about painting ourselves into a corner by implementing a liberal interpretation of the spec.
This proposed change would be very invasive, touching large chunks of the internals of the SDK. The opentelemetry-java project is currently resource constrained - we’re down to just two maintainers, and only one full time maintainer. There are many competing priorities in the OpenTelemetry project, and I’m skeptical that this can be given the attention required to see it through.
I'd like to explore the idea of achieving the desired reduction in memory allocation / churn without such disruptive changes to MetricExporter and the internals. Here are some ideas that come to mind:
I think if all these steps were taken, the number of allocations per collection could be very low.
You mention an aggregation-and-filtering class. Presumably you're interested in filtering because printing a million points in the Prometheus exposition format seems outlandish. Maybe you'd want to track all the series, but selectively read them out. It's currently expensive for a reader to filter metrics since it has to read all metrics and skip the ones that are irrelevant. We could potentially improve this in the current design by allowing readers to pass MetricProducers a predicate to filter by scope, instrument, or point data (i.e. attributes). This would require a change to the spec.
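For illustration only, here is a minimal sketch of what such a reader-supplied predicate could look like. The MetricFilter name and its methods are assumptions, not an existing SDK or spec API:

```java
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;

// Hypothetical predicate a reader could hand to a MetricProducer so that irrelevant
// series are skipped at the source instead of being materialized and then discarded.
interface MetricFilter {
  boolean acceptScope(String instrumentationScopeName);

  boolean acceptInstrument(String instrumentationScopeName, String instrumentName);

  boolean acceptPoint(String instrumentationScopeName, String instrumentName, Attributes attributes);
}

// Example: only collect Pulsar messaging instruments, and only points for one topic group.
class TopicGroupFilter implements MetricFilter {
  @Override
  public boolean acceptScope(String scopeName) {
    return scopeName.startsWith("org.apache.pulsar");
  }

  @Override
  public boolean acceptInstrument(String scopeName, String instrumentName) {
    return instrumentName.startsWith("pulsar.messaging.");
  }

  @Override
  public boolean acceptPoint(String scopeName, String instrumentName, Attributes attributes) {
    return "logs".equals(attributes.get(AttributeKey.stringKey("topicGroup")));
  }
}
```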
I'm not opposed to changing the internal approach to the way the SDK does exports. Would you have the time to create a PR for this change, @asafm ?
@jkwatson Once I prepare the design proposal to Apache Pulsar, and they approve it, yes.
I think if this API is complementary to the existing API, it'd be a great addition. This even goes into some discussions we've had, specification level, around how to adapt existing metric solutions w/ OTEL.
Specifically, if I understand this correctly, you're trying to bridge Pulsar's existing metrics into OTEL rather than using OTEL to collect and report them via its API?
@jack-berg raises valid specification concerns. I still think creating an experimental / optional hook for Java here would be beneficial and I'd love to expand the metric specification in OTEL to allow this use case.
First, thank you, @jkwatson, for the prompt reply, and thank you, @jack-berg, for the thorough reply - I highly appreciate it. I have given it much thought, so it took me some time to reply. I will address each issue raised, and also I would like your help and experience in brainstorming this, as I think this is truly beneficial to OTel beyond Apache Pulsar's scope.
This streaming visitor proposal deviates from the spec, and it's hard to predict how future spec changes would interact with it
I read the specs multiple times just to be sure. I'm quoting from the spec:
Exports a batch of Metric points. Protocol exporters that will implement this function are typically expected to serialize and transmit the data to the destination. ... batch - a batch of Metric points. The exact data type of the batch is language specific, typically it is some kind of list. The exact type of Metric point is language specific, and is typically optimized for high performance. Here are some examples:
The way I see it is this: the exporter should receive a list of metric data points. Either having it up-front as a list or iterating over it doesn't change that semantic. An iterator is simply a more performant way to relay that information. The visitor pattern (i.e., the Streaming API) is just another form of an iterator, but one that is optimized for high performance: the reader walks the data in its natural hierarchy of Resource → Instrumentation Scope → Instrument → Attributes → Metric data and relays it in that order through calls such as visitAttributes(attr, metric-data).

I want to expand on that last point since I think it's truly important. The main goal of the reader is to read data from the SDK and relay it to the exporter to export it. Both of the required exporters need the data structured in that hierarchy anyway: the OTLP exporter receives a flat list of MetricData only to do additional work, which bears more memory allocation cost, to restructure it back into the hierarchy of Resource → Instrumentation Scope → Instrument → Attributes → Metric data. The Prometheus exporter needs Instrument → Attributes → Metric data, and Resource and InstrumentationScope are flattened into the Attributes when printed out, so it too needs to spend an additional cost to restructure it.

If both exporters need it as such, it makes sense to relay that information using the streaming API suggested above to save that restructuring work, and also, as mentioned before, to use the minimum amount of memory required for this. So, in my understanding, the SDK is allowed to use the MetricsBatch data structure, utilizing visitor/streaming, to relay the metric point data to the exporter per the defined spec. I guess we can always open that specific point for discussion in the OTel spec to see if we exceed the SDK boundaries.
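To make the shape of such a streaming API concrete, here is a minimal illustrative sketch. The MetricsVisitor callbacks and the MetricsBatch name are assumptions about what the proposal could look like, not an existing SDK API, and only a single long-point callback is shown:

```java
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;

// Hypothetical visitor: one callback per level of the
// Resource -> Instrumentation Scope -> Instrument -> Attributes -> metric data hierarchy.
interface MetricsVisitor {
  void visitResource(Resource resource);

  void visitInstrumentationScope(InstrumentationScopeInfo scope);

  void visitInstrument(String name, String description, String unit);

  // Called once per series; the values may come from a reused mutable holder.
  void visitLongPoint(Attributes attributes, long startEpochNanos, long epochNanos, long value);
}

// Hypothetical batch handed to the exporter instead of a Collection<MetricData>.
interface MetricsBatch {
  // Walks all collected data in hierarchy order, calling the visitor as it goes,
  // so no intermediate list of immutable points has to be allocated.
  void accept(MetricsVisitor visitor);
}
```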
This proposed change would be very invasive, touching large chunks of the internals of the SDK. The opentelemetry-java project is currently resource constrained - we’re down to just two maintainers, and only one full time maintainer. There are many competing priorities in the OpenTelemetry project, and I’m skeptical that this can be given the attention required to see it through.
This is a concern I can't say much about. This project is owned by the maintainers; you decide on it, as you have more context. I can only contribute a bit of information, in my opinion, to help make that decision:
I agree that it is invasive, as it has to go all the way to the Aggregator Handle, yet as I said, I think the added complexity exists but is low, and the added value to the end user is immense.
I'd like to explore the idea of achieving the desired reduction in memory allocation / churn without such disruptive changes to MetricExporter and the internals. Here are some ideas that come to mind:
It's true that we allocate immutable PointData snapshots for each metric point on each collection, but this doesn't need to be the case. Concurrent collections are not possible, so it should be possible / reasonable for each point to have a mutable PointData which is updated with the current aggregated value on collection. The same applies to each instrument, which allocates a MetricData for each collection. Each instrument could have a mutable MetricData updated with the current collection of PointData on each collection. This same technique could be applied to the various lists involved in producing the snapshots as well.
I took the time to explore the code in depth and draft pseudo-code of how it would look.
```java
// DefaultSynchronousMetricStorage (pseudo-code)
class DefaultSynchronousMetricStorage {
  Map<Attributes, AggregatorHandle> aggregatorHandles;
  Map<Attributes, MutableMetricData> metricDataCache; // one reusable holder per Attributes

  Collection<MutableMetricData> collectAndReset() {
    for (Map.Entry<Attributes, AggregatorHandle> entry : aggregatorHandles.entrySet()) {
      AggregatorHandle aggregatorHandle = entry.getValue();
      // TODO What about the exemplars
      MutableMetricData mutableMetricData = metricDataCache.get(entry.getKey());
      mutableMetricData.reset();
      aggregatorHandle.accumulateThenReset(entry.getKey(), mutableMetricData);
    }
    return metricDataCache.values(); // return Collection<MutableMetricData>
  }
}
```
For synchronous storage, we can keep a MutableMetricData per Attributes. The aggregatorHandle can snapshot the values into it. We can return the values collection to avoid creating another map and more MapEntry objects, which has a complexity of O(metricPointsCount) and, again, at large scale can amount to 70M.
```java
// AsynchronousMetricStorage (pseudo-code)
class AsynchronousMetricStorage {
  void recordDouble(double value, Attributes attributes) {
    // This line needs to change
    T accumulation = aggregator.accumulateDoubleMeasurement(value, attributes, Context.current());
    if (accumulation != null) {
      recordAccumulation(accumulation, attributes);
    }
    // This needs to work with MutableMetricData (a map of attributes to it)
  }
}
```
For asynchronous storage, we need to switch over from accumulation to MutableMetricPoint.
```java
// Aggregator (pseudo-code)
interface Aggregator<T, U> {
  default T accumulateLongMeasurement(long value, Attributes attributes, Context context) {
    // we need to avoid creating a handle per attribute set per collection
    AggregatorHandle<T, U> handle = createHandle();
    handle.recordLong(value, attributes, context);
    return handle.accumulateThenReset(attributes);
  }
}
```
Also, we need to avoid creating a new AggregatorHandle per each Attributes, and save it per Attributes. MutableMetricData will contain something like Data<MutableHistogramPointData>.
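For illustration, here is a minimal sketch of what such a reusable holder could look like; the fields are a simplification (the SDK's real HistogramPointData carries more state, such as bucket boundaries and exemplars):

```java
import java.util.Arrays;

// Hypothetical reusable point holder: one instance is kept per (instrument, Attributes)
// and overwritten on every collection instead of allocating a new immutable point.
final class MutableHistogramPointData {
  private long startEpochNanos;
  private long epochNanos;
  private double sum;
  private long count;
  private long[] bucketCounts = new long[0];

  void reset() {
    sum = 0;
    count = 0;
    Arrays.fill(bucketCounts, 0);
  }

  // Called by the aggregator handle during collection to publish the current values.
  void set(long startEpochNanos, long epochNanos, double sum, long count, long[] bucketCounts) {
    this.startEpochNanos = startEpochNanos;
    this.epochNanos = epochNanos;
    this.sum = sum;
    this.count = count;
    if (this.bucketCounts.length != bucketCounts.length) {
      this.bucketCounts = new long[bucketCounts.length];
    }
    System.arraycopy(bucketCounts, 0, this.bucketCounts, 0, bucketCounts.length);
  }
}
```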
IMO, when you review how the code would change, it will end up much less maintainable and elegant compared with the visitor approach:
Also, the aggregation-and-filtering decorator will need to spend time sorting the data since it works in the scope of an instrument, and the SDK or any other metric producer doesn't guarantee ordering for a list of metric points. The same goes for the exporters - they need to spend time doing a group-by to get the data into the proper hierarchy.
The last point is memory consumption. I think we'll end up using metricsPointCount x 2-3 memory. At least x2, since we keep both the raw data and the "snapshot" data in memory, and we need to make sure (and it is hard) that we're not allocating anything else that is O(metricsPointCount).
So, in my opinion, the cached re-usable objects approach doesn't give you easier-to-maintain code compared with the visitor pattern and bears the costs of x2-x3 memory consumption (not allocation).
The OTLP exporter does allocate many Marshaller objects, but while this makes the code readable, it's not necessary. Refactoring to reduce allocations would be necessary with the proposed streaming approach as well.
I agree - this needs changing in any path chosen.
Each reader does have its own storage, so each registered reader multiplies the storage footprint. Separate storage is always needed when temporality is delta, since even if both readers are delta they can be read at different intervals. If temporality is cumulative AND the default aggregation is the same, optimizations can be made to share storage between readers. However, I don't think multiple readers is very common. If you want to export to multiple locations, it's better to do so from the collector. Additionally, there's a spec proposal to allow views to be configured at a per-reader level, which would further limit the applicability of shared storage. Finally, this optimization would make the code harder to understand and maintain, so it's not free.
What I meant is that once you resolve through the view which storages you need, then using a storage registry you can obtain one based on its parameters, and it might give you an existing instance. The parameters are:
So if those parameters are the same for a given reader and its resolved views, the storage can be re-used across readers. The spec proposal for additional views should not get in the way of that reusability.
I do agree that it is not very popular; hence, I'm ok with taking that optimization out of scope for this proposal.
You mention an aggregation-and-filtering class. Presumably you're interested in filtering because printing a million points in the Prometheus exposition format seems outlandish. Maybe you'd want to track all the series, but selectively read them out. It's currently expensive for a reader to filter metrics since it has to read all metrics and skip the ones that are irrelevant. We could potentially improve this in the current design by allowing readers to pass MetricProducers a predicate to filter by scope, instrument, or point data (i.e. attributes). This would require a change to the spec.
I'll explain the need. Aggregation is about aggregating at the level of an instrument and a set of Attributes. For example:
Let's take pulsar.messaging.in.bytes as an example. It normally has attributes of cluster, namespace, topicGroup, and topic. I can say I want to aggregate it to topicGroup only for topic=logs-*. The rest would remain as is.
Example:
Raw data:
pulsar.messaging.in.bytes
cluster=c1, namespace=ns1, topicGroup=logs, topic=logs-a1 10
cluster=c1, namespace=ns1, topicGroup=logs, topic=logs-a2 20
cluster=c1, namespace=ns1, topicGroup=logs, topic=logs-a3 30
cluster=c1, namespace=ns1, topicGroup=billing, topic=billing-b1 100
cluster=c1, namespace=ns1, topicGroup=billing, topic=billing-b2 200
After aggregation
pulsar.messaging.in.bytes
cluster=c1, namespace=ns1, topicGroup=logs 60
cluster=c1, namespace=ns1, topicGroup=billing, topic=billing-b1 100
cluster=c1, namespace=ns1, topicGroup=billing, topic=billing-b2 200
In terms of filtering, the aggregation rule will allow you to match many instruments but only emit certain ones.
It is designed as a decorator to the exporter since a Pulsar operator (person) can configure it to use, by default, a granularity of topic groups (say 100) instead of 1M topics, and, say, keep 3-5 instruments. If a certain topicGroup misbehaves, they can "open it for debug" by adding a rule that sets the aggregation for this topic group to none, and also have 20 instruments if needed to further diagnose the issue.
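To make the rule concrete, here is an illustrative sketch of the aggregation step, using plain Java maps in place of the SDK's types; the class and method names are hypothetical:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical aggregation step mirroring the example above: points whose topicGroup
// matches the rule are re-keyed without the "topic" attribute and summed; all other
// points pass through unchanged.
final class TopicGroupAggregator {
  private final String topicGroupToAggregate; // e.g. "logs"

  TopicGroupAggregator(String topicGroupToAggregate) {
    this.topicGroupToAggregate = topicGroupToAggregate;
  }

  Map<Map<String, String>, Long> aggregate(Map<Map<String, String>, Long> rawPoints) {
    Map<Map<String, String>, Long> result = new LinkedHashMap<>();
    rawPoints.forEach((attributes, value) -> {
      Map<String, String> key = attributes;
      if (topicGroupToAggregate.equals(attributes.get("topicGroup"))) {
        key = new HashMap<>(attributes);
        key.remove("topic"); // collapse to topicGroup granularity
      }
      result.merge(key, value, Long::sum);
    });
    return result;
  }
}
```

Feeding the raw data from the example above through new TopicGroupAggregator("logs").aggregate(...) would yield the three points shown under "After aggregation".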
Sorry for the lengthy reply :) I just wanted to make sure I relay my thinking properly.
@jsuereth
I think if this API is complementary to the existing API, it'd be a great addition. This even goes into some discussions we've had, specification level, around how to adapt existing metric solutions w/ OTEL.
Yes, we add an additional way for a metrics producer to produce metrics. The most performant implementation would utilize the visitor pattern, while the normal method would simply run a visitor that aggregates it all into a list of metric points and returns it:
```java
// MetricsProducer (pseudo-code)
interface MetricsProducer {
  // Existing method, renamed slightly.
  // The implementation can be default and simply call collectAllMetrics(), run the
  // MetricsBatch with a visitor which aggregates to a list, and return it.
  Collection<MetricData> collectAllMetricsAsCollection();

  // New method
  MetricsBatch collectAllMetrics();
}
```
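A possible shape of that backward-compatible default implementation, sketched with the hypothetical MetricsVisitor/MetricsBatch types from earlier and a stand-in SimplePoint record (the real method would rebuild MetricData objects instead):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;

// Stand-in for whatever immutable type the list-based path would return.
record SimplePoint(String instrumentName, Attributes attributes, long value) {}

interface MetricsProducerSketch {
  MetricsBatch collectAllMetrics();

  // Backward-compatible path: drive the streaming batch with a visitor that
  // materializes a plain list, so non-streaming consumers keep working.
  default Collection<SimplePoint> collectAllPointsAsCollection() {
    List<SimplePoint> result = new ArrayList<>();
    collectAllMetrics().accept(new MetricsVisitor() {
      private String currentInstrument = "";

      @Override
      public void visitResource(Resource resource) {}

      @Override
      public void visitInstrumentationScope(InstrumentationScopeInfo scope) {}

      @Override
      public void visitInstrument(String name, String description, String unit) {
        currentInstrument = name;
      }

      @Override
      public void visitLongPoint(Attributes attributes, long startEpochNanos, long epochNanos, long value) {
        result.add(new SimplePoint(currentInstrument, attributes, value));
      }
    });
    return result;
  }
}
```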
A similar approach can be done with the exporter.
Specifically, if I understand this correctly, you're trying to bridge Pulsar's existing metrics into OTEL rather than using OTEL to collect and report them via its API?
I want to delete the usage of the existing metrics libraries in Pulsar and use only OTel to define, report, and export those metrics, either with the Prometheus exporter or the OTLP exporter, based on user configuration.
@jack-berg raises valid specification concerns. I still think creating an experimental / optional hook for Java here would be beneficial and I'd love to expand the metric specification in OTEL to allow this use case.
I would be happy to work with you together on this spec enhancement if you instruct me on the process for it.
There's a lot to unpack there. I could nitpick things here and there, but I'll try to stick to higher level ideas.
I think it's a good idea to optimize the metrics SDK. Whatever optimizations are made should be done in reasonably small chunks, and backed with JMH tests that show how they're helping.
Changes that impact the public API surface area of the metrics SDK (i.e. a visitor pattern) can be prototyped in internal packages, or in the incubator depending on the context, but they also need to go through the spec to ensure that the Java implementation doesn't end up getting in the way of future spec changes.
What I meant is that once you resolve through the view which storages you need, then using a storage registry you can obtain one based on its parameters, and it might give you an existing instance.
Delta readers will always need unique storages since, for them, reading metrics has a side effect of resetting the aggregations. If two delta readers share the same storage, they'll each only get half the picture 😛
Thanks for the support @jack-berg !
First step, as you said, is to make sure it follows current specifications. I've created a Discussion in the specification's repository to validate that: https://github.com/open-telemetry/opentelemetry-specification/discussions/3128 @jsuereth Appreciate any opinion or help advance this discussion. 🙏
I'm somewhat late to this party - not sure how I missed this discussion, but there we are.
First things first - thanks @asafm for presenting this use case and some interesting design ideas.
I would love to see some public, reproducible benchmarks that clearly demonstrate how bad the issue is. Is that possible?
E.g. If we have a 5ms read/write goal, then how much does the current implementation miss by? Is that miss still present (or still a problem) when using the low-pause collectors (such as ZGC / Shenandoah)?
Then we can look at implementing some of @jack-berg's suggested internal changes to see if they help. I'm very mindful of the problems involved with changing the API / causing future problems with the spec - and even more worried about Jack's point about how lightly staffed this project currently is.
@kittylyst Hi Ben, thanks, and I appreciate you joining the conversation. If you look at the numbers I've written above at 1M topics, I don't think such tests are needed, as the amount of objects to garbage collect is super high (700M). As I wrote, Apache Pulsar has witnessed those choking points in the past with the Prometheus client, which uses the same concept of batching it all into one big list of metric points upon collection. At collection time, we saw Pulsar latency spike. Once we moved to the streaming concept, this problem vanished.
As @jack-berg said, my general plan was to add the functionality to complement the existing functionality, so you get to choose which mode you wish to use upon creation of the exporter. This will allow me to write benchmarks to show the difference in memory pressure - the memory allocations - between the two modes.
As I wrote, I think an observability library's goal is to go unnoticed. Once we have the streaming mode as an option, we achieve that, since the memory consumption / allocation is an order of magnitude smaller by design. I presume the OTel library would also be used on smaller, constrained devices such as Android, so having the library minimize its memory allocation footprint should be a good goal to achieve IMO.
As I replied in my long reply (sorry about that), I think the suggested direction of having re-usable objects leads to a quite complicated / inelegant implementation which will be hard to reason about. My plan was, if it is OK spec-wise, to implement it as @jack-berg said "on the side", and show it's a reasonable change and not as complicated as it might sound - in effect, show the trade-off is quite small in light of what you gain. Of course, do it in small chunks.
If you look at the numbers I've written above at 1M topics, I don't think such tests are needed, as the amount of objects to garbage collect is super high (700M).
Java GC is a complex beast, and is not in any sense O(#objects allocated). That's why you need to test it.
Also - microbenchmarks are not a good way to go here, so you would need to write a realistic test harness app to compare the two modes anyway.
My suggestion is really just that you write that harness app first - then it can be examined by other people for known or possible pathological (positive or negative) performance effects, and the effects of different GCs and configurations can be explored. This provides a positive, provable benefit to the project and to users - regardless of whether the experimental work to change to a streaming mode is ultimately successful or not.
Thanks for replying, @kittylyst. Does OpenTelemetry have such a harness? How did it test itself to know it won't create any load on apps - be they sensitive systems like databases or query engines, or mobile devices like Android?
Why are microbenchmarks not good enough? If I can show, using such benchmarks, that I've lowered the amount of memory allocations per collection (which runs periodically) by x100, why isn't that a sufficient metric / guideline to follow, given that the resulting codebase is of course reasonable and readable?
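For reference, JMH can report exactly that number: running with the GC profiler (-prof gc) produces a gc.alloc.rate.norm figure, i.e. bytes allocated per operation, which makes the two collection modes directly comparable. A minimal sketch, where MetricsFixture and its setup helper are placeholders for an SDK configured with many registered series:

```java
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class CollectionAllocationBenchmark {

  // Placeholder for an SDK configured with e.g. 100k (instrument, Attributes) series.
  interface MetricsFixture {
    Object collectAsList();    // existing list-based collection path
    Object collectStreaming(); // proposed visitor-based collection path
  }

  private MetricsFixture fixture;

  @Setup
  public void setup() {
    fixture = createFixtureWithSeries(100_000); // hypothetical setup helper
  }

  @Benchmark
  public Object collectBatchMode() {
    return fixture.collectAsList();
  }

  @Benchmark
  public Object collectStreamingMode() {
    return fixture.collectStreaming();
  }

  private static MetricsFixture createFixtureWithSeries(int seriesCount) {
    throw new UnsupportedOperationException("sketch only");
  }

  // Run with "java -jar benchmarks.jar -prof gc" and compare gc.alloc.rate.norm
  // between the two benchmarks.
}
```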
Regarding the harness app - there are so many different workloads out there, be they I/O-heavy, databases, mobile devices, and so many others; how can I create one universal app simulating them all? Isn't the idea to try to lower any cost the library bears in terms of CPU, memory used, and memory allocated?
Summarizing the code changes done by maintainers so far, partly related to this issue: https://github.com/open-telemetry/opentelemetry-java/pull/5183 https://github.com/open-telemetry/opentelemetry-java/pull/5182 https://github.com/open-telemetry/opentelemetry-java/pull/5184 https://github.com/open-telemetry/opentelemetry-java/pull/5142 https://github.com/open-telemetry/opentelemetry-java/pull/5176 https://github.com/open-telemetry/opentelemetry-java/pull/5020
---- Up to here, the feature is done; beyond this point are nice-to-have optimizations
[ ] Add eviction in Object Pool to reduce memory used
[ ] Reduce allocations caused by AttributeProcessor if used by view
[ ] Add statistics on ObjectPool?
🚧 In progress
Context
Apache Pulsar is a distributed, horizontally scalable messaging platform. Think of Kafka + RabbitMQ as one scalable platform. Its primary nodes are called Brokers. Pulsar clients use topics to produce and consume messages. Pulsar has a unique feature that allows it to support up to 1 million topics cluster-wide, and work is being done to support 1 million topics in a single broker. Also, it can support very low latency - less than 5ms per write/read.
The Pulsar broker today exposes topic-level and producer- and consumer-level metrics (a topic can have many connected consumers and producers). One of the first issues that happened far in the past was memory pressure - during metrics collection, many metric data point objects were allocated, leading to the CPU spending time running the garbage collector and causing latency to spike well beyond the promised 5ms. This was solved by custom code, which iterated over each topic and encoded metric values in the Prometheus text format directly to an off-heap byte buffer, thus avoiding memory allocation. Mutable, resettable objects were used (thread-local instead of an object pool) to facilitate data transfer between Pulsar business logic classes - think of it as an object pool of DTOs. This has worked well, and Pulsar can sustain its promised 5ms latency. The byte buffer is written to the Prometheus REST endpoint HTTP response output stream.
Today, one of the significant caveats preventing Pulsar from using that feature to its full potential is metrics. If you have as few as 100k topics per broker, where each topic has 70 unique metrics, this leads to emitting 7 million metric data points. When a broker hosts 1M topics, it will emit 70M metric data points. This has several downsides impacting end users:
One of the features I'm currently designing for Pulsar is the ability to configure aggregation and filtering in a Pulsar broker. Users can specify topic groups, which would typically number in the hundreds. The topic metrics will be emitted at the granularity of topic groups rather than topics, thus reducing cardinality to a normally usable level. Users can dynamically alter a specific group to get it at the granularity of topics, enabling them to "debug" issues with this group. Filtering would allow them to get all 70 metrics for a given group while getting only the essential five metrics for all other groups.
Since Pulsar's metrics code today is composed of multiple custom metric stacks accumulated over ten years, it needs to be consolidated into a single metrics library. This gives the opportunity to choose a new metrics library for Apache Pulsar and ditch that custom code. After a lengthy evaluation of all existing Java metric libraries, I have concluded that OpenTelemetry is the best fit: a clear, understandable API; elegant SDK specifications leading to an elegant, composable design; a powerful feature set; a soon-to-be industry-wide standard; a very active community; and foundation-based governance.
The problem
When the SDK is used in an environment where many metric data points are created per metrics collection (many = more than 500k, up to 70M), the number of objects allocated on the heap is very large (1x-10x the number of metric data points, meaning in Pulsar's case 1M to 700M objects in the extreme case), leading to CPU time spent on garbage collection instead of running Pulsar code, hence impacting latency, making it far higher than 5ms and unstable.
There are multiple sources of this memory allocation:
The MetricReader interface and the MetricExporter interface were designed to receive the metrics collected from memory by the SDK using a list (collection); thus, each metric point is allocated one object.
Each registered reader has its own storage per (instrument, Attributes), meaning each reader doubles the amount of memory required and the amount of memory allocation, thus leading to double the garbage collection.
The proposed solution
Batch to Streaming API
The proposed idea is to switch the metrics collection methodology from batching - producing a list - to streaming, meaning iterating over the results using the visitor pattern. It is similar to the difference between the different ways to do XML/JSON parsing: DOM parsers vs. SAX parsers. Switching to a streaming API will start with the aggregator handle and storage classes, continue with MetricProducer and MetricReader, and end with MetricExporter, which will allow us to reduce heap object allocation to a bare minimum during metrics collection by streaming the data directly to the socket used by the exporter or to an off-heap byte array (later to be written by the exporter to the socket).

The following is a pseudo-code sketch of the suggested change to the SDK. It uses the visitor design pattern coupled with re-usable metric data point objects, referred to as Mutable* in the pseudo-code below.

A Mode getMode() method will, by default, return BATCH to be backward compatible with exporters created outside of this SDK. SDK exporter implementations will return STREAMING. This will allow the MetricReader orchestrating it all to choose which method to execute for the exporter.

A MetricReader is currently not allowed to be created outside the SDK; hence its implementation is changeable. The MetricProducer interface is internal and hence susceptible to change. If needed, we can use the same principle of getMode() to decide which method to call on the MetricProducer to collect the metrics.

If there is an agreement, a more detailed design will be presented, perhaps with better ideas / naming to achieve the same goal.
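For illustration, a minimal sketch of the getMode() backward-compatibility mechanism described above; the Mode enum, the streaming export method, and the MetricsBatch type are assumed names rather than existing SDK API:

```java
import java.util.Collection;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;

enum Mode { BATCH, STREAMING }

interface StreamingCapableMetricExporter {
  // Exporters built outside the SDK never override this, so they keep working unchanged.
  default Mode getMode() {
    return Mode.BATCH;
  }

  // Existing batch path.
  CompletableResultCode export(Collection<MetricData> metrics);

  // New streaming path; the reader only calls this when getMode() == STREAMING.
  default CompletableResultCode exportStreaming(MetricsBatch batch) {
    throw new UnsupportedOperationException("streaming not supported by this exporter");
  }
}
```

The MetricReader would check getMode() at collection time and dispatch to either export(...) or exportStreaming(...).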
Streaming Exporters
The OTLP exporter can be modified to sequentially encode the metric data, as the visitor assures us the methods will be called in the proper order (Resource, instrumentation scope, instrument, attributes); thus, we can write directly to the output stream, removing the need to allocate marshaller objects per data point.
The same can be achieved more easily with the Prometheus exporter, writing directly to the HTTP output stream.
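As an illustration of what that could look like for the Prometheus case, here is a sketch of a visitor (using the hypothetical MetricsVisitor from earlier) that writes exposition text straight to the response stream; the class name is an assumption and the text handling is deliberately simplified:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.resources.Resource;

// Writes each point directly to the HTTP output stream as the visitor callbacks arrive,
// so the collected data never has to be materialized as a list of MetricData.
// (The string formatting below still allocates and is simplified for readability.)
final class PrometheusStreamingVisitor implements MetricsVisitor {
  private final OutputStream out;
  private String currentMetricName = "";

  PrometheusStreamingVisitor(OutputStream out) {
    this.out = out;
  }

  @Override
  public void visitResource(Resource resource) {}

  @Override
  public void visitInstrumentationScope(InstrumentationScopeInfo scope) {}

  @Override
  public void visitInstrument(String name, String description, String unit) {
    currentMetricName = name.replace('.', '_');
    write("# TYPE " + currentMetricName + " counter\n");
  }

  @Override
  public void visitLongPoint(Attributes attributes, long startEpochNanos, long epochNanos, long value) {
    StringBuilder labels = new StringBuilder();
    attributes.forEach((key, attributeValue) -> {
      if (labels.length() > 0) {
        labels.append(',');
      }
      labels.append(key.getKey()).append("=\"").append(attributeValue).append('"');
    });
    write(currentMetricName + "{" + labels + "} " + value + "\n");
  }

  private void write(String s) {
    try {
      out.write(s.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```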
Reusing storage for the same conditions
If two metric readers share the same aggregation temporality with the same parameters, the same storage can be used for both. If an instrument is configured the same in two metric readers, in terms of aggregation function and temporality, the same storage instance can be used. For example, if we're using a Prometheus reader as is and an OTLP exporter as is, with cumulative aggregation, all instrument storage instances can be created once to be used for both readers.
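A minimal sketch of that registry idea, with key fields assumed from the matching conditions described above (instrument, aggregation, temporality); all names are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical registry: a storage is shared only when it was created with identical
// parameters, e.g. two cumulative readers using the same aggregation for an instrument.
final class MetricStorageRegistry<S> {

  record StorageKey(String instrumentName, String aggregation, String temporality) {}

  private final Map<StorageKey, S> storages = new ConcurrentHashMap<>();

  S getOrCreate(StorageKey key, Supplier<S> factory) {
    // Note: delta-temporality storages must not be shared, since reading them
    // resets the aggregations (as pointed out earlier in the thread).
    return storages.computeIfAbsent(key, k -> factory.get());
  }
}
```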
Using this in Apache Pulsar
Following this change, I can write an aggregation-and-filtering class decorating the SDK OTLP exporter, which performs the configured aggregation per topic group within an instrument. I can present the design for that if it is needed.
Benefits
Notes
If this idea is agreed upon, I can contribute a more detailed design and implement it.