Closed (ahaid closed 10 years ago)
Hi, can you please define more precisely what you mean by your system being bottlenecked by the time it takes to publish messages? Thanks
Sure. Here's a scenario. I have a consumer that takes a message off a queue. The message is a Json array with 1000 elements. I iterate through the elements and create a list of new messages which will be published to a different queue. The publisher is called for each message and instructs the channel actor to do the publish:
channel ! Publish("", message.messageType, message.messageBytes, Some(props))
I have a profile trace running on this code and notice it screams through the parsing of the JsonArray but while publishing each new message, it gets delayed waiting on the channel actor.
At the same time, other code in this application may be publishing messages too. They are all waiting for the channel actor to publish messages.
Does that help?
Sorry I don't get it. Sending a Publish message to an actor is asynchronous, what is the 'it' that gets delayed waiting on the channel actor ? How does it get delayed ? (sending a message to an actor is about 1000 times faster than publishing a message through the broker) Likewise, what does 'they are all waiting for the actor to publish messages' mean ? I'll try and work up a simple example tomorrow to better understand the problem but it seems that I am missing something here.
Here's a little more info to help you set it up for yourself or maybe you can spot the problem.
This code iterates the samples and publishes messages
val startPublishing = System.nanoTime
// generate messages for placement on the next queues
for (sample <- samples) {
  // do the work
  Publisher.publishMessage(NotifyAndPersist(sample))
}
NewRelic.addCustomParameter("PublishTime", (System.nanoTime - startPublishing) / 1000000 + "ms")
Here is a screenshot of the trace:
Here is the value of the custom attribute named "PublishTime" which was set in the first block of code above: PublishTime: 6426ms
This is the publisher:
object Publisher {
  implicit val system = ActorSystem(MessageBrokerContext.akkaConfig.system)
  val connFactory = new ConnectionFactory()
  val servers = for (server <- MessageBrokerContext.rabbitConfig.hosts.split(",")) yield new Address(server, MessageBrokerContext.rabbitConfig.port)
  // create an AMQP connection
  connFactory.setUsername(MessageBrokerContext.rabbitConfig.user)
  connFactory.setPassword(MessageBrokerContext.rabbitConfig.password)
  val conn = system.actorOf(ConnectionOwner.props(connFactory, 5 second, None, Option(servers)), "connection")
  val channel = ConnectionOwner.createChildActor(conn, ChannelOwner.props(), name = Option("routedChannelOwner"))
  Amqp.waitForConnection(system, channel).await()

  @Trace
  def publishMessage(message: QueueableMessage) = {
    val props = new AMQP.BasicProperties("application/json", "utf-8", null, 2, 1, "", "", null, message.messageType, new DateTime(message.createdOn).toDate, message.messageType, MessageBrokerContext.rabbitConfig.user, "", "")
    // Declare the exchange if it's specified in the message
    if (message.exchangeType != None) {
      channel ! DeclareExchange(ExchangeParameters(message.exchangeType.get.toString, false, QueueUtils.getExchangeType(message.exchangeType.get), durable = true))
      channel ! Publish(message.exchangeType.get.toString, message.messageType, message.messageBytes, Some(props))
    }
    else {
      // No exchange specified: declare the queue (with dead-letter settings) and publish to the default exchange
      val exchangeArgs = Map("x-dead-letter-exchange" -> ("dlx-" + message.messageType), "x-dead-letter-routing-key" -> message.messageType)
      channel ! DeclareQueue(QueueParameters(name = message.messageType, passive = false, durable = true, exclusive = false, autodelete = false, exchangeArgs))
      channel ! Publish("", message.messageType, message.messageBytes, Some(props))
    }
  }
}
I still don't get it: it seems that all publishMessage() does is send a couple of messages to an actor, so what you're measuring is the time it took to send messages to local actors, not the time it took to actually publish them (in an AMQP sense). You should be able to send hundreds of thousands of messages per second, so there's something strange here: either a dispatcher problem, or @Trace or NewRelic.addCustomParameter() is adding a lot of overhead. So unless I've missed something big, you spent more than 6 s just sending messages, which means you must have sent millions of them.
I still don't understand how channel actors keep other parts of your code waiting, but if your problem is publishing throughput and the proposed solution is to create many channel actors and hide them behind a router, I don't think it will improve anything. Channels are just multiplexed over the same AMQP connection and the actor overhead should be minimal. What throughput do you measure on the broker (in messages/s and kB/s)?
I've written a few quick and dirty tests to show what I mean: https://gist.github.com/sstone/1a5a9b404983a31fc18d
What we get is:
test 1 (just send messages to a channel actor without waiting for a reply): sending 10,000 messages takes 40 ms
test 2: sending 10,000 messages takes 22 ms, publishing them takes 860 ms
test 3: instead of 1 channel actor we now use 100 behind a router. We get the same results as in test 2
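The gap between "sending" and "publishing" in these tests can be reproduced without a broker. The following is only a rough sketch, not the code from the gist: it swaps the channel actor for a single-threaded `java.util.concurrent` executor, and the names `EnqueueVsProcess`, `measure`, and `workMicros` are made up for illustration. The per-message cost is a simulated spin, not a real AMQP round trip.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object EnqueueVsProcess {
  // Returns (enqueueMillis, totalMillis, processedCount) for n fire-and-forget tasks.
  def measure(n: Int, workMicros: Long): (Long, Long, Int) = {
    // One worker thread with an unbounded queue: a stand-in for one actor and its mailbox.
    val mailbox = Executors.newSingleThreadExecutor()
    val processed = new AtomicInteger(0)
    val start = System.nanoTime

    for (_ <- 1 to n) {
      // Analogous to `channel ! Publish(...)`: returns as soon as the task is queued.
      mailbox.execute(new Runnable {
        override def run(): Unit = {
          // Simulated per-message cost (assumption: stands in for the actual publish).
          val until = System.nanoTime + workMicros * 1000
          while (System.nanoTime < until) { /* spin */ }
          processed.incrementAndGet()
        }
      })
    }
    // Time spent only handing tasks to the queue.
    val enqueueMillis = (System.nanoTime - start) / 1000000

    // Now wait for the queued work to drain and measure the full duration.
    mailbox.shutdown()
    mailbox.awaitTermination(1, TimeUnit.MINUTES)
    val totalMillis = (System.nanoTime - start) / 1000000
    (enqueueMillis, totalMillis, processed.get)
  }
}
```

Calling `EnqueueVsProcess.measure(10000, 50)` should typically report an enqueue time far below the total time, which mirrors the shape of test 2: the caller is not blocked while the queued work is processed.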
Does that make sense ? Thanks
Well this is embarrassing. You are absolutely right. I ran your tests then ran them again with a bit more of my code in place. The problem was in the serialization of the class I was trying to put on the queue.
This line here:
Publisher.publishMessage(NotifyAndPersist(sample))
The NotifyAndPersist class has a self-serialization trait that is using the Jackson Json library to turn itself into a byte array.
When I try to publish 10000 of them it takes 1000ms because of the time it needs to serialize the object. The publishing itself is obviously screaming fast.
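One way to catch this kind of thing earlier is to time serialization and hand-off as separate stages instead of timing the whole loop. This is a minimal sketch under assumptions: `PublishProfile`, `Sample`, `serialize`, and `profile` are hypothetical names, and the hand-written JSON string stands in for the Jackson-based serialization in the real class.

```scala
import java.nio.charset.StandardCharsets

object PublishProfile {
  // Hypothetical payload type standing in for the real message class.
  final case class Sample(id: Int, value: Double)

  // Runs `body` once and returns its result with the elapsed time in milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val start = System.nanoTime
    val result = body
    (result, (System.nanoTime - start) / 1000000)
  }

  // Hypothetical serializer (assumption: the real code uses Jackson here).
  def serialize(s: Sample): Array[Byte] =
    s"""{"id":${s.id},"value":${s.value}}""".getBytes(StandardCharsets.UTF_8)

  // Times the two stages separately and returns (serializeMillis, sendMillis).
  def profile(samples: Seq[Sample], send: Array[Byte] => Unit): (Long, Long) = {
    val (payloads, serializeMs) = timed(samples.map(serialize))
    val (_, sendMs) = timed(payloads.foreach(send))
    (serializeMs, sendMs)
  }
}
```

In the publisher above, `send` would presumably wrap the `channel ! Publish(...)` call, so the second number covers only the hand-off to the actor and the first covers only serialization.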
Thank you for your diligence in helping me! I am sheepishly closing this case.
No problem, thanks!
My system is being bottlenecked by the time it takes to publish messages. I'm putting millions of messages of varying sizes through it, and they are being stalled while waiting for the channel actor to publish previous messages. I would like the producer actor to have access to multiple channel actors to prevent this.
The actor system supports creation of actors "withRouter" which allows multiple children to be created and balances the load between them - like this:
context.actorOf(props.withRouter(FromConfig))
The config would look something like this:
I forked your repo and am trying to set this up inline like this:
ConnectionOwner.createChildActor(conn, ChannelOwner.props().withRouter(RoundRobinRouter(nrOfInstances = 10)), name = Some("someChanel"))
But I'm running into a problem. The 'connect message is being sent from the 10 channel actors but no connection is ever made.
I noticed you have a test that creates multiple channel actors explicitly instead of using "withRouter" so you may have already found a way around this problem. If so, how do I hook my Producer up to the multiple channel actors? If not, can you help me figure out a way to get the round-robin effect working?
Here is my repo with the code changes I've made and a new test in ChannelOwnerSpec that fails to connect. https://github.com/ahaid/amqp-client/commit/4e1afd4762b9a838e31c1369c5ac47e8264ee61c Can you help?