thedrow opened 8 years ago
Makes sense to me. Once I implement Redis cluster support, I'm going to look into supporting Kafka.
Kafka is a little heavy to me. We use https://nats.io/ inside the API, and the protocol is very simple.
NATS looks interesting too. I'm broadening the scope of this ticket for other similar kinds of suggestions.
@notedit It really depends on your use case. Neither Redis nor NATS guarantees delivery.
@thedrow : while Redis pub/sub does not guarantee delivery, Nchan's use of Redis does provide a delivery guarantee, because the published messages are stored in memory and published via pub/sub, and the store-and-publish operation itself is atomic.
I don't know if NATS can do publish-and-store atomically, but if so, I suspect it would also be possible to guarantee message delivery with it as well.
NATS does not store messages.
I'll mention that MongoDB capped collections make for excellent push message queues, as they're implemented behind-the-scenes as ring buffers and are highly optimized, being used as the basis for MongoDB's own replication oplog.
In lightweight testing using one for RPC I was able to process 1.9 million bidirectional (request→response) DRPC requests per second using one mongod, with two producers and four consumers in Python… five years ago. It's silly efficient, generally insert-only, and natural order. Using a variety of "write concerns" lets one adjust the volatility of inserts, i.e. journal-confirmed or not, and the reads, given an allowance for replication lag, can be distributed across multiple nodes. Here are some slides and sample code from a presentation I gave on the approach to DRPC, but the listening process is basically the same for other uses. (This use allowed me to replace rabbitmq + zeromq (yes, both) on one project with just MongoDB.)
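As a mental model of why capped collections work so well here, this is a toy in-memory stand-in (plain Python, no MongoDB; all names invented) illustrating the ring-buffer semantics described above: fixed capacity, natural insertion order, oldest documents silently evicted, and a filterable "tail" read:

```python
from collections import deque

class CappedCollectionModel:
    """Toy model of a MongoDB capped collection: a fixed-size ring buffer
    that preserves insertion ("natural") order and silently discards the
    oldest documents once the cap is reached."""

    def __init__(self, max_docs):
        self._docs = deque(maxlen=max_docs)

    def insert(self, doc):
        # At capacity, deque(maxlen=...) evicts the oldest entry automatically.
        self._docs.append(doc)

    def tail(self, predicate=None):
        """Replay matching documents in natural order; a filter may be
        applied, but the ordering cannot be changed."""
        predicate = predicate or (lambda d: True)
        return [d for d in self._docs if predicate(d)]

coll = CappedCollectionModel(max_docs=3)
for i in range(5):
    coll.insert({"seq": i, "channel": "demo"})

print([d["seq"] for d in coll.tail()])                             # [2, 3, 4]
print([d["seq"] for d in coll.tail(lambda d: d["seq"] % 2 == 0)])  # [2, 4]
```

A real tailable cursor (for instance `CursorType.TAILABLE_AWAIT` in pymongo) would additionally block awaiting new inserts rather than returning.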
@amcgregor : That's pretty interesting. Can I also atomically update metadata while publishing a message? If so, this might make it a top contender for Next Storage Engine to implement.
edit:
You cannot delete documents from a capped collection. To remove all documents from a collection, use the drop() method to drop the collection and recreate the capped collection.
That complicates things a bit because changing the buffer size is an important (and advanced) functionality of Nchan. The main use case here is publishing snapshots (queue size 1, erases all previous messages) followed by deltas (large queue size) to a given channel.
@slact The document (effectively a "JSON object", though supporting more types and better integer accuracy) you insert has no particular schema unless one is explicitly enforced through the use of document validation. You are free to add whatever data or metadata you wish. The principal limitation is that when querying in a "stream inserts to me" (tailable cursor) way you cannot alter the sorting order (it must be `$natural`, the default), though, and this is part of the awesomeness, you can filter. Adding indexes is technically possible, but not recommended, as each index decreases write performance, and they are only used when querying in a non-tailing way. The suggestion on the documentation page for tailable cursors (querying against the provided-by-default index on `_id`, which is range-queryable) is highly effective.
You cannot update the document in a way that would require moving the record, i.e. having it grow. You can atomically update any field that doesn't grow in size, such as booleans and integers, as well as alter dynamically sized structures (strings, arrays, embedded documents, etc.) in ways that preserve the size or reduce it, such as `$pop` or `$unset` or assignment (`$set`) of a smaller string than the original. (It isn't uncommon to have "padding values" that are `$unset` on first update to keep headroom in the records.)

Atomic operations on boolean flags in capped collections, i.e. to update a `delivered` flag, are great and quite useful for locking. For locking scenarios you can simulate (atomically) compare-and-swap through the careful use of filtered updates and checking of the `modifiedCount` return value to gauge success.
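To make the compare-and-swap idea concrete, here is a minimal sketch using plain dicts; `apply_update` is a toy evaluator standing in for a driver's `update_one()` call, not a real API, and the field names are invented:

```python
def cas(field, expected, new):
    """Build the (filter, update) pair for an atomic compare-and-swap:
    the update only applies if the field still holds the expected value."""
    return {field: expected}, {"$set": {field: new}}

def apply_update(doc, flt, update):
    """Toy stand-in for update_one(): returns the modified count, which is
    what you inspect to learn whether your CAS won the race."""
    if all(doc.get(k) == v for k, v in flt.items()):
        doc.update(update["$set"])
        return 1
    return 0

doc = {"channel": "demo", "locked_by": None}

flt, upd = cas("locked_by", None, "worker-1")
print(apply_update(doc, flt, upd))  # 1: lock acquired

flt, upd = cas("locked_by", None, "worker-2")
print(apply_update(doc, flt, upd))  # 0: lost the race, someone holds the lock
```

Against a real deployment the filtered update itself is atomic per document, so checking the returned modified count is all the coordination needed.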
So if I understand you correctly, the way to achieve atomicity is through locks. By 'atomicity' here I mean atomic conditional updates to multiple documents. For example, here's the message publishing script that runs in Redis. The nice thing here is that the script is an atomic operation by default, as Redis is strictly single-threaded.
The vast majority of the code in that script does not seem to be necessary when using MongoDB. Nchan's situation is very similar to the RPC one. I mentioned locking only as a typical example use of compare-and-swap. While possible, it's best to avoid situations that might require that, and the operation is way more expressive than most people think at first glance. (Two-phase commits are a thing, too, but still.) I might recommend having a channel-tracking collection: compare the `nUpserted` vs. `nModified` return values to detect creation, or just use an update, treating an `nModified` of zero as an indication that the channel does not exist, with `$push` and `$pull` managing subscribers.

Now there are a few approaches for the message store itself; one might be a single capped collection shared by all channels. Or: one capped collection per channel.
The former case is most efficient in terms of potentially wasted disk space through capped collection over-allocation, by only having one shared among channels, but requires a TTL to maintain the message store sanely. It also allows potentially useful cross-channel analysis. The latter case does not require a TTL index and instead allows one to limit both the on-disk size and "message backlog size" / logical document count. The single capped collection case is simpler in terms of listening, the latter is slightly more flexible and naturally obscures messages for channels that listeners shouldn't hear about. (You can include such details in the tailing query in the single-capped collection case, though.)
You can also implement a TTL in the multi-capped-collection case through inclusion of the expiry time in the capped collection document inserted for the message and subsequent inclusion of a date range filter in the listener's tailing query, preventing stale records from appearing when catching up.
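A sketch of that date-range filtering, assuming a per-message `expires` field (names invented; `matches` is a toy evaluator for the `$gt` operator, not a driver call):

```python
from datetime import datetime, timedelta, timezone

def message_doc(channel, body, ttl_seconds):
    """Message document carrying its own expiry time."""
    return {
        "channel": channel,
        "message": body,
        "expires": datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds),
    }

def tail_filter(channel, now=None):
    """Tailing-query filter that hides already-expired messages, so stale
    records never appear when a subscriber catches up."""
    return {"channel": channel,
            "expires": {"$gt": now or datetime.now(timezone.utc)}}

def matches(doc, flt):
    """Toy evaluation of the filter against a single document."""
    for key, cond in flt.items():
        if isinstance(cond, dict) and "$gt" in cond:
            if not doc[key] > cond["$gt"]:
                return False
        elif doc[key] != cond:
            return False
    return True

fresh = message_doc("demo", "hello", ttl_seconds=60)
stale = message_doc("demo", "old news", ttl_seconds=-60)  # already expired
flt = tail_filter("demo")
print(matches(fresh, flt), matches(stale, flt))  # True False
```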
The tracking collection might have documents that look like:
    {
        channel: "opaque-channel-identifier",
        subscribers: []
    }
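The create-if-missing logic against this tracking collection could look like the following toy sketch, where a plain dict stands in for the collection and `UpdateResult` mirrors the upserted-id vs. modified-count distinction a real driver reports:

```python
class UpdateResult:
    """Mimics the relevant parts of a driver's update result:
    upserted_id is set when a new document was created (the nUpserted
    case); modified_count reflects the nModified case."""
    def __init__(self, upserted_id=None, modified_count=0):
        self.upserted_id = upserted_id
        self.modified_count = modified_count

def upsert_channel(tracking, channel):
    """Toy update-with-upsert: create the channel document if missing."""
    if channel in tracking:
        return UpdateResult(modified_count=1)
    tracking[channel] = {"channel": channel, "subscribers": []}
    return UpdateResult(upserted_id=channel)

tracking = {}
first = upsert_channel(tracking, "opaque-channel-identifier")
second = upsert_channel(tracking, "opaque-channel-identifier")

print(first.upserted_id is not None)                       # True: just created
print(second.upserted_id is None, second.modified_count)   # True 1: already existed
```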
No need to include a "last message" time, as that can be queried from the other collections. A message might look like:
    {
        channel: "opaque-channel-identifier",
        message: "opaque message",
        mime_type: "text/plain",
        expires: ISODateTime(…)
    }
A creation timestamp isn't needed as it's included in the `ObjectId` the client driver adds to the document as `_id` if an `_id` is missing when inserting. `ObjectId` is a very powerful unique record identification token/tag, hitting collision only after ~16 million inserts per second from a single process.
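That collision figure follows from the `ObjectId` layout (in modern drivers: a 4-byte timestamp in seconds, a 5-byte per-process random value, and a 3-byte incrementing counter), so the counter can only wrap within a single second if one process exceeds 2^24 inserts:

```python
# ObjectId layout per the BSON spec:
#   4 bytes  timestamp (seconds since the epoch)
#   5 bytes  per-process random value
#   3 bytes  incrementing counter
counter_bits = 3 * 8
per_second_before_wrap = 2 ** counter_bits  # distinct ids per process per second
print(per_second_before_wrap)  # 16777216 — the "~16 million" figure above
```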
Very interesting, thanks for the detailed writeup. The next step for me would be to familiarize myself with the C client lib and its bottlenecks -- currently the Redis throughput is capped by hiredis (the C client) copying the entire command into its internal memory buffer first. I wonder how the MongoDB C lib handles this, and how easy it is to plug it into nginx's custom event loop.
I'd like to second the MongoDB request. MongoDB as a store should be a very good solution. Capped collections might be faster; however, they have limitations. The ability to independently expire or directly delete documents is important I think and is not supported in Capped. So, two different options would be ideal (capped or regular).
Postgres also has a subscription/notification mechanism (https://www.postgresql.org/docs/9.6/static/sql-notify.html), it would be cool to be able to use it with nchan.
How about RabbitMQ and/or Apache Pulsar? Aside from the heavy nature of them compared to Redis, any other obvious downsides?
The ability to independently expire or directly delete documents is important I think and is not supported in Capped.
Not entirely true. The actual, true limitation on capped collections is that any modification to a record must not grow that record. In this way you can prepare by inserting a padding field (say 4 KB of padding), which on the first update operation is `$unset` as an aside to the actual update operation, leaving explicit headroom for growth.
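A sketch of that padding trick (the field name and 4 KB size are invented; `apply` is a toy stand-in for a driver's `update_one()`):

```python
PADDING = "x" * 4096  # 4 KB of headroom reserved at insert time

def padded_doc(channel, body):
    """Insert-time document carrying an explicit padding field."""
    return {"channel": channel, "message": body, "_padding": PADDING}

def update_with_unset(set_fields):
    """First update drops the padding alongside the real change, so the
    document can later 'grow' into that space without moving on disk."""
    return {"$set": set_fields, "$unset": {"_padding": ""}}

def apply(doc, update):
    """Toy application of the $set/$unset update to a plain dict."""
    doc.update(update["$set"])
    for field in update["$unset"]:
        doc.pop(field, None)

doc = padded_doc("demo", "hi")
apply(doc, update_with_unset({"delivered": True}))
print("_padding" in doc, doc["delivered"])  # False True
```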
Alternatively, there are many possible changes (e.g. toggling a boolean, modifying an integer) that require no growth and are additionally atomic. In this way, for example, I have implemented an "Expires" trait that involves both a TTL index for auto-deletion and explicit validation on record load, since that TTL cleanup might not always a) be complete, or b) catch everything on every run. In this exact way, "deleted" messages can be ignored by Nchan (excluded from the capped collection tailing query filter) and checked/enforced for loaded records.
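Since MongoDB's TTL monitor runs only periodically (roughly once a minute by default) and can lag behind, a load-time check like this backs it up; a sketch, with the `expires` field name assumed:

```python
from datetime import datetime, timedelta, timezone

def is_live(doc, now=None):
    """Load-time expiry validation backing up the TTL index: a record is
    live if it has no expiry, or its expiry is still in the future."""
    now = now or datetime.now(timezone.utc)
    expires = doc.get("expires")
    return expires is None or expires > now

now = datetime.now(timezone.utc)
print(is_live({"message": "fresh", "expires": now + timedelta(minutes=5)}))  # True
print(is_live({"message": "stale", "expires": now - timedelta(minutes=5)}))  # False
print(is_live({"message": "immortal"}))                                      # True
```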
We can use librdkafka to publish and subscribe to messages in nchan.