peter-lawrey / Java-Chronicle

Java Indexed Record Chronicle

Idea: Work Queue Semantics #17

Closed: kutchar closed this 11 years ago

kutchar commented 11 years ago

Hi Peter,

This is a great project. I was wondering if you have any plans for adding "Work Queue" semantics to this, a la beanstalkd and libkestrel. I've been looking for a robust in-process work queue for Java for a while now and I couldn't find one. The former is a C daemon and the latter is a semi-abandoned Scala library.

-- Drew

peter-lawrey commented 11 years ago

Chronicle already acts as a queue, but it is very low level, especially in terms of serialization, in order to get maximum performance. Can you provide an example of what you mean? I will see what I can do to make Chronicle more friendly.

If you just need a high-performance thread-to-thread queue, I suggest looking at the Disruptor (http://lmax-exchange.github.com/disruptor/). If you don't need durability, it is much faster, more flexible and easier to use as a queue.

kutchar commented 11 years ago

beanstalkd has great documentation, but TL;DR: it allows producers to put messages into the queue, and a consumer can tentatively reserve a message; if the consumer fails to process it, the message becomes available for another consumer to pick up. I haven't really dug into the Chronicle code, but I think this could be done with multiple durable Chronicle queues.
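The reserve/release cycle described here can be sketched in a few lines. This is a minimal in-memory illustration, assuming a single thread and no persistence; `WorkQueue` and its methods are hypothetical names, not beanstalkd's protocol or Chronicle's API, and beanstalkd features such as priorities, time-to-run timeouts and tubes are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory, single-threaded sketch of reserve/release work-queue semantics.
// Not durable, not thread-safe, and not the API of beanstalkd or Chronicle.
class WorkQueue {
    private final Deque<Long> ready = new ArrayDeque<>();        // job ids waiting to be reserved
    private final Map<Long, byte[]> bodies = new HashMap<>();    // job id -> opaque payload
    private final Map<Long, Integer> failures = new HashMap<>(); // job id -> times released after failing
    private long nextId = 1;

    // Producer: store an opaque payload and make it ready.
    long put(byte[] body) {
        long id = nextId++;
        bodies.put(id, body);
        failures.put(id, 0);
        ready.addLast(id);
        return id;
    }

    // Consumer: tentatively take the next ready job, or null if there is none.
    Long reserve() {
        return ready.pollFirst();
    }

    byte[] body(long id) {
        return bodies.get(id);
    }

    // Processing failed: count the failure and return the job to the head of the queue
    // so that the next reserve() (by any consumer) picks it up again.
    void release(long id) {
        failures.merge(id, 1, Integer::sum);
        ready.addFirst(id);
    }

    // Processing succeeded: forget the job entirely.
    void delete(long id) {
        bodies.remove(id);
        failures.remove(id);
    }

    int failureCount(long id) {
        return failures.getOrDefault(id, 0);
    }
}
```

Returning a failed job to the head of the queue, rather than the tail, is a design choice in this sketch; appending it to the tail instead would let other jobs go first.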

Work queues are great when you want to have async processing but I'm not aware of a durable/persisted one for Java.

peter-lawrey commented 11 years ago

There may be different assumptions in the design of these libraries. Having multiple consumers that do the same thing is fairly traditional and can help support higher throughput. Chronicle is designed with the assumption that multiple consumers doing the same thing won't help you because

Peter.

peter-lawrey commented 11 years ago

From a few benchmarks I have seen for beanstalkd, it appears to handle 3-10K messages per second with one consumer and possibly 30-100K messages per second with multiple producers and consumers. I couldn't find examples that discuss latency, and I couldn't determine whether these figures included persistence.

Chronicle is designed to handle over one million durable messages per second with a single producer and consumer, with an end-to-end latency of around one microsecond between processes on the same machine. The TCP connection can handle similar throughput, but TCP adds around 40 microseconds of latency, depending on the hardware used.

peter-lawrey commented 11 years ago

libkestrel looks like a much more similar project in terms of design, particularly its JournaledQueue.

peter-lawrey commented 11 years ago

BTW: I am adding support for arbitrary Java objects and wrappers so that they behave like Lists, Queues, Maps etc. The performance of Java Serialization is the biggest challenge: even a BigDecimal takes 80 microseconds to serialize/deserialize. :P

I can see that beanstalkd offers lots of functionality for working with queues. I would be interested in what you believe would be immediately useful (apart from retrying when we get random errors, which I feel is flawed thinking ;)

kutchar commented 11 years ago

Thanks for the clarifications. I'm not too clear about how to use Chronicle just yet, but I'm very intrigued since I've been really enjoying your writings on Vanilla Java. Are there any docs on how to use Chronicle?

I don't think a retry policy is necessarily flawed, especially in distributed systems, where different messages may need to be delivered to different systems; if one of those systems is down, you want some sort of back-off strategy instead of retrying non-stop or throwing the message away. I would like to hear your thoughts on this.
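A capped exponential back-off, one common way to avoid retrying non-stop, can be sketched as follows. `Backoff`, its parameters and the doubling policy are illustrative assumptions, not part of Chronicle or beanstalkd.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper: capped exponential back-off for retrying delivery to a downstream system.
final class Backoff {
    private Backoff() {}

    // Delay before retry number `attempt` (0-based): baseMillis * 2^attempt, never above maxMillis.
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 0 || baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("need attempt >= 0 and 0 < baseMillis <= maxMillis");
        }
        long delay = baseMillis;
        // Stop doubling once the cap is reached; the value never exceeds 2 * maxMillis.
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // "Full jitter" variant: a random delay in [0, delayMillis(...)] so that many
    // consumers retrying the same failed system do not retry in lockstep.
    static long jitteredDelayMillis(int attempt, long baseMillis, long maxMillis) {
        return ThreadLocalRandom.current().nextLong(delayMillis(attempt, baseMillis, maxMillis) + 1);
    }
}
```

With a 100 ms base and a 10 s cap, the delays run 100, 200, 400, 800 ms and so on until they level off at 10 s.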

As far as features, here's my prioritization:

  1. Crash safety. If the JVM crashes, you can restart and pickup processing messages where you left off.
  2. The disk data-structure should be manageable. Not sure how Chronicle manages the disk structures, but I've seen queues that allocate files which never get deleted.
  3. Be memory efficient. Keep a good ratio between items on disk vs in memory.
  4. Ability to return an event to the head of the queue
  5. Ability to delay an event
  6. I think it's good design to keep the events opaque ByteBuffers or byte[] and have the client decide how it wants to serialize/deserialize them.

Now the following are semi-requirements that could potentially be implemented on the client side.

  1. Have a failure count on messages (how many times a message was retried and failed.)
  2. Messages can have a TTL

BTW, what's the maximum size an event can practically be?

peter-lawrey commented 11 years ago

What I have is very low level: code examples showing how to use it. What is sorely lacking is high-level design documentation. Your questions have helped clarify some very reasonable misconceptions one might have about it, based on how other products work.

Chronicle follows a guaranteed delivery model. The consumer is responsible for retrying, so provided the publisher is running, the consumer will receive every message. It doesn't matter how long either is down, provided the chronicle logs haven't been rolled. (As this is not done automatically, it shouldn't be a surprise when this happens ;)
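The guaranteed-delivery model described here, where nothing is removed and each consumer owns its read position, can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not Chronicle's implementation: `AppendOnlyLog` and `LogReader` are hypothetical names, the log lives in memory, and a real consumer would persist its position so it can resume after a restart. Processing a message a second time is done by appending it again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of an append-only log with consumer-owned read positions.
// Not Chronicle's API; a durable version would store entries and positions on disk.
class AppendOnlyLog {
    private final List<byte[]> entries = new ArrayList<>();

    // Producer: append a message and return its index. Entries are never removed.
    synchronized long append(byte[] message) {
        entries.add(message);
        return entries.size() - 1L;
    }

    synchronized long size() {
        return entries.size();
    }

    synchronized byte[] get(long index) {
        return entries.get(Math.toIntExact(index));
    }
}

class LogReader {
    private final AppendOnlyLog log;
    private long nextIndex; // owned by the consumer; persist it to resume after a restart

    LogReader(AppendOnlyLog log, long startIndex) {
        this.log = log;
        this.nextIndex = startIndex;
    }

    // Returns the next unread message, or null once the reader has caught up.
    byte[] poll() {
        if (nextIndex >= log.size()) {
            return null;
        }
        return log.get(nextIndex++);
    }

    long position() {
        return nextIndex;
    }
}
```

Because the log is never trimmed in this model, a reader that was down for any length of time simply resumes from its saved position; the only limit is how long the underlying data is kept.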

Crash safety. If the JVM crashes, you can restart and pickup processing messages where you left off.

You have crash safety for complete messages. If you crash in the middle of writing a message, that message is lost, and no more than one partial message can be lost this way. A message should take well under a microsecond to write, so the window of opportunity for this kind of failure is small.
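One common way to get the "at most one partial message" property is to write the message body first and its length header last, treating the header as the commit point. The sketch below illustrates that technique in a zero-filled in-memory buffer; `CommitLog` is a hypothetical name, this is an assumption for illustration rather than a description of how Chronicle lays out its files, and it ignores multi-threaded visibility and the order in which the OS flushes pages.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded sketch: a length-prefixed log where the 4-byte length header
// is written after the body, so a record only becomes visible once it is complete.
// In a zero-filled buffer (like a freshly extended file), a crash mid-write leaves
// the header at 0 and readers stop there: at most one partial record is lost.
final class CommitLog {
    private final ByteBuffer buf;
    private int writePos = 0;

    CommitLog(int capacity) {
        this.buf = ByteBuffer.allocate(capacity); // allocate() zero-fills the buffer
    }

    void write(byte[] body) {
        write(body, true);
    }

    // commit == false simulates a crash after the body is written but before the header.
    void write(byte[] body, boolean commit) {
        if (body.length == 0) {
            throw new IllegalArgumentException("a zero length is reserved for 'not committed'");
        }
        int start = writePos;
        int end = start + 4 + body.length;
        if (end > buf.capacity()) {
            throw new IllegalStateException("log is full");
        }
        for (int i = 0; i < body.length; i++) {
            buf.put(start + 4 + i, body[i]); // 1. body
        }
        if (!commit) {
            return; // "crash": header never written, writePos not advanced
        }
        if (end + 4 <= buf.capacity()) {
            buf.putInt(end, 0); // 2. clear stale bytes where the next header will go
        }
        buf.putInt(start, body.length); // 3. header last: this is the commit point
        writePos = end;
    }

    // Reads every committed record from the start, stopping at the first zero header.
    List<byte[]> readAll() {
        List<byte[]> records = new ArrayList<>();
        int pos = 0;
        while (pos + 4 <= buf.capacity()) {
            int len = buf.getInt(pos);
            if (len == 0) {
                break;
            }
            byte[] body = new byte[len];
            for (int i = 0; i < len; i++) {
                body[i] = buf.get(pos + 4 + i);
            }
            records.add(body);
            pos += 4 + len;
        }
        return records;
    }
}
```

A writer that restarts after a simulated crash resumes at the last committed position and overwrites the partial record.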

The disk data-structure should be manageable. Not sure how Chronicle manages the disk structures, but I've seen queues that allocate files which never get deleted.

This is true. They are never deleted by Chronicle. They can be rotated by closing the chronicle and opening a new one with a new name. It is assumed you would do this nightly or weekly, depending on your requirements. You might compress them and keep them for an extended period of time, possibly years. This is where the size of your disk might be important. e.g. say you average 10,000 messages per second over a day, with a size of 100 bytes each. That is 1 MB per second, so you need at least 86 GB of free disk space per day. If you need to keep a week at a time, you need about 605 GB of free disk space.
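The disk-space estimate is messages per second times bytes per message times seconds retained. A minimal sketch of that arithmetic, assuming a flat 100 bytes per message and ignoring any index or header overhead a real log would add; `DiskSizing` is a hypothetical helper.

```java
import java.util.Locale;

// Hypothetical back-of-the-envelope helper: bytes = messages/s * bytes/message * seconds kept.
// Ignores per-message index or header overhead, which a real log would add.
final class DiskSizing {
    private DiskSizing() {}

    static long bytesNeeded(long messagesPerSecond, long bytesPerMessage, long secondsRetained) {
        // multiplyExact throws instead of silently overflowing
        return Math.multiplyExact(Math.multiplyExact(messagesPerSecond, bytesPerMessage), secondsRetained);
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60;
        long perDay = bytesNeeded(10_000, 100, day);
        long perWeek = bytesNeeded(10_000, 100, 7 * day);
        System.out.printf(Locale.ROOT, "one day:  %.1f GB%n", perDay / 1e9);  // 86.4 GB
        System.out.printf(Locale.ROOT, "one week: %.1f GB%n", perWeek / 1e9); // 604.8 GB
    }
}
```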

Be memory efficient. Keep a good ratio between items on disk vs in memory

This is handled transparently by the OS and depends on how much free memory you have. It is not something you need to configure and all the performance results given make this assumption.

Ability to return an event to the head of the queue

As events are never removed from the queue, you should never need to do this. However, you can add an event to the queue again.

Ability to delay an event

In a system where minimal latency is critical, I don't know why you would want to delay delivery of an event. I would have to understand this use case better to say how it could be supported.

I think it's good design to keep the events opaque ByteBuffers or byte[] and have the client decide how it wants to serialize/deserialize them.

I agree here. Making the serialization generic is much easier to develop with, and I am adding support for this in version 1.7. It is much slower (up to 100x slower), but I think it is a price worth paying, given you always have the option of using the more efficient serialization if you really need it.
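To illustrate the trade-off between generic and hand-written serialization over opaque `byte[]` messages, here is a sketch with a hypothetical `Codec` interface and two implementations for `BigDecimal`, the type mentioned earlier in the thread: one using standard Java serialization (`ObjectOutputStream`), and one that writes only the scale and the unscaled value. The interface and class names are assumptions for illustration, not the API added in Chronicle 1.7.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Hypothetical interface: the queue only sees byte[]; the client chooses how to encode.
interface Codec<T> {
    byte[] encode(T value);
    T decode(byte[] bytes);
}

// Generic: works for any Serializable type, at the cost of speed and size.
final class JavaSerializationCodec<T extends Serializable> implements Codec<T> {
    @Override
    public byte[] encode(T value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray(); // the stream was flushed when it was closed
    }

    @Override
    @SuppressWarnings("unchecked")
    public T decode(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Hand-written: [4-byte scale][two's-complement unscaled value], no class metadata.
final class BigDecimalCodec implements Codec<BigDecimal> {
    @Override
    public byte[] encode(BigDecimal value) {
        byte[] unscaled = value.unscaledValue().toByteArray();
        return ByteBuffer.allocate(4 + unscaled.length).putInt(value.scale()).put(unscaled).array();
    }

    @Override
    public BigDecimal decode(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int scale = buf.getInt();
        byte[] unscaled = new byte[buf.remaining()];
        buf.get(unscaled);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }
}
```

For `12345.6789` the hand-written encoding is 8 bytes, while Java serialization also writes class descriptors and produces a considerably larger array.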

Have a failure count on messages (how many times a message was retried and failed.)

This falls into the category of randomly failing message processing for me. I don't understand why you would want a system that is so unreliable, and I believe it's entirely avoidable. (I could be wrong, of course.)

Messages can have a TTL

They can have a time to live, but it's not supported directly, i.e. you can add a TTL field which your consumer can honour or ignore ;)
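A TTL carried as an ordinary field that the consumer may check can be sketched like this. The layout (an 8-byte expiry timestamp in epoch milliseconds in front of the payload, with 0 meaning no TTL) and the `TtlMessage` helper are assumptions for illustration. The caller passes in the current time, so the consumer decides how often to read the clock.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical message layout: [8-byte expiry time in epoch millis][payload].
// An expiry of 0 means "no TTL". Enforcement is entirely up to the consumer.
final class TtlMessage {
    private TtlMessage() {}

    static byte[] wrap(byte[] payload, long expiresAtMillis) {
        return ByteBuffer.allocate(8 + payload.length).putLong(expiresAtMillis).put(payload).array();
    }

    static long expiresAt(byte[] message) {
        return ByteBuffer.wrap(message).getLong(0);
    }

    static byte[] payload(byte[] message) {
        return Arrays.copyOfRange(message, 8, message.length);
    }

    // "now" is passed in, so one clock read can be shared across many messages.
    static boolean isExpired(byte[] message, long nowMillis) {
        long expiry = expiresAt(message);
        return expiry != 0 && nowMillis >= expiry;
    }
}
```

A consumer that chooses to ignore TTLs simply calls `payload` and never calls `isExpired`.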

BTW, just adding a timestamp and checking it on the consumer can slow down performance by 25%, as reading the clock is a non-trivial function and is a system call on many OSes. Note: Java Chronicle can do its entire end-to-end process, with inputs and outputs, in a real application with less than one system call per message on average.
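Given that cost, one option (a common technique, not something proposed in this thread) is to read the clock once per batch of messages and share that timestamp across the batch, trading timestamp precision for fewer clock reads. `BatchedClock` and its `Handler` interface are hypothetical; the clock is passed in as a `LongSupplier` so it can be `System::currentTimeMillis` in production or a counter in a test.

```java
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical consumer loop that samples the clock once per batch rather than once per message.
// Every message in a batch sees the same timestamp, so "now" can be stale by up to one batch.
final class BatchedClock {
    interface Handler {
        void handle(byte[] message, long nowMillis);
    }

    private BatchedClock() {}

    // Returns the number of clock reads performed, to make the saving visible.
    static int process(List<byte[]> messages, int batchSize, LongSupplier clock, Handler handler) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int clockReads = 0;
        long now = 0;
        for (int i = 0; i < messages.size(); i++) {
            if (i % batchSize == 0) { // first message of each batch
                now = clock.getAsLong();
                clockReads++;
            }
            handler.handle(messages.get(i), now);
        }
        return clockReads;
    }
}
```

With a batch size of 64, a consumer reads the clock once per 64 messages; whether that recovers a meaningful part of the cost would need to be measured on the target hardware.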

Thank you for this conversation. It gives me many ideas about assumptions I have made that I should address in my documentation.

kutchar commented 11 years ago

You're very welcome. Looking forward to seeing this project's progress.

peter-lawrey commented 11 years ago

Closing for now. This is a good idea, but some analysis would be needed to extract specific requirements that suit the assumptions Chronicle makes.