ScalaConsultants / reactive-rabbit

Reactive Streams driver for AMQP protocol. Powered by RabbitMQ library.
Apache License 2.0
184 stars 40 forks source link

Ordered and async publish #28

Closed LGLO closed 8 years ago

LGLO commented 8 years ago

This is to fix #6 . I've chosen to use at most one thread*. There was STM used in other class so I used it in ExchangeSubscriber also to protect buffer of messages to send along with flag indicating if new Future should be started.

Now having buffer of messages to publish there is possibility to expose number of elements that ExchangeSubscriber requests in onSubscribe in constructor. Do you think it is useful? Another questions is: should active Subscription be canceled on basicPublish error?

mkiedys commented 8 years ago

Is this tested anywhere?

LGLO commented 8 years ago

It was tested but not sufficiently. Today I found out that publish was called after onComplete closed channel. Now I'm using "App" test with 3 streams: One simply publishes N messages to to Exchange E(with routing key for Q1). Second reads from Q1 and publishes to E again with other routing key for Q2. Third reads messages from Q2 and checks if order is preserved. See https://gist.github.com/LGLO/01d31aec6fa619ed6bd4 .

I'm going to test reactive-rabbit on machine with more cores next weeks, so lets wait.

mkiedys commented 8 years ago

This is tricky part to get right so good test is a must.

LGLO commented 8 years ago

I've checked it again. Found one more rare condition: when last basicPublish before onComplete thrown then channel would have been closed twice. I also prevent closing channel that was already closed not by application. There is additional test in other PR(that build failed on Travis, but not on added test, so I guess there is some interference) that ensures messages are not lost.

LGLO commented 8 years ago

It is protected from concurrent access by publishingThreadRunning. It's read or written only by publishing thread or by Future that waits until publishing thread is not running.

mkiedys commented 8 years ago

You may not have concurrent accesses but what about visibility? It is not guaranteed that non-volatile variable will be visible across two different CPU cores - explicit memory barrier is required in order to flush the cache. Volatile annotation will generate a memory barrier.

LGLO commented 8 years ago

You are right. I'd like not to mix methods of handling concurrency, thus I'm wrapping this flag as Ref[Boolean] with your name change suggestion.

mkiedys commented 8 years ago

Merged here https://github.com/ScalaConsultants/reactive-rabbit/commit/09072f7528fed0569b3bc1e38c635aa3b2c00d3e.