apache / pekko-connectors

Apache Pekko Connectors is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Apache Pekko.
https://pekko.apache.org/
Apache License 2.0

AMQP: Avoid allocating a new ByteString on each consumed message #592

Closed dimamo5 closed 2 weeks ago

dimamo5 commented 3 months ago

Avoids an array copy and a new allocation for each consumed message by using the fromArrayUnsafe method. Building from this unsafe method should be OK as long as body is not exposed to the outside.
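The trade-off described here can be illustrated with a stdlib-only sketch. `Bytes` is a hypothetical stand-in for an immutable byte-string type, not Pekko's ByteString: `copyOf` models a copying constructor and `wrapUnsafe` models the idea behind fromArrayUnsafe. It shows why the wrapped array must never be mutated afterwards.

```java
import java.util.Arrays;

// Hypothetical stand-in for an immutable byte-string type (NOT Pekko's ByteString).
final class Bytes {
    private final byte[] data;

    private Bytes(byte[] data) {
        this.data = data;
    }

    // Safe: defensive copy, so later writes to `source` cannot change this value.
    static Bytes copyOf(byte[] source) {
        return new Bytes(Arrays.copyOf(source, source.length));
    }

    // Unsafe: shares `source`; only sound if nobody mutates it afterwards.
    static Bytes wrapUnsafe(byte[] source) {
        return new Bytes(source);
    }

    byte get(int index) {
        return data[index];
    }

    public static void main(String[] args) {
        byte[] body = {1, 2, 3};
        Bytes copied = Bytes.copyOf(body);
        Bytes wrapped = Bytes.wrapUnsafe(body);

        body[0] = 42; // simulates a client library reusing its buffer

        System.out.println(copied.get(0));  // unaffected by the write
        System.out.println(wrapped.get(0)); // observes the write
    }
}
```

The wrapped value saves one allocation and one copy per message, at the cost of relying on the producer of the array never touching it again.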

pjfanning commented 3 months ago

Thanks @dimamo5, but I am not sure that this is necessarily a good idea. How can we be sure that the AMQP provider does not reuse byte arrays? We can only use fromArrayUnsafe if we are 100% sure that the byte array won't be reused. Even if one AMQP solution can be proved not to reuse arrays, can we be sure that no other AMQP solution does?

dimamo5 commented 3 months ago

The critical bit is the implementation of com.rabbitmq.client.Channel used in the DefaultConsumer. The default ChannelN and RecoveryAwareChannelN from the RabbitMQ client do not reuse byte arrays, since they eventually call https://github.com/rabbitmq/rabbitmq-java-client/blob/ca510a078092a3c64b1219feac527bc738b63313/src/main/java/com/rabbitmq/client/impl/CommandAssembler.java#L145, which performs a System.arraycopy.

Regarding the AMQP provider, I don't think this connector allows the channel to be customized directly from the provider, so one of the above-mentioned channels will be used.
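As a rough illustration of the argument, the sketch below assembles a message body from frame fragments with System.arraycopy. `FrameAssembler` and `coalesce` are hypothetical names and this is a simplified model, not the RabbitMQ client's actual code, which may handle special cases differently. It only shows that a body built this way does not share memory with the frame buffers it was copied from.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of body assembly; NOT the RabbitMQ client's actual code.
final class FrameAssembler {
    // Concatenates the frame payloads into a freshly allocated array.
    static byte[] coalesce(List<byte[]> fragments) {
        int total = 0;
        for (byte[] fragment : fragments) {
            total += fragment.length;
        }
        byte[] body = new byte[total];
        int offset = 0;
        for (byte[] fragment : fragments) {
            System.arraycopy(fragment, 0, body, offset, fragment.length);
            offset += fragment.length;
        }
        return body;
    }

    public static void main(String[] args) {
        byte[] frame = {10, 20};
        byte[] body = coalesce(Arrays.asList(frame, new byte[] {30}));

        frame[0] = 99; // reusing the frame buffer does not reach the body

        System.out.println(Arrays.toString(body));
    }
}
```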

pjfanning commented 3 months ago

RabbitMQ is not the only AMQP implementation. We could make the ByteString behaviour configurable, but I don't think we can hardcode this change.

pjfanning commented 2 months ago

@dimamo5 I added an avoidArrayCopy setting that defaults to false (existing behaviour). I pushed this change to your PR, if you don't mind. Could you have a look?

If this approach is ok, new tests that run with avoidArrayCopy=true will need to be added.
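A minimal sketch of how such an opt-in flag could behave, and of the kind of check a test for both modes might make. Only the name avoidArrayCopy and its default of false come from this thread; `ConsumerSettings`, `defaults`, `withAvoidArrayCopy` and `backingArrayFor` are hypothetical and do not reflect the connector's real settings classes.

```java
import java.util.Arrays;

// Hypothetical settings holder; only the flag name `avoidArrayCopy` comes from the thread.
final class ConsumerSettings {
    final boolean avoidArrayCopy;

    private ConsumerSettings(boolean avoidArrayCopy) {
        this.avoidArrayCopy = avoidArrayCopy;
    }

    // The default keeps the existing copying behaviour.
    static ConsumerSettings defaults() {
        return new ConsumerSettings(false);
    }

    ConsumerSettings withAvoidArrayCopy(boolean value) {
        return new ConsumerSettings(value);
    }

    // Returns the array that would back the emitted message body.
    byte[] backingArrayFor(byte[] delivered) {
        return avoidArrayCopy ? delivered : Arrays.copyOf(delivered, delivered.length);
    }

    public static void main(String[] args) {
        byte[] delivered = {1, 2, 3};
        for (boolean flag : new boolean[] {false, true}) {
            ConsumerSettings settings = defaults().withAvoidArrayCopy(flag);
            byte[] backing = settings.backingArrayFor(delivered);
            // The content is identical in both modes; only array identity differs.
            System.out.println(flag + " -> same array: " + (backing == delivered)
                + ", equal content: " + Arrays.equals(backing, delivered));
        }
    }
}
```

Running the same assertions against both flag values is one way to confirm that the opt-in path changes only the sharing behaviour, not the message content.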

dimamo5 commented 1 month ago

@pjfanning sorry for the slow response. This is what I was looking for. Thank you.

pjfanning commented 1 month ago

@dimamo5 I've added code so that message writing can also avoid copying the array data for the ByteStrings, also based on settings.avoidArrayCopy.

I'm pretty busy with other Pekko stuff. Would you have time to add some test cases that, say, duplicate the existing test cases for the read and write cases but with settings.avoidArrayCopy set to true?

pjfanning commented 2 weeks ago

I wrote most of the code in this PR so I think I won't be able to approve it.

laglangyue commented 2 weeks ago

LGTM

raboof commented 2 weeks ago

dynamodb was failing with [error] warnings found and -Werror specified, re-running those

pjfanning commented 2 weeks ago

> dynamodb was failing with [error] warnings found and -Werror specified, re-running those

This issue was in the main branch. It was fixed with #708.