krasserm / akka-persistence-kafka

A replicated Akka Persistence journal backed by Apache Kafka
Apache License 2.0
201 stars 59 forks source link

Write messages #24

Closed giena closed 9 years ago

giena commented 9 years ago

The asyncWriteMessages is like this: def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = { val sends = messages.groupBy(.persistenceId).map { case (pid, msgs) => writerFor(pid).ask(messages)(writeTimeout) } Future.sequence(sends).map( => ()) }

I think it should be like this: def asyncWriteMessages(messages: Seq[PersistentRepr]): Future[Unit] = { val sends = messages.groupBy(.persistenceId).map { case (pid, msgs) => writerFor(pid).ask(msgs)(writeTimeout) } Future.sequence(sends).map( => ()) }

Do i make a mistake?

krasserm commented 9 years ago

Ouch, that's really a bug, good catch. It just didn't surface because a message batch sent by akka-persistence only contains messages with the same persistenceId. Do you want to create a pull request? Maybe along with a test that demonstrates that the bug is fixed?

giena commented 9 years ago

I've created a pull request: https://github.com/krasserm/akka-persistence-kafka/pull/25 But no test.