krasserm / akka-persistence-kafka

A replicated Akka Persistence journal backed by Apache Kafka
Apache License 2.0

Transactional producers #36

Open olivierdeckers opened 7 years ago

olivierdeckers commented 7 years ago

This is an early version with Kafka 0.11 support and transactional producers, as described in #20. I would welcome some early feedback.

While this version works and passes the tests, I think some additional work should still be done:

krasserm commented 7 years ago

@olivierdeckers thanks a lot for your PR. At first glance, your proposal looks like a good start. I don't have the bandwidth at the moment for a more detailed review, sorry for that, but it would be great if you could drive the project in whichever direction you think is most useful. If you prefer contributing to this repo, I'm happy to give you write access so that you can push directly; otherwise, I think it makes more sense to work on a separate fork. Please let me know.

olivierdeckers commented 7 years ago

Great! I think it would be best if I continue work on this repository.

giena commented 7 years ago

Hi, I have worked on this in our own fork at https://github.com/worldline-messaging/akka-persistence-kafka/

olivierdeckers commented 7 years ago

Hi Giena,

I see that you have already done some of the things I mentioned above in your repository. Would you be willing to contribute some smaller pull requests for these things once this branch is merged?

giena commented 7 years ago

Hi Olivier,

Here at worldline-messaging we have been working on that fork for a long time. We have made some changes (the new Akka Persistence API of Akka 2.4, then Akka 2.5, and updated dependencies), but we didn't merge them because we were waiting for a solution to the deprecated old producer. It seems that the new transactional producer does the job, and I'm sure our version is close to ready. Did you test it? I forgot to mention that I deleted the embedded server classes, because the Kafka project already has its own.

I took a look at your commits. Do not forget that the send method is asynchronous.

olivierdeckers commented 7 years ago

Hi Giena,

I checked out your code, but one of the tests fails on my machine with org.apache.kafka.common.errors.RecordTooLargeException. This is probably because the Kafka configuration of the test server does not accept large enough messages.
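If that diagnosis is right, the fix belongs in the test configuration. Below is a minimal sketch that assumes the test broker and producer can receive extra java.util.Properties (the thread does not show how the project's test harness wires them in). It raises the broker's message.max.bytes and the producer's max.request.size together, because either limit can surface as RecordTooLargeException. The class name and the 10 MB value are illustrative only.

```java
import java.util.Properties;

// Hypothetical helper for a test setup. The property names are standard Kafka
// configs; the 10 MB limit is an arbitrary example value.
class LargeMessageTestConfig {
    static final int MAX_MESSAGE_BYTES = 10 * 1024 * 1024;

    // Broker: largest record batch the broker accepts (about 1 MB by default).
    static Properties brokerOverrides() {
        Properties props = new Properties();
        props.setProperty("message.max.bytes", Integer.toString(MAX_MESSAGE_BYTES));
        return props;
    }

    // Producer: the client itself raises RecordTooLargeException when a
    // serialized record exceeds this limit (about 1 MB by default).
    static Properties producerOverrides() {
        Properties props = new Properties();
        props.setProperty("max.request.size", Integer.toString(MAX_MESSAGE_BYTES));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(brokerOverrides());
        System.out.println(producerOverrides());
    }
}
```

A topic-level max.message.bytes override would also have to allow the size if the test topics set one.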

I went through your commit, and I have some questions:

  1. I see you still have the StickyPartitioner class, but it doesn't seem to be used anymore. Why did you stop using it?
  2. I noticed you also changed the orphan snapshot offset from 4 to 7, because the offsets are no longer contiguous. Do you know why this is?
  3. The asynchronous send calls in this PR work because the commitTransaction call that follows them is synchronous and flushes the buffered messages. Your version doesn't block the thread while the messages are being sent to Kafka, which will clearly improve throughput. But what happens if another set of events is persisted while the messages are still being sent? Won't it initialize a new transaction and make the previous transaction fail with a ProducerFencedException? Writing a batch of events with differing persistenceIds will also start multiple transactions on the same producer in parallel, which, as far as I can see, will crash the producer.
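Question 2 has a plausible explanation, though the thread does not confirm it. With a transactional producer, each committed transaction appends a commit marker (a control record) to every partition it wrote to. That marker occupies an offset. The plain-Java model below (class and method names hypothetical, no Kafka dependency) shows how markers leave gaps between data offsets. It does not try to reproduce the exact shift from 4 to 7, which depends on how the test groups its writes into transactions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one Kafka partition written by a transactional producer.
// Not the Kafka API: it only mimics how offsets are assigned.
class PartitionLogModel {
    private final List<String> entries = new ArrayList<>();

    // Appends one committed transaction and returns the offsets of its data records.
    List<Long> appendCommittedTransaction(List<String> events) {
        List<Long> offsets = new ArrayList<>();
        for (String event : events) {
            offsets.add((long) entries.size());
            entries.add("data:" + event);
        }
        // The commit marker takes the next offset but is never delivered to consumers.
        entries.add("control:COMMIT");
        return offsets;
    }

    // Offset the next appended entry will receive (the log-end offset).
    long logEndOffset() {
        return entries.size();
    }

    public static void main(String[] args) {
        PartitionLogModel log = new PartitionLogModel();
        System.out.println(log.appendCommittedTransaction(List.of("e1", "e2"))); // [0, 1]
        System.out.println(log.appendCommittedTransaction(List.of("e3")));       // [3]
        System.out.println(log.appendCommittedTransaction(List.of("e4", "e5"))); // [5, 6]
        System.out.println(log.logEndOffset());                                  // 8
    }
}
```

Under this model, code that assumes "next offset = previous offset + 1" breaks as soon as transactions are enabled. That would be consistent with having to adjust hard-coded offsets in the tests.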
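Question 3 hinges on one constraint: a Kafka transactional producer has at most one open transaction at a time. One way to keep writes non-blocking for the caller without interleaving transactions is to funnel every batch through a single dedicated thread, so each transaction commits or aborts before the next one begins. The whole batch, including events for different persistenceIds, is written in one transaction. This is a minimal plain-Java sketch, not the approach taken in either fork. TxProducer is a hypothetical stand-in for the few KafkaProducer methods involved, and StrictFakeProducer only mimics the one-open-transaction rule. In the Kafka client, ProducerFencedException is normally tied to another producer instance using the same transactional.id, and after it the producer must be closed rather than aborted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Runs every batch on one dedicated thread: callers get a future immediately,
// but transactions on the shared producer never overlap.
class SerializedTxWriter implements AutoCloseable {
    private final TxProducer producer;
    private final ExecutorService writerThread = Executors.newSingleThreadExecutor();

    SerializedTxWriter(TxProducer producer) { this.producer = producer; }

    // One transaction per batch, even if the batch mixes several persistenceIds.
    CompletableFuture<Integer> write(String topic, List<Map.Entry<String, String>> batch) {
        return CompletableFuture.supplyAsync(() -> {
            producer.beginTransaction();
            try {
                for (Map.Entry<String, String> record : batch) {
                    producer.send(topic, record.getKey(), record.getValue());
                }
                // On a real KafkaProducer this flushes pending sends and blocks until done.
                producer.commitTransaction();
                return batch.size();
            } catch (RuntimeException e) {
                // Simplified: fatal errors such as ProducerFencedException require
                // closing a real producer instead of aborting the transaction.
                producer.abortTransaction();
                throw e;
            }
        }, writerThread);
    }

    @Override
    public void close() { writerThread.shutdown(); }

    public static void main(String[] args) {
        StrictFakeProducer producer = new StrictFakeProducer();
        try (SerializedTxWriter writer = new SerializedTxWriter(producer)) {
            List<CompletableFuture<Integer>> results = new ArrayList<>();
            for (int i = 0; i < 10; i++) {
                results.add(writer.write("events",
                        List.of(Map.entry("pid-a", "e" + i), Map.entry("pid-b", "e" + i))));
            }
            results.forEach(CompletableFuture::join);
        }
        System.out.println(producer.committedCount()); // 20
    }
}

// Hypothetical stand-in for the KafkaProducer methods used in a transactional write.
interface TxProducer {
    void beginTransaction();
    void send(String topic, String key, String value);
    void commitTransaction();
    void abortTransaction();
}

// Fake that enforces "one open transaction per producer" and records committed writes.
class StrictFakeProducer implements TxProducer {
    private boolean inTransaction = false;
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    public synchronized void beginTransaction() {
        if (inTransaction) throw new IllegalStateException("a transaction is already open");
        inTransaction = true;
    }
    public synchronized void send(String topic, String key, String value) {
        if (!inTransaction) throw new IllegalStateException("no open transaction");
        pending.add(topic + "/" + key + "=" + value);
    }
    public synchronized void commitTransaction() {
        committed.addAll(pending);
        pending.clear();
        inTransaction = false;
    }
    public synchronized void abortTransaction() {
        pending.clear();
        inTransaction = false;
    }
    synchronized int committedCount() { return committed.size(); }
}
```

The trade-off is that throughput is bounded by one transaction at a time per producer. In an Akka Persistence journal, the returned future could presumably be mapped into the Future that asyncWriteMessages expects, but that wiring is not shown here.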