Open EmielBruijntjes opened 11 years ago
Short response: I've actually been working on creating a library with an interface that addresses most of the points you bring up. I also appreciate your thoughts on what kind of public API would be useful; I have been soliciting input on what would be useful from an API standpoint on the rabbitmq-discuss list.
Responses to some of your points above:
So I'd like to criticize a little, and have a suggestion for a redesign that would make the library great to use.
The criticism is, unfortunately, deserved. The public API for rabbitmq-c is a mixture of high and low-level APIs, insane memory management, confusing error handling, and doesn't provide any kind of good interface for dealing with the async bits of AMQP.
If we now publish a message to the broker (using the C++ library), the library internally blocks until an answer comes back. This is not how we'd like our software to behave. The situation is even worse when we consume messages, because then we need to pass a timeout to make the library block by design. This is a big no-no either as it wastes CPU cycles.
SimpleAmqpClient was designed to work around the fact that rabbitmq-c doesn't deal with the async portions of AMQP well (or at all, depending on how you look at it). Its use case was RPC-style communication, so SimpleAmqpClient exposed a sync API to things that are async when you look at the AMQ Protocol. This gives a reasonable error-handling scenario, but performance suffers as a result, and using the library in a pub-sub scenario where you may have many consumers is far from optimal.
I don't know if it is allowed to run "select()" calls from outside the library on the socket to find out if a socket is readable, but since the library now also supports secure connections, I think that it might not be possible to handle that. A secure connection could be in a state that it internally needs to send out data to the remote peer, while we're only monitoring it for readability.
The use of the socket fd outside of rabbitmq-c has always been 'at your own peril'. In v0.3.0 you could get away with it. In v0.4.0 the picture gets a little more complicated: if you're careful and have an understanding of how OpenSSL works, it's possible to use select() with the socket fd. If you enable heartbeats, the picture gets even more complicated.
I'm very happy to hear that you share the same ideas. Do you also have an idea when such an improved library will see the light? Are we speaking about weeks, months or years? In fact, we even consider forking the project because we need it so much. But forking is about the last thing we want to do, a shared library that is used and tested by many others would be much better than one more fork. So if there is any way how we can contribute to this project and speed up the development, please let me know.
What is your opinion on the suggested architecture of the library that was proposed in my previous message - with a bottom layer that only takes care of parsing and generating amqp frames - so that higher layers can use that bottom layer to speak to a RabbitMQ server using TCP, SSL or any other protocol. In fact, we've already created an interface for the parsing side of it:
#include <sys/types.h>
#include <stdint.h>
/**
* Handle for the parsing engine
*/
typedef struct _amqp_parse_t amqp_parse_t;
// frame types
#define AMQP_FRAME_METHOD 1
#define AMQP_FRAME_HEADER 2
#define AMQP_FRAME_CONTENT 3
#define AMQP_FRAME_HEARTBEAT 8 // AMQP 0-9-1 assigns frame type 8 to heartbeats
/**
* An AMQP field table entry (a single record)
*
* @param keysize length of the key
* @param key name of the record
* @param type record type
* @param data record data (optional, depending on record type)
* @param next pointer to next record
*/
typedef struct _amqp_table_entry_t {
uint8_t keysize; // length of the key
const char *key; // name of the key
uint8_t type; // record type
const char *data; // record data
struct _amqp_table_entry_t *next; // pointer to next record
} amqp_table_entry_t;
/**
* An AMQP field table
*
* @param count number of items in the table
* @param first pointer to first entry
* @param last pointer to last entry
*/
typedef struct _amqp_table_t {
uint16_t count; // number of items in the list
amqp_table_entry_t *first; // pointer to first entry
amqp_table_entry_t *last; // pointer to last entry
} amqp_table_t;
/**
* A method frame payload
*
* @param class the class id, compare to AMQP_CLASS_*
* @param method the method id, compare to AMQP_METHOD_*
*/
typedef struct _amqp_method_t {
uint8_t class; // class id
uint8_t method; // method id
amqp_table_t arguments; // method arguments
} amqp_method_t;
/**
* An AMQP frame
*
* @param type the message type, compare to the AMQP_FRAME_* constants
* @param channel the channel id the frame belongs to
* @param method only relevant if type == AMQP_FRAME_METHOD
*/
typedef struct _amqp_frame_t {
uint8_t type; // message type
uint16_t channel; // channel the frame belongs to
union {
amqp_method_t method; // the frame method, only if type == AMQP_FRAME_METHOD
};
} amqp_frame_t;
/**
* Callback function that should be implemented by the calling application
*
* The frame should be deallocated by calling amqp_frame_destroy() after you've
* finished processing it.
*
* @param frame The received frame
* @param cookie Pointer to cookie data
*/
typedef void(*amqp_parse_callback_t)(amqp_frame_t *frame, void *cookie);
/**
* Initialize the AMQP parse engine
*
* Before you can start parsing data that comes in over an AMQP socket, you
* first need to initialize the parse engine. You can do so by calling this
* 'amqp_parse_init()' function. You need to pass in a callback function that
* will be called for each frame that is received.
*
* You can also supply an additional cookie pointer. This is a memory address
* or value that is completely ignored by the library, but that will be passed
* to the callback function and that can be used to store your own data.
*
* The parse handle that is returned, can be used in subsequent calls to
* amqp_parse().
*
* The returned handle should be deallocated with amqp_parse_destroy().
*
* @param callback
* @param cookie
* @return handle
*/
amqp_parse_t *amqp_parse_init(amqp_parse_callback_t callback, void *cookie);
/**
* Function to parse a buffer of AMQP data into AMQP frames
*
* You need to provide the parse state handle that was originally created with
* the amqp_parse_init() function. The second and third parameters point to
* a buffer that is parsed. This method returns the number of bytes from the
* buffer that have been processed. For the next call, you need to pass the
* buffer from the position where it was left off.
*
* For every frame that was detected in the input, the callback method that was
* registered with the amqp_parse_init() function will be called.
*
* This method returns -1 if the buffer could not be parsed, or held invalid
* data. In that situation, it is best to break the connection, as it should
* not occur with regular AMQP communications.
*
* @param handle Parse state handle
* @param buffer Pointer to the buffer
* @param size Size of the buffer
* @return Number of bytes processed
*/
ssize_t amqp_parse(amqp_parse_t *handle, void *buffer, ssize_t size);
/**
* Destroy any resources allocated for the parse handle.
*
* @param handle Handle to be destroyed
*/
void amqp_parse_destroy(amqp_parse_t *handle);
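To make the "returns the number of bytes processed" contract concrete, here is a minimal sketch of an incremental frame splitter. It is not the header above and not rabbitmq-c code: the names demo_parse, demo_frame_cb, demo_stats and demo_count are hypothetical, and the sketch only assumes the general AMQP 0-9-1 frame layout (1-byte type, 2-byte channel, 4-byte payload size, payload, frame-end octet 0xCE). It does not decode method arguments.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>

#define DEMO_FRAME_END   0xCE  /* frame-end octet in AMQP 0-9-1 */
#define DEMO_HEADER_SIZE 7     /* type (1) + channel (2) + payload size (4) */

/* Hypothetical callback type: invoked once per complete frame. */
typedef void (*demo_frame_cb)(uint8_t type, uint16_t channel,
                              const uint8_t *payload, uint32_t size,
                              void *cookie);

/*
 * Consume as many complete frames as the buffer holds.
 * Returns the number of bytes consumed, or -1 on malformed input.
 * Bytes of a partial trailing frame are NOT consumed: the caller keeps
 * them and passes them again together with the next chunk of data.
 */
ssize_t demo_parse(const uint8_t *buf, size_t len,
                   demo_frame_cb cb, void *cookie)
{
    size_t pos = 0;
    while (len - pos >= DEMO_HEADER_SIZE) {
        const uint8_t *p = buf + pos;
        uint16_t channel = (uint16_t)((p[1] << 8) | p[2]);
        uint32_t size = ((uint32_t)p[3] << 24) | ((uint32_t)p[4] << 16) |
                        ((uint32_t)p[5] << 8) | (uint32_t)p[6];
        /* 64-bit arithmetic so a huge size field cannot overflow */
        uint64_t total = (uint64_t)DEMO_HEADER_SIZE + size + 1;
        if ((uint64_t)(len - pos) < total)
            break;                          /* incomplete frame, wait for more */
        if (p[total - 1] != DEMO_FRAME_END)
            return -1;                      /* corrupt stream */
        cb(p[0], channel, p + DEMO_HEADER_SIZE, size, cookie);
        pos += (size_t)total;
    }
    return (ssize_t)pos;
}

/* Example cookie and callback: count frames, remember the last channel. */
struct demo_stats { int frames; uint16_t last_channel; };

void demo_count(uint8_t type, uint16_t channel, const uint8_t *payload,
                uint32_t size, void *cookie)
{
    (void)type; (void)payload; (void)size;
    struct demo_stats *s = cookie;
    s->frames++;
    s->last_channel = channel;
}
```

A caller would append each chunk read from the socket to its own buffer, call demo_parse() on it, and then drop the consumed prefix, so a frame that straddles two reads is simply parsed on the second call.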
Btw, do you have a wish list on amazon.com?
Do you also have an idea when such an improved library will see the light? Are we speaking about weeks, months or years? In fact, we even consider forking the project because we need it so much.
Weeks. A lot of the 'difficult' problems I've either solved when working on rabbitmq-c & SimpleAmqpClient, or are going to be farmed out to external libraries such as libuv. That said, as with any software project, it's hard to give a completely accurate estimate as to "when it'll be done". I do recognize the pressing need for a high-quality native-code AMQP library, so I am giving it some priority.
So if there is any way how we can contribute to this project and speed up the development, please let me know.
Once I get something usable - I'm going to need someone to test it, and give some feedback on the public API.
What is your opinion on the suggested architecture of the library that was proposed in my previous message - with a bottom layer that only takes care of parsing and generating amqp frames - so that higher layers can use that bottom layer to speak to a RabbitMQ server using TCP, SSL or any other protocol.
I think we're mostly on the same page with the way the library should be put together. My current thoughts on how things should be layered:
Btw, do you have a wish list on amazon.com?
I don't currently. Should I put one together?
To be honest: your response worries me.
I really hope you're not going to make the mistake of relying on libuv (or any other library) for the event loop - especially not in the lower layers of the lib. There are many different event loop libraries out there that are also popular: libevent and libev to name only two. Not even to mention all the applications that have implemented their own select() or poll() loops. It should be easy to integrate the AMQP lib with any of these event loop mechanisms. You wrote "it would be nice to allow others to use their own favorite event loop." - I would suggest to change this to "it is essential that the library can be used with any thinkable event loop".
Furthermore, I hope that you're not going to start up threads and manage thread pools. I see no point in doing this. Processing and sending AMQP data will probably not be the most CPU expensive task in applications that use this lib, and the price for the extra threads and possible mutex calls is not necessary at all. At the current moment, we are forced to use threads for AMQP communication to work around the annoying blocking I/O calls in the amqp lib, but we hate doing this. We'd like to use AMQP without starting extra threads.
Have you seen the c-ares library? It's a library for async DNS lookups, and it uses a callback function that gets called when the library needs to check a socket for readability or writability. A similar solution would be perfect for AMQP. The c-ares lib can handle many async DNS lookups at the same time, without having its own event loop, and without starting any threads.
You mentioned that the lower layers of the library will not have a public API. However, this seems to be the only interesting part of the architecture, because it's the only part that does not create threads and that does not rely on specific event loop mechanisms. If there is one element of the library that I'd like to have a public, well designed and well documented API, it is this bottom layer.
Yes, of course you need a wish list, I'm happy to buy you a book or two if you take away a lot of work for us.
I think I understand your trepidation in regard to what you think I'm planning on doing with regard to this library (this library is going to be HEAVY!). Let me expand on a few points to help clarify and justify what I'm planning on doing:
There is a need for a native code (callable from C/C++) full-featured AMQP client library. For better or worse the 'reference client' implementations (the .NET and Java clients linked above) provide an API that is mostly synchronous, with async callbacks/continuations for the various async bits. While AMQP doesn't specify an API, the API of the reference implementations has become a somewhat de-facto standard, and thus my desire for the 'high-level' implementation to follow this API. This API favors correctness and ease of use over raw performance. I'd also like this interface to run on a variety of platforms, thus libuv (which is known to work well on: Linux, Mac, *BSD, Win32, iOS, Android, even WP8, and probably others) is selected as a default choice.
Now I realize that this API does not serve everyone's needs, thus the need for the second, lower-level API that revolves around an event loop.
Have you seen the c-ares library? It's a library for async DNS lookups, and it uses a callback function that gets called when the library needs to check a socket for readability or writability. A similar solution would be perfect for AMQP.
I have seen this sort of thing before. Unfortunately, AMQP includes a heartbeat feature intended to quickly detect whether the underlying TCP connection has been closed, so even when nothing else is happening the library would need to be signaled on a third event: a timeout. While this is not necessarily a huge issue, it puts a much greater burden on users of the library to maintain this timeout correctly, especially if they are dealing with multiple timeouts (this is deceptively tricky to get right). The other issue is portability: while this is fairly well supported on Linux, problems quickly crop up on Darwin (kqueue, poll() and select() all have issues if you start including fds that are of the non-socket variety), and then there's Win32, which doesn't support anything but network sockets in its select() implementation. Win32 does include an eventing interface that works with networking and other event-based signals, but it's a completely different API.
Long story short: yes that implementation would work, but it would be non-trivial for users to get right.
Furthermore, I hope that you're not going to start up threads and manage thread pools. I see no point in doing this. Processing and sending AMQP data will probably not be the most CPU expensive task in applications that use this lib, and the price for the extra threads and possible mutex calls is not necessary at all. At the current moment, we are forced to use threads for AMQP communication to work around the annoying blocking I/O calls in the amqp lib, but we hate doing this. We'd like to use AMQP without starting extra threads.
To make the high-level API work (as described above) we end up running one thread per-connection object in an event loop. The reason behind the thread-pool is to allow callbacks such as message delivery to directly process their payload and not worry about hanging up the event-loop in the case the processing of the message takes a long time (again this is experience noticing that developers seem to ignore the "don't do expensive computations in the event loop" warnings). rabbitmq-c will not itself implement a thread-pool (that is way outside the scope of this library).
The event-loop API doesn't need to kick off any behind-the-scenes threads or fire things off to a threadpool. It'll be the responsibility of the users of the library to pump the event-loop.
I really hope you're not going to make the mistake of relying on libuv (or any other library) for the event loop - especially not in the lower layers of the lib. There are many different event loop libraries out there that are also popular: libevent and libev to name only two. Not even to mention all the applications that have implemented their own select() or poll() loops. It should be easy to integrate the AMQP lib with any of these event loop mechanisms. You wrote "it would be nice to allow others to use their own favorite event loop." - I would suggest to change this to "it is essential that the library can be used with any thinkable event loop".
In order to abstract away the event loop mechanism I need to start somewhere. Starting with libuv is a good place as it's actively developed and works well on a lot of platforms. An abstraction of this will be created, though I need to start with something concrete in order to get there, so it's likely to be something that'll be created in the second iteration of the library.
You mentioned that the lower layers of the library will not have a public API. However, this seems to be the only interesting part of the architecture, because it's the only part that does not create threads and that does not rely on specific event loop mechanisms. If there is one element of the library that I'd like to have a public, well designed and well documented API, it is this bottom layer.
I can understand your desire for a lower-level API, and I think this is something that could happen. I do want to be careful with this though: part of what makes the API of rabbitmq-c a bit convoluted is that early in the development of the library the decision was made to expose nearly everything, which tied our hands when we came back to improve the library. I'd like to leave the lower layers somewhat fluid until we find something that works well and can be exposed to users.
Sorry Alan, I'm afraid that I do not follow you. You're saying that the library does not only have to deal with sockets, but also with timeouts to be capable of handling heartbeats. But the example c-ares library that I mentioned also uses timeouts, and it has a very simple API for it. A library can have socket callbacks and timeout callbacks and still offer a very simple API. The c-ares library is very much comparable to the rabbitmq library and does exactly this. It also deals with a certain network protocol (DNS vs AMQP), and it offers a simple API, it can handle many, many connections at the same time, deals with timeouts and it does not use threads or its own event loop. I really see no reason why this would be different for a RabbitMQ library, because in the end it also is a lib that does network IO.
Your other argument is that it would be problematic on systems like Windows and Darwin because the select/poll/etc event loop on these systems can only handle sockets and timeouts, and does not work (well) with other file descriptors. Why is this an issue in the first place? Sockets and timeouts are the only things that are needed, so the lib would be perfectly usable on such systems. But even if it really was an issue (and it isn't!), it would still make no sense that unix software would have to pay the price for the wrong design of the Windows platform. But like I said, this is not an issue because as far as I know all systems are capable of dealing with sockets and timeouts in a select/poll loop.
What is worrying is that you wrote that the next rabbitmq library is going to be heavy. I can only hope that you're going to change your mind on this. AMQP is just a network protocol, like many other protocols. The reason why we (and anyone) use a C library is because C is the language of choice when performance matters. We choose C because we want a low level API, with great performance and low memory consumption. We need a lightweight library, with a simple API, that can be integrated in our own event loop, and we do not want to have it create its own threads, or be dependent on specific libraries like libuv.
Another argument of yours is that there are lousy developers who do CPU intensive processing in the event loops. To overcome this hypothetical situation, if I understand your claim correctly, the AMQP lib is going to make its own threads so that it will always be responsive, even if the main event loop is busy running a heavy algorithm. However, we have chosen the C language because we want to get the best performance from our software, and I do not want to pay the price for extra threads because of the assumption that other programmers could be making a mess of their code.
You wrote that in the future, a quality low level API might be added. To be honest with you, this sounds like the software is being designed in the wrong order - as if one is building a house first and will worry about the foundation later on.
I hope you can reconsider your design choices.
Hi Alan and Emiel, let me add some things to this discussion. I propose that a C implementation of AMQP should meet the following criteria:
Thus, I have to concur with most of Emiel's points: please don't start threads, please don't commit yourself to a specific event loop (such as libuv). But, in the name of avoidance of unnecessary complexity, I'd like to make an even more radical proposal. Emiel mentioned the c-ares library which, indeed, has a very nice callback API. But, as Alan has correctly identified, handling the various timeouts is very tricky. So under the hood, the c-ares library has to maintain a timeout priority queue somewhere. Another library with a similar design is libcURL (no wonder, it's by the same author). I have a project which uses both c-ares and libcURL, asynchronously managed with libev, and I'm currently adding AMQP to the picture. So I might easily end up with four priority queues (the three network libraries plus libev) for timeouts when a single one would suffice. As you probably know, implementing a single priority queue which meets the high performance criterion when at the same time faced with external constraints such as real-time system or not, is an impossibility. So my suggestion is, for the network library designer, to scrap the priority queue, and the onerous multiplexer interface. Keep things simple for yourself and let the event loop handle all the timeouts, and all the callbacks. Yes, Alan, you are right, for users of simple interfaces such as select() and friends, this means they have to roll their own timeout thing. But this isn't really your problem. Simply let your users decide what implementation fits their specific need best.
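As a rough illustration of "let the event loop handle all the timeouts", the sketch below keeps only the heartbeat deadline inside the library and leaves the actual waiting to the caller. Everything here is hypothetical (demo_heartbeat, demo_heartbeat_timeout, demo_heartbeat_tick); it is not an existing rabbitmq-c, c-ares or libev API, and it only covers the sending side of heartbeats.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical per-connection heartbeat state; no timer queue inside. */
typedef struct {
    uint64_t interval_ms;   /* negotiated heartbeat interval */
    uint64_t last_sent_ms;  /* time at which the last frame was sent */
} demo_heartbeat;

/*
 * How long the event loop may sleep before a heartbeat is due.
 * Returns 0 when the heartbeat is due now. The caller feeds this value
 * into whatever it already uses: a poll() timeout, an ev_timer, etc.
 */
uint64_t demo_heartbeat_timeout(const demo_heartbeat *hb, uint64_t now_ms)
{
    uint64_t due = hb->last_sent_ms + hb->interval_ms;
    return now_ms >= due ? 0 : due - now_ms;
}

/*
 * Called by the event loop when its timer fires.
 * Returns 1 if the caller should send a heartbeat frame now, else 0.
 */
int demo_heartbeat_tick(demo_heartbeat *hb, uint64_t now_ms)
{
    if (demo_heartbeat_timeout(hb, now_ms) > 0)
        return 0;               /* woke up early, nothing to do yet */
    hb->last_sent_ms = now_ms;  /* the next deadline starts from here */
    return 1;
}
```

With this shape the library never owns a priority queue: an application with several network libraries keeps one timer structure in its event loop and asks each library for its next deadline.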
One more vote to avoid threads and specific event loop.
Right now anyone who works with rabbitmq-c and event loops has to introduce one or more threads to interact with rabbitmq-c. This is very inconvenient, especially when one has to receive the message in a thread, pass it to the event loop, process it, and then pass it back to the thread for acknowledgement. It could be much simpler if rabbitmq-c supported a nonblocking model.
I think that the library shouldn't care about users who don't understand how to work with nonblocking code, especially if other users have to pay a performance price for such support.
@TheCount @cellscape Noted.
So what happened with this, nearly two years later? By quickly looking through the code, it would seem that the answer is nothing at all - it's all blocking calls and hardcoded to its own event loop.
Am I correct?
Is there an alternative library with a more, errm... generous design?
I wrote a different AMQP library to overcome the blocking nature of this rabbitmq-c library. It is in C++ though. You may find it useful: https://github.com/CopernicaMarketingSoftware/AMQP-CPP.
Thanks Emiel, that looks really good actually. I was just reading the Asio integration examples as your reply arrived. Very good stuff, cheers!
@rpopescu correct: rabbitmq-c does not support an event loop at this time. Emiel's AMQP-CPP is worth considering if it fits your needs.
Re-opening this as it's still a valid feature request.
Hi EmielBruijntjes, I need an async rabbitmq implementation in C, could very well use your C++ implementation, with some changes. I wanted to ask you if someone has already tried your C++ library with a C application??
Not that I know, but a simple wrapper around it should not be that much of a challenge.
Thanks Emiel for your prompt response. I am trying to understand the usage. I am using the TcpHandler. In the monitor function, I get the fd, which I am adding to my epoll set (using epoll instead of select). However, what I didn't get is how to receive on this fd. So, let's say this fd becomes readable, should I call size_t parse(char *buffer, size_t size)? Then what's the use of channel.consume(name, AMQP::noack).onReceived(receiveMessageCallback)?
After spending some more time on it: correct me if I'm wrong, but is this the flow? When the fd becomes readable, call connection->process, which in turn will call the callback registered with the channel via channel.consume?
It would be nice to have some examples around this?
@achandak123: you mind taking this discussion over to the AMQP-CPP issue tracker?
It'll clutter this bug less, and be more discoverable for those who want to use AMQP-CPP.
Sure, will do.
First of all, Alan, great to see that this library is so actively worked on, and that it gets better all the time. However, we still find it very hard to integrate in our software. So I'd like to criticize a little, and have a suggestion for a redesign that would make the library great to use.
We've been using the RabbitMQ library for our MailerQ application - mainly in combination with the SimpleAmqpClient C++ library, due to the fact that the previous version of the rabbitmq C library was hardly documented, while the C++ library was. For our application we need to publish and consume from and to many different exchanges and queues at the same time, and we do not want the application to block while it is busy doing DNS calls, busy connecting or busy sending or retrieving data from the broker.
I've read about AMQP that it should never be necessary to end up in a waiting state, because a client does not have to wait for an answer from the broker before it sends out a new request. It is possible to send out many requests over a connection, and process the answers whenever the socket becomes readable. At least, that's what I've read. The SimpleAmqpClient library, however, does end up in waiting states all the time, so we've been looking at this C library instead. But I'm afraid that this rabbitmq C library sometimes does the same, and that our client code needs to access all sorts of data structures that should (in my opinion) stay internal to the library. Good examples of how to make a true async client are missing.
If we now publish a message to the broker (using the C++ library), the library internally blocks until an answer comes back. This is not how we'd like our software to behave. The situation is even worse when we consume messages, because then we need to pass a timeout to make the library block by design. This is a big no-no either as it wastes CPU cycles.
To overcome all these problems, we've chosen a design in which we start up new threads to do the AMQP communication. We create a new thread for every queue that we want to consume from, and a thread for every exchange we'd like to publish to. This means that we set up many different threads and many TCP connections - a completely undesirable design if you realize the intention of AMQP, but sadly necessary to prevent blocking calls as well as we can (and the publish and ack calls are still not efficient).
I don't know if it is allowed to run "select()" calls from outside the library on the socket to find out if a socket is readable, but since the library now also supports secure connections, I think that it might not be possible to handle that. A secure connection could be in a state that it internally needs to send out data to the remote peer, while we're only monitoring it for readability.
The AMQP library also has a couple of public functions to do memory management. These are functions like 'amqp_maybe_release_buffers', 'amqp_maybe_release_buffers_on_channel', etcetera. In my opinion, such functions are too low level and shouldn't be part of the public API of a library. A user of the library should not have to deal with this sort of memory management. A simple library handle that is passed to every library call should be sufficient, and the library should then do its own memory management. This is the usual approach that you find in almost every library that was designed to communicate with servers.
Therefore, I would like to suggest a three-layered design to make this library great to use:
Most applications can use the top layer of the library. For such applications it is not a problem if a function blocks and waits until the operation has been completed and a response from the broker has been received. This layer has functions like:
All synchronous functions above block until the operation has been completed, and the result of the operation is returned by the function. The amqp_consume() call might return an allocated message that should later be deallocated with a call to amqp_free() - but that's about the only memory management that is necessary from outside the library.
The next and more advanced layer would be the middle layer. This layer is intended for applications that want to use the library in an async fashion. They want to send instructions to the broker, but they never want the socket operations to block. Such applications have their own internal event loop, and they provide the AMQP library with a callback function that is called by the library to add file descriptors or timeouts to the main event loop:
typedef void (amqp_socket_callback_t)(int fd, bool readable, bool writable, struct timeval tv);
A pointer to such a callback should be supplied to the AMQP library when it is initialized, and every time that the library is about to block on a socket operation, it calls this function to inform the calling application that it wants to register a filedescriptor to the main event loop, and that it should be checked for readability and/or writability and/or with a certain timeout (maybe this timeout parameter is not even necessary).
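A minimal sketch of how such a socket callback could drive non-blocking writes is shown here. It is only an illustration of the idea: demo_conn, demo_conn_init, demo_conn_send, demo_conn_process and demo_record_watch are hypothetical names, the timeout parameter is left out, reading is omitted, and the sketch assumes the application has already put the fd in non-blocking mode.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical callback: the library asks the application to watch `fd`. */
typedef void (*demo_watch_cb)(int fd, bool readable, bool writable,
                              void *cookie);

typedef struct {
    int fd;
    char out[256];        /* bytes queued but not yet written */
    size_t out_len;
    demo_watch_cb watch;
    void *cookie;
} demo_conn;

void demo_conn_init(demo_conn *c, int fd, demo_watch_cb watch, void *cookie)
{
    c->fd = fd;
    c->out_len = 0;
    c->watch = watch;
    c->cookie = cookie;
    c->watch(fd, true, false, cookie);  /* always interested in input */
}

/* Queue data without touching the socket; false if the queue is full. */
bool demo_conn_send(demo_conn *c, const void *data, size_t len)
{
    if (len > sizeof c->out - c->out_len)
        return false;
    memcpy(c->out + c->out_len, data, len);
    c->out_len += len;
    c->watch(c->fd, true, true, c->cookie);  /* now wait for writability too */
    return true;
}

/* Called from the application's event loop when the fd is ready. */
void demo_conn_process(demo_conn *c, bool readable, bool writable)
{
    (void)readable;  /* reading and parsing are left out of this sketch */
    if (writable && c->out_len > 0) {
        ssize_t n = write(c->fd, c->out, c->out_len);
        if (n > 0) {
            memmove(c->out, c->out + n, c->out_len - (size_t)n);
            c->out_len -= (size_t)n;
        }
    }
    /* Keep asking for writability only while output is still pending. */
    c->watch(c->fd, true, c->out_len > 0, c->cookie);
}

/* Example callback: just remember whether writability was requested. */
void demo_record_watch(int fd, bool readable, bool writable, void *cookie)
{
    (void)fd; (void)readable;
    *(bool *)cookie = writable;
}
```

In a real program the callback would translate the request into epoll_ctl(), an ev_io watcher, or an entry in a select() set, which is what keeps the library independent of any particular event loop.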
Furthermore, the library offers a function (amqp_async_process) that should be called from this main event loop to inform the library that the socket is once again readable or writable, and that it can be further processed. The async library has functions like these:
The amqp_async_process() function should be called every time the socket becomes readable or writable in the main event loop, and it handles the new situation that the library is in. For example, when the socket becomes readable it checks what sort of answer came in from the broker.
Because the calls are asynchronous, the amqp_async_process() method could be processing answers from the broker to instructions that were sent way earlier. It should somehow have a way to notify the original caller. That's why all amqp_async_* functions have a callback function and a void pointer as their last two arguments, for example:
int amqp_async_publish(amqp_async_t *, ..., amqp_publish_callback_t *, void *);
These last two parameters are internally cached by the library, and whenever the amqp_async_process() function notices that a publish operation has succeeded or failed, the RabbitMQ library will call the appropriate callback function, and will supply the same void* parameter to the function, so that the library user can use this parameter to link the call to the callback to the earlier amqp_async_publish() call.
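The callback-plus-cookie bookkeeping described above could look roughly like the following. This is a sketch under assumptions: the types and functions (demo_async, demo_async_publish, demo_async_complete, demo_store_result) are made up for illustration, and the numeric request id stands in for whatever the real protocol would use to match a broker reply to an earlier request.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical completion callback: ok is 1 on success, 0 on failure. */
typedef void (*demo_publish_cb)(int ok, void *cookie);

#define DEMO_MAX_PENDING 16

typedef struct {
    unsigned id;            /* 0 means the slot is free */
    demo_publish_cb cb;
    void *cookie;
} demo_pending;

typedef struct {
    demo_pending slot[DEMO_MAX_PENDING];
    unsigned next_id;
} demo_async;

void demo_async_init(demo_async *a)
{
    for (size_t i = 0; i < DEMO_MAX_PENDING; i++)
        a->slot[i].id = 0;
    a->next_id = 1;
}

/* Remember callback and cookie; returns a request id, or 0 if full. */
unsigned demo_async_publish(demo_async *a, demo_publish_cb cb, void *cookie)
{
    for (size_t i = 0; i < DEMO_MAX_PENDING; i++) {
        if (a->slot[i].id == 0) {
            if (a->next_id == 0)
                a->next_id = 1;         /* skip the reserved value on wrap */
            a->slot[i].id = a->next_id++;
            a->slot[i].cb = cb;
            a->slot[i].cookie = cookie;
            return a->slot[i].id;
        }
    }
    return 0;
}

/*
 * Would be called from the process step once the outcome of request `id`
 * is known. Returns 1 if a pending request was found and notified.
 */
int demo_async_complete(demo_async *a, unsigned id, int ok)
{
    if (id == 0)
        return 0;
    for (size_t i = 0; i < DEMO_MAX_PENDING; i++) {
        if (a->slot[i].id == id) {
            demo_pending p = a->slot[i];
            a->slot[i].id = 0;          /* free the slot before calling out */
            p.cb(ok, p.cookie);
            return 1;
        }
    }
    return 0;
}

/* Example callback: store the outcome in the int the cookie points to. */
void demo_store_result(int ok, void *cookie)
{
    *(int *)cookie = ok;
}
```

Freeing the slot before invoking the callback lets the callback issue a new request from inside the handler without finding the table full.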
The top level, the one with the synchronous calls, could internally be completely built on top of this middle layer. Every synchronous call can be implemented as a number of calls to their underlying async counterparts.
The final layer would be the bottom layer of the library. This layer can be used by applications that like to set up their own TCP connection to a broker, and it offers functions to create packets that can be sent to a broker, and to parse data that was received from a broker. This layer already more or less exists in the AMQP library, with functions like 'amqp_handle_input'. However, the current functions take a connection_state, while the bottom layer should not have anything to do with the state of the connection - at best it has a parse_state* to remember that it is halfway parsing a packet. The bottom layer should have functions to create messages that can be sent over a connection:
All these functions allocate a structure that holds the data that should be sent to the broker for declaring an exchange, declaring a queue, et cetera.
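For the frame-building side of such a bottom layer, a minimal sketch could serialize a frame into a caller-supplied buffer without knowing anything about connections. The function name demo_frame_write is hypothetical and, unlike the functions proposed above, it does not allocate; the only thing it assumes is the general AMQP 0-9-1 frame layout (type, channel, payload size, payload, frame-end octet 0xCE).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Write one frame into `out` (capacity `cap`).
 * Layout: type (1 byte), channel (2 bytes, big-endian),
 * payload size (4 bytes, big-endian), payload, frame-end octet 0xCE.
 * Returns the number of bytes written, or 0 if `out` is too small.
 */
size_t demo_frame_write(uint8_t *out, size_t cap, uint8_t type,
                        uint16_t channel, const void *payload, uint32_t size)
{
    uint64_t total = 7u + (uint64_t)size + 1u;
    if (total > cap)
        return 0;
    out[0] = type;
    out[1] = (uint8_t)(channel >> 8);
    out[2] = (uint8_t)(channel & 0xFF);
    out[3] = (uint8_t)(size >> 24);
    out[4] = (uint8_t)(size >> 16);
    out[5] = (uint8_t)(size >> 8);
    out[6] = (uint8_t)(size & 0xFF);
    if (size > 0)
        memcpy(out + 7, payload, size);
    out[7 + size] = 0xCE;
    return (size_t)total;
}
```

Because the function only fills a buffer, the caller is free to send the bytes over plain TCP, through a TLS library, or over any other transport.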
The bottom layer also needs functions to parse incoming data:
The amqp_parse_init() function is called with a pointer to a callback function that will be called for every packet that is received, the amqp_parse_process() function is called to parse a buffer of data, and it will internally call the registered callback for every packet received, and the amqp_parse_destroy() function is finally used after you're done parsing to deallocate any resources taken up by the parsing process.
And needless to say, this middle layer is of course implemented by making calls to this bottom layer.
I think with the above design, building applications with the RabbitMQ library becomes much easier, and it will also result in code that is much easier to maintain - both for the maintainers of the RabbitMQ library and for its users.
What do you think?